Last active
June 8, 2020 12:38
-
-
Save rklaehn/3f26c3f80e5870831f52 to your computer and use it in GitHub Desktop.
akka http file server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akkahttptest | |
import akka.http.Http | |
import akka.stream.ActorFlowMaterializer | |
import akka.actor.ActorSystem | |
import akka.stream.scaladsl.{Sink, Source} | |
import akka.http.model._ | |
/**
 * Command-line HTTP client: sends one default `HttpRequest` over an outgoing connection and
 * copies the body of a `200 OK` response to standard output; for any other status only the
 * status itself is printed.
 *
 * The actor system is shut down after a byte has been read from standard input.
 *
 * NOTE(review): this connects to port 80, whereas the `Main` server in this gist binds
 * localhost:8080 -- confirm which server this client is meant to talk to.
 */
object TestClient extends App {
  implicit val system = ActorSystem("ServerTest")
  implicit val materializer = ActorFlowMaterializer()

  val host = "127.0.0.1"
  val httpClient = Http(system).outgoingConnection(host, 80)

  // Consumes responses one by one; the body of a successful response is streamed to stdout.
  val printChunksConsumer = Sink.foreach[HttpResponse] { response =>
    response.status match {
      case StatusCodes.OK =>
        // Write every chunk as soon as it arrives and flush so output is not held back.
        val echoedChunks = response.entity.getDataBytes().map { bytes =>
          System.out.write(bytes.toArray)
          System.out.flush()
        }
        // The body stream is started here and runs independently of the outer response stream.
        echoedChunks.to(Sink.ignore).run()
      case otherStatus =>
        println(otherStatus)
    }
  }

  val finishFuture = Source.single(HttpRequest()).via(httpClient).runWith(printChunksConsumer)

  // Block until a byte is available on stdin, then stop the actor system and wait for it.
  System.in.read()
  system.shutdown()
  system.awaitTermination()
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akkahttptest | |
import java.nio.channels.FileChannel | |
import java.nio.file.{Path, Paths, StandardOpenOption} | |
import java.nio.{ByteBuffer, MappedByteBuffer} | |
import akka.actor.ActorSystem | |
import akka.http.Http | |
import akka.http.model.HttpEntity.ChunkStreamPart | |
import akka.http.model._ | |
import akka.stream.ActorFlowMaterializer | |
import akka.stream.scaladsl.{Sink, Source} | |
import akka.util.{ByteString, Timeout} | |
import scala.concurrent.Future | |
import scala.concurrent.duration._ | |
import scala.util.Try | |
import scala.util.control.NonFatal | |
/**
 * Iterates over the remaining bytes of a read-only `ByteBuffer` in chunks of at most
 * `chunkSize` bytes, handing out each chunk as a `ByteString`.
 *
 * Iteration advances the position of the supplied `buffer` itself; no private copy of the
 * buffer is taken, so callers that need the buffer afterwards should pass a `duplicate()`.
 *
 * @param buffer    the buffer to read from; must be read-only (enforced with `require`)
 * @param chunkSize maximum number of bytes per chunk; must be positive (enforced with `require`)
 * @throws IllegalArgumentException if `buffer` is writable or `chunkSize` is not positive
 */
class ByteBufferIterator(buffer: ByteBuffer, chunkSize: Int) extends Iterator[ByteString] {
  require(buffer.isReadOnly, "buffer must be read-only")
  require(chunkSize > 0, "chunkSize must be positive")

  /** True while the underlying buffer still has unread bytes. */
  override def hasNext: Boolean = buffer.hasRemaining

  /**
   * Returns the next chunk and advances the buffer past it.
   *
   * @throws NoSuchElementException if the buffer has no bytes left
   */
  override def next(): ByteString = {
    // Honour the Iterator contract instead of silently returning an empty chunk.
    if (!buffer.hasRemaining)
      throw new NoSuchElementException("next() called on an exhausted ByteBufferIterator")

    // The last chunk may be shorter than chunkSize.
    val size = chunkSize min buffer.remaining()
    // slice() starts at the current position; capping its limit yields exactly this chunk.
    val chunk = buffer.slice()
    chunk.limit(size)
    buffer.position(buffer.position() + size)
    ByteString(chunk)
  }
}
/**
 * Minimal HTTP file server: every GET request is answered by streaming the file named by the
 * request path as a chunked `application/octet-stream` response; all other requests get a 404.
 *
 * Binds to localhost:8080 and keeps serving until a byte has been read from standard input,
 * after which the actor system is shut down.
 */
object Main extends App {

  /**
   * Memory-maps the whole file at `path` in read-only mode.
   *
   * The channel is closed before returning, even when mapping fails; per the `FileChannel.map`
   * contract an established mapping does not depend on the channel that created it.
   *
   * @param path the file to map
   * @return a read-only mapping covering the file from offset 0 to its current size
   */
  def map(path: Path): MappedByteBuffer = {
    val channel = FileChannel.open(path, StandardOpenOption.READ)
    try channel.map(FileChannel.MapMode.READ_ONLY, 0L, channel.size())
    finally channel.close()
  }

  implicit val system = ActorSystem()
  implicit val materializer = ActorFlowMaterializer()
  implicit val askTimeout: Timeout = 500.millis

  import HttpMethods._

  /** Synchronous handler: GET serves a file, any other request is rejected with 404. */
  val requestHandler: HttpRequest ⇒ HttpResponse = {
    case HttpRequest(GET, uri, _, _, _) =>
      // NOTE(review): the request path is used verbatim as a filesystem path, so any file this
      // process can read is served. Restrict it to a document root before exposing this server.
      val path = Paths.get(uri.path.toString())
      val result = Try {
        val mappedByteBuffer = map(path)
        // Build a fresh iterator over an independent view for each materialization, so the
        // source does not hand out one shared, already-advanced iterator.
        val chunks = Source(() => new ByteBufferIterator(mappedByteBuffer.duplicate(), 4096)).map { x =>
          println("Chunk of size " + x.size)
          ChunkStreamPart(x)
        }
        HttpResponse(entity = HttpEntity.Chunked(MediaTypes.`application/octet-stream`, chunks))
      } recover {
        case NonFatal(cause) =>
          // getMessage may be null; fall back to the exception's string form.
          val message = Option(cause.getMessage).getOrElse(cause.toString)
          HttpResponse(StatusCodes.InternalServerError, entity = message)
      }
      // Safe: Try only captures non-fatal failures and all of those are recovered above.
      result.get
    case _: HttpRequest ⇒ HttpResponse(StatusCodes.NotFound, entity = "Unknown resource!")
  }

  val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
    Http(system).bind(interface = "localhost", port = 8080)

  val bindingFuture: Future[Http.ServerBinding] = serverSource.to(Sink.foreach { connection =>
    // foreach materializes the source
    println("Accepted new connection from " + connection.remoteAddress)
    // ... and then actually handle the connection
    connection.handleWithSyncHandler(requestHandler)
  }).run()

  // Block until a byte is available on stdin, then stop the actor system and wait for it.
  System.in.read()
  system.shutdown()
  system.awaitTermination()
}
I updated the example to work with 1.0-M2
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Here is a modified version of your test client and server that work with 0.11:
https://gist.github.com/abrighton/acd43a6cd9c0b997c456
The chunking part seems to work. You just need to be careful not to shut down the actor system before the response has been fully streamed.