Created
December 19, 2024 18:33
-
-
Save mpellegrini/301fcbb9112645fdcb5822f9d7272b96 to your computer and use it in GitHub Desktop.
ETL Pipeline Abstraction in TypeScript
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Reads one page of source records.
 *
 * Note: inside any module that declares or imports this alias, the name
 * shadows TypeScript's built-in `Extract<T, U>` utility type.
 *
 * @param limit - Page size requested by the caller.
 * @param offset - Position in the source at which the page starts.
 * @returns The page of records, either directly or through a promise.
 */
export type Extract<T> = (limit: number, offset: number) => T[] | Promise<T[]>

/**
 * Converts a page of extracted records into the shape expected by the loader.
 *
 * @typeParam TSource - Record type produced by the extract function.
 * @typeParam TDest - Record type handed to the load function; defaults to `TSource`.
 * @param result - The page of records returned by the extract function.
 * @returns The converted records, either directly or through a promise.
 */
export type Transform<TSource, TDest = TSource> = (result: TSource[]) => TDest[] | Promise<TDest[]>

/**
 * Consumes a page of transformed records.
 *
 * @param result - The records returned by the transform function.
 * @returns Nothing, either directly or through a promise that settles when loading is done.
 */
export type Load<T> = (result: T[]) => void | Promise<void>

/**
 * One named extract/transform/load unit.
 *
 * @typeParam TSource - Record type produced by `extract` and consumed by `transform`.
 * @typeParam TDest - Record type produced by `transform` and consumed by `load`;
 *   defaults to `TSource`.
 */
export interface PipelineStep<TSource, TDest = TSource> {
  /** Human-readable label for the step. */
  name: string
  /** Produces one page of source records. */
  extract: Extract<TSource>
  /** Maps a page of source records to destination records. */
  transform: Transform<TSource, TDest>
  /** Consumes a page of destination records. */
  load: Load<TDest>
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import type { PipelineStep } from './api.js' | |
/**
 * Runs a single pipeline step to completion, one page at a time.
 *
 * Each iteration extracts a page starting at the current offset, transforms
 * it, loads it, and then advances the offset by the number of records that
 * were extracted. Processing stops the first time `extract` yields an empty
 * page.
 *
 * @typeParam TSource - Record type produced by `extract`.
 * @typeParam TDest - Record type consumed by `load`; defaults to `TSource`, so
 *   existing single-type callers are unaffected.
 * @param pipelineStep - The step whose `extract`, `transform` and `load` callbacks are invoked.
 * @param limit - Page size forwarded to every `extract` call.
 * @param offset - Offset passed to the first `extract` call; defaults to `0`.
 * @returns A promise that resolves once an empty page has been extracted.
 *   It rejects if any callback throws or rejects; later pages are not attempted.
 */
export async function executePipeline<TSource, TDest = TSource>(
  pipelineStep: PipelineStep<TSource, TDest>,
  limit: number,
  offset = 0,
): Promise<void> {
  // Track progress in a local cursor rather than reassigning the parameter.
  let nextOffset = offset

  // Loop instead of recursing: with purely synchronous callbacks a recursive
  // call would run without ever yielding, so call depth would grow with the
  // number of pages, and every page would keep a pending frame alive.
  for (;;) {
    // `await` handles plain values, native promises and thenables alike, so
    // no `instanceof Promise` check is needed.
    const extracted = await pipelineStep.extract(limit, nextOffset)

    if (extracted.length === 0) {
      console.log(`Finished processing ${pipelineStep.name}`)
      return
    }

    const transformed = await pipelineStep.transform(extracted)
    await pipelineStep.load(transformed)

    nextOffset += extracted.length
    console.log(`Getting another ${extracted.length} more ${pipelineStep.name}`)
  }
}
// Steps to run, in execution order. Each entry may use different record
// types, hence the `any` element type.
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- not sure how else to do this yet
const pipelineSteps: PipelineStep<any>[] = [
  stepImpl
]

try {
  // Steps run one after another; a failure in one step stops the remaining ones.
  for (const pipelineStep of pipelineSteps) {
    console.log(`Processing ETL for ${pipelineStep.name}`)
    // 1000 is the page size handed to each extract call.
    await executePipeline(pipelineStep, 1000)
  }
} finally {
  // Release the connector even when a step throws, so a failed run does not
  // leave it open.
  // NOTE(review): if `close()` returns a promise it should be awaited — confirm
  // against the connector's API.
  connector.close()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment