Structured Streaming Programming Guide
Real-time Mode
Real-time Mode is a new streaming execution mode introduced in Spark 4.1.0 that targets ultra-low end-to-end latency with the exact same API and processing guarantees / semantics as the current structured streaming engine. It is intended for operational workloads that must react to data the moment it arrives, such as fraud detection, real-time alerting, and live personalization.
In this release, Real-time Mode in Apache Spark supports stateless queries only – projections,
filters and other map-like operations, unions, and stream-static joins. Stateful operations such as
streaming aggregations, deduplication, stream-stream joins, and transformWithState are not yet
supported, but support for them is planned starting in Spark 4.3. See
Supported Queries for the full list.
The most important thing to know: the duration you pass to the trigger (default 5 minutes) is a checkpoint interval, not a latency target. Records are processed and emitted continuously rather than at batch boundaries, so the trigger duration does not set latency the way a micro-batch interval does. See Batch Duration Is a Checkpoint Interval.
You enable Real-time Mode by setting a Real-time trigger on the streaming write; the rest of your query is unchanged. See Enabling Real-time Mode.
How Real-time Mode Works
By default, Structured Streaming runs a query as a series of small batch jobs – the micro-batch model. For each micro-batch, the driver plans the batch and launches a fresh set of short-lived tasks. Those tasks read and process a bounded slice of the input, and the driver commits progress before planning the next batch. The fixed per-batch planning and task-scheduling overhead places a floor on end-to-end latency.
Real-time Mode removes this per-batch overhead by launching long-running tasks – one per input partition. These tasks stay alive for the duration of a (long) batch and process records continuously as they arrive. Because tasks are scheduled once per batch rather than once per slice of data, records flow through the operator pipeline (source -> transformations -> sink) without waiting for a batch boundary. End-to-end latency drops from the ~100 ms micro-batch floor to roughly the time needed to process and ship one record (often a few milliseconds).
Since records never wait for a batch boundary, the batch duration mainly controls how often the query checkpoints progress – as the next section explains.
Batch Duration Is a Checkpoint Interval
In Real-time Mode, the batch duration is a checkpoint interval, not a latency interval. With the default 5-minute duration, the query still emits records within milliseconds; the 5 minutes only controls how often it commits progress and starts the next long-running batch. This is the opposite of the micro-batch engine, where a longer batch interval directly increases latency.
Do not confuse the 5-minute default trigger duration with the 5-second minimum allowed duration described under Requirements: the former is the checkpoint cadence used when you do not specify a duration, while the latter is the smallest duration you are allowed to set.
Choosing the batch duration is a trade-off:
- A shorter batch duration checkpoints more often, giving finer-grained recovery (less work to re-process after a failure). However, the query does not process data while it commits progress and starts the next batch, so checkpointing too frequently adds more of these gaps, which can raise tail (p99) latency, in addition to incurring more planning and commit overhead.
- A longer batch duration checkpoints less often, reducing that overhead and those gaps, at the cost of coarser-grained recovery (more data re-processed after a failure).
The duration is set on the Real-time trigger, as shown under Enabling Real-time Mode.
Comparison with Other Modes
The table below summarizes how Real-time Mode relates to the default micro-batch engine and to the experimental Continuous Processing mode. See How Real-time Mode Works for the mechanism and Supported Queries for the full list of supported operations.
| Mode | Latency | Processing Guarantees | Supported operations | When to use |
|---|---|---|---|---|
| Micro-batch (default) | ~100 ms | Exactly-once | All streaming operations, including stateful | Stateful or higher-throughput workloads, or queries Real-time Mode does not yet support |
| Real-time Mode | millisecond-scale | Exactly-once | Stateless today (map-like operations, unions, and stream-static joins); designed to support all query shapes, including stateful | Low-latency workloads |
| Continuous Processing (experimental) | ~1 ms | At-least-once | Map-like only (projections and selections); no stateful operations | Legacy; use Real-time Mode instead |
The Processing Guarantees column refers to processing semantics, defined under Fault Tolerance; end-to-end delivery additionally depends on the sink and is independent of the execution mode.
Real-time Mode and Continuous Processing both target millisecond-scale latency, but they differ substantially:
- Continuous Processing (introduced in Spark 2.3) is, and remains, experimental. It supports only map-like operations – projections and selections – with no stateful operations such as aggregations or joins, and it provides at-least-once guarantees. Because it is stateless, the exactly-once processing guarantee discussed under Fault Tolerance does not apply to it. These constraints have limited its adoption.
- Real-time Mode is designed to support all query shapes, including stateful operations, while reusing Spark’s mature components such as state management, the Catalyst optimizer, and the existing SQL operators. It provides exactly-once processing semantics. It currently supports stateless queries, with stateful support planned starting in Spark 4.3.
For new low-latency workloads, prefer Real-time Mode over Continuous Processing.
Enabling Real-time Mode
To run a supported query in Real-time Mode, set a Real-time trigger on the streaming write. Everything else in the query stays the same. For example, the following query reads from a Kafka topic, applies a stateless transformation, and writes the result to another Kafka topic. Records flow through with low latency even though the trigger is 5 minutes.
spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "input-topic") \
.load() \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "output-topic") \
.option("checkpointLocation", "/path/to/checkpoint") \
.outputMode("update") \
.trigger(realTime="5 minutes") \
.start()import org.apache.spark.sql.streaming.Trigger
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "input-topic")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "output-topic")
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("update")
.trigger(Trigger.RealTime("5 minutes")) // enable Real-time Mode
.start()import org.apache.spark.sql.streaming.Trigger;
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "input-topic")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "output-topic")
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("update")
.trigger(Trigger.RealTime("5 minutes")) // enable Real-time Mode
.start();Trigger API
- Scala and Java: the trigger is
Trigger.RealTime(...), imported fromorg.apache.spark.sql.streaming.Trigger. Several forms are available:Trigger.RealTime()– uses the default batch duration of 5 minutes.Trigger.RealTime("5 minutes")– a duration string.Trigger.RealTime(300000)– the batch duration in milliseconds, as along.Trigger.RealTime(5, TimeUnit.MINUTES)– a value together with ajava.util.concurrent.TimeUnit.Trigger.RealTime(Duration("10 seconds"))– a Scalascala.concurrent.duration.Duration.
- Python: pass the batch duration as a string to the
realTimekeyword argument oftrigger(), for example.trigger(realTime="5 minutes"). The duration is required in Python.
Requirements
A query must satisfy all of the following before it can start in Real-time Mode; each is checked when the query starts:
- The output mode must be
update. Any other output mode fails to start withSTREAMING_REAL_TIME_MODE.OUTPUT_MODE_NOT_SUPPORTED. - A
checkpointLocationis required, as with any other Structured Streaming query. - The batch duration must be at least
spark.sql.streaming.realTimeMode.minBatchDuration(5000 ms, i.e. 5 seconds, by default); a shorter interval fails to start withINVALID_STREAMING_REAL_TIME_MODE_TRIGGER_INTERVAL. The duration string must parse to a positive interval, and month-based intervals (for example,"1 month") are not accepted. (This 5-second minimum is distinct from the 5-minute default; see Batch Duration Is a Checkpoint Interval.)
Supported Queries
Real-time Mode supports stateless, map-like queries only.
The following operations, sources, and sinks are supported as of Spark 4.1.0:
- Operations: stateless, map-like operations are supported:
- Projections:
select,selectExpr,withColumn,drop, and the typedmap/flatMapDataset operations. - Selections:
where/filter. - Expressions that compile to a projection – including functions such as
from_json/to_jsonand scalar user-defined functions (UDFs). - Column generators such as
explode. unionof two or more distinct streaming sources. Referencing the same source DataFrame more than once is not supported and fails withSTREAMING_REAL_TIME_MODE.IDENTICAL_SOURCES_IN_UNION_NOT_SUPPORTED; create a separate DataFrame for each source instead.- Stream-static joins, where a streaming DataFrame is joined with a static DataFrame. The static
side must be broadcast (use the
broadcast(...)hint), because Real-time Mode does not support shuffles. withWatermark(event-time watermark declaration) is allowed, although it has no effect because stateful operators are not supported. This lets queries that already declare a watermark run in Real-time Mode without modification.
- Projections:
-
Sources: the source must support Real-time Mode. In Apache Spark, the Kafka source supports Real-time Mode. An unsupported source fails with
STREAMING_REAL_TIME_MODE.INPUT_STREAM_NOT_SUPPORTED. (The built-inratesource is not supported as a Real-time source.) - Sinks:
- Kafka sink.
- Foreach sink (via
ForeachWriter), for writing to arbitrary external systems one record at a time. See Using Foreach. Note thatforeachBatchis not supported, because it processes each batch as a whole rather than one record at a time. - Console and memory sinks, which are useful for development and debugging.
Other sinks fail with
STREAMING_REAL_TIME_MODE.SINK_NOT_SUPPORTED.
The operators and sinks used by a Real-time query are checked against an allowlist before the query
starts; anything outside the allowlist fails with
STREAMING_REAL_TIME_MODE.OPERATOR_OR_SINK_NOT_IN_ALLOWLIST.
Not supported
Stateful operations of any kind are not supported in this release. This includes streaming
aggregations, dropDuplicates / dropDuplicatesWithinWatermark, stream-stream joins, repartition
and other operations that introduce a shuffle, and stateful operators such as
flatMapGroupsWithState and transformWithState. Support for stateful operations is planned
starting in Spark 4.3. Asynchronous progress tracking is also not supported; enabling it fails with
STREAMING_REAL_TIME_MODE.ASYNC_PROGRESS_TRACKING_NOT_SUPPORTED.
Fault Tolerance
Real-time Mode provides the same exactly-once processing guarantees as the default micro-batch engine. Two distinct guarantees are worth separating:
- Exactly-once processing means every input record’s effect on the state the engine manages (for example, aggregation counts) is applied effectively once, even across failures and restarts.
- Delivery semantics describe whether a record may be written to the external system more than once. This is a property of the sink, not the execution mode.
Real-time Mode is exactly-once with respect to processing. End-to-end delivery depends on the sink: a sink that performs idempotent or transactional writes can deliver exactly-once, while other sinks deliver at-least-once (duplicates are possible after a failure). The built-in Kafka sink provides at-least-once delivery, with or without Real-time Mode. Real-time Mode does not yet ship an exactly-once sink, though one can be implemented.
Internally, offsets are committed at the end of each batch, after the corresponding records have already been written to the sink. If a query fails partway through a batch, it resumes from the last committed offsets on restart and may re-write records emitted before the failure. Design sinks to tolerate duplicates – for example, with idempotent writes – where exactly-once output matters.
Examples
The following examples read from Kafka and assume a running Kafka cluster. Each example shows the same query in Python, Scala, and Java.
Stream-static join
Enrich a stream by joining it with a static reference dataset. The static side is wrapped in
broadcast(...) so the join is executed as a broadcast (map-side) join, which avoids a shuffle.
from pyspark.sql.functions import broadcast
# Static reference data, read once as a batch DataFrame.
reference = spark.read.format("parquet").load("/path/to/reference")
spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "input-topic") \
.load() \
.selectExpr("CAST(key AS STRING) AS joinKey", "CAST(value AS STRING) AS value") \
.join(broadcast(reference), "joinKey") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "output-topic") \
.option("checkpointLocation", "/path/to/checkpoint") \
.outputMode("update") \
.trigger(realTime="5 minutes") \
.start()import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.streaming.Trigger
// Static reference data, read once as a batch DataFrame.
val reference = spark.read.format("parquet").load("/path/to/reference")
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "input-topic")
.load()
.selectExpr("CAST(key AS STRING) AS joinKey", "CAST(value AS STRING) AS value")
.join(broadcast(reference), "joinKey")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "output-topic")
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("update")
.trigger(Trigger.RealTime("5 minutes"))
.start()import static org.apache.spark.sql.functions.broadcast;
import org.apache.spark.sql.streaming.Trigger;
// Static reference data, read once as a batch DataFrame.
Dataset<Row> reference = spark.read().format("parquet").load("/path/to/reference");
spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "input-topic")
.load()
.selectExpr("CAST(key AS STRING) AS joinKey", "CAST(value AS STRING) AS value")
.join(broadcast(reference), "joinKey")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "output-topic")
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("update")
.trigger(Trigger.RealTime("5 minutes"))
.start();Writing to the console for development
The console sink prints output to the driver’s standard output and is handy while developing a query. Note that the console sink buffers each batch’s rows and prints them when the batch commits, so its output appears once per batch – here, every 30 seconds – rather than continuously. This makes it useful for inspecting results, but it does not reflect Real-time Mode’s true per-record latency; to observe that, use a row-by-row sink such as Kafka. A shorter batch duration simply makes the console refresh more often.
spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "input-topic") \
.load() \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.where("value IS NOT NULL") \
.writeStream \
.format("console") \
.option("checkpointLocation", "/path/to/checkpoint") \
.outputMode("update") \
.trigger(realTime="30 seconds") \
.start()import org.apache.spark.sql.streaming.Trigger
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "input-topic")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.where("value IS NOT NULL")
.writeStream
.format("console")
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("update")
.trigger(Trigger.RealTime("30 seconds"))
.start()import org.apache.spark.sql.streaming.Trigger;
spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "input-topic")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.where("value IS NOT NULL")
.writeStream()
.format("console")
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("update")
.trigger(Trigger.RealTime("30 seconds"))
.start();Configuration
| Configuration | Default | Meaning |
|---|---|---|
spark.sql.streaming.realTimeMode.minBatchDuration |
5000 (ms, 5 seconds) |
The minimum batch duration, in milliseconds, allowed for a Real-time trigger. See the batch-duration requirement under Requirements. |
spark.sql.streaming.realTimeMode.allowlistCheck |
true |
Whether to verify that all operators and sinks used by a Real-time query are in the supported allowlist. Disabling this check (not recommended) lets unsupported operators and sinks run at your own risk. |
Best Practices
- Real-time Mode launches long-running tasks – one per input partition – that continuously read,
process, and write data. The number of tasks a query needs depends on how many partitions it reads
from its sources in parallel. Before starting a Real-time query, ensure the cluster has enough
cores to run all of these tasks simultaneously and continuously. For example, reading from a Kafka
topic with 10 partitions requires at least 10 cores for the query to make progress. Real-time Mode
uses a fixed 1:1 mapping between Kafka topic partitions and reader tasks; the
minPartitionsoption is not supported in Real-time Mode. - Run a single Real-time query per cluster. Because Real-time Mode holds its task slots for the entire batch duration, any other queries sharing the cluster compete for the same slots, which can starve the Real-time query of resources and increase its latency.
Caveats
- Real-time Mode provides exactly-once processing semantics, but sinks may receive duplicate records after a failure. See Fault Tolerance for how to design sinks for exactly-once writes.
- Adaptive Query Execution (AQE) is not supported for Real-time Mode queries.