Created
September 21, 2015 18:24
-
-
Save sstone/f4b9448c696911b265a5 to your computer and use it in GitHub Desktop.
montoring rabbitmq jobs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.github.sstone.amqp | |
import java.util.UUID | |
import akka.actor.{ActorLogging, Actor, ActorSystem, Props} | |
import com.github.sstone.amqp.Amqp._ | |
import com.github.sstone.amqp.RpcServer.{IProcessor, ProcessResult} | |
import com.rabbitmq.client.{AMQP, BasicProperties, ConnectionFactory} | |
import scala.concurrent.Future | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.concurrent.duration._ | |
object Test1 extends App { | |
implicit val system = ActorSystem("mySystem") | |
val conn = system.actorOf(ConnectionOwner.props(new ConnectionFactory(), reconnectionDelay = 5 seconds), "connection") | |
// create 2 servers | |
val processor = new IProcessor { | |
override def onFailure(delivery: Delivery, e: Throwable): ProcessResult = ??? | |
override def process(delivery: Delivery): Future[ProcessResult] = { | |
Thread.sleep(100) | |
Future.successful(ProcessResult(Some(delivery.body))) // here we could send anything back | |
} | |
} | |
val queueParams = QueueParameters("request_queue", passive = false, durable = false, exclusive = false, autodelete = true) | |
val server1 = ConnectionOwner.createChildActor( | |
conn, | |
RpcServer.props(queueParams, StandardExchanges.amqDirect, "my_key", processor, ChannelParameters(qos = 1))) | |
val server2 = ConnectionOwner.createChildActor( | |
conn, | |
RpcServer.props(queueParams, StandardExchanges.amqDirect, "my_key", processor, ChannelParameters(qos = 1))) | |
val producer = ConnectionOwner.createChildActor(conn, ChannelOwner.props()) | |
// create a client that will handle job batches | |
case class Job(expected: Int, received: Vector[Array[Byte]]) | |
class MyClient extends Actor with ActorLogging { | |
val jobs = new collection.mutable.HashMap[String, Job] | |
def receive = { | |
case ('work, data: Array[Byte]) => | |
val items = data.grouped(20).toList | |
val correlationId = UUID.randomUUID().toString | |
val properties = new AMQP.BasicProperties.Builder().correlationId(correlationId).replyTo("response_queue").build() | |
items.map(item => producer ! Publish("", "request_queue", item, Some(properties))) | |
jobs += correlationId -> Job(items.size, Vector.empty[Array[Byte]]) | |
case Delivery(tag, envelope, properties, body) if !jobs.contains(properties.getCorrelationId) => | |
log.warning(s"received response for unknown job ${properties.getCorrelationId}") | |
case Delivery(tag, envelope, properties, body) => | |
val job = jobs(properties.getCorrelationId) | |
val received = job.received :+ body | |
if (received.size == job.expected) { | |
log.info(s"job ${properties.getCorrelationId} has been completed") | |
jobs -= properties.getCorrelationId | |
} else { | |
log.info(s"job ${properties.getCorrelationId}: ${received.size} sub-jobs have been completed out of ${job.expected}") | |
jobs.update(properties.getCorrelationId, job.copy(received = received)) | |
} | |
} | |
} | |
val client = system.actorOf(Props[MyClient]) | |
val responseQueueParams = QueueParameters("response_queue", passive = false, durable = false, exclusive = false, autodelete = true) | |
val consumer = ConnectionOwner.createChildActor(conn, Consumer.props(client, channelParams = None, autoack = true)) | |
// wait till everyone is actually connected to the broker | |
Amqp.waitForConnection(system, consumer).await() | |
consumer ! Record(AddQueue(responseQueueParams)) | |
Thread.sleep(1000) | |
val data = new Array[Byte](100) | |
client ! ('work, data) | |
client ! ('work, data) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment