Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

java - How can I terminate Tasks that have timed out in multithreading?

I need to build a library that offers both synchronous and asynchronous methods.

  • executeSynchronous() - waits until I have a result, returns the result.
  • executeAsynchronous() - returns a Future immediately which can be processed after other things are done, if needed.

Core Logic of my Library

Customers will call our library by passing a DataKey builder object. We construct a URL from that DataKey object, make an HTTP client call to that URL, and once we get the response back as a JSON String, we return that String to the customer unchanged, wrapped in a DataResponse object. Some customers will call executeSynchronous() and some will call executeAsynchronous(), which is why the library needs to provide the two methods separately.

Interface:

public interface Client {

    // for synchronous
    public DataResponse executeSynchronous(DataKey key);

    // for asynchronous
    public Future<DataResponse> executeAsynchronous(DataKey key);
}

And then I have my DataClient which implements the above Client interface:

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    private ExecutorService executor = Executors.newFixedThreadPool(10);

    // for synchronous call
    @Override
    public DataResponse executeSynchronous(DataKey key) {
        DataResponse dataResponse = null;
        Future<DataResponse> future = null;

        try {
            future = executeAsynchronous(key);
            dataResponse = future.get(key.getTimeout(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, key);
            dataResponse = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR);
            // does this look right the way I am doing it?
            future.cancel(true); // terminating tasks that have timed out.
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    //for asynchronous call
    @Override
    public Future<DataResponse> executeAsynchronous(DataKey key) {
        Future<DataResponse> future = null;

        try {
            Task task = new Task(key, restTemplate);
            future = executor.submit(task); 
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
        }

        return future;
    }
}

Simple class which will perform the actual task:

public class Task implements Callable<DataResponse> {

    private DataKey key;
    private RestTemplate restTemplate;

    public Task(DataKey key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public DataResponse call() {
        DataResponse dataResponse = null;
        String response = null;

        try {
            String url = createURL();
            response = restTemplate.getForObject(url, String.class);

            // it is a successful response
            dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
        } catch (RestClientException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
            dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    // create a URL by using key object
    private String createURL() {
        String url = somecode;
        return url;
    }
}

Problem Statement:

When I started working on this solution, I was not terminating the tasks that had timed out. I was reporting the timeout to the client, but the task kept running in the thread pool (potentially occupying one of my limited 10 threads for a long time). After some research online, I found that I can cancel tasks that have timed out by calling cancel on the future, as shown below:

future.cancel(true);

But I want to make sure: is the way I call it in my executeSynchronous method the right way to cancel tasks that have timed out?

As far as I understand, calling cancel() on the Future stops the task from running if it is still in the queue, so I am not sure whether what I am doing is right. What is the right approach?

If there is a better way, can anyone provide an example?

1 Reply

If the task is still in the queue, cancelling it by simply calling future.cancel() is fine, but you cannot know whether it is still queued. And even if you ask the future to interrupt the task, that may not work, because the task may be doing something that ignores the thread's interrupted status.
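To make the two cases concrete, here is a minimal sketch. It assumes a single-thread pool so that the second task has to wait in the queue, and every class and variable name in it is made up for illustration (nothing here comes from your library). The first task deliberately swallows InterruptedException, the second one is cancelled while still queued.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelSemanticsDemo {

    /** What the demo observed; a hypothetical holder used only for this sketch. */
    static final class Outcome {
        boolean queuedTaskRan;
        boolean runningTaskReportedCancelled;
        boolean runningTaskFinishedAnyway;
    }

    static Outcome run() {
        // One worker thread, so the second submitted task must wait in the queue.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean finishedAnyway = new AtomicBoolean();
        AtomicBoolean queuedRan = new AtomicBoolean();
        Outcome outcome = new Outcome();
        try {
            // A task that ignores interruption: it swallows the exception and keeps waiting.
            Future<?> stubborn = executor.submit(() -> {
                started.countDown();
                boolean released = false;
                while (!released) {
                    try {
                        released = release.await(50, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException ignored) {
                        // Deliberately ignored, to model a task that does not cooperate.
                    }
                }
                finishedAnyway.set(true);
            });
            Future<?> queued = executor.submit(() -> queuedRan.set(true));

            started.await();          // the stubborn task is now running
            queued.cancel(false);     // still queued: it will never be executed
            stubborn.cancel(true);    // interrupt is delivered, but the task ignores it
            outcome.runningTaskReportedCancelled = stubborn.isCancelled();

            release.countDown();      // finally let the stubborn task end on its own
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
        outcome.runningTaskFinishedAnyway = finishedAnyway.get();
        outcome.queuedTaskRan = queuedRan.get();
        return outcome;
    }

    public static void main(String[] args) {
        Outcome o = run();
        System.out.println("queued task ran: " + o.queuedTaskRan);
        System.out.println("running task reported cancelled: " + o.runningTaskReportedCancelled);
        System.out.println("running task finished anyway: " + o.runningTaskFinishedAnyway);
    }
}
```

The point of the sketch is that isCancelled() only describes the Future. The worker thread stays occupied until the task itself decides to return.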

So you can use future.cancel(true), but you need to make sure that your task (thread) respects the thread's interrupted status. For example, since you mentioned that you make an HTTP call, you may need to close the HTTP client resource as soon as the thread is interrupted.

The CancellableTaskTest example further down implements this task cancellation scenario. Normally a thread can check Thread.currentThread().isInterrupted() and terminate itself. This becomes more complex when you are using thread pool executors and callables, and when the task does not really look like while (!Thread.currentThread().isInterrupted()) { /* execute task */ }.
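For comparison, this is roughly what the simple, cooperative case looks like. It is only a sketch: CountingTask is a hypothetical stand-in for work that can be split into small steps, which is exactly what a single blocking HTTP call is not.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class InterruptAwareTaskDemo {

    /** Hypothetical unit of work: keeps counting until its thread is interrupted. */
    static final class CountingTask implements Callable<Long> {
        final AtomicLong iterations = new AtomicLong();
        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch stopped = new CountDownLatch(1);

        @Override
        public Long call() {
            started.countDown();
            try {
                // Check the interrupt flag between small steps of work.
                while (!Thread.currentThread().isInterrupted()) {
                    iterations.incrementAndGet();
                }
            } finally {
                stopped.countDown();
            }
            return iterations.get();
        }
    }

    /** Returns true if the task stopped within five seconds of cancel(true). */
    static boolean cancelStopsTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountingTask task = new CountingTask();
        try {
            Future<Long> future = executor.submit(task);
            task.started.await();   // make sure the task is actually running
            future.cancel(true);    // sets the interrupt flag on the worker thread
            return task.stopped.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("task stopped after cancel(true): " + cancelStopsTask());
    }
}
```

A task that spends its time inside one blocking I/O call never gets back to the loop condition, so it needs a different way to be stopped.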

In the CancellableTaskTest example, a task writes to a file (I did not use an HTTP call, to keep it simple). A thread pool executor starts running the task, but the caller wants to cancel it after just 100 milliseconds. The future sends the interrupt signal to the thread, but the callable task cannot check it immediately while it is writing to the file. To make cancellation work, the callable maintains a list of the IO resources it is going to use, and as soon as the future wants to cancel the task, it calls close() on all of those resources, which terminates the task with an IOException, and then the thread finishes.

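import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.File;
import java.io.FileWriter;
import java.lang.ref.WeakReference;
import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CancellableTaskTest {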

    public static void main(String[] args) throws Exception {
        CancellableThreadPoolExecutor threadPoolExecutor = new CancellableThreadPoolExecutor(0, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        long startTime = System.currentTimeMillis();
        Future<String> future = threadPoolExecutor.submit(new CancellableTask());
        while (System.currentTimeMillis() - startTime < 100) {
            Thread.sleep(10);
        }
        System.out.println("Trying to cancel task");
        future.cancel(true);
    }
}

class CancellableThreadPoolExecutor extends ThreadPoolExecutor {

    public CancellableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new CancellableFutureTask<T>(callable);
    }
}

class CancellableFutureTask<V> extends FutureTask<V> {

    private WeakReference<CancellableTask> weakReference;

    public CancellableFutureTask(Callable<V> callable) {
        super(callable);
        if (callable instanceof CancellableTask) {
            this.weakReference = new WeakReference<CancellableTask>((CancellableTask) callable);
        }
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean result = super.cancel(mayInterruptIfRunning);
        if (weakReference != null) {
            CancellableTask task = weakReference.get();
            if (task != null) {
                try {
                    task.cancel();
                } catch (Exception e) {
                    e.printStackTrace();
                    result = false;
                }
            }
        }
        return result;
    }
}

class CancellableTask implements Callable<String> {

    private volatile boolean cancelled;
    private final Object lock = new Object();
    private LinkedList<Object> cancellableResources = new LinkedList<Object>();

    @Override
    public String call() throws Exception {
        if (!cancelled) {
            System.out.println("Task started");
            // write file
            File file = File.createTempFile("testfile", ".txt");
            BufferedWriter writer = new BufferedWriter(new FileWriter(file));
            synchronized (lock) {
                cancellableResources.add(writer);
            }
            try {
                long lineCount = 0;
                while (lineCount++ < 100000000) {
                    writer.write("This is a test text at line: " + lineCount);
                    writer.newLine();
                }
                System.out.println("Task completed");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                writer.close();
                file.delete();
                synchronized (lock) {
                    cancellableResources.clear();
                }
            }
        }
        return "done";
    }

    public void cancel() throws Exception {
        cancelled = true;
        // Demo only: pauses the cancelling thread for one second before closing the resources.
        Thread.sleep(1000);
        boolean success = false;
        synchronized (lock) {
            for (Object cancellableResource : cancellableResources) {
                if (cancellableResource instanceof Closeable) {
                    ((Closeable) cancellableResource).close();
                    success = true;
                }
            }
        }
        System.out.println("Task " + (success ? "cancelled" : "could not be cancelled. It might have completed or not started at all"));
    }
}

For your REST HTTP client requirement, you can extend the request factory along these lines:

public class CancellableSimpleClientHttpRequestFactory extends SimpleClientHttpRequestFactory {

    private List<Object> cancellableResources;

    public CancellableSimpleClientHttpRequestFactory() {
    }

    public CancellableSimpleClientHttpRequestFactory(List<Object> cancellableResources) {
        this.cancellableResources = cancellableResources;
    }

    @Override
    protected HttpURLConnection openConnection(URL url, Proxy proxy) throws IOException {
        HttpURLConnection connection = super.openConnection(url, proxy);
        if (cancellableResources != null) {
            cancellableResources.add(connection);
        }
        return connection;
    }
}

Use this factory when creating the RestTemplate in your callable task:

    RestTemplate template = new RestTemplate(new CancellableSimpleClientHttpRequestFactory(this.cancellableResources));

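Make sure that you pass the same list of cancellable resources that you maintain in CancellableTask. Note that the factory adds connections to that list without holding the task's lock, so you will probably want to guard the list consistently (or use a thread-safe list) before relying on this in production.

Then change the loop in CancellableTask's cancel() method so that it disconnects the connections: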

synchronized (lock) {
    for (Object cancellableResource : cancellableResources) {
        if (cancellableResource instanceof HttpURLConnection) {
            ((HttpURLConnection) cancellableResource).disconnect();
            success = true;
        }
    }
}
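The reason closing the resource works is that a blocking socket read typically does not react to Thread.interrupt(), but it does fail once the socket is closed from another thread. Here is a minimal sketch of that mechanism, using a plain java.net.Socket as a stand-in for the HTTP connection and a local ServerSocket that never answers as a stand-in for a slow server. All names are hypothetical, and the 200 ms pause is only there to give the reader time to block.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CloseToCancelDemo {

    /** Returns true if the blocked read ended with an IOException after the socket was closed. */
    static boolean closeUnblocksRead() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // The server side only listens; it never sends a byte, so the client read blocks.
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort())) {
            CountDownLatch aboutToRead = new CountDownLatch(1);
            Future<Boolean> reader = executor.submit(() -> {
                try {
                    InputStream in = client.getInputStream();
                    aboutToRead.countDown();
                    in.read();       // blocks: no data will arrive
                    return false;    // reached only if the read returned normally
                } catch (IOException expected) {
                    return true;     // the read was aborted by the close
                }
            });
            aboutToRead.await();
            Thread.sleep(200);       // give the reader time to block inside read()
            client.close();          // closing from another thread aborts the blocked read
            return reader.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (IOException | ExecutionException | TimeoutException e) {
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocked read aborted by close: " + closeUnblocksRead());
    }
}
```

Calling disconnect() on the HttpURLConnection in cancel() relies on the same idea: the worker thread gets an exception out of its blocking call and can then return.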

...