diff --git a/README.md b/README.md
index a43e9de..4bcd487 100644
--- a/README.md
+++ b/README.md
@@ -72,10 +72,111 @@ This implies two important facts:
 On the other hand, `anaxi` will never delete anything on destination in any situation. Be careful when running full-refreshes, since you will need to decide if you want to remove data from destination or are happy having duplicate records there.
-- More-than-once delivery
-- deletes
-- missing intermediate steps due to live mode
-- same-container-name in different databases
+### Deletes
+
+The Cosmos DB change feed does not report deletes in any way. Unless the documents in the container somehow signal soft deletion, we cannot propagate deletes to the Postgres database.
+
+### More-than-once delivery
+
+Due to some quirks in the way the Cosmos DB change feed works and in how we keep checkpoints, it is possible for a specific document to be synced more than once into Postgres, which leads to duplicate records of it in the Postgres landing table. This is not a massive issue, since deduplication can be handled on destination through unique IDs and timestamps, but it should nevertheless be a well-known fact for the consumers of this data on destination.
+
+If you don't really care about why this is the case, you can skip the rest of this section. If you are curious about why this happens, read along:
+
+- Cosmos DB attaches a timestamp to each document whenever it gets created or updated, in a field named `_ts`. It is a UNIX-epoch-style timestamp with second resolution. We leverage this timestamp to maintain our checkpoints for each stream: every time `anaxi` writes documents to the target Postgres, it uses the highest timestamp seen as the new checkpoint, so the next sync takes over from there.
+- The Python client for Cosmos DB allows reading a container's change feed from a given point in time. So, on every sync, we direct the client to read the feed since the checkpoint. This setting makes the checkpoint __inclusive__: if the checkpoint sits at `'2024-08-13T14:53:54+00:00'` and some document in the database got updated at `'2024-08-13T14:53:54+00:00'`, it will be read and synced.
+- Thus, a document might be synced more than once when:
+  - A sync happens, and the document gets written into destination. The document's timestamp is the highest one in that sync, so it gets used as the new checkpoint.
+  - A new sync gets triggered, and documents whose timestamp equals the checkpoint get synced *again*.
+  - This behaviour will repeat in further syncs until a new change feed event with a higher timestamp appears.
+
+Note that, for some unknown reason, this only happens to SOME records. Meaning, if the checkpoint is at point in time *t* and there are 10 documents with timestamp *t*, it can happen that only 7 of them get repeatedly loaded again and again, while the other 3 are only loaded on the first sync. Why? Only Bill Gates might know, I guess.
+
+### Multiple containers with the same name
+
+In trying to keep `anaxi` as simple as possible, we've made the assumption that no two Cosmos DB containers will have the same name, even across different databases.
+
+If you encounter this situation, that's an issue: you are going to have to modify the structure of the Cosmos DB configuration file, as well as the code that reads and uses it.
+
+### Missing intermediate steps due to Latest version mode
+
+Cosmos DB databases have different modes for the Change Feed. In databases that run in the *Latest version* mode, not *every* single change to a document will be picked up by `anaxi`. Instead, every time you run a sync, you will receive __the latest state__ of any documents that have been created or updated since the checkpoint you are using. Intermediate states will not be read through the feed.
+
+You can read more about the different change feed modes in the official Cosmos DB documentation to understand all their nuances and implications.
+
+## Modeling at destination given more-than-once delivery, only-last-state document tracking and flexible schemas
+
+The nuances of how Cosmos DB and `anaxi` work impose certain constraints and best practices on how the synced data should be modeled in our DWH. This section provides some guidance.
+
+### Dealing with more-than-once delivery
+
+First, because documents can be delivered more than once, you'll need to run some type of deduplication logic on the destination side. There are many ways to do this, with different trade-offs:
+
+- If you can assume that the documents have some field that acts as a PK, you can simply deduplicate by picking the document with the highest timestamp for each ID and considering that to be the current state of the data. If the data is small, you can probably get away with running this frequently. If the table grows huge, you might need to put some incremental approach in place. This approach is the one I recommend; see the sketch at the end of this section.
+- If you can assume that the documents have some field that acts as a PK and also that documents are immutable and never get updated, you can simply make sure to keep only one record per PK and discard any other instances that share the same PK.
+
+If this problem becomes truly terrible, there are other ways of solving it: from running recurrent de-dup jobs at destination that delete duplicates, to modifying the way `anaxi` works to ensure that it never writes a document more than once. At this stage, we are choosing not to follow those paths because they add unnecessary complexity for our current needs, but this might change in the future.
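+
+As a starting point for the first approach, a query along these lines can do the trick. It is only a sketch: it assumes a landing table and a JSON `documents` column like the ones used in the normalization example below, and that the `id` field can act as the PK.
+
+```sql
+-- Keep only the most recent version of each document (sketch: table, column and
+-- key names are taken from the example below and may differ in your setup).
+SELECT DISTINCT ON (documents->>'id')
+    documents->>'id'            AS id,
+    (documents->>'_ts')::bigint AS ts,
+    documents
+FROM
+    dwh.sync_cosmos_test.test_container
+ORDER BY
+    documents->>'id',
+    (documents->>'_ts')::bigint DESC;
+```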
+
+### Normalizing JSONs
+
+`Anaxi` will deliver the data in JSON format in the destination table in Postgres. This means that, in order to work easily with the data, you will probably want to extract fields from the JSON and turn them into proper relational structures. There is no specific set of instructions, since how to do it depends on the structure of the raw documents and on which data in them is relevant.
+
+Just for guidance, here you can see an example document and a query that turns each field into its own column:
+
+```json
+{
+    "id": "5eb63ec7-5dd6-4009-8baa-63475a35fdf3",
+    "Name": "John Galt",
+    "PersonId": "JG",
+    "FavouriteColors": [
+        "Black",
+        "Yellow"
+    ],
+    "_lsn": 14,
+    "_rid": "4RdcAJr+EzwBAAAAAAAAAA==",
+    "_etag": "\"ed01d0ff-0000-1100-0000-66b234870000\"",
+    "_self": "dbs/4RdcAA==/colls/4RdcAJr+Ezw=/docs/4RdcAJr+EzwBAAAAAAAAAA==/",
+    "_ts": 1722954887,
+    "_attachments": "attachments/"
+}
+```
+
+```sql
+SELECT
+    documents->>'id' AS id,
+    documents->>'_ts' AS timestamp,
+    documents->>'Name' AS name,
+    documents->>'_lsn' AS lsn,
+    documents->>'_rid' AS rid,
+    documents->>'_etag' AS etag,
+    documents->>'_self' AS self,
+    documents->>'PersonId' AS person_id,
+    documents->>'_attachments' AS attachments,
+    documents->'FavouriteColors' AS favourite_colors
+FROM
+    dwh.sync_cosmos_test.test_container;
+```
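+
+Array fields such as `FavouriteColors` can also be exploded into one row per element. The query below is only a sketch and assumes the `documents` column is of type `jsonb`; if it is plain `json`, use `json_array_elements_text` instead:
+
+```sql
+-- One row per (document id, favourite colour) pair.
+SELECT
+    documents->>'id' AS id,
+    colours.favourite_color
+FROM
+    dwh.sync_cosmos_test.test_container
+CROSS JOIN LATERAL
+    jsonb_array_elements_text(documents->'FavouriteColors') AS colours(favourite_color);
+```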
+
+### Dealing with changing schemas
+
+Cosmos DB containers lack schema enforcement, so each document can have a different structure. This means that the JSON documents landing in the destination table in Postgres can also have different structures.
+
+This doesn't need to be an issue if managed properly. The ideal scenario is to have a clear schema catalogue maintained by the owners of the Cosmos DB container at hand. If the different versions of the document schema are documented there, along with pointers to the cutoff dates when the schemas changed, you can simply write different SQL for each version and tie everything together, along the lines of the following example (the field names and the cutoff value are purely illustrative):
+
+```sql
+SELECT
+    documents->>'id' AS id,
+    documents->>'Name' AS name                 -- field as named in schema version 1
+FROM
+    my_json_documents
+WHERE
+    (documents->>'_ts')::bigint < 1722954887   -- documents written before the schema change
+UNION ALL
+SELECT
+    documents->>'id' AS id,
+    documents->>'FullName' AS name             -- hypothetical renamed field in schema version 2
+FROM
+    my_json_documents
+WHERE
+    (documents->>'_ts')::bigint >= 1722954887; -- documents written after the schema change
+```
 
 ## Development