Source code for pyspark.sql.tvf

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Optional

from pyspark.errors import PySparkValueError
from pyspark.sql.column import Column
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.session import SparkSession

__all__ = ["TableValuedFunction"]


class TableValuedFunction:
    """
    Interface for invoking table-valued functions in Spark SQL.
    """

    def __init__(self, sparkSession: SparkSession):
        self._sparkSession = sparkSession
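
    # Note: a TableValuedFunction is normally obtained through the ``spark.tvf``
    # accessor on a :class:`SparkSession`, as in the doctest examples below; the
    # constructor is typically not called directly by user code.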

    def range(
        self,
        start: int,
        end: Optional[int] = None,
        step: int = 1,
        numPartitions: Optional[int] = None,
    ) -> DataFrame:
        """
        Create a :class:`DataFrame` with a single :class:`pyspark.sql.types.LongType` column
        named ``id``, containing elements in a range from ``start`` to ``end`` (exclusive)
        with step value ``step``.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        start : int
            the start value
        end : int, optional
            the end value (exclusive)
        step : int, optional
            the incremental step (default: 1)
        numPartitions : int, optional
            the number of partitions of the DataFrame

        Returns
        -------
        :class:`DataFrame`

        Examples
        --------
        >>> spark.tvf.range(1, 7, 2).show()
        +---+
        | id|
        +---+
        |  1|
        |  3|
        |  5|
        +---+

        If only one argument is specified, it will be used as the end value.

        >>> spark.tvf.range(3).show()
        +---+
        | id|
        +---+
        |  0|
        |  1|
        |  2|
        +---+
        """
        return self._sparkSession.range(start, end, step, numPartitions)

    def explode(self, collection: Column) -> DataFrame:
        """
        Returns a :class:`DataFrame` containing a new row for each element in the given
        array or map. Uses the default column name `col` for elements in the array and
        `key` and `value` for elements in the map unless specified otherwise.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        collection : :class:`~pyspark.sql.Column`
            Target column to work on.

        Returns
        -------
        :class:`DataFrame`

        See Also
        --------
        :meth:`pyspark.sql.functions.explode`

        Examples
        --------
        Example 1: Exploding an array column

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.explode(sf.array(sf.lit(1), sf.lit(2), sf.lit(3))).show()
        +---+
        |col|
        +---+
        |  1|
        |  2|
        |  3|
        +---+

        Example 2: Exploding a map column

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.explode(
        ...     sf.create_map(sf.lit("a"), sf.lit("b"), sf.lit("c"), sf.lit("d"))
        ... ).show()
        +---+-----+
        |key|value|
        +---+-----+
        |  a|    b|
        |  c|    d|
        +---+-----+

        Example 3: Exploding an array of struct column

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.explode(sf.array(
        ...     sf.named_struct(sf.lit("a"), sf.lit(1), sf.lit("b"), sf.lit(2)),
        ...     sf.named_struct(sf.lit("a"), sf.lit(3), sf.lit("b"), sf.lit(4))
        ... )).select("col.*").show()
        +---+---+
        |  a|  b|
        +---+---+
        |  1|  2|
        |  3|  4|
        +---+---+

        Example 4: Exploding an empty array column

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.explode(sf.array()).show()
        +---+
        |col|
        +---+
        +---+

        Example 5: Exploding an empty map column

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.explode(sf.create_map()).show()
        +---+-----+
        |key|value|
        +---+-----+
        +---+-----+
        """
        return self._fn("explode", collection)

    def explode_outer(self, collection: Column) -> DataFrame:
        """
        Returns a :class:`DataFrame` containing a new row for each element in the given
        array or map. Unlike explode, if the array/map is null or empty then null is
        produced. Uses the default column name `col` for elements in the array and `key`
        and `value` for elements in the map unless specified otherwise.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        collection : :class:`~pyspark.sql.Column`
            target column to work on.

        Returns
        -------
        :class:`DataFrame`

        See Also
        --------
        :meth:`pyspark.sql.functions.explode_outer`

        Examples
        --------
        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.explode_outer(sf.array(sf.lit("foo"), sf.lit("bar"))).show()
        +---+
        |col|
        +---+
        |foo|
        |bar|
        +---+

        >>> spark.tvf.explode_outer(sf.array()).show()
        +----+
        | col|
        +----+
        |NULL|
        +----+

        >>> spark.tvf.explode_outer(sf.create_map(sf.lit("x"), sf.lit(1.0))).show()
        +---+-----+
        |key|value|
        +---+-----+
        |  x|  1.0|
        +---+-----+

        >>> spark.tvf.explode_outer(sf.create_map()).show()
        +----+-----+
        | key|value|
        +----+-----+
        |NULL| NULL|
        +----+-----+
        """
        return self._fn("explode_outer", collection)

    def inline(self, input: Column) -> DataFrame:
        """
        Explodes an array of structs into a table. This function takes an input column
        containing an array of structs and returns a new row for each struct in the array.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        input : :class:`~pyspark.sql.Column`
            Input column of values to explode.

        Returns
        -------
        :class:`DataFrame`

        See Also
        --------
        :meth:`pyspark.sql.functions.inline`

        Examples
        --------
        Example 1: Using inline with a single struct array

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.inline(sf.array(
        ...     sf.named_struct(sf.lit("a"), sf.lit(1), sf.lit("b"), sf.lit(2)),
        ...     sf.named_struct(sf.lit("a"), sf.lit(3), sf.lit("b"), sf.lit(4))
        ... )).show()
        +---+---+
        |  a|  b|
        +---+---+
        |  1|  2|
        |  3|  4|
        +---+---+

        Example 2: Using inline with an empty struct array column

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.inline(sf.array().astype("array<struct<a:int,b:int>>")).show()
        +---+---+
        |  a|  b|
        +---+---+
        +---+---+

        Example 3: Using inline with a struct array column containing null values

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.inline(sf.array(
        ...     sf.named_struct(sf.lit("a"), sf.lit(1), sf.lit("b"), sf.lit(2)),
        ...     sf.lit(None),
        ...     sf.named_struct(sf.lit("a"), sf.lit(3), sf.lit("b"), sf.lit(4))
        ... )).show()
        +----+----+
        |   a|   b|
        +----+----+
        |   1|   2|
        |NULL|NULL|
        |   3|   4|
        +----+----+
        """
        return self._fn("inline", input)

    def inline_outer(self, input: Column) -> DataFrame:
        """
        Explodes an array of structs into a table. Unlike inline, if the array is null
        or empty then null is produced for each nested column.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        input : :class:`~pyspark.sql.Column`
            input column of values to explode.

        Returns
        -------
        :class:`DataFrame`

        See Also
        --------
        :meth:`pyspark.sql.functions.inline_outer`

        Examples
        --------
        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.inline_outer(sf.array(
        ...     sf.named_struct(sf.lit("a"), sf.lit(1), sf.lit("b"), sf.lit(2)),
        ...     sf.named_struct(sf.lit("a"), sf.lit(3), sf.lit("b"), sf.lit(4))
        ... )).show()
        +---+---+
        |  a|  b|
        +---+---+
        |  1|  2|
        |  3|  4|
        +---+---+

        >>> spark.tvf.inline_outer(sf.array().astype("array<struct<a:int,b:int>>")).show()
        +----+----+
        |   a|   b|
        +----+----+
        |NULL|NULL|
        +----+----+
        """
        return self._fn("inline_outer", input)

    def json_tuple(self, input: Column, *fields: Column) -> DataFrame:
        """
        Creates a new row for a json column according to the given field names.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        input : :class:`~pyspark.sql.Column`
            string column in json format
        fields : :class:`~pyspark.sql.Column`
            a field or fields to extract

        Returns
        -------
        :class:`DataFrame`

        See Also
        --------
        :meth:`pyspark.sql.functions.json_tuple`

        Examples
        --------
        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.json_tuple(
        ...     sf.lit('{"f1": "value1", "f2": "value2"}'), sf.lit("f1"), sf.lit("f2")
        ... ).show()
        +------+------+
        |    c0|    c1|
        +------+------+
        |value1|value2|
        +------+------+
        """
        from pyspark.sql.classic.column import _to_seq, _to_java_column

        # At least one field to extract must be provided.
        if len(fields) == 0:
            raise PySparkValueError(
                errorClass="CANNOT_BE_EMPTY",
                messageParameters={"item": "field"},
            )

        sc = self._sparkSession.sparkContext
        return DataFrame(
            self._sparkSession._jsparkSession.tvf().json_tuple(
                _to_java_column(input), _to_seq(sc, fields, _to_java_column)
            ),
            self._sparkSession,
        )

    def posexplode(self, collection: Column) -> DataFrame:
        """
        Returns a :class:`DataFrame` containing a new row for each element with position
        in the given array or map. Uses the default column name `pos` for position, and
        `col` for elements in the array and `key` and `value` for elements in the map
        unless specified otherwise.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        collection : :class:`~pyspark.sql.Column`
            target column to work on.

        Returns
        -------
        :class:`DataFrame`

        See Also
        --------
        :meth:`pyspark.sql.functions.posexplode`

        Examples
        --------
        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.posexplode(sf.array(sf.lit(1), sf.lit(2), sf.lit(3))).show()
        +---+---+
        |pos|col|
        +---+---+
        |  0|  1|
        |  1|  2|
        |  2|  3|
        +---+---+

        >>> spark.tvf.posexplode(sf.create_map(sf.lit("a"), sf.lit("b"))).show()
        +---+---+-----+
        |pos|key|value|
        +---+---+-----+
        |  0|  a|    b|
        +---+---+-----+
        """
        return self._fn("posexplode", collection)

    def posexplode_outer(self, collection: Column) -> DataFrame:
        """
        Returns a :class:`DataFrame` containing a new row for each element with position
        in the given array or map. Unlike posexplode, if the array/map is null or empty
        then the row (null, null) is produced. Uses the default column name `pos` for
        position, and `col` for elements in the array and `key` and `value` for elements
        in the map unless specified otherwise.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        collection : :class:`~pyspark.sql.Column`
            target column to work on.

        Returns
        -------
        :class:`DataFrame`

        See Also
        --------
        :meth:`pyspark.sql.functions.posexplode_outer`

        Examples
        --------
        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.posexplode_outer(sf.array(sf.lit("foo"), sf.lit("bar"))).show()
        +---+---+
        |pos|col|
        +---+---+
        |  0|foo|
        |  1|bar|
        +---+---+

        >>> spark.tvf.posexplode_outer(sf.array()).show()
        +----+----+
        | pos| col|
        +----+----+
        |NULL|NULL|
        +----+----+

        >>> spark.tvf.posexplode_outer(sf.create_map(sf.lit("x"), sf.lit(1.0))).show()
        +---+---+-----+
        |pos|key|value|
        +---+---+-----+
        |  0|  x|  1.0|
        +---+---+-----+

        >>> spark.tvf.posexplode_outer(sf.create_map()).show()
        +----+----+-----+
        | pos| key|value|
        +----+----+-----+
        |NULL|NULL| NULL|
        +----+----+-----+
        """
        return self._fn("posexplode_outer", collection)

    def stack(self, n: Column, *fields: Column) -> DataFrame:
        """
        Separates `col1`, ..., `colk` into `n` rows. Uses column names col0, col1, etc.
        by default unless specified otherwise.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        n : :class:`~pyspark.sql.Column`
            the number of rows to separate
        fields : :class:`~pyspark.sql.Column`
            input elements to be separated

        Returns
        -------
        :class:`DataFrame`

        See Also
        --------
        :meth:`pyspark.sql.functions.stack`

        Examples
        --------
        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.stack(sf.lit(2), sf.lit(1), sf.lit(2), sf.lit(3)).show()
        +----+----+
        |col0|col1|
        +----+----+
        |   1|   2|
        |   3|NULL|
        +----+----+
        """
        from pyspark.sql.classic.column import _to_seq, _to_java_column

        # stack takes a variadic sequence of columns, so the JVM API is called directly
        # instead of going through the single-argument _fn helper below.
        sc = self._sparkSession.sparkContext
        return DataFrame(
            self._sparkSession._jsparkSession.tvf().stack(
                _to_java_column(n), _to_seq(sc, fields, _to_java_column)
            ),
            self._sparkSession,
        )

    def collations(self) -> DataFrame:
        """
        Get all of the Spark SQL string collations.

        .. versionadded:: 4.0.0

        Returns
        -------
        :class:`DataFrame`

        Examples
        --------
        >>> spark.tvf.collations().show()
        +-------+-------+-------------------+...
        |CATALOG| SCHEMA|               NAME|...
        +-------+-------+-------------------+...
        ...
        +-------+-------+-------------------+...
        """
        return self._fn("collations")

    def sql_keywords(self) -> DataFrame:
        """
        Get Spark SQL keywords.

        .. versionadded:: 4.0.0

        Returns
        -------
        :class:`DataFrame`

        Examples
        --------
        >>> spark.tvf.sql_keywords().show()
        +-------------+--------+
        |      keyword|reserved|
        +-------------+--------+
        ...
        +-------------+--------+...
        """
        return self._fn("sql_keywords")

    def variant_explode(self, input: Column) -> DataFrame:
        """
        Separates a variant object/array into multiple rows containing its fields/elements.
        Its result schema is `struct<pos int, key string, value variant>`. `pos` is the
        position of the field/element in its parent object/array, and `value` is the
        field/element value. `key` is the field name when exploding a variant object, or
        is NULL when exploding a variant array. It ignores any input that is not a variant
        array/object, including SQL NULL, variant null, and any other variant values.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        input : :class:`~pyspark.sql.Column`
            input column of values to explode.

        Returns
        -------
        :class:`DataFrame`

        Examples
        --------
        Example 1: Using variant_explode with a variant array

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.variant_explode(sf.parse_json(sf.lit('["hello", "world"]'))).show()
        +---+----+-------+
        |pos| key|  value|
        +---+----+-------+
        |  0|NULL|"hello"|
        |  1|NULL|"world"|
        +---+----+-------+

        Example 2: Using variant_explode with a variant object

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.variant_explode(sf.parse_json(sf.lit('{"a": true, "b": 3.14}'))).show()
        +---+---+-----+
        |pos|key|value|
        +---+---+-----+
        |  0|  a| true|
        |  1|  b| 3.14|
        +---+---+-----+

        Example 3: Using variant_explode with an empty variant array

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.variant_explode(sf.parse_json(sf.lit('[]'))).show()
        +---+---+-----+
        |pos|key|value|
        +---+---+-----+
        +---+---+-----+

        Example 4: Using variant_explode with an empty variant object

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.variant_explode(sf.parse_json(sf.lit('{}'))).show()
        +---+---+-----+
        |pos|key|value|
        +---+---+-----+
        +---+---+-----+
        """
        return self._fn("variant_explode", input)

    def variant_explode_outer(self, input: Column) -> DataFrame:
        """
        Separates a variant object/array into multiple rows containing its fields/elements.
        Its result schema is `struct<pos int, key string, value variant>`. `pos` is the
        position of the field/element in its parent object/array, and `value` is the
        field/element value. `key` is the field name when exploding a variant object, or
        is NULL when exploding a variant array. Unlike variant_explode, if the given
        variant is not a variant array/object, including SQL NULL, variant null, and any
        other variant values, then NULL is produced.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        input : :class:`~pyspark.sql.Column`
            input column of values to explode.

        Returns
        -------
        :class:`DataFrame`

        Examples
        --------
        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.variant_explode_outer(sf.parse_json(sf.lit('["hello", "world"]'))).show()
        +---+----+-------+
        |pos| key|  value|
        +---+----+-------+
        |  0|NULL|"hello"|
        |  1|NULL|"world"|
        +---+----+-------+

        >>> spark.tvf.variant_explode_outer(sf.parse_json(sf.lit('[]'))).show()
        +----+----+-----+
        | pos| key|value|
        +----+----+-----+
        |NULL|NULL| NULL|
        +----+----+-----+

        >>> spark.tvf.variant_explode_outer(sf.parse_json(sf.lit('{"a": true, "b": 3.14}'))).show()
        +---+---+-----+
        |pos|key|value|
        +---+---+-----+
        |  0|  a| true|
        |  1|  b| 3.14|
        +---+---+-----+

        >>> spark.tvf.variant_explode_outer(sf.parse_json(sf.lit('{}'))).show()
        +----+----+-----+
        | pos| key|value|
        +----+----+-----+
        |NULL|NULL| NULL|
        +----+----+-----+
        """
        return self._fn("variant_explode_outer", input)
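
    # Internal dispatch helper: resolves the JVM-side table-valued function named
    # ``functionName`` on ``_jsparkSession.tvf()``, converts each ``Column`` argument
    # to its Java counterpart, and wraps the resulting Java DataFrame in a Python
    # :class:`DataFrame`.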
    def _fn(self, functionName: str, *args: Column) -> DataFrame:
        from pyspark.sql.classic.column import _to_java_column

        return DataFrame(
            getattr(self._sparkSession._jsparkSession.tvf(), functionName)(
                *(_to_java_column(arg) for arg in args)
            ),
            self._sparkSession,
        )
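

# Doctest harness: when this module is executed directly, build a local SparkSession,
# run the doctests above (with ELLIPSIS and NORMALIZE_WHITESPACE enabled), and exit
# with a non-zero status if any of them fail.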
def _test() -> None:
    import os
    import doctest
    import sys
    import pyspark.sql.tvf

    os.chdir(os.environ["SPARK_HOME"])

    globs = pyspark.sql.tvf.__dict__.copy()
    globs["spark"] = SparkSession.builder.master("local[4]").appName("sql.tvf tests").getOrCreate()
    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.tvf,
        globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
    )
    globs["spark"].stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()