The actual implementation was a lot more robust and testable; this example simply demonstrates the concept with a minimal amount of code.
The end result that we were trying to achieve was to have a ThreadPoolExecutor that, without changes to the standard functions or interfaces, would automatically cancel any Futures that may still be running after a given timeout. Even though the Future interface provided a get() method with a timeout, it was not desirable to modify every place where the no-parameter get() was called, so it was important to retain the standard interface while gaining automatic timeouts.
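To make the gap concrete, here is a minimal sketch of what the standard timed get() does on its own. The class name TimedGetDemo and the 10-second sleeping task are made up for illustration; only JDK classes are used. The timed get() gives up waiting, but the task itself is neither finished nor cancelled until cancel(true) is called explicitly.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the original services.
public class TimedGetDemo {

    /** Returns { timedOut, doneAfterTimeout, cancelledAfterCancel }. */
    static boolean[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // A task that runs far longer than the timeout used below.
            Future<String> future = pool.submit(() -> {
                Thread.sleep(10_000);
                return "finished";
            });

            boolean timedOut = false;
            try {
                future.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // The caller stops waiting here; the task keeps running.
                timedOut = true;
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }

            boolean doneAfterTimeout = future.isDone();

            // Only an explicit cancel() marks the Future as cancelled
            // and interrupts the worker thread.
            future.cancel(true);
            return new boolean[] { timedOut, doneAfterTimeout, future.isCancelled() };
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("timed out:              " + r[0]);
        System.out.println("done after timeout:     " + r[1]);
        System.out.println("cancelled after cancel: " + r[2]);
    }
}
```

So every caller that wanted a real timeout would have to both switch to the timed get() and remember to cancel, which is exactly the per-call-site change we wanted to avoid.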
Extending ThreadPoolExecutor was fairly trivial. First, two fields were added to keep track of the timeout value and time unit. Then, a setter was added to set them. The rest of the changes were to override all of the submit() methods so that they returned a custom Future implementation that was aware of timeouts. The new class, MyFuture, was a wrapper around the Future instance that the ThreadPoolExecutor super methods returned.
MyThreadPoolExecutor.java
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    // A timeout of zero or less means "wait indefinitely", as with a plain get()
    private long timeout = -1;
    private TimeUnit timeUnit = TimeUnit.SECONDS;

    /* Super constructors not shown */

    public void setTimeout(long timeout, TimeUnit timeUnit) {
        this.timeout = timeout;
        this.timeUnit = timeUnit;
    }

    // Each submit() variant wraps the Future returned by the superclass
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return new MyFuture<>(super.submit(task), timeout, timeUnit);
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return new MyFuture<>(super.submit(task, result), timeout, timeUnit);
    }

    @Override
    public Future<?> submit(Runnable task) {
        return new MyFuture<>(super.submit(task), timeout, timeUnit);
    }
}
The custom Future implementation, MyFuture, was a little more involved. Since Future is an interface, it was necessary to make use of the delegation pattern.
MyFuture.java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class MyFuture<V> implements Future<V> {
    private final Future<V> delegate;
    private final long timeout;
    private final TimeUnit timeUnit;

    public MyFuture(Future<V> delegate, long timeout, TimeUnit timeUnit) {
        this.delegate = delegate;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return delegate.cancel(mayInterruptIfRunning);
    }

    @Override
    public boolean isCancelled() {
        return delegate.isCancelled();
    }

    @Override
    public boolean isDone() {
        return delegate.isDone();
    }

    @Override
    public V get() throws InterruptedException, ExecutionException {
        try {
            // Apply the configured timeout only if one was set
            if (timeout > 0) {
                return delegate.get(timeout, timeUnit);
            }
            return delegate.get();
        }
        catch (TimeoutException e) {
            // Cancel the still-running task, keeping the original exception as the cause
            this.cancel(true);
            throw new ExecutionException(
                "Forced timeout after " + timeout + " " + timeUnit.name(), e);
        }
    }

    @Override
    public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        try {
            return delegate.get(timeout, unit);
        }
        catch (TimeoutException e) {
            // Same as above, but with the caller-supplied timeout
            this.cancel(true);
            throw new ExecutionException(
                "Timeout after " + timeout + " " + unit.name(), e);
        }
    }
}
The MyFuture class passed most of its methods straight through to the delegate Future object without additional logic. It also kept track of the timeout and time unit values, which were the same as those set on MyThreadPoolExecutor at the time the task was submitted.
The no-argument get() method had additional logic that applied the configured timeout, if one was set, and cancelled the task when it expired. The other get() method had similar logic, except that it used the timeout values passed into it. The trick in both was to cancel() the running task, which the standard timed get() did not do on its own. In both cases the caller saw the timeout as an ExecutionException.
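For a feel of what this looks like from the caller's side, here is a condensed, self-contained sketch. The names AutoTimeoutDemo, TimeoutPool and TimeoutFuture are stand-ins invented for this example, and the constructor arguments are an assumption (a fixed-size pool with an unbounded queue), since the real constructors are not shown above. Only submit(Callable) and the no-argument get() carry the extra logic here.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical, condensed stand-ins for MyThreadPoolExecutor and MyFuture.
public class AutoTimeoutDemo {

    static final class TimeoutFuture<V> implements Future<V> {
        private final Future<V> delegate;
        private final long timeout;
        private final TimeUnit unit;

        TimeoutFuture(Future<V> delegate, long timeout, TimeUnit unit) {
            this.delegate = delegate;
            this.timeout = timeout;
            this.unit = unit;
        }

        @Override public boolean cancel(boolean interrupt) { return delegate.cancel(interrupt); }
        @Override public boolean isCancelled() { return delegate.isCancelled(); }
        @Override public boolean isDone() { return delegate.isDone(); }

        @Override
        public V get(long t, TimeUnit u)
                throws InterruptedException, ExecutionException, TimeoutException {
            return delegate.get(t, u); // left as plain delegation in this sketch
        }

        @Override
        public V get() throws InterruptedException, ExecutionException {
            try {
                return timeout > 0 ? delegate.get(timeout, unit) : delegate.get();
            } catch (TimeoutException e) {
                cancel(true); // stop the task instead of just abandoning it
                throw new ExecutionException(
                    "Forced timeout after " + timeout + " " + unit.name(), e);
            }
        }
    }

    static final class TimeoutPool extends ThreadPoolExecutor {
        private long timeout = -1;
        private TimeUnit unit = TimeUnit.SECONDS;

        // Assumed constructor: fixed-size pool, unbounded queue.
        TimeoutPool(int threads) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        void setTimeout(long timeout, TimeUnit unit) {
            this.timeout = timeout;
            this.unit = unit;
        }

        @Override
        public <T> Future<T> submit(Callable<T> task) {
            return new TimeoutFuture<>(super.submit(task), timeout, unit);
        }
    }

    /** Submits a slow task and describes what the plain get() call observed. */
    static String run() {
        TimeoutPool pool = new TimeoutPool(1);
        pool.setTimeout(100, TimeUnit.MILLISECONDS);
        try {
            Future<String> future = pool.submit(() -> {
                Thread.sleep(10_000); // far longer than the pool's timeout
                return "finished";
            });
            try {
                return future.get(); // unchanged call site, no timeout argument
            } catch (ExecutionException e) {
                return e.getMessage() + " / cancelled=" + future.isCancelled();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

One thing to keep in mind: cancel(true) only interrupts the worker thread. The sleeping task here reacts to that immediately, but a task that never checks for interruption will keep running even though its Future reports it as cancelled.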
That's all there was to it. The changes were minimal, and all of the services could instantly make use of the new functionality. Not bad for a few minutes' work!