4 changes: 2 additions & 2 deletions .github/workflows/fly.yml
@@ -64,7 +64,7 @@ jobs:
- name: Deploy to Staging
run: |
flyctl deploy --remote-only --config fly.staging.toml
python update_database.py staging https://offsets-db-staging.fly.dev/files
python update_database.py staging --url https://offsets-db-staging.fly.dev/files

- name: Deploy to Production
if: github.ref == 'refs/heads/main' || github.event_name == 'workflow_dispatch'
@@ -121,4 +121,4 @@ jobs:
- name: Update Production DB
if: github.ref == 'refs/heads/main' || github.event_name == 'workflow_dispatch'
run: |
python update_database.py production https://offsets-db.fly.dev/files
python update_database.py production --url https://offsets-db.fly.dev/files
4 changes: 2 additions & 2 deletions .github/workflows/update-db.yaml
@@ -55,12 +55,12 @@ jobs:

- name: Seed Staging Database
run: |
python update_database.py staging https://offsets-db-staging.fly.dev/files/
python update_database.py staging --url https://offsets-db-staging.fly.dev/files/

- name: Seed Production Database
if: github.event_name == 'workflow_dispatch' || github.event_name == 'schedule' || github.event_name == 'repository_dispatch'
run: |
python update_database.py production https://offsets-db.fly.dev/files/
python update_database.py production --url https://offsets-db.fly.dev/files/

- name: Notify Slack on Failure
if: failure() && (github.event_name == 'workflow_dispatch' || github.event_name == 'schedule')
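Both workflows' seeding steps reduce to a single HTTP call: update_database.py POSTs a JSON array of {category, url} entries to the chosen --url, authenticating with an X-API-KEY header (see the script changes below). A rough curl equivalent, with a single placeholder entry rather than a real file list, might look like:

# Illustrative only: one seeding request against the staging endpoint.
curl -X POST 'https://offsets-db-staging.fly.dev/files/' \
  -H 'Content-Type: application/json' \
  -H "X-API-KEY: $OFFSETS_DB_API_KEY_STAGING" \
  -d '[{"category": "clips", "url": "s3://carbonplan-offsets-db/final/2024-02-06/clips.parquet"}]'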
1 change: 1 addition & 0 deletions .gitignore
@@ -137,3 +137,4 @@ offsets_db_api/_version.py
*.csv
*.gz
cache-watch-dog/
staging-files.json
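The newly ignored staging-files.json is presumably a scratch file for the --files flag that update_database.py gains below: a JSON list of {"category": ..., "url": ...} entries. A minimal sketch of generating one with the script's own helpers follows; the category names 'projects' and 'credits' are assumptions for illustration (only 'clips' is visible in this diff).

# Sketch only: build a staging-files.json that `update_database.py <env> --files staging-files.json`
# can read in place of automatic S3 discovery. Assumes update_database.py and its dependencies
# are importable from the current directory.
import json

from update_database import calculate_date, generate_path

bucket = 's3://carbonplan-offsets-db'
date = calculate_date(days_back=1)  # yesterday, matching the script's fallback date

files = [
    {'category': category, 'url': generate_path(date=date, bucket=bucket, category=category)}
    for category in ['projects', 'credits']  # assumed category names
]

with open('staging-files.json', 'w') as f:
    json.dump(files, f, indent=2)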
119 changes: 98 additions & 21 deletions update_database.py
@@ -1,3 +1,4 @@
import argparse
import datetime
import json
import os
@@ -9,15 +10,18 @@


def generate_path(*, date: datetime.date, bucket: str, category: str) -> str:
"""Generate S3 path for a given date and category."""
return f'{bucket.rstrip("/")}/final/{date.strftime("%Y-%m-%d")}/{category}.parquet'


def calculate_date(*, days_back: int) -> datetime.date:
"""Calculate a date relative to today."""
return datetime.datetime.now(datetime.timezone.utc).date() - datetime.timedelta(days=days_back)


def get_latest(*, bucket: str):
fs = fsspec.filesystem('s3') # Assuming S3, adjust accordingly
def get_latest(*, bucket: str) -> list[dict[str, str]]:
"""Get the latest data files from the S3 bucket."""
fs = fsspec.filesystem('s3')
today, yesterday = calculate_date(days_back=0), calculate_date(days_back=1)

items = [
@@ -37,18 +41,19 @@ def get_latest(*, bucket: str):
elif fs.exists(previous_path):
entry_url = previous_path
else:
print(f"Warning: Both {latest_path} and {previous_path} don't exist, skipping")
raise ValueError(f"both {latest_path} and {previous_path} file paths don't exist")

data.append({'category': key, 'url': entry_url})

# Handle weekly summaries
weekly_summary_start = datetime.date(year=2024, month=2, day=6)
weekly_summary_end = datetime.datetime.now(datetime.timezone.utc).date()
date_ranges = pd.date_range(
start=weekly_summary_start, end=weekly_summary_end, freq='W-TUE', inclusive='both'
)

added_weeks = set()

for entry in date_ranges:
value = entry.isocalendar()
week_num = f'{value.year}-{value.week}'
@@ -59,23 +64,32 @@ def get_latest(*, bucket: str):
if fs.exists(weekly_summary_path):
data.append({'category': 'clips', 'url': weekly_summary_path})
added_weeks.add(week_num)
else:
print(f"weekly summary path {weekly_summary_path} doesn't exist")

return data


def post_data_to_environment(*, env: str, bucket: str) -> None:
files = get_latest(bucket=bucket)

[print(file) for file in files]

# get X-API-KEY from env and use it in headers
def load_files_from_json(file_path: str) -> list[dict[str, str]]:
"""Load file definitions from a JSON file."""
try:
with open(file_path) as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError) as e:
print(f'Error loading file list: {e}')
sys.exit(1)


def post_data_to_environment(
*,
env: str,
url: str,
files: list[dict[str, str]],
) -> None:
"""Post file definitions to the API."""
# Get API key from environment
if env == 'production':
api_key = os.environ.get('OFFSETS_DB_API_KEY_PRODUCTION')
if api_key is None:
raise ValueError('OFFSETS_DB_API_KEY_PRODUCTION environment variable not set')

else:
api_key = os.environ.get('OFFSETS_DB_API_KEY_STAGING')
if api_key is None:
@@ -86,21 +100,84 @@ def post_data_to_environment(*, env: str, bucket: str) -> None:
'Content-Type': 'application/json',
'X-API-KEY': api_key,
}

print(f'\nSending {len(files)} files to {url}:')
for file in files:
print(f'- {file["category"]}: {file["url"]}')

# Send the request
response = requests.post(url, headers=headers, data=json.dumps(files))

# Log the response
if response.ok:
print(f'Success in {env}:', response.json())
print(f'\nSuccess in {env}:', response.json())
else:
print(f'\nFailed in {env}:', response.text)
sys.exit(1)


def main():
parser = argparse.ArgumentParser(
description='Update offsets database with latest data files',
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)

parser.add_argument(
'environment',
choices=['staging', 'production'],
help='Target environment for the update',
)

parser.add_argument(
'--url',
'-u',
default='http://127.0.0.1:8000/files/',
help='API endpoint URL',
)

parser.add_argument(
'--bucket',
'-b',
default='s3://carbonplan-offsets-db',
help='S3 bucket containing data files',
)

parser.add_argument(
'--files',
'-f',
help='JSON file containing list of files to upload (overrides automatic discovery)',
)

parser.add_argument(
'--dry-run',
action='store_true',
help='Show what would be uploaded without sending data',
)

args = parser.parse_args()

print(f'Seeding {args.environment} database using URL: {args.url}')

# Determine files to upload
if args.files:
files = load_files_from_json(args.files)
print(f'Using {len(files)} files from {args.files}')
else:
print(f'Failed in {env}:', response.text)
files = get_latest(bucket=args.bucket)
print(f'Found {len(files)} latest files from {args.bucket}')

if not files:
print('No files to upload!')
sys.exit(1)

if args.dry_run:
print('\nDRY RUN - Would upload these files:')
for file in files:
print(f'- {file["category"]}: {file["url"]}')
return

post_data_to_environment(env=args.environment, url=args.url, files=files)


if __name__ == '__main__':
env = sys.argv[1] if len(sys.argv) > 1 else 'staging'
if env not in ['staging', 'production']:
raise ValueError(f'env must be either "staging" or "production", not {env}')
url = sys.argv[2] if len(sys.argv) > 2 else 'http://127.0.0.1:8000/files/'
bucket = 's3://carbonplan-offsets-db'
print(f'Seeding {env} database using URL: {url}...')
post_data_to_environment(env=env, bucket=bucket)
main()
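With the argparse interface above, the script can also be exercised locally. A few hedged usage sketches (the exported key value is a placeholder; flags are as defined in the parser):

# Preview what would be uploaded, without sending anything
export OFFSETS_DB_API_KEY_STAGING='<staging-key>'
python update_database.py staging --dry-run

# Seed a locally running API (the default --url is http://127.0.0.1:8000/files/)
# from an explicit file list instead of S3 discovery
python update_database.py staging --files staging-files.json

# Seed the deployed staging API, overriding the default local URL
python update_database.py staging --url https://offsets-db-staging.fly.dev/files/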