Joining your data

Join based on the metadata field _id

The metadata field _id is supported as a join key field in semi-join queries.

Example

Consider the following documents from two indices, company and people:

$ curl -H 'Content-Type: application/json' -XPUT 'http://localhost:9200/_bulk?pretty' -d '
{ "index" : { "_index" : "company", "_type" : "company", "_id" : "CoAcme" } }
{ "id": 1, "name" : "Acme", "ceo": "peo1" }
{ "index" : { "_index" : "company", "_type" : "company", "_id" : "CoBueno" } }
{ "id": 2, "name" : "Bueno" }
{ "index" : { "_index" : "company", "_type" : "company", "_id" : "CoArk" } }
{ "id": 3, "name" : "Ark" }

{ "index" : { "_index" : "people", "_type" : "person", "_id" : "peo1" } }
{ "id" : 1, "name" : "Alice", "employed_by" : "CoAcme" }
{ "index" : { "_index" : "people", "_type" : "person", "_id" : "peo2" } }
{ "id" : 2, "name" : "Bob", "employed_by" : "CoBueno" }
{ "index" : { "_index" : "people", "_type" : "person", "_id" : "peo3" } }
{ "id" : 3, "name" : "Carol", "employed_by" : "CoAcme" }
'

Suppose that the two indices are joined in order to retrieve a list of companies using the following request:

$ curl -H 'Content-Type: application/json' 'http://localhost:9200/siren/company/_search?pretty' -d '{
   "query" : {
      "join" : {
        "indices" : ["people"],
        "on" : ["_id", "employed_by"],  (1)
        "request" : {
          "query" : {
            "match_all" : {}
          }
        }
      }
    }
}'
1 The metadata field _id of the index company is used as the left join key field

The response should contain two hits, as follows:

{
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "company",
        "_type" : "company",
        "_id" : "CoBueno",
        "_score" : 1.0
      },
      {
        "_index" : "company",
        "_type" : "company",
        "_id" : "CoAcme",
        "_score" : 1.0
      }
    ]
  }
}

Suppose that the two indices are joined in order to retrieve a list of companies using the following request:

$ curl -H 'Content-Type: application/json' 'http://localhost:9200/siren/company/_search?pretty' -d '{
   "query" : {
      "join" : {
        "indices" : ["people"],
        "on" : ["ceo", "_id"],  (1)
        "request" : {
          "query" : {
            "match_all" : {}
          }
        }
      }
    }
}'
1 The metadata field _id of the index people is used as the right join key field

The response should contain one hit, as follows:

{
  "hits" : {
    "total" : 1,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "company",
        "_type" : "company",
        "_id" : "CoAcme",
        "_score" : 1.0
      }
    ]
  }
}

Join based on a runtime field

Runtime fields can also be used as part of a join. They can be as part of the parent index or the child index, as shown in the following example:

$ curl -H 'Content-Type: application/json' 'http://localhost:9200/siren/company/_search?pretty' -d '{
   "query" : {
      "join" : {
        "indices" : ["people"],
        "on" : ["rt_field_company", "rt_field_people"],         (1)
        "request" : {
            "runtime_mappings": {
                "rt_field_people": {
                  "type": "long",
                  "script": {
                    "source": "<put your script here>"          (2)
                  }
                }
              },
              "query" : {
                "match_all" : {}
              }
        }
      }
    },
    "runtime_mappings": {
        "rt_field_company": {
          "type": "long",
          "script": {
            "source": "<put your script here>"                   (3)
          }
        }
      }
}'
1 Join on two runtime fields defined in the request
2 Runtime field for people index
3 Runtime field for company index

In this example, the join operation is performed on two runtime fields: rt_field_company and rt_field_people. The runtime mappings are defined within the request for both the parent (company) and child (people) indices. Replace <put your script here> with the appropriate Painless script to compute the desired values for each runtime field.

Join based on a composite key

Runtime fields can be used to create composite keys, that can be used to join on multiple conditions. For example, if you have first_name and last_name fields, you often want to join on both fields at the same time. You can use runtime fields to concatenate multiple fields, and use this composite key as a join key. Another use case example is if you have fields for IP address and network port, you can create a composite key <ip_address>:<port>.

Like in the previous section, they can be defined for the parent index, the child index, or both.

$ curl -H 'Content-Type: application/json' 'http://localhost:9200/siren/company/_search?pretty' -d '
{
  "query" : {
    "join" : {
      "indices" : ["people"],
      "on" : ["ceo_full_name", "rt_full_name"],     (1)
      "request" : {
        "runtime_mappings": {
          "rt_full_name": {                         (2)
            "type": "keyword",
            "script": {
              "source": "emit(doc['first_name'].value + ' ' + doc['last_name'].value)"
            }
          }
        },
        "query" : {
          "match_all" : {}
        }
      }
    }
  }
}'
1 Join between an indexed field and a runtime field
2 Runtime field for the people index, containing the concatenation of multiple fields

In this example, a join operation is performed between the ceo_full_name field of the company index and a runtime field rt_full_name. The runtime field is a composite key: a concatenation of the first_name and last_name of employees in the people index. This allows to join indices even when they don’t have directly corresponding keys.

Retrieving a projected field

A script field may be used to retrieve the values of a projected field for each hit, as shown in the following example.

Consider the following documents from two indices, company and people:

$ curl -H 'Content-Type: application/json' -XPUT 'http://localhost:9200/_bulk?pretty' -d '
{ "index" : { "_index" : "company", "_type" : "company", "_id" : "1" } }
{ "id": 1, "name" : "Acme" }
{ "index" : { "_index" : "company", "_type" : "company", "_id" : "2" } }
{ "id": 2, "name" : "Bueno" }

{ "index" : { "_index" : "people", "_type" : "person", "_id" : "1" } }
{ "id" : 1, "name" : "Alice", "age" : 31, "gender" : "Female", "employed_by" : 1 }
{ "index" : { "_index" : "people", "_type" : "person", "_id" : "2" } }
{ "id" : 2, "name" : "Bob", "age" : 42, "gender" : "Male", "employed_by" : 2 }
{ "index" : { "_index" : "people", "_type" : "person", "_id" : "3" } }
{ "id" : 3, "name" : "Carol", "age" : 26, "gender" : "Female", "employed_by" : 1 }
'

Suppose that the two indices are joined in order to retrieve a list of companies with the ages of all their respective employees using the following request:

$ curl -H 'Content-Type: application/json' 'http://localhost:9200/siren/company/_search?pretty' -d '{
   "query" : {
      "join" : {
        "indices" : ["people"],
        "on" : ["id", "employed_by"],
        "request" : {
          "project" : [
            { "field" : { "name" : "age", "alias" : "employee_age" } }  (1)
          ],
          "query" : {
            "match_all" : {}
          }
        }
      }
    },
    "script_fields" : {
      "employees_age" : {
        "script" : "doc.employee_age"                                    (2)
      }
    }
}'
1 Project the field age from index people as employee_age
2 Return a script field employees_age for each hit with the associated projected values

The response should contain two hits, one for each company, with the script field employees_age as follows:

{
  "hits" : {
    "total" : 2,
    "max_score" : 0.0,
    "hits" : [
      {
        "_index" : "company",
        "_type" : "company",
        "_id" : "2",
        "_score" : 0.0,
        "fields" : {
          "employees_age" : [
            42
          ]
        }
      },
      {
        "_index" : "company",
        "_type" : "company",
        "_id" : "1",
        "_score" : 0.0,
        "fields" : {
          "employees_age" : [
            26,
            31
          ]
        }
      }
    ]
  }
}

The projected field is accessed using the doc values API. In the example, the projected field employee_age is accessed using the syntax doc.employee_age.

Retrieving a runtime field

The runtime field mapping defined in the search request allows creating a field that exists only as part of the query. This new field can be projected like any other field using script fields.

$ curl -H 'Content-Type: application/json' 'http://localhost:9200/siren/company/_search?pretty' -d '{
   "query" : {
      "join" : {
        "indices" : ["people"],
        "on" : ["id", "employed_by"],
        "request" : {
            "runtime_mappings": {
                "rt_field": {
                  "type": "long",
                  "script": {
                    "source": "<put your script here>"                   (1)
                  }
                }
              },
              "project" : [
                { "field" : { "name" : "rt_field" } }                    (2)
              ],
              "query" : {
                "match_all" : {}
              }
        }
      }
    },
    "script_fields" : {
      "fetched_rt_field" : {
        "script" : "doc.rt_field"                                         (3)
      }
    }
}'
1 The runtime field defined in the request
2 To project the new field
3 To fetch the new field into the response if needed

In this example, a runtime field named rt_field is created with a Painless script. Replace <put your script here> with the appropriate script to compute the desired value for the runtime field. The new runtime field is projected and retrieved into the response using the script_fields parameter.

Filter using a projected field

Runtime fields can also be used to filter on projected data.

$ curl -H 'Content-Type: application/json' 'http://localhost:9200/siren/company/_search?pretty' -d '
{
  "runtime_mappings": {
    "employee_count": {
      "type": "long",
      "script": "emit(doc.employees.length)"                          (1)
    }
  },
  "query": {
    "bool": {
      "must": [
        {
          "join": {
            "indices": ["people"],
            "on": ["id", "employed_by"],
            "request": {
              "project": [
                {
                  "field": {"name": "id", "alias": "employees"}       (2)
                }
              ],
              "query": {
                "match_all": {}
              }
            }
          }
        }
      ],
      "filter": [
        {
          "range" : {
            "employee_count" : { "gt" : 1 }                           (3)
          }
        }
      ]
    }
  }
}'
1 The runtime field used to collect the projected fields
2 To project the id field for all matching documents in the people index
3 To filter the documents in the company index, using the value from the runtime field

In this example, the runtime field employee_count collects all the projected values from the people index, and returns the number of values. This runtime field is then used in the filter clause to return only the companies that have more than 1 employee.

Sorting based on a projected field

A script-based sorting method can be used to sort the hits based on the values of a projected field, for example:

$ curl -H 'Content-Type: application/json' 'http://localhost:9200/siren/company/_search?pretty' -d '{
   "query" : {
     "join" : {
       "indices" : ["people"],
       "on" : ["id", "employed_by"],
       "request" : {
          "project" : [
            { "field" : { "name" : "age", "alias" : "employee_age" } }
          ],
         "query" : {
           "match_all" : {}
         }
       }
     }
   },
   "runtime_mappings": {
    "employee_ages": {
      "type": "long",
      "script": "int sum = 0; for (value in doc.employee_age) { sum += value } emit(sum);"
     }
   },
   "sort": [
    {
      "employee_ages": "desc"
    }
  ]
}'

The response should contain two hits, one for each company, sorted by the sum of their employees age as follows:

{
  "hits" : {
    "total" : 2,
    "max_score" : null,
    "hits" : [
      {
        "_index" : "company",
        "_type" : "company",
        "_id" : "1",
        "_score" : null,
        "_source" : {
          "id" : 1,
          "name" : "Acme"
        },
        "sort" : [
          57.0
        ]
      },
      {
        "_index" : "company",
        "_type" : "company",
        "_id" : "2",
        "_score" : null,
        "_source" : {
          "id" : 2,
          "name" : "Bueno"
        },
        "sort" : [
          42.0
        ]
      }
    ]
  }
}

In this example, the join operation is performed between the company and people indices, and the projected field employee_age is created. The script-based sorting method is used to sort the hits based on the sum of employees' ages in the employee_ages runtime field. The search results are sorted in descending order, with the company having the highest sum of employees' ages appearing first in the search results.

Spatial join

Federate supports spatial joins by applying inequalities between one or more fields. For example, documents with a date field in the parent index can be joined with the child index where the date is between an initial_date field and an end_date field of the child index. The previous statement can be translated to the following boolean expression date > initial_date && date < end_date.

The previous spatial join can be expressed as follows:

curl -H 'Content-Type: application/json' -XGET 'http://localhost:9200/siren/parent_index/_search' -d '
{
  "query" : {
    "join" : {
      "indices" : ["child_index"],
      "on" :
        {
          "must": [ (1)
            {"gt": {"fields": ["date", "initial_date"] }}, (2)
            {"lt": {"fields": ["date", "end_date"] }} (3)
          ],
        }
        "request" : {
          "query" : {
            "match_all" : {}
          }
        }
    }
  }
}
'
1 The join is the conjunction of 2 conditions.
2 Match documents where the date is greater than initial_date.
3 In addition, this matches documents where the date is less than end_date.

In this example, a spatial join is performed between the parent_index and child_index based on the date, initial_date, and end_date fields. The join condition requires the parent index’s date field to be greater than the child index’s initial_date field and less than the child index’s end_date field.

Scoring Capabilities

By default, the join filter returns a constant score. Therefore, the scores of the matching documents from the child set do not affect the scores of the matching documents from the parent set. However, you can project the document’s score from the child set and customize the scoring of the documents from the parent set with a script score function.

Document Score

The score of a matching document from a set may be projected using a standard field object using the special field name _score.

{
  "field" : {
    "name" : "_score",
    "alias" : "employee_score"
  }
}

Scoring based on a projected field

A script-based scoring method can be used to customize the scoring based on the values of a projected field. For example, you can project the score of the matching documents from the child set and aggregate them into the parent document as follows:

$ curl -H 'Content-Type: application/json' 'http://localhost:9200/siren/company/_search?pretty' -d '{
  "query": {
    "function_score": {
      "query": {
        "join": {
          "indices": [ "people" ],
          "on": [ "id", "employed_by" ],
          "request": {
            "project" : [
              { "field" : { "name" : "_score", "alias" : "child_score" } }
            ],
            "query": {
              "match_all": {}
            }
          }
        }
      },
      "functions": [
        {
          "script_score": {
            "script": {
              "lang": "painless",
              "source": "float sum = 0; for (value in doc.child_score) { sum += value } return sum;"
            }
          }
        }
      ],
      "score_mode": "multiply",
      "boost_mode": "replace"
    }
  }
}'

The response should contain two hits, one for each company, sorted by the sum of their child scores as follows:

{
  "hits" : {
    "total" : 2,
    "max_score" : 2.0,
    "hits" : [
      {
        "_index" : "company",
        "_type" : "company",
        "_id" : "1",
        "_score" : 2.0,
        "_source" : {
          "id" : 1,
          "name" : "Acme"
        }
      },
      {
        "_index" : "company",
        "_type" : "company",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "id" : 2,
          "name" : "Bueno"
        }
      }
    ]
  }
}

Aggregating based on a projected field

A script can be used to access and aggregate the values of a projected field. For example, you can project the values of the field age of the matching documents from the people index and aggregate the documents from the company index by using these values as follows:

$ curl -H 'Content-Type: application/json' 'http://localhost:9200/siren/company/_search?pretty' -d '
{
 "query" : {
    "join" : {
      "indices" : ["people"],
      "on" : ["id", "employed_by"],
      "request" : {
        "project" : [
          { "field" : { "name" : "age", "alias" : "employee_age" } }
        ],
        "query" : {
          "match_all" : {}
        }
      }
    }
  },
  "runtime_mappings": {
    "employee_ages": {
      "type": "long",
      "script": "for (value in doc.employee_age) { emit(value) }"
    }
  },
  "aggs": {
    "by_company": {
      "terms": {
        "field": "name.keyword"
      },
      "aggs": {
        "average_employee_age": {
          "avg": {
            "field": "employee_ages"
          }
        }
      }
    }
  }
}'

The response contains an aggregation result with a bucket for each company, and a sub-aggregation average_employee_age corresponding to the average age of employees in this company.

{
  "aggregations": {
    "by_company": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "Acme",
          "doc_count": 1,
          "average_employee_age": {
            "value": 28.5
          }
        },
        {
          "key": "Bueno",
          "doc_count": 1,
          "average_employee_age": {
            "value": 42.0
          }
        }
      ]
    }
  }
}

In this example, the query joins the company index with the people index using the id and employed_by fields. The query projects the age field from the people index using an alias employee_age. The aggregation is performed using a runtime field employee_ages, which iterates through the projected employee_age field. The documents are first aggregated by company name, then a sub-aggregation average_employee_age computes the average age of employees in each bucket.