SQL Pipe Syntax

Syntax

Overview

Apache Spark supports SQL pipe syntax which allows composing queries from combinations of operators.

FROM <tableName> is now a supported standalone query which behaves the same as TABLE <tableName>. This provides a convenient starting point for a chained pipe SQL query, although it is also possible to append one or more pipe operators to the end of any valid Spark SQL query, with the same consistent behavior as described here.

Please refer to the table at the end of this document for a full list of all supported operators and their semantics.

Example

For example, this is query 13 from the TPC-H benchmark:

SELECT c_count, COUNT(*) AS custdist
FROM
  (SELECT c_custkey, COUNT(o_orderkey) c_count
   FROM customer
   LEFT OUTER JOIN orders ON c_custkey = o_custkey
     AND o_comment NOT LIKE '%unusual%packages%' GROUP BY c_custkey
  ) AS c_orders
GROUP BY c_count
ORDER BY custdist DESC, c_count DESC;

To write the same logic using SQL pipe operators, we express it like this:

FROM customer
|> LEFT OUTER JOIN orders ON c_custkey = o_custkey
   AND o_comment NOT LIKE '%unusual%packages%'
|> AGGREGATE COUNT(o_orderkey) c_count
   GROUP BY c_custkey
|> AGGREGATE COUNT(*) AS custdist
   GROUP BY c_count
|> ORDER BY custdist DESC, c_count DESC;

Source Tables

To start a new query using SQL pipe syntax, use the FROM <tableName> or TABLE <tableName> clause, which creates a relation comprising all rows from the source table. Then append one or more pipe operators to the end of this clause to perform further transformations.

Projections

SQL pipe syntax supports composable ways to evaluate expressions. A major advantage of these projection features is that they support computing new expressions based on previous ones in an incremental way. No lateral column references are needed, since each operator applies over its input table regardless of the order in which the operators appear. Each computed column then becomes visible for use with the following operator.
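
For instance, a minimal sketch of this incremental style, where each computed column feeds the next operator (the EXTEND and SELECT operators are described below):

VALUES (1) tab(col)
|> EXTEND col + 1 AS a
|> EXTEND a * 2 AS b
|> SELECT col, a, b;

+---+---+---+
|col|  a|  b|
+---+---+---+
|  1|  2|  4|
+---+---+---+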

SELECT produces a new table by evaluating the provided expressions.
It is possible to use DISTINCT and * as needed.
This works like the outermost SELECT in a table subquery in regular Spark SQL.

EXTEND adds new columns to the input table by evaluating the provided expressions.
This also preserves table aliases.
This works like SELECT *, new_column in regular Spark SQL.

DROP removes columns from the input table.
This is similar to SELECT * EXCEPT (column) in regular Spark SQL.

SET replaces column values from the input table.
This is similar to SELECT * REPLACE (expression AS column) in regular Spark SQL.

AS forwards the input table and introduces a new table alias for it.

Aggregations

In general, aggregation takes place differently using SQL pipe syntax as opposed to regular Spark SQL.

To perform full-table aggregation, use the AGGREGATE operator with a list of aggregate expressions to evaluate. This returns a single row in the output table.

To perform aggregation with grouping, use the AGGREGATE operator with a GROUP BY clause. This returns one row for each unique combination of values of the grouping expressions. The output table contains the evaluated grouping expressions followed by the evaluated aggregate functions. Grouping expressions support assigning aliases for purposes of referring to them in future operators. In this way, it is not necessary to repeat entire expressions between GROUP BY and SELECT, since AGGREGATE is a single operator that performs both.
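
For instance, a sketch of this aliasing, where the grouping alias key is assigned in AGGREGATE and then referenced by a later operator:

VALUES (0, 1), (0, 2), (1, 3) tab(col1, col2)
|> AGGREGATE COUNT(col2) AS count GROUP BY col1 AS key
|> WHERE key = 0;

+---+-----+
|key|count|
+---+-----+
|  0|    2|
+---+-----+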

Other Transformations

The remaining operators are used for other transformations, such as filtering, joining, sorting, sampling, and set operations. These operators generally work in the same way as in regular Spark SQL, as described in the table below.

Independence and Interoperability

SQL pipe syntax works in Spark without any backwards-compatibility concerns for existing SQL queries; it is possible to write any query using regular Spark SQL, pipe syntax, or a combination of the two. In particular, pipe operators may be appended to the end of any valid regular query, and the result behaves consistently no matter which style (or mixture of styles) is used.
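
For instance, a sketch that appends pipe operators to an ordinary query prefix:

SELECT col1, col2 FROM VALUES (0, 1), (2, 3) AS tab(col1, col2)
|> WHERE col1 > 0
|> SELECT col1 + col2 AS total;

+-----+
|total|
+-----+
|    5|
+-----+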

Supported Operators

This table lists each of the supported pipe operators and describes the output rows they produce. Note that each operator accepts an input relation comprising the rows generated by the query preceding the |> symbol.

Operator: Output rows
FROM or TABLE: Returns all the output rows from the source table unmodified.
SELECT: Evaluates the provided expressions over each of the rows of the input table.
EXTEND: Appends new columns to the input table by evaluating the specified expressions over each of the input rows.
SET: Updates columns of the input table by replacing them with the result of evaluating the provided expressions.
DROP: Drops columns of the input table by name.
AS: Retains the same rows and column names of the input table but with a new table alias.
WHERE: Returns the subset of input rows passing the condition.
LIMIT: Returns the specified number of input rows, preserving ordering (if any).
AGGREGATE: Performs aggregation with or without grouping.
JOIN: Joins rows from both inputs, returning a filtered cross-product of the input table and the table argument.
ORDER BY: Returns the input rows after sorting as indicated.
UNION, INTERSECT, EXCEPT: Performs the union or other set operation over the combined rows from the input table plus other table argument(s).
TABLESAMPLE: Returns the subset of rows chosen by the provided sampling algorithm.
PIVOT: Returns a new table with the input rows pivoted to become columns.
UNPIVOT: Returns a new table with the input columns pivoted to become rows.

FROM or TABLE

FROM <tableName>
TABLE <tableName>

Returns all the output rows from the source table unmodified.

For example:

CREATE TABLE t AS VALUES (1, 2), (3, 4) AS t(a, b);
TABLE t;

+---+---+
|  a|  b|
+---+---+
|  1|  2|
|  3|  4|
+---+---+

SELECT

|> SELECT <expr> [[AS] alias], ...

Evaluates the provided expressions over each of the rows of the input table.

It is possible to use DISTINCT and * as needed.
This works like the outermost SELECT in a table subquery in regular Spark SQL.

Window functions are supported in the SELECT list as well. To use them, the OVER clause must be provided. You may provide the window specification in the WINDOW clause.

For example:

CREATE TABLE t AS VALUES (0), (1) AS t(col);

FROM t
|> SELECT col * 2 AS result;

+------+
|result|
+------+
|     0|
|     2|
+------+
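
As a sketch of the window-function support described above, reusing the same table t:

FROM t
|> SELECT col, SUM(col) OVER (ORDER BY col) AS running_total;

+---+-------------+
|col|running_total|
+---+-------------+
|  0|            0|
|  1|            1|
+---+-------------+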

EXTEND

|> EXTEND <expr> [[AS] alias], ...

Appends new columns to the input table by evaluating the specified expressions over each of the input rows.

For example:

VALUES (0), (1) tab(col)
|> EXTEND col * 2 AS result;

+---+------+
|col|result|
+---+------+
|  0|     0|
|  1|     2|
+---+------+

SET

|> SET <column> = <expression>, ...

Updates columns of the input table by replacing them with the result of evaluating the provided expressions.

For example:

VALUES (0), (1) tab(col)
|> SET col = col * 2;

+---+
|col|
+---+
|  0|
|  2|
+---+

DROP

|> DROP <column>, ...

Drops columns of the input table by name.

For example:

VALUES (0, 1) tab(col1, col2)
|> DROP col1;

+----+
|col2|
+----+
|   1|
+----+

AS

|> AS <alias>

Retains the same rows and column names of the input table but with a new table alias.

For example:

VALUES (0, 1) tab(col1, col2)
|> AS new_tab
|> SELECT new_tab.col1, new_tab.col2;

+----+----+
|col1|col2|
+----+----+
|   0|   1|
+----+----+

WHERE

|> WHERE <condition>

Returns the subset of input rows passing the condition.

Since this operator may appear anywhere, no separate HAVING or QUALIFY syntax is needed.

For example:

VALUES (0), (1) tab(col)
|> WHERE col = 1;

+---+
|col|
+---+
|  1|
+---+
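
As a sketch of the point above, WHERE can filter on aggregate results where regular SQL would require HAVING:

VALUES (0, 1), (0, 2), (1, 3) tab(col1, col2)
|> AGGREGATE COUNT(col2) AS count GROUP BY col1
|> WHERE count > 1;

+----+-----+
|col1|count|
+----+-----+
|   0|    2|
+----+-----+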

LIMIT

|> [LIMIT <n>] [OFFSET <m>]

Returns the specified number of input rows, preserving ordering (if any).

LIMIT and OFFSET are supported together. The LIMIT clause can also be used without the OFFSET clause, and the OFFSET clause can be used without the LIMIT clause.

For example:

VALUES (0), (0) tab(col)
|> LIMIT 1;

+---+
|col|
+---+
|  0|
+---+
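
A sketch combining LIMIT with OFFSET, with ORDER BY applied first so the result is deterministic:

VALUES (0), (1), (2) tab(col)
|> ORDER BY col
|> LIMIT 1 OFFSET 1;

+---+
|col|
+---+
|  1|
+---+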

AGGREGATE

|> AGGREGATE <agg_expr> [[AS] alias], ...

Performs full-table aggregation, returning one result row with a column for each aggregate expression.

|> AGGREGATE [<agg_expr> [[AS] alias], ...] GROUP BY <grouping_expr> [AS alias], ...

Performs aggregation with grouping, returning one row per group. The column list includes the grouping columns first and then the aggregate columns afterward. Aliases can be assigned directly on grouping expressions.

For example:

VALUES (0), (1) tab(col)
|> AGGREGATE COUNT(col) AS count;

+-----+
|count|
+-----+
|    2|
+-----+

VALUES (0, 1), (0, 2) tab(col1, col2)
|> AGGREGATE COUNT(col2) AS count GROUP BY col1;

+----+-----+
|col1|count|
+----+-----+
|   0|    2|
+----+-----+

JOIN

|> [LEFT | RIGHT | FULL | CROSS | SEMI | ANTI | NATURAL | LATERAL] JOIN <table> [ON <condition> | USING(col, ...)]

Joins rows from both inputs, returning a filtered cross-product of the pipe input table and the table expression following the JOIN keyword.

For example:

VALUES (0, 1) tab(a, b)
|> JOIN VALUES (0, 2) tab(c, d) ON a = c;

+---+---+---+---+
|  a|  b|  c|  d|
+---+---+---+---+
|  0|  1|  0|  2|
+---+---+---+---+
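
A sketch of one of the optional join types, here a LEFT JOIN with a USING clause (the alias other is illustrative):

VALUES (0, 1), (2, 3) tab(a, b)
|> LEFT JOIN VALUES (0, 4) other(a, c) USING (a)
|> ORDER BY a;

+---+---+----+
|  a|  b|   c|
+---+---+----+
|  0|  1|   4|
|  2|  3|NULL|
+---+---+----+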

ORDER BY

|> ORDER BY <expr> [ASC | DESC], ...

Returns the input rows after sorting as indicated. Standard modifiers are supported including NULLS FIRST/LAST.

For example:

VALUES (0), (1) tab(col)
|> ORDER BY col DESC;

+---+
|col|
+---+
|  1|
|  0|
+---+
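
A sketch of the NULLS FIRST/LAST modifiers mentioned above:

VALUES (0), (1), (NULL) tab(col)
|> ORDER BY col DESC NULLS LAST;

+----+
| col|
+----+
|   1|
|   0|
|NULL|
+----+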

UNION, INTERSECT, EXCEPT

|> {UNION | INTERSECT | EXCEPT} {ALL | DISTINCT} (<query>), (<query>), ...

Performs the union or other set operation over the combined rows from the input table plus one or more tables provided as input arguments.

For example:

VALUES (0, 1) tab(a, b)
|> UNION ALL VALUES (2, 3) tab(c, d);

+---+---+
|  a|  b|
+---+---+
|  0|  1|
|  2|  3|
+---+---+
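
The same syntax covers the other set operations; for instance, a sketch using INTERSECT with an explicit DISTINCT quantifier:

VALUES (0, 1), (2, 3) tab(a, b)
|> INTERSECT DISTINCT (VALUES (0, 1), (4, 5) tab(c, d));

+---+---+
|  a|  b|
+---+---+
|  0|  1|
+---+---+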

TABLESAMPLE

|> TABLESAMPLE (<size> {ROWS | PERCENT})

Returns the subset of rows chosen by the provided sampling algorithm.

For example:

VALUES (0), (0), (0), (0) tab(col)
|> TABLESAMPLE (1 ROWS);

+---+
|col|
+---+
|  0|
+---+
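
A sketch using percentage-based sampling; because the choice of rows is probabilistic, the output varies between runs and is omitted here:

VALUES (0), (1), (2), (3) tab(col)
|> TABLESAMPLE (50 PERCENT);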

PIVOT

|> PIVOT (agg_expr FOR col IN (val1, ...))

Returns a new table with the input rows pivoted to become columns.

For example:

VALUES
  ("dotNET", 2012, 10000),
  ("Java", 2012, 20000),
  ("dotNET", 2012, 5000),
  ("dotNET", 2013, 48000),
  ("Java", 2013, 30000)
  courseSales(course, year, earnings)
|> PIVOT (
     SUM(earnings)
     FOR course IN ('dotNET', 'Java')
   );

+----+------+------+
|year|dotNET|  Java|
+----+------+------+
|2012| 15000| 20000|
|2013| 48000| 30000|
+----+------+------+

UNPIVOT

|> UNPIVOT (value_col FOR key_col IN (col1, ...))

Returns a new table with the input columns pivoted to become rows.

For example:

VALUES
  ("dotNET", 15000, 48000, 22500),
  ("Java", 20000, 30000, NULL)
  courseEarnings(course, `2012`, `2013`, `2014`)
|> UNPIVOT (
  earnings FOR year IN (`2012`, `2013`, `2014`)
);

+--------+------+--------+
|  course|  year|earnings|
+--------+------+--------+
|    Java|  2012|   20000|
|    Java|  2013|   30000|
|  dotNET|  2012|   15000|
|  dotNET|  2013|   48000|
|  dotNET|  2014|   22500|
+--------+------+--------+

Note that no row appears for Java in 2014: the corresponding value is NULL, and UNPIVOT excludes NULL values by default.