Last active
January 15, 2016 11:22
-
-
Save janakagamini/09b1195a43bc91a5a670 to your computer and use it in GitHub Desktop.
A simple websocket server to process incoming ecg readings and write to InfluxDB using akka reactive streams. Client to produce ecg data can be found at: https://github.com/janakagamini/ecg_ws_source
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example | |
import java.util.concurrent.TimeoutException | |
import akka.actor.ActorSystem | |
import akka.http.scaladsl.Http | |
import akka.http.scaladsl.model.HttpMethods._ | |
import akka.http.scaladsl.model._ | |
import akka.http.scaladsl.model.ws._ | |
import akka.stream.ActorMaterializer | |
import akka.stream.scaladsl._ | |
import com.paulgoldbaum.influxdbclient.Parameter.Precision | |
import com.paulgoldbaum.influxdbclient.{InfluxDB, Point} | |
import org.json4s._ | |
import org.json4s.native.JsonMethods._ | |
import scala.concurrent.Await | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.concurrent.duration._ | |
import scala.util.{Failure, Success} | |
case class Reading(timestamp: Long, lead1: Double, lead2: Double) | |
object WSRequest { | |
def unapply(req: HttpRequest): Option[HttpRequest] = { | |
if (req.header[UpgradeToWebsocket].isDefined) { | |
req.header[UpgradeToWebsocket] match { | |
case Some(upgrade) => Some(req) | |
case None => None | |
} | |
} else None | |
} | |
} | |
object ApplicationMain extends App { | |
implicit val actorSystem = ActorSystem() | |
implicit val flowMaterializer = ActorMaterializer() | |
val binding = Http().bindAndHandleSync({ | |
case WSRequest(req@HttpRequest(GET, Uri.Path("/stream"), _, _, _)) => handleWith(req, dataFlow) | |
case _: HttpRequest => HttpResponse(400, entity = "Invalid websocket request") | |
}, interface = "localhost", port = 9003) | |
def handleWith(req: HttpRequest, flow: Flow[Message, Message, Unit]) = req.header[UpgradeToWebsocket].get.handleMessages(flow) | |
// binding is a future, we assume it's ready within a second or timeout | |
try { | |
Await.result(binding, 1 second) | |
println("Server online at http://localhost:9003") | |
} catch { | |
case exc: TimeoutException => | |
println("Server took to long to startup, shutting down") | |
actorSystem.shutdown() | |
} | |
// Pipeline of the data | |
def dataFlow: Flow[Message, Message, Unit] = Flow[Message].map { | |
case TextMessage.Strict(msg) => msg | |
} | |
.via(parseJson) | |
.via(convertToPoint) | |
.via(collectPoints) | |
.via(writeToInflux) | |
// Add more processing blocks here? | |
.map { | |
// No need to return anything to the websocket | |
_ => TextMessage(Source.empty) | |
} | |
def parseJson = Flow[String].map { | |
s => | |
implicit val formats = DefaultFormats | |
parse(s).extract[Reading] | |
} | |
// CHANGE HERE | |
val stream_id = "janakaOptiPlex9020" | |
def convertToPoint = Flow[Reading].map(p => Point("ecg", p.timestamp).addTag("stream_id", stream_id).addField("lead1", p.lead1).addField("lead2", p.lead2)) | |
def collectPoints = Flow[Point].grouped(2000) | |
// CHANGE HERE | |
val influx_db = InfluxDB.connect("localhost", 8086) | |
val database = influx_db.selectDatabase("vsigns") | |
def writeToInflux = Flow[Seq[Point]].map { | |
points => { | |
val r = database.bulkWrite(points.toList, precision = Precision.MILLISECONDS) | |
r.onComplete { | |
case Success(_) => println("Successfully posted data") | |
case Failure(e) => println(e.getMessage) | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
name := "minimal-akka-scala-seed" | |
version := "1.0" | |
scalaVersion := "2.11.7" | |
resolvers += "Gamlor-Repo" at "https://github.com/gamlerhart/gamlor-mvn/raw/master/snapshots" | |
libraryDependencies ++= Seq( | |
"com.typesafe.akka" % "akka-stream-experimental_2.11" % "2.0.1", | |
"com.typesafe.akka" % "akka-http-core-experimental_2.11" % "2.0.1", | |
"com.typesafe.akka" % "akka-http-experimental_2.11" % "2.0.1", | |
"org.json4s" %% "json4s-native" % "3.3.0", | |
"com.paulgoldbaum" %% "scala-influxdb-client" % "0.4.1" | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment