Class SubscribableListener<T>
- All Implemented Interfaces:
ActionListener<T>
- Direct Known Subclasses:
ListenableActionFuture,ListenableFuture
ActionListener to which other ActionListener instances can subscribe, such that when this listener is completed it
fans-out its result to the subscribed listeners.
If this listener is complete, addListener(org.elasticsearch.action.ActionListener<T>) completes the subscribing listener immediately with the result with which this
listener was completed. Otherwise, the subscribing listener is retained and completed when this listener is completed.
Exceptions are passed to subscribed listeners without modification. ListenableActionFuture and ListenableFuture are
subclasses which modify the exceptions passed to subscribed listeners.
If this listener is completed more than once then all results other than the first (whether successful or otherwise) are silently discarded. All subscribed listeners will be notified of the same result, exactly once, even if several completions occur concurrently.
A sequence of async steps can be chained together using a series of SubscribableListeners, similar to CompletionStage
(without the catch (Throwable t)). Listeners can be created for each step, where the next step subscribes to the result of the
previous, using utilities like andThen(CheckedBiConsumer). The following example demonstrates how this might be used:
private void exampleAsyncMethod(String request, List<Long> items, ActionListener<Boolean> finalListener) {
SubscribableListener
// Start the chain and run the first step by creating a SubscribableListener using newForked():
.<String>newForked(l -> firstAsyncStep(request, l))
// Run a second step when the first step completes using andThen(); if the first step fails then the exception falls through to
// the end without executing the intervening steps.
.<Integer>andThen((l, firstStepResult) -> secondAsyncStep(request, firstStepResult, l))
// Run another step when the second step completes with another andThen() call; as above this only runs if the first two steps
// succeed.
.<Boolean>andThen((l, secondStepResult) -> {
if (condition) {
// Steps are exception-safe: an exception thrown here will be passed to the listener rather than escaping to the
// caller.
throw new IOException("failure");
}
// Steps can fan out to multiple subsidiary async actions using utilities like RefCountingListener.
final var result = new AtomicBoolean();
try (var listeners = new RefCountingListener(l.map(v -> result.get()))) {
for (final var item : items) {
thirdAsyncStep(secondStepResult, item, listeners.acquire());
}
}
})
// Synchronous (non-forking) steps which do not return a result can be expressed using andThenAccept() with a consumer:
.andThenAccept(thirdStepResult -> {
if (condition) {
// andThenAccept() is also exception-safe
throw new ElasticsearchException("some other problem");
}
consumeThirdStepResult(thirdStepResult);
})
// Synchronous (non-forking) steps which do return a result can be expressed using andThenApply() with a function:
.andThenApply(voidFromStep4 -> {
if (condition) {
// andThenApply() is also exception-safe
throw new IllegalArgumentException("failure");
}
return computeFifthStepResult();
})
// To complete the chain, add the outer listener which will be completed with the result of the previous step if all steps were
// successful, or the exception if any step failed.
.addListener(finalListener);
}
-
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionfinal voidaddListener(ActionListener<T> listener) Add a listener to this listener's collection of subscribers.final voidaddListener(ActionListener<T> listener, Executor executor, ThreadContext threadContext) Add a listener to this listener's collection of subscribers.voidaddTimeout(TimeValue timeout, ThreadPool threadPool, Executor timeoutExecutor) Adds a timeout to this listener, such that if the timeout elapses before the listener is completed then it will be completed with anElasticsearchTimeoutException.<U> SubscribableListener<U> andThen(Executor executor, ThreadContext threadContext, CheckedBiConsumer<ActionListener<U>, T, ? extends Exception> nextStep) Creates and returns a newSubscribableListenerLand subscribesnextStepto this listener such that if this listener is completed successfully with resultRthennextStepis invoked with argumentsLandR.<U> SubscribableListener<U> andThen(CheckedBiConsumer<ActionListener<U>, T, ? extends Exception> nextStep) Creates and returns a newSubscribableListenerLand subscribesnextStepto this listener such that if this listener is completed successfully with resultRthennextStepis invoked with argumentsLandR.<U> SubscribableListener<U> andThen(CheckedConsumer<ActionListener<U>, ? extends Exception> nextStep) Creates and returns a newSubscribableListenerLand subscribesnextStepto this listener such that if this listener is completed successfully then the result is discarded andnextStepis invoked with argumentL.andThenAccept(CheckedConsumer<T, Exception> consumer) Creates and returns a newSubscribableListenerLsuch that if this listener is completed successfully with resultRthenconsumeris applied to argumentR, andLis completed withnullwhenconsumerreturns.<U> SubscribableListener<U> andThenApply(CheckedFunction<T, U, Exception> fn) Creates and returns a newSubscribableListenerLsuch that if this listener is completed successfully with resultRthenfnis invoked with argumentR, andLis completed with the result of that invocation.final booleanisDone()static <T> SubscribableListener<T> Create aSubscribableListenerwhich has already failed with the given exception.static <T> SubscribableListener<T> newForked(CheckedConsumer<ActionListener<T>, ? extends Exception> fork) Create aSubscribableListener, fork a computation to complete it, and return the listener.static <T> SubscribableListener<T> newSucceeded(T result) Create aSubscribableListenerwhich has already succeeded with the given result.static <T> SubscribableListener<T> Same asnewSucceeded(Object)but always returns the same instance with result valuenull.final voidComplete this listener with an exceptional response.final voidonResponse(T result) Complete this listener with a successful (or at least, non-exceptional) response.protected final Tprotected static RuntimeExceptionprotected ExceptionwrapException(Exception exception) Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface org.elasticsearch.action.ActionListener
delegateFailure, delegateFailureAndWrap, delegateFailureIgnoreResponseAndWrap, delegateResponse, map, safeMap
-
Constructor Details
-
SubscribableListener
public SubscribableListener()Create aSubscribableListenerwhich is incomplete.
-
-
Method Details
-
newSucceeded
Create aSubscribableListenerwhich has already succeeded with the given result. -
newFailed
Create aSubscribableListenerwhich has already failed with the given exception. -
newForked
public static <T> SubscribableListener<T> newForked(CheckedConsumer<ActionListener<T>, ? extends Exception> fork) Create aSubscribableListener, fork a computation to complete it, and return the listener. If the forking itself throws an exception then the exception is caught and fed to the returned listener. -
addListener
Add a listener to this listener's collection of subscribers. If this listener is complete, this method completes the subscribing listener immediately with the result with which this listener was completed. Otherwise, the subscribing listener is retained and completed when this listener is completed.Subscribed listeners must not throw any exceptions.
Listeners added strictly before this listener is completed will themselves be completed in the order in which their subscriptions were received. However, there are no guarantees about the ordering of the completions of listeners which are added concurrently with (or after) the completion of this listener.
If the subscribed listener is not completed immediately then it will be completed on the thread, and in the
ThreadContext, of the thread which completes this listener. In other words, if you want to ensure thatlisteneris completed using a particular executor, then you must do both of:- Ensure that this
SubscribableListeneris always completed using that executor, and - Invoke
addListener(org.elasticsearch.action.ActionListener<T>)using that executor.
- Ensure that this
-
addListener
public final void addListener(ActionListener<T> listener, Executor executor, @Nullable ThreadContext threadContext) Add a listener to this listener's collection of subscribers. If this listener is complete, this method completes the subscribing listener immediately with the result with which this listener was completed. Otherwise, the subscribing listener is retained and completed when this listener is completed.Subscribed listeners must not throw any exceptions.
Listeners added strictly before this listener is completed will themselves be completed in the order in which their subscriptions were received. However, there are no guarantees about the ordering of the completions of listeners which are added concurrently with (or after) the completion of this listener.
- Parameters:
executor- If notEsExecutors.DIRECT_EXECUTOR_SERVICE, and the subscribing listener is not completed immediately, then it will be completed using the given executor. If the subscribing listener is completed immediately then this completion happens on the subscribing thread.This behaviour may seem complex at first sight, but it is like this to allow callers to ensure that
listeneris completed using a particular executor much more cheaply than simply always forking the completion task to the desired executor. To ensure thatlisteneris completed using a particular executor, do both of the following:- Pass the desired executor in as
executor, and - Invoke
addListener(org.elasticsearch.action.ActionListener<T>)using that executor.
If you really want to fork the completion task to a specific executor in all circumstances, wrap the supplied
listenerin aThreadedActionListeneryourself. But do note that this can be surprisingly expensive, and it's almost always not the right approach, so it is deliberate that there is no convenient method onSubscribableListenerwhich does this for you.If
executorrejects the execution of the completion of the subscribing listener then the result is discarded and the subscribing listener is completed with a rejection exception on the thread which completes this listener.- Pass the desired executor in as
threadContext- If notnull, and the subscribing listener is not completed immediately, then it will be completed in the given thread context. Ifnull, and the subscribing listener is not completed immediately, then it will be completed in theThreadContextof the completing thread. If the subscribing listener is completed immediately then this completion happens in theThreadContextof the subscribing thread.
-
onResponse
Description copied from interface:ActionListenerComplete this listener with a successful (or at least, non-exceptional) response.- Specified by:
onResponsein interfaceActionListener<T>
-
onFailure
Description copied from interface:ActionListenerComplete this listener with an exceptional response.- Specified by:
onFailurein interfaceActionListener<T>
-
wrapException
-
isDone
public final boolean isDone()- Returns:
trueif and only if this listener has been completed (either successfully or exceptionally).
-
rawResult
- Returns:
- the result with which this listener completed successfully, or throw the exception with which it failed.
- Throws:
AssertionError- if this listener is not complete yet and assertions are enabled.IllegalStateException- if this listener is not complete yet and assertions are disabled.Exception
-
wrapAsExecutionException
-
andThen
public <U> SubscribableListener<U> andThen(CheckedConsumer<ActionListener<U>, ? extends Exception> nextStep) Creates and returns a newSubscribableListenerLand subscribesnextStepto this listener such that if this listener is completed successfully then the result is discarded andnextStepis invoked with argumentL. If this listener is completed with exceptionEthen so isL.This can be used to construct a sequence of async actions, each ignoring the result of the previous ones:
l.andThen(l1 -> forkAction1(args1, l1)).andThen(l2 -> forkAction2(args2, l2)).addListener(finalListener);
After creating this chain, completinglwith a successful response will callforkAction1, which will on completion callforkAction2, which will in turn pass its response tofinalListener. A failure of any step will bypass the remaining steps and ultimately failfinalListener.The threading of the
nextStepcallback is the same as for listeners added withaddListener(org.elasticsearch.action.ActionListener<T>): if this listener is already complete thennextStepis invoked on the thread callingandThen(org.elasticsearch.core.CheckedConsumer<org.elasticsearch.action.ActionListener<U>, ? extends java.lang.Exception>)and in its thread context, but if this listener is incomplete thennextStepis invoked on the completing thread and in its thread context. In other words, if you want to ensure thatnextStepis invoked using a particular executor, then you must do both of:- Ensure that this
SubscribableListeneris always completed using that executor, and - Invoke
andThen(org.elasticsearch.core.CheckedConsumer<org.elasticsearch.action.ActionListener<U>, ? extends java.lang.Exception>)using that executor.
- Ensure that this
-
andThen
public <U> SubscribableListener<U> andThen(CheckedBiConsumer<ActionListener<U>, T, ? extends Exception> nextStep) Creates and returns a newSubscribableListenerLand subscribesnextStepto this listener such that if this listener is completed successfully with resultRthennextStepis invoked with argumentsLandR. If this listener is completed with exceptionEthen so isL.This can be used to construct a sequence of async actions, each invoked with the result of the previous one:
l.andThen((l1, o1) -> forkAction1(o1, args1, l1)).andThen((l2, o2) -> forkAction2(o2, args2, l2)).addListener(finalListener);
After creating this chain, completinglwith a successful response will pass the response toforkAction1, which will on completion pass its response toforkAction2, which will in turn pass its response tofinalListener. A failure of any step will bypass the remaining steps and ultimately failfinalListener.The threading of the
nextStepcallback is the same as for listeners added withaddListener(org.elasticsearch.action.ActionListener<T>): if this listener is already complete thennextStepis invoked on the thread callingandThen(org.elasticsearch.core.CheckedConsumer<org.elasticsearch.action.ActionListener<U>, ? extends java.lang.Exception>)and in its thread context, but if this listener is incomplete thennextStepis invoked on the completing thread and in its thread context. In other words, if you want to ensure thatnextStepis invoked using a particular executor, then you must do both of:- Ensure that this
SubscribableListeneris always completed using that executor, and - Invoke
andThen(org.elasticsearch.core.CheckedConsumer<org.elasticsearch.action.ActionListener<U>, ? extends java.lang.Exception>)using that executor.
- Ensure that this
-
andThen
public <U> SubscribableListener<U> andThen(Executor executor, @Nullable ThreadContext threadContext, CheckedBiConsumer<ActionListener<U>, T, ? extends Exception> nextStep) Creates and returns a newSubscribableListenerLand subscribesnextStepto this listener such that if this listener is completed successfully with resultRthennextStepis invoked with argumentsLandR. If this listener is completed with exceptionEthen so isL.This can be used to construct a sequence of async actions, each invoked with the result of the previous one:
l.andThen(x, t, (l1,o1) -> forkAction1(o1,args1,l1)).andThen(x, t, (l2,o2) -> forkAction2(o2,args2,l2)).addListener(finalListener);
After creating this chain, completinglwith a successful response will pass the response toforkAction1, which will on completion pass its response toforkAction2, which will in turn pass its response tofinalListener. A failure of any step will bypass the remaining steps and ultimately failfinalListener.The threading of the
nextStepcallback is the same as for listeners added withaddListener(org.elasticsearch.action.ActionListener<T>): if this listener is already complete thennextStepis invoked on the thread callingandThen(org.elasticsearch.core.CheckedConsumer<org.elasticsearch.action.ActionListener<U>, ? extends java.lang.Exception>)and in its thread context, but if this listener is incomplete thennextStepis invoked usingexecutor, in a thread context captured whenandThen(org.elasticsearch.core.CheckedConsumer<org.elasticsearch.action.ActionListener<U>, ? extends java.lang.Exception>)was called. This behaviour may seem complex at first sight but it is like this to allow callers to ensure thatnextStepruns using a particular executor much more cheaply than simply always forking its execution. To ensure thatnextStepis invoked using a particular executor, do both of the following:- Pass the desired executor in as
executor, and - Invoke
andThen(org.elasticsearch.core.CheckedConsumer<org.elasticsearch.action.ActionListener<U>, ? extends java.lang.Exception>)using that executor.
If you really want to fork the execution of the next step in the sequence to a specific executor in all circumstances, explicitly call
Executor.execute(java.lang.Runnable)withinnextStepyourself. But do note that this can be surprisingly expensive, and it's almost always not the right approach, so it is deliberate that there is no convenient method onSubscribableListenerwhich does this for you.If
executorrejects the execution ofnextStepthen the result is discarded and the returned listener is completed with a rejection exception on the thread which completes this listener. Likewise if this listener is completed exceptionally butexecutorrejects the execution of the completion of the returned listener then the returned listener is completed with a rejection exception on the thread which completes this listener. - Pass the desired executor in as
-
andThenApply
Creates and returns a newSubscribableListenerLsuch that if this listener is completed successfully with resultRthenfnis invoked with argumentR, andLis completed with the result of that invocation. If this listener is completed exceptionally, orfnthrows an exception, thenLis completed with that exception.This is essentially a shorthand for a call to
andThen(org.elasticsearch.core.CheckedConsumer<org.elasticsearch.action.ActionListener<U>, ? extends java.lang.Exception>)with anextStepargument that is fully synchronous.The threading of the
fninvocation is the same as for listeners added withaddListener(org.elasticsearch.action.ActionListener<T>): if this listener is already complete thenfnis invoked on the thread callingandThenApply(org.elasticsearch.core.CheckedFunction<T, U, java.lang.Exception>)and in its thread context, but if this listener is incomplete thenfnis invoked on the thread, and in the thread context, on which this listener is completed. -
andThenAccept
Creates and returns a newSubscribableListenerLsuch that if this listener is completed successfully with resultRthenconsumeris applied to argumentR, andLis completed withnullwhenconsumerreturns. If this listener is completed exceptionally, orconsumerthrows an exception, thenLis completed with that exception.This is essentially a shorthand for a call to
andThen(org.elasticsearch.core.CheckedConsumer<org.elasticsearch.action.ActionListener<U>, ? extends java.lang.Exception>)with anextStepargument that is fully synchronous.The threading of the
consumerinvocation is the same as for listeners added withaddListener(org.elasticsearch.action.ActionListener<T>): if this listener is already complete thenconsumeris invoked on the thread callingandThenAccept(org.elasticsearch.core.CheckedConsumer<T, java.lang.Exception>)and in its thread context, but if this listener is incomplete thenconsumeris invoked on the thread, and in the thread context, on which this listener is completed. -
addTimeout
Adds a timeout to this listener, such that if the timeout elapses before the listener is completed then it will be completed with anElasticsearchTimeoutException.The process which is racing against this timeout should stop and clean up promptly when the timeout occurs to avoid unnecessary work. For instance, it could check that the race is not lost by calling
isDone()whenever appropriate, or it could subscribe another listener which performs any necessary cleanup steps. -
nullSuccess
Same asnewSucceeded(Object)but always returns the same instance with result valuenull.
-