GH-37937: [C++][FlightRPC] Investigate using gRPC's generic API using gRPC's BidiReactor#49339
GH-37937: [C++][FlightRPC] Investigate using gRPC's generic API using gRPC's BidiReactor#49339raulcd wants to merge 5 commits intoapache:mainfrom
Conversation
…ges. Build new PayloadData::SerializeToBuffers method to retrieve list of arrow::buffers from a PayloadData. This function internally is a copy on what we had on FlightDataSerialize but using arrow buffers instead of grpc::ByteBuffers. Move the logic to a single place and reuse on FlightDataSerialize
|
|
|
Some initial discussion happened on this same branch on a PR my fork, see more details here: I am moving the PR to here for visibility. |
raulcd
left a comment
There was a problem hiding this comment.
@lidavidm @pitrou I've spent some time today learning more about our FlightDataSerialize method and how we could expose an API that leaves gRPC out of the picture using a BufferVector instead. This is only for the write path so far but I wanted to share early to validate this is in-line with what we had been discussing.
| auto reader = | ||
| RecordBatchReader::Make({batch, batch, batch, batch, batch}).ValueOrDie(); | ||
| data_stream_ = std::make_unique<RecordBatchStream>(std::move(reader)); | ||
| payload = data_stream_->GetSchemaPayload().ValueOrDie(); | ||
| } else { | ||
| payload = data_stream_->Next().ValueOrDie(); |
There was a problem hiding this comment.
As discussed we should use RecordBatchStream to get the FlightPayloads not generate the payloads manually.
| return; | ||
| } | ||
|
|
||
| auto buffers = payload.SerializeToBuffers().ValueOrDie(); |
There was a problem hiding this comment.
Extract the buffers (arrow::BufferVector) directly from the FlightPayload this is a new API.
| ::grpc::Status FlightDataSerialize(const FlightPayload& msg, ByteBuffer* out, | ||
| bool* own_buffer) { |
There was a problem hiding this comment.
All the manual deserialization has been moved to a common arrow::Result<arrow::BufferVector> SerializePayloadToBuffers(const arrow::flight::FlightPayload& msg) on cpp/src/arrow/flight/serialization_internal.cc which is called from FlightPayload::SerializeToBuffers.
We just use the arrow::buffers here.
| arrow::Status IpcMessageHeaderSize(const arrow::ipc::IpcPayload& ipc_msg, bool has_body, | ||
| size_t* header_size, int32_t* metadata_size) { |
There was a problem hiding this comment.
This and SerializePayloadToBuffers below are almost copy&paste from what we had at cpp/src/arrow/flight/transport/grpc/serialization_internal.cc --> FlightDataSerialize
the main difference is they deal with arrow::Buffers instead of grpc::ByteBuffer's
…tly with arrow::Buffer and FlightData
…entation that consumes arrow::buffers and triggers calls to a user built listener once RecordBatch has been read
Warning
Do not merge, this is a PoC being discussed at the moment
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?
This PR includes breaking changes to public APIs. (If there are any breaking changes to public APIs, please explain which changes are breaking. If not, you can remove this.)
This PR contains a "Critical Fix". (If the changes fix either (a) a security vulnerability, (b) a bug that caused incorrect or invalid data to be produced, or (c) a bug that causes a crash (even when the API contract is upheld), please provide explanation. If not, you can remove this.)