Skip to content

Instantly share code, notes, and snippets.

@evertheylen
Created October 4, 2024 19:53
Show Gist options
  • Save evertheylen/321885f2333db05ef134d6c4e2cdda74 to your computer and use it in GitHub Desktop.
Save evertheylen/321885f2333db05ef134d6c4e2cdda74 to your computer and use it in GitHub Desktop.
Drop-in replacement for `new DecompressionStream("gzip")`, which can handle multiple concatenated streams
// Drop-in replacement for `new DecompressionStream("gzip")` that can also handle multiple
// concatenated gzip members (e.g. the output of `cat a.gz b.gz`).
// --> see https://github.com/whatwg/compression/issues/39
// Example usage:
// (await fetch("foobar.gz")).body.pipeThrough(multiGzipDecompressor()).pipeTo(...)
// Copyright 2024 Evert Heylen.
// License: MIT
// ------------------------------------------------
// Start of every gzip member header (RFC 1952): ID1 = 0x1f, ID2 = 0x8b, then CM = 0x08
// (deflate), which is the only compression method RFC 1952 defines.
const gzipHeader = [0x1f, 0x8b, 0x08] as const;
// Accept any BufferSource-like input; DecompressionStream itself accepts ArrayBuffer and views.
type In = ArrayBufferLike | ArrayBufferView;
type Out = Uint8Array;

/**
 * Returns `chunk` as a Uint8Array over the same bytes, without copying.
 * Non-Uint8Array views (DataView, Uint16Array, ...) are reinterpreted byte-wise rather than
 * converted element-wise, which is what `new Uint8Array(view)` would do.
 */
function asBytes(chunk: In): Uint8Array {
  if (chunk instanceof Uint8Array) {
    return chunk;
  }
  if (ArrayBuffer.isView(chunk)) {
    return new Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength);
  }
  return new Uint8Array(chunk);
}

/**
 * Returns the first index >= 1 at which a gzip member header could start in `chunk`, or
 * `chunk.byteLength` if there is none.
 *
 * A position qualifies when it holds the full `1f 8b 08` sequence, or a `1f` / `1f 8b`
 * prefix that is cut off by the end of the chunk (the rest may arrive in the next chunk).
 * Index 0 is never reported because splitting there would produce an empty piece.
 */
function findSplitPoint(chunk: Uint8Array): number {
  const n = chunk.byteLength;
  for (
    let i = chunk.indexOf(gzipHeader[0], 1);
    i !== -1;
    i = chunk.indexOf(gzipHeader[0], i + 1)
  ) {
    if (i + 1 >= n) return i; // lone `1f` at the end: maybe a header
    if (chunk[i + 1] !== gzipHeader[1]) continue;
    if (i + 2 >= n) return i; // `1f 8b` at the end: maybe a header
    if (chunk[i + 2] === gzipHeader[2]) return i; // full `1f 8b 08` match
  }
  return n;
}

/**
 * Transformer that decompresses a byte stream made of one or more concatenated gzip members.
 *
 * Incoming chunks are split in front of every position that might start a gzip member
 * header. As a result, no piece handed to the underlying DecompressionStream mixes bytes of
 * two members. When the current DecompressionStream rejects a piece with a TypeError (it does
 * so for data following the end of a member, see whatwg/compression#39), it is swapped for a
 * fresh DecompressionStream and the same piece is written again.
 * False-positive header matches inside compressed data only cause harmless extra splits.
 */
export class MultiGzipDecompressor implements Transformer<In, Out> {
  /** The DecompressionStream currently receiving input. */
  stream!: DecompressionStream;
  /** Number of DecompressionStreams started so far: 1 after `start`, +1 per swap. */
  swaps = 0;
  _writer!: WritableStreamDefaultWriter<ArrayBuffer | ArrayBufferView>;
  _reader!: ReadableStreamDefaultReader<Uint8Array>;
  /** Settles once the task forwarding output of the current `stream` has finished. */
  _pump: Promise<void> = Promise.resolve();
  /** Input bytes accepted by the current `stream` (reset whenever a new stream starts). */
  totalBytes = 0;

  protected log(...args: unknown[]): void {
    console.log(`multiGzip[${this.swaps}]: `, ...args);
  }

  /**
   * Starts a fresh DecompressionStream and a background task that forwards its output to
   * `controller`. TransformStream calls this once on construction; `transformCleanChunk`
   * calls it again on every swap.
   */
  start(controller: TransformStreamDefaultController<Out>): void {
    this.stream = new DecompressionStream("gzip");
    this.swaps += 1;
    this._writer = this.stream.writable.getWriter();
    this._reader = this.stream.readable.getReader();
    this.totalBytes = 0;
    this._pump = this.forwardOutput(this._reader, controller);
  }

  /**
   * Enqueues every chunk `reader` yields into `controller` until the reader is done.
   * Errors are logged, not rethrown. A swapped-out stream ends up here by design, because its
   * readable side errors together with the rejected write.
   * NOTE(review): enqueues without consulting `controller.desiredSize`, so a slow consumer
   * does not throttle decompression.
   */
  protected async forwardOutput(
    reader: ReadableStreamDefaultReader<Uint8Array>,
    controller: TransformStreamDefaultController<Out>,
  ): Promise<void> {
    try {
      for (;;) {
        const { done, value } = await reader.read();
        if (done) return;
        controller.enqueue(value);
      }
    } catch (e: unknown) {
      this.log(`read task got error`, e);
    }
  }

  /**
   * Splits `chunk` in front of every potential gzip member header and feeds the pieces,
   * in order, to `transformCleanChunk`.
   */
  async transform(chunk: In, controller: TransformStreamDefaultController<Out>): Promise<void> {
    let rest = asBytes(chunk);
    for (;;) {
      const split = findSplitPoint(rest);
      if (split >= rest.byteLength) {
        await this.transformCleanChunk(rest, controller);
        return;
      }
      // split >= 1, so `rest` strictly shrinks; subarray() shares the buffer instead of copying.
      await this.transformCleanChunk(rest.subarray(0, split), controller);
      rest = rest.subarray(split);
    }
  }

  /**
   * Writes one "clean" piece (never containing bytes of two gzip members) to the current
   * DecompressionStream. On a TypeError it swaps in a new DecompressionStream and retries once.
   *
   * @throws the TypeError from the retry if the piece is also rejected by a fresh stream
   *   (i.e. the data is genuinely invalid), or any non-TypeError from the first write.
   */
  async transformCleanChunk(
    chunk: Uint8Array,
    controller: TransformStreamDefaultController<Out>,
  ): Promise<void> {
    try {
      await this._writer.write(chunk);
    } catch (e: unknown) {
      if (!(e instanceof TypeError)) {
        this.log(`multiGzip: rethrowing`, e);
        throw e;
      }
      // The old writer is already errored, so close() rejects; handle that rejection
      // instead of leaving it unhandled.
      void this._writer.close().catch(() => undefined);
      this.start(controller);
      await this._writer.write(chunk);
    }
    this.totalBytes += chunk.byteLength;
  }

  /**
   * Closes the current DecompressionStream and waits until all of its remaining output has
   * been forwarded. The outer readable side closes once this resolves.
   *
   * @throws the TypeError from `close()`, e.g. when the final gzip member is truncated.
   */
  async flush(controller: TransformStreamDefaultController<Out>): Promise<void> {
    await this._writer.close();
    await this._pump;
    this._reader.releaseLock();
  }
}

/**
 * Creates a TransformStream that decompresses gzip data like
 * `new DecompressionStream("gzip")`, but also accepts multiple concatenated gzip members.
 */
export function multiGzipDecompressor(): TransformStream<In, Out> {
  return new TransformStream<In, Out>(new MultiGzipDecompressor());
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment