I had the pleasure of interning with the Apache Kudu team at Cloudera this summer. This project was my summer contribution to Kudu: a restructuring of the scan path to speed up queries.
I had the pleasure of interning with the Apache Kudu team at Cloudera this summer. This project was my summer contribution to Kudu: a restructuring of the scan path to speed up queries.
This post discusses the Kudu Flume Sink. First, I’ll give some background on why we considered using Kudu, what Flume does for us, and how Flume fits with Kudu in our project.
Traditionally in the Hadoop ecosystem we’ve dealt with various batch processing technologies such as MapReduce and the many libraries and tools built on top of it in various languages (Apache Pig, Apache Hive, Apache Oozie and many others). The main problem with this approach is that it needs to process the whole data set in batches, again and again, as soon as new data gets added. Things get really complicated when a few such tasks need to get chained together, or when the same data set needs to be processed in various ways by different jobs, while all compete for the shared cluster resources.
The opposite of this approach is stream processing: process the data as soon as it arrives, not in batches. Streaming systems such as Spark Streaming, Storm, Kafka Streams, and many others make this possible. But writing streaming services is not trivial. The streaming systems are becoming more and more capable and support more complex constructs, but they are not yet easy to use. All queries and processes need to be carefully planned and implemented.
To summarize, batch processing is:
While stream processing is:
And a Kudu-based near real-time approach is:
At Argyle Data, we’re dealing with complex fraud detection scenarios. We need to ingest massive amounts of data, run machine learning algorithms and generate reports. When we created our current architecture two years ago we decided to opt for a database as the backbone of our system. That database is Apache Accumulo. It’s a key-value based database which runs on top of Hadoop HDFS, quite similar to HBase but with some important improvements such as cell level security and ease of deployment and management. To enable querying of this data for quite complex reporting and analytics, we used Presto, a distributed query engine with a pluggable architecture open-sourced by Facebook. We wrote a connector for it to let it run queries against the Accumulo database. This architecture has served us well, but there were a few problems:
So, we’ve started gradually moving the core machine-learning pipeline to a streaming based solution. This way we can ingest and process larger data-sets faster in the real-time. But then how would we take care of ad-hoc queries and long-term persistence? This is where Kudu comes in. While the machine learning pipeline ingests and processes real-time data, we store a copy of the same ingested data in Kudu for long-term access and ad-hoc queries. Kudu is our data warehouse. By using Kudu and Impala, we can retire our in-house Presto connector and rely on Impala’s super-fast query engine.
But how would we make sure data is reliably ingested into the streaming pipeline and the Kudu-based data warehouse? This is where Apache Flume comes in.
According to their website “Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms.” As you can see, nowhere is Hadoop mentioned but Flume is typically used for ingesting data to Hadoop clusters.
Flume has an extensible architecture. An instance of Flume, called an agent, can have multiple channels, with each having multiple sources and sinks of various types. Sources queue data in channels, which in turn write out data to sinks. Such pipelines can be chained together to create even more complex ones. There may be more than one agent and agents can be configured to support failover and recovery.
Flume comes with a bunch of built-in types of channels, sources and sinks. Memory channel is the default (an in-memory queue with no persistence to disk), but other options such as Kafka- and File-based channels are also provided. As for the sources, Avro, JMS, Thrift, spooling directory source are some of the built-in ones. Flume also ships with many sinks, including sinks for writing data to HDFS, HBase, Hive, Kafka, as well as to other Flume agents.
In the rest of this post I’ll go over the Kudu Flume sink and show you how to configure Flume to write ingested data to a Kudu table. The sink has been part of the Kudu distribution since the 0.8 release and the source code can be found here.
Here is a sample flume configuration file:
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1
agent1.sources.source1.type = exec
agent1.sources.source1.command = /usr/bin/vmstat 1
agent1.sources.source1.channels = channel1
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 10000
agent1.channels.channel1.transactionCapacity = 1000
agent1.sinks.sink1.type = org.apache.flume.sink.kudu.KuduSink
agent1.sinks.sink1.masterAddresses = localhost
agent1.sinks.sink1.tableName = stats
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.batchSize = 50
agent1.sinks.sink1.producer = org.apache.kudu.flume.sink.SimpleKuduEventProducer
We define a source called source1
which simply executes a vmstat
command to continuously generate
virtual memory statistics for the machine and queue events into an in-memory channel1
channel,
which in turn is used for writing these events to a Kudu table called stats
. We are using
org.apache.kudu.flume.sink.SimpleKuduEventProducer
as the producer. SimpleKuduEventProducer
is
the built-in and default producer, but it’s implemented as a showcase for how to write Flume
events into Kudu tables. For any serious functionality we’d have to write a custom producer. We
need to make this producer and the KuduSink
class available to Flume. We can do that by simply
copying the kudu-flume-sink-<VERSION>.jar
jar file from the Kudu distribution to the
$FLUME_HOME/plugins.d/kudu-sink/lib
directory in the Flume installation. The jar file contains
KuduSink
and all of its dependencies (including Kudu java client classes).
At a minimum, the Kudu Flume Sink needs to know where the Kudu masters are
(agent1.sinks.sink1.masterAddresses = localhost
) and which Kudu table should be used for writing
Flume events to (agent1.sinks.sink1.tableName = stats
). The Kudu Flume Sink doesn’t create this
table, it has to be created before the Kudu Flume Sink is started.
You may also notice the batchSize
parameter. Batch size is used for batching up to that many
Flume events and flushing the entire batch in one shot. Tuning batchSize properly can have a huge
impact on ingest performance of the Kudu cluster.
Here is a complete list of KuduSink parameters:
Parameter Name | Default | Description |
---|---|---|
masterAddresses | N/A | Comma-separated list of “host:port” pairs of the masters (port optional) |
tableName | N/A | The name of the table in Kudu to write to |
producer | org.apache.kudu.flume.sink.SimpleKuduEventProducer | The fully qualified class name of the Kudu event producer the sink should use |
batchSize | 100 | Maximum number of events the sink should take from the channel per transaction, if available |
timeoutMillis | 30000 | Timeout period for Kudu operations, in milliseconds |
ignoreDuplicateRows | true | Whether to ignore errors indicating that we attempted to insert duplicate rows into Kudu |
Let’s take a look at the source code for the built-in producer class:
public class SimpleKuduEventProducer implements KuduEventProducer {
private byte[] payload;
private KuduTable table;
private String payloadColumn;
public SimpleKuduEventProducer(){
}
@Override
public void configure(Context context) {
payloadColumn = context.getString("payloadColumn","payload");
}
@Override
public void configure(ComponentConfiguration conf) {
}
@Override
public void initialize(Event event, KuduTable table) {
this.payload = event.getBody();
this.table = table;
}
@Override
public List<Operation> getOperations() throws FlumeException {
try {
Insert insert = table.newInsert();
PartialRow row = insert.getRow();
row.addBinary(payloadColumn, payload);
return Collections.singletonList((Operation) insert);
} catch (Exception e){
throw new FlumeException("Failed to create Kudu Insert object!", e);
}
}
@Override
public void close() {
}
}
SimpleKuduEventProducer
implements the org.apache.kudu.flume.sink.KuduEventProducer
interface,
which itself looks like this:
public interface KuduEventProducer extends Configurable, ConfigurableComponent {
/**
* Initialize the event producer.
* @param event to be written to Kudu
* @param table the KuduTable object used for creating Kudu Operation objects
*/
void initialize(Event event, KuduTable table);
/**
* Get the operations that should be written out to Kudu as a result of this
* event. This list is written to Kudu using the Kudu client API.
* @return List of {@link org.kududb.client.Operation} which
* are written as such to Kudu
*/
List<Operation> getOperations();
/*
* Clean up any state. This will be called when the sink is being stopped.
*/
void close();
}
public void configure(Context context)
is called when an instance of our producer is instantiated
by the KuduSink. SimpleKuduEventProducer’s implementation looks for a producer parameter named
payloadColumn
and uses its value (“payload” if not overridden in Flume configuration file) as the
column which will hold the value of the Flume event payload. If you recall from above, we had
configured the KuduSink to listen for events generated from the vmstat
command. Each output row
from that command will be stored as a new row containing a payload
column in the stats
table.
SimpleKuduEventProducer
does not have any configuration parameters, but if it had any we would
define them by prefixing it with producer.
(agent1.sinks.sink1.producer.parameter1
for
example).
The main producer logic resides in the public List<Operation> getOperations()
method. In
SimpleKuduEventProducer’s implementation we simply insert the binary body of the Flume event into
the Kudu table. Here we call Kudu’s newInsert()
to initiate an insert, but could have used
Upsert
if updating an existing row was also an option, in fact there’s another producer
implementation available for doing just that: SimpleKeyedKuduEventProducer
. Most probably you
will need to write your own custom producer in the real world, but you can base your implementation
on the built-in ones.
In the future, we plan to add more flexible event producer implementations so that creation of a custom event producer is not required to write data to Kudu. See here for a work-in-progress generic event producer for Avro-encoded Events.
Kudu is a scalable data store which lets us ingest insane amounts of data per second. Apache Flume helps us aggregate data from various sources, and the Kudu Flume Sink lets us easily store the aggregated Flume events into Kudu. Together they enable us to create a data warehouse out of disparate sources.
Ara Abrahamian is a software engineer at Argyle Data building fraud detection systems using sophisticated machine learning methods. Ara is the original author of the Flume Kudu Sink that is included in the Kudu distribution. You can follow him on Twitter at @ara_e.
Kudu 0.10 is shipping with a few important new features for range partitioning. These features are designed to make Kudu easier to scale for certain workloads, like time series. This post will introduce these features, and discuss how to use them to effectively design tables for scalability and performance.
The Apache Kudu team is happy to announce the release of Kudu 0.10.0!
This latest version adds several new features, including:
Welcome to the twentieth edition of the Kudu Weekly Update. This weekly blog post covers ongoing development and news in the Apache Kudu project.