Package org.elasticsearch.xpack.esql.expression.function.aggregate
Guide to adding new aggregate functions
- Aggregation functions are more complex than scalar functions, so it's a good idea to discuss the new function with the ESQL team before starting to implement it. You may also discuss its implementation, as aggregations may require special performance considerations.
- To learn the basics about making functions, check org.elasticsearch.xpack.esql.expression.function.scalar. It has the guide to making a simple function, which should be a good base for writing aggregations.
- Pick one of the csv-spec files in x-pack/plugin/esql/qa/testFixtures/src/main/resources/ and add a test for the function you want to write. These files are roughly themed, but there isn't a strong guiding principle in the organization.
- Rerun the CsvTests and watch your new test fail.
- Find an aggregate function in this package similar to the one you are working on and copy it to build yours. Your function might extend from one of the available abstract classes; check the javadoc of each before using them:
  - AggregateFunction: the base class for aggregates
  - NumericAggregate: aggregation for numeric values
  - SpatialAggregateFunction: aggregation for spatial values
- Fill in the required methods in your new function. Check their JavaDoc for more information. Here are some of the important ones (see the sketch after this list):
  - Constructor: review the constructor annotations and make sure to add the correct types and descriptions. FunctionInfo annotates the constructor itself; Param annotates each function parameter.
  - resolveType: checks the metadata of your function parameters. This may include their types, whether they are foldable or not, or their possible values.
  - dataType: returns the data type of your function, which may depend on its current parameters.
  - Implement SurrogateExpression and its required SurrogateExpression.surrogate() method. It is used to fold the aggregation when it receives only literals, or when the aggregation can be simplified.
  - Implement ToAggregator (more information about aggregators below). The only case where this interface is not required is when the surrogate always returns another function.
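Putting those pieces together, a minimal sketch of a new function could look like the following. Everything here is illustrative: MyAgg is a hypothetical function name, the annotation values are placeholders, and the remaining required members (info, replaceChildren, serialization, and the ToAggregator method) are omitted, so copy the exact shape from the existing function you used as a base.

```java
package org.elasticsearch.xpack.esql.expression.function.aggregate;

// Hypothetical, partial skeleton for illustration only (imports omitted;
// mirror an existing function in this package for the full set of members).
public class MyAgg extends AggregateFunction implements SurrogateExpression, ToAggregator {

    @FunctionInfo(returnType = "double", description = "Describes what my_agg computes.")
    public MyAgg(
        Source source,
        @Param(name = "field", type = { "double", "integer", "long" }, description = "The input values.") Expression field
    ) {
        super(source, field);
    }

    @Override
    public DataType dataType() {
        // The result type; it may also depend on the parameter types.
        return DataType.DOUBLE;
    }

    @Override
    protected TypeResolution resolveType() {
        // Validate the parameter metadata here (types, foldability, allowed values).
        return TypeResolution.TYPE_RESOLVED;
    }

    @Override
    public Expression surrogate() {
        // Return a simpler or foldable expression when possible, otherwise null.
        return null;
    }
}
```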
- To introduce your aggregation to the engine (see the sketch after this list):
  - Implement serialization for your aggregation by implementing NamedWriteable.getWriteableName(), Writeable.writeTo(org.elasticsearch.common.io.stream.StreamOutput), and a deserializing constructor. Then add a NamedWriteableRegistry.Entry constant and add that constant to the list in AggregateWritables.getNamedWriteables().
  - Add it to EsqlFunctionRegistry.
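Here is a hedged sketch of the serialization plumbing for the hypothetical MyAgg above. The category class, the Source/child read and write calls, and the "my_agg" registry name are assumptions; depending on the base class, part of this may already be handled for you, so mirror whatever the function you copied does.

```java
// Inside the hypothetical MyAgg. Imports omitted for brevity.
public static final NamedWriteableRegistry.Entry ENTRY =
    new NamedWriteableRegistry.Entry(Expression.class, "MyAgg", MyAgg::new);

private MyAgg(StreamInput in) throws IOException {
    // How Source and the child expressions are read depends on the base class;
    // copy this constructor from the function you used as a template.
    this(Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(Expression.class));
}

@Override
public void writeTo(StreamOutput out) throws IOException {
    source().writeTo(out);
    out.writeNamedWriteable(field());
}

@Override
public String getWriteableName() {
    return ENTRY.name;
}
```

ENTRY then goes into the list returned by AggregateWritables.getNamedWriteables(), and the function is registered in EsqlFunctionRegistry; the registry is built from def(...) helpers, so the entry would look something like def(MyAgg.class, MyAgg::new, "my_agg") in the appropriate group.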
Creating aggregators for your function
Aggregators contain the core logic of how to combine values, what to store, how to process data, and so on. Currently, we rely on code generation (one aggregator per aggregation per type) to implement this functionality. This approach was picked for performance reasons, namely to avoid virtual method calls and boxing of primitive types. As a result, we cannot rely on interface implementations and generics.
To implement the aggregation logic, create your class (typically named "${FunctionName}${Type}Aggregator").
It must be placed in `org.elasticsearch.compute.aggregation` in order to be picked up by code generation.
Annotate it with Aggregator and GroupingAggregator: the first is responsible for aggregating an entire data set, while the second is responsible for aggregating within grouping buckets.
Before you start implementing it, please note that:
- All methods must be public static.
- init/initSingle/initGrouping may take optional BigArrays or DriverContext arguments, which are injected automatically. It is also possible to declare any number of arbitrary arguments that must be provided via the generated supplier.
- The combine, combineStates, combineIntermediate, and evaluateFinal methods (see below) can be generated automatically when both the input type I and the mutable accumulator state (AggregatorState and GroupingAggregatorState) are primitive (DOUBLE, INT).
- Code generation expects at least one IntermediateState field that is used to keep the serialized state of the aggregation (e.g. AggregatorState and GroupingAggregatorState). It must be defined even if you rely on the autogenerated implementation for primitive types.
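As an example of how small a fully primitive aggregator can be, here is a hypothetical sketch modeled on the existing primitive aggregators (MaxDoubleAggregator and friends). The class name and the combining logic are made up; the two @IntermediateState fields mirror what those aggregators declare.

```java
package org.elasticsearch.compute.aggregation;

import org.elasticsearch.compute.ann.Aggregator;
import org.elasticsearch.compute.ann.GroupingAggregator;
import org.elasticsearch.compute.ann.IntermediateState;

// Hypothetical example. Both the input and the state are primitive doubles, so
// combine/combineStates/combineIntermediate/evaluateFinal are generated automatically.
@Aggregator({ @IntermediateState(name = "value", type = "DOUBLE"), @IntermediateState(name = "seen", type = "BOOLEAN") })
@GroupingAggregator
class MySimpleFuncDoubleAggregator {

    public static double init() {
        return 0.0; // the empty aggregation state
    }

    public static double combine(double current, double v) {
        return current + v; // replace with your function's combining logic
    }
}
```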
Aggregation expects:
- A type AggregatorState (a mutable state used to accumulate the result of the aggregation) that is public, not an inner class, and implements AggregatorState.
- A type I (the input to your aggregation function), usually a primitive type or BytesRef.
- AggregatorState init() or AggregatorState initSingle(): returns an empty, initialized aggregation state.
- void combine(AggregatorState state, I input) or AggregatorState combine(AggregatorState state, I input): adds an input entry to the aggregation state.
- void combineIntermediate(AggregatorState state, intermediate states): adds a serialized aggregation state to the current aggregation state (used to combine results across different nodes).
- Block evaluateFinal(AggregatorState state, DriverContext): converts the inner state of the aggregation to the result column.
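When the state is not a primitive, you write those methods yourself. Below is a hedged sketch of the non-grouping half of a hypothetical MyStatefulFuncDoubleAggregator: the state class, its helper methods, and the combining logic are placeholders, while the method shapes follow the list above.

```java
package org.elasticsearch.compute.aggregation;

import org.elasticsearch.compute.ann.Aggregator;
import org.elasticsearch.compute.ann.GroupingAggregator;
import org.elasticsearch.compute.ann.IntermediateState;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.operator.DriverContext;

// Hypothetical sketch. The grouping methods required by @GroupingAggregator are
// shown after the next list.
@Aggregator({ @IntermediateState(name = "value", type = "DOUBLE"), @IntermediateState(name = "seen", type = "BOOLEAN") })
@GroupingAggregator
class MyStatefulFuncDoubleAggregator {

    public static MyFuncState initSingle() {
        return new MyFuncState();
    }

    public static void combine(MyFuncState state, double v) {
        state.add(v);
    }

    // After the state, one parameter per @IntermediateState field declared above.
    public static void combineIntermediate(MyFuncState state, double value, boolean seen) {
        if (seen) {
            state.add(value);
        }
    }

    public static Block evaluateFinal(MyFuncState state, DriverContext driverContext) {
        return driverContext.blockFactory().newConstantDoubleBlockWith(state.value(), 1);
    }

    // Hypothetical mutable state; the AggregatorState methods shown here
    // (toIntermediate, close) are assumptions, so mirror an existing state class.
    public static class MyFuncState implements AggregatorState {
        private double value;
        private boolean seen;

        void add(double v) {
            value += v;
            seen = true;
        }

        double value() {
            return value;
        }

        @Override
        public void toIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
            blocks[offset] = driverContext.blockFactory().newConstantDoubleBlockWith(value, 1);
            blocks[offset + 1] = driverContext.blockFactory().newConstantBooleanBlockWith(seen, 1);
        }

        @Override
        public void close() {}
    }
}
```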
Grouping aggregation expects:
- A type GroupingAggregatorState (a mutable state used to accumulate the result of the grouping aggregation) that is public, not an inner class, and implements GroupingAggregatorState.
- A type I (the input to your aggregation function), usually a primitive type or BytesRef.
- GroupingAggregatorState init() or GroupingAggregatorState initGrouping(): returns an empty, initialized grouping aggregation state.
- void combine(GroupingAggregatorState state, int groupId, I input): adds an input entry to the corresponding group (bucket) of the grouping aggregation state.
- void combineStates(GroupingAggregatorState targetState, int targetGroupId, GroupingAggregatorState otherState, int otherGroupId): merges the other grouped aggregation state into the first one.
- void combineIntermediate(GroupingAggregatorState current, int groupId, intermediate states): adds a serialized aggregation state to the current grouped aggregation state (used to combine results across different nodes).
- Block evaluateFinal(GroupingAggregatorState state, IntVector selected, DriverContext): converts the inner state of the grouping aggregation to the result column.
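And here is the grouping half of the same hypothetical MyStatefulFuncDoubleAggregator sketch, keeping one accumulator per group id. MyFuncGroupingState and its methods are again placeholders for a class implementing GroupingAggregatorState; the method shapes follow the list above.

```java
// Continuation of the hypothetical MyStatefulFuncDoubleAggregator sketch above.
// MyFuncGroupingState (not shown) would implement GroupingAggregatorState;
// BigArrays is org.elasticsearch.common.util.BigArrays and
// IntVector is org.elasticsearch.compute.data.IntVector.

public static MyFuncGroupingState initGrouping(BigArrays bigArrays) {
    return new MyFuncGroupingState(bigArrays);
}

public static void combine(MyFuncGroupingState state, int groupId, double v) {
    state.add(groupId, v);
}

public static void combineStates(MyFuncGroupingState target, int targetGroupId, MyFuncGroupingState other, int otherGroupId) {
    target.add(targetGroupId, other.valueOf(otherGroupId));
}

// After the state and group id, one parameter per @IntermediateState field.
public static void combineIntermediate(MyFuncGroupingState state, int groupId, double value, boolean seen) {
    if (seen) {
        state.add(groupId, value);
    }
}

public static Block evaluateFinal(MyFuncGroupingState state, IntVector selected, DriverContext driverContext) {
    // Build a Block with one value per selected group.
    return state.toValuesBlock(selected, driverContext);
}
```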
- Copy an existing aggregator to use as a base; you'll usually make one per type. Check other classes to see the naming pattern. You can find them in org.elasticsearch.compute.aggregation. Note that some aggregators are autogenerated, so they live in different directories; the base is x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/.
- Implement (or create empty) methods according to the list above. Also check the Aggregator JavaDoc, as it documents how the generated code uses these methods.
- Make a test for your aggregator. You can copy an existing one from x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/. Tests extending org.elasticsearch.compute.aggregation.AggregatorFunctionTestCase already include most of the required cases; you should only need to fill in the required abstract methods.
- Code generation is triggered when running the tests, so run the CsvTests to generate the code. Among the generated classes will be the AggregatorFunctionSupplier for your aggregator. Find it by its name (<Aggregation-name><Type>AggregatorFunctionSupplier) and return it from the toSupplier method in your function, under the correct type condition (see the sketch below this list).
- Now, complete the implementation of the aggregator until the tests pass!
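The body of that method then ends up as a switch on the resolved type, roughly like the sketch below. The generated supplier class names follow the <Aggregation-name><Type>AggregatorFunctionSupplier pattern but are hypothetical here, and their constructor arguments are an assumption that depends on the generated code, so check the classes that were actually generated for you.

```java
// Inside the hypothetical MyAgg, in the method required by ToAggregator.
// The supplier constructor arguments are an assumption; mirror an existing function.
DataType type = field().dataType();
if (type == DataType.LONG) {
    return new MyAggLongAggregatorFunctionSupplier(inputChannels);
}
if (type == DataType.DOUBLE) {
    return new MyAggDoubleAggregatorFunctionSupplier(inputChannels);
}
throw EsqlIllegalArgumentException.illegalDataType(type);
```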
StringTemplates
Making an aggregator per type may be repetitive. To avoid code duplication, we use StringTemplates:
- Create a new StringTemplate file. Use another as a reference, like x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-TopAggregator.java.st.
- Add the template scripts to x-pack/plugin/esql/compute/build.gradle. There you can also see which variables you can use and which types are currently supported.
- After completing your template, run the generation with ./gradlew :x-pack:plugin:esql:compute:compileJava. You may need to tweak some import orders per type so they don't raise warnings.
Classes
- AggregateFunction: A type of Function that takes multiple values and extracts a single value out of them.
- AvgOverTime: Similar to Avg, but it is used to calculate the average value over a time series of values from the given field.
- CountDistinctOverTime: Similar to CountDistinct, but it is used to calculate the distinct count of values over a time series from the given field.
- CountOverTime: Similar to Count, but it is used to calculate the count of values over a time series from the given field.
- FilteredExpression: Basic wrapper for expressions declared with a nested filter (typically in stats).
- MaxOverTime: Similar to Max, but it is used to calculate the maximum value over a time series of values from the given field.
- MinOverTime: Similar to Min, but it is used to calculate the minimum value over a time series of values from the given field.
- NumericAggregate: Aggregate function that receives a numeric, signed field, and returns a single double value.
- SpatialAggregateFunction: All spatial aggregate functions extend this class to enable the planning of reading from doc values for higher performance.
- SpatialCentroid: Calculate spatial centroid of all geo_point or cartesian point values of a field in matching documents.
- SpatialExtent: Calculate spatial extent of all values of a field in matching documents.
- Sum: Sum all values of a field in matching documents.
- SumOverTime: Similar to Sum, but it is used to calculate the sum of values over a time series from the given field.
- ToPartial: An internal aggregate function that always emits intermediate (or partial) output regardless of the aggregate mode.