Data hub processing

This section describes how the data hub processes messages.

Message receipt

Messages are passed to the data hub through the web service interface or from files loaded using the command-line tool.

Messages conform to the data hub message format. A simple message might look something like this:

{
  "entity": "product",
  "timestamp": "2019-06-05T09:31:17.000",
  "data": [
    {
      "product_number": 1234567,
      "product_description": "Breville Toaster"
    },
    {
      "product_number": 2345678,
      "product_description": "Kenwood Kettle"
    }
  ]
}

The message is stored in the message store table. A message identifier and message timestamp are generated.

message_identifier | source_entity | message_timestamp | effective_timestamp | data | status
m1 | product | .. 09:45 .. | .. 09:31 .. | [{"product_number": 1234567, "product_description": "Breville Toaster"}, {"product_number": 2345678, "product_description": "Kenwood Kettle"}] | 0

In the example above:

- The message identifier m1 was generated when the message was stored.
- The message timestamp (09:45) records when the message was received.
- The effective timestamp (09:31) is taken from the timestamp on the incoming message.
- The status of 0 indicates that the message has not yet been processed.

Message processing

The message is passed to the message processor, either immediately or through a background process that looks for unprocessed messages. The message status is updated to indicate that the message is being processed.

The message processor looks up an appropriate source system definition from the schema, and from this gets a list of mappings to target entities that are appropriate for the source entity. The mapping describes how to read the message data and the target fields to which the message data should be written. In the example above we have not set a source system, so the default mapping (in which source is the same as target) is used.

The message processor processes the message, as described in the remaining sections, and then updates the message store and associated message log. Errors are captured using the error message field on the message store and by setting the status.

Message readers

The data from the message is passed to a message reader. This returns an object (of type Iterable<Object>) which can be used to read the records in the message one at a time.

In the example above, the data is already a JSON array of JSON objects, so the default message reader can be used. (Because the default mapping is used, the default message reader is always used.) Alternative message readers can be specified in the schema. These can be used to read data in different formats, for example using a CSV message reader to read incoming CSV data.

Message readers can be used to read data stored externally to the message store table. For example, if the data is held in a file and the message data field holds the name of the file, a file message reader could be used to read the file.
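For illustration only, a message carrying CSV data for a CSV message reader might look something like this (the schema configuration that selects the reader is not shown):

{
  "entity": "product",
  "timestamp": "2019-06-05T09:31:17.000",
  "data": "product_number,product_description\n1234567,Breville Toaster\n2345678,Kenwood Kettle"
}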

Inserts

In the simplest case, new data is inserted into the target table. For the example above, the following rows would be written to the product table.

guid | valid_from_timestamp | valid_to_timestamp | deleted_indicator | product_number | product_description | source_message
g1 | .. 09:31 .. | null | false | 1234567 | Breville Toaster | m1
g2 | .. 09:31 .. | null | false | 2345678 | Kenwood Kettle | m1

Note: the valid from timestamp is taken from the effective timestamp originally passed in the message.

Updates

Before data is inserted, the data store is called to check if matching data is already present. The check is performed for each incoming record, and is passed a JSON object with a copy of the fields that have been identified as keys. (If link fields are in the key, these are resolved before reading the entity, see later sections.)

In this example, product_number is the key field, so for the first record the data store would be passed:

{
  "product_number": 1234567
}

Let's assume we had a second message coming into the data hub with the following data.

{
  "entity": "product",
  "timestamp": "2019-06-05T10:10:14.000",
  "data": [
    {
      "product_number": 1234567,
      "product_description": "Breville Toaster"
    },
    {
      "product_number": 2345678,
      "product_description": "Kenwood Automatic Kettle"
    },
    {
      "product_number": 3456789,
      "product_description": "Panasonic Microwave"
    }
  ]
}

Each record is processed separately.

Updates are created by:

- setting the valid to timestamp of the existing row to the effective timestamp of the incoming message;
- inserting a new row with the same guid, valid from the effective timestamp, containing the updated data.

If the incoming data matches the existing row (as with the toaster), no update is made. New records (such as the microwave) are inserted as before.

The updated table would be:

guid | valid_from_timestamp | valid_to_timestamp | deleted_indicator | product_number | product_description | source_message
g1 | .. 09:31 .. | null | false | 1234567 | Breville Toaster | m1
g2 | .. 09:31 .. | .. 10:10 .. | false | 2345678 | Kenwood Kettle | m1
g2 | .. 10:10 .. | null | false | 2345678 | Kenwood Automatic Kettle | m2
g3 | .. 10:10 .. | null | false | 3456789 | Panasonic Microwave | m2

Deletes

Deletes are indicated by including the deleted_indicator field in the incoming message. This works just like an update, except:

- the new row has the deleted_indicator set to true;
- the non-key fields on the new row are set to null;
- the key fields are retained.

If there is no existing row, a delete row is inserted. This is required to fully capture the incoming delete, and to prevent a lower-priority update creating the row when it should not.

Assuming the following were passed:

{
  "entity": "product",
  "timestamp": "2019-06-05T10:45:19.000",
  "data": [
    {
      "product_number": 3456789,
      "deleted_indicator": true
    }
  ]
}

The table would end up something like:

guid | valid_from_timestamp | valid_to_timestamp | deleted_indicator | product_number | product_description | source_message
g1 | .. 09:31 .. | null | false | 1234567 | Breville Toaster | m1
g2 | .. 09:31 .. | .. 10:10 .. | false | 2345678 | Kenwood Kettle | m1
g2 | .. 10:10 .. | null | false | 2345678 | Kenwood Automatic Kettle | m2
g3 | .. 10:10 .. | .. 10:45 .. | false | 3456789 | Panasonic Microwave | m2
g3 | .. 10:45 .. | null | true | 3456789 | null | m3

Field provenance

The field_provenance column holds a JSON object that, for each field, records the identifier of the message that set the field (property m) and a priority (property p), which defaults to 0 and is covered in a later section. Provenance is held for every business field that has ever been populated, and for the row itself under the field reference "guid".

The rules for setting field provenance are summarised below.

Insert: the message identifier is set to the message that created the row. After inserting the toaster:

{
  "guid": {"m": "m1", "p": 0},
  "product_number": {"m": "m1", "p": 0},
  "product_description": {"m": "m1", "p": 0}
}

Update: the message identifier is updated for all fields that are updated. The message identifiers for the key fields are not updated unless the keys have changed. In most cases, with simple keys, the keys do not change; however, if you are updating using weak composite keys (say, using employee number or national insurance number to identify a person), it is possible to update a key field on an update. After updating the kettle:

{
  "guid": {"m": "m1", "p": 0},
  "product_number": {"m": "m1", "p": 0},
  "product_description": {"m": "m2", "p": 0}
}

Delete: the message identifier is updated for the guid and for all the fields that were deleted. The message identifiers for the key fields are not updated unless they have changed, using the same rules as for updates. The non-key fields (in this case product_description) are updated if they are present in the field provenance of the previous row (which shows they have previously been set); otherwise they are not set. After deleting the microwave:

{
  "guid": {"m": "m3", "p": 0},
  "product_number": {"m": "m2", "p": 0},
  "product_description": {"m": "m3", "p": 0}
}

Out-of-sequence records

The valid from timestamp is set from the timestamp on the incoming message (the effective_timestamp on the message store).

Messages are not necessarily received or processed in effective timestamp sequence.

What happens when an out-of-sequence record is processed?

The simple answer is that the database rows are updated as if the records had been processed in sequence.

For example, if an update to the kettle timed 09:50 was received after the one for 10:10, with another different description (say, "Kenwood Auto Kettle"), the following would happen:

- The g2 row valid from 09:31 would have its valid to timestamp set to 09:50.
- A new g2 row would be inserted, valid from 09:50 to 10:10, with the description "Kenwood Auto Kettle".
- The g2 row valid from 10:10 would be left unchanged.
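If the out-of-sequence message were stored as a (hypothetical) message m4, the kettle's rows would end up something like:

guid | valid_from_timestamp | valid_to_timestamp | deleted_indicator | product_number | product_description | source_message
g2 | .. 09:31 .. | .. 09:50 .. | false | 2345678 | Kenwood Kettle | m1
g2 | .. 09:50 .. | .. 10:10 .. | false | 2345678 | Kenwood Auto Kettle | m4
g2 | .. 10:10 .. | null | false | 2345678 | Kenwood Automatic Kettle | m2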

Out of sequence processing is affected by priority. This is covered in a later section.

Source systems and entity and field mapping

In the above examples, no source system was set for the incoming message, and the entity name and field names on the input message match the table name and column names on the target table.

In the schema, you can define a source system, which allows you to specify that a message is written to one or more target entities, and for each target entity:

- which source fields map to which target fields;
- which message reader to use;
- the priority of the entity and of each field (see later).

You can use this in a number of ways, for example:

- renaming entities and fields between source and target;
- normalising a single denormalised source entity into several target entities;
- giving different sources different priorities for the same target.

The term "source system" could be considered a misnomer. Often it does identify different source systems with different processing needs. However, it really just represents a collection of data processing rules, and more than one data source may share the same "source system" definition. The user field on the message can be used to record the sender of the data.

If you map a source entity to a different target entity to perform normalisation, be careful that you don't accidentally mess up key resolution. For example, imagine a person record that carries denormalised department details, including the business keys of both. You could set up a source entity of person that maps firstly to the department target entity and secondly to the person target entity. This would normalise the data and ensure that the department is always there when the person record needs to link to it. However, if another table attempted to link to person, key resolution would use the first target entity of the source entity person, i.e. the department table. To avoid this problem, for entities that are the parents of any links, do not use the target entity's reference as a source entity reference unless the target entity is the first entity for that source.
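As a sketch (following the mappings format shown under "Link fields" below; field mappings are omitted for brevity), the person source entity might be defined with department as its first target:

"mappings": {
  "person": [
    {
      "reference": "department",
      "fields": []
    },
    {
      "reference": "person",
      "fields": []
    }
  ]
}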

Data type conversion

Incoming data is JSON and fields can only have a data type of string, number or boolean. These are converted automatically to the appropriate target type.

Strings are converted to numbers, and numbers to strings, as necessary. If strings cannot be converted to numbers, an error is raised. If dates are in an incorrect format, an error is raised.

Any field can be null. If an incoming record contains a null for a field, the null can be used to overwrite existing data. If the incoming record does not have the field, existing data is not set to null.

If the target type is boolean, incoming values recognised as true or false are converted accordingly; anything else, other than null, is an error.

Target text may have a length, and will be truncated as necessary.

When converting to integer data types, fractional values are rounded to the nearest integer.

When converting to the decimal type, numbers are rounded to the appropriate number of decimal places (the scale of the target type).

Priorities

When more than one source can target the same entity, priorities control which source takes precedence.

For each source entity, the schema defines a priority for the target entity and each field. The target entity priority is used as the default for the field priority, and if there is no entity priority all priorities default to 0. Higher priorities take precedence, and priorities can be negative.

These priorities are used to set the priorities written to the field provenance. The entity priority is used for the guid field.

When an update is processed, priorities are examined for each field to determine whether the update should take place.

The message/priority of the guid is updated if the incoming entity priority is greater than the existing guid priority, even if none of the fields are updated. If the incoming entity priority is less than or equal to the current guid priority, the message/priority of the guid is not updated, even if fields are updated.

For a delete, or an update after a delete (i.e. a recreate), the priority of the guid field is used to determine whether the delete or update should take place.

In the unusual case of deleting a record that has already been deleted, the incoming delete is actioned only if the source entity has a priority higher than the existing guid priority. This means that a low priority delete can be made higher priority, preventing it being undone by a low priority insert.
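For illustration (the message m4 and priority 10 are hypothetical), if a source with entity priority 10 updated the kettle's description, the field provenance might become:

{
  "guid": {"m": "m4", "p": 10},
  "product_number": {"m": "m1", "p": 0},
  "product_description": {"m": "m4", "p": 10}
}

A later update from a priority 0 source would then leave the description unchanged, because higher priorities take precedence.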

Link fields

Link fields represent a link to another entity.

Link fields are associated with one or more source fields identified by sourceLinkKey, which defaults to linkKey. The source fields and their values are collated into an object, translating the source field names into target field names as necessary, and the data store is called to look up the appropriate identifier.

For example, consider a source system definition with an order source entity. Here is an incomplete definition of the mappings part of the schema.

"mappings": {
"order": [
{
"reference":"order",
"fields": [
{
"reference":"order_customer",
"type":"link",
"linkEntity":"customer",
"linkKey": "customer_number"
}
]
}
]
}

Suppose the following order record were processed:

{
  "order_number": 123456,
  "customer_number": "918723",
  "customer_name": "Bill Jones",
  "total_price": 194.27
}

In this case, the target field "order_customer" would be found by calling the data store using the identifier() method, passing it the following key:

{
  "customer_number": "918723"
}

If all the key fields are null, the data store is not called and order_customer would be set to null.

If the data store returns a single identifier, this is used as the value for order_customer.

If the data store returns the MULTIPLE_IDENTIFIERS condition, an error is raised and this record is not processed.

If the data store returns null (which means the key can't be found), the record is not processed, but the message is marked as requires reprocessing (status code 1).

Reprocessing

Messages can be reprocessed simply by submitting them again to the message processor. Reprocessing a message allows new target entities on the schema to be populated. It is even possible to delete all of the data on the target data stores and reprocess all messages (though a prudent systems administrator would take a backup before doing so, just in case).

If, when a record is reprocessed, a matching row already exists and the valid from timestamp on the row matches the effective timestamp of the message, the message is ignored.

Messages that are marked as requires reprocessing are automatically resubmitted periodically, up to a maximum delay. This means that foreign keys that were not matched (for example, because of differences in timing of processing) can be matched when reprocessed.

The data hub does not track the processing status of individual records, and reprocessing a large message just to resolve one or two missing foreign keys is relatively expensive. It is better to design processing to avoid this situation where possible.

Refresh

An incoming message can be marked as a refresh. This means that the data on the message should replace all the data on the target entity.
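As an illustrative sketch only, assuming the refresh marker is a boolean refresh property on the message (the actual marker is defined by the message format), a refresh message might look like:

{
  "entity": "product",
  "timestamp": "2019-06-06T08:00:00.000",
  "refresh": true,
  "data": [
    {
      "product_number": 1234567,
      "product_description": "Breville Toaster"
    }
  ]
}

Any product not present in the data (in this sketch, the kettle) would then be deleted, as described below.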

In these cases, the data store for the entity is called using the identifiers() method, passing it an empty set of keys and ignoring deletes. This retrieves the guids of all non-deleted existing rows for the target entity.

At the end of processing the message, deletes are generated for each guid on the existing rows that was not seen when processing the incoming data. These deletes retain all the fields identified as keys by the entity mapping.

Refresh processing obeys priority rules. If more than one source can write to an entity, the one that performs the refresh should be given the highest priority.

Children fields

Some entities are owned by other entities. For example, an order line is owned by an order. If an order were to be deleted, all the order lines should be deleted too.

These child entities can be passed into the data hub like any other entity. They would have a link field back to the parent entity and would have this link field in the key. The link field would be resolved to an identifier before the data store for the child entity is called.

Alternatively (or as well), the child entities can be passed in the parent entity message, with the child entity objects in an array. The child entity objects should not include the key fields for the parent. In the example below, order_lines is a field of type children.

[
  {
    "order_number": 123456,
    "order_lines": [
      {
        "product": 817272,
        "quantity": 1,
        "unit_price": 19.20
      },
      {
        "product": 919215,
        "quantity": 4,
        "unit_price": 2.19
      }
    ]
  }
]

This does not mean that there is a column called "order_lines" on the order table; it is just a convention for linking order line rows to the order table.

The processing of child records is driven by the schema - see the childEntity, parentIdentifier, sourceParentIdentifier, childSequence and sourceChildSequence properties of the field mapping, sketched below.
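As an illustrative sketch (the property values are assumptions, not taken from a real schema), the order_lines field mapping might look like:

{
  "reference": "order_lines",
  "type": "children",
  "childEntity": "order_line",
  "parentIdentifier": "order",
  "childSequence": "line_number"
}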

The list of child entities is processed as follows:

- The identifier of the parent row is added to each child record as the parent identifier field.
- A child sequence number is assigned to each child record.
- Each child record is then processed as a normal record of the child entity.

If the parent row is being deleted, child row refresh processing with a partial key is invoked to delete any child rows that relate to this parent.

Entities referenced in children field definitions can also be updated in their own right, or by other parents. For example, entities that link entities (such as, say, person-in-role) could be defined as children of two parents (person and role, in this case).

Unique processing

Some entities represent facts, such as sales transactions or IoT events, and every message passed to the data hub is a unique set of facts.

If this is the case, set the unique property on the target entity to true, set the uniqueIdentifier to a field on the target entity that should hold a unique identifier for this batch of records, and the uniqueSequence to a field on the target entity to hold a record number within the batch. Include these fields in the field list, indicating that they are keys.

The identifier for the message is used as the value for the unique identifier field and the unique sequence field is set to the record number within the message, starting at 1. These two fields are then processed as the key fields for the record. The records are then processed as other records. This allows the records to be reprocessed, which may be required for late resolution of foreign keys. (The uniqueSequence is optional, and other unique keys could be used if you are confident every record will have them, but in most situations the combination of uniqueIdentifier and uniqueSequence should be used.)
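As a sketch (the entity and field names, and the key property, are illustrative assumptions), a unique target entity might be defined like this:

{
  "reference": "sales_transaction",
  "unique": true,
  "uniqueIdentifier": "batch_identifier",
  "uniqueSequence": "batch_sequence",
  "fields": [
    { "reference": "batch_identifier", "key": true },
    { "reference": "batch_sequence", "key": true },
    { "reference": "transaction_value" }
  ]
}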

The column identified by the unique identifier field holds the same value as the message identifier column. However, these have different roles. The unique identifier field is considered a business field, and may be used to group together related records. The message identifier may be removed from extracts of the data, and so is unsuitable for this purpose.

Duplicate messages

Normal data hub processing will examine data and cope with receiving the same data twice, unless the unique property is set. However, in some scenarios it is useful to prevent the data hub from storing and processing duplicate messages. This can be particularly useful when receiving files and unique data.

For example, message queuing software may present the same message to the data hub twice, but the second one should not be stored or processed.

In these cases, incoming messages can provide their own message identifier using the guid property. If another message with the same guid is encountered, it is rejected as already received, and a non-error status of 5 (previously received) is returned.
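For example (the guid value and product are illustrative):

{
  "entity": "product",
  "guid": "7d9a2f64-3b1c-5e8a-9f40-6c2d8e1b5a37",
  "timestamp": "2019-06-05T11:02:40.000",
  "data": [
    {
      "product_number": 4567890,
      "product_description": "Dyson Fan"
    }
  ]
}

Submitting this message twice would store and process it once; the second submission would return status 5.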

When using this feature, the solution designer must satisfy themselves that it is necessary, and that the id scheme used for the messages will produce ids that clash only when the messages are truly duplicates. A general approach would be to generate a type 5 UUID from a suitable namespace UUID and a string which is a concatenation of the data that uniquely identifies the message.

Export

Some entities hold data for use by other systems which do not support the data hub's versioning functionality. This can be useful for entities used for reporting or, for example, for Elasticsearch indexes.

These exports can be maintained through export mode. In this mode, all records are presented to the data store class using the export() method, as if they were inserts or deletes.

The export capability is controlled by three properties.

export
Set to true to indicate that this is an export entity.
exportIdentifier
Where the data is being exported to a keyed structure (such as Elasticsearch), specifies how the identifier for the record should be built. The value depends on the data store class. For Elasticsearch, it identifies a field that should be used for the document id.
exportStandardFields
Set to true to indicate that the standard fields should be exported. In most cases, these are not relevant. If these are added, the export entity may be accessible by other functions (such as Query). Defaults to false.
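A sketch of an export entity definition (the entity and field names are illustrative assumptions):

{
  "reference": "product_index",
  "export": true,
  "exportIdentifier": "product_number",
  "exportStandardFields": false
}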

Not all data store classes support export functionality. (At the time of writing, only the Elasticsearch data store class supports this.)

It generally does not make sense for export entities to be link targets or to have children. If exportStandardFields is true, it may (depending on the external structure) be possible to use an export entity in this way, provided that the data store supports the appropriate accesses.