- All Known Subinterfaces:
PersistentTasksService.WaitForPersistentTaskListener<P>, RejectAwareActionListener<T>
- All Known Implementing Classes:
AbstractThreadedActionListener, ChannelActionListener, ContextPreservingActionListener, CountDownActionListener, DelegatingActionListener, FinalizeSnapshotContext, GroupedActionListener, LatchedActionListener, ListenableActionFuture, ListenableFuture, LoggingTaskListener, PlainActionFuture, RefCountAwareThreadedActionListener, RestActionListener, RestActions.NodesResponseRestListener, RestBuilderListener, RestChunkedToXContentListener, RestRefCountedChunkedToXContentListener, RestResponseListener, RestToXContentListener, SearchExecutionStatsCollector, SearchProgressActionListener, SnapshotShardContext, SubscribableListener, ThreadedActionListener, UnsafePlainActionFuture
Callbacks are used extensively throughout Elasticsearch because they enable us to write asynchronous and nonblocking code, i.e. code which doesn't necessarily compute a result straight away but also doesn't block the calling thread waiting for the result to become available. They support several useful control flows:
- They can be completed immediately on the calling thread.
- They can be completed concurrently on a different thread.
- They can be stored in a data structure and completed later on when the system reaches a particular state.
- Most commonly, they can be passed on to other methods that themselves require a callback.
- They can be wrapped in another callback which modifies the behaviour of the original callback, perhaps adding some extra code to run before or after completion, before passing them on.
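Some of the control flows listed above can be illustrated with a minimal sketch. It assumes a simplified, hypothetical Callback interface standing in for ActionListener, and a hypothetical ControlFlowSketch class that either completes a callback immediately, stores it until a state is reached, or wraps it to adjust the response first. None of these names come from Elasticsearch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical, simplified stand-in for ActionListener (not the real Elasticsearch interface).
interface Callback<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

class ControlFlowSketch {
    // Callbacks parked here until the hypothetical "ready" state is reached.
    private final List<Callback<String>> waiting = new ArrayList<>();
    private String state; // null until ready

    // Completes the callback immediately if the state is known, otherwise stores it for later.
    void whenReady(Callback<String> callback) {
        final String current;
        synchronized (this) {
            current = state;
            if (current == null) {
                waiting.add(callback);
                return;
            }
        }
        callback.onResponse(current); // completed on the calling thread
    }

    // Records the new state and completes every stored callback, outside the lock.
    void markReady(String newState) {
        final List<Callback<String>> toComplete;
        synchronized (this) {
            state = newState;
            toComplete = new ArrayList<>(waiting);
            waiting.clear();
        }
        for (Callback<String> callback : toComplete) {
            callback.onResponse(newState);
        }
    }

    // Wraps a callback so that extra code runs before the original one is completed.
    static Callback<String> upperCasing(Callback<String> delegate) {
        return new Callback<>() {
            @Override
            public void onResponse(String response) {
                delegate.onResponse(response.toUpperCase(Locale.ROOT));
            }

            @Override
            public void onFailure(Exception e) {
                delegate.onFailure(e);
            }
        };
    }

    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        Callback<String> recorder = new Callback<>() {
            @Override
            public void onResponse(String response) {
                seen.add(response);
            }

            @Override
            public void onFailure(Exception e) {
                seen.add("failed: " + e.getMessage());
            }
        };
        ControlFlowSketch sketch = new ControlFlowSketch();
        sketch.whenReady(upperCasing(recorder)); // stored: the state is not ready yet
        sketch.markReady("green");               // completes the stored, wrapped callback
        sketch.whenReady(recorder);              // completed immediately
        return seen;
    }
}
```

The stored callbacks are completed after the lock is released, so a slow or re-entrant callback cannot block other callers of whenReady.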
ActionListener is a general-purpose callback interface used throughout the Elasticsearch codebase, pretty much everywhere that needs to perform some asynchronous and nonblocking computation. The uniformity makes
it easier to compose parts of the system together without needing to build adapters to convert back and forth between different kinds of
callback. It also makes it easier to develop the skills needed to read and understand all the asynchronous code, although this definitely
takes practice and is certainly not easy in an absolute sense. Finally, it has allowed us to build a rich library for working with ActionListener instances themselves, creating new instances out of existing ones and completing them in interesting ways. See for
instance:
- All the static methods on ActionListener itself.
- ThreadedActionListener for forking work elsewhere.
- RefCountingListener for running work in parallel.
- SubscribableListener for constructing flexible workflows.
Callback-based asynchronous code can easily call regular synchronous code, but synchronous code cannot run callback-based asynchronous
code without blocking the calling thread until the callback is called back. This blocking is at best undesirable (threads are too
expensive to waste with unnecessary blocking) and at worst outright broken (the blocking can lead to deadlock). Unfortunately this means
that most of our code ends up having to be written with callbacks, simply because it's ultimately calling into some other code that takes
a callback. The entry points for all Elasticsearch APIs are callback-based (e.g. REST APIs all start at BaseRestHandler#prepareRequest and transport APIs all start at TransportAction#doExecute) and the whole system fundamentally works in terms of an event loop
(an io.netty.channel.EventLoop) which processes network events via callbacks.
ActionListener is not an ad-hoc invention. Formally speaking, it is our implementation of the general concept of a
continuation in the sense of continuation-passing style
(CPS): an extra argument to a function which defines how to continue the computation when the result is available. This is in contrast to
direct style which is the more usual style of calling methods that return values directly back to the caller so they can continue
executing as normal. There are essentially two ways that computation can continue in Java (it can return a value or it can throw an
exception) which is why ActionListener has both an onResponse(Response) and an onFailure(java.lang.Exception) method.
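The contrast between direct style and continuation-passing style can be sketched as follows. The Continuation interface and the CpsSketch class are hypothetical stand-ins invented for this illustration. The point is only that the two ways a direct-style method can finish (return or throw) map onto the two methods of the continuation.

```java
// Hypothetical, simplified stand-in for ActionListener (not the real Elasticsearch interface).
interface Continuation<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

class CpsSketch {
    // Direct style: the result is returned to the caller, or an exception is thrown.
    static int parseDirect(String input) {
        return Integer.parseInt(input.trim());
    }

    // Continuation-passing style: void return type, and both outcomes go to the continuation.
    static void parseCps(String input, Continuation<Integer> continuation) {
        final int value;
        try {
            value = Integer.parseInt(input.trim());
        } catch (NumberFormatException e) {
            continuation.onFailure(e); // the "throw" path
            return;
        }
        continuation.onResponse(value); // the "return" path, outside the try block
    }

    // Runs parseCps and reports which path the computation took.
    static String describe(String input) {
        StringBuilder out = new StringBuilder();
        parseCps(input, new Continuation<>() {
            @Override
            public void onResponse(Integer value) {
                out.append("value=").append(value);
            }

            @Override
            public void onFailure(Exception e) {
                out.append("failure=").append(e.getClass().getSimpleName());
            }
        });
        return out.toString();
    }
}
```

Calling onResponse outside the try block is a deliberate choice in this sketch: an exception thrown by the continuation itself is not mistaken for a parsing failure.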
CPS is strictly more expressive than direct style: direct code can be mechanically translated into continuation-passing style, but CPS
also enables all sorts of other useful control structures such as forking work onto separate threads, possibly to be executed in
parallel, perhaps even across multiple nodes, or possibly collecting a list of continuations all waiting for the same condition to be
satisfied before proceeding (e.g. SubscribableListener amongst many others). Some languages have
first-class support for continuations (e.g. the async and await primitives in C#) allowing the programmer to write code
in direct style away from those exotic control structures, but Java does not. That's why we have to manipulate all the callbacks
ourselves.
Strictly speaking, CPS requires that a computation only continues by calling the continuation. In Elasticsearch, this means that
asynchronous methods must have void return type and may not throw any exceptions. This is mostly the case in our code as written
today, and is a good guiding principle, but we don't enforce void exceptionless methods and there are some deviations from this rule. In
particular, it's not uncommon to permit some methods to throw an exception, using things like run(L, org.elasticsearch.core.CheckedConsumer<L, ? extends java.lang.Exception>) (or an
equivalent try ... catch ... block) further up the stack to handle it. Some methods also take (and may complete) an ActionListener parameter, but still return a value separately for other local synchronous work.
This pattern is often used in the transport action layer with the use of the ChannelActionListener class, which wraps a TransportChannel
produced by the transport layer. TransportChannel implementations can hold a reference to a Netty
channel with which to pass the response back to the network caller. Netty has a many-to-one association of network callers to channels,
so a call taking a long time generally won't hog resources: it's cheap. A transport action can take hours to respond and that's alright,
barring caller timeouts.
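The idea of letting a method throw and catching the exception further up the stack, as run(...) or an equivalent try/catch does, might look roughly like this. Listener, ThrowingConsumer and RunSketch are hypothetical names for this sketch, and the run method here is a simplified imitation written from the description above, not the Elasticsearch implementation.

```java
// Hypothetical, simplified stand-in for ActionListener (not the real Elasticsearch interface).
interface Listener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

// Hypothetical stand-in for a checked consumer: like Consumer, but allowed to throw.
@FunctionalInterface
interface ThrowingConsumer<T> {
    void accept(T value) throws Exception;
}

class RunSketch {
    // Runs the action and feeds any exception it throws to the listener's onFailure.
    static <T> void run(Listener<T> listener, ThrowingConsumer<Listener<T>> action) {
        try {
            action.accept(listener);
        } catch (Exception e) {
            listener.onFailure(e);
        }
    }

    static String demo(boolean fail) {
        StringBuilder out = new StringBuilder();
        Listener<String> listener = new Listener<>() {
            @Override
            public void onResponse(String response) {
                out.append("ok:").append(response);
            }

            @Override
            public void onFailure(Exception e) {
                out.append("failed:").append(e.getMessage());
            }
        };
        run(listener, l -> {
            if (fail) {
                throw new IllegalStateException("not ready"); // surfaces via onFailure
            }
            l.onResponse("done");
        });
        return out.toString();
    }
}
```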
Note that we explicitly avoid CompletableFuture and other similar mechanisms as much as possible. They
can achieve the same goals as ActionListener, but can also easily be misused in various ways that lead to severe bugs. In
particular, futures support blocking while waiting for a result, but this is almost never appropriate in Elasticsearch's production code
where threads are such a precious resource. Moreover if something throws an Error then the JVM should exit pretty much straight
away, but CompletableFuture can catch an Error which delays the JVM exit until its result is
observed. This may be much later, or possibly even never. It's not possible to introduce such bugs when using ActionListener.
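The behaviour described for CompletableFuture can be observed with the JDK alone. In this small sketch (the class name FutureErrorSketch is made up), an Error thrown inside supplyAsync is captured by the future rather than propagating on the thread that threw it, and only surfaces when someone calls join().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class FutureErrorSketch {
    static String demo() {
        // The Error is thrown on a pool thread and captured by the future.
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            throw new AssertionError("fatal");
        });
        try {
            future.join(); // the Error only becomes visible here, wrapped in a CompletionException
            return "completed normally";
        } catch (CompletionException e) {
            return "surfaced on join: " + e.getCause();
        }
    }
}
```

If nothing ever observes the future's result, the Error in this sketch is never seen at all, which is the hazard the paragraph above describes.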
-
Method Summary
- static <Response> ActionListener<Response> assertAtLeastOnce(ActionListener<Response> delegate)
- static <Response> ActionListener<Response> assertOnce(ActionListener<Response> delegate)
- static <Response> void completeWith(ActionListener<Response> listener, CheckedSupplier<Response, ? extends Exception> supplier): Completes the given listener with the result from the provided supplier accordingly.
- default <T> ActionListener<T> delegateFailure(BiConsumer<ActionListener<Response>, T> bc): Creates a new listener, wrapping this one, that overrides onResponse(Response) handling with the given bc consumer.
- default <T> ActionListener<T> delegateFailureAndWrap(CheckedBiConsumer<ActionListener<Response>, T, ? extends Exception> bc): Same as delegateFailure(BiConsumer) except that any failure thrown by bc or the original listener's onResponse(Response) will be passed to the original listener's onFailure(Exception).
- default <T> ActionListener<T> delegateFailureIgnoreResponseAndWrap(CheckedConsumer<ActionListener<Response>, ? extends Exception> c): Same as delegateFailureAndWrap(CheckedBiConsumer) except that the response is ignored and not passed to the delegate.
- default ActionListener<Response> delegateResponse(BiConsumer<ActionListener<Response>, Exception> bc): Creates a listener that delegates all responses it receives to this instance.
- default <T> ActionListener<T> map(CheckedFunction<T, Response, Exception> fn): Creates a listener that wraps this listener, mapping response values via the given mapping function and passing along exceptions to this instance.
- static <T> ActionListener<T> noop()
- static <Response> ActionListener<Response> notifyOnce(ActionListener<Response> delegate): Wraps a given listener and returns a new listener which makes sure onResponse(Object) and onFailure(Exception) of the provided listener will be called at most once.
- void onFailure(Exception e): Complete this listener with an exceptional response.
- static <Response> void onFailure(Iterable<ActionListener<Response>> listeners, Exception failure): Notifies every given listener with the failure passed to onFailure(Exception).
- static <Response> void onResponse(Iterable<ActionListener<Response>> listeners, Response response): Notifies every given listener with the response passed to onResponse(Object).
- void onResponse(Response response): Complete this listener with a successful (or at least, non-exceptional) response.
- static <Response> ActionListener<Response> releaseAfter(ActionListener<Response> delegate, Releasable releaseAfter): Wraps a given listener and returns a new listener which releases the provided releaseAfter resource when the listener is notified via either #onResponse or #onFailure.
- static <Response> ActionListener<Response> releasing(Releasable releasable): Creates a listener which releases the given resource on completion (whether success or failure).
- static <R extends RefCounted> void respondAndRelease(ActionListener<R> listener, R response): Shorthand for resolving given listener with given response and decrementing the response's ref count by one afterwards.
- static <T,L extends ActionListener<T>> void run(L listener, CheckedConsumer<L, ? extends Exception> action): Execute the given action in a try/catch block which feeds all exceptions to the given listener's onFailure(java.lang.Exception) method.
- static <Response> ActionListener<Response> runAfter(ActionListener<Response> delegate, Runnable runAfter): Wraps a given listener and returns a new listener which executes the provided runAfter callback when the listener is notified via either #onResponse or #onFailure.
- static <Response> ActionListener<Response> runBefore(ActionListener<Response> delegate, CheckedRunnable<?> runBefore): Wraps a given listener and returns a new listener which executes the provided runBefore callback before the listener is notified via either #onResponse or #onFailure.
- static <Response> ActionListener<Response> running(Runnable runnable): Creates a listener that executes the given runnable on completion (whether successful or otherwise).
- static <T,R extends AutoCloseable> void runWithResource(ActionListener<T> listener, CheckedSupplier<R, ? extends Exception> resourceSupplier, CheckedBiConsumer<ActionListener<T>, R, ? extends Exception> action): Execute the given action in an (async equivalent of a) try-with-resources block which closes the supplied resource on completion, and feeds all exceptions to the given listener's onFailure(java.lang.Exception) method.
- default <T> ActionListener<T> safeMap(Function<T, Response> fn): Same as map(CheckedFunction) except that fn is expected to never throw.
- static <Response> ActionListener<Response> withRef(ActionListener<Response> listener, RefCounted ref): Increments ref count and returns a listener that will decrement ref count on listener completion.
- static <Response> ActionListener<Response> wrap(CheckedConsumer<Response, ? extends Exception> onResponse, Consumer<Exception> onFailure): Creates a listener that executes the appropriate consumer when the response (or failure) is received.
-
Method Details
-
onResponse
Complete this listener with a successful (or at least, non-exceptional) response. -
onFailure
Complete this listener with an exceptional response. -
noop
- Returns:
- a listener that does nothing
-
map
Creates a listener that wraps this listener, mapping response values via the given mapping function and passing along exceptions to this instance. Notice that it is considered a bug if the listener's onResponse or onFailure fails. onResponse failures will not call onFailure. If the function fails, the listener's onFailure handler will be called. The principle is that the mapped listener will handle exceptions from the mapping function fn but it is the responsibility of delegate to handle its own exceptions inside `onResponse` and `onFailure`.
- Type Parameters:
T - Response type of the wrapped listener
- Parameters:
fn - Function to apply to listener response
- Returns:
- a listener that maps the received response and then passes it to this instance
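The exception-handling rule for map can be made concrete with a small sketch. It uses hypothetical stand-ins (Listener, ThrowingFunction, MapSketch) and a static map helper written from the description above, so it is an approximation of the behaviour rather than the real implementation. The mapping function runs inside a try block, while the delegate's onResponse is called outside it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for ActionListener (not the real Elasticsearch interface).
interface Listener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

// Hypothetical stand-in for a checked function: like Function, but allowed to throw.
@FunctionalInterface
interface ThrowingFunction<T, R> {
    R apply(T value) throws Exception;
}

class MapSketch {
    // Returns a listener of T that maps each response with fn and hands the result to delegate.
    static <T, R> Listener<T> map(Listener<R> delegate, ThrowingFunction<T, R> fn) {
        return new Listener<>() {
            @Override
            public void onResponse(T response) {
                final R mapped;
                try {
                    mapped = fn.apply(response);
                } catch (Exception e) {
                    delegate.onFailure(e); // failures of fn go to the delegate's onFailure
                    return;
                }
                delegate.onResponse(mapped); // outside the try: delegate failures are not rerouted
            }

            @Override
            public void onFailure(Exception e) {
                delegate.onFailure(e);
            }
        };
    }

    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        Listener<Integer> delegate = new Listener<>() {
            @Override
            public void onResponse(Integer value) {
                seen.add("got " + value);
            }

            @Override
            public void onFailure(Exception e) {
                seen.add("failed " + e.getClass().getSimpleName());
            }
        };
        Listener<String> mapped = map(delegate, s -> Integer.parseInt(s.trim()));
        mapped.onResponse(" 7 ");   // fn succeeds, delegate sees the mapped value
        mapped.onResponse("seven"); // fn throws, delegate sees the failure
        return seen;
    }
}
```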
-
safeMap
Same as map(CheckedFunction) except that fn is expected to never throw.
-
delegateResponse
default ActionListener<Response> delegateResponse(BiConsumer<ActionListener<Response>, Exception> bc)
Creates a listener that delegates all responses it receives to this instance.
- Parameters:
bc - BiConsumer invoked with delegate listener and exception
- Returns:
- Delegating listener
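One way to read delegateResponse is sketched here, assuming that responses pass straight through to the original listener while failures are handed to bc together with that listener, as the parameter description suggests. Listener and DelegateResponseSketch are hypothetical names, and the fallback-on-failure use case is only an illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.BiConsumer;

// Hypothetical, simplified stand-in for ActionListener (not the real Elasticsearch interface).
interface Listener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

class DelegateResponseSketch {
    // Responses go straight to the delegate; failures are handed to bc along with the delegate.
    static <T> Listener<T> delegateResponse(Listener<T> delegate, BiConsumer<Listener<T>, Exception> bc) {
        return new Listener<>() {
            @Override
            public void onResponse(T response) {
                delegate.onResponse(response);
            }

            @Override
            public void onFailure(Exception e) {
                bc.accept(delegate, e);
            }
        };
    }

    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        Listener<String> original = new Listener<>() {
            @Override
            public void onResponse(String response) {
                seen.add("ok " + response);
            }

            @Override
            public void onFailure(Exception e) {
                seen.add("failed " + e.getMessage());
            }
        };
        // Turn one kind of failure into a default response, pass every other failure through.
        Listener<String> withFallback = delegateResponse(original, (l, e) -> {
            if (e instanceof NoSuchElementException) {
                l.onResponse("default");
            } else {
                l.onFailure(e);
            }
        });
        withFallback.onResponse("value");
        withFallback.onFailure(new NoSuchElementException("missing"));
        withFallback.onFailure(new IllegalStateException("broken"));
        return seen;
    }
}
```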
-
delegateFailure
Creates a new listener, wrapping this one, that overrides onResponse(Response) handling with the given bc consumer. onFailure(Exception) handling is delegated to the original listener. Exceptions in onResponse(Response) are forbidden.
- Type Parameters:
T - Type of the delegating listener's response
- Parameters:
bc - BiConsumer invoked via onResponse(Response) with the original listener and the response with which the new listener was completed.
- Returns:
- a new listener that delegates failures to this listener and runs bc on a response.
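A rough sketch of the delegateFailure shape follows, using hypothetical stand-ins (Listener, DelegateFailureSketch) and a static helper in place of the default method. Failures skip the intermediate step and go directly to the original listener, while a response is handed to bc together with that listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical, simplified stand-in for ActionListener (not the real Elasticsearch interface).
interface Listener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

class DelegateFailureSketch {
    // Failures go straight to the delegate; a response is handed to bc along with the delegate.
    static <T, R> Listener<T> delegateFailure(Listener<R> delegate, BiConsumer<Listener<R>, T> bc) {
        return new Listener<>() {
            @Override
            public void onResponse(T response) {
                bc.accept(delegate, response); // bc is expected not to throw
            }

            @Override
            public void onFailure(Exception e) {
                delegate.onFailure(e);
            }
        };
    }

    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        Listener<Integer> original = new Listener<>() {
            @Override
            public void onResponse(Integer value) {
                seen.add("ok " + value);
            }

            @Override
            public void onFailure(Exception e) {
                seen.add("failed " + e.getMessage());
            }
        };
        // An intermediate step that receives a String and completes the original with its length.
        Listener<String> step = delegateFailure(original, (l, name) -> l.onResponse(name.length()));
        step.onResponse("shard");
        step.onFailure(new IllegalStateException("unavailable"));
        return seen;
    }
}
```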
-
delegateFailureAndWrap
default <T> ActionListener<T> delegateFailureAndWrap(CheckedBiConsumer<ActionListener<Response>, T, ? extends Exception> bc)
Same as delegateFailure(BiConsumer) except that any failure thrown by bc or the original listener's onResponse(Response) will be passed to the original listener's onFailure(Exception).
-
delegateFailureIgnoreResponseAndWrap
default <T> ActionListener<T> delegateFailureIgnoreResponseAndWrap(CheckedConsumer<ActionListener<Response>, ? extends Exception> c)
Same as delegateFailureAndWrap(CheckedBiConsumer) except that the response is ignored and not passed to the delegate.
-
releasing
Creates a listener which releases the given resource on completion (whether success or failure).
-
running
Creates a listener that executes the given runnable on completion (whether successful or otherwise).
- Type Parameters:
Response - the type of the response, which is ignored.
- Parameters:
runnable - the runnable that will be called in event of success or failure. This must not throw.
- Returns:
- a listener that executes the given runnable on completion (whether successful or otherwise).
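The running behaviour is simple enough to sketch in a few lines. Listener and RunningSketch are hypothetical stand-ins. The listener ignores both the response and the failure and just runs the given runnable.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for ActionListener (not the real Elasticsearch interface).
interface Listener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

class RunningSketch {
    // Returns a listener that runs the runnable on completion, ignoring the outcome.
    static <T> Listener<T> running(Runnable runnable) {
        return new Listener<>() {
            @Override
            public void onResponse(T response) {
                runnable.run();
            }

            @Override
            public void onFailure(Exception e) {
                runnable.run();
            }
        };
    }

    static int demo() {
        AtomicInteger calls = new AtomicInteger();
        Listener<String> first = running(calls::incrementAndGet);
        Listener<String> second = running(calls::incrementAndGet);
        first.onResponse("ignored");                       // success path runs the runnable
        second.onFailure(new RuntimeException("ignored")); // failure path runs it too
        return calls.get();
    }
}
```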
-
wrap
static <Response> ActionListener<Response> wrap(CheckedConsumer<Response, ? extends Exception> onResponse, Consumer<Exception> onFailure)
Creates a listener that executes the appropriate consumer when the response (or failure) is received. This listener is "wrapped" in the sense that an exception from the onResponse consumer is passed into the onFailure consumer.
If the onFailure argument is listener::onFailure for some other ActionListener, prefer to use delegateFailureAndWrap(CheckedBiConsumer) instead for performance reasons.
- Type Parameters:
Response - the type of the response
- Parameters:
onResponse - the checked consumer of the response, executed when the listener is completed successfully. If it throws an exception, the exception is passed to the onFailure consumer.
onFailure - the consumer of the failure, executed when the listener is completed with an exception (or it is completed successfully but the onResponse consumer threw an exception).
- Returns:
- a listener that executes the appropriate consumer when the response (or failure) is received.
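The "wrapped" behaviour, where an exception from the response consumer ends up in the failure consumer, can be sketched like this. Listener, ThrowingConsumer and WrapSketch are hypothetical names, and the parameter names responseHandler and failureHandler are chosen for this sketch only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for ActionListener (not the real Elasticsearch interface).
interface Listener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

// Hypothetical stand-in for a checked consumer: like Consumer, but allowed to throw.
@FunctionalInterface
interface ThrowingConsumer<T> {
    void accept(T value) throws Exception;
}

class WrapSketch {
    // Builds a listener from two consumers; exceptions from responseHandler go to failureHandler.
    static <T> Listener<T> wrap(ThrowingConsumer<T> responseHandler, Consumer<Exception> failureHandler) {
        return new Listener<>() {
            @Override
            public void onResponse(T response) {
                try {
                    responseHandler.accept(response);
                } catch (Exception e) {
                    failureHandler.accept(e);
                }
            }

            @Override
            public void onFailure(Exception e) {
                failureHandler.accept(e);
            }
        };
    }

    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        Listener<String> listener = wrap(response -> {
            if (response.isEmpty()) {
                throw new IllegalArgumentException("empty response");
            }
            seen.add("ok " + response);
        }, e -> seen.add("failed " + e.getMessage()));
        listener.onResponse("hello");                     // handled by the response consumer
        listener.onResponse("");                          // response consumer throws
        listener.onFailure(new RuntimeException("boom")); // ordinary failure
        return seen;
    }
}
```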
-
onResponse
Notifies every given listener with the response passed to onResponse(Object). If a listener itself throws an exception the exception is forwarded to onFailure(Exception). If in turn onFailure(Exception) fails all remaining listeners will be processed and the caught exception will be re-thrown.
-
onFailure
Notifies every given listener with the failure passed to onFailure(Exception). If a listener itself throws an exception all remaining listeners will be processed and the caught exception will be re-thrown.
-
runAfter
static <Response> ActionListener<Response> runAfter(ActionListener<Response> delegate, Runnable runAfter)
Wraps a given listener and returns a new listener which executes the provided runAfter callback when the listener is notified via either #onResponse or #onFailure.
-
releaseAfter
static <Response> ActionListener<Response> releaseAfter(ActionListener<Response> delegate, Releasable releaseAfter)
Wraps a given listener and returns a new listener which releases the provided releaseAfter resource when the listener is notified via either #onResponse or #onFailure.
-
runBefore
static <Response> ActionListener<Response> runBefore(ActionListener<Response> delegate, CheckedRunnable<?> runBefore)
Wraps a given listener and returns a new listener which executes the provided runBefore callback before the listener is notified via either #onResponse or #onFailure. If the callback throws an exception then it will be passed to the listener's #onFailure and its #onResponse will not be executed.
-
notifyOnce
Wraps a given listener and returns a new listener which makes sure onResponse(Object) and onFailure(Exception) of the provided listener will be called at most once.
-
completeWith
static <Response> void completeWith(ActionListener<Response> listener, CheckedSupplier<Response, ? extends Exception> supplier)
Completes the given listener with the result from the provided supplier accordingly. This method is mainly used to complete a listener with a block of synchronous code. If the supplier fails, the listener's onFailure handler will be called. It is the responsibility of delegate to handle its own exceptions inside `onResponse` and `onFailure`.
-
respondAndRelease
Shorthand for resolving given listener with given response and decrementing the response's ref count by one afterwards.
-
assertOnce
- Returns:
- A listener which (if assertions are enabled) wraps around the given delegate and asserts that it is only called once.
-
assertAtLeastOnce
- Returns:
- A listener which (if assertions are enabled) wraps around the given delegate and asserts that it is called at least once.
-
run
static <T,L extends ActionListener<T>> void run(L listener, CheckedConsumer<L, ? extends Exception> action)
Execute the given action in a try/catch block which feeds all exceptions to the given listener's onFailure(java.lang.Exception) method.
-
runWithResource
static <T,R extends AutoCloseable> void runWithResource(ActionListener<T> listener, CheckedSupplier<R, ? extends Exception> resourceSupplier, CheckedBiConsumer<ActionListener<T>, R, ? extends Exception> action)
Execute the given action in an (async equivalent of a) try-with-resources block which closes the supplied resource on completion, and feeds all exceptions to the given listener's onFailure(java.lang.Exception) method.
-
withRef
static <Response> ActionListener<Response> withRef(ActionListener<Response> listener, RefCounted ref) Increments ref count and returns a listener that will decrement ref count on listener completion.
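The reference-counting pattern behind withRef might be sketched as follows. Listener, Ref and WithRefSketch are hypothetical stand-ins (Ref is a toy counter, not the real RefCounted interface), and the helper is written only from the one-line description above: increment up front, decrement once the wrapped listener has been completed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for ActionListener (not the real Elasticsearch interface).
interface Listener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

// Hypothetical toy counter standing in for a ref-counted resource.
class Ref {
    private int count = 1;

    void incRef() {
        count++;
    }

    void decRef() {
        count--;
    }

    int count() {
        return count;
    }
}

class WithRefSketch {
    // Takes a reference now and releases it after the wrapped listener has been completed.
    static <T> Listener<T> withRef(Listener<T> listener, Ref ref) {
        ref.incRef();
        return new Listener<>() {
            @Override
            public void onResponse(T response) {
                try {
                    listener.onResponse(response);
                } finally {
                    ref.decRef();
                }
            }

            @Override
            public void onFailure(Exception e) {
                try {
                    listener.onFailure(e);
                } finally {
                    ref.decRef();
                }
            }
        };
    }

    static List<Integer> demo() {
        Ref ref = new Ref();
        List<Integer> observed = new ArrayList<>();
        Listener<String> recorder = new Listener<>() {
            @Override
            public void onResponse(String response) {
                observed.add(ref.count()); // the reference is still held during completion
            }

            @Override
            public void onFailure(Exception e) {
                observed.add(ref.count());
            }
        };
        Listener<String> wrapped = withRef(recorder, ref);
        observed.add(ref.count()); // incremented when wrapping
        wrapped.onResponse("done");
        observed.add(ref.count()); // decremented after completion
        return observed;
    }
}
```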
-