Skip to content

Commit

Permalink
Add automated price scraper
Browse files Browse the repository at this point in the history
  • Loading branch information
ericciarla committed Dec 12, 2024
1 parent c855d3b commit 4c5c949
Show file tree
Hide file tree
Showing 15 changed files with 448 additions and 7 deletions.
7 changes: 0 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,6 @@

This repository contains example applications developed using Firecrawl. These examples demonstrate various implementations and use cases for Firecrawl.

## Contents

The repository currently includes the following example projects:

1. **local-website-chatbot**: An example of implementing a local chatbot for any website.
2. **roastmywebsite-example-app**: A sample application demonstrating website critique functionality.

## Getting Started

To explore these examples:
Expand Down
2 changes: 2 additions & 0 deletions automated_price_tracking/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
FIRECRAWL_API_KEY=
POSTGRES_URL=
DISCORD_WEBHOOK_URL=
33 changes: 33 additions & 0 deletions automated_price_tracking/.github/workflows/check_prices.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Scheduled price checker: runs check_prices.py every 6 hours and on demand.
name: Price Check

on:
  schedule:
    # Runs every 6 hours
    - cron: "0 0,6,12,18 * * *"
  workflow_dispatch: # Allows manual triggering

jobs:
  check-prices:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.10"
          cache: "pip"  # caches pip downloads keyed on requirements.txt

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
      - name: Run price checker
        # NOTE(review): this file sits under automated_price_tracking/.github/,
        # but GitHub only triggers workflows from the repository root .github/.
        # If it is moved there, these steps presumably need
        # working-directory: automated_price_tracking — confirm repo layout.
        env:
          FIRECRAWL_API_KEY: ${{ secrets.FIRECRAWL_API_KEY }}
          POSTGRES_URL: ${{ secrets.POSTGRES_URL }}
          DISCORD_WEBHOOK_URL: ${{ secrets.DISCORD_WEBHOOK_URL }}
        run: python check_prices.py
2 changes: 2 additions & 0 deletions automated_price_tracking/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.venv
.env
31 changes: 31 additions & 0 deletions automated_price_tracking/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Automated Price Tracking System

A robust price tracking system that monitors product prices across e-commerce websites and notifies users of price changes through Discord.

## Features

- Automated price checking every 6 hours
- Support for multiple e-commerce platforms through Firecrawl API
- Discord notifications for price changes
- Historical price data storage in PostgreSQL database
- Interactive price history visualization with Streamlit

## Setup

1. Clone the repository
2. Install dependencies:

```bash
pip install -r requirements.txt
```

3. Configure environment variables:

```bash
cp .env.example .env
```

Then edit `.env` with your:
- Discord webhook URL
- Database credentials
- Firecrawl API key
Binary file not shown.
Binary file not shown.
Binary file not shown.
49 changes: 49 additions & 0 deletions automated_price_tracking/check_prices.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import os
import asyncio
from database import Database
from dotenv import load_dotenv
from firecrawl import FirecrawlApp
from scraper import scrape_product
from notifications import send_price_alert

load_dotenv()

# Database handle built from the POSTGRES_URL environment variable.
db = Database(os.getenv("POSTGRES_URL"))
# NOTE(review): `app` is never referenced in this module — scraper.py builds
# its own FirecrawlApp. Candidate for removal; confirm nothing imports it.
app = FirecrawlApp()

# Threshold percentage for price drop alerts (e.g., 5% = 0.05)
PRICE_DROP_THRESHOLD = 0.05


async def check_prices():
    """Re-scrape every tracked product, record the new price, and send a
    Discord alert when the price has dropped at least PRICE_DROP_THRESHOLD
    relative to the earliest recorded price."""
    tracked_urls = {item.url for item in db.get_all_products()}

    for url in tracked_urls:
        history = db.get_price_history(url)
        if not history:
            # Nothing recorded yet — no baseline to compare against.
            continue

        # History is ordered newest-first, so the last row is the baseline.
        baseline_price = history[-1].price

        # Fetch fresh product data and persist the new observation.
        latest = scrape_product(url)
        latest_price = latest["price"]
        db.add_price(latest)
        print(f"Added new price entry for {latest['name']}")

        if baseline_price > 0:  # guard against division by zero
            drop_fraction = (baseline_price - latest_price) / baseline_price
            if drop_fraction >= PRICE_DROP_THRESHOLD:
                await send_price_alert(
                    latest["name"], baseline_price, latest_price, url
                )


if __name__ == "__main__":
    # Entry point for the scheduled GitHub Actions run (see check_prices.yml).
    asyncio.run(check_prices())
134 changes: 134 additions & 0 deletions automated_price_tracking/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
from sqlalchemy import create_engine, Column, String, Float, DateTime, ForeignKey
from sqlalchemy.orm import sessionmaker, relationship, declarative_base
from datetime import datetime

Base = declarative_base()


class Product(Base):
    """A tracked product, identified by its page URL."""

    __tablename__ = "products"

    # The product page URL doubles as the primary key.
    url = Column(String, primary_key=True)
    # One-to-many: deleting a product also removes its price history rows.
    prices = relationship(
        "PriceHistory", back_populates="product", cascade="all, delete-orphan"
    )


class PriceHistory(Base):
    """One price observation for a product at a point in time."""

    __tablename__ = "price_histories"

    # Synthetic key: "<product_url>_<YYYYmmddHHMMSS>" (built in Database.add_price).
    id = Column(String, primary_key=True)
    product_url = Column(String, ForeignKey("products.url"))
    name = Column(String, nullable=False)
    price = Column(Float, nullable=False)
    # Currency code, e.g. "USD" (see the extraction schema in scraper.py).
    currency = Column(String, nullable=False)
    main_image_url = Column(String)
    # When the price was observed.
    timestamp = Column(DateTime, nullable=False)
    product = relationship("Product", back_populates="prices")


class Database:
    """Persistence layer for tracked products and their price history."""

    def __init__(self, connection_string):
        self.engine = create_engine(connection_string)
        # Idempotent: creates missing tables, leaves existing ones alone.
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)

    def add_product(self, url):
        """Register a product URL; updates in place if it already exists."""
        with self.Session() as session:
            # merge() acts as an upsert keyed on the primary-key URL.
            session.merge(Product(url=url))
            session.commit()

    def product_exists(self, url):
        """Return True when a product with this URL is already tracked."""
        with self.Session() as session:
            found = session.query(Product).filter(Product.url == url).first()
            return found is not None

    def add_price(self, product_data):
        """Store one price observation, creating the parent product if needed."""
        with self.Session() as session:
            if not self.product_exists(product_data["url"]):
                session.add(Product(url=product_data["url"]))
                # Make the product row visible so the FK below resolves.
                session.flush()

            # Accept either a datetime or a pre-formatted timestamp string.
            observed_at = product_data["timestamp"]
            if isinstance(observed_at, str):
                observed_at = datetime.strptime(observed_at, "%Y-%m-%d %H-%M")

            session.add(
                PriceHistory(
                    id=f"{product_data['url']}_{observed_at.strftime('%Y%m%d%H%M%S')}",
                    product_url=product_data["url"],
                    name=product_data["name"],
                    price=product_data["price"],
                    currency=product_data["currency"],
                    main_image_url=product_data["main_image_url"],
                    timestamp=observed_at,
                )
            )
            session.commit()

    def get_all_products(self):
        """Return every tracked Product row."""
        with self.Session() as session:
            return session.query(Product).all()

    def get_price_history(self, url):
        """Return a product's price rows, newest first."""
        with self.Session() as session:
            query = (
                session.query(PriceHistory)
                .filter(PriceHistory.product_url == url)
                .order_by(PriceHistory.timestamp.desc())
            )
            return query.all()

    def remove_all_products(self):
        """Delete every price-history row, then every product row."""
        with self.Session() as session:
            # Children first to satisfy the foreign-key constraint.
            session.query(PriceHistory).delete()
            session.query(Product).delete()
            session.commit()

    # def remove_product(self, url):
    #     """Remove a product and its price history"""
    #     with self.Session() as session:
    #         product = session.query(Product).filter(Product.url == url).first()
    #         if product:
    #             # Cascade also deletes the associated price history.
    #             session.delete(product)
    #             session.commit()


if __name__ == "__main__":
    # Manual maintenance script — destructive: wipes ALL tracked products
    # and their price history. Intended for resetting during development.
    from dotenv import load_dotenv
    import os

    load_dotenv()

    db = Database(os.getenv("POSTGRES_URL"))
    db.remove_all_products()
36 changes: 36 additions & 0 deletions automated_price_tracking/notifications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from dotenv import load_dotenv
import os
import aiohttp
import asyncio

load_dotenv()


async def send_price_alert(
    product_name: str, old_price: float, new_price: float, url: str
):
    """Post a price-drop embed to the configured Discord webhook.

    Args:
        product_name: Human-readable product title.
        old_price: Baseline price the drop is measured against.
        new_price: Current (lower) price.
        url: Product page link embedded in the message.

    Failures are logged and swallowed so a notification problem never
    aborts the price-check run.
    """
    drop_percentage = ((old_price - new_price) / old_price) * 100

    message = {
        "embeds": [
            {
                "title": "Price Drop Alert! 🎉",
                "description": f"**{product_name}**\nPrice dropped by {drop_percentage:.1f}%!\n"
                f"Old price: ${old_price:.2f}\n"
                f"New price: ${new_price:.2f}\n"
                f"[View Product]({url})",
                "color": 3066993,
            }
        ]
    }

    try:
        async with aiohttp.ClientSession() as session:
            # Use the response as a context manager so the connection is
            # properly released, and surface non-success status codes
            # instead of silently ignoring them.
            async with session.post(
                os.getenv("DISCORD_WEBHOOK_URL"), json=message
            ) as response:
                if response.status >= 400:
                    print(f"Discord webhook returned status {response.status}")
    except Exception as e:
        print(f"Error sending Discord notification: {e}")


if __name__ == "__main__":
    # Manual smoke test: posts a fake 10% drop to the configured webhook.
    asyncio.run(send_price_alert("Test Product", 100, 90, "https://www.google.com"))
9 changes: 9 additions & 0 deletions automated_price_tracking/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
streamlit
firecrawl-py
pydantic
psycopg2-binary
python-dotenv
sqlalchemy==2.0.35
pandas
plotly
aiohttp
38 changes: 38 additions & 0 deletions automated_price_tracking/scraper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from datetime import datetime, timezone

from dotenv import load_dotenv
from firecrawl import FirecrawlApp
from pydantic import BaseModel, Field

load_dotenv()
# Firecrawl client; expects FIRECRAWL_API_KEY in the environment (see .env.example).
app = FirecrawlApp()


class Product(BaseModel):
    """Schema for creating a new product.

    Serialized to a JSON schema and passed to Firecrawl (see scrape_product)
    to drive structured extraction from a product page.
    """

    url: str = Field(description="The URL of the product")
    name: str = Field(description="The product name/title")
    price: float = Field(description="The current price of the product")
    currency: str = Field(description="Currency code (USD, EUR, etc)")
    main_image_url: str = Field(description="The URL of the main image of the product")


def scrape_product(url: str):
    """Scrape a product page via Firecrawl and return the extracted fields.

    Args:
        url: Product page URL to scrape.

    Returns:
        dict with the Product schema fields plus a ``timestamp`` key holding
        a timezone-aware UTC datetime of when the scrape ran.
    """
    extracted_data = app.scrape_url(
        url,
        params={
            "formats": ["extract"],
            "extract": {"schema": Product.model_json_schema()},
        },
    )

    # datetime.utcnow() is deprecated (Python 3.12+) and returns a naive
    # datetime; record an explicit UTC-aware timestamp instead.
    extracted_data["extract"]["timestamp"] = datetime.now(timezone.utc)

    return extracted_data["extract"]


if __name__ == "__main__":
    # Manual smoke test against a live Amazon product page.
    product = "https://www.amazon.com/gp/product/B002U21ZZK/"

    print(scrape_product(product))
Loading

0 comments on commit 4c5c949

Please sign in to comment.