
Anaxi

Anaxi is Superhog's tool to perform Extract-Load (EL) syncs between our multiple Cosmos DB databases and our DWH.

How to use the tool

Note: the app has only been used so far in a Linux environment. Windows support is dubious.

Install

  • Ensure you have Python 3.10 or later and poetry installed.
  • Run poetry install to install dependencies.
  • Activate the project's virtual environment. You can use poetry shell.
  • Test that everything is working by running anaxi smoke-test. You should see a happy pig.
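
For reference, once the repo is cloned, the whole install flow boils down to the commands named in the list above, run from the root of the repo:

poetry install
poetry shell
anaxi smoke-test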

Set up credentials

anaxi needs a few configs and secrets to run.

Regarding Cosmos DB databases: anaxi expects to find a file called cosmos-db.yml in the path ~/.anaxi/cosmos-db.yml. The file should specify one or more Cosmos DB databases, along with the required secrets to interact with them. You can check the example file in this repo named example-cosmos-db.yml to understand how to build this file. Once you've done that, you can check if any database is reachable with the cosmos-db-healthcheck command. See more in the General Usage section below.

For Postgres databases: anaxi expects to find a file called postgres.yml in the path ~/.anaxi/postgres.yml. The file should specify one or more Postgres databases, along with the required secrets to interact with them. You can check the example file in this repo named example-postgres.yml to understand how to build this file. Once you've done that, you can check if any database is reachable with the postgres-healthcheck command. See more in the General Usage section below.
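
Just as a rough illustration of the shape of these files, a minimal sketch could look like the following. The exact keys are defined by example-cosmos-db.yml and example-postgres.yml in the repo, so treat every field name below as an assumption and copy the real example files instead:

# ~/.anaxi/cosmos-db.yml (illustrative keys only)
my-cosmos-db:
  endpoint: https://<account-name>.documents.azure.com:443/
  key: <account-key>

# ~/.anaxi/postgres.yml (illustrative keys only)
my-postgres-db:
  host: <hostname>
  port: 5432
  user: <user>
  password: <password>
  database: <database-name>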

Target database preparation

anaxi assumes that the destination Postgres database and schema are already present before executing, and will not try to create them if they can't be found. Instead, things will simply fail. You are expected to take care of creating them.
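
For example, if you plan to land the data in a schema called sync_cosmos_test (the schema name used in the modeling examples further down this readme), you would prepare the destination with something like:

CREATE SCHEMA IF NOT EXISTS sync_cosmos_test;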

anaxi will use the source container name as the name for the destination table. So, if you are reading from a container named oranges, anaxi will move the documents into a table named oranges. The table will be created for you on the first run, so you don't need to take care of that.

Set up streams

anaxi works through streams. A stream is the link between one specific Cosmos DB container in some database, and a table in a Postgres database. Once you configure a stream properly, you can use anaxi to move the data from the container to the table. Each time you do that, you are syncing.

Syncs are incremental in nature. anaxi keeps track of the most recent timestamp it has seen in the last run for the stream, and will only bring over Cosmos DB documents created or edited since then. We call this stored point in time the checkpoint. When you run a sync, anaxi will advance the checkpoint as it sees more data. Even if an error happens in the middle of a job, anaxi will keep track of the checkpoint up to which it can be fully confident that the data has been delivered to the target table.

Different streams are independent from each other; running one will not affect the others in any way.

To set up a stream, anaxi expects to find a file called streams.yml in the path ~/.anaxi/streams.yml. You can check the example file in this repo named example-streams.yml to understand how to build this file. Each entry in the file represents one stream. The cosmos_database_id and postgres_database fields in each stream entry should be filled in with values that you have defined in the cosmos-db.yml and postgres.yml files, respectively. The cutoff_timestamp field allows you to specify a timestamp (ISO 8601) to be used as the first date to read data from if no checkpoint is available. You can leave it empty to read all records from the start of the container history.
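
As an illustration, a stream entry could look roughly like the sketch below. The stream name and the container key are placeholders (check example-streams.yml for the real structure); only cosmos_database_id, postgres_database and cutoff_timestamp are described in this readme:

# ~/.anaxi/streams.yml (illustrative)
oranges-stream:
  cosmos_database_id: my-cosmos-db              # must match an entry in cosmos-db.yml
  postgres_database: my-postgres-db             # must match an entry in postgres.yml
  cutoff_timestamp: '2024-01-01T00:00:00+00:00' # optional; empty means read the full container history
  # the source container is configured here too; see example-streams.yml for the exact key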

Also, you will need to create a folder named checkpoints in the path ~/.anaxi/checkpoints. The state of the checkpoints for each stream will be kept there in different files.
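
In other words:

mkdir -p ~/.anaxi/checkpoints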

Once you have configured the streams.yml file, you can use anaxi to execute the syncs. See more details in the next section.

Calling anaxi

You can run a healthcheck against any Cosmos DB database like this:

anaxi cosmos-db-healthcheck --cosmos-db-id <your-db-id>

You can run a healthcheck against Postgres databases like this:

anaxi postgres-healthcheck --postgres-database <your-db-name>

To run a sync job, you can use:

anaxi sync-stream --stream-id <your-stream-name>

Deploying for Superhog infra

anaxi is simply a Python CLI app coupled with some configs stored in the file system. It can be run in many different ways. This section provides some guidance on deploying it for the specific needs we have as of today.

  • Setup

    • Prepare a Linux VM.
    • Follow the instructions from the previous sections of this readme to get the tool ready to run.
  • Scheduling

    • Up next, schedule the execution in the Linux VM to fit your needs.
    • Specifics are up to you and your circumstances.
    • A general pattern would be to create a little bash script that calls the tool with the right parameters. You can find an example that I like in the root of this repo named run_anaxi.sh, but that's opinionated and adjusted to my needs at the time of writing this. Adapt it to your environment or start from scratch if necessary (a minimal sketch also appears further below). The script is designed to be placed in ~/run_anaxi.sh.
    • The script is designed to send both success and failure messages to Slack channels upon completion. To properly set this up, you will need to place a file called slack_webhook_urls.txt in the same path where you drop run_anaxi.sh. The file should have two lines: SLACK_ALERT_WEBHOOK_URL=<url-of-webhook-for-failures> and SLACK_RECEIPT_WEBHOOK_URL=<url-of-webhook-for-successful-runs>. Setting up the Slack channels and webhooks is outside of the scope of this readme.
    • Create a cron entry with crontab -e that runs the script. For example: 0 2 * * * /bin/bash /home/azureuser/run_anaxi.sh to run syncs every day at 2AM.
    • If you want to run syncs at different frequencies, you can make different copies of run_anaxi.sh and schedule them independently.
  • Backfilling and first runs

    • When running the first sync for a stream, anaxi will by default start reading records from the specified cutoff_timestamp date onward.
    • If the value is not provided, anaxi will read the container's full history from the very beginning.
    • You can also control the starting point of a stream by editing its checkpoint file directly (see Tracking checkpoints below), which holds a single entry like:
    highest_synced_timestamp: '2024-08-16T9:02:23+00:00'

The internal logs of anaxi will be found in ~/.anaxi/anaxi.log. The logs of the script will be at ~/anaxi_run.log.
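
For illustration, a wrapper in the spirit of run_anaxi.sh could look roughly like the sketch below. The repo path, the stream id and the use of poetry run are assumptions; prefer the real run_anaxi.sh in the repo as your starting point and adapt from there.

#!/usr/bin/env bash
set -euo pipefail

# Load SLACK_ALERT_WEBHOOK_URL and SLACK_RECEIPT_WEBHOOK_URL from the file described above.
source "$HOME/slack_webhook_urls.txt"
LOG_FILE="$HOME/anaxi_run.log"

notify() {
  # Post a plain text message to a Slack incoming webhook.
  curl -s -X POST -H 'Content-type: application/json' \
    --data "{\"text\": \"$2\"}" "$1" > /dev/null
}

# Run the sync from the project's virtualenv; repo path and stream id are placeholders.
if (cd "$HOME/data-anaxi" && poetry run anaxi sync-stream --stream-id oranges-stream) >> "$LOG_FILE" 2>&1; then
  notify "$SLACK_RECEIPT_WEBHOOK_URL" "anaxi: sync of oranges-stream finished successfully"
else
  notify "$SLACK_ALERT_WEBHOOK_URL" "anaxi: sync of oranges-stream FAILED, check $LOG_FILE"
fi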

Relevant internals and implementation details

Tracking checkpoints

anaxi keeps track of the most recent timestamp it has committed (the checkpoint) for each stream to keep syncs incremental. Checkpoints are stored in ~/.anaxi/checkpoints/, with one file dedicated to each stream. The files are named after the stream they track (a stream called some-stream will have its checkpoint stored at ~/.anaxi/checkpoints/some-stream.yml). The timestamp is stored in UTC.

This implies two important facts:

  • If the checkpoint file gets deleted, altered, corrupted, or whatever dramatic event happens to it, the checkpoint will be lost.
  • You can modify the file to manipulate the behaviour of the next sync for any given stream. For example, if you want to run a full-refresh, you can simply delete the file.

On the other hand, anaxi will never delete anything on destination in any situation. Be careful when running full-refreshes, since you will need to decide if you want to remove data from destination or if you are happy having duplicate records there.
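
For example, a full refresh of a hypothetical stream called some-stream (the same placeholder used above) boils down to:

rm ~/.anaxi/checkpoints/some-stream.yml      # next sync behaves like a first run: cutoff_timestamp or full history
anaxi sync-stream --stream-id some-stream    # re-sync; this will create duplicates unless you cleaned the destination table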

Deletes

The Cosmos DB change feed does not report on deletes in any way. Unless the documents in the container somehow signal soft-deletion, we are unable to propagate deletes to the Postgres database.

More-than-once delivery

Due to some quirks around the way the Cosmos DB change feed works and how we keep checkpoints, it is possible for a specific document to be synced more than once into Postgres, which would lead to duplicate records of it in the Postgres landing table. This is not a massive issue, since deduplication can be handled on destination through unique IDs and timestamps, but it should nevertheless be a well-known fact across the consumers of this data on destination.

If you don't really care about why this is the case, you can skip the rest of this section. If you are curious about why this happens, read along:

  • Cosmos DB attaches a timestamp to each document whenever it gets created or updated. The field is the one named _ts. The timestamp is a UNIX-epoch-style timestamp with second resolution. We leverage this timestamp to maintain our checkpoints for each stream: every time anaxi writes documents to the target Postgres table, it uses the highest seen timestamp as the new checkpoint, thus making the next sync take over from it.
  • The Python client for Cosmos DB allows reading a container's change feed from some point in time. So, on every sync, we direct the client to read the feed since the checkpoint. This setting makes the checkpoint inclusive. So, if the checkpoint sits at '2024-08-13T14:53:54+00:00', and some document in the database got updated at '2024-08-13T14:53:54+00:00', it will be read and synced.
  • Thus, a document might be synced more than once when:
    • A sync happens, and the document gets written into destination. The document's timestamp is the highest one in that sync, so it gets used as the new checkpoint.
    • A new sync gets triggered, and documents that have a timestamp equal to the checkpoint get synced again.
    • This behaviour will actually repeat in further syncs until a new change feed event with a higher timestamp appears.

Note that, for some unknown reason, this only happens to SOME records. Meaning, if the checkpoint is at point in time t, and there are 10 documents with timestamp t, it can happen that only 7 of them get repeatedly loaded again and again, while the other 3 are only loaded on the first sync. Why? Only Bill Gates might know, I guess.

Multiple containers with same name

In trying to keep anaxi as simple as possible, we've made the assumption that no two Cosmos DB containers will have the same name, even across different databases.

If you encounter this situation, that's an issue: you will have to modify the structure of the Cosmos DB configuration file, as well as the code that reads and uses it.

Missing intermediate steps due to Latest version mode

Cosmos DB databases have different modes for the Change Feed. In databases that run in the Latest version mode, not every single change to documents will be picked up by anaxi. Instead, every time you run a sync, you will receive the latest state of any document that has been created or updated since the checkpoint you are using. Intermediate states will not be read through the feed.

You can read more here to understand all the nuances and implications of the different modes: https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/change-feed-modes

Modeling at destination given more-than-once delivery, only-last-state document tracking and flexible schemas

The nuances of how Cosmos DB and anaxi work pose certain constraints and best practices on how the synced data should be modeled in our DWH. This section provides some guidance.

Dealing with more-than-once-delivery

First, because documents can be delivered more than once, you'll need to run some type of deduplication logic on the destination side. There are many ways to do this, with different trade-offs:

  • If you can assume that the documents have some field that acts as a PK, then you can simply deduplicate by picking the document with the highest timestamp for each ID and treating that as the current state of the data. If the data is small, you can probably get away with running this frequently. If the table grows huge, you might need to put some incremental approach in place. This approach is the one I recommend (see the SQL sketch after this list).
  • If you can assume that the documents have some field that acts as a PK and also that documents are immutable and never get updated, you can simply make sure to only get one record per PK and discard any other instances that have the same PK.
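
As a sketch of the recommended approach, reusing the example table and documents column shown later in this readme (with id as the PK and _ts as the timestamp), the deduplication could be as simple as:

SELECT DISTINCT ON (documents->>'id')
    documents
FROM
    dwh.sync_cosmos_test.test_container
ORDER BY
    documents->>'id',
    (documents->>'_ts')::bigint DESC;

Each id keeps only its most recent version; you can materialize this as a view or a table and treat it as the current state of the data.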

If this problem becomes truly terrible, there are other ways of solving it: from running recurrent de-dup jobs at destination that delete duplicates, to modifying the way anaxi works to ensure that it never writes a document more than once. At this stage, we are choosing not to follow those paths because they are unnecessary complexity for our current needs, but this might change in the future.

Normalizing JSONs

Anaxi will deliver the data in JSON format in the destination table in Postgres. This means that, in order to work easily with the data, you will probably want to extract fields from the JSON and turn them into proper relational structures. There is no specific set of instructions, since the right approach depends on the structure of the raw documents and on which data in them is relevant.

Just for guidance, here you can see an example document and a query that turns each field into its own column:

{
  "id": "5eb63ec7-5dd6-4009-8baa-63475a35fdf3",
  "Name": "John Galt",
  "PersonId": "JG",
  "FavouriteColors": [
    "Black",
    "Yellow"
  ],
  "_lsn": 14,
  "_rid": "4RdcAJr+EzwBAAAAAAAAAA==",
  "_etag": "\"ed01d0ff-0000-1100-0000-66b234870000\"",
  "_self": "dbs/4RdcAA==/colls/4RdcAJr+Ezw=/docs/4RdcAJr+EzwBAAAAAAAAAA==/",
  "_ts": 1722954887,
  "_attachments": "attachments/"
}
SELECT
    documents->>'id' AS id,
    documents->>'_ts' AS timestamp,
    documents->>'Name' AS name,
    documents->>'_lsn' AS lsn,
    documents->>'_rid' AS rid,
    documents->>'_etag' AS etag,
    documents->>'_self' AS self,
    documents->>'PersonId' AS person_id,
    documents->>'_attachments' AS attachments,
    documents->'FavouriteColors' AS favourite_colors
FROM
    dwh.sync_cosmos_test.test_container;

Dealing with changing schemas

Cosmos DB containers lack schema enforcement, and each document can have a different structure. This means that the different JSON documents that land in the destination table in Postgres can have different structures.

This doesn't need to be an issue if managed properly. The ideal scenario is to have a clear schema catalogue maintained by the owners of the Cosmos DB container at hand. If the different versions of the document schema are documented there, along with some pointers to the cutoff dates where the schemas changed, you can simply write different SQL for each version and tie everything together:

SELECT
  <the-right-stuff-for-v1>
FROM 
  my_json_documents
WHERE
  <condition-to-get-only-v1-documents>
UNION ALL
SELECT
  <the-right-stuff-for-v2>
FROM 
  my_json_documents
WHERE
  <condition-to-get-only-v2-documents>

Development

Local Cosmos DB

Microsoft provides tools to run a local emulator of Cosmos DB. The bad news is that we have been unable to make it work so far; it always breaks for one reason or another.

You can find instructions here:

What's with the name

Anaxi is short for Anaximander. Anaximander of Miletus was a pre-Socratic Greek philosopher, often called the "Father of Cosmology" and a founder of astronomy.