diff --git a/.gitignore b/.gitignore index 356448e..0ddfe2e 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ local/finetuning/preferencevalidatordataset/.env +local/.env +local/temp/ diff --git a/deno.lock b/deno.lock index dcfa245..7901036 100644 --- a/deno.lock +++ b/deno.lock @@ -1,17 +1,93 @@ { "version": "5", "specifiers": { + "npm:@supabase/supabase-js@2.39.3": "2.39.3", "npm:@types/node@*": "24.2.0" }, "npm": { + "@supabase/functions-js@2.4.5": { + "integrity": "sha512-v5GSqb9zbosquTo6gBwIiq7W9eQ7rE5QazsK/ezNiQXdCbY+bH8D9qEaBIkhVvX4ZRW5rP03gEfw5yw9tiq4EQ==", + "dependencies": [ + "@supabase/node-fetch" + ] + }, + "@supabase/gotrue-js@2.72.0": { + "integrity": "sha512-cJSFgvZhhTEyOLAwkadKAlx0zUOzUFSpMd/42iacF1C2pm4FLnc6ICHT0D/q2gPr/tz65tu0g0s51n4od82yEQ==", + "dependencies": [ + "@supabase/node-fetch" + ] + }, + "@supabase/node-fetch@2.6.15": { + "integrity": "sha512-1ibVeYUacxWYi9i0cf5efil6adJ9WRyZBLivgjs+AUpewx1F3xPi7gLgaASI2SmIQxPoCEjAsLAzKPgMJVgOUQ==", + "dependencies": [ + "whatwg-url" + ] + }, + "@supabase/postgrest-js@1.21.3": { + "integrity": "sha512-rg3DmmZQKEVCreXq6Am29hMVe1CzemXyIWVYyyua69y6XubfP+DzGfLxME/1uvdgwqdoaPbtjBDpEBhqxq1ZwA==", + "dependencies": [ + "@supabase/node-fetch" + ] + }, + "@supabase/realtime-js@2.15.4": { + "integrity": "sha512-e/FYIWjvQJHOCNACWehnKvg26zosju3694k0NMUNb+JGLdvHJzEa29ZVVLmawd2kvx4hdbv8mxSqfttRnH3+DA==", + "dependencies": [ + "@supabase/node-fetch", + "@types/phoenix", + "@types/ws", + "ws" + ] + }, + "@supabase/storage-js@2.11.0": { + "integrity": "sha512-Y+kx/wDgd4oasAgoAq0bsbQojwQ+ejIif8uczZ9qufRHWFLMU5cODT+ApHsSrDufqUcVKt+eyxtOXSkeh2v9ww==", + "dependencies": [ + "@supabase/node-fetch" + ] + }, + "@supabase/supabase-js@2.39.3": { + "integrity": "sha512-NoltJSaJNKDJNutO5sJPAAi5RIWrn1z2XH+ig1+cHDojT6BTN7TvZPNa3Kq3gFQWfO5H1N9El/bCTZJ3iFW2kQ==", + "dependencies": [ + "@supabase/functions-js", + "@supabase/gotrue-js", + "@supabase/node-fetch", + "@supabase/postgrest-js", + "@supabase/realtime-js", + "@supabase/storage-js" + ] + }, "@types/node@24.2.0": { "integrity": "sha512-3xyG3pMCq3oYCNg7/ZP+E1ooTaGB4cG8JWRsqqOYQdbWNY4zbaV0Ennrd7stjiJEFZCaybcIgpTjJWHRfBSIDw==", "dependencies": [ "undici-types" ] }, + "@types/phoenix@1.6.6": { + "integrity": "sha512-PIzZZlEppgrpoT2QgbnDU+MMzuR6BbCjllj0bM70lWoejMeNJAxCchxnv7J3XFkI8MpygtRpzXrIlmWUBclP5A==" + }, + "@types/ws@8.18.1": { + "integrity": "sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==", + "dependencies": [ + "@types/node" + ] + }, + "tr46@0.0.3": { + "integrity": "sha512-N3WMsuqV66lT30CrXNbEjx4GEwlow3v6rr4mCcv6prnfwhS01rkgyFdjPNBYd9br7LpXV1+Emh01fHnq2Gdgrw==" + }, "undici-types@7.10.0": { "integrity": "sha512-t5Fy/nfn+14LuOc2KNYg75vZqClpAiqscVvMygNnlsHBFpSXdJaYtXMcdNLpl/Qvc3P2cB3s6lOV51nqsFq4ag==" + }, + "webidl-conversions@3.0.1": { + "integrity": "sha512-2JAn3z8AR6rjK8Sm8orRC0h/bcl/DqL7tRPdGZ4I1CjdF+EaMLmYxBHyXuKL849eucPFhvBoxMsflfOb8kxaeQ==" + }, + "whatwg-url@5.0.0": { + "integrity": "sha512-saE57nupxk6v3HY35+jzBwYa0rKSy0XR8JSxZPwgLr7ys0IBzhGviA1/TUGJLmSVqs8pb9AnvICXEuOHLprYTw==", + "dependencies": [ + "tr46", + "webidl-conversions" + ] + }, + "ws@8.18.3": { + "integrity": "sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg==" } }, "redirects": { diff --git a/local/.env.template b/local/.env.template new file mode 100644 index 0000000..91e50fb --- /dev/null +++ b/local/.env.template @@ -0,0 +1,12 @@ +# Supabase Configuration +# Copy this file to .env and fill in your actual values + +# Your 
Supabase project URL +SUPABASE_URL=https://your-project-ref.supabase.co + +# Your Supabase secret key (new key type, replaces SUPABASE_SERVICE_ROLE_KEY) +# Get this from your Supabase dashboard > Settings > API +SUPABASE_SECRET_KEY=your_secret_key_here + +# Legacy fallback (if you haven't migrated to new keys yet) +# SUPABASE_SERVICE_ROLE_KEY=your_legacy_service_role_key_here diff --git a/local/.gitignore b/local/.gitignore index 0275b13..6edb89a 100644 --- a/local/.gitignore +++ b/local/.gitignore @@ -1,2 +1,3 @@ supabase-service.json -datasets/ \ No newline at end of file +datasets/ +off_inventory_cache.jsonl \ No newline at end of file diff --git a/local/openfoodfacts/off_ingest.ts b/local/openfoodfacts/off_ingest.ts new file mode 100644 index 0000000..cd88ce4 --- /dev/null +++ b/local/openfoodfacts/off_ingest.ts @@ -0,0 +1,780 @@ +// deno run -A --unstable-kv local/openfoodfacts/off_ingest.ts +// Environment: Copy .env.template to .env and fill in your values +// Performance: Use --unstable-kv for better memory management +// +// IMAGE STORAGE STRATEGY: +// This script extracts raw image metadata (language keys only) from Open Food Facts +// and stores it in the database. Image URLs are NOT constructed here. +// At runtime, server code will: +// - Select appropriate images based on type (front, ingredients, nutrition, packaging) +// - Prefer English, fallback to other languages +// - Prefer medium resolution (400px), fallback to next available size +// - Construct URLs using: https://static.openfoodfacts.org/images/products/{barcodePath}/{imgId}.{size}.jpg + +// Load environment variables from .env file +async function loadEnv() { + try { + const envText = await Deno.readTextFile("local/.env"); + const lines = envText.split("\n"); + for (const line of lines) { + const trimmed = line.trim(); + if (trimmed && !trimmed.startsWith("#")) { + const [key, ...valueParts] = trimmed.split("="); + if (key && valueParts.length > 0) { + const value = valueParts.join("=").trim(); + Deno.env.set(key.trim(), value); + } + } + } + } catch (error) { + console.warn("āš ļø Could not load .env file:", (error as Error).message); + console.warn(" Make sure to copy .env.template to .env and fill in your values"); + } +} + +type Ingredient = { + name: string; + vegan?: boolean; + vegetarian?: boolean; + ingredients?: Ingredient[]; +}; + +type ImageMetadata = { + type: 'front' | 'ingredients' | 'nutrition' | 'packaging'; + language: string; // e.g., 'en', 'fr', 'de' + imgid: string; // numeric image ID (e.g., '1', '2', '3') + sizes: ('full' | '400' | '200' | '100')[]; // available sizes +}; + +type CacheRow = { + barcode: string; + data_source: string; + brand?: string; + name?: string; + ingredients: Ingredient[]; + images: ImageMetadata[]; + off_last_modified_t?: number; +}; + +const OFF_JSONL_GZ_URL = "https://static.openfoodfacts.org/data/openfoodfacts-products.jsonl.gz"; +const OUTPUT_PATH = "local/off_inventory_cache.jsonl"; +const BATCH_UPLOAD_SIZE = 1000; // Products per batch +const BATCHES_PER_CONFIRMATION = 500; // Upload 500 batches (500k rows) before asking for confirmation +const PARALLEL_BATCHES = 10; // Upload 10 batches in parallel = 10,000 products per batch group +const DELAY_BETWEEN_BATCH_GROUPS_MS = 500; // 500ms delay between batch groups to avoid rate limiting +const SAMPLE_SIZE = 10000; // Sample size for size estimation +const SAMPLE_LINES = 100000; // Only process first 100k lines for sampling + +function mapIngredient(node: any): Ingredient { + const item: Ingredient = { + name: 
typeof node?.text === "string" ? node.text : undefined as unknown as string, + vegan: node?.vegan, + vegetarian: node?.vegetarian, + ingredients: [], + }; + if (Array.isArray(node?.ingredients) && node.ingredients.length > 0) { + item.ingredients = node.ingredients.filter((x: any) => x && typeof x === "object").map(mapIngredient); + } + return item; +} + +/** + * Extract image metadata from Open Food Facts product data. + * Stores only language keys (front_en, ingredients_fr, etc.) - drops numeric keys. + * URL construction happens at runtime in server code. + * + * RUNTIME URL CONSTRUCTION (for server code): + * + * function barcodeToPath(barcode: string): string { + * if (barcode.length <= 8) return barcode; + * const code = barcode.padStart(13, '0'); + * const segments: string[] = []; + * for (let i = 0; i < code.length - 4; i += 3) { + * segments.push(code.slice(i, i + 3)); + * } + * segments.push(code.slice(code.length - 4)); + * return segments.join('/'); + * } + * + * function constructImageUrl(barcode: string, imgId: string, size?: '400' | '200' | '100'): string { + * const path = barcodeToPath(barcode); + * const sizeStr = size ? `.${size}` : ''; + * return `https://static.openfoodfacts.org/images/products/${path}/${imgId}${sizeStr}.jpg`; + * } + */ +function extractDisplayImageUrls(images: any, _barcode: string): ImageMetadata[] { + if (!images || typeof images !== "object") { + return []; + } + + const metadata: ImageMetadata[] = []; + + try { + // Image types we care about + const imageTypes = ['front', 'ingredients', 'nutrition', 'packaging']; + + // Common languages (ordered by priority) + const languages = ['en', 'fr', 'de', 'es', 'it', 'pt', 'nl', 'pl', 'ru', 'ja', 'zh', 'sv', 'da', 'no', 'fi']; + + // Extract all language-specific keys for each image type + for (const imageType of imageTypes) { + for (const lang of languages) { + const key = `${imageType}_${lang}`; + const imageRef = images[key]; + + if (imageRef && typeof imageRef === "object" && imageRef.imgid && imageRef.sizes) { + // Collect which sizes are available + const availableSizes: ('full' | '400' | '200' | '100')[] = []; + if (imageRef.sizes.full) availableSizes.push('full'); + if (imageRef.sizes["400"]) availableSizes.push('400'); + if (imageRef.sizes["200"]) availableSizes.push('200'); + if (imageRef.sizes["100"]) availableSizes.push('100'); + + if (availableSizes.length > 0) { + metadata.push({ + type: imageType as 'front' | 'ingredients' | 'nutrition' | 'packaging', + language: lang, + imgid: String(imageRef.imgid), + sizes: availableSizes + }); + } + } + } + } + } catch (_error) { + // Ignore malformed structures + } + + return metadata; +} + +function mapToCacheRow(product: any): CacheRow | null { + const dataSource = "openfoodfacts/v3"; + + let barcode: string | undefined; + const code = product?.code; + if (typeof code === "string" && code.trim()) { + barcode = code.trim(); + } else if (typeof code === "number") { + barcode = String(code); + } else if (typeof product?._id === "string" && product._id.trim()) { + barcode = product._id.trim(); + } + if (!barcode) return null; + + let brand: string | undefined; + if (typeof product?.brand_owner === "string" && product.brand_owner.trim()) { + brand = product.brand_owner.trim(); + } else if (typeof product?.brands === "string" && product.brands.trim()) { + brand = product.brands.split(",")[0]?.trim(); + } + + let name: string | undefined; + if (typeof product?.product_name === "string" && product.product_name.trim()) { + name = product.product_name.trim(); 
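// Illustrative runtime sketch (not part of this ingest script): how server code
// could pick one ImageMetadata entry and build a URL, following the strategy in
// the file header (prefer English, prefer the 400px size) and reusing the
// barcodeToPath/constructImageUrl helpers shown in the comment above
// extractDisplayImageUrls. The name selectImageUrl and the exact fallback order
// are hypothetical.
//
// function selectImageUrl(images: ImageMetadata[], barcode: string, type: ImageMetadata["type"]): string | undefined {
//   const candidates = images.filter((img) => img.type === type);
//   if (candidates.length === 0) return undefined;
//   // Prefer English, otherwise fall back to the first available language.
//   const chosen = candidates.find((img) => img.language === "en") ?? candidates[0];
//   // Prefer 400px, then fall back to the next available size (one possible order).
//   const size = (["400", "200", "100", "full"] as const).find((s) => chosen.sizes.includes(s));
//   const sizeArg = size === undefined || size === "full" ? undefined : size;
//   return constructImageUrl(barcode, chosen.imgid, sizeArg);
// }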
+ } else { + for (const [k, v] of Object.entries(product ?? {})) { + if (k.startsWith("product_name_") && typeof v === "string" && (v as string).trim()) { + name = (v as string).trim(); + break; + } + } + } + + let ingredients: Ingredient[] = []; + if (Array.isArray(product?.ingredients) && product.ingredients.length > 0) { + ingredients = product.ingredients.filter((x: any) => x && typeof x === "object").map(mapIngredient); + } + + const images = extractDisplayImageUrls(product?.images, barcode); + const off_last_modified_t = typeof product?.last_modified_t === "number" ? product.last_modified_t : undefined; + + + + return { + barcode, + data_source: dataSource, + brand, + name, + ingredients, + images, + off_last_modified_t, + }; +} + +async function* iterLinesFromGzip(url: string, showProgress: boolean = true): AsyncGenerator { + if (showProgress) { + console.log("šŸ“„ Downloading Open Food Facts data..."); + } + const res = await fetch(url); + if (!res.body) throw new Error("No response body from OFF"); + + const contentLength = res.headers.get("content-length"); + const totalBytes = contentLength ? parseInt(contentLength) : 0; + if (showProgress) { + console.log(`šŸ“¦ File size: ${totalBytes > 0 ? formatBytes(totalBytes) : "unknown"}`); + } + + const decompressed = res.body.pipeThrough(new DecompressionStream("gzip")); + const textStream = decompressed.pipeThrough(new TextDecoderStream()); + const reader = textStream.getReader(); + let buf = ""; + let lineCount = 0; + let lastProgressTime = Date.now(); + + try { + while (true) { + const { value, done } = await reader.read(); + if (done) break; + buf += value ?? ""; + let idx: number; + while ((idx = buf.indexOf("\n")) !== -1) { + const line = buf.slice(0, idx); + buf = buf.slice(idx + 1); + lineCount++; + + // Show progress every 50k lines or every 10 seconds (only if showProgress is true) + if (showProgress) { + const now = Date.now(); + if (lineCount % 50000 === 0 || now - lastProgressTime > 10000) { + console.log(`šŸ“Š Processed ${lineCount.toLocaleString()} products...`); + lastProgressTime = now; + } + } + + yield line; + } + } + if (buf.length > 0) { + lineCount++; + yield buf; + } + } finally { + reader.releaseLock(); + } + if (showProgress) { + console.log(`āœ… Download complete! 
Processed ${lineCount.toLocaleString()} products`); + } +} + +async function writeJsonl(rows: AsyncIterable<{ row: CacheRow; stats: any }>, outPath: string): Promise<{ count: number; totalBytes: number; nonEmpty: { brand: number; name: number; ingredients: number; images: number }; validationStats: any }> { + console.log("šŸ’¾ Writing transformed data to local file..."); + const file = await Deno.open(outPath, { create: true, write: true, truncate: true }); + const encoder = new TextEncoder(); + let count = 0; + let totalBytes = 0; + let nonBrand = 0, nonName = 0, nonIng = 0, nonImg = 0; + let lastProgressTime = Date.now(); + let lastStats: any = null; + + try { + for await (const { row, stats } of rows) { + count++; + if (row.brand) nonBrand++; + if (row.name) nonName++; + if (row.ingredients && row.ingredients.length) nonIng++; + if (row.images && row.images.length) nonImg++; + const json = JSON.stringify(row); + totalBytes += encoder.encode(json).byteLength; + await file.write(encoder.encode(json + "\n")); + lastStats = stats; + + // Show progress every 25k rows or every 5 seconds + const now = Date.now(); + if (count % 25000 === 0 || now - lastProgressTime > 5000) { + const validRate = ((lastStats.validProducts / lastStats.totalLines) * 100).toFixed(1); + const invalidRate = (((lastStats.emptyLines + lastStats.jsonParseErrors + lastStats.noBarcode) / lastStats.totalLines) * 100).toFixed(1); + console.log(`šŸ“ Written ${count.toLocaleString()} products (${validRate}% valid, ${invalidRate}% invalid)`); + lastProgressTime = now; + } + } + } finally { + file.close(); + } + + // Final validation statistics + if (lastStats) { + console.log(`\nšŸ“Š Validation Statistics:`); + console.log(` Total lines processed: ${lastStats.totalLines.toLocaleString()}`); + console.log(` Valid products: ${lastStats.validProducts.toLocaleString()} (${((lastStats.validProducts / lastStats.totalLines) * 100).toFixed(1)}%)`); + console.log(` Invalid products: ${(lastStats.emptyLines + lastStats.jsonParseErrors + lastStats.noBarcode).toLocaleString()} (${(((lastStats.emptyLines + lastStats.jsonParseErrors + lastStats.noBarcode) / lastStats.totalLines) * 100).toFixed(1)}%)`); + console.log(` - Empty lines: ${lastStats.emptyLines.toLocaleString()}`); + console.log(` - JSON parse errors: ${lastStats.jsonParseErrors.toLocaleString()}`); + console.log(` - No barcode: ${lastStats.noBarcode.toLocaleString()}`); + } + + console.log(`āœ… Local file complete! 
${count.toLocaleString()} products written`); + return { count, totalBytes, nonEmpty: { brand: nonBrand, name: nonName, ingredients: nonIng, images: nonImg }, validationStats: lastStats }; +} + +async function* projectRows(lines: AsyncIterable): AsyncGenerator<{ row: CacheRow; stats: { totalLines: number; emptyLines: number; jsonParseErrors: number; noBarcode: number; validProducts: number } }> { + let totalLines = 0; + let emptyLines = 0; + let jsonParseErrors = 0; + let noBarcode = 0; + let validProducts = 0; + + for await (const line of lines) { + totalLines++; + + const trimmed = line.trim(); + if (!trimmed) { + emptyLines++; + continue; + } + + try { + const product = JSON.parse(trimmed); + const row = mapToCacheRow(product); + if (row && row.barcode) { + validProducts++; + + // Progress reporting every 50k products + if (validProducts % 50000 === 0) { + const validRate = ((validProducts / totalLines) * 100).toFixed(1); + const invalidRate = (((emptyLines + jsonParseErrors + noBarcode) / totalLines) * 100).toFixed(1); + console.log(`šŸ“Š Processed ${totalLines.toLocaleString()} lines, found ${validProducts.toLocaleString()} valid products (${validRate}% valid, ${invalidRate}% invalid)...`); + } + + yield { row, stats: { totalLines, emptyLines, jsonParseErrors, noBarcode, validProducts } }; + } else { + noBarcode++; + } + } catch (_) { + jsonParseErrors++; + } + } +} + +function formatBytes(bytes: number): string { + const units = ["B", "KB", "MB", "GB", "TB"] as const; + let i = 0; + let n = bytes; + while (n >= 1024 && i < units.length - 1) { + n /= 1024; + i++; + } + return `${n.toFixed(2)} ${units[i]}`; +} + +function estimateDatabaseSize(sampleRows: CacheRow[], totalProducts: number): { + avgRowSize: number; + estimatedTableSize: number; + estimatedIndexSize: number; + estimatedTotalSize: number; +} { + if (sampleRows.length === 0) { + return { avgRowSize: 0, estimatedTableSize: 0, estimatedIndexSize: 0, estimatedTotalSize: 0 }; + } + + // Calculate average row size from sample + const sampleSizes = sampleRows.map(row => { + const json = JSON.stringify(row); + return new TextEncoder().encode(json).byteLength; + }); + + const avgRowSize = sampleSizes.reduce((sum, size) => sum + size, 0) / sampleSizes.length; + + // Estimate table size (data only) + const estimatedTableSize = avgRowSize * totalProducts; + + // Estimate index size (barcode PK + other indexes) + // Barcode index: ~20 bytes per row + overhead + const barcodeIndexSize = (20 + 8) * totalProducts; // 20 bytes key + 8 bytes pointer + const otherIndexSize = totalProducts * 16; // Additional indexes + const estimatedIndexSize = barcodeIndexSize + otherIndexSize; + + // PostgreSQL overhead (20-30% for metadata, TOAST, etc.) 
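  // Illustrative numbers (assumed, not measured): with an average row of ~1,200 B
  // and ~3,500,000 valid products, the table estimate is 1,200 * 3.5M ā‰ˆ 4.2 GB,
  // the index estimate is (20 + 8 + 16) * 3.5M ā‰ˆ 154 MB, and the 25% overhead
  // applied below gives roughly (4.2 GB + 0.154 GB) * 1.25 ā‰ˆ 5.4 GB in total.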
+ const overhead = 0.25; + const estimatedTotalSize = (estimatedTableSize + estimatedIndexSize) * (1 + overhead); + + return { + avgRowSize: Math.round(avgRowSize), + estimatedTableSize: Math.round(estimatedTableSize), + estimatedIndexSize: Math.round(estimatedIndexSize), + estimatedTotalSize: Math.round(estimatedTotalSize) + }; +} + +async function sampleProductsForSizeEstimation(lines: AsyncIterable, sampleSize: number, maxLines: number): Promise<{ sampleRows: CacheRow[]; estimatedTotalValidProducts: number; sampleLines: number }> { + console.log(`šŸ“Š Sampling ${sampleSize.toLocaleString()} products from first ${maxLines.toLocaleString()} lines for size estimation...`); + const sampleRows: CacheRow[] = []; + let sampleLines = 0; + let validProductsInSample = 0; + let sampled = 0; + let lastProgressTime = Date.now(); + + for await (const line of lines) { + sampleLines++; + const trimmed = line.trim(); + if (!trimmed) continue; + + try { + const product = JSON.parse(trimmed); + const row = mapToCacheRow(product); + if (row && row.barcode) { + validProductsInSample++; + if (sampled < sampleSize) { + sampleRows.push(row); + sampled++; + } + } + } catch (_) { + // skip invalid lines + } + + // Show progress every 10k lines or every 5 seconds + const now = Date.now(); + if (sampleLines % 10000 === 0 || now - lastProgressTime > 5000) { + console.log(`šŸ“Š Scanned ${sampleLines.toLocaleString()} lines, found ${validProductsInSample.toLocaleString()} valid products...`); + lastProgressTime = now; + } + + // Stop after processing maxLines + if (sampleLines >= maxLines) break; + } + + // Estimate total valid products based on sample ratio + const validRatio = validProductsInSample / sampleLines; + const estimatedTotalValidProducts = Math.round(validRatio * 4046118); // Known total lines from earlier + + console.log(`āœ… Sampled ${sampled.toLocaleString()} products from ${validProductsInSample.toLocaleString()} valid products in ${sampleLines.toLocaleString()} lines`); + console.log(`šŸ“Š Estimated total valid products: ${estimatedTotalValidProducts.toLocaleString()} (${(validRatio * 100).toFixed(1)}% valid rate)`); + + return { sampleRows, estimatedTotalValidProducts, sampleLines }; +} + +async function askUploadPermission(stats: { count: number; totalBytes: number; dbEstimate?: any }): Promise { + console.log("\nSummary:"); + console.log(` Rows: ${stats.count}`); + console.log(` Payload size (JSON only): ${formatBytes(stats.totalBytes)} (~${Math.round(stats.totalBytes / Math.max(1, stats.count))} B/row)`); + + if (stats.dbEstimate) { + console.log("\nšŸ“Š Database Size Estimate:"); + console.log(` Average row size: ${formatBytes(stats.dbEstimate.avgRowSize)}`); + console.log(` Table data size: ${formatBytes(stats.dbEstimate.estimatedTableSize)}`); + console.log(` Index size: ${formatBytes(stats.dbEstimate.estimatedIndexSize)}`); + console.log(` Total estimated size: ${formatBytes(stats.dbEstimate.estimatedTotalSize)}`); + } + + const answer = confirm("Upload to Supabase inventory_cache? This can take a long time. (y/N)"); + return !!answer; +} + + +async function uploadJsonlToSupabase(path: string) { + console.log("šŸ“¤ Starting upload to Supabase..."); + + // Load env for Supabase connection + const url = Deno.env.get("SUPABASE_URL") ?? ""; + const key = Deno.env.get("SUPABASE_SECRET_KEY") ?? Deno.env.get("SUPABASE_SERVICE_ROLE_KEY") ?? 
""; + + if (!url || !key) { + console.error("āŒ SUPABASE_URL and SUPABASE_SECRET_KEY must be set"); + Deno.exit(1); + } + + const { createClient } = await import("npm:@supabase/supabase-js@2.39.3"); + const supabase = createClient(url, key, { auth: { persistSession: false } }); + + console.log("šŸ” Opening file:", path); + const file = await Deno.open(path, { read: true }); + console.log("āœ… File opened successfully"); + + const decoder = new TextDecoder(); + const bufSize = 64 * 1024; + const buf = new Uint8Array(bufSize); + let pending: any[] = []; + let leftover = ""; + let total = 0; + let batchCount = 0; + let uploadedBatches = 0; + let parallelUploads: Promise[] = []; // Track parallel uploads + + // Progress tracking + const startTime = Date.now(); + const progressInterval = setInterval(() => { + const elapsed = (Date.now() - startTime) / 1000; + const rate = total / elapsed; + console.log(`šŸ“Š Progress: ${total} products processed, ${batchCount} batches uploaded (${rate.toFixed(0)} products/sec)`); + }, 10000); // Every 10 seconds + + try { + while (true) { + const read = await file.read(buf); + if (read === null) { + break; + } + + const chunk = decoder.decode(buf.subarray(0, read)); + let data = leftover + chunk; + let idx: number; + let linesInChunk = 0; + while ((idx = data.indexOf("\n")) !== -1) { + linesInChunk++; + const line = data.slice(0, idx); + data = data.slice(idx + 1); + if (!line) continue; + + try { + const row = JSON.parse(line); + row.last_refreshed_at = new Date().toISOString(); + pending.push(row); + total++; + + if (pending.length >= BATCH_UPLOAD_SIZE) { + batchCount++; + const currentBatchNum = batchCount; + + // Deduplicate by barcode (keep last occurrence) to avoid "ON CONFLICT DO UPDATE command cannot affect row a second time" error + const deduped = new Map(); + for (const row of pending) { + deduped.set(row.barcode, row); + } + + // Create batch for upload + const batch = Array.from(deduped.values()).map(row => ({ + ...row, + last_refreshed_at: new Date().toISOString() + })); + + // Log if duplicates were found + const duplicateCount = pending.length - batch.length; + if (duplicateCount > 0) { + console.log(`āš ļø Removed ${duplicateCount} duplicate barcodes from batch ${currentBatchNum}`); + } + + // Clear pending immediately + pending = []; + uploadedBatches++; + + // Wait if we've reached the parallel limit BEFORE creating new promise + if (parallelUploads.length >= PARALLEL_BATCHES) { + console.log(`ā³ Waiting for ${parallelUploads.length} parallel uploads to complete...`); + await Promise.all(parallelUploads); + parallelUploads = []; + console.log(`āœ… Completed batch group at ${batchCount} batches`); + + // Add delay to avoid rate limiting + if (DELAY_BETWEEN_BATCH_GROUPS_MS > 0) { + await new Promise(resolve => setTimeout(resolve, DELAY_BETWEEN_BATCH_GROUPS_MS)); + } + } + + // Now create and add the upload promise + const uploadPromise = (async () => { + try { + const { error } = await supabase + .from('inventory_cache') + .upsert(batch, { onConflict: 'barcode' }); + + if (error) { + console.error(`āŒ Batch ${currentBatchNum} failed:`, error.message); + throw error; + } + + // Only log every 10 batches + if (currentBatchNum % 10 === 0) { + console.log(`āœ… Uploaded batch ${currentBatchNum} (${batch.length} rows)`); + } + } catch (error) { + console.error(`āŒ Upload error at batch ${currentBatchNum}:`, error); + throw error; + } + })(); + + parallelUploads.push(uploadPromise); + + // Check if we need confirmation (every 500k products) + if 
(uploadedBatches >= BATCHES_PER_CONFIRMATION) { + // Wait for any pending uploads before asking + if (parallelUploads.length > 0) { + await Promise.all(parallelUploads); + parallelUploads = []; + } + + console.log(`\nšŸ“Š Checkpoint: ${total.toLocaleString()} products uploaded in ${batchCount} batches`); + console.log(` (Uploaded ${(uploadedBatches * BATCH_UPLOAD_SIZE).toLocaleString()} products since last checkpoint)`); + const continueUpload = confirm(`Continue uploading next 500k products? [y/N]`); + if (!continueUpload) { + console.log("āŒ Upload cancelled by user"); + return; + } + uploadedBatches = 0; + console.log("āœ… Continuing upload...\n"); + } + } + } catch (error) { + console.error(`āŒ JSON parse error on line:`, (error as Error).message); + // skip invalid JSON + } + } + leftover = data; + } + + // Handle remaining data + if (leftover.trim()) { + try { + const row = JSON.parse(leftover.trim()); + row.last_refreshed_at = new Date().toISOString(); + pending.push(row); + total++; + } catch (_) { + // skip invalid JSON + } + } + + // Upload final batch if there's pending data + if (pending.length > 0) { + batchCount++; + const currentBatchNum = batchCount; + + // Deduplicate by barcode (keep last occurrence) + const deduped = new Map(); + for (const row of pending) { + deduped.set(row.barcode, row); + } + + const batch = Array.from(deduped.values()).map(row => ({ + ...row, + last_refreshed_at: new Date().toISOString() + })); + + const duplicateCount = pending.length - batch.length; + if (duplicateCount > 0) { + console.log(`āš ļø Removed ${duplicateCount} duplicate barcodes from final batch ${currentBatchNum}`); + } + + const uploadPromise = (async () => { + try { + const { error } = await supabase + .from('inventory_cache') + .upsert(batch, { onConflict: 'barcode' }); + + if (error) { + console.error(`āŒ Final batch ${currentBatchNum} failed:`, error.message); + throw error; + } + + console.log(`āœ… Uploaded final batch ${currentBatchNum} (${batch.length} rows)`); + } catch (error) { + console.error(`āŒ Upload error at final batch ${currentBatchNum}:`, error); + throw error; + } + })(); + + parallelUploads.push(uploadPromise); + } + + // Wait for all remaining parallel uploads to complete + if (parallelUploads.length > 0) { + console.log(`ā³ Waiting for final ${parallelUploads.length} uploads to complete...`); + await Promise.all(parallelUploads); + console.log(`āœ… All uploads completed!`); + } + + console.log(`āœ… Upload complete! 
${total} rows processed in ${batchCount} batches`); + + } finally { + file.close(); + clearInterval(progressInterval); + } +} + +async function main() { + // Parse command line arguments + const args = Deno.args; + const skipDownload = args.includes('--upload-only') || args.includes('-u'); + const showHelp = args.includes('--help') || args.includes('-h'); + + if (showHelp) { + console.log(` +Usage: deno run -A --unstable-kv local/openfoodfacts/off_ingest.ts [options] + +Options: + --upload-only, -u Skip download and processing, go straight to upload + --help, -h Show this help message + +Examples: + deno run -A --unstable-kv local/openfoodfacts/off_ingest.ts # Full process + deno run -A --unstable-kv local/openfoodfacts/off_ingest.ts --upload-only # Upload only + deno run -A --unstable-kv local/openfoodfacts/off_ingest.ts -u # Upload only (short) + `); + return; + } + + if (skipDownload) { + console.log("šŸš€ Starting in upload-only mode (skipping download and processing)..."); + } + + // Load environment variables from .env file + await loadEnv(); + + if (!skipDownload) { + console.log("Downloading OFF JSONL.gz and streaming transform..."); + + // First pass: Sample products for size estimation (fast) + const lines = iterLinesFromGzip(OFF_JSONL_GZ_URL, false); + const { sampleRows, estimatedTotalValidProducts } = await sampleProductsForSizeEstimation(lines, SAMPLE_SIZE, SAMPLE_LINES); + + // Estimate database size for the entire dataset + const dbEstimate = estimateDatabaseSize(sampleRows, estimatedTotalValidProducts); + console.log("\nšŸ“Š Database Size Estimate (Full Dataset):"); + console.log(` Estimated total valid products: ${estimatedTotalValidProducts.toLocaleString()}`); + console.log(` Average row size: ${formatBytes(dbEstimate.avgRowSize)}`); + console.log(` Table data size: ${formatBytes(dbEstimate.estimatedTableSize)}`); + console.log(` Index size: ${formatBytes(dbEstimate.estimatedIndexSize)}`); + console.log(` Total estimated size: ${formatBytes(dbEstimate.estimatedTotalSize)}`); + + // Second pass: Process all products + console.log("\nšŸ”„ Processing all products..."); + const rows = projectRows(iterLinesFromGzip(OFF_JSONL_GZ_URL, true)); + const stats = await writeJsonl(rows, OUTPUT_PATH); + + console.log("\nField coverage (non-empty counts):"); + console.log(` brand: ${stats.nonEmpty.brand}`); + console.log(` name: ${stats.nonEmpty.name}`); + console.log(` ingredients: ${stats.nonEmpty.ingredients}`); + console.log(` images: ${stats.nonEmpty.images}`); + + // Show validation summary + if (stats.validationStats) { + console.log("\nšŸ“Š Data Quality Summary:"); + const validRate = ((stats.validationStats.validProducts / stats.validationStats.totalLines) * 100).toFixed(1); + console.log(` Success rate: ${validRate}% (${stats.validationStats.validProducts.toLocaleString()} valid out of ${stats.validationStats.totalLines.toLocaleString()} total)`); + console.log(` Invalid breakdown:`); + console.log(` - Empty lines: ${stats.validationStats.emptyLines.toLocaleString()}`); + console.log(` - JSON parse errors: ${stats.validationStats.jsonParseErrors.toLocaleString()}`); + console.log(` - No barcode: ${stats.validationStats.noBarcode.toLocaleString()}`); + } + + const proceed = await askUploadPermission({ ...stats, dbEstimate }); + if (!proceed) { + console.log("Upload skipped."); + return; + } + } else { + // Check if the local file exists + try { + const stat = await Deno.stat(OUTPUT_PATH); + console.log(`šŸ“ Found existing file: ${OUTPUT_PATH} (${formatBytes(stat.size)})`); + 
} catch { + console.error(`āŒ No existing ${OUTPUT_PATH} file found. Run without --upload-only first.`); + return; + } + } + console.log("\nUploading to Supabase (batched upserts)..."); + const start = Date.now(); + await uploadJsonlToSupabase(OUTPUT_PATH); + const elapsed = (Date.now() - start) / 1000; + console.log(`Done in ${elapsed.toFixed(1)}s.`); +} + +if (import.meta.main) { + await main().catch((err) => { + console.error("Error:", err); + Deno.exit(1); + }); +} + + diff --git a/local/openfoodfacts/off_upload_batch.ts b/local/openfoodfacts/off_upload_batch.ts new file mode 100644 index 0000000..7d82820 --- /dev/null +++ b/local/openfoodfacts/off_upload_batch.ts @@ -0,0 +1,56 @@ +// Child process for uploading a single batch to Supabase +import { createClient } from "npm:@supabase/supabase-js@2.39.3"; + +async function uploadBatch() { + const batchFile = Deno.args[0]; + const batchNumber = Deno.args[1]; + + if (!batchFile || !batchNumber) { + console.error("Usage: deno run off_upload_batch.ts "); + Deno.exit(1); + } + + const url = Deno.env.get("SUPABASE_URL") ?? ""; + const key = Deno.env.get("SUPABASE_SECRET_KEY") ?? Deno.env.get("SUPABASE_SERVICE_ROLE_KEY") ?? ""; + + if (!url || !key) { + console.error("SUPABASE_URL and SUPABASE_SECRET_KEY must be set"); + Deno.exit(1); + } + + const supabase = createClient(url, key, { auth: { persistSession: false } }); + + try { + // Read batch file + const batchData = await Deno.readTextFile(batchFile); + const rows = batchData.trim().split('\n').map(line => JSON.parse(line)); + + // Upload to Supabase + const { error } = await supabase + .from('inventory_cache') + .upsert(rows, { onConflict: 'barcode' }); + + if (error) { + console.error(`āŒ Batch ${batchNumber} failed:`, error.message); + Deno.exit(1); + } + + // Only log every 10th batch to reduce output + if (parseInt(batchNumber) % 10 === 0) { + console.log(`āœ… Uploaded batch ${batchNumber} (${rows.length} rows)`); + } + + } catch (error) { + console.error(`āŒ Batch ${batchNumber} error:`, error.message); + Deno.exit(1); + } finally { + // Clean up temp file + try { + await Deno.remove(batchFile); + } catch { + // Ignore cleanup errors + } + } +} + +uploadBatch(); diff --git a/supabase/database/tables.sql b/supabase/database/tables.sql index 9d816f6..bd404bb 100644 --- a/supabase/database/tables.sql +++ b/supabase/database/tables.sql @@ -1,6 +1,99 @@ -------------------------------------------------------------------------------- +create table + public.inventory_cache ( + created_at timestamp with time zone not null default now(), + updated_at timestamp with time zone not null default now(), + last_refreshed_at timestamp with time zone, + barcode text not null, + data_source text not null default 'openfoodfacts/v3', + name text, + brand text, + ingredients jsonb not null default '[]'::jsonb, + images jsonb not null default '[]'::jsonb, + off_last_modified_t bigint, + etag text, + constraint inventory_cache_pkey primary key (barcode) + ) tablespace pg_default; + +alter table public.inventory_cache enable row level security; + +create policy "Select for all authenticated users" on public.inventory_cache + for select + using (true); + +create policy "Write for service role only" on public.inventory_cache + for ALL + using (auth.role() = 'service_role') + with check (auth.role() = 'service_role'); + +create or replace function set_inventory_cache_updated_at() +returns trigger as $$ +begin + new.updated_at = now(); + return new; +end; +$$ language plpgsql; + +create trigger 
trg_inventory_cache_updated_at +before update on public.inventory_cache +for each row execute function set_inventory_cache_updated_at(); + +-- Function to match barcodes with or without leading zeros +-- Only pads UPWARD to avoid false matches between different barcode types +-- Based on inventory stats: 93% are 13-digit, 5% are 8-digit +-- Examples: "884912373946" (12) matches "0884912373946" (13) āœ“ +-- "12345678" (8) does NOT match "0000012345678" (13) āœ— +create or replace function barcode_matches(barcode1 text, barcode2 text) +returns boolean as $$ +declare + len1 int; + len2 int; + min_len int; +begin + if barcode1 is null or barcode2 is null then + return false; + end if; + + -- Direct match (fastest check) + if barcode1 = barcode2 then + return true; + end if; + + len1 := length(barcode1); + len2 := length(barcode2); + min_len := least(len1, len2); + + -- Only pad to lengths equal to or greater than the shorter barcode + -- This prevents 8-digit codes from matching unrelated 13-digit codes + + -- Try 8 digits (EAN-8) only if shortest is <= 8 + if min_len <= 8 and lpad(barcode1, 8, '0') = lpad(barcode2, 8, '0') then + return true; + end if; + + -- Try 12 digits (UPC-A) only if shortest is <= 12 + if min_len <= 12 and lpad(barcode1, 12, '0') = lpad(barcode2, 12, '0') then + return true; + end if; + + -- Try 13 digits (EAN-13) only if shortest is <= 13 + if min_len <= 13 and lpad(barcode1, 13, '0') = lpad(barcode2, 13, '0') then + return true; + end if; + + -- Try 14 digits (ITF-14) only if shortest is <= 14 + if min_len <= 14 and lpad(barcode1, 14, '0') = lpad(barcode2, 14, '0') then + return true; + end if; + + return false; +end; +$$ language plpgsql immutable; + +-------------------------------------------------------------------------------- + create table public.user_list_items ( created_at timestamp with time zone not null default now(), @@ -63,33 +156,6 @@ CREATE POLICY user_update_own_log_infer ON public.log_feedback -------------------------------------------------------------------------------- -create table - public.log_inventory ( - created_at timestamp with time zone not null default now(), - start_time timestamp with time zone, - end_time timestamp with time zone, - user_id uuid not null, - client_activity_id uuid, - barcode text not null, - data_source text not null, - name text, - brand text, - ingredients json, - images json - ) tablespace pg_default; - -alter table public.log_inventory enable row level security; - -create policy "Select for all authenticated users" on public.log_inventory - for select - using (true); - -create policy "Insert for authenticated users" on public.log_inventory - for insert - with check (auth.uid() = user_id); - --------------------------------------------------------------------------------- - create table public.inventory_traderjoes ( created_at timestamp with time zone not null default now(), @@ -264,12 +330,12 @@ BEGIN SELECT DISTINCT ON (barcode, name, brand) la.created_at, la.client_activity_id, - COALESCE(li.barcode, le.barcode) AS barcode, - COALESCE(li.name, le.name) AS name, - COALESCE(li.brand, le.brand) AS brand, - COALESCE(li.ingredients, le.ingredients) AS ingredients, + COALESCE(le.barcode, ic.barcode) AS barcode, + COALESCE(ic.name, le.name) AS name, + COALESCE(ic.brand, le.brand) AS brand, + COALESCE(ic.ingredients::json, le.ingredients) AS ingredients, COALESCE( - li.images, + ic.images::json, (SELECT json_agg(json_build_object('imageFileHash', text_val)) FROM unnest(le.images) AS dt(text_val)) ) AS images, 
la.response_body AS ingredient_recommendations, @@ -283,31 +349,27 @@ BEGIN ) AS favorited FROM public.log_analyzebarcode la - LEFT JOIN public.log_inventory li - ON la.client_activity_id = li.client_activity_id LEFT JOIN public.log_extract le ON la.client_activity_id = le.client_activity_id + LEFT JOIN public.inventory_cache ic + ON barcode_matches(le.barcode, ic.barcode) LEFT JOIN public.log_feedback lf ON la.client_activity_id = lf.client_activity_id WHERE la.created_at > '2024-03-15'::date AND - ( - li.client_activity_id IS NOT NULL - OR - le.client_activity_id IS NOT NULL - ) + le.client_activity_id IS NOT NULL AND ( search_query IS NULL OR - to_tsvector('english', COALESCE(li.name, le.name) || ' ' || COALESCE(li.brand, le.brand) || ' ' || COALESCE(li.ingredients::text, le.ingredients::text)) @@ plainto_tsquery('english', search_query) + to_tsvector('english', COALESCE(ic.name, le.name) || ' ' || COALESCE(ic.brand, le.brand) || ' ' || COALESCE(ic.ingredients::text, le.ingredients::text)) @@ plainto_tsquery('english', search_query) OR - COALESCE(li.name, le.name) ILIKE '%' || search_query || '%' + COALESCE(ic.name, le.name) ILIKE '%' || search_query || '%' OR - COALESCE(li.brand, le.brand) ILIKE '%' || search_query || '%' + COALESCE(ic.brand, le.brand) ILIKE '%' || search_query || '%' OR - COALESCE(li.ingredients::text, le.ingredients::text) ILIKE '%' || search_query || '%' + COALESCE(ic.ingredients::text, le.ingredients::text) ILIKE '%' || search_query || '%' ) ORDER BY barcode, name, brand, la.created_at DESC @@ -339,37 +401,33 @@ BEGIN uli.created_at, uli.list_id, uli.list_item_id, - COALESCE(li.barcode, le.barcode) AS barcode, - COALESCE(li.name, le.name) AS name, - COALESCE(li.brand, le.brand) AS brand, - COALESCE(li.ingredients, le.ingredients::json) AS ingredients, + COALESCE(le.barcode, ic.barcode) AS barcode, + COALESCE(ic.name, le.name) AS name, + COALESCE(ic.brand, le.brand) AS brand, + COALESCE(ic.ingredients::json, le.ingredients::json) AS ingredients, COALESCE( - li.images, + ic.images::json, (SELECT json_agg(json_build_object('imageFileHash', text_val)) FROM unnest(le.images) AS dt(text_val)) ) AS images FROM public.user_list_items uli - LEFT JOIN public.log_inventory li ON uli.list_item_id = li.client_activity_id LEFT JOIN public.log_extract le ON uli.list_item_id = le.client_activity_id + LEFT JOIN public.inventory_cache ic ON barcode_matches(le.barcode, ic.barcode) WHERE uli.list_id = input_list_id AND - ( - li.client_activity_id IS NOT NULL - OR - le.client_activity_id IS NOT NULL - ) + le.client_activity_id IS NOT NULL AND ( search_query IS NULL OR - to_tsvector('english', COALESCE(li.name, le.name) || ' ' || COALESCE(li.brand, le.brand) || ' ' || COALESCE(li.ingredients::text, le.ingredients::text)) @@ plainto_tsquery('english', search_query) + to_tsvector('english', COALESCE(ic.name, le.name) || ' ' || COALESCE(ic.brand, le.brand) || ' ' || COALESCE(ic.ingredients::text, le.ingredients::text)) @@ plainto_tsquery('english', search_query) OR - COALESCE(li.name, le.name) ILIKE '%' || search_query || '%' + COALESCE(ic.name, le.name) ILIKE '%' || search_query || '%' OR - COALESCE(li.brand, le.brand) ILIKE '%' || search_query || '%' + COALESCE(ic.brand, le.brand) ILIKE '%' || search_query || '%' OR - COALESCE(li.ingredients::text, le.ingredients::text) ILIKE '%' || search_query || '%' + COALESCE(ic.ingredients::text, le.ingredients::text) ILIKE '%' || search_query || '%' ) ORDER BY uli.created_at DESC; diff --git a/supabase/functions/background/index.ts 
b/supabase/functions/background/index.ts index b7219d7..32da2bd 100644 --- a/supabase/functions/background/index.ts +++ b/supabase/functions/background/index.ts @@ -45,25 +45,6 @@ router } ctx.response.status = 201 }) - .post('/background/log_inventory', async (ctx) => { - const body = ctx.request.body({ type: 'json', limit: 0 }) - const body_json = await body.value - const user_id = await KitchenSink.getUserId(ctx) - const entry = { - ...body_json, - user_id: user_id, - } - const result = await ctx.state.supabaseClient - .from('log_inventory') - .insert(entry) - if (result.error) { - console.log('supabaseClient.from(log_inventory).insert() failed: ', result.error) - ctx.response.status = 500 - ctx.response.body = result.error - return - } - ctx.response.status = 201 - }) .post('/background/log_llmcalls', async (ctx) => { const body = ctx.request.body({ type: 'json', limit: 0 }) const body_json = await body.value diff --git a/supabase/functions/ingredicheck/analyzer.ts b/supabase/functions/ingredicheck/analyzer.ts index 6cda7b1..cfe5113 100644 --- a/supabase/functions/ingredicheck/analyzer.ts +++ b/supabase/functions/ingredicheck/analyzer.ts @@ -1,93 +1,232 @@ +import { Context } from "https://deno.land/x/oak@v12.6.0/mod.ts"; +import * as DB from "../shared/db.ts"; +import { + ingredientAnalyzerAgent, + IngredientRecommendation, +} from "../shared/llm/ingredientanalyzeragent.ts"; +import * as Inventory from "./inventory.ts"; -import { Context } from 'https://deno.land/x/oak@v12.6.0/mod.ts' -import * as DB from '../shared/db.ts' -import { ingredientAnalyzerAgent } from '../shared/llm/ingredientanalyzeragent.ts' +const MB = 1024 * 1024; -const MB = 1024 * 1024 +export type AnalysisRequest = { + barcode?: string; + userPreferenceText?: string; + clientActivityId?: string; +}; export async function analyze(ctx: Context) { + const startTime = new Date(); + let requestBody: AnalysisRequest = {}; + let responseBody: unknown = []; + let responseStatus = 200; - const startTime = new Date() - let requestBody: any = {} - let product = DB.defaultProduct() - - try { - const body = ctx.request.body({ type: "form-data" }) - const formData = await body.value.read({ maxSize: 10 * MB }) - - requestBody = { - barcode: formData.fields['barcode'], - userPreferenceText: formData.fields['userPreferenceText'], - clientActivityId: formData.fields['clientActivityId'] - } - - ctx.state.clientActivityId = requestBody.clientActivityId - - if (requestBody.barcode !== undefined) { - const result = await ctx.state.supabaseClient - .from('log_inventory') - .select() - .eq('barcode', requestBody.barcode) - .order('created_at', { ascending: false }) - .limit(1) - .single() - - if (result.error) { - throw result.error - } - - product = result.data as DB.Product - } else { - const result = await ctx.state.supabaseClient - .from('log_extract') - .select() - .eq('client_activity_id', ctx.state.clientActivityId) - .order('created_at', { ascending: false }) - .limit(1) - .single() - - if (result.error) { - throw result.error - } - - product = { - barcode: result.data.barcode, - brand: result.data.brand, - name: result.data.name, - ingredients: result.data.ingredients ?? 
[], - images: [] - } - } - - // Skip analyzer agent if user has no preferences set - const hasValidPreferences = requestBody.userPreferenceText && - requestBody.userPreferenceText.trim() !== "" && - requestBody.userPreferenceText.trim().toLowerCase() !== "none" - - const ingredientRecommendations = - product.ingredients && product.ingredients.length !== 0 && hasValidPreferences - ? await ingredientAnalyzerAgent(ctx, product, requestBody.userPreferenceText) - : [] - - ctx.response.status = 200 - ctx.response.body = ingredientRecommendations - } catch (error) { - ctx.response.status = 500 - ctx.response.body = error + try { + const body = ctx.request.body({ type: "form-data" }); + const formData = await body.value.read({ maxSize: 10 * MB }); + + requestBody = { + barcode: formData.fields["barcode"], + userPreferenceText: formData.fields["userPreferenceText"], + clientActivityId: formData.fields["clientActivityId"], + }; + + const result = await performAnalysis({ + ctx, + requestBody, + }); + + responseStatus = 200; + responseBody = result.recommendations; + } catch (error) { + responseStatus = 500; + responseBody = error; + } + + ctx.response.status = responseStatus; + ctx.response.body = responseBody; + + await logAnalysisResult( + ctx, + startTime, + requestBody, + responseStatus, + responseBody, + ); +} + +export async function streamInventoryAndAnalysis(ctx: Context) { + const barcode = ctx.params.barcode; + const clientActivityId = + ctx.request.url.searchParams.get("clientActivityId") ?? undefined; + const userPreferenceText = + ctx.request.url.searchParams.get("userPreferenceText") ?? undefined; + + const sse = ctx.sendEvents(); + + if (!barcode) { + sse.dispatchMessage({ + event: "error", + data: JSON.stringify({ message: "Barcode is required." }), + }); + sse.close(); + return; + } + + const inventoryResult = await Inventory.getProductFromCache({ + supabaseClient: ctx.state.supabaseClient, + barcode, + clientActivityId, + }); + + if (inventoryResult.status !== 200 || !inventoryResult.product) { + const errorPayload = { + message: inventoryResult.error ?? "Product not found.", + status: inventoryResult.status, + }; + sse.dispatchMessage({ + event: "error", + data: JSON.stringify(errorPayload), + }); + sse.close(); + return; + } + + sse.dispatchMessage({ + event: "product", + data: JSON.stringify(inventoryResult.product), + }); + + const analysisStartTime = new Date(); + + const analysisRequest: AnalysisRequest = { + barcode, + userPreferenceText, + clientActivityId, + }; + + try { + const analysisResult = await performAnalysis({ + ctx, + requestBody: analysisRequest, + productOverride: inventoryResult.product, + }); + + sse.dispatchMessage({ + event: "analysis", + data: JSON.stringify(analysisResult.recommendations), + }); + + await logAnalysisResult( + ctx, + analysisStartTime, + analysisRequest, + 200, + analysisResult.recommendations, + ); + } catch (error) { + const message = error instanceof Error ? 
error.message : "Analysis failed."; + + sse.dispatchMessage({ + event: "error", + data: JSON.stringify({ message }), + }); + + await logAnalysisResult( + ctx, + analysisStartTime, + analysisRequest, + 500, + { message }, + ); + } finally { + sse.dispatchComment("done"); + sse.close(); + } +} + +type PerformAnalysisOptions = { + ctx: Context; + requestBody: AnalysisRequest; + productOverride?: DB.Product; +}; + +type PerformAnalysisResult = { + product: DB.Product; + recommendations: IngredientRecommendation[]; +}; + +export async function performAnalysis( + options: PerformAnalysisOptions, +): Promise { + const { ctx, requestBody, productOverride } = options; + + ctx.state.clientActivityId = requestBody.clientActivityId; + + let product: DB.Product; + + if (productOverride) { + product = productOverride; + } else { + const result = await Inventory.getProductFromCache({ + supabaseClient: ctx.state.supabaseClient, + barcode: requestBody.barcode, + clientActivityId: ctx.state.clientActivityId, + }); + + if (result.status !== 200 || !result.product) { + throw new Error(result.error ?? "Product not found"); } - const endTime = new Date() + product = result.product; + } + + const hasValidPreferences = requestBody.userPreferenceText && + requestBody.userPreferenceText.trim() !== "" && + requestBody.userPreferenceText.trim().toLowerCase() !== "none"; + + const hasIngredients = Array.isArray(product.ingredients) && + product.ingredients.length > 0; + + const recommendations = hasValidPreferences && hasIngredients + ? await ingredientAnalyzerAgent( + ctx, + product, + requestBody.userPreferenceText!, + ) + : []; + + return { + product, + recommendations, + }; +} + +export async function logAnalysisResult( + ctx: Context, + startTime: Date, + requestBody: AnalysisRequest, + responseStatus: number, + responseBody: unknown, +) { + const endTime = new Date(); - ctx.state.supabaseClient.functions.invoke('background/log_analyzebarcode', { + try { + await ctx.state.supabaseClient.functions.invoke( + "background/log_analyzebarcode", + { body: { - activity_id: ctx.state.activityId, - client_activity_id: ctx.state.clientActivityId, - start_time: startTime, - end_time: endTime, - request_body: requestBody, - response_status: ctx.response.status, - response_body: ctx.response.body + activity_id: ctx.state.activityId, + client_activity_id: ctx.state.clientActivityId, + start_time: startTime, + end_time: endTime, + request_body: requestBody, + response_status: responseStatus, + response_body: responseBody, }, - method: 'POST' - }) -} \ No newline at end of file + method: "POST", + }, + ); + } catch (error) { + console.error("Failed to log analyze barcode event", error); + } +} diff --git a/supabase/functions/ingredicheck/index.ts b/supabase/functions/ingredicheck/index.ts index c606bec..d4d0ccc 100644 --- a/supabase/functions/ingredicheck/index.ts +++ b/supabase/functions/ingredicheck/index.ts @@ -46,6 +46,7 @@ router } ctx.response.status = 204 }) + .get('/ingredicheck/inventory/:barcode/analyze-stream', Analyzer.streamInventoryAndAnalysis) .get('/ingredicheck/inventory/:barcode', async (ctx) => { const clientActivityId = ctx.request.url.searchParams.get("clientActivityId") await Inventory.get(ctx, ctx.params.barcode, clientActivityId) diff --git a/supabase/functions/ingredicheck/inventory.ts b/supabase/functions/ingredicheck/inventory.ts index 7f0bffa..34c1043 100644 --- a/supabase/functions/ingredicheck/inventory.ts +++ b/supabase/functions/ingredicheck/inventory.ts @@ -1,124 +1,138 @@ -import { Context } from 
'https://deno.land/x/oak@v12.6.0/mod.ts' -import * as DB from '../shared/db.ts' - -export async function get(ctx: Context, barcode: string, clientActivityId: string | null) { - - let result_json: any = {} - let log_json: any = { - start_time: new Date(), - barcode: barcode, - data_source: 'openfoodfacts/v3', - client_activity_id: clientActivityId, - } - - const url = `https://world.openfoodfacts.org/api/v3/product/${barcode}.json` - const response = await fetch(url) - const data = await response.json() - - if (data.status === 'failure') { - console.log(`Unexpected product details: ${JSON.stringify(data, null, 2)}`) - ctx.response.status = 404 +import { Context } from "https://deno.land/x/oak@v12.6.0/mod.ts"; +import * as DB from "../shared/db.ts"; + +type InventoryCacheOptions = { + supabaseClient: any; + barcode?: string; + clientActivityId?: string; +}; + +type InventoryCacheResult = { + status: number; + product: DB.Product | null; + error?: string; +}; + +/** + * Queries the inventory_cache for a product by barcode. + * If no barcode is provided, falls back to log_extract by clientActivityId. + */ +export async function getProductFromCache( + options: InventoryCacheOptions, +): Promise { + const { supabaseClient, barcode, clientActivityId } = options; + + // Query inventory_cache if barcode is provided + if (barcode !== undefined) { + // Try to match barcodes with or without leading zeros + // Only pad UPWARD to avoid false matches between different barcode types + const variants = [barcode]; // Always include original + const len = barcode.length; + + if (len <= 8) { + // EAN-8 format (5% of inventory) - only pad to 8 + variants.push(barcode.padStart(8, '0')); + } else if (len <= 12) { + // UPC-A format - pad to 12, 13, 14 + variants.push(barcode.padStart(12, '0')); + variants.push(barcode.padStart(13, '0')); // UPC-A → EAN-13 conversion + variants.push(barcode.padStart(14, '0')); + } else if (len === 13) { + // EAN-13 format (93% of inventory) - pad to 13, 14 + variants.push(barcode.padStart(13, '0')); + variants.push(barcode.padStart(14, '0')); } else { - // console.log(`brand: ${data.product.brand_owner}`) - // console.log(`name: ${data.product.product_name}`) - // console.log(`ingredients: ${data.product.ingredients}`) - // console.log(`images: ${data.product.selected_images?.front?.display?.en}`) - result_json = processOpenFoodFactsProductData(barcode, data.product) - log_json = { - ...log_json, - ...result_json - } - ctx.response.status = 200 - } - - log_json.end_time = new Date() - - await ctx.state.supabaseClient.functions.invoke('background/log_inventory', { - body: log_json, - method: 'POST' - }) - - ctx.response.body = result_json -} - -type SelectedImages = { - [key: string]: { - display: { - [key: string]: string - } - } -} - -type ImageUrl = { - url: string -} - -function extractDisplayImageUrls(selectedImages?: SelectedImages): ImageUrl[] { - if (selectedImages) { - return Object.values(selectedImages).flatMap(image => { - if (image.display?.en) { - return [{ - url: image.display.en - }] - } - return [] - }) - } - return [] -} - -function processOpenFoodFactsProductData(barcode: string, product: any) : DB.Product { - - let brand: string | undefined = undefined - let name: string | undefined = undefined - let ingredients: any[] = [] - - if (product.brand_owner) { - brand = product.brand_owner + // 14+ digits - only pad to 14 + variants.push(barcode.padStart(14, '0')); } - - if (product.product_name) { - name = product.product_name + + // Remove duplicates and create OR 
condition + const uniqueVariants = [...new Set(variants)]; + const orCondition = uniqueVariants.map(v => `barcode.eq.${v}`).join(','); + + const result = await supabaseClient + .from("inventory_cache") + .select() + .or(orCondition) + .limit(1) + .maybeSingle(); + + if (result.error) { + return { + status: 404, + product: null, + error: result.error.message ?? "Product not found in cache.", + }; } - if (product.ingredients) { - ingredients = - product.ingredients.map((i: any) => { - return { - name: i.text, - vegan: i.vegan, - vegetarian: i.vegetarian, - ingredients: i.ingredients?.map((i2: any) => { - return { - name: i2.text, - vegan: i2.vegan, - vegetarian: i2.vegetarian, - ingredients: i2.ingredients?.map((i3: any) => { - return { - name: i3.text, - vegan: i3.vegan, - vegetarian: i3.vegetarian, - ingredients: [] - } - }) ?? [] - } - }) ?? [] - } - }) + if (!result.data) { + return { + status: 404, + product: null, + error: "Product not found in cache.", + }; } - const images = extractDisplayImageUrls(product.selected_images) - - // Workaround for known issues with OpenFoodFacts data - if (barcode === '0096619362776') { - // Label says 'Contains No Animal Rennet', but ingredient list has 'Animal Rennet'. - ingredients = ingredients.filter((i) => i.name !== 'Animal Rennet') + return { + status: 200, + product: result.data as DB.Product, + }; + } + + // Fallback to log_extract if no barcode provided + if (clientActivityId !== undefined) { + const result = await supabaseClient + .from("log_extract") + .select() + .eq("client_activity_id", clientActivityId) + .order("created_at", { ascending: false }) + .limit(1) + .single(); + + if (result.error) { + return { + status: 404, + product: null, + error: result.error.message ?? "Product not found in extract log.", + }; } return { - brand: brand, - name: name, - ingredients: ingredients, - images: images - } -} \ No newline at end of file + status: 200, + product: { + barcode: result.data.barcode, + brand: result.data.brand, + name: result.data.name, + ingredients: result.data.ingredients ?? [], + images: [], + }, + }; + } + + return { + status: 400, + product: null, + error: "Either barcode or clientActivityId must be provided.", + }; +} + +export async function get( + ctx: Context, + barcode: string, + clientActivityId: string | null, +) { + const result = await getProductFromCache({ + supabaseClient: ctx.state.supabaseClient, + barcode, + clientActivityId: clientActivityId ?? undefined, + }); + + ctx.response.status = result.status; + if (result.status === 200 && result.product) { + ctx.response.body = result.product; + } else { + ctx.response.body = { + error: result.error ?? 
"Product not found in cache.", + }; + } +} diff --git a/supabase/functions/shared/db.ts b/supabase/functions/shared/db.ts index 878af75..c8908e8 100644 --- a/supabase/functions/shared/db.ts +++ b/supabase/functions/shared/db.ts @@ -22,6 +22,10 @@ export type Product = { export function defaultProduct(): Product { return { + barcode: undefined, + data_source: undefined, + brand: undefined, + name: undefined, ingredients: [], images: [], } diff --git a/supabase/functions/shared/llm/ingredientanalyzeragent.ts b/supabase/functions/shared/llm/ingredientanalyzeragent.ts index 6c79db5..e2a8b86 100644 --- a/supabase/functions/shared/llm/ingredientanalyzeragent.ts +++ b/supabase/functions/shared/llm/ingredientanalyzeragent.ts @@ -7,7 +7,7 @@ import { import { createGeminiProgram } from "./programs.ts"; import { ChatMessage } from "./types.ts"; -type IngredientRecommendation = { +export type IngredientRecommendation = { ingredientName: string; safetyRecommendation: "MaybeUnsafe" | "DefinitelyUnsafe"; reasoning: string;