Class TwsTester<K,I,O>
- Type Parameters:
K - the type of grouping key.
I - the type of input rows.
O - the type of output rows.
This class enables unit testing of StatefulProcessor business logic by simulating the behavior of transformWithState. It processes input rows and returns output rows equivalent to those that would be produced by the processor in an actual Spark streaming query.
'''Supported:'''
- Processing input rows and producing output rows via test().
- Initial state setup via constructor parameter.
- Direct state manipulation via updateValueState, updateListState, updateMapState.
- Direct state inspection via peekValueState, peekListState, peekMapState.
- Timers in ProcessingTime mode (use setProcessingTime to fire timers).
- Timers in EventTime mode (use setWatermark to manually set the watermark
and fire expired timers).
- Late event filtering in EventTime mode.
'''Not Supported:'''
- '''TTL''': State persists indefinitely, even if a TTLConfig is set.
- '''Automatic watermark propagation''': In production Spark streaming, the watermark is
computed from event times and propagated at the end of each microbatch. TwsTester does
not simulate this behavior because it processes keys individually rather than in batches.
To test watermark-dependent logic, use setWatermark() to manually set the watermark
to the desired value before calling test().
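Because the watermark has to be supplied by hand, a test needs some way to choose a realistic value. The sketch below is a hypothetical helper, not part of TwsTester or Spark. It assumes the usual Structured Streaming rule that the watermark is the maximum event time seen so far minus a delay threshold, and that it never moves backwards. The names `ManualWatermarkSketch`, `nextWatermark`, and `delayThresholdMs` are illustrative only.

```java
import java.util.List;

public class ManualWatermarkSketch {
    // Hypothetical helper: derives the watermark a test might pass to setWatermark().
    // Assumption: watermark = max(event time) - delay threshold, never decreasing.
    public static long nextWatermark(long currentWatermarkMs,
                                     List<Long> batchEventTimesMs,
                                     long delayThresholdMs) {
        if (batchEventTimesMs.isEmpty()) {
            return currentWatermarkMs; // no new data, watermark stays put
        }
        long maxEventTimeMs = Long.MIN_VALUE;
        for (long t : batchEventTimesMs) {
            maxEventTimeMs = Math.max(maxEventTimeMs, t);
        }
        // Never move the watermark backwards.
        return Math.max(currentWatermarkMs, maxEventTimeMs - delayThresholdMs);
    }

    public static void main(String[] args) {
        long wm = nextWatermark(0L, List.of(1000L, 5000L, 3000L), 2000L);
        System.out.println(wm); // prints 3000
        // An older batch does not pull the watermark back.
        System.out.println(nextWatermark(wm, List.of(2500L), 2000L)); // prints 3000
    }
}
```

In a test, the value computed this way would be passed to setWatermark() before the next call to test().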
'''Use Cases:'''
- '''Primary''': Unit testing business logic in handleInputRows implementations.
- '''Not recommended''': End-to-end testing or performance testing - use actual Spark
streaming queries for those scenarios.
- Parameters:
processor - the StatefulProcessor to test.
initialState - initial state for each key, as a list of (key, state) tuples.
timeMode - time mode (None, ProcessingTime, or EventTime).
outputMode - output mode (Append, Update, or Complete).
eventTimeExtractor - function that extracts the event time (in milliseconds) from an input row. Required when using TimeMode.EventTime; used for late event filtering.
- Since:
- 4.2.0
-
Constructor Summary
Constructors -
Method Summary
Modifier and Type | Method | Description
void | deleteState(String stateName, K key) | Deletes state for a given key.
<T> scala.collection.immutable.List<T> | peekListState(String stateName, K key) | Retrieves the list state for a given key.
<MK,MV> scala.collection.immutable.Map<MK, MV> | peekMapState(String stateName, K key) | Retrieves the map state for a given key.
<T> scala.Option<T> | peekValueState(String stateName, K key) | Retrieves the value state for a given key.
scala.collection.immutable.List<O> | setProcessingTime(long currentTimeMs) | Sets the simulated processing time and fires all expired timers.
scala.collection.immutable.List<O> | setWatermark(long currentWatermarkMs) | Sets the watermark and fires all expired event-time timers.
scala.collection.immutable.List<O> | test(key, values) | Processes input rows for a single key through the stateful processor.
<T> void | updateListState(String stateName, K key, scala.collection.immutable.List<T> value, scala.reflect.ClassTag<T> ct) | Sets the list state for a given key.
<MK,MV> void | updateMapState(String stateName, K key, scala.collection.immutable.Map<MK, MV> value) | Sets the map state for a given key.
<T> void | updateValueState(String stateName, K key, T value) | Sets the value state for a given key.
-
Constructor Details
-
TwsTester
public TwsTester(StatefulProcessor<K, I, O> processor, scala.collection.immutable.List<scala.Tuple2<K, Object>> initialState, TimeMode timeMode, OutputMode outputMode, scala.Option<scala.Function1<I, Object>> eventTimeExtractor)
-
-
Method Details
-
test
Processes input rows for a single key through the stateful processor.
In EventTime mode, late events (those whose event time <= the current watermark) are filtered out before reaching the processor, matching the behavior of real Spark streaming.
The watermark is not advanced automatically based on event times. Use setWatermark() to set it manually before calling test().
- Parameters:
key - the grouping key
values - input rows to process
- Returns:
- all output rows produced by the processor
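The late-event rule described for test() can be modeled in a few lines. This is a minimal, self-contained sketch of the stated condition (event time <= current watermark means the row is dropped), not TwsTester's actual implementation. The class and method names `LateEventFilterSketch` and `dropLateEvents` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

public class LateEventFilterSketch {
    // Model of the documented rule only: a row is late when its event time
    // is <= the watermark, so only strictly newer rows are kept.
    public static <I> List<I> dropLateEvents(List<I> rows,
                                             ToLongFunction<I> eventTimeExtractor,
                                             long watermarkMs) {
        List<I> kept = new ArrayList<>();
        for (I row : rows) {
            if (eventTimeExtractor.applyAsLong(row) > watermarkMs) {
                kept.add(row);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Rows are their own event times here, purely for illustration.
        List<Long> rows = List.of(500L, 1000L, 1500L);
        // With the watermark at 1000, the rows at 500 and 1000 are late.
        System.out.println(dropLateEvents(rows, Long::longValue, 1000L)); // prints [1500]
    }
}
```

Note that a row whose event time equals the watermark is treated as late, which is the boundary case most worth covering in a test.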
-
updateValueState
Sets the value state for a given key.
-
peekValueState
Retrieves the value state for a given key.
-
updateListState
public <T> void updateListState(String stateName, K key, scala.collection.immutable.List<T> value, scala.reflect.ClassTag<T> ct)
Sets the list state for a given key.
-
peekListState
Retrieves the list state for a given key.
-
updateMapState
public <MK,MV> void updateMapState(String stateName, K key, scala.collection.immutable.Map<MK, MV> value)
Sets the map state for a given key.
-
peekMapState
Retrieves the map state for a given key.
-
deleteState
Deletes state for a given key.
-
setProcessingTime
Sets the simulated processing time and fires all expired timers.
Call this after test() to simulate the passage of time and trigger any timers registered with registerTimer(). Timers with expiry time <= the current processing time fire, invoking handleExpiredTimer for each. This mirrors Spark's behavior, where timers are processed after input data within a microbatch.
- Parameters:
currentTimeMs - the processing time to set, in milliseconds
- Returns:
- output rows emitted by handleExpiredTimer for all fired timers
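The firing rule (expiry time <= current time) can be illustrated with a small stand-alone model. This sketch does not use or reproduce TwsTester internals. `TimerFiringSketch` and `advanceTo` are hypothetical names, and two behaviors are assumptions made for the illustration: timers fire in ascending expiry order, and a fired timer is removed so it does not fire again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class TimerFiringSketch {
    // Pending timer expiry timestamps, kept sorted (assumed ordering).
    private final TreeSet<Long> timers = new TreeSet<>();

    public void registerTimer(long expiryMs) {
        timers.add(expiryMs);
    }

    // Fires every timer whose expiry is <= currentTimeMs and returns the
    // fired expiry timestamps. Fired timers are removed (assumption).
    public List<Long> advanceTo(long currentTimeMs) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= currentTimeMs) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        TimerFiringSketch sketch = new TimerFiringSketch();
        sketch.registerTimer(3000L);
        sketch.registerTimer(1000L);
        sketch.registerTimer(2000L);
        System.out.println(sketch.advanceTo(2000L)); // prints [1000, 2000]
        System.out.println(sketch.advanceTo(2000L)); // prints []
        System.out.println(sketch.advanceTo(5000L)); // prints [3000]
    }
}
```

A timer whose expiry equals the time passed in fires, so a test can set the time to exactly the registered expiry rather than overshooting it.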
-
setWatermark
Sets the watermark and fires all expired event-time timers.
Use this in EventTime mode to set the watermark manually. This is the only way to set the watermark in TwsTester, because automatic watermark propagation based on event times is not supported. Timers with expiry time <= the new watermark fire.
- Parameters:
currentWatermarkMs - the watermark to set, in milliseconds
- Returns:
- output rows emitted by handleExpiredTimer for all fired timers
-