Producer-Consumer Thread Termination

The producer–consumer problem is a well-known example of multi-thread/multi-process synchronization. One way of achieving synchronization is to establish communication between the threads or processes. There are many ways for one thread to send signals to another. One of them is a queue with a fixed buffer size that is shared by all the threads.
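Here is a minimal sketch of what a "fixed buffer size" means for a shared queue. It assumes Python 3's `queue` module (the module is named `Queue` in Python 2). `maxsize` is a real parameter of `queue.Queue`. The capacity of 2 and the item values are arbitrary choices for illustration. Once the buffer is full, a producer's `put()` blocks, while `put_nowait()` raises `queue.Full` instead.

```python
import queue

# A shared queue with a fixed buffer size of 2 items (2 is an arbitrary demo value).
buf = queue.Queue(maxsize=2)

buf.put("a")
buf.put("b")
print(buf.full())    # True: the buffer is at capacity

try:
    # put() would block here until a consumer frees a slot;
    # put_nowait() raises queue.Full instead of blocking.
    buf.put_nowait("c")
except queue.Full:
    print("buffer full, producer must wait")

print(buf.get())     # a  (items come out in FIFO order)
buf.put_nowait("c")  # succeeds now that a slot is free
print(buf.qsize())   # 2
```

The bound is what provides back-pressure. A producer that runs ahead of its consumers is made to wait, so the queue cannot grow without limit.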

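This article is mainly about how to terminate the producers and consumers from the parent thread. I won't discuss deadlocks here. Python's Queue class is thread-safe: it does its own locking, so concurrent put() and get() calls from several threads do not corrupt the queue. Check-then-act sequences, such as calling empty() and then get(), can still race, which is why the consumer in Solution 2 uses a timeout.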
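The following sketch illustrates, but does not prove, the thread-safety of the queue's own operations. Several threads put items into one shared `queue.Queue` with no extra locking, and afterwards every item is present exactly once. The thread and item counts are arbitrary demo values.

```python
import queue
import threading

shared = queue.Queue()
N_THREADS, N_ITEMS = 8, 1000

def writer(tid):
    # Each writer puts N_ITEMS distinct (thread id, index) pairs, with no extra locking.
    for i in range(N_ITEMS):
        shared.put((tid, i))

threads = [threading.Thread(target=writer, args=(t,)) for t in range(N_THREADS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Drain the queue: no item was lost or duplicated by the concurrent puts.
items = []
while not shared.empty():
    items.append(shared.get_nowait())

print(len(items), len(set(items)))  # 8000 8000
```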

There are two ways to terminate both the consumers and the producers, given the requirement that the queue must be empty before all the consumers are terminated:

  1. The parent sends a terminate signal to the producers, and the producers pass it on to the consumers.
  2. The parent sends a terminate signal to both the producers and the consumers.

Solution 1: The producer sends a terminate signal (a sentinel object) to the consumers, and each consumer re-sends the same signal until all the consumers have terminated.

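_terminate = object()  # unique sentinel; no real data item can be this object

def producer(q, stop_event):
    # stop_event is a threading.Event that the parent sets to request termination
    n = 0
    while not stop_event.is_set():
        data = n  # produce some data
        q.put(data)
        n += 1
    # The sentinel goes in after the last real item, so FIFO order delivers it last
    q.put(_terminate)

def consumer(q):
    while True:
        data = q.get()
        if data is _terminate:
            # put it back so that the other consumers see it too
            q.put(_terminate)
            break
        # process data here

This is the simplest approach, but it won't work when only a certain category of consumers has to be terminated, since it is not easy to send a signal to a specific thread over a shared queue.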

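Also, because the queue is FIFO and the sentinel is put after the producer's last item, this approach makes sure that the buffer is fully drained: no data items are left pending in the queue when the consumers stop. This holds for a single producer. With several producers, the first sentinel would stop the consumers while the other producers may still be adding items.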

Solution 2:

The other way is to use shared flag variables, loosely called semaphores here, that tell each producer or consumer whether it is still authorized to run, while at the same time making sure that no items are left pending in the queue.

Let's define a producer and a consumer.

import time
import threading
# Python 3; in Python 2 this module is named Queue
from queue import Queue, Empty


# A thread that produces data
class Producer(threading.Thread):
    def __init__(self, kwargs=None, name=None):
        threading.Thread.__init__(self, name=name)
        self.running = True          # flag cleared by the parent to stop this producer
        self.kwargs = kwargs or {}

    def run(self):
        out_q = self.kwargs.get('queue')
        while self.running:
            # Adding some integer
            out_q.put(10)
            # Keep this thread asleep briefly so it doesn't iterate too often
            time.sleep(0.1)

        print('producer {name} terminated'.format(name=self.name))


# A thread that consumes data
class Consumer(threading.Thread):

    def __init__(self, kwargs=None, name=None):
        threading.Thread.__init__(self, name=name)
        self.kwargs = kwargs or {}
        self.producer_alive = True   # flag cleared by the parent once all producers are dead

    def run(self):
        in_q = self.kwargs.get('queue')

        # The consumer should die once the producers are dead and the queue is empty.
        while self.producer_alive or not in_q.empty():
            try:
                data = in_q.get(timeout=1)
            except Empty:
                continue             # nothing arrived within 1 s; re-check the loop condition
            if isinstance(data, int):
                time.sleep(1)        # simulate processing the item

        print('consumer {name} terminated'.format(name=self.name))

Now let's create the parent thread, which in turn starts these producer and consumer threads. Here we are dealing with multiple producers and multiple consumers.

# Create the shared queue and launch both thread pools
q = Queue()

producer_pool, consumer_pool = [], []


for i in range(1, 3):
    producer_worker = Producer(kwargs={'queue': q}, name=str(i))
    producer_pool.append(producer_worker)
    producer_worker.start()

for i in range(1, 5):
    consumer_worker = Consumer(kwargs={'queue': q}, name=str(i))
    consumer_pool.append(consumer_worker)
    consumer_worker.start()

while True:
    control_process = input('> Y/N: ')
    if control_process == 'Y':
        # Clear every producer's flag first, then join them to make sure all the producers die
        for producer in producer_pool:
            producer.running = False
        for producer in producer_pool:
            producer.join()

        for consumer in consumer_pool:
            # Consumers drain the remaining items, then stop
            consumer.producer_alive = False
        for consumer in consumer_pool:
            consumer.join()

        break

The block above handles the user input. When the answer is Y, it clears each producer's running flag (its "semaphore"), which stops that producer from generating any more input for the consumers. The parent has to wait until all the producers have died before it clears the consumers' flag.

Once all the producers are dead, the consumers' flag is cleared in the same way. The consumers still have to finish their job and clean up the items remaining in the queue. Once the queue is empty, the cleared producer_alive flag makes sure that the consumers terminate.
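A compact, non-interactive sketch of this shutdown order follows. A fixed `time.sleep(0.3)` stands in for the parent's "Y" input. The `shutdown` helper and the `produced`/`consumed` counters are hypothetical, added only to check that every item put into the queue is consumed before the consumers exit. Consumers are made slower than producers so that a backlog usually exists at shutdown.

```python
import queue
import threading
import time

class Producer(threading.Thread):
    def __init__(self, q, name=None):
        super().__init__(name=name)
        self.q = q
        self.running = True          # cleared by the parent to stop producing
        self.produced = 0            # hypothetical counter for the check below

    def run(self):
        while self.running:
            self.q.put(10)
            self.produced += 1
            time.sleep(0.01)

class Consumer(threading.Thread):
    def __init__(self, q, name=None):
        super().__init__(name=name)
        self.q = q
        self.producer_alive = True   # cleared by the parent after the producers are joined
        self.consumed = 0            # hypothetical counter for the check below

    def run(self):
        # Keep going while producers may still add items, or while items remain.
        while self.producer_alive or not self.q.empty():
            try:
                self.q.get(timeout=0.1)
            except queue.Empty:
                continue             # nothing arrived; re-check the loop condition
            self.consumed += 1
            time.sleep(0.03)         # slower than the producers, so a backlog usually forms

def shutdown(producers, consumers):  # hypothetical helper mirroring the parent's steps
    for p in producers:
        p.running = False
    for p in producers:
        p.join()                     # from here on, nothing new enters the queue
    for c in consumers:
        c.producer_alive = False
    for c in consumers:
        c.join()                     # returns only after the backlog is drained

q = queue.Queue()
producers = [Producer(q, name="P%d" % i) for i in range(2)]
consumers = [Consumer(q, name="C%d" % i) for i in range(3)]
for t in producers + consumers:
    t.start()

time.sleep(0.3)                      # stand-in for the parent's "Y" input
print("items waiting at shutdown:", q.qsize())
shutdown(producers, consumers)

total_produced = sum(p.produced for p in producers)
total_consumed = sum(c.consumed for c in consumers)
print(total_produced == total_consumed, q.empty())  # True True
```

The ordering matters. Because the consumers' flag is cleared only after every producer has been joined, an empty queue seen by a consumer at that point stays empty, so no item can slip in after the last consumer exits.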

This way of handling producer-consumer termination is also more flexible when only a subset of the producers or consumers has to be stopped, since each thread carries its own flag.
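Here is a minimal sketch of stopping only a subset. It assumes each producer carries a hypothetical `category` label ("A" or "B", made up for illustration) next to the per-thread `running` flag. Only the category-"A" producers are stopped, and the "B" producer keeps running until it is stopped separately.

```python
import queue
import threading
import time

class Producer(threading.Thread):
    def __init__(self, q, category):
        super().__init__()
        self.q = q
        self.category = category     # hypothetical label used to select a subset
        self.running = True          # per-thread flag, as in the article

    def run(self):
        while self.running:
            self.q.put(self.category)
            time.sleep(0.01)

q = queue.Queue()
pool = [Producer(q, "A"), Producer(q, "B"), Producer(q, "A")]
for p in pool:
    p.start()
time.sleep(0.05)

# Stop only the category-"A" producers; the "B" producer's flag is untouched.
group_a = [p for p in pool if p.category == "A"]
for p in group_a:
    p.running = False
for p in group_a:
    p.join()

still_running = [p.category for p in pool if p.is_alive()]
print(still_running)                 # ['B']

# Later, stop the remaining producer the same way.
for p in pool:
    p.running = False
for p in pool:
    p.join()
```

A sentinel placed on a shared queue, by contrast, is taken by whichever thread happens to call get() next. That is why Solution 1 cannot target a specific group this way.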

Published: August 10 2015
