Skip to content

Instantly share code, notes, and snippets.

@mattiamanzati
Created July 23, 2024 20:58
Show Gist options
  • Save mattiamanzati/aa91e9218c4a6d352b85839309bd702d to your computer and use it in GitHub Desktop.
Save mattiamanzati/aa91e9218c4a6d352b85839309bd702d to your computer and use it in GitHub Desktop.
import * as Schema from "@effect/schema/Schema"
import type * as Serializable from "@effect/schema/Serializable"
import type { NonEmptyArray } from "effect/Array"
import * as Effect from "effect/Effect"
import type * as Exit from "effect/Exit"
import { pipe } from "effect/Function"
import * as Option from "effect/Option"
import * as Ref from "effect/Ref"
import type * as Scope from "effect/Scope"
/**
* Shims to self-contain the example.
*/
/**
* Any message that can travel through a mailbox: an alias for
* `Schema.TaggedRequest.Any`.
*/
type AnyMessage = Schema.TaggedRequest.Any
/**
* Wrapper that pairs a message with its identifier.
*/
interface Envelope<A extends AnyMessage> {
/** Identifier of the message; the Mailbox docs describe it as a globally unique key. */
messageId: string
/** The message being delivered. */
payload: A
}
/**
* Message state variant discriminated by `_tag: "Acknowledged"`.
* It carries no data besides the tag.
*/
interface Acknowledged {
_tag: "Acknowledged"
}
/**
* Message state variant discriminated by `_tag: "Processed"`.
* It carries the stored outcome of handling the message.
*/
interface Processed<A extends AnyMessage> {
_tag: "Processed"
/** Exit typed with the success and error types that the request `A` declares. */
exit: Exit.Exit<Serializable.WithResult.Success<A>, Serializable.WithResult.Error<A>>
}
/**
* State of a message as reported by a mailbox: either `Acknowledged` or
* `Processed`. Discriminate on `_tag`.
*/
type MessageState<A extends AnyMessage> = Acknowledged | Processed<A>
/**
* Interface that describes a Mailbox holding request messages of type `A`.
*/
interface Mailbox<A extends AnyMessage> {
/**
* Puts a message onto the mailbox.
* The envelope contains the messageId, which is a globally unique key for the message.
* The returned state says either that the message has just been put into the
* queue (`Acknowledged`), or that it was put into the queue previously and
* its result is now available (`Processed`).
* When a client asks for a message, this method will be called multiple times
* for the same envelope, so it must be idempotent.
*/
upsert(envelope: Envelope<A>): Effect.Effect<MessageState<A>>
/**
* Given a message pulled from the queue and an effect:
* 1) locks the message
* 2) runs the effect
* 3) marks the message as processed and stores the exit of the effect
*
* Imagine a SQL-based implementation; inside a transaction it would:
* 1) INSERT / LOCK FOR UPDATE
* 2) run the effect
* 3) SET processed = TRUE, exit = ....
* 4) close the transaction
*
* If for some reason two fibers contend for the same messageId, the
* implementation of the Mailbox may choose any of the following:
* 1) not care, and run the effect twice
* 2) the first locks and runs; the second waits for the lock to be released, then runs
* 3) the first locks and runs; the second waits for the lock, then detects the
* message as processed and skips it
*
* The returned effect has no value and no typed error; the outcome of `effect`
* is meant to be stored by the mailbox rather than returned to the caller.
*/
process(
envelope: Envelope<A>,
effect: Effect.Effect<Serializable.WithResult.Success<A>, Serializable.WithResult.Error<A>>
): Effect.Effect<void>
/**
* Takes up to maxItems messages from the mailbox.
*
* Depending on the implementation, the take operation may skip in-process
* items (SKIP LOCKED), or not care and return them again (so that process
* can take care of them).
*
* The result type is a NonEmptyArray, so a completed take always yields at
* least one envelope.
* NOTE(review): presumably `Option.none()` means "no limit" and the effect
* waits until a message is available; confirm with the implementation.
*/
take(maxItems: Option.Option<number>): Effect.Effect<NonEmptyArray<Envelope<A>>>
}
/**
* NOTES:
* 1) Do we also want the ability to tell if empty?
* 2) Using upsert assumes there is some kind of storage that
* will tell us between different requests if already processed
* and its result. But kinda the same we had before
* 3) Maybe we should split write side and read side? (or even just at type level)
* upsert should be called by the EntityManager
* process/take should be used inside the behaviour
* 4) This is a Mailbox, but we'll obviously need something like
* a "MailboxConstructor" service that is used to create mailboxes.
*/
/**
* Context made available to a running behaviour; it appears in the
* requirements of the effect returned by `RecipientBehaviour`.
*/
interface RecipientBehaviourContext {
/**
* Closes the scope provided to the behaviour, in a fiber outside of the entity's control.
*/
forkShutdown: Effect.Effect<void>
}
/**
* A behaviour is a function from the recipient's mailbox to an effect.
*
* The effect has no typed error and no meaningful result. Besides `R`, it
* requires a `RecipientBehaviourContext` and a `Scope`.
*/
export interface RecipientBehaviour<Msg extends AnyMessage, R> {
(mailbox: Mailbox<Msg>): Effect.Effect<void, never, R | RecipientBehaviourContext | Scope.Scope>
}
/**
* NOTES:
* 1) Do you want the lifespan of the entity to be forever?
* -> just don't call forkShutdown
* 2) Do you want the entity to shutdown after X-millis since last processed message?
* -> each call process(...) re-starts a timer that calls forkShutdown after X-millis
* 3) The function is called only once with the just created
* mailbox each time an entity is started.
* 4) This signature allows messages to be put in the mailbox before the entity
* is actually started... tbh this is actually good
* 5) This signature doesn't have any WithResult.Success<.....> madness
* this is because that's now the responsibility of the mailbox
* the simplest behaviour will be something along the lines of:
*/
/**
* Request tagged "Increment", declared with no payload fields (`{}`).
* NOTE(review): the positional schemas look like (failure = Never, success = Void);
* confirm against the TaggedRequest signature of the installed @effect/schema version.
*/
class Increment extends Schema.TaggedRequest<Increment>()("Increment", Schema.Never, Schema.Void, {}) {}
/**
* Request tagged "GetCurrent", declared with no payload fields (`{}`).
* NOTE(review): the positional schemas look like (failure = Never, success = Number);
* confirm against the TaggedRequest signature of the installed @effect/schema version.
*/
class GetCurrent extends Schema.TaggedRequest<GetCurrent>()("GetCurrent", Schema.Never, Schema.Number, {}) {}
/** Union of every message the counter behaviour handles. */
type CounterMessage = Increment | GetCurrent
/**
 * Simplest possible behaviour: an in-memory counter.
 *
 * Creates a `Ref` initialised to 0, then repeats forever: take one envelope
 * from the mailbox and ask the mailbox to process it with the effect matching
 * the message tag. "Increment" adds one to the counter; "GetCurrent" reads it.
 */
export const CounterBehaviour: RecipientBehaviour<CounterMessage, never> = (mailbox) =>
  Effect.flatMap(Ref.make(0), (state) => {
    // One loop iteration: pull a single envelope and dispatch on its tag.
    const handleNext = Effect.flatMap(mailbox.take(Option.some(1)), (batch) => {
      const incoming = batch[0]
      switch (incoming.payload._tag) {
        case "Increment":
          return mailbox.process(incoming, Ref.update(state, (current) => current + 1))
        case "GetCurrent":
          return mailbox.process(incoming, Ref.get(state))
      }
    })
    // Repeat the iteration indefinitely; the loop never completes on its own.
    return Effect.forever(handleNext)
  })
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment