In this project, I developed a real-time analytics pipeline to demonstrate how user conversion funnels can be monitored for a multi-category e-commerce platform. By leveraging a large-scale Kaggle dataset (eCommerce Behavior Data) containing 285 million user events from November 2019 to April 2020, I engineered a system to track user journeys from product view → add to cart → purchase with millisecond latency. The goal was to demonstrate how retailers can be empowered with actionable insights to detect drop-offs, identify trending products, and understand customer behaviour.
E-commerce businesses need immediate insight into user clickstream behavior to understand conversion funnels, identify friction points (drop-offs), recognize popular items, and personalize the user experience in real-time. Currently, delays in processing this data prevent timely actions like triggering relevant offers or addressing site issues, potentially leading to user abandonment. This project aims to build a system to ingest, process, and visualize this data live.
Pipeline Type: Hybrid
This project utilizes a hybrid data pipeline architecture, combining elements of both batch and streaming processing to handle historical data simulation and real-time analytics effectively.
- Data Ingestion:
  - Source: Historical e-commerce event data (Oct 2019 - Apr 2020) downloaded as compressed CSV files (.csv.gz) from Open CDP using Python.
  - Initial Processing: Due to the large dataset size (~8GB uncompressed), initial processing and cleaning are performed using PySpark on the downloaded files.
  - Streaming Simulation: A Python script leveraging a Kafka client acts as a Kafka Producer. It reads the processed data and streams it to a Confluent Cloud Kafka Cluster topic. The schema for this raw event stream is defined using JSON Schema.
  - Ingestion Frequency & Orchestration: To simulate a continuous stream without incurring excessive costs and to manage data volume for the project scope, Apache Airflow is used to orchestrate the Python Kafka producer script. Airflow triggers the script at 3-hour intervals. In each run, the script streams 10,000 rows for each day within a specific month (e.g., all of Oct 2019's daily 10k rows streamed in one go), iterating through the months (Oct 2019 to Apr 2020) in subsequent runs. This provides a controlled, time-distributed flow of data into Kafka.
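The per-day sampling and streaming described under Data Ingestion can be sketched roughly as follows. This is a minimal illustration, not the project's actual producer script: the `event_time` column name, the `PrintingProducer` stand-in, and the sample rows are assumptions made for the example. The `producer` argument only needs `produce(topic, value=...)` and `flush()` methods, which is the shape `confluent_kafka.Producer` exposes.

```python
import csv
import io
import json
from collections import defaultdict

ROWS_PER_DAY = 10_000  # per-day cap described in the ingestion notes


def sample_rows_per_day(rows, rows_per_day=ROWS_PER_DAY):
    """Yield at most `rows_per_day` rows for each calendar day.

    `rows` is an iterable of dicts whose `event_time` value starts with the
    date as YYYY-MM-DD (the column name is an assumption).
    """
    seen = defaultdict(int)
    for row in rows:
        day = row["event_time"][:10]
        if seen[day] < rows_per_day:
            seen[day] += 1
            yield row


def stream_month(rows, producer, topic, rows_per_day=ROWS_PER_DAY):
    """Send one month's sampled rows to `topic` as JSON; return the row count."""
    sent = 0
    for row in sample_rows_per_day(rows, rows_per_day):
        producer.produce(topic, value=json.dumps(row).encode("utf-8"))
        sent += 1
    producer.flush()
    return sent


class PrintingProducer:
    """Hypothetical dry-run producer: collects messages instead of sending them."""

    def __init__(self):
        self.messages = []

    def produce(self, topic, value=None):
        self.messages.append((topic, value))

    def flush(self):
        pass


# Made-up sample rows, only to exercise the functions above.
SAMPLE_CSV = """event_time,event_type,product_id,price
2019-10-01 00:00:00 UTC,view,1,10.0
2019-10-01 00:00:05 UTC,cart,1,10.0
2019-10-01 00:00:09 UTC,purchase,1,10.0
2019-10-02 00:00:00 UTC,view,2,20.0
"""

reader = csv.DictReader(io.StringIO(SAMPLE_CSV))
dry_run = PrintingProducer()
print(stream_month(reader, dry_run, "ecommerce_raw_events", rows_per_day=2))
```

Passing the producer in as an argument keeps the sampling logic testable without a Kafka cluster; a real run would hand in a configured Kafka client instead of the stand-in.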
- Data Lake/Warehouse:
  - Technology: Snowflake serves as the central Data Warehouse.
  - Setup: Data lands in Snowflake via two separate Confluent Cloud Snowflake Sink Connectors:
    - Raw Data Sink: Consumes the original event stream (JSON format) directly from the initial Kafka topic. This data populates tables intended for batch processing and historical analysis.
    - Real-time Aggregates Sink: Consumes processed, aggregated data (AVRO format) from a separate Kafka topic generated by Flink. This populates tables optimized for the real-time dashboard.
  - Organization: Data within Snowflake is explicitly organised into three logical layers: a raw layer (RAW schema) that stores the unmodified event data ingested directly from Kafka; a processed layer (PROCESSED schema) that holds near real-time analytics and aggregates derived from streaming pipelines; and a REPORTING schema used for historical batch processing and advanced analytics. This layered architecture ensures data traceability, efficient querying, and clear separation of concerns across different processing stages. To enhance performance and reduce query latency, tables were partitioned based on relevant attributes such as event timestamps and entity identifiers.
- Data Transformation:
  - Initial Batch Transformation (Pre-Kafka): PySpark is used initially to handle the large raw CSV files, likely performing cleaning, basic structuring, and potentially filtering before the data is ready for the Kafka producer.
  - Streaming Transformation: Flink SQL on Confluent Cloud is used for real-time stream processing. It consumes data from the raw Kafka topic, performs calculations like rolling aggregations, windowing functions, or filtering based on event types (view, cart, purchase), and outputs the results to a separate Kafka topic using AVRO schema for efficiency and schema evolution capabilities.
  - Batch Transformation (Post-Kafka, Airflow Orchestrated): Once a streaming segment (e.g., one month's simulated data) is complete via the Python producer, Apache Airflow triggers batch transformation jobs directly within Snowflake using its SQL engine. These jobs run against the raw event data that has landed in Snowflake (via the first sink connector). They perform tasks like table refreshes or full recreations, calculating aggregates (e.g., daily/monthly summaries) needed for historical analysis and the batch dashboard in Metabase.
  - Near Real-time Transformation (Post-Flink): After the Flink-processed data lands in Snowflake (via the second sink connector), minor additional transformations using Snowflake SQL might occur to prepare the data specifically for the real-time dashboard, ensuring it's in the precise format needed for visualization and adheres to the 5-minute refresh requirement.
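To make the windowed aggregation in the streaming step more concrete, here is a plain-Python illustration of a tumbling-window count per funnel event type. It is not the Flink SQL used in the project; it only mimics the idea. The 5-minute window size and the sample events are assumptions chosen for the example.

```python
from collections import Counter, defaultdict
from datetime import datetime, timezone

WINDOW_SECONDS = 300  # 5-minute tumbling window (assumed size)
FUNNEL_EVENTS = ("view", "cart", "purchase")


def window_start(ts, size=WINDOW_SECONDS):
    """Return the start of the tumbling window that contains `ts`."""
    epoch = int(ts.timestamp())
    return datetime.fromtimestamp(epoch - epoch % size, tz=timezone.utc)


def funnel_counts(events, size=WINDOW_SECONDS):
    """Count view/cart/purchase events per tumbling window.

    `events` is an iterable of (timestamp, event_type) pairs; other event
    types are ignored.
    """
    windows = defaultdict(Counter)
    for ts, event_type in events:
        if event_type in FUNNEL_EVENTS:
            windows[window_start(ts, size)][event_type] += 1
    return dict(windows)


# Made-up events spanning two windows.
events = [
    (datetime(2019, 10, 1, 0, 0, 10, tzinfo=timezone.utc), "view"),
    (datetime(2019, 10, 1, 0, 1, 0, tzinfo=timezone.utc), "view"),
    (datetime(2019, 10, 1, 0, 2, 30, tzinfo=timezone.utc), "cart"),
    (datetime(2019, 10, 1, 0, 6, 0, tzinfo=timezone.utc), "purchase"),
]

for start, counts in sorted(funnel_counts(events).items()):
    print(start.isoformat(), dict(counts))
```

Each window's counts correspond to one aggregated record of the kind that would be written to the processed topic; the actual record layout is defined by the AVRO schema and is not reproduced here.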
- Workflow Orchestration:
  - Apache Airflow plays a crucial role in orchestrating the batch-oriented components of this hybrid pipeline. It manages a workflow (DAG) that includes at least two key tasks run at 3-hour intervals:
    - Triggering the Python Kafka producer script to simulate the streaming of a segment of historical data into Confluent Cloud Kafka.
    - Upon successful completion of the data streaming task, triggering the Snowflake SQL batch transformation jobs (e.g., stored procedures or SQL scripts) to process the newly arrived raw data in Snowflake, refreshing or recreating the tables used for batch analysis and the Metabase dashboard.
  - Airflow ensures that batch processing in Snowflake only happens after the corresponding data segment has been successfully streamed, maintaining data consistency for the batch layer. (Note: Airflow does not directly orchestrate the continuous Flink stream processing or the Snowflake Sink connectors, which operate based on data availability in Kafka).
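The ordering guarantee described above (batch transformations run only after a successful streaming task) can be illustrated in plain Python. This is a sketch of the dependency logic only, not the project's DAG: `run_segment` and both callables are hypothetical placeholders. In an Airflow DAG the same dependency is normally declared with `stream_task >> batch_task`, relying on the default `all_success` trigger rule.

```python
def run_segment(stream_segment, run_batch_transformations, segment):
    """Run the batch step only if the streaming step for `segment` succeeds.

    Both callables are hypothetical stand-ins for the two DAG tasks.
    """
    try:
        rows_sent = stream_segment(segment)
    except Exception as exc:
        # Upstream failure: skip the batch step so reporting tables are not
        # rebuilt from a partially streamed segment.
        return {"segment": segment, "streamed": False,
                "batch_ran": False, "error": str(exc)}
    run_batch_transformations(segment)
    return {"segment": segment, "streamed": True,
            "rows_sent": rows_sent, "batch_ran": True}


calls = []  # records which segments reached the batch step


def failing_stream(segment):
    raise RuntimeError("broker unreachable")


ok = run_segment(lambda s: 310_000, calls.append, "2019-10")
failed = run_segment(failing_stream, calls.append, "2019-11")
print(ok["batch_ran"], failed["batch_ran"], calls)
```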
- Cloud Provider: Confluent Cloud. Note: Confluent Cloud is a managed cloud service that runs on underlying providers. I opted to use AWS for this.
- Infrastructure as Code (IaC): None
- Workflow Orchestration: Airflow (Apache Airflow used to schedule the Kafka producer script and the Snowflake batch transformation jobs).
- Data Warehouse: Snowflake (Used as the central repository for both raw event data and processed/aggregated data).
- Batch Processing: PySpark (Used for initial processing of large raw CSV files before streaming), Snowflake SQL (Used for batch transformations triggered by Airflow within the data warehouse).
- Stream Processing: Kafka (Specifically Confluent Cloud Kafka for messaging/event streaming), Flink (Specifically Flink SQL on Confluent Cloud for real-time analytics on the Kafka stream).
- Dashboarding Tool: Metabase (For visualizing the results of the batch and near real-time processing).
- Other Tools:
  - Python: Used for initial data download script and the Kafka producer script simulating the event stream.
  - Confluent Cloud: Managed platform service used to host and operate Kafka, Flink, and Schema Registry (implied for AVRO/JSON), simplifying infrastructure management for streaming components.
  - Confluent Snowflake Sink Connector: A specific Kafka Connect plugin used to efficiently and reliably transfer data from Kafka topics (both raw JSON and processed AVRO) into Snowflake tables.
  - JSON Schema: Used to define the structure of the raw event data being produced to the initial Kafka topic and sunk to Snowflake.
  - AVRO Schema: Used to define the structure of the data processed by Flink, providing schema evolution capabilities and efficient serialization for the processed Kafka topic and subsequent sink to Snowflake.
Explanation of less common tools (if needed for course context):
- Confluent Cloud: A fully managed, cloud-native service for Apache Kafka and its ecosystem components (like Flink, Kafka Connect, Schema Registry). It allows developers to build streaming applications without managing the underlying infrastructure.
- AVRO (Apache Avro): A data serialization system that uses schemas (typically defined in JSON) to structure data. It's efficient for storage and transmission, supports schema evolution (handling changes in data structure over time), and integrates well with systems like Kafka and Hadoop. It is one of the schema formats supported by Confluent Cloud's Schema Registry for defining 'Data Contracts'.
- Confluent Snowflake Sink Connector: Part of the Kafka Connect framework, this specific connector is optimized to stream data from Kafka topics directly into Snowflake tables, handling schema mapping and data loading automatically.
The dashboard consists of 5 tiles. Please find the link to the Metabase Dashboard.
This displays a horizontal bar chart showing the total revenue generated by each product category, which can be filtered to a specific period (default: overall). The chart highlights the 15 categories that brought in the most revenue, usually sorted from highest to lowest. It quickly identifies the most profitable product categories and helps businesses understand which areas drive the most significant portion of income, informing decisions about inventory investment, marketing focus, and potential growth areas or underperforming categories relative to the top performers.
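The ranking behind this tile amounts to summing purchase revenue per category and keeping the top 15. A minimal Python sketch of that calculation follows; in the project the query runs in Snowflake/Metabase, and the `event_type`, `category`, and `price` field names as well as the sample events are assumptions for the example.

```python
from collections import defaultdict


def top_categories_by_revenue(events, n=15):
    """Return the `n` categories with the highest purchase revenue, descending.

    `events` is an iterable of dicts with `event_type`, `category` and
    `price` keys (assumed field names). Only purchases count as revenue.
    """
    revenue = defaultdict(float)
    for event in events:
        if event["event_type"] == "purchase":
            revenue[event["category"]] += float(event["price"])
    return sorted(revenue.items(), key=lambda item: item[1], reverse=True)[:n]


# Made-up events, only to exercise the function.
events = [
    {"event_type": "purchase", "category": "electronics", "price": 300.0},
    {"event_type": "purchase", "category": "appliances", "price": 120.0},
    {"event_type": "view", "category": "electronics", "price": 300.0},
    {"event_type": "purchase", "category": "electronics", "price": 50.0},
]
print(top_categories_by_revenue(events, n=2))
```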
- Tile 2: Top Selling Brand: This uses a bar chart to display the performance of different brands sold through the e-commerce platform. It offers valuable insights to the business by highlighting the brands that are most popular with the customer base. It can also be used to optimise inventory so that stock levels for these brands do not drop and sales are maximised.
This uses a line chart where the horizontal axis represents time (days) and the vertical axis represents total revenue amount ($). Each point shows how much revenue was achieved on a specific day.
These instructions outline the steps necessary to set up the environment, run the code, and reproduce the results of the E-commerce Stream Analytics project.
1. Prerequisites:
- Software:
  - Git
  - Python (3.8+ recommended) & pip package manager
  - Docker & Docker Compose
  - Java Development Kit (JDK) (Required for PySpark, check compatibility with your PySpark version)
  - Apache Spark (including PySpark, matching your cluster or local setup if applicable)
  - (Optional but Recommended) Snowflake SnowSQL CLI
  - (Optional but Recommended) Kafka CLI tools (often bundled with Kafka or available from Confluent)
- Cloud Accounts & Services:
  - Confluent Cloud Account: With access to create/manage Kafka Clusters, Topics, Flink SQL Workspaces, Schema Registry, and API Keys/Secrets. Ensure your plan supports the required throughput and Flink usage.
  - Snowflake Account: With permissions to create databases, schemas, tables, warehouses, users, roles, and manage access.
  - (Optional) AWS Account: If hosting Airflow/Metabase on EC2, or running PySpark on EMR, etc. (The document notes Confluent Cloud was run on AWS).
- Data Access:
  - Ability to download the eCommerce Behavior Data dataset (originally linked from Open CDP). Note: This is a large dataset requiring significant download time and storage space.
- Code:
  - Access to this project's code repository.
2. Setup:
- Clone Repository:
  git clone <your_repository_url>
  cd <repository_directory>
- Install Python Dependencies: (Ensure you have a requirements.txt file listing libraries like confluent-kafka, pandas, etc.)
  pip install -r requirements.txt
- Download & Prepare Data:
  - Download the compressed CSV files (.csv.gz) for Oct 2019 - Apr 2020 from the source.
  - Place the downloaded files into a designated directory, e.g., data/raw/.
- PySpark Environment:
  - Ensure Apache Spark (with PySpark) and a compatible Java JDK are correctly installed and configured in your environment (local machine or cluster). Verify environment variables like SPARK_HOME and JAVA_HOME are set.
  - Note: Processing the full dataset requires substantial memory and CPU resources.
- Confluent Cloud Configuration:
  - Log in to your Confluent Cloud account.
  - Create a Kafka Cluster if one doesn't exist. Note the Bootstrap Servers.
  - Create an API Key and Secret for Kafka access. Securely store these credentials.
  - Enable Schema Registry. Note the Schema Registry endpoint and API Key/Secret (if different).
  - Create two Kafka Topics:
    - ecommerce_raw_events (or your chosen name for raw data)
    - ecommerce_aggregated_events (or your chosen name for Flink output)
  - Set up a Flink SQL Workspace.
- Snowflake Configuration:
  - Log in to your Snowflake account (using the UI or SnowSQL).
  - Create a Warehouse (e.g., ANALYTICS_WH).
  - Create a Database (e.g., ECOMMERCE_DB).
  - Create Schemas: RAW, PROCESSED, REPORTING.
  - Define and create the target Tables within the RAW and PROCESSED schemas that the Snowflake Sink connectors will write to. Ensure data types match the expected JSON/AVRO structures.
  - Create a User (e.g., KAFKA_CONNECTOR_USER, AIRFLOW_USER) and a Role (e.g., PIPELINE_ROLE). Grant necessary privileges to the role (e.g., USAGE on DB/Schema, INSERT, SELECT, CREATE TABLE, etc. on relevant tables) and assign the role to the users. Securely store user credentials (password or private key). Note your Snowflake Account Identifier (e.g., xy12345.eu-central-1).
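The role and privilege step in the Snowflake configuration can be scripted by generating the GRANT statements from the example names used above. The helper below is a hypothetical sketch that only builds SQL strings; it does not connect to Snowflake, and the exact privilege set your connectors and jobs need may differ.

```python
def pipeline_grants(database, schemas, role, users, warehouse):
    """Build GRANT statements for a pipeline role (a sketch, not a full policy)."""
    statements = [
        f"GRANT USAGE ON WAREHOUSE {warehouse} TO ROLE {role};",
        f"GRANT USAGE ON DATABASE {database} TO ROLE {role};",
    ]
    for schema in schemas:
        target = f"{database}.{schema}"
        statements += [
            f"GRANT USAGE ON SCHEMA {target} TO ROLE {role};",
            f"GRANT CREATE TABLE ON SCHEMA {target} TO ROLE {role};",
            f"GRANT SELECT, INSERT ON ALL TABLES IN SCHEMA {target} TO ROLE {role};",
        ]
    # Finally, attach the role to each service user.
    statements += [f"GRANT ROLE {role} TO USER {user};" for user in users]
    return statements


for statement in pipeline_grants(
    database="ECOMMERCE_DB",
    schemas=["RAW", "PROCESSED", "REPORTING"],
    role="PIPELINE_ROLE",
    users=["KAFKA_CONNECTOR_USER", "AIRFLOW_USER"],
    warehouse="ANALYTICS_WH",
):
    print(statement)
```

The printed statements can be reviewed and then run in the Snowflake UI or SnowSQL. The Kafka connector user may also need CREATE STAGE and CREATE PIPE on its target schema; check the connector documentation for the current list.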
- Confluent Cloud Snowflake Sink Connectors:
  - Navigate to Connectors in your Confluent Cloud cluster UI.
  - Launch and configure the Snowflake Sink Connector twice:
    - Connector 1 (Raw Data):
      - Input Topic: ecommerce_raw_events
      - Value Format: JSON
      - Snowflake Connection: Provide URL (e.g., <account_identifier>.snowflakecomputing.com), User, Role, Private Key/Password.
      - Target: Database ECOMMERCE_DB, Schema RAW. Configure table mapping (e.g., topic name to table name).
    - Connector 2 (Aggregated Data):
      - Input Topic: ecommerce_aggregated_events
      - Value Format: AVRO
      - Schema Registry: Provide SR endpoint and credentials.
      - Snowflake Connection: Use the same credentials/details as above.
      - Target: Database ECOMMERCE_DB, Schema PROCESSED. Configure table mapping.
  - Ensure both connectors launch successfully and are in a RUNNING state.
- Apache Airflow Setup (using Docker):
  - Navigate to the directory containing your Airflow docker-compose.yaml file (e.g., airflow/).
  - Prepare environment variables: Create a .env file or modify docker-compose.yaml to securely inject necessary credentials and configurations:
    - Confluent Cloud Bootstrap Servers, API Key/Secret.
    - Snowflake Account Identifier, User, Password/Private Key, Role, Warehouse, Database, Schema (for transformations).
    - Paths to data directories if needed by the producer script.
  - Initialize Airflow (if first time setup, includes creating metadata DB, user): Follow standard Airflow Docker setup instructions (often involves docker-compose up airflow-init).
  - Start Airflow services:
    docker-compose up -d
  - Access the Airflow UI (usually http://localhost:8080).
  - Configure Airflow Connections:
    - Create a confluent_cloud_default connection (type Kafka) using your bootstrap servers and API key/secret (via SASL/PLAIN).
    - Create a snowflake_default connection (type Snowflake) using your account identifier, login (user), password/private key path, role, warehouse, database, schema.
  - Ensure your DAG file (e.g., dags/ecommerce_pipeline_dag.py) is in the correct dags folder mapped in your docker-compose.yaml.
- Metabase Setup:
  - Run Metabase locally via Docker or use a hosted instance.
  - Connect Metabase to your Snowflake instance:
    - Add a database connection.
    - Select Snowflake.
    - Provide connection details (Account Identifier, User, Password, Role, Warehouse, Database).
    - Ensure Metabase can connect and introspect the PROCESSED and REPORTING schemas.
  - Recreate the dashboard tiles based on the descriptions provided, querying the appropriate tables in Snowflake.
3. Execution:
- Initial Data Processing (PySpark):
  - Execute your PySpark script (e.g., scripts/initial_process.py) to process the raw .csv.gz files from data/raw/ and output the cleaned/structured data to a location accessible by the Kafka producer (e.g., data/processed/).
  - Example command (adjust based on your script and Spark setup):
    spark-submit --master local[*] scripts/initial_process.py --input data/raw/ --output data/processed/
  - Ensure this completes successfully before proceeding.
- Deploy and Run Flink SQL Job(s):
  - Access your Flink SQL Workspace in Confluent Cloud.
  - Paste, configure, and run the Flink SQL statement(s) that:
    - Read from the ecommerce_raw_events topic (as JSON).
    - Perform the required real-time aggregations/transformations.
    - Write the results to the ecommerce_aggregated_events topic (as AVRO, defining the schema).
  - Ensure the Flink job is running continuously.
- Start & Monitor Airflow Pipeline:
  - In the Airflow UI, locate your e-commerce pipeline DAG.
  - Ensure the DAG is unpaused (toggle switch is on).
  - The DAG should start running based on its schedule (3-hour interval). Monitor its progress:
    - Task 1 (Kafka Producer): Verify logs indicate successful streaming of data segments to the ecommerce_raw_events topic.
    - Task 2 (Snowflake Batch): After Task 1 succeeds, verify logs show successful execution of SQL transformations against the RAW data in Snowflake, updating tables in the REPORTING schema.
- Monitor End-to-End Flow:
  - Kafka: Check message counts on topics in Confluent Cloud.
  - Connectors: Ensure the Snowflake Sink connectors remain in a RUNNING state without errors.
  - Snowflake: Query tables in RAW, PROCESSED, and REPORTING schemas to verify data is arriving and being transformed as expected.
  - Flink: Monitor the Flink job for any errors or backpressure.
- View Dashboard:
  - Open your Metabase instance.
  - Navigate to the dashboard created in the setup phase.
  - Data should populate the visualizations. The tiles querying PROCESSED data reflect the near real-time Flink pipeline, while tiles querying REPORTING data reflect the Airflow-orchestrated batch updates. Refresh intervals in Metabase may affect perceived latency.
- Challenge: Extracting and processing large datasets using Pandas proved challenging due to the memory limitations of the processing environment. I resolved this by first testing the environment's memory capacity and then switching to PySpark for datasets that exceeded the available memory threshold, as PySpark efficiently handled the larger volumes. I learned the importance of proactively assessing environmental memory constraints against data size and recognized PySpark's superior capability for processing large datasets, especially when leveraging cluster resources for speed.
- Challenge: Configuring the Kafka producer to work seamlessly with the Snowflake Sink connector was difficult, particularly achieving compatibility across different data sources (Python producer vs. Flink) and schema formats. I resolved this by establishing clear data contracts per topic and standardizing on one schema format per topic. After finding JSON schemas worked for Python-produced data but not Flink-processed data with the Snowflake connector, I successfully switched the relevant topic and connector configuration to use the AVRO schema format, which proved compatible with both Flink and the Snowflake Sink connector. I learned the necessity of strict data contracts and consistent schema formats (like AVRO) per Kafka topic, and that connector compatibility can differ based on the upstream processing tool, requiring testing to find the right combination (in this case, AVRO for Flink-to-Snowflake).
- Challenge: Initially, understanding and correctly implementing Snowflake's access control model, including the differences and interactions between roles, accounts, and privileges, was confusing. I resolved this by investing time in studying Snowflake's documentation and security concepts until I understood the hierarchical structure and could configure permissions appropriately. I learned the specific details of Snowflake's role-based access control (RBAC) system and the critical importance of mastering its structure for effective security management.
- Challenge: Setting up the Apache Airflow environment correctly using Docker was challenging as it had been some time since I last worked extensively with this specific technology stack. I resolved this by reviewing current Docker and Airflow documentation, configuration best practices, and examples, which allowed me to successfully configure the necessary environment. I learned (or refreshed my knowledge on) the specifics of setting up Airflow within Docker and reinforced the need to consult up-to-date documentation when returning to technologies after a period of non-use.
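For the first challenge above (Pandas running out of memory), the "check memory against data size, then pick the engine" decision can be sketched as a small helper. This is an illustration only: `choose_engine`, the `expansion_factor` (on-disk size to in-memory size), and the `headroom` share are hypothetical values for the example, not figures measured in the project.

```python
GIB = 1024 ** 3
MIB = 1024 ** 2


def choose_engine(file_size_bytes, available_memory_bytes,
                  expansion_factor=5.0, headroom=0.5):
    """Pick "pandas" or "pyspark" from a rough in-memory size estimate.

    `expansion_factor` and `headroom` are illustrative guesses and should be
    tuned against the actual environment.
    """
    estimated_in_memory = file_size_bytes * expansion_factor
    budget = available_memory_bytes * headroom
    return "pandas" if estimated_in_memory <= budget else "pyspark"


# A 200 MiB file on a machine with 16 GiB free fits the Pandas budget,
# while a 2 GiB file on the same machine does not.
print(choose_engine(200 * MIB, 16 * GIB))
print(choose_engine(2 * GIB, 16 * GIB))
```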
streaming-ecom-analytics
├───airflow_docker
│   ├───dags
│   │   ├───ecomm_pipeline
│   │   └───utils
│   ├───data
│   └───logs
└───images


