State Data Source Integration Guide
State Data Source Guide in Structured Streaming (Experimental)
Overview
The state data source provides functionality to manipulate the state from the checkpoint.
As of Spark 4.0, the state data source provides read functionality with a batch query. Additional functionality, including write support, is on the future roadmap.
NOTE: this data source is currently marked as experimental - the source options and the behavior (output) might be subject to change.
Reading state key-values from the checkpoint
The state data source enables reading key-value pairs from the state store in the checkpoint by running a separate batch query. Users can leverage this functionality to cover the two major use cases described below:
- Construct a test checking both the output and the state. It is non-trivial to deduce the key-values of the state from the output, and having visibility into the state is a huge win for testing.
- Investigate an incident in a stateful streaming query. If users observe incorrect output and want to track down how it was produced, visibility into the state is required.
Users can read an instance of a state store, which in most cases maps to a single stateful operator. This means users can expect to read the entire set of key-value pairs in the state for a single stateful operator.
Note that there could be exceptions, e.g. stream-stream join, which leverages multiple state store instances internally. The data source abstracts the internal representation away from users and provides a user-friendly approach to reading the state. See the section on stream-stream join for more details.
Creating a state store for batch queries (all defaults)
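Below is a minimal sketch of reading the state with all default options. It assumes the source's short name `statestore`; the checkpoint path is a placeholder.

```scala
// Read the full state of the latest committed batch, using all defaults:
// operatorId = 0, storeName = DEFAULT, batchId = latest committed batch.
val df = spark
  .read
  .format("statestore")
  .load("<checkpointLocation>") // placeholder: root directory of the checkpoint

// Inspect the operator-specific key/value schema before querying further.
df.printSchema()
df.show()
```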
Each row in the source has the following schema:
Column | Type | Note |
---|---|---|
key | struct (depends on the type for state key) | |
value | struct (depends on the type for state value) | |
partition_id | int | |
The nested columns for key and value heavily depend on the input schema of the stateful operator as well as the type of the operator. Users are encouraged to inspect the schema via df.schema() / df.printSchema() first to understand the shape of the output.
The following options must be set for the source.
Option | Value | Meaning |
---|---|---|
path | string | Specify the root directory of the checkpoint location. You can either specify the path via option("path", `path`) or load(`path`). |
The following configurations are optional:
Option | Value | Default | Meaning |
---|---|---|---|
batchId | numeric value | latest committed batch | Represents the target batch to read from. This option is used when users want to perform time-travel. The batch should be committed but not yet cleaned up. |
operatorId | numeric value | 0 | Represents the target operator to read from. This option is used when the query is using multiple stateful operators. |
storeName | string | DEFAULT | Represents the target state store name to read from. This option is used when the stateful operator uses multiple state store instances. It is not required except for stream-stream join. |
joinSide | string ("left" or "right") | (none) | Represents the target side to read from. This option is used when users want to read the state from a stream-stream join. |
snapshotStartBatchId | numeric value | (none) | If specified, forces reading the snapshot at this batch ID, then changelogs will be replayed until 'batchId' or its default. Note that the snapshot batch ID starts with 0 and equals the snapshot version ID minus 1. This option must be used together with 'snapshotPartitionId'. |
snapshotPartitionId | numeric value | (none) | If specified, only this specific partition will be read. Note that the partition ID starts with 0. This option must be used together with 'snapshotStartBatchId'. |
readChangeFeed | boolean | false | If set to true, will read the changes of state over microbatches. The output table schema will also differ. Details can be found in the section "Reading state changes over microbatches". Option 'changeStartBatchId' must be specified with this option. Options 'batchId', 'joinSide', 'snapshotStartBatchId' and 'snapshotPartitionId' cannot be used together with this option. |
changeStartBatchId | numeric value | (none) | Represents the first batch to read in the read change feed mode. This option requires 'readChangeFeed' to be set to true. |
changeEndBatchId | numeric value | latest committed batchId | Represents the last batch to read in the read change feed mode. This option requires 'readChangeFeed' to be set to true. |
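For example, below is a sketch of time-traveling to a specific committed batch via the 'batchId' option; the batch number, operator ID and checkpoint path are placeholders.

```scala
// Time-travel: read the state as of batch 5 (must be committed and not yet
// cleaned up), for the stateful operator with ID 1.
val df = spark
  .read
  .format("statestore")
  .option("batchId", 5)
  .option("operatorId", 1)
  .load("<checkpointLocation>")
```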
Reading state for stream-stream join
Structured Streaming implements the stream-stream join feature by leveraging multiple state store instances internally. These instances logically compose buffers to store the input rows for the left and right sides.
Since it is easier for users to reason about the two sides of the join, the data source provides the option 'joinSide' to read the buffered input for the specific side of the join. To enable reading an internal state store instance directly, we also allow specifying the option 'storeName', with the restriction that 'storeName' and 'joinSide' cannot be specified together.
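For example, below is a sketch of reading the buffered input for the left side of the join (same assumptions as above: the `statestore` short name and a placeholder path).

```scala
// Read the input rows buffered for the left side of the stream-stream join.
val df = spark
  .read
  .format("statestore")
  .option("joinSide", "left")
  .load("<checkpointLocation>")
```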
Reading state changes over microbatches
If we want to understand the changes of the state store over microbatches, instead of the whole state store at a particular microbatch, 'readChangeFeed' is the option to use. For example, below is the code to read the changes of state from batch 2 to the latest committed batch (a sketch; the checkpoint path is a placeholder).
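```scala
// Read all state changes from batch 2 up to the latest committed batch.
// Omitting 'changeEndBatchId' defaults it to the latest committed batch.
val df = spark
  .read
  .format("statestore")
  .option("readChangeFeed", true)
  .option("changeStartBatchId", 2)
  .load("<checkpointLocation>")
```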
The output schema will also be different from the normal output.
Column | Type | Note |
---|---|---|
batch_id | long | |
change_type | string | There are two possible values: 'update' and 'delete'. 'update' represents either inserting a non-existing key-value pair or updating an existing key with a new value. The 'value' field will be null for 'delete' records. |
key | struct (depends on the type for state key) | |
value | struct (depends on the type for state value) | |
partition_id | int | |
State Metadata Source
Before querying the state from an existing checkpoint via the state data source, users would like to understand the information for the checkpoint, especially about the stateful operators: which operators and state store instances are available in the checkpoint, the available range of batch IDs, etc.
Structured Streaming provides a data source named "State metadata source" to provide the state-related metadata information from the checkpoint.
Note: The metadata is constructed when the streaming query is run with Spark 4.0+. An existing checkpoint which has been produced with a lower Spark version does not have the metadata and cannot be queried with this metadata source. It is required to run the streaming query pointing to the existing checkpoint in Spark 4.0+ to construct the metadata before querying it. Users can optionally provide the batchId to get the operator metadata at a point in time.
Creating a State metadata store for Batch Queries
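A minimal sketch, assuming the metadata source's short name `state-metadata` and a placeholder checkpoint path:

```scala
// Read the state-related metadata for all stateful operators in the checkpoint.
val df = spark
  .read
  .format("state-metadata")
  .load("<checkpointLocation>")
```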
The following options must be set for the source:
Option | Value | Meaning |
---|---|---|
path | string | Specify the root directory of the checkpoint location. You can either specify the path via option("path", `path`) or load(`path`). |
The following configurations are optional:
Option | Value | Default | Meaning |
---|---|---|---|
batchId | numeric value | Last committed batch if available, else 0 | Optional batchId used to retrieve operator metadata at that batch. |
Each row in the source has the following schema:
Column | Type | Note |
---|---|---|
operatorId | int | |
operatorName | string | |
stateStoreName | string | |
numPartitions | int | |
minBatchId | int | The minimum batch ID available for querying state. The value could be invalid if the streaming query writing to the checkpoint is still running, as cleanup may remove older batches. |
maxBatchId | int | The maximum batch ID available for querying state. The value could be invalid if the streaming query writing to the checkpoint is still running, as the query will commit further batches. |
operatorProperties | string | List of properties used by the operator encoded as JSON. Output generated here is operator dependent. |
_numColsPrefixKey | int | metadata column (hidden unless specified with SELECT) |
One of the major use cases of this data source is to identify the operatorId to query when the query has multiple stateful operators, e.g. stream-stream join followed by deduplication. The column 'operatorName' helps users identify the operatorId for a given operator.
Additionally, if users want to query an internal state store instance of a stateful operator (e.g. stream-stream join), the column 'stateStoreName' is useful to determine the target.
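For example, below is a sketch that lists the available operators and their store instances so users can pick the right 'operatorId' and 'storeName'; the column names follow the schema above, and the path is a placeholder.

```scala
val metadataDf = spark
  .read
  .format("state-metadata")
  .load("<checkpointLocation>")

// List each stateful operator with its store instances and available batch range.
metadataDf
  .select("operatorId", "operatorName", "stateStoreName", "minBatchId", "maxBatchId")
  .show(truncate = false)
```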