Interface ActionListener<Response>

All Known Subinterfaces:
PersistentTasksService.WaitForPersistentTaskListener<P>, RejectAwareActionListener<T>
All Known Implementing Classes:
AbstractThreadedActionListener, ChannelActionListener, ContextPreservingActionListener, CountDownActionListener, DelegatingActionListener, EmptyResponseListener, FinalizeSnapshotContext, GroupedActionListener, LatchedActionListener, ListenableActionFuture, ListenableFuture, LoggingTaskListener, PlainActionFuture, RefCountAwareThreadedActionListener, RestActionListener, RestActions.NodesResponseRestListener, RestBuilderListener, RestChunkedToXContentListener, RestRefCountedChunkedToXContentListener, RestResponseListener, RestToXContentListener, SearchExecutionStatsCollector, SearchProgressActionListener, SnapshotShardContext, SubscribableListener, ThreadedActionListener, UnsafePlainActionFuture

public interface ActionListener<Response>

Callbacks are used extensively throughout Elasticsearch because they enable us to write asynchronous and nonblocking code, i.e. code which doesn't necessarily compute a result straight away but also doesn't block the calling thread waiting for the result to become available. They support several useful control flows:

  • They can be completed immediately on the calling thread.
  • They can be completed concurrently on a different thread.
  • They can be stored in a data structure and completed later on when the system reaches a particular state.
  • Most commonly, they can be passed on to other methods that themselves require a callback.
  • They can be wrapped in another callback which modifies the behaviour of the original callback, perhaps adding some extra code to run before or after completion, before passing them on.
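The control flows listed above can be sketched with a small stand-in. This is only an illustration: `Listener` is a simplified, hypothetical substitute for ActionListener, and `computeNow`, `waitForState`, `stateReached` and `computeWrapped` are invented names, not Elasticsearch APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class CallbackFlows {
    // Simplified stand-in for ActionListener (hypothetical, for illustration only).
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    // Listeners parked until the (hypothetical) system reaches the desired state.
    private final Queue<Listener<String>> waiting = new ArrayDeque<>();

    // Completes the listener immediately on the calling thread.
    static void computeNow(Listener<String> listener) {
        listener.onResponse("now");
    }

    // Stores the listener so that it can be completed later.
    void waitForState(Listener<String> listener) {
        waiting.add(listener);
    }

    // Completes every stored listener once the state is reached.
    void stateReached() {
        Listener<String> next;
        while ((next = waiting.poll()) != null) {
            next.onResponse("later");
        }
    }

    // Wraps the listener, then passes the wrapper on to a method that needs a callback.
    static void computeWrapped(Listener<String> listener) {
        computeNow(new Listener<String>() {
            @Override public void onResponse(String r) { listener.onResponse(r + "!"); }
            @Override public void onFailure(Exception e) { listener.onFailure(e); }
        });
    }

    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        Listener<String> recorder = new Listener<String>() {
            @Override public void onResponse(String r) { seen.add(r); }
            @Override public void onFailure(Exception e) { seen.add("failed"); }
        };
        CallbackFlows flows = new CallbackFlows();
        flows.waitForState(recorder); // stored, nothing is recorded yet
        computeNow(recorder);         // records "now"
        computeWrapped(recorder);     // records "now!"
        flows.stateReached();         // records "later"
        return seen;
    }
}
```

Completing a listener on a different thread looks the same as `computeNow`, except that the call to `onResponse` is submitted to an executor instead of being made inline.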

ActionListener is a general-purpose callback interface that is used almost everywhere in the Elasticsearch codebase that needs to perform some asynchronous and nonblocking computation. The uniformity makes it easier to compose parts of the system together without needing to build adapters to convert back and forth between different kinds of callback. It also makes it easier to develop the skills needed to read and understand all the asynchronous code, although this definitely takes practice and is certainly not easy in an absolute sense. Finally, it has allowed us to build a rich library for working with ActionListener instances themselves, creating new instances out of existing ones and completing them in interesting ways. See for instance the static and default methods documented below, and implementations such as ThreadedActionListener and SubscribableListener.

Callback-based asynchronous code can easily call regular synchronous code, but synchronous code cannot run callback-based asynchronous code without blocking the calling thread until the callback is called back. This blocking is at best undesirable (threads are too expensive to waste with unnecessary blocking) and at worst outright broken (the blocking can lead to deadlock). Unfortunately this means that most of our code ends up having to be written with callbacks, simply because it's ultimately calling into some other code that takes a callback. The entry points for all Elasticsearch APIs are callback-based (e.g. REST APIs all start at BaseRestHandler#prepareRequest and transport APIs all start at TransportAction#doExecute) and the whole system fundamentally works in terms of an event loop (an io.netty.channel.EventLoop) which processes network events via callbacks.

ActionListener is not an ad-hoc invention. Formally speaking, it is our implementation of the general concept of a continuation in the sense of continuation-passing style (CPS): an extra argument to a function which defines how to continue the computation when the result is available. This is in contrast to direct style, which is the more usual style of calling methods that return values directly back to the caller so they can continue executing as normal. There are essentially two ways that computation can continue in Java (it can return a value or it can throw an exception), which is why ActionListener has both an onResponse(Response) and an onFailure(java.lang.Exception) method.
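As a rough illustration of the difference between the two styles, the sketch below writes the same computation twice. `Listener` is a simplified, hypothetical stand-in for ActionListener, and `parseDirect` and `parseCps` are invented names.

```java
import java.util.ArrayList;
import java.util.List;

class DirectVersusCps {
    // Simplified stand-in for ActionListener (hypothetical, for illustration only).
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    // Direct style: the result is returned, or an exception is thrown.
    static int parseDirect(String s) {
        return Integer.parseInt(s);
    }

    // Continuation-passing style: void return type, and both outcomes
    // (value or exception) are reported through the listener.
    static void parseCps(String s, Listener<Integer> listener) {
        final int value;
        try {
            value = Integer.parseInt(s);
        } catch (NumberFormatException e) {
            listener.onFailure(e);
            return;
        }
        // Called outside the try block so that an exception thrown by the
        // listener itself is not mistaken for a parsing failure.
        listener.onResponse(value);
    }

    static List<String> demo() {
        List<String> out = new ArrayList<>();
        Listener<Integer> recorder = new Listener<Integer>() {
            @Override public void onResponse(Integer r) { out.add("response " + r); }
            @Override public void onFailure(Exception e) { out.add("failure " + e.getClass().getSimpleName()); }
        };
        parseCps("42", recorder);
        parseCps("x", recorder);
        return out;
    }
}
```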

CPS is strictly more expressive than direct style: direct code can be mechanically translated into continuation-passing style, but CPS also enables all sorts of other useful control structures such as forking work onto separate threads, possibly to be executed in parallel, perhaps even across multiple nodes, or possibly collecting a list of continuations all waiting for the same condition to be satisfied before proceeding (e.g. SubscribableListener amongst many others). Some languages have first-class support for continuations (e.g. the async and await primitives in C#) allowing the programmer to write code in direct style away from those exotic control structures, but Java does not. That's why we have to manipulate all the callbacks ourselves.

Strictly speaking, CPS requires that a computation only continues by calling the continuation. In Elasticsearch, this means that asynchronous methods must have void return type and may not throw any exceptions. This is mostly the case in our code as written today, and is a good guiding principle, but we don't enforce void exceptionless methods and there are some deviations from this rule. In particular, it's not uncommon to permit some methods to throw an exception, using things like run(L, CheckedConsumer<L, ? extends Exception>) (or an equivalent try ... catch ... block) further up the stack to handle it. Some methods also take (and may complete) an ActionListener parameter, but still return a value separately for other local synchronous work.

This pattern is often used in the transport action layer with the use of the ChannelActionListener class, which wraps a TransportChannel produced by the transport layer. TransportChannel implementations can hold a reference to a Netty channel with which to pass the response back to the network caller. Netty has a many-to-one association of network callers to channels, so a call taking a long time generally won't hog resources: it's cheap. A transport action can take hours to respond and that's alright, barring caller timeouts.

Note that we explicitly avoid CompletableFuture and other similar mechanisms as much as possible. They can achieve the same goals as ActionListener, but can also easily be misused in various ways that lead to severe bugs. In particular, futures support blocking while waiting for a result, but this is almost never appropriate in Elasticsearch's production code where threads are such a precious resource. Moreover if something throws an Error then the JVM should exit pretty much straight away, but CompletableFuture can catch an Error which delays the JVM exit until its result is observed. This may be much later, or possibly even never. It's not possible to introduce such bugs when using ActionListener.

  • Method Details

    • onResponse

      void onResponse(Response response)
      Complete this listener with a successful (or at least, non-exceptional) response.
    • onFailure

      void onFailure(Exception e)
      Complete this listener with an exceptional response.
    • noop

      static <T> ActionListener<T> noop()
      Returns:
      a listener that does nothing
    • map

      default <T> ActionListener<T> map(CheckedFunction<T,Response,Exception> fn)
      Creates a listener that wraps this listener, mapping response values via the given mapping function and passing along exceptions to this instance. Notice that it is considered a bug if the listener's onResponse or onFailure fails. onResponse failures will not call onFailure. If the function fails, the listener's onFailure handler will be called. The principle is that the mapped listener will handle exceptions from the mapping function fn but it is the responsibility of delegate to handle its own exceptions inside `onResponse` and `onFailure`.
      Type Parameters:
      T - Response type of the wrapped listener
      Parameters:
      fn - Function to apply to listener response
      Returns:
      a listener that maps the received response and then passes it to this instance
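The exception-handling contract of map can be sketched as follows. This is a minimal stand-in under stated assumptions, not the Elasticsearch implementation: `Listener` and the two-parameter `CheckedFunction` are simplified, hypothetical types.

```java
import java.util.ArrayList;
import java.util.List;

class MapSketch {
    // Hypothetical stand-in for CheckedFunction: a function that may throw.
    interface CheckedFunction<T, R> {
        R apply(T t) throws Exception;
    }

    // Simplified stand-in for ActionListener (hypothetical, for illustration only).
    interface Listener<R> {
        void onResponse(R response);
        void onFailure(Exception e);

        default <T> Listener<T> map(CheckedFunction<T, R> fn) {
            Listener<R> delegate = this;
            return new Listener<T>() {
                @Override public void onResponse(T response) {
                    R mapped;
                    try {
                        mapped = fn.apply(response);
                    } catch (Exception e) {
                        delegate.onFailure(e); // failures of fn go to onFailure
                        return;
                    }
                    // Not inside the try block: the delegate handles its own exceptions.
                    delegate.onResponse(mapped);
                }
                @Override public void onFailure(Exception e) { delegate.onFailure(e); }
            };
        }
    }

    static List<String> demo() {
        List<String> out = new ArrayList<>();
        Listener<Integer> lengthListener = new Listener<Integer>() {
            @Override public void onResponse(Integer r) { out.add("length " + r); }
            @Override public void onFailure(Exception e) { out.add("failed: " + e.getMessage()); }
        };
        Listener<String> stringListener = lengthListener.map(s -> {
            if (s.isEmpty()) {
                throw new IllegalArgumentException("empty");
            }
            return s.length();
        });
        stringListener.onResponse("hello");
        stringListener.onResponse("");
        stringListener.onFailure(new RuntimeException("boom"));
        return out;
    }
}
```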
    • safeMap

      default <T> ActionListener<T> safeMap(Function<T,Response> fn)
      Same as map(CheckedFunction) except that fn is expected to never throw.
    • delegateResponse

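      default ActionListener<Response> delegateResponse(BiConsumer<ActionListener<Response>,Exception> bc)
      Creates a listener that delegates all responses it receives to this instance and handles failures with the given bc consumer.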
      Parameters:
      bc - BiConsumer invoked with delegate listener and exception
      Returns:
      Delegating listener
    • delegateFailure

      default <T> ActionListener<T> delegateFailure(BiConsumer<ActionListener<Response>,T> bc)
      Creates a new listener, wrapping this one, that overrides onResponse(Response) handling with the given bc consumer. onFailure(Exception) handling is delegated to the original listener. Exceptions in onResponse(Response) are forbidden.
      Type Parameters:
      T - Type of the delegating listener's response
      Parameters:
      bc - BiConsumer invoked via onResponse(Response) with the original listener and the response with which the new listener was completed.
      Returns:
      a new listener that delegates failures to this listener and runs bc on a response.
    • delegateFailureAndWrap

      default <T> ActionListener<T> delegateFailureAndWrap(CheckedBiConsumer<ActionListener<Response>,T,? extends Exception> bc)
      Same as delegateFailure(BiConsumer) except that any failure thrown by bc or the original listener's onResponse(Response) will be passed to the original listener's onFailure(Exception).
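A typical use is chaining two asynchronous steps so that a failure of either step reaches the original listener. The sketch below assumes a simplified, hypothetical `Listener` stand-in with its own `delegateFailureAndWrap`; `loadUser`, `loadGreeting` and `greet` are invented asynchronous steps.

```java
import java.util.ArrayList;
import java.util.List;

class DelegateFailureSketch {
    // Hypothetical stand-in for CheckedBiConsumer: a two-argument consumer that may throw.
    interface CheckedBiConsumer<A, B> {
        void accept(A a, B b) throws Exception;
    }

    // Simplified stand-in for ActionListener (hypothetical, for illustration only).
    interface Listener<R> {
        void onResponse(R response);
        void onFailure(Exception e);

        default <T> Listener<T> delegateFailureAndWrap(CheckedBiConsumer<Listener<R>, T> bc) {
            Listener<R> delegate = this;
            return new Listener<T>() {
                @Override public void onResponse(T response) {
                    try {
                        bc.accept(delegate, response);
                    } catch (Exception e) {
                        delegate.onFailure(e); // anything thrown by bc goes to onFailure
                    }
                }
                @Override public void onFailure(Exception e) { delegate.onFailure(e); }
            };
        }
    }

    // First hypothetical asynchronous step.
    static void loadUser(int id, Listener<String> listener) {
        if (id < 0) {
            listener.onFailure(new IllegalArgumentException("bad id"));
        } else {
            listener.onResponse("user-" + id);
        }
    }

    // Second hypothetical asynchronous step.
    static void loadGreeting(String user, Listener<String> listener) {
        listener.onResponse("hello " + user);
    }

    // Runs both steps in sequence; failures from either step reach the caller's listener.
    static void greet(int id, Listener<String> listener) {
        loadUser(id, listener.delegateFailureAndWrap((l, user) -> loadGreeting(user, l)));
    }

    static List<String> demo() {
        List<String> out = new ArrayList<>();
        Listener<String> recorder = new Listener<String>() {
            @Override public void onResponse(String r) { out.add(r); }
            @Override public void onFailure(Exception e) { out.add("failed: " + e.getMessage()); }
        };
        greet(7, recorder);
        greet(-1, recorder);
        return out;
    }
}
```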
    • delegateFailureIgnoreResponseAndWrap

      default <T> ActionListener<T> delegateFailureIgnoreResponseAndWrap(CheckedConsumer<ActionListener<Response>,? extends Exception> c)
      Same as delegateFailureAndWrap(CheckedBiConsumer) except that the response is ignored and not passed to the delegate.
    • releasing

      static <Response> ActionListener<Response> releasing(Releasable releasable)
      Creates a listener which releases the given resource on completion (whether success or failure)
    • running

      static <Response> ActionListener<Response> running(Runnable runnable)
      Creates a listener that executes the given runnable on completion (whether successful or otherwise).
      Type Parameters:
      Response - the type of the response, which is ignored.
      Parameters:
      runnable - the runnable that will be called in event of success or failure. This must not throw.
      Returns:
      a listener that executes the given runnable on completion (whether successful or otherwise).
    • wrap

      static <Response> ActionListener<Response> wrap(CheckedConsumer<Response,? extends Exception> onResponse, Consumer<Exception> onFailure)
      Creates a listener that executes the appropriate consumer when the response (or failure) is received. This listener is "wrapped" in the sense that an exception from the onResponse consumer is passed into the onFailure consumer.

      If the onFailure argument is listener::onFailure for some other ActionListener, prefer to use delegateFailureAndWrap(CheckedBiConsumer) instead for performance reasons.

      Type Parameters:
      Response - the type of the response
      Parameters:
      onResponse - the checked consumer of the response, executed when the listener is completed successfully. If it throws an exception, the exception is passed to the onFailure consumer.
      onFailure - the consumer of the failure, executed when the listener is completed with an exception (or it is completed successfully but the onResponse consumer threw an exception).
      Returns:
      a listener that executes the appropriate consumer when the response (or failure) is received.
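The "wrapped" behaviour can be sketched like this, assuming a simplified, hypothetical `Listener` stand-in and a single-parameter `CheckedConsumer`. The parameter names `responseHandler` and `failureHandler` are chosen for readability and are not the real parameter names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class WrapSketch {
    // Hypothetical stand-in for CheckedConsumer: a consumer that may throw.
    interface CheckedConsumer<T> {
        void accept(T t) throws Exception;
    }

    // Simplified stand-in for ActionListener (hypothetical, for illustration only).
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);

        static <T> Listener<T> wrap(CheckedConsumer<T> responseHandler, Consumer<Exception> failureHandler) {
            return new Listener<T>() {
                @Override public void onResponse(T response) {
                    try {
                        responseHandler.accept(response);
                    } catch (Exception e) {
                        // An exception from the response handler is passed to the failure handler.
                        failureHandler.accept(e);
                    }
                }
                @Override public void onFailure(Exception e) { failureHandler.accept(e); }
            };
        }
    }

    static List<String> demo() {
        List<String> out = new ArrayList<>();
        Listener<Integer> listener = Listener.wrap(
            r -> {
                if (r < 0) {
                    throw new IllegalStateException("negative");
                }
                out.add("ok " + r);
            },
            e -> out.add("failed: " + e.getMessage())
        );
        listener.onResponse(1);
        listener.onResponse(-1);
        listener.onFailure(new RuntimeException("boom"));
        return out;
    }
}
```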
    • onResponse

      static <Response> void onResponse(Iterable<ActionListener<Response>> listeners, Response response)
      Notifies every given listener with the response passed to onResponse(Object). If a listener itself throws an exception the exception is forwarded to onFailure(Exception). If in turn onFailure(Exception) fails all remaining listeners will be processed and the caught exception will be re-thrown.
    • onFailure

      static <Response> void onFailure(Iterable<ActionListener<Response>> listeners, Exception failure)
      Notifies every given listener with the failure passed to onFailure(Exception). If a listener itself throws an exception all remaining listeners will be processed and the caught exception will be re-thrown.
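The "process all remaining listeners, then re-throw" behaviour might look roughly like the sketch below. `Listener` is a simplified, hypothetical stand-in, and the way several exceptions are combined (first one thrown, later ones attached as suppressed) is an assumption made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

class NotifyAllSketch {
    // Simplified stand-in for ActionListener (hypothetical, for illustration only).
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    static <T> void onFailure(Iterable<Listener<T>> listeners, Exception failure) {
        RuntimeException first = null;
        for (Listener<T> listener : listeners) {
            try {
                listener.onFailure(failure);
            } catch (RuntimeException ex) {
                // Remember the exception, but keep notifying the remaining listeners.
                if (first == null) {
                    first = ex;
                } else {
                    first.addSuppressed(ex); // assumption: later ones become suppressed
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    // Builds a listener that records what it sees, or throws from onFailure if broken.
    static Listener<String> recording(String name, List<String> out, boolean broken) {
        return new Listener<String>() {
            @Override public void onResponse(String r) { out.add(name + " got " + r); }
            @Override public void onFailure(Exception e) {
                if (broken) {
                    throw new IllegalStateException(name + " is broken");
                }
                out.add(name + " saw " + e.getMessage());
            }
        };
    }

    static List<String> demo() {
        List<String> out = new ArrayList<>();
        List<Listener<String>> listeners = List.of(
            recording("a", out, false),
            recording("b", out, true),
            recording("c", out, false)
        );
        try {
            onFailure(listeners, new RuntimeException("boom"));
        } catch (IllegalStateException e) {
            out.add("rethrown: " + e.getMessage());
        }
        return out;
    }
}
```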
    • runAfter

      static <Response> ActionListener<Response> runAfter(ActionListener<Response> delegate, Runnable runAfter)
      Wraps a given listener and returns a new listener which executes the provided runAfter callback when the listener is notified via either #onResponse or #onFailure.
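The ordering guarantee can be sketched with a try/finally around the delegate call. `Listener` here is a simplified, hypothetical stand-in, and the use of `finally` is an assumption of this sketch rather than a statement about the real implementation.

```java
import java.util.ArrayList;
import java.util.List;

class RunAfterSketch {
    // Simplified stand-in for ActionListener (hypothetical, for illustration only).
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    static <T> Listener<T> runAfter(Listener<T> delegate, Runnable runAfter) {
        return new Listener<T>() {
            @Override public void onResponse(T response) {
                try {
                    delegate.onResponse(response);
                } finally {
                    runAfter.run(); // runs after the delegate has been notified
                }
            }
            @Override public void onFailure(Exception e) {
                try {
                    delegate.onFailure(e);
                } finally {
                    runAfter.run();
                }
            }
        };
    }

    static List<String> demo() {
        List<String> out = new ArrayList<>();
        Listener<Integer> recorder = new Listener<Integer>() {
            @Override public void onResponse(Integer r) { out.add("response " + r); }
            @Override public void onFailure(Exception e) { out.add("failure " + e.getMessage()); }
        };
        // Each wrapper is completed exactly once, as a listener normally would be.
        runAfter(recorder, () -> out.add("after")).onResponse(1);
        runAfter(recorder, () -> out.add("after")).onFailure(new RuntimeException("boom"));
        return out;
    }
}
```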
    • releaseAfter

      static <Response> ActionListener<Response> releaseAfter(ActionListener<Response> delegate, Releasable releaseAfter)
      Wraps a given listener and returns a new listener which releases the provided releaseAfter resource when the listener is notified via either #onResponse or #onFailure.
    • runBefore

      static <Response> ActionListener<Response> runBefore(ActionListener<Response> delegate, CheckedRunnable<?> runBefore)
      Wraps a given listener and returns a new listener which executes the provided runBefore callback before the listener is notified via either #onResponse or #onFailure. If the callback throws an exception then it will be passed to the listener's #onFailure and its #onResponse will not be executed.
    • releaseBefore

      static <Response> ActionListener<Response> releaseBefore(Releasable releaseBefore, ActionListener<Response> delegate)
      Wraps a given listener and returns a new listener which releases the provided releaseBefore resource before the listener is notified via either #onResponse or #onFailure.
    • notifyOnce

      static <Response> ActionListener<Response> notifyOnce(ActionListener<Response> delegate)
      Wraps a given listener and returns a new listener which makes sure onResponse(Object) and onFailure(Exception) of the provided listener will be called at most once.
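One way to obtain the at-most-once guarantee is an atomic flag shared by both callbacks. The sketch below assumes a simplified, hypothetical `Listener` stand-in, and the `AtomicBoolean` approach is an illustrative choice, not necessarily how Elasticsearch implements it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class NotifyOnceSketch {
    // Simplified stand-in for ActionListener (hypothetical, for illustration only).
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    static <T> Listener<T> notifyOnce(Listener<T> delegate) {
        AtomicBoolean notified = new AtomicBoolean();
        return new Listener<T>() {
            @Override public void onResponse(T response) {
                // Only the first completion, of either kind, reaches the delegate.
                if (notified.compareAndSet(false, true)) {
                    delegate.onResponse(response);
                }
            }
            @Override public void onFailure(Exception e) {
                if (notified.compareAndSet(false, true)) {
                    delegate.onFailure(e);
                }
            }
        };
    }

    static List<String> demo() {
        List<String> out = new ArrayList<>();
        Listener<String> once = notifyOnce(new Listener<String>() {
            @Override public void onResponse(String r) { out.add(r); }
            @Override public void onFailure(Exception e) { out.add("failed"); }
        });
        once.onResponse("first");
        once.onResponse("second");                // ignored
        once.onFailure(new RuntimeException());   // ignored
        return out;
    }
}
```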
    • completeWith

      static <Response> void completeWith(ActionListener<Response> listener, CheckedSupplier<Response,? extends Exception> supplier)
      Completes the given listener with the result from the provided supplier. This method is mainly used to complete a listener with a block of synchronous code. If the supplier fails, the listener's onFailure handler will be called. It is the responsibility of the listener to handle its own exceptions inside onResponse and onFailure.
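A minimal sketch of this contract, assuming a simplified, hypothetical `Listener` stand-in and a single-parameter `CheckedSupplier`:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class CompleteWithSketch {
    // Hypothetical stand-in for CheckedSupplier: a supplier that may throw.
    interface CheckedSupplier<T> {
        T get() throws Exception;
    }

    // Simplified stand-in for ActionListener (hypothetical, for illustration only).
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    static <T> void completeWith(Listener<T> listener, CheckedSupplier<T> supplier) {
        T result;
        try {
            result = supplier.get();
        } catch (Exception e) {
            listener.onFailure(e); // a failing supplier completes the listener exceptionally
            return;
        }
        // Outside the try block: the listener handles its own exceptions.
        listener.onResponse(result);
    }

    static List<String> demo() {
        List<String> out = new ArrayList<>();
        Listener<Integer> recorder = new Listener<Integer>() {
            @Override public void onResponse(Integer r) { out.add("response " + r); }
            @Override public void onFailure(Exception e) { out.add("failure " + e.getMessage()); }
        };
        completeWith(recorder, () -> 6 * 7);
        completeWith(recorder, () -> { throw new IOException("disk"); });
        return out;
    }
}
```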
    • respondAndRelease

      static <R extends RefCounted> void respondAndRelease(ActionListener<R> listener, R response)
      Shorthand for resolving given listener with given response and decrementing the response's ref count by one afterwards.
    • assertOnce

      static <Response> ActionListener<Response> assertOnce(ActionListener<Response> delegate)
      Returns:
      A listener which (if assertions are enabled) wraps around the given delegate and asserts that it is only called once.
    • assertAtLeastOnce

      static <Response> ActionListener<Response> assertAtLeastOnce(ActionListener<Response> delegate)
      Returns:
      A listener which (if assertions are enabled) wraps around the given delegate and asserts that it is called at least once.
    • run

      static <T, L extends ActionListener<T>> void run(L listener, CheckedConsumer<L,? extends Exception> action)
      Execute the given action in a try/catch block which feeds all exceptions to the given listener's onFailure(java.lang.Exception) method.
    • runWithResource

      static <T, R extends AutoCloseable> void runWithResource(ActionListener<T> listener, CheckedSupplier<R,? extends Exception> resourceSupplier, CheckedBiConsumer<ActionListener<T>,R,? extends Exception> action)
      Execute the given action in an (async equivalent of a) try-with-resources block which closes the supplied resource on completion, and feeds all exceptions to the given listener's onFailure(java.lang.Exception) method.
    • withRef

      static <Response> ActionListener<Response> withRef(ActionListener<Response> listener, RefCounted ref)
      Increments ref count and returns a listener that will decrement ref count on listener completion.