#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
A collection of partitioning functions.
"""

import sys
from typing import (
    TYPE_CHECKING,
    Union,
)

from pyspark.errors import PySparkTypeError
from pyspark.sql.column import Column
from pyspark.sql.functions.builtin import _invoke_function_over_columns, _invoke_function
from pyspark.sql.utils import (
    try_partitioning_remote_functions as _try_partitioning_remote_functions,
    get_active_spark_context as _get_active_spark_context,
)

if TYPE_CHECKING:
    from pyspark.sql._typing import ColumnOrName


@_try_partitioning_remote_functions
def years(col: "ColumnOrName") -> Column:
    """
    Partition transform function: A transform for timestamps and dates
    to partition data into years.

    .. versionadded:: 4.0.0

    Parameters
    ----------
    col : :class:`~pyspark.sql.Column` or str
        target date or timestamp column to work on.

    Returns
    -------
    :class:`~pyspark.sql.Column`
        data partitioned by years.

    Examples
    --------
    >>> df.writeTo("catalog.db.table").partitionedBy(  # doctest: +SKIP
    ...     partitioning.years("ts")
    ... ).createOrReplace()

    Notes
    -----
    This function can be used only in combination with
    :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
    method of the `DataFrameWriterV2`.
    """
    return _invoke_function_over_columns("years", col)

@_try_partitioning_remote_functions
def months(col: "ColumnOrName") -> Column:
    """
    Partition transform function: A transform for timestamps and dates
    to partition data into months.

    .. versionadded:: 4.0.0

    Parameters
    ----------
    col : :class:`~pyspark.sql.Column` or str
        target date or timestamp column to work on.

    Returns
    -------
    :class:`~pyspark.sql.Column`
        data partitioned by months.

    Examples
    --------
    >>> df.writeTo("catalog.db.table").partitionedBy(  # doctest: +SKIP
    ...     partitioning.months("ts")
    ... ).createOrReplace()

    Notes
    -----
    This function can be used only in combination with
    :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
    method of the `DataFrameWriterV2`.
    """
    return _invoke_function_over_columns("months", col)

@_try_partitioning_remote_functions
def days(col: "ColumnOrName") -> Column:
    """
    Partition transform function: A transform for timestamps and dates
    to partition data into days.

    .. versionadded:: 4.0.0

    Parameters
    ----------
    col : :class:`~pyspark.sql.Column` or str
        target date or timestamp column to work on.

    Returns
    -------
    :class:`~pyspark.sql.Column`
        data partitioned by days.

    Examples
    --------
    >>> df.writeTo("catalog.db.table").partitionedBy(  # doctest: +SKIP
    ...     partitioning.days("ts")
    ... ).createOrReplace()

    Notes
    -----
    This function can be used only in combination with
    :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
    method of the `DataFrameWriterV2`.
    """
    return _invoke_function_over_columns("days", col)

@_try_partitioning_remote_functions
def hours(col: "ColumnOrName") -> Column:
    """
    Partition transform function: A transform for timestamps
    to partition data into hours.

    .. versionadded:: 4.0.0

    Parameters
    ----------
    col : :class:`~pyspark.sql.Column` or str
        target timestamp column to work on.

    Returns
    -------
    :class:`~pyspark.sql.Column`
        data partitioned by hours.

    Examples
    --------
    >>> df.writeTo("catalog.db.table").partitionedBy(  # doctest: +SKIP
    ...     partitioning.hours("ts")
    ... ).createOrReplace()

    Notes
    -----
    This function can be used only in combination with
    :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
    method of the `DataFrameWriterV2`.
    """
    return _invoke_function_over_columns("hours", col)

@_try_partitioning_remote_functions
def bucket(numBuckets: Union[Column, int], col: "ColumnOrName") -> Column:
    """
    Partition transform function: A transform for any type that partitions
    by a hash of the input column.

    .. versionadded:: 4.0.0

    Parameters
    ----------
    numBuckets : :class:`~pyspark.sql.Column` or int
        the number of buckets.
    col : :class:`~pyspark.sql.Column` or str
        target column to work on.

    Returns
    -------
    :class:`~pyspark.sql.Column`
        data partitioned by the given columns.

    Examples
    --------
    >>> df.writeTo("catalog.db.table").partitionedBy(  # doctest: +SKIP
    ...     partitioning.bucket(42, "ts")
    ... ).createOrReplace()

    Notes
    -----
    This function can be used only in combination with
    :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
    method of the `DataFrameWriterV2`.
    """
    from pyspark.sql.classic.column import _to_java_column, _create_column_from_literal

    # Validate eagerly so a bad `numBuckets` fails at call time, not at write time.
    if not isinstance(numBuckets, (int, Column)):
        raise PySparkTypeError(
            errorClass="NOT_COLUMN_OR_INT",
            messageParameters={
                "arg_name": "numBuckets",
                "arg_type": type(numBuckets).__name__,
            },
        )

    _get_active_spark_context()
    # Wrap an int as a literal column; an existing Column converts directly.
    numBuckets = (
        _create_column_from_literal(numBuckets)
        if isinstance(numBuckets, int)
        else _to_java_column(numBuckets)
    )
    return _invoke_function("bucket", numBuckets, _to_java_column(col))

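# A minimal sketch of the validation above (kept as a comment, not executed here):
# `numBuckets` must be an int or a Column, otherwise bucket() raises immediately.
# The column name "id" is hypothetical:
#
#   from pyspark.errors import PySparkTypeError
#
#   try:
#       bucket("8", "id")  # str is neither Column nor int
#   except PySparkTypeError:
#       pass  # raised with errorClass NOT_COLUMN_OR_INT
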
def _test() -> None:
    import doctest
    from pyspark.sql import SparkSession
    import pyspark.sql.functions.partitioning

    globs = pyspark.sql.functions.partitioning.__dict__.copy()
    spark = (
        SparkSession.builder.master("local[4]")
        .appName("sql.functions.partitioning tests")
        .getOrCreate()
    )
    globs["spark"] = spark

    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.functions.partitioning,
        globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
    )
    spark.stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()