project

DB Snapshot to Data Lake Workflow Management.

This module provides a set of functions and a class to manage the workflow of exporting database snapshots to a data lake. It offers both functional and object-oriented programming approaches to suit different user needs and preferences.

Functional API:

The module includes step-by-step functions that can be used independently, allowing for greater flexibility and customization of the workflow. These functions cover various stages of the process, from planning the snapshot export to executing the final data lake ingestion.

Key functions include:

  • step_1_1_plan_snapshot_to_staging(): Plan the division of DB snapshot files.

  • step_1_2_get_snapshot_to_staging_todo_list(): Retrieve the list of snapshot groups to process.

  • step_1_3_process_db_snapshot_file_group_manifest_file(): Process individual snapshot groups.

  • step_2_1_plan_staging_to_datalake(): Plan the merging of staging files into the data lake.

  • step_2_2_get_staging_to_datalake_todo_list(): Get the list of staging file groups to process.

  • step_2_3_process_partition_file_group_manifest_file(): Execute the compaction of staging files.
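The planning steps divide files into groups whose combined size is close to a target. For Project, that target is target_db_snapshot_file_group_size. This page does not document the exact grouping algorithm. The following is only a minimal sketch, assuming a greedy in-order strategy and sizes in bytes. SnapshotFile and group_files_by_size are hypothetical names, not part of dbsnaplake.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class SnapshotFile:
    """Hypothetical stand-in for one DB snapshot data file."""
    uri: str
    size: int  # assumed to be in bytes


def group_files_by_size(files: List[SnapshotFile], target_size: int) -> List[List[SnapshotFile]]:
    """Greedily pack files, in their original order, into groups of about target_size.

    A new group starts when adding the next file would push the current group
    past target_size. A single file larger than target_size gets its own group.
    """
    groups: List[List[SnapshotFile]] = []
    current: List[SnapshotFile] = []
    current_size = 0
    for f in files:
        if current and current_size + f.size > target_size:
            groups.append(current)  # close the current group
            current, current_size = [], 0
        current.append(f)
        current_size += f.size
    if current:
        groups.append(current)  # flush the last partial group
    return groups


files = [
    SnapshotFile(f"s3://bucket/snapshot/part-{i}.json.gz", size)
    for i, size in enumerate([40, 30, 50, 10, 120, 5])
]
for group in group_files_by_size(files, target_size=100):
    print([f.uri.rsplit("/", 1)[-1] for f in group], sum(f.size for f in group))
```

A greedy in-order pass is simple and keeps neighbouring files together. However, it does not minimize the number of groups. A bin-packing heuristic could produce tighter groups, but it would reorder the files.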

Object-Oriented API:

The module also provides a Project class that encapsulates the entire workflow. This class-based approach suits users who prefer a streamlined process over customization. You initialize a Project instance with all required parameters, then run each step of the workflow by calling its instance methods.

Key methods of the Project class include:

  • Project.step_1_1_plan_snapshot_to_staging()

  • Project.step_1_2_process_db_snapshot_file_group_manifest_file()

  • Project.step_2_1_plan_staging_to_datalake()

  • Project.step_2_2_process_partition_file_group_manifest_file()

The functional API offers more flexibility for advanced users who need to extend or customize the workflow, while the class-based API provides a simpler interface for users who want to quickly implement the standard workflow with minimal setup.
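The trade-off between the two APIs is essentially a facade pattern. The class stores the configuration once and forwards it to step functions, which otherwise take every input explicitly. The following is a minimal sketch of that relationship. It uses hypothetical names (plan, process, MiniProject) rather than the real dbsnaplake signatures.

```python
from dataclasses import dataclass


# Hypothetical functional-API steps: every input is passed explicitly,
# so callers can swap, reorder, or wrap individual steps.
def plan(s3uri_source: str, s3uri_staging: str) -> str:
    return f"plan {s3uri_source} -> {s3uri_staging}"


def process(s3uri_staging: str, s3uri_datalake: str) -> str:
    return f"process {s3uri_staging} -> {s3uri_datalake}"


@dataclass
class MiniProject:
    """Hypothetical class-based facade: configuration is captured once at
    construction time, and each method forwards it to a step function."""
    s3uri_source: str
    s3uri_staging: str
    s3uri_datalake: str

    def step_plan(self) -> str:
        return plan(self.s3uri_source, self.s3uri_staging)

    def step_process(self) -> str:
        return process(self.s3uri_staging, self.s3uri_datalake)


project = MiniProject("s3://bucket/snapshot/", "s3://bucket/staging/", "s3://bucket/datalake/")
print(project.step_plan())
print(project.step_process())
```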

This module is designed to be part of a larger data processing ecosystem, integrating with other components for S3 interactions, manifest file handling, and data transformations.

dbsnaplake.project.print_manifest_file_info(manifest_file: ManifestFile, logger)

A helper function to print the summary of a manifest file.
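The fields of ManifestFile are not documented on this page. The following is a hedged sketch, assuming a manifest records its own URI plus the URI and size of each data file it lists. It shows a summary helper that writes through a standard logging.Logger. ManifestStub and print_manifest_summary are hypothetical.

```python
import logging
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class ManifestStub:
    """Hypothetical stand-in for a manifest file: its own URI plus the
    (data file URI, size in bytes) pairs it lists."""
    uri: str
    data_files: List[Tuple[str, int]] = field(default_factory=list)


def print_manifest_summary(manifest: ManifestStub, logger: logging.Logger) -> None:
    """Log the manifest location, the number of data files, and their total size."""
    total_size = sum(size for _, size in manifest.data_files)
    logger.info("manifest: %s", manifest.uri)
    logger.info("number of data files: %d", len(manifest.data_files))
    logger.info("total data size: %d bytes", total_size)


logging.basicConfig(level=logging.INFO, format="%(message)s")
manifest = ManifestStub(
    uri="s3://bucket/manifests/group-1.json",
    data_files=[("s3://bucket/data/1.parquet", 1_000), ("s3://bucket/data/2.parquet", 2_500)],
)
print_manifest_summary(manifest, logging.getLogger("manifest"))
```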

class dbsnaplake.project.Project(s3_client: S3Client, s3uri_db_snapshot_manifest_summary: str, s3uri_staging: str, s3uri_datalake: str, target_db_snapshot_file_group_size: int, extract_record_id: Optional[DerivedColumn], extract_create_time: Optional[DerivedColumn], extract_update_time: Optional[DerivedColumn], extract_partition_keys: Optional[List[DerivedColumn]], sort_by: List[str], descending: List[bool], target_parquet_file_size: int, count_on_column: Optional[str], tracker_table_name: str, aws_region: str, use_case_id: str)

Manages the workflow for converting database snapshots to a data lake format.

This class encapsulates the entire process of exporting database snapshots, transforming them, and ingesting them into a data lake. It provides methods for each step of the workflow and manages the state using DynamoDB tables.

Parameters:
  • s3_client – Initialized boto3 S3 client for S3 operations.

  • s3uri_db_snapshot_manifest_summary – S3 URI of the DB snapshot manifest summary.

  • s3uri_staging – S3 URI for storing intermediate staging data.

  • s3uri_datalake – S3 URI for the final data lake storage.

  • target_db_snapshot_file_group_size – Target size for DB snapshot file groups.

  • extract_record_id – Logic for extracting record IDs.

  • extract_create_time – Logic for extracting creation timestamps.

  • extract_update_time – Logic for extracting update timestamps.

  • extract_partition_keys – Logic for extracting partition keys.

  • sort_by – Column names to sort by before writing to Parquet.

  • descending – Sort order for each column in sort_by, matched by position (True for descending).

  • target_parquet_file_size – Target size for output parquet files.

  • tracker_table_name – Name of the DynamoDB table for tracking tasks.

  • aws_region – AWS region for the DynamoDB tracker table.

  • use_case_id – Unique identifier for this specific use case.
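sort_by and descending are parallel lists, so descending[i] gives the direction for sort_by[i]. They are presumably applied to a Polars sort, since Polars DataFrame.sort accepts a list of columns and a matching list of descending flags. That is an assumption. The dependency-free sketch below shows the same semantics on plain dicts. sort_records is a hypothetical helper.

```python
from typing import Any, Dict, List


def sort_records(
    records: List[Dict[str, Any]], sort_by: List[str], descending: List[bool]
) -> List[Dict[str, Any]]:
    """Sort by several columns, each with its own direction.

    Python's sort is stable (also with reverse=True), so sorting by the least
    significant key first and the most significant key last gives the
    correct multi-key order.
    """
    if len(sort_by) != len(descending):
        raise ValueError("sort_by and descending must have the same length")
    result = list(records)
    for column, desc in reversed(list(zip(sort_by, descending))):
        # The key function runs immediately, so capturing `column` is safe here.
        result.sort(key=lambda r: r[column], reverse=desc)
    return result


rows = [
    {"id": 1, "update_time": "2024-01-02"},
    {"id": 2, "update_time": "2024-01-03"},
    {"id": 1, "update_time": "2024-01-05"},
]
# id ascending, then the newest update_time first within each id
for row in sort_records(rows, sort_by=["id", "update_time"], descending=[False, True]):
    print(row)
```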

Methods

  • connect_dynamodb(): Initializes connections to DynamoDB tables for task tracking.

  • step_1_1_plan_snapshot_to_staging(): Plans the division of DB snapshot files.

  • step_1_2_process_db_snapshot_file_group_manifest_file(): Processes snapshot groups.

  • step_2_1_plan_staging_to_datalake(): Plans the merging of staging files.

  • step_2_2_process_partition_file_group_manifest_file(): Executes file compaction.

property s3_loc: S3Location

Access the S3Location object for this project.

property db_snapshot_manifest_file: DBSnapshotManifestFile

Access the DBSnapshotManifestFile object for this project.

batch_read_snapshot_data_file(db_snapshot_file_group_manifest_file: DBSnapshotFileGroupManifestFile, **kwargs) → DataFrame

You must override this method in a subclass. Your implementation reads the snapshot data files referenced by db_snapshot_file_group_manifest_file into a Polars DataFrame.
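The base implementation cannot know the snapshot file format, so the intended usage is to subclass Project and override this method. The sketch below illustrates that pattern with hypothetical stand-ins, assuming newline-delimited JSON snapshot files:

  • ProjectBase plays the role of Project.

  • FileGroupManifestStub plays the role of DBSnapshotFileGroupManifestFile.

  • A dict plays the role of S3.

  • A list of dicts plays the role of the Polars DataFrame.

```python
import json
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class FileGroupManifestStub:
    """Hypothetical stand-in for DBSnapshotFileGroupManifestFile: the URIs
    of the snapshot data files that belong to one group."""
    data_file_uris: List[str]


class ProjectBase:
    """Hypothetical stand-in for Project: the base method is abstract."""

    def batch_read_snapshot_data_file(self, manifest: FileGroupManifestStub, **kwargs) -> List[dict]:
        raise NotImplementedError("subclasses must implement this method")


class MyProject(ProjectBase):
    """Reads newline-delimited JSON from an in-memory 'bucket'."""

    def __init__(self, storage: Dict[str, str]):
        self.storage = storage  # URI -> file contents

    def batch_read_snapshot_data_file(self, manifest: FileGroupManifestStub, **kwargs) -> List[dict]:
        rows: List[dict] = []
        for uri in manifest.data_file_uris:
            for line in self.storage[uri].splitlines():
                if line.strip():  # skip blank lines
                    rows.append(json.loads(line))
        return rows


storage = {
    "s3://bucket/snapshot/1.json": '{"id": 1}\n{"id": 2}\n',
    "s3://bucket/snapshot/2.json": '{"id": 3}\n',
}
rows = MyProject(storage).batch_read_snapshot_data_file(FileGroupManifestStub(list(storage)))
print(rows)
```

In a real subclass, you would fetch each object through the project's S3 client and return a Polars DataFrame. For example, you could pass the downloaded bytes to polars.read_ndjson, wrapped in io.BytesIO.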

property task_model_step_0_prepare_db_snapshot_manifest: Type[T_TASK]

Access the DynamoDB status-tracking ORM model for step_0.

Note

The model class must be created only once, so this property has to be cached.

property task_model_step_1_1_plan_snapshot_to_staging: Type[T_TASK]

Access the DynamoDB status-tracking ORM model for step_1_1.

Note

The model class must be created only once, so this property has to be cached.

property task_model_step_1_2_process_db_snapshot_file_group_manifest_file: Type[T_TASK]

Access the DynamoDB status-tracking ORM model for step_1_2.

Note

The model class must be created only once, so this property has to be cached.

property task_model_step_2_1_plan_staging_to_datalake: Type[T_TASK]

Access the DynamoDB status-tracking ORM model for step_2_1.

Note

The model class must be created only once, so this property has to be cached.

property task_model_step_2_2_process_partition_file_group_manifest_file: Type[T_TASK]

Access the DynamoDB status-tracking ORM model for step_2_2.

Note

The model class must be created only once, so this property has to be cached.
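Each task model above must be created only once. One common way to get that guarantee in Python is functools.cached_property, though this page does not say whether dbsnaplake uses it. The sketch shows why caching matters when a model class is built at runtime with type(). Without the cache, every property access would return a new, distinct class. BaseTask and TrackerDemo are hypothetical.

```python
from functools import cached_property
from typing import Type


class BaseTask:
    """Hypothetical stand-in for the ORM task base class (T_TASK)."""
    table_name: str = ""
    use_case_id: str = ""


class TrackerDemo:
    """Hypothetical illustration: build a task model class on first access
    and reuse the same class object afterwards."""

    def __init__(self, tracker_table_name: str, use_case_id: str):
        self.tracker_table_name = tracker_table_name
        self.use_case_id = use_case_id
        self.build_count = 0  # how many model classes have been created

    @cached_property
    def task_model_step_1_1(self) -> Type[BaseTask]:
        self.build_count += 1
        # type() creates a brand-new subclass each time it is called;
        # cached_property stores the result so this body runs only once.
        return type(
            "Step11Task",
            (BaseTask,),
            {"table_name": self.tracker_table_name, "use_case_id": self.use_case_id},
        )


tracker = TrackerDemo("dbsnaplake-tracker", "my_use_case")
first = tracker.task_model_step_1_1
second = tracker.task_model_step_1_1
print(first is second, tracker.build_count)
```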

connect_dynamodb(bsm: BotoSesManager)

Configure the DynamoDB ORM Python library to use the correct AWS credentials.