Skip to content

Instantly share code, notes, and snippets.

@colinbes
Last active May 23, 2023 22:37
Show Gist options
  • Select an option

  • Save colinbes/131766db50f6c9fa53da738a22e06c82 to your computer and use it in GitHub Desktop.

Select an option

Save colinbes/131766db50f6c9fa53da738a22e06c82 to your computer and use it in GitHub Desktop.
cats-effect 3 and http4s route pushing SSE using streams backed by cats-effect queue
package com.bdesigns.sse.routes
import cats.effect.*
import cats.effect.implicits.*
import cats.effect.std.*
import cats.effect.std.syntax.all.*
import cats.implicits.*
import cats.syntax.all.*
import com.bdesigns.sse.core.MyAuth
import com.bdesigns.sse.domain.{ClockStreamedResponse, User}
import com.bdesigns.sse.domain.security.{AuthRoute, JwtToken}
import com.bdesigns.sse.domain.syntax.*
import com.bdesigns.sse.http.validation.syntax.*
import fs2.Stream
import io.circe.generic.auto.*
import io.circe.syntax.*
import org.http4s.circe.CirceEntityCodec.*
import org.http4s.dsl.Http4sDsl
import org.http4s.dsl.io.*
import org.http4s.headers.{`Cache-Control`, `Content-Type`}
import org.http4s.implicits.*
import org.http4s.server.Router
import org.http4s.syntax.literals.*
import org.http4s.*
import org.typelevel.log4cats.Logger
import tsec.authentication.{SecuredRequestHandler, TSecAuthService, TSecCookieSettings, asAuthed}
import java.util.Date
import scala.concurrent.duration.*
class EventRoute[F[_]: Concurrent: Temporal: Logger] private (auth: MyAuth[F]) extends HttpValidationDsl[F] {
private val authenticator = auth.authenticator
private val securedHandler: SecuredRequestHandler[F, String, User, JwtToken] =
SecuredRequestHandler(authenticator)
private val mainQueue =
for {
queue <- Queue.unbounded[F, String]
_ <- Logger[F].warn(s"1 code ${queue.hashCode}")
_ <- queue.offer("one")
_ <- Logger[F].warn(s"2 code ${queue.hashCode}")
} yield queue
private def sseStream: Stream[F, ServerSentEvent] =
val sse = mainQueue.flatMap { queue =>
println(s"waiting on take in sseStream ${queue.hashCode()}")
queue.take.map { item =>
println(s"fetch $item from sseStream")
ServerSentEvent(
data = Some(ClockStreamedResponse("date", item).asJsonString),
eventType = Some("myEvent"),
retry = Some(5000.seconds)
)
}
}
Stream.eval(sse)
private val eventRoute: AuthRoute[F] = {
case req @ GET -> Root asAuthed user =>
val headers = Headers(`Content-Type`(MediaType.`text/event-stream`), `Cache-Control`(CacheDirective.`no-store`))
Ok(sseStream, headers)
case POST -> Root / "msg" asAuthed _ =>
println("in msg")
for {
queue <- mainQueue
_ <- Logger[F].warn(s"mainQueue root/msg before code ${queue.hashCode}")
_ <- queue.offer(s"howdy1")
_ <- Logger[F].warn(s"mainQueue root/msg after code ${queue.hashCode}")
size <- queue.size
resp <- Ok(s"queue size is $size, code is ${queue.hashCode()}")
} yield resp
}
private val authedRoutes: HttpRoutes[F] = securedHandler.liftService(TSecAuthService(eventRoute))
val routes: HttpRoutes[F] = Router(
"/events" -> authedRoutes
)
}
object EventRoute {
def apply[F[_]: Concurrent: Temporal: Logger](auth: MyAuth[F]) =
new EventRoute[F](auth)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment