Using RabbitMQ in Scala

RabbitMQ is a great place to stash the state of long-running scripts. It has client libraries for basically every language, so you could conceivably write different steps of a batch job in different languages to take advantage of each one's strengths. I also like that it lets you kill a script and restart from where it left off, as these things always seem to lock up on me eventually.

For Scala, there are more library options than you might expect, since you can use either the RabbitMQ Java client or one of the actor-based Scala wrappers. I don’t think actors are really worth it for this (unless you really need them), so the simplest solution is to depend on the lower-level RabbitMQ Java client:

libraryDependencies += "com.rabbitmq" % "amqp-client" % "3.6.1"

You’ll need to create a connection (below are the defaults):

import com.rabbitmq.client.ConnectionFactory

val factory = new ConnectionFactory()
factory.setHost("127.0.0.1")
factory.setUsername("guest")
factory.setPassword("guest")
factory.setPort(5672)

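To add tasks to a queue, you’ll want to make a queue first. I did this through the RabbitMQ management UI, because then you don’t have to write as much code. The code below also expects an exchange to bind the queue to; here the exchange, the queue, and the routing key are all named ssl-akka.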
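
If you would rather not click through the UI, the same setup can be done from code. This is only a minimal sketch: the QueueSetup object is a hypothetical helper, and the direct exchange type and durable flags are assumptions, so match them to whatever you actually configured. Redeclaring an existing queue or exchange with different settings makes the broker close the channel with an error.

```scala
import com.rabbitmq.client.Channel

object QueueSetup {
  // The exchange, queue and routing key all share this name in the post.
  val Name = "ssl-akka"

  def declare(channel: Channel): Unit = {
    // Assumed settings: a durable direct exchange.
    channel.exchangeDeclare(Name, "direct", true)
    // durable = true, exclusive = false, autoDelete = false, no extra arguments.
    channel.queueDeclare(Name, true, false, false, null)
    // Route messages published with this routing key into the queue.
    channel.queueBind(Name, Name, Name)
  }
}
```

You would call QueueSetup.declare(channel) once, right after creating the channel and before publishing anything.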

Then you decide what format your messages are in (in this case, just URLs). The one irritation is that you have to convert them to a byte array, but I guess this means you could store all sorts of interesting things (TCP packets, etc.).
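
For plain strings, the byte conversion is a one-liner in each direction. The sketch below uses a hypothetical MessageCodec object (not part of the RabbitMQ client) and pins the charset to UTF-8 on both sides, so the result does not depend on the platform default encoding of whichever machine runs the producer or the consumer.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object MessageCodec {
  // String -> bytes, for the message body you publish.
  def encode(url: String): Array[Byte] = url.getBytes(UTF_8)

  // bytes -> String, for the message body you receive.
  def decode(body: Array[Byte]): String = new String(body, UTF_8)
}
```

Decoding what you encoded gives back the original string, so MessageCodec.decode(MessageCodec.encode("https://www.example.com")) is "https://www.example.com" again.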

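val conn = factory.newConnection()
val channel = conn.createChannel()

// The exchange, queue and routing key all share one name here.
val exchangeName = "ssl-akka"
val queueName = "ssl-akka"
val routingKey = "ssl-akka"

// Bind the queue to the exchange; both must already exist.
channel.queueBind(queueName, exchangeName, routingKey)

// `domains` is your own collection of domain names, defined elsewhere.
for (domain <- domains) {
  // Encode explicitly as UTF-8 so the consumer can decode it the same way.
  val messageBodyBytes = ("https://www." + domain).getBytes("UTF-8")

  // null means no message properties (headers, persistence, etc.).
  channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes)
}

channel.close()
conn.close()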

Retrieving tasks is relatively simple: you just need to know the name of the queue they are stored in. The messages come back as bytes, so you'll want to turn them back into a String, JSON, etc., depending on what you serialized.

If you want to be able to kill and resume a script, you need to acknowledge (ack) each message manually, after your processing code has finished with it. If you are writing some sort of scraping utility that could throw an error, you likely want to wrap the work in a Try so that an exception doesn't kill the loop.

The loop I added below (for 1 to 30) is optional. I found that my connections would eventually die, and it was easiest to reconnect periodically inside a Future.

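import com.rabbitmq.client.Channel
import scala.util.Try

val channel: Channel = conn.createChannel()

// With autoAck off, a message stays on the queue until it is acked.
val autoAck = false
for (i <- 1 to 30) {
  val response = channel.basicGet("ssl-akka", autoAck)
  if (response == null) {
    // No message retrieved (basicGet returns null when the queue is empty).
    // I've never seen this, but apparently it's possible
  } else {
    val body = response.getBody()
    val deliveryTag = response.getEnvelope().getDeliveryTag()
    val domain = new String(body, "UTF-8")

    // Catch any non-fatal exception so one bad task doesn't stop the loop.
    Try {
      doSomething // placeholder for the real work on `domain`
    }

    // Ack only this message (multiple = false) once the work is done.
    channel.basicAck(deliveryTag, false)
  }
}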
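
One thing to be aware of in the loop above: the Try swallows the exception, so the message is acked whether or not the work succeeded. If you would rather have failed tasks go back on the queue, one option is to inspect the Try result and nack on failure. This is a sketch only; Settle is a hypothetical helper, and requeueing on every failure is an assumption that may not suit you, since a message that always fails will be redelivered forever.

```scala
import com.rabbitmq.client.Channel
import scala.util.{Failure, Success, Try}

object Settle {
  // Runs `work`, then acks on success or returns the message to the queue on failure.
  def apply(channel: Channel, deliveryTag: Long)(work: => Unit): Unit =
    Try(work) match {
      case Success(_) =>
        // multiple = false: ack only this message.
        channel.basicAck(deliveryTag, false)
      case Failure(e) =>
        System.err.println(s"Task failed, requeueing: ${e.getMessage}")
        // multiple = false, requeue = true: put the message back on the queue.
        channel.basicNack(deliveryTag, false, true)
    }
}
```

Inside the loop this would replace the Try block and the basicAck call, for example Settle(channel, deliveryTag) { doSomething }.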