Architecture

Siren Federate is designed around the following core requirements:

  • Low-latency, real-time interactive response – Siren Federate is designed to power ad hoc, interactive, read-only queries such as those sent from Siren Investigate.

  • Implementation of a fully featured relational algebra, capable of being extended with advanced join conditions, operations, and statistical optimizations.

  • Flexible in-memory distributed computational framework.

  • Horizontal scaling of fully distributed operations, leveraging all the available nodes in the cluster.

  • Federated – capable of working on data that spans several Elasticsearch clusters.

Siren Federate is based on the following high level architecture concepts:

  • A coordinator node, which is in charge of query parsing, query planning, and query execution. The Apache Calcite engine creates a logical plan of the query, optimizes the logical plan, and translates it into a physical plan for execution.

  • A set of worker threads, which are in charge of executing the physical operations. Depending on the type of physical operation, a worker is spawned on a per-node or per-shard basis.

  • An in-memory distributed file system that is used by the workers to exchange data, with a compact columnar data representation optimized for analytical data processing, zero copy, and zero data serialization.

Distributed Join Workflow

When a (multi) search request is sent with one or more nested joins, the node that receives the request becomes the “Coordinator” node. The coordinator node is in charge of controlling and executing a “Job” across the available nodes in the cluster. A job represents the full workflow of the execution of a (multi) search request. A job is composed of one or more “Tasks”. A task represents a single type of operation, such as a Search/Project or Join, which is executed by a “Worker” on a node. A worker is a thread that performs a task and reports the outcome of the task to the coordinator.

For example, consider the following search request, which joins the company index with the article index:

curl -H 'Content-Type: application/json' -XGET 'http://localhost:9200/siren/company/_search' -d '
{
  "query" : {
    "join" : {
      "type" : "HASH_JOIN",
      "indices" : ["article"],
      "on" : ["id", "mentions"],
      "request" : {
        "query" : {
          "match_all" : {}
        }
      }
    }
  }
}
'

This request produces the following workflow:

[Figure: Query workflow]

The coordinator executes a Search/Project task on every shard of the company and article indices. These tasks first execute a search query to compute the matching documents, then scan the id and mentions fields of the matching documents, and finally shuffle the collected values to all of the nodes of the cluster. Once these tasks are completed, the coordinator executes a Hash Join task on every node of the cluster. The Hash Join task joins the two streams of data sent by the previous Search/Project tasks to compute a set of document IDs for the company index. These document IDs are then transferred back to their respective shards and used to filter the company index.

This workflow allows Siren Federate to push all of the filtering predicates (for example, terms, range, or boolean queries) down to Elasticsearch, leveraging the indices for fast computation.
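
To make the join step concrete, the following is a minimal Java sketch of the semi-join that the Hash Join task performs: the join keys of one stream of (join key, document ID) tuples are hashed, the other stream probes them, and the output is the set of matching company document IDs. This is an illustrative toy, not Federate's implementation (which operates on off-heap columnar packets, as described under IO below); the Tuple and HashJoinSketch names are hypothetical.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class HashJoinSketch {

    // A (join key, document id) pair, as shuffled by a Search/Project task.
    // Hypothetical model: the real data is a compact columnar representation.
    record Tuple(String joinKey, int docId) {}

    static Set<Integer> hashJoin(List<Tuple> buildSide, List<Tuple> probeSide) {
        // Build phase: hash the join keys of one stream (here: article.mentions).
        Set<String> buildKeys = new HashSet<>();
        for (Tuple t : buildSide) {
            buildKeys.add(t.joinKey());
        }
        // Probe phase: keep the doc ids of the other stream (company.id) whose
        // join key has a match. These are semi-join semantics: the result is
        // only used to filter the company index.
        Set<Integer> matching = new HashSet<>();
        for (Tuple t : probeSide) {
            if (buildKeys.contains(t.joinKey())) {
                matching.add(t.docId());
            }
        }
        return matching;
    }

    public static void main(String[] args) {
        List<Tuple> articleMentions = List.of(new Tuple("acme", 1), new Tuple("globex", 2));
        List<Tuple> companyIds = List.of(new Tuple("acme", 10), new Tuple("initech", 11));
        System.out.println(hashJoin(articleMentions, companyIds)); // prints [10]
    }
}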

Query Planning & Optimization

The coordinator node leverages Apache Calcite for planning the job execution. A search request is first parsed into an abstract syntax tree, then transformed into a logical relational plan. A set of rules is then applied to optimize the logical plan: we leverage both the Hep engine, which performs heuristic rule-based optimization, and the Volcano engine, which performs cost-based optimization driven by statistical information. The optimized logical plan is finally transformed into a physical plan and executed.
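
To illustrate the Hep phase, here is a small standalone sketch using Apache Calcite's RelBuilder and HepPlanner. It is a toy example under assumptions, not Federate's actual rule set: the constant-valued plan and the single FILTER_VALUES_MERGE rule were chosen only to show a heuristic rewrite in action.

import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.CoreRules;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RelBuilder;

public class HepSketch {
    public static void main(String[] args) {
        FrameworkConfig config = Frameworks.newConfigBuilder()
            .defaultSchema(Frameworks.createRootSchema(true))
            .build();
        RelBuilder builder = RelBuilder.create(config);

        // A toy logical plan: a filter on top of constant rows.
        RelNode logicalPlan = builder
            .values(new String[] {"id"}, 1, 2, 3)
            .filter(builder.equals(builder.field("id"), builder.literal(1)))
            .build();

        // Heuristic (Hep) phase: apply a rule set to rewrite the plan.
        HepProgram program = new HepProgramBuilder()
            .addRuleInstance(CoreRules.FILTER_VALUES_MERGE) // folds the filter into the values
            .build();
        HepPlanner planner = new HepPlanner(program);
        planner.setRoot(logicalPlan);
        RelNode optimized = planner.findBestExp();

        System.out.println(RelOptUtil.toString(optimized)); // a LogicalValues with the single matching row
    }
}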

The physical plan represents a tree of tasks to be executed. The coordinator tries to execute tasks concurrently when possible. In the previous example, the two Search/Project tasks are executed concurrently, and the Hash Join task is executed only after both have completed.
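
This scheduling pattern can be sketched with plain Java futures (the names below are hypothetical; Federate's scheduler is internal to the coordinator): the two leaf tasks are submitted concurrently, and the join is chained on the completion of both.

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TaskTreeSketch {

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);

        // The two leaf Search/Project tasks of the physical plan run concurrently...
        CompletableFuture<Set<String>> scanCompany =
            CompletableFuture.supplyAsync(() -> searchProject("company"), pool);
        CompletableFuture<Set<String>> scanArticle =
            CompletableFuture.supplyAsync(() -> searchProject("article"), pool);

        // ...and the Hash Join task starts only once both parents have completed.
        CompletableFuture<Set<String>> join =
            scanCompany.thenCombineAsync(scanArticle, TaskTreeSketch::hashJoin, pool);

        System.out.println(join.join());
        pool.shutdown();
    }

    // Stand-ins for the real tasks.
    static Set<String> searchProject(String index) { return Set.of(index); }
    static Set<String> hashJoin(Set<String> left, Set<String> right) { return Set.of("joined"); }
}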

When handling a multi search request, each request is planned separately, producing one physical plan per request. Before executing them, however, the planner combines all of the physical plans into a single one by mapping identical operations to a single task. This can be seen as folding multiple trees of tasks into a single directed graph, where overlapping operations across trees become a single vertex. This makes it possible to reuse computation across multiple requests.
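
The folding step can be sketched as interning structurally identical subtrees, under a hypothetical task model (signature strings standing in for full operator descriptions):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PlanFoldingSketch {

    // Hypothetical physical task: a signature plus its input tasks.
    record Task(String signature, List<Task> inputs) {}

    // Recursively intern tasks so that two structurally identical subtrees
    // collapse into the same vertex of the resulting graph.
    static Task fold(Task task, Map<String, Task> interned) {
        List<Task> foldedInputs = new ArrayList<>();
        for (Task input : task.inputs()) {
            foldedInputs.add(fold(input, interned));
        }
        String key = task.signature() + foldedInputs;
        return interned.computeIfAbsent(key, k -> new Task(task.signature(), foldedInputs));
    }

    public static void main(String[] args) {
        // Two requests of a multi search that share the same Search/Project subtree.
        Task plan1 = new Task("HashJoin[company]",
            List.of(new Task("SearchProject[article]", List.of())));
        Task plan2 = new Task("HashJoin[investor]",
            List.of(new Task("SearchProject[article]", List.of())));

        Map<String, Task> interned = new HashMap<>();
        Task folded1 = fold(plan1, interned);
        Task folded2 = fold(plan2, interned);

        // Both joins now point at the exact same Search/Project task instance,
        // so its computation is executed once and reused.
        System.out.println(folded1.inputs().get(0) == folded2.inputs().get(0)); // true
    }
}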

IO

The shuffling and transfer of data produced by a task is handled by a Collector. A collector collects data, serializes it into a compact columnar data representation, and transfers it in the form of binary packets. Different collector strategies are implemented, each adapted to a particular type of task. For example, in the case of a Hash Join, a Search/Project task uses a collector with a hash partitioning strategy to create small data partitions and shuffle them uniformly across the cluster.
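
The routing decision behind such a hash partitioning strategy can be sketched as follows (hypothetical names; the actual partitioning function is internal to Federate). The essential property is that equal join keys from both indices hash to the same partition, so each node can join its partitions locally.

import java.util.List;

public class HashPartitionerSketch {

    // Route a join key to one of numPartitions partitions (e.g. one per node).
    static int partitionFor(Object joinKey, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(joinKey.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        for (String key : List.of("acme", "globex", "initech")) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}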

On the receiver side, when a packet is received, it is stored as is (without deserialization) in an in-memory data store. Tasks, such as the Join task, work directly on top of these binary data packets, avoiding unnecessary data copies and deserialization.

The binary data packets are created, stored, and manipulated off-heap. This reduces pressure on the JVM heap and the garbage collector when dealing with large amounts of data. Siren Federate leverages the Apache Arrow project for the allocation and management of off-heap byte arrays.
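
A minimal sketch of allocating, using, and releasing an off-heap buffer with Arrow's Java memory API (this assumes the arrow-memory-core module plus an allocation backend such as arrow-memory-netty on the classpath; it is illustrative only, not Federate's internal code):

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

public class OffHeapSketch {

    public static void main(String[] args) {
        // Root allocator with a 64 MiB limit; all allocations are accounted against it.
        try (BufferAllocator allocator = new RootAllocator(64 * 1024 * 1024)) {
            // A 1 KiB buffer allocated outside the JVM heap.
            try (ArrowBuf buf = allocator.buffer(1024)) {
                buf.setInt(0, 42);                 // write off-heap
                System.out.println(buf.getInt(0)); // read it back: 42
                System.out.println("allocated bytes: " + allocator.getAllocatedMemory());
            } // the buffer is released deterministically here, with no garbage collection involved
        }
    }
}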