Connector APIs
In this section we present the APIs available to interact with datasources, virtual indices, and ingestion jobs.
Datasource API
In this section we present the API available to interact with datasources.
We currently support two types of datasources:
- JDBC, to connect to any datasource that provides a JDBC driver;
- Elasticsearch, to connect to remote Elasticsearch clusters.
Datasource management
The endpoint for datasource management is at /_siren/connector/datasource.
Datasource creation and modification
A datasource with a specific id can be created by issuing a PUT request. The body of the request varies with the type of the datasource. A datasource cannot be safely updated with a PUT request because there is no concurrency control. By default, updating an existing datasource document is not allowed.
JDBC datasource
curl -H 'Content-Type: application/json' -XPUT 'http://localhost:9200/_siren/connector/datasource/<id>' -d '
{
"jdbc": {
"username": "username",
"password": "password",
"driver": "com.db.Driver",
"url": "jdbc:db://localhost:5432/default",
"properties": {
"ssl": true
}
}
}
'
JDBC configuration parameters:
- driver: the class name of the JDBC driver.
- url: the JDBC URL of the datasource.
- username: the username that will be passed to the JDBC driver when getting a connection (optional).
- password: the password that will be passed to the JDBC driver when getting a connection (optional).
- timezone: if date and timestamp fields are stored in a timezone other than UTC, specifying this parameter will instruct the plugin to convert dates and times to/from the specified timezone when performing queries and retrieving results.
- properties: a map of JDBC properties to be set when initializing a connection.
When updating a datasource that already has a password, you can either pass the new password in clear text or remove the password property to keep the previous one.
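The password rule above can be sketched in Python as follows. This is only an illustration of how a client might assemble the request body: the helper name jdbc_datasource_body is hypothetical and not part of the connector, and the values are the placeholders used in the curl example.

```python
import json


def jdbc_datasource_body(driver, url, username=None, password=None,
                         timezone=None, properties=None):
    """Build the body of a JDBC datasource PUT request (hypothetical helper).

    Optional settings left as None are omitted from the body. Omitting
    "password" on an update keeps the previously stored password.
    """
    jdbc = {"driver": driver, "url": url}
    optional = {
        "username": username,
        "password": password,
        "timezone": timezone,
        "properties": properties,
    }
    jdbc.update({k: v for k, v in optional.items() if v is not None})
    return {"jdbc": jdbc}


# Update that keeps the stored password: no "password" key is sent.
body = jdbc_datasource_body(
    driver="com.db.Driver",
    url="jdbc:db://localhost:5432/default",
    username="username",
    properties={"ssl": True},
)
print(json.dumps(body, indent=2))
```

The printed document can be sent as the body of the PUT request shown earlier.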
Elasticsearch datasource
curl -H 'Content-Type: application/json' -XPUT 'http://localhost:9200/_siren/connector/datasource/<id>' -d '
{
"elastic": {
"alias": "remotename"
}
}
'
Elasticsearch configuration parameters:
- alias: the name of the configured cluster alias in the remote cluster configuration.
Datasource retrieval
The datasource configuration can be retrieved by issuing a GET request as follows:
curl -XGET 'http://localhost:9200/_siren/connector/datasource/<id>'
If you want to update the datasource and keep the current password, just remove the "password" property.
Datasource deletion
To delete a datasource, issue a DELETE request as follows:
curl -XDELETE 'http://localhost:9200/_siren/connector/datasource/<id>'
Datasource listing
To list the datasources configured in the system, issue a GET request as follows:
curl -XGET 'http://localhost:9200/_siren/connector/datasource/_search'
Datasource validation
To validate the connection to a datasource, issue a POST request as follows:
curl -XPOST 'http://localhost:9200/_siren/connector/datasource/<id>/_validate'
Datasource catalog metadata
To get the metadata related to a datasource connection catalog, issue a POST request as follows:
curl -XPOST 'http://localhost:9200/_siren/connector/datasource/<id>/_metadata?catalog=<catalog>&schema=<schema>'
The parameters are:
- catalog: the name of the catalog;
- schema: the name of the schema.
The catalog and schema parameters are optional:
- If no catalog parameter is given, the API returns the list of catalogs.
- If only the catalog parameter is given, the API returns the list of schemas for that catalog.
The result is a JSON document which contains the resource list for the given catalog and schema.
{
"_id": "postgres",
"found": true,
"catalogs": [
{
"name": "connector",
"schemas": [
{
"name": "public",
"resources": [
{
"name": "emojis"
},
{
"name": "Player"
},
{
"name": "Matches"
},
{
"name": "ingestion_testing"
}
]
}
]
}
]
}
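A response of this shape can be flattened into fully qualified resource names with a few lines of Python. The sketch below is illustrative only: the function name list_resources is hypothetical, and it assumes that the "schemas" and "resources" keys may be absent when the API returns only the catalog or schema list.

```python
def list_resources(metadata):
    """Return 'catalog.schema.resource' names from a _metadata response."""
    names = []
    for catalog in metadata.get("catalogs", []):
        # "schemas" and "resources" are read defensively (assumption: they
        # are missing when only catalogs or schemas were requested).
        for schema in catalog.get("schemas", []):
            for resource in schema.get("resources", []):
                names.append(
                    f'{catalog["name"]}.{schema["name"]}.{resource["name"]}'
                )
    return names


# Shortened copy of the example response above.
response = {
    "_id": "postgres",
    "found": True,
    "catalogs": [{
        "name": "connector",
        "schemas": [{
            "name": "public",
            "resources": [{"name": "emojis"}, {"name": "Player"}],
        }],
    }],
}
print(list_resources(response))
```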
Datasource field metadata
To get the field metadata related to a datasource connection resource (a table), issue a POST request as follows:
curl -XPOST 'http://localhost:9200/_siren/connector/datasource/<id>/_resource_metadata?catalog=<catalog>&schema=<schema>&resource=<resource>'
The parameters are:
- catalog: the name of the catalog;
- schema: the name of the schema;
- resource: the name of the resource (table).
The result is a JSON document that contains the list of columns for the given catalog, schema and resource. It also contains the name of the primary key, if one exists.
{
"_id": "postgres",
"found": true,
"columns": [
{
"name": "TEAM"
},
{
"name": "ID"
},
{
"name": "NAME"
},
{
"name": "AGE"
}
],
"single_column_primary_keys": [
{
"name": "ID"
}
]
}
Datasource query sample
This method runs a query and returns an array of results and an Elasticsearch type for each column found.
curl -H 'Content-Type: application/json' -XPOST 'http://localhost:9200/_siren/connector/datasource/<id>/_sample' -d '
{
"query": "SELECT * FROM events",
"row_limit": 10,
"max_text_size": 100
}
'
{
"_id": "valid",
"found": true,
"types": {
"location": "keyword",
"id": "long",
"occurred": "date",
"value": "long"
},
"results": [
{
"id": 0,
"occurred": 1422806400000,
"value": 1,
"location": "Manila"
},
{
"id": 1,
"occurred": 1422806460000,
"value": 5,
"location": "Los Angeles"
},
{
"id": 2,
"occurred": 1422806520000,
"value": 10,
"location": "Pompilio"
}
]
}
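The "types" object of a sample response has the same information that an Elasticsearch mapping needs, so it can serve as a starting point for the mapping of an ingestion. The following Python sketch shows one way to do that; the function names are hypothetical, and the date conversion assumes that date values are returned as epoch milliseconds, which matches the example above but is not stated by the API description.

```python
from datetime import datetime, timezone


def mapping_from_sample(sample):
    """Turn the "types" of a _sample response into an Elasticsearch mapping."""
    return {
        "properties": {
            column: {"type": es_type}
            for column, es_type in sample["types"].items()
        }
    }


def epoch_millis_to_utc(value):
    """Convert a value in epoch milliseconds (assumed) to a UTC datetime."""
    return datetime.fromtimestamp(value / 1000, tz=timezone.utc)


# Shortened copy of the example response above.
sample = {
    "types": {"location": "keyword", "id": "long", "occurred": "date"},
    "results": [{"id": 0, "occurred": 1422806400000, "location": "Manila"}],
}
print(mapping_from_sample(sample))
print(epoch_millis_to_utc(sample["results"][0]["occurred"]).isoformat())
```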
Datasource transform suggestions
To get a suggestion of a transform configuration that can be used by the ingestion, issue a POST request as follows:
curl -H 'Content-Type: application/json' -XPOST 'http://localhost:9200/_siren/connector/datasource/<id>/_transforms' -d '
{
"query": "SELECT * FROM events"
}
'
It executes the query and returns a collection of transform operations based on the columns returned by the query.
{
"_id": "postgres",
"found": true,
"transforms": [
{
"input": [
{
"source": "id"
}
],
"output": "id"
},
{
"input": [
{
"source": "occurred"
}
],
"output": "occurred"
},
{
"input": [
{
"source": "value"
}
],
"output": "value"
},
{
"input": [
{
"source": "location"
}
],
"output": "location"
}
]
}
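The suggested transforms carry no type information, whereas the transforms of an ingestion configuration may include a "mapping" with the Elasticsearch type of the output field. One possible way to combine the suggestions with the types reported by the _sample method is sketched below in Python; the function name add_mappings is hypothetical.

```python
def add_mappings(transforms, types):
    """Attach a "mapping" to each transform whose output has a known type."""
    enriched = []
    for transform in transforms:
        entry = dict(transform)  # shallow copy, the suggestion is not modified
        es_type = types.get(entry["output"])
        if es_type is not None:
            entry["mapping"] = {"type": es_type}
        enriched.append(entry)
    return enriched


# Shortened copies of the _transforms and _sample example responses.
suggested = [
    {"input": [{"source": "id"}], "output": "id"},
    {"input": [{"source": "location"}], "output": "location"},
]
types = {"id": "long", "location": "keyword"}
print(add_mappings(suggested, types))
```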
Datasource type list
To get a list of supported connectors, issue a GET request as follows:
curl -XGET 'http://localhost:9200/_siren/connector/datasource'
{
"MySQL": {
"driverClassName": "com.mysql.jdbc.Driver",
"defaultURL": "jdbc:mysql://{{host}}:{{port}}{{databasename}}?useLegacyDatetimeCode=false&useCursorFetch=true",
"defaultPort": 3306,
"defaultQuery": "SELECT 1 AS N",
"disclaimer": "This is a sample connection string, see the <a target=\"_blank\" rel=\"noopener noreferrer\" href=\"https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference.html\">MySQL Connector/J documentation</a> for further information.",
"virtualIndexSupported": true,
"ingestionSupported": true
},
"PostgreSQL": {
"driverClassName": "org.postgresql.Driver",
"defaultURL": "jdbc:postgresql://{{host}}:{{port}}{{databasename}}",
"defaultPort": 5432,
"defaultQuery": "SELECT 1 AS N",
"disclaimer": "This is a sample connection string, see the <a target=\"_blank\" rel=\"noopener noreferrer\" href=\"https://jdbc.postgresql.org/documentation/94/connect.html\">PostgreSQL JDBC documentation</a> for further information.",
"virtualIndexSupported": true,
"ingestionSupported": true
}
}
Datasource driver list
To get a list of installed drivers, issue a GET request as follows:
curl -XGET 'http://localhost:9200/_siren/connector/datasource/_drivers'
The result is a JSON document that contains the list of drivers for each node in the cluster.
The nodes are identified by their IDs. Use those IDs with the cluster API to get more information about the nodes.
The information returned for each driver is:
- key: the name of the Java class;
- majorVersion: the major version of the driver;
- minorVersion: the minor version of the driver;
- jdbcCompliant: indicates whether the driver is fully JDBC compliant;
- infos: the result of the driver's toString() method.
If any failure occurs, the "failures" part of the JSON result contains the error message for each failing node.
{
"nodes": {
"4PdaJD0BRd2lw47U6OAzog": {
"org.apache.calcite.avatica.remote.Driver": {
"majorVersion": 1,
"minorVersion": 19,
"jdbcCompliant": true,
"infos": "org.apache.calcite.avatica.remote.Driver@6287f65f"
},
"org.apache.calcite.jdbc.Driver": {
"majorVersion": 1,
"minorVersion": 19,
"jdbcCompliant": true,
"infos": "org.apache.calcite.jdbc.Driver@1e592d50"
}
},
"A4T54CAbT5qJgwzqtBvqIA": {
"org.apache.calcite.avatica.remote.Driver": {
"majorVersion": 1,
"minorVersion": 19,
"jdbcCompliant": true,
"infos": "org.apache.calcite.avatica.remote.Driver@6287f65f"
},
"org.apache.calcite.jdbc.Driver": {
"majorVersion": 1,
"minorVersion": 19,
"jdbcCompliant": true,
"infos": "org.apache.calcite.jdbc.Driver@1e592d50"
}
},
"OSYuK1ABQ0OC-XACY51ulA": {
"org.apache.calcite.avatica.remote.Driver": {
"majorVersion": 1,
"minorVersion": 19,
"jdbcCompliant": true,
"infos": "org.apache.calcite.avatica.remote.Driver@6287f65f"
},
"org.apache.calcite.jdbc.Driver": {
"majorVersion": 1,
"minorVersion": 19,
"jdbcCompliant": true,
"infos": "org.apache.calcite.jdbc.Driver@1e592d50"
}
}
},
"failures": {
"t5eC5E1WTDCrTai5I5Tk-g": "[node_sc4][127.0.0.1:65429][cluster:admin/federate/connector/datasource/drivers/child] disconnected"
}
}
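A typical use of this response is to check that a driver is installed on every node before creating a datasource that depends on it. The Python sketch below shows one way to do this; the function names are hypothetical and the node IDs are shortened placeholders.

```python
def drivers_on_all_nodes(response):
    """Return the driver class names reported by every responding node."""
    per_node = [set(drivers) for drivers in response.get("nodes", {}).values()]
    if not per_node:
        return set()
    return set.intersection(*per_node)


def failed_nodes(response):
    """Return the sorted IDs of the nodes listed under "failures"."""
    return sorted(response.get("failures", {}))


# Reduced response: the second node lacks one of the two drivers.
response = {
    "nodes": {
        "node-a": {
            "org.apache.calcite.avatica.remote.Driver": {"majorVersion": 1},
            "org.apache.calcite.jdbc.Driver": {"majorVersion": 1},
        },
        "node-b": {
            "org.apache.calcite.jdbc.Driver": {"majorVersion": 1},
        },
    },
    "failures": {"node-c": "disconnected"},
}
print(drivers_on_all_nodes(response))
print(failed_nodes(response))
```

Nodes listed under "failures" did not report any driver, so they are not part of the intersection and should be checked separately.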
Virtual index API
In this section we present the API available to interact with the virtual indices.
Virtual index management
Virtual index creation and modification
A virtual index with a specific id can be created or updated by issuing a PUT request as follows:
curl -H 'Content-Type: application/json' -XPUT 'http://localhost:9200/_siren/connector/index/<id>' -d '
{
"datasource": "ds",
"catalog": "catalog",
"schema": "schema",
"resource": "table",
"key": "id",
"search_fields": [
{
"function": "LIKE",
"field": "NAME"
}
]
}
'
The id of a virtual index must be a valid lowercase Elasticsearch index name; it is recommended to start virtual indices with a common prefix to simplify handling of permissions.
Body parameters:
- datasource: the id of an existing datasource.
- resource: the name of a table or view on the remote datasource.
- key: the name of a unique column; if a virtual index has no primary key, aggregations are still possible, but queries that expect a reproducible unique identifier are not.
- catalog and schema: the catalog and schema containing the table specified in the resource parameter; these are usually required only if the connection does not specify a default catalog or schema.
- search_fields: an optional list of field names that will be searched using the LIKE operator when processing queries. Currently only the LIKE function is supported.
Whenever a virtual index is created, the Siren Federate plugin creates a concrete Elasticsearch index with the same name as the virtual index; this index contains some metadata about the virtual index. When starting up, the Siren Federate plugin checks for missing concrete indices and attempts to create them automatically.
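Because the id must be a valid lowercase Elasticsearch index name, a client may want to check it before sending the request. The Python sketch below applies the commonly documented Elasticsearch index naming rules and builds a request body; both helper names are hypothetical, the check is a simplified approximation rather than the plugin's own validation, and treating key as optional is an assumption based on the parameter description above.

```python
_FORBIDDEN = set('\\/*?"<>| ,#:')


def is_valid_virtual_index_id(index_id):
    """Approximate check against the Elasticsearch index naming rules."""
    if not index_id or index_id in (".", ".."):
        return False
    if index_id != index_id.lower():
        return False  # uppercase letters are not allowed
    if index_id[0] in "-_+":
        return False  # must not start with -, _ or +
    if any(ch in _FORBIDDEN for ch in index_id):
        return False
    return len(index_id.encode("utf-8")) <= 255


def virtual_index_body(datasource, resource, key=None, catalog=None,
                       schema=None, search_fields=()):
    """Build the body of a virtual index PUT request (hypothetical helper)."""
    body = {"datasource": datasource, "resource": resource}
    for name, value in (("key", key), ("catalog", catalog), ("schema", schema)):
        if value is not None:
            body[name] = value
    if search_fields:
        # LIKE is the only search function listed as supported.
        body["search_fields"] = [
            {"function": "LIKE", "field": field} for field in search_fields
        ]
    return body


print(is_valid_virtual_index_id("virtual-players"))
print(virtual_index_body("ds", "table", key="id", search_fields=["NAME"]))
```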
Ingestion API
Ingestion management
The endpoint for ingestion management is at /_siren/connector/ingestion.
Ingestion creation and modification
An ingestion with a specific id can be created or updated by issuing a PUT request as follows. An ingestion can be safely updated with a PUT request because the _seq_no and _primary_term fields are used to handle concurrent modifications.
curl -H 'Content-Type: application/json' -XPUT 'http://localhost:9200/_siren/connector/ingestion/<id>' -d '
{
"ingest": {
"datasource": "postgres",
"query": "select * from events {{#max_primary_key}}WHERE id>{{max_primary_key}}{{/max_primary_key}} limit 10000",
"batch_size": 10,
"schedule": "0 0 * * * ?",
"enable_scheduler": true,
"target": "events",
"staging_prefix": "staging-index",
"strategy": "REPLACE",
"pk_field": "id",
"mapping": {
"properties": {
"id": { "type": "long" },
"value": { "type": "keyword" },
"location": { "type": "text" },
"geolocation": { "type": "geo_point" }
}
},
"pipeline": {
"processors": [
{
"set" : {
"field": "foo",
"value": "bar"
}
}
]
},
"transforms": [{
"input": [{"source": "id"}],
"output": "id",
"mapping": {
"type": "long"
}
},{
"input": [
{"source": "lat"},
{"source": "lon"}
],
"output": "geolocation",
"transform": "geo_point",
"mapping": {
"type": "geo_point"
}
}],
"ds_credentials": {
"username": "user",
"password": "pass"
},
"es_credentials": {
"username": "user",
"password": "pass"
},
"description": "description"
}
}
'
Body parameters:
- ingest: the properties of the ingestion.
Ingest configuration parameters:
- datasource: the name of a datasource.
- query: the template query passed to the JDBC driver to collect the records to ingest.
- batch_size: an optional batch size (overriding the default global value).
- schedule: an optional schedule using the cron syntax.
- enable_schedule: enable or disable the scheduled execution.
- target: a target Elasticsearch index name.
- staging_prefix: an optional prefix for the staging Elasticsearch index.
- strategy: an update strategy. It can be either INCREMENTAL or REPLACE.
- pk_field: a primary key field name.
- mapping: an Elasticsearch mapping definition.
- pipeline: an optional pipeline configuration.
- transforms: a sequence of transforms to map the fields declared by the query to the fields in the Elasticsearch mapping definition.
- ds_credentials: an optional set of credentials used to connect to the database.
- es_credentials: the optional credentials that will be used to perform Elasticsearch requests during jobs.
- description: an optional description.
Strategy:
There are two available ingestion strategies:
- INCREMENTAL: the index is created if it does not exist. The ingested records are inserted or updated in place.
- REPLACE: the index name is an alias to a staging index. The ingested records are inserted into a new staging index. When the ingestion is done, the alias is moved from the previous staging index to the new one. If anything goes wrong, the alias is left untouched and still points to the previous staging index.
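The alias move used by the REPLACE strategy can be pictured with the request body of the standard Elasticsearch _aliases API, which applies all of its actions atomically. The Python sketch below only illustrates that mechanism: the connector performs the switch itself, and the helper name and the staging index names are hypothetical.

```python
def alias_swap_actions(alias, old_index, new_index):
    """Build an _aliases request body that repoints alias to new_index.

    old_index may be None for the first run, when the alias does not
    point to any staging index yet.
    """
    actions = []
    if old_index is not None:
        actions.append({"remove": {"index": old_index, "alias": alias}})
    actions.append({"add": {"index": new_index, "alias": alias}})
    return {"actions": actions}


# Hypothetical staging index names derived from the "staging-index" prefix.
print(alias_swap_actions("events", "staging-index-1", "staging-index-2"))
```

Because the removal and the addition are part of one request, readers of the alias see either the previous index or the new one, never an empty alias.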
Ingestion query:
The query defined in the ingestion configuration is written in the query language of the datasource. It can be written as a Mustache template; the following variables are provided, if applicable, when rendering the query to a string:
- max_primary_key: the maximum value of the primary key in Elasticsearch.
- last_record_timestamp: the UTC timestamp at which the last record was successfully processed by an ingestion job.
- last_record: an object with the scalar values in the last record that was successfully processed by the ingestion job.
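To see how the template variables affect the query, the following Python sketch renders the query from the example configuration with and without max_primary_key. It is a deliberately minimal stand-in for a Mustache engine, written only for this illustration: it handles plain sections and variables, not the full Mustache syntax, and it is not the renderer used by the connector.

```python
import re

_SECTION = re.compile(r"\{\{#(\w+)\}\}(.*?)\{\{/\1\}\}", re.DOTALL)
_VARIABLE = re.compile(r"\{\{(\w+)\}\}")


def render_query(template, variables):
    """Render {{#name}}...{{/name}} sections and {{name}} variables."""

    def render_section(match):
        name, inner = match.group(1), match.group(2)
        value = variables.get(name)
        # A section is dropped when its variable is missing or false.
        return "" if value is None or value is False else inner

    def render_variable(match):
        value = variables.get(match.group(1))
        return "" if value is None else str(value)

    without_sections = _SECTION.sub(render_section, template)
    return _VARIABLE.sub(render_variable, without_sections)


QUERY = ("select * from events {{#max_primary_key}}WHERE id>"
         "{{max_primary_key}}{{/max_primary_key}} limit 10000")

# First run: no primary key is known yet, so the WHERE clause is dropped.
print(render_query(QUERY, {}))
# Later run: only the records after the highest ingested key are selected.
print(render_query(QUERY, {"max_primary_key": 2289}))
```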
Mapping transform:
A mapping transform takes one or more fields from a datasource record as inputs and outputs a field that can be indexed with a valid Elasticsearch type.
A mapping transform has the following properties:
- input: a sequence of inputs, where an input can be either the name of a field defined in the job query or the name of a field in the target Elasticsearch mapping.
- transform: the name of a predefined function that takes as input the values of the fields specified in the input parameter and the mapping properties of the target Elasticsearch field. The function outputs the value to be indexed; if transform is not set, the system uses a generic cast function to create the output.
- output: the name of the target Elasticsearch field.
Input:
The input structure must provide one of the following properties:
- source: the name of a field defined in the job query.
- target: the name of a field in the target Elasticsearch mapping.
Transforms (“predefined functions”):
- copy: a default cast transform that produces a scalar Elasticsearch value in a way analogous to how the connector already translates JDBC types to Elasticsearch types. If the JDBC driver reports array fields / struct fields correctly, they will be written as Elasticsearch arrays. Any JDBC type that is not supported / not recognized causes an exception.
- geo_point: a transform that produces a geo_point value from two numerical inputs, where the first is the latitude and the second the longitude.
- array: an array transform that produces an array Elasticsearch value from a comma-separated string field in a record.
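The pieces described above (input, transform, output and the optional mapping shown in the example configuration) can be assembled with a small helper. The Python sketch below is illustrative only: make_transform is a hypothetical name, and it covers inputs that refer to query fields ("source") but not "target" inputs.

```python
def make_transform(output, sources, transform=None, es_type=None):
    """Build one entry of the "transforms" list of an ingestion.

    sources are names of fields returned by the job query; the order
    matters for geo_point, where latitude comes first.
    """
    entry = {
        "input": [{"source": name} for name in sources],
        "output": output,
    }
    if transform is not None:
        entry["transform"] = transform  # otherwise the generic cast is used
    if es_type is not None:
        entry["mapping"] = {"type": es_type}
    return entry


transforms = [
    make_transform("id", ["id"], es_type="long"),
    make_transform("geolocation", ["lat", "lon"],
                   transform="geo_point", es_type="geo_point"),
]
print(transforms)
```

The resulting list matches the "transforms" section of the example ingestion configuration.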
Credential parameters (for Elasticsearch or the JDBC database):
If the user does not have the permission to manage datasources in the cluster, these credentials are mandatory.
- username: the login to use to connect to the resource.
- password: the password to use to connect to the resource.
When updating an ingestion that already has a password, you can either pass the new password in clear text or remove the password property to keep the previous one.
Ingestion retrieval
The ingestion properties can be retrieved by issuing a GET request as follows:
curl -XGET 'http://localhost:9200/_siren/connector/ingestion/<id>'
Ingestion deletion
To delete an ingestion, issue a DELETE request as follows:
curl -XDELETE 'http://localhost:9200/_siren/connector/ingestion/<id>'
Ingestion listing
To list the ingestions configured in the system, issue a GET request as follows:
curl -XGET 'http://localhost:9200/_siren/connector/ingestion/_all?status=[false|true]&detailed=[false|true]'
If the optional status parameter is set to true, the response also includes the last job status and the last job log.
If the optional detailed parameter (true by default) is set to false, then mapping, pipeline, transforms and removed_fields are not returned.
Ingestion validation
To validate the connection to an ingestion, issue a POST request as follows:
curl -XPOST 'http://localhost:9200/_siren/connector/ingestion/<id>/_validate'
Run an ingestion job
To execute an ingestion job, issue a POST request as follows:
curl -XPOST 'http://localhost:9200/_siren/connector/ingestion/<id>/_run'
The response returns the jobId that can be used to track the status of the running job:
{
"_id": "postgres-events",
"_version": 49,
"found": true,
"jobId": "postgres-events"
}
Job API
The job API provides methods for managing running jobs and retrieving status about previous executions.
Job management
The endpoint for job management is at /_siren/connector/jobs.
Running jobs statuses
The status of all running jobs can be retrieved by issuing a GET request as follows:
curl -XGET 'http://localhost:9200/_siren/connector/jobs/<type>'
The possible type value is:
- ingestion: this type is related to the ingestion jobs.
Running job status
The status of a job can be retrieved by issuing a GET request as follows:
curl -XGET 'http://localhost:9200/_siren/connector/jobs/<type>/<id>'
This API provides the status of the current running job if there is any, or the status of the last execution.
Body parameters:
- status: the status of the job.
Status parameters:
- id: the id of the job.
- is_running: a boolean value indicating if the job is running.
- is_aborting: an optional boolean value which indicates that the job is aborting.
- start_time: a timestamp with the starting time of the job.
- end_time: a timestamp with the ending time of the job.
- infos: textual information.
- error: an optional sequence of error messages.
- state: the current state of the job.
- count: the total number of processed records.
- last_id: the optional last known value of the primary key column.
Possible state values:
- running: the job is running.
- aborting: the job is aborting due to a user request.
- aborted: the job has been aborted.
- error: the job failed with an error.
- successful: the job was completed successfully.
JSON representation while a job is running:
{
"_id": "postgres-events",
"type": "ingestion",
"found": true,
"status": {
"version": 1,
"id": "postgres-events",
"is_running": true,
"start_time": 1538731228589,
"infos": "The job is running.",
"state": "running",
"count": 3459,
"last_id": "2289"
}
}
JSON representation of a successfully completed job:
{
"_id": "postgres-events",
"type": "ingestion",
"found": true,
"status": {
"version": 1,
"id": "postgres-events",
"is_running": false,
"start_time": 1538733893554,
"end_time": 1538733911829,
"infos": "The job is done.",
"state": "successful",
"count": 10000,
"last_id": "12219"
}
}
JSON representation of a job that failed due to an error:
{
"_id": "postgres-events",
"type": "ingestion",
"found": true,
"status": {
"version": 1,
"id": "postgres-events",
"is_running": false,
"start_time": 1538730949766,
"end_time": 1538730961293,
"infos": "The job has failed.",
"error": [
"Could not execute datasource query [postgres].",
"Failed to initialize pool: The connection attempt failed.",
"The connection attempt failed.",
"connect timed out"
],
"state": "error",
"count": 0
}
}
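The status documents above lend themselves to a simple polling loop that waits until is_running becomes false and then inspects the final state. The Python sketch below is one way to write such a loop; wait_for_job is a hypothetical helper, and the HTTP call is passed in as a callable so that the example runs without a cluster.

```python
import time


def wait_for_job(fetch_status, poll_interval=1.0, max_polls=60,
                 sleep=time.sleep):
    """Poll until the job stops running and return its "status" object.

    fetch_status is any callable returning the parsed JSON of
    GET /_siren/connector/jobs/<type>/<id>.
    """
    for _ in range(max_polls):
        status = fetch_status()["status"]
        if not status.get("is_running", False):
            return status
        sleep(poll_interval)
    raise TimeoutError("job still running after %d polls" % max_polls)


# Canned responses stand in for the HTTP calls in this illustration.
responses = iter([
    {"status": {"is_running": True, "state": "running", "count": 3459}},
    {"status": {"is_running": False, "state": "successful", "count": 10000}},
])
final = wait_for_job(lambda: next(responses), sleep=lambda seconds: None)
print(final["state"], final["count"])
```

In a real client, fetch_status would perform the GET request and parse the JSON response; a final state of error would then be reported together with the messages in the error field.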
Cancelling a running job
This API provides a method to cancel a running job.
curl -XPOST 'http://localhost:9200/_siren/connector/jobs/ingestion/<id>/_abort'
{
"_id": "postgres-events",
"type": "ingestion",
"found": true,
"status": {
"version": 1,
"id": "postgres-events",
"is_running": false,
"is_aborting": true,
"start_time": 1538733800993,
"end_time": 1538733805318,
"infos": "The job has been aborted.",
"state": "aborted",
"count": 2220,
"last_id": "2219"
}
}
Searching on the job log
This API provides a method to retrieve the status of completed jobs. It is possible to pass parameters to filter the results.
curl -XGET 'http://localhost:9200/_siren/connector/jobs/_search'
Possible filter parameters:
- start_time_from: jobs whose start time is greater than or equal to the passed value.
- start_time_to: jobs whose start time is lower than or equal to the passed value.
- type: a filter on the job type.
- state: the state of the job. See the job status description for the list of possible values.
- id: the id of the job.
Request and result example:
curl -XGET 'http://localhost:9200/_siren/connector/jobs/_search?type=ingestion&id=postgresevents&start_time_to=1539192173232'
{
"hits": {
"total": 1,
"hits": [
{
"_id": "postgresevents11e247fa-ccb1-11e8-ad75-c293294ec513",
"_source": {
"ingestion": {
"version": 1,
"id": "postgresevents",
"is_running": false,
"start_time": 1539192150699,
"end_time": 1539192151612,
"infos": "The job is done.",
"state": "successful",
"count": 0
}
}
}
]
}
}
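The filter parameters are plain query string parameters, so a search URL can be built with the Python standard library, and the hits can be unpacked in a few lines. This is a sketch under assumptions: the function names are hypothetical, the base URL is the localhost address used throughout this section, and the code assumes that each hit's _source is keyed by the job type, as in the example above.

```python
from urllib.parse import urlencode

BASE_URL = "http://localhost:9200/_siren/connector/jobs/_search"


def job_search_url(**filters):
    """Build a job log search URL, skipping filters that are None."""
    params = {name: value for name, value in filters.items()
              if value is not None}
    return BASE_URL + ("?" + urlencode(params) if params else "")


def job_statuses(response):
    """Return (log entry id, job type, status) tuples from a search result."""
    results = []
    for hit in response["hits"]["hits"]:
        for job_type, status in hit["_source"].items():
            results.append((hit["_id"], job_type, status))
    return results


print(job_search_url(type="ingestion", id="postgresevents",
                     start_time_to=1539192173232))
```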