Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stream parameters in pylibcudf IO APIs #17620

Open
wants to merge 30 commits into
base: branch-25.04
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
e27cad2
Add stream parameters in pylibcudf IO APIs
Matt711 Dec 18, 2024
a72988f
add remaining streams
Matt711 Dec 18, 2024
76ffc61
Merge branch 'branch-25.02' into fea/plc/io/streams
Matt711 Dec 18, 2024
48fc8a9
clean up
Matt711 Dec 18, 2024
8cf0eb1
use stream
Matt711 Dec 18, 2024
dee9fda
add stream to parquet_chunked_writer
Matt711 Dec 18, 2024
b06dabc
add stream to orc_chunked_writer
Matt711 Dec 18, 2024
4fa1a1a
Update python/pylibcudf/pylibcudf/io/csv.pyx
Matt711 Dec 18, 2024
a60f800
Update python/pylibcudf/pylibcudf/io/csv.pyx
Matt711 Dec 18, 2024
5727aa1
Update python/pylibcudf/pylibcudf/io/json.pyx
Matt711 Dec 18, 2024
a8da383
Merge branch 'branch-25.02' into fea/plc/io/streams
Matt711 Dec 18, 2024
9d1cc92
clean up
Matt711 Dec 18, 2024
8477869
fix typo
Matt711 Dec 18, 2024
e117d64
add stream param to cpp_read_orc
Matt711 Dec 18, 2024
0fd96db
Merge branch 'branch-25.02' into fea/plc/io/streams
Matt711 Dec 18, 2024
8758baa
Merge branch 'branch-25.02' into fea/plc/io/streams
Matt711 Dec 18, 2024
3913229
add a test
Matt711 Dec 19, 2024
5affbbc
Update python/pylibcudf/pylibcudf/io/json.pyx
Matt711 Dec 19, 2024
218e73b
Update python/pylibcudf/pylibcudf/io/json.pyx
Matt711 Dec 19, 2024
c29cdc4
remove stream param from avro reader
Matt711 Dec 19, 2024
9f7e87e
clean up
Matt711 Dec 19, 2024
3f84e5c
clean up
Matt711 Dec 19, 2024
f34b519
Merge branch 'branch-25.02' into fea/plc/io/streams
Matt711 Dec 20, 2024
93dd969
Merge branch 'branch-25.02' into fea/plc/io/streams
Matt711 Feb 4, 2025
d730631
Merge branch 'branch-25.04' of https://github.com/rapidsai/cudf into …
Matt711 Feb 4, 2025
248e713
use function overloading
Matt711 Feb 4, 2025
d3f8808
add tests
Matt711 Feb 5, 2025
c4a722e
remove old test
Matt711 Feb 5, 2025
46c8ec5
Merge branch 'branch-25.04' into fea/plc/io/streams
Matt711 Feb 5, 2025
c717721
add doc
Matt711 Feb 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions python/pylibcudf/pylibcudf/io/avro.pxd
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
# Copyright (c) 2024-2025, NVIDIA CORPORATION.
from pylibcudf.io.types cimport SourceInfo, TableWithMetadata
from pylibcudf.libcudf.io.avro cimport avro_reader_options, avro_reader_options_builder
from pylibcudf.libcudf.types cimport size_type
from rmm.pylibrmm.stream cimport Stream


from pylibcudf.libcudf.types cimport size_type

cdef class AvroReaderOptions:
cdef avro_reader_options c_obj
cdef SourceInfo source
Expand All @@ -20,4 +19,4 @@ cdef class AvroReaderOptionsBuilder:
cpdef AvroReaderOptionsBuilder num_rows(self, size_type num_rows)
cpdef AvroReaderOptions build(self)

cpdef TableWithMetadata read_avro(AvroReaderOptions options)
cpdef TableWithMetadata read_avro(AvroReaderOptions options, Stream stream = *)
6 changes: 5 additions & 1 deletion python/pylibcudf/pylibcudf/io/avro.pyi
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
from rmm.pylibrmm.stream import Stream

from pylibcudf.io.types import SourceInfo, TableWithMetadata

__all__ = ["AvroReaderOptions", "AvroReaderOptionsBuilder", "read_avro"]
Expand All @@ -13,4 +15,6 @@ class AvroReaderOptionsBuilder:
def num_rows(num_rows: int) -> AvroReaderOptionsBuilder: ...
def build(self) -> AvroReaderOptions: ...

def read_avro(options: AvroReaderOptions) -> TableWithMetadata: ...
# ``stream`` is optional: when it is None the implementation calls the
# libcudf Avro reader without an explicit stream argument.
def read_avro(
    options: AvroReaderOptions, stream: Stream | None = None
) -> TableWithMetadata: ...
12 changes: 9 additions & 3 deletions python/pylibcudf/pylibcudf/io/avro.pyx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
# Copyright (c) 2024-2025, NVIDIA CORPORATION.

from libcpp.string cimport string
from libcpp.utility cimport move
Expand All @@ -9,6 +9,8 @@ from pylibcudf.libcudf.io.avro cimport (
read_avro as cpp_read_avro,
)
from pylibcudf.libcudf.types cimport size_type
from rmm.pylibrmm.stream cimport Stream


__all__ = ["read_avro", "AvroReaderOptions", "AvroReaderOptionsBuilder"]

Expand Down Expand Up @@ -126,7 +128,8 @@ cdef class AvroReaderOptionsBuilder:


cpdef TableWithMetadata read_avro(
AvroReaderOptions options
AvroReaderOptions options,
Stream stream = None,
):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Stream stream = None,
Stream stream = DEFAULT_STREAM,

"""
Read from Avro format.
Expand All @@ -142,6 +145,9 @@ cpdef TableWithMetadata read_avro(
Settings for controlling reading behavior
"""
with nogil:
c_result = move(cpp_read_avro(options.c_obj))
if stream is not None:
c_result = move(cpp_read_avro(options.c_obj, stream.view()))
else:
c_result = move(cpp_read_avro(options.c_obj))

Comment on lines +150 to +153
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if stream is not None:
c_result = move(cpp_read_avro(options.c_obj, stream.view()))
else:
c_result = move(cpp_read_avro(options.c_obj))
c_result = move(cpp_read_avro(options.c_obj, stream.view()))

return TableWithMetadata.from_libcudf(c_result)
7 changes: 4 additions & 3 deletions python/pylibcudf/pylibcudf/io/csv.pxd
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
# Copyright (c) 2024-2025, NVIDIA CORPORATION.

from libcpp.vector cimport vector
from libcpp.string cimport string
Expand All @@ -18,6 +18,7 @@ from pylibcudf.libcudf.io.types cimport (
table_with_metadata,
)
from pylibcudf.libcudf.types cimport size_type
from rmm.pylibrmm.stream cimport Stream

cdef class CsvReaderOptions:
cdef csv_reader_options c_obj
Expand Down Expand Up @@ -61,7 +62,7 @@ cdef class CsvReaderOptionsBuilder:
cpdef CsvReaderOptionsBuilder dayfirst(self, bool dayfirst)
cpdef CsvReaderOptions build(self)

cpdef TableWithMetadata read_csv(CsvReaderOptions options)
cpdef TableWithMetadata read_csv(CsvReaderOptions options, Stream stream = *)

cdef class CsvWriterOptions:
cdef csv_writer_options c_obj
Expand All @@ -84,4 +85,4 @@ cdef class CsvWriterOptionsBuilder:
cpdef CsvWriterOptions build(self)


cpdef void write_csv(CsvWriterOptions options)
cpdef void write_csv(CsvWriterOptions options, Stream stream = *)
48 changes: 5 additions & 43 deletions python/pylibcudf/pylibcudf/io/csv.pyi
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from collections.abc import Mapping

from typing_extensions import Self

from rmm._cuda.stream import Stream

from pylibcudf.io.types import (
CompressionType,
QuoteStyle,
Expand Down Expand Up @@ -56,48 +56,10 @@ class CsvReaderOptionsBuilder:
def build(self) -> CsvReaderOptions: ...

def read_csv(
source_info: SourceInfo,
*,
compression: CompressionType = CompressionType.AUTO,
byte_range_offset: int = 0,
byte_range_size: int = 0,
col_names: list[str] | None = None,
prefix: str = "",
mangle_dupe_cols: bool = True,
usecols: list[int] | list[str] | None = None,
nrows: int = -1,
skiprows: int = 0,
skipfooter: int = 0,
header: int = 0,
lineterminator: str = "\n",
delimiter: str | None = None,
thousands: str | None = None,
decimal: str = ".",
comment: str | None = None,
delim_whitespace: bool = False,
skipinitialspace: bool = False,
skip_blank_lines: bool = True,
quoting: QuoteStyle = QuoteStyle.MINIMAL,
quotechar: str = '"',
doublequote: bool = True,
parse_dates: list[str] | list[int] | None = None,
parse_hex: list[str] | list[int] | None = None,
# Technically this should be dict/list
# but using a fused type prevents using None as default
dtypes: Mapping[str, DataType] | list[DataType] | None = None,
true_values: list[str] | None = None,
false_values: list[str] | None = None,
na_values: list[str] | None = None,
keep_default_na: bool = True,
na_filter: bool = True,
dayfirst: bool = False,
# Note: These options are supported by the libcudf reader
# but are not exposed here since there is no demand for them
# on the Python side yet.
# detect_whitespace_around_quotes: bool = False,
# timestamp_type: DataType = DataType(type_id.EMPTY),
options: CsvReaderOptions,
stream: Stream = None,
) -> TableWithMetadata: ...
def write_csv(options: CsvWriterOptionsBuilder): ...
# Takes the built ``CsvWriterOptions`` (not the builder), matching the
# ``cpdef void write_csv(CsvWriterOptions options, Stream stream = *)``
# declaration. ``stream`` is optional: when it is None the writer is called
# without an explicit stream argument.
def write_csv(
    options: CsvWriterOptions, stream: Stream | None = None
) -> None: ...

class CsvWriterOptions:
def __init__(self): ...
Expand Down
20 changes: 14 additions & 6 deletions python/pylibcudf/pylibcudf/io/csv.pyx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
# Copyright (c) 2024-2025, NVIDIA CORPORATION.

from libcpp cimport bool
from libcpp.map cimport map
Expand All @@ -22,6 +22,7 @@ from pylibcudf.libcudf.io.types cimport (
from pylibcudf.libcudf.types cimport data_type, size_type
from pylibcudf.types cimport DataType
from pylibcudf.table cimport Table
from rmm.pylibrmm.stream cimport Stream

__all__ = [
"read_csv",
Expand Down Expand Up @@ -629,7 +630,8 @@ cdef class CsvReaderOptionsBuilder:


cpdef TableWithMetadata read_csv(
CsvReaderOptions options
CsvReaderOptions options,
Stream stream = None,
):
"""
Read from CSV format.
Expand All @@ -646,7 +648,10 @@ cpdef TableWithMetadata read_csv(
"""
cdef table_with_metadata c_result
with nogil:
c_result = move(cpp_read_csv(options.c_obj))
if stream is not None:
c_result = move(cpp_read_csv(options.c_obj, stream.view()))
else:
c_result = move(cpp_read_csv(options.c_obj))

cdef TableWithMetadata tbl_meta = TableWithMetadata.from_libcudf(c_result)
return tbl_meta
Expand Down Expand Up @@ -831,7 +836,8 @@ cdef class CsvWriterOptionsBuilder:


cpdef void write_csv(
CsvWriterOptions options
CsvWriterOptions options,
Stream stream = None,
):
"""
Write to CSV format.
Expand All @@ -846,6 +852,8 @@ cpdef void write_csv(
options: CsvWriterOptions
Settings for controlling writing behavior
"""

with nogil:
cpp_write_csv(move(options.c_obj))
if stream is not None:
cpp_write_csv(move(options.c_obj), stream.view())
else:
cpp_write_csv(move(options.c_obj))
6 changes: 4 additions & 2 deletions python/pylibcudf/pylibcudf/io/json.pxd
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
# Copyright (c) 2024-2025, NVIDIA CORPORATION.
from libcpp cimport bool
from pylibcudf.io.types cimport (
SinkInfo,
Expand All @@ -14,6 +14,7 @@ from pylibcudf.libcudf.io.json cimport (
json_writer_options_builder,
)
from pylibcudf.libcudf.types cimport size_type
from rmm.pylibrmm.stream cimport Stream
from pylibcudf.table cimport Table


Expand Down Expand Up @@ -75,9 +76,10 @@ cdef class JsonWriterOptionsBuilder:
cpdef JsonWriterOptionsBuilder compression(self, compression_type comptype)
cpdef JsonWriterOptions build(self)

cpdef void write_json(JsonWriterOptions options)
cpdef void write_json(JsonWriterOptions options, Stream stream = *)

cpdef tuple chunked_read_json(
JsonReaderOptions options,
int chunk_size= *,
Stream stream = *,
)
5 changes: 4 additions & 1 deletion python/pylibcudf/pylibcudf/io/json.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ from typing import TypeAlias

from typing_extensions import Self

from rmm._cuda.stream import Stream

from pylibcudf.column import Column
from pylibcudf.io.types import (
CompressionType,
Expand Down Expand Up @@ -70,8 +72,9 @@ class JsonWriterOptionsBuilder:
def compression(self, comptype: CompressionType) -> Self: ...
def build(self) -> JsonWriterOptions: ...

def write_json(options: JsonWriterOptions) -> None: ...
# ``stream`` is optional: when it is None the implementation calls the
# libcudf JSON writer without an explicit stream argument.
def write_json(
    options: JsonWriterOptions, stream: Stream | None = None
) -> None: ...
# ``stream`` is optional: when it is None each chunk is read without an
# explicit stream argument.
def chunked_read_json(
    options: JsonReaderOptions,
    chunk_size: int = 100_000_000,
    stream: Stream | None = None,
) -> tuple[list[Column], list[str], ChildNameToTypeMap]: ...
25 changes: 19 additions & 6 deletions python/pylibcudf/pylibcudf/io/json.pyx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
# Copyright (c) 2024-2025, NVIDIA CORPORATION.
from libcpp cimport bool
from libcpp.map cimport map
from libcpp.string cimport string
Expand All @@ -20,6 +20,8 @@ from pylibcudf.libcudf.io.types cimport (
)
from pylibcudf.libcudf.types cimport data_type, size_type
from pylibcudf.types cimport DataType
from rmm.pylibrmm.stream cimport Stream


__all__ = [
"chunked_read_json",
Expand Down Expand Up @@ -422,6 +424,7 @@ cdef class JsonReaderOptionsBuilder:
cpdef tuple chunked_read_json(
JsonReaderOptions options,
int chunk_size=100_000_000,
Stream stream = None,
):
"""
Reads chunks of a JSON file into a :py:class:`~.types.TableWithMetadata`.
Expand Down Expand Up @@ -455,7 +458,10 @@ cpdef tuple chunked_read_json(

try:
with nogil:
c_result = move(cpp_read_json(options.c_obj))
if stream is not None:
c_result = move(cpp_read_json(options.c_obj, stream.view()))
else:
c_result = move(cpp_read_json(options.c_obj))
except (ValueError, OverflowError):
break
if meta_names is None:
Expand Down Expand Up @@ -483,7 +489,8 @@ cpdef tuple chunked_read_json(


cpdef TableWithMetadata read_json(
JsonReaderOptions options
JsonReaderOptions options,
Stream stream = None,
):
"""
Read from JSON format.
Expand All @@ -506,7 +513,10 @@ cpdef TableWithMetadata read_json(
cdef table_with_metadata c_result

with nogil:
c_result = move(cpp_read_json(options.c_obj))
if stream is not None:
c_result = move(cpp_read_json(options.c_obj, stream.view()))
else:
c_result = move(cpp_read_json(options.c_obj))

return TableWithMetadata.from_libcudf(c_result)

Expand Down Expand Up @@ -694,7 +704,7 @@ cdef class JsonWriterOptionsBuilder:
return json_options


cpdef void write_json(JsonWriterOptions options):
cpdef void write_json(JsonWriterOptions options, Stream stream = None):
"""
Writes a set of columns to JSON format.

Expand All @@ -708,4 +718,7 @@ cpdef void write_json(JsonWriterOptions options):
None
"""
with nogil:
cpp_write_json(options.c_obj)
if stream is not None:
cpp_write_json(options.c_obj, stream.view())
else:
cpp_write_json(options.c_obj)
6 changes: 4 additions & 2 deletions python/pylibcudf/pylibcudf/io/orc.pxd
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
# Copyright (c) 2024-2025, NVIDIA CORPORATION.
from libc.stdint cimport uint64_t, int64_t
from libcpp cimport bool
from libcpp.optional cimport optional
Expand Down Expand Up @@ -33,6 +33,8 @@ from pylibcudf.libcudf.io.types cimport (
compression_type,
statistics_freq,
)
from rmm.pylibrmm.stream cimport Stream


cdef class OrcReaderOptions:
cdef orc_reader_options c_obj
Expand Down Expand Up @@ -93,7 +95,7 @@ cdef class OrcWriterOptionsBuilder:
cpdef OrcWriterOptionsBuilder metadata(self, TableInputMetadata meta)
cpdef OrcWriterOptions build(self)

cpdef void write_orc(OrcWriterOptions options)
cpdef void write_orc(OrcWriterOptions options, Stream stream = *)

cdef class OrcChunkedWriter:
cdef unique_ptr[orc_chunked_writer] c_obj
Expand Down
Loading
Loading