Using RabbitMQ (pika) with Python

I have struggled finding what I would consider a nice way to connect to Rabbit MQ from a python program.

Some issues I have found that slack overflow answers are not great if you are reusing connections or have something that isn’t just a single sending program.

My use case, where I am programming around a narrative (in this new project I am using “The Watch” from Terry Pratchett’s Discworld.

In my last project I had what amounted to boiler plate code in each “Character” (python app). For this new project (the Quantum one), I wanted something a bit easier and cleaner.

So let me introduce

This is a “helper” module that I will import * from. This is how I wrote the messaging side. This produces less crap in the Rabbit MQ logs than calling a channel.close() also:

import pika
import json
import logging as l
exchange = ''

def sendMessage(person, patrol):                                                           
        patrol = json.dumps(patrol)                                                        
    except Exception as err:                                                               
        l.error(f'could not dump the patrol to a json')                                    
        with pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) as connection: 
            #connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))   
            channelM =                                                
            channelM.basic_publish(exchange=exchange, routing_key=person, body=patrol)     
    except Exception as err:                                                               
        l.error(f'Problem sending rabbit message: {err}')                                  

On my screen that is a bit hard to read;

What I am doing is a function that takes the name of a “character” (ie. a queue) and a dict. I then encode this as a JSON, and using the “with” block, send it to the correct queue.

By avoiding the connection.close() call I no longer get and error in rabbitMQ saying connection closed unexpectedly.

It may not be the most efficient, not reusing an existing connection. But the connection bringup is tiny, and I would rather take cleanliness over efficiency in this case.

I will be running quantum simulations so the bottleneck will not be here!


Bury the body

What should we do with the body?

Keep sending it back to Watson to check state until Watson burys it.

But… what happens if the body is floating around twice?

We have a problem here. This is option 1:

^^^ try follow that shit. Problem is you end up delaying processing an update best case scenario. Worst case is the body is lost somewhere due to an error and the whole thing Jams up.

Option 2:
Ahh, but what about the caseinfo? If I have sent on the body+case already I will loose data. Actually I am making sure I loose data… I will ALWAYS loose data doing this

I decided on option 3…



Messaging and time

Remember, there is no guaranteed delivery time of a message; The destination module may be broken, or it may be busy.

On data changes verify the change prior to making it in a way that fixes the amount of time between validation and edit.


findings = body['caseinfo']['findings']
if 'Needs to be assigned to new team' in findings:                               'Moriorty to assign {body["ticket"]} to new team')                        
    teamAssignedField = 'customfield_1XXX'                                          
    #Need to validate current state as there are no time guarantees in the message   
    issue = getJiraTicketInfo(body['issueURL'])      # <--- this is a function within the module code, no messaging needed                                
    teamCurrent = ''                                                                 
        teamCurrent = issue['fields']['customfield_10311']['name']   #who knows what crap will be returned                
        teamCurrent = ''                                                             
    if teamCurrent == 'old-team':                                                     'Validated {body["ticket"]} is assigned to old-team')                  
        putData = {}                                                                 
        putData['fields'] = {teamAssignedField : 'new-field'}          
        pushUpdate = postJiraTicketInfo(body, putData)  # <=== also a function in the module code                             

This way I don’t have to worry about material delays. Ideally the api should really accept a validation value but it doesn’t. The change will only be made if the data is in the same (relevant) state as when the original check took place .

Because of this the various modules tend to interact with one system, both reads AND writes….. Except the slack integration of course, I have Lestrad sending messages and Lestrad’s phone answering them 🙂

One issue I can see if the value in one system depends on the state of another system… but I’m not sure if that is an actual issue, as you could just send the body back through the processing chain to double check afterwards maybe… I’ll burn that bridge when I get to it.


Tracking the body

I seem to be having a bit of an issue regarding the messaging paradigm.

Essentially, I am finding that as there is no time guarentees of events getting processed, we have to think about the flow of the “body”.

I am calling my message body when it is the incident + case info

I have to be careful about “duplicating” the body, that is, sending the message away in a queue, and continuing to take action here. as they will loose sync and state.

I would rather just try to ensure the statelessness nature of the message processing code.

In other words, Watson will mark a post as a “first post”, send it to Lestrad to post as a first post.

Initially I wanted to continue processing the body in Watson to further establish state, but there is no way to get the initial time stamp of the initial slack post back into the body that stayed in watson.

Although the timestamps are in postgres and pushed into redis, there is NO guarantee about how long that will take, and no clean way to check if the action has taken place yet or not….

because… I forgot to mention that only one module is allowed talk to redis and postgres. This may well be a terrible architectural decision….

For now, the only solution is to send the body back to watson from Lestrad with the new TS, and let Watson re-establish the state.

Left is Watson, middle is Lestrad, and right is the hound


Messaging paradigm in Python

I’m not a fan of async when programming in Python. Although I have programmed in C++, I am not a fan of classes. By not a fan I mean I have no idea how to use them.

What I have is a goal and a need. The idea is using the messaging paradigm as a structure of Python coding. This and a functional aspect.


pipenv, docker-compose, rabbitMQ for messaging, postgres for some “data” data and redis for some state data (ie. persistent key values


Idea is that the “app” can be stateless. Each .py program performs a task based on message traffic received, interacts with one system or does one thing, and then sends back a message.
At them moment I am sending the “initial message” with a trigger script called by cron.