Created
November 16, 2016 13:47
-
-
Save muller/d03c05ffaddecf6dab7c0b564e74c6f7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.security.cert.X509Certificate | |
import javax.net.ssl.{KeyManager, SSLContext, X509TrustManager} | |
import akka.http.scaladsl.ConnectionContext | |
/**
 * Factory for an HTTPS connection context that performs no certificate validation.
 *
 * WARNING: the installed trust manager's check methods are no-ops, so every client and
 * server certificate chain is accepted. Use only where that is acceptable.
 */
object NoSSL {

  /** Builds a fresh "TLS" `SSLContext` with the no-op trust manager and wraps it for Akka HTTP. */
  def apply() = {
    // Trust manager that never rejects a chain and advertises no accepted issuers.
    val trustEverything: X509TrustManager = new X509TrustManager {
      override def checkClientTrusted(chain: Array[X509Certificate], authType: String): Unit = ()

      override def checkServerTrusted(chain: Array[X509Certificate], authType: String): Unit = ()

      override def getAcceptedIssuers: Array[X509Certificate] = Array.empty[X509Certificate]
    }

    val sslContext = SSLContext.getInstance("TLS")
    // No key managers; a null SecureRandom lets the JDK pick its default source.
    sslContext.init(Array.empty[KeyManager], Array(trustEverything), null)
    ConnectionContext.https(sslContext)
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.ActorSystem | |
import akka.http.scaladsl.Http | |
import akka.http.scaladsl.model.headers.{Authorization, OAuth2BearerToken} | |
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri} | |
import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} | |
import akka.stream.ActorMaterializer | |
import akka.stream.scaladsl.{Flow, Framing, Source} | |
import akka.util.ByteString | |
import play.api.libs.json._ | |
import scala.collection.immutable | |
import scala.concurrent.ExecutionContext | |
/**
 * Command-line entry point: streams JSON documents from the given URI and prints each one.
 *
 * Usage: `ReactiveNakadiApp <token> <uri>`
 *
 *  - `token` is passed to [[ReactiveNakadi]] as the OAuth2 bearer token
 *  - `uri`   is the endpoint to read from
 */
object ReactiveNakadiApp extends App {
  // Fail fast with a usage hint instead of an ArrayIndexOutOfBoundsException.
  if (args.length < 2) {
    Console.err.println("Usage: ReactiveNakadiApp <token> <uri>")
    sys.exit(1)
  }

  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: scala.concurrent.ExecutionContextExecutor =
    materializer.executionContext

  new ReactiveNakadi(args(0), args(1))
    .read()
    .runForeach(println)
    .onComplete { outcome =>
      // Report a failed stream rather than dropping the error silently.
      outcome.failed.foreach(cause => Console.err.println(s"Stream failed: $cause"))
      // Without this the actor system keeps the JVM alive after the stream has ended.
      system.terminate()
    }
}
/**
 * Reads a newline-delimited stream of JSON documents from an HTTP endpoint.
 *
 * The request carries `token` as an OAuth2 bearer `Authorization` header and is sent using
 * the connection context returned by `NoSSL()`.
 *
 * @param token  OAuth2 bearer token placed in the `Authorization` header
 * @param uri    URI the request is issued against
 * @param system actor system used by the Akka HTTP extension
 * @param mat    materializer used for the request and for unmarshalling
 * @param ec     execution context used for unmarshalling
 */
class ReactiveNakadi(token: String, uri: String)(implicit system: ActorSystem, mat: ActorMaterializer, ec: ExecutionContext) {

  /** Bearer-token header attached to the request. */
  val authorization: Authorization = Authorization(OAuth2BearerToken(token))

  /** The request issued by [[read]]. */
  val request: HttpRequest = HttpRequest(
    uri = Uri(uri),
    headers = immutable.Seq(authorization))

  /** Decodes one UTF-8 encoded frame into a JSON value. */
  implicit val eventUnmarshaller: Unmarshaller[ByteString, JsValue] =
    Unmarshaller.strict[ByteString, JsValue] { bytes =>
      Json.parse(bytes.utf8String)
    }

  /**
   * Turns a response into its body split on `\n`.
   *
   * A response whose status is not a success fails the stream with an
   * `IllegalStateException` instead of having its body parsed as data.
   */
  val tail: Flow[HttpResponse, ByteString, akka.NotUsed] = Flow[HttpResponse]
    .flatMapConcat { response =>
      if (response.status.isSuccess) response.entity.dataBytes
      else
        // Drain the error body so the connection is released, then fail the stream.
        response.entity.dataBytes
          .filter(_ => false)
          .concat(Source.failed[ByteString](
            new IllegalStateException(s"Unexpected response status: ${response.status}")))
    }
    .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))

  /**
   * Parses each frame as JSON, preserving order.
   *
   * The unmarshaller is strict, so a parallelism above 1 would gain nothing.
   */
  val parse: Flow[ByteString, JsValue, akka.NotUsed] =
    Flow[ByteString].mapAsync(parallelism = 1)(Unmarshal(_).to[JsValue])

  /**
   * Issues the request and returns the response body as a stream of JSON values.
   *
   * The HTTP request is started when this method is called, not when the returned
   * source is materialized.
   */
  def read(): Source[JsValue, akka.NotUsed] = {
    val response = Http().singleRequest(request, NoSSL())
    Source.fromFuture(response)
      .via(tail)
      .via(parse)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment