From 016b64ad12e1aa08047985b390418a61c60e2385 Mon Sep 17 00:00:00 2001 From: Harish Cherian Date: Thu, 28 Aug 2025 12:36:27 -0400 Subject: [PATCH 1/4] feat: Add complete Apache Iceberg integration with AWS Glue Catalog support - Add comprehensive Iceberg functions library (libs/glue_functions/iceberg_glue_functions.py) - Implement production-ready Lambda handlers for Iceberg table operations - Add time travel queries and metadata access capabilities - Include advanced features: schema evolution, table history, snapshots - Provide complete ETL pipeline with data quality checks - Add CloudFormation infrastructure templates for AWS deployment - Include comprehensive usage examples and documentation - Support for reading Iceberg tables from AWS Glue Catalog - Full S3 integration with proper AWS credentials handling - Production deployment scripts and cleanup utilities Key Features: - Read Iceberg tables with time travel support - Query table metadata (history, snapshots, schema) - Complete Spark session configuration for Lambda - Error handling and comprehensive logging - Multiple Lambda handler templates for different use cases - Infrastructure as Code with CloudFormation - End-to-end testing and validation scripts --- .vscode/settings.json | 3 + ICEBERG_INTEGRATION_SUMMARY.md | 31 ++ examples/USAGE_GUIDE.md | 337 ++++++++++++++ examples/advanced-iceberg-features.py | 174 +++++++ examples/lambda-handler-templates.py | 431 ++++++++++++++++++ examples/production-etl-pipeline.py | 289 ++++++++++++ lambda-deployment/deploy-production-lambda.sh | 158 +++++++ lambda-deployment/spark-iceberg-reader.py | 309 +++++++++++++ lambda-deployment/spark-iceberg-reader.zip | Bin 0 -> 2638 bytes lambda_function.py | 108 +++++ libs/glue_functions/__init__.py | 11 + libs/glue_functions/iceberg_glue_functions.py | 281 ++++++++++++ spark-scripts/.DS_Store | Bin 6148 -> 0 bytes sparkLambdaHandler.py | 350 +++++++++++--- .../cleanup-test-environment.sh | 106 +++++ .../create-sample-iceberg-table.py | 159 +++++++ .../deploy-test-environment.sh | 198 ++++++++ test-infrastructure/iceberg-test-setup.yaml | 140 ++++++ 18 files changed, 3028 insertions(+), 57 deletions(-) create mode 100644 .vscode/settings.json create mode 100644 ICEBERG_INTEGRATION_SUMMARY.md create mode 100644 examples/USAGE_GUIDE.md create mode 100644 examples/advanced-iceberg-features.py create mode 100644 examples/lambda-handler-templates.py create mode 100644 examples/production-etl-pipeline.py create mode 100755 lambda-deployment/deploy-production-lambda.sh create mode 100644 lambda-deployment/spark-iceberg-reader.py create mode 100644 lambda-deployment/spark-iceberg-reader.zip create mode 100644 lambda_function.py create mode 100644 libs/glue_functions/iceberg_glue_functions.py delete mode 100644 spark-scripts/.DS_Store create mode 100755 test-infrastructure/cleanup-test-environment.sh create mode 100644 test-infrastructure/create-sample-iceberg-table.py create mode 100755 test-infrastructure/deploy-test-environment.sh create mode 100644 test-infrastructure/iceberg-test-setup.yaml diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..3f836a2 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "kiroAgent.configureMCP": "Enabled" +} \ No newline at end of file diff --git a/ICEBERG_INTEGRATION_SUMMARY.md b/ICEBERG_INTEGRATION_SUMMARY.md new file mode 100644 index 0000000..12f4b97 --- /dev/null +++ b/ICEBERG_INTEGRATION_SUMMARY.md @@ -0,0 +1,31 @@ +# Iceberg Integration Summary + +## Core 
Integration Files
+
+### Essential Library
+- `libs/glue_functions/iceberg_glue_functions.py` - Core Iceberg functions for Glue Catalog integration
+
+### Production Code
+- `lambda-deployment/spark-iceberg-reader.py` - Production Lambda handler
+- `lambda-deployment/deploy-production-lambda.sh` - Deployment script
+
+### Key Examples (Kept)
+- `examples/advanced-iceberg-features.py` - Time travel and metadata queries
+- `examples/lambda-handler-templates.py` - Production Lambda templates
+- `examples/production-etl-pipeline.py` - Complete ETL pipeline
+- `examples/USAGE_GUIDE.md` - Usage documentation
+
+### Infrastructure (Kept)
+- `test-infrastructure/iceberg-test-setup.yaml` - CloudFormation template
+- `test-infrastructure/create-sample-iceberg-table.py` - Table creation
+- `test-infrastructure/deploy-test-environment.sh` - Environment deployment
+- `test-infrastructure/cleanup-test-environment.sh` - Cleanup script
+
+## Removed Files
+- All redundant example scripts
+- Test and demo scripts
+- Duplicate functionality files
+- Temporary files and guides
+
+## Usage
+The Iceberg integration is now streamlined to the essential files needed for production use.
\ No newline at end of file
diff --git a/examples/USAGE_GUIDE.md b/examples/USAGE_GUIDE.md
new file mode 100644
index 0000000..c32d3f8
--- /dev/null
+++ b/examples/USAGE_GUIDE.md
@@ -0,0 +1,337 @@
+# Iceberg Integration Usage Guide
+
+This guide shows you how to use the Iceberg integration code in your Spark on AWS Lambda applications.
+
+## 🚀 Quick Start
+
+### 1. Basic Setup
+
+```python
+# In your Lambda function
+import sys
+sys.path.append('/home/glue_functions')
+
+from iceberg_glue_functions import read_iceberg_table_with_spark
+from pyspark.sql import SparkSession
+
+# Create a Spark session configured for Iceberg (helper defined in lambda-deployment/spark-iceberg-reader.py)
+spark = create_iceberg_spark_session()
+
+# Read an Iceberg table registered in the Glue Catalog
+df = read_iceberg_table_with_spark(spark, "your_database", "your_table")
+```
+
+### 2. 
Environment Variables + +Set these in your Lambda function: + +```bash +SCRIPT_BUCKET=your-s3-bucket +SPARK_SCRIPT=your-script.py +DATABASE_NAME=your_database +TABLE_NAME=your_table +AWS_REGION=us-east-1 +``` + +## ๐Ÿ“– Usage Examples + +### Example 1: Simple Data Reading + +```python +def lambda_handler(event, context): + spark = create_iceberg_spark_session() + + try: + # Read table + df = spark.read.format("iceberg").load("glue_catalog.analytics.customer_data") + + # Basic operations + print(f"Row count: {df.count()}") + df.show(10) + + # Filter data + recent_data = df.filter(col("created_date") >= "2024-01-01") + + return { + 'statusCode': 200, + 'body': json.dumps({ + 'total_rows': df.count(), + 'recent_rows': recent_data.count() + }) + } + finally: + spark.stop() +``` + +### Example 2: Time Travel Queries + +```python +def lambda_handler(event, context): + spark = create_iceberg_spark_session() + + try: + # Current data + current_df = spark.read.format("iceberg").load("glue_catalog.sales.transactions") + + # Historical data (yesterday) + historical_df = spark.read.format("iceberg") \ + .option("as-of-timestamp", "2024-01-20 00:00:00.000") \ + .load("glue_catalog.sales.transactions") + + # Compare + current_count = current_df.count() + historical_count = historical_df.count() + + return { + 'statusCode': 200, + 'body': json.dumps({ + 'current_transactions': current_count, + 'historical_transactions': historical_count, + 'new_transactions': current_count - historical_count + }) + } + finally: + spark.stop() +``` + +### Example 3: Data Processing Pipeline + +```python +def lambda_handler(event, context): + spark = create_iceberg_spark_session() + + try: + # Read source data + raw_df = spark.read.format("iceberg").load("glue_catalog.raw.events") + + # Process data + processed_df = raw_df \ + .filter(col("event_type") == "purchase") \ + .withColumn("processing_date", current_date()) \ + .groupBy("customer_id", "product_category") \ + .agg( + sum("amount").alias("total_spent"), + count("*").alias("purchase_count") + ) + + # Write to target table + processed_df.write \ + .format("iceberg") \ + .mode("overwrite") \ + .save("glue_catalog.analytics.customer_purchases") + + return { + 'statusCode': 200, + 'body': json.dumps({ + 'processed_customers': processed_df.count(), + 'message': 'Processing completed successfully' + }) + } + finally: + spark.stop() +``` + +## ๐Ÿ”ง Configuration Options + +### Spark Session Configuration + +```python +spark = SparkSession.builder \ + .appName("Your-App-Name") \ + .master("local[*]") \ + .config("spark.driver.memory", "5g") \ + .config("spark.executor.memory", "5g") \ + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ + .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \ + .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ + .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \ + .getOrCreate() +``` + +### Lambda Function Settings + +- **Memory**: 3008 MB (recommended for Spark workloads) +- **Timeout**: 15 minutes (maximum) +- **Runtime**: Use container image with Iceberg support +- **Environment Variables**: Set database and table names + +## ๐Ÿ“Š Available Functions + +### Basic Operations + +```python +# Read table +df = read_iceberg_table_with_spark(spark, "database", "table") + +# Get table metadata +metadata = get_iceberg_table_metadata("database", "table", "us-east-1") + +# Get 
table location +location = get_iceberg_table_location(metadata) +``` + +### Advanced Operations + +```python +# Time travel +historical_df = read_iceberg_table_at_timestamp(spark, "db", "table", "2024-01-01 00:00:00") + +# Snapshot queries +snapshot_df = read_iceberg_table_at_snapshot(spark, "db", "table", "snapshot_id") + +# Table history +history_df = query_iceberg_table_history(spark, "db", "table") + +# Table snapshots +snapshots_df = query_iceberg_table_snapshots(spark, "db", "table") +``` + +## ๐ŸŽฏ Event Formats + +### Simple Read Event + +```json +{ + "handler_type": "simple_reader", + "database": "analytics", + "table": "customer_data", + "limit": 100 +} +``` + +### Analytics Event + +```json +{ + "handler_type": "analytics", + "database": "sales", + "table": "transactions", + "filters": ["date >= '2024-01-01'", "amount > 100"], + "aggregations": { + "group_by": ["product_category"], + "metrics": ["sum(amount) as total_sales", "count(*) as transaction_count"] + } +} +``` + +### Time Travel Event + +```json +{ + "handler_type": "time_travel", + "database": "analytics", + "table": "customer_data", + "timestamp": "2024-01-15 10:00:00.000", + "compare_with_current": true +} +``` + +## ๐Ÿ” Error Handling + +```python +def lambda_handler(event, context): + spark = None + + try: + spark = create_iceberg_spark_session() + + # Your processing logic here + df = spark.read.format("iceberg").load("glue_catalog.db.table") + + return {'statusCode': 200, 'body': 'Success'} + + except Exception as e: + print(f"Error: {e}") + return { + 'statusCode': 500, + 'body': json.dumps({'error': str(e)}) + } + finally: + if spark: + spark.stop() +``` + +## ๐Ÿ“ˆ Performance Tips + +1. **Use appropriate filters** to reduce data volume +2. **Set proper memory allocation** (3008 MB recommended) +3. **Enable adaptive query execution** +4. **Use columnar operations** when possible +5. **Consider partitioning** for large tables + +## ๐Ÿ”— Integration Patterns + +### Event-Driven Processing + +```python +# Triggered by S3 events +def process_new_data(event, context): + for record in event['Records']: + bucket = record['s3']['bucket']['name'] + key = record['s3']['object']['key'] + + # Process new file and update Iceberg table + process_file(f"s3a://{bucket}/{key}") +``` + +### Scheduled Processing + +```python +# Triggered by CloudWatch Events +def daily_aggregation(event, context): + yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d') + + # Process yesterday's data + df = spark.read.format("iceberg") \ + .load("glue_catalog.raw.events") \ + .filter(f"date = '{yesterday}'") + + # Aggregate and save + aggregated = df.groupBy("category").agg(sum("amount")) + aggregated.write.format("iceberg").mode("append").save("glue_catalog.analytics.daily_summary") +``` + +## ๐Ÿ› ๏ธ Troubleshooting + +### Common Issues + +1. **"Table not found"** + - Check database and table names + - Verify Glue Catalog permissions + +2. **"Access Denied"** + - Check S3 permissions for table location + - Verify IAM role has Glue access + +3. **Memory errors** + - Increase Lambda memory allocation + - Add filters to reduce data volume + +4. 
**Timeout errors** + - Optimize queries with filters + - Consider breaking into smaller chunks + +### Debug Commands + +```python +# Check table exists +metadata = get_iceberg_table_metadata("db", "table", "us-east-1") +print(f"Table type: {metadata['Table']['Parameters'].get('table_type')}") + +# Check table location +location = get_iceberg_table_location(metadata) +print(f"Location: {location}") + +# Test S3 access +s3_client = boto3.client('s3') +response = s3_client.list_objects_v2(Bucket='bucket', Prefix='prefix', MaxKeys=1) +print(f"S3 accessible: {response.get('KeyCount', 0) >= 0}") +``` + +## ๐Ÿ“š Next Steps + +1. **Start with simple examples** and gradually add complexity +2. **Test with small datasets** before scaling up +3. **Monitor CloudWatch logs** for debugging +4. **Set up proper error handling** and retry logic +5. **Consider cost optimization** with appropriate filtering \ No newline at end of file diff --git a/examples/advanced-iceberg-features.py b/examples/advanced-iceberg-features.py new file mode 100644 index 0000000..0b06f8e --- /dev/null +++ b/examples/advanced-iceberg-features.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python3 +""" +Advanced example: Using Iceberg's time travel and metadata features +""" + +import os +import sys +from datetime import datetime, timedelta +from pyspark.sql import SparkSession +from pyspark.sql.functions import * + +sys.path.append('/home/glue_functions') +from iceberg_glue_functions import ( + read_iceberg_table_at_timestamp, + read_iceberg_table_at_snapshot, + query_iceberg_table_history, + query_iceberg_table_snapshots, + get_iceberg_table_metadata +) + +def create_spark_session(): + """Create Spark session for advanced Iceberg features""" + + aws_access_key_id = os.environ['AWS_ACCESS_KEY_ID'] + aws_secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY'] + session_token = os.environ['AWS_SESSION_TOKEN'] + + return SparkSession.builder \ + .appName("Advanced-Iceberg-Features") \ + .master("local[*]") \ + .config("spark.driver.bindAddress", "127.0.0.1") \ + .config("spark.driver.memory", "5g") \ + .config("spark.executor.memory", "5g") \ + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ + .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \ + .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ + .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \ + .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \ + .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \ + .config("spark.hadoop.fs.s3a.session.token", session_token) \ + .config("spark.hadoop.fs.s3a.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider") \ + .getOrCreate() + +def time_travel_example(): + """Example: Time travel queries""" + + print("โฐ Time Travel Example") + print("=" * 30) + + spark = create_spark_session() + database_name = "analytics" + table_name = "customer_transactions" + + try: + # 1. Read current data + current_df = spark.read.format("iceberg").load(f"glue_catalog.{database_name}.{table_name}") + print(f"Current data count: {current_df.count()}") + + # 2. 
Read data as it was yesterday + yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d %H:%M:%S.000') + + historical_df = spark.read.format("iceberg") \ + .option("as-of-timestamp", yesterday) \ + .load(f"glue_catalog.{database_name}.{table_name}") + + print(f"Yesterday's data count: {historical_df.count()}") + + # 3. Compare changes + current_ids = set([row.customer_id for row in current_df.select("customer_id").collect()]) + historical_ids = set([row.customer_id for row in historical_df.select("customer_id").collect()]) + + new_customers = current_ids - historical_ids + print(f"New customers since yesterday: {len(new_customers)}") + + return current_df, historical_df + + finally: + spark.stop() + +def metadata_analysis_example(): + """Example: Analyzing table metadata and history""" + + print("๐Ÿ“Š Metadata Analysis Example") + print("=" * 35) + + spark = create_spark_session() + database_name = "analytics" + table_name = "customer_transactions" + + try: + # 1. Get table history + print("๐Ÿ“š Table History:") + history_df = query_iceberg_table_history(spark, database_name, table_name) + + # 2. Get snapshots + print("๐Ÿ“ธ Table Snapshots:") + snapshots_df = query_iceberg_table_snapshots(spark, database_name, table_name) + + # 3. Analyze table evolution + print("๐Ÿ“ˆ Table Evolution Analysis:") + + # Count operations by type + operation_counts = history_df.groupBy("operation").count().collect() + for row in operation_counts: + print(f" {row.operation}: {row.count} times") + + # Show recent changes + print("๐Ÿ• Recent Changes (last 5):") + recent_changes = history_df.orderBy(desc("made_current_at")).limit(5) + recent_changes.select("made_current_at", "operation", "snapshot_id").show(truncate=False) + + return history_df, snapshots_df + + finally: + spark.stop() + +def snapshot_comparison_example(): + """Example: Compare data between specific snapshots""" + + print("๐Ÿ” Snapshot Comparison Example") + print("=" * 40) + + spark = create_spark_session() + database_name = "analytics" + table_name = "customer_transactions" + + try: + # Get available snapshots + snapshots_df = spark.read.format("iceberg") \ + .load(f"glue_catalog.{database_name}.{table_name}.snapshots") + + snapshots = snapshots_df.select("snapshot_id", "committed_at").orderBy(desc("committed_at")).collect() + + if len(snapshots) >= 2: + # Compare latest two snapshots + latest_snapshot = snapshots[0].snapshot_id + previous_snapshot = snapshots[1].snapshot_id + + print(f"Comparing snapshots:") + print(f" Latest: {latest_snapshot}") + print(f" Previous: {previous_snapshot}") + + # Read data from both snapshots + latest_df = spark.read.format("iceberg") \ + .option("snapshot-id", latest_snapshot) \ + .load(f"glue_catalog.{database_name}.{table_name}") + + previous_df = spark.read.format("iceberg") \ + .option("snapshot-id", previous_snapshot) \ + .load(f"glue_catalog.{database_name}.{table_name}") + + # Compare counts + print(f"Latest snapshot count: {latest_df.count()}") + print(f"Previous snapshot count: {previous_df.count()}") + + # Find differences (example for transactions table) + if "transaction_id" in latest_df.columns: + latest_ids = latest_df.select("transaction_id").distinct() + previous_ids = previous_df.select("transaction_id").distinct() + + new_transactions = latest_ids.subtract(previous_ids) + print(f"New transactions: {new_transactions.count()}") + + return snapshots_df + + finally: + spark.stop() + +if __name__ == "__main__": + # Run advanced examples + time_travel_example() + 
metadata_analysis_example() + snapshot_comparison_example() \ No newline at end of file diff --git a/examples/lambda-handler-templates.py b/examples/lambda-handler-templates.py new file mode 100644 index 0000000..1ab2d75 --- /dev/null +++ b/examples/lambda-handler-templates.py @@ -0,0 +1,431 @@ +#!/usr/bin/env python3 +""" +Lambda Handler Templates for different Iceberg use cases +""" + +import json +import os +import sys +from datetime import datetime +from pyspark.sql import SparkSession +from pyspark.sql.functions import * + +sys.path.append('/home/glue_functions') +from iceberg_glue_functions import ( + read_iceberg_table_with_spark, + read_iceberg_table_at_timestamp, + query_iceberg_table_history +) + +def create_spark_session(): + """Standard Spark session for Lambda""" + + aws_access_key_id = os.environ['AWS_ACCESS_KEY_ID'] + aws_secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY'] + session_token = os.environ['AWS_SESSION_TOKEN'] + + return SparkSession.builder \ + .appName("Lambda-Iceberg-Handler") \ + .master("local[*]") \ + .config("spark.driver.bindAddress", "127.0.0.1") \ + .config("spark.driver.memory", "5g") \ + .config("spark.executor.memory", "5g") \ + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ + .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \ + .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ + .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \ + .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \ + .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \ + .config("spark.hadoop.fs.s3a.session.token", session_token) \ + .config("spark.hadoop.fs.s3a.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider") \ + .getOrCreate() + +# Template 1: Simple Data Reader +def simple_reader_handler(event, context): + """ + Template for simple Iceberg table reading + + Event format: + { + "database": "your_database", + "table": "your_table", + "limit": 100 + } + """ + + database = event.get('database') + table = event.get('table') + limit = event.get('limit', 100) + + if not database or not table: + return { + 'statusCode': 400, + 'body': json.dumps({'error': 'database and table are required'}) + } + + spark = create_spark_session() + + try: + # Read table + df = spark.read.format("iceberg").load(f"glue_catalog.{database}.{table}") + + # Get sample data + sample_data = df.limit(limit).collect() + + # Convert to JSON-serializable format + result = [] + for row in sample_data: + result.append(row.asDict()) + + return { + 'statusCode': 200, + 'body': json.dumps({ + 'database': database, + 'table': table, + 'row_count': df.count(), + 'sample_data': result + }) + } + + except Exception as e: + return { + 'statusCode': 500, + 'body': json.dumps({'error': str(e)}) + } + finally: + spark.stop() + +# Template 2: Filtered Analytics +def analytics_handler(event, context): + """ + Template for analytical queries on Iceberg tables + + Event format: + { + "database": "analytics", + "table": "sales_data", + "filters": ["date >= '2024-01-01'", "region = 'US'"], + "aggregations": { + "group_by": ["product_category"], + "metrics": ["sum(sales_amount) as total_sales", "count(*) as transaction_count"] + } + } + """ + + database = event.get('database') + table = event.get('table') + filters = event.get('filters', []) + aggregations = event.get('aggregations', {}) + + 
spark = create_spark_session() + + try: + # Read table + df = spark.read.format("iceberg").load(f"glue_catalog.{database}.{table}") + + # Apply filters + for filter_condition in filters: + df = df.filter(filter_condition) + + # Apply aggregations if specified + if aggregations: + group_by = aggregations.get('group_by', []) + metrics = aggregations.get('metrics', []) + + if group_by and metrics: + df = df.groupBy(*group_by).agg(*[expr(metric) for metric in metrics]) + + # Collect results + results = df.collect() + + # Convert to JSON + result_data = [] + for row in results: + result_data.append(row.asDict()) + + return { + 'statusCode': 200, + 'body': json.dumps({ + 'database': database, + 'table': table, + 'filters_applied': filters, + 'result_count': len(result_data), + 'results': result_data + }) + } + + except Exception as e: + return { + 'statusCode': 500, + 'body': json.dumps({'error': str(e)}) + } + finally: + spark.stop() + +# Template 3: Time Travel Query +def time_travel_handler(event, context): + """ + Template for time travel queries + + Event format: + { + "database": "analytics", + "table": "customer_data", + "timestamp": "2024-01-15 10:00:00.000", + "compare_with_current": true + } + """ + + database = event.get('database') + table = event.get('table') + timestamp = event.get('timestamp') + compare_with_current = event.get('compare_with_current', False) + + spark = create_spark_session() + + try: + table_identifier = f"glue_catalog.{database}.{table}" + + # Read historical data + historical_df = spark.read.format("iceberg") \ + .option("as-of-timestamp", timestamp) \ + .load(table_identifier) + + historical_count = historical_df.count() + + result = { + 'database': database, + 'table': table, + 'timestamp': timestamp, + 'historical_count': historical_count + } + + # Compare with current if requested + if compare_with_current: + current_df = spark.read.format("iceberg").load(table_identifier) + current_count = current_df.count() + + result.update({ + 'current_count': current_count, + 'difference': current_count - historical_count + }) + + return { + 'statusCode': 200, + 'body': json.dumps(result) + } + + except Exception as e: + return { + 'statusCode': 500, + 'body': json.dumps({'error': str(e)}) + } + finally: + spark.stop() + +# Template 4: Data Quality Checker +def data_quality_handler(event, context): + """ + Template for data quality checks on Iceberg tables + + Event format: + { + "database": "data_lake", + "table": "customer_records", + "checks": [ + {"type": "row_count", "min": 1000}, + {"type": "null_check", "columns": ["customer_id", "email"]}, + {"type": "duplicate_check", "columns": ["customer_id"]}, + {"type": "value_range", "column": "age", "min": 0, "max": 120} + ] + } + """ + + database = event.get('database') + table = event.get('table') + checks = event.get('checks', []) + + spark = create_spark_session() + + try: + # Read table + df = spark.read.format("iceberg").load(f"glue_catalog.{database}.{table}") + + quality_results = [] + + for check in checks: + check_result = {'type': check['type'], 'status': 'passed'} + + if check['type'] == 'row_count': + count = df.count() + min_count = check.get('min', 0) + max_count = check.get('max', float('inf')) + + if not (min_count <= count <= max_count): + check_result['status'] = 'failed' + check_result['message'] = f"Row count {count} outside range [{min_count}, {max_count}]" + else: + check_result['message'] = f"Row count: {count}" + + elif check['type'] == 'null_check': + columns = check['columns'] + null_counts = 
{} + + for column in columns: + null_count = df.filter(col(column).isNull()).count() + null_counts[column] = null_count + + if null_count > 0: + check_result['status'] = 'failed' + + check_result['null_counts'] = null_counts + + elif check['type'] == 'duplicate_check': + columns = check['columns'] + total_count = df.count() + distinct_count = df.select(*columns).distinct().count() + + if total_count != distinct_count: + check_result['status'] = 'failed' + check_result['message'] = f"Found {total_count - distinct_count} duplicates" + else: + check_result['message'] = "No duplicates found" + + elif check['type'] == 'value_range': + column = check['column'] + min_val = check.get('min') + max_val = check.get('max') + + out_of_range = df.filter( + (col(column) < min_val) | (col(column) > max_val) + ).count() + + if out_of_range > 0: + check_result['status'] = 'failed' + check_result['message'] = f"{out_of_range} values outside range [{min_val}, {max_val}]" + else: + check_result['message'] = f"All values within range [{min_val}, {max_val}]" + + quality_results.append(check_result) + + # Overall status + overall_status = 'passed' if all(r['status'] == 'passed' for r in quality_results) else 'failed' + + return { + 'statusCode': 200, + 'body': json.dumps({ + 'database': database, + 'table': table, + 'overall_status': overall_status, + 'checks': quality_results + }) + } + + except Exception as e: + return { + 'statusCode': 500, + 'body': json.dumps({'error': str(e)}) + } + finally: + spark.stop() + +# Template 5: Event-Driven Processing +def event_driven_handler(event, context): + """ + Template for event-driven processing (e.g., S3 trigger) + + Event format (S3 event): + { + "Records": [ + { + "s3": { + "bucket": {"name": "my-bucket"}, + "object": {"key": "data/new-file.parquet"} + } + } + ], + "processing_config": { + "target_database": "processed_data", + "target_table": "aggregated_metrics" + } + } + """ + + spark = create_spark_session() + + try: + # Process S3 events + if 'Records' in event: + for record in event['Records']: + if 's3' in record: + bucket = record['s3']['bucket']['name'] + key = record['s3']['object']['key'] + + print(f"Processing file: s3://{bucket}/{key}") + + # Read the new file + file_df = spark.read.parquet(f"s3a://{bucket}/{key}") + + # Process the data (example: simple aggregation) + processed_df = file_df.groupBy("category") \ + .agg( + count("*").alias("record_count"), + sum("amount").alias("total_amount") + ) \ + .withColumn("processed_at", current_timestamp()) + + # Write to target Iceberg table + config = event.get('processing_config', {}) + target_db = config.get('target_database', 'processed_data') + target_table = config.get('target_table', 'processed_metrics') + + processed_df.write \ + .format("iceberg") \ + .mode("append") \ + .save(f"glue_catalog.{target_db}.{target_table}") + + print(f"Processed {processed_df.count()} records") + + return { + 'statusCode': 200, + 'body': json.dumps({ + 'message': 'Event processing completed', + 'processed_files': len(event.get('Records', [])) + }) + } + + except Exception as e: + return { + 'statusCode': 500, + 'body': json.dumps({'error': str(e)}) + } + finally: + spark.stop() + +# Main handler router +def lambda_handler(event, context): + """ + Main handler that routes to different templates based on event type + """ + + handler_type = event.get('handler_type', 'simple_reader') + + handlers = { + 'simple_reader': simple_reader_handler, + 'analytics': analytics_handler, + 'time_travel': time_travel_handler, + 
'data_quality': data_quality_handler, + 'event_driven': event_driven_handler + } + + if handler_type in handlers: + return handlers[handler_type](event, context) + else: + return { + 'statusCode': 400, + 'body': json.dumps({ + 'error': f'Unknown handler type: {handler_type}', + 'available_types': list(handlers.keys()) + }) + } \ No newline at end of file diff --git a/examples/production-etl-pipeline.py b/examples/production-etl-pipeline.py new file mode 100644 index 0000000..9c3d38b --- /dev/null +++ b/examples/production-etl-pipeline.py @@ -0,0 +1,289 @@ +#!/usr/bin/env python3 +""" +Production ETL Pipeline: Complete example for processing Iceberg tables in Lambda +""" + +import os +import sys +import json +from datetime import datetime +from pyspark.sql import SparkSession +from pyspark.sql.functions import * +from pyspark.sql.types import * + +sys.path.append('/home/glue_functions') +from iceberg_glue_functions import ( + read_iceberg_table_with_spark, + get_iceberg_table_metadata +) + +def create_production_spark_session(app_name="Production-ETL"): + """Production-ready Spark session configuration""" + + aws_access_key_id = os.environ['AWS_ACCESS_KEY_ID'] + aws_secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY'] + session_token = os.environ['AWS_SESSION_TOKEN'] + aws_region = os.environ.get('AWS_REGION', 'us-east-1') + + return SparkSession.builder \ + .appName(app_name) \ + .master("local[*]") \ + .config("spark.driver.bindAddress", "127.0.0.1") \ + .config("spark.driver.memory", "5g") \ + .config("spark.executor.memory", "5g") \ + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \ + .config("spark.sql.adaptive.enabled", "true") \ + .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \ + .config("spark.sql.adaptive.skewJoin.enabled", "true") \ + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ + .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \ + .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ + .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \ + .config("spark.sql.catalog.glue_catalog.glue.region", aws_region) \ + .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \ + .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \ + .config("spark.hadoop.fs.s3a.session.token", session_token) \ + .config("spark.hadoop.fs.s3a.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider") \ + .getOrCreate() + +class IcebergETLPipeline: + """Production ETL Pipeline for Iceberg tables""" + + def __init__(self, config): + self.config = config + self.spark = create_production_spark_session(config.get('app_name', 'ETL-Pipeline')) + + def read_source_table(self, database, table, filters=None): + """Read source Iceberg table with optional filters""" + + print(f"๐Ÿ“– Reading source table: {database}.{table}") + + try: + df = self.spark.read.format("iceberg").load(f"glue_catalog.{database}.{table}") + + # Apply filters if provided + if filters: + for filter_condition in filters: + df = df.filter(filter_condition) + print(f" Applied filter: {filter_condition}") + + row_count = df.count() + print(f" Loaded {row_count} rows") + + return df + + except Exception as e: + print(f"โŒ Error reading table {database}.{table}: {e}") + raise + + def transform_data(self, df, transformations): + """Apply data transformations""" + + print("๐Ÿ”„ Applying 
transformations...") + + for i, transform in enumerate(transformations, 1): + print(f" Transformation {i}: {transform['description']}") + + if transform['type'] == 'add_column': + df = df.withColumn(transform['column'], expr(transform['expression'])) + + elif transform['type'] == 'filter': + df = df.filter(transform['condition']) + + elif transform['type'] == 'aggregate': + df = df.groupBy(*transform['group_by']) \ + .agg(*[expr(agg) for agg in transform['aggregations']]) + + elif transform['type'] == 'join': + other_df = self.read_source_table( + transform['join_table']['database'], + transform['join_table']['table'] + ) + df = df.join(other_df, transform['join_condition'], transform['join_type']) + + elif transform['type'] == 'custom': + # Custom transformation function + df = transform['function'](df) + + final_count = df.count() + print(f" Final row count: {final_count}") + + return df + + def write_to_target(self, df, target_config): + """Write processed data to target location""" + + print(f"๐Ÿ’พ Writing to target: {target_config['location']}") + + writer = df.write.mode(target_config.get('mode', 'overwrite')) + + # Configure output format + if target_config['format'] == 'iceberg': + # Write to Iceberg table + table_identifier = f"glue_catalog.{target_config['database']}.{target_config['table']}" + + writer = writer.format("iceberg") + + # Add Iceberg-specific options + if 'iceberg_options' in target_config: + for key, value in target_config['iceberg_options'].items(): + writer = writer.option(key, value) + + writer.save(table_identifier) + + elif target_config['format'] == 'parquet': + writer.format("parquet").save(target_config['location']) + + elif target_config['format'] == 'delta': + writer.format("delta").save(target_config['location']) + + print(f" โœ… Data written successfully") + + def run_pipeline(self): + """Execute the complete ETL pipeline""" + + print("๐Ÿš€ Starting ETL Pipeline") + print("=" * 50) + + try: + # Step 1: Read source data + source_df = self.read_source_table( + self.config['source']['database'], + self.config['source']['table'], + self.config['source'].get('filters') + ) + + # Step 2: Apply transformations + if 'transformations' in self.config: + transformed_df = self.transform_data(source_df, self.config['transformations']) + else: + transformed_df = source_df + + # Step 3: Data quality checks + if 'quality_checks' in self.config: + self.run_quality_checks(transformed_df, self.config['quality_checks']) + + # Step 4: Write to target + self.write_to_target(transformed_df, self.config['target']) + + print("๐ŸŽ‰ ETL Pipeline completed successfully!") + + return { + 'status': 'success', + 'rows_processed': transformed_df.count(), + 'timestamp': datetime.now().isoformat() + } + + except Exception as e: + print(f"โŒ ETL Pipeline failed: {e}") + return { + 'status': 'failed', + 'error': str(e), + 'timestamp': datetime.now().isoformat() + } + + finally: + self.spark.stop() + + def run_quality_checks(self, df, checks): + """Run data quality checks""" + + print("๐Ÿ” Running data quality checks...") + + for check in checks: + if check['type'] == 'row_count': + count = df.count() + min_rows = check.get('min_rows', 0) + max_rows = check.get('max_rows', float('inf')) + + if not (min_rows <= count <= max_rows): + raise ValueError(f"Row count {count} outside expected range [{min_rows}, {max_rows}]") + + print(f" โœ… Row count check passed: {count} rows") + + elif check['type'] == 'null_check': + for column in check['columns']: + null_count = 
df.filter(col(column).isNull()).count() + if null_count > 0: + raise ValueError(f"Found {null_count} null values in column {column}") + + print(f" โœ… Null check passed for columns: {check['columns']}") + + elif check['type'] == 'custom': + # Custom quality check function + check['function'](df) + +# Example usage in Lambda handler +def lambda_handler(event, context): + """Lambda handler for ETL pipeline""" + + # Example configuration + etl_config = { + 'app_name': 'Customer-Analytics-ETL', + 'source': { + 'database': 'raw_data', + 'table': 'customer_events', + 'filters': [ + "event_date >= '2024-01-01'", + "event_type IN ('purchase', 'signup')" + ] + }, + 'transformations': [ + { + 'type': 'add_column', + 'description': 'Add processing timestamp', + 'column': 'processed_at', + 'expression': 'current_timestamp()' + }, + { + 'type': 'aggregate', + 'description': 'Aggregate by customer and event type', + 'group_by': ['customer_id', 'event_type'], + 'aggregations': [ + 'count(*) as event_count', + 'sum(amount) as total_amount', + 'max(event_date) as last_event_date' + ] + } + ], + 'quality_checks': [ + { + 'type': 'row_count', + 'min_rows': 1 + }, + { + 'type': 'null_check', + 'columns': ['customer_id', 'event_type'] + } + ], + 'target': { + 'format': 'iceberg', + 'database': 'analytics', + 'table': 'customer_summary', + 'mode': 'overwrite', + 'iceberg_options': { + 'write.format.default': 'parquet', + 'write.parquet.compression-codec': 'snappy' + } + } + } + + # Override config with event parameters if provided + if 'config' in event: + etl_config.update(event['config']) + + # Run pipeline + pipeline = IcebergETLPipeline(etl_config) + result = pipeline.run_pipeline() + + return { + 'statusCode': 200 if result['status'] == 'success' else 500, + 'body': json.dumps(result) + } + +if __name__ == "__main__": + # Test the pipeline locally + test_event = {} + result = lambda_handler(test_event, None) + print(json.dumps(result, indent=2)) \ No newline at end of file diff --git a/lambda-deployment/deploy-production-lambda.sh b/lambda-deployment/deploy-production-lambda.sh new file mode 100755 index 0000000..eab5ff0 --- /dev/null +++ b/lambda-deployment/deploy-production-lambda.sh @@ -0,0 +1,158 @@ +#!/bin/bash + +# Deploy Production Iceberg Lambda Function +set -e + +# Configuration +FUNCTION_NAME="spark-iceberg-production" +AWS_REGION="us-east-1" +AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text) +ECR_REPO_NAME="sparkonlambda-iceberg" +IMAGE_URI="${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/${ECR_REPO_NAME}:latest" + +# Colors +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' + +echo -e "${BLUE}๐Ÿš€ Deploying Production Iceberg Lambda Function${NC}" +echo "==================================================" + +# Get Lambda role ARN from CloudFormation +LAMBDA_ROLE_ARN=$(aws cloudformation describe-stacks \ + --stack-name spark-lambda-iceberg-test \ + --query 'Stacks[0].Outputs[?OutputKey==`LambdaRoleArn`].OutputValue' \ + --output text \ + --region $AWS_REGION) + +echo -e "${BLUE}๐Ÿ“‹ Configuration:${NC}" +echo " Function Name: $FUNCTION_NAME" +echo " AWS Region: $AWS_REGION" +echo " AWS Account: $AWS_ACCOUNT_ID" +echo " Image URI: $IMAGE_URI" +echo " Lambda Role: $LAMBDA_ROLE_ARN" +echo "" + +# Step 1: Update the Lambda handler in the Docker image +echo -e "${YELLOW}๐Ÿ“ฆ Step 1: Updating Lambda handler in container...${NC}" + +# Copy the new handler to the project +cp lambda-deployment/spark-iceberg-reader.py 
sparkLambdaHandler.py + +echo -e "${GREEN}โœ… Updated Lambda handler${NC}" + +# Step 2: Rebuild and push Docker image +echo -e "${YELLOW}๐Ÿณ Step 2: Rebuilding Docker image...${NC}" + +# Login to ECR +aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com + +# Build new image +docker build --build-arg FRAMEWORK=ICEBERG -t $ECR_REPO_NAME . + +# Tag and push +docker tag $ECR_REPO_NAME:latest $IMAGE_URI +docker push $IMAGE_URI + +echo -e "${GREEN}โœ… Docker image rebuilt and pushed${NC}" + +# Step 3: Create or update Lambda function +echo -e "${YELLOW}โšก Step 3: Creating/updating Lambda function...${NC}" + +# Check if function exists +if aws lambda get-function --function-name $FUNCTION_NAME --region $AWS_REGION > /dev/null 2>&1; then + echo -e "${YELLOW}โš ๏ธ Function exists. Updating...${NC}" + + # Update function code + aws lambda update-function-code \ + --function-name $FUNCTION_NAME \ + --image-uri $IMAGE_URI \ + --region $AWS_REGION + + # Update function configuration + aws lambda update-function-configuration \ + --function-name $FUNCTION_NAME \ + --role $LAMBDA_ROLE_ARN \ + --timeout 900 \ + --memory-size 3008 \ + --environment Variables='{"DATABASE_NAME":"iceberg_test_db","TABLE_NAME":"sample_customers","AWS_REGION":"'$AWS_REGION'"}' \ + --region $AWS_REGION + +else + echo -e "${YELLOW}๐Ÿ†• Creating new Lambda function...${NC}" + + aws lambda create-function \ + --function-name $FUNCTION_NAME \ + --role $LAMBDA_ROLE_ARN \ + --code ImageUri=$IMAGE_URI \ + --package-type Image \ + --timeout 900 \ + --memory-size 3008 \ + --environment Variables='{"DATABASE_NAME":"iceberg_test_db","TABLE_NAME":"sample_customers","AWS_REGION":"'$AWS_REGION'"}' \ + --region $AWS_REGION +fi + +echo -e "${GREEN}โœ… Lambda function deployed: $FUNCTION_NAME${NC}" + +# Step 4: Test the function +echo -e "${YELLOW}๐Ÿงช Step 4: Testing the Lambda function...${NC}" + +# Create test event +cat > test-event.json << EOF +{ + "operation": "read_table", + "database": "iceberg_test_db", + "table": "sample_customers", + "limit": 5, + "include_analytics": true +} +EOF + +# Invoke function +echo -e "${BLUE}๐Ÿ“ค Invoking Lambda function...${NC}" +aws lambda invoke \ + --function-name $FUNCTION_NAME \ + --payload file://test-event.json \ + --region $AWS_REGION \ + response.json + +# Check response +if [ $? -eq 0 ]; then + echo -e "${GREEN}โœ… Lambda invocation successful${NC}" + echo -e "${BLUE}๐Ÿ“„ Response:${NC}" + cat response.json | jq '.' 
2>/dev/null || cat response.json + echo "" +else + echo -e "${RED}โŒ Lambda invocation failed${NC}" +fi + +# Step 5: Show CloudWatch logs +echo -e "${YELLOW}๐Ÿ“‹ Step 5: Recent CloudWatch logs:${NC}" +aws logs tail /aws/lambda/$FUNCTION_NAME --since 2m --region $AWS_REGION + +echo "" +echo -e "${BLUE}๐ŸŽ‰ Production Lambda Deployment Complete!${NC}" +echo "==================================================" +echo "" +echo -e "${YELLOW}๐Ÿ“Š Function Details:${NC}" +echo " โ€ข Function Name: $FUNCTION_NAME" +echo " โ€ข Memory: 3008 MB" +echo " โ€ข Timeout: 15 minutes" +echo " โ€ข Runtime: Container (Spark + Iceberg)" +echo "" +echo -e "${YELLOW}๐Ÿ”— Useful Commands:${NC}" +echo " โ€ข Test function:" +echo " aws lambda invoke --function-name $FUNCTION_NAME --payload file://test-event.json response.json" +echo "" +echo " โ€ข View logs:" +echo " aws logs tail /aws/lambda/$FUNCTION_NAME --follow --region $AWS_REGION" +echo "" +echo " โ€ข Update function:" +echo " ./lambda-deployment/deploy-production-lambda.sh" +echo "" +echo -e "${GREEN}โœ… Your Iceberg Lambda function is ready for production use!${NC}" + +# Cleanup +rm -f test-event.json \ No newline at end of file diff --git a/lambda-deployment/spark-iceberg-reader.py b/lambda-deployment/spark-iceberg-reader.py new file mode 100644 index 0000000..2dc57ca --- /dev/null +++ b/lambda-deployment/spark-iceberg-reader.py @@ -0,0 +1,309 @@ +#!/usr/bin/env python3 +""" +Production Lambda function for reading Iceberg tables from Glue Catalog +This is the actual handler that will run in the Lambda container +""" + +import json +import logging +import os +import sys +from datetime import datetime + +from pyspark.sql import SparkSession +from pyspark.sql.functions import * + +# Add glue functions to path +sys.path.append('/home/glue_functions') + +# Set up logging +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +def create_iceberg_spark_session(): + """Create Spark session optimized for Lambda with Iceberg support""" + + aws_access_key_id = os.environ['AWS_ACCESS_KEY_ID'] + aws_secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY'] + session_token = os.environ['AWS_SESSION_TOKEN'] + aws_region = os.environ.get('AWS_REGION', 'us-east-1') + + logger.info("Creating Spark session with Iceberg configuration...") + + spark = SparkSession.builder \ + .appName("Lambda-Iceberg-Reader") \ + .master("local[*]") \ + .config("spark.driver.bindAddress", "127.0.0.1") \ + .config("spark.driver.memory", "5g") \ + .config("spark.executor.memory", "5g") \ + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \ + .config("spark.sql.adaptive.enabled", "true") \ + .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \ + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ + .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \ + .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ + .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \ + .config("spark.sql.catalog.glue_catalog.glue.region", aws_region) \ + .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \ + .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \ + .config("spark.hadoop.fs.s3a.session.token", session_token) \ + .config("spark.hadoop.fs.s3a.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider") \ + .getOrCreate() + + logger.info("Spark session 
created successfully")
+    return spark
+
+def read_iceberg_table(spark, database_name, table_name, limit=None, filters=None):
+    """Read Iceberg table with optional filters and limit"""
+
+    table_identifier = f"glue_catalog.{database_name}.{table_name}"
+    logger.info(f"Reading Iceberg table: {table_identifier}")
+
+    try:
+        # Read the table
+        df = spark.read.format("iceberg").load(table_identifier)
+
+        # Apply filters if provided
+        if filters:
+            for filter_condition in filters:
+                df = df.filter(filter_condition)
+                logger.info(f"Applied filter: {filter_condition}")
+
+        # Apply limit if provided
+        if limit:
+            df = df.limit(limit)
+
+        return df
+
+    except Exception as e:
+        logger.error(f"Error reading table {table_identifier}: {e}")
+        raise
+
+def get_table_analytics(df):
+    """Get basic analytics from the DataFrame"""
+
+    logger.info("Computing table analytics...")
+
+    try:
+        # Basic stats
+        total_count = df.count()
+
+        # Get numeric columns for analytics
+        numeric_columns = []
+        for field in df.schema.fields:
+            if field.dataType.typeName() in ['integer', 'long', 'double', 'decimal', 'float']:
+                numeric_columns.append(field.name)
+
+        analytics = {
+            'total_rows': total_count,
+            'columns': len(df.columns),
+            'column_names': df.columns
+        }
+
+        # Add numeric analytics if available
+        if numeric_columns:
+            for col_name in numeric_columns:
+                try:
+                    stats = df.agg(
+                        avg(col_name).alias('avg'),
+                        min(col_name).alias('min'),
+                        max(col_name).alias('max')
+                    ).collect()[0]
+
+                    analytics[f'{col_name}_stats'] = {
+                        'avg': float(stats['avg']) if stats['avg'] else None,
+                        'min': float(stats['min']) if stats['min'] else None,
+                        'max': float(stats['max']) if stats['max'] else None
+                    }
+                except Exception as e:
+                    logger.warning(f"Could not compute stats for {col_name}: {e}")
+
+        return analytics
+
+    except Exception as e:
+        logger.error(f"Error computing analytics: {e}")
+        return {'total_rows': 0, 'error': str(e)}
+
+def convert_rows_to_json(rows):
+    """Convert Spark rows to JSON-serializable format"""
+
+    results = []
+    for row in rows:
+        row_dict = {}
+        for field in row.__fields__:
+            value = getattr(row, field)
+
+            # Handle different data types
+            if value is None:
+                row_dict[field] = None
+            elif hasattr(value, 'isoformat'):  # datetime
+                row_dict[field] = value.isoformat()
+            elif str(type(value)) == "<class 'decimal.Decimal'>":
+                row_dict[field] = float(value)
+            elif str(type(value)) == "<class 'datetime.date'>":
+                row_dict[field] = value.isoformat()
+            else:
+                row_dict[field] = value
+
+        results.append(row_dict)
+
+    return results
+
+def lambda_handler(event, context):
+    """
+    Main Lambda handler for Iceberg table operations
+
+    Event format:
+    {
+        "operation": "read_table",
+        "database": "iceberg_test_db",
+        "table": "sample_customers",
+        "limit": 10,
+        "filters": ["total_spent > 300"],
+        "include_analytics": true
+    }
+    """
+
+    logger.info("🚀 Starting Iceberg Lambda handler")
+    logger.info(f"Event: {json.dumps(event)}")
+
+    # Parse event parameters
+    operation = event.get('operation', 'read_table')
+    database_name = event.get('database', os.environ.get('DATABASE_NAME', 'iceberg_test_db'))
+    table_name = event.get('table', os.environ.get('TABLE_NAME', 'sample_customers'))
+    limit = event.get('limit', 10)
+    filters = event.get('filters', [])
+    include_analytics = event.get('include_analytics', True)
+
+    logger.info(f"Operation: {operation}")
+    logger.info(f"Target table: {database_name}.{table_name}")
+
+    spark = None
+
+    try:
+        # Create Spark session
+        spark = create_iceberg_spark_session()
+
+        if operation == 'read_table':
+            # Read table data
+            df = 
read_iceberg_table(spark, database_name, table_name, limit, filters) + + # Get sample data + sample_rows = df.collect() + sample_data = convert_rows_to_json(sample_rows) + + # Prepare response + response_body = { + 'message': 'Successfully read Iceberg table', + 'database': database_name, + 'table': table_name, + 'operation': operation, + 'filters_applied': filters, + 'sample_data': sample_data, + 'sample_count': len(sample_data), + 'timestamp': datetime.now().isoformat() + } + + # Add analytics if requested + if include_analytics: + # Read full table for analytics (without limit) + full_df = read_iceberg_table(spark, database_name, table_name, filters=filters) + analytics = get_table_analytics(full_df) + response_body['analytics'] = analytics + + logger.info(f"โœ… Successfully processed {len(sample_data)} rows") + + return { + 'statusCode': 200, + 'headers': { + 'Content-Type': 'application/json' + }, + 'body': json.dumps(response_body) + } + + elif operation == 'table_info': + # Get table information only + df = read_iceberg_table(spark, database_name, table_name, limit=1) + + schema_info = [] + for field in df.schema.fields: + schema_info.append({ + 'name': field.name, + 'type': str(field.dataType), + 'nullable': field.nullable + }) + + # Get full count + full_df = read_iceberg_table(spark, database_name, table_name) + total_count = full_df.count() + + response_body = { + 'message': 'Table information retrieved', + 'database': database_name, + 'table': table_name, + 'total_rows': total_count, + 'schema': schema_info, + 'timestamp': datetime.now().isoformat() + } + + return { + 'statusCode': 200, + 'headers': { + 'Content-Type': 'application/json' + }, + 'body': json.dumps(response_body) + } + + else: + return { + 'statusCode': 400, + 'headers': { + 'Content-Type': 'application/json' + }, + 'body': json.dumps({ + 'error': f'Unknown operation: {operation}', + 'supported_operations': ['read_table', 'table_info'] + }) + } + + except Exception as e: + logger.error(f"โŒ Lambda execution failed: {str(e)}") + + return { + 'statusCode': 500, + 'headers': { + 'Content-Type': 'application/json' + }, + 'body': json.dumps({ + 'error': str(e), + 'message': 'Iceberg table operation failed', + 'timestamp': datetime.now().isoformat() + }) + } + + finally: + if spark: + logger.info("๐Ÿ”ง Stopping Spark session") + spark.stop() + +# For testing locally +if __name__ == "__main__": + # Test event + test_event = { + "operation": "read_table", + "database": "iceberg_test_db", + "table": "sample_customers", + "limit": 5, + "include_analytics": True + } + + # Mock context + class MockContext: + def __init__(self): + self.function_name = "test-iceberg-reader" + self.memory_limit_in_mb = 3008 + self.invoked_function_arn = "arn:aws:lambda:us-east-1:123456789012:function:test" + + result = lambda_handler(test_event, MockContext()) + print(json.dumps(result, indent=2)) \ No newline at end of file diff --git a/lambda-deployment/spark-iceberg-reader.zip b/lambda-deployment/spark-iceberg-reader.zip new file mode 100644 index 0000000000000000000000000000000000000000..7db13a47af08193a94932c7af775b09c947d5b1f GIT binary patch literal 2638 zcmb8xXE@u79tZG$NbI5`_Keo5y?2WS5wT~@SV0gqN{$-QgDTY;RW(9TQKM)%)ZUF% zhbS%8T2)Fb_Na1u?tOQk=f1el@A*Ez@2l_Yk0p{Gd5Z24_6Z5Z;e&jF!{x(5(4kmae=i?TpHPf!s1MrPCsZy3Z)eL4(AS05 z`qlqVL=-Ck0v~_?;NR0n4{8HOS09g{!M3IKsw$qO^#k zc=18?1afqJtWelsYdzO zkIgdgxb-Zl4^y1#=PjPL*<)W|)M7x$4Z@XR(9VyQ?~FK{yywLWTW?erR9N~7d=Cgr zucG8!YnzPcscYe%a${hdJZoZX;|ezrGn;g233g`#_$ZF;Lc)|DQg|_qEcs}CSN>W@ 
zt00)$-90u!HDo`f^u6HaaJBSReSw#X!{v;b%=nP)j+6;=d=;!vyz%#NP?)>_uRf6@aic@m-d$8qI%|*WW!`zfU zeokf--V_;iSPgX((sO{xX}cjRmp7l>CiXFhsm{cr6cq+Z1*076!Pi~e8a*S9Z)PS) zr;3yrq>&NC%c*%_-Yk4hrhblc;{efs;hw{{t1D8Zgu0|3GQ&{}L!Hk=WDki?3eqUL z+aozBHJm>zmtE%j%zS#e6kjll$%v)3m+N?WusoJY_vJvMrJYYoeRj;t-sRLLY8(Ws8vuE zt%LPly~N$W0#Fqbq+-?Z<4jEW&?kk$Go3Ia!>=>(@4zhh%rWbF$dyc%a%pojdfqjQW9#>Anvlru^tHf%Ob`cL< zKGoBNTF9nn54!JoCODxhD!oKMpxX>PozF8Z(tC3Iam}cCio_>eNz7B&O(?3z8yF_t zATfG^HHq@YQ*^?2bYEz!897V}%Ws;7)EaZj%s->5^H(P+fKt6H(P2SO*Zyr0xbE3htv>U*-Kix|*hqwqA%>o!5 z>vL^DN4Y_V{FG6wj0O+d2<;IxFQ1Q8y?aWlejDQqP0%@m;8P(c5_lg@oD=KIa}kRV z&tUY+FGyBk8PjJvNF_Iu)UJTh)qRuCW;gmjDq{ZdJ>S<4Vq!@cA@tVVE?b;b(!GsB z9aVf4XYH4h{6gP{de$fitGy(sia;&0O}r3EuG7$7yj;x41n)Jl=eaKp#`l8)dM%Tw z6gu4Dm$AePx>(rsxkcJ*n7ceB$5&eE+g;;l(<08rCqpf{;tH+L&a;ik#l&|z?mcN1 zK;pQ!*|mIg$WgXfqaw{=-U~GKhch0YpWge+Tt8&Mrt+E#wdB0UnsO7`bgHT+E>R~5 z*CF4Y_WQ27(3pl9_`Fd=s;qCok|SaeVV<7gkzcHA`;o#Qwd8S(f7BL#C|4Lpet-N3 zlVRyY8$M0uhk1M~MVu7Q4cxZ?v0JHaUEwI~Q*$k05bb zsM69yeWt`425_6l-iwXdddemu0^t(CZt8dmgrL_oUwf5h($8CgK+o^VUU+;=DN@X9 zAtq}EefLmsxDy`Pz95A~S6Fr5(Qw(PU9wy9$uaJrVASMaQ3tA>C{3;9n#%iFGP?M@vFtYMR*ISTt5&S%c2Mrs1J?fgN9Vi z_~M<2IvkmYM7x^Lm9zZbd^Al>yW@+~0xvTz9~ykzGCl}L!A$iy%tb3693{0=$8Vd9 zI1(xQww#$mE-u?l#)PcW(i};CuferOPAg935S0Q2fa2dW*C9!P55Jys8gm`TSjddL zTTS`#UNSX7{%sWCFMOa@D#Z*UYt3&Y6m#5apE<&@+AA7+p%ouGD@gyUulKK+j40zE z8%O>zJi+EE2a*iUw0=4)Ww^slzv$8u>QNn z{JCbCB=uca)2P%JGwzBEc2^s@pl;}bV7^rskDj(4qfe3d>bxu{jdI=P=S^958ot5y zI|o-?LEg=okx^GO+Z;6yOFdk?y1IeiQ$g#Vg40H7;Z0`i$mjN%!6tAx>v*;uNjxT} z{P=z&Ur|%4`$zbqrg!uCOV_f}7mLeE~@u<`N~<{^lV{!wz+$1jfSsH^z`ffvkx=a zCyq*XI>Q?{oT$d1fltEJx~&+cY+BW!8mJmdC_K zt>;&72RA=fJ+?M>*Qs4L!!~SkuH~1u>GqI9Rz&N34zW2cpjn9}m&|PSvLNOISHoY* zJtuI{(`{O;lV8)Hdzc88pQ!e>4T(vMxSz7)k9ox#rjjPMqbt9QX*ZiY+^y2B6S%?_ zsL%E~5Ebi;shBEPx9Vijmfd$c<+enE=!EG17v*?<6Au7>iwr0u{L}sq*g^gSyZ=I+ VB@)8$&k*SM;eP+Y-)aPae*t2W*~$O_ literal 0 HcmV?d00001 diff --git a/lambda_function.py b/lambda_function.py new file mode 100644 index 0000000..5a4c845 --- /dev/null +++ b/lambda_function.py @@ -0,0 +1,108 @@ +import json +import boto3 +import logging + +# Set up logging +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +def lambda_handler(event, context): + """ + Simple Lambda function to test Glue Catalog access for Iceberg tables + """ + + logger.info("๐Ÿš€ Starting Iceberg Glue Catalog test") + + # Get parameters from event or environment + database_name = event.get('DATABASE_NAME', 'iceberg_test_db') + table_name = event.get('TABLE_NAME', 'sample_customers') + + logger.info(f"๐Ÿ“‹ Testing table: {database_name}.{table_name}") + + try: + # Initialize Glue client + glue_client = boto3.client('glue') + + # Test 1: Get database + logger.info("1๏ธโƒฃ Testing database access...") + db_response = glue_client.get_database(Name=database_name) + logger.info(f"โœ… Database found: {db_response['Database']['Name']}") + + # Test 2: Get table + logger.info("2๏ธโƒฃ Testing table access...") + table_response = glue_client.get_table(DatabaseName=database_name, Name=table_name) + table = table_response['Table'] + + logger.info(f"โœ… Table found: {table['Name']}") + + # Test 3: Check if it's an Iceberg table + logger.info("3๏ธโƒฃ Validating Iceberg table...") + table_type = table.get('Parameters', {}).get('table_type', '').upper() + + if table_type == 'ICEBERG': + logger.info("โœ… Confirmed: This is an Iceberg table") + else: + logger.warning(f"โš ๏ธ Warning: Table type is '{table_type}', not 'ICEBERG'") + + # Test 4: Get table schema + logger.info("4๏ธโƒฃ Checking table schema...") + 
storage_descriptor = table.get('StorageDescriptor', {}) + columns = storage_descriptor.get('Columns', []) + location = storage_descriptor.get('Location', 'N/A') + + logger.info(f"๐Ÿ“ Location: {location}") + logger.info(f"๐Ÿ“Š Column count: {len(columns)}") + + # Test 5: Check S3 access + logger.info("5๏ธโƒฃ Testing S3 location access...") + if location and location.startswith('s3://'): + s3_client = boto3.client('s3') + + # Parse S3 location + location_parts = location.replace('s3://', '').split('/', 1) + bucket_name = location_parts[0] + prefix = location_parts[1] if len(location_parts) > 1 else '' + + try: + response = s3_client.list_objects_v2( + Bucket=bucket_name, + Prefix=prefix, + MaxKeys=10 + ) + + object_count = response.get('KeyCount', 0) + logger.info(f"๐Ÿ“ S3 bucket accessible: {bucket_name}") + logger.info(f"๐Ÿ“„ Objects found: {object_count}") + + except Exception as s3_error: + logger.error(f"โŒ S3 access failed: {s3_error}") + + # Prepare response + result = { + 'statusCode': 200, + 'body': json.dumps({ + 'message': 'Iceberg Glue Catalog test completed successfully', + 'database': database_name, + 'table': table_name, + 'table_type': table_type, + 'location': location, + 'column_count': len(columns), + 'columns': [{'name': col['Name'], 'type': col['Type']} for col in columns] + }) + } + + logger.info("๐ŸŽ‰ Test completed successfully!") + return result + + except Exception as e: + logger.error(f"โŒ Test failed: {str(e)}") + + error_result = { + 'statusCode': 500, + 'body': json.dumps({ + 'error': str(e), + 'message': 'Iceberg Glue Catalog test failed' + }) + } + + return error_result \ No newline at end of file diff --git a/libs/glue_functions/__init__.py b/libs/glue_functions/__init__.py index 19d722f..3729099 100644 --- a/libs/glue_functions/__init__.py +++ b/libs/glue_functions/__init__.py @@ -1 +1,12 @@ from .glue_catalog_functions import get_table, build_schema_for_table, query_table +from .iceberg_glue_functions import ( + get_iceberg_table_metadata, + get_iceberg_table_location, + get_iceberg_table_properties, + read_iceberg_table_with_spark, + read_iceberg_table_by_location, + query_iceberg_table_history, + query_iceberg_table_snapshots, + read_iceberg_table_at_timestamp, + read_iceberg_table_at_snapshot +) diff --git a/libs/glue_functions/iceberg_glue_functions.py b/libs/glue_functions/iceberg_glue_functions.py new file mode 100644 index 0000000..f1f77ed --- /dev/null +++ b/libs/glue_functions/iceberg_glue_functions.py @@ -0,0 +1,281 @@ +import logging +import sys +import boto3 +from typing import Dict, Optional, Any + +from pyspark.sql import SparkSession +from pyspark.sql.types import * + +logger = logging.getLogger() +logger.setLevel(logging.INFO) +handler = logging.StreamHandler(sys.stdout) +formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") +handler.setFormatter(formatter) +logger.addHandler(handler) + + +def get_iceberg_table_metadata(db_name: str, table_name: str, aws_region: str) -> Optional[Dict[str, Any]]: + """ + Fetches Iceberg table metadata from AWS Glue Catalog. + + Parameters: + - db_name (str): The name of the database in Glue Catalog. + - table_name (str): The name of the Iceberg table in Glue Database. + - aws_region (str): AWS region for Glue client. + + Returns: + - dict: The response from the Glue `get_table` API call, or None if error. 
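+
+    Note:
+    - If the table's `table_type` parameter is not 'ICEBERG', a warning is logged
+      but the Glue response is still returned to the caller.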
+ """ + try: + glue = boto3.client('glue', region_name=aws_region) + response = glue.get_table(DatabaseName=db_name, Name=table_name) + + # Validate that this is an Iceberg table + table_params = response.get('Table', {}).get('Parameters', {}) + table_type = table_params.get('table_type', '').upper() + + if table_type != 'ICEBERG': + logger.warning(f"Table {table_name} is not an Iceberg table (type: {table_type})") + + return response + except Exception as e: + logger.error(f"Error fetching Iceberg table {table_name} from database {db_name}: {e}") + return None + + +def get_iceberg_table_location(glue_table: Dict[str, Any]) -> Optional[str]: + """ + Extracts the S3 location of an Iceberg table from Glue metadata. + + Parameters: + - glue_table (dict): The table metadata from AWS Glue. + + Returns: + - str: The S3 location of the Iceberg table, or None if not found. + """ + try: + if not glue_table or 'Table' not in glue_table: + return None + + # For Iceberg tables, location is in StorageDescriptor + storage_descriptor = glue_table['Table'].get('StorageDescriptor', {}) + location = storage_descriptor.get('Location', '') + + if location: + # Convert s3:// to s3a:// for Spark compatibility + if location.startswith("s3://"): + location = location.replace("s3://", "s3a://") + return location + + return None + except Exception as e: + logger.error(f"Error extracting table location: {e}") + return None + + +def get_iceberg_table_properties(glue_table: Dict[str, Any]) -> Dict[str, str]: + """ + Extracts Iceberg-specific table properties from Glue metadata. + + Parameters: + - glue_table (dict): The table metadata from AWS Glue. + + Returns: + - dict: Dictionary of Iceberg table properties. + """ + try: + if not glue_table or 'Table' not in glue_table: + return {} + + table_params = glue_table['Table'].get('Parameters', {}) + + # Extract Iceberg-specific properties + iceberg_props = {} + for key, value in table_params.items(): + if key.startswith('iceberg.') or key in ['table_type', 'metadata_location']: + iceberg_props[key] = value + + return iceberg_props + except Exception as e: + logger.error(f"Error extracting Iceberg table properties: {e}") + return {} + + +def read_iceberg_table_with_spark(spark: SparkSession, db_name: str, table_name: str, + catalog_name: str = "glue_catalog"): + """ + Reads an Iceberg table using Spark with Glue Catalog integration. + + Parameters: + - spark (SparkSession): The active SparkSession configured for Iceberg. + - db_name (str): The database name in Glue Catalog. + - table_name (str): The table name in Glue Catalog. + - catalog_name (str): The catalog name configured in Spark (default: "glue_catalog"). + + Returns: + - DataFrame: Spark DataFrame containing the Iceberg table data. + """ + try: + table_identifier = f"{catalog_name}.{db_name}.{table_name}" + logger.info(f"Reading Iceberg table: {table_identifier}") + + df = spark.read.format("iceberg").load(table_identifier) + + logger.info(f"Successfully loaded Iceberg table with {df.count()} rows") + logger.info("Table schema:") + df.printSchema() + + return df + except Exception as e: + logger.error(f"Error reading Iceberg table {table_identifier}: {e}") + raise + + +def read_iceberg_table_by_location(spark: SparkSession, table_location: str): + """ + Reads an Iceberg table directly from its S3 location. + + Parameters: + - spark (SparkSession): The active SparkSession configured for Iceberg. + - table_location (str): The S3 location of the Iceberg table. 
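+      Both `s3://` and `s3a://` prefixes are accepted; an `s3://` prefix is
+      rewritten to `s3a://` before loading so the Hadoop S3A connector can resolve it.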
+ + Returns: + - DataFrame: Spark DataFrame containing the Iceberg table data. + """ + try: + # Ensure s3a:// protocol + if table_location.startswith("s3://"): + table_location = table_location.replace("s3://", "s3a://") + + logger.info(f"Reading Iceberg table from location: {table_location}") + + df = spark.read.format("iceberg").load(table_location) + + logger.info(f"Successfully loaded Iceberg table with {df.count()} rows") + logger.info("Table schema:") + df.printSchema() + + return df + except Exception as e: + logger.error(f"Error reading Iceberg table from location {table_location}: {e}") + raise + + +def query_iceberg_table_history(spark: SparkSession, db_name: str, table_name: str, + catalog_name: str = "glue_catalog"): + """ + Queries the history of an Iceberg table to see snapshots and changes. + + Parameters: + - spark (SparkSession): The active SparkSession configured for Iceberg. + - db_name (str): The database name in Glue Catalog. + - table_name (str): The table name in Glue Catalog. + - catalog_name (str): The catalog name configured in Spark. + + Returns: + - DataFrame: DataFrame containing the table history. + """ + try: + table_identifier = f"{catalog_name}.{db_name}.{table_name}" + logger.info(f"Querying history for Iceberg table: {table_identifier}") + + history_df = spark.read.format("iceberg").load(f"{table_identifier}.history") + + logger.info("Table history:") + history_df.show(truncate=False) + + return history_df + except Exception as e: + logger.error(f"Error querying table history for {table_identifier}: {e}") + raise + + +def query_iceberg_table_snapshots(spark: SparkSession, db_name: str, table_name: str, + catalog_name: str = "glue_catalog"): + """ + Queries the snapshots of an Iceberg table. + + Parameters: + - spark (SparkSession): The active SparkSession configured for Iceberg. + - db_name (str): The database name in Glue Catalog. + - table_name (str): The table name in Glue Catalog. + - catalog_name (str): The catalog name configured in Spark. + + Returns: + - DataFrame: DataFrame containing the table snapshots. + """ + try: + table_identifier = f"{catalog_name}.{db_name}.{table_name}" + logger.info(f"Querying snapshots for Iceberg table: {table_identifier}") + + snapshots_df = spark.read.format("iceberg").load(f"{table_identifier}.snapshots") + + logger.info("Table snapshots:") + snapshots_df.show(truncate=False) + + return snapshots_df + except Exception as e: + logger.error(f"Error querying table snapshots for {table_identifier}: {e}") + raise + + +def read_iceberg_table_at_timestamp(spark: SparkSession, db_name: str, table_name: str, + timestamp: str, catalog_name: str = "glue_catalog"): + """ + Reads an Iceberg table as it existed at a specific timestamp (time travel). + + Parameters: + - spark (SparkSession): The active SparkSession configured for Iceberg. + - db_name (str): The database name in Glue Catalog. + - table_name (str): The table name in Glue Catalog. + - timestamp (str): Timestamp in format 'YYYY-MM-DD HH:MM:SS.SSS' + - catalog_name (str): The catalog name configured in Spark. + + Returns: + - DataFrame: Spark DataFrame containing the table data at the specified timestamp. 
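+
+    Note:
+    - Spark's Iceberg `as-of-timestamp` read option is interpreted as milliseconds
+      since the epoch, so a formatted timestamp string may need to be converted to
+      epoch millis before it is passed here.
+
+    Example (illustrative values):
+        df = read_iceberg_table_at_timestamp(
+            spark, "iceberg_test_db", "sample_customers", "1706140800000")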
+ """ + try: + table_identifier = f"{catalog_name}.{db_name}.{table_name}" + logger.info(f"Reading Iceberg table {table_identifier} at timestamp: {timestamp}") + + df = spark.read.format("iceberg") \ + .option("as-of-timestamp", timestamp) \ + .load(table_identifier) + + logger.info(f"Successfully loaded table at timestamp with {df.count()} rows") + + return df + except Exception as e: + logger.error(f"Error reading table at timestamp {timestamp}: {e}") + raise + + +def read_iceberg_table_at_snapshot(spark: SparkSession, db_name: str, table_name: str, + snapshot_id: str, catalog_name: str = "glue_catalog"): + """ + Reads an Iceberg table at a specific snapshot ID (time travel). + + Parameters: + - spark (SparkSession): The active SparkSession configured for Iceberg. + - db_name (str): The database name in Glue Catalog. + - table_name (str): The table name in Glue Catalog. + - snapshot_id (str): The snapshot ID to read from. + - catalog_name (str): The catalog name configured in Spark. + + Returns: + - DataFrame: Spark DataFrame containing the table data at the specified snapshot. + """ + try: + table_identifier = f"{catalog_name}.{db_name}.{table_name}" + logger.info(f"Reading Iceberg table {table_identifier} at snapshot: {snapshot_id}") + + df = spark.read.format("iceberg") \ + .option("snapshot-id", snapshot_id) \ + .load(table_identifier) + + logger.info(f"Successfully loaded table at snapshot with {df.count()} rows") + + return df + except Exception as e: + logger.error(f"Error reading table at snapshot {snapshot_id}: {e}") + raise \ No newline at end of file diff --git a/spark-scripts/.DS_Store b/spark-scripts/.DS_Store deleted file mode 100644 index c927a13f192f370cf3285e985df127dcf355008d..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6148 zcmeHKOG-mQ5UkdKfo!sLIalxoV@OWm1;iwv5C|x`eyf}-M@#i*VR*Vr7D_er)J#v; z3{#8OuL0QNbMpYq0Zi$RIQTF&-*+F`O+}1I=NWJK!glkq-o5Nc**^!IdxuB7V~yAE z{Qb_iWKuv1NC7Dz1*E`*6{rF`on81`9VZ2(z~5KEzYmS>*b9fm_;hfH7JxWmIE?e? 
zC5X)f#9lZgGD5SY5|e7xVp!4{ZNQd=AjZ#1goGWmf z%emM875$g~|D2?i6p#Y{N&%ZJpOyA`l3opm_ ck(7DO=iKjwLt@Yw4?0ml1Fnlq3S3)(6DqwGWdHyG diff --git a/sparkLambdaHandler.py b/sparkLambdaHandler.py index 990b44f..2dc57ca 100644 --- a/sparkLambdaHandler.py +++ b/sparkLambdaHandler.py @@ -1,73 +1,309 @@ -import boto3 -import sys -import os -import subprocess -import logging +#!/usr/bin/env python3 +""" +Production Lambda function for reading Iceberg tables from Glue Catalog +This is the actual handler that will run in the Lambda container +""" + import json +import logging +import os +import sys +from datetime import datetime + +from pyspark.sql import SparkSession +from pyspark.sql.functions import * + +# Add glue functions to path +sys.path.append('/home/glue_functions') # Set up logging logger = logging.getLogger() logger.setLevel(logging.INFO) -handler = logging.StreamHandler(sys.stdout) -formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") -handler.setFormatter(formatter) -logger.addHandler(handler) -def s3_script_download(s3_bucket_script: str,input_script: str)-> None: - """ - """ - s3_client = boto3.resource("s3") +def create_iceberg_spark_session(): + """Create Spark session optimized for Lambda with Iceberg support""" + + aws_access_key_id = os.environ['AWS_ACCESS_KEY_ID'] + aws_secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY'] + session_token = os.environ['AWS_SESSION_TOKEN'] + aws_region = os.environ.get('AWS_REGION', 'us-east-1') + + logger.info("Creating Spark session with Iceberg configuration...") + + spark = SparkSession.builder \ + .appName("Lambda-Iceberg-Reader") \ + .master("local[*]") \ + .config("spark.driver.bindAddress", "127.0.0.1") \ + .config("spark.driver.memory", "5g") \ + .config("spark.executor.memory", "5g") \ + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \ + .config("spark.sql.adaptive.enabled", "true") \ + .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \ + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ + .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \ + .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ + .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \ + .config("spark.sql.catalog.glue_catalog.glue.region", aws_region) \ + .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \ + .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \ + .config("spark.hadoop.fs.s3a.session.token", session_token) \ + .config("spark.hadoop.fs.s3a.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider") \ + .getOrCreate() + + logger.info("Spark session created successfully") + return spark +def read_iceberg_table(spark, database_name, table_name, limit=None, filters=None): + """Read Iceberg table with optional filters and limit""" + + table_identifier = f"glue_catalog.{database_name}.{table_name}" + logger.info(f"Reading Iceberg table: {table_identifier}") + try: - logger.info(f'Now downloading script {input_script} in {s3_bucket_script} to /tmp') - s3_client.Bucket(s3_bucket_script).download_file(input_script, "/tmp/spark_script.py") - - except Exception as e : - logger.error(f'Error downloading the script {input_script} in {s3_bucket_script}: {e}') - else: - logger.info(f'Script {input_script} successfully downloaded to /tmp') - - + # Read the table + df = 
spark.read.format("iceberg").load(table_identifier) + + # Apply filters if provided + if filters: + for filter_condition in filters: + df = df.filter(filter_condition) + logger.info(f"Applied filter: {filter_condition}") + + # Apply limit if provided + if limit: + df = df.limit(limit) + + return df + + except Exception as e: + logger.error(f"Error reading table {table_identifier}: {e}") + raise -def spark_submit(s3_bucket_script: str,input_script: str, event: dict)-> None: - """ - Submits a local Spark script using spark-submit. - """ - # Set the environment variables for the Spark application - # pyspark_submit_args = event.get('PYSPARK_SUBMIT_ARGS', '') - # # Source input and output if available in event - # input_path = event.get('INPUT_PATH','') - # output_path = event.get('OUTPUT_PATH', '') - - for key,value in event.items(): - os.environ[key] = value - # Run the spark-submit command on the local copy of teh script +def get_table_analytics(df): + """Get basic analytics from the DataFrame""" + + logger.info("Computing table analytics...") + try: - logger.info(f'Spark-Submitting the Spark script {input_script} from {s3_bucket_script}') - subprocess.run(["spark-submit", "/tmp/spark_script.py", "--event", json.dumps(event)], check=True, env=os.environ) - except Exception as e : - logger.error(f'Error Spark-Submit with exception: {e}') - raise e - else: - logger.info(f'Script {input_script} successfully submitted') + # Basic stats + total_count = df.count() + + # Get numeric columns for analytics + numeric_columns = [] + for field in df.schema.fields: + if field.dataType.typeName() in ['integer', 'long', 'double', 'decimal', 'float']: + numeric_columns.append(field.name) + + analytics = { + 'total_rows': total_count, + 'columns': len(df.columns), + 'column_names': df.columns + } + + # Add numeric analytics if available + if numeric_columns: + for col_name in numeric_columns: + try: + stats = df.agg( + avg(col_name).alias('avg'), + min(col_name).alias('min'), + max(col_name).alias('max') + ).collect()[0] + + analytics[f'{col_name}_stats'] = { + 'avg': float(stats['avg']) if stats['avg'] else None, + 'min': float(stats['min']) if stats['min'] else None, + 'max': float(stats['max']) if stats['max'] else None + } + except Exception as e: + logger.warning(f"Could not compute stats for {col_name}: {e}") + + return analytics + + except Exception as e: + logger.error(f"Error computing analytics: {e}") + return {'total_rows': 0, 'error': str(e)} -def lambda_handler(event, context): +def convert_rows_to_json(rows): + """Convert Spark rows to JSON-serializable format""" + + results = [] + for row in rows: + row_dict = {} + for field in row.__fields__: + value = getattr(row, field) + + # Handle different data types + if value is None: + row_dict[field] = None + elif hasattr(value, 'isoformat'): # datetime + row_dict[field] = value.isoformat() + elif str(type(value)) == "": + row_dict[field] = float(value) + elif str(type(value)) == "": + row_dict[field] = value.isoformat() + else: + row_dict[field] = value + + results.append(row_dict) + + return results +def lambda_handler(event, context): """ - Lambda_handler is called when the AWS Lambda - is triggered. 
The function is downloading file - from Amazon S3 location and spark submitting - the script in AWS Lambda + Main Lambda handler for Iceberg table operations + + Event format: + { + "operation": "read_table", + "database": "iceberg_test_db", + "table": "sample_customers", + "limit": 10, + "filters": ["total_spent > 300"], + "include_analytics": true + } """ + + logger.info("๐Ÿš€ Starting Iceberg Lambda handler") + logger.info(f"Event: {json.dumps(event)}") + + # Parse event parameters + operation = event.get('operation', 'read_table') + database_name = event.get('database', os.environ.get('DATABASE_NAME', 'iceberg_test_db')) + table_name = event.get('table', os.environ.get('TABLE_NAME', 'sample_customers')) + limit = event.get('limit', 10) + filters = event.get('filters', []) + include_analytics = event.get('include_analytics', True) + + logger.info(f"Operation: {operation}") + logger.info(f"Target table: {database_name}.{table_name}") + + spark = None + + try: + # Create Spark session + spark = create_iceberg_spark_session() + + if operation == 'read_table': + # Read table data + df = read_iceberg_table(spark, database_name, table_name, limit, filters) + + # Get sample data + sample_rows = df.collect() + sample_data = convert_rows_to_json(sample_rows) + + # Prepare response + response_body = { + 'message': 'Successfully read Iceberg table', + 'database': database_name, + 'table': table_name, + 'operation': operation, + 'filters_applied': filters, + 'sample_data': sample_data, + 'sample_count': len(sample_data), + 'timestamp': datetime.now().isoformat() + } + + # Add analytics if requested + if include_analytics: + # Read full table for analytics (without limit) + full_df = read_iceberg_table(spark, database_name, table_name, filters=filters) + analytics = get_table_analytics(full_df) + response_body['analytics'] = analytics + + logger.info(f"โœ… Successfully processed {len(sample_data)} rows") + + return { + 'statusCode': 200, + 'headers': { + 'Content-Type': 'application/json' + }, + 'body': json.dumps(response_body) + } + + elif operation == 'table_info': + # Get table information only + df = read_iceberg_table(spark, database_name, table_name, limit=1) + + schema_info = [] + for field in df.schema.fields: + schema_info.append({ + 'name': field.name, + 'type': str(field.dataType), + 'nullable': field.nullable + }) + + # Get full count + full_df = read_iceberg_table(spark, database_name, table_name) + total_count = full_df.count() + + response_body = { + 'message': 'Table information retrieved', + 'database': database_name, + 'table': table_name, + 'total_rows': total_count, + 'schema': schema_info, + 'timestamp': datetime.now().isoformat() + } + + return { + 'statusCode': 200, + 'headers': { + 'Content-Type': 'application/json' + }, + 'body': json.dumps(response_body) + } + + else: + return { + 'statusCode': 400, + 'headers': { + 'Content-Type': 'application/json' + }, + 'body': json.dumps({ + 'error': f'Unknown operation: {operation}', + 'supported_operations': ['read_table', 'table_info'] + }) + } + + except Exception as e: + logger.error(f"โŒ Lambda execution failed: {str(e)}") + + return { + 'statusCode': 500, + 'headers': { + 'Content-Type': 'application/json' + }, + 'body': json.dumps({ + 'error': str(e), + 'message': 'Iceberg table operation failed', + 'timestamp': datetime.now().isoformat() + }) + } + + finally: + if spark: + logger.info("๐Ÿ”ง Stopping Spark session") + spark.stop() - logger.info("******************Start AWS Lambda Handler************") - s3_bucket_script = 
os.environ['SCRIPT_BUCKET'] - input_script = os.environ['SPARK_SCRIPT'] - os.environ['INPUT_PATH'] = event.get('INPUT_PATH','') - os.environ['OUTPUT_PATH'] = event.get('OUTPUT_PATH', '') - - s3_script_download(s3_bucket_script,input_script) +# For testing locally +if __name__ == "__main__": + # Test event + test_event = { + "operation": "read_table", + "database": "iceberg_test_db", + "table": "sample_customers", + "limit": 5, + "include_analytics": True + } + + # Mock context + class MockContext: + def __init__(self): + self.function_name = "test-iceberg-reader" + self.memory_limit_in_mb = 3008 + self.invoked_function_arn = "arn:aws:lambda:us-east-1:123456789012:function:test" - # Set the environment variables for the Spark application - spark_submit(s3_bucket_script,input_script, event) - + result = lambda_handler(test_event, MockContext()) + print(json.dumps(result, indent=2)) \ No newline at end of file diff --git a/test-infrastructure/cleanup-test-environment.sh b/test-infrastructure/cleanup-test-environment.sh new file mode 100755 index 0000000..4828b46 --- /dev/null +++ b/test-infrastructure/cleanup-test-environment.sh @@ -0,0 +1,106 @@ +#!/bin/bash + +# Cleanup Test Environment for Iceberg Glue Catalog Integration + +set -e + +# Configuration +STACK_NAME="spark-lambda-iceberg-test" +AWS_REGION="us-east-1" +LAMBDA_FUNCTION_NAME="spark-iceberg-test" +ECR_REPO_NAME="sparkonlambda-iceberg" + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +echo -e "${BLUE}๐Ÿงน Starting Test Environment Cleanup${NC}" +echo "========================================" + +# Get AWS Account ID +AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text) +BUCKET_NAME="spark-lambda-iceberg-test-${AWS_ACCOUNT_ID}" + +echo -e "${BLUE}๐Ÿ“‹ Configuration:${NC}" +echo " Stack Name: $STACK_NAME" +echo " AWS Region: $AWS_REGION" +echo " AWS Account: $AWS_ACCOUNT_ID" +echo " S3 Bucket: $BUCKET_NAME" +echo " Lambda Function: $LAMBDA_FUNCTION_NAME" +echo " ECR Repository: $ECR_REPO_NAME" +echo "" + +# Step 1: Delete Lambda function +echo -e "${YELLOW}โšก Step 1: Deleting Lambda function...${NC}" +if aws lambda get-function --function-name $LAMBDA_FUNCTION_NAME --region $AWS_REGION > /dev/null 2>&1; then + aws lambda delete-function --function-name $LAMBDA_FUNCTION_NAME --region $AWS_REGION + echo -e "${GREEN}โœ… Lambda function deleted${NC}" +else + echo -e "${YELLOW}โš ๏ธ Lambda function not found${NC}" +fi + +# Step 2: Empty and delete S3 bucket contents +echo -e "${YELLOW}๐Ÿ—‘๏ธ Step 2: Emptying S3 bucket...${NC}" +if aws s3 ls s3://$BUCKET_NAME --region $AWS_REGION > /dev/null 2>&1; then + aws s3 rm s3://$BUCKET_NAME --recursive --region $AWS_REGION + echo -e "${GREEN}โœ… S3 bucket emptied${NC}" +else + echo -e "${YELLOW}โš ๏ธ S3 bucket not found or already empty${NC}" +fi + +# Step 3: Delete ECR repository +echo -e "${YELLOW}๐Ÿณ Step 3: Deleting ECR repository...${NC}" +if aws ecr describe-repositories --repository-names $ECR_REPO_NAME --region $AWS_REGION > /dev/null 2>&1; then + aws ecr delete-repository --repository-name $ECR_REPO_NAME --force --region $AWS_REGION + echo -e "${GREEN}โœ… ECR repository deleted${NC}" +else + echo -e "${YELLOW}โš ๏ธ ECR repository not found${NC}" +fi + +# Step 4: Delete CloudFormation stack +echo -e "${YELLOW}๐Ÿ“ฆ Step 4: Deleting CloudFormation stack...${NC}" +if aws cloudformation describe-stacks --stack-name $STACK_NAME --region $AWS_REGION > /dev/null 2>&1; then + aws 
cloudformation delete-stack --stack-name $STACK_NAME --region $AWS_REGION + + echo -e "${YELLOW}โณ Waiting for stack deletion to complete...${NC}" + aws cloudformation wait stack-delete-complete --stack-name $STACK_NAME --region $AWS_REGION + + if [ $? -eq 0 ]; then + echo -e "${GREEN}โœ… CloudFormation stack deleted successfully${NC}" + else + echo -e "${RED}โŒ CloudFormation stack deletion failed or timed out${NC}" + echo -e "${YELLOW}โš ๏ธ Please check the AWS Console for stack status${NC}" + fi +else + echo -e "${YELLOW}โš ๏ธ CloudFormation stack not found${NC}" +fi + +# Step 5: Clean up local Docker images (optional) +echo -e "${YELLOW}๐Ÿณ Step 5: Cleaning up local Docker images...${NC}" +if docker images | grep -q $ECR_REPO_NAME; then + docker rmi $(docker images | grep $ECR_REPO_NAME | awk '{print $3}') 2>/dev/null || true + echo -e "${GREEN}โœ… Local Docker images cleaned up${NC}" +else + echo -e "${YELLOW}โš ๏ธ No local Docker images found${NC}" +fi + +echo "" +echo -e "${BLUE}๐ŸŽ‰ Cleanup Complete!${NC}" +echo "====================" +echo "" +echo -e "${GREEN}โœ… All test resources have been cleaned up${NC}" +echo "" +echo -e "${YELLOW}๐Ÿ“‹ Resources Removed:${NC}" +echo " โ€ข Lambda Function: $LAMBDA_FUNCTION_NAME" +echo " โ€ข S3 Bucket: $BUCKET_NAME (emptied)" +echo " โ€ข ECR Repository: $ECR_REPO_NAME" +echo " โ€ข CloudFormation Stack: $STACK_NAME" +echo " โ€ข IAM Role: SparkLambdaIcebergRole-$STACK_NAME" +echo " โ€ข Glue Database: iceberg_test_db" +echo " โ€ข Glue Table: sample_customers" +echo "" +echo -e "${BLUE}๐Ÿ’ก Note: The S3 bucket itself will be deleted by CloudFormation${NC}" +echo -e "${BLUE}๐Ÿ’ก Note: Check AWS Console to verify all resources are removed${NC}" \ No newline at end of file diff --git a/test-infrastructure/create-sample-iceberg-table.py b/test-infrastructure/create-sample-iceberg-table.py new file mode 100644 index 0000000..f7423f0 --- /dev/null +++ b/test-infrastructure/create-sample-iceberg-table.py @@ -0,0 +1,159 @@ +#!/usr/bin/env python3 +""" +Script to create a sample Iceberg table with test data for Lambda testing. +This script should be run locally or on an EC2 instance with Spark and Iceberg configured. 
+""" + +import os +import sys +from datetime import datetime, date +from decimal import Decimal + +from pyspark.sql import SparkSession +from pyspark.sql.types import * +from pyspark.sql.functions import * + +def create_spark_session(bucket_name, aws_region='us-east-1'): + """Create Spark session configured for Iceberg with Glue Catalog""" + + spark = SparkSession.builder \ + .appName("CreateSampleIcebergTable") \ + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ + .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \ + .config("spark.sql.catalog.glue_catalog.warehouse", f"s3a://{bucket_name}/iceberg-warehouse/") \ + .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ + .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \ + .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ + .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") \ + .config("spark.sql.catalog.glue_catalog.glue.region", aws_region) \ + .getOrCreate() + + return spark + +def create_sample_data(spark): + """Create sample customer data""" + + # Define schema + schema = StructType([ + StructField("customer_id", LongType(), False), + StructField("customer_name", StringType(), False), + StructField("email", StringType(), False), + StructField("registration_date", DateType(), False), + StructField("total_orders", IntegerType(), False), + StructField("total_spent", DecimalType(10, 2), False), + StructField("last_updated", TimestampType(), False) + ]) + + # Sample data + sample_data = [ + (1, "John Doe", "john.doe@email.com", date(2023, 1, 15), 5, Decimal("299.99"), datetime(2024, 1, 15, 10, 30, 0)), + (2, "Jane Smith", "jane.smith@email.com", date(2023, 2, 20), 8, Decimal("599.50"), datetime(2024, 1, 16, 14, 45, 0)), + (3, "Bob Johnson", "bob.johnson@email.com", date(2023, 3, 10), 3, Decimal("149.75"), datetime(2024, 1, 17, 9, 15, 0)), + (4, "Alice Brown", "alice.brown@email.com", date(2023, 4, 5), 12, Decimal("899.25"), datetime(2024, 1, 18, 16, 20, 0)), + (5, "Charlie Wilson", "charlie.wilson@email.com", date(2023, 5, 12), 7, Decimal("449.80"), datetime(2024, 1, 19, 11, 10, 0)), + (6, "Diana Davis", "diana.davis@email.com", date(2023, 6, 8), 15, Decimal("1299.99"), datetime(2024, 1, 20, 13, 25, 0)), + (7, "Frank Miller", "frank.miller@email.com", date(2023, 7, 22), 4, Decimal("199.95"), datetime(2024, 1, 21, 8, 40, 0)), + (8, "Grace Lee", "grace.lee@email.com", date(2023, 8, 18), 9, Decimal("679.30"), datetime(2024, 1, 22, 15, 55, 0)), + (9, "Henry Taylor", "henry.taylor@email.com", date(2023, 9, 3), 6, Decimal("359.60"), datetime(2024, 1, 23, 12, 5, 0)), + (10, "Ivy Anderson", "ivy.anderson@email.com", date(2023, 10, 14), 11, Decimal("799.85"), datetime(2024, 1, 24, 17, 30, 0)) + ] + + df = spark.createDataFrame(sample_data, schema) + return df + +def create_iceberg_table(spark, database_name, table_name, df): + """Create Iceberg table in Glue Catalog""" + + table_identifier = f"glue_catalog.{database_name}.{table_name}" + + print(f"Creating Iceberg table: {table_identifier}") + + # Write DataFrame as Iceberg table + df.writeTo(table_identifier) \ + .using("iceberg") \ + .tableProperty("format-version", "2") \ + .tableProperty("write.parquet.compression-codec", "snappy") \ + .create() + + print(f"Successfully created Iceberg table: {table_identifier}") + + # Verify the table 
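+    # (read the table back through the Glue catalog and print the row count)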
+ verify_df = spark.read.format("iceberg").load(table_identifier) + print(f"Table verification - Row count: {verify_df.count()}") + verify_df.show() + + return table_identifier + +def add_more_data(spark, table_identifier): + """Add more data to demonstrate table evolution""" + + print(f"Adding more data to: {table_identifier}") + + # Additional data + additional_schema = StructType([ + StructField("customer_id", LongType(), False), + StructField("customer_name", StringType(), False), + StructField("email", StringType(), False), + StructField("registration_date", DateType(), False), + StructField("total_orders", IntegerType(), False), + StructField("total_spent", DecimalType(10, 2), False), + StructField("last_updated", TimestampType(), False) + ]) + + additional_data = [ + (11, "Kevin White", "kevin.white@email.com", date(2023, 11, 5), 2, Decimal("99.99"), datetime(2024, 1, 25, 10, 0, 0)), + (12, "Laura Green", "laura.green@email.com", date(2023, 12, 1), 13, Decimal("999.75"), datetime(2024, 1, 26, 14, 30, 0)), + (13, "Mike Black", "mike.black@email.com", date(2024, 1, 10), 1, Decimal("49.99"), datetime(2024, 1, 27, 9, 45, 0)) + ] + + additional_df = spark.createDataFrame(additional_data, additional_schema) + + # Append to existing table + additional_df.writeTo(table_identifier).using("iceberg").append() + + print("Successfully added more data") + + # Verify updated table + updated_df = spark.read.format("iceberg").load(table_identifier) + print(f"Updated table - Row count: {updated_df.count()}") + +def main(): + if len(sys.argv) != 3: + print("Usage: python create-sample-iceberg-table.py ") + print("Example: python create-sample-iceberg-table.py my-test-bucket-123456789 iceberg_test_db") + sys.exit(1) + + bucket_name = sys.argv[1] + database_name = sys.argv[2] + table_name = "sample_customers" + + print(f"Creating sample Iceberg table in bucket: {bucket_name}") + print(f"Database: {database_name}, Table: {table_name}") + + # Create Spark session + spark = create_spark_session(bucket_name) + + try: + # Create sample data + df = create_sample_data(spark) + + # Create Iceberg table + table_identifier = create_iceberg_table(spark, database_name, table_name, df) + + # Add more data to demonstrate table evolution + add_more_data(spark, table_identifier) + + print("\n" + "="*50) + print("SUCCESS: Sample Iceberg table created successfully!") + print(f"Table: {table_identifier}") + print(f"Location: s3://{bucket_name}/iceberg-warehouse/{database_name}/{table_name}/") + print("="*50) + + except Exception as e: + print(f"ERROR: Failed to create sample table: {e}") + raise + finally: + spark.stop() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/test-infrastructure/deploy-test-environment.sh b/test-infrastructure/deploy-test-environment.sh new file mode 100755 index 0000000..24e66eb --- /dev/null +++ b/test-infrastructure/deploy-test-environment.sh @@ -0,0 +1,198 @@ +#!/bin/bash + +# Deploy Test Environment for Iceberg Glue Catalog Integration +# This script sets up the complete test environment + +set -e + +# Configuration +STACK_NAME="spark-lambda-iceberg-test" +AWS_REGION="us-east-1" +BUCKET_PREFIX="spark-lambda-iceberg-test" +DATABASE_NAME="iceberg_test_db" + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +echo -e "${BLUE}๐Ÿš€ Starting Iceberg Test Environment Deployment${NC}" +echo "==================================================" + +# Check if AWS CLI is configured +if ! 
aws sts get-caller-identity > /dev/null 2>&1; then + echo -e "${RED}โŒ AWS CLI not configured. Please run 'aws configure' first.${NC}" + exit 1 +fi + +# Get AWS Account ID +AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text) +BUCKET_NAME="${BUCKET_PREFIX}-${AWS_ACCOUNT_ID}" + +echo -e "${BLUE}๐Ÿ“‹ Configuration:${NC}" +echo " Stack Name: $STACK_NAME" +echo " AWS Region: $AWS_REGION" +echo " AWS Account: $AWS_ACCOUNT_ID" +echo " S3 Bucket: $BUCKET_NAME" +echo " Database: $DATABASE_NAME" +echo "" + +# Step 1: Deploy CloudFormation stack +echo -e "${YELLOW}๐Ÿ“ฆ Step 1: Deploying CloudFormation stack...${NC}" +aws cloudformation deploy \ + --template-file test-infrastructure/iceberg-test-setup.yaml \ + --stack-name $STACK_NAME \ + --parameter-overrides \ + BucketName=$BUCKET_PREFIX \ + DatabaseName=$DATABASE_NAME \ + --capabilities CAPABILITY_NAMED_IAM \ + --region $AWS_REGION + +if [ $? -eq 0 ]; then + echo -e "${GREEN}โœ… CloudFormation stack deployed successfully${NC}" +else + echo -e "${RED}โŒ CloudFormation deployment failed${NC}" + exit 1 +fi + +# Step 2: Upload test scripts to S3 +echo -e "${YELLOW}๐Ÿ“ค Step 2: Uploading test scripts to S3...${NC}" + +# Create scripts directory in S3 +aws s3 cp spark-scripts/test-iceberg-integration.py s3://$BUCKET_NAME/scripts/ --region $AWS_REGION +aws s3 cp spark-scripts/simple-iceberg-reader.py s3://$BUCKET_NAME/scripts/ --region $AWS_REGION + +echo -e "${GREEN}โœ… Test scripts uploaded to S3${NC}" + +# Step 3: Build and push Docker image (if ECR repo exists) +echo -e "${YELLOW}๐Ÿณ Step 3: Checking for Docker image...${NC}" + +# Check if ECR repository exists +ECR_REPO_NAME="sparkonlambda-iceberg" +if aws ecr describe-repositories --repository-names $ECR_REPO_NAME --region $AWS_REGION > /dev/null 2>&1; then + echo -e "${GREEN}โœ… ECR repository exists: $ECR_REPO_NAME${NC}" + + # Get ECR login + aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com + + # Build and push image + echo -e "${YELLOW}๐Ÿ”จ Building Docker image with Iceberg support...${NC}" + docker build --build-arg FRAMEWORK=ICEBERG -t $ECR_REPO_NAME . + + docker tag $ECR_REPO_NAME:latest ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/$ECR_REPO_NAME:latest + docker push ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/$ECR_REPO_NAME:latest + + echo -e "${GREEN}โœ… Docker image built and pushed${NC}" +else + echo -e "${YELLOW}โš ๏ธ ECR repository not found. Creating it...${NC}" + aws ecr create-repository --repository-name $ECR_REPO_NAME --region $AWS_REGION + + # Get ECR login + aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com + + # Build and push image + echo -e "${YELLOW}๐Ÿ”จ Building Docker image with Iceberg support...${NC}" + docker build --build-arg FRAMEWORK=ICEBERG -t $ECR_REPO_NAME . 
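+    # The FRAMEWORK=ICEBERG build arg above is assumed to make the Dockerfile bake
+    # the Iceberg runtime jars into the image.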
+ + docker tag $ECR_REPO_NAME:latest ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/$ECR_REPO_NAME:latest + docker push ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/$ECR_REPO_NAME:latest + + echo -e "${GREEN}โœ… ECR repository created and Docker image pushed${NC}" +fi + +# Step 4: Get outputs from CloudFormation +echo -e "${YELLOW}๐Ÿ“‹ Step 4: Getting CloudFormation outputs...${NC}" + +LAMBDA_ROLE_ARN=$(aws cloudformation describe-stacks \ + --stack-name $STACK_NAME \ + --query 'Stacks[0].Outputs[?OutputKey==`LambdaRoleArn`].OutputValue' \ + --output text \ + --region $AWS_REGION) + +echo -e "${GREEN}โœ… Lambda Role ARN: $LAMBDA_ROLE_ARN${NC}" + +# Step 5: Create Lambda function +echo -e "${YELLOW}โšก Step 5: Creating Lambda function...${NC}" + +LAMBDA_FUNCTION_NAME="spark-iceberg-test" +IMAGE_URI="${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/$ECR_REPO_NAME:latest" + +# Check if Lambda function exists +if aws lambda get-function --function-name $LAMBDA_FUNCTION_NAME --region $AWS_REGION > /dev/null 2>&1; then + echo -e "${YELLOW}โš ๏ธ Lambda function exists. Updating...${NC}" + aws lambda update-function-code \ + --function-name $LAMBDA_FUNCTION_NAME \ + --image-uri $IMAGE_URI \ + --region $AWS_REGION + + aws lambda update-function-configuration \ + --function-name $LAMBDA_FUNCTION_NAME \ + --role $LAMBDA_ROLE_ARN \ + --timeout 900 \ + --memory-size 3008 \ + --environment Variables="{ + SCRIPT_BUCKET=$BUCKET_NAME, + SPARK_SCRIPT=test-iceberg-integration.py, + DATABASE_NAME=$DATABASE_NAME, + TABLE_NAME=sample_customers, + AWS_REGION=$AWS_REGION + }" \ + --region $AWS_REGION +else + echo -e "${YELLOW}๐Ÿ†• Creating new Lambda function...${NC}" + aws lambda create-function \ + --function-name $LAMBDA_FUNCTION_NAME \ + --role $LAMBDA_ROLE_ARN \ + --code ImageUri=$IMAGE_URI \ + --package-type Image \ + --timeout 900 \ + --memory-size 3008 \ + --environment Variables="{ + SCRIPT_BUCKET=$BUCKET_NAME, + SPARK_SCRIPT=test-iceberg-integration.py, + DATABASE_NAME=$DATABASE_NAME, + TABLE_NAME=sample_customers, + AWS_REGION=$AWS_REGION + }" \ + --region $AWS_REGION +fi + +echo -e "${GREEN}โœ… Lambda function created/updated: $LAMBDA_FUNCTION_NAME${NC}" + +# Step 6: Display next steps +echo "" +echo -e "${BLUE}๐ŸŽ‰ Test Environment Deployed Successfully!${NC}" +echo "==================================================" +echo "" +echo -e "${YELLOW}๐Ÿ“‹ Next Steps:${NC}" +echo "" +echo "1. Create sample Iceberg table (run on EC2 or local machine with Spark):" +echo " python test-infrastructure/create-sample-iceberg-table.py $BUCKET_NAME $DATABASE_NAME" +echo "" +echo "2. Test the Lambda function:" +echo " aws lambda invoke \\" +echo " --function-name $LAMBDA_FUNCTION_NAME \\" +echo " --payload '{\"DATABASE_NAME\":\"$DATABASE_NAME\",\"TABLE_NAME\":\"sample_customers\",\"TEST_TYPE\":\"comprehensive\"}' \\" +echo " --region $AWS_REGION \\" +echo " response.json" +echo "" +echo "3. 
View the results:" +echo " cat response.json" +echo "" +echo -e "${YELLOW}๐Ÿ“Š Resources Created:${NC}" +echo " โ€ข S3 Bucket: $BUCKET_NAME" +echo " โ€ข Glue Database: $DATABASE_NAME" +echo " โ€ข Lambda Function: $LAMBDA_FUNCTION_NAME" +echo " โ€ข ECR Repository: $ECR_REPO_NAME" +echo " โ€ข IAM Role: SparkLambdaIcebergRole-$STACK_NAME" +echo "" +echo -e "${YELLOW}๐Ÿ”— Useful Commands:${NC}" +echo " โ€ข View CloudWatch logs:" +echo " aws logs tail /aws/lambda/$LAMBDA_FUNCTION_NAME --follow --region $AWS_REGION" +echo "" +echo " โ€ข Clean up resources:" +echo " ./test-infrastructure/cleanup-test-environment.sh" +echo "" +echo -e "${GREEN}โœ… Deployment Complete!${NC}" \ No newline at end of file diff --git a/test-infrastructure/iceberg-test-setup.yaml b/test-infrastructure/iceberg-test-setup.yaml new file mode 100644 index 0000000..554fb99 --- /dev/null +++ b/test-infrastructure/iceberg-test-setup.yaml @@ -0,0 +1,140 @@ +AWSTemplateFormatVersion: '2010-09-09' +Description: 'Test infrastructure for Iceberg Glue Catalog integration with Spark on Lambda' + +Parameters: + BucketName: + Type: String + Default: 'spark-lambda-iceberg-test' + Description: 'S3 bucket name for test data and scripts' + + DatabaseName: + Type: String + Default: 'iceberg_test_db' + Description: 'Glue database name for Iceberg tables' + +Resources: + # S3 Bucket for test data and scripts + TestDataBucket: + Type: AWS::S3::Bucket + Properties: + BucketName: !Sub '${BucketName}-${AWS::AccountId}' + VersioningConfiguration: + Status: Enabled + PublicAccessBlockConfiguration: + BlockPublicAcls: true + BlockPublicPolicy: true + IgnorePublicAcls: true + RestrictPublicBuckets: true + + # Glue Database + IcebergTestDatabase: + Type: AWS::Glue::Database + Properties: + CatalogId: !Ref AWS::AccountId + DatabaseInput: + Name: !Ref DatabaseName + Description: 'Test database for Iceberg tables' + + # Sample Iceberg Table + SampleIcebergTable: + Type: AWS::Glue::Table + Properties: + CatalogId: !Ref AWS::AccountId + DatabaseName: !Ref IcebergTestDatabase + TableInput: + Name: 'sample_customers' + Description: 'Sample Iceberg table for testing' + TableType: 'EXTERNAL_TABLE' + Parameters: + table_type: 'ICEBERG' + metadata_location: !Sub 's3://${BucketName}-${AWS::AccountId}/iceberg-tables/sample_customers/metadata/00000-409702ba-4735-4645-8f14-09537cc0b2c8.metadata.json' + StorageDescriptor: + Location: !Sub 's3://${BucketName}-${AWS::AccountId}/iceberg-tables/sample_customers/' + InputFormat: 'org.apache.iceberg.mr.hive.HiveIcebergInputFormat' + OutputFormat: 'org.apache.iceberg.mr.hive.HiveIcebergOutputFormat' + SerdeInfo: + SerializationLibrary: 'org.apache.iceberg.mr.hive.HiveIcebergSerDe' + Columns: + - Name: 'customer_id' + Type: 'bigint' + - Name: 'customer_name' + Type: 'string' + - Name: 'email' + Type: 'string' + - Name: 'registration_date' + Type: 'date' + - Name: 'total_orders' + Type: 'int' + - Name: 'total_spent' + Type: 'decimal(10,2)' + - Name: 'last_updated' + Type: 'timestamp' + + # Lambda Execution Role + LambdaExecutionRole: + Type: AWS::IAM::Role + Properties: + RoleName: !Sub 'SparkLambdaIcebergRole-${AWS::StackName}' + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: lambda.amazonaws.com + Action: sts:AssumeRole + ManagedPolicyArns: + - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole + Policies: + - PolicyName: GlueCatalogAccess + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - glue:GetDatabase + - 
glue:GetTable + - glue:GetTables + - glue:GetPartition + - glue:GetPartitions + Resource: + - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog' + - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${DatabaseName}' + - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${DatabaseName}/*' + - PolicyName: S3Access + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - s3:GetObject + - s3:ListBucket + - s3:GetBucketLocation + - s3:PutObject + Resource: + - !GetAtt TestDataBucket.Arn + - !Sub '${TestDataBucket.Arn}/*' + +Outputs: + BucketName: + Description: 'S3 bucket name for test data' + Value: !Ref TestDataBucket + Export: + Name: !Sub '${AWS::StackName}-BucketName' + + DatabaseName: + Description: 'Glue database name' + Value: !Ref IcebergTestDatabase + Export: + Name: !Sub '${AWS::StackName}-DatabaseName' + + TableName: + Description: 'Iceberg table name' + Value: 'sample_customers' + Export: + Name: !Sub '${AWS::StackName}-TableName' + + LambdaRoleArn: + Description: 'Lambda execution role ARN' + Value: !GetAtt LambdaExecutionRole.Arn + Export: + Name: !Sub '${AWS::StackName}-LambdaRoleArn' \ No newline at end of file From 36d164b5940ad7050cdf2ae1d1d4306452861688 Mon Sep 17 00:00:00 2001 From: John Cherian <42749436+JohnChe88@users.noreply.github.com> Date: Thu, 28 Aug 2025 12:39:10 -0400 Subject: [PATCH 2/4] Delete .vscode/settings.json --- .vscode/settings.json | 3 --- 1 file changed, 3 deletions(-) delete mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 3f836a2..0000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "kiroAgent.configureMCP": "Enabled" -} \ No newline at end of file From 35588da0856bba783d8539935f53f32f7081bd99 Mon Sep 17 00:00:00 2001 From: John Cherian <42749436+JohnChe88@users.noreply.github.com> Date: Thu, 28 Aug 2025 12:40:23 -0400 Subject: [PATCH 3/4] Delete examples/production-etl-pipeline.py --- examples/production-etl-pipeline.py | 289 ---------------------------- 1 file changed, 289 deletions(-) delete mode 100644 examples/production-etl-pipeline.py diff --git a/examples/production-etl-pipeline.py b/examples/production-etl-pipeline.py deleted file mode 100644 index 9c3d38b..0000000 --- a/examples/production-etl-pipeline.py +++ /dev/null @@ -1,289 +0,0 @@ -#!/usr/bin/env python3 -""" -Production ETL Pipeline: Complete example for processing Iceberg tables in Lambda -""" - -import os -import sys -import json -from datetime import datetime -from pyspark.sql import SparkSession -from pyspark.sql.functions import * -from pyspark.sql.types import * - -sys.path.append('/home/glue_functions') -from iceberg_glue_functions import ( - read_iceberg_table_with_spark, - get_iceberg_table_metadata -) - -def create_production_spark_session(app_name="Production-ETL"): - """Production-ready Spark session configuration""" - - aws_access_key_id = os.environ['AWS_ACCESS_KEY_ID'] - aws_secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY'] - session_token = os.environ['AWS_SESSION_TOKEN'] - aws_region = os.environ.get('AWS_REGION', 'us-east-1') - - return SparkSession.builder \ - .appName(app_name) \ - .master("local[*]") \ - .config("spark.driver.bindAddress", "127.0.0.1") \ - .config("spark.driver.memory", "5g") \ - .config("spark.executor.memory", "5g") \ - .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \ - .config("spark.sql.adaptive.enabled", "true") \ - 
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \ - .config("spark.sql.adaptive.skewJoin.enabled", "true") \ - .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ - .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \ - .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ - .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \ - .config("spark.sql.catalog.glue_catalog.glue.region", aws_region) \ - .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \ - .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \ - .config("spark.hadoop.fs.s3a.session.token", session_token) \ - .config("spark.hadoop.fs.s3a.aws.credentials.provider", - "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider") \ - .getOrCreate() - -class IcebergETLPipeline: - """Production ETL Pipeline for Iceberg tables""" - - def __init__(self, config): - self.config = config - self.spark = create_production_spark_session(config.get('app_name', 'ETL-Pipeline')) - - def read_source_table(self, database, table, filters=None): - """Read source Iceberg table with optional filters""" - - print(f"๐Ÿ“– Reading source table: {database}.{table}") - - try: - df = self.spark.read.format("iceberg").load(f"glue_catalog.{database}.{table}") - - # Apply filters if provided - if filters: - for filter_condition in filters: - df = df.filter(filter_condition) - print(f" Applied filter: {filter_condition}") - - row_count = df.count() - print(f" Loaded {row_count} rows") - - return df - - except Exception as e: - print(f"โŒ Error reading table {database}.{table}: {e}") - raise - - def transform_data(self, df, transformations): - """Apply data transformations""" - - print("๐Ÿ”„ Applying transformations...") - - for i, transform in enumerate(transformations, 1): - print(f" Transformation {i}: {transform['description']}") - - if transform['type'] == 'add_column': - df = df.withColumn(transform['column'], expr(transform['expression'])) - - elif transform['type'] == 'filter': - df = df.filter(transform['condition']) - - elif transform['type'] == 'aggregate': - df = df.groupBy(*transform['group_by']) \ - .agg(*[expr(agg) for agg in transform['aggregations']]) - - elif transform['type'] == 'join': - other_df = self.read_source_table( - transform['join_table']['database'], - transform['join_table']['table'] - ) - df = df.join(other_df, transform['join_condition'], transform['join_type']) - - elif transform['type'] == 'custom': - # Custom transformation function - df = transform['function'](df) - - final_count = df.count() - print(f" Final row count: {final_count}") - - return df - - def write_to_target(self, df, target_config): - """Write processed data to target location""" - - print(f"๐Ÿ’พ Writing to target: {target_config['location']}") - - writer = df.write.mode(target_config.get('mode', 'overwrite')) - - # Configure output format - if target_config['format'] == 'iceberg': - # Write to Iceberg table - table_identifier = f"glue_catalog.{target_config['database']}.{target_config['table']}" - - writer = writer.format("iceberg") - - # Add Iceberg-specific options - if 'iceberg_options' in target_config: - for key, value in target_config['iceberg_options'].items(): - writer = writer.option(key, value) - - writer.save(table_identifier) - - elif target_config['format'] == 'parquet': - writer.format("parquet").save(target_config['location']) - - elif 
target_config['format'] == 'delta': - writer.format("delta").save(target_config['location']) - - print(f" โœ… Data written successfully") - - def run_pipeline(self): - """Execute the complete ETL pipeline""" - - print("๐Ÿš€ Starting ETL Pipeline") - print("=" * 50) - - try: - # Step 1: Read source data - source_df = self.read_source_table( - self.config['source']['database'], - self.config['source']['table'], - self.config['source'].get('filters') - ) - - # Step 2: Apply transformations - if 'transformations' in self.config: - transformed_df = self.transform_data(source_df, self.config['transformations']) - else: - transformed_df = source_df - - # Step 3: Data quality checks - if 'quality_checks' in self.config: - self.run_quality_checks(transformed_df, self.config['quality_checks']) - - # Step 4: Write to target - self.write_to_target(transformed_df, self.config['target']) - - print("๐ŸŽ‰ ETL Pipeline completed successfully!") - - return { - 'status': 'success', - 'rows_processed': transformed_df.count(), - 'timestamp': datetime.now().isoformat() - } - - except Exception as e: - print(f"โŒ ETL Pipeline failed: {e}") - return { - 'status': 'failed', - 'error': str(e), - 'timestamp': datetime.now().isoformat() - } - - finally: - self.spark.stop() - - def run_quality_checks(self, df, checks): - """Run data quality checks""" - - print("๐Ÿ” Running data quality checks...") - - for check in checks: - if check['type'] == 'row_count': - count = df.count() - min_rows = check.get('min_rows', 0) - max_rows = check.get('max_rows', float('inf')) - - if not (min_rows <= count <= max_rows): - raise ValueError(f"Row count {count} outside expected range [{min_rows}, {max_rows}]") - - print(f" โœ… Row count check passed: {count} rows") - - elif check['type'] == 'null_check': - for column in check['columns']: - null_count = df.filter(col(column).isNull()).count() - if null_count > 0: - raise ValueError(f"Found {null_count} null values in column {column}") - - print(f" โœ… Null check passed for columns: {check['columns']}") - - elif check['type'] == 'custom': - # Custom quality check function - check['function'](df) - -# Example usage in Lambda handler -def lambda_handler(event, context): - """Lambda handler for ETL pipeline""" - - # Example configuration - etl_config = { - 'app_name': 'Customer-Analytics-ETL', - 'source': { - 'database': 'raw_data', - 'table': 'customer_events', - 'filters': [ - "event_date >= '2024-01-01'", - "event_type IN ('purchase', 'signup')" - ] - }, - 'transformations': [ - { - 'type': 'add_column', - 'description': 'Add processing timestamp', - 'column': 'processed_at', - 'expression': 'current_timestamp()' - }, - { - 'type': 'aggregate', - 'description': 'Aggregate by customer and event type', - 'group_by': ['customer_id', 'event_type'], - 'aggregations': [ - 'count(*) as event_count', - 'sum(amount) as total_amount', - 'max(event_date) as last_event_date' - ] - } - ], - 'quality_checks': [ - { - 'type': 'row_count', - 'min_rows': 1 - }, - { - 'type': 'null_check', - 'columns': ['customer_id', 'event_type'] - } - ], - 'target': { - 'format': 'iceberg', - 'database': 'analytics', - 'table': 'customer_summary', - 'mode': 'overwrite', - 'iceberg_options': { - 'write.format.default': 'parquet', - 'write.parquet.compression-codec': 'snappy' - } - } - } - - # Override config with event parameters if provided - if 'config' in event: - etl_config.update(event['config']) - - # Run pipeline - pipeline = IcebergETLPipeline(etl_config) - result = pipeline.run_pipeline() - - return { - 
'statusCode': 200 if result['status'] == 'success' else 500, - 'body': json.dumps(result) - } - -if __name__ == "__main__": - # Test the pipeline locally - test_event = {} - result = lambda_handler(test_event, None) - print(json.dumps(result, indent=2)) \ No newline at end of file From cddd7bef4c1aad60f738a6c8fe47f445ad927c86 Mon Sep 17 00:00:00 2001 From: johnche88 Date: Thu, 28 Aug 2025 12:47:17 -0400 Subject: [PATCH 4/4] Clean up lambda-deployment folder and update integration summary --- ICEBERG_INTEGRATION_SUMMARY.md | 3 +- lambda-deployment/deploy-production-lambda.sh | 158 --------- lambda-deployment/spark-iceberg-reader.py | 309 ------------------ lambda-deployment/spark-iceberg-reader.zip | Bin 2638 -> 0 bytes 4 files changed, 1 insertion(+), 469 deletions(-) delete mode 100755 lambda-deployment/deploy-production-lambda.sh delete mode 100644 lambda-deployment/spark-iceberg-reader.py delete mode 100644 lambda-deployment/spark-iceberg-reader.zip diff --git a/ICEBERG_INTEGRATION_SUMMARY.md b/ICEBERG_INTEGRATION_SUMMARY.md index 12f4b97..de321bb 100644 --- a/ICEBERG_INTEGRATION_SUMMARY.md +++ b/ICEBERG_INTEGRATION_SUMMARY.md @@ -6,8 +6,7 @@ - `libs/glue_functions/iceberg_glue_functions.py` - Core Iceberg functions for Glue Catalog integration ### Production Code -- `lambda-deployment/spark-iceberg-reader.py` - Production Lambda handler -- `lambda-deployment/deploy-production-lambda.sh` - Deployment script +- `sparkLambdaHandler.py` - Main Lambda handler with Iceberg support ### Key Examples (Kept) - `examples/advanced-iceberg-features.py` - Time travel and metadata queries diff --git a/lambda-deployment/deploy-production-lambda.sh b/lambda-deployment/deploy-production-lambda.sh deleted file mode 100755 index eab5ff0..0000000 --- a/lambda-deployment/deploy-production-lambda.sh +++ /dev/null @@ -1,158 +0,0 @@ -#!/bin/bash - -# Deploy Production Iceberg Lambda Function -set -e - -# Configuration -FUNCTION_NAME="spark-iceberg-production" -AWS_REGION="us-east-1" -AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text) -ECR_REPO_NAME="sparkonlambda-iceberg" -IMAGE_URI="${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/${ECR_REPO_NAME}:latest" - -# Colors -RED='\033[0;31m' -GREEN='\033[0;32m' -YELLOW='\033[1;33m' -BLUE='\033[0;34m' -NC='\033[0m' - -echo -e "${BLUE}๐Ÿš€ Deploying Production Iceberg Lambda Function${NC}" -echo "==================================================" - -# Get Lambda role ARN from CloudFormation -LAMBDA_ROLE_ARN=$(aws cloudformation describe-stacks \ - --stack-name spark-lambda-iceberg-test \ - --query 'Stacks[0].Outputs[?OutputKey==`LambdaRoleArn`].OutputValue' \ - --output text \ - --region $AWS_REGION) - -echo -e "${BLUE}๐Ÿ“‹ Configuration:${NC}" -echo " Function Name: $FUNCTION_NAME" -echo " AWS Region: $AWS_REGION" -echo " AWS Account: $AWS_ACCOUNT_ID" -echo " Image URI: $IMAGE_URI" -echo " Lambda Role: $LAMBDA_ROLE_ARN" -echo "" - -# Step 1: Update the Lambda handler in the Docker image -echo -e "${YELLOW}๐Ÿ“ฆ Step 1: Updating Lambda handler in container...${NC}" - -# Copy the new handler to the project -cp lambda-deployment/spark-iceberg-reader.py sparkLambdaHandler.py - -echo -e "${GREEN}โœ… Updated Lambda handler${NC}" - -# Step 2: Rebuild and push Docker image -echo -e "${YELLOW}๐Ÿณ Step 2: Rebuilding Docker image...${NC}" - -# Login to ECR -aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com - -# Build new image -docker build 
--build-arg FRAMEWORK=ICEBERG -t $ECR_REPO_NAME . - -# Tag and push -docker tag $ECR_REPO_NAME:latest $IMAGE_URI -docker push $IMAGE_URI - -echo -e "${GREEN}โœ… Docker image rebuilt and pushed${NC}" - -# Step 3: Create or update Lambda function -echo -e "${YELLOW}โšก Step 3: Creating/updating Lambda function...${NC}" - -# Check if function exists -if aws lambda get-function --function-name $FUNCTION_NAME --region $AWS_REGION > /dev/null 2>&1; then - echo -e "${YELLOW}โš ๏ธ Function exists. Updating...${NC}" - - # Update function code - aws lambda update-function-code \ - --function-name $FUNCTION_NAME \ - --image-uri $IMAGE_URI \ - --region $AWS_REGION - - # Update function configuration - aws lambda update-function-configuration \ - --function-name $FUNCTION_NAME \ - --role $LAMBDA_ROLE_ARN \ - --timeout 900 \ - --memory-size 3008 \ - --environment Variables='{"DATABASE_NAME":"iceberg_test_db","TABLE_NAME":"sample_customers","AWS_REGION":"'$AWS_REGION'"}' \ - --region $AWS_REGION - -else - echo -e "${YELLOW}๐Ÿ†• Creating new Lambda function...${NC}" - - aws lambda create-function \ - --function-name $FUNCTION_NAME \ - --role $LAMBDA_ROLE_ARN \ - --code ImageUri=$IMAGE_URI \ - --package-type Image \ - --timeout 900 \ - --memory-size 3008 \ - --environment Variables='{"DATABASE_NAME":"iceberg_test_db","TABLE_NAME":"sample_customers","AWS_REGION":"'$AWS_REGION'"}' \ - --region $AWS_REGION -fi - -echo -e "${GREEN}โœ… Lambda function deployed: $FUNCTION_NAME${NC}" - -# Step 4: Test the function -echo -e "${YELLOW}๐Ÿงช Step 4: Testing the Lambda function...${NC}" - -# Create test event -cat > test-event.json << EOF -{ - "operation": "read_table", - "database": "iceberg_test_db", - "table": "sample_customers", - "limit": 5, - "include_analytics": true -} -EOF - -# Invoke function -echo -e "${BLUE}๐Ÿ“ค Invoking Lambda function...${NC}" -aws lambda invoke \ - --function-name $FUNCTION_NAME \ - --payload file://test-event.json \ - --region $AWS_REGION \ - response.json - -# Check response -if [ $? -eq 0 ]; then - echo -e "${GREEN}โœ… Lambda invocation successful${NC}" - echo -e "${BLUE}๐Ÿ“„ Response:${NC}" - cat response.json | jq '.' 
2>/dev/null || cat response.json - echo "" -else - echo -e "${RED}โŒ Lambda invocation failed${NC}" -fi - -# Step 5: Show CloudWatch logs -echo -e "${YELLOW}๐Ÿ“‹ Step 5: Recent CloudWatch logs:${NC}" -aws logs tail /aws/lambda/$FUNCTION_NAME --since 2m --region $AWS_REGION - -echo "" -echo -e "${BLUE}๐ŸŽ‰ Production Lambda Deployment Complete!${NC}" -echo "==================================================" -echo "" -echo -e "${YELLOW}๐Ÿ“Š Function Details:${NC}" -echo " โ€ข Function Name: $FUNCTION_NAME" -echo " โ€ข Memory: 3008 MB" -echo " โ€ข Timeout: 15 minutes" -echo " โ€ข Runtime: Container (Spark + Iceberg)" -echo "" -echo -e "${YELLOW}๐Ÿ”— Useful Commands:${NC}" -echo " โ€ข Test function:" -echo " aws lambda invoke --function-name $FUNCTION_NAME --payload file://test-event.json response.json" -echo "" -echo " โ€ข View logs:" -echo " aws logs tail /aws/lambda/$FUNCTION_NAME --follow --region $AWS_REGION" -echo "" -echo " โ€ข Update function:" -echo " ./lambda-deployment/deploy-production-lambda.sh" -echo "" -echo -e "${GREEN}โœ… Your Iceberg Lambda function is ready for production use!${NC}" - -# Cleanup -rm -f test-event.json \ No newline at end of file diff --git a/lambda-deployment/spark-iceberg-reader.py b/lambda-deployment/spark-iceberg-reader.py deleted file mode 100644 index 2dc57ca..0000000 --- a/lambda-deployment/spark-iceberg-reader.py +++ /dev/null @@ -1,309 +0,0 @@ -#!/usr/bin/env python3 -""" -Production Lambda function for reading Iceberg tables from Glue Catalog -This is the actual handler that will run in the Lambda container -""" - -import json -import logging -import os -import sys -from datetime import datetime - -from pyspark.sql import SparkSession -from pyspark.sql.functions import * - -# Add glue functions to path -sys.path.append('/home/glue_functions') - -# Set up logging -logger = logging.getLogger() -logger.setLevel(logging.INFO) - -def create_iceberg_spark_session(): - """Create Spark session optimized for Lambda with Iceberg support""" - - aws_access_key_id = os.environ['AWS_ACCESS_KEY_ID'] - aws_secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY'] - session_token = os.environ['AWS_SESSION_TOKEN'] - aws_region = os.environ.get('AWS_REGION', 'us-east-1') - - logger.info("Creating Spark session with Iceberg configuration...") - - spark = SparkSession.builder \ - .appName("Lambda-Iceberg-Reader") \ - .master("local[*]") \ - .config("spark.driver.bindAddress", "127.0.0.1") \ - .config("spark.driver.memory", "5g") \ - .config("spark.executor.memory", "5g") \ - .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \ - .config("spark.sql.adaptive.enabled", "true") \ - .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \ - .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ - .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \ - .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ - .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \ - .config("spark.sql.catalog.glue_catalog.glue.region", aws_region) \ - .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \ - .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \ - .config("spark.hadoop.fs.s3a.session.token", session_token) \ - .config("spark.hadoop.fs.s3a.aws.credentials.provider", - "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider") \ - .getOrCreate() - - logger.info("Spark 
session created successfully") - return spark - -def read_iceberg_table(spark, database_name, table_name, limit=None, filters=None): - """Read Iceberg table with optional filters and limit""" - - table_identifier = f"glue_catalog.{database_name}.{table_name}" - logger.info(f"Reading Iceberg table: {table_identifier}") - - try: - # Read the table - df = spark.read.format("iceberg").load(table_identifier) - - # Apply filters if provided - if filters: - for filter_condition in filters: - df = df.filter(filter_condition) - logger.info(f"Applied filter: {filter_condition}") - - # Apply limit if provided - if limit: - df = df.limit(limit) - - return df - - except Exception as e: - logger.error(f"Error reading table {table_identifier}: {e}") - raise - -def get_table_analytics(df): - """Get basic analytics from the DataFrame""" - - logger.info("Computing table analytics...") - - try: - # Basic stats - total_count = df.count() - - # Get numeric columns for analytics - numeric_columns = [] - for field in df.schema.fields: - if field.dataType.typeName() in ['integer', 'long', 'double', 'decimal', 'float']: - numeric_columns.append(field.name) - - analytics = { - 'total_rows': total_count, - 'columns': len(df.columns), - 'column_names': df.columns - } - - # Add numeric analytics if available - if numeric_columns: - for col_name in numeric_columns: - try: - stats = df.agg( - avg(col_name).alias('avg'), - min(col_name).alias('min'), - max(col_name).alias('max') - ).collect()[0] - - analytics[f'{col_name}_stats'] = { - 'avg': float(stats['avg']) if stats['avg'] else None, - 'min': float(stats['min']) if stats['min'] else None, - 'max': float(stats['max']) if stats['max'] else None - } - except Exception as e: - logger.warning(f"Could not compute stats for {col_name}: {e}") - - return analytics - - except Exception as e: - logger.error(f"Error computing analytics: {e}") - return {'total_rows': 0, 'error': str(e)} - -def convert_rows_to_json(rows): - """Convert Spark rows to JSON-serializable format""" - - results = [] - for row in rows: - row_dict = {} - for field in row.__fields__: - value = getattr(row, field) - - # Handle different data types - if value is None: - row_dict[field] = None - elif hasattr(value, 'isoformat'): # datetime - row_dict[field] = value.isoformat() - elif str(type(value)) == "": - row_dict[field] = float(value) - elif str(type(value)) == "": - row_dict[field] = value.isoformat() - else: - row_dict[field] = value - - results.append(row_dict) - - return results - -def lambda_handler(event, context): - """ - Main Lambda handler for Iceberg table operations - - Event format: - { - "operation": "read_table", - "database": "iceberg_test_db", - "table": "sample_customers", - "limit": 10, - "filters": ["total_spent > 300"], - "include_analytics": true - } - """ - - logger.info("๐Ÿš€ Starting Iceberg Lambda handler") - logger.info(f"Event: {json.dumps(event)}") - - # Parse event parameters - operation = event.get('operation', 'read_table') - database_name = event.get('database', os.environ.get('DATABASE_NAME', 'iceberg_test_db')) - table_name = event.get('table', os.environ.get('TABLE_NAME', 'sample_customers')) - limit = event.get('limit', 10) - filters = event.get('filters', []) - include_analytics = event.get('include_analytics', True) - - logger.info(f"Operation: {operation}") - logger.info(f"Target table: {database_name}.{table_name}") - - spark = None - - try: - # Create Spark session - spark = create_iceberg_spark_session() - - if operation == 'read_table': - # Read table data - df 
= read_iceberg_table(spark, database_name, table_name, limit, filters) - - # Get sample data - sample_rows = df.collect() - sample_data = convert_rows_to_json(sample_rows) - - # Prepare response - response_body = { - 'message': 'Successfully read Iceberg table', - 'database': database_name, - 'table': table_name, - 'operation': operation, - 'filters_applied': filters, - 'sample_data': sample_data, - 'sample_count': len(sample_data), - 'timestamp': datetime.now().isoformat() - } - - # Add analytics if requested - if include_analytics: - # Read full table for analytics (without limit) - full_df = read_iceberg_table(spark, database_name, table_name, filters=filters) - analytics = get_table_analytics(full_df) - response_body['analytics'] = analytics - - logger.info(f"โœ… Successfully processed {len(sample_data)} rows") - - return { - 'statusCode': 200, - 'headers': { - 'Content-Type': 'application/json' - }, - 'body': json.dumps(response_body) - } - - elif operation == 'table_info': - # Get table information only - df = read_iceberg_table(spark, database_name, table_name, limit=1) - - schema_info = [] - for field in df.schema.fields: - schema_info.append({ - 'name': field.name, - 'type': str(field.dataType), - 'nullable': field.nullable - }) - - # Get full count - full_df = read_iceberg_table(spark, database_name, table_name) - total_count = full_df.count() - - response_body = { - 'message': 'Table information retrieved', - 'database': database_name, - 'table': table_name, - 'total_rows': total_count, - 'schema': schema_info, - 'timestamp': datetime.now().isoformat() - } - - return { - 'statusCode': 200, - 'headers': { - 'Content-Type': 'application/json' - }, - 'body': json.dumps(response_body) - } - - else: - return { - 'statusCode': 400, - 'headers': { - 'Content-Type': 'application/json' - }, - 'body': json.dumps({ - 'error': f'Unknown operation: {operation}', - 'supported_operations': ['read_table', 'table_info'] - }) - } - - except Exception as e: - logger.error(f"โŒ Lambda execution failed: {str(e)}") - - return { - 'statusCode': 500, - 'headers': { - 'Content-Type': 'application/json' - }, - 'body': json.dumps({ - 'error': str(e), - 'message': 'Iceberg table operation failed', - 'timestamp': datetime.now().isoformat() - }) - } - - finally: - if spark: - logger.info("๐Ÿ”ง Stopping Spark session") - spark.stop() - -# For testing locally -if __name__ == "__main__": - # Test event - test_event = { - "operation": "read_table", - "database": "iceberg_test_db", - "table": "sample_customers", - "limit": 5, - "include_analytics": True - } - - # Mock context - class MockContext: - def __init__(self): - self.function_name = "test-iceberg-reader" - self.memory_limit_in_mb = 3008 - self.invoked_function_arn = "arn:aws:lambda:us-east-1:123456789012:function:test" - - result = lambda_handler(test_event, MockContext()) - print(json.dumps(result, indent=2)) \ No newline at end of file diff --git a/lambda-deployment/spark-iceberg-reader.zip b/lambda-deployment/spark-iceberg-reader.zip deleted file mode 100644 index 7db13a47af08193a94932c7af775b09c947d5b1f..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 2638 zcmb8xXE@u79tZG$NbI5`_Keo5y?2WS5wT~@SV0gqN{$-QgDTY;RW(9TQKM)%)ZUF% zhbS%8T2)Fb_Na1u?tOQk=f1el@A*Ez@2l_Yk0p{Gd5Z24_6Z5Z;e&jF!{x(5(4kmae=i?TpHPf!s1MrPCsZy3Z)eL4(AS05 z`qlqVL=-Ck0v~_?;NR0n4{8HOS09g{!M3IKsw$qO^#k zc=18?1afqJtWelsYdzO zkIgdgxb-Zl4^y1#=PjPL*<)W|)M7x$4Z@XR(9VyQ?~FK{yywLWTW?erR9N~7d=Cgr 