Federate Modules
Planner
The planner module is responsible in parsing a (multi) search request and generating a logical model. This logical model is then
optimised by leveraging the rule-based Hep engine from Apache Calcite. The outcome is
a physical query plan, which is then executed. The physical query plan is a Directed Acyclic Graph workflow composed of
individual computing steps. The workflow is executed as a Job
and the individual computing steps are executed as
Tasks
. We can therefore map one (multi) search request to a single job.
siren.planner.pool.job.size
-
Control the maximum number of concurrent jobs being executed per node. Defaults to 1.
siren.planner.pool.job.queue_size
-
Control the size of the queue for pending jobs per node. Defaults to 100.
siren.planner.pool.tasks_per_job.size
-
Control the maximum number of concurrent tasks being executed per job. Defaults to 3.
siren.planner.volcano.use_query
-
Use contextual queries when computing statistics. If
false
, computed statistics are effectively "global" to the index. Defaults tofalse
. siren.planner.volcano.cache.enable
-
Enable or disable a caching layer over Elasticsearch requests sent during query optimizations in order to gather statistics. Defaults to
true
. siren.planner.volcano.cache.refresh_interval
-
The minimum interval time for refreshing the cached response of a statistics-gathering request. The time unit is in minutes and defaults to
60
minutes. siren.planner.volcano.cache.maximum_size
-
The maximum number of requests response that can be cached. Defaults to 1,000,000.
siren.planner.field.metadata.cache.maximum_size
-
The maximum number of field metadata requests response that can be cached. Defaults to 100,000. Setting the value to 0 will disable the cache.
Memory
The memory module is responsible for allocating and managing chunks of off-heap memory.
Memory management
In Siren Federate, data is encoded in a columnar format and stored off-heap. This method of memory management reduces the pressure on the Java virtual machine (JVM) and allows fast and efficient analytical operations.
Data is read directly from the off-heap storage and decoded on-the-fly by using zero-serialization and zero-copy memory. Zero-serialization improves performance by removing any serialization overhead, while zero-copy memory reduces CPU cycles and memory bandwidth overhead.
Siren Federate’s memory management allows for granular control over the amount of off-heap memory that can be allocated per node, per search request, and per query operator, while having the inherent ability to terminate queries when the off heap memory usage is reaching its configured limit.
In addition, the garbage collector automatically releases intermediate computation results and recovers the off-heap memory to decrease the impact on memory.
Off-heap storage is used only on the data nodes; master-only and coordinator nodes do not use off-heap memory.
Hierarchical model
The allocated memory is managed in a hierarchical model.
-
The
root
allocator is managing the memory allocation on a node level, and can have one or morejob
allocators. -
A
job
allocator is created for each job (that is, a Siren Federate search request) and manages the memory allocation on a job level. Ajob
can have one or moretask
allocators. -
A
task
allocator is created for each task of a job (that is, a Siren Federate query operator) and manages the memory allocation on a task level.
Each allocator specifies a limit for how much off-heap memory it can use.
siren.memory.root.limit
-
Limit in bytes for the root allocator. Defaults to two-thirds of the maximum direct memory size of the JVM.
siren.memory.job.limit
-
Limit in bytes for the job allocator. Defaults to
siren.memory.root.limit
. siren.memory.task.limit
-
Limit in bytes for the task allocator. Defaults to
siren.memory.job.limit
.
By default, the job limit is equal to the root limit, and the task limit is equal to the job limit. This facilitates a simple configuration in most common scenarios where only the root limit must be configured.
For more advanced scenarios, for example, when there are multiple concurrent users, you might need to tune the job and task limits to avoid errors. For example, a user executes a search request that consumes all of the available off-heap memory at the root level, leaving no memory for the search requests that are executed by other users.
As a rule of thumb, do not give more than half of the remaining OS memory to the Siren root allocator. Leave some memory for the OS cache and to cater for Netty’s memory management overhead. |
For example, if Elasticsearch is configured with a 32GB heap on a machine with 64GB of RAM, this leaves 32GB to the OS. The maximum limit that one could set for the root allocator should be 16GB.
For more information, see Configuring off-heap memory.
IO
The IO module is responsible for encoding, decoding and shuffling data across the nodes in the cluster.
Tuple Collector
This module introduces the concept of Tuple Collectors
which are responsible for collecting tuples created by a
SearchProject
or Join
task and shuffling them across the shards or nodes in the cluster.
Tuples collected will be transferred in one or more packets
. The size of a packet has an impact on the resources. Small
packets will take less memory but will increase cpu times on the receiver side since it will have to reconstruct a tuple collection from
many small packets. Large packets will reduce cpu usage on the receiver side, but at the cost of higher memory usage
on the collector side and longer network transfer latency. The size of a packet can be configured with the following
setting:
siren.io.tuple.collector.packet_size
-
The number of tuples in a packet. The packet size must be a power of 2. Defaults to 2^20 tuples.
When using the Hash Join, the collector will use a hash partitioner strategy to create small data partitions. Creating multiple small data partitions helps in parallelizing the join computation, as each worker thread for the join task will be able to pick and join one partition independently of the others. Setting the number of data partitions per node to 1 will cancel any parallelization. The number of data partitions per node can be configured with the following setting:
siren.io.tuple.collector.hash.partitions_per_node
-
The number of partitions per node. The number of partitions must be a power of 2. Defaults to 32.
siren.io.tuple.collector.hash.number_of_nodes
-
The number of data nodes that are used during the join computation. This defaults to all available nodes.
Thread Pools
Siren Federate introduces new thread pools:
federate.planner
-
For the query planner operations. Thread pool type is
fixed_auto_queue_size
with a size of2 * # of available_processors
, and initial queue_size of1000
. federate.data
-
For the data operations (create, upload, delete). Thread pool type is
scaling
. federate.task.worker
-
For task worker threads. Thread pool type is
fixed_auto_queue_size
with a size ofmax((# of available_processors) - 1, 1)
, and initial queue_size of1000
. federate.connector.query
-
For connector query operations. Thread pool type is
fixed
with a size ofint((# of available_processors * 3) / 2) + 1
, and queue size1000
. federate.connector.jobs.management
-
For connector job management operations like starting and stopping ingestion jobs. Thread pool type is
scaling
. federate.connector.jobs
-
For job worker threads like ingestion jobs and related concurrent indexing bulk requests. Thread pool type is
fixed
with a size of4
, and a queue_size with100
. federate.connector.internal
-
For connector internal cluster communications. Thread pool type is
scaling
.
Query Cache
Siren Federate extends the Elasticsearch’s query cache:
index.federate.queries.cache.enabled
-
Enable (default) or disable the Siren Federate query cache, used for caching join queries.
federate.indices.queries.cache.size
-
Controls the memory size for the filter cache, defaults to 10%.
federate.indices.queries.cache.count
-
Controls the maximum number of entries in the cache, defaults to 1,000.
Connector
The Federate Connector module supports the following node configuration settings, which can be set on JDBC-enabled nodes:
siren.connector.datasources.index
-
The index in which Federate will store datasource configurations.
siren.connector.query.project_max_size
-
A setting that controls how much data flows between datasources or between a datasource and the Elasticsearch cluster. Defaults to
50000
records transferred between systems consisting in the projected values, e.g., joined values. siren.connector.siren.timeout.connection
-
the maximum amount of seconds to wait when establishing or acquiring a JDBC connection (
30
by default). siren.connector.timeout.query
-
the maximum execution time for JDBC queries, in seconds (
30
by default). siren.connector.enable_union_aggregations
-
true
by default, can be set to false to disable the use of unions in nested aggregations. siren.connector.query.max_bucket_queries
-
the maximum number of JDBC queries that will be generated to compute aggregation buckets. Defaults to
500
.