From 764ce42e36cd2bd0f56ff14569f9e64659eb9b3e Mon Sep 17 00:00:00 2001
From: Dean Jackson <57651082+deanja@users.noreply.github.com>
Date: Mon, 5 Feb 2024 14:29:39 +1100
Subject: [PATCH 1/5] support gitpythonfs fsspec implementation

- also introduces support for passing keyword arguments through to fsspec.
  gitpythonfs is the first use case where these kwargs are needed.
- requires a future version of dlt that supports gitpython and dynamic
  registration of fsspec implementations.
- requires manual creation of a git repo for testing gitpythonfs.
- if kwargs are used to construct filesystem instances, then FileItemDict
  must be instantiated with an existing instance of AbstractFileSystem.
  Instantiating FileItemDict with FileSystemCredentials will omit the fs
  kwargs and cause unexpected behaviour.
---
 sources/filesystem/__init__.py      |  31 ++++++--
 tests/filesystem/cases/GIT-SETUP.md |  59 ++++++++++++++++
 tests/filesystem/settings.py        |  17 +++--
 tests/filesystem/test_filesystem.py | 106 +++++++++++++++++++++-------
 tests/filesystem/utils.py           |   6 ++
 5 files changed, 180 insertions(+), 39 deletions(-)
 create mode 100644 tests/filesystem/cases/GIT-SETUP.md
 create mode 100644 tests/filesystem/utils.py

diff --git a/sources/filesystem/__init__.py b/sources/filesystem/__init__.py
index c6452543e..d41ccc35d 100644
--- a/sources/filesystem/__init__.py
+++ b/sources/filesystem/__init__.py
@@ -2,7 +2,7 @@
 from typing import Iterator, List, Optional, Tuple, Union
 
 import dlt
-from dlt.common.typing import copy_sig
+from dlt.common.typing import copy_sig, DictStrAny
 from dlt.sources import DltResource
 from dlt.sources.filesystem import FileItem, FileItemDict, fsspec_filesystem, glob_files
 from dlt.sources.credentials import FileSystemCredentials
@@ -26,6 +26,7 @@ def readers(
     bucket_url: str = dlt.secrets.value,
     credentials: Union[FileSystemCredentials, AbstractFileSystem] = dlt.secrets.value,
     file_glob: Optional[str] = "*",
+    kwargs: Optional[DictStrAny] = None,
 ) -> Tuple[DltResource, ...]:
     """This source provides a few resources that are chunked file readers. Readers can be further parametrized before use
     read_csv(chunksize, **pandas_kwargs)
@@ -38,13 +39,28 @@ def readers(
         file_glob (str, optional): The filter to apply to the files in glob format. by default lists all files in bucket_url non-recursively
     """
     return (
-        filesystem(bucket_url, credentials, file_glob=file_glob)
+        filesystem(
+            bucket_url,
+            credentials,
+            file_glob=file_glob,
+            kwargs=kwargs,
+        )
         | dlt.transformer(name="read_csv")(_read_csv),
-        filesystem(bucket_url, credentials, file_glob=file_glob)
+        filesystem(
+            bucket_url,
+            credentials,
+            file_glob=file_glob,
+            kwargs=kwargs,
+        )
         | dlt.transformer(name="read_jsonl")(_read_jsonl),
-        filesystem(bucket_url, credentials, file_glob=file_glob)
+        filesystem(
+            bucket_url,
+            credentials,
+            file_glob=file_glob,
+            kwargs=kwargs,
+        )
         | dlt.transformer(name="read_parquet")(_read_parquet),
-        filesystem(bucket_url, credentials, file_glob=file_glob)
+        filesystem(bucket_url, credentials, file_glob=file_glob, kwargs=kwargs)
         | dlt.transformer(name="read_csv_duckdb")(_read_csv_duckdb),
     )
 
@@ -58,6 +74,7 @@ def filesystem(
     file_glob: Optional[str] = "*",
     files_per_page: int = DEFAULT_CHUNK_SIZE,
     extract_content: bool = False,
+    kwargs: Optional[DictStrAny] = None,
 ) -> Iterator[List[FileItem]]:
     """This resource lists files in `bucket_url` using `file_glob` pattern. The files are yielded as FileItem which also provide methods to open and read file data. It should be combined with transformers that further process (ie. load files)
@@ -76,11 +93,11 @@ def filesystem(
     if isinstance(credentials, AbstractFileSystem):
         fs_client = credentials
     else:
-        fs_client = fsspec_filesystem(bucket_url, credentials)[0]
+        fs_client = fsspec_filesystem(bucket_url, credentials, kwargs=kwargs)[0]
 
     files_chunk: List[FileItem] = []
     for file_model in glob_files(fs_client, bucket_url, file_glob):
-        file_dict = FileItemDict(file_model, credentials)
+        file_dict = FileItemDict(file_model, fs_client)
         if extract_content:
             file_dict["file_content"] = file_dict.read_bytes()
         files_chunk.append(file_dict)  # type: ignore
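For illustration, a minimal sketch of how a caller might pass implementation-specific arguments through the new `kwargs` parameter. This assumes a dlt build where the gitpythonfs implementation is registered (per the commit message, a future dlt version); the bucket URL, repo path, and ref below are placeholder values, not values shipped in this patch:

```python
import dlt

from sources.filesystem import readers

# Hypothetical gitpythonfs usage: `repo_path` and `ref` are forwarded to the
# fsspec implementation via the new `kwargs` parameter.
csv_reader = readers(
    bucket_url="gitpythonfs://samples",
    file_glob="**/*.csv",
    kwargs={"repo_path": "path/to/repo", "ref": "some-tag"},
).read_csv()

pipeline = dlt.pipeline(pipeline_name="git_csv", destination="duckdb")
pipeline.run(csv_reader.with_name("git_csv_files"))
```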
diff --git a/tests/filesystem/cases/GIT-SETUP.md b/tests/filesystem/cases/GIT-SETUP.md
new file mode 100644
index 000000000..0bdb7dbf1
--- /dev/null
+++ b/tests/filesystem/cases/GIT-SETUP.md
@@ -0,0 +1,59 @@
+# Git repo for testing
+
+The `./git` folder contains a bare repo used for running tests for the `filesystem` dlt Source.
+
+# Usage
+
+For example, use it to test a pipeline that reads files using the `gitpythonfs` fsspec implementation.
+
+The repo is not needed for regular use of dlt.
+
+For the tests to pass, use the tag (aka `ref`) called `unmodified-samples`. Using HEAD (the default) is intended to fail tests due to modifications such as a file not having the expected file name. It allows testing of the `ref` functionality of git-based fsspec implementations.
+
+Some features of the repo are intentionally different from the containing repo (eg the verified-sources repo) to help prevent mistakenly testing against (or modifying!) the wrong repo:
+
+- The default branch is `cases-master`
+- the sample files root folder is `samples`, not `tests`.
+
+# Configuration
+
+When to configure (build):
+- When setting up an environment - CI, local dev etc. (Unless it's now committed in the verified-sources repo)
+- After modifying any content in the `../samples` folder
+
+Ideally the repo will be created idempotently by a pytest fixture, `make` script or similar. Until then, these are the manual steps to idempotently create/recreate it:
+
+1. Set the working directory to `tests/filesystem/cases/git`
+2. Check that the current folder contains only `.git`, eg with `ls -a`. It's also ok if
+the current folder is empty.
+3. Delete `.git` and all its subfolders, ie, delete the repo: `rm -rf .git`
+4. Make a fresh repo using:
+
+```
+git init
+git checkout -b cases-master
+```
+
+5. Copy in the folder `../../samples`, ie the samples folder and all its contents: `cp -r ../../samples .`
+6. Commit the sample files and tag the unmodified state:
+
+```
+git add --all
+git commit -m "add standard sample files for tests"
+git tag -a unmodified-samples -m "The sample test files with no modifications"
+git mv samples/sample.txt samples/sample_renamed.txt
+git commit -m "rename sample.txt to make tests fail"
+```
+
+7. Delete all working files, except `.git`, eg with `rm -rf samples`. (ToDo: that's not officially a bare repo. Use `git clone --bare path/to/repo` instead. Maybe we create the repo in a temp folder and then bare clone it into the `cases/git` folder, discarding the temp folder.)
+
+# Developing
+
+Note that at least one IDE - VSCode - does not recognise this repo in the Explorer and Source Control tabs, likely because it is a repo inside another repo.
+
+If you are considering committing the repo to its containing repo - eg the verified-sources repo - consider the effects on the size of the containing repo:
+
+`du -sh ./.git/*`
+
+That's about the same size as the samples folder itself. BUT consider that the largest item, `./.git/objects`, might change often if the repo is regenerated.
So it might be better to ensure the repo is gitignored (so meta!) and "build" it in each environment where needed. + diff --git a/tests/filesystem/settings.py b/tests/filesystem/settings.py index a442052b3..baafd9dfb 100644 --- a/tests/filesystem/settings.py +++ b/tests/filesystem/settings.py @@ -1,10 +1,17 @@ import os -TESTS_BUCKET_URLS = [ - os.path.abspath("tests/filesystem/samples"), - "s3://dlt-ci-test-bucket/standard_source/samples", - "gs://ci-test-bucket/standard_source/samples", - "az://dlt-ci-test-bucket/standard_source/samples", +FACTORY_ARGS = [ + {"bucket_url": os.path.abspath("tests/filesystem/samples")}, + {"bucket_url": "s3://dlt-ci-test-bucket/standard_source/samples"}, + {"bucket_url": "gs://ci-test-bucket/standard_source/samples"}, + {"bucket_url": "az://dlt-ci-test-bucket/standard_source/samples"}, + { + "bucket_url": "gitpythonfs://samples", + "kwargs": { + "repo_path": "tests/filesystem/cases/git", + "ref": "unmodified-samples", + }, + } ] GLOB_RESULTS = [ diff --git a/tests/filesystem/test_filesystem.py b/tests/filesystem/test_filesystem.py index 550f4c29d..b3f96b760 100644 --- a/tests/filesystem/test_filesystem.py +++ b/tests/filesystem/test_filesystem.py @@ -18,22 +18,32 @@ assert_query_data, TEST_STORAGE_ROOT, ) +from tests.filesystem.utils import unpack_factory_args -from .settings import GLOB_RESULTS, TESTS_BUCKET_URLS +from .settings import GLOB_RESULTS, FACTORY_ARGS -@pytest.mark.parametrize("bucket_url", TESTS_BUCKET_URLS) +@pytest.mark.parametrize("factory_args", FACTORY_ARGS) @pytest.mark.parametrize("glob_params", GLOB_RESULTS) -def test_file_list(bucket_url: str, glob_params: Dict[str, Any]) -> None: +def test_file_list(factory_args: Dict[str, Any], glob_params: Dict[str, Any]) -> None: + bucket_url, kwargs = unpack_factory_args(factory_args) + @dlt.transformer def bypass(items) -> str: return items - # we just pass the glob parameter to the resource if it is not None + # we only pass the glob parameter to the resource if it is not None if file_glob := glob_params["glob"]: - filesystem_res = filesystem(bucket_url=bucket_url, file_glob=file_glob) | bypass + filesystem_res = ( + filesystem( + bucket_url=bucket_url, + file_glob=file_glob, + kwargs=kwargs, + ) + | bypass + ) else: - filesystem_res = filesystem(bucket_url=bucket_url) | bypass + filesystem_res = filesystem(bucket_url=bucket_url, kwargs=kwargs) | bypass all_files = list(filesystem_res) file_count = len(all_files) @@ -43,8 +53,12 @@ def bypass(items) -> str: @pytest.mark.parametrize("extract_content", [True, False]) -@pytest.mark.parametrize("bucket_url", TESTS_BUCKET_URLS) -def test_load_content_resources(bucket_url: str, extract_content: bool) -> None: +@pytest.mark.parametrize("factory_args", FACTORY_ARGS) +def test_load_content_resources( + factory_args: Dict[str, Any], extract_content: bool +) -> None: + bucket_url, kwargs = unpack_factory_args(factory_args) + @dlt.transformer def assert_sample_content(items: List[FileItem]): # expect just one file @@ -65,6 +79,7 @@ def assert_sample_content(items: List[FileItem]): bucket_url=bucket_url, file_glob="sample.txt", extract_content=extract_content, + kwargs=kwargs, ) | assert_sample_content ) @@ -83,7 +98,11 @@ def assert_csv_file(item: FileItem): # print(item) return item - nested_file = filesystem(bucket_url, file_glob="met_csv/A801/A881_20230920.csv") + nested_file = filesystem( + bucket_url, + file_glob="met_csv/A801/A881_20230920.csv", + kwargs=kwargs, + ) assert len(list(nested_file | assert_csv_file)) == 1 @@ -101,10 +120,12 @@ def 
test_fsspec_as_credentials(): print(list(gs_resource)) -@pytest.mark.parametrize("bucket_url", TESTS_BUCKET_URLS) -def test_csv_transformers(bucket_url: str) -> None: +@pytest.mark.parametrize("factory_args", FACTORY_ARGS) +def test_csv_transformers(factory_args: Dict[str, Any]) -> None: from sources.filesystem_pipeline import read_csv + bucket_url, kwargs = unpack_factory_args(factory_args) + pipeline = dlt.pipeline( pipeline_name="file_data", destination="duckdb", @@ -114,7 +135,12 @@ def test_csv_transformers(bucket_url: str) -> None: # load all csvs merging data on a date column met_files = ( - filesystem(bucket_url=bucket_url, file_glob="met_csv/A801/*.csv") | read_csv() + filesystem( + bucket_url=bucket_url, + file_glob="met_csv/A801/*.csv", + kwargs=kwargs, + ) + | read_csv() ) met_files.apply_hints(write_disposition="merge", merge_key="date") load_info = pipeline.run(met_files.with_name("met_csv")) @@ -126,7 +152,12 @@ def test_csv_transformers(bucket_url: str) -> None: # load the other folder that contains data for the same day + one other day # the previous data will be replaced met_files = ( - filesystem(bucket_url=bucket_url, file_glob="met_csv/A803/*.csv") | read_csv() + filesystem( + bucket_url=bucket_url, + file_glob="met_csv/A803/*.csv", + kwargs=kwargs, + ) + | read_csv() ) met_files.apply_hints(write_disposition="merge", merge_key="date") load_info = pipeline.run(met_files.with_name("met_csv")) @@ -139,16 +170,23 @@ def test_csv_transformers(bucket_url: str) -> None: assert load_table_counts(pipeline, "met_csv") == {"met_csv": 48} -@pytest.mark.parametrize("bucket_url", TESTS_BUCKET_URLS) -def test_standard_readers(bucket_url: str) -> None: +@pytest.mark.parametrize("factory_args", FACTORY_ARGS) +def test_standard_readers(factory_args: Dict[str, Any]) -> None: + bucket_url, kwargs = unpack_factory_args(factory_args) + # extract pipes with standard readers - jsonl_reader = readers(bucket_url, file_glob="**/*.jsonl").read_jsonl() - parquet_reader = readers(bucket_url, file_glob="**/*.parquet").read_parquet() - # also read zipped csvs - csv_reader = readers(bucket_url, file_glob="**/*.csv*").read_csv( + jsonl_reader = readers( + bucket_url, file_glob="**/*.jsonl", kwargs=kwargs + ).read_jsonl() + parquet_reader = readers( + bucket_url, file_glob="**/*.parquet", kwargs=kwargs + ).read_parquet() + csv_reader = readers(bucket_url, file_glob="**/*.csv*", kwargs=kwargs).read_csv( float_precision="high" ) - csv_duckdb_reader = readers(bucket_url, file_glob="**/*.csv*").read_csv_duckdb() + csv_duckdb_reader = readers( + bucket_url, file_glob="**/*.csv*", kwargs=kwargs + ).read_csv_duckdb() # a step that copies files into test storage def _copy(item: FileItemDict): @@ -161,7 +199,7 @@ def _copy(item: FileItemDict): # return file item unchanged return item - downloader = filesystem(bucket_url, file_glob="**").add_map(_copy) + downloader = filesystem(bucket_url, file_glob="**", kwargs=kwargs).add_map(_copy) # load in single pipeline pipeline = dlt.pipeline( @@ -200,12 +238,14 @@ def _copy(item: FileItemDict): # print(pipeline.default_schema.to_pretty_yaml()) -@pytest.mark.parametrize("bucket_url", TESTS_BUCKET_URLS) -def test_incremental_load(bucket_url: str) -> None: +@pytest.mark.parametrize("factory_args", FACTORY_ARGS) +def test_incremental_load(factory_args: Dict[str, Any]) -> None: @dlt.transformer def bypass(items) -> str: return items + bucket_url, kwargs = unpack_factory_args(factory_args) + pipeline = dlt.pipeline( pipeline_name="file_data", destination="duckdb", @@ -214,7 
+254,11 @@ def bypass(items) -> str:
     )
 
     # Load all files
-    all_files = filesystem(bucket_url=bucket_url, file_glob="csv/*")
+    all_files = filesystem(
+        bucket_url=bucket_url,
+        file_glob="csv/*",
+        kwargs=kwargs,
+    )
     # add incremental on modification time
     all_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))
     load_info = pipeline.run((all_files | bypass).with_name("csv_files"))
@@ -225,7 +269,11 @@ def bypass(items) -> str:
     assert table_counts["csv_files"] == 4
 
     # load again
-    all_files = filesystem(bucket_url=bucket_url, file_glob="csv/*")
+    all_files = filesystem(
+        bucket_url=bucket_url,
+        file_glob="csv/*",
+        kwargs=kwargs,
+    )
     all_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))
     load_info = pipeline.run((all_files | bypass).with_name("csv_files"))
     # nothing into csv_files
@@ -234,7 +282,11 @@ def bypass(items) -> str:
     assert table_counts["csv_files"] == 4
 
     # load again into different table
-    all_files = filesystem(bucket_url=bucket_url, file_glob="csv/*")
+    all_files = filesystem(
+        bucket_url=bucket_url,
+        file_glob="csv/*",
+        kwargs=kwargs,
+    )
     all_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))
     load_info = pipeline.run((all_files | bypass).with_name("csv_files_2"))
     assert_load_info(load_info)
@@ -243,7 +295,7 @@ def bypass(items) -> str:
 
 def test_file_chunking() -> None:
     resource = filesystem(
-        bucket_url=TESTS_BUCKET_URLS[0],
+        bucket_url=FACTORY_ARGS[0]["bucket_url"],
         file_glob="*/*.csv",
         files_per_page=2,
     )
diff --git a/tests/filesystem/utils.py b/tests/filesystem/utils.py
new file mode 100644
index 000000000..4eb62bd74
--- /dev/null
+++ b/tests/filesystem/utils.py
@@ -0,0 +1,6 @@
+from typing import Any, Dict, List
+
+
+def unpack_factory_args(factory_args: Dict[str, Any]) -> List[Any]:
+    """Unpacks filesystem factory arguments from pytest parameters."""
+    return [factory_args.get(k) for k in ("bucket_url", "kwargs")]
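For illustration, a sketch of how a parametrized test consumes `FACTORY_ARGS` through `unpack_factory_args`, mirroring the pattern used in `test_filesystem.py`; the test name and assertion are illustrative only:

```python
from typing import Any, Dict

import pytest

from tests.filesystem.settings import FACTORY_ARGS
from tests.filesystem.utils import unpack_factory_args


@pytest.mark.parametrize("factory_args", FACTORY_ARGS)
def test_example(factory_args: Dict[str, Any]) -> None:
    # Values come back in a fixed order; kwargs is None for bucket urls
    # that need no implementation-specific fsspec arguments.
    bucket_url, kwargs = unpack_factory_args(factory_args)
    assert bucket_url
```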
From 64d1d9d23ef25a85f6dfa98f7a3670ca434a8ed8 Mon Sep 17 00:00:00 2001
From: Your Name
Date: Tue, 6 Feb 2024 12:15:25 +1100
Subject: [PATCH 2/5] dynamically create test git repo

---
 tests/filesystem/cases/GIT-SETUP.md | 59 --------------------
 tests/filesystem/conftest.py        | 83 +++++++++++++++++++++++++++++
 tests/filesystem/settings.py        | 23 +++++---
 tests/filesystem/test_filesystem.py |  4 +-
 4 files changed, 102 insertions(+), 67 deletions(-)
 delete mode 100644 tests/filesystem/cases/GIT-SETUP.md
 create mode 100644 tests/filesystem/conftest.py

diff --git a/tests/filesystem/cases/GIT-SETUP.md b/tests/filesystem/cases/GIT-SETUP.md
deleted file mode 100644
index 0bdb7dbf1..000000000
--- a/tests/filesystem/cases/GIT-SETUP.md
+++ /dev/null
@@ -1,59 +0,0 @@
-# Git repo for testing
-
-The `./git` folder contains a bare repo used for running tests for the `filesystem` dlt Source.
-
-# Usage
-
-For example, use it to test a pipeline that reads files using the `gitpythonfs` fsspec implementation.
-
-The repo is not needed for regular use of dlt.
-
-For the tests to pass, use the tag (aka `ref`) called `unmodified-samples`. Using HEAD (the default) is intended to fail tests due to modifications such as a file not having the expected file name. It allows testing of the `ref` functionality of git-based fsspec implementations.
-
-Some features of the repo are intentionally different from the containing repo (eg the verified-sources repo) to help prevent mistakenly testing against (or modifying!) the wrong repo:
-
-- The default branch is `cases-master`
-- the sample files root folder is `samples`, not `tests`.
-
-# Configuration
-
-When to configure (build):
-- When setting up an environment - CI, local dev etc. (Unless it's now committed in the verified-sources repo)
-- After modifying any content in the `../samples` folder
-
-Ideally the repo will be created idempotently by a pytest fixture, `make` script or similar. Until then, these are the manual steps to idempotently create/recreate it:
-
-1. Set the working directory to `tests/filesystem/cases/git`
-2. Check that the current folder contains only `.git`, eg with `ls -a`. It's also ok if
-the current folder is empty.
-3. Delete `.git` and all its subfolders, ie, delete the repo: `rm -rf .git`
-4. Make a fresh repo using:
-
-```
-git init
-git checkout -b cases-master
-```
-
-5. Copy in the folder `../../samples`, ie the samples folder and all its contents: `cp -r ../../samples .`
-6. Commit the sample files and tag the unmodified state:
-
-```
-git add --all
-git commit -m "add standard sample files for tests"
-git tag -a unmodified-samples -m "The sample test files with no modifications"
-git mv samples/sample.txt samples/sample_renamed.txt
-git commit -m "rename sample.txt to make tests fail"
-```
-
-7. Delete all working files, except `.git`, eg with `rm -rf samples`. (ToDo: that's not officially a bare repo. Use `git clone --bare path/to/repo` instead. Maybe we create the repo in a temp folder and then bare clone it into the `cases/git` folder, discarding the temp folder.)
-
-# Developing
-
-Note that at least one IDE - VSCode - does not recognise this repo in the Explorer and Source Control tabs, likely because it is a repo inside another repo.
-
-If you are considering committing the repo to its containing repo - eg the verified-sources repo - consider the effects on the size of the containing repo:
-
-`du -sh ./.git/*`
-
-That's about the same size as the samples folder itself. BUT consider that the largest item, `./.git/objects`, might change often if the repo is regenerated. So it might be better to ensure the repo is gitignored (so meta!) and "build" it in each environment where needed.
-
diff --git a/tests/filesystem/conftest.py b/tests/filesystem/conftest.py
new file mode 100644
index 000000000..091b6c9dd
--- /dev/null
+++ b/tests/filesystem/conftest.py
@@ -0,0 +1,83 @@
+from typing import Union, Any, Iterator
+
+import pytest
+
+import os
+import subprocess
+import shutil
+import pathlib
+from .settings import (
+    TEST_SAMPLES_PATH,
+    REPO_FIXTURE_PATH,
+    REPO_SAFE_PREFIX,
+    REPO_GOOD_REF,
+)
+
+
+@pytest.fixture(scope="module", autouse=True)
+def repo_fixture(
+    repo_path: Union[str, os.PathLike] = REPO_FIXTURE_PATH,
+    samples_path: Union[str, os.PathLike] = TEST_SAMPLES_PATH,
+    safe_prefix: str = REPO_SAFE_PREFIX,
+    good_ref: str = REPO_GOOD_REF,
+) -> Iterator[Any]:
+    """Create a temporary git repository to test git-based filesystems.
+
+    Args:
+        repo_path (str): The path at which to create the temporary repo. Defaults to REPO_FIXTURE_PATH. It is safest
+            to create the repo outside your software project's directory tree so it does not interfere with real repos.
+        samples_path (str): The path to the sample files, which will be added to the repo. Defaults to TEST_SAMPLES_PATH.
+        safe_prefix (str): Helps prevent mixups between the test repo and real repos. Defaults to REPO_SAFE_PREFIX.
+        good_ref (str): The git ref for the unmodified sample files. Later commits intentionally break the sample
+            files so that tests will fail if the system under test doesn't correctly handle refs.
+            Defaults to REPO_GOOD_REF.
+
+    Yields:
+        Tuple[str, str]: A tuple containing the repo_path and good_ref.
+
+    """
+    repo_path = pathlib.Path(repo_path)
+    samples_path = pathlib.Path(samples_path)
+
+    try:
+        try:
+            os.mkdir(repo_path)
+        except FileExistsError:
+            raise FileExistsError(
+                f"Directory `{repo_path.absolute()}` already exists. "
+                "It should have been removed by the previous test run."
+            )
+
+        # NOTE: `git init -b` option requires git 2.28 or later.
+        subprocess.call(f"git init -b {safe_prefix}master", shell=True, cwd=repo_path)
+        subprocess.call(
+            "git config user.email 'you@example.com'", shell=True, cwd=repo_path
+        )
+        subprocess.call("git config user.name 'Your Name'", shell=True, cwd=repo_path)
+        shutil.copytree(
+            samples_path, repo_path / f"{safe_prefix}samples", dirs_exist_ok=False
+        )
+        subprocess.call("git add --all", shell=True, cwd=repo_path)
+        subprocess.call(
+            "git commit -m 'add standard sample files for tests'",
+            shell=True,
+            cwd=repo_path,
+        )
+        subprocess.call(
+            f"git tag -a {good_ref} -m 'The sample test files with no modifications'",
+            shell=True,
+            cwd=repo_path,
+        )
+        subprocess.call(
+            "git mv sample.txt sample_renamed.txt",
+            shell=True,
+            cwd=repo_path / f"{safe_prefix}samples",
+        )
+        subprocess.call(
+            "git commit -m 'rename sample.txt to make primitive test fail if at HEAD'",
+            shell=True,
+            cwd=repo_path,
+        )
+        yield repo_path, good_ref
+    finally:
+        shutil.rmtree(repo_path)
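For illustration, a hedged sketch of how a test could use the values the fixture yields, beyond its autouse side effect; the test body is illustrative and assumes GitPython (the `git` package) is installed:

```python
from typing import Any, Tuple

from git import Repo  # GitPython, which gitpythonfs builds on


def test_repo_fixture_yields_usable_ref(repo_fixture: Tuple[Any, str]) -> None:
    # The fixture yields the repo path and the tag pointing at the
    # unmodified sample files.
    repo_path, good_ref = repo_fixture
    repo = Repo(repo_path)
    # The tagged commit exists and differs from HEAD, which renamed a file.
    assert repo.commit(good_ref) != repo.head.commit
```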
diff --git a/tests/filesystem/settings.py b/tests/filesystem/settings.py
index baafd9dfb..ffb8ce753 100644
--- a/tests/filesystem/settings.py
+++ b/tests/filesystem/settings.py
@@ -1,17 +1,28 @@
-import os
+from typing import Union
+from os import PathLike
+from pathlib import Path
+from tempfile import gettempdir
+
+
+TEST_SAMPLES_PATH: str = "tests/filesystem/samples"
+REPO_FIXTURE_PATH: Union[str, PathLike] = Path(
+    gettempdir(), "dlt_test_repo_t8hY3x"
+).absolute()
+REPO_SAFE_PREFIX: str = "test-"
+REPO_GOOD_REF = "good-ref"
 
 FACTORY_ARGS = [
-    {"bucket_url": os.path.abspath("tests/filesystem/samples")},
+    {"bucket_url": str(Path(TEST_SAMPLES_PATH).absolute())},
     {"bucket_url": "s3://dlt-ci-test-bucket/standard_source/samples"},
     {"bucket_url": "gs://ci-test-bucket/standard_source/samples"},
     {"bucket_url": "az://dlt-ci-test-bucket/standard_source/samples"},
     {
-        "bucket_url": "gitpythonfs://samples",
+        "bucket_url": f"gitpythonfs://{REPO_SAFE_PREFIX}samples",
         "kwargs": {
-            "repo_path": "tests/filesystem/cases/git",
-            "ref": "unmodified-samples",
+            "repo_path": REPO_FIXTURE_PATH,
+            "ref": REPO_GOOD_REF,
         },
-    }
+    },
 ]
 
 GLOB_RESULTS = [
diff --git a/tests/filesystem/test_filesystem.py b/tests/filesystem/test_filesystem.py
index b3f96b760..20240e458 100644
--- a/tests/filesystem/test_filesystem.py
+++ b/tests/filesystem/test_filesystem.py
@@ -67,7 +67,7 @@ def assert_sample_content(items: List[FileItem]):
             content = item.read_bytes()
             assert content == b"dlthub content"
             assert item["size_in_bytes"] == 14
-            assert item["file_url"].endswith("/samples/sample.txt")
+            assert item["file_url"].endswith("samples/sample.txt")
             assert item["mime_type"] == "text/plain"
             assert isinstance(item["modification_date"], pendulum.DateTime)
 
@@ -93,7 +93,7 @@ def assert_csv_file(item: FileItem):
         # on windows when checking out, git will convert lf into cr+lf so we have more bytes (+ number of lines: 25)
         assert item["size_in_bytes"] in (742, 767)
         assert item["file_name"] == "met_csv/A801/A881_20230920.csv"
-        assert item["file_url"].endswith("/samples/met_csv/A801/A881_20230920.csv")
+        assert
item["file_url"].endswith("samples/met_csv/A801/A881_20230920.csv") assert item["mime_type"] == "text/csv" # print(item) return item From ff9228a7e3dcecb1ab51eb51cdfa4c8184b48517 Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 9 Feb 2024 10:59:41 +1100 Subject: [PATCH 3/5] Use dlt from feature branch --- pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index f2cf9895d..a83f33a92 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,7 +12,8 @@ packages = [{include = "sources"}] [tool.poetry.dependencies] python = ">=3.8.1,<3.13" -dlt = {version = "0.4.4", allow-prereleases = true, extras = ["redshift", "bigquery", "postgres", "duckdb", "s3", "gs"]} +# dlt = {version = "0.4.4", allow-prereleases = true, extras = ["redshift", "bigquery", "postgres", "duckdb", "s3", "gs"]} +dlt = {git = "https://github.com/deanja/dlt.git", branch = "add-git-to-filesystem-source-301", extras = ["redshift", "bigquery", "postgres", "duckdb", "s3", "gs"]} [tool.poetry.group.dev.dependencies] mypy = "1.6.1" From fbbec9172c1c3621611ef3c86b8480093877f050 Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 16 Feb 2024 09:48:20 +1100 Subject: [PATCH 4/5] improve readability of test parameters --- tests/filesystem/settings.py | 8 +++++++- tests/filesystem/test_filesystem.py | 22 +++++++++++----------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/tests/filesystem/settings.py b/tests/filesystem/settings.py index ffb8ce753..a7c3db1dd 100644 --- a/tests/filesystem/settings.py +++ b/tests/filesystem/settings.py @@ -25,7 +25,11 @@ }, ] -GLOB_RESULTS = [ +FACTORY_TEST_IDS = [ + f"url={factory_args['bucket_url'][:15]}..." for factory_args in FACTORY_ARGS +] + +GLOBS = [ { "glob": None, "file_names": ["sample.txt"], @@ -84,3 +88,5 @@ "file_names": ["sample.txt"], }, ] + +GLOB_TEST_IDS = [f"glob={glob_result['glob']}" for glob_result in GLOBS] diff --git a/tests/filesystem/test_filesystem.py b/tests/filesystem/test_filesystem.py index 20240e458..3dba411a1 100644 --- a/tests/filesystem/test_filesystem.py +++ b/tests/filesystem/test_filesystem.py @@ -20,12 +20,12 @@ ) from tests.filesystem.utils import unpack_factory_args -from .settings import GLOB_RESULTS, FACTORY_ARGS +from .settings import GLOBS, GLOB_TEST_IDS, FACTORY_ARGS, FACTORY_TEST_IDS -@pytest.mark.parametrize("factory_args", FACTORY_ARGS) -@pytest.mark.parametrize("glob_params", GLOB_RESULTS) -def test_file_list(factory_args: Dict[str, Any], glob_params: Dict[str, Any]) -> None: +@pytest.mark.parametrize("factory_args", FACTORY_ARGS, ids=FACTORY_TEST_IDS) +@pytest.mark.parametrize("globs", GLOBS, ids=GLOB_TEST_IDS) +def test_file_list(factory_args: Dict[str, Any], globs: Dict[str, Any]) -> None: bucket_url, kwargs = unpack_factory_args(factory_args) @dlt.transformer @@ -33,7 +33,7 @@ def bypass(items) -> str: return items # we only pass the glob parameter to the resource if it is not None - if file_glob := glob_params["glob"]: + if file_glob := globs["glob"]: filesystem_res = ( filesystem( bucket_url=bucket_url, @@ -48,12 +48,12 @@ def bypass(items) -> str: all_files = list(filesystem_res) file_count = len(all_files) file_names = [item["file_name"] for item in all_files] - assert file_count == len(glob_params["file_names"]) - assert file_names == glob_params["file_names"] + assert file_count == len(globs["file_names"]) + assert file_names == globs["file_names"] @pytest.mark.parametrize("extract_content", [True, False]) -@pytest.mark.parametrize("factory_args", FACTORY_ARGS) 
+@pytest.mark.parametrize("factory_args", FACTORY_ARGS, ids=FACTORY_TEST_IDS)
 def test_incremental_load(factory_args: Dict[str, Any]) -> None:
     @dlt.transformer
     def bypass(items) -> str:
         return items

From 229f7a5fac7dac86aad9a6685f783500df208d5f Mon Sep 17 00:00:00 2001
From: Your Name
Date: Fri, 16 Feb 2024 21:44:49 +1100
Subject: [PATCH 5/5] use distinct filesystem instances in pipeline threads

- objects yielded by @dlt.resource can run in different threads
- note this does not address a possible bug with filesystem instances
  being cached by fsspec itself.
---
 sources/filesystem/__init__.py | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/sources/filesystem/__init__.py b/sources/filesystem/__init__.py
index d41ccc35d..2b08ea03d 100644
--- a/sources/filesystem/__init__.py
+++ b/sources/filesystem/__init__.py
@@ -4,8 +4,11 @@
 import dlt
 from dlt.common.typing import copy_sig, DictStrAny
 from dlt.sources import DltResource
-from dlt.sources.filesystem import FileItem, FileItemDict, fsspec_filesystem, glob_files
+from dlt.sources.filesystem import FileItem, FileItemDict, glob_files
 from dlt.sources.credentials import FileSystemCredentials
+from dlt.common.storages.fsspec_filesystem import fsspec_from_config
+from dlt.common.storages.configuration import FilesystemConfiguration
+
 
 from .helpers import (
     AbstractFileSystem,
@@ -90,14 +93,15 @@ def filesystem(
     Returns:
         Iterator[List[FileItem]]: The list of files.
     """
+    fs_config = FilesystemConfiguration(bucket_url, credentials, kwargs=kwargs)
     if isinstance(credentials, AbstractFileSystem):
         fs_client = credentials
     else:
-        fs_client = fsspec_filesystem(bucket_url, credentials, kwargs=kwargs)[0]
+        fs_client = fsspec_from_config(fs_config)[0]
 
     files_chunk: List[FileItem] = []
     for file_model in glob_files(fs_client, bucket_url, file_glob):
-        file_dict = FileItemDict(file_model, fs_client)
+        file_dict = FileItemDict(file_model, fs_config)
         if extract_content:
             file_dict["file_content"] = file_dict.read_bytes()
         files_chunk.append(file_dict)  # type: ignore
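For illustration, a minimal sketch of the idea behind this patch: each worker thread builds its own filesystem instance from plain configuration instead of sharing one `AbstractFileSystem` object. The sketch uses the local filesystem and fsspec's documented `skip_instance_cache` option rather than the dlt internals, so treat it as an analogy, not the actual implementation:

```python
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

import fsspec

# Prepare a couple of throwaway files to read.
tmp = Path(tempfile.mkdtemp())
paths = []
for name in ("a.txt", "b.txt"):
    sample = tmp / name
    sample.write_text("dlthub content")
    paths.append(str(sample))


def read_with_fresh_instance(path: str) -> bytes:
    # Each worker constructs its own instance from configuration (here just
    # a protocol name), mirroring how the resource now hands a
    # FilesystemConfiguration to FileItemDict rather than a shared fs_client.
    # skip_instance_cache sidesteps fsspec's own instance cache, which the
    # commit message notes is a separate, unaddressed concern.
    fs = fsspec.filesystem("file", skip_instance_cache=True)
    with fs.open(path, "rb") as handle:
        return handle.read()


with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(read_with_fresh_instance, paths))
    assert all(data == b"dlthub content" for data in results)
```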