Following on from my last post about sending Rabbitmq messages with shared code in python (caveat this is not for >100 message per second requirements).
Here is how to listen to a queue (and use it), with a reduced amount of boilerplate.
I also learned you can pass a function to an imported module to get that module to “callback” to the parent… to the code!!
This is the shared code (that lives in a .py file and is imported as from blah.py import *
#TheLawsandOrdinancesoftheCitiesAnkhandMorpork.py
def getJobs(officer, callback):
l.info(f'Waiting for jobs for {officer}')
while True:
try:
with pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) as connection:
connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
channel = connection.channel()
channel.queue_declare(queue=officer)
channel.basic_consume(queue=officer, auto_ack=True, on_message_callback=callback)
l.info(f'{officer} reporting for duty sir')
channel.start_consuming()
# Don't recover if connection was closed by broker
except pika.exceptions.ConnectionClosedByBroker:
l.error('Rabbit error connection closed by broker')
break
# Don't recover on channel errors
except pika.exceptions.AMQPChannelError:
l.error('Rabbit error channel error')
break
# Recover on all other connection errors
except pika.exceptions.AMQPConnectionError:
l.error('Retrying rabbitMQ listener')
continue
This ^^^ stuff was boilerplate in my last app, now I can reuse the code with the following… in the actual running app:
#Detritus.py
from TheLawsandOrdinancesoftheCitiesAnkhandMorpork import *
def callback(ch, method, properties, body):
patrol = body.decode()
print(f'I have received {patrol}')
#Then from main() just call the shared code
main():
officer = 'Detritus'
getJobs(officer, callback)
#This will then live in a while loop and "callback" to the supplied callback function. All you need then is a bit of error handling
# Standard boilerplate to call the main() function to begin
# the program.
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Keyboard inturrupt')
l.warning("Keyboard inturrupt")
try:
connection.close()
sys.exit(0)
except SystemExit:
os._exit(0)
Again I hope the above formatting comes out well. I’m not sure if this is ok for high volume stuff, but for what I am doing (cheap parallelisation and using a messaging programming paradigm) it works good for me!