snapshot_to_staging

This module defines the abstraction and implementation of the transformation process from Database Snapshot Data Files to Staging Data Files. It includes classes and functions for managing manifest files, processing data groups, and handling data transformations using Polars DataFrames.

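Key components:

  • DBSnapshotManifestFile – the full list of data files in the database snapshot.

  • DBSnapshotFileGroupManifestFile – a subgroup of the snapshot data files.

  • batch_read_snapshot_data_file – user-defined function that loads a file group into a Polars DataFrame.

  • StagingFileGroupManifestFile – the staging data files derived from one snapshot file group.

  • process_db_snapshot_file_group_manifest_file – transforms a snapshot file group into staging data files.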

See also

s3manifesto

class dbsnaplake.snapshot_to_staging.DBSnapshotManifestFile(uri: str, uri_summary: str, data_file_list: typing.List[s3manifesto.typehint.T_DATA_FILE] = <factory>, size: typing.Optional[int] = None, n_record: typing.Optional[int] = None, fingerprint: typing.Optional[str] = None, details: typing.Dict[str, typing.Any] = <factory>)

Represents the full list of data files in the database snapshot. A data pipeline has exactly one DB snapshot manifest file, and it is the pipeline's starting point.

This class extends ManifestFile to provide specific functionality for handling database snapshot manifest files.

split_into_groups(s3_loc: S3Location, s3_client: S3Client, target_size: int = 100000000) -> T.List['DBSnapshotFileGroupManifestFile']

Split the full list of data files into groups of approximately equal size.

Parameters:
  • s3_loc – S3 location information.

  • s3_client – Boto3 S3 client.

  • target_size – Target size for each group in bytes. Default is 100 MB.

Returns:

List of file group manifest files.
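As an illustration of size-based grouping, the sketch below greedily packs files into groups until each group reaches roughly target_size bytes. It is a hypothetical stand-in, not necessarily the algorithm this method uses: the helper name group_by_target_size and the "size" key on each data-file dict are assumptions made for the example.

```python
from typing import Dict, List


def group_by_target_size(
    data_files: List[Dict],
    target_size: int = 100_000_000,
) -> List[List[Dict]]:
    """Greedily pack files into groups of roughly ``target_size`` bytes.

    Hypothetical helper for illustration only; each file dict is assumed
    to carry a "size" key holding its size in bytes.
    """
    groups: List[List[Dict]] = []
    current: List[Dict] = []
    current_size = 0
    for data_file in data_files:
        current.append(data_file)
        current_size += data_file["size"]
        # Close the group once it has reached the target size.
        if current_size >= target_size:
            groups.append(current)
            current, current_size = [], 0
    # Keep whatever is left over as a final, smaller group.
    if current:
        groups.append(current)
    return groups


# Made-up sample files (sizes in bytes).
files = [
    {"uri": "s3://bucket/snapshot/1.json", "size": 60_000_000},
    {"uri": "s3://bucket/snapshot/2.json", "size": 50_000_000},
    {"uri": "s3://bucket/snapshot/3.json", "size": 30_000_000},
]
groups = group_by_target_size(files)
```

With these sample sizes the first two files form one group and the third file forms a second, smaller group, which is why the resulting groups are only approximately equal in size.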

class dbsnaplake.snapshot_to_staging.DBSnapshotFileGroupManifestFile(uri: str, uri_summary: str, data_file_list: typing.List[s3manifesto.typehint.T_DATA_FILE] = <factory>, size: typing.Optional[int] = None, n_record: typing.Optional[int] = None, fingerprint: typing.Optional[str] = None, details: typing.Dict[str, typing.Any] = <factory>)

Represents a subgroup of the data files listed in a DBSnapshotManifestFile, created by breaking a large snapshot into more manageable units.

This class extends ManifestFile to provide specific functionality for handling groups of snapshot files.

classmethod read_all_groups(s3_loc: S3Location, s3_client: S3Client) -> T.List['DBSnapshotFileGroupManifestFile']

Read all snapshot file group manifest files from the specified S3 location.

Parameters:
  • s3_loc – S3 location information.

  • s3_client – Boto3 S3 client.

Returns:

List of all file group manifest files.

dbsnaplake.snapshot_to_staging.batch_read_snapshot_data_file(db_snapshot_file_group_manifest_file: DBSnapshotFileGroupManifestFile, **kwargs) -> DataFrame

Loads multiple snapshot data files into an in-memory Polars DataFrame and applies custom transformations.

This user-defined function serves as a central point for data processing and transformation in the pipeline. It allows for various operations such as data validation, column manipulation (add/rename/drop), joining with external data sources, data type casting, filtering, and aggregation.

Parameters:

  • db_snapshot_file_group_manifest_file – DBSnapshotFileGroupManifestFile object containing references to the snapshot data files to be processed.

Returns:

A Polars DataFrame containing the processed snapshot data.

Below is an example that reads multiple NDJSON files, adds a record_id column to each, and concatenates them:

>>> import polars as pl
>>> def batch_read_snapshot_data_file(
...     db_snapshot_file_group_manifest_file: DBSnapshotFileGroupManifestFile,
...     **kwargs,
... ) -> pl.DataFrame:
...     sub_df_list = list()
...     for data_file in db_snapshot_file_group_manifest_file.data_file_list:
...         sub_df = pl.read_ndjson(data_file["uri"])
...         # arbitrary transformation logic here
...         sub_df = sub_df.with_columns(pl.col("OrderId").alias("record_id"))
...         sub_df_list.append(sub_df)
...     df = pl.concat(sub_df_list)
...     return df


class dbsnaplake.snapshot_to_staging.StagingFileGroupManifestFile(uri: str, uri_summary: str, data_file_list: typing.List[s3manifesto.typehint.T_DATA_FILE] = <factory>, size: typing.Optional[int] = None, n_record: typing.Optional[int] = None, fingerprint: typing.Optional[str] = None, details: typing.Dict[str, typing.Any] = <factory>)

Represents a group of staging files derived from a single DBSnapshotFileGroupManifestFile.

This class manages the output of the partitioning process applied to a DBSnapshotFileGroupManifestFile. It stores references to multiple data files, each corresponding to a specific partition.

Key characteristics:

  1. One-to-one relationship: Each DBSnapshotFileGroupManifestFile generates exactly one StagingFileGroupManifestFile.

  2. Partitioning: The original snapshot data is divided into multiple partitions.

  3. File generation: One data file is created for each partition.

  4. Storage: This manifest file stores the list of all generated data files.

The StagingFileGroupManifestFile serves as an index or catalog for the partitioned and processed data, facilitating efficient data retrieval and management in subsequent data lake operations.
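To make the "one data file per partition" idea concrete, here is a minimal standard-library sketch. It is not the library's implementation: the helper split_by_partition, the key=value path layout, the bucket name, and the "n_record" entry on each data-file dict are all assumptions chosen for illustration.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def split_by_partition(
    records: List[Dict],
    partition_keys: List[str],
) -> Dict[Tuple, List[Dict]]:
    """Group records by the values of their partition keys (hypothetical helper)."""
    partitions: Dict[Tuple, List[Dict]] = defaultdict(list)
    for record in records:
        key = tuple(record[k] for k in partition_keys)
        partitions[key].append(record)
    return dict(partitions)


partition_keys = ["year", "month"]
records = [
    {"record_id": "o-1", "year": "2024", "month": "01"},
    {"record_id": "o-2", "year": "2024", "month": "02"},
    {"record_id": "o-3", "year": "2024", "month": "01"},
]
partitions = split_by_partition(records, partition_keys)

# One data file entry per partition; the path layout below is an assumption.
data_file_list = [
    {
        "uri": "s3://bucket/staging/"
        + "/".join(f"{k}={v}" for k, v in zip(partition_keys, key))
        + "/1.parquet",
        "n_record": len(rows),
    }
    for key, rows in partitions.items()
]
```

The resulting data_file_list plays the role of the catalog described above: one entry per partition, each pointing at the file written for that partition.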

dbsnaplake.snapshot_to_staging.process_db_snapshot_file_group_manifest_file(db_snapshot_file_group_manifest_file: DBSnapshotFileGroupManifestFile, s3_client: S3Client, s3_loc: S3Location, batch_read_snapshot_data_file_func: T_BatchReadSnapshotDataFileCallable, partition_keys: T.List[str], sort_by: T.Optional[T.List[str]] = None, descending: T.Union[bool, T.List[bool]] = False, s3pathlib_write_bytes_kwargs: T_OPTIONAL_KWARGS = None, logger=<dbsnaplake.logger.DummyLogger object>) -> StagingFileGroupManifestFile

Transform Snapshot Data Files to Staging Data Files, based on partition keys.

This function processes a group of snapshot files, derives necessary columns, partitions the data if needed, and writes the results to S3 as staging files.

Parameters:
  • db_snapshot_file_group_manifest_file – Manifest file for the snapshot group.

  • s3_client – Boto3 S3 client.

  • s3_loc – S3 location information.

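  • batch_read_snapshot_data_file_func – User-defined callable that loads the snapshot data files of the group into a Polars DataFrame. See batch_read_snapshot_data_file().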

  • partition_keys – List of partition key columns. Pass an empty list if the data is not partitioned.

  • sort_by – List of columns to sort by, for example ["create_time"]. Use an empty list or None if no sorting is needed.

  • descending – A boolean, or a list of booleans, indicating the sort order, for example [True] or [False, True].

  • polars_write_parquet_kwargs – Custom keyword arguments for Polars’ write_parquet method. Default is dict(compression="snappy").

  • s3pathlib_write_bytes_kwargs – Custom keyword arguments for S3Path’s write_bytes method.

  • logger – Logger object for logging operations.

Returns:

A single StagingFileGroupManifestFile object.