diff --git a/community/multi-platform-sales-monitor/README.md b/community/multi-platform-sales-monitor/README.md new file mode 100644 index 00000000..eb665d04 --- /dev/null +++ b/community/multi-platform-sales-monitor/README.md @@ -0,0 +1,121 @@ +# Multi-Platform Sales Monitor + +![Community](https://img.shields.io/badge/OpenHome-Community-orange?style=flat-square) +![Author](https://img.shields.io/badge/Author-@ammadyousaf-lightgrey?style=flat-square) + +## What It Does + +Get real-time sales insights from Gumroad and Shopify with voice commands. Track revenue, compare platforms, analyze trends, and monitor your e-commerce performance—all hands-free. + +## Suggested Trigger Words + +Configure these (or similar) in the OpenHome dashboard for this ability: + +- "check my sales" +- "online sales" +- "sales revenue" +- "shopify sales" +- "gumroad sales" +- "how much did I make" +- "sales dashboard" +- "store sales" + +## Setup + +You need API credentials from Gumroad and Shopify: + +**1. Gumroad API token** + +- Open https://app.gumroad.com/settings/advanced +- Under **Application**, generate an access token and copy it. + +**2. Shopify Admin API** + +- In Shopify Admin: **Settings** → **Apps and sales channels** → **Develop apps** +- Create an app, enable Admin API scopes such as `read_orders` (and `read_products` if needed), install the app, and copy the Admin API access token. +- Note your shop domain (e.g. `your-store.myshopify.com`). + +**3. Preferences file** + +Create `sales_monitor_prefs.json` with **your** secrets (never commit real tokens to git): + +```json +{ + "gumroad_access_token": "YOUR_GUMROAD_ACCESS_TOKEN", + "shopify_shop_url": "your-store.myshopify.com", + "shopify_access_token": "YOUR_SHOPIFY_ADMIN_API_ACCESS_TOKEN" +} +``` + +Upload this file via OpenHome file storage (or the path your deployment uses for `sales_monitor_prefs.json`). + +## How It Works + +1. User says a trigger phrase (e.g. "check my online sales"). +2. The ability loads preferences from `sales_monitor_prefs.json` and calls the Gumroad and Shopify APIs. +3. It opens with a short dashboard summary: "Today you're at X dollars from Y sales. Want the full breakdown?" +4. If user says "yes", "sure", or "go ahead", it provides comprehensive stats (week, month, platform breakdown, best seller). +5. Follow-up queries are handled with LLM-based intent classification: + - Platform breakdown ("What about Shopify?", "Check Gumroad sales") + - Digital vs physical sales comparison + - Trends (today vs yesterday growth) + - Weekly/monthly totals + - Best sellers, customer counts, average orders +6. Multi-number responses are split into separate speak calls for better pacing. +7. Follow-up prompts vary ("What else?", "Anything else?", "Want to know more?") to feel more natural. +8. Say "thanks", "stop", "done", or similar to exit. Every exit path calls `resume_normal_flow()`. + +## Example Conversation + +> **User:** "Check my online sales" +> +> **AI:** "Today you're at 477 dollars from 6 sales. Want the full breakdown?" +> +> **User:** "Yes" +> +> **AI:** "This week you're at 477 dollars, and this month 477 dollars." +> +> **AI:** "Today, Gumroad's at 177 dollars and Shopify's at 300 dollars." +> +> **AI:** "Your best seller this month is Logo T-Shirt with 2 units." +> +> **AI:** "Anything else?" +> +> **User:** "What about Shopify?" +> +> **AI:** "Shopify's at 300 dollars from 3 orders." +> +> **AI:** "Want to know more?" +> +> **User:** "Check trends" +> +> **AI:** "You're up 15 percent compared to yesterday." +> +> **AI:** "What else would you like to know?" +> +> **User:** "Thanks" +> +> **AI:** "Okay, talk to you later!" + +## Features + +- **LLM Intent Classification**: Uses `text_to_text_response()` to understand natural phrases like "how's business been" or "what did I pull in today" +- **Smart Platform Detection**: "Check Shopify sales" only reports Shopify, not both platforms +- **Paced Responses**: Multi-number responses split into separate speak calls for better comprehension +- **Varied Prompts**: Rotates through 5 different follow-up phrases to feel less robotic +- **Comprehensive Breakdown**: Say "yes" after opening to get week/month/platform/best seller stats +- **Multiple Time Ranges**: Today, yesterday, this week, this month, all-time (past year) +- **Product Analytics**: Best sellers, product counts, sales by item +- **Customer Insights**: Unique customer counts, average order values +- **Growth Tracking**: Compare today vs yesterday with percentage changes + +## Contributing to OpenHome (upstream PR) + +To submit this ability to [OpenHome-dev/abilities](https://github.com/OpenHome-dev/abilities): + +1. Fork the repo and clone it. +2. From branch `dev`, create a branch such as `add-multi-platform-sales-monitor`. +3. Copy this folder's contents into `community/multi-platform-sales-monitor/` in the fork (`main.py`, `README.md`, and any other required files per the repo). +4. Open a **Pull Request against `dev`** (not `main`), and complete the PR template. + +See the repository's contributing guide for validation, linting, and review expectations. \ No newline at end of file diff --git a/community/multi-platform-sales-monitor/__init__.py b/community/multi-platform-sales-monitor/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/community/multi-platform-sales-monitor/main.py b/community/multi-platform-sales-monitor/main.py new file mode 100644 index 00000000..2a1183a0 --- /dev/null +++ b/community/multi-platform-sales-monitor/main.py @@ -0,0 +1,716 @@ +""" +Multi-Platform Sales Monitor - OpenHome Ability +Voice-activated sales dashboard for Gumroad and Shopify. +Combines revenue data from multiple platforms into unified analytics. + +Author: Ammad Yousaf +Version: 1.1 +""" + +import json +import random +from datetime import datetime, timezone, timedelta +from typing import Optional, Dict, List, Any + +import requests +from src.agent.capability import MatchingCapability +from src.agent.capability_worker import CapabilityWorker +from src.main import AgentWorker + +GUMROAD_API_BASE = "https://api.gumroad.com/v2" +SHOPIFY_API_VERSION = "2024-01" + +CONTINUE_PROMPTS = [ + "What else would you like to know?", + "Anything else?", + "Want to know more?", + "What else can I tell you?", + "Need anything else?", +] + +PREFS_FILE = "sales_monitor_prefs.json" + +UNIQUE_NAME = "multi_platform_sales_monitor" +# Primary trigger phrases are configured in the OpenHome dashboard; see README. +MATCHING_HOTWORDS: List[str] = [] + + +class MultiSalesMonitorCapability(MatchingCapability): + """Voice-activated multi-platform sales monitoring capability.""" + + worker: AgentWorker = None + capability_worker: CapabilityWorker = None + + # {{register_capability}} + @classmethod + def register_capability(cls) -> "MatchingCapability": + return cls( + unique_name=UNIQUE_NAME, + matching_hotwords=MATCHING_HOTWORDS, + ) + + def call(self, worker: AgentWorker): + self.worker = worker + self.capability_worker = CapabilityWorker(self.worker) + self.worker.session_tasks.create(self.run_sales_monitor()) + + # ========== FILE OPERATIONS ========== + + async def _load_prefs(self) -> Dict[str, Any]: + """Load preferences using OpenHome File Storage API.""" + try: + if await self.capability_worker.check_if_file_exists(PREFS_FILE, False): + content = await self.capability_worker.read_file(PREFS_FILE, False) + return json.loads(content) + except Exception as e: + self.worker.editor_logging_handler.error(f"Failed to load prefs: {e}") + + return {} + + async def _save_prefs(self, prefs: Dict[str, Any]): + """Save preferences using SDK-compliant delete-then-write pattern.""" + try: + if await self.capability_worker.check_if_file_exists(PREFS_FILE, False): + await self.capability_worker.delete_file(PREFS_FILE, False) + + await self.capability_worker.write_file( + PREFS_FILE, + json.dumps(prefs, indent=2), + False, + ) + except Exception as e: + self.worker.editor_logging_handler.error(f"Failed to save prefs: {e}") + + # ========== GUMROAD API ========== + + async def _fetch_gumroad_sales( + self, start_date: str, end_date: str + ) -> List[Dict[str, Any]]: + """Fetch sales from Gumroad API.""" + prefs = await self._load_prefs() + + try: + access_token = prefs.get("gumroad_access_token") + + if not access_token: + self.worker.editor_logging_handler.warning("No Gumroad token configured") + return [] + + url = f"{GUMROAD_API_BASE}/sales" + headers = {"Authorization": f"Bearer {access_token}"} + params = {"after": start_date, "before": end_date} + + self.worker.editor_logging_handler.info(f"Calling Gumroad API: {url}") + response = requests.get(url, headers=headers, params=params, timeout=15) + self.worker.editor_logging_handler.info(f"Gumroad response: {response.status_code}") + + if response.status_code == 200: + data = response.json() + sales = data.get("sales", []) + self.worker.editor_logging_handler.info(f"Gumroad returned {len(sales)} sales") + return sales + else: + self.worker.editor_logging_handler.error( + f"Gumroad API error: {response.status_code}" + ) + return [] + + except Exception as e: + self.worker.editor_logging_handler.error(f"Gumroad fetch error: {e}") + return [] + + # ========== SHOPIFY API ========== + + async def _fetch_shopify_orders( + self, start_date: str, end_date: str + ) -> List[Dict[str, Any]]: + """Fetch orders from Shopify API.""" + prefs = await self._load_prefs() + + try: + shop_url = prefs.get("shopify_shop_url") + access_token = prefs.get("shopify_access_token") + + if not shop_url or not access_token: + self.worker.editor_logging_handler.warning("No Shopify credentials configured") + return [] + + url = f"https://{shop_url}/admin/api/{SHOPIFY_API_VERSION}/orders.json" + headers = {"X-Shopify-Access-Token": access_token} + params = { + "created_at_min": start_date, + "created_at_max": end_date, + "status": "any", + "limit": 250, + } + + self.worker.editor_logging_handler.info(f"Calling Shopify API: {url}") + response = requests.get(url, headers=headers, params=params, timeout=15) + self.worker.editor_logging_handler.info(f"Shopify response: {response.status_code}") + + if response.status_code == 200: + data = response.json() + orders = data.get("orders", []) + self.worker.editor_logging_handler.info(f"Shopify returned {len(orders)} orders") + return orders + else: + self.worker.editor_logging_handler.error( + f"Shopify API error: {response.status_code}" + ) + return [] + + except Exception as e: + self.worker.editor_logging_handler.error(f"Shopify fetch error: {e}") + return [] + + # ========== DATA AGGREGATION ========== + + def _aggregate_sales_data( + self, gumroad_sales: List, shopify_orders: List + ) -> Dict[str, Any]: + """Aggregate sales data from both platforms.""" + gumroad_revenue = sum(sale.get("price", 0) / 100 for sale in gumroad_sales) + gumroad_count = len(gumroad_sales) + + shopify_revenue = sum(float(order.get("total_price", 0)) for order in shopify_orders) + shopify_count = len(shopify_orders) + + total_revenue = gumroad_revenue + shopify_revenue + total_count = gumroad_count + shopify_count + avg_order_value = total_revenue / total_count if total_count > 0 else 0 + + return { + "total_revenue": total_revenue, + "total_count": total_count, + "gumroad_revenue": gumroad_revenue, + "gumroad_count": gumroad_count, + "shopify_revenue": shopify_revenue, + "shopify_count": shopify_count, + "digital_revenue": gumroad_revenue, + "physical_revenue": shopify_revenue, + "avg_order_value": avg_order_value, + } + + # ========== FORMATTING HELPERS ========== + + def _format_currency(self, amount: float, currency: str = "USD") -> str: + """Format amount as currency for voice.""" + if amount == 0: + return "zero dollars" + rounded = round(amount, 2) + if rounded == int(rounded): + return f"{int(rounded)} dollars" + return f"{rounded} dollars" + + def _format_percentage(self, value: float) -> str: + """Format percentage for voice.""" + return f"{round(value)} percent" + + # ========== LLM INTENT CLASSIFICATION ========== + + async def _classify_intent(self, user_input: str) -> str: + """Classify user intent using LLM via text_to_text_response.""" + if not user_input: + return "unknown" + + # Use LLM to extract intent from natural language + prompt = f"""Classify this sales query into ONE category: +- exit (if user wants to stop/quit/exit/done/thanks) +- full_breakdown (if user says yes/sure/go ahead/show me everything/full breakdown/tell me more) +- total_sales (general sales, "how much", "what did I make") +- platform_breakdown (Gumroad vs Shopify, platform comparison) +- digital_vs_physical (digital vs physical products) +- best_seller (top products) +- customer_count (how many customers) +- average_order (average order value) +- this_week (this week's sales) +- this_month (this month's sales) +- yesterday (yesterday's sales) +- all_time (all-time/overall revenue) +- trends (growth, today vs yesterday) +- product_count (how many products) +- unknown (anything else) + +User query: "{user_input}" + +Respond with ONLY the category name, nothing else.""" + + try: + intent = self.capability_worker.text_to_text_response(prompt) + intent = intent.strip().lower() + + valid_intents = [ + "exit", "full_breakdown", "total_sales", "platform_breakdown", "digital_vs_physical", + "best_seller", "customer_count", "average_order", "this_week", + "this_month", "yesterday", "all_time", "trends", "product_count", + ] + + if intent in valid_intents: + self.worker.editor_logging_handler.info(f"Intent classified: {intent}") + return intent + + self.worker.editor_logging_handler.warning(f"Unknown intent: {intent}") + return "unknown" + + except Exception as e: + self.worker.editor_logging_handler.error(f"Intent classification error: {e}") + return "unknown" + + # ========== MAIN LOOP ========== + + async def run_sales_monitor(self) -> None: + """Main conversation loop for sales monitoring.""" + try: + await self._handle_dashboard_summary() + + while True: + response = await self.capability_worker.user_response() + + if not response: + await self.capability_worker.speak("Okay, talk to you later!") + break + + intent = await self._classify_intent(response) + + if intent == "exit": + await self.capability_worker.speak("Okay, talk to you later!") + break + elif intent == "full_breakdown": + await self._handle_full_breakdown() + elif intent == "total_sales": + await self._handle_total_sales() + elif intent == "this_week": + await self._handle_this_week() + elif intent == "this_month": + await self._handle_this_month() + elif intent == "all_time": + await self._handle_all_time() + elif intent == "yesterday": + await self._handle_yesterday() + elif intent == "best_seller": + await self._handle_best_seller() + elif intent == "product_count": + await self._handle_product_count() + elif intent == "platform_breakdown": + await self._handle_platform_breakdown(response) + elif intent == "digital_vs_physical": + await self._handle_digital_vs_physical() + elif intent == "customer_count": + await self._handle_customer_count() + elif intent == "average_order": + await self._handle_average_order() + elif intent == "trends": + await self._handle_trends() + else: + await self.capability_worker.speak( + "I can help with sales totals, platform breakdown, or trends. What would you like?" + ) + + prompt = random.choice(CONTINUE_PROMPTS) + await self.capability_worker.speak(prompt) + + except Exception as e: + self.worker.editor_logging_handler.error(f"Sales monitor error: {e}") + await self.capability_worker.speak("Something went wrong. Try again later.") + finally: + self.capability_worker.resume_normal_flow() + + # ========== INTENT HANDLERS ========== + + async def _handle_dashboard_summary(self) -> None: + """Provide shortened dashboard summary with offer to go deeper.""" + today = datetime.now(timezone.utc).date() + today_start = today.isoformat() + today_end = (today + timedelta(days=1)).isoformat() + + gumroad_today = await self._fetch_gumroad_sales(today_start, today_end) + shopify_today = await self._fetch_shopify_orders(today_start, today_end) + today_stats = self._aggregate_sales_data(gumroad_today, shopify_today) + + if today_stats["total_count"] == 0: + await self.capability_worker.speak("No sales yet today. Want to check other time periods?") + else: + today_str = self._format_currency(today_stats["total_revenue"]) + await self.capability_worker.speak( + f"Today you're at {today_str} from {today_stats['total_count']} sales. Want the full breakdown?" + ) + + async def _handle_full_breakdown(self) -> None: + """Provide comprehensive breakdown with week, month, platforms, and best seller.""" + today = datetime.now(timezone.utc).date() + + today_start = today.isoformat() + today_end = (today + timedelta(days=1)).isoformat() + gumroad_today = await self._fetch_gumroad_sales(today_start, today_end) + shopify_today = await self._fetch_shopify_orders(today_start, today_end) + today_stats = self._aggregate_sales_data(gumroad_today, shopify_today) + + week_start = today - timedelta(days=today.weekday()) + week_start_str = week_start.isoformat() + gumroad_week = await self._fetch_gumroad_sales(week_start_str, today_end) + shopify_week = await self._fetch_shopify_orders(week_start_str, today_end) + week_stats = self._aggregate_sales_data(gumroad_week, shopify_week) + + month_start = today.replace(day=1) + month_start_str = month_start.isoformat() + gumroad_month = await self._fetch_gumroad_sales(month_start_str, today_end) + shopify_month = await self._fetch_shopify_orders(month_start_str, today_end) + month_stats = self._aggregate_sales_data(gumroad_month, shopify_month) + + week_str = self._format_currency(week_stats["total_revenue"]) + month_str = self._format_currency(month_stats["total_revenue"]) + await self.capability_worker.speak(f"This week you're at {week_str}, and this month {month_str}.") + + gumroad_str = self._format_currency(today_stats["gumroad_revenue"]) + shopify_str = self._format_currency(today_stats["shopify_revenue"]) + await self.capability_worker.speak(f"Today, Gumroad's at {gumroad_str} and Shopify's at {shopify_str}.") + + best_seller_info = await self._get_best_seller_info() + if best_seller_info: + await self.capability_worker.speak(f"Your best seller this month is {best_seller_info}.") + + async def _get_best_seller_info(self) -> Optional[str]: + """Get best seller information for summary.""" + today = datetime.now(timezone.utc).date() + start_date = (today - timedelta(days=30)).isoformat() + end_date = (today + timedelta(days=1)).isoformat() + + gumroad_sales = await self._fetch_gumroad_sales(start_date, end_date) + shopify_orders = await self._fetch_shopify_orders(start_date, end_date) + + product_sales: Dict[str, int] = {} + + for sale in gumroad_sales: + product = sale.get("product_name", "Unknown Product") + product_sales[product] = product_sales.get(product, 0) + 1 + + for order in shopify_orders: + for item in order.get("line_items", []): + product = item.get("title", "Unknown Product") + quantity = item.get("quantity", 1) + product_sales[product] = product_sales.get(product, 0) + quantity + + if not product_sales: + return None + + best_seller = max(product_sales.items(), key=lambda x: x[1]) + product_name, count = best_seller + return f"{product_name} with {count} units" + + async def _handle_total_sales(self) -> None: + """Fetch and speak total sales across all platforms.""" + today = datetime.now(timezone.utc).date() + start_date = today.isoformat() + end_date = (today + timedelta(days=1)).isoformat() + + gumroad_sales = await self._fetch_gumroad_sales(start_date, end_date) + shopify_orders = await self._fetch_shopify_orders(start_date, end_date) + stats = self._aggregate_sales_data(gumroad_sales, shopify_orders) + + total_str = self._format_currency(stats["total_revenue"]) + + if stats["total_count"] == 0: + await self.capability_worker.speak("You haven't made any sales today yet.") + elif stats["total_count"] == 1: + await self.capability_worker.speak(f"You've got one sale today for {total_str}.") + else: + await self.capability_worker.speak(f"You've made {total_str} today from {stats['total_count']} sales.") + + async def _handle_platform_breakdown(self, user_query: str = "") -> None: + """Break down sales by platform.""" + today = datetime.now(timezone.utc).date() + start_date = today.isoformat() + end_date = (today + timedelta(days=1)).isoformat() + + gumroad_sales = await self._fetch_gumroad_sales(start_date, end_date) + shopify_orders = await self._fetch_shopify_orders(start_date, end_date) + stats = self._aggregate_sales_data(gumroad_sales, shopify_orders) + + gumroad_str = self._format_currency(stats["gumroad_revenue"]) + shopify_str = self._format_currency(stats["shopify_revenue"]) + + user_query_lower = user_query.lower() + asking_gumroad = "gumroad" in user_query_lower + asking_shopify = "shopify" in user_query_lower + + if asking_gumroad and not asking_shopify: + if stats["gumroad_count"] == 0: + await self.capability_worker.speak("No Gumroad sales yet today.") + else: + await self.capability_worker.speak( + f"Gumroad's at {gumroad_str} from {stats['gumroad_count']} sales." + ) + elif asking_shopify and not asking_gumroad: + if stats["shopify_count"] == 0: + await self.capability_worker.speak("No Shopify orders yet today.") + else: + await self.capability_worker.speak( + f"Shopify's at {shopify_str} from {stats['shopify_count']} orders." + ) + else: + if stats["gumroad_count"] == 0 and stats["shopify_count"] == 0: + await self.capability_worker.speak("No sales on either platform today.") + else: + if stats["gumroad_count"] > 0: + await self.capability_worker.speak( + f"Gumroad's at {gumroad_str} from {stats['gumroad_count']} sales." + ) + else: + await self.capability_worker.speak("No Gumroad sales yet today.") + + if stats["shopify_count"] > 0: + await self.capability_worker.speak( + f"Shopify's at {shopify_str} from {stats['shopify_count']} orders." + ) + else: + await self.capability_worker.speak("No Shopify orders yet today.") + + async def _handle_digital_vs_physical(self) -> None: + """Compare digital vs physical product sales.""" + today = datetime.now(timezone.utc).date() + start_date = today.isoformat() + end_date = (today + timedelta(days=1)).isoformat() + + gumroad_sales = await self._fetch_gumroad_sales(start_date, end_date) + shopify_orders = await self._fetch_shopify_orders(start_date, end_date) + stats = self._aggregate_sales_data(gumroad_sales, shopify_orders) + + total = stats["total_revenue"] + if total > 0: + digital_pct = (stats["digital_revenue"] / total) * 100 + physical_pct = (stats["physical_revenue"] / total) * 100 + else: + digital_pct = 0 + physical_pct = 0 + + digital_str = self._format_currency(stats["digital_revenue"]) + physical_str = self._format_currency(stats["physical_revenue"]) + + if total == 0: + await self.capability_worker.speak("No sales to compare yet today.") + elif stats["digital_revenue"] == 0: + await self.capability_worker.speak(f"All physical products today, {physical_str} total.") + elif stats["physical_revenue"] == 0: + await self.capability_worker.speak(f"All digital products today, {digital_str} total.") + else: + await self.capability_worker.speak( + f"Digital's at {digital_str}, that's {self._format_percentage(digital_pct)}." + ) + await self.capability_worker.speak( + f"Physical's at {physical_str}, which is {self._format_percentage(physical_pct)}." + ) + + async def _handle_customer_count(self) -> None: + """Report unique customer count.""" + today = datetime.now(timezone.utc).date() + start_date = today.isoformat() + end_date = (today + timedelta(days=1)).isoformat() + + gumroad_sales = await self._fetch_gumroad_sales(start_date, end_date) + shopify_orders = await self._fetch_shopify_orders(start_date, end_date) + + emails: set = set() + for sale in gumroad_sales: + if sale.get("email"): + emails.add(sale["email"]) + for order in shopify_orders: + if order.get("customer", {}).get("email"): + emails.add(order["customer"]["email"]) + + customer_count = len(emails) + + if customer_count == 0: + await self.capability_worker.speak("No customers yet today.") + elif customer_count == 1: + await self.capability_worker.speak("Just one customer so far today.") + else: + await self.capability_worker.speak(f"You've had {customer_count} customers today.") + + async def _handle_average_order(self) -> None: + """Report average order value.""" + today = datetime.now(timezone.utc).date() + start_date = today.isoformat() + end_date = (today + timedelta(days=1)).isoformat() + + gumroad_sales = await self._fetch_gumroad_sales(start_date, end_date) + shopify_orders = await self._fetch_shopify_orders(start_date, end_date) + stats = self._aggregate_sales_data(gumroad_sales, shopify_orders) + + if stats["total_count"] == 0: + await self.capability_worker.speak("No sales yet to calculate an average.") + else: + avg_str = self._format_currency(stats["avg_order_value"]) + await self.capability_worker.speak(f"Your average order today is {avg_str}.") + + async def _handle_this_week(self) -> None: + """Report this week's sales.""" + today = datetime.now(timezone.utc).date() + week_start = today - timedelta(days=today.weekday()) + start_date = week_start.isoformat() + end_date = (today + timedelta(days=1)).isoformat() + + gumroad_sales = await self._fetch_gumroad_sales(start_date, end_date) + shopify_orders = await self._fetch_shopify_orders(start_date, end_date) + stats = self._aggregate_sales_data(gumroad_sales, shopify_orders) + + total_str = self._format_currency(stats["total_revenue"]) + + if stats["total_count"] == 0: + await self.capability_worker.speak("No sales this week yet.") + else: + await self.capability_worker.speak( + f"This week you've made {total_str} from {stats['total_count']} sales." + ) + + async def _handle_this_month(self) -> None: + """Report this month's sales.""" + today = datetime.now(timezone.utc).date() + month_start = today.replace(day=1) + start_date = month_start.isoformat() + end_date = (today + timedelta(days=1)).isoformat() + + gumroad_sales = await self._fetch_gumroad_sales(start_date, end_date) + shopify_orders = await self._fetch_shopify_orders(start_date, end_date) + stats = self._aggregate_sales_data(gumroad_sales, shopify_orders) + + total_str = self._format_currency(stats["total_revenue"]) + + if stats["total_count"] == 0: + await self.capability_worker.speak("No sales this month yet.") + else: + await self.capability_worker.speak(f"This month you've made {total_str}.") + + async def _handle_all_time(self) -> None: + """Report all-time revenue (last 365 days as proxy).""" + today = datetime.now(timezone.utc).date() + year_ago = today - timedelta(days=365) + start_date = year_ago.isoformat() + end_date = (today + timedelta(days=1)).isoformat() + + gumroad_sales = await self._fetch_gumroad_sales(start_date, end_date) + shopify_orders = await self._fetch_shopify_orders(start_date, end_date) + stats = self._aggregate_sales_data(gumroad_sales, shopify_orders) + + total_str = self._format_currency(stats["total_revenue"]) + + if stats["total_count"] == 0: + await self.capability_worker.speak("No sales in the past year.") + else: + await self.capability_worker.speak(f"In the past year, you've made {total_str}.") + + async def _handle_yesterday(self) -> None: + """Report yesterday's sales.""" + today = datetime.now(timezone.utc).date() + yesterday = today - timedelta(days=1) + start_date = yesterday.isoformat() + end_date = today.isoformat() + + gumroad_sales = await self._fetch_gumroad_sales(start_date, end_date) + shopify_orders = await self._fetch_shopify_orders(start_date, end_date) + stats = self._aggregate_sales_data(gumroad_sales, shopify_orders) + + total_str = self._format_currency(stats["total_revenue"]) + + if stats["total_count"] == 0: + await self.capability_worker.speak("No sales yesterday.") + else: + await self.capability_worker.speak(f"Yesterday you made {total_str}.") + + async def _handle_best_seller(self) -> None: + """Report best-selling product.""" + today = datetime.now(timezone.utc).date() + start_date = (today - timedelta(days=30)).isoformat() + end_date = (today + timedelta(days=1)).isoformat() + + gumroad_sales = await self._fetch_gumroad_sales(start_date, end_date) + shopify_orders = await self._fetch_shopify_orders(start_date, end_date) + + product_sales: Dict[str, int] = {} + + for sale in gumroad_sales: + product = sale.get("product_name", "Unknown Product") + product_sales[product] = product_sales.get(product, 0) + 1 + + for order in shopify_orders: + for item in order.get("line_items", []): + product = item.get("title", "Unknown Product") + quantity = item.get("quantity", 1) + product_sales[product] = product_sales.get(product, 0) + quantity + + if not product_sales: + await self.capability_worker.speak("No products sold in the last month.") + else: + best_seller = max(product_sales.items(), key=lambda x: x[1]) + product_name, count = best_seller + await self.capability_worker.speak( + f"Your best seller is {product_name} with {count} sales." + ) + + async def _handle_product_count(self) -> None: + """Report total products sold recently.""" + today = datetime.now(timezone.utc).date() + start_date = (today - timedelta(days=90)).isoformat() + end_date = (today + timedelta(days=1)).isoformat() + + gumroad_sales = await self._fetch_gumroad_sales(start_date, end_date) + shopify_orders = await self._fetch_shopify_orders(start_date, end_date) + + products: set = set() + + for sale in gumroad_sales: + product = sale.get("product_name") + if product: + products.add(product) + + for order in shopify_orders: + for item in order.get("line_items", []): + product = item.get("title") + if product: + products.add(product) + + count = len(products) + + if count == 0: + await self.capability_worker.speak("No products sold recently.") + elif count == 1: + await self.capability_worker.speak("One product has sold recently.") + else: + await self.capability_worker.speak(f"{count} different products have sold recently.") + + async def _handle_trends(self) -> None: + """Compare today vs yesterday.""" + today = datetime.now(timezone.utc).date() + yesterday = today - timedelta(days=1) + + today_start = today.isoformat() + today_end = (today + timedelta(days=1)).isoformat() + gumroad_today = await self._fetch_gumroad_sales(today_start, today_end) + shopify_today = await self._fetch_shopify_orders(today_start, today_end) + today_stats = self._aggregate_sales_data(gumroad_today, shopify_today) + + yesterday_start = yesterday.isoformat() + yesterday_end = today.isoformat() + gumroad_yesterday = await self._fetch_gumroad_sales(yesterday_start, yesterday_end) + shopify_yesterday = await self._fetch_shopify_orders(yesterday_start, yesterday_end) + yesterday_stats = self._aggregate_sales_data(gumroad_yesterday, shopify_yesterday) + + today_rev = today_stats["total_revenue"] + yesterday_rev = yesterday_stats["total_revenue"] + + if yesterday_rev == 0 and today_rev == 0: + await self.capability_worker.speak("No sales today or yesterday.") + elif yesterday_rev == 0: + today_str = self._format_currency(today_rev) + await self.capability_worker.speak(f"You're at {today_str} today, up from zero yesterday.") + else: + change_pct = ((today_rev - yesterday_rev) / yesterday_rev) * 100 + if change_pct > 0: + await self.capability_worker.speak( + f"You're up {self._format_percentage(abs(change_pct))} compared to yesterday." + ) + elif change_pct < 0: + await self.capability_worker.speak( + f"You're down {self._format_percentage(abs(change_pct))} from yesterday." + ) + else: + await self.capability_worker.speak("Same as yesterday.")