Categories
Python

Kubernetes, moving from an entrypoint.sh to supervisord

For reasons (specifically that my Kubernetes hosts have been up and down lately) I have had to harden my deploy.

The first and most important thing was to stop treating a graceful shutdown of my rabbitMQ connections as fatal. I realised it would be way better to just sleep and retry on _any_ rabbitMQ connection error, particularly if rabbitMQ shut down gracefully.

Previously I was assuming that if rabbitMQ was shutting down gracefully then the whole app was.

How wrong I was. I was informed in no uncertain terms that I should be prepared for individual containers to be shut down and restarted without the whole pod getting a restart.

To that end the following changes were needed:


def getJobs():
    l.info("Running get Jobs")
    while True:
        try:
            global connection
            global channelM
            connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
            channel = connection.channel()
            channelM = connection.channel()
            channel.queue_declare(queue=myRabbitQueue)
            channel.basic_consume(queue=myRabbitQueue, auto_ack=True, on_message_callback=callback)
            l.info("Will now continue to run")
            channel.start_consuming()
        # Don't recover if connection was closed by broker
        except pika.exceptions.ConnectionClosedByBroker:
            l.error('Rabbit error connection closed by broker')
            break
        # Don't recover on channel errors
        except pika.exceptions.AMQPChannelError:
            l.error('Rabbit error channel error')
            break
        # Recover on all other connection errors
        except pika.exceptions.AMQPConnectionError:
            l.error('Retrying rabbitMQ listener')
            continue

to

def getJobs():
    l.info("Running get Jobs")
    while True:
        try:
            global connection
            global channelM
            connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
            channel = connection.channel()
            channelM = connection.channel()
            channel.queue_declare(queue=myRabbitQueue)
            channel.basic_consume(queue=myRabbitQueue, auto_ack=True, on_message_callback=callback)
            l.info("Will now continue to run")
            channel.start_consuming()
        # Recover on connection errors (this now includes a graceful broker shutdown)
        except pika.exceptions.AMQPConnectionError:
            l.error('Retrying rabbitMQ listener')
            sleep(5)
            continue
        # Recover on everything else too (channel errors land here)
        except Exception as err:
            l.error(f'Error in connecting to rabbitmq, going to retry: {err}')
            sleep(5)

Note the removal of the “# Don't recover on channel errors” branch (and the “closed by broker” one) where it breaks. The break broke the whole damn run loop!

And this brought me to my next problem. My app just wasn’t shutting down cleanly. After much messing I realised that the entrypoint.sh script (it had a few “wait-for-its” then started and backgrounded my various python modules) was not passing SIGTERM, so my app wasn’t shutting down properly at all.

Because of how it was built, this never mattered, but it always took the 60 seconds to destroy when I was pushing a new version… and it was a bit annoying to have it “not be workin proper” as they say.
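For what it's worth, here is a minimal sketch of what the Python side of a clean shutdown could look like, assuming SIGTERM actually reaches the Python process (which is the thing my entrypoint.sh was not doing). The names shutdown_requested, handle_sigterm, install_sigterm_handler and run_worker are made up for illustration, and this does not cover interrupting a blocking pika consumer.

```python
import signal
import threading

# Set once SIGTERM arrives; worker loops check it between units of work.
shutdown_requested = threading.Event()


def handle_sigterm(signum, frame):
    """Signal handler: only record that a shutdown was requested."""
    shutdown_requested.set()


def install_sigterm_handler():
    """Register the handler. Must be called from the main thread."""
    signal.signal(signal.SIGTERM, handle_sigterm)


def run_worker(do_one_unit_of_work, poll_seconds=1.0):
    """Run work repeatedly until a shutdown has been requested."""
    while not shutdown_requested.is_set():
        do_one_unit_of_work()
        # wait() returns early if the event is set during the pause
        shutdown_requested.wait(poll_seconds)
```

The idea is to call install_sigterm_handler() once at startup and then loop in run_worker(), so the process exits on its own well before the 60 second kill.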

So I am moving to supervisord. But if there’s one thing I dislike it’s config files. So let me present a “translator” from entrypoint.sh to a supervisord conf file. It’s not perfect, but it beats writing it by hand. I hope someone else finds it useful.

What is confusing to me, however, is that it always seems so hard to just find simple best practices for deploying a simple-ish Python app to Kubernetes. I thought the whole point was to make it easier for developers. If I was running my app in a VM I wouldn’t have to care about any of this stuff!

I think the reason for this is I wrote the python app from scratch, and didn’t use a framework that had all the default configs ready to go!

Here’s the helper, I hope it helps:

def generate_supervisord_conf(entrypoint_file):
    supervisord_conf = "[supervisord]\n"
    supervisord_conf += "nodaemon=true\n"
    supervisord_conf += "logfile=/dev/null\n"
    supervisord_conf += "logfile_maxbytes=0\n"
    supervisord_conf += "user=root\n\n"

    programs = []
    with open(entrypoint_file, "r") as f:
        lines = f.readlines()
        for line in lines:
            if line.startswith("pipenv run"):
                program_name = line.split(" ")[2].rstrip("\n").split("/")[-1].split(".")[0]
                programs.append(program_name)

    for program in programs:
        program_conf = f"[program:{program}]\n"
        program_conf += f"command=/usr/local/bin/pipenv run /app/{program}.py\n"
        program_conf += "autostart=true\n"
        program_conf += "redirect_stderr=true\n"
        program_conf += "stdout_logfile=/dev/fd/1\n"
        program_conf += "stdout_logfile_maxbytes=0\n"
        #program_conf += "depends_on=is_it_up\n" # This is the wait-for-it script
        program_conf += "autorestart=true\n\n"
        supervisord_conf += program_conf

    return supervisord_conf


entrypoint_file = "./entrypoint.sh"
supervisord_conf = generate_supervisord_conf(entrypoint_file)

with open("supervisord.conf", "w") as f:
    f.write(supervisord_conf)

I find myself liking programmatic generation more and more!

UPDATE! I forgot about the annoying logging problems when using supervisord… please see https://docs.docker.com/config/containers/multi-service_container/ I have updated the code above

Categories
Python

Reducing boilerplate with RabbitMQ and Python

This follows on from my last post about sending RabbitMQ messages with shared code in Python (caveat: this is not for requirements above 100 messages per second).

Here is how to listen to a queue (and use it), with a reduced amount of boilerplate.

I also learned you can pass a function to an imported module to get that module to “callback” to the parent… to the code!!

This is the shared code (it lives in a .py file and is imported with from blah import *):

#TheLawsandOrdinancesoftheCitiesAnkhandMorpork.py

def getJobs(officer, callback):
    l.info(f'Waiting for jobs for {officer}')
    while True:
        try:
            with pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) as connection:
                channel = connection.channel()
                channel.queue_declare(queue=officer)
                channel.basic_consume(queue=officer, auto_ack=True, on_message_callback=callback)
                l.info(f'{officer} reporting for duty sir')
                channel.start_consuming()
        # Don't recover if connection was closed by broker
        except pika.exceptions.ConnectionClosedByBroker:
            l.error('Rabbit error connection closed by broker')
            break
        # Don't recover on channel errors
        except pika.exceptions.AMQPChannelError:
            l.error('Rabbit error channel error')
            break
        # Recover on all other connection errors
        except pika.exceptions.AMQPConnectionError:
            l.error('Retrying rabbitMQ listener')
            continue

This ^^^ stuff was boilerplate in my last app, now I can reuse the code with the following… in the actual running app:

#Detritus.py
import os
import sys
from TheLawsandOrdinancesoftheCitiesAnkhandMorpork import *

def callback(ch, method, properties, body):
    patrol = body.decode()
    print(f'I have received {patrol}')

#Then from main() just call the shared code
def main():
    officer = 'Detritus'
    getJobs(officer, callback)
    #This will then live in a while loop and "callback" to the supplied callback function. All you need then is a bit of error handling

# Standard boilerplate to call the main() function to begin
# the program.
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Keyboard interrupt')
        l.warning("Keyboard interrupt")
        # The connection lives inside getJobs() and its "with" block closes it
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

Again I hope the above formatting comes out well. I’m not sure if this is ok for high volume stuff, but for what I am doing (cheap parallelisation and using a messaging programming paradigm) it works good for me!

Categories
Python

Using RabbitMQ (pika) with Python

I have struggled finding what I would consider a nice way to connect to Rabbit MQ from a python program.

One issue I have found is that Stack Overflow answers are not great if you are reusing connections or have something that isn’t just a single sending program.

My use case is programming around a narrative (in this new project I am using “The Watch” from Terry Pratchett’s Discworld).

In my last project I had what amounted to boiler plate code in each “Character” (python app). For this new project (the Quantum one), I wanted something a bit easier and cleaner.

So let me introduce TheLawsandOrdinancesoftheCitiesAnkhandMorpork.py

This is a “helper” module that I will import * from. This is how I wrote the messaging side. This produces less crap in the Rabbit MQ logs than calling a channel.close() also:

import pika
import json
import logging as l
exchange = ''

def sendMessage(person, patrol):
    try:
        patrol = json.dumps(patrol)
    except Exception as err:
        l.error(f'could not dump the patrol to a json: {err}')
        return  # nothing sensible to send
    try:
        # The "with" block closes the connection cleanly on exit
        with pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) as connection:
            channelM = connection.channel()
            channelM.queue_declare(queue=person)
            channelM.basic_publish(exchange=exchange, routing_key=person, body=patrol)
    except Exception as err:
        l.error(f'Problem sending rabbit message: {err}')

On my screen that is a bit hard to read;

What I am doing is a function that takes the name of a “character” (ie. a queue) and a dict. I then encode this as a JSON, and using the “with” block, send it to the correct queue.

By avoiding the connection.close() call I no longer get an error in rabbitMQ saying connection closed unexpectedly.

It may not be the most efficient, not reusing an existing connection. But the connection bringup is tiny, and I would rather take cleanliness over efficiency in this case.

I will be running quantum simulations so the bottleneck will not be here!

Categories
Python

Importance of being defensive when contacting external services

I just got caught by a “reliable” internal service which started to give timeouts.

I never configured a timeout on the connection (default was many minutes) which jammed the whole program.
It’s important to set aggressive timeouts in prod, better to error and figure out a way to accommodate the error than just wait.
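Here is a minimal sketch of what I mean, using only the standard library. The function name fetch_with_timeout and the 3 second default are my own assumptions for illustration; the right timeout depends on the service.

```python
import urllib.request


def fetch_with_timeout(url, timeout_seconds=3.0):
    """Return the response body as bytes, or None if the call fails or times out."""
    try:
        with urllib.request.urlopen(url, timeout=timeout_seconds) as response:
            return response.read()
    except OSError as err:
        # URLError, HTTPError and socket timeouts are all OSError subclasses
        print(f"Request to {url} failed: {err}")
        return None
```

The caller then has to decide what to do with None (retry later, skip, alert), which is exactly the "figure out a way to accommodate the error" part.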

Perhaps the next step is to make my program internally defensive in order to combat my poor coding skills.

Categories
Python quick

Working in prod

Look, I’m not a professional, let us say. Sometimes I work in a prod shell.

To keep yourself aware of that, make this little change to the .tmux file so you always have a visual cue that you are in prod:

# Status Bar
set -g status-bg red
(as opposed to black)

Categories
Python

Nested Dict Gets in python

Update: the below doesn’t actually work at all. You do need a try-except block, because if any of the values is, for example, None, the whole thing craps out with an exception.

I’ll leave the below to…. Show my workings!

Trying to figure out the best way to parse out complex JSON.

Saying

Level = issue.get('fields').get('customfield_1', 'NA')

Doesn’t actually work if ‘fields’ is missing. Error is:

AttributeError: 'NoneType' object has no attribute 'get'

you actually need to say:

In [1]: d = {'parent':{'test':True}}

In [2]: d
Out[2]: {'parent': {'test': True}}

In [3]: d.get('parent')
Out[3]: {'test': True}

In [4]: d.get('parentt')

In [5]: d.get('parent').get('test')
Out[5]: True

In [6]: d.get('parenT').get('test')
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-6-142a5af0ba34> in <module>
----> 1 d.get('parenT').get('test')

AttributeError: 'NoneType' object has no attribute 'get'

In [7]: d.get('parenT', {}).get('test')

In other words, in the case of nested gets, make sure the second argument to .get() (the default that is returned if your key is missing) is an empty dict… ie {}
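Given the update at the top (a key that is present but set to None still blows up), here is a small sketch of a helper that handles both the missing key and the None value. The name nested_get is my own, not something from a library.

```python
def nested_get(data, *keys, default=None):
    """Walk nested dicts key by key; return default if any step is missing or None."""
    current = data
    for key in keys:
        if not isinstance(current, dict):
            return default
        current = current.get(key)
    return default if current is None else current


issue = {'fields': None}
level = nested_get(issue, 'fields', 'customfield_1', default='NA')
```

Here level ends up as 'NA' instead of raising AttributeError.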

Categories
Python

Pop nested dict in python

I struggled to find an example of the following:

hello = {'a':{'b':2, 'c':3}}
#I want to pop c (which is nested in a)
hello.get('a').pop('c', None)

This took me too long to figure out
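One caveat: the line above raises AttributeError if 'a' is missing, because .get('a') returns None. A slightly more defensive sketch (pop_nested is a made-up helper name):

```python
def pop_nested(data, parent_key, child_key, default=None):
    """Pop child_key from the dict stored under parent_key, if there is one."""
    parent = data.get(parent_key)
    if not isinstance(parent, dict):
        return default
    return parent.pop(child_key, default)


hello = {'a': {'b': 2, 'c': 3}}
removed = pop_nested(hello, 'a', 'c')
```

After this, removed holds the popped value and hello only contains 'b' under 'a'.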

Categories
Python

Bury the body

What should we do with the body?

Keep sending it back to Watson to check state until Watson buries it.

But… what happens if the body is floating around twice?

We have a problem here. This is option 1:

^^^ try to follow that shit. The problem is that, best case, you end up delaying processing an update. Worst case, the body is lost somewhere due to an error and the whole thing jams up.

Option 2:
Ahh, but what about the caseinfo? If I have sent on the body+case already I will lose data. Actually I am making sure I lose data… I will ALWAYS lose data doing this

I decided on option 3…

TBC

Categories
Python

Messaging and time

Remember, there is no guaranteed delivery time of a message; the destination module may be broken, or it may be busy.

When changing data, re-validate the current state immediately before making the change, so that the time between validation and edit is short and fixed.

ie.

findings = body['caseinfo']['findings']
if 'Needs to be assigned to new team' in findings:
    l.info(f'Moriorty to assign {body["ticket"]} to new team')
    teamAssignedField = 'customfield_1XXX'
    #Need to validate current state as there are no time guarantees in the message
    issue = getJiraTicketInfo(body['issueURL'])      # <--- this is a function within the module code, no messaging needed
    try:
        teamCurrent = issue['fields']['customfield_10311']['name']   #who knows what crap will be returned
    except (KeyError, TypeError):
        # Missing keys or None values mean the team cannot be confirmed
        teamCurrent = ''
    if teamCurrent == 'old-team':
        l.info(f'Validated {body["ticket"]} is assigned to old-team')
        putData = {}
        putData['fields'] = {teamAssignedField : 'new-field'}
        pushUpdate = postJiraTicketInfo(body, putData)  # <=== also a function in the module code

This way I don’t have to worry about material delays. Ideally the api should really accept a validation value but it doesn’t. The change will only be made if the data is in the same (relevant) state as when the original check took place.
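Stripped of the Jira specifics, the pattern is roughly the sketch below. update_if_unchanged and its arguments are hypothetical names, and there is still a small window between the read and the write, so this narrows the race rather than removing it.

```python
def update_if_unchanged(read_current, write_new, expected, new_value):
    """Write new_value only if the live value still equals expected."""
    try:
        current = read_current()
    except Exception:
        # If the state cannot be read, do not risk the write
        return False
    if current != expected:
        return False
    write_new(new_value)
    return True


# Stand-in for the external system, purely for illustration
ticket = {"team": "old-team"}
changed = update_if_unchanged(
    lambda: ticket["team"],
    lambda value: ticket.update(team=value),
    "old-team",
    "new-team",
)
# A late, stale message asking for the same change is now ignored
stale = update_if_unchanged(
    lambda: ticket["team"],
    lambda value: ticket.update(team=value),
    "old-team",
    "other-team",
)
```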

Because of this the various modules tend to interact with one system, both reads AND writes….. Except the slack integration of course, I have Lestrad sending messages and Lestrad’s phone answering them 🙂

One issue I can see is if the value in one system depends on the state of another system… but I’m not sure if that is an actual issue, as you could just send the body back through the processing chain to double check afterwards maybe… I’ll burn that bridge when I get to it.

Categories
Python

Tracking the body

I seem to be having a bit of an issue regarding the messaging paradigm.

Essentially, I am finding that as there are no time guarantees of events getting processed, we have to think about the flow of the “body”.

I call my message the “body” when it contains the incident + case info.

I have to be careful about “duplicating” the body, that is, sending the message away in a queue and continuing to take action here, as the two copies will lose sync and state.

I would rather just try to ensure the stateless nature of the message processing code.

In other words, Watson will mark a post as a “first post”, send it to Lestrad to post as a first post.

Initially I wanted to continue processing the body in Watson to further establish state, but there is no way to get the initial time stamp of the initial slack post back into the body that stayed in Watson.

Although the timestamps are in postgres and pushed into redis, there is NO guarantee about how long that will take, and no clean way to check if the action has taken place yet or not….

because… I forgot to mention that only one module is allowed to talk to redis and postgres. This may well be a terrible architectural decision….

For now, the only solution is to send the body back to Watson from Lestrad with the new TS, and let Watson re-establish the state.
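As a rough sketch of that round trip, with no real queues or Slack calls involved: the function names, the "text" and "slack_ts" keys and the state strings are all invented for illustration, not what the real modules use.

```python
def lestrad_post_first(body, post_to_slack):
    """Post the message, then return a copy of the body carrying the new timestamp."""
    ts = post_to_slack(body["text"])
    returned = dict(body)  # copy, so only one version of the body moves on
    returned["slack_ts"] = ts
    return returned  # in the real flow this goes back onto Watson's queue


def watson_reestablish_state(body):
    """Decide the state purely from what is in the body (stateless)."""
    return "first_post_done" if body.get("slack_ts") else "needs_first_post"


def fake_post_to_slack(text):
    # Stand-in for the Slack call; returns a made-up timestamp
    return "1700000000.000100"


outgoing = {"ticket": "ABC-1", "text": "New incident"}
incoming = lestrad_post_first(outgoing, fake_post_to_slack)
```

Watson keeps nothing in memory between the two messages; everything it needs comes back inside the body.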

Left is Watson, middle is Lestrad, and right is the hound