Note: I'm migrating from gonzalo123.com to here. When I finish, I'll switch the DNS to point here. The "official" blog will always be gonzalo123.com.

      Playing with RabbitMQ, Python and Docker. From Exchanges to Queues

      Lately, message queues show up in all my projects in one way or another. Normally I work with AWS and use SQS and SNS, but I also use RabbitMQ and MQTT quite often. In AWS there’s a pattern that I use a lot to isolate services: the process that emits messages publishes them to SNS, and I bind SNS to SQS. With this technique I can attach n SQS queues to the same SNS topic. I’ve used it here. In AWS it’s pretty straightforward to do that. Today we’re going to do the same with RabbitMQ. In fact, it’s very easy to do in RabbitMQ: we only need to follow the tutorial in the official RabbitMQ documentation.

      The script listens to an exchange and resends each message to a named queue (the "topic"). It’s basically the same code that we can see in the RabbitMQ documentation.

      import settings
      from lib.logger import logger
      from lib.rabbit import get_channel
       
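      channel = get_channel()
      # Fanout exchange: every bound queue receives a copy of each message
      channel.exchange_declare(
          exchange=settings.EXCHANGE,
          exchange_type='fanout')
       
      # Temporary queue: an empty name lets the broker generate one, and
      # exclusive=True removes it when this connection closes
      result = channel.queue_declare(
          queue='',
          exclusive=True)
       
      queue_name = result.method.queue
       
      channel.queue_bind(
          exchange=settings.EXCHANGE,
          queue=queue_name)
       
      logger.info(f' [*] Waiting for {settings.EXCHANGE}. To exit press CTRL+C')
       
      # Destination queue that receives the forwarded messages
      channel.queue_declare(
          durable=settings.QUEUE_DURABLE,
          auto_delete=settings.QUEUE_AUTO_DELETE,
          queue=settings.QUEUE_TOPIC
      )
       
       
      def callback(ch, method, properties, body):
          # Resend through the default exchange (''), which routes by queue name
          channel.basic_publish(
              exchange='',
              routing_key=settings.QUEUE_TOPIC,
              body=body)
          logger.info(f'Message sent to topic {settings.QUEUE_TOPIC}. Message: {body}')
       
       
      # auto_ack=True: messages are acknowledged as soon as they are delivered
      channel.basic_consume(
          queue=queue_name,
          on_message_callback=callback,
          auto_ack=True)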
       
      channel.start_consuming()
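
To make the flow easier to picture, here is a broker-free sketch of what happens when two copies of the script listen to the same fanout exchange. It is only a toy model: `FanoutExchange` and `forward` are names made up for this illustration, and plain `deque` objects stand in for RabbitMQ queues.

```python
from collections import deque


class FanoutExchange:
    """Toy stand-in for a fanout exchange: every bound queue gets a copy."""

    def __init__(self):
        self.bound_queues = []

    def bind(self, queue):
        self.bound_queues.append(queue)

    def publish(self, body):
        # Fanout ignores routing keys and copies the message to all bindings.
        for queue in self.bound_queues:
            queue.append(body)


def forward(temp_queue, target_queue):
    """Mimic the callback: move every message from the temporary queue
    to the named destination queue and return how many were moved."""
    forwarded = 0
    while temp_queue:
        target_queue.append(temp_queue.popleft())
        forwarded += 1
    return forwarded


exchange = FanoutExchange()

# Each listener owns one temporary queue bound to the exchange...
temp_a, temp_b = deque(), deque()
exchange.bind(temp_a)
exchange.bind(temp_b)

# ...and one named destination queue.
queue_a, queue_b = deque(), deque()

exchange.publish(b"hello, world")
forward(temp_a, queue_a)
forward(temp_b, queue_b)

print(list(queue_a), list(queue_b))
```

The emitter only knows the exchange. Each listener decides on its own which queue ends up with a copy, which is the same isolation I get with SNS and SQS.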
      

      We send one message to the exchange using rabbitmqadmin and see the message in the queue:

      rabbitmqadmin -u username -p password publish exchange=exchange routing_key= payload="hello, world"
      

      My idea with this project is to deploy it to a Docker Swarm cluster and add or remove listeners just by adding or removing services in the stack:

      ...
        exchange2topic1:
          image: ${ECR}/exchange2queue:${VERSION}
          build:
            context: .
            dockerfile: Dockerfile
          deploy:
            restart_policy:
              condition: on-failure
          depends_on:
            - rabbit
          command: /bin/sh wait-for-it.sh rabbit:5672 -- python exchange2queue.py
          environment:
            RABBIT_HOST: rabbit
            RABBIT_USER: ${RABBITMQ_USER}
            RABBIT_PASS: ${RABBITMQ_PASS}
            EXCHANGE: exchange
            QUEUE_TOPIC: topic1
            QUEUE_DURABLE: 1
            QUEUE_AUTO_DELETE: 0
        ...
      

      With this approach it’s very simple for me to add and remove listeners without touching the emitter.
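
The `settings` module isn't shown in this post, so the following is only a hypothetical sketch of how those environment variables could be turned into Python values. The helper names (`env_flag`, `load_settings`) and the default values are assumptions for the example, not the real module.

```python
import os


def env_flag(name, default=False, environ=os.environ):
    """Read a boolean flag: "1", "true", "yes" or "on" (any case) mean True."""
    raw = environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


def load_settings(environ=os.environ):
    """Collect the listener configuration from a mapping of env variables."""
    return {
        # Defaults are assumptions, used only when the variable is missing.
        "EXCHANGE": environ.get("EXCHANGE", "exchange"),
        "QUEUE_TOPIC": environ.get("QUEUE_TOPIC", "topic1"),
        "QUEUE_DURABLE": env_flag("QUEUE_DURABLE", environ=environ),
        "QUEUE_AUTO_DELETE": env_flag("QUEUE_AUTO_DELETE", environ=environ),
    }


example = load_settings({
    "EXCHANGE": "exchange",
    "QUEUE_TOPIC": "topic2",
    "QUEUE_DURABLE": "1",
    "QUEUE_AUTO_DELETE": "0",
})
print(example)
```

The explicit parsing matters because environment variables are always strings, and in Python the string "0" is truthy. Passing it straight to `auto_delete` would not mean "off".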

      Also, as a plus, I like to add a simple HTTP API that lets me send messages to the exchange with a POST request instead of using a RabbitMQ client. That’s because sometimes I work with legacy systems where using an AMQP client isn’t simple. It’s a simple Flask API:

      from flask import Flask, request
      from flask import jsonify
       
      import settings
      from lib.auth import authorize_bearer
      from lib.logger import logger
      from lib.rabbit import get_channel
      import json
       
      app = Flask(__name__)
       
       
      @app.route('/health')
      def health():
          return jsonify({"status": "ok"})
       
       
      @app.route('/publish/<path:exchange>', methods=['POST'])
      @authorize_bearer(bearer=settings.API_TOKEN)
      def publish(exchange):
          channel = get_channel()
          try:
              message = request.get_json()
              channel.basic_publish(
                  exchange=exchange,
                  routing_key='',
                  body=json.dumps(message)
              )
              logger.info(f"message sent to exchange {exchange}")
              return jsonify({"status": "OK", "exchange": exchange})
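          except Exception:
              # Log the failure and answer with a 500 instead of a misleading 200
              logger.exception(f"error sending message to exchange {exchange}")
              return jsonify({"status": "NOK", "exchange": exchange}), 500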
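
The `authorize_bearer` decorator comes from `lib.auth`, which isn't listed in this post. As a rough idea of the check such a decorator could perform, here is a framework-free sketch. The function name `bearer_matches` is hypothetical, and the real implementation may work differently.

```python
import hmac


def bearer_matches(authorization_header, expected_token):
    """Return True only for a header of the form "Bearer <expected_token>"."""
    if not authorization_header:
        return False
    scheme, _, token = authorization_header.partition(" ")
    if scheme.lower() != "bearer":
        return False
    # compare_digest avoids leaking the token through timing differences.
    return hmac.compare_digest(
        token.strip().encode("utf-8"),
        expected_token.encode("utf-8"),
    )


print(bearer_matches("Bearer super_secret_key", "super_secret_key"))
print(bearer_matches("Bearer wrong", "super_secret_key"))
```

A Flask decorator would typically read `request.headers.get("Authorization")`, pass it to a check like this one, and answer with a 401 when it fails.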
      

      Now we can emit messages to the exchange using curl, Postman or any other HTTP client.

      POST http://localhost:5000/publish/exchange
      Content-Type: application/json
      Authorization: Bearer super_secret_key
       
      {
        "hello": "Gonzalo"
      }
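
For a legacy system that can only speak HTTP, the same request can be built with the Python standard library. This is a minimal sketch: `build_publish_request` is a helper name invented for the example, and the URL and token are the sample values from the request above.

```python
import json
import urllib.request


def build_publish_request(base_url, exchange, token, payload):
    """Build (but do not send) the POST request for /publish/<exchange>."""
    url = f"{base_url.rstrip('/')}/publish/{exchange}"
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
        },
        method="POST",
    )


request = build_publish_request(
    "http://localhost:5000",
    "exchange",
    "super_secret_key",
    {"hello": "Gonzalo"},
)
print(request.get_method(), request.full_url)

# To actually send it, the API must be running:
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```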
      

      Also, in the Docker stack I want to use a reverse proxy to serve my Flask application (served with gunicorn) and the RabbitMQ management console. I’m using nginx to do that:

      upstream api {
          server api:5000;
      }
       
      server {
          listen 8000 default_server;
          listen [::]:8000;
       
          client_max_body_size 20M;
       
          location / {
              try_files $uri @proxy_to_api;
          }
       
          location ~* /rabbitmq/api/(.*?)/(.*) {
              proxy_pass http://rabbit:15672/api/$1/%2F/$2?$query_string;
              proxy_buffering                    off;
              proxy_set_header Host              $http_host;
              proxy_set_header X-Real-IP         $remote_addr;
              proxy_set_header X-Forwarded-For   $proxy_add_x_forwarded_for;
              proxy_set_header X-Forwarded-Proto $scheme;
          }
       
          location ~* /rabbitmq/(.*) {
              rewrite ^/rabbitmq/(.*)$ /$1 break;
              proxy_pass http://rabbit:15672;
              proxy_buffering                    off;
              proxy_set_header Host              $http_host;
              proxy_set_header X-Real-IP         $remote_addr;
              proxy_set_header X-Forwarded-For   $proxy_add_x_forwarded_for;
              proxy_set_header X-Forwarded-Proto $scheme;
          }
       
          location @proxy_to_api {
              proxy_set_header X-Forwarded-Proto https;
              proxy_set_header X-Url-Scheme $scheme;
              proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
              proxy_set_header Host $http_host;
              proxy_redirect off;
       
              proxy_pass http://api;
          }
      }
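
The two `/rabbitmq/` locations are the least obvious part of this config, so here is a small Python sketch of the path mapping they perform. It is a simplified model under a few assumptions: it ignores the query string and URI normalization, and `upstream_path` is a name made up for the illustration. The `%2F` segment is the URL-encoded default vhost `/`.

```python
import re

# Same patterns as the nginx regex locations; "~*" means case-insensitive.
API_LOCATION = re.compile(r"/rabbitmq/api/(.*?)/(.*)", re.IGNORECASE)
UI_LOCATION = re.compile(r"/rabbitmq/(.*)", re.IGNORECASE)


def upstream_path(path):
    """Return the path sent to rabbit:15672, or None when the request
    falls through to "location /" and ends up in the Flask API."""
    # Regex locations are tried in order of appearance; the first match wins.
    match = API_LOCATION.search(path)
    if match:
        # Insert the encoded default vhost between the two captured parts.
        return f"/api/{match.group(1)}/%2F/{match.group(2)}"
    if UI_LOCATION.search(path):
        # Equivalent of: rewrite ^/rabbitmq/(.*)$ /$1 break;
        return re.sub(r"^/rabbitmq/(.*)$", r"/\1", path)
    return None


for sample in ("/rabbitmq/api/queues/topic1", "/rabbitmq/api/overview", "/publish/exchange"):
    print(sample, "->", upstream_path(sample))
```

API calls with at least two path segments get the vhost inserted, everything else under `/rabbitmq/` just loses the prefix, and any other path goes to the Flask application.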
      

      And that’s all. Here’s the docker-compose.yml:

      version: '3.6'
       
      x-base: &base
        image: ${ECR}/exchange2queue:${VERSION}
        build:
          context: .
          dockerfile: Dockerfile
        deploy:
          restart_policy:
            condition: on-failure
        depends_on:
          - rabbit
       
      services:
        rabbit:
          image: rabbitmq:3-management
          deploy:
            restart_policy:
              condition: on-failure
          ports:
            - 5672:5672
          environment:
            RABBITMQ_ERLANG_COOKIE:
            RABBITMQ_DEFAULT_VHOST: /
            RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER}
            RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASS}
       
        nginx:
          image: ${ECR}/exchange2queue_nginx:${VERSION}
          deploy:
            restart_policy:
              condition: on-failure
          build:
            context: .docker/nginx
            dockerfile: Dockerfile
          ports:
            - 8080:8000
          depends_on:
            - rabbit
            - api
       
        api:
          <<: *base
          container_name: front
          command: /bin/sh wait-for-it.sh rabbit:5672 -- gunicorn -w 4 api:app -b 0.0.0.0:5000
          deploy:
            restart_policy:
              condition: on-failure
          environment:
            RABBIT_HOST: rabbit
            RABBIT_USER: ${RABBITMQ_USER}
            RABBIT_PASS: ${RABBITMQ_PASS}
            API_TOKEN: ${API_TOKEN}
       
        exchange2topic1:
          <<: *base
          command: /bin/sh wait-for-it.sh rabbit:5672 -- python exchange2queue.py
          environment:
            RABBIT_HOST: rabbit
            RABBIT_USER: ${RABBITMQ_USER}
            RABBIT_PASS: ${RABBITMQ_PASS}
            EXCHANGE: exchange
            QUEUE_TOPIC: topic1
            QUEUE_DURABLE: 1
            QUEUE_AUTO_DELETE: 0
       
        exchange2topic2:
          <<: *base
          command: /bin/sh wait-for-it.sh rabbit:5672 -- python exchange2queue.py
          environment:
            RABBIT_HOST: rabbit
            RABBIT_USER: ${RABBITMQ_USER}
            RABBIT_PASS: ${RABBITMQ_PASS}
            EXCHANGE: exchange
            QUEUE_TOPIC: topic2
            QUEUE_DURABLE: 1
            QUEUE_AUTO_DELETE: 0
      

      The full code is in my GitHub.
