Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion python/pyarrow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ def print_entry(label, value):

from pyarrow.lib import (ChunkedArray, RecordBatch, Table, table,
concat_arrays, concat_tables, TableGroupBy,
RecordBatchReader, concat_batches)
RecordBatchReader, AsyncRecordBatchReader,
concat_batches)
Comment on lines 264 to +267
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description mentions a "new API to produce an async device stream", but the Python surface added here only exports the AsyncRecordBatchReader type; there is no public factory/function that returns an instance (search shows no other AsyncRecordBatchReader references besides ipc_async.pxi and this import). If the user-facing entrypoint is intended to be part of this PR, it appears to be missing.

Copilot uses AI. Check for mistakes.

# Exceptions
from pyarrow.lib import (ArrowCancelled,
Expand Down
39 changes: 39 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -3179,6 +3179,9 @@ cdef extern from "arrow/c/abi.h":
int64_t device_id
int32_t device_type

# Minimal binding of ArrowAsyncDeviceStreamHandler from the Arrow C device ABI;
# only the release callback is declared here — it is the only member the
# Python bindings touch directly (the full struct is defined in arrow/c/abi.h).
cdef struct ArrowAsyncDeviceStreamHandler:
    void (*release)(ArrowAsyncDeviceStreamHandler*) noexcept nogil

cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil:
CStatus ExportType(CDataType&, ArrowSchema* out)
CResult[shared_ptr[CDataType]] ImportType(ArrowSchema*)
Expand Down Expand Up @@ -3225,6 +3228,42 @@ cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil:
CResult[shared_ptr[CRecordBatch]] ImportDeviceRecordBatch(
ArrowDeviceArray*, ArrowSchema*)

# Opaque alias for arrow::AsyncGenerator<arrow::RecordBatchWithMetadata>
# (a std::function); Cython cannot invoke it directly, so the
# CallAsyncGenerator shim declared below is used instead.
cdef cppclass CAsyncRecordBatchGenerator_Generator \
        "arrow::AsyncGenerator<arrow::RecordBatchWithMetadata>":
    pass

# Consumer-side result of importing an async device stream: the shared
# schema, the device the data lives on, and the generator yielding batches.
cdef cppclass CAsyncRecordBatchGenerator \
        "arrow::AsyncRecordBatchGenerator":
    shared_ptr[CSchema] schema
    CDeviceAllocationType device_type
    CAsyncRecordBatchGenerator_Generator generator

# Set up the consumer side of an async device stream handler; the returned
# future resolves once the producer has delivered the schema.
CFuture[CAsyncRecordBatchGenerator] CreateAsyncDeviceStreamHandler(
    ArrowAsyncDeviceStreamHandler* handler,
    CExecutor* executor,
    uint64_t queue_size)

# Overload relying on the C++-side default queue size.
CFuture[CAsyncRecordBatchGenerator] CreateAsyncDeviceStreamHandler(
    ArrowAsyncDeviceStreamHandler* handler,
    CExecutor* executor)


cdef extern from "arrow/python/async_stream.h" namespace "arrow::py" nogil:
    # Invoke the generator once, requesting the next batch (shim defined in
    # python/pyarrow/src/arrow/python/async_stream.h).
    CFuture[CRecordBatchWithMetadata] CallAsyncGenerator(
        CAsyncRecordBatchGenerator_Generator& generator)

    # Test helper wiring a producer exporting `batches` to a consumer and
    # returning the consumer's generator future.
    CFuture[CAsyncRecordBatchGenerator] RoundtripAsyncBatches(
        shared_ptr[CSchema] schema,
        vector[shared_ptr[CRecordBatch]] batches,
        CExecutor* executor,
        uint64_t queue_size)

    # Overload relying on the C++-side default queue size (5).
    CFuture[CAsyncRecordBatchGenerator] RoundtripAsyncBatches(
        shared_ptr[CSchema] schema,
        vector[shared_ptr[CRecordBatch]] batches,
        CExecutor* executor)


cdef extern from "arrow/util/byte_size.h" namespace "arrow::util" nogil:
CResult[int64_t] ReferencedBufferSize(const CArray& array_data)
Expand Down
156 changes: 156 additions & 0 deletions python/pyarrow/ipc_async.pxi
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


class _AsyncioCall:
"""State for an async operation using asyncio."""

def __init__(self):
import asyncio
self._future = asyncio.get_running_loop().create_future()

def as_awaitable(self):
return self._future

def wakeup(self, result_or_exception):
loop = self._future.get_loop()
if isinstance(result_or_exception, BaseException):
loop.call_soon_threadsafe(
self._future.set_exception, result_or_exception)
else:
loop.call_soon_threadsafe(
self._future.set_result, result_or_exception)


cdef object _wrap_record_batch_or_none(CRecordBatchWithMetadata batch_with_md):
    """Wrap a CRecordBatchWithMetadata as a RecordBatch, or return None at end-of-stream."""
    # A null batch pointer is the generator's end-of-stream marker.
    if batch_with_md.batch.get() == NULL:
        return None
    # NOTE(review): any custom metadata carried alongside the batch is dropped
    # here — confirm whether it should be surfaced to Python as well.
    return pyarrow_wrap_batch(batch_with_md.batch)


cdef object _wrap_async_generator(CAsyncRecordBatchGenerator gen):
    """Wrap a CAsyncRecordBatchGenerator into an AsyncRecordBatchReader."""
    # Bypass __init__ (which raises TypeError to block direct construction).
    cdef AsyncRecordBatchReader reader = AsyncRecordBatchReader.__new__(
        AsyncRecordBatchReader)
    # Copy the generator state onto the heap so the reader's shared_ptr can
    # own it beyond the lifetime of this by-value argument.
    cdef CAsyncRecordBatchGenerator* p = new CAsyncRecordBatchGenerator()
    p.schema = gen.schema
    p.device_type = gen.device_type
    # The generator is a std::function; move it instead of copying.
    p.generator = move(gen.generator)
    reader.generator.reset(p)
    # The Python Schema wrapper is created lazily by the `schema` property.
    reader._schema = None
    return reader


cdef class AsyncRecordBatchReader(_Weakrefable):
    """Asynchronous reader for a stream of record batches.

    This class provides an async iterator interface for consuming record
    batches from an asynchronous device stream.

    This interface is EXPERIMENTAL.

    Examples
    --------
    >>> async for batch in reader:  # doctest: +SKIP
    ...     process(batch)
    """

    def __init__(self):
        # Instances are only produced internally (e.g. by the async device
        # stream import machinery); there is no public constructor, so avoid
        # pointing users at factory methods that do not exist.
        raise TypeError(
            f"Do not call {self.__class__.__name__}'s constructor directly; "
            f"{self.__class__.__name__} instances are created by pyarrow APIs.")

    @property
    def schema(self):
        """
        Shared schema of the record batches in the stream.

        Returns
        -------
        Schema
        """
        # Wrap the C++ schema lazily and cache the Python wrapper.
        if self._schema is None:
            self._schema = pyarrow_wrap_schema(self.generator.get().schema)
        return self._schema

    def __aiter__(self):
        return self

    async def __anext__(self):
        batch = await self._read_next_async()
        if batch is None:
            # None is the end-of-stream marker (see _wrap_record_batch_or_none).
            raise StopAsyncIteration
        return batch

    async def _read_next_async(self):
        """Request the next batch and await its delivery on the event loop."""
        call = _AsyncioCall()
        self._read_next(call)
        return await call.as_awaitable()

    cdef _read_next(self, call):
        cdef CFuture[CRecordBatchWithMetadata] c_future

        # Pull from the C++ async generator without the GIL; the resulting
        # future completes on whichever thread resolves it.
        with nogil:
            c_future = CallAsyncGenerator(self.generator.get().generator)

        # Deliver the wrapped batch (or None) to the asyncio future.
        BindFuture(move(c_future), call.wakeup, _wrap_record_batch_or_none)

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Nothing to release eagerly; the underlying generator state is freed
        # by the shared_ptr when the reader is garbage-collected.
        pass


async def _test_roundtrip_async(schema, batches, queue_size=5):
    """Test helper: wire an async producer to a consumer and await the reader.

    EXPERIMENTAL: This function is intended for testing purposes only.

    Parameters
    ----------
    schema : Schema
        The schema of the record batches.
    batches : list of RecordBatch
        The record batches to produce.
    queue_size : int, default 5
        Number of batches to request ahead.

    Returns
    -------
    AsyncRecordBatchReader
    """
    pending = _AsyncioCall()
    _start_roundtrip(pending, schema, batches, queue_size)
    return await pending.as_awaitable()


cdef _start_roundtrip(call, Schema schema, list batches, uint64_t queue_size):
    # Kick off the C++ roundtrip (producer + consumer pair) on the CPU thread
    # pool and deliver the resulting generator to `call.wakeup` when ready.
    cdef:
        shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema)
        vector[shared_ptr[CRecordBatch]] c_batches
        CFuture[CAsyncRecordBatchGenerator] c_future

    # The checked cast <RecordBatch?> raises TypeError on foreign elements.
    for batch in batches:
        c_batches.push_back((<RecordBatch?>batch).sp_batch)

    with nogil:
        c_future = RoundtripAsyncBatches(
            c_schema, move(c_batches), GetCpuThreadPool(), queue_size)

    # When the C++ future resolves, wrap it into an AsyncRecordBatchReader
    # and hand it to the asyncio future via call.wakeup.
    BindFuture(move(c_future), call.wakeup, _wrap_async_generator)
8 changes: 8 additions & 0 deletions python/pyarrow/lib.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,14 @@ cdef class RecordBatchReader(_Weakrefable):
SharedPtrNoGIL[CRecordBatchReader] reader


cdef class AsyncRecordBatchReader(_Weakrefable):
    cdef:
        # Owns the imported async generator state; SharedPtrNoGIL lets the
        # final release run without holding the GIL.
        SharedPtrNoGIL[CAsyncRecordBatchGenerator] generator
        # Cached Python Schema wrapper, created lazily by the schema property.
        Schema _schema

    cdef _read_next(self, call)


cdef class CacheOptions(_Weakrefable):
cdef:
CCacheOptions wrapped
Expand Down
3 changes: 3 additions & 0 deletions python/pyarrow/lib.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ include "io.pxi"
# IPC / Messaging
include "ipc.pxi"

# Async IPC
include "ipc_async.pxi"

# Micro-benchmark routines
include "benchmark.pxi"

Expand Down
72 changes: 72 additions & 0 deletions python/pyarrow/src/arrow/python/async_stream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstring>
#include <memory>
#include <utility>
#include <vector>

#include "arrow/c/bridge.h"
#include "arrow/record_batch.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/future.h"
#include "arrow/util/thread_pool.h"

namespace arrow::py {

/// \brief Call an AsyncGenerator<RecordBatchWithMetadata> and return the Future.
///
/// This is needed because Cython cannot invoke std::function objects directly.
///
/// \param generator the generator to pull from; each call requests one batch.
/// \return a Future resolving to the next batch (a batch with a null
///   RecordBatch pointer signals end-of-stream, per AsyncGenerator convention).
inline Future<RecordBatchWithMetadata> CallAsyncGenerator(
    AsyncGenerator<RecordBatchWithMetadata>& generator) {
  return generator();
}

/// \brief Create a roundtrip async producer+consumer pair for testing.
///
/// Allocates an ArrowAsyncDeviceStreamHandler on the heap, calls
/// CreateAsyncDeviceStreamHandler (consumer side), then submits
/// ExportAsyncRecordBatchReader (producer side) on the given executor.
/// Returns a Future that resolves to the AsyncRecordBatchGenerator once
/// the schema is available.
///
/// \param schema schema shared by all batches in the stream
/// \param batches the batches the producer side will emit
/// \param executor executor on which the producer task runs
/// \param queue_size number of batches the consumer requests ahead
inline Future<AsyncRecordBatchGenerator> RoundtripAsyncBatches(
    std::shared_ptr<Schema> schema, std::vector<std::shared_ptr<RecordBatch>> batches,
    ::arrow::internal::Executor* executor, uint64_t queue_size = 5) {
  // Heap-allocate the handler so it outlives this function.  The release
  // callback installed by CreateAsyncDeviceStreamHandler frees only
  // private_data, so the struct itself must be deleted explicitly below or
  // every roundtrip leaks one handler.
  auto* handler = new ArrowAsyncDeviceStreamHandler;
  std::memset(handler, 0, sizeof(ArrowAsyncDeviceStreamHandler));

  auto fut_gen = CreateAsyncDeviceStreamHandler(handler, executor, queue_size);

  // Submit the export to the executor so it runs concurrently with the consumer.
  auto submit_result = executor->Submit(
      [schema = std::move(schema), batches = std::move(batches), handler]() mutable {
        auto generator = MakeVectorGenerator(std::move(batches));
        auto export_done =
            ExportAsyncRecordBatchReader(std::move(schema), std::move(generator),
                                         DeviceAllocationType::kCPU, handler);
        // ExportAsyncRecordBatchReader invokes handler->release(handler) but
        // does not free the allocation; reclaim it once the export finishes
        // (success or failure), after which nothing touches the handler.
        export_done.AddCallback([handler](const auto&) { delete handler; });
        return export_done;
      });

  if (!submit_result.ok()) {
    // The producer task never ran, so no one else will free the handler.
    // NOTE(review): assumes the consumer side does not dereference the
    // handler unprompted after setup — confirm against bridge.cc.
    delete handler;
    return Future<AsyncRecordBatchGenerator>::MakeFinished(submit_result.status());
  }

  return fut_gen;
}

} // namespace arrow::py
Loading
Loading