#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from typing import Optional

from pyspark.errors import PySparkValueError
from pyspark.sql.column import Column
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.session import SparkSession

__all__ = ["TableValuedFunction"]

class TableValuedFunction:
    """
    Interface for invoking table-valued functions in Spark SQL.
    """

    def __init__(self, sparkSession: SparkSession):
        self._sparkSession = sparkSession
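
    # The public methods below delegate to this small dispatcher. It is not part
    # of the documented API surface shown in this file; what follows is a minimal
    # sketch, reconstructed under the assumption that, like `json_tuple` and
    # `stack` below, every TVF resolves to the same-named method on the JVM-side
    # handle returned by `self._sparkSession._jsparkSession.tvf()`.
    def _fn(self, name: str, *args: Column) -> DataFrame:
        from pyspark.sql.classic.column import _to_java_column

        # Convert each Python Column to its JVM counterpart, then call the
        # same-named table-valued function on the JVM session and wrap the
        # resulting JVM DataFrame back into a Python DataFrame.
        return DataFrame(
            getattr(self._sparkSession._jsparkSession.tvf(), name)(
                *[_to_java_column(arg) for arg in args]
            ),
            self._sparkSession,
        )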

    def range(
        self,
        start: int,
        end: Optional[int] = None,
        step: int = 1,
        numPartitions: Optional[int] = None,
    ) -> DataFrame:
        """
        Create a :class:`DataFrame` with a single :class:`pyspark.sql.types.LongType` column
        named ``id``, containing elements in a range from ``start`` to ``end`` (exclusive)
        with step value ``step``.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        start : int
            the start value
        end : int, optional
            the end value (exclusive)
        step : int, optional
            the incremental step (default: 1)
        numPartitions : int, optional
            the number of partitions of the DataFrame

        Returns
        -------
        :class:`DataFrame`

        Examples
        --------
        >>> spark.tvf.range(1, 7, 2).show()
        +---+
        | id|
        +---+
        |  1|
        |  3|
        |  5|
        +---+

        If only one argument is specified, it will be used as the end value.

        >>> spark.tvf.range(3).show()
        +---+
        | id|
        +---+
        |  0|
        |  1|
        |  2|
        +---+
        """
        return self._sparkSession.range(start, end, step, numPartitions)

    def explode(self, collection: Column) -> DataFrame:
        """
        Returns a :class:`DataFrame` containing a new row for each element in the given
        array or map. Uses the default column name `col` for elements in the array and
        `key` and `value` for elements in the map unless specified otherwise.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        collection : :class:`~pyspark.sql.Column`
            Target column to work on.

        Returns
        -------
        :class:`DataFrame`

        See Also
        --------
        :meth:`pyspark.sql.functions.explode`

        Examples
        --------
        Example 1: Exploding an array column

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.explode(sf.array(sf.lit(1), sf.lit(2), sf.lit(3))).show()
        +---+
        |col|
        +---+
        |  1|
        |  2|
        |  3|
        +---+

        Example 2: Exploding a map column

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.explode(
        ...     sf.create_map(sf.lit("a"), sf.lit("b"), sf.lit("c"), sf.lit("d"))
        ... ).show()
        +---+-----+
        |key|value|
        +---+-----+
        |  a|    b|
        |  c|    d|
        +---+-----+

        Example 3: Exploding an array of struct column

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.explode(sf.array(
        ...     sf.named_struct(sf.lit("a"), sf.lit(1), sf.lit("b"), sf.lit(2)),
        ...     sf.named_struct(sf.lit("a"), sf.lit(3), sf.lit("b"), sf.lit(4))
        ... )).select("col.*").show()
        +---+---+
        |  a|  b|
        +---+---+
        |  1|  2|
        |  3|  4|
        +---+---+

        Example 4: Exploding an empty array column

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.explode(sf.array()).show()
        +---+
        |col|
        +---+
        +---+

        Example 5: Exploding an empty map column

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.explode(sf.create_map()).show()
        +---+-----+
        |key|value|
        +---+-----+
        +---+-----+
        """
        return self._fn("explode", collection)

    def explode_outer(self, collection: Column) -> DataFrame:
        """
        Returns a :class:`DataFrame` containing a new row for each element in the given
        array or map. Unlike explode, if the array/map is null or empty then null is
        produced. Uses the default column name `col` for elements in the array and
        `key` and `value` for elements in the map unless specified otherwise.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        collection : :class:`~pyspark.sql.Column`
            target column to work on.

        Returns
        -------
        :class:`DataFrame`

        See Also
        --------
        :meth:`pyspark.sql.functions.explode_outer`

        Examples
        --------
        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.explode_outer(sf.array(sf.lit("foo"), sf.lit("bar"))).show()
        +---+
        |col|
        +---+
        |foo|
        |bar|
        +---+

        >>> spark.tvf.explode_outer(sf.array()).show()
        +----+
        | col|
        +----+
        |NULL|
        +----+

        >>> spark.tvf.explode_outer(sf.create_map(sf.lit("x"), sf.lit(1.0))).show()
        +---+-----+
        |key|value|
        +---+-----+
        |  x|  1.0|
        +---+-----+

        >>> spark.tvf.explode_outer(sf.create_map()).show()
        +----+-----+
        | key|value|
        +----+-----+
        |NULL| NULL|
        +----+-----+
        """
        return self._fn("explode_outer", collection)

    def inline(self, input: Column) -> DataFrame:
        """
        Explodes an array of structs into a table.

        This function takes an input column containing an array of structs and produces
        a new row for each struct in the array.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        input : :class:`~pyspark.sql.Column`
            Input column of values to explode.

        Returns
        -------
        :class:`DataFrame`

        See Also
        --------
        :meth:`pyspark.sql.functions.inline`

        Examples
        --------
        Example 1: Using inline with a single struct array

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.inline(sf.array(
        ...     sf.named_struct(sf.lit("a"), sf.lit(1), sf.lit("b"), sf.lit(2)),
        ...     sf.named_struct(sf.lit("a"), sf.lit(3), sf.lit("b"), sf.lit(4))
        ... )).show()
        +---+---+
        |  a|  b|
        +---+---+
        |  1|  2|
        |  3|  4|
        +---+---+

        Example 2: Using inline with an empty struct array column

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.inline(sf.array().astype("array<struct<a:int,b:int>>")).show()
        +---+---+
        |  a|  b|
        +---+---+
        +---+---+

        Example 3: Using inline with a struct array column containing null values

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.inline(sf.array(
        ...     sf.named_struct(sf.lit("a"), sf.lit(1), sf.lit("b"), sf.lit(2)),
        ...     sf.lit(None),
        ...     sf.named_struct(sf.lit("a"), sf.lit(3), sf.lit("b"), sf.lit(4))
        ... )).show()
        +----+----+
        |   a|   b|
        +----+----+
        |   1|   2|
        |NULL|NULL|
        |   3|   4|
        +----+----+
        """
        return self._fn("inline", input)

    def inline_outer(self, input: Column) -> DataFrame:
        """
        Explodes an array of structs into a table. Unlike inline, if the array is null
        or empty then null is produced for each nested column.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        input : :class:`~pyspark.sql.Column`
            input column of values to explode.

        Returns
        -------
        :class:`DataFrame`

        See Also
        --------
        :meth:`pyspark.sql.functions.inline_outer`

        Examples
        --------
        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.inline_outer(sf.array(
        ...     sf.named_struct(sf.lit("a"), sf.lit(1), sf.lit("b"), sf.lit(2)),
        ...     sf.named_struct(sf.lit("a"), sf.lit(3), sf.lit("b"), sf.lit(4))
        ... )).show()
        +---+---+
        |  a|  b|
        +---+---+
        |  1|  2|
        |  3|  4|
        +---+---+

        >>> spark.tvf.inline_outer(sf.array().astype("array<struct<a:int,b:int>>")).show()
        +----+----+
        |   a|   b|
        +----+----+
        |NULL|NULL|
        +----+----+
        """
        return self._fn("inline_outer", input)

    def json_tuple(self, input: Column, *fields: Column) -> DataFrame:
        """
        Creates a new row for a json column according to the given field names.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        input : :class:`~pyspark.sql.Column`
            string column in json format
        fields : :class:`~pyspark.sql.Column`
            a field or fields to extract

        Returns
        -------
        :class:`DataFrame`

        See Also
        --------
        :meth:`pyspark.sql.functions.json_tuple`

        Examples
        --------
        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.json_tuple(
        ...     sf.lit('{"f1": "value1", "f2": "value2"}'), sf.lit("f1"), sf.lit("f2")
        ... ).show()
        +------+------+
        |    c0|    c1|
        +------+------+
        |value1|value2|
        +------+------+
        """
        from pyspark.sql.classic.column import _to_seq, _to_java_column

        if len(fields) == 0:
            raise PySparkValueError(
                errorClass="CANNOT_BE_EMPTY",
                messageParameters={"item": "field"},
            )

        sc = self._sparkSession.sparkContext
        return DataFrame(
            self._sparkSession._jsparkSession.tvf().json_tuple(
                _to_java_column(input), _to_seq(sc, fields, _to_java_column)
            ),
            self._sparkSession,
        )

    def posexplode(self, collection: Column) -> DataFrame:
        """
        Returns a :class:`DataFrame` containing a new row for each element with position
        in the given array or map. Uses the default column name `pos` for position, and
        `col` for elements in the array and `key` and `value` for elements in the map
        unless specified otherwise.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        collection : :class:`~pyspark.sql.Column`
            target column to work on.

        Returns
        -------
        :class:`DataFrame`

        See Also
        --------
        :meth:`pyspark.sql.functions.posexplode`

        Examples
        --------
        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.posexplode(sf.array(sf.lit(1), sf.lit(2), sf.lit(3))).show()
        +---+---+
        |pos|col|
        +---+---+
        |  0|  1|
        |  1|  2|
        |  2|  3|
        +---+---+

        >>> spark.tvf.posexplode(sf.create_map(sf.lit("a"), sf.lit("b"))).show()
        +---+---+-----+
        |pos|key|value|
        +---+---+-----+
        |  0|  a|    b|
        +---+---+-----+
        """
        return self._fn("posexplode", collection)

    def posexplode_outer(self, collection: Column) -> DataFrame:
        """
        Returns a :class:`DataFrame` containing a new row for each element with position
        in the given array or map. Unlike posexplode, if the array/map is null or empty
        then the row (null, null) is produced. Uses the default column name `pos` for
        position, and `col` for elements in the array and `key` and `value` for elements
        in the map unless specified otherwise.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        collection : :class:`~pyspark.sql.Column`
            target column to work on.

        Returns
        -------
        :class:`DataFrame`

        See Also
        --------
        :meth:`pyspark.sql.functions.posexplode_outer`

        Examples
        --------
        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.posexplode_outer(sf.array(sf.lit("foo"), sf.lit("bar"))).show()
        +---+---+
        |pos|col|
        +---+---+
        |  0|foo|
        |  1|bar|
        +---+---+

        >>> spark.tvf.posexplode_outer(sf.array()).show()
        +----+----+
        | pos| col|
        +----+----+
        |NULL|NULL|
        +----+----+

        >>> spark.tvf.posexplode_outer(sf.create_map(sf.lit("x"), sf.lit(1.0))).show()
        +---+---+-----+
        |pos|key|value|
        +---+---+-----+
        |  0|  x|  1.0|
        +---+---+-----+

        >>> spark.tvf.posexplode_outer(sf.create_map()).show()
        +----+----+-----+
        | pos| key|value|
        +----+----+-----+
        |NULL|NULL| NULL|
        +----+----+-----+
        """
        return self._fn("posexplode_outer", collection)

    def stack(self, n: Column, *fields: Column) -> DataFrame:
        """
        Separates `col1`, ..., `colk` into `n` rows. Uses column names col0, col1, etc.
        by default unless specified otherwise.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        n : :class:`~pyspark.sql.Column`
            the number of rows to separate
        fields : :class:`~pyspark.sql.Column`
            input elements to be separated

        Returns
        -------
        :class:`DataFrame`

        See Also
        --------
        :meth:`pyspark.sql.functions.stack`

        Examples
        --------
        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.stack(sf.lit(2), sf.lit(1), sf.lit(2), sf.lit(3)).show()
        +----+----+
        |col0|col1|
        +----+----+
        |   1|   2|
        |   3|NULL|
        +----+----+
        """
        from pyspark.sql.classic.column import _to_seq, _to_java_column

        sc = self._sparkSession.sparkContext
        return DataFrame(
            self._sparkSession._jsparkSession.tvf().stack(
                _to_java_column(n), _to_seq(sc, fields, _to_java_column)
            ),
            self._sparkSession,
        )

    def collations(self) -> DataFrame:
        """
        Get all of the Spark SQL string collations.

        .. versionadded:: 4.0.0

        Returns
        -------
        :class:`DataFrame`

        Examples
        --------
        >>> spark.tvf.collations().show()
        +-------+-------+-------------------+...
        |CATALOG| SCHEMA|               NAME|...
        +-------+-------+-------------------+...
        ...
        +-------+-------+-------------------+...
        """
        return self._fn("collations")

    def variant_explode(self, input: Column) -> DataFrame:
        """
        Separates a variant object/array into multiple rows containing its fields/elements.
        Its result schema is `struct<pos int, key string, value variant>`. `pos` is the
        position of the field/element in its parent object/array, and `value` is the
        field/element value. `key` is the field name when exploding a variant object, or
        is NULL when exploding a variant array. It ignores any input that is not a variant
        array/object, including SQL NULL, variant null, and any other variant values.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        input : :class:`~pyspark.sql.Column`
            input column of values to explode.

        Returns
        -------
        :class:`DataFrame`

        Examples
        --------
        Example 1: Using variant_explode with a variant array

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.variant_explode(sf.parse_json(sf.lit('["hello", "world"]'))).show()
        +---+----+-------+
        |pos| key|  value|
        +---+----+-------+
        |  0|NULL|"hello"|
        |  1|NULL|"world"|
        +---+----+-------+

        Example 2: Using variant_explode with a variant object

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.variant_explode(sf.parse_json(sf.lit('{"a": true, "b": 3.14}'))).show()
        +---+---+-----+
        |pos|key|value|
        +---+---+-----+
        |  0|  a| true|
        |  1|  b| 3.14|
        +---+---+-----+

        Example 3: Using variant_explode with an empty variant array

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.variant_explode(sf.parse_json(sf.lit('[]'))).show()
        +---+---+-----+
        |pos|key|value|
        +---+---+-----+
        +---+---+-----+

        Example 4: Using variant_explode with an empty variant object

        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.variant_explode(sf.parse_json(sf.lit('{}'))).show()
        +---+---+-----+
        |pos|key|value|
        +---+---+-----+
        +---+---+-----+
        """
        return self._fn("variant_explode", input)

    def variant_explode_outer(self, input: Column) -> DataFrame:
        """
        Separates a variant object/array into multiple rows containing its fields/elements.
        Its result schema is `struct<pos int, key string, value variant>`. `pos` is the
        position of the field/element in its parent object/array, and `value` is the
        field/element value. `key` is the field name when exploding a variant object, or
        is NULL when exploding a variant array. Unlike variant_explode, if the given
        variant is not a variant array/object, including SQL NULL, variant null, and any
        other variant values, then NULL is produced.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        input : :class:`~pyspark.sql.Column`
            input column of values to explode.

        Returns
        -------
        :class:`DataFrame`

        Examples
        --------
        >>> import pyspark.sql.functions as sf
        >>> spark.tvf.variant_explode_outer(sf.parse_json(sf.lit('["hello", "world"]'))).show()
        +---+----+-------+
        |pos| key|  value|
        +---+----+-------+
        |  0|NULL|"hello"|
        |  1|NULL|"world"|
        +---+----+-------+

        >>> spark.tvf.variant_explode_outer(sf.parse_json(sf.lit('[]'))).show()
        +----+----+-----+
        | pos| key|value|
        +----+----+-----+
        |NULL|NULL| NULL|
        +----+----+-----+

        >>> spark.tvf.variant_explode_outer(sf.parse_json(sf.lit('{"a": true, "b": 3.14}'))).show()
        +---+---+-----+
        |pos|key|value|
        +---+---+-----+
        |  0|  a| true|
        |  1|  b| 3.14|
        +---+---+-----+

        >>> spark.tvf.variant_explode_outer(sf.parse_json(sf.lit('{}'))).show()
        +----+----+-----+
        | pos| key|value|
        +----+----+-----+
        |NULL|NULL| NULL|
        +----+----+-----+
        """
        return self._fn("variant_explode_outer", input)
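
# A minimal doctest harness sketch, following the common PySpark convention of
# running a module's examples against a local SparkSession bound to the `spark`
# doctest global. The module path `pyspark.sql.tvf` and the master/app names
# are assumptions for illustration, not taken from the original text.
def _test() -> None:
    import doctest
    import sys

    import pyspark.sql.tvf
    from pyspark.sql import SparkSession

    globs = pyspark.sql.tvf.__dict__.copy()
    # Bind a local session so the `spark.tvf....show()` examples can run;
    # ELLIPSIS is needed for the truncated `collations()` output above.
    spark = SparkSession.builder.master("local[4]").appName("sql.tvf tests").getOrCreate()
    globs["spark"] = spark
    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.tvf, globs=globs, optionflags=doctest.ELLIPSIS
    )
    spark.stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()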