snapshot_to_staging

This module defines the abstraction and implementation of the transformation process from Database Snapshot Data Files to Staging Data Files. It includes classes and functions for managing manifest files, processing data groups, and handling data transformations using Polars DataFrames.

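Key components:

  • DBSnapshotManifestFile

  • DBSnapshotFileGroupManifestFile

  • batch_read_snapshot_data_file

  • DerivedColumn

  • add_derived_columns

  • StagingFileGroupManifestFile

  • process_db_snapshot_file_group_manifest_file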

See also

s3manifesto

class dbsnaplake.snapshot_to_staging.DBSnapshotManifestFile(uri: str, uri_summary: str, data_file_list: typing.List[s3manifesto.typehint.T_DATA_FILE] = <factory>, size: typing.Optional[int] = None, n_record: typing.Optional[int] = None, fingerprint: typing.Optional[str] = None, details: typing.Dict[str, typing.Any] = <factory>)

Represents the full list of data files in the database snapshot. A data pipeline has exactly one DB snapshot manifest file, and it is the starting point of the pipeline.

This class extends ManifestFile to provide specific functionality for handling database snapshot manifest files.

split_into_groups(s3_loc: S3Location, s3_client: S3Client, target_size: int = 100000000) → T.List['DBSnapshotFileGroupManifestFile']

Split the full list of data files into groups of approximately equal size.

Parameters:
  • s3_loc – S3 location information.

  • s3_client – Boto3 S3 client.

  • target_size – Target size for each group in bytes. Default is 100 MB.

Returns:

List of file group manifest files.
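The grouping idea can be illustrated with a minimal, standalone sketch. It is not the library's implementation: the helper name group_files_by_size, the greedy packing strategy, the "size" key on each data file, and the example URIs are all assumptions made for illustration.

```python
import typing as T


def group_files_by_size(
    data_file_list: T.List[T.Dict[str, T.Any]],
    target_size: int = 100_000_000,
) -> T.List[T.List[T.Dict[str, T.Any]]]:
    """Greedily pack files, in order, into groups of roughly target_size bytes.

    Hypothetical helper: assumes every data file dict has a "size" key in bytes.
    """
    groups: T.List[T.List[T.Dict[str, T.Any]]] = []
    current: T.List[T.Dict[str, T.Any]] = []
    current_size = 0
    for data_file in data_file_list:
        size = data_file["size"]
        # Close the current group if adding this file would exceed the target.
        if current and current_size + size > target_size:
            groups.append(current)
            current, current_size = [], 0
        current.append(data_file)
        current_size += size
    # Keep the last, possibly under-filled, group.
    if current:
        groups.append(current)
    return groups


# Hypothetical snapshot data files (URIs and sizes are made up).
files = [
    {"uri": "s3://example-bucket/snapshot/1.json", "size": 60_000_000},
    {"uri": "s3://example-bucket/snapshot/2.json", "size": 30_000_000},
    {"uri": "s3://example-bucket/snapshot/3.json", "size": 50_000_000},
    {"uri": "s3://example-bucket/snapshot/4.json", "size": 120_000_000},
]
groups = group_files_by_size(files, target_size=100_000_000)
group_sizes = [sum(f["size"] for f in group) for group in groups]
```

In this sketch a file larger than target_size ends up in a group of its own, so group sizes are only approximately equal.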

class dbsnaplake.snapshot_to_staging.DBSnapshotFileGroupManifestFile(uri: str, uri_summary: str, data_file_list: typing.List[s3manifesto.typehint.T_DATA_FILE] = <factory>, size: typing.Optional[int] = None, n_record: typing.Optional[int] = None, fingerprint: typing.Optional[str] = None, details: typing.Dict[str, typing.Any] = <factory>)

Represents a subgroup of the data files listed in a DBSnapshotManifestFile, created by breaking a large snapshot down into more manageable units.

This class extends ManifestFile to provide specific functionality for handling groups of snapshot files.

classmethod read_all_groups(s3_loc: S3Location, s3_client: S3Client) → T.List['DBSnapshotFileGroupManifestFile']

Read all snapshot file group manifest files from the specified S3 location.

Parameters:
  • s3_loc – S3 location information.

  • s3_client – Boto3 S3 client.

Returns:

List of all file group manifest files.

dbsnaplake.snapshot_to_staging.batch_read_snapshot_data_file(db_snapshot_file_group_manifest_file: DBSnapshotFileGroupManifestFile, **kwargs) → DataFrame

Loads multiple snapshot data files into an in-memory Polars DataFrame and applies custom transformations.

This user-defined function serves as a central point for data processing and transformation in the pipeline. It allows for various operations such as data validation, column manipulation (add/rename/drop), joining with external data sources, data type casting, filtering, and aggregation.

Parameters:
  • db_snapshot_file_group_manifest_file (DBSnapshotFileGroupManifestFile) – Object containing references to the snapshot data files to be processed.

Returns:

A Polars DataFrame containing the processed snapshot data.

Below is an example that reads multiple NDJSON files and concatenates them into a single DataFrame:

>>> import polars as pl
>>> def batch_read_snapshot_data_file(
...     db_snapshot_file_group_manifest_file: DBSnapshotFileGroupManifestFile,
...     **kwargs,
... ) -> pl.DataFrame:
...     sub_df_list = list()
...     for data_file in db_snapshot_file_group_manifest_file.data_file_list:
...         sub_df = pl.read_ndjson(data_file["uri"])
...         # arbitrary transformation logic here
...         sub_df = sub_df.with_columns(pl.col("OrderId").alias("record_id"))
...         sub_df_list.append(sub_df)
...     df = pl.concat(sub_df_list)
...     return df

class dbsnaplake.snapshot_to_staging.DerivedColumn(extractor: Union[Expr, str], alias: str)

Declares how to derive a new column from the DataFrame.

Parameters:
  • extractor – A Polars expression or a column name. If it is a Polars expression, it is evaluated to produce the new column. If it is a string, the existing column with that name is used; that column must already exist in the DataFrame.

  • alias – The name for the derived column.

dbsnaplake.snapshot_to_staging.add_derived_columns(df: DataFrame, derived_columns: List[DerivedColumn]) → DataFrame

Add new columns to the DataFrame based on the given derived_columns specifications.

Returns:

DataFrame with new columns added.

Example

>>> import polars as pl
>>> df = pl.DataFrame({"id": ["id-1", "id-2", "id-3"]})
>>> derived_columns = [
...     DerivedColumn(
...         extractor="id",
...         alias="record_id_1",
...     ),
...     DerivedColumn(
...         extractor=pl.col("id").str.split("-").list.last(),
...         alias="record_id_2",
...     ),
... ]
>>> add_derived_columns(df, derived_columns)
┌──────┬─────────────┬─────────────┐
│ id   ┆ record_id_1 ┆ record_id_2 │
│ ---  ┆ ---         ┆ ---         │
│ str  ┆ str         ┆ str         │
╞══════╪═════════════╪═════════════╡
│ id-1 ┆ id-1        ┆ 1           │
│ id-2 ┆ id-2        ┆ 2           │
│ id-3 ┆ id-3        ┆ 3           │
└──────┴─────────────┴─────────────┘

class dbsnaplake.snapshot_to_staging.StagingFileGroupManifestFile(uri: str, uri_summary: str, data_file_list: typing.List[s3manifesto.typehint.T_DATA_FILE] = <factory>, size: typing.Optional[int] = None, n_record: typing.Optional[int] = None, fingerprint: typing.Optional[str] = None, details: typing.Dict[str, typing.Any] = <factory>)

Represents a group of staging files derived from a single DBSnapshotFileGroupManifestFile.

This class manages the output of the partitioning process applied to a DBSnapshotFileGroupManifestFile. It stores references to multiple data files, each corresponding to a specific partition.

Key characteristics:

  1. One-to-one relationship: Each DBSnapshotFileGroupManifestFile generates exactly one StagingFileGroupManifestFile.

  2. Partitioning: The original snapshot data is divided into multiple partitions.

  3. File generation: One data file is created for each partition.

  4. Storage: This manifest file stores the list of all generated data files.

The StagingFileGroupManifestFile serves as an index or catalog for the partitioned and processed data, facilitating efficient data retrieval and management in subsequent data lake operations.
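The relationship between partitions, data files, and the manifest's data file list can be sketched in plain Python. This is only an illustration under stated assumptions: the helpers partition_records and partition_path, the Hive-style key=value path layout, the example bucket, and the 1.parquet file name are hypothetical and are not taken from the library.

```python
import collections
import typing as T

Record = T.Dict[str, T.Any]


def partition_records(
    records: T.List[Record],
    partition_keys: T.List[str],
) -> T.Dict[T.Tuple[T.Any, ...], T.List[Record]]:
    """Group records by the tuple of their partition key values."""
    partitions: T.Dict[T.Tuple[T.Any, ...], T.List[Record]] = (
        collections.defaultdict(list)
    )
    for record in records:
        key = tuple(record[k] for k in partition_keys)
        partitions[key].append(record)
    return dict(partitions)


def partition_path(partition_keys: T.List[str], values: T.Tuple[T.Any, ...]) -> str:
    """Build a Hive-style relative path such as year=2021/month=01 (assumed layout)."""
    return "/".join(f"{k}={v}" for k, v in zip(partition_keys, values))


# Hypothetical records from one snapshot file group.
records = [
    {"record_id": "1", "year": "2021", "month": "01"},
    {"record_id": "2", "year": "2021", "month": "01"},
    {"record_id": "3", "year": "2021", "month": "02"},
]
keys = ["year", "month"]
partitions = partition_records(records, keys)

# One data file entry per partition; this list is what the manifest would index.
data_file_list = [
    {
        "uri": f"s3://example-bucket/staging/{partition_path(keys, values)}/1.parquet",
        "n_record": len(rows),
    }
    for values, rows in partitions.items()
]
```

Here three records fall into two partitions, so the resulting list has two entries, one per generated data file.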

dbsnaplake.snapshot_to_staging.process_db_snapshot_file_group_manifest_file(db_snapshot_file_group_manifest_file: DBSnapshotFileGroupManifestFile, s3_client: S3Client, s3_loc: S3Location, batch_read_snapshot_data_file_func: T_BatchReadSnapshotDataFileCallable, extract_record_id: T.Optional[DerivedColumn] = None, extract_create_time: T.Optional[DerivedColumn] = None, extract_update_time: T.Optional[DerivedColumn] = None, extract_partition_keys: T.Optional[T.List[DerivedColumn]] = None, polars_write_parquet_kwargs: T_OPTIONAL_KWARGS = None, s3pathlib_write_bytes_kwargs: T_OPTIONAL_KWARGS = None, logger=<dbsnaplake.logger.DummyLogger object>) → StagingFileGroupManifestFile

Transform Snapshot Data Files to Staging Data Files, based on partition keys.

This function processes a group of snapshot files, derives necessary columns, partitions the data if needed, and writes the results to S3 as staging files.

Parameters:
  • db_snapshot_file_group_manifest_file – Manifest file for the snapshot group.

  • s3_client – Boto3 S3 client.

  • s3_loc – S3 location information.

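  • batch_read_snapshot_data_file_func – User-defined function that loads the snapshot data files of the group into a Polars DataFrame. See batch_read_snapshot_data_file.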

  • extract_record_id – (optional) Specification for deriving record ID.

  • extract_create_time – (optional) Specification for deriving creation time.

  • extract_update_time – (optional) Specification for deriving update time.

  • extract_partition_keys – (optional) Specifications for deriving partition keys.

  • polars_write_parquet_kwargs – Custom keyword arguments for Polars’ write_parquet method. Default is dict(compression="snappy").

  • s3pathlib_write_bytes_kwargs – Custom keyword arguments for S3Path’s write_bytes method.

  • logger – Logger object for logging operations.

Returns:

A single StagingFileGroupManifestFile object.