# -*- coding: utf-8 -*-
"""
This module defines the abstraction and implementation of the transformation process
from Database Snapshot Data Files to Staging Data Files. It includes classes and
functions for managing manifest files, processing data groups, and handling data
transformations using Polars DataFrames.
Key components:
- :class:`DBSnapshotManifestFile`: Represents the full list of data files from the Database snapshot.
- :class:`DBSnapshotFileGroupManifestFile`: Represents a group of snapshot files.
- :class:`StagingFileGroupManifestFile`: Represents a group of staging files.
- :class:`DerivedColumn`: Defines how to derive new columns from the DataFrame.
- Various utility functions for data processing and S3 interactions.
.. seealso::
`s3manifesto <https://s3manifesto.readthedocs.io/en/latest/>`_
"""
from __future__ import annotations
import typing as T
import dataclasses
import polars as pl
try:
import pyarrow.parquet as pq
except ImportError: # pragma: no cover
pass
from s3manifesto.api import KeyEnum, ManifestFile
from polars_writer.api import Writer
from .typehint import T_OPTIONAL_KWARGS
from .s3_loc import S3Location
from .polars_utils import write_to_s3
from .polars_utils import group_by_partition
from .logger import dummy_logger
if T.TYPE_CHECKING: # pragma: no cover
from mypy_boto3_s3.client import S3Client
[docs]@dataclasses.dataclass
class DBSnapshotManifestFile(ManifestFile):
"""
Represents the full list of data files from the Database snapshot. We will
have only one DB snapshot manifest file in the data pipeline. And this is
where the data pipeline starts.
This class extends
`ManifestFile <https://s3manifesto.readthedocs.io/en/latest/s3manifesto/manifest.html#module-s3manifesto.manifest>`_
to provide specific functionality for handling database snapshot manifest files.
"""
[docs] def split_into_groups(
self,
s3_loc: S3Location,
s3_client: "S3Client",
target_size: int = 100 * 1000 * 1000, ## 100 MB
) -> T.List["DBSnapshotFileGroupManifestFile"]:
"""
Split the full list of data files into groups of approximately equal size.
:param s3_loc: S3 location information.
:param s3_client: Boto3 S3 client.
:param target_size: Target size for each group in bytes. Default is 100 MB.
:return: List of file group manifest files.
.. seealso::
:class:`DBSnapshotFileGroupManifestFile`
"""
db_snapshot_file_group_manifest_file_list = list()
_lst = db_snapshot_file_group_manifest_file_list # for shortening the code
for ith, (data_file_list, total_size) in enumerate(
self.group_files_into_tasks_by_size(target_size=target_size),
start=1,
):
db_snapshot_file_group_manifest_file = DBSnapshotFileGroupManifestFile.new(
uri=s3_loc.s3dir_snapshot_file_group_manifest_data.joinpath(
f"manifest-data-{ith}.parquet"
).uri,
uri_summary=s3_loc.s3dir_snapshot_file_group_manifest_summary.joinpath(
f"manifest-summary-{ith}.json"
).uri,
size=total_size,
data_file_list=data_file_list,
calculate=True,
)
db_snapshot_file_group_manifest_file.write(s3_client)
_lst.append(db_snapshot_file_group_manifest_file)
return db_snapshot_file_group_manifest_file_list
[docs]@dataclasses.dataclass
class DBSnapshotFileGroupManifestFile(ManifestFile):
"""
This class is a subgroup of :class:`DBSnapshotManifestFile`, created by
breaking down a larger snapshot into more manageable units.
This class extends
`ManifestFile <https://s3manifesto.readthedocs.io/en/latest/s3manifesto/manifest.html#module-s3manifesto.manifest>`_
to provide specific functionality for handling groups of snapshot files.
.. seealso::
:meth:`DBSnapshotManifestFile.split_into_groups`
"""
[docs] @classmethod
def read_all_groups(
cls,
s3_loc: S3Location,
s3_client: "S3Client",
) -> T.List["DBSnapshotFileGroupManifestFile"]:
"""
Read all snapshot file group manifest files from the specified S3 location.
:param s3_loc: S3 location information.
:param s3_client: Boto3 S3 client.
:returns: List of all file group manifest files.
"""
s3path_list = s3_loc.s3dir_snapshot_file_group_manifest_summary.iter_objects(
bsm=s3_client
).all()
db_snapshot_file_group_manifest_file_list = [
DBSnapshotFileGroupManifestFile.read(
uri_summary=s3path.uri,
s3_client=s3_client,
)
for s3path in s3path_list
]
return db_snapshot_file_group_manifest_file_list
[docs]def batch_read_snapshot_data_file(
db_snapshot_file_group_manifest_file: DBSnapshotFileGroupManifestFile,
**kwargs,
) -> pl.DataFrame:
"""
Loads multiple snapshot data files into an in-memory
`Polars DataFrame <https://docs.pola.rs/api/python/stable/reference/dataframe/index.html>`_
and applies custom transformations.
This user-defined function serves as a central point for data processing and
transformation in the pipeline. It allows for various operations such as
data validation, column manipulation (add/rename/drop), joining with external
data sources, data type casting, filtering, and aggregation.
:param db_snapshot_file_group_manifest_file: :class:`DBSnapshotFileGroupManifestFile`
Object containing references to the snapshot data files to be processed.
:return: A Polars DataFrame containing the processed snapshot data.
Below is an example to read data from multiple NDJson files and align them:
>>> import polars as pl
>>> def batch_read_snapshot_data_file(
... db_snapshot_file_group_manifest_file: DBSnapshotFileGroupManifestFile,
... **kwargs,
... ) -> pl.DataFrame:
... sub_df_list = list()
... for data_file in db_snapshot_file_group_manifest_file.data_file_list:
... sub_df = pl.read_ndjson(data_file["uri"])
... # arbitrary transofmration logic here
... sub_df = sub_df.with_columns(pl.col("OrderId").alias("record_id"))
... sub_df_list.append(sub_df)
... df = pl.concat(sub_df_list)
... return df
.. seealso::
- `Polars Input/Output Documentation <https://docs.pola.rs/user-guide/io/>`_:
For reading data from different file formats and cloud storage.
- :func:`process_db_snapshot_file_group_manifest_file`: Related processing function.
"""
raise NotImplementedError
T_BatchReadSnapshotDataFileCallable = T.Callable[
[DBSnapshotFileGroupManifestFile, ...], pl.DataFrame
]
[docs]@dataclasses.dataclass
class StagingFileGroupManifestFile(ManifestFile):
"""
Represents a group of staging files derived from a single
:class:`DBSnapshotFileGroupManifestFile`.
This class manages the output of the partitioning process applied to a
:class:`DBSnapshotFileGroupManifestFile`. It stores references to multiple
data files, each corresponding to a specific partition.
Key characteristics:
1. One-to-one relationship: Each :class:`DBSnapshotFileGroupManifestFile`
generates exactly one :class:`StagingFileGroupManifestFile`.
2. Partitioning: The original snapshot data is divided into multiple partitions.
3. File generation: One data file is created for each partition.
4. Storage: This manifest file stores the list of all generated data files.
The :class:`StagingFileGroupManifestFile` serves as an index or catalog for
the partitioned and processed data, facilitating efficient data retrieval and
management in subsequent data lake operations.
.. seealso::
- :class:`DBSnapshotFileGroupManifestFile`
- :func:`process_db_snapshot_file_group_manifest_file`
"""
[docs]def process_db_snapshot_file_group_manifest_file(
db_snapshot_file_group_manifest_file: DBSnapshotFileGroupManifestFile,
s3_client: "S3Client",
s3_loc: S3Location,
batch_read_snapshot_data_file_func: T_BatchReadSnapshotDataFileCallable,
partition_keys: T.List[str],
sort_by: T.Optional[T.List[str]] = None,
descending: T.Union[bool, T.List[bool]] = False,
s3pathlib_write_bytes_kwargs: T_OPTIONAL_KWARGS = None,
logger=dummy_logger,
) -> StagingFileGroupManifestFile:
"""
Transform Snapshot Data Files to Staging Data Files, based on partition keys.
This function processes a group of snapshot files, derives necessary columns,
partitions the data if needed, and writes the results to S3 as staging files.
:param db_snapshot_file_group_manifest_file: Manifest file for the snapshot group.
:param df: DataFrame containing the snapshot data.
:param s3_client: Boto3 S3 client.
:param s3_loc: S3 location information.
:param batch_read_snapshot_data_file_func:
:param partition_keys: partition keys, if you don't have partition keys, then
set it to an empty list.
:param sort_by: list of columns to sort by. for example: ["create_time"].
use empty list or None if no sorting is needed.
:param descending: list of boolean values to indicate the sorting order.
for example: [True] or [False, True].
:param polars_write_parquet_kwargs: Custom keyword arguments for Polars' write_parquet method.
Default is ``dict(compression="snappy")``.
:param s3pathlib_write_bytes_kwargs: Custom keyword arguments for S3Path's write_bytes method.
:param logger: Logger object for logging operations.
:return: single :class:`StagingFileGroupManifestFile` object
"""
# Derive more columns for data lake
logger.info(
"Derive record_id, create_time, update_time, and partition keys columns ..."
)
df = batch_read_snapshot_data_file_func(
db_snapshot_file_group_manifest_file=db_snapshot_file_group_manifest_file,
)
logger.info(f" Dataframe Shape {df.shape}")
# prepare variables for following operations
polars_writer = Writer(
format="parquet",
parquet_compression="snappy",
)
fname = db_snapshot_file_group_manifest_file.fingerprint
basename = f"{fname}.snappy.parquet"
staging_data_file_list = list()
# if we have partition keys, then we group data by partition keys
# and write them to different partition (1 file per partition)
if len(partition_keys):
logger.info("Group data by partition keys ...")
# ----------------------------------------------------------------------
# Method 1, split df into sub_df based on partition keys and
# write them to different partition (1 file per partition)
# ----------------------------------------------------------------------
total_size = 0
total_n_record = 0
results = group_by_partition(
df=df,
s3dir=s3_loc.s3dir_staging_datalake,
filename=basename,
partition_keys=partition_keys,
sort_by=sort_by,
descending=descending,
)
logger.info(f"Will write data to {len(results)} partitions ...")
for ith, (sub_df, s3path) in enumerate(results, start=1):
logger.info(f"Write to {ith}th partition: {s3path.parent.uri}")
logger.info(f" s3uri: {s3path.uri}")
logger.info(f" preview at: {s3path.console_url}")
n_record = sub_df.shape[0]
s3path_new, size, etag = write_to_s3(
df=sub_df,
s3_client=s3_client,
polars_writer=polars_writer,
s3pathlib_write_bytes_kwargs=s3pathlib_write_bytes_kwargs,
s3path=s3path,
)
total_size += size
total_n_record += n_record
staging_data_file = {
KeyEnum.URI: s3path.uri,
KeyEnum.SIZE: size,
KeyEnum.N_RECORD: n_record,
KeyEnum.ETAG: etag,
}
staging_data_file_list.append(staging_data_file)
# ----------------------------------------------------------------------
# Method 2, Use ``pyarrow.parquet.write_to_dataset`` methods
# ----------------------------------------------------------------------
# pq.write_to_dataset(
# df.to_arrow(),
# root_path=s3dir_staging.uri,
# partition_cols=partition_keys,
# )
# if we don't have partition keys, then we write this file to the s3dir_staging
else:
logger.info("We don't have partition keys, write to single file ...")
s3path = s3_loc.s3dir_staging_datalake.joinpath(basename)
logger.info(f"Write to: {s3path.uri}")
logger.info(f" preview at: {s3path.console_url}")
total_n_record = df.shape[0]
s3path_new, total_size, etag = write_to_s3(
df=df,
s3_client=s3_client,
polars_writer=polars_writer,
s3pathlib_write_bytes_kwargs=s3pathlib_write_bytes_kwargs,
s3path=s3path,
)
staging_data_file = {
KeyEnum.URI: s3path.uri,
KeyEnum.SIZE: total_size,
KeyEnum.N_RECORD: df.shape[0],
KeyEnum.ETAG: etag,
}
staging_data_file_list.append(staging_data_file)
staging_file_group_manifest_file = StagingFileGroupManifestFile.new(
uri="",
uri_summary="",
data_file_list=staging_data_file_list,
size=total_size,
n_record=total_n_record,
calculate=True,
)
fingerprint = staging_file_group_manifest_file.fingerprint
s3path_manifest_data = (
s3_loc.s3dir_staging_file_group_manifest_data
/ f"manifest-data-{fingerprint}.parquet"
)
s3path_manifest_summary = (
s3_loc.s3dir_staging_file_group_manifest_summary
/ f"manifest-summary-{fingerprint}.json"
)
staging_file_group_manifest_file.uri = s3path_manifest_data.uri
staging_file_group_manifest_file.uri_summary = s3path_manifest_summary.uri
logger.info("Write generated files information to manifest file ...")
logger.info(f" Write to manifest summary: {s3path_manifest_summary.uri}")
logger.info(f" preview at: {s3path_manifest_summary.console_url}")
logger.info(f" Write to manifest data: {s3path_manifest_data.uri}")
logger.info(f" preview at: {s3path_manifest_data.console_url}")
staging_file_group_manifest_file.write(s3_client=s3_client)
return staging_file_group_manifest_file