Introduction

Siren Federate is an Elasticsearch plugin that extends Elasticsearch with (1) a federation layer to query external databases with the Elasticsearch API and (2) join capabilities across indices and external databases.

Once configured, an external database is mapped to a “Virtual Index” in Elasticsearch. Elasticsearch requests on a virtual index, such as Get Mapping or Search, are intercepted by the Siren Federate plugin, translated into the dialect of the external database and executed against it. This enables Siren Investigate to create and display dashboards for data located both in Elasticsearch indices and in external databases.

The Siren Federate plugin also extends the Elasticsearch DSL with a join query clause, which enables the user to execute a join between indices. The join capabilities are implemented on top of an in-memory distributed computing layer that scales with the number of nodes available in the cluster.

The join capabilities are currently limited to a (left) semi-join between two sets of documents based on a common attribute, where the result only contains the attributes of one of the two joined sets. This join is used to filter one set of documents based on a second set, hence its name; it is equivalent to the EXISTS() operator in SQL. Joins on both numerical and textual fields are supported, but the joined attributes must be of the same type. You can also freely combine and nest multiple joins using boolean operators (conjunction, disjunction, negation) to create complex query plans. The join is fully integrated with the Elasticsearch API and is compatible with distributed environments.
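To make these semantics concrete, here is a minimal in-memory sketch in Python of a left semi-join (EXISTS) and of its negation, an anti-join (NOT EXISTS), combined with a conjunction. The customers/orders data and the function names semi_join and anti_join are hypothetical; the sketch illustrates what a semi-join returns, not how Siren Federate executes it.

```python
from typing import Iterable


def _values(doc: dict, field: str) -> set:
    """Return a field's values as a set; multi-valued fields are lists."""
    value = doc.get(field)
    if value is None:
        return set()
    return set(value) if isinstance(value, list) else {value}


def _key_set(docs: Iterable[dict], field: str) -> set:
    """Collect every join-key value found in the given documents."""
    keys: set = set()
    for doc in docs:
        keys |= _values(doc, field)
    return keys


def semi_join(left, left_field, right, right_field):
    """EXISTS: keep left documents sharing at least one key with the right set.
    Only left documents are returned; no right-side attributes are added."""
    keys = _key_set(right, right_field)
    return [doc for doc in left if _values(doc, left_field) & keys]


def anti_join(left, left_field, right, right_field):
    """NOT EXISTS: keep left documents sharing no key with the right set."""
    keys = _key_set(right, right_field)
    return [doc for doc in left if not (_values(doc, left_field) & keys)]


# Hypothetical data: customers and their orders, joined on the customer id.
customers = [{"id": 1}, {"id": 2}, {"id": 3}]
orders = [
    {"customer": 1, "status": "open"},
    {"customer": 2, "status": "closed"},
    {"customer": 3, "status": "closed"},
    {"customer": 3, "status": "open"},
]
closed = [o for o in orders if o["status"] == "closed"]
open_orders = [o for o in orders if o["status"] == "open"]

# Conjunction of a join and a negated join: customers with a closed order
# AND without any open order.
result = anti_join(semi_join(customers, "id", closed, "customer"),
                   "id", open_orders, "customer")
print([c["id"] for c in result])  # [2]
```

Note that in this sketch the integer 2 and the string "2" never match, which loosely mirrors the requirement that joined attributes share the same type.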

Architecture Overview

Our core requirements for Siren Federate are:

  • Low-latency, real-time interactive response: Siren Federate is designed to power ad hoc, interactive, read-only queries, such as those issued by Siren Investigate.

  • Implement a full-featured relational algebra, capable of being extended with more advanced join conditions, operations and statistical optimizations.

  • A flexible distributed computational framework.

  • Horizontal scaling of fully distributed operations, leveraging all the cluster memory.

  • Federated – capable of working on data that is not inside the cluster, for example via JDBC connections.

Siren Federate is based on the following high level architecture concepts:

  • A coordinator node which is in charge of the query parsing, query planning and query execution. We are leveraging the Apache Calcite engine to create a logical plan of the query, optimise the logical plan and execute a physical plan.

  • A set of worker processes that are in charge of executing the physical operations. Depending on the type of physical operation, a worker process is spawned on a per-node or per-shard basis.

  • An in-memory distributed file system that is used by the worker nodes to exchange data, with a compact columnar data representation optimized for analytical data processing, zero copy and zero data serialisation.

How Does the Siren Federate Join Compare With Parent-Child?

The Siren Federate join is similar in nature to the Parent-Child feature of Elasticsearch: they perform a join at query-time. However, there are important differences between them:

  • With Parent-Child, the parent document and all of its children must live on the same shard, which limits its flexibility. The Siren Federate join removes this constraint and is therefore more flexible: it allows joining documents across shards and across indices.

  • Thanks to the data locality of the Parent-Child model, its joins are faster and more scalable. By contrast, the Siren Federate join needs to transfer data across the network to compute joins across shards, which limits its scalability and performance.

There is no “one size fits all” solution to this problem, and you need to understand your requirements to choose the proper solution. As a basic rule, if your data model and data relationships are purely hierarchical (or can be mapped to a purely hierarchical model), then the Parent-Child model might be more appropriate. If on the contrary you need to query both directions of a data relationship, then the Siren Federate join might be more appropriate.

Which Data Model Does It Operate On?

The most important requirement for executing a join is to have a common shared attribute between two indices. For example, let’s take a simple relational data model composed of two tables, Articles and Companies, and of one junction table ArticlesMentionCompanies to encode the many-to-many relationships between them.

This model can be mapped to two Elasticsearch indices, Articles and Companies. An article document will have a multi-valued field mentions with the unique identifiers of the companies mentioned in the article. In other words, the field mentions is a foreign key in the Articles table that refers to the primary key of the Companies table.

It should be straightforward to write an SQL statement that flattens the relationships into a single multi-valued field. Compared to a traditional database model, where a junction table is necessary, the model is simplified by leveraging multi-valued fields.
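As an illustration of this flattening step, the following Python sketch turns the rows of the ArticlesMentionCompanies junction table into one document per article with a multi-valued mentions field, the shape expected by the articles index. The row layout, (article_id, title) for Articles and (article_id, company_id) for the junction table, and the function name flatten_mentions are assumptions; the same result can also come directly from an SQL GROUP BY with an array aggregate such as PostgreSQL's array_agg.

```python
from collections import defaultdict


def flatten_mentions(articles, junction_rows):
    """Build article documents with a multi-valued mentions field.

    articles: iterable of (article_id, title) rows from the Articles table.
    junction_rows: iterable of (article_id, company_id) rows from the
        ArticlesMentionCompanies junction table.
    """
    mentions = defaultdict(list)
    for article_id, company_id in junction_rows:
        mentions[article_id].append(company_id)
    # Articles with no junction row get an empty list, like article 2
    # in the Getting Started dataset.
    return [
        {"_id": article_id, "title": title, "mentions": mentions.get(article_id, [])}
        for article_id, title in articles
    ]


docs = flatten_mentions(
    [("1", "The NoSQL database glut"), ("2", "Graph Databases Seen Connecting the Dots")],
    [("1", "1"), ("1", "2")],
)
print(docs)
```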

Getting Started

In this short guide, you will learn how you can quickly install the Siren Federate plugin in Elasticsearch, load two collections of documents inter-connected by a common attribute, and execute a relational query across the two collections within the Elasticsearch environment.

Prerequisites

This guide requires that you have downloaded and installed the Elasticsearch 5.6.7 distribution on your computer. If you do not have an Elasticsearch distribution, you can run the following commands:

$ wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.7.zip
$ unzip elasticsearch-5.6.7.zip
$ cd elasticsearch-5.6.7

Installing the Siren Federate Plugin

Before starting Elasticsearch, you have to install the Siren Federate plugin. Assuming that you are in your Elasticsearch installation directory, you can run the following command:

$ ./bin/elasticsearch-plugin install file:///PATH-TO-SIREN-FEDERATE-PLUGIN/siren-federate-5.6.7-10.0.0-beta-2-plugin.zip
-> Downloading file:///PATH-TO-SIREN-FEDERATE-PLUGIN/siren-federate-5.6.7-10.0.0-beta-2-plugin.zip
[=================================================] 100%  
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@     WARNING: plugin requires additional permissions     @
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
* java.io.FilePermission cloudera.properties read
* java.io.FilePermission simba.properties read
* java.lang.RuntimePermission accessClassInPackage.sun.misc
* java.lang.RuntimePermission accessClassInPackage.sun.misc.*
* java.lang.RuntimePermission accessClassInPackage.sun.security.provider
* java.lang.RuntimePermission accessDeclaredMembers
* java.lang.RuntimePermission createClassLoader
* java.lang.RuntimePermission getClassLoader
...
See http://docs.oracle.com/javase/8/docs/technotes/guides/security/permissions.html
for descriptions of what these permissions allow and the associated risks.

Continue with installation? [y/N]y
-> Installed siren-federate

In case you want to remove the plugin, you can run the following command:

$ ./bin/elasticsearch-plugin remove siren-federate
-> Removing siren-federate...
Removed siren-federate

Starting Elasticsearch

To launch Elasticsearch, run the following command:

$ ./bin/elasticsearch

In the output, you should see a line like the following which indicates that the Siren Federate plugin is installed and running:

[2017-04-11T10:42:02,209][INFO ][o.e.p.PluginsService     ] [etZuTTn] loaded plugin [siren-federate]

Loading Some Relational Data

We will use a simple synthetic dataset for the purpose of this demo. The dataset consists of two collections of documents: Articles and Companies. An article is connected to a company through the attribute mentions. Articles will be loaded into the articles index and companies into the companies index. To load the dataset, run the following commands:

$ curl -XPUT 'http://localhost:9200/articles'
$ curl -XPUT 'http://localhost:9200/articles/_mapping/article' -d '
{
  "properties": {
    "mentions": {
      "type": "keyword"
    }
  }
}
'
$ curl -XPUT 'http://localhost:9200/companies'
$ curl -XPUT 'http://localhost:9200/companies/_mapping/company' -d '
{
  "properties": {
    "id": {
      "type": "keyword"
    }
  }
}
'

$ curl -XPUT 'http://localhost:9200/_bulk?pretty' -d '
{ "index" : { "_index" : "articles", "_type" : "article", "_id" : "1" } }
{ "title" : "The NoSQL database glut", "mentions" : ["1", "2"] }
{ "index" : { "_index" : "articles", "_type" : "article", "_id" : "2" } }
{ "title" : "Graph Databases Seen Connecting the Dots", "mentions" : [] }
{ "index" : { "_index" : "articles", "_type" : "article", "_id" : "3" } }
{ "title" : "How to determine which NoSQL DBMS best fits your needs", "mentions" : ["2", "4"] }
{ "index" : { "_index" : "articles", "_type" : "article", "_id" : "4" } }
{ "title" : "MapR ships Apache Drill", "mentions" : ["4"] }

{ "index" : { "_index" : "companies", "_type" : "company", "_id" : "1" } }
{ "id": "1", "name" : "Elastic" }
{ "index" : { "_index" : "companies", "_type" : "company", "_id" : "2" } }
{ "id": "2", "name" : "Orient Technologies" }
{ "index" : { "_index" : "companies", "_type" : "company", "_id" : "3" } }
{ "id": "3", "name" : "Cloudera" }
{ "index" : { "_index" : "companies", "_type" : "company", "_id" : "4" } }
{ "id": "4", "name" : "MapR" }
'

{
  "took" : 8,
  "errors" : false,
  "items" : [ {
    "index" : {
      "_index" : "articles",
      "_type" : "article",
      "_id" : "1",
      "_version" : 3,
      "status" : 200
    }
  },
  ...
}

Relational Querying of the Data

We will now show you how to execute a relational query across the two indices. For example, suppose we want to retrieve all the articles that mention companies whose name matches orient. This relational query can be decomposed into two search queries: the first one finds all the companies whose name matches orient, and the second one filters out all the articles that do not mention a company from the first result set. The Siren Federate plugin introduces a new Elasticsearch query clause, named join, that defines such a query plan, and a new search API, /siren/[INDICES]/_search, that executes it. Below is the command to run the relational query:

$ curl -XGET 'http://localhost:9200/siren/articles/_search?pretty' -d '{
   "query" : {
      "join" : {                      (1)
        "indices" : ["companies"],    (2)
        "on" : ["mentions", "id"],    (3)
        "request" : {                 (4)
          "query" : {
            "term" : {
              "name" : "orient"
            }
          }
        }
      }
    }
}'
  1. The join query clause

  2. The source indices (i.e., companies)

  3. The clause specifying the paths for join keys in both source and target indices

  4. The search request that will be used to filter out companies

The command should return the following response with two search hits:

{
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "articles",
      "_type" : "article",
      "_id" : "1",
      "_score" : 1.0,
      "_source":{ "title" : "The NoSQL database glut", "mentions" : ["1", "2"] }
    }, {
      "_index" : "articles",
      "_type" : "article",
      "_id" : "3",
      "_score" : 1.0,
      "_source":{ "title" : "How to determine which NoSQL DBMS best fits your needs", "mentions" : ["2", "4"] }
    } ]
  }
}

You can also reverse the order of the join, and query for all the companies that are mentioned in articles whose title matches nosql:

$ curl -XGET 'http://localhost:9200/siren/companies/_search?pretty' -d '{
   "query" : {
      "join" : {
        "indices" : ["articles"],
        "on": ["id", "mentions"],
        "request" : {
          "query" : {
            "term" : {
              "title" : "nosql"
            }
          }
        }
      }
    }
}'

The command should return the following response with three search hits:

{
  "hits" : {
    "total" : 3,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "companies",
      "_type" : "company",
      "_id" : "4",
      "_score" : 1.0,
      "_source":{ "id": "4", "name" : "MapR" }
    }, {
      "_index" : "companies",
      "_type" : "company",
      "_id" : "1",
      "_score" : 1.0,
      "_source":{ "id": "1", "name" : "Elastic" }
    }, {
      "_index" : "companies",
      "_type" : "company",
      "_id" : "2",
      "_score" : 1.0,
      "_source":{ "id": "2", "name" : "Orient Technologies" }
    } ]
  }
}
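To check the expected hits without a running cluster, the following Python sketch replays the two-step decomposition described at the start of this section on the demo dataset: the inner request selects documents from one index, their join-key values are collected, and those values filter the other index. Term matching is approximated by a lowercase word match, which assumes the default standard analyzer on the dynamically mapped title and name fields; the helper names are hypothetical, and scoring and hit ordering are not reproduced.

```python
import re

articles = {
    "1": {"title": "The NoSQL database glut", "mentions": ["1", "2"]},
    "2": {"title": "Graph Databases Seen Connecting the Dots", "mentions": []},
    "3": {"title": "How to determine which NoSQL DBMS best fits your needs", "mentions": ["2", "4"]},
    "4": {"title": "MapR ships Apache Drill", "mentions": ["4"]},
}
companies = {
    "1": {"id": "1", "name": "Elastic"},
    "2": {"id": "2", "name": "Orient Technologies"},
    "3": {"id": "3", "name": "Cloudera"},
    "4": {"id": "4", "name": "MapR"},
}


def term_match(text, term):
    """Rough stand-in for a term query on an analyzed text field."""
    return term in re.findall(r"\w+", text.lower())


def as_list(value):
    return value if isinstance(value, list) else [value]


def join_filter(target, target_field, source, source_field, source_pred):
    """Return ids of target docs whose join key appears in the filtered source docs."""
    # Step 1: run the inner request on the source index and collect join keys.
    keys = {k for doc in source.values() if source_pred(doc)
            for k in as_list(doc[source_field])}
    # Step 2: keep target docs sharing at least one key (semi-join).
    return sorted(doc_id for doc_id, doc in target.items()
                  if keys.intersection(as_list(doc[target_field])))


print(join_filter(articles, "mentions", companies, "id",
                  lambda c: term_match(c["name"], "orient")))   # ['1', '3']
print(join_filter(companies, "id", articles, "mentions",
                  lambda a: term_match(a["title"], "nosql")))   # ['1', '2', '4']
```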

Federate API

Node-level Settings

siren.planner.volcano.enable

Use the volcano statistical optimizer when selecting the join algorithms. Defaults to true.

siren.planner.volcano.join

Defines which distributed join algorithm is selected when optimizing a request. Valid values are HASH_JOIN and MERGE_JOIN (case-insensitive). Defaults to HASH_JOIN.

siren.planner.volcano.use_query

Use contextual queries when computing statistics. If false, computed statistics are effectively "global" to the index. Defaults to true.

Search API

This plugin introduces two new search actions: /siren/[INDICES]/_search, which replaces the /[INDICES]/_search action, and /siren/[INDICES]/_msearch, which replaces the /[INDICES]/_msearch action. Both actions extend the original Elasticsearch actions and therefore support the same API. You must use these actions with the join query clause, as it is not supported by the original Elasticsearch actions.
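Since the multi-search action supports the same API as Elasticsearch's _msearch, its body should follow the standard format: newline-delimited JSON in which each search is a header line followed by a request line, with a trailing newline at the end. The Python sketch below only builds such a payload for two join queries (it does not send it); the helper names join_query and build_msearch_body are hypothetical, and putting the index in each header line is an assumption based on the standard _msearch format.

```python
import json


def join_query(indices, on, inner_query):
    """Build a search body with a join clause, following the Query Syntax section."""
    return {"query": {"join": {"indices": indices, "on": on,
                               "request": {"query": inner_query}}}}


def build_msearch_body(searches):
    """Serialize (header, body) pairs as newline-delimited JSON for _msearch."""
    lines = []
    for header, body in searches:
        lines.append(json.dumps(header))
        lines.append(json.dumps(body))
    # _msearch requires a newline after the last line.
    return "\n".join(lines) + "\n"


body = build_msearch_body([
    ({"index": "articles"},
     join_query(["companies"], ["mentions", "id"], {"term": {"name": "orient"}})),
    ({"index": "companies"},
     join_query(["articles"], ["id", "mentions"], {"term": {"title": "nosql"}})),
])
print(body)
```

If the payload is saved to a file and sent with curl, use --data-binary rather than -d, since -d strips newlines from file input.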

Search Request Parameters

debug

Enables query planner debugging. Defaults to false.

task_timeout

A task timeout, in milliseconds. A task must complete within the specified time; when the timeout expires, the task stops and returns the values accumulated up to that point. Defaults to no timeout.
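The "return what has been accumulated so far" behaviour of task_timeout can be pictured with the following Python sketch. It is not Siren Federate code: the function collect_with_timeout, its batch-based input and the injectable clock are assumptions made so that the partial-result behaviour is easy to see and to test.

```python
import time
from typing import Callable, Iterable, List, Optional, Tuple


def collect_with_timeout(
    batches: Iterable[list],
    timeout_ms: Optional[int],
    clock: Callable[[], float] = time.monotonic,
) -> Tuple[list, bool]:
    """Accumulate batches until timeout_ms elapses.

    Returns (values accumulated so far, True if the timeout expired).
    timeout_ms=None means no timeout, like the task_timeout default.
    """
    start = clock()
    collected: List = []
    for batch in batches:
        # Check the time budget before consuming the next batch.
        if timeout_ms is not None and (clock() - start) * 1000 >= timeout_ms:
            return collected, True  # bail out with the partial result
        collected.extend(batch)
    return collected, False


# Simulated clock in seconds: the start time, then one reading per batch check.
ticks = iter([0.0, 0.0, 0.05, 0.2])
print(collect_with_timeout([[1, 2], [3], [4, 5]], 100, clock=lambda: next(ticks)))
# ([1, 2, 3], True)
```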

Query Syntax

  • join: the name of the join query clause.

  • type: the type of the join. Valid values are BROADCAST_JOIN, HASH_JOIN and MERGE_JOIN (optional; by default the planner selects the join type).

  • indices: the index names that will be joined with the source indices (optional, defaults to all indices).

  • types: the index types that will be joined with the source indices (optional, defaults to all types).

  • on: an array specifying the paths for join keys in both source and target indices.

  • request: the search request that will be used to filter out documents before performing the join.

Example

In this example, we join the documents of index1 with the documents of index2. The query first selects the documents of index2 whose mapping type is "type" and that match the query { "terms" : { "tag" : [ "aaa" ] } }. It then retrieves the values of the field id, as specified by the parameter on. These values are then used as a filter on the field foreign_key of the documents of index1.

GET /siren/index1/_search
{
  "query" : {
    "join" : {
      "type": "HASH_JOIN",
      "indices" : ["index2"],
      "types" : ["type"],
      "on" : ["foreign_key", "id"],
      "request" : {
        "query" : {
          "terms" : {
            "tag" : [ "aaa" ]
          }
        }
      }
    }
  }
}

Response Format

The response returned by the Federate’s search API is identical to the response returned by Elasticsearch’s search API.

Performance Considerations

Join Types

Siren Federate includes different join strategies: “Broadcast Join”, “Hash Join” and “Merge Join”. Each one has its pros and cons, and the optimal choice depends on the scenario. By default, the Siren Federate planner tries to pick the best strategy automatically, but in certain scenarios it might be better to pick one of the strategies manually.

The Broadcast Join is best when filtering a large index with a small set of documents. The Hash Join and Merge Join are fully distributed and are designed to handle large joins. Both scale horizontally (with the number of nodes) and vertically (with the number of CPU cores). Currently, the Hash Join performs better than the Merge Join in many scenarios.

Siren Federate provides two fully distributed join algorithms: the Hash Join and the Sort-Merge Join (referred to as Merge Join above). Both are designed to leverage multi-core architectures. This is achieved by creating many small data partitions during the Project phase. Each node of the cluster receives a number of partitions that depends on the number of CPUs. Partitions are independent of each other and can be processed by different join worker threads. During the join phase, each worker thread joins the tuples of one partition. The number of join worker threads scales automatically with the number of CPU cores available.
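The partitioning idea can be illustrated with a small Python sketch: both relations are hash-partitioned on the join key, so matching tuples always land in the same partition, and each pair of partitions is then semi-joined independently on a thread pool. The partition count (four per core), the use of Python's hash() and ThreadPoolExecutor, and all function names are assumptions for illustration; they do not reflect how Siren Federate sizes or schedules its partitions.

```python
import os
from concurrent.futures import ThreadPoolExecutor


def partition(tuples, key, n_partitions):
    """Hash-partition tuples on their join key so equal keys share a partition."""
    parts = [[] for _ in range(n_partitions)]
    for t in tuples:
        parts[hash(key(t)) % n_partitions].append(t)
    return parts


def semi_join_partition(left_part, right_part, left_key, right_key):
    """Semi-join one pair of co-located partitions using a set of right keys."""
    right_keys = {right_key(t) for t in right_part}
    return [t for t in left_part if left_key(t) in right_keys]


def partitioned_semi_join(left, right, left_key, right_key, n_partitions=None):
    """Partition both sides, then join the partition pairs on a thread pool."""
    cores = os.cpu_count() or 1
    # Several partitions per core (an assumption) so threads can balance work.
    n = n_partitions or 4 * cores
    left_parts = partition(left, left_key, n)
    right_parts = partition(right, right_key, n)
    with ThreadPoolExecutor(max_workers=cores) as pool:
        results = pool.map(semi_join_partition, left_parts, right_parts,
                           [left_key] * n, [right_key] * n)
        return [t for part in results for t in part]


left = [(i, f"doc{i}") for i in range(100)]
right = [(k,) for k in range(0, 100, 7)]
matches = partitioned_semi_join(left, right, lambda t: t[0], lambda t: t[0], n_partitions=8)
print(sorted(t[0] for t in matches))
```

Python threads are limited by the global interpreter lock, so this sketch shows the structure of partition-parallel joins rather than an actual multi-core speed-up.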

The Hash Join is performed in two phases: build and probe. The build phase creates an in-memory hash table of one of the two relations in the partition. The probe phase then scans the second relation and probes the hash table to find the matching tuples.
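Within a single partition, the two phases can be sketched in Python as follows, with a dict playing the role of the in-memory hash table. The build side here is simply the first argument; choosing the smaller relation as the build side is a common heuristic, and we make no claim about how Siren Federate chooses it. The (article, company) pairs are a hypothetical exploded form of the mentions field.

```python
from collections import defaultdict


def hash_join(build, probe, build_key, probe_key):
    """Equi-join two relations in two phases and return (build, probe) pairs."""
    # Build phase: index every tuple of the build relation by its join key.
    table = defaultdict(list)
    for b in build:
        table[build_key(b)].append(b)
    # Probe phase: scan the probe relation once, looking up each key.
    out = []
    for p in probe:
        for b in table.get(probe_key(p), []):
            out.append((b, p))
    return out


companies = [("1", "Elastic"), ("2", "Orient Technologies")]
mentions = [("article-1", "1"), ("article-1", "2"), ("article-3", "2"), ("article-3", "4")]
pairs = hash_join(companies, mentions, lambda c: c[0], lambda m: m[1])
print(pairs)
```

For a semi-join, only the probe-side tuples with at least one match are kept, so a set of build keys is enough in place of the full table.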

The Sort-Merge Join instead sorts the two relations during the Project phase. It then performs a linear scan over the two sorted relations to find the matching tuples.
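The merge step can be sketched in Python as a single pass over two relations already sorted on the join key, advancing whichever side has the smaller key; runs of equal keys on both sides produce all matching pairs. This is a generic textbook sort-merge join shown for illustration only. It needs no hash table, and the cost it adds instead is the sort, which is the CPU-for-memory trade-off discussed in the next paragraph.

```python
def merge_join(left, right, left_key, right_key):
    """Join two relations sorted by key in one coordinated scan of both."""
    out = []
    i = j = 0
    while i < len(left) and j < len(right):
        lk, rk = left_key(left[i]), right_key(right[j])
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of tuples with this key on the right side.
            j_end = j
            while j_end < len(right) and right_key(right[j_end]) == lk:
                j_end += 1
            # Pair every left tuple with this key against the right run.
            while i < len(left) and left_key(left[i]) == lk:
                out.extend((left[i], r) for r in right[j:j_end])
                i += 1
            j = j_end
    return out


# Sort phase (done during the Project phase in Siren Federate), then merge.
left = sorted([(4, "d"), (1, "a"), (2, "c"), (2, "b")])
right = sorted([(3, "z"), (2, "y"), (4, "w"), (2, "x")])
print(merge_join(left, right, lambda t: t[0], lambda t: t[0]))
```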

Compared to the Hash Join, the Sort-Merge Join does not require additional memory, since it does not have to build an in-memory hash table. However, it requires a sort operation to be executed during the Project phase. In effect, it trades CPU for memory.

Numeric vs String Attributes

Joining numeric attributes is more efficient than joining string attributes. If you plan to join attributes of type string, we recommend generating a murmur hash of the string value into a new attribute at indexing time, and using this new attribute for the join. Such an index-time transformation can easily be done with Logstash’s fingerprint plugin.
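To show what such an index-time transformation computes, here is a Python sketch of the 32-bit MurmurHash3 (x86 variant) applied to a string join key. We have not verified that it reproduces the exact values emitted by the Logstash fingerprint plugin with your settings, so whichever tool you pick, use the same one for both indices. The wrapper join_key is a hypothetical helper. Keep in mind that hashing can map two distinct strings to the same number, which would produce false matches in the join; a wider (64-bit) hash makes this less likely.

```python
def _rotl32(x: int, r: int) -> int:
    """Rotate a 32-bit integer left by r bits."""
    return ((x << r) | (x >> (32 - r))) & 0xFFFFFFFF


def murmur3_32(data: bytes, seed: int = 0) -> int:
    """MurmurHash3 (x86, 32-bit variant); returns an unsigned 32-bit integer."""
    c1, c2 = 0xCC9E2D51, 0x1B873593
    h = seed & 0xFFFFFFFF
    n_blocks = len(data) // 4
    # Body: mix each little-endian 4-byte block into the hash state.
    for i in range(n_blocks):
        k = int.from_bytes(data[4 * i:4 * i + 4], "little")
        k = _rotl32((k * c1) & 0xFFFFFFFF, 15)
        k = (k * c2) & 0xFFFFFFFF
        h = _rotl32(h ^ k, 13)
        h = (h * 5 + 0xE6546B64) & 0xFFFFFFFF
    # Tail: mix the remaining 1 to 3 bytes, if any.
    tail = data[4 * n_blocks:]
    if tail:
        k = int.from_bytes(tail, "little")
        k = _rotl32((k * c1) & 0xFFFFFFFF, 15)
        k = (k * c2) & 0xFFFFFFFF
        h ^= k
    # Finalization: fold in the length and apply the fmix32 avalanche.
    h ^= len(data)
    h ^= h >> 16
    h = (h * 0x85EBCA6B) & 0xFFFFFFFF
    h ^= h >> 13
    h = (h * 0xC2B2AE35) & 0xFFFFFFFF
    h ^= h >> 16
    return h


def join_key(value: str) -> int:
    """Hypothetical index-time transformation: string id -> numeric join key."""
    return murmur3_32(value.encode("utf-8"))


print({name: join_key(name) for name in ["Elastic", "MapR", "Orient Technologies"]})
```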

Tuple Collector Settings

Tuple collectors send batches of tuples of a fixed size, and the batch size has an impact on performance. Smaller batches take less memory but increase CPU time on the receiver side, since the receiver has to reconstruct a tuple collection from many small batches (especially for sorted tuple collections). By default, a batch holds at most 1048576 tuples, which represents 8 MiB for a column of type long. The size can be configured with the setting siren.io.tuple.collector.batch_size, whose integer value is the maximum number of tuples in a batch.
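The default can be checked with a little arithmetic: 1048576 tuples × 8 bytes per long value = 8388608 bytes, i.e. 8 MiB per column. The Python helper below generalizes this to estimate the number of batches and the size of a full batch for a given batch_size; the function name is hypothetical, and it assumes fixed-width values and ignores any per-batch overhead, so treat its numbers as rough estimates.

```python
def batch_estimate(n_tuples: int, batch_size: int = 1_048_576,
                   bytes_per_value: int = 8, n_columns: int = 1):
    """Return (number of batches, bytes in one full batch) for fixed-width columns."""
    n_batches = (n_tuples + batch_size - 1) // batch_size  # ceiling division
    full_batch_bytes = batch_size * bytes_per_value * n_columns
    return n_batches, full_batch_bytes


print(batch_estimate(10_000_000))           # (10, 8388608)
print(batch_estimate(10_000_000, 131_072))  # (77, 1048576)
```

Shrinking the batch to 131072 tuples cuts each batch to 1 MiB, but the receiver then has roughly eight times as many batches to reassemble, which is the memory versus CPU trade-off described above.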

License API for Siren Federate

Federate includes a license manager service and a set of REST commands to register, verify and delete a Siren license.

Without a valid license, Federate logs a message on every request to notify that the current license is invalid.

Usage

Let’s assume you have a Siren license named license.sig. You can upload and register this license in Elasticsearch using the command:

$ curl -XPUT -T "license.sig" 'http://localhost:9200/_siren/license'
---
acknowledged: true

You can then check the status of the license using the command:

$ curl -XGET 'http://localhost:9200/_siren/license'
{
  "license" : {
    "content" : {
      "valid-date" : "2016-05-16",
      "issue-date" : "2016-04-15",
      "max-nodes" : "12"
    },
    "isValid" : true
  }
}

To delete a license from Elasticsearch, you can use the command:

$ curl -XDELETE 'http://localhost:9200/_siren/license'
{"acknowledged":true}

Connector

JDBC Drivers

JDBC driver jars for remote datasources and their dependencies (if any) must be copied to the plugin directory alongside other jars; once the jars have been copied, restart the node and ensure that the node starts correctly.

If the node fails to start because of a JAR Hell exception, remove the driver and its dependencies and restart the node.

Impala JDBC Connector

The following jars should not be copied as they are already present in the default Elasticsearch path:

  • commons-logging
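Before copying a driver, it can help to check whether any of its jars duplicates a library already on the Elasticsearch path, a common cause of JAR Hell errors. The Python sketch below compares jar file names after stripping a trailing version number. The naming convention it relies on (artifact-1.2.3.jar), the helper names and the example file names are assumptions, and matching on file names will not catch shaded or renamed classes.

```python
import re

# Matches "<artifact>-<version>.jar", where the version starts with a digit.
_VERSIONED_JAR = re.compile(r"^(?P<artifact>.+?)-\d[\w.\-]*\.jar$")


def artifact_name(jar_file: str) -> str:
    """Strip the version suffix, e.g. commons-logging-1.2.jar -> commons-logging."""
    match = _VERSIONED_JAR.match(jar_file)
    if match:
        return match.group("artifact")
    return jar_file[:-len(".jar")] if jar_file.endswith(".jar") else jar_file


def conflicting_jars(driver_jars, existing_jars):
    """Return driver jars whose artifact already exists on the Elasticsearch path."""
    existing = {artifact_name(j) for j in existing_jars}
    return sorted(j for j in driver_jars if artifact_name(j) in existing)


# Illustrative file names only; list your actual driver and lib directories.
driver_jars = ["ImpalaJDBC41.jar", "commons-logging-1.1.3.jar", "hive-service-1.1.0.jar"]
es_jars = ["elasticsearch-5.6.7.jar", "commons-logging-1.2.jar", "lucene-core-6.6.1.jar"]
print(conflicting_jars(driver_jars, es_jars))  # ['commons-logging-1.1.3.jar']
```

In practice, the two lists could come from os.listdir() on the driver directory and on the Elasticsearch lib directory, whose paths depend on your installation.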

Configuration settings

The Federate connector supports the following node configuration settings:

  • siren.connector.datasources.index: the index in which Federate will store datasource configurations.

  • siren.connector.query.max_result_rows: the maximum number of rows returned when executing a query on a remote datasource (10 by default).

Security

When using Shield or Search Guard, Federate needs to authenticate as a user with all the permissions on the indices that store the datasource and virtual index configurations. The credentials of this user can be specified through the following node configuration settings:

  • siren.connector.username: the username of the Federate connector user.

  • siren.connector.password: the password of the Federate connector user.

Search Guard configuration

The following snippet can be added to sg_roles.yml to define a federateserver role with all the required permissions on connector indices.

federateserver:
  indices:
    ?siren-federate-datasources:
      '*':
        - ALL

The following snippet can be added to sg_roles.yml to define a federateuser role with all the required permissions to manage datasources:

federateuser:
  cluster:
    - "cluster:admin/siren/connector/datasource/*"