diff --git a/.github/workflows/fly.yml b/.github/workflows/fly.yml
index ff6116e..3c89f8c 100644
--- a/.github/workflows/fly.yml
+++ b/.github/workflows/fly.yml
@@ -64,7 +64,7 @@ jobs:
       - name: Deploy to Staging
         run: |
           flyctl deploy --remote-only --config fly.staging.toml
-          python update_database.py staging https://offsets-db-staging.fly.dev/files
+          python update_database.py staging --url https://offsets-db-staging.fly.dev/files
 
       - name: Deploy to Production
         if: github.ref == 'refs/heads/main' || github.event_name == 'workflow_dispatch'
@@ -121,4 +121,4 @@ jobs:
       - name: Update Production DB
         if: github.ref == 'refs/heads/main' || github.event_name == 'workflow_dispatch'
         run: |
-          python update_database.py production https://offsets-db.fly.dev/files
+          python update_database.py production --url https://offsets-db.fly.dev/files
diff --git a/.github/workflows/update-db.yaml b/.github/workflows/update-db.yaml
index d55c546..e0c0d3f 100644
--- a/.github/workflows/update-db.yaml
+++ b/.github/workflows/update-db.yaml
@@ -55,12 +55,12 @@ jobs:
 
       - name: Seed Staging Database
         run: |
-          python update_database.py staging https://offsets-db-staging.fly.dev/files/
+          python update_database.py staging --url https://offsets-db-staging.fly.dev/files/
 
       - name: Seed Production Database
         if: github.event_name == 'workflow_dispatch' || github.event_name == 'schedule' || github.event_name == 'repository_dispatch'
         run: |
-          python update_database.py production https://offsets-db.fly.dev/files/
+          python update_database.py production --url https://offsets-db.fly.dev/files/
 
       - name: Notify Slack on Failure
         if: failure() && (github.event_name == 'workflow_dispatch' || github.event_name == 'schedule')
diff --git a/.gitignore b/.gitignore
index 0dfe524..54ae151 100644
--- a/.gitignore
+++ b/.gitignore
@@ -137,3 +137,4 @@ offsets_db_api/_version.py
 *.csv
 *.gz
 cache-watch-dog/
+staging-files.json
diff --git a/update_database.py b/update_database.py
index e6ffe21..fe9f9b5 100644
--- a/update_database.py
+++ b/update_database.py
@@ -1,3 +1,4 @@
+import argparse
 import datetime
 import json
 import os
@@ -9,15 +10,18 @@
 
 
 def generate_path(*, date: datetime.date, bucket: str, category: str) -> str:
+    """Generate S3 path for a given date and category."""
     return f'{bucket.rstrip("/")}/final/{date.strftime("%Y-%m-%d")}/{category}.parquet'
 
 
 def calculate_date(*, days_back: int) -> datetime.date:
+    """Calculate a date relative to today."""
     return datetime.datetime.now(datetime.timezone.utc).date() - datetime.timedelta(days=days_back)
 
 
-def get_latest(*, bucket: str):
-    fs = fsspec.filesystem('s3')  # Assuming S3, adjust accordingly
+def get_latest(*, bucket: str) -> list[dict[str, str]]:
+    """Get the latest data files from the S3 bucket."""
+    fs = fsspec.filesystem('s3')
     today, yesterday = calculate_date(days_back=0), calculate_date(days_back=1)
 
     items = [
@@ -37,10 +41,12 @@ def get_latest(*, bucket: str):
         elif fs.exists(previous_path):
             entry_url = previous_path
         else:
+            print(f"Warning: Both {latest_path} and {previous_path} don't exist")
             raise ValueError(f"both {latest_path} and {previous_path} file paths don't exist")
 
         data.append({'category': key, 'url': entry_url})
 
+    # Handle weekly summaries
     weekly_summary_start = datetime.date(year=2024, month=2, day=6)
     weekly_summary_end = datetime.datetime.now(datetime.timezone.utc).date()
     date_ranges = pd.date_range(
@@ -48,7 +54,6 @@ def get_latest(*, bucket: str):
     )
 
     added_weeks = set()
-
     for entry in date_ranges:
         value = entry.isocalendar()
         week_num = f'{value.year}-{value.week}'
@@ -59,23 +64,32 @@ def get_latest(*, bucket: str):
         if fs.exists(weekly_summary_path):
             data.append({'category': 'clips', 'url': weekly_summary_path})
             added_weeks.add(week_num)
-        else:
-            print(f"weekly summary path {weekly_summary_path} doesn't exist")
 
     return data
 
 
-def post_data_to_environment(*, env: str, bucket: str) -> None:
-    files = get_latest(bucket=bucket)
-
-    [print(file) for file in files]
-
-    # get X-API-KEY from env and use it in headers
+def load_files_from_json(file_path: str) -> list[dict[str, str]]:
+    """Load file definitions from a JSON file."""
+    try:
+        with open(file_path) as f:
+            return json.load(f)
+    except (json.JSONDecodeError, FileNotFoundError) as e:
+        print(f'Error loading file list: {e}')
+        sys.exit(1)
+
+
+def post_data_to_environment(
+    *,
+    env: str,
+    url: str,
+    files: list[dict[str, str]],
+) -> None:
+    """Post file definitions to the API."""
+    # Get API key from environment
     if env == 'production':
         api_key = os.environ.get('OFFSETS_DB_API_KEY_PRODUCTION')
         if api_key is None:
             raise ValueError('OFFSETS_DB_API_KEY_PRODUCTION environment variable not set')
-
     else:
         api_key = os.environ.get('OFFSETS_DB_API_KEY_STAGING')
         if api_key is None:
@@ -86,21 +100,84 @@ def post_data_to_environment(*, env: str, bucket: str) -> None:
         'Content-Type': 'application/json',
         'X-API-KEY': api_key,
     }
+
+    print(f'\nSending {len(files)} files to {url}:')
+    for file in files:
+        print(f'- {file["category"]}: {file["url"]}')
+
     # Send the request
     response = requests.post(url, headers=headers, data=json.dumps(files))
 
     # Log the response
     if response.ok:
-        print(f'Success in {env}:', response.json())
+        print(f'\nSuccess in {env}:', response.json())
+    else:
+        print(f'\nFailed in {env}:', response.text)
+        sys.exit(1)
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description='Update offsets database with latest data files',
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+    )
+
+    parser.add_argument(
+        'environment',
+        choices=['staging', 'production'],
+        help='Target environment for the update',
+    )
+
+    parser.add_argument(
+        '--url',
+        '-u',
+        default='http://127.0.0.1:8000/files/',
+        help='API endpoint URL',
+    )
+
+    parser.add_argument(
+        '--bucket',
+        '-b',
+        default='s3://carbonplan-offsets-db',
+        help='S3 bucket containing data files',
+    )
+
+    parser.add_argument(
+        '--files',
+        '-f',
+        help='JSON file containing list of files to upload (overrides automatic discovery)',
+    )
+
+    parser.add_argument(
+        '--dry-run',
+        action='store_true',
+        help='Show what would be uploaded without sending data',
+    )
+
+    args = parser.parse_args()
+
+    print(f'Seeding {args.environment} database using URL: {args.url}')
+
+    # Determine files to upload
+    if args.files:
+        files = load_files_from_json(args.files)
+        print(f'Using {len(files)} files from {args.files}')
     else:
-        print(f'Failed in {env}:', response.text)
+        files = get_latest(bucket=args.bucket)
+        print(f'Found {len(files)} latest files from {args.bucket}')
+
+    if not files:
+        print('No files to upload!')
+        sys.exit(1)
+
+    if args.dry_run:
+        print('\nDRY RUN - Would upload these files:')
+        for file in files:
+            print(f'- {file["category"]}: {file["url"]}')
+        return
+
+    post_data_to_environment(env=args.environment, url=args.url, files=files)
 
 
 if __name__ == '__main__':
-    env = sys.argv[1] if len(sys.argv) > 1 else 'staging'
-    if env not in ['staging', 'production']:
-        raise ValueError(f'env must be either "staging" or "production", not {env}')
-    url = sys.argv[2] if len(sys.argv) > 2 else 'http://127.0.0.1:8000/files/'
-    bucket = 's3://carbonplan-offsets-db'
-    print(f'Seeding {env} database using URL: {url}...')
-    post_data_to_environment(env=env, bucket=bucket)
+    main()
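
Example invocations of the new CLI (a sketch based on the argparse flags added in update_database.py above; the staging-files.json name mirrors the new .gitignore entry and is illustrative):

    # Preview the files discovered in S3 without POSTing anything
    python update_database.py staging --dry-run

    # Seed staging from an explicit JSON file list instead of S3 discovery
    python update_database.py staging --files staging-files.json --url https://offsets-db-staging.fly.dev/files/

With no flags, the script falls back to the defaults defined in main() (local URL http://127.0.0.1:8000/files/ and the s3://carbonplan-offsets-db bucket), which is what the workflows override via --url.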