
Kubernetes, moving from an entrypoint.sh to supervisord

For reasons (specifically that my Kubernetes hosts have been up and down lately) I have had to harden my deploy.

The first and most important thing was to stop treating graceful shutdowns of my RabbitMQ connections as fatal. I realised it would be way better to just sleep and retry on _any_ RabbitMQ connection error, particularly if RabbitMQ shut down gracefully.

Previously I was assuming that if RabbitMQ was shutting down gracefully then the whole app was.

How wrong I was. I was informed in no uncertain terms that I should be prepared for individual containers to be shut down and restarted without the whole pod getting a restart.

To that end the following changes were needed:


def getJobs():
    l.info("Running get Jobs")
    while True:
        try:
            global connection
            global channelM
            connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
            channel = connection.channel()
            channelM = connection.channel()
            channel.queue_declare(queue=myRabbitQueue)
            channel.basic_consume(queue=myRabbitQueue, auto_ack=True, on_message_callback=callback)
            l.info("Will now continue to run")
            channel.start_consuming()
        # Don't recover if connection was closed by broker
        except pika.exceptions.ConnectionClosedByBroker:
            l.error('Rabbit error connection closed by broker')
            break
        # Don't recover on channel errors
        except pika.exceptions.AMQPChannelError:
            l.error('Rabbit error channel error')
            break
        # Recover on all other connection errors
        except pika.exceptions.AMQPConnectionError:
            l.error('Retrying rabbitMQ listener')
            continue

to

def getJobs():
    l.info("Running get Jobs")
    while True:
        try:
            global connection
            global channelM
            connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
            channel = connection.channel()
            channelM = connection.channel()
            channel.queue_declare(queue=myRabbitQueue)
            channel.basic_consume(queue=myRabbitQueue, auto_ack=True, on_message_callback=callback)
            l.info("Will now continue to run")
            channel.start_consuming()
        # Recover on all other connection errors
        except pika.exceptions.AMQPConnectionError:
            l.error('Retrying rabbitMQ listener')
            continue
        except Exception as err:
            l.error(f'Error in connecting to rabbitmq, going to retry: {err}')
            sleep(5)

Note the removal of the “# Don't recover on channel errors” handler, where it breaks. That break broke the whole damn run loop!

And this brought me to my next problem: my app just wasn’t shutting down cleanly. After much messing about I realised that the entrypoint.sh script (it had a few “wait-for-its”, then started and backgrounded my various Python modules) was not passing SIGTERM on, so my app wasn’t shutting down properly at all.

Because of how it was built, this never mattered, but it always took the full 60 seconds to destroy when I was pushing a new version… and it was a bit annoying to have it “not be workin proper”, as they say.
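As an aside: a quick way to check whether your process is even seeing the signal is to log it from a handler. A minimal sketch (not what I ended up shipping):

import signal
import sys

def on_sigterm(signum, frame):
    # if this never fires during a `kubectl delete pod`, SIGTERM
    # isn't reaching the process at all
    print('got SIGTERM, shutting down')
    sys.exit(0)

signal.signal(signal.SIGTERM, on_sigterm)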

So I am moving to supervisord. But if there’s one thing I dislike it’s config files. So let me present a “translator” from entrypoint.sh to a supervisord conf file. It’s not perfect, but it beats writing it by hand. I hope someone else finds it useful.

What is confusing to me, however, is that it always seems so hard to just get simple best practices for deploying a simple-ish Python app to Kubernetes. I thought the whole point was to make things easier for developers. If I was running my app in a VM I wouldn’t have to care about any of this stuff!

I think the reason for this is that I wrote the Python app from scratch and didn’t use a framework that had all the default configs ready to go!

Here’s the helper, I hope it helps:

def generate_supervisord_conf(entrypoint_file):
    supervisord_conf = "[supervisord]\n"
    supervisord_conf += "nodaemon=true\n"
    supervisord_conf += "logfile=/dev/null\n"
    supervisord_conf += "logfile_maxbytes=0\n"
    supervisord_conf += "user=root\n\n"

    programs = []
    with open(entrypoint_file, "r") as f:
        lines = f.readlines()
        for line in lines:
            if line.startswith("pipenv run"):
                # e.g. "pipenv run /app/watson.py &" -> "watson"
                program_name = line.split(" ")[2].rstrip("\n").split("/")[-1].split(".")[0]
                programs.append(program_name)

    for program in programs:
        program_conf = f"[program:{program}]\n"
        program_conf += f"command=/usr/local/bin/pipenv run /app/{program}.py\n"
        program_conf += "autostart=true\n"
        program_conf += "redirect_stderr=true\n"
        program_conf += "stdout_logfile=/dev/fd/1\n"
        program_conf += "stdout_logfile_maxbytes=0\n"
        #program_conf += "depends_on=is_it_up\n" # This is the wait-for-it script
        program_conf += "autorestart=true\n\n"
        supervisord_conf += program_conf

    return supervisord_conf


entrypoint_file = "./entrypoint.sh"
supervisord_conf = generate_supervisord_conf(entrypoint_file)

with open("supervisord.conf", "w") as f:
    f.write(supervisord_conf)
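To show what it spits out: for an entrypoint.sh containing a line like pipenv run /app/watson.py & (watson.py being a made-up module name), the generated supervisord.conf comes out as:

[supervisord]
nodaemon=true
logfile=/dev/null
logfile_maxbytes=0
user=root

[program:watson]
command=/usr/local/bin/pipenv run /app/watson.py
autostart=true
redirect_stderr=true
stdout_logfile=/dev/fd/1
stdout_logfile_maxbytes=0
autorestart=true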
    

I find myself liking programmatic generation more and more!

UPDATE! I forgot about the annoying logging problems when using supervisord… please see https://docs.docker.com/config/containers/multi-service_container/. I have updated the code above.


Reducing boilerplate with RabbitMQ and Python

Following on from my last post about sending RabbitMQ messages with shared code in Python (caveat: this is not for >100 messages-per-second requirements).

Here is how to listen to a queue (and use it), with a reduced amount of boilerplate.

I also learned you can pass a function to an imported module to get that module to “call back” into the parent code!

This is the shared code (it lives in a .py file and is imported with from blah import *):

#TheLawsandOrdinancesoftheCitiesAnkhandMorpork.py

def getJobs(officer, callback):
    l.info(f'Waiting for jobs for {officer}')
    while True:
        try:
            # the with-block owns the connection; no need to create a second one
            with pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) as connection:
                channel = connection.channel()
                channel.queue_declare(queue=officer)
                channel.basic_consume(queue=officer, auto_ack=True, on_message_callback=callback)
                l.info(f'{officer} reporting for duty sir')
                channel.start_consuming()
        # Don't recover if connection was closed by broker
        except pika.exceptions.ConnectionClosedByBroker:
            l.error('Rabbit error connection closed by broker')
            break
        # Don't recover on channel errors
        except pika.exceptions.AMQPChannelError:
            l.error('Rabbit error channel error')
            break
        # Recover on all other connection errors
        except pika.exceptions.AMQPConnectionError:
            l.error('Retrying rabbitMQ listener')
            continue

This ^^^ stuff was boilerplate in my last app; now I can reuse the code with the following in the actual running app:

#Detritus.py
import sys
import os
from TheLawsandOrdinancesoftheCitiesAnkhandMorpork import *

def callback(ch, method, properties, body):
    patrol = body.decode()
    print(f'I have received {patrol}')

#Then from main() just call the shared code
def main():
    officer = 'Detritus'
    getJobs(officer, callback)
    #This will then live in a while loop and "callback" to the supplied callback function. All you need then is a bit of error handling

# Standard boilerplate to call the main() function to begin
# the program.
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Keyboard interrupt')
        l.warning("Keyboard interrupt")
        try:
            connection.close()
            sys.exit(0)
        except SystemExit:
            os._exit(0)

Again I hope the above formatting comes out well. I’m not sure if this is OK for high-volume stuff, but for what I am doing (cheap parallelisation and a messaging programming paradigm) it works well for me!


Using RabbitMQ (pika) with Python

I have struggled to find what I would consider a nice way to connect to RabbitMQ from a Python program.

One issue I have found is that Stack Overflow answers are not great if you are reusing connections or have something that isn’t just a single sending program.

My use case is a bit different: I am programming around a narrative (in this new project I am using “The Watch” from Terry Pratchett’s Discworld).

In my last project I had what amounted to boilerplate code in each “Character” (Python app). For this new project (the Quantum one), I wanted something a bit easier and cleaner.

So let me introduce TheLawsandOrdinancesoftheCitiesAnkhandMorpork.py

This is a “helper” module that I will import * from. This is how I wrote the messaging side. As a bonus, it produces less crap in the RabbitMQ logs than explicitly calling channel.close():

import pika
import json
import logging as l

exchange = ''

def sendMessage(person, patrol):
    try:
        patrol = json.dumps(patrol)
    except Exception as err:
        l.error(f'could not dump the patrol to JSON: {err}')
    try:
        # the with-block closes the connection cleanly on exit
        with pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) as connection:
            channelM = connection.channel()
            channelM.queue_declare(queue=person)
            channelM.basic_publish(exchange=exchange, routing_key=person, body=patrol)
    except Exception as err:
        l.error(f'Problem sending rabbit message: {err}')

In case that is a bit hard to read on screen: what I am doing is writing a function that takes the name of a “character” (i.e. a queue) and a dict. I encode the dict as JSON and, using the “with” block, send it to the correct queue.
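So sending a message from a “character” becomes a one-liner. For example (queue name and payload made up):

from TheLawsandOrdinancesoftheCitiesAnkhandMorpork import *

# "Detritus" is the destination queue; the dict is the message payload
sendMessage('Detritus', {'patrol': 'The Shades', 'shift': 'night'})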

By avoiding the connection.close() call I no longer get an error in RabbitMQ saying the connection closed unexpectedly.

It may not be the most efficient approach, not reusing an existing connection, but the connection bring-up is tiny, and I would rather take cleanliness over efficiency in this case.

I will be running quantum simulations so the bottleneck will not be here!


Importance of being defensive when contacting external services

I just got caught by a “reliable” internal service which started to give timeouts.

I had never configured a timeout on the connection (the default was many minutes), which jammed the whole program.
It’s important to set aggressive timeouts in prod: better to error fast and figure out a way to accommodate the error than to just wait.
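As a sketch of what I mean, assuming an HTTP service and the requests library (URL and limits made up):

import requests

try:
    # 3 seconds to connect, 10 to read: fail fast instead of hanging
    # on whatever generous default the client has
    resp = requests.get('https://internal-service/api/status', timeout=(3, 10))
    resp.raise_for_status()
except requests.exceptions.Timeout:
    print('service timed out, handle it instead of jamming the program')
except requests.exceptions.RequestException as err:
    print(f'service call failed: {err}')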

Perhaps the next step is to make my program internally defensive in order to combat my poor coding skills.


List of dicts to table with PrettyTable

Given:

from prettytable import PrettyTable

summaryList = [{"Event Name": "Pod_Service_MTTR_Notify", "Count": 1}, {"Event Name": "Pod_Service_No_2_alert", "Count": 20}]
if summaryList:
    # the first dict's keys become the column headers
    summaryTable = PrettyTable(list(summaryList[0].keys()))
    for i in summaryList:
        summaryTable.add_row(list(i.values()))
    print(summaryTable)
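Which prints something like:

+-------------------------+-------+
|        Event Name       | Count |
+-------------------------+-------+
| Pod_Service_MTTR_Notify |   1   |
|  Pod_Service_No_2_alert |   20  |
+-------------------------+-------+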

Rate Limiter / Cool Down timer in python3 and redis

My main source of data… Jira Service Desk… cannot be trusted. Oh yes, 90% of the time it acts sane, but once in a while someone misconfigures a BigPanda alert and we get 1000 new incidents in moments.

My colleague got flooded with PagerDuty pages and emails one Saturday, and I’m sure it made him cry. I need to rate limit these actions.

My plan is to use the INCR function to increment a Redis key. The idea is that you create a new key every x seconds, with an expiry (equal to x); then, when you increment, Redis responds with the current count, so you can just test it with an if.

First things first. I know how to do something every second (int(time())), but how do I bucket every 10 minutes?

from time import time, sleep

while True:
    everyXSeconds = 10
    curTime = int(time())
    key = curTime - curTime % everyXSeconds
    print(key)
    sleep(1)

I’m only calling time() once because calling time() twice in the modulo expression actually gives different nanosecond values… and I just know I am going to hit some corner case where that crosses a second boundary and messes up EVERYTHING.
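Putting the plan together, a minimal sketch of the INCR version might look like this (key naming and limit are made up, and red is a redis.Redis connection set up elsewhere):

def over_limit(action, limit=10, everyXSeconds=600):
    curTime = int(time())                       # call time() once!
    window = curTime - curTime % everyXSeconds  # window start, as above
    key = f'ratelimit:{action}:{window}'
    count = red.incr(key)            # INCR creates the key at 0, then increments
    red.expire(key, everyXSeconds)   # the key cleans itself up after the window
    return count > limit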

BUT… the above is actually crap and not needed at all. It is a crap way to rate limit because, for example, if I am trying to stop an email flood it will still send 10 emails every X seconds; the count gets reset every X seconds.

To me the following makes a bit more sense:

# `red` (a redis.Redis connection) and `queueName` (the sorted-set key) are set up elsewhere
while True:
    curTime = time()
    print(f"curTime is {curTime}")
    everyXSeconds = 10
    rateBeginTime = curTime - everyXSeconds
    count = red.zcount(queueName, rateBeginTime, curTime)
    print(f"current hit count in the window is: {count}")
    limitMap = {curTime: curTime}
    red.zadd(queueName, limitMap)
    sleep(1)
    removed = red.zremrangebyscore(queueName, 0, rateBeginTime)
    print(f"removed {removed} old entries")

So that’s just what I used to test… but basically you are using a Redis sorted set, adding and removing entries as needed.
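Wrapped up as an actual check, it might look something like this (function name and limit are mine):

def allowAction(queueName, limit=10, everyXSeconds=600):
    """Sliding window: allow only if fewer than `limit` events
    happened in the last `everyXSeconds` seconds."""
    curTime = time()
    rateBeginTime = curTime - everyXSeconds
    red.zremrangebyscore(queueName, 0, rateBeginTime)   # drop expired entries
    if red.zcount(queueName, rateBeginTime, curTime) >= limit:
        return False                                    # over the limit: suppress
    red.zadd(queueName, {curTime: curTime})             # record this event
    return True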


Pop nested dict in python

I have struggled to find the following example:

hello = {'a': {'b': 2, 'c': 3}}
#I want to pop c (which is nested in a)
hello.get('a').pop('c', None)
#if 'a' itself might be missing, use hello.get('a', {}).pop('c', None)
#to avoid calling .pop() on None

This took me too long to figure out


Bury the body

What should we do with the body?

Keep sending it back to Watson to check state until Watson buries it.

But… what happens if the body is floating around twice?

We have a problem here. This is option 1:

(diagram of option 1)

^^^ try to follow that shit. The problem is that, best case, you end up delaying processing an update. Worst case, the body is lost somewhere due to an error and the whole thing jams up.

Option 2:

(diagram of option 2)
Ahh, but what about the caseinfo? If I have sent on the body+case already I will lose data. Actually I am making sure I lose data… I will ALWAYS lose data doing this.

I decided on option 3…

TBC


Tracking the body

I seem to be having a bit of an issue regarding the messaging paradigm.

Essentially, I am finding that as there are no timing guarantees on events getting processed, we have to think about the flow of the “body”.

I am calling the message the “body” when it is the incident + case info.

I have to be careful about “duplicating” the body, that is, sending the message away on a queue while continuing to take action here, as the two copies will lose sync and state.

I would rather just try to ensure the stateless nature of the message-processing code.

In other words, Watson will mark a post as a “first post” and send it to Lestrad to post as a first post.

Initially I wanted to continue processing the body in Watson to further establish state, but there is no way to get the timestamp of the initial Slack post back into the body that stayed in Watson.

Although the timestamps are in Postgres and pushed into Redis, there is NO guarantee about how long that will take, and no clean way to check whether the action has taken place yet…

Because… I forgot to mention that only one module is allowed to talk to Redis and Postgres. This may well be a terrible architectural decision…

For now, the only solution is to send the body back to Watson from Lestrad with the new TS (timestamp), and let Watson re-establish the state.

(Diagram: left is Watson, middle is Lestrad, and right is the hound.)


Checking redis liveness

Use this within functions which are going to use Redis:

try:
    if reddisConnect():
        l.info('Redis connection is up')
except Exception as err:
    l.error(f'Some problem with redis: {err}')

This is the connection function:

### redis config

def reddisConnect():
    redpass = "Hidden"
    global red
    try:
        try:
            if red.ping():
                l.info('Redis connection set up and validated')
                return True
        except Exception:
            # `red` doesn't exist yet, or the connection has died: (re)connect
            red = redis.Redis(password=redpass)
            if red.ping():
                l.info('Redis connection re-established and validated')
                return True
    except Exception as err:
        l.error(f'Problem setting up redis connection: {err}')
        return False

Hopefully that’s it. The Postgres connection is a bit more brute force than that. I’ll fix it once I can see that the global keyword is working as I hope it does.

I have been finding with the RabbitMQ callback functions that I need to double-check that global variables are working like I hope they are!
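For the record, the pattern I keep double-checking looks roughly like this (variable name is mine):

lastSeen = None  # module-level state shared with the callback

def callback(ch, method, properties, body):
    global lastSeen           # without this, the assignment below would
    lastSeen = body.decode()  # create a local instead of updating module state
    l.info(f'lastSeen is now {lastSeen}')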