Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions .github/workflows/snowflake-test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
name: Python Linting, Formatting, Testing, Coverage

on:
  workflow_dispatch:
  pull_request:
    # NOTE(review): the second entry is a feature branch — drop it before
    # merging so the trigger stays limited to master.
    branches: [master, 42-build-data-preparation-infrastructure-for-feature-store-notebooks]
    paths:
      - "integration/snowflake/**"

jobs:
  test:
    runs-on: ubuntu-latest
    defaults:
      run:
        # Every step operates inside the Snowflake integration package.
        working-directory: ./integration/snowflake
    strategy:
      matrix:
        python-version: ["3.12", "3.13"]
      # Keep running the other matrix entry when one fails, so both
      # Python versions always report results.
      fail-fast: false
    steps:
      - uses: actions/checkout@v4
        with:
          lfs: true
      - name: Set up Python ${{ matrix.python-version }} and uv
        uses: astral-sh/setup-uv@v5
        with:
          python-version: ${{ matrix.python-version }}
          enable-cache: true
          cache-dependency-glob: |
            **/pyproject.toml
      - name: Install dependencies
        run: |
          uv sync \
            --python ${{ matrix.python-version }} \
            --group dev
      - name: Check linting and formatting
        run: |
          uv run --active ruff check .
          uv run --active ruff format --check .
      - name: Check types
        run: |
          uv run --active basedpyright
      - name: Show TODOs
        # Informational only: `|| true` keeps the job green even when
        # FIX-category findings exist.
        run: |
          uv run --active ruff check --select FIX . || true
      - name: Run tests and coverage
        env:
          SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
          SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
          SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
          SNOWFLAKE_ROLE: ${{ secrets.SNOWFLAKE_ROLE }}
          SNOWFLAKE_WAREHOUSE: ${{ secrets.SNOWFLAKE_WAREHOUSE }}
          SNOWFLAKE_DATABASE: ${{ secrets.SNOWFLAKE_DATABASE }}
        run: |-
          uv run --active pytest \
            --cov=. \
            --cov-report=term \
            -v
7 changes: 6 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
**/.DS_Store

# Temp
preparation/
/preparation/

# Binder
.bash_logout
Expand Down Expand Up @@ -152,3 +152,8 @@ dmypy.json
*_spark/
*_pipeline/
.vscode/

# LLM instructions
copilot-instructions.md
CLAUDE.md
32 changes: 32 additions & 0 deletions integration/GENERATE_JAFFLE_SHOP_PARQUET.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Generate Parquet files from Jaffle Shop CSV data

This script reads the Jaffle Shop CSV files and converts them to Parquet format for more efficient storage and querying in Snowflake.

## Generate Jaffle Shop Data (CSV)

To generate the Jaffle Shop CSV data, run the following command:

```bash
pipx run jafgen 6
```

This will create the necessary CSV files in the `jaffle-data` directory.

## Convert CSV to Parquet

To convert the generated CSV files to Parquet format, run the following script:

```bash
python convert_jaffle_csv_to_parquet.py
```

This will read each CSV file from the `jaffle-data` directory and save the corresponding Parquet files in the `jaffle-data/parquet` directory.

## Upload Parquet Files to Google Cloud Storage

To upload the Parquet files to the Google Cloud Storage bucket, use the following commands:

```bash
gcloud config set project getml-infra
gcloud storage cp jaffle-data/parquet/*.parquet gs://static.getml.com/datasets/jaffle_shop/
```
39 changes: 39 additions & 0 deletions integration/convert_jaffle_csv_to_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from pathlib import Path

import pandas as pd

# Jaffle Shop tables produced by `jafgen`; one CSV file per name.
NAMES: list[str] = [
    "raw_customers",
    "raw_items",
    "raw_orders",
    "raw_products",
    "raw_stores",
    "raw_supplies",
    "raw_tweets",
]

# Directory where `jafgen` writes its CSV output.
JAFFLE_CSV_DATA_PATH = Path("jaffle-data")

if not JAFFLE_CSV_DATA_PATH.exists():
    raise FileNotFoundError(
        f"Jaffle CSV data path {JAFFLE_CSV_DATA_PATH} does not exist."
        " Please run `jafgen` to generate CSVs."
    )

# Parquet output lives in a subdirectory of the CSV directory.
JAFFLE_PARQUET_DATA_PATH = JAFFLE_CSV_DATA_PATH / "parquet"
# Idiomatic instance-method call (was the class-method form
# `Path.mkdir(JAFFLE_PARQUET_DATA_PATH, exist_ok=True)`).
JAFFLE_PARQUET_DATA_PATH.mkdir(exist_ok=True)


for name in NAMES:
    csv_filepath = JAFFLE_CSV_DATA_PATH / f"{name}.csv"
    parquet_filepath = JAFFLE_PARQUET_DATA_PATH / f"{name}.parquet"
    print(f"Loading {csv_filepath}...")

    # 1. Read the CSV fully into memory.
    df: pd.DataFrame = pd.read_csv(csv_filepath)

    # 2. Write the DataFrame to Parquet.
    # 'index=False' prevents pandas from adding an extra index column.
    df.to_parquet(parquet_filepath, index=False)

    print(f"Converted {name} to parquet format at {parquet_filepath}.")
15 changes: 15 additions & 0 deletions integration/snowflake/.mcp.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"mcpServers": {
"context7": {
"type": "stdio",
"command": "npx",
"args": [
"-y",
"@upstash/context7-mcp",
"--api-key",
"ctx7sk-409c8344-c67b-4dd0-b067-b5c7527afcce"
],
"env": {}
}
}
}
74 changes: 74 additions & 0 deletions integration/snowflake/data/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Data operations for getML Feature Store integration with Snowflake.

When settings are provided to data loading and preparation functions,
infrastructure (warehouse, database) is automatically bootstrapped if needed.

Usage example:
from data import (
SnowflakeSettings,
create_session,
load_from_gcs,
create_weekly_sales_by_store_with_target,
get_table_names,
)

# Settings auto-load from SNOWFLAKE_* environment variables
settings = SnowflakeSettings.from_env()

with create_session(settings) as session:
# Load data from GCS - auto-bootstraps warehouse + database
# No GCP credentials required - files are fetched via HTTPS
load_from_gcs(session, settings=settings)

# Prepare weekly sales forecasting data
population_table = create_weekly_sales_by_store_with_target(
session,
settings=settings,
)

# Access tables for Arrow export
tables = get_table_names("RAW")
orders_arrow = session.table(tables["orders"]).to_arrow()
population_arrow = session.table(population_table).to_arrow()
"""

from snowflake.snowpark.exceptions import SnowparkSessionException

from ._bootstrap import (
BootstrapError,
ensure_infrastructure,
)
from ._settings import SnowflakeSettings
from ._snowflake_session import create_session
from ._sql_loader import load_sql
from .ingestion import (
DEFAULT_GCS_BUCKET,
JAFFLE_SHOP_TABLE_NAMES,
DataIngestionError,
get_table_names,
load_from_gcs,
load_from_s3,
)
from .preparation import (
DEFAULT_POPULATION_TABLE_NAME,
DataPreparationError,
create_weekly_sales_by_store_with_target,
)

# Public API of the `data` package: names re-exported above from the
# private submodules (constants first, then classes/exceptions, then
# functions).
__all__ = [
    "DEFAULT_GCS_BUCKET",
    "DEFAULT_POPULATION_TABLE_NAME",
    "JAFFLE_SHOP_TABLE_NAMES",
    "BootstrapError",
    "DataIngestionError",
    "DataPreparationError",
    "SnowflakeSettings",
    "SnowparkSessionException",
    "create_session",
    "create_weekly_sales_by_store_with_target",
    "ensure_infrastructure",
    "get_table_names",
    "load_from_gcs",
    "load_from_s3",
    "load_sql",
]
Loading