Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
188 views
in Technique[技术] by (71.8m points)

java - Producer/Consumer threads using a Queue

I'd like to create some sort of Producer/Consumer threading app. But I'm not sure what the best way to implement a queue between the two.

So I've some up with two ideas (both of which could be entirely wrong). I would like to know which would be better and if they both suck then what would be the best way to implement the queue. It's mainly my implementation of the queue in these examples that I'm concerned about. I'm extending a Queue class that is an in house class and is thread safe. Below are two examples with 4 classes each.

Main class-

public class SomeApp
{
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        consumer = new Consumer();
        producer = new Producer();
    }
} 

Consumer class-

public class Consumer implements Runnable
{
    public Consumer()
    {
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = QueueHandler.dequeue();
            //do some stuff with the object
        }
    }
}

Producer class-

public class Producer implements Runnable
{
    public Producer()
    {
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {
        while(true)
        {
            //add to the queue some sort of unique object
            QueueHandler.enqueue(new Object());
        }
    }
}

Queue class-

public class QueueHandler
{
    //This Queue class is a thread safe (written in house) class
    public static Queue<Object> readQ = new Queue<Object>(100);

    public static void enqueue(Object object)
    {
        //do some stuff
        readQ.add(object);
    }

    public static Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

OR

Main class-

public class SomeApp
{
    Queue<Object> readQ;
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        readQ = new Queue<Object>(100);
        consumer = new Consumer(readQ);
        producer = new Producer(readQ);
    }
} 

Consumer class-

public class Consumer implements Runnable
{
    Queue<Object> queue;

    public Consumer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = queue.dequeue();
            //do some stuff with the object
        }
    }
}

Producer class-

public class Producer implements Runnable
{
    Queue<Object> queue;

    public Producer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {

        while(true)
        {
            //add to the queue some sort of unique object
            queue.enqueue(new Object());
        }
    }
}

Queue class-

//the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object>
{    
    public QueueHandler(int size)
    {
        super(size); //All I'm thinking about now is McDonalds.
    }

    public void enqueue(Object object)
    {
        //do some stuff
        readQ.add();
    }

    public Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

And go!

Question&Answers:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Java 5+ has all the tools you need for this kind of thing. You will want to:

  1. Put all your Producers in one ExecutorService;
  2. Put all your Consumers in another ExecutorService;
  3. If necessary, communicate between the two using a BlockingQueue.

I say "if necessary" for (3) because from my experience it's an unnecessary step. All you do is submit new tasks to the consumer executor service. So:

final ExecutorService producers = Executors.newFixedThreadPool(100);
final ExecutorService consumers = Executors.newFixedThreadPool(100);
while (/* has more work */) {
  producers.submit(...);
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

So the producers submit directly to consumers.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

1.4m articles

1.4m replys

5 comments

57.0k users

...