Created
May 21, 2024 17:46
-
-
Save titouancreach/bfd6b74cec913ac97079bdd0c655bd24 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { NodeRuntime } from "@effect/platform-node"; | |
import { Schema } from "@effect/schema"; | |
import { SqsLive, SqsService } from "@repo/shared/src/Sqs"; | |
import { Chunk, Console, Effect, Option, Stream, pipe } from "effect"; | |
import { logError, logInfo } from "effect/Effect"; | |
import { Consumer } from "sqs-consumer"; | |
function makeStream<T, U>({ | |
queueName, | |
schema, | |
}: { queueName: string; schema: Schema.Schema<T, U> }) { | |
return Effect.gen(function* () { | |
const sqsService = yield* SqsService; | |
const url = yield* sqsService.createQueue({ queueName }); | |
const stream = Stream.async<T>((emit) => { | |
const startConsumerEffect = Effect.gen(function* () { | |
const consumer = Consumer.create({ | |
queueUrl: url, | |
handleMessage: async (message) => { | |
const sendMessageToStreamEffect = Effect.gen(function* () { | |
const chunk = yield* pipe( | |
message.Body, | |
Option.fromNullable, | |
Option.map(JSON.parse), // probably better | |
Effect.flatMap(Schema.decodeUnknown(schema)), | |
); | |
yield* logInfo(`[${queueName}] ===> `, chunk); | |
emit(Effect.succeed(Chunk.of(chunk))); | |
}); | |
// let it throw here. sqs-consumer will handle it and push the message back to the queue | |
await Effect.runPromise( | |
sendMessageToStreamEffect | |
.pipe( | |
Effect.tapErrorTag("ParseError", (err) => | |
// Since ParseError.message is already well formated, don't break the formating with logError. | |
Console.error(err.message), | |
), | |
) | |
.pipe(Effect.tapErrorTag("NoSuchElementException", logError)), | |
); | |
}, | |
}); | |
yield* logInfo("Starting consumer for queue: ", queueName); | |
consumer.start(); | |
}); | |
Effect.runSync(startConsumerEffect); | |
}); | |
return stream; | |
}).pipe(Effect.provide(SqsLive)); | |
} | |
const effect = makeStream({ | |
queueName: "MyLazyQueueName", | |
schema: Schema.Struct({ | |
numeroDeTel: Schema.String, | |
}), | |
}); | |
const program = effect.pipe( | |
Effect.andThen((stream) => | |
stream.pipe( | |
Stream.runForEach((chunk) => | |
Console.log("received message from the stream", chunk.numeroDeTel), | |
), | |
), | |
), | |
); | |
NodeRuntime.runMain(pipe(Effect.all([program]), Effect.andThen(Effect.never))); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment