This section describes how the data hub processes messages.
Messages are passed to the data hub through the web service interface or from files loaded using the command-line tool.
The messages conform to a message format. A simple message might look something like this:
{
"entity":"product",
"timestamp":"2019-06-05T09:31:17.000",
"data": [
{
"product_number": 1234567,
"product_description": "Breville Toaster"
},
{
"product_number": 2345678,
"product_description": "Kenwood Kettle"
}
]
}
The message is stored in the message store table. A message identifier and message timestamp are generated.

| message_identifier | source_entity | message_timestamp | effective_timestamp | data | status |
|---|---|---|---|---|---|
| m1 | product | ... 09:45 ... | ... 09:31 ... | [{"product_number": 1234567, "product_description": "Breville Toaster"},{"product_number": 2345678, "product_description": "Kenwood Kettle"}] | 0 |
In the example above:
- the message identifier (m1) and the message timestamp (09:45) were generated when the message was stored
- the effective timestamp (09:31) was taken from the timestamp property of the incoming message
- the status (0) indicates that the message has not yet been processed
The message is passed to the message processor, either
immediately or through a background process that looks for
unprocessed messages. The message status
is updated to indicate that the message is being processed.
The message processor looks up the appropriate source system
definition from the schema, and from this gets a list of mappings
from the source entity to target entities. The mapping describes
how to read the message data and the target fields to which it
should be written. In the example above we have not set a source
system, so the default mapping (in which source is the same as
target) is used.
The message processor processes the message, as described in the
remaining sections, and then updates the message store and
associated message log. Errors are captured using the error
message field on the message store and by setting the status.
The data from the message is passed to a message reader. This
returns an object (of type Iterable&lt;Object&gt;) which can be
used to read the JSON objects in the message one at a time.
In the example above, the data is already a JSON array of JSON objects, so the default message reader can be used. (Because the default mapping is used, the default message reader is always used.) Alternative message readers can be specified in the schema. These can be used to read data in different formats, for example using a CSV message reader to read incoming CSV data.
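As an illustration, a message intended for a CSV message reader might look something like the following sketch, with the data field carrying CSV text rather than a JSON array. Exactly how the CSV is carried is an assumption here and depends on the reader implementation.
{
"entity":"product",
"timestamp":"2019-06-05T09:31:17.000",
"data": "product_number,product_description\n1234567,Breville Toaster\n2345678,Kenwood Kettle"
}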
Message readers can be used to read data stored externally to the message store table. For example, if the data is held in a file and the message data field holds the name of the file, a file message reader could be used to read the file.
In the simplest case, new data is inserted into the target table. For the first message above, the following would be written to the product table.
| guid | valid_from_timestamp | valid_to_timestamp | deleted_indicator | product_number | product_description | source_message |
|---|---|---|---|---|---|---|
| g1 | .. 09:31 .. | null | false | 1234567 | Breville Toaster | m1 |
| g2 | .. 09:31 .. | null | false | 2345678 | Kenwood Kettle | m1 |
Note: the valid from timestamp is taken from the effective timestamp originally passed in the message.
Before data is inserted, the data store is called to check if
matching data is already present. The check is performed for each
incoming record, and is passed a JSON object with a copy of the
fields that have been identified as keys. (If link fields are in
the key, these are resolved before reading the entity, see later
sections.)
In this example, product_number is the key field, and so for the first record the data store would be passed the following:
{
"product_number":1234567
}
Let's assume a second message comes into the data hub with the following data:
{
"entity":"product",
"timestamp":"2019-06-05T10:10:14.000",
"data": [
{
"product_number": 1234567,
"product_description": "Breville Toaster"
},
{
"product_number": 2345678,
"product_description": "Kenwood Automatic Kettle"
},
{
"product_number": 3456789,
"product_description": "Panasonic Microwave"
}
]
}
Each record is processed separately.
Updates are created by:
- setting the valid to timestamp on the existing row to the effective timestamp of the incoming message
- inserting a new row with the same guid, valid from the effective timestamp of the incoming message

The toaster record is unchanged, so no new version is created for it. The updated table would be:
| guid | valid_from_timestamp | valid_to_timestamp | deleted_indicator | product_number | product_description | source_message |
|---|---|---|---|---|---|---|
| g1 | .. 09:31 .. | null | false | 1234567 | Breville Toaster | m1 |
| g2 | .. 09:31 .. | .. 10:10 .. | false | 2345678 | Kenwood Kettle | m1 |
| g2 | .. 10:10 .. | null | false | 2345678 | Kenwood Automatic Kettle | m2 |
| g3 | .. 10:10 .. | null | false | 3456789 | Panasonic Microwave | m2 |
Deletes are indicated by including the deleted_indicator field in the incoming message. This works just like an update, except:
- the deleted indicator on the new row is set to true
- the non-key fields on the new row are set to null

Assuming the following were passed:
{
"entity":"product",
"timestamp":"2019-06-05T10:45:19.000",
"data": [
{
"product_number": 3456789,
"deleted_indicator": true
}
]
}
The table would end up something like:
| guid | valid_from_timestamp | valid_to_timestamp | deleted_indicator | product_number | product_description | source_message |
|---|---|---|---|---|---|---|
| g1 | .. 09:31 .. | null | false | 1234567 | Breville Toaster | m1 |
| g2 | .. 09:31 .. | .. 10:10 .. | false | 2345678 | Kenwood Kettle | m1 |
| g2 | .. 10:10 .. | null | false | 2345678 | Kenwood Automatic Kettle | m2 |
| g3 | .. 10:10 .. | .. 10:45 .. | false | 3456789 | Panasonic Microwave | m2 |
| g3 | .. 10:45 .. | null | true | 3456789 | null | m3 |
The field_provenance field holds a JSON object that, for each field, holds an object with the identifier of the message that was used to create the field (property m) and a priority (property p), which defaults to 0 and is covered in a later section. Provenance is held for each of the business fields that have ever been populated, and for the row itself using a field reference of "guid".
The rules for setting field provenance are summarised below.

| Action | Rule | Example |
|---|---|---|
| Insert | The message identifier is set to the message that created the row. | After inserting the toaster: {"guid":{"m":"m1","p":0},"product_number":{"m":"m1","p":0},"product_description":{"m":"m1","p":0}} |
| Update | The message identifier is updated for all fields that are updated. The message identifier for the key fields is not updated unless they have changed. In most cases, with simple keys, the keys do not change. However, if you are updating using weak composite keys (say, using employee number or national insurance number to identify a person), it is possible to update a key field on an update. | After updating the kettle: {"guid":{"m":"m1","p":0},"product_number":{"m":"m1","p":0},"product_description":{"m":"m2","p":0}} |
| Delete | The message identifier is updated for the guid and for all the fields that were deleted. The message identifiers for the key fields are not updated unless they have changed, using the same rules as update. The non-key fields (in this case product_description) are updated if they are present in the field provenance of the previous row (which shows they have previously been set); otherwise they are not set. | After deleting the microwave: {"guid":{"m":"m3","p":0},"product_number":{"m":"m2","p":0},"product_description":{"m":"m3","p":0}} |
The valid from timestamp is set from the timestamp on the incoming message (the effective_timestamp on the message store).
Messages are not necessarily received or processed in effective timestamp sequence.
What happens when an out-of-sequence record is processed?
The simple answer is that the database rows are updated as if the records had been processed in sequence.
For example, if an update to the kettle timed 09:50 was received after the one for 10:10, with another different description (say, "Kenwood Auto Kettle"), the following would happen:
- the kettle row valid from 09:31 would have its valid to timestamp changed from 10:10 to 09:50
- a new row would be inserted with the same guid, valid from 09:50 to 10:10, holding the description "Kenwood Auto Kettle"
- the existing row valid from 10:10 would remain unchanged
Out of sequence processing is affected by priority. This is covered in a later section.
In the above examples, no source system was set for the incoming message, and the entity name and field names on the input message match the table name and column names on the target table.
In the schema, you can define a source system, which allows you to specify that a message is written to one or more target entities and, for each target entity, how the incoming data should be processed (for example, the field mappings, the message reader and the priorities to use).

You can use this in a number of ways.
The term "source system" could be considered a misnomer. Often if does identify different source systems with different processing needs. However, it really just represents a collection of data processing rules. It may be that more than one data source shares the same "source system" definition. The user field on the message can be used to record the sender of the data.
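As an illustrative sketch, following the shape of the mappings definition shown later in this section, a source system that sends an entity called prod could map it to the product target entity like this. Field-level mappings and priorities would be added to this structure; their exact property names are not shown here.
"mappings": {
"prod": [
{
"reference":"product"
}
]
}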
If you map a source entity to a different target entity to
perform normalisation, be careful that you don't accidentally mess
up key resolution. For example, imagine you have a person record
which has denormalised department details on it, including
business keys of both. You could set up a source entity of person
that maps firstly to the department target entity and secondly to
the person target entity. This would normalise the data and ensure
that the department was always there when the person record needs
to link to it. However, if on another table, you were attempting
to link to person, it would attempt to resolve the key of person
using the first target entity of the source entity person, i.e.
the department table. To avoid this problem, for entities that are
the parents of any links, do not use the target entity reference
as the a source entity reference unless the target entity is the
first entity for the source.
Incoming data is JSON and fields can only have a data type of string, number or boolean. These are converted automatically to the appropriate target type.
Strings are converted to numbers, and numbers to strings, as necessary. If strings cannot be converted to numbers, an error is raised. If dates are in an incorrect format, an error is raised.
Any field can be null. If an incoming record contains a null for
the field, the null can be used to overwrite existing data. If the
incoming record does not have a field, existing data is not set to
null.
If the target type is boolean, the following are considered true:
The following are considered false:
Anything else, other than null, is an error.
Target text may have a length, and will be truncated as necessary.
When converting to integer data types, decimal values are rounded
to the nearest integer.
When converting to the decimal type, numbers are truncated to the appropriate precision and rounded to the appropriate number of decimal places (the scale).
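For example, assuming a target table where product_number is an integer, product_description is text and unit_price is a decimal with a scale of 2 (the field names and types here are illustrative), an incoming record such as:
{
"product_number": "1234567",
"product_description": 42,
"unit_price": 19.996
}
would be stored with product_number converted to the number 1234567, product_description converted to the string "42", and unit_price rounded to 20.00.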
When more than one source can target the same entity, priorities
control which source takes precedence.
For each source entity, the schema defines a priority for the
target entity and each field. The target entity priority is used
as the default for the field priority, and if there is no entity
priority all priorities default to 0. Higher priorities take
precedence, and priorities can be negative.
These priorities are used to set the priorities written to the
field provenance. The entity priority is used for the guid field.
When an update is processed, priorities are examined for each field to determine whether the update should take place.
The message/priority of the guid is updated if the incoming
entity priority is greater than the existing guid priority, even
if none of the fields are updated. If the incoming entity priority
is less than or equal to the current guid priority, the
message/priority of the guid is not updated, even if fields are
updated.
For a delete, or an update after a delete (i.e. a recreate), the
priority of the guid field is used to determine whether the delete
or update should take place.
In the unusual case of deleting a record that has already been
deleted, the incoming delete is actioned only if the source entity
has a priority higher than the existing guid. This means that a
low priority delete can be made higher priority, preventing it
being undone by a low priority insert.
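As a worked example (the sources, message identifiers and priorities here are illustrative), suppose a master data source writes to product with an entity priority of 10, and a transactional feed writes with the default priority of 0. After the master data source (message m7) updates the description of a product originally inserted by the transactional feed (message m1), the field provenance would hold:
{
"guid": {"m":"m7","p":10},
"product_number": {"m":"m1","p":0},
"product_description": {"m":"m7","p":10}
}
A later update to product_description from the transactional feed would not be applied, because its priority of 0 is lower than the stored priority of 10.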
Link fields represent a link to another entity.
Link fields are associated with one or more source fields identified by sourceLinkKey, which defaults to linkKey. The source fields and their values are collated into an object, translating the source field names into target field names as necessary, and the data store is called to look up the appropriate identifier.
For example, consider a source system definition with an order
source entity. Here is an incomplete definition of the mappings
part of the schema.
"mappings": {
"order": [
{
"reference":"order",
"fields": [
{
"reference":"order_customer",
"type":"link",
"linkEntity":"customer",
"linkKey": "customer_number"
}
]
}
]
}
Suppose the following order record were processed:
{
"order_number": 123456,
"customer_number": "918723",
"customer_name": "Bill Jones",
"total_price": 194.27
}
In this case, the target field "order_customer" would be found by calling the data store using the identifier() method, passing it the following key:
{
"customer_number": "918723"
}
If all the key fields are null, the data store is not called and
order_customer would be set to null.
If the data store returns a single identifier, this is used as the value for order_customer.
If the data store returns the MULTIPLE_IDENTIFIERS condition, an error is raised and this record is not processed.
If the data store returns null (which means the key can't be found), the record is not processed, but the message is marked as requires reprocessing (status code 1).
Messages can be reprocessed simply by submitting them again to the message processor. Reprocessing a message allows new target entities on the schema to be populated. It is even possible to delete all of the data on the target data stores and reprocess all messages (though a prudent systems administrator would take a backup before doing so, just in case).
If, when a record is reprocessed, a matching row already exists and the valid from timestamp on the row matches the effective timestamp of the message, the message is ignored.
Messages that are marked as requires reprocessing are automatically resubmitted periodically, up to a maximum delay. This means that foreign keys that were not matched (for example, because of differences in timing of processing) can be matched when reprocessed.
The data hub does not track the processing status of individual records, and reprocessing a large message just to resolve one or two missing foreign keys is relatively expensive. It is better to design processing to avoid this situation where possible.
An incoming message can be marked as a refresh. This means that the data on the message should replace all the data on the target entity.
In these cases, the data store for the entity is called using the identifiers() method, passing it an empty set of keys and ignoring deletes. This retrieves the guids of all non-deleted existing rows for the target entity.
At the end of the processing the message, deletes are generated for each of the guids on the existing rows that have not been seen when processing the incoming data. These deletes will retain all the fields identified as keys by the entity mapping.
Refresh processing obeys priority rules. If more than one source
can write to an entity, the one that performs the refresh should
be given the highest priority.
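For example, a refresh message might look something like the sketch below. The refresh property shown here is an assumption; the exact way a message is marked as a refresh is defined by the message format.
{
"entity":"product",
"timestamp":"2019-06-06T08:00:00.000",
"refresh": true,
"data": [
{
"product_number": 1234567,
"product_description": "Breville Toaster"
}
]
}
At the end of processing, deletes would be generated for any existing non-deleted product rows other than product 1234567.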
Some entities are owned by other entities. For example, an order line is owned by an order. If an order were to be deleted, all the order lines should be deleted too.
These child entities can be passed into the data hub like any
other entity. They would have a link field back to the parent
entity, and would have this link field in the key. The link field
would be resolved to an identifier before the data store for the
child entity is called.
Alternatively (or as well), the child entities can be passed in
the parent entity message, as an array of child entity objects.
The child entity objects should not include the key fields of the
parent. In the example below, order_lines is a field of type
children.
[
{
"order_number": 123456,
"order_lines": [
{
"product": 817272,
"quantity": 1,
"unit_price": 19.20
},
{
"product": 919215,
"quantity": 4,
"unit_price": 2.19
} ]
}
]
This does not mean that there is a column called "order_lines" on
the order table; it is just a convention for linking order line
rows to the order table.
The processing of child records is driven by the schema - see the childEntity, parentIdentifier, sourceParentIdentifier, childSequence and sourceChildSequence properties of the field mapping.
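As an illustrative sketch, the order_lines field in the example above might be mapped something like this. The property names are those listed above; the property values are invented for illustration.
{
"reference":"order_lines",
"type":"children",
"childEntity":"order_line",
"parentIdentifier":"order_line_order",
"childSequence":"order_line_number"
}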
The list of child entities is then processed, with each child object handled as a record for the child entity.
If the parent row is being deleted, child row refresh processing with a partial key is invoked to delete any child rows that relate to this parent.
Entities referenced in children field definitions can also be
updated in their own right, or by other parents. For example,
entities that link entities (such as, say, person-in-role) could
be defined as children of two parents (person and role, in this
case).
Some entities represent facts, such as sales transactions or IoT events, and every message passed to the data hub is a unique set of facts.
If this is the case, set the unique property on the target entity
to true, set the uniqueIdentifier to a field on the target entity
that should hold a unique identifier for this batch of records,
and the uniqueSequence to a field on the target entity to hold a
record number within the batch. Include these fields in the field
list, indicating that they are keys.
The identifier for the message is used as the value for the unique identifier field and the unique sequence field is set to the record number within the message, starting at 1. These two fields are then processed as the key fields for the record. The records are then processed as other records. This allows the records to be reprocessed, which may be required for late resolution of foreign keys. (The uniqueSequence is optional, and other unique keys could be used if you are confident every record will have them, but in most situations the combination of uniqueIdentifier and uniqueSequence should be used.)
The column identified by the unique identifier field holds the same value as the message identifier column. However, these have different roles. The unique identifier field is considered a business field, and may be used to group together related records. The message identifier may be removed from extracts of the data, and so is unsuitable for this purpose.
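As a sketch, the relevant part of a target entity definition for a sales transaction feed might look something like this. The field names batch_identifier and batch_sequence are illustrative, and the way fields are flagged as keys in the field list is not shown.
{
"unique": true,
"uniqueIdentifier": "batch_identifier",
"uniqueSequence": "batch_sequence"
}
Here, batch_identifier and batch_sequence would also be included in the entity's field list and marked as keys.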
Normal data hub processing will examine data and cope with
receiving the same data twice, unless the unique property is set.
However, in some scenarios it is useful to prevent the data hub
from storing and processing duplicate messages. This can be
particularly useful when receiving files and unique data.
For example, message queuing software may present the same message to the data hub twice, but the second one should not be stored or processed.
In these cases, incoming messages can provide their own message identifier using the guid property. If another message with the same guid is encountered, it is rejected as already received, with a non-error status of 5 (previously received).
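For example (the guid value here is illustrative):
{
"entity":"product",
"timestamp":"2019-06-05T11:02:41.000",
"guid": "c3a9f1d2-7b44-5e2a-9c1e-4f8a2d6b0e13",
"data": [
{
"product_number": 1234567,
"product_description": "Breville Toaster"
}
]
}
Submitting this message a second time would return status 5, and the second copy would not be stored or processed.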
When using this feature, the solution designer must satisfy
themselves that it is necessary, and that the id scheme used for
the messages will result in unique ids that do not clash unless
they are truly duplicate messages. A general approach would be to
generate a version 5 UUID from a suitable namespace UUID and a
string concatenation of the data that uniquely identifies the
message.
Some entities represent data held to be used by other systems which do not support the data hub's versioning functionality. This can be useful for entities used for reporting, or, for example, Elasticsearch indexes.
These exports can be maintained through export mode. In this mode, all records are presented to the data store class using the export() method, as if they were inserts or deletes.
The export capability is controlled by three properties:

| Property | Description |
|---|---|
| export | Set to true to indicate that this is an export entity. |
| exportIdentifier | Where the data is being exported to a keyed structure (such as Elasticsearch), specifies how the identifier for the record should be built. The value depends on the dataStore class. For Elasticsearch, it identifies a field that should be used for the document id. |
| exportStandardFields | Set to true to indicate that the standard fields should be exported. In most cases, these are not relevant. If they are added, the export entity may be accessible by other functions (such as Query). Defaults to false. |
Not all data store classes support export functionality. (At the time of writing, only the Elasticsearch data store class supports it.)
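Putting these together, an export entity definition might include something like the following sketch. The surrounding entity definition and the field name are illustrative.
{
"export": true,
"exportIdentifier": "product_number",
"exportStandardFields": false
}
With the Elasticsearch data store, this would use product_number as the document id for each exported record.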
It generally does not make sense for export entities to be link
targets or to have children. If exportStandardFields is true, it
may (depending on the external structure) be possible to use an
export entity in this way, provided that the data store supports
the appropriate accesses.