arc.codes S3 download/upload utility in Node with streams
const arc = require('@architect/functions');
const { GetObjectCommand, S3Client } = require('@aws-sdk/client-s3');
const { Upload } = require('@aws-sdk/lib-storage');
const client = new S3Client({});
const CONSTANTS = require('@architect/shared/constants');
const { timer } = require('@architect/shared/utils');
const jsonlines = require('@jsonlines/core');
const stream = require('stream');

/*
storing a 'cache' file in S3: a JSON Lines file, gzipped
at rest, this file contains array tuples [string, number] that represent repo names and the TTL expiry (epoch seconds) for each repo
`download` pulls this file from S3 and converts it into one large object with key:value pairs of repo_name:ttl_expiry
`upload` does the reverse: converts that object back into array tuples, gzips them, and streams the result to S3
*/
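// For illustration (values are hypothetical): one gunzipped line of the cache
// file looks like
//   ["example-org/example-repo", 1704067200]
// where the second element is the unix timestamp (in seconds) after which the
// entry is treated as expired.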
module.exports = {
  download: async () => {
    let st = new Date().valueOf();
    const services = await arc.services();
    const bucket = services['storage-private'].appdata;
    let et = new Date().valueOf();
    console.log(`DL [${timer(et, st)}] arc services loaded`);
    const command = new GetObjectCommand({
      Bucket: bucket,
      Key: CONSTANTS.S3_CACHE_FILE
    });
    st = new Date().valueOf();
    const response = await client.send(command);
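    // In the Node runtime, the SDK v3 GetObject response exposes Body as a
    // Readable stream, so it can be piped straight into the gunzip + JSON
    // Lines parser below.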
    return new Promise((res, rej) => {
      const repos = {};
      let n = 0;
      let h = 0;
      const now = Math.floor(new Date().valueOf() / 1000);
      response.Body
        .pipe(jsonlines.parse({ gzip: true }))
        .on('data', (data) => {
          // data is a tuple of [repo_name, ttl_expiry]
          n += 1;
          // filter out expired items from s3 cache
          if (now < data[1]) {
            h += 1;
            repos[data[0]] = data[1];
          }
        })
        .on('error', (err) => {
          console.error('DL ERROR pulling s3 cache file!', err);
          rej(err);
        })
        .on('close', () => {
          et = new Date().valueOf();
          console.log(`DL [${timer(et, st)}] repo cache read: ${h} valid (${Math.floor(h / n * 100 * 100) / 100}%)`);
          res(repos);
        });
    });
  },
  upload: async (repos) => {
    let st = new Date().valueOf();
    const services = await arc.services();
    const Bucket = services['storage-private'].appdata;
    let et = new Date().valueOf();
    console.log(`UL [${timer(et, st)}] arc services loaded`);
    const reponames = Object.keys(repos);
    const arr = [];
    for (let i = 0; i < reponames.length; i++) {
      const repo = reponames[i];
      arr.push([repo, repos[repo]]);
    }
    st = new Date().valueOf();
    const str = stream.Readable.from(arr)
      .pipe(jsonlines.stringify({ gzip: true }))
      .on('error', (err) => {
        console.error('ERROR piping s3 cache file to s3', err);
        throw err;
      })
      .on('close', () => {
        et = new Date().valueOf();
        console.log(`UL [${timer(et, st)}] s3 cache stream closed`);
      });
    const parallelUploads3 = new Upload({
      client,
      queueSize: 1, // optional concurrency configuration
      leavePartsOnError: false, // optional manually handle dropped parts
      params: {
        Bucket,
        Key: CONSTANTS.S3_CACHE_FILE,
        Body: str
      }
    });
    const res = await parallelUploads3.done();
    et = new Date().valueOf();
    console.log(`UL [${timer(et, st)}] S3 HTTP ${res.$metadata.httpStatusCode}`);
  }
};
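
A minimal usage sketch, assuming the module above is saved as a shared file required as `./cache` and called from an arc Lambda handler; the require path, handler shape, repo name, and TTL value below are illustrative, not part of the gist:

// Hypothetical consumer of the cache module above
const cache = require('./cache'); // assumed path to the module above

exports.handler = async () => {
  // read the current cache from S3 as { repo_name: ttl_expiry } pairs
  const repos = await cache.download();

  // refresh or add an entry with a 24 hour TTL (illustrative value)
  const oneDay = 24 * 60 * 60;
  repos['example-org/example-repo'] = Math.floor(Date.now() / 1000) + oneDay;

  // gzip and write the updated cache back to S3
  await cache.upload(repos);
  return { statusCode: 200 };
};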