Elasticsearch in Action: Pipeline Aggregations (2/2)
The excerpts in this article are taken from my upcoming book, Elasticsearch in Action, Second Edition. The code is available in my GitHub repository, which also includes executable Kibana scripts so you can run the commands in Kibana straight away. All code is tested against Elasticsearch 8.4.
In the last article, we learned what pipeline aggregations are and how their syntax works. In this article, we look at a few of these aggregations in practice.
Cumulative sum parent aggregation
To collect the cumulative sum of coffees sold each day, we can bucket the coffee sales by day and pass the per-bucket results to the cumulative_sum pipeline aggregation. The code in the following listing fetches the cumulative sum of cappuccinos sold each day using this pipeline aggregation.
GET coffee_sales/_search
{
  "size": 0,
  "aggs": {
    "sales_by_coffee": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "1d"
      },
      "aggs": {
        "cappuccino_sales": {
          "sum": {
            "field": "sales.cappuccino"
          }
        },
        "total_cappuccinos": {
          "cumulative_sum": {
            "buckets_path": "cappuccino_sales"
          }
        }
      }
    }
  }
}
Let’s dissect the aggregation in the previous listing. We have a sales_by_coffee aggregation: a date_histogram that collects all the dates and the documents that fall within them (so far, we only have two dates). We also have a sub-aggregation (cappuccino_sales) that sums the cappuccino sales figures for each bucket.
The total_cappuccinos aggregation is the parent pipeline aggregation. It fetches the cumulative coffee sales per day. It is called a parent pipeline aggregation because it is declared in the scope of its parent, the sales_by_coffee aggregation, and works on the output of the cappuccino_sales metric referenced by its buckets_path. The following code snippet shows the result of the aggregation:
"aggregations" : {
  "sales_by_coffee" : {
    "buckets" : [
      {
        "key_as_string" : "2022-09-01T00:00:00.000Z",
        "key" : 1661990400000,
        "doc_count" : 1,
        "cappuccino_sales" : {
          "value" : 23.0
        },
        "total_cappuccinos" : {
          "value" : 23.0
        }
      },
      {
        "key_as_string" : "2022-09-02T00:00:00.000Z",
        "key" : 1662076800000,
        "doc_count" : 1,
        "cappuccino_sales" : {
          "value" : 40.0
        },
        "total_cappuccinos" : {
          "value" : 63.0
        }
      }
    ]
  }
}
Let’s go over the result for a moment. As you can see, the buckets are segregated by date (check key_as_string) due to the date_histogram aggregation at the top of the query. The cappuccino_sales sub-aggregation fetches the number of cappuccinos sold daily (per bucket).
The final part of each bucket is the cumulative sum of cappuccinos (total_cappuccinos). Notice that on day 2, the total cappuccinos were 63 (23 from the first day plus 40 from the second day).
While the cumulative sum of cappuccinos is computed at the parent bucket level, finding the maximum or minimum coffees sold across a set of buckets happens at a sibling level. For that, we need to create an aggregation at the same level as the main aggregation, which is why this kind of aggregation is called a sibling aggregation.
Let’s say we want to find on which day the most cappuccinos were sold or, conversely, on which day the fewest cappuccinos were sold. To do this, we need to use the max_bucket and min_bucket pipeline aggregations, which the next section covers.
Max and min sibling pipeline aggregations
Elasticsearch provides a pipeline aggregation called max_bucket to fetch the top bucket from a set of buckets produced by other aggregations. Remember, a pipeline aggregation takes the output of other aggregations as the input for its own calculation.
The max_bucket aggregation
The query in the following listing enhances the aggregation we performed in the last section by adding a max_bucket aggregation.
GET coffee_sales/_search
{
  "size": 0,
  "aggs": {
    "sales_by_coffee": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "1d"
      },
      "aggs": {
        "cappuccino_sales": {
          "sum": {
            "field": "sales.cappuccino"
          }
        }
      }
    },
    "highest_cappuccino_sales_bucket": {
      "max_bucket": {
        "buckets_path": "sales_by_coffee>cappuccino_sales"
      }
    }
  }
}
In this query, highest_cappuccino_sales_bucket is the custom name given to the sibling pipeline aggregation we are about to perform. We declare the max_bucket aggregation at the same level as the sales_by_coffee aggregation; hence, it is called a sibling aggregation. It expects a buckets_path, which is the combination of the sales_by_coffee and cappuccino_sales aggregations, joined by the > separator. (These two were the result of the bucket and metric aggregations on the data.) Once executed, we get this response:
"aggregations" : {
  "sales_by_coffee" : {
    "buckets" : [
      {
        "key_as_string" : "2022-09-01T00:00:00.000Z",
        "key" : 1661990400000,
        "doc_count" : 1,
        "cappuccino_sales" : {
          "value" : 23.0
        }
      },
      {
        "key_as_string" : "2022-09-02T00:00:00.000Z",
        "key" : 1662076800000,
        "doc_count" : 1,
        "cappuccino_sales" : {
          "value" : 40.0
        }
      }
    ]
  },
  "highest_cappuccino_sales_bucket" : {
    "value" : 40.0,
    "keys" : [
      "2022-09-02T00:00:00.000Z"
    ]
  }
}
In this snippet, the highest_cappuccino_sales_bucket block gives us the answer: 2022-09-02 (September 2, 2022) is the day the most cappuccinos were sold.
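In essence, max_bucket scans the sibling buckets and reports the largest value along with the keys of the buckets holding it. A rough Python equivalent, using the bucket values from the response above:

```python
# Per-bucket cappuccino sales, keyed by bucket date
# (values taken from the response above).
buckets = {
    "2022-09-01T00:00:00.000Z": 23.0,
    "2022-09-02T00:00:00.000Z": 40.0,
}

# max_bucket returns the highest value plus the keys of the buckets
# that hold it (a list, because several buckets can tie for the max).
top = max(buckets.values())
keys = [k for k, v in buckets.items() if v == top]
print(top, keys)  # 40.0 ['2022-09-02T00:00:00.000Z']
```

Note that the response's keys field is a list for exactly this reason: if two days tie for the highest sales, both keys are returned.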
The min_bucket aggregation
We can also fetch the days on which the fewest cappuccinos were sold. To do this, we need to use the min_bucket pipeline aggregation. Replace the highest_cappuccino_sales_bucket block in the previous listing with the code in the following snippet:
..
"lowest_cappuccino_sales_bucket": {
  "min_bucket": {
    "buckets_path": "sales_by_coffee>cappuccino_sales"
  }
}
This yields the day with the lowest number of cappuccinos sold (September 1, 2022, in this case). The following response demonstrates this:
"lowest_cappuccino_sales_bucket" : {
  "value" : 23.0,
  "keys" : [
    "2022-09-01T00:00:00.000Z"
  ]
}
There are a handful of other pipeline aggregations, just as there are many metric and bucket aggregations. I advise you to check the official documentation when you work with a particular aggregation.
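For instance, the avg_bucket and sum_bucket sibling aggregations follow the same buckets_path pattern we used above, reducing a set of sibling buckets to a single number. Their underlying arithmetic, sketched in Python on our two buckets (values from the earlier responses):

```python
# Cappuccino sales per day, one value per bucket, from the earlier responses.
bucket_values = [23.0, 40.0]

# sum_bucket adds up the metric across all sibling buckets;
# avg_bucket averages it.
total = sum(bucket_values)
average = total / len(bucket_values)
print(total)    # 63.0  (what sum_bucket would report)
print(average)  # 31.5  (what avg_bucket would report)
```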