staging_to_datalake
This module defines the transformation process that compacts staging data files into the final data lake.
- dbsnaplake.staging_to_datalake.extract_s3_directory(s3uri_col_name: str, s3dir_col_name: str) → Expr
Generate a Polars expression to extract the directory part of S3 URIs.
- Parameters:
s3uri_col_name – Name of the column containing S3 URIs.
s3dir_col_name – Name for the new column to store S3 directories.
- Returns:
Polars expression for extracting S3 directories.
- Example:
If the column named by “s3uri_col_name” contains
s3://bucket/path/to/file, the new column named by “s3dir_col_name” will contain s3://bucket/path/to/.
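The per-row rule can be sketched in plain Python: everything up to and including the last "/" is kept. This is a minimal reference under that assumption, not the module's Polars implementation; s3_directory is a hypothetical helper name.

```python
def s3_directory(s3uri: str) -> str:
    """Return the directory part of an S3 URI, keeping the trailing slash.

    Hypothetical helper mirroring what extract_s3_directory is documented
    to produce for a single value; it is not the library's Polars code.
    """
    # rpartition splits on the last "/"; head is everything before it.
    head, sep, _tail = s3uri.rpartition("/")
    return head + sep


print(s3_directory("s3://bucket/path/to/file"))  # s3://bucket/path/to/
```

Inside a Polars pipeline, a comparable expression can be built with pl.col(s3uri_col_name).str.replace(r"[^/]+$", "").alias(s3dir_col_name), which drops the final path segment with a regex; whether the module uses this exact approach is not stated here.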
- class dbsnaplake.staging_to_datalake.PartitionFileGroupManifestFile(uri: str, uri_summary: str, data_file_list: List[s3manifesto.typehint.T_DATA_FILE] = <factory>, size: Optional[int] = None, n_record: Optional[int] = None, fingerprint: Optional[str] = None, details: Dict[str, Any] = <factory>)
Represents a group of files across many partitions in the staging area.
This class extends ManifestFile to provide specific functionality for handling partition file groups during the compaction process.
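To make the fields in the signature concrete, here is a simplified, hypothetical dataclass. It assumes each data file entry is a dict with "uri", "size" (bytes) and "n_record" keys, and that the group's size and n_record are the sums over its files; neither assumption is confirmed by this page, and fingerprint and details are omitted.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

# Hypothetical shape of one data file entry; the real T_DATA_FILE type
# comes from s3manifesto and may carry different keys.
DataFile = Dict[str, Any]


@dataclass
class PartitionFileGroupSketch:
    """Simplified stand-in for PartitionFileGroupManifestFile (illustrative only)."""

    uri: str
    uri_summary: str
    data_file_list: List[DataFile] = field(default_factory=list)
    size: Optional[int] = None
    n_record: Optional[int] = None

    def fill_totals(self) -> None:
        # Assumed behavior: group totals are the sums of the per-file values.
        self.size = sum(f["size"] for f in self.data_file_list)
        self.n_record = sum(f["n_record"] for f in self.data_file_list)
```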
- classmethod plan_partition_compaction(s3_loc: S3Location, s3_client: S3Client, target_size: int = 128000000)
Plan the compaction of partition file groups.
This method reads staging file group manifests, groups them by partition, and creates new partition file group manifests for compaction.
- Parameters:
s3_loc – S3 location information.
s3_client – Boto3 S3 client.
target_size – Target size, in bytes, of each compacted file. Default is 128,000,000 bytes (128 MB).
- Returns:
List of planned partition file group manifests.
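The planning step (group by partition, then split each partition into groups near target_size) can be sketched as a greedy sequential packing. This is a swapped-in illustration, not necessarily the library's grouping strategy; plan_compaction and partition_of are hypothetical names, and each data file is modeled as a (uri, size_in_bytes) pair whose partition is its URI directory.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# (uri, size_in_bytes) pairs; the partition is taken to be the URI's directory.
DataFile = Tuple[str, int]


def partition_of(uri: str) -> str:
    # Directory part of the URI, e.g. "s3://b/t/year=2024/".
    head, sep, _ = uri.rpartition("/")
    return head + sep


def plan_compaction(
    files: List[DataFile], target_size: int = 128_000_000
) -> Dict[str, List[List[DataFile]]]:
    """Group files by partition, then pack each partition into size-bounded groups."""
    by_partition: Dict[str, List[DataFile]] = defaultdict(list)
    for f in files:
        by_partition[partition_of(f[0])].append(f)

    plan: Dict[str, List[List[DataFile]]] = {}
    for partition, part_files in by_partition.items():
        groups: List[List[DataFile]] = []
        current: List[DataFile] = []
        current_size = 0
        for f in part_files:
            # Close the current group if adding this file would overshoot the target.
            if current and current_size + f[1] > target_size:
                groups.append(current)
                current, current_size = [], 0
            current.append(f)
            current_size += f[1]
        if current:
            groups.append(current)
        plan[partition] = groups
    return plan
```

With target_size=100 and files of 60, 50 and 30 bytes in one partition, this yields two groups: [60] and [50, 30]. A single file larger than target_size still gets a group of its own.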
- classmethod read_all_groups(s3_loc: S3Location, s3_client: S3Client) → List[PartitionFileGroupManifestFile]
Read all partition file group manifest files from the specified S3 location.
- Parameters:
s3_loc – S3 location information.
s3_client – Boto3 S3 client.
- Returns:
List of all partition file group manifest files.
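Reading all groups amounts to listing every manifest object under a prefix and then parsing each one. The listing half can be sketched with the boto3 list_objects_v2 paginator; list_manifest_keys and its bucket/prefix arguments are hypothetical stand-ins for whatever S3Location resolves to, and parsing the manifest contents is left out.

```python
from typing import Any, List


def list_manifest_keys(s3_client: Any, bucket: str, prefix: str) -> List[str]:
    """Collect every object key under `prefix`, following S3 pagination.

    `bucket` and `prefix` are hypothetical; the real manifest layout used
    by dbsnaplake is not shown on this page.
    """
    keys: List[str] = []
    # A single list_objects_v2 call returns at most 1,000 keys, so paginate.
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # "Contents" is absent from a page when nothing matches the prefix.
        for obj in page.get("Contents", []):
            keys.append(obj["Key"])
    return keys
```

With a real client this would be called as list_manifest_keys(boto3.client("s3"), "my-bucket", "manifests/"), and each returned key would then be downloaded and parsed into a manifest object.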
- dbsnaplake.staging_to_datalake.process_partition_file_group_manifest_file(partition_file_group_manifest_file: PartitionFileGroupManifestFile, s3_client: S3Client, s3_loc: S3Location, sort_by: Optional[List[str]] = None, descending: Optional[List[bool]] = None, polars_write_parquet_kwargs: Optional[Dict[str, Any]] = None, s3pathlib_write_bytes_kwargs: Optional[Dict[str, Any]] = None, logger=<dbsnaplake.logger.DummyLogger object>) → S3Path
Execute the compaction process for a partition file group.
This function reads all data files listed in the manifest, optionally sorts the records by the specified columns, and writes them as a single compacted file to the final data lake location.
- Parameters:
partition_file_group_manifest_file – Manifest file for the partition group.
s3_client – Boto3 S3 client.
s3_loc – S3 location information.
sort_by – List of column names to sort by. If None, the records are not sorted.
descending – List of booleans, one per column in sort_by, indicating whether that column is sorted in descending order.
polars_write_parquet_kwargs – Custom keyword arguments for Polars’ write_parquet method.
s3pathlib_write_bytes_kwargs – Custom keyword arguments for S3Path’s write_bytes method.
logger – Logger used to report the steps of the compaction.
- Returns:
S3 path of the compacted file in the data lake.
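The read, concatenate and sort steps can be illustrated without Polars or S3. In this sketch, compact_records is a hypothetical helper, each data file is modeled as a list of dict records, and descending is paired with sort_by column by column, the way Polars' DataFrame.sort accepts one boolean per sort column. Writing Parquet and uploading to S3 are omitted.

```python
from typing import Any, Dict, List, Optional

Record = Dict[str, Any]


def compact_records(
    data_files: List[List[Record]],
    sort_by: Optional[List[str]] = None,
    descending: Optional[List[bool]] = None,
) -> List[Record]:
    """Concatenate the records of several data files and optionally sort them.

    Plain-Python stand-in for the read/concat/sort steps; the real function
    works on Polars DataFrames and writes Parquet to S3.
    """
    merged = [row for rows in data_files for row in rows]
    if sort_by:
        flags = descending or [False] * len(sort_by)
        # Python's sort is stable, so sorting by the last key first and the
        # first key last yields a multi-column sort with per-column direction.
        for col, desc in reversed(list(zip(sort_by, flags))):
            merged.sort(key=lambda row: row[col], reverse=desc)
    return merged
```

Sorting the last key first works because Python's sort stays stable even with reverse=True, so ties on an earlier key keep the order established by the later keys.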