Skip to content

Instantly share code, notes, and snippets.

@colinbes
Created March 5, 2021 14:44
Show Gist options
  • Select an option

  • Save colinbes/f762268ea18e0373fc9d87b03ec07ef5 to your computer and use it in GitHub Desktop.

Select an option

Save colinbes/f762268ea18e0373fc9d87b03ec07ef5 to your computer and use it in GitHub Desktop.
trait RedisKeyEvents extends StreamingActor {
implicit val actorSystem: ActorSystem[IoTSupervisor.IoTCommand]
implicit val executionContext: ExecutionContextExecutor
implicit val timeout: Timeout
val redisUri: URI = RedisConnector.getConnectionUri
val logger: Logger
val redisClient: RedisClient = {
logger.info(s"Connecting to redis at $redisUri")
new RedisClient(redisUri)
}
val redisSubscriberFuture: Future[ActorRef[Msg]] = actorSystem.ask(ref => IoTSupervisor.IoTSpawn(RedisSubscriber(redisUri), "redis-sub", ref))
for {
redisSubscriberActor <- redisSubscriberFuture
} {
redisSubscriberActor ! Register(callback)
redisSubscriberActor ! Subscribe(Array(EVENT_SET, EVENT_EXPIRED))
}
def callback(pubsub: PubSubMessage): Unit = pubsub match {
case S(channel, no) => logger.info("subscribed to " + channel + " and count = " + no)
case U(channel, no) => logger.info("unsubscribed from " + channel + " and count = " + no)
case M(EVENT_EXPIRED, "name") =>
streamingActor ! StreamingEventSourceActor.UserInfoChange("---", online = false)
case M(EVENT_SET, keyname) =>
val username: String = redisClient.get(keyname).getOrElse("---")
streamingActor ! StreamingEventSourceActor.UserInfoChange(username, online = true)
case _ => logger.info(s"Unknown callback pubsub ${pubsub.toString}")
}
}
package com.bdesigns.akka.actors
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import com.redis.{PubSubMessage, RedisClient}
import java.net.URI
object RedisSubscriber {
val EVENT_SET = "__keyevent@0__:set"
val EVENT_DEL = "__keyevent@0__:del"
val EVENT_EXPIRED = "__keyevent@0__:expired"
val EVENT_EVICTED = "__keyevent@0__:evicted"
sealed trait Msg
case class Subscribe(channels: Array[String]) extends Msg
case class Register(callback: PubSubMessage => Any) extends Msg
case class Unsubscribe(channels: Array[String]) extends Msg
case object UnsubscribeAll extends Msg
case class Publish(channel: String, msg: String) extends Msg
def apply(redisUrl: URI): Behavior[Msg] = initial(new RedisClient(redisUrl))
private def initial(redisClient: RedisClient): Behavior[Msg] = {
Behaviors.receive { (_, message) =>
message match {
case Register(cb) =>
process(redisClient, cb)
case _ =>
Behaviors.unhandled
}
}
}
private def process(redisClient: RedisClient, callback: PubSubMessage => Any): Behavior[Msg] = {
Behaviors.receive { (_, message) =>
message match {
case Subscribe(channels) =>
redisClient.subscribe(channels.head, channels.tail:_*)(callback)
Behaviors.same
case Unsubscribe(channels) =>
redisClient.unsubscribe(channels.head, channels.tail:_*)
Behaviors.same
case UnsubscribeAll =>
redisClient.unsubscribe()
Behaviors.same
case _ =>
Behaviors.unhandled
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment