Note: I'm migrating from gonzalo123.com to here. When I finish I'll swap the DNS to here. The "official" blog will be always gonzalo123.com

      Playing with RabbitMQ (part 2). Now with Python

      Do you remember the las post about RabbitMQ? In that post we created a small wrapper library to use RabbitMQ with node and PHP. I also work with Python and I also want to use the same RabbitMQ wrapper here. With Python there’re several libraries to use Rabbit. I’ll use pika.

      The idea is the same than the another post. I want to use queues, exchanges and RPCs. So let’s start with queues:

      We can create a queue receiver called ‘queue.backend’

      from rabbit import builder
       
      server = {
          'host': 'localhost',
          'port': 5672,
          'user': 'guest',
          'pass': 'guest',
      }
       
      def onData(data):
          print data['aaa']
       
      builder.queue('queue.backend', server).receive(onData)
      

      and emit messages to the queue

      from rabbit import builder
       
      server = {
          'host': 'localhost',
          'port': 5672,
          'user': 'guest',
          'pass': 'guest',
      }
       
      queue = builder.queue('queue.backend', server)
       
      queue.emit({"aaa": 1})
      queue.emit({"aaa": 2})
      queue.emit({"aaa": 3})
      

      The library (as the PHP and ones). Uses a builder class to create our instances

      from queue import Queue
      from rpc import RPC
      from exchange import Exchange
       
      defaults = {
          'queue': {
              'queue': {
                  'passive': False,
                  'durable': True,
                  'exclusive': False,
                  'autoDelete': False,
                  'nowait': False
              },
              'consumer': {
                  'noLocal': False,
                  'noAck': False,
                  'exclusive': False,
                  'nowait': False
              }
          },
          'exchange': {
              'exchange': {
                  'passive': False,
                  'durable': True,
                  'autoDelete': True,
                  'internal': False,
                  'nowait': False
              },
              'queue': {
                  'passive': False,
                  'durable': True,
                  'exclusive': False,
                  'autoDelete': True,
                  'nowait': False
              },
              'consumer': {
                  'noLocal': False,
                  'noAck': False,
                  'exclusive': False,
                  'nowait': False
              }
          },
          'rpc': {
              'queue': {
                  'passive': False,
                  'durable': True,
                  'exclusive': False,
                  'autoDelete': True,
                  'nowait': False
              },
              'consumer': {
                  'noLocal': False,
                  'noAck': False,
                  'exclusive': False,
                  'nowait': False
              }
          }
      }
       
      def queue(name, server):
          conf = defaults['queue']
          conf['server'] = server
       
          return Queue(name, conf)
       
      def rpc(name, server):
          conf = defaults['rpc']
          conf['server'] = server
       
          return RPC(name, conf)
       
      def exchange(name, server):
          conf = defaults['exchange']
          conf['server'] = server
       
          return Exchange(name, conf)
      

      And our Queue class

      import pika
      import json
      import time
       
      class Queue:
          def __init__(self, name, conf):
              self.name = name
              self.conf = conf
       
          def emit(self, data=None):
              credentials = pika.PlainCredentials(self.conf['server']['user'], self.conf['server']['pass'])
              connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf['server']['host'], port=self.conf['server']['port'], credentials=credentials))
              channel = connection.channel()
       
              queueConf = self.conf['queue']
              channel.queue_declare(queue=self.name, passive=queueConf['passive'], durable=queueConf['durable'], exclusive=queueConf['exclusive'], auto_delete=queueConf['autoDelete'])
       
              channel.basic_publish(exchange='', routing_key=self.name, body=json.dumps(data), properties=pika.BasicProperties(delivery_mode=2))
              connection.close()
       
          def receive(self, callback):
              credentials = pika.PlainCredentials(self.conf['server']['user'], self.conf['server']['pass'])
              connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf['server']['host'], port=self.conf['server']['port'], credentials=credentials))
              channel = connection.channel()
       
              queueConf = self.conf['queue']
              channel.queue_declare(queue=self.name, passive=queueConf['passive'], durable=queueConf['durable'], exclusive=queueConf['exclusive'], auto_delete=queueConf['autoDelete'])
       
              def _callback(ch, method, properties, body):
                  callback(json.loads(body))
                  ch.basic_ack(delivery_tag=method.delivery_tag)
                  print "%s %s::%s" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name, body)
       
              print "%s Queue '%s' initialized" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name)
              consumerConf = self.conf['consumer']
              channel.basic_qos(prefetch_count=1)
              channel.basic_consume(_callback, self.name, no_ack=consumerConf['noAck'], exclusive=consumerConf['exclusive'])
       
              channel.start_consuming()
      

      We also want to use exchanges to emit messages without waiting for answers, just as a event broadcast. We can emit messages:

      from rabbit import builder
       
      server = {
          'host': 'localhost',
          'port': 5672,
          'user': 'guest',
          'pass': 'guest',
      }
       
      exchange = builder.exchange('process.log', server)
       
      exchange.emit("xxx.log", "aaaa")
      exchange.emit("xxx.log", ["11", "aaaa"])
      exchange.emit("yyy.log", "aaaa")
      

      And listen to messages

      from rabbit import builder
       
      server = {
          'host': 'localhost',
          'port': 5672,
          'user': 'guest',
          'pass': 'guest',
      }
       
      def onData(routingKey, data):
          print routingKey, data
       
      builder.exchange('process.log', server).receive("yyy.log", onData)
      

      That’s the class

      import pika
      import json
      import time
       
      class Exchange:
          def __init__(self, name, conf):
              self.name = name
              self.conf = conf
       
          def emit(self, routingKey, data=None):
              credentials = pika.PlainCredentials(self.conf['server']['user'], self.conf['server']['pass'])
              connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf['server']['host'], port=self.conf['server']['port'], credentials=credentials))
              channel = connection.channel()
       
              exchangeConf = self.conf['exchange']
              channel.exchange_declare(exchange=self.name, type='topic', passive=exchangeConf['passive'], durable=exchangeConf['durable'], auto_delete=exchangeConf['autoDelete'], internal=exchangeConf['internal'])
              channel.basic_publish(exchange=self.name, routing_key=routingKey, body=json.dumps(data))
              connection.close()
       
          def receive(self, bindingKey, callback):
              credentials = pika.PlainCredentials(self.conf['server']['user'], self.conf['server']['pass'])
              connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf['server']['host'], port=self.conf['server']['port'], credentials=credentials))
              channel = connection.channel()
       
              exchangeConf = self.conf['exchange']
              channel.exchange_declare(exchange=self.name, type='topic', passive=exchangeConf['passive'], durable=exchangeConf['durable'], auto_delete=exchangeConf['autoDelete'], internal=exchangeConf['internal'])
       
              queueConf = self.conf['queue']
              result = channel.queue_declare(passive=queueConf['passive'], durable=queueConf['durable'], exclusive=queueConf['exclusive'], auto_delete=queueConf['autoDelete'])
              queue_name = result.method.queue
       
              channel.queue_bind(exchange=self.name, queue=queue_name, routing_key=bindingKey)
       
              print "%s Exchange '%s' initialized" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name)
       
              def _callback(ch, method, properties, body):
                  callback(method.routing_key, json.loads(body))
                  ch.basic_ack(delivery_tag=method.delivery_tag)
                  print "%s %s:::%s" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name, body)
       
              consumerConf = self.conf['consumer']
              channel.basic_consume(_callback, queue=queue_name, no_ack=consumerConf['noAck'], exclusive=consumerConf['exclusive'])
              channel.start_consuming()
      

      And finally we can use RPCs. Emit \

      from rabbit import builder
       
      server = {
          'host': 'localhost',
          'port': 5672,
          'user': 'guest',
          'pass': 'guest',
      }
       
      print builder.rpc('rpc.hello', server).call("Gonzalo", "Ayuso")
      

      And the server side

      from rabbit import builder
       
      server = {
          'host': 'localhost',
          'port': 5672,
          'user': 'guest',
          'pass': 'guest',
      }
       
      def onData(name, surname):
          return "Hello %s %s" % (name, surname)
       
      builder.rpc('rpc.hello', server).server(onData)
      

      And that’s the class

      import pika
      import json
      import time
      import uuid
       
      class RPC:
          def __init__(self, name, conf):
              self.name = name
              self.conf = conf
       
          def call(self, *params):
              pika.PlainCredentials(self.conf['server']['user'], self.conf['server']['pass'])
              connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf['server']['host'], port=self.conf['server']['port']))
              channel = connection.channel()
       
              queueConf = self.conf['queue']
              result = channel.queue_declare(queue='', passive=queueConf['passive'], durable=queueConf['durable'], exclusive=queueConf['exclusive'], auto_delete=queueConf['autoDelete'])
              callback_queue = result.method.queue
              consumerConf = self.conf['consumer']
              channel.basic_consume(self.on_call_response, no_ack=consumerConf['noAck'], exclusive=consumerConf['exclusive'], queue='')
       
              self.response = None
              self.corr_id = str(uuid.uuid4())
              channel.basic_publish(exchange='', routing_key=self.name, properties=pika.BasicProperties(reply_to=callback_queue, correlation_id=self.corr_id), body=json.dumps(params))
              while self.response is None:
                  connection.process_data_events()
              return self.response
       
          def on_call_response(self, ch, method, props, body):
              if self.corr_id == props.correlation_id:
                  self.response = body
       
          def server(self, callback):
              pika.PlainCredentials(self.conf['server']['user'], self.conf['server']['pass'])
              connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf['server']['host'], port=self.conf['server']['port']))
              channel = connection.channel()
       
              queueConf = self.conf['queue']
              channel.queue_declare(self.name, passive=queueConf['passive'], durable=queueConf['durable'], exclusive=queueConf['exclusive'], auto_delete=queueConf['autoDelete'])
       
              channel.basic_qos(prefetch_count=1)
              consumerConf = self.conf['consumer']
       
              def on_server_request(ch, method, props, body):
                  response = callback(*json.loads(body))
       
                  ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id=props.correlation_id), body=json.dumps(response))
                  ch.basic_ack(delivery_tag=method.delivery_tag)
                  print "%s %s::req => '%s' response => '%s'" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name, body, response)
       
              channel.basic_consume(on_server_request, queue=self.name, no_ack=consumerConf['noAck'], exclusive=consumerConf['exclusive'])
       
              print "%s RPC '%s' initialized" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name)
              channel.start_consuming()
      

      And that’s all. Full project is available within my github account

      comments powered by Disqus