Arrow Flight

This section contains a number of recipes for working with Arrow Flight, an RPC library specialized for tabular datasets. For more about Flight, see the Arrow Flight RPC format documentation (format/Flight).

Simple Parquet storage service with Arrow Flight

We’ll implement a service that provides a key-value store for tabular data, using Flight to handle uploads/requests and Parquet to store the actual data.

First, we’ll implement the service itself. For simplicity, we won’t use the Datasets API; instead, we’ll use the Parquet API directly.

Parquet storage service, server implementation
  1class ParquetStorageService : public arrow::flight::FlightServerBase {
  2 public:
  3  const arrow::flight::ActionType kActionDropDataset{"drop_dataset", "Delete a dataset."};
  4
  5  explicit ParquetStorageService(std::shared_ptr<arrow::fs::FileSystem> root)
  6      : root_(std::move(root)) {}
  7
  8  arrow::Status ListFlights(
  9      const arrow::flight::ServerCallContext&, const arrow::flight::Criteria*,
 10      std::unique_ptr<arrow::flight::FlightListing>* listings) override {
 11    arrow::fs::FileSelector selector;
 12    selector.base_dir = "/";
 13    ARROW_ASSIGN_OR_RAISE(auto listing, root_->GetFileInfo(selector));
 14
 15    std::vector<arrow::flight::FlightInfo> flights;
 16    for (const auto& file_info : listing) {
 17      if (!file_info.IsFile() || file_info.extension() != "parquet") continue;
 18      ARROW_ASSIGN_OR_RAISE(auto info, MakeFlightInfo(file_info));
 19      flights.push_back(std::move(info));
 20    }
 21
 22    *listings = std::unique_ptr<arrow::flight::FlightListing>(
 23        new arrow::flight::SimpleFlightListing(std::move(flights)));
 24    return arrow::Status::OK();
 25  }
 26
 27  arrow::Status GetFlightInfo(const arrow::flight::ServerCallContext&,
 28                              const arrow::flight::FlightDescriptor& descriptor,
 29                              std::unique_ptr<arrow::flight::FlightInfo>* info) override {
 30    ARROW_ASSIGN_OR_RAISE(auto file_info, FileInfoFromDescriptor(descriptor));
 31    ARROW_ASSIGN_OR_RAISE(auto flight_info, MakeFlightInfo(file_info));
 32    *info = std::unique_ptr<arrow::flight::FlightInfo>(
 33        new arrow::flight::FlightInfo(std::move(flight_info)));
 34    return arrow::Status::OK();
 35  }
 36
 37  arrow::Status DoPut(const arrow::flight::ServerCallContext&,
 38                      std::unique_ptr<arrow::flight::FlightMessageReader> reader,
 39                      std::unique_ptr<arrow::flight::FlightMetadataWriter>) override {
 40    ARROW_ASSIGN_OR_RAISE(auto file_info, FileInfoFromDescriptor(reader->descriptor()));
 41    ARROW_ASSIGN_OR_RAISE(auto sink, root_->OpenOutputStream(file_info.path()));
 42    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, reader->ToTable());
 43
 44    ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(*table, arrow::default_memory_pool(),
 45                                                   sink, /*chunk_size=*/65536));
 46    return arrow::Status::OK();
 47  }
 48
 49  arrow::Status DoGet(const arrow::flight::ServerCallContext&,
 50                      const arrow::flight::Ticket& request,
 51                      std::unique_ptr<arrow::flight::FlightDataStream>* stream) override {
 52    ARROW_ASSIGN_OR_RAISE(auto input, root_->OpenInputFile(request.ticket));
 53    ARROW_ASSIGN_OR_RAISE(
 54        auto reader,
 55        parquet::arrow::OpenFile(std::move(input), arrow::default_memory_pool()));
 56
 57    std::shared_ptr<arrow::Table> table;
 58#if ARROW_VERSION_MAJOR >= 24
 59    ARROW_ASSIGN_OR_RAISE(table, reader->ReadTable());
 60#else
 61    ARROW_RETURN_NOT_OK(reader->ReadTable(&table));
 62#endif
 63    // Note that we can't directly pass TableBatchReader to
 64    // RecordBatchStream because TableBatchReader keeps a non-owning
 65    // reference to the underlying Table, which would then get freed
 66    // when we exit this function
 67    std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
 68    arrow::TableBatchReader batch_reader(*table);
 69    ARROW_ASSIGN_OR_RAISE(batches, batch_reader.ToRecordBatches());
 70
 71    ARROW_ASSIGN_OR_RAISE(auto owning_reader, arrow::RecordBatchReader::Make(
 72                                                  std::move(batches), table->schema()));
 73    *stream = std::unique_ptr<arrow::flight::FlightDataStream>(
 74        new arrow::flight::RecordBatchStream(owning_reader));
 75
 76    return arrow::Status::OK();
 77  }
 78
 79  arrow::Status ListActions(const arrow::flight::ServerCallContext&,
 80                            std::vector<arrow::flight::ActionType>* actions) override {
 81    *actions = {kActionDropDataset};
 82    return arrow::Status::OK();
 83  }
 84
 85  arrow::Status DoAction(const arrow::flight::ServerCallContext&,
 86                         const arrow::flight::Action& action,
 87                         std::unique_ptr<arrow::flight::ResultStream>* result) override {
 88    if (action.type == kActionDropDataset.type) {
 89      *result = std::unique_ptr<arrow::flight::ResultStream>(
 90          new arrow::flight::SimpleResultStream({}));
 91      return DoActionDropDataset(action.body->ToString());
 92    }
 93    return arrow::Status::NotImplemented("Unknown action type: ", action.type);
 94  }
 95
 96 private:
 97  arrow::Result<arrow::flight::FlightInfo> MakeFlightInfo(
 98      const arrow::fs::FileInfo& file_info) {
 99    ARROW_ASSIGN_OR_RAISE(auto input, root_->OpenInputFile(file_info));
100    ARROW_ASSIGN_OR_RAISE(
101        auto reader,
102        parquet::arrow::OpenFile(std::move(input), arrow::default_memory_pool()));
103
104    std::shared_ptr<arrow::Schema> schema;
105    ARROW_RETURN_NOT_OK(reader->GetSchema(&schema));
106
107    auto descriptor = arrow::flight::FlightDescriptor::Path({file_info.base_name()});
108
109    arrow::flight::FlightEndpoint endpoint;
110    endpoint.ticket.ticket = file_info.base_name();
111    arrow::flight::Location location;
112    ARROW_ASSIGN_OR_RAISE(location,
113                          arrow::flight::Location::ForGrpcTcp("localhost", port()));
114    endpoint.locations.push_back(location);
115
116    int64_t total_records = reader->parquet_reader()->metadata()->num_rows();
117    int64_t total_bytes = file_info.size();
118
119    return arrow::flight::FlightInfo::Make(*schema, descriptor, {endpoint}, total_records,
120                                           total_bytes);
121  }
122
123  arrow::Result<arrow::fs::FileInfo> FileInfoFromDescriptor(
124      const arrow::flight::FlightDescriptor& descriptor) {
125    if (descriptor.type != arrow::flight::FlightDescriptor::PATH) {
126      return arrow::Status::Invalid("Must provide PATH-type FlightDescriptor");
127    } else if (descriptor.path.size() != 1) {
128      return arrow::Status::Invalid(
129          "Must provide PATH-type FlightDescriptor with one path component");
130    }
131    return root_->GetFileInfo(descriptor.path[0]);
132  }
133
134  arrow::Status DoActionDropDataset(const std::string& key) {
135    return root_->DeleteFile(key);
136  }
137
138  std::shared_ptr<arrow::fs::FileSystem> root_;
139};  // end ParquetStorageService

With the service implemented, we’ll start our server:

auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
ARROW_RETURN_NOT_OK(fs->CreateDir("./flight_datasets/"));
ARROW_RETURN_NOT_OK(fs->DeleteDirContents("./flight_datasets/"));
auto root = std::make_shared<arrow::fs::SubTreeFileSystem>("./flight_datasets/", fs);

arrow::flight::Location server_location;
ARROW_ASSIGN_OR_RAISE(server_location,
                      arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0));

arrow::flight::FlightServerOptions options(server_location);
auto server = std::unique_ptr<arrow::flight::FlightServerBase>(
    new ParquetStorageService(std::move(root)));
ARROW_RETURN_NOT_OK(server->Init(options));
rout << "Listening on port " << server->port() << std::endl;
Code Output
Listening on port 34427

We can then create a client and connect to the server:

arrow::flight::Location location;
ARROW_ASSIGN_OR_RAISE(location,
                      arrow::flight::Location::ForGrpcTcp("localhost", server->port()));

std::unique_ptr<arrow::flight::FlightClient> client;
ARROW_ASSIGN_OR_RAISE(client, arrow::flight::FlightClient::Connect(location));
rout << "Connected to " << location.ToString() << std::endl;
Code Output
Connected to grpc+tcp://localhost:34427

Next, we’ll read a table from an example file and upload it; the server will store it in a Parquet file.

// Open example data file to upload
ARROW_ASSIGN_OR_RAISE(std::string airquality_path,
                      FindTestDataFile("airquality.parquet"));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::RandomAccessFile> input,
                      fs->OpenInputFile(airquality_path));
ARROW_ASSIGN_OR_RAISE(auto reader, parquet::arrow::OpenFile(
                                       std::move(input), arrow::default_memory_pool()));

auto descriptor = arrow::flight::FlightDescriptor::Path({"airquality.parquet"});
std::shared_ptr<arrow::Schema> schema;
ARROW_RETURN_NOT_OK(reader->GetSchema(&schema));

// Start the RPC call
std::unique_ptr<arrow::flight::FlightStreamWriter> writer;
std::unique_ptr<arrow::flight::FlightMetadataReader> metadata_reader;
ARROW_ASSIGN_OR_RAISE(auto put_stream, client->DoPut(descriptor, schema));
writer = std::move(put_stream.writer);
metadata_reader = std::move(put_stream.reader);

// Upload data
std::vector<int> row_groups(reader->num_row_groups());
std::iota(row_groups.begin(), row_groups.end(), 0);
ARROW_ASSIGN_OR_RAISE(auto batch_reader, reader->GetRecordBatchReader(row_groups))
int64_t batches = 0;
while (true) {
  ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->Next());
  if (!batch) break;
  ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
  batches++;
}

ARROW_RETURN_NOT_OK(writer->Close());
rout << "Wrote " << batches << " batches" << std::endl;
Code Output
Wrote 1 batches

Once we do so, we can retrieve the metadata for that dataset:

std::unique_ptr<arrow::flight::FlightInfo> flight_info;
ARROW_ASSIGN_OR_RAISE(flight_info, client->GetFlightInfo(descriptor));
rout << flight_info->descriptor().ToString() << std::endl;
rout << "=== Schema ===" << std::endl;
std::shared_ptr<arrow::Schema> info_schema;
arrow::ipc::DictionaryMemo dictionary_memo;
ARROW_ASSIGN_OR_RAISE(info_schema, flight_info->GetSchema(&dictionary_memo));
rout << info_schema->ToString() << std::endl;
rout << "==============" << std::endl;
Code Output
<FlightDescriptor path='airquality.parquet'>
=== Schema ===
Ozone: int32
Solar.R: int32
Wind: double
Temp: int32
Month: int32
Day: int32
==============

And get the data back:

std::unique_ptr<arrow::flight::FlightStreamReader> stream;
ARROW_ASSIGN_OR_RAISE(stream, client->DoGet(flight_info->endpoints()[0].ticket));
std::shared_ptr<arrow::Table> table;
ARROW_ASSIGN_OR_RAISE(table, stream->ToTable());
arrow::PrettyPrintOptions print_options(/*indent=*/0, /*window=*/2);
ARROW_RETURN_NOT_OK(arrow::PrettyPrint(*table, print_options, &rout));
Code Output
Ozone: int32
Solar.R: int32
Wind: double
Temp: int32
Month: int32
Day: int32
----
Ozone:
  [
    [
      41,
      36,
      ...
      18,
      20
    ]
  ]
Solar.R:
  [
    [
      190,
      118,
      ...
      131,
      223
    ]
  ]
Wind:
  [
    [
      7.4,
      8,
      ...
      8,
      11.5
    ]
  ]
Temp:
  [
    [
      67,
      72,
      ...
      76,
      68
    ]
  ]
Month:
  [
    [
      5,
      5,
      ...
      9,
      9
    ]
  ]
Day:
  [
    [
      1,
      2,
      ...
      29,
      30
    ]
  ]

Then, we’ll delete the dataset:

// The action type selects the server-side handler; the body carries the key
// of the dataset to drop.
arrow::flight::Action action{"drop_dataset",
                             arrow::Buffer::FromString("airquality.parquet")};
std::unique_ptr<arrow::flight::ResultStream> results;
ARROW_ASSIGN_OR_RAISE(results, client->DoAction(action));
// Read the (empty) result stream to its end so that an error raised by the
// server while deleting is surfaced here rather than silently ignored.
ARROW_RETURN_NOT_OK(results->Drain());
rout << "Deleted dataset" << std::endl;
Code Output
Deleted dataset

And confirm that it’s been deleted:

std::unique_ptr<arrow::flight::FlightListing> listing;
ARROW_ASSIGN_OR_RAISE(listing, client->ListFlights());
while (true) {
  std::unique_ptr<arrow::flight::FlightInfo> flight_info;
  ARROW_ASSIGN_OR_RAISE(flight_info, listing->Next());
  if (!flight_info) break;
  rout << flight_info->descriptor().ToString() << std::endl;
  rout << "=== Schema ===" << std::endl;
  std::shared_ptr<arrow::Schema> info_schema;
  arrow::ipc::DictionaryMemo dictionary_memo;
  ARROW_ASSIGN_OR_RAISE(info_schema, flight_info->GetSchema(&dictionary_memo));
  rout << info_schema->ToString() << std::endl;
  rout << "==============" << std::endl;
}
rout << "End of listing" << std::endl;
Code Output
End of listing

Finally, we’ll stop our server:

// Stop the server; a failure to shut down is propagated to the caller and
// the success message below is then not printed.
ARROW_RETURN_NOT_OK(server->Shutdown());
rout << "Server shut down successfully" << std::endl;
Code Output
Server shut down successfully

Setting gRPC client options

Options for gRPC clients can be passed in using the generic_options field of arrow::flight::FlightClientOptions. There is a list of available client options in the gRPC API documentation.

For example, you can change the maximum message length sent with:

auto client_options = arrow::flight::FlightClientOptions::Defaults();
// Set a very low limit at the gRPC layer to fail all calls
client_options.generic_options.emplace_back(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, 2);

arrow::flight::Location location;
ARROW_ASSIGN_OR_RAISE(location,
                      arrow::flight::Location::ForGrpcTcp("localhost", server->port()));

std::unique_ptr<arrow::flight::FlightClient> client;
// pass client_options into Connect()
ARROW_ASSIGN_OR_RAISE(client,
                      arrow::flight::FlightClient::Connect(location, client_options));
rout << "Connected to " << location.ToString() << std::endl;
Code Output
Connected to grpc+tcp://localhost:40083

Flight Service with other gRPC endpoints

If you are using the gRPC backend, you can add other gRPC endpoints to the Flight server. While Flight clients won’t recognize these endpoints, general gRPC clients will be able to.

Note

If statically linking Arrow Flight, Protobuf and gRPC must also be statically linked, and the same goes for dynamic linking. Read more at https://arrow.apache.org/docs/cpp/build_system.html#a-note-on-linking

Creating the server

To create a gRPC service, first define a service using protobuf.

Hello world protobuf specification
 1syntax = "proto3";
 2
 3service HelloWorldService {
 4  rpc SayHello(HelloRequest) returns (HelloResponse);
 5}
 6
 7message HelloRequest {
 8  string name = 1;
 9}
10
11message HelloResponse {
12  string reply = 1;
13}

Next, you’ll need to compile that to provide the protobuf and gRPC generated files. See gRPC’s generating client and server code docs for details.

Then write an implementation for the gRPC service:

Hello world gRPC service implementation
 1class HelloWorldServiceImpl : public HelloWorldService::Service {
 2  grpc::Status SayHello(grpc::ServerContext*, const HelloRequest* request,
 3                        HelloResponse* reply) override {
 4    const std::string& name = request->name();
 5    if (name.empty()) {
 6      return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Must provide a name!");
 7    }
 8    reply->set_reply("Hello, " + name);
 9    return grpc::Status::OK;
10  }
11};  // end HelloWorldServiceImpl

Finally, use the builder_hook callback on arrow::flight::FlightServerOptions to register the additional gRPC service.

arrow::flight::Location server_location;
ARROW_ASSIGN_OR_RAISE(server_location,
                      arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0));

arrow::flight::FlightServerOptions options(server_location);
auto server = std::unique_ptr<arrow::flight::FlightServerBase>(
    new ParquetStorageService(std::move(root)));

// Create hello world service
HelloWorldServiceImpl grpc_service;

// Use builder_hook to register grpc service
options.builder_hook = [&](void* raw_builder) {
  auto* builder = reinterpret_cast<grpc::ServerBuilder*>(raw_builder);
  builder->RegisterService(&grpc_service);
};

ARROW_RETURN_NOT_OK(server->Init(options));
rout << "Listening on port " << server->port() << std::endl;
Code Output
Listening on port 44375

Creating the client

The Flight client implementation doesn’t know about any custom gRPC services, so to call them you’ll need to create a normal gRPC client. For the Hello World service, we use the HelloWorldService stub, which is provided by the compiled gRPC definition.

// Connect a plain gRPC channel to the Flight server's port. "0.0.0.0" is the
// wildcard address the server binds to, not a portable destination, so dial
// "localhost" as the Flight clients above do.
auto client_channel = grpc::CreateChannel("localhost:" + std::to_string(server->port()),
                                          grpc::InsecureChannelCredentials());

// The stub type is generated from the protobuf service definition.
auto stub = HelloWorldService::NewStub(client_channel);

grpc::ClientContext context;
HelloRequest request;
request.set_name("Arrow User");
HelloResponse response;
grpc::Status status = stub->SayHello(&context, request, &response);
// Translate a gRPC failure into an Arrow status for the caller.
if (!status.ok()) {
  return arrow::Status::IOError(status.error_message());
}
rout << response.reply();
Code Output
Hello, Arrow User