There are often some tasks in the project that need to be executed asynchronously (submitted to the thread pool) to be executed, while the main thread often needs to know the results of asynchronous execution. What should we do at this time? It is impossible to achieve with runnable, we need to use callable to read the following code:
import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class AddTask implements Callable<Integer> { private int a,b; public AddTask(int a, int b) { this.a = a; this.b = b; } @Override public Integer call throws Exception { Integer result = a + b; return result; } public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newSingleThreadExecutor; //JDK has returned so far and are instances of FutureTask Future<Integer> future = executor.submit(new AddTask(1, 2)); Integer result = future.get;// Only when the status of future is completed (future.isDone = true), the get method will return }} Although we can realize the requirement of obtaining asynchronous execution results, we found that this Future is actually not useful because it does not provide a notification mechanism, which means we don’t know when the future will be completed (if we need to poll isDone() to judge, it feels like there is no need to use this). Take a look at the interface method of java.util.concurrent.future.Future:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled; boolean isDone; V get throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;} From this we can see that the Future mechanism of JDK is actually not easy to use. If you can add a listener to this future and let it notify the listener when it is completed, it will be easier to use, just like the following IFuture:
package future;import java.util.concurrent.CancellationException;import java.util.concurrent.Future;import java.util.concurrent.TimeUnit;/** * The result of an asynchronous operation. * * @author lixiaohui * @param <V> Type parameter of the execution result*/public interface IFuture<V> extends Future<V> { boolean isSuccess; // Whether V getNow is successful; // Return the result immediately (regardless of whether Future is in the completed state) Throwable cause; //The reason for the execution failure is Cancellable; //Can I cancel IFuture<V> await throws InterruptedException; //Waiting for future completion boolean await(long timeoutMillis) throws InterruptedException; //Timeout wait for future completion boolean await(long timeout, TimeUnit timeunit) throws InterruptedException; IFuture<V> awaitUninterruptibly; //Waiting for future completion, no interruption boolean awaitUninterruptibly(long timeoutMillis);//Timeout waits for future completion, no interrupt response boolean awaitUninterruptibly(long timeout, TimeUnit timeunit); IFuture<V> addListener(IFutureListener<V> l); //When future is completed, these added listeners will be notified IFuture<V> removeListener(IFutureListener<V> l); } Next, let’s implement this IFuture together. Before this, we will explain the Object.wait, Object.notifyAll method, because the core of the original "��" of the entire Future implementation is these two methods. Take a look at the explanation in JDK:
public class Object { /** * Causes the current thread to wait until another thread invokes the * {@link java.lang.Object#notify} method or the * {@link java.lang.Object#notifyAll} method for this object. * In other words, this method behaves exactly as if it simply * performs the call {@code wait(0)}. * After calling this method, the current thread will release the object monitor lock and give up the CPU usage rights. Until another thread calls notify/notifyAll */ public final void wait throws InterruptedException { wait(0); } /** * Wakes up all threads that are waiting on this object's monitor. A * thread waits on an object's monitor by calling one of the * {@code wait} methods. * <p> * The awakened threads will not be able to proceed until the current * thread relinquishes the lock on this object. The awakened threads * will compete in the usual manner with any other threads that might * be actively competing to synchronize on this object; for example, * the awakened threads enjoy no reliable privilege or disadvantage in * being the next thread to lock this object. */ public final native void notifyAll;} After knowing this, we have an idea to implement Future by ourselves. When the thread calls a series of methods such as IFuture.await, if the Future has not been completed, then call the future.wait method to make the thread enter the WAITING state. When other threads set Future to the completion state (note that the completion state here includes normal end and abnormal end), the future.notifyAll method needs to be called to wake up those threads that were in the WAITING state because of calling the wait method. The complete implementation is as follows (the code should not be difficult to understand. I refer to the Future mechanism of netty. If you are interested, you can check out the source code of netty):
package future;import java.util.Collection;import java.util.concurrent.CancellationException;import java.util.concurrent.CopyOnWriteArrayList;import java.util.concurrent.ExecutionException;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;/** * <pre> * At normal end, if the execution result is not null, then result is the execution result; if the execution result is null, then result = {@link AbstractFuture#SUCCESS_SIGNAL} * When the exception ends, the result is an instance of {@link CauseHolder}; if the exception ends due to cancellation, the result is an instance of {@link CancellationException}, otherwise it is an instance of other exceptions* The following situations will cause the asynchronous operation to be transferred from the unfinished state to the completed state, that is, the notifyAll method is called when the following situations occur: * <ul> * <li>When the asynchronous operation is cancelled (cancel method) </li> * <li>When the asynchronous operation ends normally (setSuccess method) </li> * <li>When the asynchronous operation ends abnormally (setFailure method) </li> * </ul> * </pre> * * @author lixiaohui * * @param <V> * Type of asynchronous execution result*/public class AbstractFuture<V> implements IFuture<V> { protected volatile Object result; // It needs to be guaranteed to be visibility/** * Listener set*/ protected Collection<IFutureListener<V>> listeners = new CopyOnWriteArrayList<IFutureListener<V>>; /** * When the normal execution result of the task is null, that is, when the client calls {@link AbstractFuture#setSuccess(null)}, * result references the object*/ private static final SuccessSignal SUCCESS_SIGNAL = new SuccessSignal; @Override public boolean cancel(boolean mayInterruptIfRunning) { if (isDone) { // return false cannot be cancelled; } synchronized (this) { if (isDone) { // double check return false; } result = new CauseHolder(new CancellationException); notifyAll; // isDone = true, notify the thread waiting on the wait on the object} notifyListeners; // Notify the listener that the asynchronous operation has been completed return true; } @Override public boolean isCancellable { return result == null; } @Override public boolean isCancelled { return result != null && result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException; } @Override public boolean isDone { return result != null; } @Override public V get throws InterruptedException, ExecutionException { await; // Wait for the execution result Throwable cause = cause; if (cause == null) { // No exception occurred, the asynchronous operation ended normally return getNow; } if (cause instanceof CancellationException) { // The asynchronous operation was cancelled throw (CancellationException) cause; } throw new ExecutionException(cause); // Other exceptions} @Override public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (await(timeout, unit)) {// Timeout waiting for the execution result Throwable cause = cause; if (cause == null) {// No exception occurred, the asynchronous operation ended normally return getNow; } if (cause instanceof CancellationException) {// The asynchronous operation was cancelled throw throw (CancellationException) cause; } throw new ExecutionException(cause);// Other exceptions} // The time has not ended yet, throwing a timeout exception throw new TimeoutException; } @Override public boolean isSuccess { return result == null ? false : !(result instanceof CauseHolder); } @SuppressWarnings("unchecked") @Override public V getNow { return (V) (result == SUCCESS_SIGNAL ? null : result); } @Override public Throwable cause { if (result != null && result instanceof CauseHolder) { return ((CauseHolder) result).cause; } return null; } @Override public IFuture<V> addListener(IFutureListener<V> listener) { if (listener == null) { throw new NullPointerException("listener"); } if (isDone) { // If you have completed notifyListener(listener); return this; } synchronized (this) { if (!isDone) { listeners.add(listener); return this; } } notifyListener(listener); return this; } @Override public IFuture<V> removeListener(IFutureListener<V> listener) { if (listener == null) { throw new NullPointerException("listener"); } if (!isDone) { listeners.remove(listener); } return this; } @Override public IFuture<V> await throws InterruptedException { return await0(true); } private IFuture<V> await0(boolean interruptable) throws InterruptedException { if (!isDone) { // If it has been completed, it will be returned directly // If the terminal is allowed and interrupted, an interrupt exception will be thrown if (interruptable && Thread.interrupted) { throw new InterruptedException("thread " + Thread.currentThread.getName + " has been interrupted."); } boolean interrupted = false; synchronized (this) { while (!isDone) { try { wait; // Release the lock and enter the waiting state, wait for other threads to call the object's notify/notifyAll method} catch (InterruptedException e) { if (interruptable) { throw e; } else { interrupted = true; } } } } } if (interrupted) { // Why do we need to set the interrupt flag here? Because after returning from the wait method, the interrupt flag is cleared, // Reset here so that other codes know that it is interrupted here. Thread.currentThread.interrupt; } } return this; } @Override public boolean await(long timeoutMillis) throws InterruptedException { return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), true); } @Override public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return await0(unit.toNanos(timeout), true); } private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException { if (isDone) { return true; } if (timeoutNanos <= 0) { return isDone; } if (interruptable && Thread.interrupted) { throw new InterruptedException(toString); } long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime; long waitTime = timeoutNanos; boolean interrupted = false; try { synchronized (this) { if (isDone) { return true; } if (waitTime <= 0) { return isDone; } for (;;) { try { wait(waitTime / 1000000, (int) (waitTime % 1000000)); } catch (InterruptedException e) { if (interruptable) { throw e; } else { interrupted = true; } } if (isDone) { return true; } else { waitTime = timeoutNanos - (System.nanoTime - startTime); if (waitTime <= 0) { return isDone; } } } } } } finally { if (interrupted) { Thread.currentThread.interrupt; } } } @Override public IFuture<V> awaitUninterruptibly { try { return await0(false); } catch (InterruptedException e) { // If an exception is thrown here, it cannot be handled throw new java.lang.InternalError; } } @Override public boolean awaitUninterruptibly(long timeoutMillis) { try { return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), false); } catch (InterruptedException e) { throw new java.lang.InternalError; } } @Override public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { try { return await0(unit.toNanos(timeout), false); } catch (InterruptedException e) { throw new java.lang.InternalError; } } protected IFuture<V> setFailure(Throwable cause) { if (setFailure0(cause)) { notifyListeners; return this; } throw new IllegalStateException("complete already: " + this); } private boolean setFailure0(Throwable cause) { if (isDone) { return false; } synchronized (this) { if (isDone) { return false; } result = new CauseHolder(cause); notifyAll; } return true; } protected IFuture<V> setSuccess(Object result) { if (setSuccess0(result)) { // NotifyListeners after setting successfully; return this; } throw new IllegalStateException("complete already: " + this); } private boolean setSuccess0(Object result) { if (isDone) { return false; } synchronized (this) { if (isDone) { return false; } if (result == null) { // The result of normal execution of asynchronous operation is null this.result = SUCCESS_SIGNAL; } else { this.result = result; } notifyAll; } return true; } private void notifyListeners { for (IFutureListener<V> l : listeners) { notifyListener(l); } } private void notifyListener(IFutureListener<V> l) { try { l.operationCompleted(this); } catch (Exception e) { e.printStackTrace; } } private static class SuccessSignal { } private static final class CauseHolder { final Throwable cause; CauseHolder(Throwable cause) { this.cause = cause; } }} So how to use this? With the above skeleton implementation, we can customize various asynchronous results. The following is a delayed task:
package future.test;import future.IFuture;import future.IFutureListener;/** * Delay addition* @author lixiaohui * */public class DelayAdder { public static void main(String[] args) { new DelayAdder.add(3 * 1000, 1, 2).addListener(new IFutureListener<Integer> { @Override public void operationCompleted(IFuture<Integer> future) throws Exception { System.out.println(future.getNow); } }); } /** * Delay addition* @param delay Delay duration milliseconds * @param a Addition* @param b Addition* @return Asynchronous result*/ public DelayAdditionFuture add(long delay, int a, int b) { DelayAdditionFuture future = new DelayAdditionFuture; new Thread(new DelayAdditionTask(delay, a, b, future)).start; return future; } private class DelayAdditionTask implements Runnable { private long delay; private int a, b; private DelayAdditionFuture future; public DelayAdditionTask(long delay, int a, int b, DelayAdditionFuture future) { super; this.delay = delay; this.a = a; this.b = b; this.future = future; } @Override public void run { try { Thread.sleep(delay); Integer i = a + b; // TODO Here is the future set to the completion status (normal execution is completed) future.setSuccess(i); } catch (InterruptedException e) { // TODO Here is the future set to the completion status (exception is completed) future.setFailure(e.getCause); } } }} package future.test;import future.AbstractFuture;import future.IFuture;//Just expose two methods to the outside public class DelayAdditionFuture extends AbstractFuture<Integer> { @Override public IFuture<Integer> setSuccess(Object result) { return super.setSuccess(result); } @Override public IFuture<Integer> setFailure(Throwable cause) { return super.setFailure(cause); } } You can see that the client does not need to actively ask whether the future is completed, but will automatically call back the operationcompleted method when the future is completed. The client only needs to implement the logic in the callback.
The above is all the content of this article. I hope it will be helpful to everyone's learning and I hope everyone will support Wulin.com more.