Federate Modules

To take advantage of Siren Federate features, every node in the Elasticsearch cluster must have the Federate plugin installed. Each node that has the plugin installed loads all of the Federate modules described in this section to establish a Federate cluster. To change any settings, see the Elasticsearch documentation at https://www.elastic.co/guide/en/elasticsearch/reference/8.15/settings.html

You can set the attribute "federate.enabled" to false to disable the loading of all Federate modules in a cluster node and prevent it from being involved in any Federate tasks.
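The settings in this section are marked as either static or dynamic. In Elasticsearch, static settings are set in elasticsearch.yml, while dynamic cluster settings can be changed at runtime through the cluster update settings API (PUT _cluster/settings). The following is a minimal sketch that only builds the request body for such a call. It assumes that the dynamic Federate settings are exposed through that API; the helper name and the value 5 are illustrative only.

```python
import json

# A dynamic planner setting named in this section; the value 5 is an arbitrary example.
DYNAMIC_SETTINGS = {"federate.planner.pool.tasks_per_job.size": 5}


def build_cluster_settings_body(settings, persistent=True):
    """Wrap flat setting keys in the body shape used by PUT _cluster/settings.

    Hypothetical helper: it builds the JSON document only and sends nothing.
    """
    scope = "persistent" if persistent else "transient"
    return {scope: dict(settings)}


body = build_cluster_settings_body(DYNAMIC_SETTINGS)
print(json.dumps(body, indent=2))
```

The resulting document can be sent with any HTTP client or from a console. Settings marked as static are not expected to be accepted by this API and belong in the node configuration file instead.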

Planner

The planner module is responsible for parsing a (multi) search request and generating a logical model. This logical model is then optimized by using the rule-based Hep engine from Apache Calcite. The outcome is a physical query plan, which is then executed. The physical query plan is a Directed Acyclic Graph (DAG) workflow composed of individual computing steps. The workflow is executed as a Job and the individual computing steps are executed as Tasks. One (multi) search request therefore maps to a single job.
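To illustrate the relationship between a job and its tasks, the following sketch runs a small DAG in dependency order by using Python's standard graphlib module. It is not Federate's planner or scheduler: the task names and the run_job function are hypothetical, and the tasks do no real work.

```python
from graphlib import TopologicalSorter

# Hypothetical physical plan: each task maps to the set of tasks it depends on.
# The task names are illustrative and are not Federate operator names.
plan = {
    "search_left": set(),
    "search_right": set(),
    "join": {"search_left", "search_right"},
    "collect": {"join"},
}


def run_job(plan):
    """Run every task of one job in dependency order and return that order."""
    sorter = TopologicalSorter(plan)
    sorter.prepare()  # raises graphlib.CycleError if the plan is not acyclic
    executed = []
    while sorter.is_active():
        # Every task returned by get_ready() has all of its dependencies
        # completed, so an engine could run these tasks concurrently.
        for task in sorted(sorter.get_ready()):
            executed.append(task)
            sorter.done(task)
    return executed


order = run_job(plan)
print(order)
```

In this sketch the two search tasks become ready together, which is the kind of step that a per-job task concurrency limit would bound.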

The planner settings that start with the prefix "siren.*" have been deprecated and are scheduled to be removed in the next major release.

federate.planner.pool.job.size

Control the maximum number of concurrent jobs being executed per node. Defaults to 1. This setting is static.

federate.planner.pool.job.queue_size

Control the size of the queue for pending jobs per node. Defaults to 100. This setting is static.

federate.planner.pool.tasks_per_job.size

Control the maximum number of concurrent tasks being executed per job. Defaults to 3. This setting is dynamic.

federate.planner.optimizer.cache.enable

Enable or disable a caching layer over Elasticsearch requests sent during query optimizations in order to gather statistics. Defaults to true. This setting is static.

federate.planner.optimizer.cache.refresh_interval

The minimum interval time for refreshing the cached response of a statistics-gathering request. The time unit is in minutes and defaults to 60 minutes. This setting is static.

federate.planner.optimizer.cache.maximum_size

The maximum number of request responses that can be cached. Defaults to 1,000,000. This setting is static.

federate.planner.field.metadata.cache.maximum_size

The maximum number of field metadata request responses that can be cached. Defaults to 100,000. Setting the value to 0 disables the cache. This setting is static.

federate.planner.multi_conditional.index_lookup.limit

The maximum number of index lookups that can be efficiently processed by a multi-conditional index join. Defaults to 50,000. If the number of lookups is greater than this limit, the join fails with an exception. This setting only affects multi-conditional index joins. The number of index lookups is approximately the child set cardinality multiplied by the number of join conditions. More lookups might be necessary if the join conditions involve multi-valued document fields. Federate optimizes multi-conditional index joins to reduce the number of required index lookups as much as possible. Setting the value to -1 disables this limit. This setting is dynamic.

Memory

The memory module is responsible for allocating and managing chunks of off-heap memory.

Memory management

In Siren Federate, data is encoded in a columnar format and stored off-heap. This method of memory management reduces the pressure on the Java virtual machine (JVM) and allows fast and efficient analytical operations.

Data is read directly from the off-heap storage and decoded on-the-fly by using zero-serialization and zero-copy memory. Zero-serialization improves performance by removing any serialization overhead, while zero-copy memory reduces CPU cycles and memory bandwidth overhead.

Siren Federate’s memory management allows for granular control over the amount of off-heap memory that can be allocated per node, per search request, and per query operator. It can also terminate queries when the off-heap memory usage reaches its configured limit.

In addition, the garbage collector automatically releases intermediate computation results and recovers the off-heap memory to decrease the impact on memory.

Off-heap storage is used only on the data nodes; master-only and coordinator nodes do not use off-heap memory.

Hierarchical model

The allocated memory is managed in a hierarchical model.

  • The root allocator manages the memory allocation on a node level, and can have one or more job allocators.

  • A job allocator is created for each job (that is, a Siren Federate search request) and manages the memory allocation on a job level. A job can have one or more task allocators.

  • A task allocator is created for each task of a job (that is, a Siren Federate query operator) and manages the memory allocation on a task level.

Each allocator specifies a limit for how much off-heap memory it can use.
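The hierarchy above can be pictured with a toy model in which every allocation is counted against the allocator that requested it and against each of its ancestors. This is a sketch only: the Allocator class, its methods, and the byte values are hypothetical and do not reflect Federate's internal implementation.

```python
class Allocator:
    """Toy allocator that tracks bytes against its own limit and its ancestors'."""

    def __init__(self, name, limit, parent=None):
        self.name = name
        self.limit = limit
        self.parent = parent
        self.used = 0

    def child(self, name, limit):
        """Create a child allocator, for example a job under the root."""
        return Allocator(name, limit, parent=self)

    def allocate(self, nbytes):
        # Check every level first so that a rejected request changes nothing.
        node = self
        while node is not None:
            if node.used + nbytes > node.limit:
                raise MemoryError(f"{node.name} limit of {node.limit} bytes exceeded")
            node = node.parent
        # All levels have room: record the allocation up the hierarchy.
        node = self
        while node is not None:
            node.used += nbytes
            node = node.parent

    def release(self, nbytes):
        node = self
        while node is not None:
            node.used -= nbytes
            node = node.parent


root = Allocator("root", limit=1024)
job = root.child("job", limit=512)
task = job.child("task", limit=256)

task.allocate(200)  # fits within the task, job, and root limits
try:
    task.allocate(100)  # 300 bytes would exceed the 256-byte task limit
except MemoryError as err:
    print(err)
print(root.used, job.used, task.used)
```

The rejected request leaves all three counters unchanged, which mirrors the idea that a query exceeding its limit is terminated without affecting the memory accounted to the rest of the node.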

siren.memory.root.limit

Limit in bytes for the root allocator. Defaults to the minimum between 1GB and the maximum direct memory size (-XX:MaxDirectMemorySize) of the JVM. This setting is dynamic. Append k or kb for changing the unit to kilobytes, m or mb for megabytes, and g or gb for gigabytes. Units are case insensitive.

siren.memory.job.limit

Limit in bytes for the job allocator. Defaults to siren.memory.root.limit. This setting is dynamic. Append k or kb for changing the unit to kilobytes, m or mb for megabytes, and g or gb for gigabytes. Units are case insensitive.

siren.memory.task.limit

Limit in bytes for the task allocator. Defaults to siren.memory.job.limit. This setting is dynamic. Append k or kb for changing the unit to kilobytes, m or mb for megabytes, and g or gb for gigabytes. Units are case insensitive.

By default, the job limit is equal to the root limit, and the task limit is equal to the job limit. This facilitates a simple configuration in most common scenarios where only the root limit must be configured.
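The unit suffixes and the default cascade can be sketched as follows. The functions parse_bytes and resolve_limits are hypothetical helpers, not part of Federate. The sketch assumes binary multiples (1kb = 1024 bytes), as used by Elasticsearch byte size units, and it requires an explicit root limit rather than modelling the JVM-derived default.

```python
import re

# Assumption: binary multiples, as in Elasticsearch byte size units.
_UNITS = {
    "": 1,
    "k": 1024, "kb": 1024,
    "m": 1024 ** 2, "mb": 1024 ** 2,
    "g": 1024 ** 3, "gb": 1024 ** 3,
}


def parse_bytes(value):
    """Convert a value such as '512mb' or '2G' to a number of bytes."""
    match = re.fullmatch(r"\s*(\d+)\s*([a-zA-Z]*)\s*", str(value))
    if match is None or match.group(2).lower() not in _UNITS:
        raise ValueError(f"unsupported size value: {value!r}")
    return int(match.group(1)) * _UNITS[match.group(2).lower()]


def resolve_limits(settings):
    """Apply the cascade: job defaults to root, task defaults to job."""
    root = parse_bytes(settings["siren.memory.root.limit"])
    job = parse_bytes(settings.get("siren.memory.job.limit", root))
    task = parse_bytes(settings.get("siren.memory.task.limit", job))
    return {"root": root, "job": job, "task": task}


print(resolve_limits({"siren.memory.root.limit": "2gb"}))
print(resolve_limits({"siren.memory.root.limit": "2GB",
                      "siren.memory.job.limit": "512m"}))
```

In the second call only the job limit is overridden, so the task limit follows the job limit rather than the root limit.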

For more advanced scenarios, such as when there are multiple concurrent users, you might need to tune the job and task limits to avoid errors. For example, with the default limits, a single user could execute a search request that consumes all of the available off-heap memory at the root level, leaving no memory for the search requests that are executed by other users.

As a rule of thumb, do not give more than half of the remaining OS memory to the Siren root allocator. Leave some memory for the OS cache and to cater for Netty’s memory management overhead.

For example, if Elasticsearch is configured with a 32GB heap on a machine with 64GB of RAM, this leaves 32GB for the OS. The limit for the root allocator should therefore not exceed 16GB.
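The rule of thumb amounts to a one-line calculation. The function name below is hypothetical, and the result is an upper bound to start from, not a recommended value for every deployment.

```python
GIB = 1024 ** 3


def max_root_limit(total_ram_bytes, jvm_heap_bytes):
    """Return half of the memory that remains after the JVM heap is reserved."""
    remaining = total_ram_bytes - jvm_heap_bytes
    if remaining <= 0:
        raise ValueError("the JVM heap must be smaller than the total RAM")
    return remaining // 2


limit = max_root_limit(total_ram_bytes=64 * GIB, jvm_heap_bytes=32 * GIB)
print(limit // GIB)  # prints 16
```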

For more information, see Configuring off-heap memory.

IO

The IO module is responsible for encoding, decoding and shuffling data across the nodes in the cluster.

Vectorized Pipeline

This module introduces the concept of a Vectorized Pipeline, which is responsible for processing and collecting tuples created by a SearchProject or Join task and shuffling them across the shards or nodes in the cluster.

Collected tuples are transferred in one or more packets. The packet size affects resource usage. Small packets use less memory but increase CPU time on the receiver side, because the receiver must reconstruct a tuple collection from many small packets. Large packets reduce CPU usage on the receiver side, but at the cost of higher memory usage on the collector side and longer network transfer latency. The size of a packet can be configured with the following setting:

siren.io.pipeline.max_packet_size

The maximum size in bytes for a data packet. Must be a power of 2. Defaults to 1MB. Append k or kb for changing the unit to kilobytes, m or mb for megabytes, and g or gb for gigabytes. Units are case insensitive. This setting is dynamic.

siren.io.pipeline.hash.partitions_per_node

The number of partitions per node. Defaults to max((# of available_processors) - 1, 1). This setting is dynamic.

siren.io.pipeline.batch_size

The number of rows in a batch. Must be a power of two. Defaults to 65536. This setting is dynamic.

siren.io.pipeline.number_of_nodes

The number of data nodes that are used during the join computation. Defaults to all available nodes. This setting is dynamic.
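The constraints and defaults of the pipeline settings can be checked with a few small helpers. These functions are hypothetical and only restate the rules given above. The packet count is a rough estimate that assumes every packet except the last is filled to the maximum size, which may not match how Federate actually fills packets.

```python
def is_power_of_two(n):
    """True for 1, 2, 4, 8, ... as required for packet and batch sizes."""
    return n > 0 and (n & (n - 1)) == 0


def default_partitions_per_node(available_processors):
    """Default stated above: max(available_processors - 1, 1)."""
    return max(available_processors - 1, 1)


def packets_needed(payload_bytes, max_packet_size):
    """Estimate how many packets a payload is split into (ceiling division)."""
    if not is_power_of_two(max_packet_size):
        raise ValueError("max_packet_size must be a power of 2")
    return -(-payload_bytes // max_packet_size)


MIB = 1024 ** 2
print(is_power_of_two(65536))               # default batch size
print(default_partitions_per_node(8))
print(packets_needed(10 * MIB, 1 * MIB))    # default 1MB packets
print(packets_needed(10 * MIB, 64 * 1024))  # smaller packets, more of them
```

Under this estimate, reducing the packet size from 1MB to 64KB increases the number of packets for the same payload sixteenfold, which is the receiver-side reconstruction cost described above.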

Thread Pools

Siren Federate introduces new thread pools:

federate.planner

For the query planner operations. Thread pool type is fixed with a size of 2 * # of available_processors, and initial queue_size of 1000.

federate.data

For the data operations (create, upload, delete). Thread pool type is fixed with a size of max(# of available_processors - 1, 1), and initial queue_size of 1000.

federate.task.worker

For task worker threads. Thread pool type is fixed with a size of max((# of available_processors) - 1, 1), and initial queue_size of 10000.
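As a worked example of the sizing formulas above, the following sketch computes the pool sizes for a given number of processors. The function is a hypothetical helper for capacity planning and does not query a running cluster.

```python
def federate_thread_pools(available_processors):
    """Return the fixed pool sizes and initial queue sizes listed above."""
    p = available_processors
    return {
        "federate.planner": {"size": 2 * p, "queue_size": 1000},
        "federate.data": {"size": max(p - 1, 1), "queue_size": 1000},
        "federate.task.worker": {"size": max(p - 1, 1), "queue_size": 10000},
    }


pools = federate_thread_pools(8)
for name, config in pools.items():
    print(name, config)
```

On a single-processor node the max(..., 1) term keeps the data and task worker pools at one thread instead of zero.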

Query Cache

Siren Federate extends Elasticsearch’s query cache:

index.federate.queries.cache.enabled

Enable (default) or disable the Siren Federate query cache, used for caching join queries. This setting is dynamic.

federate.indices.queries.cache.size

Controls the memory size for the filter cache. Defaults to 10%. This setting is static.

federate.indices.queries.cache.count

Controls the maximum number of entries in the cache. Defaults to 1,000. This setting is static.

Internal

Siren Federate introduces new internal actions to be performed by an Elasticsearch cluster:

federate.internal.actions.timeout

Controls the timeout for internal actions (for example, data transfers). Defaults to 30s.