Class TwsTester<K,I,O>

Object
org.apache.spark.sql.streaming.TwsTester<K,I,O>
Type Parameters:
K - the type of grouping key.
I - the type of input rows.
O - the type of output rows.

public class TwsTester<K,I,O> extends Object
Testing utility for transformWithState stateful processors.

This class enables unit testing of StatefulProcessor business logic by simulating the behavior of transformWithState. It processes input rows and returns output rows equivalent to those that would be produced by the processor in an actual Spark streaming query.

Supported:
- Processing input rows and producing output rows via test().
- Initial state setup via the initialState constructor parameter.
- Direct state manipulation via updateValueState, updateListState, updateMapState and deleteState.
- Direct state inspection via peekValueState, peekListState and peekMapState.
- Timers in ProcessingTime mode (call setProcessingTime to fire expired timers).
- Timers in EventTime mode (call setWatermark to set the watermark manually and fire expired timers).
- Late-event filtering in EventTime mode.
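The update/peek/delete contract listed above can be pictured with a minimal, self-contained model. It assumes two things: state is addressed by the pair (stateName, key), and peeking at state that was never written yields an empty result. The class StateModel and its methods are hypothetical. They are not TwsTester's implementation, which works with Scala Option, List and Map types rather than their Java counterparts.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of per-key state addressed by (stateName, key).
// It is NOT TwsTester's implementation; it only illustrates the
// update / peek / delete contract for value and list state.
final class StateModel<K> {
  // stateName -> (grouping key -> stored value)
  private final Map<String, Map<K, Object>> store = new HashMap<>();

  // Analogous to updateValueState: overwrites the value for (stateName, key).
  <T> void updateValueState(String stateName, K key, T value) {
    store.computeIfAbsent(stateName, name -> new HashMap<>()).put(key, value);
  }

  // Analogous to peekValueState: empty when nothing is stored (assumption).
  @SuppressWarnings("unchecked")
  <T> Optional<T> peekValueState(String stateName, K key) {
    return Optional.ofNullable((T) lookup(stateName, key));
  }

  // Analogous to updateListState: stores an immutable copy of the list.
  <T> void updateListState(String stateName, K key, List<T> values) {
    store.computeIfAbsent(stateName, name -> new HashMap<>()).put(key, List.copyOf(values));
  }

  // Analogous to peekListState: empty list when nothing is stored (assumption).
  @SuppressWarnings("unchecked")
  <T> List<T> peekListState(String stateName, K key) {
    Object stored = lookup(stateName, key);
    return stored == null ? List.of() : (List<T>) stored;
  }

  // Analogous to deleteState: removes only the named state for this key.
  void deleteState(String stateName, K key) {
    Map<K, Object> perKey = store.get(stateName);
    if (perKey != null) {
      perKey.remove(key);
    }
  }

  // Returns the stored object, or null if (stateName, key) was never written.
  private Object lookup(String stateName, K key) {
    Map<K, Object> perKey = store.get(stateName);
    return perKey == null ? null : perKey.get(key);
  }
}
```

In this model, each state name is its own namespace. Deleting one state for a key therefore leaves the key's other states untouched, which is what the (stateName, key) parameters of deleteState suggest.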

Not supported:
- TTL: state persists indefinitely, even if a TTLConfig is set.
- Automatic watermark propagation: in production Spark streaming, the watermark is computed from event times and propagated at the end of each microbatch. TwsTester does not simulate this, because it processes keys individually rather than in batches. To test watermark-dependent logic, call setWatermark() to set the watermark to the desired value before calling test().
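A small model helps show how a manually set watermark interacts with late-event filtering. It follows the rule stated for test(): in EventTime mode, a row is dropped when its event time, as returned by the event-time extractor, is <= the current watermark. The watermark moves only when set explicitly. The class LateEventFilter, its constructor argument initialWatermarkMs and its methods are hypothetical, and the initial watermark value is an assumption of the model rather than a documented TwsTester default.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical model of EventTime-mode late-event filtering (NOT TwsTester code).
final class LateEventFilter<I> {
  private final ToLongFunction<I> eventTimeExtractor; // event time in milliseconds
  private long watermarkMs;                           // initial value is an assumption

  LateEventFilter(ToLongFunction<I> eventTimeExtractor, long initialWatermarkMs) {
    this.eventTimeExtractor = eventTimeExtractor;
    this.watermarkMs = initialWatermarkMs;
  }

  // The only way to move the watermark, mirroring setWatermark().
  void setWatermark(long newWatermarkMs) {
    this.watermarkMs = newWatermarkMs;
  }

  // Keeps rows whose event time is strictly greater than the watermark.
  // The watermark is NOT advanced from the event times seen here.
  List<I> dropLateRows(List<I> rows) {
    List<I> onTime = new ArrayList<>();
    for (I row : rows) {
      if (eventTimeExtractor.applyAsLong(row) > watermarkMs) {
        onTime.add(row);
      }
    }
    return onTime;
  }
}
```

A row whose event time equals the watermark counts as late. Processing a row with a large event time never changes which later rows are kept; only setWatermark does.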

Use cases:
- Primary: unit testing the business logic of handleInputRows implementations.
- Not recommended: end-to-end or performance testing; use actual Spark streaming queries for those scenarios.

Parameters:
processor - the StatefulProcessor to test.
initialState - initial state for each key, as a list of (key, state) tuples.
timeMode - time mode (None, ProcessingTime or EventTime).
outputMode - output mode (Append, Update or Complete).
eventTimeExtractor - function that extracts the event time (in milliseconds) from an input row. Required when using TimeMode.EventTime; used for late-event filtering.

Since:
4.2.0
  • Constructor Summary

    Constructors
    TwsTester(StatefulProcessor<K,I,O> processor, scala.collection.immutable.List<scala.Tuple2<K,Object>> initialState, TimeMode timeMode, OutputMode outputMode, scala.Option<scala.Function1<I,Object>> eventTimeExtractor)
  • Method Summary

    void deleteState(String stateName, K key)
        Deletes state for a given key.
    <T> scala.collection.immutable.List<T> peekListState(String stateName, K key)
        Retrieves the list state for a given key.
    <MK, MV> scala.collection.immutable.Map<MK,MV> peekMapState(String stateName, K key)
        Retrieves the map state for a given key.
    <T> scala.Option<T> peekValueState(String stateName, K key)
        Retrieves the value state for a given key.
    scala.collection.immutable.List<O> setProcessingTime(long currentTimeMs)
        Sets the simulated processing time and fires all expired timers.
    scala.collection.immutable.List<O> setWatermark(long currentWatermarkMs)
        Sets the watermark and fires all expired event-time timers.
    scala.collection.immutable.List<O> test(K key, scala.collection.immutable.List<I> values)
        Processes input rows for a single key through the stateful processor.
    <T> void updateListState(String stateName, K key, scala.collection.immutable.List<T> value, scala.reflect.ClassTag<T> ct)
        Sets the list state for a given key.
    <MK, MV> void updateMapState(String stateName, K key, scala.collection.immutable.Map<MK,MV> value)
        Sets the map state for a given key.
    <T> void updateValueState(String stateName, K key, T value)
        Sets the value state for a given key.

    Methods inherited from class java.lang.Object

    equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
  • Constructor Details

  • Method Details

    • test

      public scala.collection.immutable.List<O> test(K key, scala.collection.immutable.List<I> values)
      Processes input rows for a single key through the stateful processor.

      In EventTime mode, late events (rows whose event time, as returned by eventTimeExtractor, is <= the current watermark) are filtered out before reaching the processor, matching the behavior of an actual Spark streaming query.

      The watermark is not automatically advanced based on event times. Use setWatermark() to manually set the watermark before calling test().

      Parameters:
      key - the grouping key
      values - input rows to process
      Returns:
      all output rows produced by the processor
    • updateValueState

      public <T> void updateValueState(String stateName, K key, T value)
      Sets the value state for a given key.
    • peekValueState

      public <T> scala.Option<T> peekValueState(String stateName, K key)
      Retrieves the value state for a given key.
    • updateListState

      public <T> void updateListState(String stateName, K key, scala.collection.immutable.List<T> value, scala.reflect.ClassTag<T> ct)
      Sets the list state for a given key.
    • peekListState

      public <T> scala.collection.immutable.List<T> peekListState(String stateName, K key)
      Retrieves the list state for a given key.
    • updateMapState

      public <MK, MV> void updateMapState(String stateName, K key, scala.collection.immutable.Map<MK,MV> value)
      Sets the map state for a given key.
    • peekMapState

      public <MK, MV> scala.collection.immutable.Map<MK,MV> peekMapState(String stateName, K key)
      Retrieves the map state for a given key.
    • deleteState

      public void deleteState(String stateName, K key)
      Deletes state for a given key.
    • setProcessingTime

      public scala.collection.immutable.List<O> setProcessingTime(long currentTimeMs)
      Sets the simulated processing time and fires all expired timers.

      Call this after test() to simulate the passage of time and trigger any timers registered with registerTimer(). Timers whose expiry time is <= the current processing time fire, invoking handleExpiredTimer for each. This mirrors Spark's behavior, where timers are processed after input data within a microbatch.

      Parameters:
      currentTimeMs - the processing time to set in milliseconds
      Returns:
      output rows emitted by handleExpiredTimer for all fired timers
    • setWatermark

      public scala.collection.immutable.List<O> setWatermark(long currentWatermarkMs)
      Sets the watermark and fires all expired event-time timers.

      Use this in EventTime mode to set the watermark manually. It is the only way to set the watermark in TwsTester, because automatic watermark propagation based on event times is not supported. Event-time timers whose expiry time is <= the new watermark fire, invoking handleExpiredTimer for each.

      Parameters:
      currentWatermarkMs - the watermark to set in milliseconds
      Returns:
      output rows emitted by handleExpiredTimer for all fired timers
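setProcessingTime and setWatermark both follow the same firing rule: every pending timer whose expiry is <= the new time fires, and the handler's output rows are returned. The single-key model below sketches that shared rule. TimerModel, advanceTo and the onExpired callback are hypothetical stand-ins for the tester, the two setter methods and handleExpiredTimer. Two behaviors are assumptions of the model rather than documented TwsTester behavior: timers fire in expiry order, and a repeated timestamp is registered only once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.function.LongFunction;

// Hypothetical single-key model of timer firing (NOT TwsTester code).
// Advancing the clock (processing time or watermark) fires every pending
// timer whose expiry is <= the new time, exactly once, in expiry order
// (the ordering is an assumption of this model).
final class TimerModel<O> {
  // Sorted, duplicate-free expiry timestamps in milliseconds.
  private final TreeSet<Long> pending = new TreeSet<>();
  // Stands in for StatefulProcessor.handleExpiredTimer.
  private final LongFunction<List<O>> onExpired;

  TimerModel(LongFunction<List<O>> onExpired) {
    this.onExpired = onExpired;
  }

  // Stands in for registerTimer(expiryMs); a repeated timestamp is kept once.
  void registerTimer(long expiryMs) {
    pending.add(expiryMs);
  }

  // Shared logic behind setProcessingTime(ms) and setWatermark(ms):
  // returns the concatenated output rows of all fired timers.
  List<O> advanceTo(long timeMs) {
    List<O> out = new ArrayList<>();
    while (!pending.isEmpty() && pending.first() <= timeMs) {
      long expiryMs = pending.pollFirst();
      out.addAll(onExpired.apply(expiryMs));
    }
    return out;
  }
}
```

In the model, a fired timer is removed from the pending set, so advancing the clock again fires only timers that have newly expired. Moving the clock backwards simply fires nothing.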