Created
April 4, 2025 04:44
-
-
Save sdmcraft/2bfa59714602a5aadfd58da38900cd49 to your computer and use it in GitHub Desktop.
A toy example to explain stream processing paradigm
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Stream { | |
constructor() { | |
this.source = []; // Queue to hold incoming events | |
this.operations = []; // List of transformations (filter, map) | |
this.reduction = null; // Stores reduction function (if any) | |
this.isProcessing = false; // Prevents duplicate processing | |
this.totals = {}; // Object to track totals by category | |
} | |
/** | |
* Add new data to the stream in real-time. | |
* @param {Object} item - New data item. | |
*/ | |
push(item) { | |
this.source.push(item); | |
this.process(); // Process each item as it arrives | |
} | |
/** | |
* Add a filter transformation. | |
* @param {Function} predicate - Function to filter items. | |
* @returns {Stream} - Returns the modified stream. | |
*/ | |
filter(predicate) { | |
this.operations.push((data) => data.filter(predicate)); | |
return this; | |
} | |
/** | |
* Add a map transformation. | |
* @param {Function} transformFn - Function to transform items. | |
* @returns {Stream} - Returns the modified stream. | |
*/ | |
map(transformFn) { | |
this.operations.push((data) => data.map(transformFn)); | |
return this; | |
} | |
/** | |
* Add a reduction operation. | |
* @param {Function} reducer - Reducer function. | |
* @param {*} initialValue - Initial value for reduction. | |
* @returns {Stream} - Returns the modified stream. | |
*/ | |
reduce(reducer, initialValue) { | |
this.reduction = { | |
reducer, | |
initialValue, | |
state: initialValue // Store accumulated state (can be a simple value or complex object) | |
}; | |
return this; | |
} | |
/** | |
* Process the stream in real-time. | |
*/ | |
async process() { | |
if (this.isProcessing) return; // Prevent re-entrant processing | |
this.isProcessing = true; | |
while (this.source.length > 0) { | |
const batch = [this.source.shift()]; // Process one item at a time | |
let processedData = this.operations.reduce((data, operation) => operation(data), batch); | |
if (this.reduction && processedData.length > 0) { | |
// Apply the reducer to update the state | |
const item = processedData[0]; | |
this.reduction.state = this.reduction.reducer(this.reduction.state, item); | |
// Log the current reduction state | |
console.log("Reduction State:", this.reduction.state); | |
} else { | |
console.log("Processed Item:", processedData[0]); | |
} | |
await new Promise((resolve) => setTimeout(resolve, 100)); // Simulate async processing delay | |
} | |
this.isProcessing = false; // Allow future processing | |
} | |
} | |
// Create a real-time Stream Processor | |
const streamProcessor = new Stream(); | |
// Define the data pipeline | |
streamProcessor | |
.filter(item => item.category === 'A' || item.category === 'B') // Process both categories | |
.map(item => ({ ...item, value: item.value * 2 })) // Double the value | |
.reduce((state, item) => { | |
// Create a more flexible reducer that handles categories within itself | |
const category = item.category; | |
// Clone the state to avoid mutations | |
const newState = {...state}; | |
// Initialize category if needed | |
if (!newState[category]) { | |
newState[category] = 0; | |
} | |
// Update the total for this category | |
newState[category] += item.value; | |
// Add a total across all categories | |
newState.total = (newState.total || 0) + item.value; | |
return newState; | |
}, {}); // Start with an empty object as state | |
// Simulate real-time data streaming | |
setInterval(() => { | |
const randomEvent = { | |
id: Math.floor(Math.random() * 100), | |
value: Math.floor(Math.random() * 50) + 1, | |
category: Math.random() > 0.5 ? 'A' : 'B' | |
}; | |
console.log("Incoming Event:", randomEvent); | |
streamProcessor.push(randomEvent); | |
}, 500); // New event every 500ms |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment