snapshot_to_staging
This module defines the abstraction and implementation of the transformation process from Database Snapshot Data Files to Staging Data Files. It includes classes and functions for managing manifest files, processing data groups, and handling data transformations using Polars DataFrames.
Key components:
- DBSnapshotManifestFile: Represents the full list of data files from the Database snapshot.
- DBSnapshotFileGroupManifestFile: Represents a group of snapshot files.
- StagingFileGroupManifestFile: Represents a group of staging files.
- DerivedColumn: Defines how to derive new columns from the DataFrame.
- Various utility functions for data processing and S3 interactions.
See also
- class dbsnaplake.snapshot_to_staging.DBSnapshotManifestFile(uri: str, uri_summary: str, data_file_list: typing.List[s3manifesto.typehint.T_DATA_FILE] = <factory>, size: typing.Optional[int] = None, n_record: typing.Optional[int] = None, fingerprint: typing.Optional[str] = None, details: typing.Dict[str, typing.Any] = <factory>)
Represents the full list of data files from the Database snapshot. A data pipeline has exactly one DB snapshot manifest file, and it is the starting point of the pipeline.
This class extends ManifestFile to provide specific functionality for handling database snapshot manifest files.
- split_into_groups(s3_loc: S3Location, s3_client: S3Client, target_size: int = 100000000) -> T.List['DBSnapshotFileGroupManifestFile']
Split the full list of data files into groups of approximately equal size.
- Parameters:
s3_loc – S3 location information.
s3_client – Boto3 S3 client.
target_size – Target size for each group in bytes. Default is 100 MB.
- Returns:
List of file group manifest files.
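The idea of splitting by a target size can be illustrated with a simple greedy pass over the file list. This is only a sketch of the general technique, not necessarily the algorithm the library uses. The function name `group_files_by_size`, the `"size"` key on each data file, and the example URIs are assumptions made for illustration.

```python
from typing import Any, Dict, List


def group_files_by_size(
    data_files: List[Dict[str, Any]],
    target_size: int = 100_000_000,
) -> List[List[Dict[str, Any]]]:
    """Greedily pack files into groups of roughly ``target_size`` bytes.

    Hypothetical helper: assumes each data file dict has a "size" key (bytes).
    """
    groups: List[List[Dict[str, Any]]] = []
    current: List[Dict[str, Any]] = []
    current_size = 0
    for data_file in data_files:
        # Close the current group if adding this file would exceed the target.
        if current and current_size + data_file["size"] > target_size:
            groups.append(current)
            current, current_size = [], 0
        current.append(data_file)
        current_size += data_file["size"]
    if current:
        groups.append(current)
    return groups


# Hypothetical file list; the bucket and keys are made up.
files = [
    {"uri": "s3://example-bucket/snapshot/1.json", "size": 60_000_000},
    {"uri": "s3://example-bucket/snapshot/2.json", "size": 30_000_000},
    {"uri": "s3://example-bucket/snapshot/3.json", "size": 50_000_000},
    {"uri": "s3://example-bucket/snapshot/4.json", "size": 120_000_000},
]
groups = group_files_by_size(files, target_size=100_000_000)
group_sizes = [sum(f["size"] for f in g) for g in groups]
```

In this sketch a file larger than the target ends up alone in its own group, which is why the resulting groups are only approximately equal in size.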
See also
- class dbsnaplake.snapshot_to_staging.DBSnapshotFileGroupManifestFile(uri: str, uri_summary: str, data_file_list: typing.List[s3manifesto.typehint.T_DATA_FILE] = <factory>, size: typing.Optional[int] = None, n_record: typing.Optional[int] = None, fingerprint: typing.Optional[str] = None, details: typing.Dict[str, typing.Any] = <factory>)
This class is a subgroup of DBSnapshotManifestFile, created by breaking down a larger snapshot into more manageable units.
This class extends ManifestFile to provide specific functionality for handling groups of snapshot files.
- classmethod read_all_groups(s3_loc: S3Location, s3_client: S3Client) -> T.List['DBSnapshotFileGroupManifestFile']
Read all snapshot file group manifest files from the specified S3 location.
- Parameters:
s3_loc – S3 location information.
s3_client – Boto3 S3 client.
- Returns:
List of all file group manifest files.
- dbsnaplake.snapshot_to_staging.batch_read_snapshot_data_file(db_snapshot_file_group_manifest_file: DBSnapshotFileGroupManifestFile, **kwargs) -> DataFrame
Loads multiple snapshot data files into an in-memory Polars DataFrame and applies custom transformations.
This user-defined function serves as a central point for data processing and transformation in the pipeline. It allows for various operations such as data validation, column manipulation (add/rename/drop), joining with external data sources, data type casting, filtering, and aggregation.
- Parameters:
db_snapshot_file_group_manifest_file – DBSnapshotFileGroupManifestFile object containing references to the snapshot data files to be processed.
- Returns:
A Polars DataFrame containing the processed snapshot data.
Below is an example that reads data from multiple NDJSON files and aligns them:
>>> import polars as pl
>>> def batch_read_snapshot_data_file(
...     db_snapshot_file_group_manifest_file: DBSnapshotFileGroupManifestFile,
...     **kwargs,
... ) -> pl.DataFrame:
...     sub_df_list = list()
...     for data_file in db_snapshot_file_group_manifest_file.data_file_list:
...         sub_df = pl.read_ndjson(data_file["uri"])
...         # arbitrary transformation logic here
...         sub_df = sub_df.with_columns(pl.col("OrderId").alias("record_id"))
...         sub_df_list.append(sub_df)
...     df = pl.concat(sub_df_list)
...     return df
See also
Polars Input/Output Documentation: For reading data from different file formats and cloud storage.
process_db_snapshot_file_group_manifest_file(): Related processing function.
- class dbsnaplake.snapshot_to_staging.DerivedColumn(extractor: Union[Expr, str], alias: str)
Declares how to derive a new column from the DataFrame.
- Parameters:
extractor – A Polars expression or a column name used to derive the new column. A Polars expression is evaluated to produce the new column. A string is treated as the name of an existing column, which must be present in the DataFrame.
alias – The name for the derived column.
- dbsnaplake.snapshot_to_staging.add_derived_columns(df: DataFrame, derived_columns: List[DerivedColumn]) -> DataFrame
Add new columns to the DataFrame based on the given derived_columns specifications.
- Returns:
DataFrame with new columns added.
Example
>>> import polars as pl
>>> df = pl.DataFrame({"id": ["id-1", "id-2", "id-3"]})
>>> derived_columns = [
...     DerivedColumn(
...         extractor="id",
...         alias="record_id_1",
...     ),
...     DerivedColumn(
...         extractor=pl.col("id").str.split("-").list.last(),
...         alias="record_id_2",
...     ),
... ]
>>> add_derived_columns(df, derived_columns)
┌──────┬─────────────┬─────────────┐
│ id   ┆ record_id_1 ┆ record_id_2 │
│ ---  ┆ ---         ┆ ---         │
│ str  ┆ str         ┆ str         │
╞══════╪═════════════╪═════════════╡
│ id-1 ┆ id-1        ┆ 1           │
│ id-2 ┆ id-2        ┆ 2           │
│ id-3 ┆ id-3        ┆ 3           │
└──────┴─────────────┴─────────────┘
- class dbsnaplake.snapshot_to_staging.StagingFileGroupManifestFile(uri: str, uri_summary: str, data_file_list: typing.List[s3manifesto.typehint.T_DATA_FILE] = <factory>, size: typing.Optional[int] = None, n_record: typing.Optional[int] = None, fingerprint: typing.Optional[str] = None, details: typing.Dict[str, typing.Any] = <factory>)
Represents a group of staging files derived from a single DBSnapshotFileGroupManifestFile.
This class manages the output of the partitioning process applied to a DBSnapshotFileGroupManifestFile. It stores references to multiple data files, each corresponding to a specific partition.
Key characteristics:
- One-to-one relationship: Each DBSnapshotFileGroupManifestFile generates exactly one StagingFileGroupManifestFile.
- Partitioning: The original snapshot data is divided into multiple partitions.
- File generation: One data file is created for each partition.
- Storage: This manifest file stores the list of all generated data files.
The StagingFileGroupManifestFile serves as an index or catalog for the partitioned and processed data, facilitating efficient data retrieval and management in subsequent data lake operations.
- dbsnaplake.snapshot_to_staging.process_db_snapshot_file_group_manifest_file(db_snapshot_file_group_manifest_file: DBSnapshotFileGroupManifestFile, s3_client: S3Client, s3_loc: S3Location, batch_read_snapshot_data_file_func: T_BatchReadSnapshotDataFileCallable, extract_record_id: T.Optional[DerivedColumn] = None, extract_create_time: T.Optional[DerivedColumn] = None, extract_update_time: T.Optional[DerivedColumn] = None, extract_partition_keys: T.Optional[T.List[DerivedColumn]] = None, polars_write_parquet_kwargs: T_OPTIONAL_KWARGS = None, s3pathlib_write_bytes_kwargs: T_OPTIONAL_KWARGS = None, logger=<dbsnaplake.logger.DummyLogger object>) -> StagingFileGroupManifestFile
Transform Snapshot Data Files to Staging Data Files, based on partition keys.
This function processes a group of snapshot files, derives necessary columns, partitions the data if needed, and writes the results to S3 as staging files.
- Parameters:
db_snapshot_file_group_manifest_file – Manifest file for the snapshot group.
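s3_client – Boto3 S3 client.
s3_loc – S3 location information.
batch_read_snapshot_data_file_func – User-defined function that loads the snapshot data files of the group into the Polars DataFrame to be processed; see batch_read_snapshot_data_file().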
extract_record_id – (optional) Specification for deriving record ID.
extract_create_time – (optional) Specification for deriving creation time.
extract_update_time – (optional) Specification for deriving update time.
extract_partition_keys – (optional) Specifications for deriving partition keys.
polars_write_parquet_kwargs – Custom keyword arguments for Polars’ write_parquet method. Default is dict(compression="snappy").
s3pathlib_write_bytes_kwargs – Custom keyword arguments for S3Path’s write_bytes method.
logger – Logger object for logging operations.
- Returns:
A single StagingFileGroupManifestFile object.