01 April 2015

Copying Messages from RabbitMQ to RavenDB with Python

Today I found myself needing to grab a copy of the messages in a RabbitMQ queue. After a few minutes of deliberation, I decided a quick and dirty way would be to use the REST API provided by RabbitMQ to grab the messages and the REST API provided by RavenDB to shove them into RavenDB. While it’s perhaps not the most elegant solution, it is straightforward and simple.

import sys
import json
import requests

def dumpmessage():

    # RabbitMQ Stuff: fetch up to 1000 messages and put them back on the queue
    rabbit_url = 'http://myrabbitmqserver:15672/api/queues/myvhost/myqueue/get'
    rabbit_request_data = {'count': 1000, 'requeue': True, 'encoding': 'auto'}
    rabbit_request_json = json.dumps(rabbit_request_data)
    rabbit_request_headers = {'Content-type': 'application/json', 'Authorization': 'stuffgoeshere'}

    # RavenDB Stuff: the entity name header groups the stored documents into one collection
    raven_url = 'http://myravendbserver:8080/docs'
    raven_request_headers = {'Raven-Entity-name': 'DumpedMessages'}

    rabbit_response = requests.post(rabbit_url, data=rabbit_request_json, headers=rabbit_request_headers)

    # Anything other than 200 OK means the messages could not be fetched
    if rabbit_response.status_code != 200:
        sys.exit('ERROR (' + str(rabbit_response.status_code) + '): ' + rabbit_response.text)

    # The response body is a JSON array with one entry per message
    documents = rabbit_response.json()

    for document in documents:

        # Serialize each message back to a JSON string and store it as its own document
        json_document = json.dumps(document)

        raven_response = requests.post(raven_url, data=json_document, headers=raven_request_headers)

        # RavenDB answers 201 Created when the document has been stored
        if raven_response.status_code != 201:
            sys.exit('ERROR (' + str(raven_response.status_code) + '): ' + raven_response.text)

if __name__ == '__main__':
    dumpmessage()

The code itself is pretty easy to follow. In a nutshell:

  1. Set up the RabbitMQ REST API request
  2. Set up the RavenDB REST API request
  3. Make a POST to RabbitMQ (in my case I’m limiting to 1000 messages and re-queuing them) using Requests
  4. Check for an error and stop if something went wrong
  5. Snag the JSON from the response
  6. Iterate through each item and…
    1. Use Python’s json module to turn the dictionary into a JSON string
    2. Make a POST to RavenDB passing in the JSON serialized document
    3. Check for an error and stop if something went wrong
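
The `Authorization` value in the RabbitMQ request setup is only a placeholder (`'stuffgoeshere'`). As a minimal sketch, assuming the management API is protected with HTTP Basic authentication, the header value could be built as follows. The `basic_auth_header` helper and the `guest`/`guest` credentials are hypothetical, chosen purely for illustration.

```python
import base64

def basic_auth_header(username, password):
    """Build the value of an HTTP Basic Authorization header."""
    # Basic auth is "username:password", base64 encoded, prefixed with "Basic "
    credentials = '{}:{}'.format(username, password).encode('utf-8')
    return 'Basic ' + base64.b64encode(credentials).decode('ascii')

# Hypothetical credentials; substitute a real management user
rabbit_request_headers = {
    'Content-type': 'application/json',
    'Authorization': basic_auth_header('guest', 'guest'),
}
```

Requests can also build this header itself if the credentials are passed as `auth=('user', 'password')` to `requests.post`, in which case the `Authorization` entry can be left out of the headers.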

In about 30 lines of code (and it could be made shorter), I got the job done. Granted, it’s not something everyone is likely to encounter, but it does demonstrate the power and usefulness of REST APIs and a language like Python.
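
To give an idea of what a shorter version might look like, here is a rough sketch rather than a drop-in replacement. It assumes a Requests version that supports the `json=` keyword (which serializes the body and sets the JSON content type) and it uses `raise_for_status()`, which is a looser check than the exact 200 and 201 comparisons above because it only raises on 4xx and 5xx responses. The `copy_messages` name and its parameters are made up for this example; `post` is expected to be `requests.post` or anything with the same signature.

```python
def copy_messages(post, rabbit_url, raven_url, auth, count=1000):
    """Copy up to `count` messages from a RabbitMQ queue into RavenDB.

    `post` is any callable with the same signature as requests.post.
    Returns the number of messages copied.
    """
    # Fetch the messages and re-queue them so the queue is left unchanged
    rabbit_response = post(
        rabbit_url,
        json={'count': count, 'requeue': True, 'encoding': 'auto'},
        auth=auth,
    )
    rabbit_response.raise_for_status()  # raises on any 4xx/5xx status

    messages = rabbit_response.json()
    for message in messages:
        # Store each message as its own document in the DumpedMessages collection
        raven_response = post(
            raven_url,
            json=message,
            headers={'Raven-Entity-name': 'DumpedMessages'},
        )
        raven_response.raise_for_status()

    return len(messages)
```

With Requests installed it could be called as `copy_messages(requests.post, rabbit_url, raven_url, auth=('user', 'password'))`, using the same two URLs as in the script above. Passing `post` in as an argument also makes it easy to try the function out against a stand-in without touching a real queue.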

UPDATE 2015-04-02

The above code works, but if you want a better version, check out RabbitToRaven, which is a somewhat nicer implementation.