Note: I'm migrating from gonzalo123.com to here. When I finish I'll point the DNS here. The "official" blog will always be gonzalo123.com.

      Playing with Python threads. Part 2

      Last year I wrote a post about Python and threads. You can read it here. Today I want to keep playing with them. The idea is to create a simple script that prints one asterisk to the console every second. Simple, isn’t it?

      from time import sleep
      while True:
          print("*")
          sleep(1)
      

      I want to keep this script running, but I also want to send it a message from outside, for example through RabbitMQ, and have the running script react. In this demo the reaction is simply to stop the script.

      In JavaScript we could do this in a single-threaded process with the setInterval function but, since consuming from RabbitMQ with pika's BlockingConnection blocks the calling thread, we need threads in Python (please tell me if I’m wrong). The idea is to add a circuit-breaker condition to the main loop that checks whether the main thread should stop.
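
The circuit-breaker idea can be tried without RabbitMQ at all. The sketch below is only an illustration and not the code from this post: it swaps in threading.Event as the stop flag, and a threading.Timer plays the role of the external message. The name run_until_stopped and the 3.5 second delay are made up for the example.

```python
import threading


def run_until_stopped(stop_event, interval=1.0):
    """Print an asterisk every `interval` seconds until stop_event is set."""
    ticks = 0
    while True:
        print("*")
        ticks += 1
        # wait() blocks for up to `interval` seconds and returns True as soon
        # as the event is set, so it doubles as the sleep call
        if stop_event.wait(timeout=interval):
            return ticks


if __name__ == "__main__":
    stop_event = threading.Event()
    # Hypothetical stand-in for the external message: set the flag after 3.5 s
    threading.Timer(3.5, stop_event.set).start()
    print("stopped after", run_until_stopped(stop_event), "asterisks")
```

An Event only carries a yes/no flag. A queue, as used in the rest of the post, can also carry the content of the message.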

      First, I created my RabbitMQ listener as a thread:

      from queue import Queue, Empty
      import threading
      import pika
      import os
       
       
      class Listener(threading.Thread):
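          def __init__(self, queue=None):
              super().__init__()
              # Build the queue per instance: a Queue() default argument is
              # created once and would be shared by every Listener
              self.queue = queue if queue is not None else Queue()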
              self.daemon = True
       
          def run(self):
              channel = self._get_channel()
              channel.queue_declare(queue='stop')
       
              channel.basic_consume(
                  queue='stop',
                  on_message_callback=lambda ch, method, properties, body: self.queue.put(item=True),
                  auto_ack=True)
       
              channel.start_consuming()
       
          def stop(self):
              try:
                  return self.queue.get(timeout=0.05) is True
              except Empty:
                  return False
       
          def _get_channel(self):
              credentials = pika.PlainCredentials(
                  username=os.getenv('RABBITMQ_USER'),
                  password=os.getenv('RABBITMQ_PASS'))
       
              parameters = pika.ConnectionParameters(
                  host=os.getenv('RABBITMQ_HOST'),
                  credentials=credentials)
       
              connection = pika.BlockingConnection(parameters=parameters)
       
              return connection.channel()
      

      Now, in the main process, I start the Listener and enter an endless loop that prints my asterisk every second; at the end of each iteration I check whether I need to stop the process.

      from Listener import Listener
      from dotenv import load_dotenv
      import logging
      from time import sleep
      import os
       
      logging.basicConfig(level=logging.INFO)
       
      current_dir = os.path.dirname(os.path.abspath(__file__))
      load_dotenv(dotenv_path="{}/.env".format(current_dir))
       
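       
      def main():
          # Start the listener here rather than at import time
          listener = Listener()
          listener.start()
       
          while True:
              logging.info("*")
              sleep(1)
              # Circuit breaker: leave the loop once a stop message has arrived
              if listener.stop():
                  break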
       
       
      if __name__ == '__main__':
          main()
      

      As the stop method shows, we use the queue.Queue class (from the standard queue module) to pass the message from the listener thread to the main loop.
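
To see that hand-off in isolation, here is a minimal sketch that runs without a broker. FakeListener is a hypothetical stand-in for the Listener class above: instead of consuming from RabbitMQ it just sleeps for `delay` seconds and then puts True on the queue. Its stop method has the same shape as the real one.

```python
import threading
import time
from queue import Queue, Empty


class FakeListener(threading.Thread):
    """Hypothetical stand-in for Listener: queues True after `delay` seconds."""

    def __init__(self, delay, queue=None):
        super().__init__()
        self.daemon = True
        self.delay = delay
        self.queue = queue if queue is not None else Queue()

    def run(self):
        time.sleep(self.delay)  # pretend we are blocked waiting for a message
        self.queue.put(True)

    def stop(self):
        # Wait briefly for an item; an empty queue means "keep going"
        try:
            return self.queue.get(timeout=0.05) is True
        except Empty:
            return False


def run_until_stopped(listener, interval=1.0):
    """Print asterisks until the listener reports a stop message."""
    ticks = 0
    while True:
        print("*")
        ticks += 1
        time.sleep(interval)
        if listener.stop():
            return ticks
```

Note that get() removes the item from the queue, so stop() returns True only once per message. That is enough here because the loop exits on the first True.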

      In the example I also provide a minimal RabbitMQ server in a Docker container.

      version: '3.4'
       
      services:
        rabbit:
          image: rabbitmq:3-management
          restart: always
          environment:
            RABBITMQ_ERLANG_COOKIE:
            RABBITMQ_DEFAULT_VHOST: /
            RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER}
            RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASS}
          ports:
            - "15672:15672"
            - "5672:5672"
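
With the broker running, something still has to send the message. The post does not show a sender, so the following is only a sketch under assumptions: send_stop and main are names invented for the example, the body b'stop' is arbitrary (the listener ignores it), and the same RABBITMQ_HOST, RABBITMQ_USER and RABBITMQ_PASS environment variables are assumed to be set.

```python
import os


def send_stop(channel, queue_name='stop'):
    """Publish one message to the queue the Listener consumes from."""
    # Declaring a queue that already exists with the same settings is harmless
    channel.queue_declare(queue=queue_name)
    # The default exchange ('') routes on the queue name
    channel.basic_publish(exchange='', routing_key=queue_name, body=b'stop')


def main():
    import pika  # imported here so send_stop() can be tried with any channel-like object

    credentials = pika.PlainCredentials(
        username=os.getenv('RABBITMQ_USER'),
        password=os.getenv('RABBITMQ_PASS'))
    parameters = pika.ConnectionParameters(
        host=os.getenv('RABBITMQ_HOST'),
        credentials=credentials)

    connection = pika.BlockingConnection(parameters=parameters)
    try:
        send_stop(connection.channel())
    finally:
        connection.close()
```

Calling main() while the asterisk script is running should make its loop exit at the next check. The message can also be published by hand from the management UI that the rabbitmq:3-management image exposes on port 15672.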
      

      And that’s all. The full source code is available here.
