{"id":3053,"date":"2016-02-09T01:31:19","date_gmt":"2016-02-09T01:31:19","guid":{"rendered":"http:\/\/www.garysieling.com\/blog\/?p=3053"},"modified":"2016-02-09T01:31:19","modified_gmt":"2016-02-09T01:31:19","slug":"rabbitmq-etl-cloud-api","status":"publish","type":"post","link":"https:\/\/www.garysieling.com\/blog\/rabbitmq-etl-cloud-api\/","title":{"rendered":"Using RabbitMQ for ETL of cloud storage APIs"},"content":{"rendered":"<p>Message queues are used as a communication between pieces of software, e.g. on a web site these are often used to request that a job be run in the background.<\/p>\n<p>Compared to a database, they are optimized for message throughput and provide some mechanisms to send messages to many consumers &#8211; this could be useful if you were replicating data to many locations.<\/p>\n<p>RabbitMQ is a popular message queuing product, and is fairly easy to use. In this article, I&#8217;ll show how to use it in a piece of software that pulls documents from DropBox, OneDrive, or Google Docs and indexes them for a search engine in Solr.<\/p>\n<p>RabbitMQ lets us save a message string. If you use JSON, you can serialize any class, so for this example we&#8217;ll define something that takes an account ID for our application, the type of document store we&#8217;re indexing, and the OAuth credentials to DropBox.<\/p>\n<pre lang=\"csharp\">\npublic class IndexQueueMessage\n{\n  public String accountId { get; set; }\n  public SearchableApplication requestedIndex { get; set; }\n  public String user_key { get; set; } \n  public String user_token { get; set; } \n}\n<\/pre>\n<p>Connecting and receiving messages becomes very simple.<\/p>\n<p>Queues are made to collect and pass on messages. 
Queue naming is one of the more important features, and the closest thing I&#8217;ve seen to a database design problem in message queueing &#8211; for instance, if you made one queue per account\/tenant in your application, you could expose statistics about usage, or charge people based on the quality of service you provide them (more indexers for their account).<\/p>\n<pre lang=\"csharp\">\nvar factory = new ConnectionFactory() \n{ \n  HostName = \"tasks.cloudapp.net\", \n  UserName = \"user\", \n  Password = \"pwd\"\n};\n\nfactory.AutomaticRecoveryEnabled = true;\n\nusing (var connection = factory.CreateConnection())\nusing (var channel = connection.CreateModel())\n{\n  var queue = indexer.GetType().ToString().ToLower() + \"_indexer\";\n\n  channel.QueueDeclare(queue: queue,\n    durable: true,\n    exclusive: false,\n    autoDelete: false,\n    arguments: null);\n\n  var consumer = new QueueingBasicConsumer(channel);\n\n  channel.BasicConsume(\n    queue: queue,\n    noAck: false, \/\/ false = send acknowledgement from the job\n    consumer: consumer\n  );\n\n  while (true)\n  {\n     ...\n  }\n}\n<\/pre>\n<p>To retrieve a message, we dequeue a delivery, decode the body as UTF-8, and hand the JSON off for processing:<\/p>\n<pre lang=\"csharp\">\nvar ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();\n\ntry\n{\n  var body = ea.Body;\n  var messageJson = Encoding.UTF8.GetString(body);\n\n  executeMessage(messageJson, indexer);\n}\nfinally\n{\n  debug(\"Acking job: \" + ea.DeliveryTag);\n  channel.BasicAck(\n    deliveryTag: ea.DeliveryTag,\n    multiple: false\n  );\n}\n<\/pre>\n<p>The sending side does something similar:<\/p>\n<pre lang=\"csharp\">\nchannel.BasicPublish(\n  exchange: \"\",\n  routingKey: app.ToString().ToLower() + \"_indexer\",\n  basicProperties: properties,\n  body: body);\n<\/pre>\n<p>When we receive messages, we can deserialize them to our message type:<\/p>\n<pre lang=\"csharp\">\ntry\n{\n  var jss = new System.Web.Script.Serialization.JavaScriptSerializer();\n  var 
message = jss.Deserialize&lt;IndexQueueMessage&gt;(messageJson);\n\n  routeType(message.type, indexer, message);\n}\ncatch (Exception e)\n{\n  error(e);\n}\n<\/pre>\n<p>Finally, we need to route these to application code:<\/p>\n<pre lang=\"csharp\">\nif (MESSAGE_START.Equals(type))\n{\n  indexer.startIndex(message);\n} \nelse if (MESSAGE_FOLDER.Equals(type)) \n{\n  indexer.indexFolder(message);\n} \nelse if (MESSAGE_CONTENT.Equals(type)) \n{\n  indexer.indexContent(message);\n} \nelse if (MESSAGE_RECENT.Equals(type)) \n{\n  indexer.indexRecent(message);\n}\nelse if (MESSAGE_NEXT.Equals(type)) \n{\n  indexer.indexNext(message);\n}\n<\/pre>\n<p>Several things about this setup are neat, and they were not immediately apparent to me until I had it running.<\/p>\n<p>One really nice feature (compared to cron) is that jobs run nearly instantaneously from the time they are requested.<\/p>\n<p>If you use a common message queue, a web application can fan work out to backends written in many languages &#8211; as an example, you could use a pre-built image resizing utility in Go, a Java-based natural language processing tool, and computer vision libraries in Python, with JSON as the interop language.<\/p>\n<p>This design also allows for instant parallelism &#8211; you can simply connect more processing nodes to the cluster. I&#8217;ve found that many long-running ETL scripts tend to accumulate resources (memory, disk, file handles, etc.) if not carefully written. A nice side benefit of this design is that you can subdivide these types of problems by having scripts run a set number of jobs and shut down &#8211; it also makes failing and retrying much simpler (especially in the out-of-memory case).<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Message queues are used for communication between pieces of software, e.g. on a web site they are often used to request that a job be run in the background. 
Compared to a database, they are optimized for message throughput and provide some mechanisms to send messages to many consumers &#8211; this could be useful &hellip; <\/p>\n<p class=\"link-more\"><a href=\"https:\/\/www.garysieling.com\/blog\/rabbitmq-etl-cloud-api\/\" class=\"more-link\">Continue reading<span class=\"screen-reader-text\"> &#8220;Using RabbitMQ for ETL of cloud storage APIs&#8221;<\/span><\/a><\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"om_disable_all_campaigns":false,"_monsterinsights_skip_tracking":false,"_monsterinsights_sitenote_active":false,"_monsterinsights_sitenote_note":"","_monsterinsights_sitenote_category":0,"footnotes":""},"categories":[22],"tags":[96,177,204,254,401,453,517],"aioseo_notices":[],"amp_enabled":true,"_links":{"self":[{"href":"https:\/\/www.garysieling.com\/blog\/wp-json\/wp\/v2\/posts\/3053"}],"collection":[{"href":"https:\/\/www.garysieling.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.garysieling.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.garysieling.com\/blog\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.garysieling.com\/blog\/wp-json\/wp\/v2\/comments?post=3053"}],"version-history":[{"count":0,"href":"https:\/\/www.garysieling.com\/blog\/wp-json\/wp\/v2\/posts\/3053\/revisions"}],"wp:attachment":[{"href":"https:\/\/www.garysieling.com\/blog\/wp-json\/wp\/v2\/media?parent=3053"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.garysieling.com\/blog\/wp-json\/wp\/v2\/categories?post=3053"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.garysieling.com\/blog\/wp-json\/wp\/v2\/tags?post=3053"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}