Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


java - How can I make ThreadPoolExecutor command wait if there's too much data it needs to work on?

I am getting data from a queue server and I need to process it and send an acknowledgement. Something like this:

while (true) {
    queueserver.get.data            // fetch the next item from the queue server
    ThreadPoolExecutor              // send data to thread
    queueserver.acknowledgement     // acknowledge the item right away
}

I don't fully understand what happens in the threads, but I think this program gets the data, sends it to the thread pool, and then immediately acknowledges it. So even though each queue is limited to 200 unacknowledged items, the program will pull items as fast as it can receive them. That is fine when the program runs on a single server, but with multiple workers it becomes an issue: the number of items in a worker's thread queue reflects not the work it has done but how fast it can pull items from the queue server.

Is there anything I can do to somehow make the program wait if the thread queue is full of work?


1 Reply


How can I make ThreadPoolExecutor command wait if there's too much data it needs to work on?

Instead of an open-ended queue, you can use a BlockingQueue with a limit on it:

BlockingQueue<Date> queue = new ArrayBlockingQueue<Date>(200);
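
To illustrate what the limit does, here is a minimal sketch. The class and method names are made up for the example, and it uses Integer items instead of Date. It relies on offer(), which returns false as soon as the queue is full; put() would block the caller at that point instead.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedQueueDemo {
    /** Offers `attempts` items to a queue of the given capacity and returns how many were accepted. */
    static int countAccepted(int capacity, int attempts) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() returns false immediately when the queue is full;
            // put() would block the calling thread until space frees up
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // nothing consumes from the queue, so only the first 200 offers succeed
        System.out.println(countAccepted(200, 250)); // prints 200
    }
}
```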

In terms of jobs submitted to an ExecutorService, instead of using one of the default ExecutorServices created by Executors (for example Executors.newFixedThreadPool(), which uses an unbounded LinkedBlockingQueue), you can create your own with a bounded queue:

return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
              new ArrayBlockingQueue<Runnable>(200));

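Once the queue fills up and all of the threads are busy, the executor rejects any new tasks that are submitted; by default it throws a RejectedExecutionException. To make the submitting thread wait instead, set a RejectedExecutionHandler that puts the task onto the queue itself. Something like: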

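final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(200);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads,
           0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will throw an exception
// when you submit a job while the queue is full and all threads are busy;
// to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      try {
         // this will block if the queue is full
         executor.getQueue().put(r);
      } catch (InterruptedException e) {
         // put() throws a checked exception, so restore the interrupt flag
         // and report the task as rejected
         Thread.currentThread().interrupt();
         throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
      }
   }
});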
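
For anyone who wants to try this out, below is a self-contained sketch of the same idea. The class name, the helper methods, the pool size of 2, the queue size of 5, and the 5 ms sleep that stands in for real work are all assumptions chosen for the demo. It also refuses tasks once the executor has been shut down, which is an extra safeguard on top of the handler described above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingSubmitDemo {
    /** Builds a fixed-size pool whose execute() blocks the caller while the queue is full. */
    static ThreadPoolExecutor newBlockingPool(int nThreads, int queueCapacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(queueCapacity));
        pool.setRejectedExecutionHandler((task, executor) -> {
            if (executor.isShutdown()) {
                // do not enqueue work that no thread will ever pick up
                throw new RejectedExecutionException("Executor has been shut down");
            }
            try {
                executor.getQueue().put(task); // blocks until a worker frees a slot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
            }
        });
        return pool;
    }

    /** Pushes `jobs` short tasks through a small pool and returns how many completed. */
    static int runJobs(int jobs) {
        ThreadPoolExecutor pool = newBlockingPool(2, 5);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < jobs; i++) {
            // execute() waits here whenever both threads are busy and the queue holds 5 tasks
            pool.execute(() -> {
                try {
                    Thread.sleep(5); // stand-in for real processing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        pool.shutdown(); // already queued tasks still run after shutdown()
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runJobs(50)); // no task is rejected, so all 50 complete
    }
}
```

Because the submitting loop is slowed down to the speed of the workers, no more than the queue capacity plus the number of threads is ever waiting or in progress at one time.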

I think it's a major miss that Java doesn't have a ThreadPoolExecutor.CallerBlocksPolicy.

