Skip to content

Instantly share code, notes, and snippets.

@stoneream
Created February 1, 2025 09:04
Show Gist options
  • Save stoneream/62903f913780df60bd1376a38ba35fa2 to your computer and use it in GitHub Desktop.
Save stoneream/62903f913780df60bd1376a38ba35fa2 to your computer and use it in GitHub Desktop.
import org.apache.pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.model.StatusCodes
import org.apache.pekko.http.scaladsl.model.ws.{Message, TextMessage, WebSocketUpgradeResponse}
import org.apache.pekko.stream.scaladsl.{Keep, Sink, SinkQueueWithCancel, Source, SourceQueueWithComplete}
import org.apache.pekko.stream.{OverflowStrategy, QueueOfferResult}
import org.scalatest.funsuite.AnyFunSuiteLike
import org.scalatestplus.play.guice.GuiceOneServerPerTest
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
trait WebSocketTestHelper {
self: ScalaTestWithActorTestKit & AnyFunSuiteLike & GuiceOneServerPerTest =>
private val timeout = testKit.timeout.duration
// TODO ContextFunctionsが使えるようになるとテスト側で書いているgiven句を削除できるようになるはず (現在はNightly)
// https://dotty.epfl.ch/docs/reference/contextual/context-functions-spec.html
def withWebSocketClient[T](url: String)(testCode: (SourceQueueWithComplete[Message], SinkQueueWithCancel[Message]) => Future[T]): T = {
val flow = Http().webSocketClientFlow(url)
val ((queue, upgradeResponse), resultSink) =
Source
.queue[Message](bufferSize = 1024, OverflowStrategy.fail)
.viaMat(flow)(Keep.both)
.toMat(Sink.queue[Message]())(Keep.both)
.run()
val upgrade = Await.result[WebSocketUpgradeResponse](upgradeResponse, timeout)
assert(upgrade.response.status == StatusCodes.SwitchingProtocols)
val result = Await.result(
testCode(queue, resultSink),
Duration.Inf
)
queue.complete()
result
}
def sendRequest(payload: String)(using queue: SourceQueueWithComplete[Message]): Future[Unit] = {
queue
.offer(TextMessage(payload))
.map {
case QueueOfferResult.Enqueued => ()
case QueueOfferResult.Dropped => fail("Queue Dropped")
case QueueOfferResult.QueueClosed => fail("Queue Closed")
case QueueOfferResult.Failure(ex) => fail(ex)
}(testKit.system.executionContext)
}
def receiveResponse()(using resultSink: SinkQueueWithCancel[Message]): Future[String] = {
resultSink
.pull()
.map {
case Some(TextMessage.Strict(text)) => text
case other => fail(s"Unexpected message ($other)")
}(testKit.system.executionContext)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment