polars_utils#

dbsnaplake.polars_utils.write_parquet_to_s3(df: DataFrame, s3path: S3Path, s3_client: S3Client, polars_write_parquet_kwargs: Optional[Dict[str, Any]] = None, s3pathlib_write_bytes_kwargs: Optional[Dict[str, Any]] = None) Tuple[int, int, str][source]#

Write polars dataframe to AWS S3 as a parquet file.

The original polars.write_parquet method doesn’t work with moto.

Parameters:
Returns:

A tuple of three values: - The number of bytes written to S3, i.e., the size of the parquet file. - The number of records in the DataFrame. - The ETag of the S3 object.

dbsnaplake.polars_utils.write_data_file(df: DataFrame, s3path: S3Path, s3_client: S3Client, polars_write_parquet_kwargs: Optional[Dict[str, Any]] = None, s3pathlib_write_bytes_kwargs: Optional[Dict[str, Any]] = None) Tuple[int, int, str][source]#

Write the DataFrame to the given S3Path as a Parquet file, also attach additional information related to the Snapshot Data File.

It is a wrapper of the write_parquet_to_s3 function, make the final code shorter.

Returns:

A tuple of three values: - The number of bytes written to S3, i.e., the size of the parquet file. - The number of records in the DataFrame. - The ETag of the S3 object.

dbsnaplake.polars_utils.group_by_partition(df: DataFrame, s3dir: S3Path, filename: str, partition_keys: List[str], sort_by: List[str]) List[Tuple[DataFrame, S3Path]][source]#

Group dataframe by partition keys and locate the S3 location for each partition.

Parameters:
  • dfpolars.DataFrame object.

  • s3dirs3pathlib.S3Path object, the root directory of the S3 location.

  • filename – filename of the parquet file. for example: “data.parquet”.

  • partition_keys – list of partition keys. for example: [“year”, “month”].

  • sort_by – list of columns to sort by. for example: [“create_time”]. use empty list if no sorting is needed.