Structured Streaming Programming Guide

Real-time Mode

Real-time Mode is a new streaming execution mode introduced in Spark 4.1.0. It targets ultra-low end-to-end latency while keeping the same API and processing guarantees as the existing Structured Streaming engine. It is intended for operational workloads that must react to data the moment it arrives, such as fraud detection, real-time alerting, and live personalization.

In this release, Real-time Mode in Apache Spark supports stateless queries only – projections, filters and other map-like operations, unions, and stream-static joins. Stateful operations such as streaming aggregations, deduplication, stream-stream joins, and transformWithState are not yet supported, but support for them is planned starting in Spark 4.3. See Supported Queries for the full list.

The most important thing to know: the duration you pass to the trigger (default 5 minutes) is a checkpoint interval, not a latency target. Records are processed and emitted continuously rather than at batch boundaries, so the trigger duration does not set latency the way a micro-batch interval does. See Batch Duration Is a Checkpoint Interval.

You enable Real-time Mode by setting a Real-time trigger on the streaming write; the rest of your query is unchanged. See Enabling Real-time Mode.

How Real-time Mode Works

By default, Structured Streaming runs a query as a series of small batch jobs – the micro-batch model. For each micro-batch, the driver plans the batch and launches a fresh set of short-lived tasks. Those tasks read and process a bounded slice of the input, and the driver commits progress before planning the next batch. The fixed per-batch planning and task-scheduling overhead places a floor on end-to-end latency.

Real-time Mode removes this per-batch overhead by launching long-running tasks – one per input partition. These tasks stay alive for the duration of a (long) batch and process records continuously as they arrive. Because tasks are scheduled once per batch rather than once per slice of data, records flow through the operator pipeline (source -> transformations -> sink) without waiting for a batch boundary. End-to-end latency drops from the ~100 ms micro-batch floor to roughly the time needed to process and ship one record (often a few milliseconds).
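The latency difference can be illustrated with a toy model. The sketch below is not Spark code: it assumes a fixed per-batch overhead and a fixed per-record processing cost (both hypothetical numbers) and compares when a record would be emitted under each model.

```python
import math


def micro_batch_emit_time(arrival_ms, batch_interval_ms, batch_overhead_ms, per_record_ms):
    """Toy model: a record waits for the next batch boundary, then pays the
    per-batch planning/scheduling overhead plus its own processing cost."""
    # First batch boundary at or after the arrival time.
    next_boundary = math.ceil(arrival_ms / batch_interval_ms) * batch_interval_ms
    return next_boundary + batch_overhead_ms + per_record_ms


def real_time_emit_time(arrival_ms, per_record_ms):
    """Toy model: a long-running task is already waiting for input, so the
    record only pays its own processing cost."""
    return arrival_ms + per_record_ms


arrivals = [10, 130, 260]  # hypothetical arrival times in milliseconds
for t in arrivals:
    mb = micro_batch_emit_time(t, batch_interval_ms=100, batch_overhead_ms=50, per_record_ms=2)
    rt = real_time_emit_time(t, per_record_ms=2)
    print(f"arrival={t} ms  micro-batch latency={mb - t} ms  real-time latency={rt - t} ms")
```

In this model the micro-batch latency depends on where in the interval a record happens to arrive, while the real-time latency is the per-record cost alone.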

Since records never wait for a batch boundary, the batch duration mainly controls how often the query checkpoints progress – as the next section explains.

Batch Duration Is a Checkpoint Interval

In Real-time Mode, the batch duration is a checkpoint interval, not a latency interval. With the default 5-minute duration, the query still emits records within milliseconds; the 5 minutes only controls how often it commits progress and starts the next long-running batch. This is the opposite of the micro-batch engine, where a longer batch interval directly increases latency.

Do not confuse the 5-minute default trigger duration with the 5-second minimum allowed duration described under Requirements: the former is the checkpoint cadence used when you do not specify a duration, while the latter is the smallest duration you are allowed to set.
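As a minimal sketch of that distinction, the helper below converts a duration string such as "5 minutes" to milliseconds and compares it with the minimum. The function names and the parser are hypothetical and only handle the simple "<number> <unit>" form used in this guide; Spark performs its own validation when the query starts.

```python
DEFAULT_DURATION = "5 minutes"   # used when no duration is specified (per this guide)
MIN_BATCH_DURATION_MS = 5000     # default of spark.sql.streaming.realTimeMode.minBatchDuration

_UNIT_MS = {"second": 1000, "seconds": 1000, "minute": 60_000, "minutes": 60_000}


def duration_to_ms(duration):
    """Parse '<number> <unit>' into milliseconds (hypothetical, simplified parser)."""
    amount, unit = duration.split()
    return int(amount) * _UNIT_MS[unit]


def check_batch_duration(duration=DEFAULT_DURATION, min_ms=MIN_BATCH_DURATION_MS):
    """Return the duration in milliseconds, or raise if it is below the minimum."""
    ms = duration_to_ms(duration)
    if ms < min_ms:
        raise ValueError(f"batch duration {ms} ms is below the minimum of {min_ms} ms")
    return ms


print(check_batch_duration())              # the 5-minute default
print(check_batch_duration("30 seconds"))  # allowed: above the 5-second minimum
```

Calling `check_batch_duration("1 second")` raises `ValueError` in this sketch, because 1 second is below the 5-second minimum.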

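Choosing the batch duration is a trade-off: a shorter duration commits progress more often, so fewer records are reprocessed (and possibly re-written to the sink) after a failure, but the query pays the per-batch planning and scheduling cost more often; a longer duration amortizes that cost over more records but increases the amount of work repeated on restart.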

The duration is set on the Real-time trigger, as shown under Enabling Real-time Mode.
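One side of this trade-off can be estimated with simple arithmetic. Because progress is committed only at the end of each batch, a failure just before a commit can cause up to one batch duration of records to be reprocessed on restart. The sketch below assumes a constant input rate, which is a simplification; the function name and the numbers are illustrative only.

```python
def max_replayed_records(records_per_second, batch_duration_seconds):
    """Upper bound on records reprocessed after a failure, assuming a constant
    input rate and a failure just before the batch commits."""
    return records_per_second * batch_duration_seconds


for duration in (5, 30, 300):  # seconds; 300 s is the 5-minute default
    bound = max_replayed_records(records_per_second=1_000, batch_duration_seconds=duration)
    print(f"batch duration {duration:>3} s -> up to {bound} records replayed")
```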

Comparison with Other Modes

The table below summarizes how Real-time Mode relates to the default micro-batch engine and to the experimental Continuous Processing mode. See How Real-time Mode Works for the mechanism and Supported Queries for the full list of supported operations.

Mode | Latency | Processing Guarantees | Supported operations | When to use
Micro-batch (default) | ~100 ms | Exactly-once | All streaming operations, including stateful | Stateful or higher-throughput workloads, or queries Real-time Mode does not yet support
Real-time Mode | millisecond-scale | Exactly-once | Stateless today (map-like operations, unions, and stream-static joins); designed to support all query shapes, including stateful | Low-latency workloads
Continuous Processing (experimental) | ~1 ms | At-least-once | Map-like only (projections and selections); no stateful operations | Legacy; use Real-time Mode instead

The Processing Guarantees column refers to processing semantics, defined under Fault Tolerance; end-to-end delivery additionally depends on the sink and is independent of the execution mode.

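Real-time Mode and Continuous Processing both target millisecond-scale latency, but they differ substantially: as the table shows, Real-time Mode provides exactly-once processing and is designed to support all query shapes, including stateful ones, whereas Continuous Processing is experimental, provides at-least-once processing, and is limited to map-like operations.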

For new low-latency workloads, prefer Real-time Mode over Continuous Processing.

Enabling Real-time Mode

To run a supported query in Real-time Mode, set a Real-time trigger on the streaming write. Everything else in the query stays the same. For example, the following query reads from a Kafka topic, applies a stateless transformation, and writes the result to another Kafka topic. Records flow through with low latency even though the trigger duration is 5 minutes, because that duration is only the checkpoint interval.

Python:

spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "input-topic") \
  .load() \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "output-topic") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .outputMode("update") \
  .trigger(realTime="5 minutes") \
  .start()

Scala:

import org.apache.spark.sql.streaming.Trigger

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/path/to/checkpoint")
  .outputMode("update")
  .trigger(Trigger.RealTime("5 minutes"))  // enable Real-time Mode
  .start()

Java:

import org.apache.spark.sql.streaming.Trigger;

spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/path/to/checkpoint")
  .outputMode("update")
  .trigger(Trigger.RealTime("5 minutes"))  // enable Real-time Mode
  .start();

Trigger API

Requirements

A query must satisfy all of the following before it can start in Real-time Mode; each is checked when the query starts:

Supported Queries

Real-time Mode supports stateless queries only: map-like operations, unions, and stream-static joins.

The following operations, sources, and sinks are supported as of Spark 4.1.0:

The operators and sinks used by a Real-time query are checked against an allowlist before the query starts; anything outside the allowlist fails with STREAMING_REAL_TIME_MODE.OPERATOR_OR_SINK_NOT_IN_ALLOWLIST.
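The idea of an allowlist check can be sketched as follows. This is not Spark's implementation: the operator and sink names are hypothetical placeholders, and the exception class merely stands in for the STREAMING_REAL_TIME_MODE.OPERATOR_OR_SINK_NOT_IN_ALLOWLIST error.

```python
# Hypothetical operator and sink names, for illustration only; the real
# allowlist is internal to Spark.
ALLOWED = {"Project", "Filter", "Union", "StreamStaticJoin", "KafkaSink", "ConsoleSink"}


class NotInAllowlistError(Exception):
    """Stands in for STREAMING_REAL_TIME_MODE.OPERATOR_OR_SINK_NOT_IN_ALLOWLIST."""


def check_allowlist(plan_nodes, allowed=ALLOWED, enabled=True):
    """Reject the query before it starts if any node is outside the allowlist."""
    if not enabled:  # mirrors spark.sql.streaming.realTimeMode.allowlistCheck=false
        return
    rejected = sorted(set(plan_nodes) - allowed)
    if rejected:
        raise NotInAllowlistError(f"not in allowlist: {', '.join(rejected)}")


check_allowlist(["Project", "Filter", "KafkaSink"])  # passes silently

try:
    check_allowlist(["Project", "Aggregate", "KafkaSink"])
except NotInAllowlistError as e:
    print(e)
```

The point of checking up front is that an unsupported query fails at start time rather than partway through a long-running batch.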

Not supported

Stateful operations of any kind are not supported in this release, nor are operations that introduce a shuffle, such as repartition. The unsupported stateful operations include streaming aggregations, dropDuplicates / dropDuplicatesWithinWatermark, stream-stream joins, flatMapGroupsWithState, and transformWithState. Support for stateful operations is planned starting in Spark 4.3. Asynchronous progress tracking is also not supported; enabling it fails with STREAMING_REAL_TIME_MODE.ASYNC_PROGRESS_TRACKING_NOT_SUPPORTED.

Fault Tolerance

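Real-time Mode provides the same exactly-once processing guarantees as the default micro-batch engine. Two distinct guarantees are worth separating: the processing guarantee, which is a property of the engine, and the end-to-end delivery guarantee, which also depends on the sink.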

Real-time Mode is exactly-once with respect to processing. End-to-end delivery depends on the sink: a sink that performs idempotent or transactional writes can deliver exactly-once, while other sinks deliver at-least-once (duplicates are possible after a failure). The built-in Kafka sink provides at-least-once delivery, with or without Real-time Mode. Real-time Mode does not yet ship an exactly-once sink, though one can be implemented.

Internally, offsets are committed at the end of each batch, after the corresponding records have already been written to the sink. If a query fails partway through a batch, it resumes from the last committed offsets on restart and may re-write records emitted before the failure. Design sinks to tolerate duplicates – for example, with idempotent writes – where exactly-once output matters.
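The effect of an idempotent write can be shown with a small in-memory stand-in for a sink. This is a sketch under stated assumptions, not a Spark sink: the class is hypothetical, and it assumes each output row can be keyed by the source topic, partition, and offset of the record that produced it.

```python
class IdempotentSink:
    """In-memory stand-in for a sink that tolerates replays by keying each
    write on the source position of the record."""

    def __init__(self):
        self.rows = {}

    def write(self, topic, partition, offset, value):
        # Re-writing the same (topic, partition, offset) overwrites the
        # earlier copy instead of adding a duplicate.
        self.rows[(topic, partition, offset)] = value


sink = IdempotentSink()
records = [("input-topic", 0, off, f"v{off}") for off in range(5)]

# First attempt: records 0-3 are written, then the query fails before the
# end-of-batch commit.
for r in records[:4]:
    sink.write(*r)

# Restart: the query resumes from the last committed offset (0 here) and
# re-writes records 0-3 before continuing with record 4.
for r in records:
    sink.write(*r)

print(len(sink.rows))  # number of distinct rows held by the sink
```

An append-only sink would hold nine rows after the same sequence (four from the first attempt plus five from the retry); keying on the source position keeps one row per input record.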

Examples

The following examples read from Kafka and assume a running Kafka cluster. Each example shows the same query in Python, Scala, and Java.

Stream-static join

Enrich a stream by joining it with a static reference dataset. The static side is wrapped in broadcast(...) so the join is executed as a broadcast (map-side) join, which avoids a shuffle.

Python:

from pyspark.sql.functions import broadcast

# Static reference data, read once as a batch DataFrame.
reference = spark.read.format("parquet").load("/path/to/reference")

spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "input-topic") \
  .load() \
  .selectExpr("CAST(key AS STRING) AS joinKey", "CAST(value AS STRING) AS value") \
  .join(broadcast(reference), "joinKey") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "output-topic") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .outputMode("update") \
  .trigger(realTime="5 minutes") \
  .start()

Scala:

import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.streaming.Trigger

// Static reference data, read once as a batch DataFrame.
val reference = spark.read.format("parquet").load("/path/to/reference")

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(key AS STRING) AS joinKey", "CAST(value AS STRING) AS value")
  .join(broadcast(reference), "joinKey")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/path/to/checkpoint")
  .outputMode("update")
  .trigger(Trigger.RealTime("5 minutes"))
  .start()

Java:

import static org.apache.spark.sql.functions.broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.Trigger;

// Static reference data, read once as a batch DataFrame.
Dataset<Row> reference = spark.read().format("parquet").load("/path/to/reference");

spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(key AS STRING) AS joinKey", "CAST(value AS STRING) AS value")
  .join(broadcast(reference), "joinKey")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/path/to/checkpoint")
  .outputMode("update")
  .trigger(Trigger.RealTime("5 minutes"))
  .start();

Writing to the console for development

The console sink prints output to the driver’s standard output and is handy while developing a query. Note that the console sink buffers each batch’s rows and prints them when the batch commits, so its output appears once per batch – here, every 30 seconds – rather than continuously. This makes it useful for inspecting results, but it does not reflect Real-time Mode’s true per-record latency; to observe that, use a row-by-row sink such as Kafka. A shorter batch duration simply makes the console refresh more often.

Python:

spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "input-topic") \
  .load() \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .where("value IS NOT NULL") \
  .writeStream \
  .format("console") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .outputMode("update") \
  .trigger(realTime="30 seconds") \
  .start()

Scala:

import org.apache.spark.sql.streaming.Trigger

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .where("value IS NOT NULL")
  .writeStream
  .format("console")
  .option("checkpointLocation", "/path/to/checkpoint")
  .outputMode("update")
  .trigger(Trigger.RealTime("30 seconds"))
  .start()

Java:

import org.apache.spark.sql.streaming.Trigger;

spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .where("value IS NOT NULL")
  .writeStream()
  .format("console")
  .option("checkpointLocation", "/path/to/checkpoint")
  .outputMode("update")
  .trigger(Trigger.RealTime("30 seconds"))
  .start();
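The buffering behavior described for the console sink can be mimicked with two toy sinks. The classes below are hypothetical stand-ins, not Spark APIs; they only contrast a sink that releases rows at commit time with one that releases each row as it is written.

```python
class BufferingSink:
    """Holds rows until the batch commits, like the console sink described above."""

    def __init__(self):
        self.buffer = []
        self.visible = []

    def write(self, row):
        self.buffer.append(row)

    def commit(self):
        # Rows become visible only when the batch commits.
        self.visible.extend(self.buffer)
        self.buffer.clear()


class RowByRowSink:
    """Makes each row visible as soon as it is written."""

    def __init__(self):
        self.visible = []

    def write(self, row):
        self.visible.append(row)

    def commit(self):
        pass  # nothing is held back, so there is nothing to release


buffered, immediate = BufferingSink(), RowByRowSink()
for row in ["a", "b", "c"]:
    buffered.write(row)
    immediate.write(row)

# Mid-batch: only the row-by-row sink has visible output.
print(len(buffered.visible), len(immediate.visible))

buffered.commit()
immediate.commit()

# After the commit: both sinks show the same rows.
print(len(buffered.visible), len(immediate.visible))
```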

Configuration

Configuration | Default | Meaning
spark.sql.streaming.realTimeMode.minBatchDuration | 5000 (ms, 5 seconds) | The minimum batch duration, in milliseconds, allowed for a Real-time trigger. See the batch-duration requirement under Requirements.
spark.sql.streaming.realTimeMode.allowlistCheck | true | Whether to verify that all operators and sinks used by a Real-time query are in the supported allowlist. Disabling this check (not recommended) lets unsupported operators and sinks run at your own risk.

Best Practices

Caveats