4  Data Processing

4.1 Task: Define a mapping that satisfies a given set of requirements

Example 1: Defining Index Mappings for a Product Catalog

Requirements

  • Create a mapping for an index named product_catalog
  • Define fields for product ID, name, description, price, and availability status.
  • Ensure the price field is a numeric type.
  • Use a text type for description with a keyword sub-field for exact matches.

Steps

  1. Open the Kibana Console or use a REST client.

  2. Create the index with mappings:

    PUT /product_catalog
    {
      "mappings": {
        "properties": {
          "product_id": {
            "type": "keyword"
          },
          "name": {
            "type": "text"
          },
          "description": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "price": {
            "type": "double"
          },
          "availability_status": {
            "type": "boolean"
          }
        }
      }
    }
  3. Create sample documents using the _bulk endpoint:

    POST /product_catalog/_bulk
    { "index": { "_id": "1" } }
    { "product_id": "p001", "name": "Product 1", "description": "Description of product 1", "price": 19.99, "availability_status": true }
    { "index": { "_id": "2" } }
    { "product_id": "p002", "name": "Product 2", "description": "Description of product 2", "price": 29.99, "availability_status": false }

Test

  1. Retrieve the mappings to verify:

    GET /product_catalog/_mapping
  2. Search for documents to confirm they are indexed correctly:

    GET /product_catalog/_search

    OR

    GET /product_catalog/_search
    {
      "query": {
        "match_all": {}
      }
    }

    OR

    GET product_catalog/_search
    {
      "query": {
        "term": {
          "description": "product"
        }
      }
    }

    OR

    GET product_catalog/_search
    {
      "query": {
        "match": {
          "description.keyword": "Description of product 1"
        }
      }
    }

Considerations

  • The price field is mapped as double so it can store decimal prices.
  • The description field includes a keyword sub-field for exact match searches.
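  • Because price is a numeric (double) field, range queries and numeric aggregations work on it directly; for example, this search should return only the first sample product:

    GET /product_catalog/_search
    {
      "query": {
        "range": {
          "price": {
            "lte": 25
          }
        }
      }
    }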

Clean-up (optional)

  • Delete the index (which will also delete the mapping)

    DELETE product_catalog

Documentation

Example 2: Creating a mapping for a social media platform

Requirements

  • Create a mapping for an index named users
  • The mapping should have a field called username of type keyword
  • The mapping should have a field called email of type keyword
  • The mapping should have a field called posts that holds an array of object values
  • The posts array should have a property called content of type text
  • The posts array should have a property called likes of type integer

Steps

  1. Open the Kibana Console or use a REST client

  2. Create an index with the desired mapping:

    PUT /users
    {
      "mappings": {
        "properties": {
          "username": {
            "type": "keyword"
          },
          "email": {
            "type": "keyword"
          },
          "posts": {
            "properties": {
              "content": {
                "type": "text"
              },
              "likes": {
                "type": "integer"
              }
            }
          }
        }
      }
    }
  3. Index a document:

    POST /users/_doc
    {
      "username": "john_doe",
      "email": "john.doe@example.com",
      "posts": [
        {
          "content": "Hello World!",
          "likes": 10
        },
        {
          "content": "This is my second post",
          "likes": 5
        }
      ]
    }

Test

  • Verify the mapping

    GET users
  • Use the _search API to verify that the mapping is correct and the data is indexed:

    GET /users/_search
    {
      "query": {
        "match": {
          "username": "john_doe"
        }
      }
    }

    And

    GET users/_search
    {
      "size": 0, 
      "aggs": {
        "total_likes": {
          "sum": {
            "field": "posts.likes"
          }
        }
      }
    }

Considerations

  • The username and email fields are of type keyword to enable exact matching.
  • The posts field is mapped as an object; any Elasticsearch field can hold an array of values, so multiple posts can be stored per user.
  • The content field is of type text to enable full-text search.
  • The likes field is of type integer to enable aggregations and sorting.
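  • Note that Elasticsearch flattens arrays of objects at index time, so the content and likes values of different posts are not kept together. If queries need to match fields of the same post (e.g., a post containing "Hello" with more than 5 likes), the posts field would need the nested type instead. A minimal sketch of that alternative mapping (the users_nested index name is only illustrative):

    PUT /users_nested
    {
      "mappings": {
        "properties": {
          "username": {
            "type": "keyword"
          },
          "email": {
            "type": "keyword"
          },
          "posts": {
            "type": "nested",
            "properties": {
              "content": {
                "type": "text"
              },
              "likes": {
                "type": "integer"
              }
            }
          }
        }
      }
    }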

Clean-up (optional)

  • Delete the index (which will also delete the mapping)

    DELETE users

Documentation

Example 3: Creating a mapping for storing and searching restaurant data

Requirements

  • Create a mapping for an index named restaurants.
  • The mapping should include fields for:
    • name (text field for restaurant name)
    • description (text field for restaurant description)
    • location (geolocation field for restaurant location)

Steps

  1. Open the Kibana Console or use a REST client

  2. Define the mapping using a REST API call:

    PUT /restaurants
    {
      "mappings": {
        "properties": {
          "name": {
            "type": "text"
          },
          "description": {
            "type": "text"
          },
          "location": {
            "type": "geo_point"
          }
        }
      }
    }

Test

  1. Verify that the mapping is created successfully by using the following API call:

    GET /restaurants/_mapping
  2. Try indexing a sample document with the defined fields:

    PUT /restaurants/_doc/1
    {
      "name": "Pizza Palace",
      "description": "Delicious pizzas and Italian cuisine",
      "location": {
        "lat": 40.7128,
        "lon": -74.0059
      }
    }
  3. Use search queries to test text search on the name and description fields, and use geo queries to search on the location field.

    GET /restaurants/_search
    {
      "query": {
        "match": {
          "name": "Pizza Palace"
        }
      }
    }
    GET /restaurants/_search
    {
      "query": {
        "match": {
          "description": "Italian cuisine"
        }
      }
    }
    GET /restaurants/_search
    {
      "query": {
        "bool": {
          "filter": {
            "geo_distance": {
              "distance": "5km",
              "location": {
                "lat": 40.7128,
                "lon": -74.0059
              }
            }
          }
        }
      }
    }

Considerations

  • text is a generic field type suitable for textual data like names and descriptions.
  • geo_point is a specialized field type for storing and searching geospatial data like latitude and longitude coordinates.

Clean-up (optional)

  • Delete the index (which will also delete the mapping)

    DELETE restaurants

Documentation

4.2 Task: Define and use a custom analyzer that satisfies a given set of requirements

Example 1: Custom Analyzer for Restaurant Reviews

Requirements

  • Create a mapping for an index named restaurant_reviews
  • Create a custom analyzer named custom_review_analyzer.
  • The analyzer should:
    • Use the standard tokenizer.
    • Include a lowercase filter.
    • Include a stop filter to remove common English stop words.
    • Include a synonym filter to handle common synonyms.

Steps

  1. Open the Kibana Console or use a REST client

  2. Create the index with a custom analyzer defined in the index settings.

    PUT /restaurant_reviews
    {
      "settings": {
        "analysis": {
          "analyzer": {
            "custom_review_analyzer": {
              "type": "custom",
              "tokenizer": "standard",
              "filter": [
                "lowercase",
                "stop",
                "synonym"
              ]
            }
          },
          "filter": {
            "synonym": {
              "type": "synonym",
              "synonyms": [
                "delicious, tasty",
                "restaurant, eatery"
              ]
            }
          }
        }
      },
      "mappings": {
        "properties": {
          "review_id": {
            "type": "keyword"
          },
          "restaurant_name": {
            "type": "text"
          },
          "review_text": {
            "type": "text",
            "analyzer": "custom_review_analyzer"
          },
          "rating": {
            "type": "integer"
          },
          "review_date": {
            "type": "date"
          }
        }
      }
    }
  3. Add some sample documents to the index to test the custom analyzer

    POST /restaurant_reviews/_bulk
    { "index": {} }
    { "review_id": "1", "restaurant_name": "Pizza Palace", "review_text": "The pizza was delicious and the service was excellent.", "rating": 5, "review_date": "2024-07-01" }
    { "index": {} }
    { "review_id": "2", "restaurant_name": "Burger Haven", "review_text": "Tasty burgers and friendly staff.", "rating": 4, "review_date": "2024-07-02" }
  4. Perform a search query to verify the custom analyzer is working as expected.

    GET /restaurant_reviews/_search
    {
      "query": {
        "match": {
          "review_text": "tasty"
        }
      }
    }

Considerations

  • Standard Tokenizer: Chosen for its ability to handle most text inputs effectively.
  • Lowercase Filter: Ensures case-insensitive search.
  • Stop Filter: Removes common stop words to improve search relevance.
  • Synonym Filter: Handles common synonyms to enhance search matching.

Test

  1. Verify the analyzer was created

    GET /restaurant_reviews/_settings
  2. Verify the custom analyzer configuration by testing it directly with the _analyze API:

    GET /restaurant_reviews/_analyze
    {
      "analyzer": "custom_review_analyzer",
      "text": "The pizza was delicious and the service was excellent."
    }
  3. Perform search queries to ensure the custom analyzer processes the text as expected.

    GET /restaurant_reviews/_search
    {
      "query": {
        "match": {
          "review_text": "tasty"
        }
      }
    }

Clean-up (optional)

  • Delete the Index

    DELETE /restaurant_reviews

Documentation

Example 2: Creating a custom analyzer for product descriptions

Requirements

  • Create a mapping for an index named products with a description field containing product descriptions
  • The custom analyzer should:
    • Lowercase all text
    • Remove stop words (common words like the, and, a, etc.)
    • Split text into individual words (tokenize)
    • Stem words (reduce words to their root form, e.g., running - run)

Steps

  1. Open the Kibana Console or use a REST client

  2. Create the products index with a custom analyzer for the description field:

    PUT /products
    {
      "settings": {
        "analysis": {
          "analyzer": {
            "product_description_analyzer": {
              "tokenizer": "standard",
              "filter": [
                "lowercase",
                "stop",
                "stemmer"
              ]
            }
          }
        }
      },
      "mappings": {
        "properties": {
          "description": {
            "type": "text",
            "analyzer": "product_description_analyzer"
          }
        }
      }
    }
  3. Index some sample documents using the _bulk endpoint:

    POST /products/_bulk
    { "index": { "_id": 1 } }
    { "description": "The quick brown fox jumps over the lazy dog." }
    { "index": { "_id": 2 } }
    { "description": "A high-quality product for running enthusiasts." }

Test

  1. Search for documents containing the term run

    GET /products/_search
    {
      "query": {
        "match": {
          "description": "run"
        }
      }
    }

This should return the document with _id 2, as the custom analyzer has stemmed running to run.

  2. Search for documents containing the term the

    GET /products/_search
    {
      "query": {
        "match": {
          "description": "the"
        }
      }
    }

This should not return any documents, as the custom analyzer has removed stop words like the.
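
The analyzer's behavior can also be checked directly with the _analyze API (a quick check against the index created above; the output should contain roughly the tokens quick, brown, fox, and run):

    GET /products/_analyze
    {
      "analyzer": "product_description_analyzer",
      "text": "The quick brown fox is running."
    }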

Considerations

  • The custom analyzer is defined in the index settings using the analysis section.
  • The tokenizer parameter specifies how the text should be split into tokens (individual words).
  • The filter parameter specifies the filters to be applied to the tokens, such as lowercasing, stop word removal, and stemming.
  • The custom analyzer is applied to description by specifying it in the field mapping.

Clean-up (optional)

  • Delete the Index

    DELETE /products

Documentation

Example 3: Creating a custom analyzer for product descriptions in an ecommerce catalog

Requirements

  • Define an index called product_catalog with a description field.
  • Create a custom tokenizer that splits text on non-letter characters.
  • Include a lowercase filter to normalize text.
  • Add a stopword filter to remove common English stopwords.

Steps

  1. Open the Kibana Console or use a REST client

  2. Define the custom analyzer in the index settings

    PUT product_catalog
    {
      "settings": {
        "analysis": {
          "analyzer": {
            "custom_analyzer": {
              "type": "custom",
              "tokenizer": "lowercase",
              "filter": [
                "english_stop"
              ]
            }
          },
          "filter": {
            "english_stop": {
              "type": "stop",
              "stopwords": "_english_"
            }
          }
        }
      },
      "mappings": {
        "properties": {
          "description" : {
            "type": "text",
            "analyzer": "custom_analyzer"
          }
        }
      }
    }
  3. Create sample documents using the _bulk endpoint:

    POST /product_catalog/_bulk
    { "index": { "_id": "1" } }
    { "description": "This is a great product! It works perfectly." }
    { "index": { "_id": "2" } }
    { "description": "An amazing gadget, with excellent features." }

Test

  1. Analyze a sample text to verify the custom analyzer:

    GET product_catalog/_analyze
    {
      "analyzer" : "custom_analyzer",
      "text" : "i2can2RUN4the6MARATHON!"
    }
    // response
    {
      "tokens": [
        {
          "token": "i",
          "start_offset": 0,
          "end_offset": 1,
          "type": "word",
          "position": 0
        },
        {
          "token": "can",
          "start_offset": 2,
          "end_offset": 5,
          "type": "word",
          "position": 1
        },
        {
          "token": "run",
          "start_offset": 6,
          "end_offset": 9,
          "type": "word",
          "position": 2
        },
        {
          "token": "marathon",
          "start_offset": 14,
          "end_offset": 22,
          "type": "word",
          "position": 4
        }
      ]
    }
  2. Search for documents to confirm they are indexed correctly:

    GET /product_catalog/_search
    {
      "query": {
        "match": {
          "description": "great product"
        }
      }
    }

Considerations

  • The lowercase tokenizer splits text on non-letter characters, so punctuation and digits do not affect tokenization, and it also converts uppercase characters to lowercase, providing case-insensitive searches.
  • The english_stop stopword filter removes common English stopwords, improving search relevance by ignoring less important words.

Clean-up (optional)

  • Delete the product_catalog index:

    DELETE /product_catalog

Documentation

Example 4: Create a Custom Analyzer for E-commerce Product Data

Requirements

  • Index e-commerce product data with fields such as name, category, description, and sku.
  • Custom analyzer to normalize text for consistent search results, including handling special characters and case sensitivity.
  • Use the _bulk endpoint to ingest multiple documents.
  • Two example searches to verify that the custom analyzer handles both hyphenated and non-hyphenated queries.

Steps

  1. Define the Custom Analyzer:
    • Set up the analyzer to lowercase text, remove special characters, and tokenize the content.
    PUT /ecommerce_products
    {
      "settings": {
        "analysis": {
          "char_filter": {
            "remove_special_chars": {
              "type": "pattern_replace",
              "pattern": "[^\\w\\s]",
              "replacement": ""
            }
          },
          "filter": {
            "my_lowercase": {
              "type": "lowercase"
            }
          },
          "analyzer": {
            "custom_analyzer": {
              "char_filter": ["remove_special_chars"],
              "tokenizer": "standard",
              "filter": ["my_lowercase"]
            }
          }
        }
      },
      "mappings": {
        "properties": {
          "name": {
            "type": "text",
            "analyzer": "custom_analyzer"
          },
          "category": {
            "type": "keyword"
          },
          "description": {
            "type": "text",
            "analyzer": "custom_analyzer"
          },
          "sku": {
            "type": "keyword"
          }
        }
      }
    }
  2. Index Sample Documents Using _bulk Endpoint:
    • Use the _bulk endpoint to ingest multiple documents.
    POST /ecommerce_products/_bulk
    { "index": { "_id": "1" } }
    { "name": "Choco-Lite Bar", "category": "Snacks", "description": "A light and crispy chocolate snack bar.", "sku": "SNACK-CHOCOLITE-001" }
    { "index": { "_id": "2" } }
    { "name": "Apple iPhone 12", "category": "Electronics", "description": "The latest iPhone model with advanced features.", "sku": "ELEC-IPH12-256GB" }
    { "index": { "_id": "3" } }
    { "name": "Samsung Galaxy S21", "category": "Electronics", "description": "A powerful smartphone with an impressive camera.", "sku": "ELEC-SG-S21" }
    { "index": { "_id": "4" } }
    { "name": "Nike Air Max 270", "category": "Footwear", "description": "Comfortable and stylish sneakers.", "sku": "FTWR-NIKE-AM270" }

Test

  • Query without Hyphen:

    GET /ecommerce_products/_search
    {
      "query": {
        "match": {
          "name": "chocolite"
        }
      }
    }
  • Query with Hyphen:

    GET /ecommerce_products/_search
    {
      "query": {
        "match": {
          "name": "choco-lite"
        }
      }
    }
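  • Optionally, confirm that both forms reduce to the same tokens with the _analyze API (a quick check; both inputs should produce the token chocolite):

    GET /ecommerce_products/_analyze
    {
      "analyzer": "custom_analyzer",
      "text": ["Choco-Lite Bar", "choco-lite"]
    }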

Considerations

  • The pattern_replace character filter removes characters that are neither word characters nor whitespace (i.e., punctuation and symbols), normalizing data for indexing and searching.
  • The lowercase filter ensures case-insensitivity, providing consistent search results regardless of the case of the input.
  • The use of the _bulk endpoint allows efficient indexing of multiple documents in a single request, which is especially useful for large datasets.

Documentation

4.3 Task: Define and use multi-fields with different data types and/or analyzers

Example 1: Creating multi-fields for product names in an e-commerce catalog

Requirements

  • Define an index called product_catalog
  • Define a field with a text type for full-text search.
  • Include a keyword sub-field for exact matches.
  • Add a custom analyzer to the text field to normalize the text.

Steps

  1. Open the Kibana Console or use a REST client

  2. Define the multi-fields in the index mappings

    PUT /product_catalog
    {
      "settings": {
        "analysis": {
          "analyzer": {
            "custom_analyzer": {
              "type": "custom",
              "tokenizer": "standard",
              "filter": [
                "lowercase",
                "asciifolding"
              ]
            }
          }
        }
      },
      "mappings": {
        "properties": {
          "product_name": {
            "type": "text",
            "analyzer": "custom_analyzer",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          }
        }
      }
    }
  3. Create sample documents using the _bulk endpoint:

    POST /product_catalog/_bulk
    { "index": { "_id": "1" } }
    { "product_name": "Deluxe Toaster" }
    { "index": { "_id": "2" } }
    { "product_name": "Premium Coffee Maker" }

Test

  1. Retrieve the index configuration to verify the custom analyzer and the sub-field:

    GET product_catalog
  2. Search for documents using the text field:

    GET /product_catalog/_search
    {
      "query": {
        "match": {
          "product_name": "deluxe"
        }
      }
    }
  3. Search for documents using the keyword sub-field:

    GET /product_catalog/_search
    {
      "query": {
        "term": {
          "product_name.keyword": "Deluxe Toaster"
        }
      }
    }

Considerations

  • The custom analyzer uses the standard tokenizer with the lowercase and asciifolding filters, providing case-insensitive and accent-insensitive searches.
  • The keyword sub-field allows for exact matches, which is useful for aggregations and sorting.
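  • Because the sub-field is a keyword, it can also drive a terms aggregation, for example (against the sample documents above):

    GET /product_catalog/_search
    {
      "size": 0,
      "aggs": {
        "product_names": {
          "terms": {
            "field": "product_name.keyword"
          }
        }
      }
    }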

Clean-up (optional)

  • Delete the Index

    DELETE /product_catalog

Documentation

Example 2: Creating a multi-field for a title with different analyzers

Requirements

  • Create a mapping for an index named myindex
  • The title field should have a sub-field for exact matching (keyword)
  • The title field should have a sub-field for full-text search (text) with standard analyzer
  • The title field should have a sub-field for full-text search (text) with english analyzer

Steps

  1. Open the Kibana Console or use a REST client

  2. Create an index with the desired mapping:

    PUT /myindex
    {
      "mappings": {
        "properties": {
          "title": {
            "type": "text",
            "fields": {
              "exact": {
                "type": "keyword"
              },
              "std": {
                "type": "text",
                "analyzer": "standard"
              },
              "english": {
                "type": "text",
                "analyzer": "english"
              }
            }
          }
        }
      }
    }
  3. Add documents using the appropriate endpoint:

    POST /myindex/_bulk
    { "index": { "_index": "myindex" } }
    { "title": "The Quick Brown Fox" }
    { "index": { "_index": "myindex" } }
    { "title": "The Quick Brown Fox Jumps" }

Test

  • Verify the index was created with its associated multi-fields

    GET myindex
  • Use the _search API to verify that the multi-field is working correctly

    GET /myindex/_search
    {
      "query": {
        "match": {
          "title.exact": "The Quick Brown Fox"
        }
      }
    }
    
    GET /myindex/_search
    {
      "query": {
        "match": {
          "title.std": "Quick Brown"
        }
      }
    }
    
    GET /myindex/_search
    {
      "query": {
        "match": {
          "title.english": "Quick Brown"
        }
      }
    }

Considerations

  • The title.exact sub-field is used for exact matching.
  • The title.std sub-field is used for full-text search with the standard analyzer.
  • The title.english sub-field is used for full-text search with the English analyzer.
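  • The differences between the sub-fields can be inspected with the _analyze API (a quick check; title.english should drop "the" and stem "jumps" to "jump", while title.std only lowercases the tokens):

    GET /myindex/_analyze
    {
      "field": "title.english",
      "text": "The Quick Brown Fox Jumps"
    }
    GET /myindex/_analyze
    {
      "field": "title.std",
      "text": "The Quick Brown Fox Jumps"
    }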

Clean-up (optional)

  • Delete the Index

    DELETE /myindex

Documentation

Example 3: Creating multi-fields for analyzing text data

Requirements

  • Create a mapping for an index named text_data
  • Store the original text data in content for display purposes
  • Analyze the text data for full-text search
  • Analyze the text data for filtering and aggregations

Steps

  1. Open the Kibana Console or use a REST client

  2. Define the multi-fields in the index mapping

    PUT /text_data
    {
      "mappings": {
        "properties": {
          "content": {
            "type": "text",
            "fields": {
              "raw": {
                "type": "keyword"
              },
              "analyzed": {
                "type": "text",
                "analyzer": "english"
              },
              "ngram": {
                "type": "text",
                "analyzer": "ngram_analyzer"
              }
            }
          }
        }
      },
      "settings": {
        "analysis": {
          "analyzer": {
            "ngram_analyzer": {
              "tokenizer": "ngram_tokenizer"
            }
          },
          "tokenizer": {
            "ngram_tokenizer": {
              "type": "ngram",
              "min_gram": 2,
              "max_gram": 3
            }
          }
        }
      }
    }
  3. Index some documents using the text_data index:

    POST /text_data/_bulk
    { "index": {} }
    { "content": "This is a sample text for analyzing." }
    { "index": {} }
    { "content": "Another example of text data." }

Test

  1. Verify the index was created with its associated multi-fields

    GET text_data
  2. Test the multi-fields by querying and aggregating the data:

    GET /text_data/_search
    {
      "query": {
        "match": {
          "content.analyzed": "sample"
        }
      },
      "aggs": {
        "filter_agg": {
          "filter": {
            "term": {
              "content.ngram": "ex"
            }
          }
        }
      }
    }

    The output should show a single document in the search results matching the analyzed text and the aggregation results based on the ngram analysis.

    The following:

    GET /text_data/_search
    {
      "query": {
        "match": {
          "content.ngram": "ex"
        }
      },
      "aggs": {
        "filter_agg": {
          "filter": {
            "term": {
              "content.ngram": "ex"
            }
          }
        }
      }
    }

    will show 2 documents, because the search looks for the substring “ex”, which appears in both documents; this only works when searching against content.ngram.

    // edited response (for the first query above, matching content.analyzed)
    {
      ...
      "hits": {
        "total": {
          "value": 1,
          "relation": "eq"
        },
        "max_score": 0.7361701,
        "hits": [
          {
            "_index": "text_data",
            "_id": "qnqiBJEBRRh1FLFiJKsV",
            "_score": 0.7361701,
            "_source": {
              "content": "This is a sample text for analyzing."
            }
          }
        ]
      },
      "aggregations": {
        "filter_agg": {
          "doc_count": 1
        }
      }
    }

Considerations

  • The content field has multiple sub-fields: raw (keyword), analyzed (text with English analyzer), and ngram (text with ngram analyzer).
  • The raw sub-field is used for storing the original text data without analysis.
  • The analyzed sub-field is used for full-text search using the English analyzer.
  • The ngram sub-field is used for filtering and aggregations based on ngram analysis.
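  • To see why a short fragment such as "ex" matches, the ngram analyzer can be inspected directly (a quick check; the word "text" should produce tokens such as te, tex, ex, ext, and xt):

    GET /text_data/_analyze
    {
      "analyzer": "ngram_analyzer",
      "text": "text"
    }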

Clean-up (optional)

  • Delete the Index

    DELETE text_data

Documentation

4.4 Task: Use the Reindex API and Update By Query API to reindex and/or update documents

Example 1: Moving and updating product data to a new index with a new field

Requirements

  • Reindex data from an existing index named products_old to a new index named products_new.
  • During the reindexing process, add a new field named stock_level with a default value of 10 for each product.

Steps

  1. Open the Kibana Console or use a REST client

  2. Create the indices (notice that they both look identical)

    PUT /products_old
    {
      "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 1
      },
      "mappings": {
        "properties": {
          "product_id": {
            "type": "keyword"
          },
          "name": {
            "type": "text"
          },
          "description": {
            "type": "text"
          },
          "price": {
            "type": "double"
          },
          "availability_status": {
            "type": "boolean"
          }
        }
      }
    }
    PUT /products_new
    {
      "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 1
      },
      "mappings": {
        "properties": {
          "product_id": {
            "type": "keyword"
          },
          "name": {
            "type": "text"
          },
          "description": {
            "type": "text"
          },
          "price": {
            "type": "double"
          },
          "availability_status": {
            "type": "boolean"
          }
        }
      }
    }
  3. Add products to products_old

    POST /products_old/_bulk
    { "index": { "_index": "products_old", "_id": "1" } }
    { "product_id": "1", "name": "Wireless Mouse", "description": "A high-quality wireless mouse with ergonomic design.", "price": 29.99, "availability_status": true }
    { "index": { "_index": "products_old", "_id": "2" } }
    { "product_id": "2", "name": "Gaming Keyboard", "description": "Mechanical gaming keyboard with customizable RGB lighting.", "price": 79.99, "availability_status": true }
    { "index": { "_index": "products_old", "_id": "3" } }
    { "product_id": "3", "name": "USB-C Hub", "description": "A versatile USB-C hub with multiple ports.", "price": 49.99, "availability_status": true }
  4. Use the Reindex API with a script to update documents during the copy process:

    POST /_reindex
    {
      "source": {
        "index": "products_old"
      },
      "dest": {
        "index": "products_new"
      },
      "script": {
        "source": "ctx._source.stock_level = 10"
      }
    }
  5. Wait for the reindexing or update operation to complete.
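
    For large indices, the reindex can also be run asynchronously and monitored through the task management API (a sketch; <task_id> is a placeholder for the "task" value returned by the reindex call):

    POST /_reindex?wait_for_completion=false
    {
      "source": {
        "index": "products_old"
      },
      "dest": {
        "index": "products_new"
      },
      "script": {
        "source": "ctx._source.stock_level = 10"
      }
    }
    GET /_tasks/<task_id>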

Test

  1. Verify that the documents from products_old do not contain stock_level

    GET /products_old/_search
    // edited response
    {
     ...
         "hits": [
           {
             "_index": "products_old",
             "_id": "1",
             "_score": 1,
             "_source": {
               "product_id": "1",
               "name": "Wireless Mouse",
               "description": "A high-quality wireless mouse with ergonomic design.",
               "price": 29.99,
               "availability_status": true
             }
           },
           {
             "_index": "products_old",
             "_id": "2",
             "_score": 1,
             "_source": {
               "product_id": "2",
               "name": "Gaming Keyboard",
               "description": "Mechanical gaming keyboard with customizable RGB lighting.",
               "price": 79.99,
               "availability_status": true
             }
           },
           {
             "_index": "products_old",
             "_id": "3",
             "_score": 1,
             "_source": {
               "product_id": "3",
               "name": "USB-C Hub",
               "description": "A versatile USB-C hub with multiple ports.",
               "price": 49.99,
               "availability_status": true
             }
           }
         ]
       }
     }
  2. Verify that the data is successfully migrated to the products_new index with the addition of stock_level

    GET /products_new/_search
    // edited response
     {
       ...
         "hits": [
           {
             "_index": "products_new",
             "_id": "1",
             "_score": 1,
             "_source": {
               "availability_status": true,
               "price": 29.99,
               "product_id": "1",
               "stock_level": 10,
               "name": "Wireless Mouse",
               "description": "A high-quality wireless mouse with ergonomic design."
             }
           },
           {
             "_index": "products_new",
             "_id": "2",
             "_score": 1,
             "_source": {
               "availability_status": true,
               "price": 79.99,
               "product_id": "2",
               "stock_level": 10,
               "name": "Gaming Keyboard",
               "description": "Mechanical gaming keyboard with customizable RGB lighting."
             }
           },
           {
             "_index": "products_new",
             "_id": "3",
             "_score": 1,
             "_source": {
               "availability_status": true,
               "price": 49.99,
               "product_id": "3",
               "stock_level": 10,
               "name": "USB-C Hub",
               "description": "A versatile USB-C hub with multiple ports."
             }
           }
         ]
       }
     }

Considerations

  • The Reindex API with a script allows copying data and applying transformations during the process.

Clean-up (optional)

  • Delete the two indices

    DELETE products_old
    DELETE products_new

Documentation

Example 2: Reindexing and updating product data

Requirements

  • Reindex data from an existing index named products_old to a new index named products_new.
  • Both indices have the following fields:
    • name (text)
    • price (float)
    • inventory_count (integer)
  • The products_new index has an additional boolean field called in_stock
  • In products_new, update the in_stock field for products with a low inventory count (less than 10 items)

Steps

  1. Open the Kibana Console or use a REST client

  2. Create the old index with some sample data:

    PUT /products_old
    {
      "mappings": {
        "properties": {
          "name": {
            "type": "text"
          },
          "price": {
            "type": "float"
          },
          "inventory_count": {
            "type": "integer"
          }
        }
      }
    }
    POST /products_old/_bulk
    { "index": {} }
    { "name": "Product A", "price": 19.99, "inventory_count": 10 }
    { "index": {} }
    { "name": "Product B", "price": 29.99, "inventory_count": 5 }
    { "index": {} }
    { "name": "Product C", "price": 39.99, "inventory_count": 20 }
  3. Create the new index with an updated mapping:

    PUT /products_new
    {
      "mappings": {
        "properties": {
          "name": {
            "type": "text"
          },
          "price": {
            "type": "float"
          },
          "inventory_count": {
            "type": "integer"
          },
          "in_stock": {
            "type": "boolean"
          }
        }
      }
    }
  4. Reindex the data from the old index to the new index. This updates the in_stock field as it migrates the content.

    POST /_reindex
    {
      "source": {
        "index": "products_old"
      },
      "dest": {
        "index": "products_new"
      },
      "script": {
        "source": """
          if (ctx._source.inventory_count < 10) {
            ctx._source.in_stock = false;
          } else {
            ctx._source.in_stock = true;
          }
        """
      }
    }
  5. Additionally (or as an alternative to the reindex script), use the Update By Query API to set in_stock for products with low inventory after the content has been migrated:

    POST /products_new/_update_by_query
    {
      "script": {
        "source": "ctx._source.in_stock = false"
      },
      "query": {
        "range": {
          "inventory_count": {
            "lt": 10
          }
        }
      }
    }

Test

  1. Search the new index to verify the reindexed data and updated in_stock field

    GET /products_new/_search
    // edited response
    {
      ...
        "hits": [
          {
            "_index": "products_new",
            "_id": "rHqtBJEBRRh1FLFi_quh",
            "_score": 1,
            "_source": {
              "price": 19.99,
              "inventory_count": 10,
              "name": "Product A",
              "in_stock": true
            }
          },
          {
            "_index": "products_new",
            "_id": "rXqtBJEBRRh1FLFi_qui",
            "_score": 1,
            "_source": {
              "price": 29.99,
              "inventory_count": 5,
              "name": "Product B",
              "in_stock": false
            }
          },
          {
            "_index": "products_new",
            "_id": "rnqtBJEBRRh1FLFi_qui",
            "_score": 1,
            "_source": {
              "price": 39.99,
              "inventory_count": 20,
              "name": "Product C",
              "in_stock": true
            }
          }
        ]
      }
    }

    The response should show the reindexed products with in_stock set correctly based on the inventory count.
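
    As a quick spot-check of the flag alone, a term query on in_stock should return only Product B (inventory_count 5):

    GET /products_new/_search
    {
      "query": {
        "term": {
          "in_stock": false
        }
      }
    }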

  2. Search products_old to verify the original data and the absence of in_stock

    GET /products_old/_search
    // edited response
    {
      ...
        "hits": [
          {
            "_index": "products_old",
            "_id": "rHqtBJEBRRh1FLFi_quh",
            "_score": 1,
            "_source": {
              "name": "Product A",
              "price": 19.99,
              "inventory_count": 10
            }
          },
          {
            "_index": "products_old",
            "_id": "rXqtBJEBRRh1FLFi_qui",
            "_score": 1,
            "_source": {
              "name": "Product B",
              "price": 29.99,
              "inventory_count": 5
            }
          },
          {
            "_index": "products_old",
            "_id": "rnqtBJEBRRh1FLFi_qui",
            "_score": 1,
            "_source": {
              "name": "Product C",
              "price": 39.99,
              "inventory_count": 20
            }
          }
        ]
      }
    }

Considerations

  • The Reindex API is used to copy data from the old index to the new index while applying a script to set the “in_stock” field based on the inventory count.
  • The Update By Query API is used to update the in_stock field for products with an inventory count lower than 10.

Clean-up (optional)

  • Delete the two indices

    DELETE products_old
    DELETE products_new

Documentation

Example 3: Reindexing documents from an old product catalog to a new one with updated mappings and updating prices in the new catalog

Requirements

  • Create the products_old index and add sample products.
  • Create the products_new index using the products_old mapping.
  • Reindex documents from products_old to products_new.
    • Increase the price of all products in products_new by 10%.

Steps

  1. Create the products_old index and add sample products

    PUT /products_old
    {
      "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 1
      },
      "mappings": {
        "properties": {
          "product_id": {
            "type": "keyword"
          },
          "name": {
            "type": "text"
          },
          "description": {
            "type": "text"
          },
          "price": {
            "type": "double"
          },
          "availability_status": {
            "type": "boolean"
          }
        }
      }
    }
    
    POST /products_old/_bulk
    { "index": { "_index": "products_old", "_id": "1" } }
    { "product_id": "1", "name": "Wireless Mouse", "description": "A high-quality wireless mouse with ergonomic design.", "price": 29.99, "availability_status": true }
    { "index": { "_index": "products_old", "_id": "2" } }
    { "product_id": "2", "name": "Gaming Keyboard", "description": "Mechanical gaming keyboard with customizable RGB lighting.", "price": 79.99, "availability_status": true }
    { "index": { "_index": "products_old", "_id": "3" } }
    { "product_id": "3", "name": "USB-C Hub", "description": "A versatile USB-C hub with multiple ports.", "price": 49.99, "availability_status": true }
  2. Create the new index with updated mappings

    • Define the new index products_new with the desired mappings.
    PUT /products_new
    {
      "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 1
      },
      "mappings": {
        "properties": {
          "product_id": {
            "type": "keyword"
          },
          "name": {
            "type": "text"
          },
          "description": {
            "type": "text"
          },
          "price": {
            "type": "double"
          },
          "availability_status": {
            "type": "boolean"
          }
        }
      }
    }
  3. Reindex Documents from products_old to products_new while updating price

    POST _reindex
    {
      "source": {
        "index": "products_old"
      },
      "dest": {
        "index": "products_new"
      },
      "script": {
        "source": "ctx._source.price *= 1.1;"
      }
    }
  4. OR: reindex the content without a script, then use the Update By Query API to increase the price of all products in products_new by 10%.

    POST _reindex
    {
      "source": {
        "index": "products_old"
      },
      "dest": {
        "index": "products_new"
      }
    }
    POST /products_new/_update_by_query
    {
      "script": {
        "source": "ctx._source.price *= 1.10",
        "lang": "painless"
      },
      "query": {
        "match_all": {}
      }
    }

Test

  1. Verify the reindexing

    GET /products_old/_count
    GET /products_new/_count
    // responses for both indices
    # GET /products_old/_count 200 OK
    {
      "count": 3,
      "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
      }
    }
    # GET /products_new/_count 200 OK
    {
      "count": 3,
      "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
      }
    }
  2. Verify the price update

    GET /products_old,products_new/_search
    {
      "query": {
        "match_all": {}
      },
      "_source": [
        "price"
        ]
    }
    // edited response
    {
      ...
        "hits": [
          {
            "_index": "products_new",
            "_id": "1",
            "_score": 1,
            "_source": {
              "price": 32.989000000000004
            }
          },
          {
            "_index": "products_new",
            "_id": "2",
            "_score": 1,
            "_source": {
              "price": 87.989
            }
          },
          {
            "_index": "products_new",
            "_id": "3",
            "_score": 1,
            "_source": {
              "price": 54.989000000000004
            }
          },
          {
            "_index": "products_old",
            "_id": "1",
            "_score": 1,
            "_source": {
              "price": 29.99
            }
          },
          {
            "_index": "products_old",
            "_id": "2",
            "_score": 1,
            "_source": {
              "price": 79.99
            }
          },
          {
            "_index": "products_old",
            "_id": "3",
            "_score": 1,
            "_source": {
              "price": 49.99
            }
          }
        ]
      }
    }

Considerations

  • Mappings Update: Ensure the new index products_new has the updated mappings to accommodate any changes in the document structure.
  • Price Update Script: The script in the Update By Query API uses the Painless language to increase the price by 10%. This is a simple and efficient way to update document fields.

Clean-up (optional)

  • Delete the indices

    DELETE /products_old
    DELETE /products_new

Documentation

4.5 Task: Define and use an ingest pipeline that satisfies a given set of requirements, including the use of Painless to modify documents

Example 1: Create an ingest pipeline for enriching and modifying product data in an e-commerce catalog

Requirements

  • Create an ingest pipeline named product_pipeline to process incoming documents.
  • Apply a Painless script to modify price to add 10% to the price
  • Enrich the data by adding the ingest time to a timestamp field
  • Create a product_catalog index

Notes: the ctx object represents the single document being processed. In update and reindex scripts, where the document already exists in an index, a field is accessed through _source:

    ctx._source.[field name]

In ingest pipeline scripts, which run before the document is indexed, the field is accessed directly:

    ctx.[field name]
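
For example, the same 10% price adjustment looks slightly different in the two contexts (a sketch only; the pipeline name price_adjust_pipeline is illustrative, and the update-by-query call assumes an existing index such as product_catalog):

    // Ingest pipeline script: the document is not indexed yet, so fields hang directly off ctx
    PUT /_ingest/pipeline/price_adjust_pipeline
    {
      "processors": [
        {
          "script": {
            "lang": "painless",
            "source": "if (ctx.price != null) { ctx.price *= 1.1; }"
          }
        }
      ]
    }

    // Update-by-query (or reindex) script: the document already exists, so fields live under ctx._source
    POST /product_catalog/_update_by_query
    {
      "script": {
        "lang": "painless",
        "source": "if (ctx._source.price != null) { ctx._source.price *= 1.1; }"
      }
    }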

Steps

  1. Open the Kibana Console or use a REST client

  2. Define the ingest pipeline with a Painless script and additional processors:

    PUT /_ingest/pipeline/product_pipeline
    {
      "processors": [
        {
          "script": {
            "lang": "painless",
            "source": """
              if (ctx.price != null) {
                ctx.price *= 1.1;
              }
            """
          }
        },
        {
          "set": {
            "field": "timestamp",
            "value": "{{_ingest.timestamp}}"
          }
        }
      ]
    }
  3. Create the product_catalog index

    PUT /product_catalog
    {
      "mappings": {
        "properties": {
          "product_id": {
            "type": "keyword"
          },
          "name": {
            "type": "text"
          },
          "description": {
            "type": "text"
          },
          "price": {
            "type": "double"
          },
          "timestamp": {
            "type": "date"
          }
        }
      }
    }
  4. Index documents using the ingest pipeline

    POST /product_catalog/_bulk?pipeline=product_pipeline
    { "index": { "_id": "1" } }
    { "product_id": "p001", "name": "Product 1", "description": "Description of product 1", "price": 20.0 }
    { "index": { "_id": "2" } }
    { "product_id": "p002", "name": "Product 2", "description": "Description of product 2", "price": 30.0 }

Test

  1. Verify the ingest pipeline configuration:

    GET /_ingest/pipeline/product_pipeline
  2. Search the indexed documents to ensure the modifications have been applied:

    GET /product_catalog/_search
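  3. Optionally, test the pipeline without indexing anything by using the simulate API (a sketch; the returned document should show the adjusted price and an added timestamp field):

    POST /_ingest/pipeline/product_pipeline/_simulate
    {
      "docs": [
        {
          "_source": {
            "product_id": "p003",
            "name": "Product 3",
            "description": "Description of product 3",
            "price": 40.0
          }
        }
      ]
    }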

Considerations

  • The Painless script modifies the price field to contain a 10% higher price
  • The set processor adds a timestamp to each document to track when it was ingested.
  • The ingest pipeline processes every document indexed with the pipeline parameter, keeping the data consistent.

Clean-up (optional)

  • Delete the index

    DELETE product_catalog
  • Delete the pipeline

    DELETE _ingest/pipeline/product_pipeline

Documentation

Example 2: Creating an ingest pipeline to extract and transform data for a logging index

This example creates another ingest pipeline, but this time adds it directly into the index definition.

This is also an example of how helpful it is to know more about scripting in Elasticsearch. The examples may or may not be trivial/complex, but an understanding of how to write script is required.

Requirements

  • Create an ingest pipeline named logging-pipeline
  • Extract from the log message:
    • the log level (DEBUG, INFO, WARNING, ERROR)
    • the log timestamp in ISO format
  • Add a new field log_level_tag with a value based on the log level (e.g. DEBUG -> DEBUG_LOG).
  • Add a new field log_timestamp_in_seconds with the timestamp in seconds.
  • Create a logging-index index
    • Declare the ingest pipeline as the default pipeline in the logging-index index settings

Steps

  1. Open the Kibana Console or use a REST client

  2. Create an ingest pipeline:

    PUT /_ingest/pipeline/logging-pipeline
    {
      "description": "Extract and transform log data",
      "processors": [
        {
          "grok": {
            "field": "message",
            "patterns": ["%{LOGLEVEL:log_level} %{TIMESTAMP_ISO8601:log_timestamp} %{GREEDYDATA:message}"]
          }
        },
        {
          "script": {
            "source": """
              ctx.log_level_tag = ctx.log_level.toUpperCase() + '_LOG';
              ctx.log_timestamp_in_seconds = ZonedDateTime.parse(ctx.log_timestamp).toEpochSecond();
            """,
            "lang": "painless"
          }
        }
      ]
    }
  3. Create an index with the ingest pipeline:

    PUT /logging-index
    {
      "mappings": {
        "properties": {
          "message": {
            "type": "text"
          },
          "log_level": {
            "type": "keyword"
          },
          "log_timestamp": {
            "type": "date"
          },
          "log_level_tag": {
            "type": "keyword"
          },
          "log_timestamp_in_seconds": {
            "type": "long"
          }
        }
      },
      "settings": {
        "index": {
          "default_pipeline": "logging-pipeline"
        }
      }
    }
  4. Add documents to the index:

    POST /logging-index/_bulk
    { "index": { "_index": "logging-index" } }
    { "message": "DEBUG 2022-05-25T14:30:00.000Z This is a debug message" }
    { "index": { "_index": "logging-index" } }
    { "message": "INFO 2022-05-25T14:30:00.000Z This is an info message" }

Test

  • Verify that the documents have been processed correctly:

    GET /logging-index/_search
    // edited response
    {
      ...
        "hits": [
          {
            "_index": "logging-index",
            "_id": "uXpCBpEBRRh1FLFiQ6s4",
            "_score": 1,
            "_source": {
              "log_level": "DEBUG",
              "log_timestamp": "2022-05-25T14:30:00.000Z",
              "log_level_tag": "DEBUG_LOG",
              "message": "This is a debug message",
              "log_timestamp_in_seconds": 1653489000
            }
          },
          {
            "_index": "logging-index",
            "_id": "unpCBpEBRRh1FLFiQ6s4",
            "_score": 1,
            "_source": {
              "log_level": "INFO",
              "log_timestamp": "2022-05-25T14:30:00.000Z",
              "log_level_tag": "INFO_LOG",
              "message": "This is an info message",
              "log_timestamp_in_seconds": 1653489000
            }
          }
        ]
      }
    }

Considerations

  • The ingest pipeline uses the Grok processor to extract the log level and timestamp from the log message.
  • The Painless script processor is used to transform the log level and timestamp into new fields.

Clean-up (optional)

  • Delete the index

    DELETE logging-index
  • Delete the pipeline

    DELETE _ingest/pipeline/logging-pipeline

Documentation

Example 3: Creating an ingest pipeline for product data

Requirements

  • Create an index mapping for products with fields like name, price, category, description, discounted_price.
  • Preprocess incoming product data using an ingest pipeline called product_pipeline:
    • Lowercase the name and category fields
    • Remove HTML tags from the description field
    • Calculate a discounted_price field based on the price field and a discount percentage stored in a pipeline variable

Steps

  1. Open the Kibana Console or use a REST client

  2. Define the ingest pipeline:

    PUT _ingest/pipeline/product_pipeline
    {
      "processors": [
        {
          "lowercase": {
            "field": "name"
          }
        },
        {
          "lowercase": {
            "field": "category"
          }
        },
        {
          "html_strip": {
            "field": "description"
          }
        },
        {
          "script": {
            "source": "double discount = 0.1; ctx.discounted_price = ctx.price * (1 - discount);"
          }
        }
      ]
    }
  3. Index a sample document using the ingest pipeline:

    PUT /products/_doc/1?pipeline=product_pipeline
    {
      "name": "Product A",
      "price": 99.99,
      "category": "Electronics",
      "description": "A <b>high-quality</b> product for running enthusiasts."
    }

Test

  1. Search the products index and verify that the document has been processed by the ingest pipeline:

    GET /products/_search
    // edited response
    {
      ...
        "hits": [
          {
            "_index": "products",
            "_id": "1",
            "_score": 1,
            "_source": {
              "name": "product a",
              "description": "A high-quality product for running enthusiasts.",
              "category": "electronics",
              "price": 99.99,
              "discounted_price": 89.991
            }
          }
        ]
      }
    }

Considerations

  • The ingest pipeline is defined with a list of processors that perform specific operations on incoming documents.
  • The lowercase processors lowercase the name and category fields.
  • The html_strip processor removes HTML tags from the description field.
  • The script processor uses the Painless scripting language to calculate the discounted_price field from the price field and a discount variable defined in the script (10% here).
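  • If the discount should be configurable without editing the script body, the script processor can read it from params instead; a minimal sketch of that variant (re-creating the same pipeline):

    PUT _ingest/pipeline/product_pipeline
    {
      "processors": [
        {
          "lowercase": {
            "field": "name"
          }
        },
        {
          "lowercase": {
            "field": "category"
          }
        },
        {
          "html_strip": {
            "field": "description"
          }
        },
        {
          "script": {
            "params": {
              "discount": 0.1
            },
            "source": "ctx.discounted_price = ctx.price * (1 - params.discount);"
          }
        }
      ]
    }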

Clean-up (optional)

  • Delete the index

    DELETE products
  • Delete the pipeline

    DELETE _ingest/pipeline/product_pipeline

Documentation

Example 4: Merge content from two indices into a third index

Requirements

The movie index has content that looks like this:

{
  "movie_id": 1,
  "title": "The Adventure Begins",
  "release_year": 2021,
  "genre_code": "ACT"
}

The genre index has content that looks like this:

{
  "genre_code": "ACT",
  "description": "Action - Movies with high energy and lots of physical activity"
}

Merge movie and genre into a third index called movie_with_genre that includes the genre.description in each movie record:

{
  "movie_id": 1,
  "title": "The Adventure Begins",
  "release_year": 2021,
  "genre_code": "ACT",
  "genre_description": "Action - Movies with high energy and lots of physical activity"
}

Steps

To merge two or more indices into a third index, you need to create an ingest pipeline that uses an enrich policy (managed under Index Management in Kibana).

  1. Create an enrich policy that contains the index with the additional content to be used
  2. Execute the policy to create an enrich index as a temporary location for the enrich content
  3. Create an ingest pipeline that points to the enrich policy and the input index that will be merged with the enrich index

FROM THE KIBANA UI

  1. Open the Kibana Console or use a REST client

    • Create the movie index with sample documents

      PUT /movie
      {
        "mappings": {
          "properties": {
            "movie_id": { "type": "integer" },
            "title": { "type": "text" },
            "release_year": { "type": "integer" },
            "genre_code": { "type": "keyword" }
          }
        }
      }
      
      POST /movie/_bulk
      { "index": { "_id": 1 } }
      { "movie_id": 1, "title": "The Adventure Begins", "release_year": 2021, "genre_code": "ACT" }
      { "index": { "_id": 2 } }
      { "movie_id": 2, "title": "Drama Unfolds", "release_year": 2019, "genre_code": "DRM" }
      { "index": { "_id": 3 } }
      { "movie_id": 3, "title": "Comedy Night", "release_year": 2020, "genre_code": "COM" }
      { "index": { "_id": 4 } }
      { "movie_id": 4, "title": "Epic Adventure", "release_year": 2022, "genre_code": "ACT" }
      { "index": { "_id": 5 } }
      { "movie_id": 5, "title": "Tragic Tale", "release_year": 2018, "genre_code": "DRM" }
    • Create the genre index with sample documents

      PUT /genre
      {
        "mappings": {
          "properties": {
            "genre_code": { "type": "keyword" },
            "description": { "type": "text" }
          }
        }
      }
      
      POST /genre/_bulk
      { "index": { "_id": "ACT" } }
      { "genre_code": "ACT", "description": "Action - Movies with high energy and lots of physical activity" }
      { "index": { "_id": "DRM" } }
      { "genre_code": "DRM", "description": "Drama - Movies with serious, emotional, and often realistic stories" }
      { "index": { "_id": "COM" } }
      { "genre_code": "COM", "description": "Comedy - Movies designed to make the audience laugh" }
    • Optionally, create the movie_with_genre index

      PUT /movie_with_genre
      {
        "mappings": {
          "properties": {
            "movie_id": { "type": "integer" },
            "title": { "type": "text" },
            "release_year": { "type": "integer" },
            "genre_code": { "type": "keyword" },
            "genre_description": { "type": "text" }
          }
        }
      }
  2. From the Kibana dashboard: Home > Management > Index Management

  3. Press Add an Enrich Policy

    Configuration

    • Policy Name: movie-genre-policy
    • Policy Type: Match
    • Source Indices: genre

    Next: Field Selection

    • Match field: genre_code
    • Enrich field: description

    Next: Create

    Press Create and Execute (if everything looks correct)

  4. Home > Management > Data > Ingest > Ingest Pipelines

  • Press: Create Pipeline > New Pipeline

    Create Pipeline

    • Name: genre_ingest_pipeline
    • Press: Add Your First Processor > Add a Processor
      • Add Processor
        • Processor: Enrich
        • Field: genre_code (from the movie index)
        • Policy name: movie-genre-policy
        • Target field: genre_description (from movie_with_genre index)
      • Press Add Processor
    • Press: Test Document: Add Documents

    Enter:

    [
      {
        "_index": "movie",
        "_source": {
          "movie_id": 1,
          "title": "The Adventure Begins",
          "release_year": 2021,
          "genre_code": "ACT"
        }
      }
    ]

    Press: Run the Pipeline

    If the information entered is correct the response will be:

    {
      "docs": [
        {
          "doc": {
            "_index": "movie",
            "_version": "-3",
            "_id": "_id",
            "_source": {
              "release_year": 2021,
              "genre_description": {
                "description": "Action - Movies with high energy and lots of physical activity",
                "genre_code": "ACT"
              },
              "movie_id": 1,
              "title": "The Adventure Begins",
              "genre_code": "ACT"
            },
            "_ingest": {
              "timestamp": "2024-08-04T17:18:50.159798109Z"
            }
          }
        }
      ]
    }    

    This is not quite what we want: the target field contains the entire enrich document (both genre_code and description) rather than just the description. The fix is shown in the Console section below.

    Press the X in the top right-hand corner of the panel to close it (not the browser).

    Press: Create Pipeline (when the side panel opens, press Close)

Note: this is expected behaviour rather than a bug. The enrich processor always copies the whole matched enrich document into the target field, so the pipeline created through the UI needs an extra processor to keep only the description.
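
One way to correct it from the Console is sketched below (a minimal sketch that overwrites the genre_ingest_pipeline created in the UI; the set processor with copy_from and the temporary field name genre_description_tmp are choices made here, not something the wizard produces):

  PUT _ingest/pipeline/genre_ingest_pipeline
  {
    "processors": [
      {
        "enrich": {
          "policy_name": "movie-genre-policy",
          "field": "genre_code",
          "target_field": "genre_description_tmp"
        }
      },
      {
        "set": {
          "field": "genre_description",
          "copy_from": "genre_description_tmp.description"
        }
      },
      {
        "remove": {
          "field": "genre_description_tmp",
          "ignore_missing": true
        }
      }
    ]
  }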

FROM THE KIBANA CONSOLE

  1. Open the Kibana Console or use a REST client

    • Create the movie index with sample documents
    PUT /movie
    {
      "mappings": {
        "properties": {
          "movie_id": { "type": "integer" },
          "title": { "type": "text" },
          "release_year": { "type": "integer" },
          "genre_code": { "type": "keyword" }
        }
      }
    }
    
    POST /movie/_bulk
    { "index": { "_id": 1 } }
    { "movie_id": 1, "title": "The Adventure Begins", "release_year": 2021, "genre_code": "ACT" }
    { "index": { "_id": 2 } }
    { "movie_id": 2, "title": "Drama Unfolds", "release_year": 2019, "genre_code": "DRM" }
    { "index": { "_id": 3 } }
    { "movie_id": 3, "title": "Comedy Night", "release_year": 2020, "genre_code": "COM" }
    { "index": { "_id": 4 } }
    { "movie_id": 4, "title": "Epic Adventure", "release_year": 2022, "genre_code": "ACT" }
    { "index": { "_id": 5 } }
    { "movie_id": 5, "title": "Tragic Tale", "release_year": 2018, "genre_code": "DRM" }
    • Create the genre index with sample documents
    PUT /genre
    {
      "mappings": {
        "properties": {
          "genre_code": { "type": "keyword" },
          "description": { "type": "text" }
        }
      }
    }
    
    POST /genre/_bulk
    { "index": { "_id": "ACT" } }
    { "genre_code": "ACT", "description": "Action - Movies with high energy and lots of physical activity" }
    { "index": { "_id": "DRM" } }
    { "genre_code": "DRM", "description": "Drama - Movies with serious, emotional, and often realistic stories" }
    { "index": { "_id": "COM" } }
    { "genre_code": "COM", "description": "Comedy - Movies designed to make the audience laugh" }
    • Optionally, create the movie_with_genre index
    PUT /movie_with_genre
    {
      "mappings": {
        "properties": {
          "movie_id": { "type": "integer" },
          "title": { "type": "text" },
          "release_year": { "type": "integer" },
          "genre_code": { "type": "keyword" },
          "genre_description": { "type": "text" }
        }
      }
    }
  2. Create an enrich policy

    PUT /_enrich/policy/movie-genre-policy
    {
      "match": {
        "indices": "genre",
        "match_field": "genre_code",
        "enrich_fields": ["description"]
      }
    }
  3. Execute the enrich policy

    PUT _enrich/policy/movie-genre-policy/_execute
  4. Define the ingest pipeline that will merge the content from genre into movie_with_genre. Note the use of a temporary field: the enrich processor copies the matched genre document in its entirety, so the pipeline writes it to enriched_data, copies only the description into genre_description, and then removes enriched_data.

    PUT _ingest/pipeline/movie_genre_pipeline
    {
      "processors": [
        {
          "enrich": {
            "policy_name": "movie-genre-policy",
            "field": "genre_code",
            "target_field": "enriched_data",
            "max_matches": "1"
          }
        },
        {
          "script": {
            "source": """
              if (ctx.enriched_data != null && ctx.enriched_data.description != null) {
                ctx.genre_description = ctx.enriched_data.description;
              }
              ctx.remove("enriched_data");
            """
          }
        }
      ]
    }
  5. Reindex movie into movie_with_genre

    POST _reindex
    {
      "source": {
        "index": "movie"
      },
      "dest": {
        "index": "movie_with_genre",
        "pipeline": "movie_genre_pipeline"
      }
    }
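
    As a quick sanity check (an extra step, not part of the original flow), the document counts of the source and destination indices should match:

    GET movie/_count
    GET movie_with_genre/_count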

Test

  1. Validate the creation of the movie index

    GET movie/_search
  2. Validate the creation of the genre index

    GET genre/_search
  3. Validate the creation of the enrich policy

    GET _enrich/policy/movie-genre-policy
  4. Validate the creation of the ingest pipeline

    GET _ingest/pipeline/movie_genre_pipeline
  5. Simulate the use of the ingest pipeline

    GET _ingest/pipeline/movie_genre_pipeline/_simulate
    {
      "docs": [
        {
          "_index": "movie",
          "_source": {
            "movie_id": 1,
            "title": "The Adventure Begins",
            "release_year": 2021,
            "genre_code": "ACT"
          }
        }
      ]
    }
  6. Validate the genre_description in movie_with_genre

    GET movie_with_genre/_search
    {
      "query": {
        "match_all": {}
      },
      "_source": [ "genre_code", "genre_description" ]
    }
    // edited response
    {
      ...
     "hits": [
       {
         "_index": "movie_with_genre",
         "_id": "1",
         "_score": 1,
         "_source": {
           "genre_description": "Action - Movies with high energy and lots of physical activity",
           "genre_code": "ACT"
         }
       },
       {
         "_index": "movie_with_genre",
         "_id": "2",
         "_score": 1,
         "_source": {
           "genre_description": "Drama - Movies with serious, emotional, and often realistic stories",
           "genre_code": "DRM"
         }
       },
       {
         "_index": "movie_with_genre",
         "_id": "3",
         "_score": 1,
         "_source": {
           "genre_description": "Comedy - Movies designed to make the audience laugh",
           "genre_code": "COM"
         }
       },
       ...
     ]
      }
    }

Considerations

  • The enrich processor always copies the entire matched enrich document into the target field, so the pipeline writes it to a temporary field and uses a script processor to keep only the description.
  • The enrich index created by _execute is a snapshot: if the genre index changes, the policy must be executed again before the pipeline sees the new data.

Clean-up (optional)

  • Delete the final index

    DELETE movie_with_genre
  • Delete the ingest pipeline and the enrich policy

    DELETE _ingest/pipeline/movie_genre_pipeline
    DELETE _enrich/policy/movie-genre-policy
  • Delete the movie and genre indices

    DELETE movie
    DELETE genre

Documentation

4.6 Task: Define runtime fields to retrieve custom values using Painless scripting

Example 1: Creating a runtime field for discounted prices in a product catalog

Requirements

  • Create a mapping for the product_catalog index
    • Include runtime field discounted_price to calculate a discount on product prices.
    • Apply a Painless script to dynamically compute the discounted price.
    • Ensure the runtime field is available for queries and aggregations.

Steps

  1. Open the Kibana Console or use a REST client

  2. Define the index with appropriate mappings:

    PUT /product_catalog
    {
      "mappings": {
        "properties": {
          "product_id": {
            "type": "keyword"
          },
          "name": {
            "type": "text"
          },
          "description": {
            "type": "text"
          },
          "price": {
            "type": "double"
          }
        },
        "runtime": {
          "discounted_price": {
            "type": "double",
            "script": {
              "source": """
                if (doc['price'].size() != 0) {
                  emit(doc['price'].value * 0.9);
                } else {
                  emit(Double.NaN);
                }
              """
            }
          }
        }
      }
    }
  3. Index sample documents using the _bulk endpoint:

    POST /product_catalog/_bulk
    { "index": { "_id": "1" } }
    { "product_id": "p001", "name": "Product 1", "description": "Description of product 1", "price": 20.0 }
    { "index": { "_id": "2" } }
    { "product_id": "p002", "name": "Product 2", "description": "Description of product 2", "price": 30.0 }

Test

  1. Search the indexed documents and retrieve the runtime field

    GET /product_catalog/_search
    {
      "_source": ["name", "price"],
      "fields": ["discounted_price"],
      "query": {
        "match_all": {}
      }
    }
    // edited response
    {
      ...
        "hits": [
          {
            "_index": "product_catalog",
            "_id": "1",
            "_score": 1,
            "_source": {
              "name": "Product 1",
              "price": 20
            },
            "fields": {
              "discounted_price": [
                18
              ]
            }
          },
          {
            "_index": "product_catalog",
            "_id": "2",
            "_score": 1,
            "_source": {
              "name": "Product 2",
              "price": 30
            },
            "fields": {
              "discounted_price": [
                27
              ]
            }
          }
        ]
      }
    }
  2. Verify the discounted price in the search results

    GET /product_catalog/_search
    {
      "query": {
        "match_all": {}
      },
      "script_fields": {
        "discounted_price": {
          "script": {
            "source": "doc['price'].value * 0.9"
          }
        }
      }
    }
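  3. Optionally (an additional check, not in the original steps), confirm the runtime field can also be used in an aggregation, for example the average discounted price:

    GET /product_catalog/_search
    {
      "size": 0,
      "aggs": {
        "avg_discounted_price": {
          "avg": {
            "field": "discounted_price"
          }
        }
      }
    }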

Considerations

  • The Painless script calculates a 10% discount on the price.
  • Runtime fields are defined in the index mappings and can be used for querying and aggregations without being stored in the index.

Clean-up (optional)

  • Delete the index

    DELETE product_catalog

Documentation

Example 2: Create a runtime field to extract the domain from a URL

Requirements

  • Create a mapping for the myindex index
    • Define a field called url
  • Extract the domain from a URL field using Painless scripting to define a runtime field named domain.

Steps

  1. Open the Kibana Console or use a REST client

  2. Create an index with a URL field:

    PUT /myindex
    {
      "mappings": {
        "properties": {
          "url": {
            "type": "keyword"
          }
        }
      }
    }
  3. Define a runtime field to extract the domain. The index already exists, so use the update mapping API instead of repeating the index creation (a plain string-handling alternative to grok is sketched in the optional step at the end of this list):

    PUT /myindex/_mapping
    {
      "runtime": {
        "domain": {
          "type": "keyword",
          "script": {
            "source": """
              // e.g. https://xyz.domain.com/stuff/stuff
              String domain = grok("%{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST:domain})?(?:%{URIPATHPARAM})?").extract(doc["url"].value)?.domain;
              if (domain != null) emit(domain);
              else emit("grok failed");
            """
          }
        }
      }
    }
  4. Add documents to the index:

    POST /myindex/_bulk
    { "index": { "_index": "myindex" } }
    { "url": "https://www.example.com/path/to/page" }
    { "index": { "_index": "myindex" } }
    { "url": "http://sub.example.com/other/page" }

Test

  • Verify that the runtime field is working correctly:

    GET /myindex/_search
    {
      "query": {
        "match_all": {}
      },
      "fields": ["domain"]
    }
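  • Optionally (an additional check, not part of the original requirements), aggregate on the runtime field to list the distinct domains:

    GET /myindex/_search
    {
      "size": 0,
      "aggs": {
        "domains": {
          "terms": {
            "field": "domain"
          }
        }
      }
    }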

Considerations

  • The runtime field uses Painless scripting to extract the domain from the URL field.
  • The grok pattern captures the host portion of the URL and emits it as the domain, including any sub-domain; stripping the sub-domain would require additional parsing logic.

Clean-up (optional)

  • Delete the index

    DELETE myindex

Documentation

Example 3: Calculating the age difference in years based on date fields

Requirements

  • Create a mapping for the people index
  • Define a search query that utilizes a runtime field (current_age) to calculate the age difference in years between two date fields (date_of_birth and current_date) within the search results.

Steps

  1. Open the Kibana Console or use a REST client

  2. Create the index

    PUT people
    {
      "mappings": {
        "properties": {
          "date_of_birth": {
            "type": "date"
          },
          "current_date": {
            "type": "date"
          }
        },
        "runtime": {
          "current_age": {
            "type": "long",
            "script": {
              "source": """
              int birthday_year = ZonedDateTime.parse(doc["date_of_birth"].value.toString()).getYear();
              int today_year = ZonedDateTime.parse(doc["current_date"].value.toString()).getYear();
              long age = today_year - birthday_year;
              emit(age);
              """
            }
          }
        }
      }
    }
  3. Index sample documents

    POST /people/_bulk
    { "index": { "_index": "people", "_id": "1" } }
    { "name": "Alice", "date_of_birth": "1990-01-01", "current_date": "2024-07-08" }
    { "index": { "_index": "people", "_id": "2" } }
    { "name": "Bob", "date_of_birth": "1985-05-15", "current_date": "2024-07-08" }
    { "index": { "_index": "people", "_id": "3" } }
    { "name": "Charlie", "date_of_birth": "2000-12-25", "current_date": "2024-07-08" }
  4. Construct a search query and return the runtime field:

    GET people/_search
    {
      "query": {
        "match_all": {}
      },
      "fields": [
        "current_age"
      ]
    }

Test

  1. Ensure the documents in your index have date_of_birth and current_date fields in a compatible date format

    GET people/_search
  2. Run the search query and examine the response. The results should include an additional field named current_age representing the calculated age difference in years for each document.

    GET people/_search
    {
      "query": {
        "match_all": {}
      },
      "fields": [
        "current_age"
      ]
    }
    // edited responses
    {
      ...
        "hits": [
          {
            "_index": "people",
            "_id": "1",
            "_score": 1,
            "_source": {
              "name": "Alice",
              "date_of_birth": "1990-01-01",
              "current_date": "2024-07-08"
            },
            "fields": {
              "current_age": [
                34
              ]
            }
          },
          {
            "_index": "people",
            "_id": "2",
            "_score": 1,
            "_source": {
              "name": "Bob",
              "date_of_birth": "1985-05-15",
              "current_date": "2024-07-08"
            },
            "fields": {
              "current_age": [
                39
              ]
            }
          },
          {
            "_index": "people",
            "_id": "3",
            "_score": 1,
            "_source": {
              "name": "Charlie",
              "date_of_birth": "2000-12-25",
              "current_date": "2024-07-08"
            },
            "fields": {
              "current_age": [
                24
              ]
            }
          }
        ]
      }
    }
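  3. Optionally (an additional check, not in the original steps), filter on the runtime field, for example people aged 30 or over:

    GET people/_search
    {
      "query": {
        "range": {
          "current_age": {
            "gte": 30
          }
        }
      },
      "fields": [
        "current_age"
      ]
    }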

Considerations

  • The runtime field definition utilizes Painless scripting to perform the age calculation.
  • The script subtracts the year of date_of_birth from the year of current_date, which ignores month and day (Charlie shows 24 even though his 2024 birthday has not yet occurred); a more precise variant is sketched below.
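
  A more precise variant (a sketch; it uses java.time's ChronoUnit, which Painless exposes, and the field name exact_age is chosen here for illustration) could be added to the same mapping:

    PUT people/_mapping
    {
      "runtime": {
        "exact_age": {
          "type": "long",
          "script": {
            "source": """
              // counts only fully completed years between the two dates
              emit(ChronoUnit.YEARS.between(doc['date_of_birth'].value, doc['current_date'].value));
            """
          }
        }
      }
    }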

Clean-up (optional)

  • Delete the index

    DELETE people

Documentation