polars_utils#

Polars utilities.

dbsnaplake.polars_utils.configure_s3_write_options(df: DataFrame, polars_writer: Writer, gzip_compress: bool, s3pathlib_write_bytes_kwargs: Dict[str, Any]) str[source]#

Configure S3 write options based on the polars writer.

This function sets up the necessary metadata and content-related parameters for writing a Polars DataFrame to S3. It determines the appropriate file extension and configures compression settings based on the writer format and user preferences.

Parameters:
  • df – The Polars DataFrame to be written.

  • polars_writer – The Polars writer object specifying the output format.

  • gzip_compress – Whether to apply gzip compression (where applicable).

  • s3pathlib_write_bytes_kwargs – Dictionary of keyword arguments for S3 write operation, to be modified in-place.

Returns:

The appropriate file extension for the configured write operation.

dbsnaplake.polars_utils.configure_s3path(s3dir: Optional[S3Path] = None, fname: Optional[str] = None, ext: Optional[str] = None, s3path: Optional[S3Path] = None)[source]#

Configure and return an S3Path object for file operations.

This function allows flexible specification of an S3 path. It can either construct a path from individual components (directory, filename, and extension) or use a pre-configured S3Path object.

Parameters:
  • s3dir – The S3 directory path. Required if s3path is not provided.

  • fname – The filename without extension. Required if s3path is not provided. for example, if the full file name is “data.csv”, then fname is “data”.

  • ext – The file extension, including the dot (e.g., ‘.csv’). Required if s3path is not provided.

  • s3path – A pre-configured S3Path object. If provided, other arguments are ignored.

:return The configured S3Path object representing the full file path in S3.

dbsnaplake.polars_utils.write_to_s3(df: DataFrame, s3_client: S3Client, polars_writer: Writer, gzip_compress: bool = False, s3pathlib_write_bytes_kwargs: Optional[Dict[str, Any]] = None, s3dir: Optional[S3Path] = None, fname: Optional[str] = None, s3path: Optional[S3Path] = None) Tuple[S3Path, int, str][source]#

Write the DataFrame to the given S3Path object, also attach additional information related to the dataframe.

The original polars.write_parquet method doesn’t work with moto, so we use buffer to store the parquet file and then write it to S3.

Parameters:
Returns:

A tuple of three values: - The S3Path object representing the full file path in S3. - The number of bytes written to S3, i.e., the size of the parquet file. - The ETag of the S3 object.

dbsnaplake.polars_utils.read_parquet_from_s3(s3path: S3Path, s3_client: S3Client, polars_read_parquet_kwargs: Optional[Dict[str, Any]] = None, s3pathlib_read_bytes_kwargs: Optional[Dict[str, Any]] = None) DataFrame[source]#

Read parquet file from S3.

Parameters:
Returns:

polars.DataFrame object.

dbsnaplake.polars_utils.read_many_parquet_from_s3(s3path_list: List[S3Path], s3_client: S3Client, polars_read_parquet_kwargs: Optional[Dict[str, Any]] = None, s3pathlib_read_bytes_kwargs: Optional[Dict[str, Any]] = None) DataFrame[source]#

Read many parquet files from S3 and concatenate them.

Parameters:
Returns:

polars.DataFrame object.

dbsnaplake.polars_utils.group_by_partition(df: DataFrame, s3dir: S3Path, filename: str, partition_keys: List[str], sort_by: Optional[List[str]] = None, descending: Union[bool, List[bool]] = False) List[Tuple[DataFrame, S3Path]][source]#

Group dataframe by partition keys and locate the S3 location for each partition.

Parameters:
  • dfpolars.DataFrame object.

  • s3dirs3pathlib.S3Path object, the root directory of the S3 location.

  • filename – filename of the parquet file. for example: “data.parquet”.

  • partition_keys – list of partition keys. for example: [“year”, “month”].

  • sort_by – list of columns to sort by. for example: [“create_time”]. use empty list or None if no sorting is needed.

  • descending – list of boolean values to indicate the sorting order. for example: [True] or [False, True].