Skip to content

Instantly share code, notes, and snippets.

@chill-cod3r
Created April 11, 2025 11:53
Show Gist options
  • Save chill-cod3r/7bd0a62951182a4e2823765e6aa4891f to your computer and use it in GitHub Desktop.
Save chill-cod3r/7bd0a62951182a4e2823765e6aa4891f to your computer and use it in GitHub Desktop.
semi-decent stream handling
// basic stream handling (keeping it manual so pipeline doesn't automatically close the response)
const http = require("http");
const fs = require("fs");
const { Transform, PassThrough } = require("stream");
const { finished } = require("stream/promises");
const createUpperCaseTransform = () => {
return new Transform({
transform(chunk, encoding, callback) {
// callback(null, chunk.toString().toUpperCase());
callback(new Error('hooo'), null); // Uncomment to test error
},
});
};
const server = http.createServer(async (req, res) => {
console.log("Request received");
// Track if error handler has run to prevent multiple responses
let didErrorHandlerRun = false;
// Central error handler function
const handleError = (err) => {
if (didErrorHandlerRun) return; // Prevent multiple error responses
didErrorHandlerRun = true;
console.error("Stream error:", err.message);
// Only set headers if they haven't been sent yet
if (!res.headersSent) {
res.writeHead(500, {
"Content-Type": "text/plain",
});
}
res.end("Server error occurred");
};
try {
// Set response headers
res.writeHead(200, {
"Content-Type": "text/plain",
});
// Create our streams
const fileStream = fs.createReadStream("largefile.txt");
const upperCaseTransform = createUpperCaseTransform();
// Set up error handlers for source streams BEFORE piping
fileStream.on("error", handleError);
upperCaseTransform.on("error", handleError);
// Manual piping - res is the last stream
fileStream.pipe(upperCaseTransform).pipe(res);
// Use finished to properly wait for the response to complete
// This handles edge cases like client disconnects and response completion
await finished(res);
console.log("Stream completed successfully");
} catch (err) {
// This catch will handle:
// 1. Errors from the 'finished' promise
// 2. Any other async errors in the try block
handleError(err);
}
});
server.listen(3000, () => {
console.log("Server listening on port 3000");
});
// v2 - simulated retry
const http = require("http");
const fs = require("fs");
const { Transform, PassThrough } = require("stream");
const { finished } = require("stream/promises");
// Flags to control behavior
let shouldFailFirstAttempt = true;
let isFirstAttempt = true;
// Separate error simulation from implementation
const createUpperCaseTransform = () => {
return new Transform({
transform(chunk, encoding, callback) {
// Error simulation logic - separate from actual implementation
if (shouldFailFirstAttempt && isFirstAttempt) {
console.log("Simulating first attempt failure");
callback(new Error('Simulated first attempt failure'), null);
} else {
callback(null, chunk.toString().toUpperCase());
}
},
});
};
const createStreamPipeline = () => {
// Create our streams
const fileStream = fs.createReadStream("largefile.txt");
const upperCaseTransform = createUpperCaseTransform();
return { fileStream, upperCaseTransform };
};
const server = http.createServer(async (req, res) => {
console.log("Request received");
// Track if error handler has run to prevent multiple responses
let didErrorHandlerRun = false;
let retryCount = 0;
const MAX_RETRIES = 1; // Set to 1 for one retry attempt
// Central error handler function
const handleError = (err, shouldRetry = true) => {
if (didErrorHandlerRun) return; // Prevent multiple error responses
console.error("Stream error:", err.message);
// If we should retry and haven't exceeded retry limit
if (shouldRetry && retryCount < MAX_RETRIES) {
retryCount++;
console.log(`Retrying stream (attempt ${retryCount} of ${MAX_RETRIES})`);
isFirstAttempt = false; // No longer the first attempt
// Start over with the request handler logic
startStreamProcessing();
return;
}
// If we're here, we've either exceeded retries or shouldn't retry
didErrorHandlerRun = true;
// Only set headers if they haven't been sent yet
if (!res.headersSent) {
res.writeHead(500, {
"Content-Type": "text/plain",
});
}
res.end("Server error occurred");
};
// Extract the stream processing logic to allow for retries
const startStreamProcessing = () => {
try {
// Set response headers - only if they haven't been sent
if (!res.headersSent) {
res.writeHead(200, {
"Content-Type": "text/plain",
});
}
// Create new stream instances for this attempt
const { fileStream, upperCaseTransform } = createStreamPipeline();
// Set up error handlers for source streams BEFORE piping
fileStream.on("error", (err) => handleError(err));
upperCaseTransform.on("error", (err) => handleError(err));
// Manual piping - res is the last stream
fileStream.pipe(upperCaseTransform).pipe(res);
} catch (err) {
// This catch will handle synchronous errors in the setup
handleError(err);
}
};
// Start the initial processing
startStreamProcessing();
try {
// Single await for the response completion at the top level
await finished(res);
console.log("Stream completed successfully");
} catch (err) {
// This catches any errors that bubble up through the response stream
handleError(err, false); // false = don't retry at this level
}
});
server.listen(3000, () => {
console.log("Server listening on port 3000");
console.log("Configuration: shouldFailFirstAttempt =", shouldFailFirstAttempt);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment