staging_to_datalake

This module defines the abstractions for transforming staging data files into the final data lake.

dbsnaplake.staging_to_datalake.extract_s3_directory(s3uri_col_name: str, s3dir_col_name: str) -> Expr

Generate a Polars expression to extract the directory part of S3 URIs.

Parameters:
  • s3uri_col_name – Name of the column containing S3 URIs.

  • s3dir_col_name – Name for the new column to store S3 directories.

Returns:

Polars expression for extracting S3 directories.

Example:

If “s3uri_col_name” contains s3://bucket/path/to/file, the resulting “s3dir_col_name” will contain s3://bucket/path/to/.
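The string transformation in the example above can be illustrated in plain Python. This is a minimal sketch of the documented input/output behavior only, not the Polars expression the library builds; the helper name s3_directory_of is hypothetical.

```python
def s3_directory_of(s3uri: str) -> str:
    """Return the directory part of an S3 URI, keeping the trailing slash.

    Hypothetical helper for illustration; it mirrors the documented
    example but is not part of dbsnaplake.
    """
    # rpartition splits on the LAST "/", so everything before it is the
    # directory and everything after it is the file name.
    head, sep, _filename = s3uri.rpartition("/")
    return head + sep


print(s3_directory_of("s3://bucket/path/to/file"))  # s3://bucket/path/to/
```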

class dbsnaplake.staging_to_datalake.PartitionFileGroupManifestFile(uri: str, uri_summary: str, data_file_list: List[T_DATA_FILE] = <factory>, size: Optional[int] = None, n_record: Optional[int] = None, fingerprint: Optional[str] = None, details: Dict[str, Any] = <factory>)

Represents a group of files across many partitions in the staging area.

This class extends ManifestFile to provide specific functionality for handling partition file groups during the compaction process.

classmethod plan_partition_compaction(s3_loc: S3Location, s3_client: S3Client, target_size: int = 128000000)

Plan the compaction of partition file groups.

This method reads staging file group manifests, groups them by partition, and creates new partition file group manifests for compaction.

Parameters:
  • s3_loc – S3 location information.

  • s3_client – Boto3 S3 client.

  • target_size – Target size, in bytes, of each compacted file. Defaults to 128,000,000 (128 MB).

Returns:

List of planned partition file group manifests.
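The planning step described above (group by partition, then form groups near a target size) can be sketched in plain Python. The source does not state how files are packed within a partition, so the greedy packing below is an assumption, and the DataFile tuple and plan_groups function are hypothetical stand-ins rather than dbsnaplake types.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# (uri, partition, size_in_bytes): a simplified, hypothetical stand-in
# for a data file entry in a staging manifest.
DataFile = Tuple[str, str, int]


def plan_groups(
    files: List[DataFile],
    target_size: int = 128_000_000,
) -> List[List[DataFile]]:
    """Group files by partition, then pack each partition greedily."""
    by_partition: Dict[str, List[DataFile]] = defaultdict(list)
    for f in files:
        by_partition[f[1]].append(f)

    groups: List[List[DataFile]] = []
    for partition in sorted(by_partition):
        current: List[DataFile] = []
        current_size = 0
        for f in by_partition[partition]:
            # Start a new group when adding this file would exceed the target.
            if current and current_size + f[2] > target_size:
                groups.append(current)
                current, current_size = [], 0
            current.append(f)
            current_size += f[2]
        if current:
            groups.append(current)
    return groups


files = [
    ("s3://bucket/staging/1", "year=2021", 60),
    ("s3://bucket/staging/2", "year=2021", 60),
    ("s3://bucket/staging/3", "year=2021", 30),
    ("s3://bucket/staging/4", "year=2022", 10),
]
planned = plan_groups(files, target_size=100)
print([len(group) for group in planned])  # [1, 2, 1]
```

In this sketch a group never spans two partitions, so each planned group can be compacted into a single file under one partition directory.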

classmethod read_all_groups(s3_loc: S3Location, s3_client: S3Client) -> List[PartitionFileGroupManifestFile]

Read all partition file group manifest files from the specified S3 location.

Parameters:
  • s3_loc – S3 location information.

  • s3_client – Boto3 S3 client.

Returns:

List of all partition file group manifest files.

dbsnaplake.staging_to_datalake.process_partition_file_group_manifest_file(partition_file_group_manifest_file: PartitionFileGroupManifestFile, s3_client: S3Client, s3_loc: S3Location, sort_by: Optional[List[str]] = None, descending: Optional[List[bool]] = None, polars_write_parquet_kwargs: Optional[Dict[str, Any]] = None, s3pathlib_write_bytes_kwargs: Optional[Dict[str, Any]] = None, logger=<dbsnaplake.logger.DummyLogger object>) -> S3Path

Execute the compaction process for a partition file group.

This function reads all data files listed in the manifest, optionally sorts the records by the specified columns, and writes a single compacted file to the final data lake location.

Parameters:
  • partition_file_group_manifest_file – Manifest file for the partition group.

  • s3_client – Boto3 S3 client.

  • s3_loc – S3 location information.

  • sort_by – List of column names to sort by.

  • descending – List of booleans specifying, for each column in sort_by, whether to sort in descending order.

  • update_at_col – Name of the column used for sorting and updating. This parameter does not appear in the signature above.

  • polars_write_parquet_kwargs – Custom keyword arguments for Polars’ write_parquet method.

  • s3pathlib_write_bytes_kwargs – Custom keyword arguments for S3Path’s write_bytes method.

  • logger – Logger used to record operations. Defaults to a DummyLogger instance.

Returns:

S3 path of the compacted file in the data lake.
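The merge-and-sort part of the compaction can be sketched in memory with plain Python. The real function reads data files from S3 and writes Parquet through Polars; none of that I/O is reproduced here. The compact_records function and the dict-based records are hypothetical, and pairing one descending flag with each sort_by column is an assumption about the parameter semantics.

```python
from typing import Any, Dict, List, Optional

Record = Dict[str, Any]


def compact_records(
    files: List[List[Record]],
    sort_by: Optional[List[str]] = None,
    descending: Optional[List[bool]] = None,
) -> List[Record]:
    """Concatenate records from several files and optionally sort them."""
    merged = [record for records in files for record in records]
    if not sort_by:
        return merged
    flags = descending if descending is not None else [False] * len(sort_by)
    if len(flags) != len(sort_by):
        raise ValueError("descending must have one flag per sort_by column")
    # Python's sort is stable, so sorting by the least significant key first
    # and the most significant key last yields a correct multi-key ordering.
    for column, is_desc in reversed(list(zip(sort_by, flags))):
        merged.sort(key=lambda r, c=column: r[c], reverse=is_desc)
    return merged


files = [
    [{"id": 1, "update_at": "2024-01-02"}, {"id": 2, "update_at": "2024-01-01"}],
    [{"id": 1, "update_at": "2024-01-03"}],
]
result = compact_records(files, sort_by=["id", "update_at"], descending=[False, True])
print([(r["id"], r["update_at"]) for r in result])
# [(1, '2024-01-03'), (1, '2024-01-02'), (2, '2024-01-01')]
```

Sorting by id ascending and update_at descending places the most recent version of each record first within its id, which is a common layout for snapshot-style data.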