Siren Platform User Guide

Distributed join workflow

When a (multi) search request with one or more nested joins is sent, the node that receives the request becomes the “Coordinator”. The coordinator node is in charge of controlling and executing a “Job” across the available nodes in the cluster. A job represents the full workflow of the execution of a (multi) search request and is composed of one or more “Tasks”. A task represents a single type of operation, such as Search/Project or Join, that is executed by a “Worker” on a node. A worker is a thread that performs a task and reports its outcome to the coordinator.
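The relationship between coordinator, job, tasks, and workers can be sketched in a few lines of Python. This is only an illustrative model, not Siren's implementation: the names Task, Job, make_task, and coordinate, the node names, and the grouping of tasks into ordered stages are all assumptions made for the sketch.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Task:
    kind: str                 # type of operation, e.g. "Search/Project" or "Join"
    node: str                 # node whose worker executes the task (hypothetical name)
    run: Callable[[], str]    # the work itself; returns the reported outcome


@dataclass
class Job:
    stages: List[List[Task]]  # assumption: tasks in a stage run in parallel, stages in order


def make_task(kind: str, node: str) -> Task:
    # Placeholder work: a real task would search, project, or join data.
    return Task(kind, node, lambda: f"{kind} on {node}: done")


def coordinate(job: Job, max_workers: int = 4) -> List[str]:
    """Run each stage on worker threads and collect the outcomes in task order."""
    outcomes: List[str] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for stage in job.stages:
            futures = [pool.submit(task.run) for task in stage]
            # Waiting on every future finishes the stage before the next one starts.
            outcomes.extend(future.result() for future in futures)
    return outcomes


job = Job(stages=[
    [make_task("Search/Project", "node-1"), make_task("Search/Project", "node-2")],
    [make_task("Join", "node-1"), make_task("Join", "node-2")],
])
outcomes = coordinate(job)
print(outcomes)
```

Each worker thread runs one task and hands its outcome back to the coordinating function, which mirrors the reporting step described above.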

For example, the following search request, which joins the companies index with the articles index:

GET /_siren/companies/search
{
  "query": {
    "join": {
      "type": "HASH_JOIN",
      "indices": ["articles"],
      "on": ["id", "mentions"],
      "request": {
        "query": {
          "match_all": {}
        }
      }
    }
  }
}

will produce the following workflow:

Figure: Query Workflow

The coordinator executes a Search/Project task on every shard of the companies and articles indices. Each of these tasks first executes a search query to compute the matching documents, then scans the id and mentions fields of the matching documents and shuffles the values to all the nodes of the cluster. Once these tasks are completed, the coordinator executes a Hash Join task on every node of the cluster. The Hash Join task joins the two streams of data sent by the two previous Search/Project tasks to compute a set of document ids for the companies index. These document ids are then transferred back to their respective shards and used to filter the companies index.
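The steps above can be simulated on toy data. The following Python sketch is an illustration under stated assumptions, not Federate's actual code: the sample documents, the three-node cluster, the CRC32-based partitioning of join keys, and all function names are hypothetical, and mentions is assumed to be a multi-valued field.

```python
import zlib
from collections import defaultdict

NUM_NODES = 3  # hypothetical cluster size

# Toy documents standing in for the hits of each Search/Project task.
companies = [
    {"_id": "c1", "id": "acme"},
    {"_id": "c2", "id": "globex"},
    {"_id": "c3", "id": "initech"},
]
articles = [
    {"_id": "a1", "mentions": ["acme", "initech"]},  # assumption: multi-valued field
    {"_id": "a2", "mentions": ["acme"]},
    {"_id": "a3", "mentions": ["umbrella"]},
]


def partition(key: str) -> int:
    """Choose the node that receives a join key (assumed hash partitioning)."""
    return zlib.crc32(key.encode("utf-8")) % NUM_NODES


def search_project_companies(docs):
    """Project (join key, document id) pairs and shuffle them by join key."""
    shuffled = defaultdict(list)
    for doc in docs:
        shuffled[partition(doc["id"])].append((doc["id"], doc["_id"]))
    return shuffled


def search_project_articles(docs):
    """Project the mentions values and shuffle them by join key."""
    shuffled = defaultdict(set)
    for doc in docs:
        for key in doc["mentions"]:
            shuffled[partition(key)].add(key)
    return shuffled


def hash_join(company_rows, article_keys):
    """Return the companies document ids whose key occurs in the articles stream."""
    build = set(article_keys)  # hash table built from one stream
    return {doc_id for key, doc_id in company_rows if key in build}


left = search_project_companies(companies)
right = search_project_articles(articles)

# One Hash Join task per node; the union of results is the filter for companies.
matching_ids = set()
for node in range(NUM_NODES):
    matching_ids |= hash_join(left[node], right[node])

print(sorted(matching_ids))  # ['c1', 'c3']
```

Because equal join keys always land on the same node, each node can join its own partition independently, and only document ids travel back to the shards.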

This particular workflow enables Federate to push all the filtering predicates (e.g., terms, range, Boolean queries) down to Elasticsearch, leveraging the indices for fast computation. The Join task is currently limited to computing the intersection of two different sets of documents based on a join condition. This reduces the amount of data allocated in memory, the amount of data transferred across the network, and the workload performed by a task.