Post processing is additional processing that is run after data has been written to target entities.
The post processing is specified on the target entity schema.
If multiple target entities are specified for a source entity,
post processing is run only once, after the last target entity is
written, using the processor and config defined on the first
entity. The post processor is not run if there are any errors
writing data to the target entities.
The post processor class must be an extension of the PlugIn class.
The post processor is instantiated with the processorConfig string. When the plugin is run, it may access the message (of class Message) from the "message" attribute. This can be used to get details of the message (such as the guid), to identify the data recently written to the store.
The post processor can set an error by setting the "status" attribute to a non-zero number and the "errorMessage" attribute to a description of the error.
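Putting these points together, a post processor might look like the following sketch. The class name, the run() method signature and the getGuid() accessor are assumptions for illustration; PlugIn, Message and the "message", "status" and "errorMessage" attributes are as described above.

```java
// Sketch of a post processor, assuming PlugIn exposes the "attributes"
// map used in the examples below. run() and getGuid() are illustrative.
public class ExamplePostProcessor extends PlugIn {
    public void run() {
        // Read the message that triggered this post processing.
        Message message = (Message) attributes.get("message");
        String guid = message.getGuid(); // identify the data just written

        boolean failed = false;
        // ... read existing data and/or load new messages here ...

        if (failed) {
            // Report an error: non-zero status plus a description.
            attributes.put("status", 1);
            attributes.put("errorMessage", "Post processing failed for " + guid);
        }
    }
}
```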
The post processor can read existing data and add new messages to the store to create new data (the post processor should not directly update data within the target tables, as this could break the processing elsewhere in the data hub).
Use the MessageLoader class to load new messages into the data store. A typical use would be:
// The attributes map holds Objects, so cast the message on retrieval.
Message message = (Message) attributes.get("message");
MessageLoader ml = new MessageLoader(message.getContext());
ml.setSystem(message.getSystem());
ml.setEntity("new_entity");
ml.setFormat("data");
ml.setMessage("{ ... your message }");
if ( !ml.run() ) {
    attributes.put("status", ml.getStatus());
    attributes.put("errorMessage", ml.getErrorMessage());
}
Use the Query class to query data. A typical use would be:
Message message = (Message) attributes.get("message");
Query query = new Query(message.getContext());
query.setUser(message.getUser());
JSONObject queryObject = new JSONObject();
queryObject.put("entity", message.getEntity());
if ( !query.run(queryObject) ) {
    attributes.put("status", MessageStatus.PROCESSOR_ERROR);
    attributes.put("errorMessage", "Error returned from query: " + query.getErrorMessage());
    return;
}
String response = query.getResponse();
// ... do something with the response
The query resubmission processor (class name com.metrici.datahub.QueryResubmissionProcessor) is a post-processor which queries the database and submits the results of the query back to the datahub as new data. It is useful for performing denormalisation.
The query resubmission processor is driven by the config parameter, which contains a JSON string of the following format:

{
  "query": {
    "entity": true|"query entity",
    "where": true|{
      .. where clause ..
    },
    "mergeWhere": true|false,
    "fields": [
      .. fields ..
    ],
    "deletes": true|false
  },
  "system": true|"system for new message",
  "entity": "entity for new message",
  "user": true|"userID for new message",
  "timestamp": true|false,
  "refresh": true|false,
  "writeFile": true|false,
  "process": true|false
}
| Property | Description |
| --- | --- |
| query | The query clause. See Data hub query for details. It follows the same format as that used through the web service, and may have entity, where and fields properties. |
| query.entity | The entity to be queried. Use a value of exactly true, or omit the entity, to query the entity set on the incoming message. Note that the entity on the incoming message may be a source entity, whereas all query entities must be target entities. |
| query.where | Optional object whose properties are used to filter the records. If set to true, the where clause is set to return the records processed by the incoming message, i.e. filtered on the id of the source message. |
| query.mergeWhere | Read the where clause from the message options, if any, and add it to the where clause. The message options are used as defaults, and are overwritten by the where clause in the query specification. |
| query.fields | Optional list of fields to be returned. See Data hub query for information about field lists and how to use them to denormalise the data. |
| query.deletes | Set to true to return deletes from the target table, which will be used to generate delete records for submission back into the data hub. If you set deletes, the delete_indicator field is automatically added to the list of fields. |
| system | Source system to use when writing the results back to the datahub. Omit or set to true to use the same source system as the incoming message. Set to false to not set the system. |
| entity | Entity to use when writing the results back to the datahub. This should not be the same as the source entity. |
| user | Id of the user to use when writing results back to the datahub, and for running the query. Omit or set to true to use the same user as the incoming message. |
| refresh | Set to true if the new data is a complete refresh of the entity. |
| writeFile | Set to true to write the query output as a file in the data hub's file store. The message will contain the standard file info and can be reprocessed using the readFile attribute on the reader. |
| process | Set to true to process the new data immediately, as soon as it is written to the message store (leave false for background updates). |
The new message is written with a new effective timestamp, not that of the incoming message. (If the same timestamp were used, then rerunning the incoming message would generate duplicate data which would be rejected.)
If you want to process the source message twice using different
message readers, you can use the message reprocessor
post-processing plug-in to reprocess the message from the start
after it has been processed once. Unlike the message loader, this
does not create a new message, but processes the same message
again.
The message reprocessor has class com.metrici.datahub.MessageReprocessor.
It takes a JSON config, in which the "entity" property should be
set to the source entity through which you want to reprocess the
original message. The entity must be different from your original
source entity:

{
  "entity": "sales_summary"
}