Kudu provides C++, Java and Python client APIs, as well as reference examples to illustrate their use.
Use of server-side or private interfaces is not supported, and interfaces which are not part of public APIs have no stability guarantees.
You can view the C++ client API documentation online. Alternatively, after building Kudu from source, you can also build the doxygen target (e.g., run make doxygen if using make) and browse the locally generated API documentation by opening the docs/doxygen/client_api/html/index.html file in your favorite Web browser.
Building the doxygen target requires doxygen version 1.8.11 or newer, with Dot (graphviz) support, installed on your build machine. If you installed doxygen after building Kudu from source, you will need to run cmake again to pick up the doxygen location and generate the appropriate targets.
You can view the Java API documentation online. Alternatively, after building the Java client, Java API documentation is available in java/kudu-client/target/apidocs/index.html.
Several example applications are provided in the examples directory of the Apache Kudu git repository. Each example includes a README that shows how to compile and run it. The following list includes some of the examples that are available today. Check the repository itself in case this list goes out of date.
cpp/example.cc
A simple C++ application which connects to a Kudu instance, creates a table, writes data to it, then drops the table.
java/java-example
A simple Java application which connects to a Kudu instance, creates a table, writes data to it, then drops the table.
java/collectl
A small Java application which listens on a TCP socket for time series data corresponding to the Collectl wire protocol. The commonly-available collectl tool can be used to send example data to the server.
java/insert-loadgen
A Java application that generates random insert load.
python/dstat-kudu
An example program that shows how to use the Kudu Python API to load data generated by an external program (dstat in this case) into a new or existing Kudu table.
python/graphite-kudu
An example plugin for using graphite-web with Kudu as a backend.
These examples should serve as helpful starting points for your own Kudu applications and integrations.
The following Maven <dependency> element is valid for the Apache Kudu public release (since 1.0.0):
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.14.0</version>
</dependency>
Convenience binary artifacts for the Java client and various Java integrations (e.g. Spark, Flume) are also available via the ASF Maven repository and Maven Central repository.
See Using Impala With Kudu for guidance on installing and using Impala with Kudu, including several impala-shell examples.
Kudu integrates with Spark through the Data Source API as of version 1.0.0. Include the kudu-spark dependency using the --packages option.
Use the kudu-spark_2.10 artifact if using Spark with Scala 2.10. Note that Spark 1 is no longer supported as of Kudu 1.6.0, so 1.5.0 is the latest version that can be used with Spark 1:
spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.5.0
kudu-spark versions 1.8.0 and below have slightly different syntax. See the documentation of your version for a valid example. Versioned documentation can be found on the releases page.
Use the kudu-spark2_2.11 artifact if using Spark 2 with Scala 2.11:
spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.14.0
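The artifact rules above can be sketched as a small helper that builds the value passed to --packages. This is only an illustration of the rules stated in the text: the function names are hypothetical, and only the two Spark/Scala combinations mentioned above are handled.

```python
def parse_version(version):
    """Turn a dotted version string such as '1.14.0' into a tuple of ints."""
    return tuple(int(part) for part in version.split("."))


def kudu_spark_package(spark_major, kudu_version):
    """Return the Maven coordinate to pass to spark-shell --packages.

    Hypothetical helper: it only encodes the rules stated in the text above.
    """
    if spark_major == 1:
        # Spark 1 (Scala 2.10) is not supported from Kudu 1.6.0 onward.
        if parse_version(kudu_version) > (1, 5, 0):
            raise ValueError("Spark 1 requires kudu-spark 1.5.0 or earlier")
        artifact = "kudu-spark_2.10"
    elif spark_major == 2:
        # Spark 2 with Scala 2.11.
        artifact = "kudu-spark2_2.11"
    else:
        raise ValueError("only Spark 1 and Spark 2 are covered by this sketch")
    return "org.apache.kudu:{}:{}".format(artifact, kudu_version)


if __name__ == "__main__":
    print("spark-shell --packages " + kudu_spark_package(2, "1.14.0"))
```

Failing early on an unsupported combination is a design choice of this sketch; it surfaces the Spark 1 cutoff before a job is submitted.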
Below is a minimal Spark SQL "select" example. We first import the kudu spark package, then create a DataFrame, and then create a view from the DataFrame. After those steps, the table is accessible from Spark SQL.
There is also a Spark quickstart guide and another example available.
For the following two examples, you can use the Kudu CLI tool to create a table and generate data by running kudu perf loadgen kudu.master:7051 -keep_auto_table.
import org.apache.kudu.spark.kudu._
// Create a DataFrame that points to the Kudu table we want to query.
val df = spark.read.options(Map("kudu.master" -> "kudu.master:7051",
"kudu.table" -> "default.my_table")).format("kudu").load
// Create a view from the DataFrame to make it accessible from Spark SQL.
df.createOrReplaceTempView("my_table")
// Now we can run Spark SQL queries against our view of the Kudu table.
spark.sql("select * from my_table").show()
Below is a more sophisticated example that includes both reads and writes:
import org.apache.kudu.client._
import org.apache.kudu.spark.kudu.KuduContext
import collection.JavaConverters._
// Read a table from Kudu
val df = spark.read
.options(Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> "kudu_table"))
.format("kudu").load
// Query using the Spark API...
df.select("key").filter("key >= 5").show()
// ...or register a temporary table and use SQL
df.createOrReplaceTempView("kudu_table")
val filteredDF = spark.sql("select key from kudu_table where key >= 5")
filteredDF.show()
// Use KuduContext to create, delete, or write to Kudu tables
val kuduContext = new KuduContext("kudu.master:7051", spark.sparkContext)
// Create a new Kudu table from a DataFrame schema
// NB: No rows from the DataFrame are inserted into the table
kuduContext.createTable(
"test_table", df.schema, Seq("key"),
new CreateTableOptions()
.setNumReplicas(1)
.addHashPartitions(List("key").asJava, 3))
// Check for the existence of a Kudu table
kuduContext.tableExists("test_table")
// Insert data
kuduContext.insertRows(df, "test_table")
// Delete data
kuduContext.deleteRows(df, "test_table")
// Upsert data
kuduContext.upsertRows(df, "test_table")
// Update data
val updateDF = df.select($"key", ($"int_val" + 1).as("int_val"))
kuduContext.updateRows(updateDF, "test_table")
// Data can also be inserted into the Kudu table using the data source, though the methods on
// KuduContext are preferred
// NB: The default is to upsert rows; to perform standard inserts instead, set operation = insert
// in the options map
// NB: Only mode Append is supported
df.write
.options(Map("kudu.master"-> "kudu.master:7051", "kudu.table"-> "test_table"))
.mode("append")
.format("kudu").save
// Delete a Kudu table
kuduContext.deleteTable("test_table")
The upsert operation in kudu-spark supports an extra write option, ignoreNull. If set to true, it will avoid setting existing column values in the Kudu table to Null if the corresponding DataFrame column values are Null. If unspecified, ignoreNull is false by default.
val dataFrame = spark.read
.options(Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> simpleTableName))
.format("kudu").load
dataFrame.createOrReplaceTempView(simpleTableName)
dataFrame.show()
// Below is the original data in the table 'simpleTableName'
+---+---+
|key|val|
+---+---+
| 0|foo|
+---+---+
// Upsert a row with existing key 0 and val Null with ignoreNull set to true
val nullDF = spark.createDataFrame(Seq((0, null.asInstanceOf[String]))).toDF("key", "val")
val wo = new KuduWriteOptions
wo.ignoreNull = true
kuduContext.upsertRows(nullDF, simpleTableName, wo)
dataFrame.show()
// The val field stays unchanged
+---+---+
|key|val|
+---+---+
| 0|foo|
+---+---+
// Upsert a row with existing key 0 and val Null with ignoreNull default/set to false
kuduContext.upsertRows(nullDF, simpleTableName)
// Equivalent to:
// val wo = new KuduWriteOptions
// wo.ignoreNull = false
// kuduContext.upsertRows(nullDF, simpleTableName, wo)
dataFrame.show()
// The val field is set to Null this time
+---+----+
|key| val|
+---+----+
| 0|null|
+---+----+
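The ignoreNull semantics shown above can also be modeled without a cluster. The sketch below uses a plain dictionary as a stand-in for a Kudu table; the upsert function and its ignore_null parameter are hypothetical names for illustration and are not part of any Kudu API.

```python
def upsert(table, row, ignore_null=False):
    """Upsert `row` into `table`, a dict keyed by primary key.

    Hypothetical in-memory model of the behavior described above:
    with ignore_null=True, a None value does not overwrite an existing value.
    """
    key = row["key"]
    existing = table.get(key, {})
    merged = dict(existing)
    for column, value in row.items():
        if ignore_null and value is None and column in existing:
            # Keep the value already stored for this column.
            continue
        merged[column] = value
    table[key] = merged


if __name__ == "__main__":
    table = {0: {"key": 0, "val": "foo"}}
    upsert(table, {"key": 0, "val": None}, ignore_null=True)
    print(table[0])  # val is still 'foo'
    upsert(table, {"key": 0, "val": None})
    print(table[0])  # val is now None
```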
The Kudu Spark integration is able to operate on secure Kudu clusters which have authentication and encryption enabled, but the submitter of the Spark job must provide the proper credentials. For Spark jobs using the default 'client' deploy mode, the submitting user must have an active Kerberos ticket granted through kinit. For Spark jobs using the 'cluster' deploy mode, a Kerberos principal name and keytab location must be provided through the --principal and --keytab arguments to spark2-submit.
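As a rough illustration of the two deploy modes, the sketch below assembles the credential-related arguments for spark2-submit. The helper name is hypothetical, and the principal and keytab path in the usage lines are placeholders, not values from this guide.

```python
def kerberos_submit_args(deploy_mode, principal=None, keytab=None):
    """Return the deploy-mode and credential arguments for spark2-submit.

    Hypothetical helper: it only encodes the two cases described above.
    """
    if deploy_mode == "client":
        # Client mode relies on the ticket already obtained through kinit.
        return ["--deploy-mode", "client"]
    if deploy_mode == "cluster":
        if not principal or not keytab:
            raise ValueError("cluster mode needs both a principal and a keytab")
        return ["--deploy-mode", "cluster",
                "--principal", principal,
                "--keytab", keytab]
    raise ValueError("unknown deploy mode: {}".format(deploy_mode))


if __name__ == "__main__":
    # Placeholder principal and keytab path, for illustration only.
    args = kerberos_submit_args("cluster", "user@EXAMPLE.COM", "/path/to/user.keytab")
    print(" ".join(["spark2-submit"] + args))
```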
One common Kudu-Spark coding error is instantiating extra KuduClient objects. In kudu-spark, a KuduClient is owned by the KuduContext. Spark application code should not create another KuduClient connecting to the same cluster. Instead, application code should use the KuduContext to access a KuduClient using KuduContext#syncClient.
To diagnose multiple KuduClient instances in a Spark job, look for signs in the logs of the master being overloaded by many GetTableLocations or GetTabletLocations requests coming from different clients, usually around the same time. This symptom is especially likely in Spark Streaming code, where creating a KuduClient per task will result in periodic waves of master requests from new clients.
Spark 2.2+ requires Java 8 at runtime even though Kudu Spark 2.x integration is Java 7 compatible. Spark 2.2 is the default dependency version as of Kudu 1.5.0.
Kudu tables with a name containing upper case or non-ASCII characters must be assigned an alternate name when registered as a temporary table.
Kudu tables with a column name containing upper case or non-ASCII characters may not be used with SparkSQL. Columns may be renamed in Kudu to work around this issue.
<> and OR predicates are not pushed to Kudu, and instead will be evaluated by the Spark task. Only LIKE predicates with a suffix wildcard are pushed to Kudu, meaning that LIKE "FOO%" is pushed down but LIKE "FOO%BAR" isn’t.
Kudu does not support every type supported by Spark SQL. For example, Date and complex types are not supported.
Kudu tables may only be registered as temporary tables in SparkSQL. Kudu tables may not be queried using HiveContext.
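Two of the limitations above lend themselves to quick pre-flight checks. The sketch below is a simplified reading of the rules as stated in this list, not the logic kudu-spark itself uses; both function names are hypothetical, and treating "_" as a wildcard and requiring a non-empty prefix are assumptions of this sketch.

```python
def needs_alternate_name(name):
    """True if a table or column name has upper case or non-ASCII characters."""
    return any(ch.isupper() or ord(ch) > 127 for ch in name)


def is_prefix_like(pattern):
    """True if a LIKE pattern is a literal prefix followed by a single '%'.

    Assumption of this sketch: '_' counts as a wildcard and the prefix must
    be non-empty. Escape sequences are not handled.
    """
    if len(pattern) < 2 or not pattern.endswith("%"):
        return False
    prefix = pattern[:-1]
    return "%" not in prefix and "_" not in prefix


if __name__ == "__main__":
    print(needs_alternate_name("MyTable"))   # True
    print(is_prefix_like("FOO%"))            # True
    print(is_prefix_like("FOO%BAR"))         # False
```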
As of version 1.9.0, Kudu ships with an experimental feature called the binary test JAR. This feature gives people who want to test against Kudu the capability to start a Kudu "mini cluster" from Java or another JVM-based language without having to first build Kudu locally. This is possible because the Kudu binary JAR contains relocatable Kudu binaries that are used by the KuduTestHarness in the kudu-test-utils module. The KuduTestHarness contains logic to search the classpath for the Kudu binaries and to start a mini cluster using them.
Important: The kudu-binary module should only be used to run Kudu for integration testing purposes. It should never be used to run an actual Kudu service, in production or development, because the kudu-binary module includes native security-related dependencies that have been copied from the build system and will not be patched when the operating system on the runtime host is patched.
The binary test JAR must be run on one of the supported Kudu platforms, which include:
macOS Big Sur (11) or later
CentOS 7+, Ubuntu 18.04+, or another recent distribution of Linux
The related Maven integration using os-maven-plugin requires Maven 3.1 or later.
Take the following steps to start a Kudu mini cluster from a Java project.
1. Add build-time dependencies. The kudu-binary artifact contains the native Kudu (server and command-line tool) binaries for specific operating systems. In order to download the right artifact for the running operating system, use the os-maven-plugin to detect the current runtime environment. Finally, the kudu-test-utils module provides the KuduTestHarness class, which runs a Kudu mini cluster.
Maven example for Kudu 1.14.0:
<build>
  <extensions>
    <!-- Used to find the right kudu-binary artifact with the Maven
         property ${os.detected.classifier} -->
    <extension>
      <groupId>kr.motd.maven</groupId>
      <artifactId>os-maven-plugin</artifactId>
      <version>1.6.2</version>
    </extension>
  </extensions>
</build>
<dependencies>
  <dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-test-utils</artifactId>
    <version>1.14.0</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-binary</artifactId>
    <version>1.14.0</version>
    <classifier>${os.detected.classifier}</classifier>
    <scope>test</scope>
  </dependency>
</dependencies>
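To get a feel for what ${os.detected.classifier} resolves to, the sketch below approximates the os-maven-plugin naming (an operating system name and an architecture joined by a hyphen) from Python's platform module. The mapping tables are a simplified assumption covering only Linux and macOS on common 64-bit machines; the plugin normalizes many more cases, and this sketch does not check whether a kudu-binary artifact is published for a given classifier.

```python
import platform

# Simplified, assumed mappings; os-maven-plugin normalizes many more values.
OS_NAMES = {"linux": "linux", "darwin": "osx"}
ARCH_NAMES = {
    "x86_64": "x86_64",
    "amd64": "x86_64",
    "aarch64": "aarch_64",
    "arm64": "aarch_64",
}


def detected_classifier(system=None, machine=None):
    """Approximate the classifier string for the given (or current) host."""
    system = (system or platform.system()).lower()
    machine = (machine or platform.machine()).lower()
    if system not in OS_NAMES or machine not in ARCH_NAMES:
        raise ValueError("host not covered by this sketch: {} {}".format(system, machine))
    return "{}-{}".format(OS_NAMES[system], ARCH_NAMES[machine])


if __name__ == "__main__":
    print(detected_classifier("Linux", "x86_64"))  # linux-x86_64
```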
2. Write a test that starts a Kudu mini cluster using the KuduTestHarness. It will automatically find the binary test JAR if Maven is configured correctly.
The recommended way to start a Kudu mini cluster is by using the KuduTestHarness class from the kudu-test-utils module, which also acts as a JUnit Rule. Here is an example of a Java-based integration test that starts a Kudu cluster, creates a Kudu table on the cluster, and then exits:
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.test.KuduTestHarness;
import org.junit.Rule;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MyKuduTest {
  // The KuduTestHarness automatically starts and stops a real Kudu cluster
  // when each test is run. Kudu persists its on-disk state in a temporary
  // directory under a location defined by the environment variable TEST_TMPDIR
  // if set, or under /tmp otherwise. That cluster data is deleted on
  // successful exit of the test. The cluster output is logged through slf4j.
  @Rule
  public KuduTestHarness harness = new KuduTestHarness();

  @Test
  public void test() throws Exception {
    // Get a KuduClient configured to talk to the running mini cluster.
    KuduClient client = harness.getClient();

    // Create a new Kudu table.
    List<ColumnSchema> columns = new ArrayList<>();
    columns.add(
        new ColumnSchema.ColumnSchemaBuilder(
            "key", Type.INT32).key(true).build());
    Schema schema = new Schema(columns);
    CreateTableOptions opts =
        new CreateTableOptions().setRangePartitionColumns(
            Collections.singletonList("key"));
    client.createTable("table-1", schema, opts);

    // Now we may insert rows into the newly-created Kudu table using 'client',
    // scan the table, etc.
  }
}
For more examples of using the KuduTestHarness, including how to pass configuration options to the Kudu cluster being managed by the harness, see the java-example project in the Kudu source code repository, or look at the various Kudu integration tests under java in the Kudu source code repository.
The Kudu Python client provides a Python-friendly interface to the C++ client API. The sample below demonstrates the use of part of the Python client.
import kudu
from kudu.client import Partitioning
from datetime import datetime
# Connect to Kudu master server
client = kudu.connect(host='kudu.master', port=7051)
# Define a schema for a new table
builder = kudu.schema_builder()
builder.add_column('key').type(kudu.int64).nullable(False).primary_key()
builder.add_column('ts_val', type_=kudu.unixtime_micros, nullable=False, compression='lz4')
schema = builder.build()
# Define partitioning schema
partitioning = Partitioning().add_hash_partitions(column_names=['key'], num_buckets=3)
# Create new table
client.create_table('python-example', schema, partitioning)
# Open a table
table = client.table('python-example')
# Create a new session so that we can apply write operations
session = client.new_session()
# Insert a row
op = table.new_insert({'key': 1, 'ts_val': datetime.utcnow()})
session.apply(op)
# Upsert a row
op = table.new_upsert({'key': 2, 'ts_val': "2016-01-01T00:00:00.000000"})
session.apply(op)
# Updating a row
op = table.new_update({'key': 1, 'ts_val': ("2017-01-01", "%Y-%m-%d")})
session.apply(op)
# Delete a row
op = table.new_delete({'key': 2})
session.apply(op)
# Flush write operations. If failures occur, print them.
try:
    session.flush()
except kudu.KuduBadStatus as e:
    print(session.get_pending_errors())
# Create a scanner and add a predicate
scanner = table.scanner()
scanner.add_predicate(table['ts_val'] == datetime(2017, 1, 1))
# Open Scanner and read all tuples
# Note: This doesn't scale for large scans
result = scanner.open().read_all_tuples()
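The sample above passes ts_val in three shapes: a datetime, a string, and a (string, format) tuple. The sketch below shows one way such values can be normalized to microseconds since the Unix epoch, which is what a unixtime_micros column stores. It is an illustration only and not the client's own conversion code; the function name is hypothetical, and treating naive datetimes as UTC and using the default string format shown are assumptions.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
# Assumed default format, matching the string used in the sample above.
DEFAULT_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"


def sketch_to_micros(value):
    """Normalize a datetime, string, or (string, format) tuple to epoch micros."""
    if isinstance(value, tuple):
        text, fmt = value
        value = datetime.strptime(text, fmt)
    elif isinstance(value, str):
        value = datetime.strptime(value, DEFAULT_FORMAT)
    if value.tzinfo is None:
        # Assumption of this sketch: naive datetimes are interpreted as UTC.
        value = value.replace(tzinfo=timezone.utc)
    # Integer division of two timedeltas yields a whole number of microseconds.
    return (value - EPOCH) // timedelta(microseconds=1)


if __name__ == "__main__":
    print(sketch_to_micros(("2017-01-01", "%Y-%m-%d")))
    print(sketch_to_micros("2016-01-01T00:00:00.000000"))
```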
Kudu was designed to integrate with MapReduce, YARN, Spark, and other frameworks in the Hadoop ecosystem. See RowCounter.java and ImportCsv.java for examples which you can model your own integrations on. Stay tuned for more examples using YARN and Spark in the future.