Post processing


Post processing is additional processing that is run after data has been written to target entities.

Post processing is specified on the target entity schema, which names the post processor class and supplies its configuration string.

If multiple target entities are specified for a source entity, post processing is run only once, after the last target entity is written, using the processor and config defined on the first entity. The post processor is not run if there are any errors writing data to the target entities.

The post processor class must be an extension of the PlugIn class.

The post processor is instantiated with the processorConfig string. When the plug-in is run, it can access the message (of class Message) from the "message" attribute. This can be used to get details of the message (such as its guid) and to identify the data recently written to the store.

The post processor can set an error by setting the "status" attribute to a non-zero number and the "errorMessage" attribute to a description of the error.

The post processor can read existing data and add new messages to the store to create new data (the post processor should not directly update data within the target tables, as this could break the processing elsewhere in the data hub).
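As a sketch, a post processor might look like the following. The PlugIn base class here is only a stub standing in for com.metrici.datahub.PlugIn, whose real constructor and run signatures are assumptions; only the attribute names ("message", "status", "errorMessage") come from the description above, and ExamplePostProcessor is a made-up name.

```java
import java.util.HashMap;
import java.util.Map;

// Stub standing in for com.metrici.datahub.PlugIn. The real class defines
// the actual contract; the constructor and run() signature are assumptions.
abstract class PlugIn {
    protected final String config;
    protected final Map<String, Object> attributes = new HashMap<>();

    protected PlugIn(String processorConfig) {
        this.config = processorConfig;
    }

    public Map<String, Object> getAttributes() {
        return attributes;
    }

    public abstract void run();
}

// Hypothetical post processor illustrating the conventions described above:
// read the incoming message from the "message" attribute, and report failure
// by setting "status" to a non-zero number and "errorMessage" to a description.
class ExamplePostProcessor extends PlugIn {
    ExamplePostProcessor(String processorConfig) {
        super(processorConfig);
    }

    @Override
    public void run() {
        Object message = attributes.get("message"); // a Message in the real data hub
        if (message == null) {
            attributes.put("status", 1); // non-zero signals an error
            attributes.put("errorMessage", "No message attribute supplied");
            return;
        }
        // ... query existing data and load new messages here ...
    }
}
```

In the real data hub the processor would cast the attribute to Message and use the MessageLoader and Query classes described in the following sections.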

Message loader

Use the MessageLoader class to load new messages into the data store. A typical use would be:

Message message = (Message)attributes.get("message");
MessageLoader ml = new MessageLoader(message.getContext());
ml.setSystem(message.getSystem());
ml.setEntity("new_entity");
ml.setFormat("data");
ml.setMessage("{ ... your message }");
if ( !ml.run() ) {
    attributes.put("status",ml.getStatus());
    attributes.put("errorMessage",ml.getErrorMessage());
}

Query

Use the Query class to query data. A typical use would be:

Message message = (Message)attributes.get("message");
Query query = new Query(message.getContext());
query.setUser(message.getUser());
JSONObject queryObject = new JSONObject();
queryObject.put("entity",message.getEntity());
if ( !query.run(queryObject) ) {
    attributes.put("status",MessageStatus.PROCESSOR_ERROR);
    attributes.put("errorMessage","Error returned from query: " + query.getErrorMessage());
    return;
}
String response = query.getResponse();
// Do something with the response

Query resubmission processor

The query resubmission processor (class name com.metrici.datahub.QueryResubmissionProcessor) is a post-processor which queries the database and submits the results of the query back to the datahub as new data. It is useful for performing denormalisation.

The query resubmission processor is driven by the config parameter, which contains a JSON string of the following format.

{
  "query": {
    "entity": true|"query entity",
    "where": true|{
      .. where clause ..
    },
    "mergeWhere": true|false,
    "fields": [
      .. fields ..
    ],
    "deletes": true|false
  },
  "system": true|"system for new message",
  "entity": "entity for new message",
  "user": true|"user ID for new message",
  "timestamp": true|false,
  "refresh": true|false,
  "writeFile": true|false,
  "process": true|false
}
query
The query clause. See Data hub query for details. Follows the same format as that used through the web service, and may have entity, where and fields properties.
query.entity
The entity to be queried. Use a value of exactly true, or omit the entity, to query the entity set on the incoming message. Note that the entity on the incoming message may be a source entity and that all query entities need to be target entities.
query.where
Optional object whose properties are used to filter the records. If set to true, the where clause will be set to return the records processed by the incoming message, i.e.
"where": {
  "source_message": "messageId"
}

where messageId is the ID of the source message.

query.mergeWhere
Read the where clause from the message options, if any, and add this to the where clause. The message options are used as defaults, and are overwritten by the where clause in the query specification.
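For illustration (with made-up field names), if the incoming message's options carry a where clause and the query specification also sets one, the two are combined, with the query specification winning on any clash:

```
"mergeWhere": true

message options:        "where": { "region": "EU", "status": "open" }
query specification:    "where": { "status": "closed" }
effective where clause: "where": { "region": "EU", "status": "closed" }
```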
query.fields
Optional list of fields to be returned. See Data hub query for information about field lists and how to use them to denormalise the data.
query.deletes
Set to true to return deletes from the target table, which will be used to generate delete records for submission back into the data hub.
If you set deletes, the delete_indicator will automatically be added to the list of fields.
system
Source system to use when writing the results back to the datahub. Omit or set to true to use the same source system as the incoming message. Set to false to not set the system.
entity
Entity to use when writing the results back to the datahub. This should not be the same as the source entity.
user
Id of the user to use when writing results back to the datahub, and for running the query. Omit or set to true to use the same user as the incoming message.
refresh
Set to true if the new data is a complete refresh of the entity.
writeFile
Set to true to write the query output as a file in the data hub's file store. The message will contain the standard file info and can be reprocessed using the readFile attribute on the reader.
process
Set to true to process the new data as soon as it is written to the message store (leave as false for background updates).

The new message is written with a new effective timestamp, not that of the incoming message. (If the same timestamp were used, then rerunning the incoming message would generate duplicate data which would be rejected.)
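For example, the following configuration re-queries the records written by the incoming message (including deletes) and writes the results back as new data for another entity, processing them immediately. The entity name sales_summary is illustrative only; a real field list (omitted here) would normally be supplied to denormalise the data.

```json
{
  "query": {
    "entity": true,
    "where": true,
    "deletes": true
  },
  "system": true,
  "entity": "sales_summary",
  "user": true,
  "process": true
}
```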

Message reprocessor

If you want to process the source message twice using different message readers, you can use the message reprocessor post-processing plug-in to reprocess the message from the start after it has been processed once. Unlike the message loader, this does not create a new message, but processes the same message again.

The message reprocessor has class com.metrici.datahub.MessageReprocessor.

It takes JSON config, in which the "entity" property should be set to the source entity through which you want to reprocess the original message. The entity must be different from your original source entity.

{
  "entity": "sales_summary"
}