diff --git a/cpp/src/io/utilities/datasource.cpp b/cpp/src/io/utilities/datasource.cpp
index 15a4a270ce0..9ea39e692b6 100644
--- a/cpp/src/io/utilities/datasource.cpp
+++ b/cpp/src/io/utilities/datasource.cpp
@@ -26,6 +26,7 @@
 
 #include <rmm/device_buffer.hpp>
 
 #include <kvikio/file_handle.hpp>
+#include <kvikio/remote_handle.hpp>
 
 #include <algorithm>
@@ -33,6 +34,7 @@
 
 #include <fstream>
 #include <memory>
+#include <regex>
 #include <unordered_map>
 
 namespace cudf {
@@ -389,6 +391,86 @@ class user_datasource_wrapper : public datasource {
   datasource* const source;  ///< A non-owning pointer to the user-implemented datasource
 };
 
+/**
+ * @brief Remote file source backed by KvikIO, which handles S3 filepaths seamlessly.
+ */
+class remote_file_source : public datasource {
+  static std::unique_ptr<kvikio::S3Endpoint> create_s3_endpoint(char const* filepath)
+  {
+    auto [bucket_name, bucket_object] = kvikio::S3Endpoint::parse_s3_url(filepath);
+    return std::make_unique<kvikio::S3Endpoint>(bucket_name, bucket_object);
+  }
+
+ public:
+  explicit remote_file_source(char const* filepath) : _kvikio_file{create_s3_endpoint(filepath)} {}
+
+  ~remote_file_source() override = default;
+
+  [[nodiscard]] bool supports_device_read() const override { return true; }
+
+  [[nodiscard]] bool is_device_read_preferred(size_t size) const override { return true; }
+
+  [[nodiscard]] size_t size() const override { return _kvikio_file.nbytes(); }
+
+  std::future<size_t> device_read_async(size_t offset,
+                                        size_t size,
+                                        uint8_t* dst,
+                                        rmm::cuda_stream_view stream) override
+  {
+    CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file.");
+
+    auto const read_size = std::min(size, this->size() - offset);
+    return _kvikio_file.pread(dst, read_size, offset);
+  }
+
+  size_t device_read(size_t offset,
+                     size_t size,
+                     uint8_t* dst,
+                     rmm::cuda_stream_view stream) override
+  {
+    return device_read_async(offset, size, dst, stream).get();
+  }
+
+  std::unique_ptr<datasource::buffer> device_read(size_t offset,
+                                                  size_t size,
+                                                  rmm::cuda_stream_view stream) override
+  {
+    rmm::device_buffer out_data(size, stream);
+    size_t read = device_read(offset, size, reinterpret_cast<uint8_t*>(out_data.data()), stream);
+    out_data.resize(read, stream);
+    return datasource::buffer::create(std::move(out_data));
+  }
+
+  size_t host_read(size_t offset, size_t size, uint8_t* dst) override
+  {
+    auto const read_size = std::min(size, this->size() - offset);
+    return _kvikio_file.pread(dst, read_size, offset).get();
+  }
+
+  std::unique_ptr<datasource::buffer> host_read(size_t offset, size_t size) override
+  {
+    auto const count = std::min(size, this->size() - offset);
+    std::vector<uint8_t> h_data(count);
+    this->host_read(offset, count, h_data.data());
+    return datasource::buffer::create(std::move(h_data));
+  }
+
+  /**
+   * @brief Is `url` referring to a remote file supported by KvikIO?
+   *
+   * For now, only S3 URLs (URLs starting with "s3://") are supported.
+   */
+  static bool is_supported_remote_url(std::string const& url)
+  {
+    // Regular expression to match "s3://"
+    std::regex pattern{R"(^s3://)", std::regex_constants::icase};
+    return std::regex_search(url, pattern);
+  }
+
+ private:
+  kvikio::RemoteHandle _kvikio_file;
+};
+
 }  // namespace
 
 std::unique_ptr<datasource> datasource::create(std::string const& filepath,
@@ -403,8 +485,9 @@ std::unique_ptr<datasource> datasource::create(std::string const& filepath,
     CUDF_FAIL("Invalid LIBCUDF_MMAP_ENABLED value: " + policy);
   }();
 
-
-  if (use_memory_mapping) {
+  if (remote_file_source::is_supported_remote_url(filepath)) {
+    return std::make_unique<remote_file_source>(filepath.c_str());
+  } else if (use_memory_mapping) {
     return std::make_unique<memory_mapped_source>(filepath.c_str(), offset, max_size_estimate);
   } else {
     // `file_source` reads the file directly, without memory mapping
diff --git a/python/cudf/cudf/options.py b/python/cudf/cudf/options.py
index df7bbe22a61..e206c8bca08 100644
--- a/python/cudf/cudf/options.py
+++ b/python/cudf/cudf/options.py
@@ -351,6 +351,22 @@ def _integer_and_none_validator(val):
     _make_contains_validator([False, True]),
 )
 
+_register_option(
+    "kvikio_remote_io",
+    _env_get_bool("CUDF_KVIKIO_REMOTE_IO", False),
+    textwrap.dedent(
+        """
+        Whether to use KvikIO's remote IO backend or not.
+        \tWARN: this is experimental and may be removed at any time
+        \twithout warning or deprecation period.
+        \tSet KVIKIO_NTHREADS (default is 8) to change the number of
+        \tconcurrent TCP connections, which is important for good performance.
+        \tValid values are True or False. Default is False.
+        """
+    ),
+    _make_contains_validator([False, True]),
+)
+
 
 class option_context(ContextDecorator):
     """
diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py
index 0958b68084d..afb82f75bcf 100644
--- a/python/cudf/cudf/tests/test_s3.py
+++ b/python/cudf/cudf/tests/test_s3.py
@@ -69,6 +69,7 @@ def s3_base(endpoint_ip, endpoint_port):
     # with an S3 endpoint on localhost
 
     endpoint_uri = f"http://{endpoint_ip}:{endpoint_port}/"
+    os.environ["AWS_ENDPOINT_URL"] = endpoint_uri
 
     server = ThreadedMotoServer(ip_address=endpoint_ip, port=endpoint_port)
     server.start()
@@ -105,6 +106,15 @@ def s3_context(s3_base, bucket, files=None):
             pass
 
 
+@pytest.fixture(
+    params=[True, False],
+    ids=["kvikio=ON", "kvikio=OFF"],
+)
+def kvikio_remote_io(request):
+    with cudf.option_context("kvikio_remote_io", request.param):
+        yield request.param
+
+
 @pytest.fixture
 def pdf(scope="module"):
     df = pd.DataFrame()
@@ -193,6 +203,7 @@ def test_write_csv(s3_base, s3so, pdf, chunksize):
 def test_read_parquet(
     s3_base,
     s3so,
+    kvikio_remote_io,
     pdf,
     bytes_per_thread,
     columns,
diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py
index d636f36f282..aecb7ae7c5c 100644
--- a/python/cudf/cudf/utils/ioutils.py
+++ b/python/cudf/cudf/utils/ioutils.py
@@ -16,6 +16,7 @@
 import pandas as pd
 from fsspec.core import expand_paths_if_needed, get_fs_token_paths
 
+import cudf
 from cudf.api.types import is_list_like
 from cudf.core._compat import PANDAS_LT_300
 from cudf.utils.docutils import docfmt_partial
@@ -1624,6 +1625,16 @@ def _maybe_expand_directories(paths, glob_pattern, fs):
     return expanded_paths
 
 
+def _use_kvikio_remote_io(fs) -> bool:
+    """Whether `kvikio_remote_io` is enabled and `fs` refers to an S3 file"""
+
+    try:
+        from s3fs.core import S3FileSystem
+    except ImportError:
+        return False
+    return cudf.get_option("kvikio_remote_io") and isinstance(fs, S3FileSystem)
+
+
 @doc_get_reader_filepath_or_buffer()
 def get_reader_filepath_or_buffer(
     path_or_data,
@@ -1649,17 +1660,17 @@ def get_reader_filepath_or_buffer(
         )
     ]
     if not input_sources:
-        raise ValueError("Empty input source list: {input_sources}.")
+        raise ValueError(f"Empty input source list: {input_sources}.")
 
     filepaths_or_buffers = []
     string_paths = [isinstance(source, str) for source in input_sources]
     if any(string_paths):
-        # Sources are all strings. Thes strings are typically
+        # Sources are all strings. The strings are typically
         # file paths, but they may also be raw text strings.
 
         # Don't allow a mix of source types
        if not all(string_paths):
-            raise ValueError("Invalid input source list: {input_sources}.")
+            raise ValueError(f"Invalid input source list: {input_sources}.")
 
         # Make sure we define a filesystem (if possible)
         paths = input_sources
@@ -1712,11 +1723,17 @@ def get_reader_filepath_or_buffer(
                 raise FileNotFoundError(
                     f"{input_sources} could not be resolved to any files"
                 )
-            filepaths_or_buffers = _prefetch_remote_buffers(
-                paths,
-                fs,
-                **(prefetch_options or {}),
-            )
+
+            # If `kvikio_remote_io` is enabled and `fs` refers to an S3 file,
+            # we create S3 URLs and let them pass through to libcudf.
+            if _use_kvikio_remote_io(fs):
+                filepaths_or_buffers = [f"s3://{fpath}" for fpath in paths]
+            else:
+                filepaths_or_buffers = _prefetch_remote_buffers(
+                    paths,
+                    fs,
+                    **(prefetch_options or {}),
+                )
         else:
             raw_text_input = True
diff --git a/python/pylibcudf/pylibcudf/io/types.pyx b/python/pylibcudf/pylibcudf/io/types.pyx
index 967d05e7057..c129903f8f1 100644
--- a/python/pylibcudf/pylibcudf/io/types.pyx
+++ b/python/pylibcudf/pylibcudf/io/types.pyx
@@ -20,6 +20,7 @@ import codecs
 import errno
 import io
 import os
+import re
 
 from pylibcudf.libcudf.io.json import \
     json_recovery_mode_t as JSONRecoveryMode  # no-cython-lint
@@ -147,6 +148,8 @@ cdef class SourceInfo:
 
     Mixing different types of sources will raise a `ValueError`.
     """
+    # Regular expression that matches remote file paths supported by libcudf
+    _is_remote_file_pattern = re.compile(r"^s3://", re.IGNORECASE)
 
     def __init__(self, list sources):
         if not sources:
@@ -161,11 +164,10 @@ cdef class SourceInfo:
         for src in sources:
             if not isinstance(src, (os.PathLike, str)):
                 raise ValueError("All sources must be of the same type!")
-            if not os.path.isfile(src):
-                raise FileNotFoundError(errno.ENOENT,
-                                        os.strerror(errno.ENOENT),
-                                        src)
-
+            if not (os.path.isfile(src) or self._is_remote_file_pattern.match(src)):
+                raise FileNotFoundError(
+                    errno.ENOENT, os.strerror(errno.ENOENT), src
+                )
             c_files.push_back(<string> str(src).encode())
 
         self.c_obj = move(source_info(c_files))
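
For reviewers, a minimal usage sketch of how the pieces above fit together, assuming cudf is built with this patch and AWS credentials are available in the environment; the bucket and object names are hypothetical. `cudf.set_option` and `cudf.read_parquet` are existing cudf APIs, and the option and environment variable names come from the diff itself.

```python
import cudf

# Opt in to the experimental KvikIO remote IO backend (off by default).
# The same switch can be flipped via the CUDF_KVIKIO_REMOTE_IO environment
# variable before importing cudf, per the option registered in options.py.
cudf.set_option("kvikio_remote_io", True)

# With the option enabled, get_reader_filepath_or_buffer() skips the fsspec
# prefetch path and hands the "s3://" URL straight through to libcudf, where
# remote_file_source reads the object via kvikio::RemoteHandle.
df = cudf.read_parquet("s3://my-bucket/my-data.parquet")
```

As the option's help text notes, KVIKIO_NTHREADS (default 8) controls how many concurrent TCP connections KvikIO opens, which matters for throughput on large reads. The `kvikio_remote_io` pytest fixture in test_s3.py exercises both code paths by toggling the same option through `cudf.option_context`.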