#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# mypy: disable-error-code="empty-body"
from typing import TYPE_CHECKING
from pyspark.sql.tvf_argument import TableValuedFunctionArgument
from pyspark.sql.utils import dispatch_table_arg_method
if TYPE_CHECKING:
from pyspark.sql._typing import ColumnOrName
[docs]class TableArg(TableValuedFunctionArgument):
"""
Represents a table argument in PySpark.
This class provides methods to specify partitioning, ordering, and
single-partition constraints when passing a DataFrame as a table argument
to TVF(Table-Valued Function)s including UDTF(User-Defined Table Function)s.
"""
[docs] @dispatch_table_arg_method
def partitionBy(self, *cols: "ColumnOrName") -> "TableArg":
"""
Partitions the data based on the specified columns.
This method partitions the table argument data by the specified columns.
It must be called before `orderBy()` and cannot be called after
`withSinglePartition()` has been called.
Parameters
----------
cols : str, :class:`Column`, or list
Column names or :class:`Column` objects to partition by.
Returns
-------
:class:`TableArg`
A new `TableArg` instance with partitioning applied.
Examples
--------
>>> from pyspark.sql.functions import udtf
>>>
>>> @udtf(returnType="key: int, value: string")
... class ProcessUDTF:
... def eval(self, row):
... yield row["key"], row["value"]
...
>>> df = spark.createDataFrame(
... [(1, "a"), (1, "b"), (2, "c"), (2, "d")], ["key", "value"]
... )
>>>
>>> # Partition by a single column
>>> result = ProcessUDTF(df.asTable().partitionBy("key"))
>>> result.show()
+---+-----+
|key|value|
+---+-----+
| 1| a|
| 1| b|
| 2| c|
| 2| d|
+---+-----+
>>>
>>> # Partition by multiple columns
>>> df2 = spark.createDataFrame(
... [(1, "x", 10), (1, "x", 20), (2, "y", 30)], ["key", "category", "value"]
... )
>>> result2 = ProcessUDTF(df2.asTable().partitionBy("key", "category"))
>>> result2.show()
+---+-----+
|key|value|
+---+-----+
| 1| x|
| 1| x|
| 2| y|
+---+-----+
"""
...
[docs] @dispatch_table_arg_method
def orderBy(self, *cols: "ColumnOrName") -> "TableArg":
"""
Orders the data within each partition by the specified columns.
This method orders the data within partitions. It must be called after
`partitionBy()` or `withSinglePartition()` has been called.
Parameters
----------
cols : str, :class:`Column`, or list
Column names or :class:`Column` objects to order by. Columns can be
ordered in ascending or descending order using :meth:`Column.asc` or
:meth:`Column.desc`.
Returns
-------
:class:`TableArg`
A new `TableArg` instance with ordering applied.
Examples
--------
>>> from pyspark.sql.functions import udtf
>>>
>>> @udtf(returnType="key: int, value: string")
... class ProcessUDTF:
... def eval(self, row):
... yield row["key"], row["value"]
...
>>> df = spark.createDataFrame(
... [(1, "b"), (1, "a"), (2, "d"), (2, "c")], ["key", "value"]
... )
>>>
>>> # Order by a single column within partitions
>>> result = ProcessUDTF(df.asTable().partitionBy("key").orderBy("value"))
>>> result.show()
+---+-----+
|key|value|
+---+-----+
| 1| a|
| 1| b|
| 2| c|
| 2| d|
+---+-----+
>>>
>>> # Order by multiple columns
>>> df2 = spark.createDataFrame(
... [(1, "a", 2), (1, "a", 1), (1, "b", 3)], ["key", "value", "num"]
... )
>>> result2 = ProcessUDTF(df2.asTable().partitionBy("key").orderBy("value", "num"))
>>> result2.show()
+---+-----+
|key|value|
+---+-----+
| 1| a|
| 1| a|
| 1| b|
+---+-----+
>>>
>>> # Order by descending order
>>> result3 = ProcessUDTF(df.asTable().partitionBy("key").orderBy(df.value.desc()))
>>> result3.show()
+---+-----+
|key|value|
+---+-----+
| 1| b|
| 1| a|
| 2| d|
| 2| c|
+---+-----+
"""
...
[docs] @dispatch_table_arg_method
def withSinglePartition(self) -> "TableArg":
"""
Forces the data to be processed in a single partition.
This method indicates that all data should be treated as a single partition.
It cannot be called after `partitionBy()` has been called. `orderBy()` can
be called after this method to order the data within the single partition.
Returns
-------
:class:`TableArg`
A new `TableArg` instance with single partition constraint applied.
Examples
--------
>>> from pyspark.sql.functions import udtf
>>>
>>> @udtf(returnType="key: int, value: string")
... class ProcessUDTF:
... def eval(self, row):
... yield row["key"], row["value"]
...
>>> df = spark.createDataFrame(
... [(1, "a"), (2, "b"), (3, "c")], ["key", "value"]
... )
>>>
>>> # Process all data in a single partition
>>> result = ProcessUDTF(df.asTable().withSinglePartition())
>>> result.show()
+---+-----+
|key|value|
+---+-----+
| 1| a|
| 2| b|
| 3| c|
+---+-----+
>>>
>>> # Use withSinglePartition and orderBy together
>>> df2 = spark.createDataFrame(
... [(3, "c"), (1, "a"), (2, "b")], ["key", "value"]
... )
>>> result2 = ProcessUDTF(df2.asTable().withSinglePartition().orderBy("key"))
>>> result2.show()
+---+-----+
|key|value|
+---+-----+
| 1| a|
| 2| b|
| 3| c|
+---+-----+
"""
...