Datasource reflection pipelines
Pipelines may be used to enrich documents before they are indexed into Elasticsearch. The Siren platform provides a JSON web service processor, described in the next section. Elasticsearch ingest processors can also be used; examples are given in the Sample pipelines section below.
JSON Web Service Processor:
The JSON web service processor can be used to call an external JSON web service and copy the returned JSON structure into the record. For example:
{ "description": "enriching documents from a web service", "processors": [ { "json-ws": { "resource_name": "siren-nlp", "method": "post", "url": "http://35.189.96.185/bio", "input_map": { "$.Abstract": "text" }, "output_map": { "Abstract_text_mined_entities": "$" }, "input_default": { "text": "''" } } } ] }
In this example, the configuration takes the value of the Abstract field (JSON Path syntax) and posts a request {"text": abstract_value} to http://35.189.96.185/bio. If the Abstract field is null, an empty string is sent instead (input_default). The JSON response object is used as the value of a new field, Abstract_text_mined_entities, at the top level ($) of the document.
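For illustration only, assuming the web service returns a JSON object of extracted entities (the response below is made up, not the actual output of the siren-nlp service), the exchange for a single document might look like:

Request posted to http://35.189.96.185/bio:
{
  "text": "We studied BRCA1 mutations in breast cancer patients."
}

Hypothetical response, copied verbatim into the document:
"Abstract_text_mined_entities": {
  "genes": ["BRCA1"],
  "diseases": ["breast cancer"]
}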
The pipeline is defined with a json-ws field inside processors. The following configuration options are available:
Name | Required | Default | Description |
---|---|---|---|
method | no | get | The HTTP method: get, post, put |
url | yes | - | The URL endpoint of the web service. |
requests_per_second | no | 0 | The expected maximum number of requests per second. |
resource_name | yes | - | A name for the web service resource. All processor instances with the same resource name are consolidated and subject to the requests_per_second limit. |
input_map | yes | - | A map with a JSON Path expression as the key and a field name as the value. It builds the JSON structure that will be submitted to the external web service. |
input_default | no | - | A map with a field as the key and a default value. For a given field, this map provides a default value if the JSON Path expression of the input_map does not return any value. |
output_map | yes | - | A map with a field name as the key and a JSON Path expression as the value. The JSON Path expressions are applied to the JSON structure returned by the external web service. The indexed document is filled with the results of the JSON Path expressions associated with the given field name. Any existing content of the field is replaced. |
output_default | no | - | A map with a field as the key and a default value. For a given field, this map provides a default value if the JSON Path expression of the output_map does not return any value. |
error_output_field | no | - | If this field is not blank and an error occurs while calling the external web service, the field is filled with the error message and the ingestion process is not stopped. If the field is empty, an exception is thrown. |
time_out | no | 300 | The number of seconds the request waits for a response before failing. |
username | no | - | If a username is provided the HTTP(S) connection to the external web service will use it as the username for an HTTP basic authentication. |
password | no | - | If a password is provided the HTTP(S) connection to the external web service will use it as the password for an HTTP basic authentication. |
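As a sketch of how these options combine (the endpoint, credentials, and field names below are placeholders, not taken from a real deployment), a more complete processor definition could look like:

{
  "json-ws": {
    "resource_name": "company-enrichment",
    "method": "post",
    "url": "https://example.com/enrich",
    "requests_per_second": 10,
    "time_out": 60,
    "username": "ingest_user",
    "password": "changeme",
    "input_map": { "$.company_name": "name" },
    "input_default": { "name": "''" },
    "output_map": { "company_info": "$" },
    "output_default": { "company_info": "{}" },
    "error_output_field": "enrichment_error"
  }
}

With this configuration, a failed call writes the error message to the enrichment_error field instead of stopping ingestion, and all processor instances sharing the company-enrichment resource name are throttled to 10 requests per second.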
Sample Pipelines
Split Fields:
To split a string on the delimiter "|" into a list of sub-strings and, if no initial string exists, fill the target field with an empty string:
{ "description": "_description", "processors": [ { "split": { "on_failure": [ { "set": { "field": "parents", "value": "" } } ], "field": "parents", "separator": "\\|" } } ] }
Split Fields to a long:
To accomplish a similar goal, but this time converting each sub-string to a long and, if no value exists in the initial field, setting the target field to -1 on failure:
{ "description": "_description", "processors": [ { "split": { "on_failure": [ { "set": { "field": "parents", "value": -1 } } ], "field": "parents", "separator": "\\|" }, "convert": { "field": "parents", "type": "long" } } ] }
To extract text and create a new field (using regex):
To extract the text between the first set of parentheses in the Title field and create a new field Patent_ID for it.
{ "description": "extract the text between the first set of parentheses", "processors": [ { "script": { "source": "def f = ctx['Title']; if(f != null){ def m= /\\((.*?)\\)/.matcher(f); m.find(); ctx.Patent_ID=m.group(1);)}" } } ] }
Note
You need to enable regex in the elasticsearch.yml file: script.painless.regex.enabled: true
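Assuming a title such as the made-up value below, the pipeline can be checked with Elasticsearch's simulate API:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "extract the text between the first set of parentheses",
    "processors": [
      {
        "script": {
          "source": "def f = ctx['Title']; if (f != null) { def m = /\\((.*?)\\)/.matcher(f); if (m.find()) { ctx.Patent_ID = m.group(1); } }"
        }
      }
    ]
  },
  "docs": [
    { "_source": { "Title": "Fastening device (US1234567) for solar panels" } }
  ]
}

The returned document contains "Patent_ID": "US1234567" alongside the original Title.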
Merge two fields to create a geo_point:
Merge two fields providing 'latitude' and 'longitude' to create a single Elasticsearch geo_point field:
{ "description": "Create geo point field", "processors": [ { "set": { "field": "geo_location", "value": { "lat": "{{latitude_field}}", "lon": "{{longitude_field}}" } } } ] }