Connector APIs

In this section we present the APIs available to interact with datasources, virtual indices, and ingestion jobs.

Permissions: To use the APIs listed in this section, ensure that the cluster-level wildcard action cluster:internal/federate/* is granted by the security system.

Datasource API

In this section we present the API available to interact with datasources.

We currently support two types of datasources:

  • JDBC to connect to any datasource providing a JDBC driver;

  • Elasticsearch to connect to a remote Elasticsearch cluster.

Datasource management

The endpoint for datasource management is at /_siren/connector/datasource.

Datasource creation and modification

A datasource with a specific id can be created by issuing a PUT request. The body of the request varies with the type of the datasource. Because datasources have no concurrency control, an existing datasource cannot be safely updated with a PUT request; by default, updating an existing datasource is not allowed.

Permissions: To create a datasource, you must ensure that the cluster-level action cluster:admin/federate/connector/datasource/put is granted by the security system.

JDBC datasource
PUT /_siren/connector/datasource/<id>
{
  "jdbc": {
    "username": "username",
    "password": "password",
    "driver": "com.db.Driver",
    "url": "jdbc:db://localhost:5432/default",
    "properties": {
      "ssl": true
    }
  }
}

JDBC configuration parameters:

  • driver: the class name of the JDBC driver.

  • url: the JDBC url of the datasource.

  • username: the username that will be passed to the JDBC driver when getting a connection (optional).

  • password: the password that will be passed to the JDBC driver when getting a connection (optional).

  • timezone: if date and timestamp fields are stored in a timezone other than UTC, specifying this parameter will instruct the plugin to convert dates and times to/from the specified timezone when performing queries and retrieving results.

  • properties: a map of JDBC properties to be set when initializing a connection.

When updating a datasource that already has a password, either pass the new password in clear text or remove the password property to keep the previous one.
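As an illustration, the following Python sketch builds the PUT request for a JDBC datasource without sending it. The helper name build_jdbc_datasource_request and the cluster address http://localhost:9200 are assumptions made for this example; only the endpoint and the body parameters come from the description above. The sketch also assumes that timezone sits next to the other parameters inside the jdbc object. Optional parameters, including password, are left out of the body when no value is supplied.

```python
import json
import urllib.request


def build_jdbc_datasource_request(base_url, datasource_id, driver, url,
                                  username=None, password=None,
                                  timezone=None, properties=None):
    """Build (but do not send) a PUT request for a JDBC datasource."""
    jdbc = {"driver": driver, "url": url}
    # Assumption: every optional parameter lives inside the "jdbc" object.
    optional = {
        "username": username,
        "password": password,
        "timezone": timezone,
        "properties": properties,
    }
    # Only include optional parameters that were actually supplied.
    jdbc.update({k: v for k, v in optional.items() if v is not None})
    return urllib.request.Request(
        url=f"{base_url}/_siren/connector/datasource/{datasource_id}",
        data=json.dumps({"jdbc": jdbc}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


request = build_jdbc_datasource_request(
    "http://localhost:9200",  # hypothetical cluster address
    "postgres",
    driver="org.postgresql.Driver",
    url="jdbc:postgresql://localhost:5432/default",
    username="username",
    properties={"ssl": True},
)
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
```

Sending the request, for example with urllib.request.urlopen(request), additionally requires whatever authentication the security system of the cluster expects.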

Elasticsearch datasource
PUT /_siren/connector/datasource/<id>
{
  "elastic": {
    "alias": "remotename"
  }
}

Elasticsearch configuration parameters:

  • alias: the name of the configured cluster alias in the remote cluster configuration.

Datasource retrieval

The datasource configuration can be retrieved by issuing a GET request as follows:

GET /_siren/connector/datasource/<id>

If you want to update the datasource and keep the current password, just remove the "password" property.

Permissions: To allow the retrieval of a datasource, you must ensure that the cluster-level action cluster:admin/federate/connector/datasource/get is granted by the security system.

Datasource deletion

To delete a datasource, issue a DELETE request as follows:

DELETE /_siren/connector/datasource/<id>

Permissions: To allow the deletion of a datasource, you must ensure that the cluster-level action cluster:admin/federate/connector/datasource/delete is granted by the security system.

Datasource listing

To list the datasources configured in the system, issue a GET request as follows:

GET /_siren/connector/datasource/_search

Permissions: To allow the listing of datasources, you must ensure that the cluster-level action cluster:admin/federate/connector/datasource/search is granted by the security system.

Datasource validation

To validate the connection to a datasource, issue a POST request as follows:

POST /_siren/connector/datasource/<id>/_validate

Permissions: To allow the validation of a datasource, you must ensure that the cluster-level action cluster:admin/federate/connector/datasource/validate is granted by the security system.

Datasource catalog metadata

To get the metadata related to a datasource connection catalog, issue a POST request as follows:

POST /_siren/connector/datasource/<id>/_metadata?catalog=<catalog>&schema=<schema>

The parameters are:

  • catalog: the name of the catalog.

  • schema: the name of the schema.

The catalog and schema parameters are optional:

  • If no catalog parameter is given, the API returns the list of catalogs.

  • If no schema parameter is given, the catalog parameter must be provided; the API then returns the list of schemas for the given catalog.
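The rules above can be sketched in Python as a small helper that builds the request path and reports which listing to expect. The function names metadata_path and expected_listing are hypothetical; the path and the parameter names are the ones documented in this section.

```python
from urllib.parse import urlencode


def metadata_path(datasource_id, catalog=None, schema=None):
    """Build the _metadata path, adding only the parameters that are set."""
    params = {}
    if catalog is not None:
        params["catalog"] = catalog
    if schema is not None:
        params["schema"] = schema
    path = f"/_siren/connector/datasource/{datasource_id}/_metadata"
    return f"{path}?{urlencode(params)}" if params else path


def expected_listing(catalog=None, schema=None):
    """Name the listing described in this section for a parameter combination."""
    if catalog is None:
        return "catalogs"   # no catalog given: the catalog list
    if schema is None:
        return "schemas"    # catalog only: the schema list of that catalog
    return "resources"      # catalog and schema: the resource list


print(metadata_path("postgres", catalog="connector", schema="public"))
print(expected_listing(catalog="connector"))
```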

The result is a JSON document which contains the resource list for the given catalog and schema.

{
  "_id": "postgres",
  "found": true,
  "catalogs": [
    {
      "name": "connector",
      "schemas": [
        {
          "name": "public",
          "resources": [
            {
              "name": "emojis"
            },
            {
              "name": "Player"
            },
            {
              "name": "Matches"
            },
            {
              "name": "ingestion_testing"
            }
          ]
        }
      ]
    }
  ]
}
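A response of this shape can be flattened into catalog, schema and resource triples. The following Python sketch assumes the nesting shown in the example above; the function name iter_resources is hypothetical.

```python
def iter_resources(metadata):
    """Yield (catalog, schema, resource) names from a _metadata response."""
    for catalog in metadata.get("catalogs", []):
        for schema in catalog.get("schemas", []):
            for resource in schema.get("resources", []):
                yield catalog["name"], schema["name"], resource["name"]


# Shortened copy of the example response above.
metadata = {
    "_id": "postgres",
    "found": True,
    "catalogs": [
        {
            "name": "connector",
            "schemas": [
                {
                    "name": "public",
                    "resources": [{"name": "emojis"}, {"name": "Player"}],
                }
            ],
        }
    ],
}

for catalog, schema, resource in iter_resources(metadata):
    print(f"{catalog}.{schema}.{resource}")
```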

Permissions: To allow the retrieval of the metadata of a datasource, you must ensure that the cluster-level action cluster:admin/federate/connector/datasource/metadata is granted by the security system.

Datasource field metadata

To get the field metadata related to a datasource connection resource (a table), issue a POST request as follows:

POST /_siren/connector/datasource/<id>/_resource_metadata?catalog=<catalog>&schema=<schema>&resource=<resource>

The parameters are:

  • catalog: the name of the catalog.

  • schema: the name of the schema.

  • resource: the name of the resource (table).

The result is a JSON document which contains the list of columns for the given catalog, schema and resource. It also contains the name of the primary key, if one exists.

{
  "_id": "postgres",
  "found": true,
  "columns": [
    {
      "name": "TEAM"
    },
    {
      "name": "ID"
    },
    {
      "name": "NAME"
    },
    {
      "name": "AGE"
    }
  ],
  "single_column_primary_keys": [
    {
      "name": "ID"
    }
  ]
}
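One possible use of this response is to pick a candidate for the key parameter of a virtual index. The Python sketch below is only an illustration: the function name single_column_key is hypothetical, and it assumes that a usable key exists when single_column_primary_keys holds exactly one entry that also appears in columns.

```python
def single_column_key(field_metadata):
    """Return the primary key column name, or None if there is no single one."""
    columns = {column["name"] for column in field_metadata.get("columns", [])}
    keys = [
        entry["name"]
        for entry in field_metadata.get("single_column_primary_keys", [])
    ]
    # Assumption: only an unambiguous key that is a known column is usable.
    if len(keys) == 1 and keys[0] in columns:
        return keys[0]
    return None


field_metadata = {
    "_id": "postgres",
    "found": True,
    "columns": [{"name": "TEAM"}, {"name": "ID"}, {"name": "NAME"}, {"name": "AGE"}],
    "single_column_primary_keys": [{"name": "ID"}],
}
print(single_column_key(field_metadata))
```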

Permissions: To allow the retrieval of the field metadata of a datasource, you must ensure that the cluster-level action cluster:admin/federate/connector/datasource/field-metadata is granted by the security system.

Datasource query sample

This method runs a query and returns an array of results and an Elasticsearch type for each column found.

POST _siren/connector/datasource/<id>/_sample
{
  "query": "SELECT * FROM events",
  "row_limit": 10,
  "max_text_size": 100
}

The response is a JSON document similar to the following:

{
  "_id": "valid",
  "found": true,
  "types": {
    "location": "keyword",
    "id": "long",
    "occurred": "date",
    "value": "long"
  },
  "results": [
    {
      "id": 0,
      "occurred": 1422806400000,
      "value": 1,
      "location": "Manila"
    },
    {
      "id": 1,
      "occurred": 1422806460000,
      "value": 5,
      "location": "Los Angeles"
    },
    {
      "id": 2,
      "occurred": 1422806520000,
      "value": 10,
      "location": "Pompilio"
    }
  ]
}
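The types object of a sample response can serve as a starting point for an Elasticsearch mapping definition. The Python sketch below uses hypothetical helper names and assumes that date columns are returned as epoch milliseconds, as in the example above.

```python
from datetime import datetime, timezone


def mapping_from_types(types):
    """Turn the "types" object of a sample response into a mapping definition."""
    return {
        "properties": {name: {"type": es_type} for name, es_type in types.items()}
    }


def decode_dates(row, types):
    """Return a copy of a result row with date columns decoded to UTC datetimes."""
    decoded = dict(row)
    for name, es_type in types.items():
        # Assumption: date values are integers holding epoch milliseconds.
        if es_type == "date" and isinstance(row.get(name), int):
            decoded[name] = datetime.fromtimestamp(row[name] / 1000, tz=timezone.utc)
    return decoded


types = {"location": "keyword", "id": "long", "occurred": "date", "value": "long"}
row = {"id": 0, "occurred": 1422806400000, "value": 1, "location": "Manila"}

print(mapping_from_types(types))
print(decode_dates(row, types)["occurred"].isoformat())
```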

Permissions: To sample a datasource, you must ensure that the cluster-level action cluster:admin/federate/connector/datasource/sample is granted by the security system.

Datasource transform suggestions

To get a suggestion of a transform configuration that can be used by the ingestion, issue a POST request as follows:

POST /_siren/connector/datasource/<id>/_transforms
{
    "query": "SELECT * FROM events"
}

It executes the query and returns a collection of transform operations based on the columns returned by the query.

{
  "_id": "postgres",
  "found": true,
  "transforms": [
    {
      "input": [
        {
          "source": "id"
        }
      ],
      "output": "id"
    },
    {
      "input": [
        {
          "source": "occurred"
        }
      ],
      "output": "occurred"
    },
    {
      "input": [
        {
          "source": "value"
        }
      ],
      "output": "value"
    },
    {
      "input": [
        {
          "source": "location"
        }
      ],
      "output": "location"
    }
  ]
}
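The suggested transforms can be completed with a mapping entry before they are used in an ingestion configuration. The following Python sketch is one way to do this under stated assumptions: the function name transforms_with_mappings is hypothetical, and the Elasticsearch types are taken from the types object of a query sample response for the same query.

```python
def transforms_with_mappings(transforms, types):
    """Copy each suggested transform and attach a mapping for its output field."""
    result = []
    for transform in transforms:
        enriched = dict(transform)
        es_type = types.get(transform["output"])
        if es_type is not None:
            # Same shape as the "mapping" entry of an ingestion transform.
            enriched["mapping"] = {"type": es_type}
        result.append(enriched)
    return result


suggested = [
    {"input": [{"source": "id"}], "output": "id"},
    {"input": [{"source": "location"}], "output": "location"},
]
sample_types = {"id": "long", "location": "keyword"}

for transform in transforms_with_mappings(suggested, sample_types):
    print(transform)
```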

Permissions: To suggest a transformation, you must ensure that the cluster-level action cluster:admin/federate/connector/datasource/suggest/transform is granted by the security system.

Datasource type list

To get a list of supported connectors, issue a GET request as follows:

GET /_siren/connector/datasource

The result is a JSON document similar to the following:

{
  "MySQL": {
    "driverClassName": "com.mysql.jdbc.Driver",
    "defaultURL": "jdbc:mysql://{{host}}:{{port}}{{databasename}}?useLegacyDatetimeCode=false&useCursorFetch=true",
    "defaultPort": 3306,
    "defaultQuery": "SELECT 1 AS N",
    "disclaimer": "This is a sample connection string, see the <a target=\"_blank\" rel=\"noopener noreferrer\" href=\"https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference.html\">MySQL Connector/J documentation</a> for further information.",
    "virtualIndexSupported": true,
    "ingestionSupported": true
  },
  "PostgreSQL": {
    "driverClassName": "org.postgresql.Driver",
    "defaultURL": "jdbc:postgresql://{{host}}:{{port}}{{databasename}}",
    "defaultPort": 5432,
    "defaultQuery": "SELECT 1 AS N",
    "disclaimer": "This is a sample connection string, see the <a target=\"_blank\" rel=\"noopener noreferrer\" href=\"https://jdbc.postgresql.org/documentation/94/connect.html\">PostgreSQL JDBC documentation</a> for further information.",
    "virtualIndexSupported": true,
    "ingestionSupported": true
  }
}

Permissions: To get the type list, you must ensure that the cluster-level action cluster:admin/federate/connector/datasource/types is granted by the security system.

Datasource driver list

To get a list of installed drivers, issue a GET request as follows:

GET /_siren/connector/datasource/_drivers

The result is a JSON document which contains the list of drivers for each node in the cluster.

The nodes are identified by their IDs. Use those IDs with the cluster API to get more information about the nodes.

The returned information for each driver is:

  • key: the name of the Java class of the driver, used as the key of the driver entry.

  • majorVersion: the major version of the driver.

  • minorVersion: the minor version of the driver.

  • jdbcCompliant: indicates whether the driver is fully JDBC compliant.

  • infos: the result of the toString() method of the driver.

If a failure occurs, the "failures" part of the JSON result contains the error message for each failing node.

{
  "nodes": {
    "4PdaJD0BRd2lw47U6OAzog": {
      "org.apache.calcite.avatica.remote.Driver": {
        "majorVersion": 1,
        "minorVersion": 19,
        "jdbcCompliant": true,
        "infos": "org.apache.calcite.avatica.remote.Driver@6287f65f"
      },
      "org.apache.calcite.jdbc.Driver": {
        "majorVersion": 1,
        "minorVersion": 19,
        "jdbcCompliant": true,
        "infos": "org.apache.calcite.jdbc.Driver@1e592d50"
      }
    },
    "A4T54CAbT5qJgwzqtBvqIA": {
      "org.apache.calcite.avatica.remote.Driver": {
        "majorVersion": 1,
        "minorVersion": 19,
        "jdbcCompliant": true,
        "infos": "org.apache.calcite.avatica.remote.Driver@6287f65f"
      },
      "org.apache.calcite.jdbc.Driver": {
        "majorVersion": 1,
        "minorVersion": 19,
        "jdbcCompliant": true,
        "infos": "org.apache.calcite.jdbc.Driver@1e592d50"
      }
    },
    "OSYuK1ABQ0OC-XACY51ulA": {
      "org.apache.calcite.avatica.remote.Driver": {
        "majorVersion": 1,
        "minorVersion": 19,
        "jdbcCompliant": true,
        "infos": "org.apache.calcite.avatica.remote.Driver@6287f65f"
      },
      "org.apache.calcite.jdbc.Driver": {
        "majorVersion": 1,
        "minorVersion": 19,
        "jdbcCompliant": true,
        "infos": "org.apache.calcite.jdbc.Driver@1e592d50"
      }
    }
  },
  "failures": {
    "t5eC5E1WTDCrTai5I5Tk-g": "[node_sc4][127.0.0.1:65429][cluster:admin/federate/connector/datasource/drivers/child] disconnected"
  }
}
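Because drivers are reported per node, the response can be used to check that every node has the same drivers installed. The Python sketch below is an illustration only; the function name drivers_missing_on_nodes and the node ids in the sample data are hypothetical.

```python
def drivers_missing_on_nodes(response):
    """Map each node id to the drivers that other nodes have but it lacks."""
    nodes = response.get("nodes", {})
    # Union of every driver class name reported by any node.
    all_drivers = set().union(*(set(drivers) for drivers in nodes.values()))
    missing = {}
    for node_id, drivers in nodes.items():
        absent = sorted(all_drivers - set(drivers))
        if absent:
            missing[node_id] = absent
    return missing


response = {
    "nodes": {
        "node-a": {
            "org.apache.calcite.avatica.remote.Driver": {"majorVersion": 1},
            "org.apache.calcite.jdbc.Driver": {"majorVersion": 1},
        },
        "node-b": {
            "org.apache.calcite.avatica.remote.Driver": {"majorVersion": 1},
        },
    },
    "failures": {"node-c": "disconnected"},
}

print(drivers_missing_on_nodes(response))
# Nodes listed under "failures" returned no driver list at all.
print(sorted(response.get("failures", {})))
```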

Permissions: To get the driver list, you must ensure that the cluster-level actions cluster:admin/federate/connector/datasource/drivers and cluster:admin/federate/connector/datasource/drivers/child are granted by the security system.

Virtual index API

In this section we present the API available to interact with the virtual indices.

Virtual index management

Virtual index creation and modification

A virtual index with a specific id can be created or updated by issuing a PUT request as follows:

PUT /_siren/connector/index/<id>
{
  "datasource": "ds",
  "catalog": "catalog",
  "schema": "schema",
  "resource": "table",
  "key": "id",
  "search_fields": [
    {
     "function": "LIKE",
     "field": "NAME"
    }
  ]
}

The id of a virtual index must be a valid lowercase Elasticsearch index name; it is recommended to start virtual indices with a common prefix to simplify handling of permissions.

Body parameters:

  • datasource: the id of an existing datasource.

  • resource: the name of a table or view on the remote datasource.

  • key: the name of a unique column; if a virtual index has no primary key, aggregations are still possible, but queries that expect a reproducible unique identifier are not.

  • catalog and schema: the catalog and schema containing the table specified in the resource parameter; these are usually required only if the connection does not specify a default catalog or schema.

  • search_fields: An optional list of field names that will be searched using the LIKE operator when processing queries. Currently only the LIKE function is supported.
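The body parameters above can be assembled with a small Python helper. This is a sketch under stated assumptions: the function names are hypothetical, key, catalog and schema are treated as optional, and the id check only covers the lowercase requirement, not every rule for Elasticsearch index names.

```python
def check_virtual_index_id(index_id):
    """Reject ids that are not lowercase (a partial index-name check only)."""
    if not index_id or index_id != index_id.lower():
        raise ValueError(f"virtual index id must be lowercase: {index_id!r}")
    return index_id


def build_virtual_index_body(datasource, resource, key=None,
                             catalog=None, schema=None, search_fields=()):
    """Build the JSON body for PUT /_siren/connector/index/<id>."""
    body = {"datasource": datasource, "resource": resource}
    for name, value in (("key", key), ("catalog", catalog), ("schema", schema)):
        if value is not None:
            body[name] = value
    if search_fields:
        # LIKE is the only function currently supported.
        body["search_fields"] = [
            {"function": "LIKE", "field": field} for field in search_fields
        ]
    return body


index_id = check_virtual_index_id("virtual-players")  # hypothetical common prefix
print(index_id, build_virtual_index_body("ds", "table", key="id",
                                         search_fields=["NAME"]))
```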

Permissions: To create a virtual index, ensure that the index-level action indices:admin/federate/connector/put is granted by the security system.

Virtual index deletion

To delete a virtual index, issue a DELETE request as follows:

DELETE /_siren/connector/index/<id>

When a virtual index is deleted, the corresponding concrete index is not deleted automatically.

Permissions: To delete a virtual index, ensure that the index-level action indices:admin/federate/connector/delete is granted by the security system.

Virtual index listing

To list the virtual indices configured in the system, issue a GET request as follows:

GET /_siren/connector/index/_search

Permissions: To list virtual indices, ensure that the index-level action indices:admin/federate/connector/search is granted by the security system.

Ingestion API

Permissions: To use the ingestion API, you must ensure that the following cluster-level actions are granted by the security system:

  • Delete: cluster:admin/federate/connector/ingestion/delete

  • Encryption: cluster:admin/federate/connector/ingestion/encrypt

  • Get: cluster:admin/federate/connector/ingestion/get

  • Put: cluster:admin/federate/connector/ingestion/put

  • Run: cluster:admin/federate/connector/ingestion/run

  • Search: cluster:admin/federate/connector/ingestion/search

  • Validate: cluster:admin/federate/connector/ingestion/validate

Ingestion management

The endpoint for ingestion management is at /_siren/connector/ingestion.

Ingestion creation and modification

An ingestion with a specific id can be created or updated by issuing a PUT request as follows. An ingestion can be safely updated with a PUT request because the _seq_no and _primary_term fields are used to control concurrent modifications.

PUT _siren/connector/ingestion/<id>
{
  "ingest": {
    "datasource": "postgres",
    "query": "select * from events {{#max_primary_key}}WHERE id>{{max_primary_key}}{{/max_primary_key}} limit 10000",
    "batch_size": 10,
    "schedule": "0 0 * * * ?",
    "enable_scheduler": true,
    "target": "events",
    "staging_prefix": "staging-index",
    "strategy": "REPLACE",
    "pk_field": "id",
    "mapping": {
      "properties": {
        "id": { "type": "long" },
        "value": { "type": "keyword" },
        "location": { "type": "text" },
        "geolocation": { "type": "geo_point" }
      }
    },
    "pipeline": {
      "processors": [
        {
          "set": {
            "field": "foo",
            "value": "bar"
          }
        }
      ]
    },
    "transforms": [
      {
        "input": [{ "source": "id" }],
        "output": "id",
        "mapping": {
          "type": "long"
        }
      },
      {
        "input": [
          { "source": "lat" },
          { "source": "lon" }
        ],
        "output": "geolocation",
        "transform": "geo_point",
        "mapping": {
          "type": "geo_point"
        }
      }
    ],
    "ds_credentials": {
      "username": "user",
      "password": "pass"
    },
    "es_credentials": {
      "username": "user",
      "password": "pass"
    },
    "description": "description"
  }
}

Body parameters:

  • ingest: the properties of the ingestion.

Ingest configuration parameters:

  • datasource: the name of a datasource.

  • query: the template query passed to the JDBC driver to collect the records to ingest.

  • batch_size: An optional batch size (overriding the default global value).

  • schedule: An optional schedule using the cron syntax.

  • enable_scheduler: enable or disable the scheduled execution.

  • target: A target Elasticsearch index name.

  • staging_prefix: An optional prefix for the staging Elasticsearch index.

  • strategy: An update strategy. It can be either INCREMENTAL or REPLACE.

  • pk_field: A primary key field name.

  • mapping: An Elasticsearch mapping definition.

  • pipeline: An optional pipeline configuration.

  • transforms: A sequence of transforms to map the fields declared by the query to the fields in the Elasticsearch mapping definition.

  • ds_credentials: An optional set of credentials used to connect to the database.

  • es_credentials: The optional credentials that will be used to perform Elasticsearch requests during jobs.

  • description: An optional description.

Strategy:

There are two available ingestion strategies:

  • INCREMENTAL: The index is created if it does not exist. The ingested records are inserted or updated in place.

  • REPLACE: The index name is an alias to a staging index. The ingested records are inserted into a new staging index. When the ingestion is done, the alias is moved from the previous staging index to the new one. If anything goes wrong, the alias is left untouched and still points to the previous staging index.
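The connector moves the alias itself; no client action is needed. Purely to illustrate what moving an alias means, the Python sketch below builds the body of an Elasticsearch POST /_aliases request that removes the alias from one index and adds it to another in a single call. The staging index names are hypothetical, and this is not necessarily how the connector implements the REPLACE strategy.

```python
def alias_swap_actions(alias, previous_index, new_index):
    """Build an _aliases body that points an alias at a new index."""
    actions = []
    if previous_index is not None:
        # Detach the alias from the previous staging index.
        actions.append({"remove": {"index": previous_index, "alias": alias}})
    # Attach the alias to the freshly populated staging index.
    actions.append({"add": {"index": new_index, "alias": alias}})
    return {"actions": actions}


body = alias_swap_actions(
    "events",
    previous_index="staging-index-events-1",  # hypothetical name
    new_index="staging-index-events-2",       # hypothetical name
)
print(body)
```

Both actions are part of one request, so readers of the alias see either the previous index or the new one.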

Ingestion query:

The query defined in the ingestion configuration is written in the query language of the datasource. It can be written as a Mustache template; the following variables are provided, if applicable, when the template is rendered to a string:

  • max_primary_key: the maximum value of the primary key in Elasticsearch.

  • last_record_timestamp: the UTC timestamp at which the last record was successfully processed by an ingestion job.

  • last_record: an object with the scalar values in the last record that was successfully processed by the ingestion job.
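To show how such a template expands, the following Python sketch renders the two Mustache constructs used in the example query: sections ({{#name}}...{{/name}}) and plain variables ({{name}}). It is a deliberately simplified stand-in, not a full Mustache implementation and not the renderer used by the connector; it does not support dotted names such as fields of last_record, and the function name render_query is hypothetical.

```python
import re

_SECTION = re.compile(r"\{\{#(\w+)\}\}(.*?)\{\{/\1\}\}", re.DOTALL)
_VARIABLE = re.compile(r"\{\{(\w+)\}\}")


def render_query(template, variables):
    """Render sections and variables of a simple Mustache-style template."""
    def section(match):
        value = variables.get(match.group(1))
        # Keep the section body only when the variable is provided.
        keep = value is not None and value is not False
        return match.group(2) if keep else ""

    previous = None
    while previous != template:  # repeat until no section is left to expand
        previous, template = template, _SECTION.sub(section, template)
    return _VARIABLE.sub(lambda m: str(variables.get(m.group(1), "")), template)


template = ("select * from events {{#max_primary_key}}WHERE id>{{max_primary_key}}"
            "{{/max_primary_key}} limit 10000")

print(render_query(template, {"max_primary_key": 2289}))
print(render_query(template, {}))
```

On a first run, when no primary key value is known yet, the section is dropped and the query selects from the start of the table.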

Mapping transform:

A mapping transform takes one or more fields from a datasource record as inputs and outputs a field that can be indexed with a valid Elasticsearch type.

A mapping transform has the following properties:

  • input: a sequence of inputs, where an input can be either the name of a field defined in the job query or the name of a field in the target Elasticsearch mapping.

  • transform: the name of a predefined function that takes as input the values of the fields specified in the input parameter and the mapping properties of the target Elasticsearch field. The function outputs the value to be indexed; if transform is not set, the system uses a generic cast function to create the output.

  • output: the name of the target Elasticsearch field.

Input:

The input structure must provide one of the following properties:

  • source: the name of a field defined in the job query.

  • target: the name of a field in the target Elasticsearch mapping.

Transforms (“predefined functions”):

  • copy: a default cast transform that produces a scalar Elasticsearch value in a way analogous to how the connector already translates JDBC types to Elasticsearch types. If the JDBC driver reports array or struct fields correctly, they are written as Elasticsearch arrays. Any JDBC type that is not supported or not recognized causes an exception.

  • geo_point: a transform that produces a geo_point value from two numerical inputs, where the first is the latitude and the second the longitude.

  • array: an array transform that produces an array Elasticsearch value from a comma separated string field in a record.
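The effect of the transforms on a single record can be mimicked in a few lines of Python. This is only an illustration of the behavior described above, not the connector's code: the copy stand-in performs no type casting, the geo_point output uses the object form with lat and lon keys that Elasticsearch accepts, and the array stand-in splits on commas without trimming, which is an assumption.

```python
# Illustrative stand-ins for the predefined functions.
TRANSFORMS = {
    "copy": lambda value: value,
    "geo_point": lambda lat, lon: {"lat": float(lat), "lon": float(lon)},
    "array": lambda text: text.split(","),
}


def apply_transforms(record, transforms):
    """Build the document to index from one datasource record."""
    document = {}
    for spec in transforms:
        values = [
            # "source" reads from the record, "target" from the document so far.
            record[item["source"]] if "source" in item else document[item["target"]]
            for item in spec["input"]
        ]
        function = TRANSFORMS[spec.get("transform", "copy")]
        document[spec["output"]] = function(*values)
    return document


record = {"id": 7, "lat": 14.6, "lon": 120.98, "tags": "a,b"}
transforms = [
    {"input": [{"source": "id"}], "output": "id"},
    {"input": [{"source": "lat"}, {"source": "lon"}],
     "output": "geolocation", "transform": "geo_point"},
    {"input": [{"source": "tags"}], "output": "tags", "transform": "array"},
]
print(apply_transforms(record, transforms))
```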

Credential parameters (for Elasticsearch or the JDBC database):

If the user does not have the permission to manage datasources in the cluster, these credentials are mandatory.

  • username: the login to use to connect to the resource.

  • password: the password to use to connect to the resource.

When updating the properties of an ingestion that already has a password, either pass the new password in clear text or remove the password property to keep the previous one.

Ingestion retrieval

The ingestion properties can be retrieved by issuing a GET request as follows:

GET /_siren/connector/ingestion/<id>

Ingestion deletion

To delete an ingestion, issue a DELETE request as follows:

DELETE /_siren/connector/ingestion/<id>

Ingestion listing

To list the ingestions configured in the system, issue a GET request as follows:

GET _siren/connector/ingestion/_all?status=[false|true]&detailed=[false|true]

The GET _siren/connector/ingestion/_search API is deprecated and is scheduled to be removed in the next major release.

If the optional status parameter is set to true, it also returns the last job status, and the last job log.

If the optional detailed parameter (true by default) is set to false, the mapping, pipeline, transforms and removed_fields properties are not returned.

Ingestion validation

To validate the connection to an ingestion, issue a POST request as follows:

POST _siren/connector/ingestion/<id>/_validate

Run an ingestion job

To execute an ingestion job, issue a POST request as follows:

POST _siren/connector/ingestion/<id>/_run

The response returns the jobId that can be used to track the status of the running job:

{
  "_id": "postgres-events",
  "_version": 49,
  "found": true,
  "jobId": "postgres-events"
}

Job API

The job API provides methods for managing running jobs and retrieving status about previous executions.

Permissions: To use the job API, you must ensure that the following cluster-level actions are granted by the security system:

  • Abort: cluster:admin/federate/connector/jobs/abort

  • Get: cluster:admin/federate/connector/jobs/get

  • Running jobs: cluster:admin/federate/connector/jobs/running/get

  • Job log: cluster:admin/federate/connector/jobs/log/search

Job management

The endpoint for job management is at /_siren/connector/jobs.

Running jobs statuses

The status of all running jobs can be retrieved by issuing a GET request as follows:

GET _siren/connector/jobs/<type>

The only possible type value is:

  • ingestion: This type is related to the ingestion jobs.

Running job status

The status of a job can be retrieved by issuing a GET request as follows:

GET _siren/connector/jobs/<type>/<id>

This API provides the status of the current running job if there is any, or the status of the last execution.

Response body parameters:

  • status: the status of the job.

Status parameters:

  • id: the id of the job.

  • is_running: a boolean value indicating if the job is running.

  • is_aborting: an optional boolean value which indicates that the job is aborting.

  • start_time: a timestamp with the starting time of the job.

  • end_time: a timestamp with the ending time of the job.

  • infos: textual information.

  • error: an optional sequence of error messages.

  • state: the current state of the job.

  • count: the total number of processed records.

  • last_id: the optional last known value of the primary key column.

Possible state values:

  • running: the job is running.

  • aborting: the job is being aborted at the request of the user.

  • aborted: the job has been aborted.

  • error: the job failed with an error.

  • successful: the job was completed successfully.

JSON representation while a job is running:

{
  "_id": "postgres-events",
  "type": "ingestion",
  "found": true,
  "status": {
    "version": 1,
    "id": "postgres-events",
    "is_running": true,
    "start_time": 1538731228589,
    "infos": "The job is running.",
    "state": "running",
    "count": 3459,
    "last_id": "2289"
  }
}

JSON representation of a successfully completed job:

{
  "_id": "postgres-events",
  "type": "ingestion",
  "found": true,
  "status": {
    "version": 1,
    "id": "postgres-events",
    "is_running": false,
    "start_time": 1538733893554,
    "end_time": 1538733911829,
    "infos": "The job is done.",
    "state": "successful",
    "count": 10000,
    "last_id": "12219"
  }
}

JSON representation of a job that failed due to an error:

{
  "_id": "postgres-events",
  "type": "ingestion",
  "found": true,
  "status": {
    "version": 1,
    "id": "postgres-events",
    "is_running": false,
    "start_time": 1538730949766,
    "end_time": 1538730961293,
    "infos": "The job has failed.",
    "error": [
      "Could not execute datasource query [postgres].",
      "Failed to initialize pool: The connection attempt failed.",
      "The connection attempt failed.",
      "connect timed out"
    ],
    "state": "error",
    "count": 0
  }
}
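A client can track a job by polling this API until the state is no longer running or aborting. The Python sketch below keeps the HTTP call out of the loop: fetch_status is a hypothetical callable that performs GET _siren/connector/jobs/ingestion/<id> and returns the decoded JSON document, which makes the loop easy to exercise with canned responses.

```python
import time

# States after which the job will not change any more.
TERMINAL_STATES = {"aborted", "error", "successful"}


def wait_for_job(fetch_status, poll_interval=5.0, sleep=time.sleep):
    """Poll until the job reaches a terminal state and return its status."""
    while True:
        status = fetch_status()["status"]
        if status["state"] in TERMINAL_STATES:
            return status
        sleep(poll_interval)


# Canned responses standing in for two successive GET requests.
responses = iter([
    {"found": True, "status": {"is_running": True, "state": "running", "count": 3459}},
    {"found": True, "status": {"is_running": False, "state": "successful", "count": 10000}},
])

final = wait_for_job(lambda: next(responses), sleep=lambda seconds: None)
print(final["state"], final["count"])
if final["state"] == "error":
    print("\n".join(final.get("error", [])))
```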

Cancelling a running job

This API provides a method to cancel a running job.

POST _siren/connector/jobs/ingestion/<id>/_abort

The response contains the status of the job:

{
  "_id": "postgres-events",
  "type": "ingestion",
  "found": true,
  "status": {
    "version": 1,
    "id": "postgres-events",
    "is_running": false,
    "is_aborting": true,
    "start_time": 1538733800993,
    "end_time": 1538733805318,
    "infos": "The job has been aborted.",
    "state": "aborted",
    "count": 2220,
    "last_id": "2219"
  }
}

Searching on the job log

This API provides a method to retrieve the status of completed jobs. It is possible to pass parameters to filter the results.

GET _siren/connector/jobs/_search

Possible filter parameters:

  • start_time_from: jobs whose start time is greater than or equal to the passed value.

  • start_time_to: jobs whose start time is less than or equal to the passed value.

  • type: a filter on the job type.

  • state: the state of the job status. See the job status description to get a list of possible values.

  • id: the id of the job.
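The filters can be combined into a query string with a small Python helper. The function names are hypothetical, and the sketch assumes that start times are expressed in epoch milliseconds, as in the job status examples of this section.

```python
from datetime import datetime, timezone
from urllib.parse import urlencode

ALLOWED_FILTERS = {"start_time_from", "start_time_to", "type", "state", "id"}


def to_epoch_millis(moment):
    """Convert a timezone-aware datetime to epoch milliseconds."""
    return int(moment.timestamp() * 1000)


def job_search_path(**filters):
    """Build the job log search path from the documented filter parameters."""
    unknown = set(filters) - ALLOWED_FILTERS
    if unknown:
        raise ValueError(f"unknown filter(s): {sorted(unknown)}")
    params = {
        name: to_epoch_millis(value) if isinstance(value, datetime) else value
        for name, value in filters.items()
        if value is not None
    }
    path = "/_siren/connector/jobs/_search"
    return f"{path}?{urlencode(params)}" if params else path


print(job_search_path(type="ingestion", state="error"))
print(job_search_path(
    id="postgresevents",
    start_time_from=datetime(2018, 10, 10, tzinfo=timezone.utc),
))
```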

Request and result example:

GET _siren/connector/jobs/_search?type=ingestion&id=postgresevents&start_time_to=1539192173232

{
  "hits": {
    "total": 1,
    "hits": [
      {
        "_id": "postgresevents11e247fa-ccb1-11e8-ad75-c293294ec513",
        "_source": {
          "ingestion": {
            "version": 1,
            "id": "postgresevents",
            "is_running": false,
            "start_time": 1539192150699,
            "end_time": 1539192151612,
            "infos": "The job is done.",
            "state": "successful",
            "count": 0
          }
        }
      }
    ]
  }
}