Elasticsearch in Action: Mechanics of Ingest Pipelines (2/3)

Madhusudhan Konda
3 min read · Jan 17, 2023
Elasticsearch in Action by M Konda

The excerpts are taken from my book Elasticsearch in Action, Second Edition. The code is available in my GitHub repository. You can find executable Kibana scripts in the repository so you can run the commands in Kibana straight away. All code is tested against Elasticsearch version 8.4.

Part 2/3 of the Ingest Pipelines mini-article series:

  1. Introduction to Ingest Pipelines
  2. Mechanics of Ingest Pipelines
  3. Loading PDFs into Elasticsearch

Overview

In the last article, we learned that ingest pipelines help transform and manipulate data with relatively little or no costly setup. A pipeline is made of a chain of processors, each performing a specific operation on the incoming data. These processors run on nodes that have the ingest role; each node in a cluster can be assigned one or more roles, such as master, data, ingest, or machine learning.
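As an aside, a node's roles are declared in its elasticsearch.yml configuration, and the _cat API reports each node's roles. A minimal sketch (the role list below is just an example configuration, not a recommendation):

# elasticsearch.yml: grant this node the master, data, and ingest roles
node.roles: [ master, data, ingest ]

# List the nodes along with their roles (the letter "i" denotes ingest)
GET _cat/nodes?v=true&h=name,node.role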

Let’s take an example of MI5’s top-secret operations data being loaded from a database into Elasticsearch to enable search. Before being indexed into Elasticsearch, each document extracted from the database must be stamped with an additional category field set to “confidential”.

The first step in implementing this requirement is to create a pipeline with a processor: a set processor that adds a field called “category” with the value “confidential”.

Creating an Ingest Pipeline

Elasticsearch exposes the _ingest API to create and test pipelines. We can use the _ingest/pipeline/<pipeline_name> endpoint to create a new pipeline. The code in the listing below creates a new pipeline with one processor, a set processor:

PUT _ingest/pipeline/confidential_files_pipeline
{
  "description": "Stamp confidential on the file (document)",
  "processors": [
    {
      "set": {
        "field": "category",
        "value": "confidential"
      }
    }
  ]
}

Here, as shown in the listing above, we are creating an ingest pipeline called confidential_files_pipeline with a single set processor. The set processor’s job is to create a new field called “category” with the value “confidential”. When a new document is passed through this pipeline, the set processor adds the category field to the document on the fly.
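Once the pipeline is created, we can fetch its stored definition back with a GET call on the same endpoint, a handy way to confirm what the cluster actually holds:

GET _ingest/pipeline/confidential_files_pipeline

The response echoes back the description and the processors array we defined above.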

Dry Run the Pipeline

Executing the pipeline definition above stores it in the cluster state; it is then ready to be put to use. However, we can dry-run the pipeline before we start indexing data. We dry-run the pipeline using the _simulate API. The code in the listing below demonstrates simulating the above pipeline:

POST _ingest/pipeline/confidential_files_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "op_name": "Operation Cobra"
      }
    }
  ]
}

Executing the above code returns the response shown below. When we simulate (dry-run) the pipeline, the document is not indexed; the call simply tests out the pipeline’s logic.

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "category": "confidential",
          "op_name": "Operation Cobra"
        },
        "_ingest": {
          "timestamp": "2022-11-03T23:42:33.379569Z"
        }
      }
    }
  ]
}

The _source object consists of our modified document: an additional field, category, was added by the pipeline. This is the magic of the set processor.
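Once we are happy with the dry run, we can put the pipeline to work at indexing time by passing the pipeline query parameter on an index request. A minimal sketch (the top_secret_files index name is hypothetical):

# Index a document through the pipeline (top_secret_files is a made-up index name)
PUT top_secret_files/_doc/1?pipeline=confidential_files_pipeline
{
  "op_name": "Operation Cobra"
}

Fetching the document back (GET top_secret_files/_doc/1) shows the category field stamped on it.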

Chaining the Processors

As we discussed earlier, we can chain processors. Say we wish to uppercase the category value: all we need to do is add another processor, aptly called uppercase, to the pipeline and re-run the query. The query in the listing below adds an uppercase processor to the existing pipeline:

PUT _ingest/pipeline/confidential_files_pipeline
{
  "description": "Stamp confidential on the file (document)",
  "processors": [
    {
      "set": {
        "field": "category",
        "value": "confidential"
      }
    },
    {
      "uppercase": {
        "field": "category"
      }
    }
  ]
}

As the query shows, we added an uppercase processor so that the two processors are chained up: the output of the first processor becomes the input to the second. Note that each processor is declared as its own object in the processors array. Re-running the simulation produces the result shown in the snippet below:

"_source": {
"category": "CONFIDENTIAL",
"op_name": "Operation Cobra"
}

The category field gets added by the set processor, and the same field then gets uppercased by the uppercase processor, yielding the value CONFIDENTIAL on the final document.
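To watch the chaining in action, the _simulate API accepts a verbose flag that reports the document’s state after each processor. This makes it easy to see the set processor’s output feeding into the uppercase processor:

POST _ingest/pipeline/confidential_files_pipeline/_simulate?verbose=true
{
  "docs": [
    {
      "_source": {
        "op_name": "Operation Cobra"
      }
    }
  ]
}

The response carries a processor_results array with one entry per processor, each showing the document as it looked after that step.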

In the next article, we will see a practical example of loading PDF files using ingest pipelines into Elasticsearch so we can enable search on the content.

Part 2/3 of the Ingest Pipelines mini-article series:

  1. Introduction to Ingest Pipelines
  2. Mechanics of Ingest Pipelines
  3. Loading PDFs into Elasticsearch


Written by Madhusudhan Konda

Madhusudhan Konda is a full-stack lead engineer, mentor, and conference speaker. He delivers live online training on Elasticsearch, Elastic Stack & Spring Cloud.
