Producer-consumer problem in Python

By : Akshar Raaj

We will solve Producer Consumer problem in Python using Python threads. This problem is nowhere as hard as they make it sound in colleges.

This blog will make more sense if you have some idea about Producer Consumer problem.

Why care about Producer Consumer problem:

  • Will help you understand more about concurrency and different concepts of concurrency.
  • The concept of Producer Consumer problem is used to some extent in implementing a message queue. And you will surely need message queue at some point of time.

While we use threads, you will learn about the following thread topics:

  • Condition in threads.
  • wait() method available on Condition instances.
  • notify() method available on Condition instances.

I will assume you are comfortable with basics of Threads, race condition and how to prevent race condition i.e using locks. If not, my last post on basics of Threads should be able to help.

Quoting Wikipedia:

The producer's job is to generate a piece of data, put it into the buffer and start again.
At the same time, the consumer is consuming the data (i.e., removing it from the buffer) one piece at a time

The catch here is "At the same time". So, producer and consumer need to run concurrently. Hence we need separate threads for Producer and Consumer.

from threading import Thread

class ProducerThread(Thread):
    def run(self):
        pass

class ConsumerThread(Thread):
    def run(self):
        pass

Quoting Wikipedia again:

The problem describes two processes, the producer and the consumer, who share a common,
fixed-size buffer used as a queue.

So we keep one variable which will be global and will be modified by both Producer and Consumer threads. Producer produces data and adds it to the queue. Consumer consumes data from the queue i.e removes it from the queue.

queue = []

In first iteration, we will not put fixed-size constraint on queue. We will make it fixed-size once our basic program works.

Initial buggy program:

from threading import Thread, Lock
import time
import random

queue = []
lock = Lock()

class ProducerThread(Thread):
    def run(self):
        nums = range(5) #Will create the list [0, 1, 2, 3, 4]
        global queue
        while True:
            num = random.choice(nums) #Selects a random number from list [0, 1, 2, 3, 4]
            lock.acquire()
            queue.append(num)
            print "Produced", num 
            lock.release()
            time.sleep(random.random())


class ConsumerThread(Thread):
    def run(self):
        global queue
        while True:
            lock.acquire()
            if not queue:
                print "Nothing in queue, but consumer will try to consume"
            num = queue.pop(0)
            print "Consumed", num 
            lock.release()
            time.sleep(random.random())


ProducerThread().start()
ConsumerThread().start()

Run it few times and notice the result. Your program might not end after raising IndexError. Use Ctrl+Z to terminate.

Sample output:

Produced 3
Consumed 3
Produced 4
Consumed 4
Produced 1
Consumed 1
Nothing in queue, but consumer will try to consume
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "producer_consumer.py", line 31, in run
    num = queue.pop(0)
IndexError: pop from empty list

Explanation:

  • We started one producer thread(hereafter referred as producer) and one consumer thread(hereafter referred as consumer).
  • Producer keeps on adding to the queue and consumer keeps on removing from the queue.
  • Since queue is a shared variable, we keep it inside lock to avoid race condition.
  • At some point, consumer has consumed everything and producer is still sleeping. Consumer tries to consume more but since queue is empty, an IndexError is raised.
  • But on every execution, before IndexError is raised you will see the print statement telling "Nothing in queue, but consumer will try to consume", which explains why you are getting the error.

We found this implementaion as the wrong behaviour.

What is the correct behaviour?

When there was nothing in the queue, consumer should have stopped running and waited instead of trying to consume from the queue. And once producer adds something to the queue, there should be a way for it to notify the consumer telling it has added something to queue. So, consumer can again consume from the queue. And thus IndexError will never be raised.

About Condition

  • Condition object allows one or more threads to wait until notified by another thread. Taken from here.

And this is exactly what we want. We want consumer to wait when the queue is empty and resume only when it gets notified by the producer. Producer should notify only after it adds something to the queue. So after notification from producer, we can be sure that queue is not empty and hence no error can crop if consumer consumes.

  • Condition is always associated with a lock.
  • A condition has acquire() and release() methods that call the corresponding methods of the associated lock.

Condition provides acquire() and release() which calls lock's acquire() and release() internally, and so we can replace lock instances with condition instances and our lock behaviour will keep working properly.

Consumer needs to wait using a condition instance and producer needs to notify the consumer using the condition instance too. So, they must use the same condition instance for the wait and notify functionality to work properly.

Let's rewrite our Consumer and Producer code:

from threading import Condition

condition = Condition()

class ConsumerThread(Thread):
    def run(self):
        global queue
        while True:
            condition.acquire()
            if not queue:
                print "Nothing in queue, consumer is waiting"
                condition.wait()
                print "Producer added something to queue and notified the consumer"
            num = queue.pop(0)
            print "Consumed", num 
            condition.release()
            time.sleep(random.random())

Let's rewrite Producer code:

class ProducerThread(Thread):
    def run(self):
        nums = range(5)
        global queue
        while True:
            condition.acquire()
            num = random.choice(nums)
            queue.append(num)
            print "Produced", num 
            condition.notify()
            condition.release()
            time.sleep(random.random())

Sample output:

Produced 3
Consumed 3
Produced 1
Consumed 1
Produced 4
Consumed 4
Produced 3
Consumed 3
Nothing in queue, consumer is waiting
Produced 2
Producer added something to queue and notified the consumer
Consumed 2
Nothing in queue, consumer is waiting
Produced 2
Producer added something to queue and notified the consumer
Consumed 2
Nothing in queue, consumer is waiting
Produced 3
Producer added something to queue and notified the consumer
Consumed 3
Produced 4
Consumed 4
Produced 1
Consumed 1

Explanation:

  • For consumer, we check if the queue is empty before consuming.
  • If yes then call wait() on condition instance.
  • wait() blocks the consumer and also releases the lock associated with the condition. This lock was held by consumer, so basically consumer loses hold of the lock.
  • Now unless consumer is notified, it will not run.
  • Producer can acquire the lock because lock was released by consumer.
  • Producer puts data in queue and calls notify() on the condition instance.
  • Once notify() call is made on condition, consumer wakes up. But waking up doesn't mean it starts executing.
  • notify() does not release the lock. Even after notify(), lock is still held by producer.
  • Producer explicitly releases the lock by using condition.release().
  • And consumer starts running again. Now it will find data in queue and no IndexError will be raised.

Adding a max size on the queue

Producer should not put data in the queue if the queue is full.

It can be accomplished in the following way:

  • Before putting data in queue, producer should check if the queue is full.
  • If not, producer can continue as usual.
  • If the queue is full, producer must wait. So call wait() on condition instance to accomplish this.
  • This gives a chance to consumer to run. Consumer will consume data from queue which will create space in queue.
  • And then consumer should notify the producer.
  • Once consumer releases the lock, producer can acquire the lock and can add data to queue.

Final program looks like:

from threading import Thread, Condition
import time
import random

queue = []
MAX_NUM = 10
condition = Condition()

class ProducerThread(Thread):
    def run(self):
        nums = range(5)
        global queue
        while True:
            condition.acquire()
            if len(queue) == MAX_NUM:
                print "Queue full, producer is waiting"
                condition.wait()
                print "Space in queue, Consumer notified the producer"
            num = random.choice(nums)
            queue.append(num)
            print "Produced", num
            condition.notify()
            condition.release()
            time.sleep(random.random())


class ConsumerThread(Thread):
    def run(self):
        global queue
        while True:
            condition.acquire()
            if not queue:
                print "Nothing in queue, consumer is waiting"
                condition.wait()
                print "Producer added something to queue and notified the consumer"
            num = queue.pop(0)
            print "Consumed", num
            condition.notify()
            condition.release()
            time.sleep(random.random())


ProducerThread().start()
ConsumerThread().start()

Sample output:

Produced 0
Consumed 0
Produced 0
Produced 4
Consumed 0
Consumed 4
Nothing in queue, consumer is waiting
Produced 4
Producer added something to queue and notified the consumer
Consumed 4
Produced 3
Produced 2
Consumed 3

Update:

Many people on the internet suggested that I use Queue.Queue instead of using a list with conditions and lock. I agree, but I wanted to show how Conditions, wait() and notify() work so I took this approach.

Let's update our code to use Queue.

Queue encapsulates the behaviour of Condition, wait(), notify(), acquire() etc.

Now is a good time to read the documentation for Queue and the source code for it.

Updated program:

from threading import Thread
import time
import random
from Queue import Queue

queue = Queue(10)

class ProducerThread(Thread):
    def run(self):
        nums = range(5)
        global queue
        while True:
            num = random.choice(nums)
            queue.put(num)
            print "Produced", num
            time.sleep(random.random())


class ConsumerThread(Thread):
    def run(self):
        global queue
        while True:
            num = queue.get()
            queue.task_done()
            print "Consumed", num
            time.sleep(random.random())


ProducerThread().start()
ConsumerThread().start()

Explanation

  • In place of list, we are using a Queue instance(hereafter queue).
  • queue has a Condition and that condition has its lock. You don't need to bother about Condition and Lock if you use Queue.
  • Producer uses put available on queue to insert data in the queue.
  • put() has the logic to acquire the lock before inserting data in queue.
  • Also put() checks whether the queue is full. If yes, then it calls wait() internally and so producer starts waiting.
  • Consumer uses get.
  • get() acquires the lock before removing data from queue.
  • get() checks if the queue is empty. If yes, it puts consumer in waiting state.
  • get() and put() has proper logic for notify() too. Why don't you check the source code for Queue now?


Related Posts


Can we help you build amazing apps? Contact us today.

Comments

bc 19th Oct., 2013

If you look inside a stdlib Queue.Queue, this is exactly what you find: A collections.deque (rather than a list) wrapped with a couple of Conditions.

commmenttor
Akshar Raaj 29th Oct., 2013

@bc:
Looked into it. Updated the blog too. Thanks!

commmenttor
berkay

at Initial buggy program, if we check if the queue is empty for the consumer and for the produces, we check if the queue is max then it will work (?). Is my assumption true?

commmenttor
nishant 21st Jan., 2014

Hi, You wrote an excellent example. My Question is if I have more then 1000 threads for producer and >1000 threads for consumer wont locking the threads would cost more to the CPU, or does it has any other complication for such a bulk of threads enqueuing and dequeuing the Same Queue. Waiting for your response.......

commmenttor
Akshar Raaj 28th Jan., 2014

@nishant:
I guess the program would crash with so many threads, each process has a limited memory available to it and so many threads of this process will compete for this limited memory.

commmenttor
Python 18th April, 2014

Hi, I liked your site and I understand that you was trying to explain in detail the use of lock and list in the thread context. Well explained. I have just one comment to make to help improve your example. There is a bug in the line: (if not queue:). According to the manual the wait can return after an arbitrary long time, and the condition which prompted the notify() call may no longer hold true. So you should change the if for a while or use the function wait_for(). I tested your code here and sometimes it gives error because the wait can lose the lock and try to retrieve elements from an empty list. Just change the if for while solves the problem.

commmenttor
Cartier Watches Replica

Ideal headphoneschoose, nicely listed and/or what is actually revealed. Stunning gifts package and poem always enclosed. Very good concerning mother's time!

commmenttor
http://www.maverickscricket.com/?nike-roshe-run-womens

This really is quite kind of. I like that it lots. All estimate are attractive and package covered when good bow. It's not the absolute most amazing bracelet actually, but for the cost it exceeds my personal expectations. Cannotdelay on award information technology on the mom towards Mother's evening!

commmenttor
genuine ugg boots sale australia

ugg lexi slipper Producer-consumer problem in Python - Agiliq Blog | Django web app development
genuine ugg boots sale australia http://www.mccreddin.com/deals/genuine-ugg-boots-sale-australia.html

commmenttor
JVC ヘッドホン

The thing you should do to find out about women before you are left behind.

commmenttor
ugg boots long

[url="http://www.druff.co.za/ugg-leather-boots/pink-ugg-boots-with-bows.html"]pink ugg boots with bows[/url]
ugg boots long http://www.druff.co.za/ugg-leather-boots/ugg-boots-long.html

commmenttor
clearance ugg boots

ugg bags sale Producer-consumer problem in Python - Agiliq Blog | Django web app development
clearance ugg boots

commmenttor
Air Jordan Nike 15 XV Retro Noir/Brun

Cette baguette a une croûte croustillante, mais sinon gagné peu d'éloges. Il était pâle avec, selon Michael Bateman, "un blanc, la peau craquelée". "Aucune odeur du pain fraîchement cuit et sans saveur soit. Terrible", a déclaré Linda Collister. "La texture est trop humide et souple», a critiqué Eric Feuilleaubois.

commmenttor
lunette ray ban pas cher homme

En raison de la conception de la pince à linge, de sorte que l'utilisation de 2093 n'apparaît pas dans tout inconfort et de fatigue. Il est à noter que, afin d'éviter les inconvénients de drapage longs bouchons d'oreilles, des bouchons d'oreille boucle en 2093 pour soutenir la cheville de casque, pratique pour une utilisation quotidienne comme un utilisateur afin de maximiser remporté l'Allemand 2006 iF design award produit Bluetooth, Omiz 2093 En plus de la conception élégante de courbe esthétique, créatif position de la clé conception multifonction dans l'oreille, mais aussi pour le 2093 apparition de points supplémentaires.

commmenttor
kids ugg

ugg of australia Producer-consumer problem in Python - Agiliq Blog | Django web app development
kids ugg

commmenttor
ugg boots on sale ebay

cheap ugg boots sale online Producer-consumer problem in Python - Agiliq Blog | Django web app development
ugg boots on sale ebay

commmenttor
Cheap Beats by dre

I paid for this looking for a great gift concerning my personal mama. That headphone arrived in a awesome purple tied package. This was a searching present underneath their christmas time tree!! Ones headphone also looked breathtaking, still the mama was gigantic boned and headphone was some tight, and yet it is a great present! I adore that.

commmenttor
jordan femme 2012-524

For more Michael Kors spring designer satchel handbag collection, visit Michael Kors online or your local Neiman Marcus upscale department store. I enjoy researching cooking/food websites and writing book reviews.the Latest Collection of Women's Designer Leather Handbags

commmenttor
Cartier Love Bracelet

Right mother's time present. Nevertheless, that the bracelet took quite extended at come, therefore arrived following the vacation. The mom expressed that she loved your bracelet, even though I've maybe not viewed the lady wear things however. :-)

commmenttor
Cheap Nike Air Max 90 Womens

I got this particular headphonesto my personal mom for parents day, then she absolutely adored that! It really is actually adorable headphonesand suggesting on the card it comes inside package is very sentimental!! As well as the excellent of beads is actually awesome!

commmenttor
where to find cheap ugg boots

ugg broome Producer-consumer problem in Python - Agiliq Blog | Django web app development
where to find cheap ugg boots

commmenttor
ugg slippers dakota

ugg button boots Producer-consumer problem in Python - Agiliq Blog | Django web app development
ugg slippers dakota

commmenttor
sand ugg boots

cheap ugg boots outlet sale Producer-consumer problem in Python - Agiliq Blog | Django web app development
sand ugg boots

commmenttor
alviero martini shop it

Single and planning to be that way for awhile, no kids and no desire for kids, no pets (wish I could, but not home enough).

commmenttor
グラフィック Tシャツ

Innovative men Book Exposes The Best Way To Dominate The men Arena

commmenttor
Bvlgari Rings

Ideal mother's evening gifts. Still, on bracelet got as well longer at arrive, therefore came after the holiday. The mom shown that will she loved their bracelet, yet I have not really observed this girl put on things however. :-)

commmenttor
Cheap Nike air max 90

Our headset is as beautiful while in the photo. That arrived immediately. I would encourage utilizing a towel through every one layer as you click information technology out w / any steam iron. It doesn't vapor outside using merely a steamer. On iron is essential. It can be sensitive, so if you do not trust yourself using the towel additionally vapor iron, well accept that it up to a pro. Perfect seem.

commmenttor
Cheap Nike Roshe Run

I purchased our looking for a great present towards my mama. Their headphone emerged in a awesome purple tied up container. This was a searching gifts below your christmas time tree!! Some sort of headphone also seemed perfect, though our mother is gigantic boned as well as the headphone are somewhat tight, still it is a good present! I really like that it.

commmenttor
lululemon pants

I ordered our looking for a awesome present concerning my personal mama. The actual headphone arrived within a perfect purple tied up container. This was ideal looking gift underneath the christmas time tree!! The headphone also seemed awesome, though my personal mother was huge boned and the headphone is actually a little tight, however the a good present! I like things.

commmenttor
imitation sac chloé

exactly as described! Thanks so much! Excellent seller!

commmenttor
replica hermes dinnerware

replica hermes dinnerware imitation birkin bags where to buy wjjpwbrvsvn

commmenttor
cheap ugg australia boots

ugg slippers size 7 Producer-consumer problem in Python - Agiliq Blog | Django web app development
cheap ugg australia boots

commmenttor
cheap big kid jordans

You should take part in a contest for among the top blogs on the internet. I will advise this web page!
cheap big kid jordans

commmenttor
© Agiliq, 2009-2012