Using RabbitMQ with Scala Futures

RabbitMQ and Futures provide related but different functionality: RabbitMQ gives you persistent storage for a queue of events, and Futures let you delegate functions to a thread pool.

To make this concrete, let's say we write a function that reads a task from RabbitMQ:

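import com.rabbitmq.client.ConnectionFactory

def doSomething(): Unit = {
  val factory = new ConnectionFactory()
  factory.setHost("127.0.0.1")
  factory.setUsername("guest")
  factory.setPassword("guest")
  factory.setPort(5672)

  val connection = factory.newConnection()
  try {
    val channel = connection.createChannel()
    // autoAck = true: the message is acknowledged as soon as it is delivered.
    // basicGet returns null when the queue is empty, so wrap it in Option.
    Option(channel.basicGet("queueName", true)) match {
      case Some(response) => println(new String(response.getBody, "UTF-8"))
      case None           => println("queue is empty")
    }
  } finally {
    connection.close() // closing the connection also closes its channels
  }
}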

Here we’re automatically acknowledging the task. If you want to guarantee the work is done before it is removed from the queue, pass false for the auto-acknowledge argument and acknowledge the message yourself:

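// channel is the Channel created from the connection; autoAck = false here
val response = channel.basicGet("queueName", false)

val deliveryTag =
  response
    .getEnvelope
    .getDeliveryTag

// once the work is done, acknowledge this single message
channel.basicAck(deliveryTag, false)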

Now that we can retrieve a task from RabbitMQ, we can process it, send back new messages, and repeat. If you notice that this resembles Akka, you may be interested to know that there are Akka-RabbitMQ connectors.

Without Akka, we have a single-threaded application, so if we want to parallelize it, we have to start several processes to handle tasks.

Think this is a pain? Enter futures.

A Future wraps a function (like doSomething above) so that it runs asynchronously. We can create many of them, and they are queued and executed by an implicit ExecutionContext, which is typically backed by a thread pool.

If you want to make your own execution context, you can (this helps show how it works, and lets you set the number of threads):

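import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

implicit val ec: ExecutionContext = new ExecutionContext {
  // Fixed pool: at most 25 tasks run at the same time
  val threadPool = Executors.newFixedThreadPool(25)

  def execute(runnable: Runnable): Unit = {
    threadPool.submit(runnable)
  }

  // Print failures instead of silently discarding them
  def reportFailure(t: Throwable): Unit = t.printStackTrace()
}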
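
To see futures actually landing on a fixed-size pool, here is a minimal sketch. It assumes a hypothetical handleTask function standing in for something like doSomething, and it builds the context with ExecutionContext.fromExecutorService from the standard library rather than the hand-written one, so the pool can be shut down at the end. The object name, task count, and thread count are all made up for illustration.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FuturePoolSketch {
  // Hypothetical stand-in for a function such as doSomething
  def handleTask(id: Int): String =
    s"task-$id handled on ${Thread.currentThread.getName}"

  // Runs taskCount tasks on a pool of `threads` threads and collects the results
  def runAll(taskCount: Int, threads: Int): Seq[String] = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // Each Future is queued on the pool; at most `threads` run at once
      val futures = (1 to taskCount).map(id => Future(handleTask(id)))
      // Future.sequence keeps the results in the order the futures were created
      Await.result(Future.sequence(futures), 10.seconds)
    } finally {
      pool.shutdown() // lets the JVM exit once the tasks are done
    }
  }

  def main(args: Array[String]): Unit =
    runAll(taskCount = 8, threads = 4).foreach(println)
}
```

The thread names in the output should show the eight tasks being shared among the four pool threads, which is the parallelism we would otherwise get by starting several processes.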

I find that it can also be helpful to add code to your queued task to pull several RabbitMQ tasks in sequence, complete them, and then end. This gives you another point of control, as any used resources can be freed and re-allocated after a specified number of tasks or elapsed time. This is similar to how IIS worker processes operate, except in Scala you can do it yourself in ~20-30 lines of code.
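
As a rough sketch of that idea, the worker below pulls tasks until it hits a task limit, a time limit, or an empty queue, and then returns. To keep it runnable without a broker, it assumes an in-memory ConcurrentLinkedQueue as a stand-in for RabbitMQ; in a real worker the fetch function would wrap something like basicGet. The names runBatch, fetch, handle, maxTasks, and maxMillis are hypothetical.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BatchWorkerSketch {
  // Pulls tasks one at a time until maxTasks are done, maxMillis has elapsed,
  // or the source is empty. Returns how many tasks were completed.
  def runBatch(fetch: () => Option[String], handle: String => Unit,
               maxTasks: Int, maxMillis: Long): Int = {
    val deadline = System.currentTimeMillis() + maxMillis
    var done = 0
    var empty = false
    while (!empty && done < maxTasks && System.currentTimeMillis() < deadline) {
      fetch() match {
        case Some(task) => handle(task); done += 1
        case None       => empty = true
      }
    }
    done // anything the batch was holding can be released once it returns
  }

  def main(args: Array[String]): Unit = {
    // In-memory stand-in for the RabbitMQ queue (assumption for this sketch)
    val queue = new ConcurrentLinkedQueue[String]()
    (1 to 10).foreach(i => queue.add(s"task-$i"))

    val pool = Executors.newFixedThreadPool(2)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // Two queued workers, each limited to 3 tasks or 1 second
      val workers = (1 to 2).map { _ =>
        Future(runBatch(() => Option(queue.poll()), t => println(t),
                        maxTasks = 3, maxMillis = 1000L))
      }
      val counts = Await.result(Future.sequence(workers), 5.seconds)
      println(s"completed per worker: $counts")
    } finally pool.shutdown()
  }
}
```

Whatever schedules the workers can then decide whether to queue another batch, which is where the extra point of control comes from.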
