# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Arrow file and stream reader/writer classes, and other messaging tools

import os

import pyarrow as pa

from pyarrow.lib import (IpcReadOptions, IpcWriteOptions, ReadStats, WriteStats,  # noqa
                         Message, MessageReader,
                         RecordBatchReader, _ReadPandasMixin,
                         MetadataVersion, Alignment,
                         read_message, read_record_batch, read_schema,
                         read_tensor, write_tensor,
                         get_record_batch_size, get_tensor_size)
import pyarrow.lib as lib


class RecordBatchStreamReader(lib._RecordBatchStreamReader):
    """
    Reader for the Arrow streaming binary format.

    Parameters
    ----------
    source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object
        Either an in-memory buffer, or a readable file object.
        To read from a memory-mapped file, pass a MemoryMappedFile as the
        source.
    options : pyarrow.ipc.IpcReadOptions
        Options for IPC deserialization.
        If None, default values will be used.
    memory_pool : MemoryPool, default None
        If None, default memory pool is used.
    """

    def __init__(self, source, *, options=None, memory_pool=None):
        options = _ensure_default_ipc_read_options(options)
        self._open(source, options=options, memory_pool=memory_pool)


_ipc_writer_class_doc = """\
Parameters
----------
sink : str, pyarrow.NativeFile, or file-like Python object
    Either a file path, or a writable file object.
schema : pyarrow.Schema
    The Arrow schema for data to be written to the file.
options : pyarrow.ipc.IpcWriteOptions
    Options for IPC serialization.
    If None, default values will be used: the legacy format will not
    be used unless overridden by setting the environment variable
    ARROW_PRE_0_15_IPC_FORMAT=1, and the V5 metadata version will be
    used unless overridden by setting the environment variable
    ARROW_PRE_1_0_METADATA_VERSION=1."""

_ipc_file_writer_class_doc = (
    _ipc_writer_class_doc
    + "\n"
    + """\
metadata : dict | pyarrow.KeyValueMetadata, optional
    Key/value pairs (both must be bytes-like) that will be stored
    in the file footer and are retrievable via
    pyarrow.ipc.open_file(...).metadata."""
)


class RecordBatchStreamWriter(lib._RecordBatchStreamWriter):
    __doc__ = f"""Writer for the Arrow streaming binary format

{_ipc_writer_class_doc}"""

    def __init__(self, sink, schema, *, options=None):
        options = _get_legacy_format_default(options)
        self._open(sink, schema, options=options)


class RecordBatchFileReader(lib._RecordBatchFileReader):
    """
    Class for reading Arrow record batch data from the Arrow binary file format

    Parameters
    ----------
    source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object
        Either an in-memory buffer, or a readable file object.
        To read from a memory-mapped file, pass a MemoryMappedFile as the
        source.
    footer_offset : int, default None
        If the file is embedded in some larger file, this is the byte offset
        to the very end of the file data.
    options : pyarrow.ipc.IpcReadOptions
        Options for IPC deserialization.
        If None, default values will be used.
    memory_pool : MemoryPool, default None
        If None, default memory pool is used.
    """

    def __init__(self, source, footer_offset=None, *, options=None,
                 memory_pool=None):
        options = _ensure_default_ipc_read_options(options)
        self._open(source, footer_offset=footer_offset,
                   options=options, memory_pool=memory_pool)


class RecordBatchFileWriter(lib._RecordBatchFileWriter):
    __doc__ = f"""Writer to create the Arrow binary file format

{_ipc_file_writer_class_doc}"""

    def __init__(self, sink, schema, *, options=None, metadata=None):
        options = _get_legacy_format_default(options)
        self._open(sink, schema, options=options, metadata=metadata)


def _get_legacy_format_default(options):
    if options:
        if not isinstance(options, IpcWriteOptions):
            raise TypeError(f"expected IpcWriteOptions, got {type(options)}")
        return options

    metadata_version = MetadataVersion.V5
    use_legacy_format = \
        bool(int(os.environ.get('ARROW_PRE_0_15_IPC_FORMAT', '0')))
    if bool(int(os.environ.get('ARROW_PRE_1_0_METADATA_VERSION', '0'))):
        metadata_version = MetadataVersion.V4
    return IpcWriteOptions(use_legacy_format=use_legacy_format,
                           metadata_version=metadata_version)


def _ensure_default_ipc_read_options(options):
    if options and not isinstance(options, IpcReadOptions):
        raise TypeError(f"expected IpcReadOptions, got {type(options)}")
    return options or IpcReadOptions()


def new_stream(sink, schema, *, options=None):
    return RecordBatchStreamWriter(sink, schema,
                                   options=options)


new_stream.__doc__ = f"""\
Create an Arrow columnar IPC stream writer instance

{_ipc_writer_class_doc}

Returns
-------
writer : RecordBatchStreamWriter
    A writer for the given sink
"""


def open_stream(source, *, options=None, memory_pool=None):
    """
    Create reader for Arrow streaming format.

    Parameters
    ----------
    source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object
        Either an in-memory buffer, or a readable file object.
    options : pyarrow.ipc.IpcReadOptions
        Options for IPC deserialization.
        If None, default values will be used.
    memory_pool : MemoryPool, default None
        If None, default memory pool is used.

    Returns
    -------
    reader : RecordBatchStreamReader
        A reader for the given source
    """
    return RecordBatchStreamReader(source, options=options,
                                   memory_pool=memory_pool)


def new_file(sink, schema, *, options=None, metadata=None):
    return RecordBatchFileWriter(sink, schema, options=options,
                                 metadata=metadata)


new_file.__doc__ = f"""\
Create an Arrow columnar IPC file writer instance

{_ipc_file_writer_class_doc}

Returns
-------
writer : RecordBatchFileWriter
    A writer for the given sink
"""


def open_file(source, footer_offset=None, *, options=None, memory_pool=None):
    """
    Create reader for Arrow file format.

    Parameters
    ----------
    source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object
        Either an in-memory buffer, or a readable file object.
    footer_offset : int, default None
        If the file is embedded in some larger file, this is the byte offset
        to the very end of the file data.
    options : pyarrow.ipc.IpcReadOptions
        Options for IPC deserialization.
        If None, default values will be used.
    memory_pool : MemoryPool, default None
        If None, default memory pool is used.

    Returns
    -------
    reader : RecordBatchFileReader
        A reader for the given source
    """
    return RecordBatchFileReader(
        source, footer_offset=footer_offset,
        options=options, memory_pool=memory_pool)


def serialize_pandas(df, *, nthreads=None, preserve_index=None):
    """
    Serialize a pandas DataFrame into a buffer protocol compatible object.

    Parameters
    ----------
    df : pandas.DataFrame
    nthreads : int, default None
        Number of threads to use for conversion to Arrow, default all CPUs.
    preserve_index : bool, default None
        The default of None will store the index as a column, except for
        RangeIndex which is stored as metadata only. If True, always
        preserve the pandas index data as a column. If False, no index
        information is saved and the result will have a default RangeIndex.

    Returns
    -------
    buf : buffer
        An object compatible with the buffer protocol.
    """
    batch = pa.RecordBatch.from_pandas(df, nthreads=nthreads,
                                       preserve_index=preserve_index)
    sink = pa.BufferOutputStream()
    with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
        writer.write_batch(batch)
    return sink.getvalue()


def deserialize_pandas(buf, *, use_threads=True):
    """Deserialize a buffer protocol compatible object into a pandas DataFrame.

    Parameters
    ----------
    buf : buffer
        An object compatible with the buffer protocol.
    use_threads : bool, default True
        Whether to parallelize the conversion using multiple threads.

    Returns
    -------
    df : pandas.DataFrame
        The buffer deserialized as pandas DataFrame
    """
    buffer_reader = pa.BufferReader(buf)
    with pa.RecordBatchStreamReader(buffer_reader) as reader:
        table = reader.read_all()
    return table.to_pandas(use_threads=use_threads)