Skip to content

Instantly share code, notes, and snippets.

@dmmulroy
Last active April 23, 2024 13:56
Show Gist options
  • Save dmmulroy/fcf306edfa542b90e2765799f2751750 to your computer and use it in GitHub Desktop.
Save dmmulroy/fcf306edfa542b90e2765799f2751750 to your computer and use it in GitHub Desktop.
# glitch_eventsub.gleam
websocket_message.NotificationMessage(metadata, payload) -> {
// process.send(state.websocket_message_mailbox, message)
io.println("")
io.println("eventsub client - notification message")
io.debug(payload)
io.println("")
let assert Ok(subject) =
dict.get(state.subscriptions, metadata.subscription_type)
process.send(subject, payload.event) # <--- this is where it fails with =WARNING REPORT==== 23-Apr-2024::09:48:23.647225 ===
Actor discarding unexpected message: #(//erl(#Ref<0.159261793.3941335041.77202>)
actor.continue(state)
}
import gleam/dict.{type Dict}
import gleam/erlang/process.{type Selector, type Subject}
import gleam/function
import gleam/io
import gleam/option.{type Option, None, Some}
import gleam/otp/actor.{type StartError}
import gleam/otp/supervisor
import gleam/result
import glitch/api/client.{type Client as ApiClient} as api_client
import glitch/api/eventsub.{
type CreateEventSubscriptionRequest, CreateEventSubscriptionRequest,
}
import glitch/error.{type TwitchError}
import glitch/eventsub/websocket_message.{type WebSocketMessage}
import glitch/eventsub/websocket_server.{type WebSocketServer}
import glitch/extended/function_ext.{ignore}
import glitch/types/event.{type Event}
import glitch/types/subscription.{type SubscriptionType}
/// Handle for the eventsub client actor: a subject that accepts this
/// module's `Message` values.
pub type Client =
Subject(Message)
/// Internal state carried by the eventsub client actor between messages.
pub opaque type ClientState {
State(
// Client handed to `eventsub.create_eventsub_subscription` when subscribing.
api_client: ApiClient,
// Mailbox supplied by the parent; `Close` and `WelcomeMessage` websocket
// messages are forwarded to it.
websocket_message_mailbox: Subject(WebSocketMessage),
// Session id taken from the welcome message payload; `None` until then.
session_id: Option(String),
// For each subscription type, the subject that receives `payload.event`
// of matching notifications.
subscriptions: Dict(SubscriptionType, Subject(Event)),
// `Stopped` initially; set to `Running` when `Start` is handled.
status: Status,
// Handle received from the websocket server during init; passed to
// `websocket_server.start`.
websocket_server: WebSocketServer,
)
}
/// Messages handled by the eventsub client actor.
pub opaque type Message {
// Register `mailbox` as the receiver of events for subscription type `to`.
Subscribe(to: SubscriptionType, mailbox: Subject(Event))
// Reply with the current state on the given subject.
GetState(Subject(ClientState))
// A message received on the websocket message mailbox created during init.
WebSocketMessage(WebSocketMessage)
// Call `websocket_server.start` and mark the client as `Running`.
Start
// Not implemented yet: handling it panics with "todo".
Stop
}
/// Lifecycle status stored in the client state.
pub type Status {
// Set once the `Start` message has been handled.
Running
// Initial status assigned in the actor's init.
Stopped
}
/// Builds the start function for the eventsub client actor, in the
/// `fn(Nil) -> Result(..)` shape that is handed to `supervisor.worker`.
///
/// During init the actor:
/// - creates its own subject and sends it to `parent_subject`,
/// - starts a supervisor whose only worker is the websocket server,
/// - waits up to 1000 ms for the websocket server's handle,
/// - selects on both its own subject and the websocket message mailbox.
///
/// `websocket_mailbox` is stored in the state so that selected websocket
/// messages can be forwarded to the parent.
pub fn new(
  api_client api_client: ApiClient,
  websocket_mailbox parent_websocket_message_mailbox: Subject(WebSocketMessage),
  parent_subject parent_subject: Subject(Client),
) -> fn(Nil) -> Result(Client, StartError) {
  fn(_) {
    let init = fn() {
      // Hand our subject to the parent so it can message this actor.
      let self = process.new_subject()
      process.send(parent_subject, self)

      // Messages arriving on our own subject are already `Message` values.
      let parent_selector: Selector(Message) =
        process.new_selector()
        |> process.selecting(self, function.identity)

      // The websocket server reports its handle on the first subject and
      // delivers websocket messages on the second.
      let child_subject_mailbox = process.new_subject()
      let websocket_message_mailbox = process.new_subject()

      let websocket_server_child_spec =
        child_subject_mailbox
        |> websocket_server.new(websocket_message_mailbox)
        |> supervisor.worker
      let assert Ok(_supervisor_subject) =
        supervisor.start(fn(children) {
          supervisor.add(children, websocket_server_child_spec)
        })
      let assert Ok(websocket_server) =
        process.receive(child_subject_mailbox, 1000)

      // Websocket messages are wrapped so they fit the actor's message type.
      let merged_selector =
        process.new_selector()
        |> process.selecting(websocket_message_mailbox, WebSocketMessage)
        |> process.merge_selector(parent_selector, _)

      actor.Ready(
        State(
          api_client: api_client,
          websocket_message_mailbox: parent_websocket_message_mailbox,
          session_id: None,
          subscriptions: dict.new(),
          status: Stopped,
          websocket_server: websocket_server,
        ),
        merged_selector,
      )
    }

    actor.start_spec(actor.Spec(
      init: init,
      init_timeout: 1000,
      loop: handle_message,
    ))
  }
}
/// Sends `Start` to the client actor, which then starts its websocket
/// server. The message is fire-and-forget.
pub fn start(client: Client) {
  client |> actor.send(Start)
}
/// Creates the event subscription through the API client held by the actor
/// and, if that succeeds, registers `subscription_event_mailbox` for the
/// request's subscription type.
///
/// Returns the API error unchanged when the subscription cannot be created;
/// in that case nothing is registered with the actor.
pub fn subscribe(
  client: Client,
  subscription_request: CreateEventSubscriptionRequest,
  subscription_event_mailbox,
) -> Result(Nil, TwitchError) {
  let state = actor.call(client, GetState, 1000)
  let created =
    eventsub.create_eventsub_subscription(
      state.api_client,
      subscription_request,
    )

  case created {
    Error(error) -> Error(error)
    Ok(_) -> {
      let registration =
        Subscribe(
          to: subscription_request.subscription_type,
          mailbox: subscription_event_mailbox,
        )
      actor.send(client, registration)
      Ok(Nil)
    }
  }
}
/// Asks the client actor for its state (1000 ms timeout) and returns the
/// websocket message mailbox stored in it.
pub fn websocket_message_mailbox(client: Client) -> Subject(WebSocketMessage) {
  let State(websocket_message_mailbox: mailbox, ..) =
    actor.call(client, GetState, 1000)
  mailbox
}
/// Asks the client actor for its state (1000 ms timeout) and returns the
/// stored session id, or `Error(Nil)` when none has been recorded yet.
pub fn session_id(client: Client) -> Result(String, Nil) {
  let State(session_id: current, ..) = actor.call(client, GetState, 1000)
  current |> option.to_result(Nil)
}
/// Asks the client actor for its state (1000 ms timeout) and returns the
/// API client stored in it.
pub fn api_client(client: Client) -> api_client.Client {
  let State(api_client: stored_client, ..) =
    actor.call(client, GetState, 1000)
  stored_client
}
/// Actor loop for the eventsub client.
///
/// - `Start` starts the websocket server and marks the client `Running`.
/// - `Subscribe` stores the mailbox under its subscription type.
/// - `GetState` replies with the current state.
/// - `WebSocketMessage` is delegated to `handle_websocket_message`.
/// - `Stop` is not implemented and panics.
fn handle_message(message: Message, state: ClientState) {
  case message {
    Start -> {
      websocket_server.start(state.websocket_server)
      State(..state, status: Running)
      |> actor.continue
    }
    Subscribe(subscription_type, mailbox) -> {
      let updated =
        state.subscriptions
        |> dict.insert(subscription_type, mailbox)
      State(..state, subscriptions: updated)
      |> actor.continue
    }
    GetState(reply_to) -> {
      process.send(reply_to, state)
      actor.continue(state)
    }
    WebSocketMessage(inner) -> handle_websocket_message(state, inner)
    Stop -> panic as "todo"
  }
}
/// Handles a message coming from the websocket server.
///
/// - `Close` is forwarded to the parent's websocket mailbox.
/// - `NotificationMessage` has its event sent to the mailbox registered for
///   the notification's subscription type.
/// - `WelcomeMessage` is forwarded to the parent's websocket mailbox and its
///   session id is stored in the state.
/// - Every other message is ignored.
fn handle_websocket_message(state: ClientState, message: WebSocketMessage) {
  case message {
    websocket_message.Close -> {
      // TODO SHUTDOWN
      process.send(state.websocket_message_mailbox, message)
      actor.continue(state)
    }
    websocket_message.NotificationMessage(metadata, payload) -> {
      io.println("")
      io.println("eventsub client - notification message")
      io.debug(payload)
      io.println("")
      // `Subscribe` reaches this actor asynchronously, so a notification can
      // arrive for a type that has no mailbox registered (yet). Drop the
      // event instead of crashing the actor and losing its state.
      case dict.get(state.subscriptions, metadata.subscription_type) {
        Ok(subject) -> process.send(subject, payload.event)
        Error(_) ->
          io.println(
            "eventsub client - no subscriber for notification, dropping event",
          )
      }
      actor.continue(state)
    }
    websocket_message.WelcomeMessage(_metadata, payload) -> {
      process.send(state.websocket_message_mailbox, message)
      let session_id = payload.session.id
      actor.continue(State(..state, session_id: Some(session_id)))
    }
    _ -> {
      actor.continue(state)
    }
  }
}
import constants
import gleam/erlang/process.{type Selector, type Subject}
import gleam/function
import gleam/io
import gleam/option.{None, Some}
import gleam/otp/actor
import glitch/api/client.{type Client as ApiClient}
import glitch/api/eventsub.{CreateEventSubscriptionRequest}
import glitch/types/condition
import glitch/types/event
import glitch/types/subscription
import glitch/types/transport
/// Handle for the keyboard redemption actor: a subject that accepts this
/// module's `Message` values.
pub type KeyboardRedemption =
Subject(Message)
/// State carried by the keyboard redemption actor.
pub type State {
State(
// API client passed to `new`.
api_client: ApiClient,
// Selector the actor is listening with; the `Subscribe` handler merges
// further subjects into it.
selector: Selector(Message),
// Subject created during init and returned by `event_mailbox`.
mailbox: Subject(event.Event),
)
}
/// Messages handled by the keyboard redemption actor.
pub type Message {
// Reply with the current state on the given subject.
GetState(Subject(State))
// Start listening for events on the given mailbox.
Subscribe(subscription_event_mailbox: Subject(event.Event))
// An event received on a subscribed mailbox.
Redemption(event.Event)
}
/// Builds the start function for the keyboard redemption actor, in the
/// shape expected by `supervisor.worker`.
///
/// During init the actor creates its own subject, sends it to
/// `parent_subject`, and starts out selecting only on that subject. A second,
/// fresh subject is stored in the state as the event mailbox.
pub fn new(api_client: ApiClient, parent_subject) {
  fn(_) {
    let init = fn() {
      let self = process.new_subject()
      process.send(parent_subject, self)

      let selector =
        process.selecting(process.new_selector(), self, function.identity)
      let event_mailbox = process.new_subject()

      actor.Ready(
        State(
          api_client: api_client,
          selector: selector,
          mailbox: event_mailbox,
        ),
        selector,
      )
    }

    actor.start_spec(actor.Spec(
      init: init,
      init_timeout: 1000,
      loop: handle_message,
    ))
  }
}
/// Asks the actor for its state (1000 ms timeout) and returns the event
/// mailbox stored in it.
pub fn event_mailbox(self) {
  let State(mailbox: mailbox, ..) = actor.call(self, GetState, 1000)
  mailbox
}
/// Logs a marker line, then tells the actor to start listening on
/// `subscription_event_mailbox`. The message is fire-and-forget.
pub fn subscribe(self, subscription_event_mailbox: Subject(event.Event)) {
  io.println("HERE FAM: SUB")
  subscription_event_mailbox
  |> Subscribe
  |> actor.send(self, _)
}
/// Builds the request that subscribes to
/// `ChannelPointsCustomRewardRedemptionAdd` events over the websocket
/// transport for the given `session_id`.
///
/// The condition is limited to the broadcaster and reward configured in
/// `constants`; every other condition and transport field is `None`.
pub fn new_subscription_request(session_id: String) {
  let redemption_condition =
    condition.Condition(
      broadcaster_user_id: Some(constants.twitch_id),
      from_broadcaster_id: None,
      moderator_user_id: None,
      to_broadcaster_id_user_id: None,
      reward_id: Some(constants.keyboard_redemption_id),
      client_id: None,
      extension_client_id: None,
      user_id: None,
    )

  let websocket_transport =
    transport.Transport(
      method: transport.WebSocket,
      callback: None,
      secret: None,
      session_id: Some(session_id),
      connected_at: None,
      disconnected_at: None,
      conduit_id: None,
    )

  CreateEventSubscriptionRequest(
    subscription.ChannelPointsCustomRewardRedemptionAdd,
    "1",
    redemption_condition,
    websocket_transport,
  )
}
/// Actor loop for the keyboard redemption actor. Every message is logged
/// before it is handled.
///
/// - `GetState` replies with the current state.
/// - `Subscribe` starts selecting on the given mailbox, mapping its events to
///   `Redemption`, in addition to everything already selected.
/// - `Redemption` logs the received event.
fn handle_message(message, state: State) {
  io.println("kb handle_message")
  io.debug(message)
  case message {
    GetState(state_mailbox) -> {
      process.send(state_mailbox, state)
      actor.continue(state)
    }
    Subscribe(subscription_event_mailbox) -> {
      io.println("subscribing to event")
      let selector =
        process.selecting(
          process.new_selector(),
          subscription_event_mailbox,
          Redemption,
        )
        |> process.merge_selector(state.selector)
      // Install the merged selector. Passing the previous `state.selector`
      // here would leave the new mailbox unselected, and the actor would
      // discard its events as unexpected messages.
      State(..state, selector: selector)
      |> actor.continue
      |> actor.with_selector(selector)
    }
    Redemption(event) -> {
      io.println("Receieved redemption")
      io.debug(event)
      actor.continue(state)
    }
  }
}
import gleam/erlang/process.{type Selector}
import gleam/io
import gleam/option.{type Option, None, Some}
import gleam/otp/actor
import gleam/otp/supervisor
import glitch/eventsub/client.{type Client as EventSubClient} as eventsub_client
import glitch/eventsub/websocket_message.{type WebSocketMessage, WelcomeMessage}
import glitch_helpers
import keyboard_redemption.{type KeyboardRedemption}
/// State carried by the top-level actor started in `main`.
type State {
State(
// Handle of the eventsub client actor, received during init.
eventsub: EventSubClient,
// Handle of the keyboard redemption actor, received during init.
keyboard_redemption: KeyboardRedemption,
// Session id from the welcome message; `None` until one is handled.
session_id: Option(String),
// Selector the actor was initialised with (websocket messages only).
selector: Selector(Message),
)
}
/// Messages handled by the top-level actor.
pub type Message {
// A websocket message received on the mailbox given to the eventsub client.
WebSocketMessage(WebSocketMessage)
}
/// Entry point: starts the top-level actor and then sleeps forever.
///
/// The actor's init creates the mailboxes, starts a supervisor with the
/// eventsub client and keyboard redemption workers, waits up to 1000 ms for
/// each worker's handle, and finally starts the eventsub client. Afterwards
/// the actor listens for websocket messages only.
pub fn main() {
  let init = fn() {
    // Each worker reports its handle on its own subject.
    let eventsub_subject_mailbox = process.new_subject()
    let keyboard_redemption_subject_mailbox = process.new_subject()

    // Websocket messages forwarded by the eventsub client arrive here.
    let websocket_message_mailbox = process.new_subject()
    let websocket_message_mailbox_selector =
      process.new_selector()
      |> process.selecting(websocket_message_mailbox, WebSocketMessage)

    let api_client = glitch_helpers.new_api_client()

    let eventsub_child_spec =
      api_client
      |> eventsub_client.new(
        websocket_message_mailbox,
        eventsub_subject_mailbox,
      )
      |> supervisor.worker
    let keyboard_redemption_child_spec =
      api_client
      |> keyboard_redemption.new(keyboard_redemption_subject_mailbox)
      |> supervisor.worker

    let assert Ok(_supervisor_subject) =
      supervisor.start(fn(children) {
        children
        |> supervisor.add(eventsub_child_spec)
        |> supervisor.add(keyboard_redemption_child_spec)
      })

    let assert Ok(eventsub) = process.receive(eventsub_subject_mailbox, 1000)
    let assert Ok(keyboard_redemption) =
      process.receive(keyboard_redemption_subject_mailbox, 1000)

    eventsub_client.start(eventsub)

    actor.Ready(
      State(
        eventsub: eventsub,
        keyboard_redemption: keyboard_redemption,
        session_id: None,
        selector: websocket_message_mailbox_selector,
      ),
      websocket_message_mailbox_selector,
    )
  }

  let assert Ok(_) =
    actor.start_spec(actor.Spec(
      init: init,
      init_timeout: 1000,
      loop: handle_message,
    ))
  process.sleep_forever()
}
/// Actor loop for the top-level actor. Every message is logged first.
///
/// On a welcome message it builds the keyboard redemption subscription
/// request for the new session, registers the keyboard redemption actor's
/// event mailbox with the eventsub client, tells the keyboard redemption
/// actor to listen on that mailbox, and stores the session id. Any other
/// websocket message leaves the state unchanged.
fn handle_message(message, state: State) {
  io.println("supervisor - handle_message")
  io.debug(message)
  case message {
    WebSocketMessage(websocket_message.WelcomeMessage(_, payload)) -> {
      let session_id = payload.session.id
      let subscription_request =
        session_id |> keyboard_redemption.new_subscription_request
      let mailbox =
        state.keyboard_redemption |> keyboard_redemption.event_mailbox
      let assert Ok(_) =
        state.eventsub
        |> eventsub_client.subscribe(subscription_request, mailbox)
      state.keyboard_redemption |> keyboard_redemption.subscribe(mailbox)
      State(..state, session_id: Some(session_id))
      |> actor.continue
    }
    WebSocketMessage(_) -> actor.continue(state)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment