#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""User-defined function related classes and functions"""
from inspect import getfullargspec
import functools
import inspect
import sys
import warnings
from typing import Callable, Any, TYPE_CHECKING, Optional, cast, Union

from pyspark.util import PythonEvalType
from pyspark.sql.column import Column
from pyspark.sql.types import (
    DataType,
    StringType,
    StructType,
    _parse_datatype_string,
)
from pyspark.sql.utils import get_active_spark_context
from pyspark.sql.pandas.types import to_arrow_type
from pyspark.sql.pandas.utils import require_minimum_pandas_version, require_minimum_pyarrow_version
from pyspark.errors import PySparkTypeError, PySparkNotImplementedError, PySparkRuntimeError

if TYPE_CHECKING:
    from py4j.java_gateway import JavaObject
    from pyspark.core.context import SparkContext
    from pyspark.sql._typing import DataTypeOrString, ColumnOrName, UserDefinedFunctionLike
    from pyspark.sql.session import SparkSession

__all__ = ["UDFRegistration"]


def _wrap_function(
    sc: "SparkContext", func: Callable[..., Any], returnType: Optional[DataType] = None
) -> "JavaObject":
    from pyspark.core.rdd import _prepare_for_python_RDD

    command: Any
    if returnType is None:
        command = func
    else:
        command = (func, returnType)
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
    assert sc._jvm is not None
    return sc._jvm.SimplePythonFunction(
        bytearray(pickled_command),
        env,
        includes,
        sc.pythonExec,
        sc.pythonVer,
        broadcast_vars,
        sc._javaAccumulator,
    )


def _create_udf(
    f: Callable[..., Any],
    returnType: "DataTypeOrString",
    evalType: int,
    name: Optional[str] = None,
    deterministic: bool = True,
) -> "UserDefinedFunctionLike":
    """Create a regular (non-Arrow-optimized) Python UDF."""
    # Set the name of the UserDefinedFunction object to be the name of function f
    udf_obj = UserDefinedFunction(
        f, returnType=returnType, name=name, evalType=evalType, deterministic=deterministic
    )
    return udf_obj._wrapped()


def _create_py_udf(
    f: Callable[..., Any],
    returnType: "DataTypeOrString",
    useArrow: Optional[bool] = None,
) -> "UserDefinedFunctionLike":
    """Create a regular/Arrow-optimized Python UDF."""
    # The following table shows the results when the type coercion in Arrow is needed, that is,
    # when the user-specified return type (SQL Type) of the UDF and the actual instance (Python
    # Value(Type)) that the UDF returns are different.
    # Arrow and Pickle have different type coercion rules, so a UDF might have a different result
    # with/without Arrow optimization. That's the main reason the Arrow optimization for Python
    # UDFs is disabled by default.
    # +-----------------------------+--------------+----------+------+---------------+--------------------+-----------------------------+----------+----------------------+---------+--------------------+----------------------------+------------+--------------+  # noqa
    # |SQL Type \ Python Value(Type)|None(NoneType)|True(bool)|1(int)|         a(str)|    1970-01-01(date)|1970-01-01 00:00:00(datetime)|1.0(float)|array('i', [1])(array)|[1](list)|         (1,)(tuple)|bytearray(b'ABC')(bytearray)|  1(Decimal)|{'a': 1}(dict)|  # noqa
    # +-----------------------------+--------------+----------+------+---------------+--------------------+-----------------------------+----------+----------------------+---------+--------------------+----------------------------+------------+--------------+  # noqa
    # |                      boolean|          None|      True|  None|           None|                None|                         None|      None|                  None|     None|                None|                        None|        None|          None|  # noqa
    # |                      tinyint|          None|      None|     1|           None|                None|                         None|      None|                  None|     None|                None|                        None|        None|          None|  # noqa
    # |                     smallint|          None|      None|     1|           None|                None|                         None|      None|                  None|     None|                None|                        None|        None|          None|  # noqa
    # |                          int|          None|      None|     1|           None|                None|                         None|      None|                  None|     None|                None|                        None|        None|          None|  # noqa
    # |                       bigint|          None|      None|     1|           None|                None|                         None|      None|                  None|     None|                None|                        None|        None|          None|  # noqa
    # |                       string|          None|    'true'|   '1'|            'a'|'java.util.Gregor...|         'java.util.Gregor...|     '1.0'|         '[I@120d813a'|    '[1]'|'[Ljava.lang.Obje...|               '[B@48571878'|         '1'|       '{a=1}'|  # noqa
    # |                         date|          None|         X|     X|              X|datetime.date(197...|         datetime.date(197...|         X|                     X|        X|                   X|                           X|           X|             X|  # noqa
    # |                    timestamp|          None|         X|     X|              X|                   X|         datetime.datetime...|         X|                     X|        X|                   X|                           X|           X|             X|  # noqa
    # |                        float|          None|      None|  None|           None|                None|                         None|       1.0|                  None|     None|                None|                        None|        None|          None|  # noqa
    # |                       double|          None|      None|  None|           None|                None|                         None|       1.0|                  None|     None|                None|                        None|        None|          None|  # noqa
    # |                       binary|          None|      None|  None|bytearray(b'a')|                None|                         None|      None|                  None|     None|                None|           bytearray(b'ABC')|        None|          None|  # noqa
    # |                decimal(10,0)|          None|      None|  None|           None|                None|                         None|      None|                  None|     None|                None|                        None|Decimal('1')|          None|  # noqa
    # +-----------------------------+--------------+----------+------+---------------+--------------------+-----------------------------+----------+----------------------+---------+--------------------+----------------------------+------------+--------------+  # noqa
    # Note: Python 3.9.15, Pandas 1.5.2 and PyArrow 10.0.1 are used.
    # Note: The values of 'SQL Type' are DDL formatted strings, which can be used as `returnType`s.
    # Note: The values inside the table are generated by `repr`. 'X' means it throws an exception
    # during the conversion.
    if useArrow is None:
        from pyspark.sql import SparkSession

        session = SparkSession._instantiatedSession
        is_arrow_enabled = (
            False
            if session is None
            else session.conf.get("spark.sql.execution.pythonUDF.arrow.enabled") == "true"
        )
    else:
        is_arrow_enabled = useArrow

    eval_type: int = PythonEvalType.SQL_BATCHED_UDF

    if is_arrow_enabled:
        try:
            is_func_with_args = len(getfullargspec(f).args) > 0
        except TypeError:
            is_func_with_args = False
        if is_func_with_args:
            require_minimum_pandas_version()
            require_minimum_pyarrow_version()
            eval_type = PythonEvalType.SQL_ARROW_BATCHED_UDF
        else:
            warnings.warn(
                "Arrow optimization for Python UDFs cannot be enabled for functions"
                " without arguments.",
                UserWarning,
            )

    return _create_udf(f, returnType, eval_type)
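
# A minimal usage sketch for the Arrow selection logic above. Illustrative only, not part of the
# module: it assumes an active SparkSession named `spark` and relies on the public
# `pyspark.sql.functions.udf` entry point forwarding its `useArrow` argument here; the function
# and column names are made up for the example.
#
#     from pyspark.sql.functions import udf
#
#     @udf(returnType="int", useArrow=True)  # True/False overrides the conf; None falls back to it
#     def plus_one(v):
#         return v + 1
#
#     spark.range(3).select(plus_one("id")).show()
#
# With `useArrow=None`, the eval type becomes SQL_ARROW_BATCHED_UDF only when
# "spark.sql.execution.pythonUDF.arrow.enabled" is "true" and the function takes at least one
# argument, as implemented above.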


class UserDefinedFunction:
    """
    User defined function in Python

    .. versionadded:: 1.3

    Notes
    -----
    The constructor of this class is not supposed to be directly called.
    Use :meth:`pyspark.sql.functions.udf` or :meth:`pyspark.sql.functions.pandas_udf`
    to create this instance.
    """

    def __init__(
        self,
        func: Callable[..., Any],
        returnType: "DataTypeOrString" = StringType(),
        name: Optional[str] = None,
        evalType: int = PythonEvalType.SQL_BATCHED_UDF,
        deterministic: bool = True,
    ):
        if not callable(func):
            raise PySparkTypeError(
                errorClass="NOT_CALLABLE",
                messageParameters={"arg_name": "func", "arg_type": type(func).__name__},
            )
        if not isinstance(returnType, (DataType, str)):
            raise PySparkTypeError(
                errorClass="NOT_DATATYPE_OR_STR",
                messageParameters={
                    "arg_name": "returnType",
                    "arg_type": type(returnType).__name__,
                },
            )
        if not isinstance(evalType, int):
            raise PySparkTypeError(
                errorClass="NOT_INT",
                messageParameters={"arg_name": "evalType", "arg_type": type(evalType).__name__},
            )

        self.func = func
        self._returnType = returnType
        self._returnType_placeholder: Optional[DataType] = None
        # Stores the UserDefinedPythonFunction jobj, once initialized
        self._judf_placeholder = None
        self._name = name or (
            func.__name__ if hasattr(func, "__name__") else func.__class__.__name__
        )
        self.evalType = evalType
        self.deterministic = deterministic

    @staticmethod
    def _check_return_type(returnType: DataType, evalType: int) -> None:
        if evalType == PythonEvalType.SQL_ARROW_BATCHED_UDF:
            try:
                to_arrow_type(returnType)
            except TypeError:
                raise PySparkNotImplementedError(
                    errorClass="NOT_IMPLEMENTED",
                    messageParameters={
                        "feature": f"Invalid return type with Arrow-optimized Python UDF: "
                        f"{returnType}"
                    },
                )
        elif (
            evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF
            or evalType == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF
        ):
            try:
                to_arrow_type(returnType)
            except TypeError:
                raise PySparkNotImplementedError(
                    errorClass="NOT_IMPLEMENTED",
                    messageParameters={
                        "feature": f"Invalid return type with scalar Pandas UDFs: "
                        f"{returnType}"
                    },
                )
        elif (
            evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF
            or evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE
        ):
            if isinstance(returnType, StructType):
                try:
                    to_arrow_type(returnType)
                except TypeError:
                    raise PySparkNotImplementedError(
                        errorClass="NOT_IMPLEMENTED",
                        messageParameters={
                            "feature": f"Invalid return type with grouped map Pandas UDFs or "
                            f"at groupby.applyInPandas(WithState): {returnType}"
                        },
                    )
            else:
                raise PySparkTypeError(
                    errorClass="INVALID_RETURN_TYPE_FOR_PANDAS_UDF",
                    messageParameters={
                        "eval_type": "SQL_GROUPED_MAP_PANDAS_UDF or "
                        "SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE",
                        "return_type": str(returnType),
                    },
                )
        elif (
            evalType == PythonEvalType.SQL_MAP_PANDAS_ITER_UDF
            or evalType == PythonEvalType.SQL_MAP_ARROW_ITER_UDF
        ):
            if isinstance(returnType, StructType):
                try:
                    to_arrow_type(returnType)
                except TypeError:
                    raise PySparkNotImplementedError(
                        errorClass="NOT_IMPLEMENTED",
                        messageParameters={
                            "feature": f"Invalid return type in mapInPandas: "
                            f"{returnType}"
                        },
                    )
            else:
                raise PySparkTypeError(
                    errorClass="INVALID_RETURN_TYPE_FOR_PANDAS_UDF",
                    messageParameters={
                        "eval_type": "SQL_MAP_PANDAS_ITER_UDF or SQL_MAP_ARROW_ITER_UDF",
                        "return_type": str(returnType),
                    },
                )
        elif evalType == PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF:
            if isinstance(returnType, StructType):
                try:
                    to_arrow_type(returnType)
                except TypeError:
                    raise PySparkNotImplementedError(
                        errorClass="NOT_IMPLEMENTED",
                        messageParameters={
                            "feature": "Invalid return type with grouped map Arrow UDFs or "
                            f"at groupby.applyInArrow: {returnType}"
                        },
                    )
            else:
                raise PySparkTypeError(
                    errorClass="INVALID_RETURN_TYPE_FOR_ARROW_UDF",
                    messageParameters={
                        "eval_type": "SQL_GROUPED_MAP_ARROW_UDF",
                        "return_type": str(returnType),
                    },
                )
        elif evalType == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
            if isinstance(returnType, StructType):
                try:
                    to_arrow_type(returnType)
                except TypeError:
                    raise PySparkNotImplementedError(
                        errorClass="NOT_IMPLEMENTED",
                        messageParameters={
                            "feature": f"Invalid return type in cogroup.applyInPandas: "
                            f"{returnType}"
                        },
                    )
            else:
                raise PySparkTypeError(
                    errorClass="INVALID_RETURN_TYPE_FOR_PANDAS_UDF",
                    messageParameters={
                        "eval_type": "SQL_COGROUPED_MAP_PANDAS_UDF",
                        "return_type": str(returnType),
                    },
                )
        elif evalType == PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF:
            if isinstance(returnType, StructType):
                try:
                    to_arrow_type(returnType)
                except TypeError:
                    raise PySparkNotImplementedError(
                        errorClass="NOT_IMPLEMENTED",
                        messageParameters={
                            "feature": "Invalid return type in cogroup.applyInArrow: "
                            f"{returnType}"
                        },
                    )
            else:
                raise PySparkTypeError(
                    errorClass="INVALID_RETURN_TYPE_FOR_ARROW_UDF",
                    messageParameters={
                        "eval_type": "SQL_COGROUPED_MAP_ARROW_UDF",
                        "return_type": str(returnType),
                    },
                )
        elif evalType == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF:
            try:
                # StructType is not yet allowed as a return type; explicitly check here to fail fast
                if isinstance(returnType, StructType):
                    raise PySparkNotImplementedError(
                        errorClass="NOT_IMPLEMENTED",
                        messageParameters={
                            "feature": f"Invalid return type with grouped aggregate Pandas UDFs: "
                            f"{returnType}"
                        },
                    )
                to_arrow_type(returnType)
            except TypeError:
                raise PySparkNotImplementedError(
                    errorClass="NOT_IMPLEMENTED",
                    messageParameters={
                        "feature": f"Invalid return type with grouped aggregate Pandas UDFs: "
                        f"{returnType}"
                    },
                )

    @property
    def returnType(self) -> DataType:
        # Make sure this is called after SparkContext is initialized.
        # ``_parse_datatype_string`` accesses the JVM to parse a DDL formatted string.
        # TODO: PythonEvalType.SQL_BATCHED_UDF
        if self._returnType_placeholder is None:
            if isinstance(self._returnType, DataType):
                self._returnType_placeholder = self._returnType
            else:
                self._returnType_placeholder = _parse_datatype_string(self._returnType)

        UserDefinedFunction._check_return_type(self._returnType_placeholder, self.evalType)
        return self._returnType_placeholder

    @property
    def _judf(self) -> "JavaObject":
        # It is possible that concurrent access to a newly created UDF
        # will initialize multiple UserDefinedPythonFunctions.
        # This is unlikely, doesn't affect correctness,
        # and should have a minimal performance impact.
        if self._judf_placeholder is None:
            self._judf_placeholder = self._create_judf(self.func)
        return self._judf_placeholder

    def _create_judf(self, func: Callable[..., Any]) -> "JavaObject":
        from pyspark.sql import SparkSession

        spark = SparkSession._getActiveSessionOrCreate()
        sc = spark.sparkContext

        wrapped_func = _wrap_function(sc, func, self.returnType)
        jdt = spark._jsparkSession.parseDataType(self.returnType.json())
        assert sc._jvm is not None
        judf = getattr(sc._jvm, "org.apache.spark.sql.execution.python.UserDefinedPythonFunction")(
            self._name, wrapped_func, jdt, self.evalType, self.deterministic
        )
        return judf

    def __call__(self, *args: "ColumnOrName", **kwargs: "ColumnOrName") -> Column:
        from pyspark.sql.classic.column import _to_java_column, _to_seq

        sc = get_active_spark_context()
        assert sc._jvm is not None
        jcols = [_to_java_column(arg) for arg in args] + [
            sc._jvm.PythonSQLUtils.namedArgumentExpression(key, _to_java_column(value))
            for key, value in kwargs.items()
        ]

        profiler_enabled = sc._conf.get("spark.python.profile", "false") == "true"
        memory_profiler_enabled = sc._conf.get("spark.python.profile.memory", "false") == "true"
        if profiler_enabled or memory_profiler_enabled:
            # Disable profiling Pandas UDFs with iterators as input/output.
            if self.evalType in [
                PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
                PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
                PythonEvalType.SQL_MAP_ARROW_ITER_UDF,
            ]:
                warnings.warn(
                    "Profiling UDFs with iterators input/output is not supported.",
                    UserWarning,
                )
                judf = self._judf
                return Column(judf.apply(_to_seq(sc, jcols)))

            # Disallow enabling two profilers at the same time.
            if profiler_enabled and memory_profiler_enabled:
                # When both profilers are enabled, they interfere with each other,
                # which makes the resulting profile misleading.
                raise PySparkRuntimeError(
                    errorClass="CANNOT_SET_TOGETHER",
                    messageParameters={
                        "arg_list": "'spark.python.profile' and "
                        "'spark.python.profile.memory' configuration"
                    },
                )
            elif profiler_enabled:
                f = self.func
                profiler = sc.profiler_collector.new_udf_profiler(sc)

                @functools.wraps(f)
                def func(*args: Any, **kwargs: Any) -> Any:
                    assert profiler is not None
                    return profiler.profile(f, *args, **kwargs)

                func.__signature__ = inspect.signature(f)  # type: ignore[attr-defined]
                judf = self._create_judf(func)
                jUDFExpr = judf.builderWithColumns(_to_seq(sc, jcols))
                jPythonUDF = judf.fromUDFExpr(jUDFExpr)
                id = jUDFExpr.resultId().id()
                sc.profiler_collector.add_profiler(id, profiler)
            else:  # memory_profiler_enabled
                f = self.func
                memory_profiler = sc.profiler_collector.new_memory_profiler(sc)
                (sub_lines, start_line) = inspect.getsourcelines(f.__code__)

                @functools.wraps(f)
                def func(*args: Any, **kwargs: Any) -> Any:
                    assert memory_profiler is not None
                    return memory_profiler.profile(
                        sub_lines, start_line, f, *args, **kwargs  # type: ignore[arg-type]
                    )

                func.__signature__ = inspect.signature(f)  # type: ignore[attr-defined]
                judf = self._create_judf(func)
                jUDFExpr = judf.builderWithColumns(_to_seq(sc, jcols))
                jPythonUDF = judf.fromUDFExpr(jUDFExpr)
                id = jUDFExpr.resultId().id()
                sc.profiler_collector.add_profiler(id, memory_profiler)
        else:
            judf = self._judf
            jPythonUDF = judf.apply(_to_seq(sc, jcols))
        return Column(jPythonUDF)

    # This function is for improving the online help system in the interactive interpreter.
    # For example, the built-in help / pydoc.help. It wraps the UDF with the docstring and
    # argument annotation. (See: SPARK-19161)
    def _wrapped(self) -> "UserDefinedFunctionLike":
        """
        Wrap this UDF with a function and attach docstring from func
        """

        # It is possible for a callable instance without __name__ attribute or/and
        # __module__ attribute to be wrapped here. For example, functools.partial. In this case,
        # we should avoid wrapping the attributes from the wrapped function to the wrapper
        # function. So, we take out these attribute names from the default names to set and
        # then manually assign them after being wrapped.
        assignments = tuple(
            a for a in functools.WRAPPER_ASSIGNMENTS if a != "__name__" and a != "__module__"
        )

        @functools.wraps(self.func, assigned=assignments)
        def wrapper(*args: "ColumnOrName", **kwargs: "ColumnOrName") -> Column:
            return self(*args, **kwargs)

        wrapper.__name__ = self._name
        wrapper.__module__ = (
            self.func.__module__
            if hasattr(self.func, "__module__")
            else self.func.__class__.__module__
        )

        wrapper.func = self.func  # type: ignore[attr-defined]
        wrapper.returnType = self.returnType  # type: ignore[attr-defined]
        wrapper.evalType = self.evalType  # type: ignore[attr-defined]
        wrapper.deterministic = self.deterministic  # type: ignore[attr-defined]
        wrapper.asNondeterministic = functools.wraps(  # type: ignore[attr-defined]
            self.asNondeterministic
        )(lambda: self.asNondeterministic()._wrapped())
        wrapper._unwrapped = self  # type: ignore[attr-defined]
        return wrapper  # type: ignore[return-value]

    def asNondeterministic(self) -> "UserDefinedFunction":
        """
        Updates UserDefinedFunction to nondeterministic.

        .. versionadded:: 2.3
        """
        # Here, we explicitly clean the cache to create a JVM UDF instance
        # with 'deterministic' updated. See SPARK-23233.
        self._judf_placeholder = None
        self.deterministic = False
        return self
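

# A small sketch of how the profiling branches in UserDefinedFunction.__call__ above are reached.
# Illustrative only: it assumes the profiler settings are supplied when the session is created
# (they are read from the SparkContext configuration) and an environment where the UDF profiler
# is available; the function and column names are made up.
#
#     from pyspark.sql import SparkSession
#     from pyspark.sql.functions import udf
#
#     spark = (
#         SparkSession.builder
#         .config("spark.python.profile", "true")  # selects the perf-profiler branch
#         # Enabling "spark.python.profile.memory" at the same time raises CANNOT_SET_TOGETHER.
#         .getOrCreate()
#     )
#     str_len = udf(lambda s: len(s), "int")
#     spark.createDataFrame([("abc",)], ["s"]).select(str_len("s")).collect()
#     spark.sparkContext.show_profiles()  # assumes UDF profiles are dumped via the same collector
#
# Iterator-based Pandas/Arrow UDF eval types are skipped with a warning, as handled in __call__.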


class UDFRegistration:
    """
    Wrapper for user-defined function registration. This instance can be accessed by
    :attr:`spark.udf` or :attr:`sqlContext.udf`.

    .. versionadded:: 1.3.1
    """

    def __init__(self, sparkSession: "SparkSession"):
        self.sparkSession = sparkSession

    def register(
        self,
        name: str,
        f: Union[Callable[..., Any], "UserDefinedFunctionLike"],
        returnType: Optional["DataTypeOrString"] = None,
    ) -> "UserDefinedFunctionLike":
        """Register a Python function (including lambda function) or a user-defined function
        as a SQL function.

        .. versionadded:: 1.3.1

        .. versionchanged:: 3.4.0
            Supports Spark Connect.

        Parameters
        ----------
        name : str
            name of the user-defined function in SQL statements.
        f : function, :meth:`pyspark.sql.functions.udf` or :meth:`pyspark.sql.functions.pandas_udf`
            a Python function, or a user-defined function. The user-defined function can be
            either row-at-a-time or vectorized. See :meth:`pyspark.sql.functions.udf` and
            :meth:`pyspark.sql.functions.pandas_udf`.
        returnType : :class:`pyspark.sql.types.DataType` or str, optional
            the return type of the registered user-defined function. The value can
            be either a :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
            `returnType` can be optionally specified when `f` is a Python function but not
            when `f` is a user-defined function. Please see the examples below.

        Returns
        -------
        function
            a user-defined function

        Notes
        -----
        To register a nondeterministic Python function, users need to first build
        a nondeterministic user-defined function for the Python function and then register it
        as a SQL function.

        Examples
        --------
        1. When `f` is a Python function:

            `returnType` defaults to string type and can be optionally specified. The produced
            object must match the specified type. In this case, this API works as if
            `register(name, f, returnType=StringType())`.

            >>> strlen = spark.udf.register("stringLengthString", lambda x: len(x))
            >>> spark.sql("SELECT stringLengthString('test')").collect()
            [Row(stringLengthString(test)='4')]

            >>> spark.sql("SELECT 'foo' AS text").select(strlen("text")).collect()
            [Row(stringLengthString(text)='3')]

            >>> from pyspark.sql.types import IntegerType
            >>> _ = spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
            >>> spark.sql("SELECT stringLengthInt('test')").collect()
            [Row(stringLengthInt(test)=4)]

        2. When `f` is a user-defined function (from Spark 2.3.0):

            Spark uses the return type of the given user-defined function as the return type of
            the registered user-defined function. `returnType` should not be specified.
            In this case, this API works as if `register(name, f)`.

            >>> from pyspark.sql.types import IntegerType
            >>> from pyspark.sql.functions import udf
            >>> slen = udf(lambda s: len(s), IntegerType())
            >>> _ = spark.udf.register("slen", slen)
            >>> spark.sql("SELECT slen('test')").collect()
            [Row(slen(test)=4)]

            >>> import random
            >>> from pyspark.sql.functions import udf
            >>> from pyspark.sql.types import IntegerType
            >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
            >>> new_random_udf = spark.udf.register("random_udf", random_udf)
            >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
            [Row(random_udf()=82)]

            >>> import pandas as pd  # doctest: +SKIP
            >>> from pyspark.sql.functions import pandas_udf
            >>> @pandas_udf("integer")  # doctest: +SKIP
            ... def add_one(s: pd.Series) -> pd.Series:
            ...     return s + 1
            ...
            >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
            >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
            [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]

            >>> @pandas_udf("integer")  # doctest: +SKIP
            ... def sum_udf(v: pd.Series) -> int:
            ...     return v.sum()
            ...
            >>> _ = spark.udf.register("sum_udf", sum_udf)  # doctest: +SKIP
            >>> q = "SELECT sum_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2"
            >>> spark.sql(q).collect()  # doctest: +SKIP
            [Row(sum_udf(v1)=1), Row(sum_udf(v1)=5)]
        """

        # This is to check whether the input function is from a user-defined function or
        # Python function.
        if hasattr(f, "asNondeterministic"):
            if returnType is not None:
                raise PySparkTypeError(
                    errorClass="CANNOT_SPECIFY_RETURN_TYPE_FOR_UDF",
                    messageParameters={"arg_name": "f", "return_type": str(returnType)},
                )
            f = cast("UserDefinedFunctionLike", f)
            if f.evalType not in [
                PythonEvalType.SQL_BATCHED_UDF,
                PythonEvalType.SQL_ARROW_BATCHED_UDF,
                PythonEvalType.SQL_SCALAR_PANDAS_UDF,
                PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
                PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
            ]:
                raise PySparkTypeError(
                    errorClass="INVALID_UDF_EVAL_TYPE",
                    messageParameters={
                        "eval_type": "SQL_BATCHED_UDF, SQL_ARROW_BATCHED_UDF, "
                        "SQL_SCALAR_PANDAS_UDF, SQL_SCALAR_PANDAS_ITER_UDF or "
                        "SQL_GROUPED_AGG_PANDAS_UDF"
                    },
                )
            source_udf = _create_udf(
                f.func,
                returnType=f.returnType,
                name=name,
                evalType=f.evalType,
                deterministic=f.deterministic,
            )
            register_udf = source_udf._unwrapped  # type: ignore[attr-defined]
            return_udf = register_udf
        else:
            if returnType is None:
                returnType = StringType()
            return_udf = _create_udf(
                f, returnType=returnType, evalType=PythonEvalType.SQL_BATCHED_UDF, name=name
            )
            register_udf = return_udf._unwrapped
        self.sparkSession._jsparkSession.udf().registerPython(name, register_udf._judf)
        return return_udf
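
    # A hedged sketch of the returnType check above: when `f` is already a user-defined function,
    # passing an explicit returnType is rejected. Assumes an active SparkSession named `spark`;
    # the names are illustrative.
    #
    #     from pyspark.sql.functions import udf
    #     slen = udf(lambda s: len(s), "int")
    #     spark.udf.register("slen", slen)  # OK, the return type is taken from `slen`
    #     spark.udf.register("slen", slen, returnType="int")
    #     # raises PySparkTypeError with error class CANNOT_SPECIFY_RETURN_TYPE_FOR_UDF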

    def registerJavaFunction(
        self,
        name: str,
        javaClassName: str,
        returnType: Optional["DataTypeOrString"] = None,
    ) -> None:
        """Register a Java user-defined function as a SQL function.

        In addition to a name and the function itself, the return type can be optionally
        specified. When the return type is not specified we would infer it via reflection.

        .. versionadded:: 2.3.0

        .. versionchanged:: 3.4.0
            Supports Spark Connect.

        Parameters
        ----------
        name : str
            name of the user-defined function
        javaClassName : str
            fully qualified name of java class
        returnType : :class:`pyspark.sql.types.DataType` or str, optional
            the return type of the registered Java function. The value can be either
            a :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.

        Examples
        --------
        >>> from pyspark.sql.types import IntegerType
        >>> spark.udf.registerJavaFunction(
        ...     "javaStringLength", "test.org.apache.spark.sql.JavaStringLength", IntegerType())
        ... # doctest: +SKIP
        >>> spark.sql("SELECT javaStringLength('test')").collect()  # doctest: +SKIP
        [Row(javaStringLength(test)=4)]

        >>> spark.udf.registerJavaFunction(
        ...     "javaStringLength2", "test.org.apache.spark.sql.JavaStringLength")
        ... # doctest: +SKIP
        >>> spark.sql("SELECT javaStringLength2('test')").collect()  # doctest: +SKIP
        [Row(javaStringLength2(test)=4)]

        >>> spark.udf.registerJavaFunction(
        ...     "javaStringLength3", "test.org.apache.spark.sql.JavaStringLength", "integer")
        ... # doctest: +SKIP
        >>> spark.sql("SELECT javaStringLength3('test')").collect()  # doctest: +SKIP
        [Row(javaStringLength3(test)=4)]
        """
        jdt = None
        if returnType is not None:
            if not isinstance(returnType, DataType):
                returnType = _parse_datatype_string(returnType)
            jdt = self.sparkSession._jsparkSession.parseDataType(returnType.json())
        self.sparkSession._jsparkSession.udf().registerJava(name, javaClassName, jdt)

    def registerJavaUDAF(self, name: str, javaClassName: str) -> None:
        """Register a Java user-defined aggregate function as a SQL function.

        .. versionadded:: 2.3.0

        .. versionchanged:: 3.4.0
            Supports Spark Connect.

        Parameters
        ----------
        name : str
            name of the user-defined aggregate function
        javaClassName : str
            fully qualified name of java class

        Examples
        --------
        >>> spark.udf.registerJavaUDAF("javaUDAF", "test.org.apache.spark.sql.MyDoubleAvg")
        ... # doctest: +SKIP
        >>> df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "a")], ["id", "name"])
        >>> df.createOrReplaceTempView("df")
        >>> q = "SELECT name, javaUDAF(id) as avg from df group by name order by name desc"
        >>> spark.sql(q).collect()  # doctest: +SKIP
        [Row(name='b', avg=102.0), Row(name='a', avg=102.0)]
        """
        self.sparkSession._jsparkSession.udf().registerJavaUDAF(name, javaClassName)