Source code for pyspark.sql.streaming.stateful_processor
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from abc import ABC, abstractmethod
from typing import Any, List, TYPE_CHECKING, Iterator, Optional, Union, Tuple

from pyspark.sql.streaming.stateful_processor_api_client import (
    StatefulProcessorApiClient,
    ListTimerIterator,
)
from pyspark.sql.streaming.list_state_client import ListStateClient, ListStateIterator
from pyspark.sql.streaming.map_state_client import (
    MapStateClient,
    MapStateIterator,
    MapStateKeyValuePairIterator,
)
from pyspark.sql.streaming.value_state_client import ValueStateClient
from pyspark.sql.types import StructType

if TYPE_CHECKING:
    from pyspark.sql.pandas._typing import DataFrameLike as PandasDataFrameLike

__all__ = ["StatefulProcessor", "StatefulProcessorHandle"]


class ValueState:
    """
    Class used for arbitrary stateful operations with transformWithState to capture single value
    state.

    Every operation is forwarded to the underlying :class:`ValueStateClient`, scoped by the
    state variable name this instance was created with.

    .. versionadded:: 4.0.0
    """

    def __init__(self, valueStateClient: ValueStateClient, stateName: str) -> None:
        self._valueStateClient = valueStateClient
        self._stateName = stateName

    def exists(self) -> bool:
        """
        Whether state exists or not.
        """
        return self._valueStateClient.exists(self._stateName)

    def get(self) -> Optional[Tuple]:
        """
        Get the state value if it exists. Returns None if the state variable does not have a value.
        """
        return self._valueStateClient.get(self._stateName)

    def update(self, newValue: Tuple) -> None:
        """
        Update the value of the state.

        Parameters
        ----------
        newValue : tuple
            The new value to store in this state variable.
        """
        self._valueStateClient.update(self._stateName, newValue)

    def clear(self) -> None:
        """
        Remove this state.
        """
        self._valueStateClient.clear(self._stateName)


class TimerValues:
    """
    Class used for arbitrary stateful operations with transformWithState to access processing
    time or event time for current batch.

    .. versionadded:: 4.0.0
    """

    def __init__(
        self, currentProcessingTimeInMs: int = -1, currentWatermarkInMs: int = -1
    ) -> None:
        self._currentProcessingTimeInMs = currentProcessingTimeInMs
        self._currentWatermarkInMs = currentWatermarkInMs

    def getCurrentProcessingTimeInMs(self) -> int:
        """
        Get processing time for current batch, return timestamp in milliseconds.
        """
        return self._currentProcessingTimeInMs

    def getCurrentWatermarkInMs(self) -> int:
        """
        Get watermark for current batch, return timestamp in milliseconds.
        """
        return self._currentWatermarkInMs


class ExpiredTimerInfo:
    """
    Class used to provide access to expired timer's expiry time.

    .. versionadded:: 4.0.0
    """

    def __init__(self, expiryTimeInMs: int = -1) -> None:
        self._expiryTimeInMs = expiryTimeInMs

    def getExpiryTimeInMs(self) -> int:
        """
        Get the timestamp for expired timer, return timestamp in milliseconds.
        """
        return self._expiryTimeInMs


class ListState:
    """
    Class used for arbitrary stateful operations with transformWithState to capture list value
    state.

    Every operation is forwarded to the underlying :class:`ListStateClient`, scoped by the
    state variable name this instance was created with.

    .. versionadded:: 4.0.0
    """

    def __init__(self, listStateClient: ListStateClient, stateName: str) -> None:
        self._listStateClient = listStateClient
        self._stateName = stateName

    def exists(self) -> bool:
        """
        Whether list state exists or not.
        """
        return self._listStateClient.exists(self._stateName)

    def get(self) -> Iterator[Tuple]:
        """
        Get list state with an iterator.
        """
        return ListStateIterator(self._listStateClient, self._stateName)

    def put(self, newState: List[Tuple]) -> None:
        """
        Update the values of the list state.

        Parameters
        ----------
        newState : list of tuple
            The values to store in the list state.
        """
        self._listStateClient.put(self._stateName, newState)

    def appendValue(self, newState: Tuple) -> None:
        """
        Append a new value to the list state.

        Parameters
        ----------
        newState : tuple
            The value to append.
        """
        self._listStateClient.append_value(self._stateName, newState)

    def appendList(self, newState: List[Tuple]) -> None:
        """
        Append a list of new values to the list state.

        Parameters
        ----------
        newState : list of tuple
            The values to append.
        """
        self._listStateClient.append_list(self._stateName, newState)

    def clear(self) -> None:
        """
        Remove this state.
        """
        self._listStateClient.clear(self._stateName)


class MapState:
    """
    Class used for arbitrary stateful operations with transformWithState to capture single map
    state.

    Every operation is forwarded to the underlying :class:`MapStateClient`, scoped by the
    state variable name this instance was created with.

    .. versionadded:: 4.0.0
    """

    def __init__(
        self,
        mapStateClient: MapStateClient,
        stateName: str,
    ) -> None:
        self._mapStateClient = mapStateClient
        self._stateName = stateName

    def exists(self) -> bool:
        """
        Whether state exists or not.
        """
        return self._mapStateClient.exists(self._stateName)

    def getValue(self, key: Tuple) -> Optional[Tuple]:
        """
        Get the state value for given user key if it exists.

        Parameters
        ----------
        key : tuple
            The user key to look up.
        """
        return self._mapStateClient.get_value(self._stateName, key)

    def containsKey(self, key: Tuple) -> bool:
        """
        Check if the user key is contained in the map.

        Parameters
        ----------
        key : tuple
            The user key to check.
        """
        return self._mapStateClient.contains_key(self._stateName, key)

    def updateValue(self, key: Tuple, value: Tuple) -> None:
        """
        Update value for given user key.

        Parameters
        ----------
        key : tuple
            The user key to update.
        value : tuple
            The new value associated with ``key``.
        """
        # Mutators return None, consistent with ValueState/ListState.
        self._mapStateClient.update_value(self._stateName, key, value)

    def iterator(self) -> Iterator[Tuple[Tuple, Tuple]]:
        """
        Get an iterator over the (key, value) pairs of the map associated with grouping key.
        """
        return MapStateKeyValuePairIterator(self._mapStateClient, self._stateName)

    def keys(self) -> Iterator[Tuple]:
        """
        Get the list of keys present in map associated with grouping key.
        """
        return MapStateIterator(self._mapStateClient, self._stateName, True)

    def values(self) -> Iterator[Tuple]:
        """
        Get the list of values present in map associated with grouping key.
        """
        return MapStateIterator(self._mapStateClient, self._stateName, False)

    def removeKey(self, key: Tuple) -> None:
        """
        Remove user key from map state.

        Parameters
        ----------
        key : tuple
            The user key to remove.
        """
        # Mutators return None, consistent with ValueState/ListState.
        self._mapStateClient.remove_key(self._stateName, key)

    def clear(self) -> None:
        """
        Remove this state.
        """
        self._mapStateClient.clear(self._stateName)


class StatefulProcessorHandle:
    """
    Represents the operation handle provided to the stateful processor used in transformWithState
    API.

    .. versionadded:: 4.0.0
    """

    def __init__(self, statefulProcessorApiClient: StatefulProcessorApiClient) -> None:
        self._statefulProcessorApiClient = statefulProcessorApiClient

    def getValueState(
        self,
        stateName: str,
        schema: Union[StructType, str],
        ttlDurationMs: Optional[int] = None,
    ) -> ValueState:
        """
        Function to create new or return existing single value state variable of given type.
        The user must ensure to call this function only within the `init()` method of the
        :class:`StatefulProcessor`.

        Parameters
        ----------
        stateName : str
            name of the state variable
        schema : :class:`pyspark.sql.types.StructType` or str
            The schema of the state variable. The value can be either a
            :class:`pyspark.sql.types.StructType` object or a DDL-formatted type string.
        ttlDurationMs : int, optional
            Time to live duration of the state in milliseconds. State values will not be returned
            past ttlDuration and will be eventually removed from the state store. Any state update
            resets the expiration time to current processing time plus ttlDuration.
            If ttl is not specified the state will never expire.
        """
        # Create (or look up) the state variable through the API client first, then hand back
        # a wrapper bound to the same client and schema.
        self._statefulProcessorApiClient.get_value_state(stateName, schema, ttlDurationMs)
        return ValueState(ValueStateClient(self._statefulProcessorApiClient, schema), stateName)

    def getListState(
        self,
        stateName: str,
        schema: Union[StructType, str],
        ttlDurationMs: Optional[int] = None,
    ) -> ListState:
        """
        Function to create new or return existing single list state variable of given type.
        The user must ensure to call this function only within the `init()` method of the
        :class:`StatefulProcessor`.

        Parameters
        ----------
        stateName : str
            name of the state variable
        schema : :class:`pyspark.sql.types.StructType` or str
            The schema of each element of the list state. The value can be either a
            :class:`pyspark.sql.types.StructType` object or a DDL-formatted type string.
        ttlDurationMs : int, optional
            Time to live duration of the state in milliseconds. State values will not be returned
            past ttlDuration and will be eventually removed from the state store. Any state update
            resets the expiration time to current processing time plus ttlDuration.
            If ttl is not specified the state will never expire.
        """
        # Create (or look up) the state variable through the API client first, then hand back
        # a wrapper bound to the same client and schema.
        self._statefulProcessorApiClient.get_list_state(stateName, schema, ttlDurationMs)
        return ListState(ListStateClient(self._statefulProcessorApiClient, schema), stateName)

    def getMapState(
        self,
        stateName: str,
        userKeySchema: Union[StructType, str],
        valueSchema: Union[StructType, str],
        ttlDurationMs: Optional[int] = None,
    ) -> MapState:
        """
        Function to create new or return existing single map state variable of given type.
        The user must ensure to call this function only within the `init()` method of the
        :class:`StatefulProcessor`.

        Parameters
        ----------
        stateName : str
            name of the state variable
        userKeySchema : :class:`pyspark.sql.types.StructType` or str
            The schema of the key of map state. The value can be either a
            :class:`pyspark.sql.types.StructType` object or a DDL-formatted type string.
        valueSchema : :class:`pyspark.sql.types.StructType` or str
            The schema of the value of map state. The value can be either a
            :class:`pyspark.sql.types.StructType` object or a DDL-formatted type string.
        ttlDurationMs : int, optional
            Time to live duration of the state in milliseconds. State values will not be returned
            past ttlDuration and will be eventually removed from the state store. Any state update
            resets the expiration time to current processing time plus ttlDuration.
            If ttl is not specified the state will never expire.
        """
        # Create (or look up) the state variable through the API client first, then hand back
        # a wrapper bound to the same client and schemas.
        self._statefulProcessorApiClient.get_map_state(
            stateName, userKeySchema, valueSchema, ttlDurationMs
        )
        return MapState(
            MapStateClient(self._statefulProcessorApiClient, userKeySchema, valueSchema),
            stateName,
        )

    def registerTimer(self, expiryTimestampMs: int) -> None:
        """
        Register a timer for a given expiry timestamp in milliseconds for the grouping key.
        """
        self._statefulProcessorApiClient.register_timer(expiryTimestampMs)

    def deleteTimer(self, expiryTimestampMs: int) -> None:
        """
        Delete a timer for a given expiry timestamp in milliseconds for the grouping key.
        """
        self._statefulProcessorApiClient.delete_timer(expiryTimestampMs)

    def listTimers(self) -> Iterator[int]:
        """
        List the expiry timestamps in milliseconds of all timers for the grouping key.
        """
        return ListTimerIterator(self._statefulProcessorApiClient)

    def deleteIfExists(self, stateName: str) -> None:
        """
        Function to delete and purge state variable if defined previously.

        Parameters
        ----------
        stateName : str
            name of the state variable to delete
        """
        self._statefulProcessorApiClient.delete_if_exists(stateName)


class StatefulProcessor(ABC):
    """
    Class that represents the arbitrary stateful logic that needs to be provided by the user to
    perform stateful manipulations on keyed streams.

    .. versionadded:: 4.0.0
    """

    @abstractmethod
    def init(self, handle: StatefulProcessorHandle) -> None:
        """
        Function that will be invoked as the first method that allows for users to initialize all
        their state variables and perform other init actions before handling data.

        Parameters
        ----------
        handle : :class:`pyspark.sql.streaming.stateful_processor.StatefulProcessorHandle`
            Handle to the stateful processor that provides access to the state store and other
            stateful processing related APIs.
        """
        ...

    @abstractmethod
    def handleInputRows(
        self,
        key: Any,
        rows: Iterator["PandasDataFrameLike"],
        timerValues: TimerValues,
    ) -> Iterator["PandasDataFrameLike"]:
        """
        Function that will allow users to interact with input data rows along with the grouping
        key. It takes the grouping key, an Iterator[`pandas.DataFrame`] of input rows and the
        :class:`TimerValues` of the current batch, and returns another
        Iterator[`pandas.DataFrame`]. For each group, all columns are passed together as
        `pandas.DataFrame` to the function, and the returned `pandas.DataFrame` across all
        invocations are combined as a :class:`DataFrame`. Note that the function should not make
        a guess of the number of elements in the iterator. To process all data, the
        `handleInputRows` function needs to iterate all elements and process them. On the other
        hand, the `handleInputRows` function is not strictly required to iterate through all
        elements in the iterator if it intends to read a part of data.

        Parameters
        ----------
        key : Any
            grouping key.
        rows : iterator of :class:`pandas.DataFrame`
            iterator of input rows associated with grouping key
        timerValues : TimerValues
            Timer value for the current batch that processes the input rows.
            Users can get the processing or event time timestamp from TimerValues.
        """
        ...

    def handleExpiredTimer(
        self, key: Any, timerValues: TimerValues, expiredTimerInfo: ExpiredTimerInfo
    ) -> Iterator["PandasDataFrameLike"]:
        """
        Optional to implement. Returns an empty iterator if not overridden.
        Function that will be invoked when a timer is fired for a given key. Users can choose to
        evict state, register new timers and optionally provide output rows.

        Parameters
        ----------
        key : Any
            grouping key.
        timerValues : TimerValues
            Timer value for the current batch that processes the input rows.
            Users can get the processing or event time timestamp from TimerValues.
        expiredTimerInfo : ExpiredTimerInfo
            Instance of ExpiredTimerInfo that provides access to expired timer.
        """
        return iter([])

    def close(self) -> None:
        """
        Function called as the last method that allows for users to perform
        any cleanup or teardown operations.
        """
        ...

    def handleInitialState(
        self, key: Any, initialState: "PandasDataFrameLike", timerValues: TimerValues
    ) -> None:
        """
        Optional to implement. Will act as no-op if not defined or no initial state input.
        Function that will be invoked only in the first batch for users to process initial states.

        Parameters
        ----------
        key : Any
            grouping key.
        initialState : :class:`pandas.DataFrame`
            One dataframe in the initial state associated with the key.
        timerValues : TimerValues
            Timer value for the current batch that processes the input rows.
            Users can get the processing or event time timestamp from TimerValues.
        """
        pass