Connector APIs

In this section, we present the APIs available to interact with datasources, virtual indices, and ingestion jobs.

Permissions: To use the APIs listed in this section, ensure that the cluster-level wildcard action cluster:internal/federate/* is granted by the security system.

Configuring a JDBC-enabled node

You can ingest data from a JDBC datasource on a node where the Siren Federate plugin is installed.

Before you begin

The Elasticsearch cluster must contain at least one node that is enabled to issue queries over JDBC. It is recommended that you use a coordinating-only node for this role, although this is not a requirement for testing purposes.

To configure the JDBC datasource, you need an Avatica server. The Avatica server in turn connects to the remote JDBC database.

If your system needs additional encryption, generate a custom key by running the keygen.sh script.

Procedure

To configure a JDBC datasource, complete the following steps:

  1. Open the elasticsearch.yml file and add the following setting:

    node.attr.connector.jdbc: true
  2. Restart the Elasticsearch service.

Configuring encryption for JDBC datasources

JDBC passwords are encrypted by default using a predefined 128-bit AES key. However, in a production environment it is advisable to use a custom key.

Before you create datasources, it is recommended that you generate a custom key by running the keygen.sh script that is included in the siren-federate plugin directory.

Procedure

  1. From the siren-federate plugin directory, run the following command:

    bash plugins/siren-federate/tools/keygen.sh -s 128

    The command outputs a random base64 key. It is also possible to generate keys longer than 128 bits if your JVM supports it.

  2. To use the custom key, set the following parameters in the elasticsearch.yml file on master nodes and on all of the JDBC nodes:

    • siren.connector.encryption.enabled: enables JDBC password encryption. It is true by default and can be set to false to disable encryption.

    • siren.connector.encryption.secret_key: a base64-encoded AES key that is used to encrypt JDBC passwords.

Examples

The following are elasticsearch.yml settings that can be used for a master node with a custom encryption key:

siren.connector.encryption.secret_key: "1zxtIE6/EkAKap+5OsPWRw=="

The following are elasticsearch.yml settings for a JDBC node with a custom encryption key:

siren.connector.encryption.secret_key: "1zxtIE6/EkAKap+5OsPWRw=="
node.attr.connector.jdbc: true

After you save the configuration, restart the nodes to apply the settings.
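For reference, the key that keygen.sh produces is simply a cryptographically random value of the requested size, encoded in base64. A minimal Python sketch of the equivalent (keygen.sh remains the supported tool; this is for illustration only):

```python
import base64
import secrets

def generate_aes_key(bits: int = 128) -> str:
    """Generate a random AES key of the given size and encode it in base64."""
    if bits % 8 != 0:
        raise ValueError("key size must be a multiple of 8 bits")
    return base64.b64encode(secrets.token_bytes(bits // 8)).decode("ascii")

# A 128-bit key decodes to exactly 16 bytes, as expected by
# siren.connector.encryption.secret_key.
print(generate_aes_key(128))
```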

Datasource API

In this section we present the API available to interact with datasources.

Siren Federate currently supports the following types of datasource:

  • federate to connect to a remote Federate cluster.

  • Elasticsearch to connect to a remote Elasticsearch cluster.

  • JDBC to connect to any external database providing a JDBC driver via an Avatica server.

Datasource management

The endpoint for datasource management is at /_siren/connector/datasource.

Datasource creation and modification

A datasource with a specific id can be created by issuing a PUT request. The body of the request varies with the type of the datasource. Because there is no concurrency control, a datasource cannot be safely updated with a PUT request; by default, updating an existing datasource is not allowed.

Permissions: To create a datasource, you must ensure that the cluster-level action cluster:admin/federate/connector/datasource/put is granted by the security system.

Federate datasource
curl -H 'Content-Type: application/json' -XPUT 'http://localhost:9200/_siren/connector/datasource/<id>' -d '
{
  "federate": {
    "alias": "remotename"
  }
}
'

Federate configuration parameters:

  • alias: The name of the configured cluster alias in the remote Federate cluster configuration.

Elasticsearch datasource
curl -H 'Content-Type: application/json' -XPUT 'http://localhost:9200/_siren/connector/datasource/<id>' -d '
{
  "elastic": {
    "alias": "remotename"
  }
}
'

Elasticsearch configuration parameters:

  • alias: The name of the configured cluster alias in the remote cluster configuration.

JDBC datasource
curl -H 'Content-Type: application/json' -XPUT 'http://localhost:9200/_siren/connector/datasource/<id>' -d '
{
  "jdbc": {
    "driver": "org.apache.calcite.avatica.remote.Driver",
    "url": "jdbc:avatica:remote:url=http://localhost:8765;serialization=json",
    "username": "username",
    "password": "password"
  }
}
'

JDBC configuration parameters:

  • driver: the class name of the JDBC driver.

  • url: the JDBC URL of the datasource.

  • username: the username that will be passed to the JDBC driver when getting a connection (optional).

  • password: the password that will be passed to the JDBC driver when getting a connection (optional).

  • timezone: if date and timestamp fields are stored in a timezone other than UTC, specifying this parameter will instruct the plugin to convert dates and times to/from the specified timezone when performing queries and retrieving results.

  • properties: a map of JDBC properties to be set when initializing a connection.

When updating a datasource that already has a password, either pass a new cleartext password or omit the password property to keep the existing one.

Datasource retrieval

The datasource configuration can be retrieved by issuing a GET request as follows:

curl -XGET 'http://localhost:9200/_siren/connector/datasource/<id>'

To update the datasource while keeping the current password, omit the "password" property before issuing the PUT request.

Permissions: To allow the retrieval of a datasource, you must ensure that the cluster-level action cluster:admin/federate/connector/datasource/get is granted by the security system.

Datasource deletion

To delete a datasource, issue a DELETE request as follows:

curl -XDELETE 'http://localhost:9200/_siren/connector/datasource/<id>'

Permissions: To allow the deletion of a datasource, you must ensure that the cluster-level action cluster:admin/federate/connector/datasource/delete is granted by the security system.

Datasource listing

To list the datasources configured in the system, issue a GET request as follows:

curl -XGET 'http://localhost:9200/_siren/connector/datasource/_search?type=federate'

Permissions: To allow the listing of datasources, you must ensure that the cluster-level action cluster:admin/federate/connector/datasource/search is granted by the security system.

The type parameter is optional and allows you to filter the results by datasource type: federate, elastic, or jdbc.

Datasource validation

To validate the connection to a datasource, issue a POST request as follows:

curl -XPOST 'http://localhost:9200/_siren/connector/datasource/<id>/_validate'

Permissions: To allow the validation of a datasource, you must ensure that the cluster-level action cluster:admin/federate/connector/datasource/validate is granted by the security system.

Datasource catalog metadata

To get the metadata related to a datasource connection catalog, issue a POST request as follows:

curl -XPOST 'http://localhost:9200/_siren/connector/datasource/<id>/_metadata?catalog=<catalog>&schema=<schema>'

The parameters are:

  • catalog: The name of the catalog.

  • schema: The name of the schema.

The catalog and schema parameters are optional:

  • If no catalog parameter is given, the API returns the list of catalogs.

  • If no schema parameter is given, the catalog parameter must be provided and the API returns the list of schemas for that catalog.

The result is a JSON document which contains the resource list for the given catalog and schema.

{
  "_id": "postgres",
  "found": true,
  "catalogs": [
    {
      "name": "connector",
      "schemas": [
        {
          "name": "public",
          "resources": [
            {
              "name": "emojis"
            },
            {
              "name": "Player"
            },
            {
              "name": "Matches"
            },
            {
              "name": "ingestion_testing"
            }
          ]
        }
      ]
    }
  ]
}

Permissions: To allow the retrieval of the metadata of a datasource, you must ensure that the cluster-level action cluster:admin/federate/connector/datasource/metadata is granted by the security system.

Datasource field metadata

To get the field metadata related to a datasource connection resource (a table), issue a POST request as follows:

curl -XPOST 'http://localhost:9200/_siren/connector/datasource/<id>/_resource_metadata?catalog=<catalog>&schema=<schema>&resource=<resource>'

The parameters are:

  • catalog: The name of the catalog.

  • schema: The name of the schema.

  • resource: The name of the resource (table).

The result is a JSON document which contains the list of columns for the given catalog, schema, and resource. It also contains the name of the primary key, if one exists.

{
  "_id": "postgres",
  "found": true,
  "columns": [
    {
      "name": "TEAM"
    },
    {
      "name": "ID"
    },
    {
      "name": "NAME"
    },
    {
      "name": "AGE"
    }
  ],
  "single_column_primary_keys": [
    {
      "name": "ID"
    }
  ]
}

Permissions: To allow the retrieval of the field metadata of a datasource, you must ensure that the cluster-level action cluster:admin/federate/connector/datasource/field-metadata is granted by the security system.

Datasource transform suggestions

To get a suggestion of a transform configuration that can be used by the ingestion, issue a POST request as follows:

curl -H 'Content-Type: application/json' -XPOST 'http://localhost:9200/_siren/connector/datasource/<id>/_transforms' -d '
{
    "query": "SELECT * FROM events"
}
'

It executes the query and returns a collection of transform operations based on the columns returned by the query.

{
  "_id": "postgres",
  "found": true,
  "transforms": [
    {
      "input": [
        {
          "source": "id"
        }
      ],
      "output": "id"
    },
    {
      "input": [
        {
          "source": "occurred"
        }
      ],
      "output": "occurred"
    },
    {
      "input": [
        {
          "source": "value"
        }
      ],
      "output": "value"
    },
    {
      "input": [
        {
          "source": "location"
        }
      ],
      "output": "location"
    }
  ]
}

Permissions: To suggest a transformation, you must ensure that the cluster-level action cluster:admin/federate/connector/datasource/suggest/transform is granted by the security system.

Virtual index API

In this section we present the API available to interact with the virtual indices.

Virtual index management

Virtual index creation and modification

A virtual index with a specific id can be created or updated by issuing a PUT request as follows:

curl -H 'Content-Type: application/json' -XPUT 'http://localhost:9200/_siren/connector/index/<id>'  -d '
{
  "datasource": "ds",
  "catalog": "catalog",
  "schema": "schema",
  "resource": "table",
  "key": "id",
  "search_fields": [
    {
     "function": "LIKE",
     "field": "NAME"
    }
  ]
}
'

The id of a virtual index must be a valid lowercase Elasticsearch index name. It is recommended that virtual index names share a common prefix to simplify the handling of permissions.
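Because the id must be a valid lowercase Elasticsearch index name, it can be worth checking candidate ids before issuing the PUT request. A rough sketch based on the documented Elasticsearch index-naming rules (the check_virtual_index_id helper and the virtual- prefix are illustrative, not part of the API):

```python
# Characters that Elasticsearch forbids in index names.
FORBIDDEN = set('\\/*?"<>| ,#')

def check_virtual_index_id(name: str, prefix: str = "virtual-") -> bool:
    """Hypothetical helper: True if `name` looks like a valid lowercase
    Elasticsearch index name carrying the recommended common prefix."""
    return (
        name == name.lower()                     # lowercase only
        and name not in (".", "..")
        and not name.startswith(("-", "_", "+"))
        and not FORBIDDEN & set(name)            # no forbidden characters
        and len(name.encode("utf-8")) <= 255     # max length in bytes
        and name.startswith(prefix)
    )

print(check_virtual_index_id("virtual-players"))   # a valid id
print(check_virtual_index_id("Virtual-Players"))   # rejected: uppercase
```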

Body parameters:

  • datasource: the id of an existing datasource.

  • resource: the name of a table or view on the remote datasource.

  • key: the name of a unique column; if a virtual index has no unique key, it is still possible to perform aggregations, but queries that expect a reproducible unique identifier will not be possible.

  • catalog and schema: the catalog and schema containing the table specified in the resource parameter; these are usually required only if the connection does not specify a default catalog or schema.

  • search_fields: An optional list of field names that will be searched using the LIKE operator when processing queries. Currently only the LIKE function is supported.

Permissions: To create a virtual index, ensure that the index-level action indices:admin/federate/connector/put is granted by the security system.

Whenever a virtual index is created, the Siren Federate plugin creates a concrete Elasticsearch index with the same name, which contains metadata about the virtual index. At startup, the Siren Federate plugin checks for missing concrete indices and attempts to create them automatically. For more information, see Operations on virtual indices.

Virtual index deletion

To delete a virtual index, issue a DELETE request as follows:

curl -XDELETE 'http://localhost:9200/_siren/connector/index/<id>'

When a virtual index is deleted, the corresponding concrete index is not deleted automatically.

Permissions: To delete a virtual index, ensure that the index-level action indices:admin/federate/connector/delete is granted by the security system.

Virtual index listing

To list the virtual indices configured in the system, issue a GET request as follows:

curl -XGET 'http://localhost:9200/_siren/connector/index/_search'

Permissions: To list virtual indices, ensure that the index-level action indices:admin/federate/connector/search is granted by the security system.

Ingestion API

Permissions: To use the ingestion API, you must ensure that the following cluster-level actions are granted by the security system:

  • Delete: cluster:admin/federate/connector/ingestion/delete

  • Encryption: cluster:admin/federate/connector/ingestion/encrypt

  • Get: cluster:admin/federate/connector/ingestion/get

  • Put: cluster:admin/federate/connector/ingestion/put

  • Run: cluster:admin/federate/connector/ingestion/run

  • Search: cluster:admin/federate/connector/ingestion/search

  • Validate: cluster:admin/federate/connector/ingestion/validate

Ingestion management

The endpoint for ingestion management is at /_siren/connector/ingestion.

Datasource query sample

This method runs a query and returns an array of results and an Elasticsearch type for each column found.

curl -H 'Content-Type: application/json' -XPOST 'http://localhost:9200/_siren/connector/ingestion/<id>/_sample' -d '
{
  "query": "SELECT * FROM events",
  "row_limit": 10,
  "max_text_size": 100
}
'
{
  "_id": "valid",
  "found": true,
  "types": {
    "location": "keyword",
    "id": "long",
    "occurred": "date",
    "value": "long"
  },
  "results": [
    {
      "id": 0,
      "occurred": 1422806400000,
      "value": 1,
      "location": "Manila"
    },
    {
      "id": 1,
      "occurred": 1422806460000,
      "value": 5,
      "location": "Los Angeles"
    },
    {
      "id": 2,
      "occurred": 1422806520000,
      "value": 10,
      "location": "Pompilio"
    }
  ]
}

Permissions: To sample a datasource, you must ensure that the cluster-level action cluster:admin/federate/connector/ingestion/sample is granted by the security system.

Ingestion creation and modification

An ingestion with a specific id can be created or updated by issuing a PUT request as follows. Unlike datasources, an ingestion can be safely updated with a PUT request, because concurrency control is implemented through the _seq_no and _primary_term fields.

curl -H 'Content-Type: application/json' -XPUT 'http://localhost:9200/_siren/connector/ingestion/<id>' -d '
{
  "ingest": {
    "datasource": "postgres",
    "query": "select * from events {{#max_primary_key}}WHERE id>{{max_primary_key}}{{/max_primary_key}} limit 10000",
    "batch_size": 10,
    "schedule": "0 0 * * * ?",
    "enable_scheduler": true,
    "target": "events",
    "staging_prefix": "staging-index",
    "strategy": "REPLACE",
    "pk_field": "id",
    "mapping": {
      "properties": {
        "id": { "type": "long" },
        "value": { "type": "keyword" },
        "location": { "type": "text" },
        "geolocation": { "type": "geo_point" }
        }
    },
    "pipeline": {
      "processors": [
          {
            "set" : {
              "field": "foo",
              "value": "bar"
            }
          }
      ]
    },
    "transforms": [{
      "input": [{"source": "id"}],
      "output": "id",
       "mapping": {
        "type": "long"
      }
    },{
      "input": [
        {"source": "lat"},
        {"source": "lon"}
      ],
      "output": "geolocation",
      "transform": "geo_point",
      "mapping": {
        "type": "geo_point"
      }
    }],
    "ds_credentials": {
      "username": "user",
      "password": "pass"
    },
    "es_credentials": {
       "username": "user",
       "password": "pass"
    },
    "description": "description"
  }
}
'

Body parameters:

  • ingest: the properties of the ingestion.

Ingest configuration parameters:

  • datasource: the name of a datasource.

  • query: the template query passed to the JDBC driver to collect the records to ingest.

  • batch_size: An optional batch size (overriding the default global value).

  • schedule: An optional schedule using the cron syntax.

  • enable_scheduler: enables or disables the scheduled execution.

  • target: A target Elasticsearch index name.

  • staging_prefix: An optional prefix for the staging Elasticsearch index.

  • strategy: An update strategy. It can be either INCREMENTAL or REPLACE.

  • pk_field: A primary key field name.

  • mapping: An Elasticsearch mapping definition.

  • pipeline: An optional pipeline configuration.

  • transforms: A sequence of transforms to map the fields declared by the query to the fields in the Elasticsearch mapping definition.

  • ds_credentials: An optional set of credentials used to connect to the database.

  • es_credentials: The optional credentials that will be used to perform Elasticsearch requests during jobs.

  • description: An optional description.

Strategy:

There are two available ingestion strategies:

  • INCREMENTAL: The index is created if it does not exist. The ingested records are inserted or updated in place.

  • REPLACE: The index name is an alias to a staging index. The ingested records are inserted into the staging index. When the ingestion is done, the alias is moved from the previous staging index to the new one. If anything goes wrong, the alias is left untouched and continues to point to the previous staging index.
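The REPLACE strategy can be pictured as an atomic alias swap. The following toy model (plain dictionaries standing in for Elasticsearch indices and aliases; the real logic is internal to the plugin) illustrates why a failed ingestion leaves the target untouched:

```python
def replace_ingest(aliases, indices, target, fetch_records,
                   staging_prefix="staging-index"):
    """Toy model of the REPLACE strategy: ingest into a fresh staging
    index, then move the alias only if ingestion succeeded."""
    staging = f"{staging_prefix}-{target}-{len(indices) + 1}"
    try:
        indices[staging] = list(fetch_records())  # ingest into staging
    except Exception:
        indices.pop(staging, None)                # failed: discard staging
        return aliases                            # alias left untouched
    aliases[target] = staging                     # atomic alias move
    return aliases

aliases, indices = {}, {}
replace_ingest(aliases, indices, "events", lambda: [{"id": 1}])
print(aliases)  # {'events': 'staging-index-events-1'}
```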

Ingestion query:

The query defined in the ingestion configuration is written in the datasource language. The query can be written using Mustache templates; the following variables are provided, if applicable, when rendering the query to a string:

  • max_primary_key: the maximum value of the primary key in Elasticsearch.

  • last_record_timestamp: the UTC timestamp at which the last record was successfully processed by an ingestion job.

  • last_record: an object with the scalar values in the last record that was successfully processed by the ingestion job.
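To illustrate how these variables behave, here is a simplified stand-in for a Mustache renderer, handling only the {{#var}}…{{/var}} section and {{var}} substitution forms used in the example query above:

```python
import re

def render(template: str, variables: dict) -> str:
    """Minimal Mustache-style rendering: a {{#name}}...{{/name}} section is
    kept only when `name` has a value, then {{name}} tags are substituted."""
    def section(match):
        name, body = match.group(1), match.group(2)
        return body if variables.get(name) is not None else ""
    expanded = re.sub(r"\{\{#(\w+)\}\}(.*?)\{\{/\1\}\}", section, template)
    return re.sub(r"\{\{(\w+)\}\}",
                  lambda m: str(variables.get(m.group(1), "")), expanded)

tpl = ("select * from events "
       "{{#max_primary_key}}WHERE id>{{max_primary_key}}{{/max_primary_key}}"
       " limit 10000")
print(render(tpl, {"max_primary_key": 100}))
# select * from events WHERE id>100 limit 10000
```

On the first run, when no primary key value is available yet, the section is omitted and the rendered query has no WHERE clause, so the full table is selected.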

Mapping transform:

A mapping transform takes one or more fields from a datasource record as inputs and outputs a field that can be indexed with a valid Elasticsearch type.

A mapping transform has the following properties:

  • input: a sequence of inputs, where an input can be either the name of a field defined in the job query or the name of a field in the target Elasticsearch mapping.

  • transform: the name of a predefined function that takes as input the values of the fields specified in the input parameter and the mapping properties of the target Elasticsearch field. The function outputs the value to be indexed; if transform is not set, the system uses a generic cast function to create the output.

  • output: the name of the target Elasticsearch field.

Input:

The input structure must provide one of the following properties:

  • source: the name of a field defined in the job query.

  • target: the name of a field in the target Elasticsearch mapping.

Transforms (“predefined functions”):

  • copy: a default cast transform that produces a scalar Elasticsearch value, analogous to how the connector already translates JDBC types to Elasticsearch types. If the JDBC driver reports array or struct fields correctly, they are written as Elasticsearch arrays. Any JDBC type that is not supported or not recognized causes an exception.

  • geo_point: transform that produces a geo_point value from two numerical inputs, where the first is the latitude and the second the longitude.

  • array: an array transform that produces an array Elasticsearch value from a comma separated string field in a record.
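As an illustration of what the geo_point and array transforms produce for a record (a sketch with hypothetical field names; the actual transform functions are internal to the plugin):

```python
def geo_point_transform(lat, lon):
    """Build an Elasticsearch geo_point value from two numeric inputs:
    the first is the latitude, the second the longitude."""
    return {"lat": float(lat), "lon": float(lon)}

def array_transform(value, separator=","):
    """Split a comma-separated string field into an Elasticsearch array."""
    return [item.strip() for item in value.split(separator)]

# A hypothetical record returned by the ingestion query:
record = {"lat": "53.34", "lon": "-6.26", "tags": "a, b, c"}
doc = {
    "geolocation": geo_point_transform(record["lat"], record["lon"]),
    "tags": array_transform(record["tags"]),
}
print(doc)
# {'geolocation': {'lat': 53.34, 'lon': -6.26}, 'tags': ['a', 'b', 'c']}
```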

Credential parameters (for Elasticsearch or the JDBC database):

If the user does not have permission to manage datasources in the cluster, these credentials are mandatory.

  • username: the login to use to connect to the resource.

  • password: the password to use to connect to the resource.

When updating an ingestion whose configuration already has a password, either pass a new cleartext password or omit the password property to keep the existing one.

Ingestion retrieval

The ingestion properties can be retrieved by issuing a GET request as follows:

curl -XGET 'http://localhost:9200/_siren/connector/ingestion/<id>'

Ingestion deletion

To delete an ingestion, issue a DELETE request as follows:

curl -XDELETE 'http://localhost:9200/_siren/connector/ingestion/<id>'

Ingestion listing

To list the ingestions configured in the system, issue a GET request as follows:

curl -XGET 'http://localhost:9200/_siren/connector/ingestion/_all?status=[false|true]&detailed=[false|true]'

The curl -XGET 'http://localhost:9200/_siren/connector/ingestion/_search' API is deprecated and is scheduled to be removed in the next major release.

If the optional status parameter is set to true, the response also includes the last job status and the last job log.

If the optional detailed parameter (true by default) is set to false, then mapping, pipeline, transforms, and removed_fields are not returned.

Ingestion validation

To validate an ingestion, issue a POST request as follows:

curl -XPOST 'http://localhost:9200/_siren/connector/ingestion/<id>/_validate'

Run an ingestion job

To execute an ingestion job, issue a POST request as follows:

curl -XPOST 'http://localhost:9200/_siren/connector/ingestion/<id>/_run'

The response returns the jobId that can be used to track the status of the running job:

{
  "_id": "postgres-events",
  "_version": 49,
  "found": true,
  "jobId": "postgres-events"
}

Job API

The job API provides methods for managing running jobs and retrieving status about previous executions.

Permissions: To use the job API, you must ensure that the following cluster-level actions are granted by the security system:

  • Abort: cluster:admin/federate/connector/jobs/abort

  • Get: cluster:admin/federate/connector/jobs/get

  • Running jobs: cluster:admin/federate/connector/jobs/running/get

  • Job log: cluster:admin/federate/connector/jobs/log/search

Job management

The endpoint for job management is at /_siren/connector/jobs.

Running jobs statuses

The status of all running jobs can be retrieved by issuing a GET request as follows:

curl -XGET 'http://localhost:9200/_siren/connector/jobs/<type>'

The possible value for type is:

  • ingestion: This type is related to the ingestion jobs.

Running job status

The status of a job can be retrieved by issuing a GET request as follows:

curl -XGET 'http://localhost:9200/_siren/connector/jobs/<type>/<id>'

This API provides the status of the current running job if there is any, or the status of the last execution.

Body parameters:

  • status: the status of the job.

Status parameters:

  • id: the id of the job.

  • is_running: a boolean value indicating if the job is running.

  • is_aborting: an optional boolean value which indicates that the job is aborting.

  • start_time: a timestamp with the starting time of the job.

  • end_time: a timestamp with the ending time of the job.

  • infos: textual information.

  • error: an optional sequence of error messages.

  • state: the current state of the job.

  • count: the total number of processed records.

  • last_id: the optional last known value of the primary key column.

Possible state values:

  • running: the job is running.

  • aborting: the job is aborting due to the user request.

  • aborted: the job has been aborted.

  • error: the job failed with an error.

  • successful: the job was completed successfully.

JSON representation while a job is running:

{
  "_id": "postgres-events",
  "type": "ingestion",
  "found": true,
  "status": {
    "version": 1,
    "id": "postgres-events",
    "is_running": true,
    "start_time": 1538731228589,
    "infos": "The job is running.",
    "state": "running",
    "count": 3459,
    "last_id": "2289"
  }
}

JSON representation of a successfully completed job:

{
  "_id": "postgres-events",
  "type": "ingestion",
  "found": true,
  "status": {
    "version": 1,
    "id": "postgres-events",
    "is_running": false,
    "start_time": 1538733893554,
    "end_time": 1538733911829,
    "infos": "The job is done.",
    "state": "successful",
    "count": 10000,
    "last_id": "12219"
  }
}

JSON representation of a job that failed due to an error:

{
  "_id": "postgres-events",
  "type": "ingestion",
  "found": true,
  "status": {
    "version": 1,
    "id": "postgres-events",
    "is_running": false,
    "start_time": 1538730949766,
    "end_time": 1538730961293,
    "infos": "The job has failed.",
    "error": [
      "Could not execute datasource query [postgres].",
      "Failed to initialize pool: The connection attempt failed.",
      "The connection attempt failed.",
      "connect timed out"
    ],
    "state": "error",
    "count": 0
  }
}

Cancelling a running job

This API provides a method to cancel a running job.

curl -XPOST 'http://localhost:9200/_siren/connector/jobs/ingestion/<id>/_abort'
{
  "_id": "postgres-events",
  "type": "ingestion",
  "found": true,
  "status": {
    "version": 1,
    "id": "postgres-events",
    "is_running": false,
    "is_aborting": true,
    "start_time": 1538733800993,
    "end_time": 1538733805318,
    "infos": "The job has been aborted.",
    "state": "aborted",
    "count": 2220,
    "last_id": "2219"
  }
}

Searching on the job log

This API provides a method to retrieve the status of completed jobs. It is possible to pass parameters to filter the results.

curl -XGET 'http://localhost:9200/_siren/connector/jobs/_search'

Possible filter parameters:

  • start_time_from: jobs whose start time is greater than or equal to the passed value.

  • start_time_to: jobs whose start time is lower than or equal to the passed value.

  • type: a filter on the job type.

  • state: the state of the job status. See the job status description to get a list of possible values.

  • id: the id of the job.

Request and result example:

curl -XGET 'http://localhost:9200/_siren/connector/jobs/_search?type=ingestion&id=postgresevents&start_time_to=1539192173232'
{
  "hits": {
    "total": 1,
    "hits": [
      {
        "_id": "postgresevents11e247fa-ccb1-11e8-ad75-c293294ec513",
        "_source": {
          "ingestion": {
            "version": 1,
            "id": "postgresevents",
            "is_running": false,
            "start_time": 1539192150699,
            "end_time": 1539192151612,
            "infos": "The job is done.",
            "state": "successful",
            "count": 0
          }
        }
      }
    ]
  }
}