|
1 | 1 | import logging |
| 2 | +from pathlib import Path |
2 | 3 |
|
3 | 4 | import polars as pl |
4 | 5 | from dotenv import load_dotenv |
5 | 6 |
|
6 | | -from pypi_scout.config import Config |
| 7 | +from pypi_scout.config import Config, StorageBackend |
7 | 8 | from pypi_scout.data.description_cleaner import CLEANING_FAILED, DescriptionCleaner |
8 | 9 | from pypi_scout.data.reader import DataReader |
| 10 | +from pypi_scout.utils.blob_io import BlobIO |
9 | 11 | from pypi_scout.utils.logging import setup_logging |
10 | 12 |
|
11 | 13 |
|
@@ -42,20 +44,59 @@ def clean_descriptions(df): |
42 | 44 | return df |
43 | 45 |
|
44 | 46 |
|
def store_processed_dataset_local(df: pl.DataFrame, processed_dataset_path: Path):
    """Persist the processed dataset to a local CSV file at `processed_dataset_path`."""
    log = logging.info
    log("Storing the processed dataset...")
    df.write_csv(processed_dataset_path)
    log("✅ Done!")
|
50 | 52 |
|
51 | | -def process_dataset(): |
52 | | - load_dotenv() |
53 | | - config = Config() |
def store_processed_dataset_blob(df: pl.DataFrame, blob_io: BlobIO, blob_name: str):
    """Upload the processed dataset as a CSV blob named `blob_name` via `blob_io`."""
    container = blob_io.container_name
    logging.info(f"Storing the processed dataset as {blob_name} in container '{container}'...")
    blob_io.upload_csv(df, blob_name)
    logging.info("✅ Done!")
def handle_for_local_backend(config: Config):
    """Run the processing pipeline for the LOCAL storage backend.

    Skips all work when the processed CSV already exists on disk; otherwise
    reads the raw dataset, optionally keeps only the top fraction of packages
    (when FRAC_DATA_TO_INCLUDE < 1.0), cleans the descriptions, and writes
    the result into the data directory.
    """
    target = config.DATA_DIR / config.PROCESSED_DATASET_CSV_NAME
    if target.exists():
        logging.info(f"✔️ Processed dataset {config.PROCESSED_DATASET_CSV_NAME} already exists! Skipping.")
        return

    dataset = read_raw_dataset(config.DATA_DIR / config.RAW_DATASET_CSV_NAME)
    if config.FRAC_DATA_TO_INCLUDE < 1.0:
        dataset = filter_top_packages(dataset, config.FRAC_DATA_TO_INCLUDE)
    dataset = clean_descriptions(dataset)

    store_processed_dataset_local(dataset, target)
| 71 | + |
def handle_for_blob_backend(config: Config):
    """Run the processing pipeline for the BLOB storage backend.

    Skips all work when the processed CSV already exists in the configured
    container; otherwise reads the raw dataset from the local data directory,
    optionally keeps only the top fraction of packages (when
    FRAC_DATA_TO_INCLUDE < 1.0), cleans the descriptions, and uploads the
    result as a blob.
    """
    blob_io = BlobIO(
        config.STORAGE_BACKEND_BLOB_ACCOUNT_NAME,
        config.STORAGE_BACKEND_BLOB_CONTAINER_NAME,
        config.STORAGE_BACKEND_BLOB_KEY,
    )

    if blob_io.exists(config.PROCESSED_DATASET_CSV_NAME):
        # Fixed copy-paste error: the old message said "Raw dataset ... Skipping
        # download.", but this checks the PROCESSED dataset and skips processing.
        logging.info(
            f"✔️ Processed dataset {config.PROCESSED_DATASET_CSV_NAME} already exists in container '{config.STORAGE_BACKEND_BLOB_CONTAINER_NAME}'! Skipping processing."
        )
        return

    df = read_raw_dataset(config.DATA_DIR / config.RAW_DATASET_CSV_NAME)
    if config.FRAC_DATA_TO_INCLUDE < 1.0:
        df = filter_top_packages(df, config.FRAC_DATA_TO_INCLUDE)
    df = clean_descriptions(df)

    store_processed_dataset_blob(df, blob_io, config.PROCESSED_DATASET_CSV_NAME)
def process_dataset():
    """Entry point: load environment config and process the dataset with the configured backend.

    Dispatches to the local-disk handler for StorageBackend.LOCAL and to the
    blob-storage handler for any other configured backend.
    """
    load_dotenv()
    config = Config()
    handler = handle_for_local_backend if config.STORAGE_BACKEND == StorageBackend.LOCAL else handle_for_blob_backend
    handler(config)
59 | 100 |
|
60 | 101 |
|
61 | 102 | if __name__ == "__main__": |
|
0 commit comments