Package org.elasticsearch.xpack.esql.expression.function.aggregate


Functions that aggregate values, with or without grouping within buckets. Used in `STATS` and similar commands.

Guide to adding a new aggregate function

  1. Aggregation functions are more complex than scalar functions, so it’s a good idea to discuss the new function with the ESQL team before starting to implement it.

    You may also discuss its implementation, as aggregations may require special performance considerations.

  2. To learn the basics about making functions, check org.elasticsearch.xpack.esql.expression.function.scalar.

    It contains a guide to writing a simple function, which is a good starting point for aggregations.

  3. Pick one of the csv-spec files in x-pack/plugin/esql/qa/testFixtures/src/main/resources/ and add a test for the function you want to write. These files are roughly themed but there isn’t a strong guiding principle in the organization.
  4. Rerun the CsvTests and watch your new test fail.
  5. Find an aggregate function in this package similar to the one you are working on and copy it to build yours. Your function might extend one of the available abstract classes. Check the Javadoc of each before using it.
  6. Fill in the required methods in your new function. Check their Javadoc for more information. Here are some of the important ones:
    • Constructor: Review the constructor annotations, and make sure to add the correct types and descriptions.
    • resolveType: Check the metadata of your function parameters. This may include types, whether they are foldable or not, or their possible values.
    • dataType: Returns the data type of your function’s result. It may depend on the function’s current parameters.
    • Implement SurrogateExpression, and its required SurrogateExpression.surrogate() method.

      It is used to fold the aggregation when it receives only literals, or to replace the aggregation with a simpler one when possible.

    Finally, implement ToAggregator (more information about aggregators below). This interface can be skipped only when the function always returns another function from its surrogate.
  7. To introduce your aggregation to the engine:

Creating aggregators for your function

Aggregators contain the core logic of how to combine values, what to store, how to process data, etc. Currently, we rely on code generation (per aggregation, per type) to implement this functionality. This approach was picked for performance reasons (namely, to avoid virtual method calls and boxing of primitive types). As a result, we cannot rely on interface implementations and generics.
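To illustrate the performance argument above, here is a minimal sketch contrasting a generic accumulator with a type-specialized one. Both classes (`GenericMaxSketch`, `LongMaxSketch`) are hypothetical and exist only for this illustration; they are not part of the ESQL code base, and the generated code is considerably more involved.

```java
// Generic approach: one implementation serves every type, but each value is
// boxed (long -> Long) and each comparison goes through an interface call.
final class GenericMaxSketch<T extends Comparable<T>> {
    private T max; // null until the first value arrives

    void add(T value) {
        if (max == null || value.compareTo(max) > 0) {
            max = value;
        }
    }

    T result() {
        return max;
    }
}

// Specialized approach: one class per type, static methods, primitives only.
// This is roughly the shape that per-type code generation makes affordable.
final class LongMaxSketch {
    private LongMaxSketch() {}

    static long init() {
        return Long.MIN_VALUE;
    }

    static long combine(long current, long value) {
        return Math.max(current, value);
    }
}
```

Both variants compute the same maximum. The specialized one avoids allocating a `Long` per value and lets the JIT inline `combine`, at the cost of repeating the class for every supported type.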

To implement the aggregation logic, create your class (typically named "${FunctionName}${Type}Aggregator"). It must be placed in `org.elasticsearch.compute.aggregation` in order to be picked up by code generation. Annotate it with Aggregator and GroupingAggregator. The first one is responsible for aggregating an entire data set, while the second one is responsible for grouping within buckets.

Before you start implementing it, please note that:

  • All methods must be public static.
  • init/initSingle/initGrouping can take optional BigArrays or DriverContext arguments, which are injected automatically. It is also possible to declare any number of arbitrary arguments, which must be provided via the generated Supplier.
  • The combine, combineStates, combineIntermediate and evaluateFinal methods (see below) can be generated automatically when both the input type I and the mutable accumulator state (AggregatorState and GroupingAggregatorState) are primitive (DOUBLE, INT).
  • Code generation expects at least one IntermediateState field, which is used to keep the serialized state of the aggregation (e.g. AggregatorState and GroupingAggregatorState). It must be defined even if you rely on the autogenerated implementation for the primitive types.

Aggregation expects:

  • type AggregatorState (a mutable state used to accumulate the result of the aggregation), which must be public, not an inner class, and implement AggregatorState
  • type I (the input to your aggregation function), usually a primitive type or BytesRef
  • AggregatorState init() or AggregatorState initSingle(), which returns an empty, initialized aggregation state
  • void combine(AggregatorState state, I input) or AggregatorState combine(AggregatorState state, I input), which adds an input entry to the aggregation state
  • void combineIntermediate(AggregatorState state, intermediate states), which adds a serialized aggregation state to the current aggregation state (used to combine results across different nodes)
  • Block evaluateFinal(AggregatorState state, DriverContext), which converts the inner state of the aggregation to the result column
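The non-grouping contract listed above can be sketched in plain Java as follows. This is an illustration under stated assumptions, not the real API: `StateSketch` stands in for AggregatorState, a nullable `Long` stands in for the result Block, the DriverContext argument is omitted, and the Aggregator/IntermediateState annotations are only mentioned in comments. All class names are hypothetical.

```java
// Stand-in for the real AggregatorState interface (assumption: the real
// interface declares more than this empty marker).
interface StateSketch {}

// Mutable accumulator. In the real code base this type must be public and
// must not be an inner class.
final class SumLongStateSketch implements StateSketch {
    long sum;
    boolean seen; // false until at least one value has been combined
}

// A real aggregator would carry the Aggregator annotation and declare its
// intermediate state (here: "sum" and "seen") for code generation.
final class SumLongAggregatorSketch {
    private SumLongAggregatorSketch() {}

    // Returns an empty, initialized state.
    public static SumLongStateSketch initSingle() {
        return new SumLongStateSketch();
    }

    // Adds one raw input value to the state.
    public static void combine(SumLongStateSketch state, long input) {
        state.sum += input;
        state.seen = true;
    }

    // Merges a serialized partial result, e.g. one computed on another node.
    public static void combineIntermediate(SumLongStateSketch state, long sum, boolean seen) {
        if (seen) {
            state.sum += sum;
            state.seen = true;
        }
    }

    // Converts the state to the final result; null means "no input was seen".
    public static Long evaluateFinal(SumLongStateSketch state) {
        return state.seen ? Long.valueOf(state.sum) : null;
    }
}
```

The separate `seen` flag lets the sketch distinguish "sum is zero" from "no values at all", which is why it travels alongside `sum` in the intermediate state.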

Grouping aggregation expects:

  • type GroupingAggregatorState (a mutable state used to accumulate the result of the grouping aggregation), which must be public, not an inner class, and implement GroupingAggregatorState
  • type I (the input to your aggregation function), usually a primitive type or BytesRef
  • GroupingAggregatorState init() or GroupingAggregatorState initGrouping(), which returns an empty, initialized grouping aggregation state
  • void combine(GroupingAggregatorState state, int groupId, I input), which adds an input entry to the corresponding group (bucket) of the grouping aggregation state
  • void combineStates(GroupingAggregatorState targetState, int targetGroupId, GroupingAggregatorState otherState, int otherGroupId), which merges the other grouped aggregation state into the first one
  • void combineIntermediate(GroupingAggregatorState current, int groupId, intermediate states), which adds a serialized aggregation state to the current grouped aggregation state (used to combine results across different nodes)
  • Block evaluateFinal(GroupingAggregatorState state, IntVector selected, DriverContext), which converts the inner state of the grouping aggregation to the result column
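The grouping contract listed above can be sketched the same way. Again, this is a simplified illustration and not the real API: plain arrays stand in for BigArrays-backed storage, `int[] selected` stands in for the selected-groups vector, `Long[]` stands in for the result Block, and the DriverContext argument is omitted. All class names are hypothetical.

```java
import java.util.Arrays;

// Per-group mutable state, indexed by group id. The real state would
// implement GroupingAggregatorState and use BigArrays for storage.
final class MaxLongGroupingStateSketch {
    long[] max = new long[0];
    boolean[] seen = new boolean[0];

    // Grows both arrays so that groupId is a valid index.
    void ensureCapacity(int groupId) {
        if (groupId >= max.length) {
            int newSize = Math.max(groupId + 1, max.length * 2);
            max = Arrays.copyOf(max, newSize);
            seen = Arrays.copyOf(seen, newSize);
        }
    }
}

final class MaxLongGroupingAggregatorSketch {
    private MaxLongGroupingAggregatorSketch() {}

    public static MaxLongGroupingStateSketch initGrouping() {
        return new MaxLongGroupingStateSketch();
    }

    // Adds one input value to the bucket identified by groupId.
    public static void combine(MaxLongGroupingStateSketch state, int groupId, long input) {
        state.ensureCapacity(groupId);
        state.max[groupId] = state.seen[groupId] ? Math.max(state.max[groupId], input) : input;
        state.seen[groupId] = true;
    }

    // Merges one bucket of another state into one bucket of the target state.
    public static void combineStates(
        MaxLongGroupingStateSketch target, int targetGroupId,
        MaxLongGroupingStateSketch other, int otherGroupId
    ) {
        if (otherGroupId < other.seen.length && other.seen[otherGroupId]) {
            combine(target, targetGroupId, other.max[otherGroupId]);
        }
    }

    // Merges a serialized partial result for one bucket.
    public static void combineIntermediate(MaxLongGroupingStateSketch state, int groupId, long max, boolean seen) {
        if (seen) {
            combine(state, groupId, max);
        }
    }

    // Produces one result per selected group; null marks a group without input.
    public static Long[] evaluateFinal(MaxLongGroupingStateSketch state, int[] selected) {
        Long[] result = new Long[selected.length];
        for (int i = 0; i < selected.length; i++) {
            int g = selected[i];
            boolean hasValue = g < state.seen.length && state.seen[g];
            result[i] = hasValue ? Long.valueOf(state.max[g]) : null;
        }
        return result;
    }
}
```

Compared with the non-grouping case, every method gains a group id, and `evaluateFinal` emits one value per selected group instead of a single value.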

  1. Copy an existing aggregator to use as a base. You'll usually make one per type. Check other classes to see the naming pattern. You can find them in org.elasticsearch.compute.aggregation.

    Note that some aggregators are autogenerated, so they live in different directories. The base directory is x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/

  2. Implement the methods from the lists above (or create empty stubs for them). Also check the Aggregator Javadoc, as it describes how the generated code uses each method.
  3. Make a test for your aggregator. You can copy an existing one from x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/.

    Tests that extend org.elasticsearch.compute.aggregation.AggregatorFunctionTestCase already cover most of the required cases. You should only need to implement the required abstract methods.

  4. Code generation is triggered when running the tests. Run the CsvTests to generate the code. Generated code should include:

    One of them will be the AggregatorFunctionSupplier for your aggregator. Find it by its name (<Aggregation-name><Type>AggregatorFunctionSupplier), and return it in the toSupplier method in your function, under the correct type condition.

  5. Now, complete the implementation of the aggregator, until the tests pass!
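As a rough illustration of returning the generated supplier "under the correct type condition", the sketch below dispatches on the field type. Every name here is hypothetical: `DataTypeSketch`, `SupplierSketch`, the two supplier classes and `MaxFunctionSketch` are stand-ins invented for this example, and the real supplier and data type classes have different shapes.

```java
// Stand-in for the data type of the aggregated field.
enum DataTypeSketch { LONG, DOUBLE, KEYWORD }

// Stand-in for the generated supplier type.
interface SupplierSketch {
    String describe();
}

// Stand-ins for generated classes named <Aggregation-name><Type>AggregatorFunctionSupplier.
final class MaxLongAggregatorFunctionSupplierSketch implements SupplierSketch {
    public String describe() {
        return "max of longs";
    }
}

final class MaxDoubleAggregatorFunctionSupplierSketch implements SupplierSketch {
    public String describe() {
        return "max of doubles";
    }
}

// Stand-in for the aggregate function class in this package.
final class MaxFunctionSketch {
    private final DataTypeSketch fieldType;

    MaxFunctionSketch(DataTypeSketch fieldType) {
        this.fieldType = fieldType;
    }

    // Picks the supplier that matches the field type and fails loudly for
    // types that have no aggregator yet.
    SupplierSketch toSupplier() {
        switch (fieldType) {
            case LONG:
                return new MaxLongAggregatorFunctionSupplierSketch();
            case DOUBLE:
                return new MaxDoubleAggregatorFunctionSupplierSketch();
            default:
                throw new IllegalStateException("unsupported type [" + fieldType + "]");
        }
    }
}
```

Failing on unsupported types keeps the dispatch in step with the types accepted by resolveType: a type that passes resolution but has no supplier surfaces immediately instead of producing a wrong result.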

StringTemplates

Making an aggregator per type may be repetitive. To avoid code duplication, we use StringTemplates:

  1. Create a new StringTemplate file. Use another as a reference, like x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-TopAggregator.java.st.
  2. Add the template scripts to x-pack/plugin/esql/compute/build.gradle.

    You can also see there which variables you can use, and which types are currently supported.

  3. After completing your template, run the generation with ./gradlew :x-pack:plugin:esql:compute:compileJava.

    You may need to tweak some import orders per type so they don’t raise warnings.