Maintaining state while threading in Python

If you have ever worked with threading in Python before, you may have encountered issues where everything gets a little bit out of step (if you’re incrementing a counter, 2 threads may simultaneously try to increment it & mess up your program).

So we can deal with that using queues. Let’s walk through an example – the full code is at the bottom.

First off, we will create a global variable, to be referenced within our functions. We’ll call this counter. Next, we create two queues – the first is the job queue, that’s the list of ‘stuff’ that needs tobe done. The counter queue is a list of 1’s – each time an item is taken from the list, we’ll increment by that value.

We then create a function called increment manager. Here, we reference the global variable and grab a value from the counter queue. The cool thing here is that the get function locks the queue, so no other threads can pick up a value – removing the issue I described earlier.

We then, increment our counter variable and send something to the print queue – finally, we use the .done() call, which unlocks the queue.

Note: while true enables us to run this function in perpetutiy.

def increment_manager():
  global counter

  while True:
    increment = counter_queue.get() 
    old_counter = counter
    counter = old_counter + increment
    job_queue.put((f'thing to print', '---'))
    counter_queue.task_done() #unlocks the queue

So, now we have loaded some things into the queue to be printed. Now, we need a function to take the values out of the queue and to print them.

In the below, we take each of the items in the job queue and print them out. Again, we lock the queue until the last line, so other threads can’t mess up our little program.

def print_manager():
  while True:
    for line in job_queue.get():
      print(line)
    job_queue.task_done()

Next, we have this little function which puts values to be incremented into the increment counter queue.

def increment_counter():
  counter_queue.put(1)

Now we get to the interesting bit – starting everything running. Here we:

  • Start a thread to run the increment_manager function
  • Start a thread to run the print_manager function
  • Create a pool of 10 threads to run the increment counter (we’ll hence get 10 number 1’s in the queue)
  • Start and join (make sure the program waits for all threads to complete) the threads.
Thread(target=increment_manager, daemon=True).start() 
Thread(target=print_manager, daemon=True).start() 

worker_threads = [Thread(target=increment_counter) for thread in range(10)]

for thread in worker_threads:
  thread.start()

for thread in worker_threads:
  thread.join()

counter_queue.join()
job_queue.join()

Here is the full code:

import time
import random
import queue
from threading import Thread

counter = 0
job_queue = queue.Queue() #things to print out
counter_queue = queue.Queue() #amount to increase counter

def increment_manager():
  global counter

  while True:
    increment = counter_queue.get() 
    old_counter = counter
    counter = old_counter + increment
    job_queue.put((f'thing to print', '---'))
    counter_queue.task_done() #unlocks the queue

def print_manager():
  while True:
    for line in job_queue.get():
      print(line)
    job_queue.task_done()

def increment_counter():
  counter_queue.put(1)

Thread(target=increment_manager, daemon=True).start() #True means run forever
Thread(target=print_manager, daemon=True).start() #True means run forever

worker_threads = [Thread(target=increment_counter) for thread in range(10)]

for thread in worker_threads:
  thread.start()

for thread in worker_threads:
  thread.join()

counter_queue.join()
job_queue.join()
Kodey