Implemented the tasks.

This commit is contained in:
Pablo Martin 2023-04-21 12:22:32 +02:00
parent 885c30184f
commit 8f1f3d75e1

View file

@ -1,5 +1,8 @@
import json import json
from typing import Any
import prefect
from prefect import task
class S3FileReader: class S3FileReader:
""" """
@ -22,3 +25,38 @@ class S3FileReader:
.read() .read()
.decode("utf-8") .decode("utf-8")
) )
@task()
def begin_sql_transaction(connection: Any) -> None:
"""
Start a SQL transaction in the passed connection. The task is agnostic to
the SQL engine being used. As long as it implements a begin() method, this
will work.
:param connection: the connection to some database.
:return: None
"""
logger = prefect.context.get("logger")
logger.info(f"Starting SQL transaction with connection: {connection}.")
connection.begin()
@task()
def end_sql_transaction(connection: Any, dry_run: bool = False) -> None:
"""
Finish a SQL transaction, either by rolling it back or by committing it.
:param connection: the connection to some database.
:param dry_run: a flag indicating if persistence is desired. If dry_run
is True, changes will be rollbacked.
:return: None
"""
logger = prefect.context.get("logger")
logger.info(f"Using connection: {connection}.")
if dry_run:
connection.rollback()
logger.info("Dry-run mode activated. Rolling back the transaction.")
else:
logger.info("Committing the transaction.")
connection.commit()