Categories
Python

Reducing boilerplate with RabbitMQ and Python

Following on from my last post about sending Rabbitmq messages with shared code in python (caveat this is not for >100 message per second requirements).

Here is how to listen to a queue (and use it), with a reduced amount of boilerplate.

I also learned you can pass a function to an imported module to get that module to “callback” to the parent… to the code!!

This is the shared code (that lives in a .py file and is imported as from blah.py import *

#TheLawsandOrdinancesoftheCitiesAnkhandMorpork.py

def getJobs(officer, callback):
    l.info(f'Waiting for jobs for {officer}')
    while True:
        try:
            with pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) as connection:
                connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
                channel = connection.channel()
                channel.queue_declare(queue=officer)
                channel.basic_consume(queue=officer, auto_ack=True, on_message_callback=callback)
                l.info(f'{officer} reporting for duty sir')
                channel.start_consuming()
        # Don't recover if connection was closed by broker
        except pika.exceptions.ConnectionClosedByBroker:
            l.error('Rabbit error connection closed by broker')
            break
        # Don't recover on channel errors
        except pika.exceptions.AMQPChannelError:
            l.error('Rabbit error channel error')
            break
        # Recover on all other connection errors
        except pika.exceptions.AMQPConnectionError:
            l.error('Retrying rabbitMQ listener')
            continue

This ^^^ stuff was boilerplate in my last app, now I can reuse the code with the following… in the actual running app:

#Detritus.py
from TheLawsandOrdinancesoftheCitiesAnkhandMorpork import *

def callback(ch, method, properties, body):
    patrol = body.decode()                 
    print(f'I have received {patrol}')     

#Then from main() just call the shared code
main():
    officer = 'Detritus'
    getJobs(officer, callback)
    #This will then live in a while loop and "callback" to the supplied callback function. All you need then is a bit of error handling

# Standard boilerplate to call the main() function to begin
# the program.
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Keyboard inturrupt')
        l.warning("Keyboard inturrupt")
        try:
            connection.close()
            sys.exit(0)
        except SystemExit:
            os._exit(0)

Again I hope the above formatting comes out well. I’m not sure if this is ok for high volume stuff, but for what I am doing (cheap parallelisation and using a messaging programming paradigm) it works good for me!