Iceberg Support in Velox Backend

Supported Spark version

All Spark versions are supported, but only Spark 3.4 is well tested. Currently, only read is supported in Gluten.

Support Status

The following values indicate the Iceberg support status:

| Value | Description |
| --- | --- |
| Offload | Offloaded to the Velox backend |
| PartialOffload | Some operators are offloaded and some fall back |
| Fallback | Falls back to vanilla Spark for execution |
| Exception | Cannot fall back under certain conditions; an exception is thrown |
| ResultMismatch | Hidden bugs may cause result mismatches, especially in corner cases |

Adding catalogs

Fallback
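
Catalog definition itself is handled by vanilla Spark and Iceberg; Gluten only offloads scans on the resulting tables. For reference, a minimal sketch of a Hadoop catalog named local (the catalog name and warehouse path are placeholders):

import org.apache.spark.sql.SparkSession

// Illustrative Hadoop catalog named "local"; catalog setup runs in vanilla Spark.
val spark = SparkSession.builder()
  .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.local.type", "hadoop")
  .config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-warehouse")
  .getOrCreate()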

Creating a table

Fallback
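
For example, DDL such as the following (a sketch; table and column names are illustrative) is executed entirely by Spark and Iceberg:

// CREATE TABLE is not offloaded; it falls back to vanilla Spark.
spark.sql("""
  CREATE TABLE local.db.table (id BIGINT, data STRING)
  USING iceberg
""")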

Writing

Fallback

INSERT INTO local.db.table VALUES (1, 'a'), (2, 'b'), (3, 'c');

PartialOffload

The write falls back while the read is offloaded.

INSERT INTO local.db.table SELECT id, data FROM source WHERE length(data) = 1;

Reading

Read data

Offload/Fallback

| Table Type | No Delete | Position Delete | Equality Delete |
| --- | --- | --- | --- |
| unpartitioned | Offload | Offload | Fallback |
| partitioned | Mostly Fallback | Mostly Fallback | Fallback |
| metadata | Fallback | | |

Simple queries are offloaded, for example:

SELECT count(1) as count, data
FROM local.db.table
GROUP BY data;

If the delete is performed by Spark in merge-on-read mode, position delete files are generated and the query may still be offloaded.
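
A minimal sketch of such a delete; it assumes the table property write.delete.mode is set to merge-on-read (with the default copy-on-write, data files are rewritten and no delete files are produced):

// Merge-on-read deletes write position delete files, which the Velox scan can apply.
spark.sql("ALTER TABLE local.db.table SET TBLPROPERTIES ('write.delete.mode' = 'merge-on-read')")
spark.sql("DELETE FROM local.db.table WHERE id = 2")
spark.sql("SELECT count(1) FROM local.db.table").show()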

If the delete is performed by Flink, equality delete files may be generated, and the query falls back in that case.

Currently only simple queries are offloaded. For partitioned tables, many operators fall back because of the StaticInvoke expression (for example, BucketFunction), which is not yet supported.
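
For illustration, a bucket-partitioned table like the one below currently hits that fallback path (a sketch; table and column names are placeholders):

// The bucket(16, id) transform is planned through StaticInvoke (BucketFunction),
// so scans of this table mostly fall back for now.
spark.sql("""
  CREATE TABLE local.db.bucketed (id BIGINT, data STRING)
  USING iceberg
  PARTITIONED BY (bucket(16, id))
""")
spark.sql("SELECT count(1) FROM local.db.bucketed WHERE id = 10").show()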

DataFrame reads are supported and can now reference tables by name using spark.table:

val df = spark.table("local.db.table")
df.count()

Read metadata

Fallback

SELECT data, _file FROM local.db.table;

DataType

TIMESTAMPTZ in the ORC format is not supported and throws an exception. The UUID and fixed types fall back.

Format

PartialOffload

Parquet and ORC formats are supported; the Avro format is not supported.

SQL

Only SELECT is supported.

Schema evolution

PartialOffload

Gluten matches columns in the Parquet file by name, so if a column is renamed, or an added column has the same name as a previously deleted column, the scan falls back.
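
For instance, after a rename such as the following (a sketch), data files written before the rename still store the old column name, so the name-based match fails and the scan falls back:

// Files written before the rename keep the column name "data";
// reading them through the new name "payload" falls back.
spark.sql("ALTER TABLE local.db.table RENAME COLUMN data TO payload")
spark.sql("SELECT payload FROM local.db.table").show()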

Configuration

Catalogs

All catalog configurations are transparent to Gluten.

SQL Extensions

Fallback

The option spark.sql.extensions is supported; the SQL command CALL falls back.
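
A minimal sketch of enabling the extensions; the procedure and table names are placeholders, and the CALL statement itself is executed by vanilla Spark:

import org.apache.spark.sql.SparkSession

// The extension must be registered when the session is created.
val spark = SparkSession.builder()
  .config("spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .getOrCreate()

// Procedures invoked through CALL fall back to Spark.
spark.sql("CALL local.system.rewrite_data_files(table => 'db.table')").show()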

Runtime configuration

The “Gluten Support” column in the tables below uses the following legend:

✅ Supported
❌ Not Supported
⚠️ Partial Support
🔄 In Progress
🚫 Not applied or transparent to Gluten

Spark SQL Options

| Spark option | Default | Description | Gluten Support |
| --- | --- | --- | --- |
| spark.sql.iceberg.vectorization.enabled | Table default | Enables vectorized reads of data files | ❌ |
| spark.sql.iceberg.parquet.reader-type | ICEBERG | Sets Parquet reader implementation (ICEBERG, COMET) | ✅ |
| spark.sql.iceberg.check-nullability | true | Validate that the write schema’s nullability matches the table’s nullability | ✅ |
| spark.sql.iceberg.check-ordering | true | Validates the write schema column order matches the table schema order | ✅ |
| spark.sql.iceberg.planning.preserve-data-grouping | false | When true, co-locate scan tasks for the same partition in the same read split, used in Storage Partitioned Joins | ✅ |
| spark.sql.iceberg.aggregate-push-down.enabled | true | Enables pushdown of aggregate functions (MAX, MIN, COUNT) | |
| spark.sql.iceberg.distribution-mode | See Spark Writes | Controls distribution strategy during writes | ✅ |
| spark.wap.id | null | Write-Audit-Publish snapshot staging ID | |
| spark.wap.branch | null | WAP branch name for snapshot commit | |
| spark.sql.iceberg.compression-codec | Table default | Write compression codec (e.g., zstd, snappy) | |
| spark.sql.iceberg.compression-level | Table default | Compression level for Parquet/Avro | |
| spark.sql.iceberg.compression-strategy | Table default | Compression strategy for ORC | |
| spark.sql.iceberg.data-planning-mode | AUTO | Scan planning mode for data files (AUTO, LOCAL, DISTRIBUTED) | |
| spark.sql.iceberg.delete-planning-mode | AUTO | Scan planning mode for delete files (AUTO, LOCAL, DISTRIBUTED) | |
| spark.sql.iceberg.advisory-partition-size | Table default | Advisory size (bytes) used for writing to the Table when Spark’s Adaptive Query Execution is enabled. Used to size output files | |
| spark.sql.iceberg.locality.enabled | false | Report locality information for Spark task placement on executors | ✅ |
| spark.sql.iceberg.executor-cache.enabled | true | Enables cache for executor-side (currently used to cache Delete Files) | ❌ |
| spark.sql.iceberg.executor-cache.timeout | 10 | Timeout in minutes for executor cache entries | ❌ |
| spark.sql.iceberg.executor-cache.max-entry-size | 67108864 (64 MB) | Max size per cache entry (bytes) | ❌ |
| spark.sql.iceberg.executor-cache.max-total-size | 134217728 (128 MB) | Max total executor cache size (bytes) | ❌ |
| spark.sql.iceberg.executor-cache.locality.enabled | false | Enables locality-aware executor cache usage | ❌ |
| spark.sql.iceberg.merge-schema | false | Enables modifying the table schema to match the write schema. Only adds missing columns | ✅ |
| spark.sql.iceberg.report-column-stats | true | Report Puffin Table Statistics if available to Spark’s Cost Based Optimizer. CBO must be enabled for this to be effective | ✅ |

Read options

| Spark option | Default | Description | Gluten Support |
| --- | --- | --- | --- |
| snapshot-id | (latest) | Snapshot ID of the table snapshot to read | ✅ |
| as-of-timestamp | (latest) | A timestamp in milliseconds; the snapshot used will be the snapshot current at this time. | ✅ |
| split-size | As per table property | Overrides this table’s read.split.target-size and read.split.metadata-target-size | ✅ |
| lookback | As per table property | Overrides this table’s read.split.planning-lookback | ✅ |
| file-open-cost | As per table property | Overrides this table’s read.split.open-file-cost | ✅ |
| vectorization-enabled | As per table property | Overrides this table’s read.parquet.vectorization.enabled | ❌ |
| batch-size | As per table property | Overrides this table’s read.parquet.vectorization.batch-size | ❌ |
| stream-from-timestamp | (none) | A timestamp in milliseconds to stream from; if before the oldest known ancestor snapshot, the oldest will be used | |
| streaming-max-files-per-micro-batch | INT_MAX | Maximum number of files per microbatch | |
| streaming-max-rows-per-micro-batch | INT_MAX | Maximum number of rows per microbatch | |
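
Read options are passed on the DataFrameReader; for example, a time-travel read pinned to a snapshot (a sketch; the snapshot id and table identifier are placeholders):

// "snapshot-id" pins the read to one snapshot; "split-size" tunes scan task sizing.
val df = spark.read
  .option("snapshot-id", 10963874102873L)
  .option("split-size", 134217728L)
  .format("iceberg")
  .load("local.db.table")
df.show()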

Write options

| Spark option | Default | Description | Gluten Support |
| --- | --- | --- | --- |
| write-format | Table write.format.default | File format to use for this write operation; parquet, avro, or orc | ⚠️ Parquet only |
| target-file-size-bytes | As per table property | Overrides this table’s write.target-file-size-bytes | |
| check-nullability | true | Sets the nullable check on fields | |
| snapshot-property.custom-key | null | Adds an entry with custom-key and corresponding value in the snapshot summary (the snapshot-property. prefix is only required for DSv2) | |
| fanout-enabled | false | Overrides this table’s write.spark.fanout.enabled | |
| check-ordering | true | Checks if input schema and table schema are same | |
| isolation-level | null | Desired isolation level for Dataframe overwrite operations. null => no checks (for idempotent writes), serializable => check for concurrent inserts or deletes in destination partitions, snapshot => checks for concurrent deletes in destination partitions. | |
| validate-from-snapshot-id | null | If isolation level is set, id of base snapshot from which to check concurrent write conflicts into a table. Should be the snapshot before any reads from the table. Can be obtained via Table API or Snapshots table. If null, the table’s oldest known snapshot is used. | |
| compression-codec | Table write.(fileformat).compression-codec | Overrides this table’s compression codec for this write | |
| compression-level | Table write.(fileformat).compression-level | Overrides this table’s compression level for Parquet and Avro tables for this write | |
| compression-strategy | Table write.orc.compression-strategy | Overrides this table’s compression strategy for ORC tables for this write | |
| distribution-mode | See Spark Writes for defaults | Override this table’s distribution mode for this write | 🚫 |
| delete-granularity | file | Override this table’s delete granularity for this write | |
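
Write options are passed on the DataFrameWriterV2 API; a minimal sketch reusing the df from the read example above (the write itself currently falls back to vanilla Spark):

// Options on writeTo map to the table above; only parquet output is partially supported.
df.writeTo("local.db.table")
  .option("write-format", "parquet")
  .option("target-file-size-bytes", "536870912")
  .append()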

Iceberg Table Properties

The tables below are extracted from https://iceberg.apache.org/docs/latest/configuration/.

READ Properties

| Property | Default | Description | Gluten Support |
| --- | --- | --- | --- |
| read.split.target-size | 134217728 (128 MB) | Target size when combining data input splits | |
| read.split.metadata-target-size | 33554432 (32 MB) | Target size when combining metadata input splits | |
| read.split.planning-lookback | 10 | Number of bins to consider when combining input splits | |
| read.split.open-file-cost | 4194304 (4 MB) | The estimated cost to open a file, used as a minimum weight when combining splits. | |
| read.parquet.vectorization.enabled | true | Controls whether Parquet vectorized reads are used | |
| read.parquet.vectorization.batch-size | 5000 | The batch size for parquet vectorized reads | |
| read.orc.vectorization.enabled | false | Controls whether orc vectorized reads are used | |
| read.orc.vectorization.batch-size | 5000 | The batch size for orc vectorized reads | |

WRITE Properties

| Property | Default | Description | Gluten Support |
| --- | --- | --- | --- |
| write.format.default | parquet | Default file format for the table; parquet, avro, or orc | |
| write.delete.format.default | data file format | Default delete file format for the table; parquet, avro, or orc | |
| write.parquet.row-group-size-bytes | 134217728 (128 MB) | Parquet row group size | |
| write.parquet.page-size-bytes | 1048576 (1 MB) | Parquet page size | |
| write.parquet.page-row-limit | 20000 | Parquet page row limit | |
| write.parquet.dict-size-bytes | 2097152 (2 MB) | Parquet dictionary page size | |
| write.parquet.compression-codec | zstd | Parquet compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed | |
| write.parquet.compression-level | null | Parquet compression level | |
| write.parquet.bloom-filter-enabled.column.col1 | (not set) | Hint to parquet to write a bloom filter for the column: ‘col1’ | |
| write.parquet.bloom-filter-max-bytes | 1048576 (1 MB) | The maximum number of bytes for a bloom filter bitset | |
| write.parquet.bloom-filter-fpp.column.col1 | 0.01 | The false positive probability for a bloom filter applied to ‘col1’ (must > 0.0 and < 1.0) | |
| write.parquet.stats-enabled.column.col1 | (not set) | Controls whether to collect parquet column statistics for column ‘col1’ | |
| write.avro.compression-codec | gzip | Avro compression codec: gzip(deflate with 9 level), zstd, snappy, uncompressed | |
| write.avro.compression-level | null | Avro compression level | |
| write.orc.stripe-size-bytes | 67108864 (64 MB) | Define the default ORC stripe size, in bytes | |
| write.orc.block-size-bytes | 268435456 (256 MB) | Define the default file system block size for ORC files | |
| write.orc.compression-codec | zlib | ORC compression codec: zstd, lz4, lzo, zlib, snappy, none | |
| write.orc.compression-strategy | speed | ORC compression strategy: speed, compression | |
| write.orc.bloom.filter.columns | (not set) | Comma separated list of column names for which a Bloom filter must be created | |
| write.orc.bloom.filter.fpp | 0.05 | False positive probability for Bloom filter (must > 0.0 and < 1.0) | |
| write.location-provider.impl | null | Optional custom implementation for LocationProvider | |
| write.metadata.compression-codec | none | Metadata compression codec; none or gzip | |
| write.metadata.metrics.max-inferred-column-defaults | 100 | Defines the maximum number of columns for which metrics are collected. Columns are included with a pre-order traversal of the schema: top level fields first; then all elements of the first nested s… | |
| write.metadata.metrics.default | truncate(16) | Default metrics mode for all columns in the table; none, counts, truncate(length), or full | |
| write.metadata.metrics.column.col1 | (not set) | Metrics mode for column ‘col1’ to allow per-column tuning; none, counts, truncate(length), or full | |
| write.target-file-size-bytes | 536870912 (512 MB) | Controls the size of files generated to target about this many bytes | |
| write.delete.target-file-size-bytes | 67108864 (64 MB) | Controls the size of delete files generated to target about this many bytes | |
| write.distribution-mode | not set, see engines for specific defaults, for example Spark Writes | Defines distribution of write data: none: don’t shuffle rows; hash: hash distribute by partition key; range: range distribute by partition key or sort key if table has an SortOrder | |
| write.delete.distribution-mode | (not set) | Defines distribution of write delete data | |
| write.update.distribution-mode | (not set) | Defines distribution of write update data | |
| write.merge.distribution-mode | (not set) | Defines distribution of write merge data | |
| write.wap.enabled | false | Enables write-audit-publish writes | |
| write.summary.partition-limit | 0 | Includes partition-level summary stats in snapshot summaries if the changed partition count is less than this limit | |
| write.metadata.delete-after-commit.enabled | false | Controls whether to delete the oldest tracked version metadata files after each table commit. See the Remove old metadata files section for additional details | |
| write.metadata.previous-versions-max | 100 | The max number of previous version metadata files to track | |
| write.spark.fanout.enabled | false | Enables the fanout writer in Spark that does not require data to be clustered; uses more memory | |
| write.object-storage.enabled | false | Enables the object storage location provider that adds a hash component to file paths | |
| write.object-storage.partitioned-paths | true | Includes the partition values in the file path | |
| write.data.path | table location + /data | Base location for data files | |
| write.metadata.path | table location + /metadata | Base location for metadata files | |
| write.delete.mode | copy-on-write | Mode used for delete commands: copy-on-write or merge-on-read (v2 and above) | |
| write.delete.isolation-level | serializable | Isolation level for delete commands: serializable or snapshot | |
| write.update.mode | copy-on-write | Mode used for update commands: copy-on-write or merge-on-read (v2 and above) | |
| write.update.isolation-level | serializable | Isolation level for update commands: serializable or snapshot | |
| write.merge.mode | copy-on-write | Mode used for merge commands: copy-on-write or merge-on-read (v2 and above) | |
| write.merge.isolation-level | serializable | Isolation level for merge commands: serializable or snapshot | |
| write.delete.granularity | partition | Controls the granularity of generated delete files: partition or file | |

COMMIT Properties

| Property | Default | Description | Gluten Support |
| --- | --- | --- | --- |
| commit.retry.num-retries | 4 | Number of times to retry a commit before failing | |
| commit.retry.min-wait-ms | 100 | Minimum time in milliseconds to wait before retrying a commit | |
| commit.retry.max-wait-ms | 60000 (1 min) | Maximum time in milliseconds to wait before retrying a commit | |
| commit.retry.total-timeout-ms | 1800000 (30 min) | Total retry timeout period in milliseconds for a commit | |
| commit.status-check.num-retries | 3 | Number of times to check whether a commit succeeded after a connection is lost before failing due to an unknown commit state | |
| commit.status-check.min-wait-ms | 1000 (1 s) | Minimum time in milliseconds to wait before retrying a status-check | |
| commit.status-check.max-wait-ms | 60000 (1 min) | Maximum time in milliseconds to wait before retrying a status-check | |
| commit.status-check.total-timeout-ms | 1800000 (30 min) | Total timeout period in which the commit status-check must succeed, in milliseconds | |
| commit.manifest.target-size-bytes | 8388608 (8 MB) | Target size when merging manifest files | |
| commit.manifest.min-count-to-merge | 100 | Minimum number of manifests to accumulate before merging | |
| commit.manifest-merge.enabled | true | Controls whether to automatically merge manifests on writes | |

HISTORY Properties

| Property | Default | Description | Gluten Support |
| --- | --- | --- | --- |
| history.expire.max-snapshot-age-ms | 432000000 (5 days) | Default max age of snapshots to keep on the table and all of its branches while expiring snapshots | |
| history.expire.min-snapshots-to-keep | 1 | Default min number of snapshots to keep on the table and all of its branches while expiring snapshots | |
| history.expire.max-ref-age-ms | Long.MAX_VALUE (forever) | For snapshot references except the main branch, default max age of snapshot references to keep while expiring snapshots. The main branch never expires. | |
| format-version | 2 | Table’s format version (can be 1 or 2) as defined in the Spec. Defaults to 2 since version 1.4.0. | |

COMPATIBILITY Properties

| Property | Default | Description | Gluten Support |
| --- | --- | --- | --- |
| compatibility.snapshot-id-inheritance.enabled | false | Enables committing snapshots without explicit snapshot IDs (always true if the format version is > 1) | |
| catalog-impl | null | a custom Catalog implementation to use by an engine | |
| io-impl | null | a custom FileIO implementation to use in a catalog | |
| warehouse | null | the root path of the data warehouse | |
| uri | null | a URI string, such as Hive metastore URI | |
| clients | 2 | client pool size | |
| cache-enabled | true | Whether to cache catalog entries | |

CACHE Properties

| Property | Default | Description | Gluten Support |
| --- | --- | --- | --- |
| cache.expiration-interval-ms | 30000 | How long catalog entries are locally cached, in milliseconds; 0 disables caching, negative values disable expiration | |
| metrics-reporter-impl | org.apache.iceberg.metrics.LoggingMetricsReporter | Custom MetricsReporter implementation to use in a catalog. See the Metrics reporting section for additional details | |
| lock-impl | null | a custom implementation of the lock manager, the actual interface depends on the catalog used | |

LOCK Properties

| Property | Default | Description | Gluten Support |
| --- | --- | --- | --- |
| lock.table | null | an auxiliary table for locking, such as in AWS DynamoDB lock manager | |
| lock.acquire-interval-ms | 5000 (5 s) | the interval to wait between each attempt to acquire a lock | |
| lock.acquire-timeout-ms | 180000 (3 min) | the maximum time to try acquiring a lock | |
| lock.heartbeat-interval-ms | 3000 (3 s) | the interval to wait between each heartbeat after acquiring a lock | |
| lock.heartbeat-timeout-ms | 15000 (15 s) | the maximum time without a heartbeat to consider a lock expired | |

ICEBERG Properties

| Property | Default | Description | Gluten Support |
| --- | --- | --- | --- |
| iceberg.hive.client-pool-size | 5 | The size of the Hive client pool when tracking tables in HMS | |
| iceberg.hive.lock-creation-timeout-ms | 180000 (3 min) | Maximum time in milliseconds to create a lock in the HMS | |
| iceberg.hive.lock-creation-min-wait-ms | 50 | Minimum time in milliseconds between retries of creating the lock in the HMS | |
| iceberg.hive.lock-creation-max-wait-ms | 5000 | Maximum time in milliseconds between retries of creating the lock in the HMS | |
| iceberg.hive.lock-timeout-ms | 180000 (3 min) | Maximum time in milliseconds to acquire a lock | |
| iceberg.hive.lock-check-min-wait-ms | 50 | Minimum time in milliseconds between checking the acquisition of the lock | |
| iceberg.hive.lock-check-max-wait-ms | 5000 | Maximum time in milliseconds between checking the acquisition of the lock | |
| iceberg.hive.lock-heartbeat-interval-ms | 240000 (4 min) | The heartbeat interval for the HMS locks | |
| iceberg.hive.metadata-refresh-max-retries | 2 | Maximum number of retries when the metadata file is missing | |
| iceberg.hive.table-level-lock-evict-ms | 600000 (10 min) | The timeout for the JVM table lock | |
| iceberg.engine.hive.lock-enabled | true | Use HMS locks to ensure atomicity of commits | |
