Siren Platform User Guide

Ingestion API

Ingestion management

The endpoint for ingestion management is at /_siren/connector/ingestion.

Ingestion creation and modification

An ingestion with a specific id can be created or updated by issuing a PUT request as follows:

PUT _siren/connector/ingestion/<id>
{
  "ingest": {
    "datasource": "postgres",
    "query": "select * from events {{#max_primary_key}}WHERE id>{{max_primary_key}}{{/max_primary_key}} limit 10000",
    "batch_size": 10,
    "schedule": "0 0 * * * ?",
    "enable_scheduler": true,
    "target": "events",
    "staging_prefix": "staging-index",
    "strategy": "REPLACE",
    "pk_field": "id",
    "mapping": {
      "properties": {
        "id": { "type": "long" },
        "value": { "type": "keyword" },
        "location": { "type": "text" },
        "geolocation": { "type": "geo_point" }
      }
    },
    "pipeline": {
      "processors": [
          {
            "set" : {
              "field": "foo",
              "value": "bar"
            }
          }
      ]
    },
    "transforms": [{
      "input": [{"source": "id"}],
      "output": "id",
      "mapping": {
        "type": "long"
      }
    },{
      "input": [
        {"source": "lat"},
        {"source": "lon"}
      ],
      "output": "geolocation",
      "transform": "geo_point",
      "mapping": {
        "type": "geo_point"
      }
    }],
    "ds_credentials": {
      "username": "user",
      "password": "pass"
    },
    "es_credentials": {
      "username": "user",
      "password": "pass"
    },
    "description": "description"
  }
}

Body parameters:

  • ingest: the properties of the ingestion.

Ingest configuration parameters:

  • datasource: the name of a datasource.
  • query: the template query passed to the JDBC driver to collect the records to ingest.
  • batch_size: An optional batch size (overriding the default global value).
  • schedule: An optional schedule using the cron syntax.
  • enable_scheduler: enable or disable the scheduled execution.
  • target: A target Elasticsearch index name.
  • staging_prefix: An optional prefix for the staging Elasticsearch index.
  • strategy: An update strategy. It can be either INCREMENTAL or REPLACE.
  • pk_field: A primary key field name.
  • mapping: An Elasticsearch mapping definition.
  • pipeline: An optional pipeline configuration.
  • transforms: A sequence of transforms to map the fields declared by the query to the fields in the Elasticsearch mapping definition.
  • ds_credentials: An optional set of credentials used to connect to the database.
  • es_credentials: The optional credentials that will be used to perform Elasticsearch requests during jobs.
  • description: An optional description.
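As an illustration, the request can be assembled from Python with the standard library. This is a minimal sketch, not an official client: the base URL http://localhost:9200 and the helper name build_put_ingestion are assumptions, and the configuration is a reduced version of the example above.

```python
import json
import urllib.request

# Assumption: the cluster is reachable at this base URL; adjust for your deployment.
BASE_URL = "http://localhost:9200"


def build_put_ingestion(ingestion_id, ingest):
    """Build (but do not send) the PUT request that creates or updates an ingestion."""
    url = f"{BASE_URL}/_siren/connector/ingestion/{ingestion_id}"
    body = json.dumps({"ingest": ingest}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


# A reduced version of the configuration shown above.
ingest = {
    "datasource": "postgres",
    "query": "select * from events limit 10000",
    "target": "events",
    "strategy": "REPLACE",
    "pk_field": "id",
    "mapping": {"properties": {"id": {"type": "long"}}},
    "transforms": [
        {"input": [{"source": "id"}], "output": "id", "mapping": {"type": "long"}}
    ],
}

request = build_put_ingestion("postgres-events", ingest)
# To send the request: urllib.request.urlopen(request)
```

The request is only built here, so the sketch can be inspected without a running cluster. Authentication is not covered and depends on how the cluster is secured.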

Strategy:

There are two available ingestion strategies:

  • INCREMENTAL: The index is created if it does not exist. The ingested records are inserted or updated in place.
  • REPLACE: The index name is an alias to a staging index. The ingested records are inserted into a new staging index. When the ingestion is done, the alias is moved from the previous staging index to the new one. If anything goes wrong, the alias is left untouched and still points to the previous staging index.
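The alias behaviour of the REPLACE strategy can be pictured with a small in-memory model. This is only a sketch of the semantics described above, not the connector's code: the function name, the staging index names, and the fail flag are hypothetical, and what happens to a staging index left behind by a failed run is not modelled.

```python
def replace_ingest(aliases, indices, target, new_staging, records, fail=False):
    """In-memory model of the REPLACE strategy.

    aliases maps an alias name to the staging index it points to;
    indices maps an index name to its list of records.
    Returns True when the alias was moved, False otherwise.
    """
    indices[new_staging] = []
    try:
        for record in records:
            if fail:
                # Hypothetical switch used to simulate a failing job.
                raise RuntimeError("simulated ingestion failure")
            indices[new_staging].append(record)
    except RuntimeError:
        # The alias is left untouched and still points to the previous staging index.
        return False
    # The alias is moved only after every record has been written.
    aliases[target] = new_staging
    return True


aliases = {"events": "staging-index-1"}
indices = {"staging-index-1": [{"id": 1}]}
replace_ingest(aliases, indices, "events", "staging-index-2", [{"id": 1}, {"id": 2}])
```

Readers querying the alias see either the complete previous data set or the complete new one, never a partially ingested index.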

Ingestion query:

The query defined in the ingestion configuration is written in the query language of the datasource. The query can be written as a Mustache template; the following variables are provided, if applicable, when the template is rendered to a query string:

  • max_primary_key: the maximum value of the primary key in Elasticsearch.
  • last_record_timestamp: the UTC timestamp at which the last record was successfully processed by an ingestion job.
  • last_record: an object with the scalar values in the last record that was successfully processed by the ingestion job.
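To see how the section syntax in the example query behaves, the sketch below renders it with a deliberately simplified Mustache-like renderer. It is an assumption-laden illustration, not the template engine used by the connector: it supports only non-nested {{#name}}...{{/name}} sections and {{name}} variables, and it treats a variable as present when it is not None.

```python
import re

# Matches a non-nested section such as {{#name}}...{{/name}}.
SECTION = re.compile(r"\{\{#(\w+)\}\}(.*?)\{\{/\1\}\}", re.DOTALL)
# Matches a plain variable such as {{name}}.
VARIABLE = re.compile(r"\{\{(\w+)\}\}")


def render(template, variables):
    """Render a small subset of Mustache: sections and plain variables."""

    def section(match):
        name, inner = match.group(1), match.group(2)
        # Keep the section body only when the variable is provided.
        return inner if variables.get(name) is not None else ""

    def variable(match):
        value = variables.get(match.group(1))
        return "" if value is None else str(value)

    return VARIABLE.sub(variable, SECTION.sub(section, template))


QUERY = (
    "select * from events "
    "{{#max_primary_key}}WHERE id>{{max_primary_key}}{{/max_primary_key}} "
    "limit 10000"
)

first_run = render(QUERY, {})                        # no primary key known yet
later_run = render(QUERY, {"max_primary_key": 42})   # resume after id 42
```

With no max_primary_key the WHERE clause is dropped entirely, so the first run reads from the start of the table; later runs only read rows beyond the highest id already indexed.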

Mapping transform:

A mapping transform takes one or more fields from a datasource record as inputs and outputs a field that can be indexed with a valid Elasticsearch type.

A mapping transform has the following properties:

  • input: a sequence of inputs, where an input can be either the name of a field defined in the job query or the name of a field in the target Elasticsearch mapping.
  • transform: the name of a predefined function that takes as input the values of the fields specified in the input parameter and the mapping properties of the target Elasticsearch field. The function outputs the value to be indexed; if transform is not set, the system uses a generic cast function to create the output.
  • output: the name of the target Elasticsearch field.

Input:

The input structure must provide one of the following properties:

  • source: the name of a field defined in the job query.
  • target: the name of a field in the target Elasticsearch mapping.

Transforms (“predefined functions”):

  • copy: a default cast transform that produces a scalar Elasticsearch value in a way analogous to how the connector already translates JDBC types to Elasticsearch types. If the JDBC driver reports array fields / struct fields correctly, they will be written as Elasticsearch arrays. Any JDBC type that is not supported / not recognized causes an exception.
  • geo_point: a transform that produces a geo_point value from two numerical inputs, where the first is the latitude and the second is the longitude.
  • array: an array transform that produces an array Elasticsearch value from a comma-separated string field in a record.
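The following sketch mimics what the geo_point and array transforms produce for a single record. It illustrates the descriptions above and is not the connector's implementation: the function names are hypothetical, copy is reduced to passing the value through, only source inputs are handled, and trimming whitespace around array items is an assumption.

```python
def geo_point(lat, lon):
    """Combine latitude and longitude into an object accepted by a geo_point field."""
    return {"lat": float(lat), "lon": float(lon)}


def array(value):
    """Split a comma-separated string into a list (whitespace trimming is assumed)."""
    return [item.strip() for item in value.split(",")]


def copy(value):
    """Stand-in for the generic cast: the value is passed through unchanged."""
    return value


TRANSFORMS = {"geo_point": geo_point, "array": array, "copy": copy}


def apply_transform(definition, record):
    """Apply one transform definition to a record; returns (output field, value)."""
    inputs = [record[item["source"]] for item in definition["input"]]
    func = TRANSFORMS[definition.get("transform", "copy")]
    return definition["output"], func(*inputs)


record = {"id": 7, "lat": 53.27, "lon": -9.05, "tags": "alpha, beta,gamma"}

geo_transform = {
    "input": [{"source": "lat"}, {"source": "lon"}],
    "output": "geolocation",
    "transform": "geo_point",
    "mapping": {"type": "geo_point"},
}
field, value = apply_transform(geo_transform, record)
```

The order of the inputs matters for geo_point: the latitude column must be listed before the longitude column.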

Credential parameters (for Elasticsearch or the JDBC database):

If the user does not have the permission to manage datasources in the cluster, these credentials are mandatory.

  • username: the login to use to connect to the resource.
  • password: the password to use to connect to the resource.

Ingestion retrieval

The ingestion properties can be retrieved by issuing a GET request as follows:

GET /_siren/connector/ingestion/<id>

Ingestion deletion

To delete an ingestion, issue a DELETE request as follows:

DELETE /_siren/connector/ingestion/<id>

Ingestion listing

To list the ingestions configured in the system, issue a GET request as follows:

GET _siren/connector/ingestion/_all?status=[false|true]&detailed=[false|true]

Note: The GET _siren/connector/ingestion/_search API has been deprecated and is scheduled to be removed in the next major release.

If the optional status parameter is set to true, the response also includes the last job status and the last job log.

If the optional detailed parameter (true by default) is set to false, then mapping, pipeline, transforms and removed_fields are not returned.
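As a small illustration, the listing path with both query parameters can be built as follows. The helper name is hypothetical, and the default of false for status is an assumption (the text above only states the default for detailed).

```python
from urllib.parse import urlencode


def list_ingestions_path(status=False, detailed=True):
    """Return the request path for listing ingestions with explicit flags."""
    params = {
        # Booleans are sent as the lowercase strings "true" / "false".
        "status": str(status).lower(),
        "detailed": str(detailed).lower(),
    }
    return "/_siren/connector/ingestion/_all?" + urlencode(params)


# Last job status and log included, bulky configuration fields omitted.
path = list_ingestions_path(status=True, detailed=False)
```

Combining status=true with detailed=false gives a compact overview that is convenient for monitoring many ingestions at once.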

Ingestion validation

To validate the connection to an ingestion, issue a POST request as follows:

POST _siren/connector/ingestion/<id>/_validate

Run an ingestion job

To execute an ingestion job, issue a POST request as follows:

POST _siren/connector/ingestion/<id>/_run

The response returns the jobId that can be used to track the status of the running job:

{
  "_id": "postgres-events",
  "_version": 49,
  "found": true,
  "jobId": "postgres-events"
}
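
A client would typically read the jobId from this response before polling for the job status. The sketch below parses the example response shown above; the helper name is hypothetical, and raising an error when jobId is missing is a design choice of this sketch rather than documented behaviour.

```python
import json

# The example response from the _run request, as text.
RESPONSE = """
{
  "_id": "postgres-events",
  "_version": 49,
  "found": true,
  "jobId": "postgres-events"
}
"""


def job_id_from_run_response(text):
    """Extract the jobId from the JSON body returned by the _run request."""
    payload = json.loads(text)
    if "jobId" not in payload:
        raise ValueError("response does not contain a jobId")
    return payload["jobId"]


job_id = job_id_from_run_response(RESPONSE)
```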