Apache Avro Data Source Guide
- Deploying
- Load and Save Functions
- to_avro() and from_avro()
- Data Source Option
- Configuration
- Compatibility with Databricks spark-avro
- Supported types for Avro -> Spark SQL conversion
- Supported types for Spark SQL -> Avro conversion
- Handling circular references of Avro fields
Since the Spark 2.4 release, Spark SQL has provided built-in support for reading and writing Apache Avro data.
Deploying
The `spark-avro` module is external and not included in `spark-submit` or `spark-shell` by default.
As with any Spark application, `spark-submit` is used to launch your application. `spark-avro_2.13` and its dependencies can be added directly to `spark-submit` using `--packages`, such as,
./bin/spark-submit --packages org.apache.spark:spark-avro_2.13:In-Progress ...
For experimenting on `spark-shell`, you can also use `--packages` to add `org.apache.spark:spark-avro_2.13` and its dependencies directly,
./bin/spark-shell --packages org.apache.spark:spark-avro_2.13:In-Progress ...
See Application Submission Guide for more details about submitting applications with external dependencies.
Load and Save Functions
Since the `spark-avro` module is external, there is no `.avro` API in `DataFrameReader` or `DataFrameWriter`.
To load/save data in Avro format, you need to specify the data source option `format` as `avro` (or `org.apache.spark.sql.avro`).
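As a minimal sketch (the file paths and column names are placeholders, not prescribed by this guide), loading and saving Avro data looks like this in Scala:

```scala
// Read an Avro file into a DataFrame; the input path is a placeholder.
val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")

// Write a projection back out in Avro format; the output path is also a placeholder.
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
```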
to_avro() and from_avro()
The Avro package provides the function `to_avro()` to encode a column as binary in Avro format, and `from_avro()` to decode Avro binary data into a column. Both functions transform one column into another column, and the input/output SQL data type can be a complex type or a primitive type.
Using Avro records as columns is useful when reading from or writing to a streaming source like Kafka. Each Kafka key-value record is augmented with some metadata, such as the ingestion timestamp into Kafka, the offset in Kafka, etc.
- If the "value" field that contains your data is in Avro, you can use `from_avro()` to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a file.
- `to_avro()` can be used to turn structs into Avro records. This is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka.
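A sketch of this Kafka round trip, assuming a broker at `host1:port1`, topics `topic1` and `topic2`, and a schema file `user.avsc` (all placeholders), with the Kafka connector (`spark-sql-kafka`) also on the classpath:

```scala
import org.apache.spark.sql.avro.functions._
import spark.implicits._

// from_avro requires the Avro schema as a JSON string; the schema file path is a placeholder.
val jsonFormatSchema = new String(
  java.nio.file.Files.readAllBytes(java.nio.file.Paths.get("./examples/src/main/resources/user.avsc")))

// Read Avro-encoded records from a Kafka topic.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the binary Kafka value into a struct column with from_avro.
// 2. Filter on a field inside the decoded struct.
// 3. Re-encode with to_avro and write the result to another topic.
val output = df
  .select(from_avro($"value", jsonFormatSchema) as "user")
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as "value")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("topic", "topic2")
  .start()
```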
Data Source Option
Data source options of Avro can be set via:
- the `.option` method on `DataFrameReader` or `DataFrameWriter`.
- the `options` parameter in function `from_avro`.
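A short sketch of both styles (`df`, `kafkaDF`, `jsonFormatSchema`, and the output path are placeholders; `kafkaDF` and `jsonFormatSchema` can be taken to exist as in the streaming sketch above). The full list of options follows in the table below.

```scala
import org.apache.spark.sql.avro.functions.from_avro
import scala.jdk.CollectionConverters._
import spark.implicits._

// Via .option on DataFrameWriter: write with a non-default compression codec.
df.write.format("avro").option("compression", "deflate").save("/tmp/avro_out")

// Via the options parameter of from_avro: parse in PERMISSIVE mode instead of
// the default FAILFAST, so corrupted records become nulls rather than failing the job.
val parsed = kafkaDF.select(
  from_avro($"value", jsonFormatSchema, Map("mode" -> "PERMISSIVE").asJava) as "user")
```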
Property Name | Default | Meaning | Scope | Since Version |
---|---|---|---|---|
`avroSchema` | None | Optional schema provided by a user in JSON format. | read, write and function `from_avro` | 2.4.0 |
`recordName` | topLevelRecord | Top level record name in write result, which is required in the Avro spec. | write | 2.4.0 |
`recordNamespace` | "" | Record namespace in write result. | write | 2.4.0 |
`ignoreExtension` | true | The option controls ignoring of files without `.avro` extensions in read. If the option is enabled, all files (with and without `.avro` extension) are loaded. The option has been deprecated, and it will be removed in future releases. Please use the general data source option `pathGlobFilter` for filtering file names. | read | 2.4.0 |
`compression` | snappy | The `compression` option allows specifying a compression codec used in write. Currently supported codecs are `uncompressed`, `snappy`, `deflate`, `bzip2`, `xz` and `zstandard`. If the option is not set, the configuration `spark.sql.avro.compression.codec` is taken into account. | write | 2.4.0 |
`mode` | FAILFAST | The `mode` option allows specifying the parse mode for function `from_avro`. Currently supported modes are: `FAILFAST` (throws an exception on processing a corrupted record) and `PERMISSIVE` (corrupt records are processed as a null result, so the data schema is forced to be fully nullable). | function `from_avro` | 2.4.0 |
`datetimeRebaseMode` | (value of `spark.sql.avro.datetimeRebaseModeInRead` configuration) | The `datetimeRebaseMode` option allows specifying the rebasing mode for the values of the `date`, `timestamp-micros`, `timestamp-millis` logical types from the Julian to the Proleptic Gregorian calendar. Currently supported modes are: `EXCEPTION` (fails on reads of ancient dates/timestamps that are ambiguous between the two calendars), `CORRECTED` (loads dates/timestamps without rebasing) and `LEGACY` (rebases ancient dates/timestamps from the Julian to the Proleptic Gregorian calendar). | read and function `from_avro` | 3.2.0 |
`positionalFieldMatching` | false | This can be used in tandem with the `avroSchema` option to adjust the behavior for matching the fields in the provided Avro schema with those in the SQL schema. By default, the matching is performed using field names, ignoring their positions. If this option is set to "true", the matching is based on the positions of the fields. | read and write | 3.2.0 |
`enableStableIdentifiersForUnionType` | false | If it is set to true, the Avro schema is deserialized into a Spark SQL schema, and the Avro union type is transformed into a structure where the field names remain consistent with their respective types. The resulting field names are converted to lowercase, e.g. `member_int` or `member_string`. If two user-defined type names, or a user-defined type name and a built-in type name, are identical regardless of case, an exception will be raised. However, in other cases, the field names can be uniquely identified. | read | 3.5.0 |
`stableIdentifierPrefixForUnionType` | member_ | When `enableStableIdentifiersForUnionType` is enabled, the option allows configuring the prefix for fields of the Avro union type. | read | 4.0.0 |
`recursiveFieldMaxDepth` | -1 | If this option is negative or set to 0, recursive fields are not permitted. Setting it to 1 drops all recursive fields, 2 allows recursive fields to be recursed once, 3 allows them to be recursed twice, and so on, up to 15. Values larger than 15 are not allowed in order to avoid inadvertently creating very large schemas. If an Avro message has depth beyond this limit, the Spark struct returned is truncated after the recursion limit. An example of usage can be found in the section Handling circular references of Avro fields. | read | 4.0.0 |
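Because `ignoreExtension` is deprecated, the table above points to the general `pathGlobFilter` option as its replacement; a minimal sketch of that usage (the path is a placeholder):

```scala
// Read only files ending in .avro from a mixed directory, instead of relying on ignoreExtension.
val events = spark.read
  .format("avro")
  .option("pathGlobFilter", "*.avro")
  .load("/tmp/events_dir")
```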
Configuration
Configuration of Avro can be done via `spark.conf.set` or by running `SET key=value` commands using SQL.
Property Name | Default | Meaning | Since Version |
---|---|---|---|
spark.sql.legacy.replaceDatabricksSparkAvro.enabled | true | If it is set to true, the data source provider `com.databricks.spark.avro` is mapped to the built-in but external Avro data source module for backward compatibility. Note: the SQL config has been deprecated in Spark 3.2 and might be removed in the future. | 2.4.0 |
spark.sql.avro.compression.codec | snappy | Compression codec used in writing of AVRO files. Supported codecs: uncompressed, deflate, snappy, bzip2, xz and zstandard. Default codec is snappy. | 2.4.0 |
spark.sql.avro.deflate.level | -1 | Compression level for the deflate codec used in writing of AVRO files. Valid values must be in the range of 1 to 9 inclusive, or -1. The default value is -1, which corresponds to level 6 in the current implementation. | 2.4.0 |
spark.sql.avro.xz.level | 6 | Compression level for the xz codec used in writing of AVRO files. Valid values must be in the range of 1 to 9 inclusive. The default value is 6 in the current implementation. | 4.0.0 |
spark.sql.avro.zstandard.level | 3 | Compression level for the zstandard codec used in writing of AVRO files. The default value is 3 in the current implementation. | 4.0.0 |
spark.sql.avro.zstandard.bufferPool.enabled | false | If true, enable the buffer pool of the ZSTD JNI library when writing AVRO files. | 4.0.0 |
spark.sql.avro.datetimeRebaseModeInRead | EXCEPTION | The rebasing mode for the values of the `date`, `timestamp-micros`, `timestamp-millis` logical types from the Julian to the Proleptic Gregorian calendar: `EXCEPTION` (fails on reads of ancient dates/timestamps that are ambiguous between the two calendars), `CORRECTED` (loads dates/timestamps without rebasing) and `LEGACY` (rebases ancient dates/timestamps from the Julian to the Proleptic Gregorian calendar). | 3.0.0 |
spark.sql.avro.datetimeRebaseModeInWrite | EXCEPTION | The rebasing mode for the values of the `date`, `timestamp-micros`, `timestamp-millis` logical types from the Proleptic Gregorian to the Julian calendar: `EXCEPTION` (fails on writes of ancient dates/timestamps that are ambiguous between the two calendars), `CORRECTED` (writes dates/timestamps without rebasing) and `LEGACY` (rebases ancient dates/timestamps from the Proleptic Gregorian to the Julian calendar). | 3.0.0 |
spark.sql.avro.filterPushdown.enabled | true | When true, enable filter pushdown to the Avro data source. | 3.1.0 |
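For example, the write compression codec could be set either way; a short sketch (the chosen codec and level are only illustrative):

```scala
// Programmatically, via the SparkSession configuration.
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

// Or equivalently with SQL.
spark.sql("SET spark.sql.avro.compression.codec=deflate")
```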
Compatibility with Databricks spark-avro
This Avro data source module is originally from and compatible with Databricks’s open source repository spark-avro.
By default, with the SQL configuration `spark.sql.legacy.replaceDatabricksSparkAvro.enabled` enabled, the data source provider `com.databricks.spark.avro` is mapped to this built-in Avro module. For Spark tables created with the `Provider` property set to `com.databricks.spark.avro` in the catalog meta store, this mapping is essential for loading those tables if you are using this built-in Avro module.
Note that in Databricks's spark-avro, the implicit classes `AvroDataFrameWriter` and `AvroDataFrameReader` were created for the shortcut function `.avro()`. In this built-in but external module, both implicit classes are removed. Please use `.format("avro")` on `DataFrameWriter` or `DataFrameReader` instead.
If you prefer using your own build of the `spark-avro` jar file, you can simply disable the configuration `spark.sql.legacy.replaceDatabricksSparkAvro.enabled` and use the `--jars` option when deploying your application. Read the Advanced Dependency Management section in the Application Submission Guide for more details.
Supported types for Avro -> Spark SQL conversion
Currently Spark supports reading all primitive types and complex types under records of Avro.
Avro type | Spark SQL type |
---|---|
boolean | BooleanType |
int | IntegerType |
long | LongType |
float | FloatType |
double | DoubleType |
string | StringType |
enum | StringType |
fixed | BinaryType |
bytes | BinaryType |
record | StructType |
array | ArrayType |
map | MapType |
union | See below |
In addition to the types listed above, it supports reading `union` types. The following three types are considered basic `union` types:
- `union(int, long)` will be mapped to LongType.
- `union(float, double)` will be mapped to DoubleType.
- `union(something, null)`, where something is any supported Avro type. This will be mapped to the same Spark SQL type as that of something, with nullable set to true.

All other union types are considered complex. They will be mapped to StructType, where field names are member0, member1, etc., in accordance with the members of the union. This is consistent with the behavior when converting between Avro and Parquet.
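As an illustration of the complex-union mapping (the record, field names, and the `avroBinaryDF` DataFrame below are assumptions, not from the original guide), decoding a `union(int, string)` field yields a struct with `member0` and `member1`:

```scala
import org.apache.spark.sql.avro.functions.from_avro
import spark.implicits._

// Hypothetical Avro schema containing a complex union field.
val unionSchema = """{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "payload", "type": ["int", "string"]}
  ]
}"""

// Assuming avroBinaryDF has a binary column "value" holding Avro-encoded Event records,
// the decoded payload column has type struct<member0: int, member1: string>.
val decoded = avroBinaryDF.select(from_avro($"value", unionSchema) as "event")
decoded.printSchema()
```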
It also supports reading the following Avro logical types:
Avro logical type | Avro type | Spark SQL type |
---|---|---|
date | int | DateType |
timestamp-millis | long | TimestampType |
timestamp-micros | long | TimestampType |
decimal | fixed | DecimalType |
decimal | bytes | DecimalType |
At the moment, it ignores docs, aliases and other properties present in the Avro file.
Supported types for Spark SQL -> Avro conversion
Spark supports writing all Spark SQL types to Avro. For most types, the mapping from Spark types to Avro types is straightforward (e.g. IntegerType gets converted to int); however, there are a few special cases, which are listed below:
Spark SQL type | Avro type | Avro logical type |
---|---|---|
ByteType | int | |
ShortType | int | |
BinaryType | bytes | |
DateType | int | date |
TimestampType | long | timestamp-micros |
DecimalType | fixed | decimal |
You can also specify the whole output Avro schema with the option `avroSchema`, so that Spark SQL types can be converted into other Avro types. The following conversions are not applied by default and require a user-specified Avro schema:
Spark SQL type | Avro type | Avro logical type |
---|---|---|
BinaryType | fixed | |
StringType | enum | |
TimestampType | long | timestamp-millis |
DecimalType | bytes | decimal |
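For instance, applying the StringType -> enum conversion from the table above requires supplying the output schema explicitly. A minimal sketch, assuming a DataFrame `df` with a single string column `colour` whose values all appear in the enum's symbols (the schema, column name, and path are illustrative, not from the original guide):

```scala
// Hypothetical Avro schema that encodes the string column "colour" as an Avro enum.
val avroSchema = """{
  "type": "record",
  "name": "topLevelRecord",
  "fields": [
    {
      "name": "colour",
      "type": {"type": "enum", "name": "Colour", "symbols": ["RED", "GREEN", "BLUE"]}
    }
  ]
}"""

df.write
  .format("avro")
  .option("avroSchema", avroSchema)
  .save("/tmp/colours_avro")
```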
Handling circular references of Avro fields
In Avro, a circular reference occurs when the type of a field is defined in one of the parent records. This can cause issues when parsing the data, as it can result in infinite loops or other unexpected behavior.
To read Avro data with a schema that has a circular reference, users can use the `recursiveFieldMaxDepth` option to specify the maximum number of levels of recursion to allow when parsing the schema. By default, the Spark Avro data source does not permit recursive fields, with `recursiveFieldMaxDepth` set to -1. However, you can set this option to a value between 1 and 15 if needed.
Setting `recursiveFieldMaxDepth` to 1 drops all recursive fields, setting it to 2 allows them to be recursed once, and setting it to 3 allows them to be recursed twice. A `recursiveFieldMaxDepth` value greater than 15 is not allowed, as it can lead to performance issues and even stack overflows.
The SQL schema produced for the Avro message below will vary based on the value of `recursiveFieldMaxDepth`.
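As a hedged illustration (the recursive schema, field names, and path below are invented, not the example from the original guide), reading self-referencing records might look like this:

```scala
// Hypothetical schema of the Avro files being read: each Node may reference another Node.
//   {
//     "type": "record",
//     "name": "Node",
//     "fields": [
//       {"name": "value", "type": "int"},
//       {"name": "next", "type": ["null", "Node"], "default": null}
//     ]
//   }
//
// With recursiveFieldMaxDepth = 2 the recursion is expanded once and then truncated,
// giving roughly: struct<value: int, next: struct<value: int>>
val nodes = spark.read
  .format("avro")
  .option("recursiveFieldMaxDepth", "2")
  .load("/tmp/nodes_avro")
nodes.printSchema()
```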