Search APIs
Siren Federate introduces the following new search actions:
- /siren/<INDEX>/_search replaces the /<INDEX>/_search Elasticsearch action; and
- /siren/<INDEX>/_msearch replaces the /<INDEX>/_msearch Elasticsearch action.
Both actions are extensions of the original Elasticsearch actions and therefore support the same API.
To use the join query clause, you must use these actions, because the original Elasticsearch actions do not support it.
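As a rough illustration of the mapping above, the following Python sketch rewrites an Elasticsearch search path into its Federate counterpart. The helper name `to_federate_path` is hypothetical and is not part of any Siren client library.

```python
def to_federate_path(es_path: str) -> str:
    """Prefix an Elasticsearch _search or _msearch path with /siren."""
    if not es_path.startswith("/"):
        es_path = "/" + es_path
    # Only the two actions listed above are handled by this sketch.
    if not es_path.endswith(("/_search", "/_msearch")):
        raise ValueError("only _search and _msearch actions are covered here")
    return "/siren" + es_path


print(to_federate_path("/index1/_search"))
print(to_federate_path("/index1/_msearch"))
```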
Search API
The search API allows you to execute a search query and get back search hits that match the query.
Scroll API
The scroll API allows you to paginate search hits. As in Elasticsearch, you pass a scroll parameter to the Search API to set the duration of a scroll. Then, to go through each page or to clear a scroll, you use the endpoint /siren/_search/scroll/<SCROLL_ID> instead of the /_search/scroll/<SCROLL_ID> endpoint indicated in the Elasticsearch documentation.
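The two URLs involved in a scroll can be sketched as follows. This only builds the URLs and sends nothing. The node address and the function names are assumptions made for illustration. The HTTP method to use is not stated in this section; Elasticsearch pages through a scroll with GET or POST and clears it with DELETE, and the sketch assumes Federate follows the same convention.

```python
from urllib.parse import quote, urlencode

BASE_URL = "http://localhost:9200"  # assumed local node address


def scroll_start_url(index, keep_alive):
    """URL of the initial search that opens a scroll (for example '1m')."""
    query = urlencode({"scroll": keep_alive})
    return f"{BASE_URL}/siren/{quote(index, safe='')}/_search?{query}"


def scroll_page_url(scroll_id):
    """URL used to fetch the next page of a scroll or to clear it."""
    return f"{BASE_URL}/siren/_search/scroll/{quote(scroll_id, safe='')}"


print(scroll_start_url("index1", "1m"))
print(scroll_page_url("SCROLL_ID"))
```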
Multi Search API
The multi search API allows you to execute several search requests within the same API call.
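Because the Federate actions support the same API as the Elasticsearch ones, a multi search body can presumably be built in the newline-delimited JSON format that Elasticsearch expects: one header line and one body line per search, with a trailing newline. The sketch below assumes that format; `build_msearch_body` is a hypothetical helper, and the index names and queries are placeholders.

```python
import json


def build_msearch_body(searches):
    """Serialize (header, body) pairs as newline-delimited JSON."""
    lines = []
    for header, body in searches:
        lines.append(json.dumps(header))  # for example {"index": "index1"}
        lines.append(json.dumps(body))    # the search request itself
    # The body ends with a newline after the last line.
    return "\n".join(lines) + "\n"


msearch_body = build_msearch_body([
    ({"index": "index1"}, {"query": {"match_all": {}}}),
    ({"index": "index2"}, {"query": {"term": {"tag": "aaa"}}}),
])
print(msearch_body, end="")
```

The resulting string would be posted to /siren/<INDEX>/_msearch.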
Search Request
The syntax for the body of the search request is identical to the one supported by the Elasticsearch search API, with additional support for the join query clause in the Query DSL.
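As an illustration, a request body with a join clause can be assembled as below. The field layout (indices, type, on, request) mirrors the explain examples later on this page; the helper `join_query` is hypothetical, and which fields are optional is an assumption of this sketch.

```python
import json


def join_query(indices, on, request=None, join_type=None):
    """Build a search body whose query is a single join clause."""
    clause = {"indices": list(indices), "on": list(on)}
    if join_type is not None:
        clause["type"] = join_type    # for example "HASH_JOIN"
    if request is not None:
        clause["request"] = request   # search request applied to the joined indices
    return {"query": {"join": clause}}


join_body = join_query(
    indices=["index2"],
    on=["foreign_key", "id"],
    request={"query": {"term": {"tag": {"value": "aaa"}}}},
    join_type="HASH_JOIN",
)
print(json.dumps(join_body, indent=2))
```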
Parameters
In addition to the parameters supported by the Elasticsearch search API, the Federate search API introduces the following additional parameters:
task_timeout
|
Bounds the execution of a task to the specified time value (in milliseconds). When the timeout expires, the task returns with the values accumulated up to that point. Defaults to no timeout ( |
debug
|
To retrieve debug information from the query planner. Defaults to |
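A minimal sketch of a search URL carrying these parameters follows. It assumes that both are passed as query-string parameters, that task_timeout takes a plain integer number of milliseconds, and that debug takes true or false; `federate_search_url` is a hypothetical helper.

```python
from urllib.parse import urlencode


def federate_search_url(base, index, task_timeout_ms=None, debug=None):
    """Build a Federate search URL with the optional Federate parameters."""
    params = {}
    if task_timeout_ms is not None:
        params["task_timeout"] = int(task_timeout_ms)
    if debug is not None:
        params["debug"] = "true" if debug else "false"
    url = f"{base}/siren/{index}/_search"
    # Omit the query string entirely when no parameter is set.
    return f"{url}?{urlencode(params)}" if params else url


print(federate_search_url("http://localhost:9200", "index1", task_timeout_ms=5000, debug=True))
```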
Taking advantage of the join query cache
The join query cache is responsible for caching the results of a join query clause at the shard level. If an index has one or more replicas, it is recommended that you specify the preference parameter of the search request.
If no preference parameter is specified, the search request is processed against a random selection of shards. In such a scenario, the join query cache on every shard may differ and the chance of a cache hit decreases.
For example, it is common practice to specify a user session ID as the preference, so that the same set of shards is selected across the search requests of the same user.
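The session-based approach can be sketched as follows: every request from a given session reuses the same preference value, so the resulting URLs are identical. The helper name and the session ID format are assumptions for illustration.

```python
from urllib.parse import urlencode


def search_url_for_session(base, index, session_id):
    """Pin the searches of one user session to a stable set of shards."""
    query = urlencode({"preference": session_id})
    return f"{base}/siren/{index}/_search?{query}"


first = search_url_for_session("http://localhost:9200", "index1", "session-42")
second = search_url_for_session("http://localhost:9200", "index1", "session-42")
print(first)
print(first == second)
```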
Search Response
The response returned by Federate’s search API is similar to the response returned by Elasticsearch’s search API. It extends the response with a planner object, which includes information about the query plan execution.
is_pruned
The response may have been truncated for several reasons. The is_pruned flag indicates that the search results are incomplete in the following cases:
- If the task_timeout parameter was set.
- If a shard failed.
query_plan
If the debug parameter is enabled, the planner object also includes detailed information and statistics about the query plan execution within a query_plan object.
If the debug parameter is disabled and the response was truncated, a simplified query plan is displayed with information detailing the causes of the truncation.
{
"_shards": {
"failed": 0,
"skipped": 0,
"successful": 5,
"total": 5
},
"hits": {
"hits": [],
"max_score": 0.0,
"total": 0
},
"planner": {
"is_pruned": true,
"is_truncated": true,
"node": "AYex2HdPTu-cwkqwaquH1w",
"query_plan": {
"children": [
{
"failures": [
{
"reason": "Unable to allocate buffer of size 2097152 due to memory limit. Current allocation: 0",
"type": "out_of_memory_exception"
}
],
"type": "SearchTaskBroadcastRequest"
}
],
"type": "SearchJoinRequest"
},
"timestamp": {
"start_in_millis": 1579776194845,
"stop_in_millis": 1579776195243,
"took_in_millis": 398
},
"took_in_millis": 398
},
"timed_out": false,
"took": 19
}
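A client could inspect the planner object along these lines. The sketch reads the is_pruned flag and collects any failures entries found in the simplified query plan. It assumes that failures can appear on any node of the plan; the sample is a trimmed copy of the response above, and `summarize_planner` is a hypothetical helper.

```python
def summarize_planner(response):
    """Return the is_pruned flag and the failures found in the query plan."""
    planner = response.get("planner", {})
    failures = []
    stack = [planner["query_plan"]] if "query_plan" in planner else []
    while stack:
        node = stack.pop()
        for failure in node.get("failures", []):
            failures.append((node.get("type"), failure.get("type"), failure.get("reason")))
        stack.extend(node.get("children", []))
    return {"is_pruned": bool(planner.get("is_pruned", False)), "failures": failures}


sample = {
    "planner": {
        "is_pruned": True,
        "query_plan": {
            "type": "SearchJoinRequest",
            "children": [
                {
                    "type": "SearchTaskBroadcastRequest",
                    "failures": [
                        {
                            "type": "out_of_memory_exception",
                            "reason": "Unable to allocate buffer of size 2097152 due to memory limit. Current allocation: 0",
                        }
                    ],
                }
            ],
        },
    }
}
summary = summarize_planner(sample)
print(summary["is_pruned"], summary["failures"])
```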
Cancelling a request
A search or a multi search request can be cancelled explicitly by a user. To do so, you need to pass an X-Opaque-Id header, which is used to identify the request. The endpoint for cancelling a request is /_siren/job/<ID>/_cancel. By default, the cancel request waits for all tasks associated with the search to be cancelled. This can be disabled by passing false to the boolean parameter wait_for_completion.
Usage
Let’s identify a search request with the name my-request:
$ curl -H "Content-Type: application/json" -H "X-Opaque-Id: my-request" 'http://localhost:9200/siren/_search'
Then to cancel it, issue a request as follows:
$ curl -XPOST -H "Content-Type: application/json" 'localhost:9200/_siren/job/my-request/_cancel'
If successful, the response will acknowledge the request and give a listing of the cancelled tasks:
{
"acknowledged" : true,
"tasks" : [
{
"node" : "5ILUA44uSee-VxsBsNbsNA",
"id" : 947,
"type" : "transport",
"action" : "indices:siren/plan",
"description" : "federate query",
"start_time_in_millis" : 1524815599457,
"running_time_in_nanos" : 199131478,
"cancellable" : true,
"headers" : {
"X-Opaque-Id" : "my-request"
}
}
]
}
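The cancel call can also be prepared from Python's standard library. The sketch below only constructs the request object and does not send it. It assumes that wait_for_completion is passed as a query-string parameter; `build_cancel_request` is a hypothetical helper.

```python
from urllib.parse import quote, urlencode
from urllib.request import Request


def build_cancel_request(base, opaque_id, wait_for_completion=True):
    """Prepare a POST to the cancel endpoint for the given X-Opaque-Id value."""
    url = f"{base}/_siren/job/{quote(opaque_id, safe='')}/_cancel"
    if not wait_for_completion:
        # Ask the server not to wait for the tasks to be cancelled.
        url += "?" + urlencode({"wait_for_completion": "false"})
    return Request(url, method="POST", headers={"Content-Type": "application/json"})


cancel = build_cancel_request("http://localhost:9200", "my-request", wait_for_completion=False)
print(cancel.get_method(), cancel.full_url)
```

Passing the object to urllib.request.urlopen would send it to the cluster.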
Validating a request
The explain API provides information about the query planning of a search request, without executing it.
Request
curl -XGET 'http://localhost:9200/siren/<INDEX>/_explain'
curl -XPOST 'http://localhost:9200/siren/<INDEX>/_explain'
curl -XGET 'http://localhost:9200/siren/_explain'
curl -XPOST 'http://localhost:9200/siren/_explain'
Response
The explain response contains the id of the coordinator node and the physical query plan of the search request.
The query plan is a directed acyclic graph, where each node represents a task that is being executed on the cluster. The graph is represented as a tree to match the JSON data model. Therefore, it might contain duplicate tasks.
Each task node contains the following information:
type
|
Specifies the physical operator type, for example, |
is_cached
|
Indicates whether the physical operator is cached or not. |
request
|
Represents the associated search request for a |
row_type
|
Defines the rows that are being projected by the task. A row is composed of one or more columns. This parameter describes the names and data types of the columns. |
row_count
|
An estimation of the number of rows that will be projected. |
cost
|
An estimation of the execution cost of the task. This includes the network and I/O costs. |
cumulative_cost
|
An estimation of the cumulative execution cost of the task. It is the sum of the estimated execution cost of the task and all of its descendants. |
When applicable, the cost object also details the costs of the different phases: select and project. This is the case for SearchJoinRel and SearchJoinTaskRel.
For more information about estimating the execution cost, see Example of the network, memory, and I/O cost of joins.
For more information about the workflow phases, see Distributed join workflow.
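The relationship between cost and cumulative_cost can be checked with a short recursive walk over the plan tree. The sketch below recomputes the cumulative I/O and network cost of a task as its own cost plus that of all its descendants. The figures are taken from the broadcast join example response on this page; `cumulative_cost` is a hypothetical helper.

```python
def cumulative_cost(task):
    """Sum the io and network cost of a task and all of its descendants."""
    total = {"io": task["cost"]["io"], "network": task["cost"]["network"]}
    for child in task.get("children", []):
        child_total = cumulative_cost(child)
        total["io"] += child_total["io"]
        total["network"] += child_total["network"]
    return total


# Trimmed plan: only the task type, cost and children are kept.
broadcast_plan = {
    "type": "SearchJoinRel",
    "cost": {"io": 5, "network": 0},
    "children": [
        {"type": "SearchJoinTaskRel", "cost": {"io": 2, "network": 4}},
    ],
}
plan_total = cumulative_cost(broadcast_plan)
print(plan_total)
```

The result matches the cumulative_cost reported for the root task in that example (io 7, network 4).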
Example responses
Hash join
POST /siren/index1/_explain?pretty=true
{
"query": {
"join": {
"indices": [
"index2"
],
"type": "HASH_JOIN",
"on": [
"foreign_key",
"id"
],
"request": {
"query": {
"bool": {
"filter": [
{
"term": {
"tag": {
"value": "aaa",
"boost": 1
}
}
}
],
"adjust_pure_negative": true,
"boost": 1
}
}
}
}
}
}
{
"node": "RC7OM86mQhGoEW4Q3LVXUg",
"query_plan": {
"request": "SearchJoinRequest{jobId=395f99f3-e1c0-43cf-9306-9fbb74c33753, contextIds=SearchLocks{contextIds=[[RC7OM86mQhGoEW4Q3LVXUg][index1][4]=>[jGiaHnYBx7CeZFoovdPz][36], [RC7OM86mQhGoEW4Q3LVXUg][index1][6]=>[jmiaHnYBx7CeZFoovdPz][38], [RC7OM86mQhGoEW4Q3LVXUg][index1][0]=>[iGiaHnYBx7CeZFoovdPy][32], [RC7OM86mQhGoEW4Q3LVXUg][index1][3]=>[i2iaHnYBx7CeZFoovdPz][35], [RC7OM86mQhGoEW4Q3LVXUg][index1][5]=>[jWiaHnYBx7CeZFoovdPz][37], [RC7OM86mQhGoEW4Q3LVXUg][index1][1]=>[iWiaHnYBx7CeZFoovdPy][33], [RC7OM86mQhGoEW4Q3LVXUg][index1][2]=>[imiaHnYBx7CeZFoovdPz][34]]}, innerRequest=SearchRequest{searchType=QUERY_THEN_FETCH, indices=[index1], indicesOptions=IndicesOptions[ignore_unavailable=false, allow_no_indices=true, expand_wildcards_open=true, expand_wildcards_closed=false, expand_wildcards_hidden=false, allow_aliases_to_multiple_indices=true, forbid_closed_indices=true, ignore_aliases=false, ignore_throttled=true], types=[], routing='null', preference='null', requestCache=false, scroll=null, maxConcurrentShardRequests=5, batchedReduceSize=512, preFilterShardSize=128, allowPartialSearchResults=null, localClusterAlias=null, getOrCreateAbsoluteStartMillis=-1, ccsMinimizeRoundtrips=true, source={\"query\":{\"doc_ids\":{\"job_id\":\"395f99f3-e1c0-43cf-9306-9fbb74c33753\",\"input_data_id\":\"-1296081227--1507322559-247037071\"}}}}}",
"row_type": [
"#0: _shard_id JavaType(class io.siren.federate.core.planner.schema.PlannerType$Integer)",
"#1: _segment_id JavaType(class io.siren.federate.core.planner.schema.PlannerType$Short)",
"#2: _doc_id JavaType(class io.siren.federate.core.planner.schema.PlannerType$Integer)",
"#3: _score JavaType(class io.siren.federate.core.planner.schema.PlannerType$Float)",
"#4: foreign_key MetadataType{digest=JavaType(class io.siren.federate.core.planner.schema.PlannerType$String) SEARCHABLE AGGREGATABLE}"
],
"type": "SearchJoinRel",
"physical_plan": "rel#254:SearchJoinRel.ELASTICSEARCH(input#0=ParallelHashSemiJoinTaskRel#252,invocation=SearchRequest{id=29a689d0-31b1-4163-8b23-5c92e1af7c9c},rowType=RecordType(JavaType(class io.siren.federate.core.planner.schema.PlannerType$Integer) _shard_id, JavaType(class io.siren.federate.core.planner.schema.PlannerType$Short) _segment_id, JavaType(class io.siren.federate.core.planner.schema.PlannerType$Integer) _doc_id, JavaType(class io.siren.federate.core.planner.schema.PlannerType$Float) _score, MetadataType{digest=JavaType(class io.siren.federate.core.planner.schema.PlannerType$String) SEARCHABLE AGGREGATABLE} foreign_key),elementType=class [Ljava.lang.Object;)",
"is_cached": false,
"row_count": 1,
"cost": {
"io": 0,
"network": 0,
"project": {
"io": 0,
"network": 0
},
"select": {
"io": 0,
"network": 0
}
},
"cumulative_cost": {
"io": 7,
"network": 12
},
"children": [
{
"request": "JoinTaskNodesRequest{jobId=395f99f3-e1c0-43cf-9306-9fbb74c33753, taskType=ParallelHashSemiJoinTask, left_input_data=1323318367--820606795--8891252244165294113--786400190, right_input_data=3386-316335040--8907078984324448671--786400190, output_data_id=-1296081227--1507322559-247037071, projection=[1, 2, 3], condition=(EQUALS, 0, 0), timeout=-1, collector={class=SegmentPartitionerTupleCollectorManager, target=[[RC7OM86mQhGoEW4Q3LVXUg][index1][4], [RC7OM86mQhGoEW4Q3LVXUg][index1][6], [RC7OM86mQhGoEW4Q3LVXUg][index1][0], [RC7OM86mQhGoEW4Q3LVXUg][index1][3], [RC7OM86mQhGoEW4Q3LVXUg][index1][5], [RC7OM86mQhGoEW4Q3LVXUg][index1][1], [RC7OM86mQhGoEW4Q3LVXUg][index1][2]]}}",
"row_type": [
"#0: _shard_id JavaType(class io.siren.federate.core.planner.schema.PlannerType$Integer)",
"#1: _segment_id JavaType(class io.siren.federate.core.planner.schema.PlannerType$Short)",
"#2: _doc_id JavaType(class io.siren.federate.core.planner.schema.PlannerType$Integer)"
],
"type": "ParallelHashSemiJoinTaskRel",
"physical_plan": "rel#252:ParallelHashSemiJoinTaskRel.SIREN(left=SearchJoinTaskRel#247,right=SearchJoinTaskRel#249,condition==($0, $4),joinType=inner)",
"is_cached": false,
"row_count": 1.5,
"cost": {
"io": 0,
"network": 5
},
"cumulative_cost": {
"io": 7,
"network": 12
},
"children": [
{
"request": "SearchTaskBroadcastRequest{jobId=395f99f3-e1c0-43cf-9306-9fbb74c33753, taskType=SearchProjectTask, indices=[index1], types=[], projection=[foreign_key:LONG:class io.siren.federate.core.planner.schema.PlannerType$Hashed:false, _shard_id:INT:class io.siren.federate.core.planner.schema.PlannerType$Integer:false, _segment_id:SHORT:class io.siren.federate.core.planner.schema.PlannerType$Short:false, _doc_id:INT:class io.siren.federate.core.planner.schema.PlannerType$Integer:false], collector={class=HashPartitionerTupleCollectorManager, target=[data:true]}, timeout=-1, output_data_id=1323318367--820606795--8891252244165294113--786400190, context_ids=SearchLocks{contextIds=[[RC7OM86mQhGoEW4Q3LVXUg][index1][4]=>[jGiaHnYBx7CeZFoovdPz][36], [RC7OM86mQhGoEW4Q3LVXUg][index1][6]=>[jmiaHnYBx7CeZFoovdPz][38], [RC7OM86mQhGoEW4Q3LVXUg][index1][0]=>[iGiaHnYBx7CeZFoovdPy][32], [RC7OM86mQhGoEW4Q3LVXUg][index1][3]=>[i2iaHnYBx7CeZFoovdPz][35], [RC7OM86mQhGoEW4Q3LVXUg][index1][5]=>[jWiaHnYBx7CeZFoovdPz][37], [RC7OM86mQhGoEW4Q3LVXUg][index1][1]=>[iWiaHnYBx7CeZFoovdPy][33], [RC7OM86mQhGoEW4Q3LVXUg][index1][2]=>[imiaHnYBx7CeZFoovdPz][34]]}, input_data_ids=[Lio.siren.federate.core.io.data.DataId;@13bdcd3b, source={\n \"match_all\" : {\n \"boost\" : 1.0\n }\n}}",
"row_type": [
"#0: foreign_key MetadataType{digest=JavaType(class io.siren.federate.core.planner.schema.PlannerType$Hashed) NOT NULL SEARCHABLE AGGREGATABLE}",
"#1: _shard_id JavaType(class io.siren.federate.core.planner.schema.PlannerType$Integer)",
"#2: _segment_id JavaType(class io.siren.federate.core.planner.schema.PlannerType$Short)",
"#3: _doc_id JavaType(class io.siren.federate.core.planner.schema.PlannerType$Integer)"
],
"type": "SearchJoinTaskRel",
"physical_plan": "rel#247:SearchJoinTaskRel.ELASTICSEARCH(invocation=SearchRequest{id=62a60c4d-1c65-414f-bdd6-4f1fd884cc71},rowType=RecordType(MetadataType{digest=JavaType(class io.siren.federate.core.planner.schema.PlannerType$Hashed) NOT NULL SEARCHABLE AGGREGATABLE} foreign_key, JavaType(class io.siren.federate.core.planner.schema.PlannerType$Integer) _shard_id, JavaType(class io.siren.federate.core.planner.schema.PlannerType$Short) _segment_id, JavaType(class io.siren.federate.core.planner.schema.PlannerType$Integer) _doc_id),elementType=class [Ljava.lang.Object;)",
"is_cached": false,
"row_count": 5,
"cost": {
"io": 5,
"network": 5,
"project": {
"io": 5,
"network": 5
},
"select": {
"io": 0,
"network": 0
}
},
"cumulative_cost": {
"io": 5,
"network": 5
}
},
{
"request": "SearchTaskBroadcastRequest{jobId=395f99f3-e1c0-43cf-9306-9fbb74c33753, taskType=SearchProjectTask, indices=[index2], types=[], projection=[id:LONG:class io.siren.federate.core.planner.schema.PlannerType$Hashed:false], collector={class=HashPartitionerTupleCollectorManager, target=[data:true]}, timeout=-1, output_data_id=3386-316335040--8907078984324448671--786400190, context_ids=SearchLocks{contextIds=[[RC7OM86mQhGoEW4Q3LVXUg][index2][4]=>[hWiaHnYBx7CeZFoovdPy][29], [RC7OM86mQhGoEW4Q3LVXUg][index2][2]=>[g2iaHnYBx7CeZFoovdPy][27], [RC7OM86mQhGoEW4Q3LVXUg][index2][0]=>[gWiaHnYBx7CeZFoovdPy][25], [RC7OM86mQhGoEW4Q3LVXUg][index2][1]=>[gmiaHnYBx7CeZFoovdPy][26], [RC7OM86mQhGoEW4Q3LVXUg][index2][5]=>[hmiaHnYBx7CeZFoovdPy][30], [RC7OM86mQhGoEW4Q3LVXUg][index2][3]=>[hGiaHnYBx7CeZFoovdPy][28], [RC7OM86mQhGoEW4Q3LVXUg][index2][6]=>[h2iaHnYBx7CeZFoovdPy][31]]}, input_data_ids=[Lio.siren.federate.core.io.data.DataId;@1e7af06, source={\n \"bool\" : {\n \"filter\" : [\n {\n \"term\" : {\n \"tag\" : {\n \"value\" : \"aaa\",\n \"boost\" : 1.0\n }\n }\n }\n ],\n \"adjust_pure_negative\" : true,\n \"boost\" : 1.0\n }\n}}",
"row_type": [
"#0: id MetadataType{digest=JavaType(class io.siren.federate.core.planner.schema.PlannerType$Hashed) NOT NULL SEARCHABLE AGGREGATABLE}"
],
"type": "SearchJoinTaskRel",
"physical_plan": "rel#249:SearchJoinTaskRel.ELASTICSEARCH(invocation=SearchRequest{id=4d4024cf-83ec-4a6a-8d1a-be5b034851fd},rowType=RecordType(MetadataType{digest=JavaType(class io.siren.federate.core.planner.schema.PlannerType$Hashed) NOT NULL SEARCHABLE AGGREGATABLE} id),elementType=class [Ljava.lang.Object;)",
"is_cached": false,
"row_count": 2,
"cost": {
"io": 2,
"network": 2,
"project": {
"io": 2,
"network": 2
},
"select": {
"io": 0,
"network": 0
}
},
"cumulative_cost": {
"io": 2,
"network": 2
}
}
]
}
]
}
}
Broadcast join
POST /siren/index1/_explain?pretty=true
{
"query": {
"join": {
"indices": [
"index2"
],
"type": "BROADCAST_JOIN",
"on": [
"foreign_key",
"id"
],
"request": {
"query": {
"bool": {
"filter": [
{
"term": {
"tag": {
"value": "aaa",
"boost": 1
}
}
}
],
"adjust_pure_negative": true,
"boost": 1
}
}
}
}
}
}
{
"node": "nW_8gimES2O-hU0jn3HZBw",
"query_plan": {
"request": "SearchJoinRequest{jobId=94662061-49d9-4ac4-bb70-93c2511abffa, contextIds=SearchLocks{contextIds=[[nW_8gimES2O-hU0jn3HZBw][index1][4]=>[YP21HnYBbiKmK-hXe9BC][4], [nW_8gimES2O-hU0jn3HZBw][index1][2]=>[Xv21HnYBbiKmK-hXe9A-][3], [nW_8gimES2O-hU0jn3HZBw][index1][6]=>[Yv21HnYBbiKmK-hXe9BN][5], [RXgnavPjTp6KSZmRzGTdmQ][index1][3]=>[Yf21HnYBbiKmK-hXe9BC][3], [nW_8gimES2O-hU0jn3HZBw][index1][0]=>[XP21HnYBbiKmK-hXe9A7][2], [RXgnavPjTp6KSZmRzGTdmQ][index1][1]=>[Xf21HnYBbiKmK-hXe9A-][1], [RXgnavPjTp6KSZmRzGTdmQ][index1][5]=>[X_21HnYBbiKmK-hXe9BB][2]]}, innerRequest=SearchRequest{searchType=QUERY_THEN_FETCH, indices=[index1], indicesOptions=IndicesOptions[ignore_unavailable=false, allow_no_indices=true, expand_wildcards_open=true, expand_wildcards_closed=false, expand_wildcards_hidden=false, allow_aliases_to_multiple_indices=true, forbid_closed_indices=true, ignore_aliases=false, ignore_throttled=true], types=[], routing='null', preference='null', requestCache=false, scroll=null, maxConcurrentShardRequests=5, batchedReduceSize=512, preFilterShardSize=128, allowPartialSearchResults=null, localClusterAlias=null, getOrCreateAbsoluteStartMillis=-1, ccsMinimizeRoundtrips=true, source={\"query\":{\"hash_semi_join\":{\"field\":\"foreign_key\",\"job_id\":\"94662061-49d9-4ac4-bb70-93c2511abffa\",\"input_data_id\":\"3386-591710918-1684886495832309826-2147154417\"}}}}}",
"row_type": [
"#0: _shard_id JavaType(class io.siren.federate.core.planner.schema.PlannerType$Integer)",
"#1: _segment_id JavaType(class io.siren.federate.core.planner.schema.PlannerType$Short)",
"#2: _doc_id JavaType(class io.siren.federate.core.planner.schema.PlannerType$Integer)",
"#3: _score JavaType(class io.siren.federate.core.planner.schema.PlannerType$Float)",
"#4: foreign_key MetadataType{digest=JavaType(class io.siren.federate.core.planner.schema.PlannerType$String) SEARCHABLE AGGREGATABLE}"
],
"type": "SearchJoinRel",
"physical_plan": "rel#53:SearchJoinRel.ELASTICSEARCH(input#0=SearchJoinTaskRel#48,invocation=SearchRequest{id=81d64797-66b7-427a-a7e2-8252d753bf1e},rowType=RecordType(JavaType(class io.siren.federate.core.planner.schema.PlannerType$Integer) _shard_id, JavaType(class io.siren.federate.core.planner.schema.PlannerType$Short) _segment_id, JavaType(class io.siren.federate.core.planner.schema.PlannerType$Integer) _doc_id, JavaType(class io.siren.federate.core.planner.schema.PlannerType$Float) _score, MetadataType{digest=JavaType(class io.siren.federate.core.planner.schema.PlannerType$String) SEARCHABLE AGGREGATABLE} foreign_key),elementType=class [Ljava.lang.Object;)",
"is_cached": false,
"row_count": 1,
"cost": {
"io": 5,
"network": 0,
"project": {
"io": 0,
"network": 0
},
"select": {
"io": 5,
"network": 0
}
},
"cumulative_cost": {
"io": 7,
"network": 4
},
"children": [
{
"request": "SearchTaskBroadcastRequest{jobId=94662061-49d9-4ac4-bb70-93c2511abffa, taskType=SearchProjectTask, indices=[index2], types=[], projection=[id:LONG:class io.siren.federate.core.planner.schema.PlannerType$Hashed:false], collector={class=BroadcastTupleCollectorManager, target=[nW_8gimES2O-hU0jn3HZBw, RXgnavPjTp6KSZmRzGTdmQ]}, timeout=-1, output_data_id=3386-591710918-1684886495832309826-2147154417, context_ids=SearchLocks{contextIds=[[nW_8gimES2O-hU0jn3HZBw][index2][0]=>[W_21HnYBbiKmK-hXe9Az][1]]}, input_data_ids=[Lio.siren.federate.core.io.data.DataId;@2a7d114b, source={\n \"bool\" : {\n \"filter\" : [\n {\n \"term\" : {\n \"tag\" : {\n \"value\" : \"aaa\",\n \"boost\" : 1.0\n }\n }\n }\n ],\n \"adjust_pure_negative\" : true,\n \"boost\" : 1.0\n }\n}}",
"row_type": [
"#0: id MetadataType{digest=JavaType(class io.siren.federate.core.planner.schema.PlannerType$Hashed) NOT NULL SEARCHABLE AGGREGATABLE}"
],
"type": "SearchJoinTaskRel",
"physical_plan": "rel#48:SearchJoinTaskRel.ELASTICSEARCH(invocation=SearchRequest{id=24aac4ed-e220-44d8-8803-aa4bac51e0bf},rowType=RecordType(MetadataType{digest=JavaType(class io.siren.federate.core.planner.schema.PlannerType$Hashed) NOT NULL SEARCHABLE AGGREGATABLE} id),elementType=class [Ljava.lang.Object;)",
"is_cached": false,
"row_count": 2,
"cost": {
"io": 2,
"network": 4,
"project": {
"io": 2,
"network": 4
},
"select": {
"io": 0,
"network": 0
}
},
"cumulative_cost": {
"io": 2,
"network": 4
}
}
]
}
}