polars_utils
Polars utilities.
- dbsnaplake.polars_utils.configure_s3_write_options(df: DataFrame, polars_writer: Writer, gzip_compress: bool, s3pathlib_write_bytes_kwargs: Dict[str, Any]) → str
Configure S3 write options based on the polars writer.
This function sets up the necessary metadata and content-related parameters for writing a Polars DataFrame to S3. It determines the appropriate file extension and configures compression settings based on the writer format and user preferences.
- Parameters:
df – The Polars DataFrame to be written.
polars_writer – The Polars writer object specifying the output format.
gzip_compress – Whether to apply gzip compression (where applicable).
s3pathlib_write_bytes_kwargs – Dictionary of keyword arguments for S3 write operation, to be modified in-place.
- Returns:
The appropriate file extension for the configured write operation.
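A minimal sketch of the kind of logic described above follows. It is not dbsnaplake's implementation: it takes a plain format string ("parquet", "csv", "ndjson") instead of the polars_writer object and a row count instead of the DataFrame. The function name, the extension and MIME-type table, the ".gz" suffix and the "n_record" metadata key are all hypothetical. It also assumes that ContentType and ContentEncoding are eventually forwarded to boto3's put_object (which accepts both parameters) and that user metadata is passed under a "metadata" entry.

```python
from typing import Any, Dict

# Hypothetical format -> (extension, MIME type) table; dbsnaplake's real mapping may differ.
_FORMATS = {
    "parquet": (".parquet", "application/octet-stream"),
    "csv": (".csv", "text/csv"),
    "ndjson": (".ndjson", "application/x-ndjson"),
}


def configure_write_options_sketch(
    fmt: str,
    n_rows: int,
    gzip_compress: bool,
    write_kwargs: Dict[str, Any],
) -> str:
    """Hypothetical: fill write_kwargs in place and return the file extension."""
    try:
        ext, content_type = _FORMATS[fmt]
    except KeyError:
        raise ValueError(f"unsupported format: {fmt!r}") from None
    # Assumption: these keys are forwarded to boto3's put_object.
    write_kwargs["ContentType"] = content_type
    # Parquet compresses internally, so gzip is only applied to text formats here.
    if gzip_compress and fmt != "parquet":
        write_kwargs["ContentEncoding"] = "gzip"
        ext += ".gz"
    # Assumption: DataFrame information is stored as S3 user metadata (values must be strings).
    metadata = write_kwargs.setdefault("metadata", {})
    metadata["n_record"] = str(n_rows)
    return ext
```

Mutating the caller's dictionary in place matches the documented contract of s3pathlib_write_bytes_kwargs, while the return value carries only the extension.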
- dbsnaplake.polars_utils.configure_s3path(s3dir: Optional[S3Path] = None, fname: Optional[str] = None, ext: Optional[str] = None, s3path: Optional[S3Path] = None)
Configure and return an S3Path object for file operations.
This function allows flexible specification of an S3 path. It can either construct a path from individual components (directory, filename, and extension) or use a pre-configured S3Path object.
- Parameters:
s3dir – The S3 directory path. Required if s3path is not provided.
fname – The filename without extension. Required if s3path is not provided. For example, if the full file name is “data.csv”, then fname is “data”.
ext – The file extension, including the dot (e.g., ‘.csv’). Required if s3path is not provided.
s3path – A pre-configured S3Path object. If provided, other arguments are ignored.
- Returns:
The configured S3Path object representing the full file path in S3.
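The resolution rule above can be sketched as follows. This hypothetical helper works on plain "s3://" URI strings instead of S3Path objects so that it runs without s3pathlib. Raising ValueError when a component is missing is an assumption, since the text does not say what happens in that case.

```python
from typing import Optional


def resolve_s3_uri(
    s3dir: Optional[str] = None,
    fname: Optional[str] = None,
    ext: Optional[str] = None,
    s3path: Optional[str] = None,
) -> str:
    """Hypothetical string-based version of the s3path-or-components rule."""
    # A pre-configured path wins; the other arguments are ignored.
    if s3path is not None:
        return s3path
    if s3dir is None or fname is None or ext is None:
        # Assumed behavior: fail loudly when a component is missing.
        raise ValueError("provide s3path, or all of s3dir, fname and ext")
    # Treat s3dir as a directory, so make sure it ends with a slash.
    if not s3dir.endswith("/"):
        s3dir += "/"
    return f"{s3dir}{fname}{ext}"
```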
- dbsnaplake.polars_utils.write_to_s3(df: DataFrame, s3_client: S3Client, polars_writer: Writer, gzip_compress: bool = False, s3pathlib_write_bytes_kwargs: Optional[Dict[str, Any]] = None, s3dir: Optional[S3Path] = None, fname: Optional[str] = None, s3path: Optional[S3Path] = None) → Tuple[S3Path, int, str]
Write the DataFrame to the given S3 location and attach additional information related to the DataFrame.
The native polars.DataFrame.write_parquet method doesn’t work with moto, so the file is first written to an in-memory buffer and then uploaded to S3.
- Parameters:
df – polars.DataFrame object.
s3_client – boto3.client("s3") object.
polars_writer – polars_writer.api.Writer object.
gzip_compress – Flag to enable GZIP compression.
s3pathlib_write_bytes_kwargs – Keyword arguments for the s3path.write_bytes method. See https://s3pathlib.readthedocs.io/en/latest/s3pathlib/core/rw.html#s3pathlib.core.rw.ReadAndWriteAPIMixin.write_bytes
s3dir – The S3 directory path. Required if s3path is not provided.
fname – The filename without extension. Required if s3path is not provided. For example, if the full file name is “data.csv”, then fname is “data”.
s3path – A pre-configured S3Path object. If provided, other arguments are ignored.
- Returns:
A tuple of three values:
  - The S3Path object representing the full file path in S3.
  - The number of bytes written to S3, i.e., the size of the parquet file.
  - The ETag of the S3 object.
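The buffer-then-upload approach described above can be sketched with the standard library alone. This is a hypothetical helper, not dbsnaplake's code: write_fn stands in for the polars writer (for example, lambda buf: df.write_parquet(buf), since polars' DataFrame.write_parquet accepts a file-like object), s3_client is anything with a boto3-style put_object method, and a bucket/key pair replaces the S3Path arguments.

```python
import gzip
import io
from typing import Any, Callable, Dict, Tuple


def write_buffer_to_s3(
    write_fn: Callable[[io.BytesIO], Any],
    s3_client: Any,
    bucket: str,
    key: str,
    gzip_compress: bool = False,
) -> Tuple[str, int, str]:
    """Hypothetical: serialize into memory first, then upload with one put_object call."""
    buffer = io.BytesIO()
    # e.g. write_fn = lambda buf: df.write_parquet(buf)
    write_fn(buffer)
    body = buffer.getvalue()
    extra: Dict[str, Any] = {}
    if gzip_compress:
        body = gzip.compress(body)
        extra["ContentEncoding"] = "gzip"
    response = s3_client.put_object(Bucket=bucket, Key=key, Body=body, **extra)
    # Mirror the documented return tuple: (location, bytes written, ETag).
    return f"s3://{bucket}/{key}", len(body), response["ETag"]
```

Because nothing touches the network until put_object, the same function works against a real boto3.client("s3"), a moto-mocked client, or a simple stub.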
- dbsnaplake.polars_utils.read_parquet_from_s3(s3path: S3Path, s3_client: S3Client, polars_read_parquet_kwargs: Optional[Dict[str, Any]] = None, s3pathlib_read_bytes_kwargs: Optional[Dict[str, Any]] = None) → DataFrame
Read a parquet file from S3.
- Parameters:
s3path – s3pathlib.S3Path object.
s3_client – boto3.client("s3") object.
polars_read_parquet_kwargs – Keyword arguments for the polars.read_parquet method. See https://docs.pola.rs/api/python/stable/reference/api/polars.read_parquet.html
s3pathlib_read_bytes_kwargs – Keyword arguments for the s3path.read_bytes method. See https://s3pathlib.readthedocs.io/en/latest/s3pathlib/core/rw.html#s3pathlib.core.rw.ReadAndWriteAPIMixin.read_bytes
- Returns:
polars.DataFrame object.
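Reading works the same way in reverse: download the object bytes, wrap them in io.BytesIO, and hand the buffer to the parquet reader. The sketch below is hypothetical. It uses boto3's get_object instead of s3pathlib's read_bytes, takes a bucket/key pair instead of an S3Path, and accepts an optional reader callable (defaulting to polars.read_parquet, imported lazily) so the download step can be exercised without polars. Extra keyword arguments are forwarded to the reader, in the spirit of polars_read_parquet_kwargs.

```python
import io
from typing import Any, Callable, Optional


def read_parquet_from_s3_sketch(
    s3_client: Any,
    bucket: str,
    key: str,
    reader: Optional[Callable[..., Any]] = None,
    **read_kwargs: Any,
) -> Any:
    """Hypothetical: download the object into memory and parse it from a buffer."""
    if reader is None:
        # Lazy import so the sketch can be exercised without polars installed.
        import polars as pl

        reader = pl.read_parquet
    # get_object returns a streaming body; read it fully into memory.
    data = s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()
    return reader(io.BytesIO(data), **read_kwargs)
```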
- dbsnaplake.polars_utils.read_many_parquet_from_s3(s3path_list: List[S3Path], s3_client: S3Client, polars_read_parquet_kwargs: Optional[Dict[str, Any]] = None, s3pathlib_read_bytes_kwargs: Optional[Dict[str, Any]] = None) → DataFrame
Read many parquet files from S3 and concatenate them.
- Parameters:
s3path_list – list of
s3pathlib.S3Pathobject.s3_client –
boto3.client("s3")object.polars_read_parquet_kwargs – Keyword arguments for
polars.read_parquetmethod. See https://docs.pola.rs/api/python/stable/reference/api/polars.read_parquet.htmls3pathlib_read_bytes_kwargs – Keyword arguments for
s3path.read_bytesmethod. See https://s3pathlib.readthedocs.io/en/latest/s3pathlib/core/rw.html#s3pathlib.core.rw.ReadAndWriteAPIMixin.read_bytes
- Returns:
polars.DataFrame object.
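A hypothetical sketch of the read-and-concatenate pattern: each object is read into memory and parsed, and the parsed results are concatenated in a single call at the end. It uses boto3's get_object with (bucket, key) pairs in place of S3Path objects, and the reader and concat callables default to polars.read_parquet and polars.concat. The names and the injection style are assumptions of this sketch, not dbsnaplake's API, and whether the real function reads the files sequentially or concurrently is not stated here.

```python
import io
from typing import Any, Callable, List, Optional, Tuple


def read_many_from_s3_sketch(
    s3_client: Any,
    locations: List[Tuple[str, str]],
    reader: Optional[Callable[[io.BytesIO], Any]] = None,
    concat: Optional[Callable[[List[Any]], Any]] = None,
) -> Any:
    """Hypothetical: read each (bucket, key) object in turn, then concatenate once."""
    if reader is None or concat is None:
        # Lazy import; defaults are polars' parquet reader and pl.concat.
        import polars as pl

        reader = reader or pl.read_parquet
        concat = concat or pl.concat
    frames = []
    for bucket, key in locations:
        body = s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()
        frames.append(reader(io.BytesIO(body)))
    # Concatenating once avoids re-copying the accumulated result on every iteration.
    return concat(frames)
```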
- dbsnaplake.polars_utils.group_by_partition(df: DataFrame, s3dir: S3Path, filename: str, partition_keys: List[str], sort_by: Optional[List[str]] = None, descending: Union[bool, List[bool]] = False) → List[Tuple[DataFrame, S3Path]]
Group the DataFrame by partition keys and determine the S3 location of each partition.
- Parameters:
df – polars.DataFrame object.
s3dir – s3pathlib.S3Path object, the root directory of the S3 location.
filename – Filename of the parquet file. For example: “data.parquet”.
partition_keys – List of partition keys. For example: [“year”, “month”].
sort_by – List of columns to sort by. For example: [“create_time”]. Use an empty list or None if no sorting is needed.
descending – Sorting order, either a single boolean or a list of boolean values, one per sort_by column. For example: [True] or [False, True].
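To illustrate the grouping and path-building steps, here is a standard-library sketch that works on a list of dict rows instead of a polars.DataFrame (in polars, a comparable split is available through DataFrame.partition_by). The function name, the plain string s3dir and the Hive-style key=value directory layout are assumptions for illustration; dbsnaplake's actual path format may differ. Multi-column sorting with per-column descending flags is done with successive stable sorts, applied from the last sort column to the first.

```python
from itertools import groupby
from operator import itemgetter
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union

Row = Dict[str, Any]


def group_rows_by_partition(
    rows: List[Row],
    s3dir: str,
    filename: str,
    partition_keys: List[str],
    sort_by: Optional[List[str]] = None,
    descending: Union[bool, Sequence[bool]] = False,
) -> List[Tuple[List[Row], str]]:
    """Hypothetical: group rows by partition keys and build one location per group."""

    def part_key(row: Row) -> Tuple[Any, ...]:
        return tuple(row[k] for k in partition_keys)

    # A single bool applies to every sort column; a sequence gives one flag per column.
    if isinstance(descending, bool):
        flags = [descending] * len(sort_by or [])
    else:
        flags = list(descending)

    results: List[Tuple[List[Row], str]] = []
    # itertools.groupby only merges adjacent items, so sort by the partition key first.
    for values, group in groupby(sorted(rows, key=part_key), key=part_key):
        group_rows = list(group)
        # Successive stable sorts, last column first, give a multi-column sort.
        for col, desc in reversed(list(zip(sort_by or [], flags))):
            group_rows.sort(key=itemgetter(col), reverse=desc)
        # Assumed Hive-style layout: <s3dir>/key1=v1/key2=v2/<filename>
        subdir = "/".join(f"{k}={v}" for k, v in zip(partition_keys, values))
        results.append((group_rows, f"{s3dir.rstrip('/')}/{subdir}/{filename}"))
    return results
```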