From 71a37770d09e0a5da1bb7eef2e987f370ccc95e5 Mon Sep 17 00:00:00 2001 From: Trinh Thanh Phong Date: Thu, 10 Oct 2024 17:24:36 +0700 Subject: [PATCH 1/3] Add HuggingFace adapter --- src/flysystem/adapters/hgface.py | 297 +++++++++++++++++++++++++++++++ src/flysystem/error.py | 53 ++++++ src/flysystem/filesystem.py | 4 +- 3 files changed, 352 insertions(+), 2 deletions(-) create mode 100644 src/flysystem/adapters/hgface.py diff --git a/src/flysystem/adapters/hgface.py b/src/flysystem/adapters/hgface.py new file mode 100644 index 0000000..9a8b976 --- /dev/null +++ b/src/flysystem/adapters/hgface.py @@ -0,0 +1,297 @@ +import os.path +import shutil + +from typing import IO, Any, Dict, List, Optional + +from huggingface_hub import HfApi, snapshot_download +from huggingface_hub.errors import HfHubHTTPError, RepositoryNotFoundError, RevisionNotFoundError + +from src.flysystem.adapters import FilesystemAdapter +from src.flysystem.error import UnableToDownload, UnableToUpload + + +class HuggingFaceFilesystemAdapter(FilesystemAdapter): + """ + Hugging Face Filesystem Adapter + """ + + def __init__(self, token: str) -> None: + self.token = token + self.api = HfApi(token=token) + + def file_exists(self, path: str) -> bool: + """ + Determine if a file exists. + Arguments: + path: The file path + Returns: + True if the file exsited + """ + raise NotImplementedError + + def directory_exists(self, path: str) -> bool: + """ + Determine if a directory exists. + Arguments: + path: The directory path + Returns: + True if the directory existed + """ + raise NotImplementedError + + def write(self, path: str, contents: str, options: Dict[str, Any] = None): + """ + Write the contents of a file. + Arguments: + path: The file path + contents: The contents to write + options: Write options + Returns: + None + """ + raise NotImplementedError + + def write_stream(self, path: str, resource: IO, options: Dict[str, Any] = None): + """ + Write the contents of a file from stream + Arguments: + path: The file path + resource: The stream + options: Write options + Returns: + None + """ + raise NotImplementedError + + def read(self, path: str) -> str: + """ + Get the contents of a file. + Arguments: + path: The file path + Returns: + The contents of file as string + """ + raise NotImplementedError + + def read_stream(self, path: str) -> IO: + """ + Read the contents of a file as stream + Arguments: + path: The file path + Returns: + The contents of file as stream + """ + raise NotImplementedError + + def delete(self, path: str): + """ + Delete a file + Arguments: + path: The file path + Returns: + None + """ + raise NotImplementedError + + def delete_directory(self, path: str): + """ + Recursively delete a directory. + Arguments: + path: Directory path to delete + Returns: + True if the directory is deleted successfully + """ + raise NotImplementedError + + def create_directory(self, path: str, options: Dict[str, Any] = None): + """ + Create a directory. + Arguments: + path: Directory path to create + options: Options for create + Returns: + True if the directory is created successfully + """ + raise NotImplementedError + + def set_visibility(self, path: str, visibility: str): + """ + Set file visibility + Arguments: + path: The file path + visibility: New visibility (Valid value: "public" and "private") + Returns: + None + """ + raise NotImplementedError + + def visibility(self, path: str) -> str: + """ + Get visibility of file + Arguments: + path: The file path + Returns: + The file's visibility + """ + raise NotImplementedError + + def file_size(self, path: str) -> int: + """ + Get size of file + Arguments: + path: The file path + Returns: + The file size in bytes + """ + raise NotImplementedError + + def mime_type(self, path: str) -> str: + """ + Get mimetype of file + Arguments: + path: The file path + Returns: + The file's mimetype + """ + raise NotImplementedError + + def last_modified(self, path: str) -> int: + """ + Get last modified time + Arguments: + path: The file path + Returns: + The file's last modified time as timestamp + """ + raise NotImplementedError + + def list_contents(self, path: str) -> List[str]: + """ + Get all (recursive) of the directories within a given directory. + Arguments: + path: Directory path + Returns: + List all directories in the given directory + """ + raise NotImplementedError + + def copy(self, source: str, destination: str, options: Dict[str, Any] = None): + """ + Copy a file + Arguments: + source: Path to source file + destination: Path to destination file + options: Copy options + Returns: + None + """ + raise NotImplementedError + + def move(self, source: str, destination: str, options: Dict[str, Any] = None): + """ + Move a file + Arguments: + source: Path to source file + destination: Path to destination file + options: Move options + Returns: + None + """ + raise NotImplementedError + + def temporary_url(self, path: str, options: Dict[str, Any] = None): + """ + Get pre-signed url of a file + Arguments: + path: The file path + options: Temporary file options + Returns: + The pre-signed url of file as string + """ + raise NotImplementedError + + def download(self, repo_id: str, local_dir: str, resource: Optional[str] = None): + """ + Download resource from Hugging Face + Arguments: + repo_id: The repository id + local_dir: The path that resource saved after downloading. + resource: File or folder name in repo need downloading. If not resource, download all repo. + Returns: + None + """ + try: + if not resource: + snapshot_download(repo_id=repo_id, local_dir=local_dir, token=self.token) + else: + file_path_list = self.api.list_repo_files(repo_id=repo_id) + for file_path in file_path_list: + if file_path.startswith(resource): + tmp_path = os.path.join(local_dir, "tmp") + self.api.hf_hub_download(repo_id=repo_id, filename=file_path, local_dir=tmp_path) + src_path = os.path.join(tmp_path, resource) + dst_path = os.path.join(local_dir, resource.split("/")[-1]) + shutil.move(src_path, dst_path) + shutil.rmtree(tmp_path) + except RepositoryNotFoundError: + raise UnableToDownload.with_location(repo_id, "Repository not found.") + except RevisionNotFoundError: + raise UnableToDownload.with_location(repo_id, "Revision not found.") + except HfHubHTTPError as e: + raise UnableToDownload.with_location(repo_id, str(e)) + except ValueError: + raise UnableToDownload.with_location(repo_id, "Invalid arguments.") + except Exception as e: + raise UnableToDownload.with_location(repo_id, str(e)) + print(f"Resource downloaded to {local_dir}") + + def upload( + self, + local_resource_path: str, + repo_id: str, + commit_message: str, + path_in_repo: Optional[str], + revision: Optional[str], + ): + """ + Upload resource folder to Hugging Face + Arguments: + local_resource_path: The local path of upload resource. + repo_id: The repository id. + commit_message: The commit message to the repository. + path_in_repo: Relative path in the repository. If null, it will be the file name or folder name + revision: Revision to commit from. If null, it will be "main" branch + Returns: + None + """ + print(f"Uploading resource to {repo_id}...") + try: + if not path_in_repo: + path_in_repo = local_resource_path.split("/")[-1] + if os.path.isdir(local_resource_path): + self.api.upload_folder( + folder_path=local_resource_path, + path_in_repo=path_in_repo, + repo_id=repo_id, + commit_message=commit_message, + revision=revision, + ) + else: + self.api.upload_file( + path_or_fileobj=local_resource_path, + path_in_repo=path_in_repo, + repo_id=repo_id, + commit_message=commit_message, + revision=revision, + ) + except RepositoryNotFoundError: + raise UnableToUpload.with_location(repo_id, "Repository not found.") + except RevisionNotFoundError: + raise UnableToUpload.with_location(repo_id, "Revision not found.") + except HfHubHTTPError as e: + raise UnableToUpload.with_location(repo_id, str(e)) + except ValueError: + raise UnableToUpload.with_location(repo_id, "Invalid arguments.") + except Exception as e: + raise UnableToUpload.with_location(repo_id, str(e)) + print(f"Resource uploaded to {repo_id}") diff --git a/src/flysystem/error.py b/src/flysystem/error.py index 677d52d..4bf5a65 100644 --- a/src/flysystem/error.py +++ b/src/flysystem/error.py @@ -53,6 +53,11 @@ class FilesystemOperationFailed(Enum): OPERATION_SET_VISIBILITY = "SET_VISIBILITY" OPERATION_LIST_CONTENTS = "LIST_CONTENTS" OPERATION_TEMPORARY_URL = "TEMPORARY_URL" + OPERATION_DOWNLOAD = "DOWNLOAD" + OPERATION_DOWNLOAD_DIRECTORY = "DOWNLOAD_DIRECTORY" + OPERATION_UPLOAD = "UPLOAD" + OPERATION_UPLOAD_DIRECTORY = "UPLOAD_DIRECTORY" + OPERATION_CONNECT_GOOGLE_DRIVE = "CONNECT_GOOGLE_DRIVE" class UnableToOperateToFile(FlyFilesystemException): @@ -209,3 +214,51 @@ def with_location(cls, location: str, reason: str = "") -> Self: this._location = location this._reason = reason return this + + +@final +class UnableToFindDriveToken(UnableToOperateToFile): + @classmethod + def with_location(cls, location: str, reason: str = "") -> Self: + msg = f"Unable to get the credential token from location: {location}. {reason}".rstrip() + this = cls(msg) + this._operation = FilesystemOperationFailed.OPERATION_CONNECT_GOOGLE_DRIVE.value + this._location = location + this._reason = reason + return this + + +@final +class UnableToReadCredentialFile(UnableToOperateToFile): + @classmethod + def with_location(cls, location: str, reason: str = "") -> Self: + msg = f"Unable to read the credential file from location: {location}. {reason}".rstrip() + this = cls(msg) + this._operation = FilesystemOperationFailed.OPERATION_CONNECT_GOOGLE_DRIVE.value + this._location = location + this._reason = reason + return this + + +@final +class UnableToDownload(UnableToOperateToFile): + @classmethod + def with_location(cls, location: str, reason: str = "") -> Self: + msg = f"Unable to download resource from location: {location}. {reason}".rstrip() + this = cls(msg) + this._operation = FilesystemOperationFailed.OPERATION_DOWNLOAD.value + this._location = location + this._reason = reason + return this + + +@final +class UnableToUpload(UnableToOperateToFile): + @classmethod + def with_location(cls, location: str, reason: str = "") -> Self: + msg = f"Unable to upload resource from location: {location}. {reason}".rstrip() + this = cls(msg) + this._operation = FilesystemOperationFailed.OPERATION_UPLOAD.value + this._location = location + this._reason = reason + return this diff --git a/src/flysystem/filesystem.py b/src/flysystem/filesystem.py index bce37d8..941d822 100644 --- a/src/flysystem/filesystem.py +++ b/src/flysystem/filesystem.py @@ -42,7 +42,7 @@ def has(self, path: str) -> bool: Arguments: path: The directory or file path Returns: - True if the directory exsited + True if the directory existed """ @abstractmethod @@ -58,7 +58,7 @@ def read(self, path: str) -> str: @abstractmethod def read_stream(self, path: str) -> IO: """ - Read the contents of a file as tream + Read the contents of a file as stream Arguments: path: The file path Returns: From f52c68709a2432c64a21848db84d2f4850d49a59 Mon Sep 17 00:00:00 2001 From: Trinh Thanh Phong Date: Thu, 10 Oct 2024 17:25:24 +0700 Subject: [PATCH 2/3] Add Google Drive adapter --- src/flysystem/adapters/drive.py | 423 ++++++++++++++++++++++++++++++++ 1 file changed, 423 insertions(+) create mode 100644 src/flysystem/adapters/drive.py diff --git a/src/flysystem/adapters/drive.py b/src/flysystem/adapters/drive.py new file mode 100644 index 0000000..173dff3 --- /dev/null +++ b/src/flysystem/adapters/drive.py @@ -0,0 +1,423 @@ +import io +import os +import pickle + +from json import JSONDecodeError +from pickle import UnpicklingError +from typing import IO, Any, Dict, List, Optional + +from google.auth.exceptions import TransportError +from google.auth.transport.requests import Request +from google_auth_oauthlib.flow import InstalledAppFlow +from googleapiclient.discovery import build +from googleapiclient.errors import HttpError +from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload +from tqdm import tqdm + +from ..adapters import FilesystemAdapter +from ..error import ( + UnableToCreateDirectory, + UnableToDownload, + UnableToFindDriveToken, + UnableToReadCredentialFile, + UnableToUpload, +) + +SCOPES = ["https://www.googleapis.com/auth/drive"] + + +class DriveFilesystemAdapter(FilesystemAdapter): + """ + Google Drive filesystem adapter class + """ + + def __init__(self, creds_path: str, token_path: str = "/tmp/drive_token.pickle") -> None: + self.creds = None + # Check if file token.pickle exists + if token_path and os.path.exists(token_path): + # Read the token from the file and store it in the variable self.creds + with open(token_path, "rb") as token: + try: + self.creds = pickle.load(token) + except UnpicklingError: + raise UnableToFindDriveToken.with_location(token_path, "Can't pickle the token.") + + # If no valid credentials are available, request the user to log in. + if not self.creds or not self.creds.valid: + # If token is expired, it will be refreshed, else, we will request a new one. + if self.creds and self.creds.expired and self.creds.refresh_token: + self.creds.refresh(Request()) + else: + try: + flow = InstalledAppFlow.from_client_secrets_file(creds_path, SCOPES) + except JSONDecodeError: + raise UnableToReadCredentialFile.with_location( + creds_path, "The input credential file is not JSON formatted." + ) + self.creds = flow.run_local_server(port=0) + + # Save the access token in token.pickle file for future usage + with open(token_path, "wb") as token: + pickle.dump(self.creds, token) + + # Connect to the API service + self.service = build("drive", "v3", credentials=self.creds) + + def create_directory(self, path: str, options: Dict[str, Any] = None) -> Optional[str]: + """ + Create a new directory. + Arguments: + path: The file name + options: Options for create, include parent id + Returns: + String of directory id or None + """ + file_metadata = {"name": path, "mimeType": "application/vnd.google-apps.folder"} + + if "parent_id" in options: + file_metadata["parents"] = [options["parent_id"]] + try: + folder = self.service.files().create(body=file_metadata, fields="id").execute() + except TransportError as e: + raise UnableToCreateDirectory.with_location(path, str(e)) + except HttpError as err: + raise UnableToCreateDirectory.with_location(path, err.reason) + return folder.get("id") + + def upload_file(self, file_path: str, parent_id: str = None) -> Optional[str]: + """ + Upload a file. + Arguments: + file_path: The file path + parent_id: The parent id + Returns: + String of directory id or None + """ + file_name = os.path.basename(file_path) + file_metadata = {"name": file_name} + if parent_id: + file_metadata["parents"] = [parent_id] + try: + media = MediaFileUpload(file_path, resumable=True) + file = self.service.files().create(body=file_metadata, media_body=media, fields="id").execute() + except TransportError as e: + raise UnableToUpload.with_location(file_path, str(e)) + except HttpError as err: + raise UnableToUpload.with_location(file_path, err.reason) + except FileNotFoundError: + raise UnableToUpload.with_location(file_path, "File not found.") + except PermissionError as e: + raise UnableToUpload.with_location(file_path, str(e)) + except Exception as e: + raise UnableToUpload.with_location(file_path, str(e)) + return file.get("id") + + def upload_folder(self, folder_path: str, parent_id: str = None) -> None: + """ + Upload a directory. + Arguments: + folder_path: The folder path + parent_id: The parent id + Returns: + None + """ + try: + folder_name = os.path.basename(folder_path) + folder_id = self.create_directory(folder_name, parent_id) + except FileNotFoundError: + raise UnableToUpload.with_location(folder_path, "Folder not found.") + except PermissionError as e: + raise UnableToUpload.with_location(folder_path, str(e)) + except Exception as e: + raise UnableToUpload.with_location(folder_path, str(e)) + + for item in tqdm(os.listdir(folder_path)): + item_path = os.path.join(folder_path, item) + if os.path.isfile(item_path): + self.upload_file(item_path, folder_id) + elif os.path.isdir(item_path): + self.upload_folder(item_path, folder_id) + + def get_resource_by_name(self, resource_name: str) -> Dict[str, Any]: + try: + query = f"name='{resource_name}' and trashed=false" + response = ( + self.service.files() + .list(q=query, fields="files(id, name, mimeType, createdTime, size, owners(displayName, emailAddress))") + .execute() + ) + + files = response.get("files", []) + except TransportError as e: + raise UnableToDownload.with_location(resource_name, str(e)) + except HttpError as err: + raise UnableToDownload.with_location(resource_name, err.reason) + except Exception as e: + raise UnableToDownload.with_location(resource_name, str(e)) + + if not files: + raise UnableToDownload.with_location(resource_name, "Resource not found") + if len(files) == 1: + return files[0] + # Display detailed information for each file including owner + print("Multiple files/folders found:") + for idx, file in enumerate(files): + size = file.get("size", "Unknown") + owners = ", ".join( + [f"{owner['displayName']} ({owner['emailAddress']})" for owner in file.get("owners", [])] + ) + print( + f"{idx + 1}. Name: {file['name']}, Type: {file['mimeType']}, " + f"Created: {file['createdTime']}, Size: {size}, Owners: {owners}" + ) + + choice = int(input(f"Select a file or folder (1-{len(files)}): ")) + + if 1 <= choice <= len(files): + return files[choice - 1] + else: + raise UnableToDownload.with_location(resource_name, "Invalid choice") + + def download_file(self, file_id: str, file_name: str, download_path: str) -> None: + """ + Download a file. + Arguments: + file_id: The file id + file_name: The file name + download_path: The download path + Returns: + None + """ + try: + request = self.service.files().get_media(fileId=file_id) + fh = io.BytesIO() + downloader = MediaIoBaseDownload(fh, request) + done = False + while done is False: + status, done = downloader.next_chunk() + + fh.seek(0) + os.makedirs(download_path, exist_ok=True) + with open(os.path.join(download_path, file_name), "wb") as f: + f.write(fh.read()) + except TransportError as e: + raise UnableToDownload.with_location(file_name, str(e)) + except HttpError as err: + raise UnableToDownload.with_location(file_name, err.reason) + except Exception as e: + raise UnableToDownload.with_location(file_name, str(e)) + print("File Downloaded") + + def download_folder(self, folder_id, folder_name: str, download_path: str) -> None: + """ + Download a directory. + Arguments: + folder_id: The folder id + folder_name: The folder name + download_path: The download path + Returns: + None + """ + download_path = os.path.join(download_path, folder_name) + os.makedirs(download_path, exist_ok=True) + + try: + results = ( + self.service.files().list(q=f"'{folder_id}' in parents", fields="files(id, name, mimeType)").execute() + ) + except TransportError as e: + raise UnableToDownload.with_location(folder_name, str(e)) + except HttpError as err: + raise UnableToDownload.with_location(folder_name, err.reason) + except Exception as e: + raise UnableToDownload.with_location(folder_name, str(e)) + items = results.get("files", []) + + for item in tqdm(items): + if item["mimeType"] == "application/vnd.google-apps.folder": + new_folder_path = os.path.join(download_path, item["name"]) + os.makedirs(new_folder_path, exist_ok=True) + self.download_folder(item["id"], item["name"], new_folder_path) + else: + self.download_file(item["id"], item["name"], download_path) + print("Folder downloaded") + + def file_exists(self, path: str) -> bool: + """ + Determine if a file exists. + Arguments: + path: The file path + Returns: + True if the file exsited + """ + raise NotImplementedError + + def directory_exists(self, path: str) -> bool: + """ + Determine if a directory exists. + Arguments: + path: The directory path + Returns: + True if the directory existed + """ + raise NotImplementedError + + def write(self, path: str, contents: str, options: Dict[str, Any] = None): + """ + Write the contents of a file. + Arguments: + path: The file path + contents: The contents to write + options: Write options + Returns: + None + """ + raise NotImplementedError + + def write_stream(self, path: str, resource: IO, options: Dict[str, Any] = None): + """ + Write the contents of a file from stream + Arguments: + path: The file path + resource: The stream + options: Write options + Returns: + None + """ + raise NotImplementedError + + def read(self, path: str) -> str: + """ + Get the contents of a file. + Arguments: + path: The file path + Returns: + The contents of file as string + """ + raise NotImplementedError + + def read_stream(self, path: str) -> IO: + """ + Read the contents of a file as stream + Arguments: + path: The file path + Returns: + The contents of file as stream + """ + raise NotImplementedError + + def delete(self, path: str): + """ + Delete a file + Arguments: + path: The file path + Returns: + None + """ + raise NotImplementedError + + def delete_directory(self, path: str): + """ + Recursively delete a directory. + Arguments: + path: Directory path to delete + Returns: + True if the directory is deleted successfully + """ + raise NotImplementedError + + def set_visibility(self, path: str, visibility: str): + """ + Set file visibility + Arguments: + path: The file path + visibility: New visibility (Valid value: "public" and "private") + Returns: + None + """ + raise NotImplementedError + + def visibility(self, path: str) -> str: + """ + Get visibility of file + Arguments: + path: The file path + Returns: + The file's visibility + """ + raise NotImplementedError + + def file_size(self, path: str) -> int: + """ + Get size of file + Arguments: + path: The file path + Returns: + The file size in bytes + """ + raise NotImplementedError + + def mime_type(self, path: str) -> str: + """ + Get mimetype of file + Arguments: + path: The file path + Returns: + The file's mimetype + """ + raise NotImplementedError + + def last_modified(self, path: str) -> int: + """ + Get last modified time + Arguments: + path: The file path + Returns: + The file's last modified time as timestamp + """ + raise NotImplementedError + + def list_contents(self, path: str) -> List[str]: + """ + Get all (recursive) of the directories within a given directory. + Arguments: + path: Directory path + Returns: + List all directories in the given directory + """ + raise NotImplementedError + + def copy(self, source: str, destination: str, options: Dict[str, Any] = None): + """ + Copy a file + Arguments: + source: Path to source file + destination: Path to destination file + options: Copy options + Returns: + None + """ + raise NotImplementedError + + def move(self, source: str, destination: str, options: Dict[str, Any] = None): + """ + Move a file + Arguments: + source: Path to source file + destination: Path to destination file + options: Move options + Returns: + None + """ + raise NotImplementedError + + def temporary_url(self, path: str, options: Dict[str, Any] = None): + """ + Get pre-signed url of a file + Arguments: + path: The file path + options: Temporary file options + Returns: + The pre-signed url of file as string + """ + raise NotImplementedError From 1b16e2a1f1db097256ce4385c1e5e03153d84008 Mon Sep 17 00:00:00 2001 From: Trinh Thanh Phong Date: Thu, 10 Oct 2024 17:26:05 +0700 Subject: [PATCH 3/3] Add CLI for upload and download resource --- cli/download.py | 81 +++++++++++++++++++++++++++++++++++++++++++++ cli/upload.py | 87 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 168 insertions(+) create mode 100644 cli/download.py create mode 100644 cli/upload.py diff --git a/cli/download.py b/cli/download.py new file mode 100644 index 0000000..fa282c7 --- /dev/null +++ b/cli/download.py @@ -0,0 +1,81 @@ +import argparse +import importlib +import sys + +import decouple + +sys.path.append("../") + + +def get_argument(): + parser = argparse.ArgumentParser(description="Python Flysystem Downloader") + + parser.add_argument( + "--adapter", + help="The type of adapter", + choices=["local", "s3", "drive", "huggingface", "memory"], + required=True, + ) + parser.add_argument("--download-path", help="The path of download resource.", type=str, required=True) + parser.add_argument("--resource", type=str, help="Name or path of resource need download.") + parser.add_argument("--env-file", "-e", type=str, help="Env file path.") + + # huggingface + parser.add_argument("--repo-id", type=str, help="The id of huggingface repo.") + parser.add_argument("--huggingface-token", type=str, help="The access token of Huggingface.") + + # google drive + parser.add_argument("--drive-creds-path", help="The path of Google Drive credentials json.", type=str) + parser.add_argument( + "--drive-token-path", + help="The path of Google Drive token pickle file for quick access.", + type=str, + default="/tmp/drive_token.pickle", + ) + + args = parser.parse_args() + return args + + +def main(): + args = get_argument() + if args.env_file: + decouple.config = decouple.Config(decouple.RepositoryEnv(args.env_file)) + + if args.adapter == "local": + adapter_type = importlib.import_module("src.flysystem.adapters.local") + + elif args.adapter == "s3": + adapter_type = importlib.import_module("src.flysystem.adapters.s3") + + elif args.adapter == "drive": + adapter_type = importlib.import_module("src.flysystem.adapters.drive") + adapter = adapter_type.DriveFilesystemAdapter(args.drive_creds_path, args.drive_token_path) + + if not args.resource: + raise Exception("Resource name is required.") + resource = adapter.get_resource_by_name(args.resource) + print(resource) + if resource.get("mimeType") == "application/vnd.google-apps.folder": + adapter.download_folder(resource["id"], resource["name"], args.download_path) + else: + adapter.download_file(resource["id"], resource["name"], args.download_path) + + elif args.adapter == "huggingface": + adapter_type = importlib.import_module("src.flysystem.adapters.hgface") + if args.huggingface_token: + adapter = adapter_type.HuggingFaceFilesystemAdapter(args.huggingface_token) + else: + token = decouple.config("HUGGINGFACE_TOKEN") + adapter = adapter_type.HuggingFaceFilesystemAdapter(token) + adapter.download(args.repo_id, args.download_path, args.resource) + + elif args.adapter == "memory": + adapter_type = importlib.import_module("src.flysystem.adapters.memory") + + else: + raise Exception("Invalid adapter") + + +if __name__ == "__main__": + main() diff --git a/cli/upload.py b/cli/upload.py new file mode 100644 index 0000000..65d7567 --- /dev/null +++ b/cli/upload.py @@ -0,0 +1,87 @@ +import argparse +import importlib +import os +import sys + +import decouple + +sys.path.append("../") + + +def get_argument(): + parser = argparse.ArgumentParser(description="Python Flysystem Uploader") + + parser.add_argument( + "--adapter", + help="The type of adapter", + choices=["local", "s3", "drive", "huggingface", "memory"], + required=True, + ) + parser.add_argument("--upload-resource", help="The path of upload resource.", type=str, required=True) + parser.add_argument("--env-file", "-e", type=str, help="Env file path.") + + # huggingface + parser.add_argument("--repo-id", type=str, help="The id of huggingface repo.") + parser.add_argument("--huggingface-token", type=str, help="The access token of Huggingface.") + parser.add_argument("--message", type=str, help="The message of commit.") + parser.add_argument("--path-in-repo", type=str, help="The path in repo.") + parser.add_argument("--revision", type=str, help="The revision of commit.") + + # google drive + parser.add_argument("--drive-creds-path", help="The path of Google Drive credentials json.", type=str) + parser.add_argument( + "--drive-token-path", + help="The path of Google Drive token pickle file for quick access.", + type=str, + default="/tmp/drive_token.pickle", + ) + parser.add_argument("--parent-id", type=str, help="The id of parent directory.") + + args = parser.parse_args() + return args + + +def main(): + args = get_argument() + if args.env_file: + decouple.config = decouple.Config(decouple.RepositoryEnv(args.env_file)) + + if args.adapter == "local": + adapter_type = importlib.import_module("src.flysystem.adapters.local") + + elif args.adapter == "s3": + adapter_type = importlib.import_module("src.flysystem.adapters.s3") + + elif args.adapter == "drive": + adapter_type = importlib.import_module("src.flysystem.adapters.drive") + adapter = adapter_type.DriveFilesystemAdapter(args.drive_creds_path, args.drive_token_path) + + if os.path.isdir(args.upload_resource): + adapter.upload_folder(args.upload_resource, args.parent_id) + else: + adapter.upload_file(args.upload_resource, args.parent_id) + + elif args.adapter == "huggingface": + adapter_type = importlib.import_module("src.flysystem.adapters.hgface") + if args.huggingface_token: + adapter = adapter_type.HuggingFaceFilesystemAdapter(args.huggingface_token) + else: + token = decouple.config("HUGGINGFACE_TOKEN") + adapter = adapter_type.HuggingFaceFilesystemAdapter(token) + adapter.upload( + args.upload_resource, + args.repo_id, + args.message, + args.path_in_repo, + args.revision, + ) + + elif args.adapter == "memory": + adapter_type = importlib.import_module("src.flysystem.adapters.memory") + + else: + raise Exception("Invalid adapter") + + +if __name__ == "__main__": + main()