Zip multiple files in bulk from R2 in Cloudflare Workers
import { type DeflateOptions, Zip, ZipDeflate } from "fflate";

export interface R2ToZipOptions {
  /**
   * Maximum number of files to process concurrently.
   *
   * @default 2
   */
  concurrency?: number;
  /**
   * Base filename for the zip archive (without .zip extension).
   *
   * @default random UUID
   */
  filename?: string;
  /**
   * fflate compression options for each file.
   *
   * @default { level: 1 }
   */
  deflateOptions?: DeflateOptions;
  /**
   * Size of chunks to read from R2 (in bytes).
   *
   * @default 2MB (2097152 bytes)
   */
  chunkSize?: number;
  /**
   * Whether to log detailed debug information.
   *
   * @default false
   */
  debug?: boolean;
  /**
   * Maximum time in ms to spend on a single file before skipping it.
   * Set to 0 to disable the timeout.
   *
   * @default 0 (disabled)
   */
  fileTimeout?: number;
}
export const R2ToZip = async (
  bucket: R2Bucket,
  keys: string[],
  options: R2ToZipOptions = {}
) => {
  const startMs = performance.now();
  const {
    concurrency = 2,
    filename = crypto.randomUUID(),
    deflateOptions = { level: 1 },
    chunkSize = 1024 * 1024 * 2, // 2MB
    debug = false,
    fileTimeout = 0,
  } = options;
  const headers = new Headers({
    "Content-Type": "application/zip",
    "Content-Disposition": `attachment; filename="${filename}.zip"`,
  });
  let zip: Zip;
  let filesProcessed = 0;
  let filesDetected = 0;
  let filesSucceeded = 0;
  let bytesProcessed = 0;
  // Only log if debug is enabled
  const debugLog = debug
    ? (message: string) => console.debug(message)
    : () => {};
  // Create the readable stream that carries the zip output
  const stream = new ReadableStream({
    async start(controller) {
      zip = new Zip((err, chunk, final) => {
        if (err) {
          console.error("[R2ToZip:ZipStream] Error:", err);
          controller.error(err);
          return;
        }
        if (chunk) {
          controller.enqueue(chunk);
        }
        if (final) {
          const durationMs = performance.now() - startMs;
          const totalMB = (bytesProcessed / (1024 * 1024)).toFixed(2);
          debugLog(
            `[R2ToZip] Zip stream finished. Files: ${filesSucceeded}/${filesDetected} (${filesProcessed}/${keys.length}). ${totalMB}MB processed. Duration: ${durationMs.toFixed(0)}ms`
          );
          controller.close();
        }
      });
      let keyIndex = 0;
      let activeTasks = 0;
      let allKeysQueued = false;
      const processNextKey = async () => {
        if (keyIndex >= keys.length) {
          allKeysQueued = true;
          if (activeTasks === 0) {
            zip.end();
          }
          return;
        }
        const currentKeyIndex = keyIndex++;
        // biome-ignore lint/style/noNonNullAssertion: Key is guaranteed to exist
        const key = keys[currentKeyIndex]!;
        activeTasks++;
        try {
          debugLog(`[R2ToZip] Processing: ${key}`);
          // Use a HEAD request first to check that the file exists and get its size
          const obj = await bucket.head(key);
          if (!obj) {
            debugLog(`[R2ToZip] Object not found: ${key}`);
            return; // Skip this key
          }
          filesDetected++;
          const fileSize = obj.size || 0;
          // For very small files, just fetch the whole object
          if (fileSize < chunkSize) {
            const fullObj = await bucket.get(key);
            if (!fullObj || !fullObj.body) {
              debugLog(`[R2ToZip] Failed to get object: ${key}`);
              return;
            }
            const buffer = await fullObj.arrayBuffer();
            const zipFile = new ZipDeflate(key, deflateOptions);
            zip.add(zipFile);
            zipFile.push(new Uint8Array(buffer), true);
            bytesProcessed += buffer.byteLength;
            filesSucceeded++;
            return;
          }
          // For larger files, stream with range requests
          const zipFile = new ZipDeflate(key, deflateOptions);
          zip.add(zipFile);
          let offset = 0;
          let timeoutId: number | undefined;
          let timedOut = false;
          // Set a timeout if enabled
          if (fileTimeout > 0) {
            timeoutId = setTimeout(() => {
              console.warn(`[R2ToZip] Timeout processing file: ${key}`);
              timedOut = true;
              // Signal end of file to ZipDeflate - this may leave a truncated
              // entry, but at least it won't hang the entire zip process
              zipFile.push(new Uint8Array(0), true);
            }, fileTimeout) as unknown as number;
          }
          while (offset < fileSize && !timedOut) {
            const end = Math.min(offset + chunkSize - 1, fileSize - 1);
            const range = {
              length: end - offset + 1,
              offset,
            };
            debugLog(`[R2ToZip] Fetching chunk at range ${offset}-${end}`);
            const chunk = await bucket.get(key, { range });
            if (!chunk || !chunk.body) {
              console.error(
                `[R2ToZip] Failed to get chunk at range ${offset}-${end} for ${key}`
              );
              break;
            }
            const buffer = await chunk.arrayBuffer();
            // The timeout may have fired while awaiting; the entry is already
            // finalized, so stop pushing data into it
            if (timedOut) break;
            zipFile.push(new Uint8Array(buffer));
            bytesProcessed += buffer.byteLength;
            offset += buffer.byteLength;
            debugLog(
              `[R2ToZip] Processed chunk at range ${range.offset}-${end}. Offset: ${offset}. Chunk size: ${chunk.size} - bytes processed: ${bytesProcessed}`
            );
            // Yield so other tasks can run
            if (offset < fileSize) {
              await new Promise((resolve) => setTimeout(resolve, 0));
            }
          }
          // Clear the timeout if it was set
          if (timeoutId) clearTimeout(timeoutId);
          // Signal end of file (unless the timeout already did)
          if (!timedOut) {
            zipFile.push(new Uint8Array(0), true);
            filesSucceeded++;
          }
        } catch (error) {
          console.error(`[R2ToZip] Error processing ${key}:`, error);
        } finally {
          activeTasks--;
          filesProcessed++;
          // Start the next task, or end the zip if all are done
          if (allKeysQueued && activeTasks === 0) {
            zip.end();
          } else if (!allKeysQueued) {
            // Use queueMicrotask so scheduling the next key doesn't block here
            queueMicrotask(() => processNextKey());
          }
        }
      };
      // Dynamically adjust concurrency based on the number of keys.
      // For fewer keys, we can use lower concurrency to reduce memory usage
      const effectiveConcurrency = Math.min(
        concurrency,
        keys.length,
        // For small jobs (<10 files), limit concurrency further
        keys.length < 10
          ? Math.max(1, Math.floor(keys.length / 2))
          : concurrency
      );
      debugLog(
        `[R2ToZip] Starting ${effectiveConcurrency} tasks for ${keys.length} files...`
      );
      // Start the initial tasks
      for (let i = 0; i < effectiveConcurrency; i++) {
        queueMicrotask(() => processNextKey());
      }
      if (keys.length === 0) {
        debugLog("[R2ToZip] No keys provided, ending zip immediately.");
        zip.end();
      }
    },
    cancel(reason) {
      console.warn("[R2ToZip] Output stream cancelled.", reason);
      zip?.end();
    },
  });
  return {
    readableStream: stream,
    headers,
  };
};
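
Usage: a minimal sketch of calling R2ToZip from a Worker's fetch handler. The Env interface, the MY_BUCKET binding name, and the hard-coded key list are assumptions for illustration, not part of the gist. Because the function returns a ReadableStream, the Response can be sent while files are still being read from R2.

// Hypothetical usage sketch. Assumes an R2 bucket bound as MY_BUCKET in the
// Worker's wrangler configuration and that R2ToZip is imported from this file.
interface Env {
  MY_BUCKET: R2Bucket;
}

export default {
  async fetch(_request: Request, env: Env): Promise<Response> {
    // Hypothetical keys; in practice these might come from the request
    const keys = ["reports/january.pdf", "reports/february.pdf"];
    const { readableStream, headers } = await R2ToZip(env.MY_BUCKET, keys, {
      filename: "reports",
      concurrency: 2,
    });
    // Returning the stream directly lets the client start downloading
    // before every file has been read from R2
    return new Response(readableStream, { headers });
  },
};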