Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions cpp/src/arrow/flight/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ set(ARROW_FLIGHT_SRCS
client_cookie_middleware.cc
client_tracing_middleware.cc
cookie_internal.cc
flight_data_decoder.cc
middleware.cc
serialization_internal.cc
server.cc
Expand Down Expand Up @@ -207,6 +208,21 @@ if(CMAKE_UNITY_BUILD AND WIN32)
PROPERTIES SKIP_UNITY_BUILD_INCLUSION TRUE)
endif()

# Suppress warnings from Abseil headers using deprecated <ciso646> in C++20.
# GCC 15+ with C++20 emits #warning which -Werror turns into an error.
#
# -Wno-cpp silences every #warning directive in the affected translation
# units, so it is attached to individual source files instead of the whole
# library. APPEND keeps any COMPILE_OPTIONS that were already set on these
# files; set_source_files_properties() would replace them.
#
# NOTE(review): the condition matches every GCC version, not only 15+.
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_CXX_STANDARD GREATER_EQUAL 20)
  set(ARROW_FLIGHT_CXX20_WARNING_FLAGS "-Wno-cpp")
  set_property(SOURCE server_tracing_middleware.cc
                      client_tracing_middleware.cc
                      transport/grpc/grpc_client.cc
                      transport/grpc/grpc_server.cc
                      transport/grpc/serialization_internal.cc
                      transport/grpc/protocol_grpc_internal.cc
                      transport/grpc/util_internal.cc
               APPEND
               PROPERTY COMPILE_OPTIONS "${ARROW_FLIGHT_CXX20_WARNING_FLAGS}")
endif()

# otel_logging.cc is only added to the Flight sources when OpenTelemetry
# support is enabled.
if(ARROW_WITH_OPENTELEMETRY)
  list(APPEND ARROW_FLIGHT_SRCS otel_logging.cc)
endif()
Expand Down Expand Up @@ -320,6 +336,13 @@ if(ARROW_TESTING)
foreach(LIB_TARGET ${ARROW_FLIGHT_TESTING_LIBRARIES})
target_compile_definitions(${LIB_TARGET} PRIVATE ARROW_FLIGHT_EXPORTING)
endforeach()

# Suppress Abseil <ciso646> warnings in testing library (GCC 15+ with C++20).
# APPEND keeps any COMPILE_OPTIONS already set on these files instead of
# replacing them.
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_CXX_STANDARD GREATER_EQUAL 20)
  set_property(SOURCE test_auth_handlers.cc
                      test_definitions.cc
                      test_flight_server.cc
                      test_util.cc
               APPEND
               PROPERTY COMPILE_OPTIONS "-Wno-cpp")
endif()
endif()

add_arrow_test(flight_internals_test
Expand All @@ -334,11 +357,39 @@ add_arrow_test(flight_test
LABELS
"arrow_flight")

# PoC: Async Flight server using gRPC generic callback API.
# Registered only when tests are built; it links the same libraries and
# carries the same "arrow_flight" label as the other Flight tests.
if(ARROW_BUILD_TESTS)
  add_arrow_test(async_grpc_poc_test
                 SOURCES
                 transport/grpc/async_grpc_poc_test.cc
                 STATIC_LINK_LIBS
                 ${ARROW_FLIGHT_TEST_LINK_LIBS}
                 LABELS
                 "arrow_flight")
endif()

# Suppress Abseil <ciso646> warnings in test files (GCC 15+ with C++20).
# Each test executable may or may not exist in this configuration, so every
# target is checked before options are attached to it.
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_CXX_STANDARD GREATER_EQUAL 20)
  foreach(ARROW_FLIGHT_TEST_TARGET
          arrow-flight-internals-test
          arrow-flight-test
          arrow-async-grpc-poc-test)
    if(TARGET ${ARROW_FLIGHT_TEST_TARGET})
      target_compile_options(${ARROW_FLIGHT_TEST_TARGET} PRIVATE "-Wno-cpp")
    endif()
  endforeach()
endif()

# Build test server for unit tests or benchmarks
if(ARROW_BUILD_TESTS OR ARROW_BUILD_BENCHMARKS)
add_executable(flight-test-server test_server.cc)
target_link_libraries(flight-test-server ${ARROW_FLIGHT_TEST_LINK_LIBS}
${GFLAGS_LIBRARIES})
# Suppress Abseil <ciso646> warnings (GCC 15+ with C++20).
# -Wno-cpp is applied to the whole flight-test-server target and silences
# every #warning directive in its sources.
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_CXX_STANDARD GREATER_EQUAL 20)
  target_compile_options(flight-test-server PRIVATE "-Wno-cpp")
endif()

if(ARROW_BUILD_TESTS)
add_dependencies(arrow-flight-test flight-test-server)
Expand All @@ -365,6 +416,12 @@ if(ARROW_BUILD_BENCHMARKS)
target_link_libraries(arrow-flight-benchmark ${ARROW_FLIGHT_TEST_LINK_LIBS}
${GFLAGS_LIBRARIES})

# Suppress Abseil <ciso646> warnings (GCC 15+ with C++20) for both benchmark
# executables.
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_CXX_STANDARD GREATER_EQUAL 20)
  foreach(ARROW_FLIGHT_BENCHMARK_TARGET arrow-flight-perf-server arrow-flight-benchmark)
    target_compile_options(${ARROW_FLIGHT_BENCHMARK_TARGET} PRIVATE "-Wno-cpp")
  endforeach()
endif()

add_dependencies(arrow-flight-benchmark arrow-flight-perf-server)

add_dependencies(arrow_flight arrow-flight-benchmark)
Expand Down
135 changes: 135 additions & 0 deletions cpp/src/arrow/flight/flight_data_decoder.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/flight/flight_data_decoder.h"

#include "arrow/flight/serialization_internal.h"
#include "arrow/flight/transport.h"
#include "arrow/ipc/message.h"
#include "arrow/ipc/reader.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"

namespace arrow {
namespace flight {

namespace {

// FlightDataMessageReader is an ipc::MessageReader that is fed one FlightData
// message at a time. Analogous to MessageReader::Open(InputStream*) but for
// individual FlightData messages read directly from the received buffers.
//
// Each pushed FlightData yields its IPC message at most once: after
// ReadNextMessage() has handed it out, further calls return nullptr until the
// next Push(). Otherwise a consumer that pulls more than once per Push() would
// receive the same message again instead of an end-of-data marker.
class FlightDataMessageReader : public ipc::MessageReader {
 public:
  // Replace the pending FlightData with `data`.
  void Push(internal::FlightData data) { data_ = std::move(data); }

  // Return the IPC message of the pending FlightData, or nullptr when there is
  // nothing to read (no IPC metadata, or the message was already consumed).
  ::arrow::Result<std::unique_ptr<ipc::Message>> ReadNextMessage() override {
    if (!data_.metadata) return nullptr;
    auto message = data_.OpenMessage();
    // Mark the message as consumed. Only the IPC metadata is dropped, so
    // ReadAppMetadata() keeps working for the message that was just read.
    // NOTE(review): assumes the opened message keeps its own reference to the
    // metadata buffer -- confirm against FlightData::OpenMessage().
    data_.metadata = nullptr;
    return message;
  }

  // Application metadata attached to the most recently pushed FlightData
  // (may be null).
  std::shared_ptr<Buffer> ReadAppMetadata() { return data_.app_metadata; }

 private:
  // The most recently pushed FlightData.
  internal::FlightData data_;
};

} // namespace

// Private implementation of FlightMessageDecoder.
//
// Ownership: ipc::RecordBatchStreamReader::Open() takes its MessageReader by
// unique_ptr, but the decoder has to keep pushing FlightData into that reader
// and read the app_metadata back out of it. The FlightDataMessageReader is
// therefore held by shared_ptr here and the stream reader is handed a small
// forwarding proxy. This way the reader is released even if no IPC message is
// ever consumed, and it stays valid if Open() fails and destroys its argument.
class FlightMessageDecoder::FlightMessageDecoderImpl {
 public:
  FlightMessageDecoderImpl(std::shared_ptr<FlightDataListener> listener,
                           ipc::IpcReadOptions options)
      : listener_(std::move(listener)),
        options_(std::move(options)),
        message_reader_(std::make_shared<FlightDataMessageReader>()) {}

  // Decode one serialized FlightData message and notify the listener.
  //
  // - No IPC metadata: OnNext() is called with the app_metadata, if non-empty.
  // - First message with IPC metadata: opens the stream reader (the message
  //   must be a schema) and calls OnSchemaDecoded().
  // - Later messages: OnNext() is called with the decoded record batch, or
  //   with only the app_metadata (if non-empty) when no batch was produced.
  //
  // Returns the first error from deserialization, the IPC reader or the
  // listener.
  Status Consume(std::shared_ptr<Buffer> buffer) {
    ARROW_ASSIGN_OR_RAISE(auto data, internal::DeserializeFlightData(buffer));

    if (!data.metadata) {
      // Metadata-only message: no IPC content, just Flight app_metadata.
      if (data.app_metadata && data.app_metadata->size() > 0) {
        FlightStreamChunk chunk;
        chunk.app_metadata = std::move(data.app_metadata);
        RETURN_NOT_OK(listener_->OnNext(std::move(chunk)));
      }
      return Status::OK();
    }

    message_reader_->Push(std::move(data));

    if (!batch_reader_) {
      // Initialize RecordBatchStreamReader and read the first IPC message.
      // It must be a schema.
      // The stream reader takes ownership of the proxy only; the proxy shares
      // ownership of message_reader_ with this object.
      std::unique_ptr<ipc::MessageReader> reader_proxy =
          std::make_unique<MessageReaderProxy>(message_reader_);
      ARROW_ASSIGN_OR_RAISE(
          batch_reader_,
          ipc::RecordBatchStreamReader::Open(std::move(reader_proxy), options_));
      return listener_->OnSchemaDecoded(batch_reader_->schema());
    }

    std::shared_ptr<RecordBatch> batch;
    RETURN_NOT_OK(batch_reader_->ReadNext(&batch));
    auto app_metadata = message_reader_->ReadAppMetadata();

    if (batch) {
      FlightStreamChunk chunk;
      chunk.data = std::move(batch);
      chunk.app_metadata = std::move(app_metadata);
      return listener_->OnNext(std::move(chunk));
    }
    // This has to be a Dictionary batch.
    // TODO: Add unit test validating assumption.
    if (app_metadata && app_metadata->size() > 0) {
      FlightStreamChunk chunk;
      chunk.app_metadata = std::move(app_metadata);
      return listener_->OnNext(std::move(chunk));
    }
    return Status::OK();
  }

  // The decoded schema, or nullptr while the stream reader is not open yet.
  std::shared_ptr<Schema> schema() const {
    return batch_reader_ ? batch_reader_->schema() : nullptr;
  }

 private:
  // MessageReader handed to RecordBatchStreamReader. It forwards every read
  // to the shared FlightDataMessageReader.
  class MessageReaderProxy : public ipc::MessageReader {
   public:
    explicit MessageReaderProxy(std::shared_ptr<FlightDataMessageReader> reader)
        : reader_(std::move(reader)) {}

    ::arrow::Result<std::unique_ptr<ipc::Message>> ReadNextMessage() override {
      return reader_->ReadNextMessage();
    }

   private:
    std::shared_ptr<FlightDataMessageReader> reader_;
  };

  std::shared_ptr<FlightDataListener> listener_;
  ipc::IpcReadOptions options_;
  // Shared with the MessageReaderProxy owned by batch_reader_, so that
  // FlightData can be pushed and app_metadata extracted after the hand-off.
  std::shared_ptr<FlightDataMessageReader> message_reader_;
  std::shared_ptr<ipc::RecordBatchStreamReader> batch_reader_;
};

// All decoding state lives in the implementation object; the public class
// only forwards to it.
FlightMessageDecoder::FlightMessageDecoder(std::shared_ptr<FlightDataListener> listener,
                                           ipc::IpcReadOptions options)
    : impl_(std::make_unique<FlightMessageDecoderImpl>(std::move(listener),
                                                       std::move(options))) {}

// Defined here rather than in the header because FlightMessageDecoderImpl is
// only forward-declared there and must be complete where impl_ is destroyed.
FlightMessageDecoder::~FlightMessageDecoder() = default;

// Forward one raw FlightData buffer to the implementation.
Status FlightMessageDecoder::Consume(std::shared_ptr<Buffer> buffer) {
  Status status = impl_->Consume(std::move(buffer));
  return status;
}

// Schema decoded so far, as reported by the implementation.
std::shared_ptr<Schema> FlightMessageDecoder::schema() const {
  return impl_->schema();
}

} // namespace flight
} // namespace arrow
77 changes: 77 additions & 0 deletions cpp/src/arrow/flight/flight_data_decoder.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "arrow/flight/types.h"
#include "arrow/flight/visibility.h"
#include "arrow/ipc/options.h"
#include "arrow/ipc/reader.h"
#include "arrow/result.h"
#include "arrow/status.h"

namespace arrow {
namespace flight {

/// \brief A general listener class to receive events from FlightMessageDecoder
///
/// Extends ipc::Listener with OnNext(), which FlightMessageDecoder calls with
/// each decoded FlightStreamChunk.
/// User must implement callback methods for interested events.
class ARROW_FLIGHT_EXPORT FlightDataListener : public ipc::Listener {
 public:
  /// \brief Called for each decoded FlightStreamChunk.
  ///
  /// chunk.data is the decoded RecordBatch, or nullptr for metadata-only
  /// messages.
  ///
  /// \param[in] chunk the decoded chunk
  /// \return Status; FlightMessageDecoder::Consume() returns a non-OK status
  /// from this callback to its caller.
  virtual Status OnNext(FlightStreamChunk chunk) = 0;
};

/// \brief Push style stream decoder that turns raw arrow Buffers into
/// FlightStreamChunks.
///
/// This class decodes the Apache Arrow Flight data format from arrow::Buffer
/// and fires events on the provided FlightDataListener.
class ARROW_FLIGHT_EXPORT FlightMessageDecoder {
 public:
  /// \brief Create a decoder that reports to the given listener.
  ///
  /// \param[in] listener receives the decoded schema and chunks
  /// \param[in] options IPC read options used when decoding IPC messages
  explicit FlightMessageDecoder(
      std::shared_ptr<FlightDataListener> listener,
      ipc::IpcReadOptions options = ipc::IpcReadOptions::Defaults());
  ~FlightMessageDecoder();

  /// \brief Decode one FlightData message directly from a buffer.
  ///
  /// Fires listener->OnSchemaDecoded() on the first message containing
  /// a schema, and listener->OnNext() for each subsequent record batch.
  /// Metadata-only messages and dictionary batches fire listener->OnNext()
  /// only when they carry non-empty app_metadata.
  ///
  /// \param[in] buffer a raw buffer directly from the transport. Example:
  /// the arrow::Buffer extracted from the grpc::ByteBuffer from the gRPC transport.
  /// \return Status
  Status Consume(std::shared_ptr<Buffer> buffer);

  /// \brief The decoded schema.
  ///
  /// Available after the first Consume() call that contains a schema message.
  /// Returns nullptr if no schema has been decoded yet.
  std::shared_ptr<Schema> schema() const;

 private:
  // Implementation is hidden behind a pointer; it is defined in the .cc file.
  class FlightMessageDecoderImpl;
  std::unique_ptr<FlightMessageDecoderImpl> impl_;
};

} // namespace flight
} // namespace arrow
Loading
Loading