Implementing Producer-Consumer by making own implementation of blocking queue.

Lets implement Producer, Consumer by making own implementation of blocking queue.

Lets make own interface which will be having two methods:

1. enqueue // for put
2. dequeue // for take

Here is our interface:

package com.pract.producerconsumer.problem;

public interface CustomQueue<E>{

    public void enqueue(E e);
    
    public E dequeue();
}

Now lets make one implementation of above interface.

package com.pract.producerconsumer.problem;

import java.util.LinkedList;
import java.util.Queue;


public class MyQueue<E> implements CustomQueue<E>{

    // queue backed by a linkedlist
    private Queue<E> queue = new LinkedList<E>();
    
    
    /**
     * Enqueue will add an object to this queue, and will notify any waiting
     * threads that now there is an object available.
     * 
     * In enqueue method we just adding the elements not caring of size,
     * we can even introduce some check of size here also.
     */
    @Override
    public synchronized void enqueue(E e) {
        queue.add(e);
        // Wake up anyone waiting on the queue to put some item.
        notifyAll();
    }

    /**
    * Make a blocking call so that we will only return when the queue has
    * something on it, otherwise wait until something is put on it.
    */
    @Override
    public synchronized E dequeue(){
        E e = null;
        
        while(queue.isEmpty()){
            try {
                wait();
            } catch (InterruptedException e1) {
                return e;
            }
        }
        e = queue.remove();
        return e;
    }
}

Above implementation behaves as blocking queue, i.e. according to javadoc: ” A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.”

But in our case it just wait to become non-empty when retrieving an element. Since we have used linked list and there is no check of size so here we are not caring the other part “wait for space to become available in the queue when storing an element.”

So now lets concentrate on Producer and Consumer.

Here is our Producer class.

package com.pract.producerconsumer.problem;

import java.util.Random;

public class Producer implements Runnable{
    
    private MyQueue<Integer> myQueue;
    
    public Producer(MyQueue<Integer> myQueue){
        this.myQueue = myQueue;
    }

    @Override
    public void run() {
        while(true){
            // lets produce some item
            Integer random = new Random().nextInt(10);
            myQueue.enqueue(random);
            System.out.println("Produced Item : " + random);
            // putting some delay
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

And below is our Consumer class:

package com.pract.producerconsumer.problem;

public class Consumer implements Runnable{

    private MyQueue<Integer> myQueue;
    
    public Consumer(MyQueue<Integer> myQueue){
        this.myQueue = myQueue;
    }
    
    
    @Override
    public void run() {
        while(true){
            int e = myQueue.dequeue();
            // put some delay
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
            
            // print the dequeued item
            System.out.println(" Consumed Item :" + e);
        }
    }
}

Now lets test our implementation:

package com.pract.producerconsumer.problem;

public class Test {

    public static void main(String[] args) {
        MyQueue<Integer> myQueue = new MyQueue<Integer>();

        // instantiate both producer and consumer
        Thread producer = new Thread(new Producer(myQueue));
        Thread consumer = new Thread(new Consumer(myQueue));

        // start both producer and consumer
        producer.start();
        consumer.start();

    }
}

Leave a Reply