From 167c5d29481f6c8225456d377183b0a5fd07de97 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 2 Oct 2024 09:22:48 +0200 Subject: [PATCH 1/8] remote_file_source --- cpp/src/io/utilities/datasource.cpp | 85 ++++++++++++++++++++++++- python/cudf/cudf/io/parquet.py | 1 + python/cudf/cudf/options.py | 13 ++++ python/cudf/cudf/tests/test_s3.py | 11 ++++ python/cudf/cudf/utils/ioutils.py | 27 +++++--- python/pylibcudf/pylibcudf/io/types.pyx | 12 ++-- 6 files changed, 134 insertions(+), 15 deletions(-) diff --git a/cpp/src/io/utilities/datasource.cpp b/cpp/src/io/utilities/datasource.cpp index 2daaecadca6..4c3ca56c887 100644 --- a/cpp/src/io/utilities/datasource.cpp +++ b/cpp/src/io/utilities/datasource.cpp @@ -25,6 +25,7 @@ #include #include +#include #include @@ -32,6 +33,7 @@ #include #include +#include #include #include @@ -387,6 +389,84 @@ class user_datasource_wrapper : public datasource { datasource* const source; ///< A non-owning pointer to the user-implemented datasource }; +/** + * @brief Remote file source backed by KvikIO, which handles S3 filepaths seamlessly. 
+ */ +class remote_file_source : public datasource { + static std::unique_ptr create_s3_endpoint(char const* filepath) + { + auto [bucket_name, bucket_object] = kvikio::S3Endpoint::parse_s3_url(filepath); + return std::make_unique(bucket_name, bucket_object); + } + + public: + explicit remote_file_source(char const* filepath) : _kvikio_file{create_s3_endpoint(filepath)} {} + + ~remote_file_source() override = default; + + [[nodiscard]] bool supports_device_read() const override { return true; } + + [[nodiscard]] bool is_device_read_preferred(size_t size) const override { return true; } + + [[nodiscard]] size_t size() const override { return _kvikio_file.nbytes(); } + + std::future device_read_async(size_t offset, + size_t size, + uint8_t* dst, + rmm::cuda_stream_view stream) override + { + CUDF_EXPECTS(supports_device_read(), "Remote reads are not supported for this file."); + + auto const read_size = std::min(size, this->size() - offset); + return _kvikio_file.pread(dst, read_size, offset); + } + + size_t device_read(size_t offset, + size_t size, + uint8_t* dst, + rmm::cuda_stream_view stream) override + { + return device_read_async(offset, size, dst, stream).get(); + } + + std::unique_ptr device_read(size_t offset, + size_t size, + rmm::cuda_stream_view stream) override + { + rmm::device_buffer out_data(size, stream); + size_t read = device_read(offset, size, reinterpret_cast(out_data.data()), stream); + out_data.resize(read, stream); + return datasource::buffer::create(std::move(out_data)); + } + + size_t host_read(size_t offset, size_t size, uint8_t* dst) override + { + auto const read_size = std::min(size, this->size() - offset); + return _kvikio_file.pread(dst, read_size, offset).get(); + } + + std::unique_ptr host_read(size_t offset, size_t size) override + { + auto const count = std::min(size, this->size() - offset); + std::vector h_data(count); + this->host_read(offset, count, h_data.data()); + return datasource::buffer::create(std::move(h_data)); + } + + 
/** + * @brief Is `url` referring to a remote file using a protocol KvikIO support? + */ + static bool is_supported_remote_url(std::string const& url) + { + // Regular expression to match "://" + std::regex pattern{R"(^(s3|http|https)://)", std::regex_constants::icase}; + return std::regex_search(url, pattern); + } + + private: + kvikio::RemoteHandle _kvikio_file; +}; + } // namespace std::unique_ptr datasource::create(std::string const& filepath, @@ -401,8 +481,9 @@ std::unique_ptr datasource::create(std::string const& filepath, CUDF_FAIL("Invalid LIBCUDF_MMAP_ENABLED value: " + policy); }(); - - if (use_memory_mapping) { + if (remote_file_source::is_supported_remote_url(filepath)) { + return std::make_unique(filepath.c_str()); + } else if (use_memory_mapping) { return std::make_unique(filepath.c_str(), offset, max_size_estimate); } else { // `file_source` reads the file directly, without memory mapping diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index ce99f98b559..5dbd38c7e59 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -622,6 +622,7 @@ def read_parquet( storage_options=storage_options, bytes_per_thread=bytes_per_thread, prefetch_options=prefetch_options, + libcudf_s3_io=cudf.get_option("libcudf_s3_io"), ) # Warn user if they are not using cudf for IO diff --git a/python/cudf/cudf/options.py b/python/cudf/cudf/options.py index df7bbe22a61..89164f56238 100644 --- a/python/cudf/cudf/options.py +++ b/python/cudf/cudf/options.py @@ -351,6 +351,19 @@ def _integer_and_none_validator(val): _make_contains_validator([False, True]), ) +_register_option( + "libcudf_s3_io", + _env_get_bool("CUDF_LIBCUDF_S3_IO", False), + textwrap.dedent( + """ + Whether to use libcudf's native S3 backend or not. + \tWARN: this is experimental and only works for parquet_read(). + \tValid values are True or False. Default is False. 
+ """ + ), + _make_contains_validator([False, True]), +) + class option_context(ContextDecorator): """ diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index 0958b68084d..1680391bca5 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -69,6 +69,7 @@ def s3_base(endpoint_ip, endpoint_port): # with an S3 endpoint on localhost endpoint_uri = f"http://{endpoint_ip}:{endpoint_port}/" + os.environ["AWS_ENDPOINT_URL"] = endpoint_uri server = ThreadedMotoServer(ip_address=endpoint_ip, port=endpoint_port) server.start() @@ -105,6 +106,15 @@ def s3_context(s3_base, bucket, files=None): pass +@pytest.fixture( + params=[True, False], + ids=["libcudfS3=ON", "libcudfS3=OFF"], +) +def libcudf_s3_io(request): + with cudf.option_context("libcudf_s3_io", request.param): + yield request.param + + @pytest.fixture def pdf(scope="module"): df = pd.DataFrame() @@ -193,6 +203,7 @@ def test_write_csv(s3_base, s3so, pdf, chunksize): def test_read_parquet( s3_base, s3so, + libcudf_s3_io, pdf, bytes_per_thread, columns, diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index d636f36f282..4e5c866c49c 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -1456,6 +1456,11 @@ paths. If 'method' is set to 'all' (the default), the only supported option is 'blocksize' (default 256 MB). If method is set to 'parquet', 'columns' and 'row_groups' are also supported (default None). +libcudf_s3_io : bool, default False + WARNING: This is an experimental feature and may be removed at any + time without warning or deprecation period. + Use libcudf's native S3 backend, if applicable, by preserving S3 file + paths such as "s3://my-bucket/my-object". 
Returns ------- @@ -1638,6 +1643,7 @@ def get_reader_filepath_or_buffer( warn_meta=None, expand_dir_pattern=None, prefetch_options=None, + libcudf_s3_io=False, ): """{docstring}""" @@ -1649,17 +1655,17 @@ def get_reader_filepath_or_buffer( ) ] if not input_sources: - raise ValueError("Empty input source list: {input_sources}.") + raise ValueError(f"Empty input source list: {input_sources}.") filepaths_or_buffers = [] string_paths = [isinstance(source, str) for source in input_sources] if any(string_paths): - # Sources are all strings. Thes strings are typically + # Sources are all strings. The strings are typically # file paths, but they may also be raw text strings. # Don't allow a mix of source types if not all(string_paths): - raise ValueError("Invalid input source list: {input_sources}.") + raise ValueError(f"Invalid input source list: {input_sources}.") # Make sure we define a filesystem (if possible) paths = input_sources @@ -1712,11 +1718,16 @@ def get_reader_filepath_or_buffer( raise FileNotFoundError( f"{input_sources} could not be resolved to any files" ) - filepaths_or_buffers = _prefetch_remote_buffers( - paths, - fs, - **(prefetch_options or {}), - ) + from s3fs.core import S3FileSystem + + if libcudf_s3_io and isinstance(fs, S3FileSystem): + filepaths_or_buffers = [f"s3://{fpath}" for fpath in paths] + else: + filepaths_or_buffers = _prefetch_remote_buffers( + paths, + fs, + **(prefetch_options or {}), + ) else: raw_text_input = True diff --git a/python/pylibcudf/pylibcudf/io/types.pyx b/python/pylibcudf/pylibcudf/io/types.pyx index 563a02761da..2ff57535301 100644 --- a/python/pylibcudf/pylibcudf/io/types.pyx +++ b/python/pylibcudf/pylibcudf/io/types.pyx @@ -20,6 +20,7 @@ import codecs import errno import io import os +import re from pylibcudf.libcudf.io.json import \ json_recovery_mode_t as JSONRecoveryMode # no-cython-lint @@ -143,6 +144,8 @@ cdef class SourceInfo: Mixing different types of sources will raise a `ValueError`. 
""" + # Regular expression to match remote files + _is_remote_file_pattern = re.compile(r"(s3|http|https)://", re.IGNORECASE) def __init__(self, list sources): if not sources: @@ -157,11 +160,10 @@ cdef class SourceInfo: for src in sources: if not isinstance(src, (os.PathLike, str)): raise ValueError("All sources must be of the same type!") - if not os.path.isfile(src): - raise FileNotFoundError(errno.ENOENT, - os.strerror(errno.ENOENT), - src) - + if not (os.path.isfile(src) or self._is_remote_file_pattern.match(src)): + raise FileNotFoundError( + errno.ENOENT, os.strerror(errno.ENOENT), src + ) c_files.push_back( str(src).encode()) self.c_obj = move(source_info(c_files)) From 216b4ac03c688aa688f90d701f3e2565ba6ba99c Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 23 Oct 2024 10:37:56 +0200 Subject: [PATCH 2/8] rename to kvikio_remote_io --- python/cudf/cudf/io/parquet.py | 2 +- python/cudf/cudf/options.py | 8 ++++---- python/cudf/cudf/tests/test_s3.py | 8 ++++---- python/cudf/cudf/utils/ioutils.py | 10 +++++----- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 5dbd38c7e59..0c01b4c7e19 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -622,7 +622,7 @@ def read_parquet( storage_options=storage_options, bytes_per_thread=bytes_per_thread, prefetch_options=prefetch_options, - libcudf_s3_io=cudf.get_option("libcudf_s3_io"), + kvikio_remote_io=cudf.get_option("kvikio_remote_io"), ) # Warn user if they are not using cudf for IO diff --git a/python/cudf/cudf/options.py b/python/cudf/cudf/options.py index 89164f56238..b83731d9f0a 100644 --- a/python/cudf/cudf/options.py +++ b/python/cudf/cudf/options.py @@ -352,12 +352,12 @@ def _integer_and_none_validator(val): ) _register_option( - "libcudf_s3_io", - _env_get_bool("CUDF_LIBCUDF_S3_IO", False), + "kvikio_remote_io", + _env_get_bool("CUDF_KVIKIO_REMOTE_IO", False), textwrap.dedent( """ - 
Whether to use libcudf's native S3 backend or not. - \tWARN: this is experimental and only works for parquet_read(). + Whether to use KvikIO's remote IO backend or not. + \tWARN: this is experimental and is implemented for parquet_read(). \tValid values are True or False. Default is False. """ ), diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index 1680391bca5..afb82f75bcf 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -108,10 +108,10 @@ def s3_context(s3_base, bucket, files=None): @pytest.fixture( params=[True, False], - ids=["libcudfS3=ON", "libcudfS3=OFF"], + ids=["kvikio=ON", "kvikio=OFF"], ) -def libcudf_s3_io(request): - with cudf.option_context("libcudf_s3_io", request.param): +def kvikio_remote_io(request): + with cudf.option_context("kvikio_remote_io", request.param): yield request.param @@ -203,7 +203,7 @@ def test_write_csv(s3_base, s3so, pdf, chunksize): def test_read_parquet( s3_base, s3so, - libcudf_s3_io, + kvikio_remote_io, pdf, bytes_per_thread, columns, diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 4e5c866c49c..4cafbe07a4c 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -1456,11 +1456,11 @@ paths. If 'method' is set to 'all' (the default), the only supported option is 'blocksize' (default 256 MB). If method is set to 'parquet', 'columns' and 'row_groups' are also supported (default None). -libcudf_s3_io : bool, default False +kvikio_remote_io : bool, default False WARNING: This is an experimental feature and may be removed at any time without warning or deprecation period. - Use libcudf's native S3 backend, if applicable, by preserving S3 file - paths such as "s3://my-bucket/my-object". + Use KvikIO's remote IO backend, if applicable, by pass-through remote + file paths such as "s3://my-bucket/my-object" to libcudf as-it. 
Returns ------- @@ -1643,7 +1643,7 @@ def get_reader_filepath_or_buffer( warn_meta=None, expand_dir_pattern=None, prefetch_options=None, - libcudf_s3_io=False, + kvikio_remote_io=False, ): """{docstring}""" @@ -1720,7 +1720,7 @@ def get_reader_filepath_or_buffer( ) from s3fs.core import S3FileSystem - if libcudf_s3_io and isinstance(fs, S3FileSystem): + if kvikio_remote_io and isinstance(fs, S3FileSystem): filepaths_or_buffers = [f"s3://{fpath}" for fpath in paths] else: filepaths_or_buffers = _prefetch_remote_buffers( From d73abae91286e6ed29d645e234f1de90767acc3c Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 24 Oct 2024 09:43:57 +0200 Subject: [PATCH 3/8] _use_kvikio_remote_io --- python/cudf/cudf/io/parquet.py | 1 - python/cudf/cudf/options.py | 3 ++- python/cudf/cudf/utils/ioutils.py | 22 ++++++++++++++-------- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 0c01b4c7e19..ce99f98b559 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -622,7 +622,6 @@ def read_parquet( storage_options=storage_options, bytes_per_thread=bytes_per_thread, prefetch_options=prefetch_options, - kvikio_remote_io=cudf.get_option("kvikio_remote_io"), ) # Warn user if they are not using cudf for IO diff --git a/python/cudf/cudf/options.py b/python/cudf/cudf/options.py index b83731d9f0a..aa0ee20e58a 100644 --- a/python/cudf/cudf/options.py +++ b/python/cudf/cudf/options.py @@ -357,7 +357,8 @@ def _integer_and_none_validator(val): textwrap.dedent( """ Whether to use KvikIO's remote IO backend or not. - \tWARN: this is experimental and is implemented for parquet_read(). + \tWARN: this is experimental and may be removed at any time + \twithout warning or deprecation period. \tValid values are True or False. Default is False. 
""" ), diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 4cafbe07a4c..aecb7ae7c5c 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -16,6 +16,7 @@ import pandas as pd from fsspec.core import expand_paths_if_needed, get_fs_token_paths +import cudf from cudf.api.types import is_list_like from cudf.core._compat import PANDAS_LT_300 from cudf.utils.docutils import docfmt_partial @@ -1456,11 +1457,6 @@ paths. If 'method' is set to 'all' (the default), the only supported option is 'blocksize' (default 256 MB). If method is set to 'parquet', 'columns' and 'row_groups' are also supported (default None). -kvikio_remote_io : bool, default False - WARNING: This is an experimental feature and may be removed at any - time without warning or deprecation period. - Use KvikIO's remote IO backend, if applicable, by pass-through remote - file paths such as "s3://my-bucket/my-object" to libcudf as-it. Returns ------- @@ -1629,6 +1625,16 @@ def _maybe_expand_directories(paths, glob_pattern, fs): return expanded_paths +def _use_kvikio_remote_io(fs) -> bool: + """Whether `kvikio_remote_io` is enabled and `fs` refers to a S3 file""" + + try: + from s3fs.core import S3FileSystem + except ImportError: + return False + return cudf.get_option("kvikio_remote_io") and isinstance(fs, S3FileSystem) + + @doc_get_reader_filepath_or_buffer() def get_reader_filepath_or_buffer( path_or_data, @@ -1643,7 +1649,6 @@ def get_reader_filepath_or_buffer( warn_meta=None, expand_dir_pattern=None, prefetch_options=None, - kvikio_remote_io=False, ): """{docstring}""" @@ -1718,9 +1723,10 @@ def get_reader_filepath_or_buffer( raise FileNotFoundError( f"{input_sources} could not be resolved to any files" ) - from s3fs.core import S3FileSystem - if kvikio_remote_io and isinstance(fs, S3FileSystem): + # If `kvikio_remote_io` is enabled and `fs` refers to a S3 file, + # we create S3 URLs and let them pass-through to libcudf. 
+ if _use_kvikio_remote_io(fs): filepaths_or_buffers = [f"s3://{fpath}" for fpath in paths] else: filepaths_or_buffers = _prefetch_remote_buffers( From 4a1c4e88e56765fcecdc4ec3ff82c5134f94b072 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 24 Oct 2024 09:46:04 +0200 Subject: [PATCH 4/8] doc --- cpp/src/io/utilities/datasource.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/utilities/datasource.cpp b/cpp/src/io/utilities/datasource.cpp index e597a6a6f9b..5b28a8c6cbc 100644 --- a/cpp/src/io/utilities/datasource.cpp +++ b/cpp/src/io/utilities/datasource.cpp @@ -457,7 +457,7 @@ class remote_file_source : public datasource { } /** - * @brief Is `url` referring to a remote file using a protocol KvikIO support? + * @brief Is `url` referring to a remote file supported by KvikIO? */ static bool is_supported_remote_url(std::string const& url) { From 70d60a417d10b171acb7eba73440efc3cb8a66bf Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Fri, 25 Oct 2024 08:12:25 +0200 Subject: [PATCH 5/8] doc --- python/cudf/cudf/options.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/cudf/cudf/options.py b/python/cudf/cudf/options.py index aa0ee20e58a..e206c8bca08 100644 --- a/python/cudf/cudf/options.py +++ b/python/cudf/cudf/options.py @@ -359,6 +359,8 @@ def _integer_and_none_validator(val): Whether to use KvikIO's remote IO backend or not. \tWARN: this is experimental and may be removed at any time \twithout warning or deprecation period. + \tSet KVIKIO_NTHREADS (default is 8) to change the number of + \tconcurrent tcp connections, which is important for good performance. \tValid values are True or False. Default is False. """ ), From 1e48dc05072a077fc56643210dff11907095de38 Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Mon, 28 Oct 2024 11:17:39 +0100 Subject: [PATCH 6/8] Apply suggestions from code review Co-authored-by: Vukasin Milovanovic --- cpp/src/io/utilities/datasource.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/utilities/datasource.cpp b/cpp/src/io/utilities/datasource.cpp index 5b28a8c6cbc..39205ade13e 100644 --- a/cpp/src/io/utilities/datasource.cpp +++ b/cpp/src/io/utilities/datasource.cpp @@ -418,7 +418,7 @@ class remote_file_source : public datasource { uint8_t* dst, rmm::cuda_stream_view stream) override { - CUDF_EXPECTS(supports_device_read(), "Remote reads are not supported for this file."); + CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file."); auto const read_size = std::min(size, this->size() - offset); return _kvikio_file.pread(dst, read_size, offset); From e6505c5689b21368386324b0b40bc7f911b73fbe Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Mon, 28 Oct 2024 12:46:30 +0100 Subject: [PATCH 7/8] For now, only S3 urls are supported --- cpp/src/io/utilities/datasource.cpp | 6 ++++-- python/pylibcudf/pylibcudf/io/types.pyx | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/cpp/src/io/utilities/datasource.cpp b/cpp/src/io/utilities/datasource.cpp index 39205ade13e..de5b745fc65 100644 --- a/cpp/src/io/utilities/datasource.cpp +++ b/cpp/src/io/utilities/datasource.cpp @@ -458,11 +458,13 @@ class remote_file_source : public datasource { /** * @brief Is `url` referring to a remote file supported by KvikIO? + * + * For now, only S3 urls (urls starting with "s3://") are supported. 
*/ static bool is_supported_remote_url(std::string const& url) { - // Regular expression to match "://" - std::regex pattern{R"(^(s3|http|https)://)", std::regex_constants::icase}; + // Regular expression to match "s3://" + std::regex pattern{R"(^s3://)", std::regex_constants::icase}; return std::regex_search(url, pattern); } diff --git a/python/pylibcudf/pylibcudf/io/types.pyx b/python/pylibcudf/pylibcudf/io/types.pyx index 2ff57535301..6e9cf5a1baf 100644 --- a/python/pylibcudf/pylibcudf/io/types.pyx +++ b/python/pylibcudf/pylibcudf/io/types.pyx @@ -144,8 +144,8 @@ cdef class SourceInfo: Mixing different types of sources will raise a `ValueError`. """ - # Regular expression to match remote files - _is_remote_file_pattern = re.compile(r"(s3|http|https)://", re.IGNORECASE) + # Regular expression that matches remote file paths supported by libcudf + _is_remote_file_pattern = re.compile(r"s3://", re.IGNORECASE) def __init__(self, list sources): if not sources: From 80cbb65faaebd38f39f14d6075240cc7443d524b Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Mon, 28 Oct 2024 12:48:17 +0100 Subject: [PATCH 8/8] ^ --- python/pylibcudf/pylibcudf/io/types.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pylibcudf/pylibcudf/io/types.pyx b/python/pylibcudf/pylibcudf/io/types.pyx index 6e9cf5a1baf..86aa684b518 100644 --- a/python/pylibcudf/pylibcudf/io/types.pyx +++ b/python/pylibcudf/pylibcudf/io/types.pyx @@ -145,7 +145,7 @@ cdef class SourceInfo: Mixing different types of sources will raise a `ValueError`. """ # Regular expression that matches remote file paths supported by libcudf - _is_remote_file_pattern = re.compile(r"s3://", re.IGNORECASE) + _is_remote_file_pattern = re.compile(r"^s3://", re.IGNORECASE) def __init__(self, list sources): if not sources: