#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
A collections of partitioning functions
"""
import sys
from typing import (
    TYPE_CHECKING,
    Union,
)
from pyspark.errors import PySparkTypeError
from pyspark.sql.column import Column
from pyspark.sql.functions.builtin import _invoke_function_over_columns, _invoke_function
from pyspark.sql.utils import (
    try_partitioning_remote_functions as _try_partitioning_remote_functions,
    get_active_spark_context as _get_active_spark_context,
)

if TYPE_CHECKING:
    from pyspark.sql._typing import ColumnOrName


@_try_partitioning_remote_functions
def years(col: "ColumnOrName") -> Column:
    """
    Partition transform function: A transform for timestamps and dates
    to partition data into years.

    .. versionadded:: 4.0.0

    Parameters
    ----------
    col : :class:`~pyspark.sql.Column` or str
        target date or timestamp column to work on.

    Returns
    -------
    :class:`~pyspark.sql.Column`
        data partitioned by years.

    Examples
    --------
    >>> df.writeTo("catalog.db.table").partitionedBy(  # doctest: +SKIP
    ...     partitioning.years("ts")
    ... ).createOrReplace()

    Notes
    -----
    This function can be used only in combination with the
    :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
    method of `DataFrameWriterV2`.
    """
    return _invoke_function_over_columns("years", col)
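

# Illustrative end-to-end sketch (not part of this module): writing a DataFrame
# to a V2 table partitioned by year. The catalog, the table name
# "catalog.db.table", and the "ts" column are hypothetical, and a V2 catalog
# (e.g. Iceberg) must be configured for the write to succeed.
#
#   import datetime
#   from pyspark.sql.functions import partitioning
#
#   df = spark.createDataFrame([(datetime.datetime(2021, 1, 1),)], ["ts"])
#   df.writeTo("catalog.db.table").partitionedBy(
#       partitioning.years("ts")
#   ).createOrReplace()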


@_try_partitioning_remote_functions
def months(col: "ColumnOrName") -> Column:
    """
    Partition transform function: A transform for timestamps and dates
    to partition data into months.

    .. versionadded:: 4.0.0

    Parameters
    ----------
    col : :class:`~pyspark.sql.Column` or str
        target date or timestamp column to work on.

    Returns
    -------
    :class:`~pyspark.sql.Column`
        data partitioned by months.

    Examples
    --------
    >>> df.writeTo("catalog.db.table").partitionedBy(  # doctest: +SKIP
    ...     partitioning.months("ts")
    ... ).createOrReplace()

    Notes
    -----
    This function can be used only in combination with the
    :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
    method of `DataFrameWriterV2`.
    """
    return _invoke_function_over_columns("months", col)


@_try_partitioning_remote_functions
def days(col: "ColumnOrName") -> Column:
    """
    Partition transform function: A transform for timestamps and dates
    to partition data into days.

    .. versionadded:: 4.0.0

    Parameters
    ----------
    col : :class:`~pyspark.sql.Column` or str
        target date or timestamp column to work on.

    Returns
    -------
    :class:`~pyspark.sql.Column`
        data partitioned by days.

    Examples
    --------
    >>> df.writeTo("catalog.db.table").partitionedBy(  # doctest: +SKIP
    ...     partitioning.days("ts")
    ... ).createOrReplace()

    Notes
    -----
    This function can be used only in combination with the
    :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
    method of `DataFrameWriterV2`.
    """
    return _invoke_function_over_columns("days", col)


@_try_partitioning_remote_functions
def hours(col: "ColumnOrName") -> Column:
    """
    Partition transform function: A transform for timestamps
    to partition data into hours.

    .. versionadded:: 4.0.0

    Parameters
    ----------
    col : :class:`~pyspark.sql.Column` or str
        target timestamp column to work on.

    Returns
    -------
    :class:`~pyspark.sql.Column`
        data partitioned by hours.

    Examples
    --------
    >>> df.writeTo("catalog.db.table").partitionedBy(  # doctest: +SKIP
    ...     partitioning.hours("ts")
    ... ).createOrReplace()

    Notes
    -----
    This function can be used only in combination with the
    :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
    method of `DataFrameWriterV2`.
    """
    return _invoke_function_over_columns("hours", col)
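

# The time-based transforms above can be combined with other transforms in a
# single partitionedBy call, since DataFrameWriterV2.partitionedBy accepts
# multiple columns. A hedged sketch (hypothetical table and columns, assuming
# a configured V2 catalog that supports the chosen transform combination):
#
#   df.writeTo("catalog.db.events").partitionedBy(
#       partitioning.days("ts"),
#       partitioning.bucket(16, "user_id"),
#   ).createOrReplace()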


@_try_partitioning_remote_functions
def bucket(numBuckets: Union[Column, int], col: "ColumnOrName") -> Column:
    """
    Partition transform function: A transform for any type that partitions
    by a hash of the input column.

    .. versionadded:: 4.0.0

    Parameters
    ----------
    numBuckets : :class:`~pyspark.sql.Column` or int
        the number of buckets.
    col : :class:`~pyspark.sql.Column` or str
        target column to work on.

    Returns
    -------
    :class:`~pyspark.sql.Column`
        data partitioned by the given columns.

    Examples
    --------
    >>> df.writeTo("catalog.db.table").partitionedBy(  # doctest: +SKIP
    ...     partitioning.bucket(42, "ts")
    ... ).createOrReplace()

    Notes
    -----
    This function can be used only in combination with the
    :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
    method of `DataFrameWriterV2`.
    """
    from pyspark.sql.classic.column import _to_java_column, _create_column_from_literal

    if not isinstance(numBuckets, (int, Column)):
        raise PySparkTypeError(
            errorClass="NOT_COLUMN_OR_INT",
            messageParameters={
                "arg_name": "numBuckets",
                "arg_type": type(numBuckets).__name__,
            },
        )

    # Ensure an active SparkContext exists before converting to Java columns.
    _get_active_spark_context()
    # Accept either a Python int (wrapped as a literal) or an existing Column.
    numBuckets = (
        _create_column_from_literal(numBuckets)
        if isinstance(numBuckets, int)
        else _to_java_column(numBuckets)
    )
    return _invoke_function("bucket", numBuckets, _to_java_column(col))


def _test() -> None:
    import doctest
    from pyspark.sql import SparkSession
    import pyspark.sql.functions.partitioning

    globs = pyspark.sql.functions.partitioning.__dict__.copy()
    spark = (
        SparkSession.builder.master("local[4]")
        .appName("sql.functions.partitioning tests")
        .getOrCreate()
    )
    globs["spark"] = spark
    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.functions.partitioning,
        globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
    )
    spark.stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()