Skip to content

Instantly share code, notes, and snippets.

@YoEight
Created July 4, 2013 12:17

Revisions

  1. YoEight created this gist Jul 4, 2013.
    63 changes: 63 additions & 0 deletions Server.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,63 @@
    package deiko

    import java.nio.charset.Charset
    import java.util.concurrent.CancellationException
    import org.jboss.netty.buffer.ChannelBuffers
    import org.jboss.netty.channel.{ Channel, ChannelFuture, ChannelFutureListener }
    import org.jboss.netty.handler.codec.http.{ DefaultHttpChunk, HttpChunk }
    import scalaz.stream.{ Process, process1, processes }
    import scalaz.concurrent.Task
    import Process.{ Process1, Sink }
    import unfiltered.request.{ GET, Path, Seg }
    import unfiltered.response.{ NotFound, TransferEncoding, PlainTextContent }
    import unfiltered.netty.{ async, ReceivedMessage, ServerErrorResponse }

    object Foo extends async.Plan with ServerErrorResponse {
    def intent = {
    case req @ GET(Path(Seg("bar" :: name :: Nil))) => stream(name, req.underlying)
    case req => req.respond(NotFound)
    }

    def stream(filename: String, message: ReceivedMessage) {
    val channel = message.context.getChannel
    val end = Process.emit(HttpChunk.LAST_CHUNK)
    val resp = message.defaultResponse(TransferEncoding("Chunked") ~> PlainTextContent)
    val source = (processes.linesR(filename) |> toChunkProcess) ++ end
    val action = for {
    _ <- toTask(channel.write(resp))
    _ <- source.to(sink(channel)).run
    } yield ()

    action.run
    }

    def toTask(future: ChannelFuture): Task[Unit] =
    Task.async[Unit] { callback =>
    future.addListener(new ChannelFutureListener {
    def operationComplete(res: ChannelFuture) {
    if (res.isSuccess) callback(scalaz.\/-(()))
    else if (res.isCancelled) callback(scalaz.-\/(new CancellationException))
    else callback(scalaz.-\/(res.getCause))
    }
    })
    }

    def sink(channel: Channel): Sink[Task, HttpChunk] = {
    def go(input: HttpChunk) =
    toTask(channel.write(input))

    Process.await(Task.delay[HttpChunk => Task[Unit]](go))(Process.emit).repeat
    }

    val toChunkProcess: Process1[String, HttpChunk] =
    process1.lift(toChunk)

    def toChunk(input: String) =
    new DefaultHttpChunk(ChannelBuffers.copiedBuffer(input, Charset.forName("UTF-8")))
    }

    object Server {
    def main(args: Array[String]) {
    unfiltered.netty.Http(9000).handler(Foo).run()
    }
    }
    11 changes: 11 additions & 0 deletions build.sbt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,11 @@
    name := "netty-stream"

    version := "0.1-SNAPSHOT"

    scalaVersion := "2.10.2"

    libraryDependencies += "net.databinder.dispatch" %% "dispatch-core" % "0.10.0"

    libraryDependencies += "net.databinder" %% "unfiltered-netty-server" % "0.6.8"

    libraryDependencies += "org.scalaz.stream" %% "scalaz-stream" % "0.1-SNAPSHOT"