Wednesday, August 25, 2010

Looks like Google Voice is going to compete big time with Skype

When Skype calling was launched, it was free for the remaining part of the year, and now Google Voice is introducing free calls for the year to the US and Canada. I just tried the call quality from my Ubuntu machine and it rocks. Here is how it looks in my Gmail.

Saturday, August 21, 2010

RabbitMQ synchronously consume messages

In my previous post I described a scenario that calls for synchronously consuming messages from RabbitMQ. Here is a way to do it in Python. To keep things simple, I wrote a dummy program that dumps the contents of a RabbitMQ queue; the same program can also remove a message from the queue by iterating over all messages and acknowledging the one whose message ID matches. (It's a dumb implementation, so don't judge the coding; the intent is to demonstrate synchronous consumption of queue contents.)



import sys
from amqplib import client_0_8 as amqp
messageIdToRemove = None
chan = None

def process_message(msg):
    print "================================================="
    print "Properties ="
    print msg.properties
    print "Body=" 
    print msg.body
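    if op == "remove_message":
        # .get() avoids a KeyError for messages published without a message_id
        if messageIdToRemove == msg.properties.get('message_id'):
            print "@@@@@@removing message@@@@@@@@@@@@@@@@@@"
            # acking is what actually removes the message from the queue
            chan.basic_ack(msg.delivery_tag)

if __name__ == '__main__':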
    if len(sys.argv) < 9:
        print "Usage: python list_queue_messages.py mq_url mq_user mq_pass mq_vhost mq_exchange mq_queue_name mq_routing_key [list_queue|remove_message] [messageIdToRemove]"
        sys.exit(1)
   
    mq_url = sys.argv[1]
    mq_user = sys.argv[2]
    mq_pass = sys.argv[3]
    mq_vhost = sys.argv[4]
    mq_exchange = sys.argv[5]
    mq_queue_name = sys.argv[6]
    mq_routing_key = sys.argv[7]
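    op = sys.argv[8]
    if op == "remove_message":
        # remove_message needs one extra argument: the id of the message to ack
        if len(sys.argv) < 10:
            print "remove_message needs a messageIdToRemove argument"
            sys.exit(1)
        messageIdToRemove = sys.argv[9]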
        
    conn = amqp.Connection(host=mq_url,
                           userid=mq_user,
                           password=mq_pass,
                           virtual_host=mq_vhost,
                           insist=False)
    chan = conn.channel()
    chan.queue_declare(queue=mq_queue_name, durable=True,
        exclusive=False, auto_delete=False)
    chan.exchange_declare(exchange=mq_exchange, type="direct", durable=True,
        auto_delete=False)
    chan.queue_bind(queue=mq_queue_name, exchange=mq_exchange,
        routing_key=mq_routing_key)

    print "Consumer dumping messages from %s" % mq_queue_name
    i=0
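    try:
        while True:
            # basic_get is a synchronous pull; it returns None once nothing is left
            msg = chan.basic_get(mq_queue_name)
            if msg is None:
                break
            i = i + 1
            process_message(msg)
    finally:
        # closing the channel returns every unacked message to the queue
        chan.close()
        conn.close()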
    print "There are %d messages in the queue" % i

RabbitMQ retrying failed messages

We are a hybrid cloud file server company, and recently we had a requirement to let users view a file with Google Docs; when the file is saved in Google Docs, we need to download it back and create a new version in the cloud file server. On save, the app nodes insert a message into RabbitMQ, and a background GoogleDocs consumer process then pulls the file and creates a version using the REST API of the cloud file server.

As there are many components involved here, there are multiple failure scenarios: Google throttling us, app servers going down for maintenance, or app servers throttling the background jobs when they are under heavy load. The problem with RabbitMQ is that once a message has been delivered to a consumer, even if the consumer doesn't acknowledge it, RabbitMQ won't redeliver the unacked message to consumers until the channel is properly closed and reopened. I tried the RabbitMQ transactions API to roll back the transaction when an external component fails, and I also tried basic_recover, but each has its own issues and none of them worked.
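To make the ack behaviour concrete, here is a rough sketch of a consumer step that acknowledges a message only when processing succeeds. The names `consume_pending` and `handler` are made up for illustration; `chan` is assumed to be an amqplib channel like the one in the dump program above, and `handler` stands in for the Google Docs download and version-creation work.

```python
def consume_pending(chan, queue_name, handler):
    """Pull every available message and ack only the ones handled successfully.

    Returns an (acked, failed) tuple. Failed messages are left unacked, so
    the broker keeps holding them for as long as this channel stays open.
    """
    acked = 0
    failed = 0
    while True:
        # Synchronous pull; no_ack is left at its default, so an ack is required.
        msg = chan.basic_get(queue_name)
        if msg is None:
            break  # nothing more to hand out on this channel
        try:
            handler(msg)  # hypothetical callback, e.g. download file, create version
        except Exception:
            failed += 1  # no ack: the message stays pending on the broker
        else:
            chan.basic_ack(msg.delivery_tag)
            acked += 1
    return acked, failed
```

A message that lands in the `failed` count is exactly the kind that gets stuck: it is neither acked nor handed out again until the channel is closed.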

So the best idea I came up with was to start a timer thread in Python that wakes up every minute, polls RabbitMQ for pending messages, processes them, closes the channel and connection, and goes back to sleep for another minute. This way all unacked messages get redelivered.
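The polling loop could look roughly like the sketch below. This is not the production code, just the shape of it: `run_polling`, `connect`, `poll_once`, `stop_event` and `max_cycles` are names I made up here. `connect` is assumed to return an amqplib-style connection (something with `channel()` and `close()`), and `poll_once` is whatever drains the queue and acks the messages it managed to process.

```python
import logging
import threading

log = logging.getLogger("googledocs_consumer")  # hypothetical logger name


def run_polling(connect, poll_once, interval=60.0, stop_event=None, max_cycles=None):
    """Every `interval` seconds: connect, process pending messages, disconnect.

    Closing the channel at the end of each cycle is the important part:
    messages left unacked in this cycle go back to the queue and can be
    picked up again on a later cycle. Returns the number of cycles run.
    """
    stop_event = stop_event or threading.Event()
    cycles = 0
    while not stop_event.is_set():
        conn = chan = None
        try:
            conn = connect()
            chan = conn.channel()
            poll_once(chan)
        except Exception:
            # A failed cycle should not kill the timer thread; retry next time.
            log.exception("polling cycle failed")
        finally:
            if chan is not None:
                chan.close()
            if conn is not None:
                conn.close()
        cycles += 1
        if max_cycles is not None and cycles >= max_cycles:
            break
        stop_event.wait(interval)  # sleeps, but wakes early if stop is requested
    return cycles
```

To run it in the background on Python 3, something like `threading.Thread(target=run_polling, args=(connect, poll_once), daemon=True).start()` should do. Using `Event.wait` instead of `time.sleep` means the thread can be stopped without waiting out the full minute.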