The data hub can use Elasticsearch as a DataStore, in place of or in conjunction with a traditional database.
As well as supporting the standard data hub pattern, the ElasticsearchDataStore supports the export pattern, which maintains the Elasticsearch index without version history, without the additional standard fields, and with real deletes rather than soft deletes. This can be useful when creating an Elasticsearch index for consumption by analysis or search tools.
You will need to set field mappings on the Elasticsearch index. As a minimum, text fields that will be used in queries should be mapped as keywords. Mapping may not be required if you are using export mode.
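For example, assuming an index named yourcompany_sales_transaction_index and a text field product_name (both hypothetical names for illustration), a minimal keyword mapping could be applied with a PUT request to the index's _mapping endpoint:

{
    "properties": {
        "product_name": { "type": "keyword" }
    }
}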
The Elasticsearch interface requires different properties than a normal database. Since Elasticsearch would often be used alongside traditional databases, it is likely to need a property prefix, which in the example below is "elastic".
elastic.url=http://localhost:9200
elastic.user=admin
elastic.password=admin
elastic.prefix=yourcompany
elastic.commitFrequency=1000
The url identifies the Elasticsearch instance, and user and password provide basic authentication credentials.
prefix is prepended to the index names - it would typically be the same as the instance name or company name.
commitFrequency controls how frequently changes are committed to the index. Use a relatively high value for Elasticsearch, in the thousands if possible. The default value of 1 will work, and may be necessary in the very rare situation where a table cross-refers to itself, but it should be avoided in other cases because it has a large impact on performance.
If you are using HTTPS, you may need to accept invalid certificates, for example in a test setup or when installing within a secure internal network. Setting the insecure property to true will allow self-signed certificates and certificates for the wrong host.
elastic.insecure=true
In addition to supporting the standard pattern of data processing, the Elasticsearch data store supports export mode. This can be appropriate for Elasticsearch indexes which will not be read by the data hub, but which will be used by applications.
To switch on export mode, add the following to your schema.
{
    ...
    "export": true,
    "exportIdentifier": "source_guid",
    "exportStandardFields": false|true
    ...
}
Setting export to true turns on export mode for the entity. Elasticsearch allows existing records to be overwritten, so exporting a row with the same id updates the existing document. The export is also passed an indicator of deletes, which is used to delete existing documents from the index.
Use exportIdentifier to set a different id field for the index. The default id is a concatenation of the guid and the valid from timestamp, so every input record is written as a separate document. However, in many cases the data used to build the index comes from another table with a different guid. If you were, for example, to extract a source table's guid into a field source_guid and use this as the id, then re-extracting the data would update the existing documents rather than insert new rows with a different guid. Alternatively, you could use a message filter to generate a suitable id field, as in the sketch below.
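As an illustration only - the filter entry point and record layout shown here are hypothetical, not the data hub's actual message filter API - such a filter might derive an id by combining fields that are stable across extracts:

// Hypothetical filter entry point, called once per record.
// The field names are assumptions for illustration.
function addIdField(record) {
    // Build a stable id from fields that survive re-extraction.
    record.source_guid = record.source_system + "-" + record.source_key;
    return record;
}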
In most cases, it does not make sense to generate the standard fields (guid, valid_from_timestamp, valid_to_timestamp, deleted_indicator, source_message and field_provenance) in the search index, either because they are not required or because you need different fields to reference those from a source table. Set exportStandardFields to true if you want these fields.
You must set exportIdentifier if exportStandardFields is false.
If you set exportStandardFields to true, the index will be accessible by the Query function. However, you may need to set Elasticsearch field mappings, particularly to indicate that text fields that may be used in the query are keywords.
In this example, a sale_transaction table holds details of sales
transactions. Separate product and location tables hold details
about the product and the location at which it was purchased. We
want to send a denormalised version of this data to Elasticsearch.
We also want to be able to rebuild the index if required.
For maximum flexibility, we can break this down into a number of different processes: a script message processor that submits a trigger message, a sales_transaction_index_trigger entity that re-queries and resubmits the data, the write to the sales_transaction_index entity, and a sales_transaction_index_rebuild entity for rebuilding the index.
Use the Script plug-in as the message processor to submit a trigger message. Write a where clause in the options to parameterise the trigger message. The script will be something like:
// Get the incoming message and build a trigger message from it.
var message = attributes.get("message");
var ml = new java.com.metrici.datahub.MessageLoader(message.getContext());
ml.setSystem(message.getSystem());
ml.setEntity("sales_transaction_index_trigger");
ml.setFormat("data");
ml.setMessage('[]');
// Pass the current message identifier through as a where clause in the options.
ml.setOptions(JSON.stringify({where:{source_message: message.getMessageIdentifier()}}));
// Report any failure back through the message attributes.
if ( !ml.run() ) {
    attributes.put("status",ml.getStatus());
    attributes.put("errorMessage",ml.getErrorMessage());
}
In sales_transaction_index_trigger, set the target entity to null. Use the QueryResubmissionProcessor as the message processor. Configure it with the whereOptions property to use the where clause from the options. The options for the QueryResubmissionProcessor will be something like:
{
    "query": {
        "entity": "sales_transaction",
        "whereOptions": true,
        "fields": [
            {
                "name": "guid",
                "as": "source_guid"
            },
            "transaction_timestamp",
            "transaction_quantity",
            "transaction_unit_price",
            "transaction_total_price",
            {
                "name": "product_identifier",
                "fields": [
                    "product_number",
                    "product_name"
                ]
            },
            {
                "name": "location_identifier",
                "fields": [
                    "location_number",
                    "location_name"
                ]
            }
        ]
    },
    "entity": "sales_transaction_index"
}
The write to sales_transaction_index is like a normal entity declaration. However, we need to set extended properties for it. Set the data_store property of the entity to elastic_sales_transaction_index, and set the properties to something like:
elastic_sales_transaction_index.url=http://localhost:9200
elastic_sales_transaction_index.user=admin
elastic_sales_transaction_index.password=admin
elastic_sales_transaction_index.prefix=yourcompany
elastic_sales_transaction_index.commitFrequency=1000
elastic_sales_transaction_index.insertOnly=true
elastic_sales_transaction_index.idField=source_guid
elastic_sales_transaction_index.writeStandardFields=false
This specifies that we'll use the source_guid field (originally from the sales_transaction entity) as the document id field, and that we won't write standard fields. We don't need the deleteField option because we are writing fact data that does not have deletes.
(If we were writing reference data with deletes, we would include the deleted_indicator in the query, and then reference it as the deleteField.)
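In that case (a sketch, reusing the property prefix from the example above), the configuration would gain a line something like:

elastic_sales_transaction_index.deleteField=deleted_indicator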
In sales_transaction_index_rebuild, set the target entity to null. Set the message processor to a script re-processor. In this script, generate multiple trigger messages, to refresh the index.
You might want some control over what triggers are generated - refreshing the entire index might not be feasible. In this example, we refresh it one location at a time.
// Query the location table to get the list of locations.
var message = attributes.get("message");
var query = new java.com.metrici.datahub.Query(message.getContext());
query.setUser(message.getUser());
var queryObject = {
    entity: "location",
    fields: ["location_number"]
};
if ( !query.run(queryObject) ) {
    attributes.put("status",MessageStatus.PROCESSOR_ERROR);
    attributes.put("errorMessage","Error returned from query: " + query.getErrorMessage());
    return;
}
// Submit one trigger message per location.
var records = JSON.parse(query.getResponse());
for ( var i = 0 ; i < records.length ; i++ ) {
    var ml = new java.com.metrici.datahub.MessageLoader(message.getContext());
    ml.setSystem(message.getSystem());
    ml.setEntity("sales_transaction_index_trigger");
    ml.setFormat("data");
    ml.setMessage('[]');
    ml.setOptions(JSON.stringify({where:{location_number:records[i].location_number}}));
    if ( !ml.run() ) {
        attributes.put("status",ml.getStatus());
        attributes.put("errorMessage",ml.getErrorMessage());
    }
}