Categories
Python

Kubernetes, moving from an entrypoint.sh to supervisord

For reasons (specifically that my Kubernetes hosts have been up and down lately) I have had to harden my deploy.

First thing and the most important was to stop treating graceful shutdowns of my RabbitMQ connections as fatal. I realised it would be way better to just sleep and retry on _any_ RabbitMQ connection error, particularly if RabbitMQ shut down gracefully.

Previously I was assuming that if rabbitMQ was shutting down gracefully then the whole app was.

How wrong I was. I was informed in no uncertain terms that I should be prepared for individual containers to be shut down and restarted without the whole pod getting a restart.

To that end the following changes were needed:


def getJobs():
    l.info("Running get Jobs")
    while True:
        try:
            global connection
            global channelM
            connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
            channel = connection.channel()
            channelM = connection.channel()
            channel.queue_declare(queue=myRabbitQueue)
            channel.basic_consume(queue=myRabbitQueue, auto_ack=True, on_message_callback=callback)
            l.info("Will now continue to run")
            channel.start_consuming()
        # Don't recover if connection was closed by broker
        except pika.exceptions.ConnectionClosedByBroker:
            l.error('Rabbit error connection closed by broker')
            break
        # Don't recover on channel errors
        except pika.exceptions.AMQPChannelError:
            l.error('Rabbit error channel error')
            break
        # Recover on all other connection errors
        except pika.exceptions.AMQPConnectionError:
            l.error('Retrying rabbitMQ listener')
            continue

to

import pika
from time import sleep

# l (the logger), myRabbitQueue and callback are defined elsewhere in the app
def getJobs():
    l.info("Running get Jobs")
    while True:
        try:
            global connection
            global channelM
            connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
            channel = connection.channel()
            channelM = connection.channel()
            channel.queue_declare(queue=myRabbitQueue)
            channel.basic_consume(queue=myRabbitQueue, auto_ack=True, on_message_callback=callback)
            l.info("Will now continue to run")
            channel.start_consuming()
        # Recover on connection errors (this now includes "closed by broker")
        except pika.exceptions.AMQPConnectionError:
            l.error('Retrying rabbitMQ listener')
            # Sleep here too, otherwise this retries in a tight loop
            sleep(5)
            continue
        # Recover on everything else as well, channel errors included
        except Exception as err:
            l.error(f'Error in connecting to rabbitmq, going to retry: {err}')
            sleep(5)

Note the removal of the “# Don't recover on channel errors” branch (and the “closed by broker” one) where it breaks. The break broke the whole damn run loop!
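The sleep-and-retry idea can also be pulled out into a small helper, which makes it easy to try without a broker. This is only a minimal sketch: `retry_with_sleep`, `max_attempts` and `sleep_fn` are names made up for the illustration, and the fake connect function stands in for the pika connection.

```python
import time


def retry_with_sleep(connect, delay=5.0, max_attempts=None, sleep_fn=time.sleep):
    """Call connect() until it returns; sleep `delay` seconds after each failure.

    max_attempts=None retries forever, like a `while True` run loop.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            return connect()
        except Exception as err:  # broad on purpose: any failure means retry
            if max_attempts is not None and attempt >= max_attempts:
                raise
            print(f"attempt {attempt} failed ({err}), retrying in {delay}s")
            sleep_fn(delay)


if __name__ == "__main__":
    # Hypothetical fake "broker" that refuses the first two connections.
    calls = {"n": 0}

    def flaky_connect():
        calls["n"] += 1
        if calls["n"] < 3:
            raise ConnectionError("broker not ready")
        return "connected"

    print(retry_with_sleep(flaky_connect, delay=0.1))
```

Passing the sleep function in means a test can swap it for something that does not actually wait.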

And this brought me to my next problem. My app just wasn’t shutting down cleanly. After much messing I realised that the entrypoint.sh script (it had a few “wait-for-its” then started and backgrounded my various python modules) was not passing SIGTERM on to those modules, so my app wasn’t shutting down properly at all.

Because of how it was built, this never mattered, but it always took the 60 seconds to destroy when I was pushing a new version… and it was a bit annoying to have it “not be workin proper” as they say.

So I am moving to supervisord. But if there’s one thing I dislike it’s config files. So let me present a “translator” from entrypoint.sh to a supervisord conf file. It’s not perfect, but it beats writing it by hand. I hope someone else finds it useful.

What is confusing to me, however, is that it always seems so hard to just get simple best practices for deploying a simple-ish python app to kubernetes. I thought the whole point was to make it easier for developers. If I was running my app in a VM I wouldn’t have to care about any of this stuff!

I think the reason for this is I wrote the python app from scratch, and didn’t use a framework that had all the default configs ready to go!

Here’s the helper, I hope it helps:

def generate_supervisord_conf(entrypoint_file):
    # Global section: stay in the foreground and keep no supervisord log file
    supervisord_conf = "[supervisord]\n"
    supervisord_conf += "nodaemon=true\n"
    supervisord_conf += "logfile=/dev/null\n"
    supervisord_conf += "logfile_maxbytes=0\n"
    supervisord_conf += "user=root\n\n"

    # Collect one program name per "pipenv run ..." line in entrypoint.sh
    programs = []
    with open(entrypoint_file, "r") as f:
        lines = f.readlines()
        for line in lines:
            if line.startswith("pipenv run"):
                # e.g. a line like "pipenv run /app/foo.py &" gives "foo"
                program_name = line.split(" ")[2].rstrip("\n").split("/")[-1].split(".")[0]
                programs.append(program_name)

    # One [program:x] section each, logging to the container's stdout
    for program in programs:
        program_conf = f"[program:{program}]\n"
        program_conf += f"command=/usr/local/bin/pipenv run /app/{program}.py\n"
        program_conf += "autostart=true\n"
        program_conf += "redirect_stderr=true\n"
        program_conf += "stdout_logfile=/dev/fd/1\n"
        program_conf += "stdout_logfile_maxbytes=0\n"
        #program_conf += "depends_on=is_it_up\n" # This is the wait-for-it script
        program_conf += "autorestart=true\n\n"
        supervisord_conf += program_conf

    return supervisord_conf


entrypoint_file = "./entrypoint.sh"
supervisord_conf = generate_supervisord_conf(entrypoint_file)

with open("supervisord.conf", "w") as f:
    f.write(supervisord_conf)

I find myself liking programmatic generation more and more!
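Since the conf file is INI-style, a generated file can be given a quick sanity check with Python's own configparser before it goes into an image. This is a rough sketch, not a full validator: `check_supervisord_conf` and the `worker` program are made up for the example, and it only checks the few things listed in the code.

```python
import configparser

# Hypothetical sample of the kind of output the generator produces.
SAMPLE_CONF = """\
[supervisord]
nodaemon=true
logfile=/dev/null
logfile_maxbytes=0
user=root

[program:worker]
command=/usr/local/bin/pipenv run /app/worker.py
autostart=true
redirect_stderr=true
stdout_logfile=/dev/fd/1
stdout_logfile_maxbytes=0
autorestart=true
"""


def check_supervisord_conf(text):
    """Return the program names in `text`, raising on obvious problems."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)  # raises on duplicate sections or malformed lines
    if "supervisord" not in parser:
        raise ValueError("missing [supervisord] section")
    programs = []
    for section in parser.sections():
        if section.startswith("program:"):
            if not parser.has_option(section, "command"):
                raise ValueError(f"[{section}] has no command")
            programs.append(section.split(":", 1)[1])
    return programs


if __name__ == "__main__":
    print(check_supervisord_conf(SAMPLE_CONF))
```

A duplicate program name (two entrypoint lines pointing at the same module) shows up here as a configparser error instead of a surprise at container start.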

UPDATE! I forgot about the annoying logging problems when using supervisord… please see https://docs.docker.com/config/containers/multi-service_container/ for details. I have updated the code above.


Reducing boilerplate with RabbitMQ and Python

This follows on from my last post about sending RabbitMQ messages with shared code in python (caveat: this is not for >100 messages per second requirements).

Here is how to listen to a queue (and use it), with a reduced amount of boilerplate.

I also learned you can pass a function to an imported module to get that module to “callback” to the parent… to the code!!

This is the shared code (it lives in a .py file and is imported with from blah import *):

#TheLawsandOrdinancesoftheCitiesAnkhandMorpork.py
import logging

import pika

l = logging.getLogger(__name__)

def getJobs(officer, callback):
    l.info(f'Waiting for jobs for {officer}')
    while True:
        try:
            # The with block closes the connection on the way out
            with pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) as connection:
                channel = connection.channel()
                channel.queue_declare(queue=officer)
                channel.basic_consume(queue=officer, auto_ack=True, on_message_callback=callback)
                l.info(f'{officer} reporting for duty sir')
                channel.start_consuming()
        # Don't recover if connection was closed by broker
        except pika.exceptions.ConnectionClosedByBroker:
            l.error('Rabbit error connection closed by broker')
            break
        # Don't recover on channel errors
        except pika.exceptions.AMQPChannelError:
            l.error('Rabbit error channel error')
            break
        # Recover on all other connection errors
        except pika.exceptions.AMQPConnectionError:
            l.error('Retrying rabbitMQ listener')
            continue

This ^^^ stuff was boilerplate in my last app, now I can reuse the code with the following… in the actual running app:

#Detritus.py
import os
import sys

# getJobs and l (the logger) come in via the star import
from TheLawsandOrdinancesoftheCitiesAnkhandMorpork import *

def callback(ch, method, properties, body):
    patrol = body.decode()
    print(f'I have received {patrol}')

#Then from main() just call the shared code
def main():
    officer = 'Detritus'
    getJobs(officer, callback)
    #This will then live in a while loop and "callback" to the supplied callback function. All you need then is a bit of error handling

# Standard boilerplate to call the main() function to begin
# the program.
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Keyboard interrupt')
        l.warning("Keyboard interrupt")
        # No connection.close() here: the connection lives inside getJobs,
        # whose with block should already have closed it
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

Again I hope the above formatting comes out well. I’m not sure if this is ok for high volume stuff, but for what I am doing (cheap parallelisation and using a messaging programming paradigm) it works good for me!
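The callback trick can be tried without RabbitMQ at all, which is handy for testing the handler on its own. A small sketch: `fake_get_jobs` and the `received` list are made up for the example, and it simply feeds byte strings to the callback using the same four arguments pika passes to on_message_callback.

```python
received = []


def callback(ch, method, properties, body):
    """Same four-argument signature pika uses for on_message_callback."""
    patrol = body.decode()
    received.append(patrol)
    print(f'I have received {patrol}')


def fake_get_jobs(officer, callback, messages):
    """Stand-in for getJobs(): hand each message body to the supplied callback."""
    for body in messages:
        # A real consumer passes channel, method and properties objects;
        # None is enough here because this callback only reads the body.
        callback(None, None, None, body)


if __name__ == "__main__":
    fake_get_jobs('Detritus', callback, [b'night watch', b'day watch'])
```

The shared module never needs to know what the callback does; it only needs something it can call with those four arguments.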