diff --git a/README.md b/README.md index 0a46e9c..7b32116 100755 --- a/README.md +++ b/README.md @@ -152,7 +152,7 @@ This is a **Claude Code Skill** - a local folder containing instructions and scr |---------|------------|------------| | **Protocol** | Claude Skills | Model Context Protocol | | **Installation** | Clone to `~/.claude/skills` | `claude mcp add ...` | -| **Sessions** | Fresh browser each question | Persistent chat sessions | +| **Sessions** | Supports both fresh-per-question mode and optional persistent daemon-backed sessions | Persistent chat sessions | | **Compatibility** | Claude Code only (local) | Claude Code, Codex, Cursor, etc. | | **Language** | Python | TypeScript | | **Distribution** | Git clone | npm package | @@ -163,11 +163,12 @@ This is a **Claude Code Skill** - a local folder containing instructions and scr ~/.claude/skills/notebooklm/ ├── SKILL.md # Instructions for Claude ├── scripts/ # Python automation scripts -│ ├── ask_question.py # Query NotebookLM +│ ├── ask_question.py # Query NotebookLM (stateless or session shortcut) +│ ├── session_manager.py # Persistent session daemon + client commands │ ├── notebook_manager.py # Library management │ └── auth_manager.py # Google authentication ├── .venv/ # Isolated Python environment (auto-created) -└── data/ # Local notebook library +└── data/ # Local notebook library + session metadata ``` When you mention NotebookLM or send a notebook URL, Claude: @@ -266,6 +267,8 @@ All data is stored locally within the skill directory: ~/.claude/skills/notebooklm/data/ ├── library.json - Your notebook library with metadata ├── auth_info.json - Authentication status info +├── sessions.json - Serializable persistent-session metadata +├── session_runtime/ - Local daemon socket / pid files └── browser_state/ - Browser cookies and session data ``` @@ -276,19 +279,33 @@ All data is stored locally within the skill directory: ### Session Model -Unlike the MCP server, this skill uses a **stateless model**: 
+This skill now supports **two modes**: + +**1. Stateless mode (`ask_question.py`, default)** - Each question opens a fresh browser - Asks the question, gets the answer - Adds a follow-up prompt to encourage Claude to ask more questions - Closes the browser immediately -This means: -- No persistent chat context -- Each question is independent -- But your notebook library persists -- **Follow-up mechanism**: Each answer includes "Is that ALL you need to know?" to prompt Claude to ask comprehensive follow-ups +**2. Persistent mode (`session_manager.py`)** +- A background daemon keeps one Playwright browser context alive +- Each `session_id` maps to one reusable NotebookLM page/tab +- Multiple CLI calls can continue the same NotebookLM conversation +- Session metadata is persisted to `data/sessions.json` +- Idle sessions are reclaimed automatically after the timeout window + +Example persistent flow: + +```bash +python scripts/run.py session_manager.py create --notebook-url "https://notebooklm.google.com/notebook/..." +python scripts/run.py session_manager.py ask --session-id session-xxxxxxxxxxxx --question "First question" +python scripts/run.py session_manager.py ask --session-id session-xxxxxxxxxxxx --question "Follow-up question" +python scripts/run.py session_manager.py info --session-id session-xxxxxxxxxxxx +python scripts/run.py session_manager.py reset --session-id session-xxxxxxxxxxxx +python scripts/run.py session_manager.py close --session-id session-xxxxxxxxxxxx +``` -For multi-step research, Claude automatically asks follow-up questions when needed. +`ask_question.py` remains backward-compatible. If you pass `--session-id`, it forwards the question to the persistent session manager instead of opening a fresh browser. 
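The idle-reclamation rule for persistent sessions described above can be sketched as follows. This is an illustration only: the field names (`last_used_at`, `status`, and so on) are assumptions based on this README, not the actual `data/sessions.json` schema.

```python
import time

# Default timeout window documented above.
IDLE_TIMEOUT_SECONDS = 1800


def is_expired(session: dict, now: float) -> bool:
    """A session is reclaimed once it has been idle past the timeout window."""
    return now - session["last_used_at"] > IDLE_TIMEOUT_SECONDS


# Hypothetical sessions.json-style records (schema is illustrative).
sessions = {
    "session-abc123": {
        "notebook_url": "https://notebooklm.google.com/notebook/example",
        "created_at": 1700000000.0,
        "last_used_at": 1700000000.0,
        "question_count": 2,
        "status": "active",
    }
}

now = 1700000000.0 + 3600  # one hour later, well past the 1800s window
live = {sid: s for sid, s in sessions.items() if not is_expired(s, now)}
print(len(live))  # the idle session above is reclaimed, leaving 0
```

The real daemon additionally has to close the live Playwright page when a session expires, which is why expired sessions cannot be resurrected after the daemon exits.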
--- @@ -296,8 +313,8 @@ For multi-step research, Claude automatically asks follow-up questions when need ### Skill-Specific - **Local Claude Code only** - Does not work in web UI (sandbox restrictions) -- **No session persistence** - Each question is independent -- **No follow-up context** - Can't reference "the previous answer" +- **Persistent sessions are process-bound** - If the background daemon exits, live Playwright objects cannot be resurrected automatically +- **Stateless mode is still independent** - Fresh-browser questions still do not share follow-up context unless you explicitly use `session_manager.py` ### NotebookLM - **Rate limits** - Free tier has daily query limits diff --git a/SKILL.md b/SKILL.md index 2be7e16..55c7477 100755 --- a/SKILL.md +++ b/SKILL.md @@ -1,269 +1,264 @@ --- name: notebooklm -description: Use this skill to query your Google NotebookLM notebooks directly from Claude Code for source-grounded, citation-backed answers from Gemini. Browser automation, library management, persistent auth. Drastically reduced hallucinations through document-only responses. +description: Use this skill to query and manage Google NotebookLM notebooks directly from Claude Code/OpenClaw. Supports auth, remote notebook discovery, notebook creation, source import, and source-grounded Q&A. --- # NotebookLM Research Assistant Skill -Interact with Google NotebookLM to query documentation with Gemini's source-grounded answers. Each question opens a fresh browser session, retrieves the answer exclusively from your uploaded documents, and closes. +Interact with Google NotebookLM to discover notebooks, create new notebooks, add sources, run Fast/Deep Research source discovery, and query them with source-grounded answers. 
## When to Use This Skill Trigger when user: - Mentions NotebookLM explicitly - Shares NotebookLM URL (`https://notebooklm.google.com/notebook/...`) -- Asks to query their notebooks/documentation -- Wants to add documentation to NotebookLM library -- Uses phrases like "ask my NotebookLM", "check my docs", "query my notebook" +- Asks to query or manage their notebooks +- Wants to discover available notebooks under the logged-in Google account +- Wants to create a notebook or add a source into NotebookLM +- Wants NotebookLM to discover new sources via Fast Research or Deep Research -## ⚠️ CRITICAL: Add Command - Smart Discovery +## Critical: Always Use `run.py` Wrapper -When user wants to add a notebook without providing details: +**NEVER call scripts directly. ALWAYS use `python scripts/run.py [script]`:** -**SMART ADD (Recommended)**: Query the notebook first to discover its content: ```bash -# Step 1: Query the notebook about its content -python scripts/run.py ask_question.py --question "What is the content of this notebook? What topics are covered? Provide a complete overview briefly and concisely" --notebook-url "[URL]" +# ✅ CORRECT +python scripts/run.py auth_manager.py status +python scripts/run.py notebook_manager.py list +python scripts/run.py notebook_manager.py sync-remote +python scripts/run.py notebook_manager.py create --name "My Notebook" +python scripts/run.py notebook_manager.py add-source --notebook-url "https://notebooklm.google.com/notebook/..." --source-url "https://example.com" +python scripts/run.py notebook_manager.py discover-sources --notebook-url "https://notebooklm.google.com/notebook/..." --query "agent tool learning survey" +python scripts/run.py notebook_manager.py discover-sources --notebook-url "https://notebooklm.google.com/notebook/..." --query "agent tool learning survey" --mode deep-research +python scripts/run.py ask_question.py --question "..." 
-# Step 2: Use the discovered information to add it -python scripts/run.py notebook_manager.py add --url "[URL]" --name "[Based on content]" --description "[Based on content]" --topics "[Based on content]" +# ❌ WRONG +python scripts/auth_manager.py status ``` -**MANUAL ADD**: If user provides all details: -- `--url` - The NotebookLM URL -- `--name` - A descriptive name -- `--description` - What the notebook contains (REQUIRED!) -- `--topics` - Comma-separated topics (REQUIRED!) +The wrapper automatically: +1. Creates `.venv` if needed +2. Installs dependencies +3. Activates the environment +4. Executes the target script properly -NEVER guess or use generic descriptions! If details missing, use Smart Add to discover them. +## Proxy Requirement on Linux -## Critical: Always Use run.py Wrapper +NotebookLM often needs an explicit browser proxy on Linux. Prefer the Singapore exit when available. -**NEVER call scripts directly. ALWAYS use `python scripts/run.py [script]`:** +Supported env vars: ```bash -# ✅ CORRECT - Always use run.py: -python scripts/run.py auth_manager.py status -python scripts/run.py notebook_manager.py list -python scripts/run.py ask_question.py --question "..." - -# ❌ WRONG - Never call directly: -python scripts/auth_manager.py status # Fails without venv! +export NOTEBOOKLM_PROXY_URL=http://127.0.0.1:7890 +# optional +export NOTEBOOKLM_PROXY_BYPASS=localhost,127.0.0.1 ``` -The `run.py` wrapper automatically: -1. Creates `.venv` if needed -2. Installs all dependencies -3. Activates environment -4. Executes script properly +If `NOTEBOOKLM_PROXY_URL` is unset, the skill falls back to standard `HTTPS_PROXY` / `HTTP_PROXY` / `ALL_PROXY`. ## Core Workflow -### Step 1: Check Authentication Status +### 1) Check Authentication ```bash python scripts/run.py auth_manager.py status +python scripts/run.py auth_manager.py validate ``` If not authenticated, proceed to setup. 
-### Step 2: Authenticate (One-Time Setup) +### 2) Authenticate ```bash -# Browser MUST be visible for manual Google login python scripts/run.py auth_manager.py setup ``` -**Important:** -- Browser is VISIBLE for authentication -- Browser window opens automatically -- User must manually log in to Google -- Tell user: "A browser window will open for Google login" +Important: +- Browser must be visible for manual Google login +- Authentication is valid only when the browser actually enters NotebookLM UI +- Redirects to Google login, account chooser, or password challenge do **not** count as success -### Step 3: Manage Notebook Library +### 3) Manage Notebook Library +**Manual library** ```bash -# List all notebooks -python scripts/run.py notebook_manager.py list - -# BEFORE ADDING: Ask user for metadata if unknown! -# "What does this notebook contain?" -# "What topics should I tag it with?" - -# Add notebook to library (ALL parameters are REQUIRED!) python scripts/run.py notebook_manager.py add \ --url "https://notebooklm.google.com/notebook/..." \ --name "Descriptive Name" \ - --description "What this notebook contains" \ # REQUIRED - ASK USER IF UNKNOWN! - --topics "topic1,topic2,topic3" # REQUIRED - ASK USER IF UNKNOWN! + --description "What this notebook contains" \ + --topics "topic1,topic2" -# Search notebooks by topic +python scripts/run.py notebook_manager.py list python scripts/run.py notebook_manager.py search --query "keyword" - -# Set active notebook python scripts/run.py notebook_manager.py activate --id notebook-id - -# Remove notebook python scripts/run.py notebook_manager.py remove --id notebook-id +python scripts/run.py notebook_manager.py stats ``` -### Quick Workflow -1. Check library: `python scripts/run.py notebook_manager.py list` -2. Ask question: `python scripts/run.py ask_question.py --question "..." 
--notebook-id ID` - -### Step 4: Ask Questions - +**Remote discovery / sync** ```bash -# Basic query (uses active notebook if set) -python scripts/run.py ask_question.py --question "Your question here" - -# Query specific notebook -python scripts/run.py ask_question.py --question "..." --notebook-id notebook-id - -# Query with notebook URL directly -python scripts/run.py ask_question.py --question "..." --notebook-url "https://..." - -# Show browser for debugging -python scripts/run.py ask_question.py --question "..." --show-browser +python scripts/run.py notebook_manager.py list-remote +python scripts/run.py notebook_manager.py sync-remote ``` -## Follow-Up Mechanism (CRITICAL) +`sync-remote` writes fetched notebooks into `data/library.json` under a remote section, distinct from manually added entries. -Every NotebookLM answer ends with: **"EXTREMELY IMPORTANT: Is that ALL you need to know?"** +### 4) Create NotebookLM Notebooks +```bash +python scripts/run.py notebook_manager.py create +python scripts/run.py notebook_manager.py create --name "My New Notebook" +python scripts/run.py notebook_manager.py create --name "My New Notebook" --show-browser +``` -**Required Claude Behavior:** -1. **STOP** - Do not immediately respond to user -2. **ANALYZE** - Compare answer to user's original request -3. **IDENTIFY GAPS** - Determine if more information needed -4. **ASK FOLLOW-UP** - If gaps exist, immediately ask: - ```bash - python scripts/run.py ask_question.py --question "Follow-up with context..." - ``` -5. **REPEAT** - Continue until information is complete -6. **SYNTHESIZE** - Combine all answers before responding to user +Returns the new notebook title and URL. If UI title editing is unavailable, the command still returns the created URL and reports the limitation. 
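The manual-vs-remote split that `sync-remote` maintains in `data/library.json` (described under step 3 above) can be sketched as follows. The key names (`notebooks`, `remote_notebooks`) are assumptions for illustration, not the actual file schema.

```python
# Remote-synced notebooks live in their own section of the library and are
# fully replaced on each sync; manually added entries are never overwritten.
def merge_remote(library: dict, fetched: list[dict]) -> dict:
    library.setdefault("notebooks", {})  # manual entries, left untouched
    library["remote_notebooks"] = {nb["id"]: nb for nb in fetched}
    return library


library = {"notebooks": {"manual-1": {"name": "My Docs"}}}
fetched = [{
    "id": "remote-1",
    "name": "Synced Notebook",
    "url": "https://notebooklm.google.com/notebook/remote-1",
}]

merged = merge_remote(library, fetched)
print(sorted(merged))                      # ['notebooks', 'remote_notebooks']
print("manual-1" in merged["notebooks"])   # True
```

Keeping the sections distinct means a later `sync-remote` can safely rebuild the remote list without losing the human-written names, descriptions, and topics in the manual library.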
-## Script Reference +### 5) Add Sources to a Notebook -### Authentication Management (`auth_manager.py`) +**Web / YouTube / Google Docs / Slides links** ```bash -python scripts/run.py auth_manager.py setup # Initial setup (browser visible) -python scripts/run.py auth_manager.py status # Check authentication -python scripts/run.py auth_manager.py reauth # Re-authenticate (browser visible) -python scripts/run.py auth_manager.py clear # Clear authentication -``` +python scripts/run.py notebook_manager.py add-source \ + --notebook-url "https://notebooklm.google.com/notebook/..." \ + --source-url "https://example.com" -### Notebook Management (`notebook_manager.py`) -```bash -python scripts/run.py notebook_manager.py add --url URL --name NAME --description DESC --topics TOPICS -python scripts/run.py notebook_manager.py list -python scripts/run.py notebook_manager.py search --query QUERY -python scripts/run.py notebook_manager.py activate --id ID -python scripts/run.py notebook_manager.py remove --id ID -python scripts/run.py notebook_manager.py stats +python scripts/run.py notebook_manager.py add-source \ + --notebook-url "https://notebooklm.google.com/notebook/..." \ + --source-url "https://www.youtube.com/watch?v=..." + +python scripts/run.py notebook_manager.py add-source \ + --notebook-url "https://notebooklm.google.com/notebook/..." \ + --source-url "https://docs.google.com/document/d/.../edit" ``` -### Question Interface (`ask_question.py`) +**Local file / local PDF** ```bash -python scripts/run.py ask_question.py --question "..." [--notebook-id ID] [--notebook-url URL] [--show-browser] +python scripts/run.py notebook_manager.py add-source \ + --notebook-url "https://notebooklm.google.com/notebook/..." 
\ + --file "/path/to/file.pdf" ``` -### Data Cleanup (`cleanup_manager.py`) +**PDF URL** ```bash -python scripts/run.py cleanup_manager.py # Preview cleanup -python scripts/run.py cleanup_manager.py --confirm # Execute cleanup -python scripts/run.py cleanup_manager.py --preserve-library # Keep notebooks +python scripts/run.py notebook_manager.py add-source \ + --notebook-url "https://notebooklm.google.com/notebook/..." \ + --source-url "https://arxiv.org/pdf/1706.03762.pdf" ``` -## Environment Management - -The virtual environment is automatically managed: -- First run creates `.venv` automatically -- Dependencies install automatically -- Chromium browser installs automatically -- Everything isolated in skill directory +PDF URLs are downloaded first, then uploaded through NotebookLM's file upload UI. -Manual setup (only if automatic fails): +### 6) Discover Sources with Research ```bash -python -m venv .venv -source .venv/bin/activate # Linux/Mac -pip install -r requirements.txt -python -m patchright install chromium +python scripts/run.py notebook_manager.py discover-sources \ + --notebook-url "https://notebooklm.google.com/notebook/..." \ + --query "latest notebooklm research papers" + +python scripts/run.py notebook_manager.py discover-sources \ + --notebook-url "https://notebooklm.google.com/notebook/..." \ + --query "agent tool learning survey" \ + --mode fast-research + +python scripts/run.py notebook_manager.py discover-sources \ + --notebook-url "https://notebooklm.google.com/notebook/..." \ + --query "agent tool learning survey" \ + --mode deep-research + +# If you only want the results page and do not want auto-import: +python scripts/run.py notebook_manager.py discover-sources \ + --notebook-url "https://notebooklm.google.com/notebook/..." 
\ + --query "agent tool learning survey" \ + --mode deep-research \ + --no-import ``` -## Data Storage - -All data stored in `~/.claude/skills/notebooklm/data/`: -- `library.json` - Notebook metadata -- `auth_info.json` - Authentication status -- `browser_state/` - Browser cookies and session - -**Security:** Protected by `.gitignore`, never commit to git. +Behavior: +- Reuses the Sources view / Add source flow +- Types the query through real UI input events +- Returns visible candidate results and progress states +- Auto-clicks `Import` by default when NotebookLM exposes it +- Verifies import by re-opening the notebook and checking persisted source count +- For `--mode deep-research`, also returns `deep_research_details` when NotebookLM exposes the detailed sources panel + +Deep Research return fields: +- `deep_research_details.report_title` — NotebookLM report title +- `deep_research_details.report_subtitle` — report subtitle (for example `based on 51 sources`) +- `deep_research_details.cited_count` — count from the `Cited in Report` tab +- `deep_research_details.not_cited_count` — count from the `Not cited` tab +- `deep_research_details.cited_sources` — structured list of cited entries, each with `title` and optional `snippet` +- `deep_research_details.not_cited_sources` — structured list of non-cited entries, each with `title` and optional `snippet` + +Import verification fields: +- `source_count_before` — persisted source count before research/import +- `source_count_after` — persisted source count after import confirmation +- `import_result.source_count_before` / `import_result.source_count_after` — post-import persistence check details +- `auto_import_requested` — whether the command attempted Import automatically + +### 7) Ask Questions +```bash +# Stateless mode (existing behavior) +python scripts/run.py ask_question.py --question "Your question here" +python scripts/run.py ask_question.py --question "..." 
--notebook-id notebook-id +python scripts/run.py ask_question.py --question "..." --notebook-url "https://..." +python scripts/run.py ask_question.py --question "..." --show-browser -## Configuration +# Persistent session mode +python scripts/run.py session_manager.py create --notebook-url "https://notebooklm.google.com/notebook/..." +python scripts/run.py session_manager.py ask --session-id session-xxxxxxxxxxxx --question "first question" +python scripts/run.py session_manager.py ask --session-id session-xxxxxxxxxxxx --question "follow-up question" -Optional `.env` file in skill directory: -```env -HEADLESS=false # Browser visibility -SHOW_BROWSER=false # Default browser display -STEALTH_ENABLED=true # Human-like behavior -TYPING_WPM_MIN=160 # Typing speed -TYPING_WPM_MAX=240 -DEFAULT_NOTEBOOK_ID= # Default notebook +# Optional shortcut: reuse a persistent session via ask_question.py +python scripts/run.py ask_question.py --session-id session-xxxxxxxxxxxx --question "follow-up question" ``` -## Decision Flow - -``` -User mentions NotebookLM - ↓ -Check auth → python scripts/run.py auth_manager.py status - ↓ -If not authenticated → python scripts/run.py auth_manager.py setup - ↓ -Check/Add notebook → python scripts/run.py notebook_manager.py list/add (with --description) - ↓ -Activate notebook → python scripts/run.py notebook_manager.py activate --id ID - ↓ -Ask question → python scripts/run.py ask_question.py --question "..." - ↓ -See "Is that ALL you need?" 
→ Ask follow-ups until complete - ↓ -Synthesize and respond to user +### 8) Persistent Session Lifecycle +```bash +python scripts/run.py session_manager.py list +python scripts/run.py session_manager.py info --session-id session-xxxxxxxxxxxx +python scripts/run.py session_manager.py reset --session-id session-xxxxxxxxxxxx +python scripts/run.py session_manager.py close --session-id session-xxxxxxxxxxxx +python scripts/run.py session_manager.py gc ``` -## Troubleshooting +Behavior: +- `session_manager.py` starts a background process on demand and keeps one live browser context alive +- Each `session_id` maps to one reusable NotebookLM tab/page +- Same-session asks are serialized through the daemon so only one question runs at a time +- Idle sessions are auto-expired after the configured timeout (default `1800` seconds) +- Stateless `ask_question.py` remains the default and is unchanged unless `--session-id` is passed + +## Navigation / Error Model + +The automation reports these states clearly instead of silently failing: +- Redirected to Google login page +- Redirected to Google account chooser +- Sent to Google password challenge page +- Notebook deep link fell back to NotebookLM home page +- Source import/upload failed +- Source import/upload timed out without confirmation +- Sources view / research controls missing +- Fast Research / Deep Research selector missing +- Research query input missing or submission did not start +- Deep Research timed out mid-run +- Research finished without visible results +- Import button missing, click failed, or imported sources did not persist + +The navigation layer waits for URL settling before classifying pages so SPA redirects do not cause false positives. + +## Data Storage -| Problem | Solution | -|---------|----------| -| ModuleNotFoundError | Use `run.py` wrapper | -| Authentication fails | Browser must be visible for setup! 
--show-browser | -| Rate limit (50/day) | Wait or switch Google account | -| Browser crashes | `python scripts/run.py cleanup_manager.py --preserve-library` | -| Notebook not found | Check with `notebook_manager.py list` | +All state stays in the skill directory: +- `data/library.json` — manual notebooks + remote-synced notebooks +- `data/auth_info.json` — auth metadata +- `data/browser_state/` — browser cookies and session state +- `data/sessions.json` — serializable persistent-session metadata (`session_id`, `notebook_url`, timestamps, counts, status) +- `data/session_runtime/` — daemon runtime files such as the local socket / pid file ## Best Practices -1. **Always use run.py** - Handles environment automatically -2. **Check auth first** - Before any operations -3. **Follow-up questions** - Don't stop at first answer -4. **Browser visible for auth** - Required for manual login -5. **Include context** - Each question is independent -6. **Synthesize answers** - Combine multiple responses +1. Always use `run.py` +2. Validate auth before deeper workflows +3. Use `sync-remote` before asking users for notebook URLs they may not remember +4. Prefer `--show-browser` when debugging UI changes +5. Keep manual library metadata human-meaningful; keep remote sync separate ## Limitations -- No session persistence (each question = new browser) -- Rate limits on free Google accounts (50 queries/day) -- Manual upload required (user must add docs to NotebookLM) -- Browser overhead (few seconds per question) - -## Resources (Skill Structure) - -**Important directories and files:** - -- `scripts/` - All automation scripts (ask_question.py, notebook_manager.py, etc.) 
-- `data/` - Local storage for authentication and notebook library -- `references/` - Extended documentation: - - `api_reference.md` - Detailed API documentation for all scripts - - `troubleshooting.md` - Common issues and solutions - - `usage_patterns.md` - Best practices and workflow examples -- `.venv/` - Isolated Python environment (auto-created on first run) -- `.gitignore` - Protects sensitive data from being committed +- NotebookLM UI can change selectors without notice +- Google Docs / Slides links currently go through NotebookLM's URL import path; if the account/UI refuses them, the command reports a clear error instead of silently succeeding +- Free Google accounts still have NotebookLM-side rate limits diff --git a/scripts/ask_question.py b/scripts/ask_question.py index aa47e4b..5b131bf 100755 --- a/scripts/ask_question.py +++ b/scripts/ask_question.py @@ -10,9 +10,9 @@ """ import argparse +import os import sys import time -import re from pathlib import Path from patchright.sync_api import sync_playwright @@ -22,12 +22,12 @@ from auth_manager import AuthManager from notebook_manager import NotebookLibrary -from config import QUERY_INPUT_SELECTORS, RESPONSE_SELECTORS +from config import NOTEBOOKLM_GOOGLE_ACCOUNT_ENV, QUERY_INPUT_SELECTORS, RESPONSE_SELECTORS from browser_utils import BrowserFactory, StealthUtils +from notebook_navigation import explain_navigation_failure, open_notebook_page +from session_manager import ask_via_session_manager, SessionManagerError -# Follow-up reminder (adapted from MCP server for stateless operation) -# Since we don't have persistent sessions, we encourage comprehensive questions FOLLOW_UP_REMINDER = ( "\n\nEXTREMELY IMPORTANT: Is that ALL you need to know? " "You can always ask another question! 
Think about it carefully: " @@ -62,95 +62,89 @@ def ask_notebooklm(question: str, notebook_url: str, headless: bool = True) -> s context = None try: - # Start playwright playwright = sync_playwright().start() - - # Launch persistent browser context using factory context = BrowserFactory.launch_persistent_context( playwright, - headless=headless + headless=headless, ) - # Navigate to notebook page = context.new_page() print(" 🌐 Opening notebook...") - page.goto(notebook_url, wait_until="domcontentloaded") + auth_info = auth.get_auth_info() + preferred_email = os.getenv(NOTEBOOKLM_GOOGLE_ACCOUNT_ENV) or auth_info.get("google_account_email") + navigation = open_notebook_page( + page, + notebook_url, + preferred_email=preferred_email, + ) - # Wait for NotebookLM - page.wait_for_url(re.compile(r"^https://notebooklm\.google\.com/"), timeout=10000) + if navigation["status"] != "ok": + for line in explain_navigation_failure(navigation, preferred_email=preferred_email): + print(line) + if navigation["status"] == "multiple-accounts": + print(f" Set {NOTEBOOKLM_GOOGLE_ACCOUNT_ENV}=you@example.com and retry") + return None - # Wait for query input (MCP approach) print(" ⏳ Waiting for query input...") query_element = None + input_selector = None for selector in QUERY_INPUT_SELECTORS: try: - query_element = page.wait_for_selector( - selector, - timeout=10000, - state="visible" # Only check visibility, not disabled! 
- ) + query_element = page.wait_for_selector(selector, timeout=10000, state="visible") if query_element: + input_selector = selector print(f" ✓ Found input: {selector}") break - except: + except Exception: continue - if not query_element: + if not query_element or not input_selector: print(" ❌ Could not find query input") return None - # Type question (human-like, fast) print(" ⏳ Typing question...") - - # Use primary selector for typing - input_selector = QUERY_INPUT_SELECTORS[0] StealthUtils.human_type(page, input_selector, question) - # Submit print(" 📤 Submitting...") page.keyboard.press("Enter") - - # Small pause StealthUtils.random_delay(500, 1500) - # Wait for response (MCP approach: poll for stable text) print(" ⏳ Waiting for answer...") - answer = None stable_count = 0 last_text = None - deadline = time.time() + 120 # 2 minutes timeout + deadline = time.time() + 120 while time.time() < deadline: - # Check if NotebookLM is still thinking (most reliable indicator) try: - thinking_element = page.query_selector('div.thinking-message') + thinking_element = page.query_selector("div.thinking-message") if thinking_element and thinking_element.is_visible(): time.sleep(1) continue - except: + except Exception: pass - # Try to find response with MCP selectors for selector in RESPONSE_SELECTORS: try: elements = page.query_selector_all(selector) - if elements: - # Get last (newest) response - latest = elements[-1] - text = latest.inner_text().strip() - - if text: - if text == last_text: - stable_count += 1 - if stable_count >= 3: # Stable for 3 polls - answer = text - break - else: - stable_count = 0 - last_text = text - except: + if not elements: + continue + + latest = elements[-1] + text = latest.inner_text().strip() + if not text: + continue + + if text == last_text: + stable_count += 1 + if stable_count >= 3: + answer = text + break + else: + stable_count = 0 + last_text = text + except Exception: continue if answer: @@ -163,67 +157,80 @@ def 
ask_notebooklm(question: str, notebook_url: str, headless: bool = True) -> s return None print(" ✅ Got answer!") - # Add follow-up reminder to encourage Claude to ask more questions return answer + FOLLOW_UP_REMINDER - except Exception as e: - print(f" ❌ Error: {e}") + except Exception as error: + print(f" ❌ Error: {error}") import traceback traceback.print_exc() return None finally: - # Always clean up if context: try: context.close() - except: + except Exception: pass if playwright: try: playwright.stop() - except: + except Exception: pass def main(): - parser = argparse.ArgumentParser(description='Ask NotebookLM a question') + parser = argparse.ArgumentParser(description="Ask NotebookLM a question") - parser.add_argument('--question', required=True, help='Question to ask') - parser.add_argument('--notebook-url', help='NotebookLM notebook URL') - parser.add_argument('--notebook-id', help='Notebook ID from library') - parser.add_argument('--show-browser', action='store_true', help='Show browser') + parser.add_argument("--question", required=True, help="Question to ask") + parser.add_argument("--notebook-url", help="NotebookLM notebook URL") + parser.add_argument("--notebook-id", help="Notebook ID from library") + parser.add_argument("--session-id", help="Persistent session ID managed by session_manager.py") + parser.add_argument("--show-browser", action="store_true", help="Show browser") args = parser.parse_args() - # Resolve notebook URL + if args.session_id: + try: + result = ask_via_session_manager(args.session_id, args.question) + except SessionManagerError as error: + print(f"❌ {error}") + return 1 + + print("\n" + "=" * 60) + print(f"Session: {result['session_id']}") + print(f"Question: {result['question']}") + print("=" * 60) + print() + print(result["answer"]) + print() + print("=" * 60) + return 0 + notebook_url = args.notebook_url if not notebook_url and args.notebook_id: library = NotebookLibrary() - notebook = library.get_notebook(args.notebook_id) + 
notebook = library.get_notebook(args.notebook_id) or library.get_remote_notebook(args.notebook_id) if notebook: - notebook_url = notebook['url'] + notebook_url = notebook["url"] else: print(f"❌ Notebook '{args.notebook_id}' not found") return 1 if not notebook_url: - # Check for active notebook first library = NotebookLibrary() active = library.get_active_notebook() if active: - notebook_url = active['url'] + notebook_url = active["url"] print(f"📚 Using active notebook: {active['name']}") else: - # Show available notebooks notebooks = library.list_notebooks() if notebooks: print("\n📚 Available notebooks:") - for nb in notebooks: - mark = " [ACTIVE]" if nb.get('id') == library.active_notebook_id else "" - print(f" {nb['id']}: {nb['name']}{mark}") + for notebook in notebooks: + mark = " [ACTIVE]" if notebook.get("id") == library.active_notebook_id else "" + print(f" {notebook['id']}: {notebook['name']}{mark}") print("\nSpecify with --notebook-id or set active:") print("python scripts/run.py notebook_manager.py activate --id ID") else: @@ -231,11 +238,10 @@ def main(): print("python scripts/run.py notebook_manager.py add --url URL --name NAME --description DESC --topics TOPICS") return 1 - # Ask the question answer = ask_notebooklm( question=args.question, notebook_url=notebook_url, - headless=not args.show_browser + headless=not args.show_browser, ) if answer: @@ -247,9 +253,9 @@ def main(): print() print("=" * 60) return 0 - else: - print("\n❌ Failed to get answer") - return 1 + + print("\n❌ Failed to get answer") + return 1 if __name__ == "__main__": diff --git a/scripts/auth_manager.py b/scripts/auth_manager.py index 54c8b3b..6c1f075 100755 --- a/scripts/auth_manager.py +++ b/scripts/auth_manager.py @@ -24,8 +24,16 @@ # Add parent directory to path sys.path.insert(0, str(Path(__file__).parent)) -from config import BROWSER_STATE_DIR, STATE_FILE, AUTH_INFO_FILE, DATA_DIR +from config import ( + AUTH_INFO_FILE, + BROWSER_STATE_DIR, + DATA_DIR, + NOTEBOOKLM_BASE_URL, 
+ NOTEBOOKLM_URL_REGEX, + STATE_FILE, +) from browser_utils import BrowserFactory +from notebook_navigation import list_account_choices, open_home_page class AuthManager: @@ -83,6 +91,46 @@ def get_auth_info(self) -> Dict[str, Any]: return info + def _has_google_session(self, context: BrowserContext) -> bool: + """Check whether the browser context can access the real NotebookLM UI without a Google sign-in challenge.""" + page = context.new_page() + try: + preferred_email = self.get_auth_info().get('google_account_email') + result = open_home_page(page, preferred_email=preferred_email) + return result.get('status') == 'ok' + finally: + try: + page.close() + except Exception: + pass + + def _detect_google_account_email(self, context: BrowserContext) -> Optional[str]: + """Best-effort detection of the current Google account email.""" + page = context.new_page() + try: + page.goto( + "https://accounts.google.com/AccountChooser?continue=https://myaccount.google.com/", + wait_until="domcontentloaded", + timeout=30000, + ) + + emails = list_account_choices(page) + if len(emails) == 1: + return emails[0] + + body_text = page.locator("body").inner_text(timeout=3000) + match = re.search(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}", body_text) + if match: + return match.group(0) + return None + except Exception: + return None + finally: + try: + page.close() + except Exception: + pass + def setup_auth(self, headless: bool = False, timeout_minutes: int = 10) -> bool: """ Perform interactive authentication setup @@ -109,31 +157,32 @@ def setup_auth(self, headless: bool = False, timeout_minutes: int = 10) -> bool: headless=headless ) - # Navigate to NotebookLM - page = context.new_page() - page.goto("https://notebooklm.google.com", wait_until="domcontentloaded") - - # Check if already authenticated - if "notebooklm.google.com" in page.url and "accounts.google.com" not in page.url: - print(" ✅ Already authenticated!") + if self._has_google_session(context): + print(" ✅ Google
account session detected") self._save_browser_state(context) + self._save_auth_info(self._detect_google_account_email(context)) return True + # Navigate to NotebookLM / Google login + page = context.new_page() + page.goto(NOTEBOOKLM_BASE_URL, wait_until="domcontentloaded") + # Wait for manual login print("\n ⏳ Please log in to your Google account...") print(f" ⏱️ Waiting up to {timeout_minutes} minutes for login...") try: - # Wait for URL to change to NotebookLM (regex ensures it's the actual domain, not a parameter) - timeout_ms = int(timeout_minutes * 60 * 1000) - page.wait_for_url(re.compile(r"^https://notebooklm\.google\.com/"), timeout=timeout_ms) - - print(f" ✅ Login successful!") - - # Save authentication state - self._save_browser_state(context) - self._save_auth_info() - return True + deadline = time.time() + timeout_minutes * 60 + while time.time() < deadline: + if self._has_google_session(context): + print(" ✅ Login successful!") + self._save_browser_state(context) + self._save_auth_info(self._detect_google_account_email(context)) + return True + time.sleep(2) + + print(" ❌ Authentication timeout: Google account session not detected") + return False except Exception as e: print(f" ❌ Authentication timeout: {e}") @@ -167,13 +216,15 @@ def _save_browser_state(self, context: BrowserContext): print(f" ❌ Failed to save browser state: {e}") raise - def _save_auth_info(self): + def _save_auth_info(self, google_account_email: Optional[str] = None): """Save authentication metadata""" try: info = { 'authenticated_at': time.time(), 'authenticated_at_iso': time.strftime('%Y-%m-%d %H:%M:%S') } + if google_account_email: + info['google_account_email'] = google_account_email with open(self.auth_info_file, 'w') as f: json.dump(info, f, indent=2) except Exception: @@ -255,17 +306,15 @@ def validate_auth(self) -> bool: headless=True ) - # Try to access NotebookLM - page = context.new_page() - page.goto("https://notebooklm.google.com", wait_until="domcontentloaded", 
timeout=30000) - - # Check if we can access NotebookLM - if "notebooklm.google.com" in page.url and "accounts.google.com" not in page.url: + if self._has_google_session(context): print(" ✅ Authentication is valid") + auth_info = self.get_auth_info() + if not auth_info.get('google_account_email'): + self._save_auth_info(self._detect_google_account_email(context)) return True - else: - print(" ❌ Authentication is invalid (redirected to login)") - return False + + print(" ❌ Authentication is invalid (Google session not detected)") + return False except Exception as e: print(f" ❌ Validation failed: {e}") @@ -330,6 +379,8 @@ def main(): print(f" State age: {info['state_age_hours']:.1f} hours") if info.get('authenticated_at_iso'): print(f" Last auth: {info['authenticated_at_iso']}") + if info.get('google_account_email'): + print(f" Google account: {info['google_account_email']}") print(f" State file: {info['state_file']}") elif args.command == 'validate': diff --git a/scripts/browser_session.py b/scripts/browser_session.py index b121af8..fc8b383 100755 --- a/scripts/browser_session.py +++ b/scripts/browser_session.py @@ -5,17 +5,19 @@ Based on the original NotebookLM API implementation """ -import time import sys -from typing import Any, Dict, Optional +import time from pathlib import Path +from typing import Any, Dict, Optional -from patchright.sync_api import BrowserContext, Page +from patchright.sync_api import BrowserContext # Add parent directory to path sys.path.insert(0, str(Path(__file__).parent)) from browser_utils import StealthUtils +from config import QUERY_INPUT_SELECTORS, QUERY_TIMEOUT_SECONDS, RESPONSE_SELECTORS +from notebook_navigation import explain_navigation_failure, open_notebook_page class BrowserSession: @@ -27,7 +29,13 @@ class BrowserSession: previous messages. 
""" - def __init__(self, session_id: str, context: BrowserContext, notebook_url: str): + def __init__( + self, + session_id: str, + context: BrowserContext, + notebook_url: str, + preferred_email: Optional[str] = None, + ): """ Initialize a new browser session @@ -35,6 +43,7 @@ def __init__(self, session_id: str, context: BrowserContext, notebook_url: str): session_id: Unique identifier for this session context: Browser context (shared or dedicated) notebook_url: Target NotebookLM URL for this session + preferred_email: Optional Google account email hint for chooser pages """ self.id = session_id self.created_at = time.time() @@ -42,51 +51,50 @@ def __init__(self, session_id: str, context: BrowserContext, notebook_url: str): self.message_count = 0 self.notebook_url = notebook_url self.context = context + self.preferred_email = preferred_email self.page = None self.stealth = StealthUtils() - # Initialize the session self._initialize() def _initialize(self): """Initialize the browser session and navigate to NotebookLM""" print(f"🚀 Creating session {self.id}...") - # Create new page (tab) in context self.page = self.context.new_page() - print(f" 🌐 Navigating to NotebookLM...") + print(" 🌐 Navigating to NotebookLM...") try: - # Navigate to notebook - self.page.goto(self.notebook_url, wait_until="domcontentloaded", timeout=30000) + navigation = open_notebook_page( + self.page, + self.notebook_url, + preferred_email=self.preferred_email, + ) + if navigation["status"] != "ok": + message = " ".join(line.strip() for line in explain_navigation_failure(navigation)) + raise RuntimeError(message) - # Check if login is needed - if "accounts.google.com" in self.page.url: - raise RuntimeError("Authentication required. 
Please run auth_manager.py setup first.") - - # Wait for page to be ready self._wait_for_ready() - - # Simulate human inspection self.stealth.random_mouse_movement(self.page) self.stealth.random_delay(300, 600) print(f"✅ Session {self.id} ready!") - except Exception as e: - print(f"❌ Failed to initialize session: {e}") + except Exception as error: + print(f"❌ Failed to initialize session: {error}") if self.page: self.page.close() raise def _wait_for_ready(self): """Wait for NotebookLM page to be ready""" - try: - # Wait for chat input - self.page.wait_for_selector("textarea.query-box-input", timeout=10000, state="visible") - except Exception: - # Try alternative selector - self.page.wait_for_selector('textarea[aria-label="Feld für Anfragen"]', timeout=5000, state="visible") + for selector in QUERY_INPUT_SELECTORS: + try: + self.page.wait_for_selector(selector, timeout=10000, state="visible") + return + except Exception: + continue + raise TimeoutError("NotebookLM query input did not become ready") def ask(self, question: str) -> Dict[str, Any]: """ @@ -104,77 +112,70 @@ def ask(self, question: str) -> Dict[str, Any]: print(f"💬 [{self.id}] Asking: {question}") - # Snapshot current answer to detect new response previous_answer = self._snapshot_latest_response() - # Find chat input - chat_input_selector = "textarea.query-box-input" - try: - self.page.wait_for_selector(chat_input_selector, timeout=5000, state="visible") - except Exception: - chat_input_selector = 'textarea[aria-label="Feld für Anfragen"]' - self.page.wait_for_selector(chat_input_selector, timeout=5000, state="visible") + chat_input_selector = None + for selector in QUERY_INPUT_SELECTORS: + try: + self.page.wait_for_selector(selector, timeout=5000, state="visible") + chat_input_selector = selector + break + except Exception: + continue + + if not chat_input_selector: + raise TimeoutError("Could not find NotebookLM query input") - # Click and type with human-like behavior 
self.stealth.realistic_click(self.page, chat_input_selector) self.stealth.human_type(self.page, chat_input_selector, question) - - # Small pause before submit self.stealth.random_delay(300, 800) - - # Submit self.page.keyboard.press("Enter") - # Wait for response print(" ⏳ Waiting for response...") self.stealth.random_delay(1500, 3000) - # Get new answer answer = self._wait_for_latest_answer(previous_answer) - if not answer: - raise Exception("Empty response from NotebookLM") + raise RuntimeError("Empty response from NotebookLM") print(f" ✅ Got response ({len(answer)} chars)") - return { "status": "success", "question": question, "answer": answer, "session_id": self.id, - "notebook_url": self.notebook_url + "notebook_url": self.notebook_url, } - except Exception as e: - print(f" ❌ Error: {e}") + except Exception as error: + print(f" ❌ Error: {error}") return { "status": "error", "question": question, - "error": str(e), - "session_id": self.id + "error": str(error), + "session_id": self.id, } def _snapshot_latest_response(self) -> Optional[str]: """Get the current latest response text""" - try: - # Use correct NotebookLM selector - responses = self.page.query_selector_all(".to-user-container .message-text-content") - if responses: - return responses[-1].inner_text() - except Exception: - pass + for selector in RESPONSE_SELECTORS: + try: + responses = self.page.query_selector_all(selector) + if responses: + return responses[-1].inner_text() + except Exception: + continue return None - def _wait_for_latest_answer(self, previous_answer: Optional[str], timeout: int = 120) -> str: + def _wait_for_latest_answer(self, previous_answer: Optional[str], timeout: int = QUERY_TIMEOUT_SECONDS) -> str: """Wait for and extract the new answer""" start_time = time.time() last_candidate = None stable_count = 0 while time.time() - start_time < timeout: - # Check if NotebookLM is still thinking (most reliable indicator) try: - thinking_element = 
self.page.query_selector('div.thinking-message') + thinking_element = self.page.query_selector("div.thinking-message") if thinking_element and thinking_element.is_visible(): time.sleep(0.5) continue @@ -182,15 +183,13 @@ def _wait_for_latest_answer(self, previous_answer: Optional[str], timeout: int = pass try: - # Use correct NotebookLM selector - responses = self.page.query_selector_all(".to-user-container .message-text-content") + for selector in RESPONSE_SELECTORS: + responses = self.page.query_selector_all(selector) + if not responses: + continue - if responses: latest_text = responses[-1].inner_text().strip() - - # Check if it's a new response if latest_text and latest_text != previous_answer: - # Check if text is stable (3 consecutive polls) if latest_text == last_candidate: stable_count += 1 if stable_count >= 3: @@ -198,7 +197,7 @@ def _wait_for_latest_answer(self, previous_answer: Optional[str], timeout: int = else: stable_count = 1 last_candidate = latest_text - + break except Exception: pass @@ -227,8 +226,10 @@ def close(self): if self.page: try: self.page.close() - except Exception as e: - print(f" ⚠️ Error closing page: {e}") + except Exception as error: + print(f" ⚠️ Error closing page: {error}") + finally: + self.page = None print(f"✅ Session {self.id} closed") @@ -241,7 +242,7 @@ def get_info(self) -> Dict[str, Any]: "age_seconds": time.time() - self.created_at, "inactive_seconds": time.time() - self.last_activity, "message_count": self.message_count, - "notebook_url": self.notebook_url + "notebook_url": self.notebook_url, } def is_expired(self, timeout_seconds: int = 900) -> bool: @@ -250,6 +251,5 @@ def is_expired(self, timeout_seconds: int = 900) -> bool: if __name__ == "__main__": - # Example usage print("Browser Session Module - Use ask_question.py for main interface") - print("This module provides low-level browser session management.") \ No newline at end of file + print("This module provides low-level browser session management.") diff 
--git a/scripts/browser_utils.py b/scripts/browser_utils.py index 60a1210..4a9d39d 100755 --- a/scripts/browser_utils.py +++ b/scripts/browser_utils.py @@ -4,42 +4,115 @@ """ import json -import time +import os import random -from typing import Optional, List +import shutil +import tempfile +import time +from typing import List, Optional +from urllib.parse import urlparse + +from patchright.sync_api import BrowserContext, Page, Playwright -from patchright.sync_api import Playwright, BrowserContext, Page -from config import BROWSER_PROFILE_DIR, STATE_FILE, BROWSER_ARGS, USER_AGENT +from config import ( + BROWSER_ARGS, + BROWSER_PROFILE_DIR, + NOTEBOOKLM_PROXY_BYPASS_ENV, + NOTEBOOKLM_PROXY_URL_ENV, + STATE_FILE, + USER_AGENT, +) class BrowserFactory: """Factory for creating configured browser contexts""" + @staticmethod + def _get_proxy_settings() -> tuple[Optional[dict], List[str]]: + """Build Playwright proxy settings and Chrome args from environment.""" + proxy_url = ( + os.getenv(NOTEBOOKLM_PROXY_URL_ENV) + or os.getenv("HTTPS_PROXY") + or os.getenv("https_proxy") + or os.getenv("HTTP_PROXY") + or os.getenv("http_proxy") + or os.getenv("ALL_PROXY") + or os.getenv("all_proxy") + ) + + if not proxy_url: + return None, [] + + parsed = urlparse(proxy_url) + if not parsed.scheme or not parsed.hostname: + print(f" ⚠️ Ignoring invalid proxy URL: {proxy_url}") + return None, [] + + server = f"{parsed.scheme}://{parsed.hostname}" + if parsed.port: + server += f":{parsed.port}" + + proxy = {"server": server} + if parsed.username: + proxy["username"] = parsed.username + if parsed.password: + proxy["password"] = parsed.password + + bypass = ( + os.getenv(NOTEBOOKLM_PROXY_BYPASS_ENV) + or os.getenv("NO_PROXY") + or os.getenv("no_proxy") + ) + if bypass: + proxy["bypass"] = bypass + + chrome_args = [f"--proxy-server={server}"] + return proxy, chrome_args + @staticmethod def launch_persistent_context( playwright: Playwright, headless: bool = True, - user_data_dir: str = 
str(BROWSER_PROFILE_DIR) + user_data_dir: str = str(BROWSER_PROFILE_DIR), ) -> BrowserContext: """ Launch a persistent browser context with anti-detection features and cookie workaround. """ - # Launch persistent context - context = playwright.chromium.launch_persistent_context( + proxy, proxy_args = BrowserFactory._get_proxy_settings() + + launch_kwargs = dict( user_data_dir=user_data_dir, - channel="chrome", # Use real Chrome + channel="chrome", headless=headless, no_viewport=True, ignore_default_args=["--enable-automation"], user_agent=USER_AGENT, - args=BROWSER_ARGS + args=BROWSER_ARGS + proxy_args, + proxy=proxy, ) - # Cookie Workaround for Playwright bug #36139 - # Session cookies (expires=-1) don't persist in user_data_dir automatically - BrowserFactory._inject_cookies(context) + try: + context = playwright.chromium.launch_persistent_context(**launch_kwargs) + except Exception as error: + if "ProcessSingleton" not in str(error): + raise + fallback_root = tempfile.mkdtemp(prefix="notebooklm-profile-") + fallback_dir = os.path.join(fallback_root, "browser_profile") + print(" ⚠️ Browser profile is already in use; cloning it to a temporary profile") + try: + shutil.copytree(user_data_dir, fallback_dir, dirs_exist_ok=True) + for lock_name in ["SingletonLock", "SingletonCookie", "SingletonSocket", "DevToolsActivePort"]: + lock_path = os.path.join(fallback_dir, lock_name) + if os.path.exists(lock_path): + os.remove(lock_path) + except Exception as copy_error: + print(f" ⚠️ Could not clone browser profile cleanly: {copy_error}") + os.makedirs(fallback_dir, exist_ok=True) + launch_kwargs["user_data_dir"] = fallback_dir + context = playwright.chromium.launch_persistent_context(**launch_kwargs) + BrowserFactory._inject_cookies(context) return context @staticmethod @@ -47,13 +120,12 @@ def _inject_cookies(context: BrowserContext): """Inject cookies from state.json if available""" if STATE_FILE.exists(): try: - with open(STATE_FILE, 'r') as f: - state = json.load(f) - if 
'cookies' in state and len(state['cookies']) > 0: - context.add_cookies(state['cookies']) - # print(f" 🔧 Injected {len(state['cookies'])} cookies from state.json") - except Exception as e: - print(f" ⚠️ Could not load state.json: {e}") + with open(STATE_FILE, "r", encoding="utf-8") as handle: + state = json.load(handle) + if "cookies" in state and len(state["cookies"]) > 0: + context.add_cookies(state["cookies"]) + except Exception as error: + print(f" ⚠️ Could not load state.json: {error}") class StealthUtils: @@ -69,25 +141,35 @@ def human_type(page: Page, selector: str, text: str, wpm_min: int = 320, wpm_max """Type with human-like speed""" element = page.query_selector(selector) if not element: - # Try waiting if not immediately found try: element = page.wait_for_selector(selector, timeout=2000) - except: + except Exception: pass - + if not element: print(f"⚠️ Element not found for typing: {selector}") return - # Click to focus element.click() - - # Type for char in text: element.type(char, delay=random.uniform(25, 75)) if random.random() < 0.05: time.sleep(random.uniform(0.15, 0.4)) + @staticmethod + def random_mouse_movement(page: Page): + """Move the mouse a little to look less robotic.""" + try: + for _ in range(random.randint(2, 4)): + page.mouse.move( + random.randint(50, 900), + random.randint(50, 700), + steps=random.randint(3, 8), + ) + StealthUtils.random_delay(60, 180) + except Exception: + pass + @staticmethod def realistic_click(page: Page, selector: str): """Click with realistic movement""" @@ -95,11 +177,10 @@ def realistic_click(page: Page, selector: str): if not element: return - # Optional: Move mouse to element (simplified) box = element.bounding_box() if box: - x = box['x'] + box['width'] / 2 - y = box['y'] + box['height'] / 2 + x = box["x"] + box["width"] / 2 + y = box["y"] + box["height"] / 2 page.mouse.move(x, y, steps=5) StealthUtils.random_delay(100, 300) diff --git a/scripts/cleanup_manager.py b/scripts/cleanup_manager.py index 
c4a8fc2..2acd972 100755 --- a/scripts/cleanup_manager.py +++ b/scripts/cleanup_manager.py @@ -42,6 +42,7 @@ def get_cleanup_paths(self, preserve_library: bool = False) -> Dict[str, Any]: paths = { 'browser_state': [], 'sessions': [], + 'session_runtime': [], 'library': [], 'auth': [], 'other': [] @@ -73,6 +74,16 @@ def get_cleanup_paths(self, preserve_library: bool = False) -> Dict[str, Any]: }) total_size += size + session_runtime_dir = self.data_dir / "session_runtime" + if session_runtime_dir.exists(): + size = self._get_size(session_runtime_dir) + paths['session_runtime'].append({ + 'path': str(session_runtime_dir), + 'size': size, + 'type': 'dir' + }) + total_size += size + # Library (unless preserved) if not preserve_library: library_file = self.data_dir / "library.json" @@ -98,7 +109,7 @@ def get_cleanup_paths(self, preserve_library: bool = False) -> Dict[str, Any]: # Other files in data dir (but NEVER .venv!) for item in self.data_dir.iterdir(): - if item.name not in ['browser_state', 'sessions.json', 'library.json', 'auth_info.json']: + if item.name not in ['browser_state', 'sessions.json', 'session_runtime', 'library.json', 'auth_info.json']: size = self._get_size(item) paths['other'].append({ 'path': str(item), diff --git a/scripts/config.py b/scripts/config.py index 4486b55..56b5f83 100755 --- a/scripts/config.py +++ b/scripts/config.py @@ -14,26 +14,81 @@ AUTH_INFO_FILE = DATA_DIR / "auth_info.json" LIBRARY_FILE = DATA_DIR / "library.json" +# NotebookLM URLs +NOTEBOOKLM_PUBLIC_URL = "https://notebooklm.google" +NOTEBOOKLM_APP_URL = "https://notebooklm.google.com" +NOTEBOOKLM_BASE_URL = NOTEBOOKLM_APP_URL +NOTEBOOKLM_URL_REGEX = r"^https://notebooklm\.google(?:\.com)?(?:/|$)" + +# Optional Google account hint for account chooser pages +NOTEBOOKLM_GOOGLE_ACCOUNT_ENV = "NOTEBOOKLM_GOOGLE_ACCOUNT" +NOTEBOOKLM_PROXY_URL_ENV = "NOTEBOOKLM_PROXY_URL" +NOTEBOOKLM_PROXY_BYPASS_ENV = "NOTEBOOKLM_PROXY_BYPASS" + # NotebookLM Selectors QUERY_INPUT_SELECTORS = [ - 
"textarea.query-box-input", # Primary - 'textarea[aria-label="Feld für Anfragen"]', # Fallback German - 'textarea[aria-label="Input for queries"]', # Fallback English + "textarea.query-box-input", + 'textarea[aria-label="Feld für Anfragen"]', + 'textarea[aria-label="Input for queries"]', ] RESPONSE_SELECTORS = [ - ".to-user-container .message-text-content", # Primary + ".to-user-container .message-text-content", "[data-message-author='bot']", "[data-message-author='assistant']", ] +HOME_READY_SELECTORS = [ + 'button[aria-label="Create new notebook"]', + "mat-card.create-new-action-button", + "div.my-projects-container project-button", +] + +NOTEBOOK_READY_SELECTORS = [ + "input.title-input", + "input[aria-label='Notebook title']", + "button[aria-label='Share notebook']", + "button[aria-label='Configure notebook']", +] + +TITLE_INPUT_SELECTORS = [ + "input.title-input", + "header input", +] + +SOURCE_PANEL_READY_SELECTORS = [ + "input[type='file'][name='Filedata']", + "input[type='file']", + 'textarea[aria-label="Enter URLs"]', + 'textarea[placeholder="Paste any links"]', + 'button:has-text("Upload files")', + 'button:has-text("Websites")', + 'button:has-text("Drive")', + 'button:has-text("Copied text")', +] + +SOURCE_URL_TEXTAREA_SELECTORS = [ + 'textarea[aria-label="Enter URLs"]', + 'textarea[placeholder="Paste any links"]', +] + +SOURCE_FILE_INPUT_SELECTORS = [ + "input[type='file'][name='Filedata']", + "input[type='file']", +] + +CREATE_NOTEBOOK_BUTTON_SELECTORS = [ + 'button[aria-label="Create new notebook"]', + "mat-card.create-new-action-button", +] + # Browser Configuration BROWSER_ARGS = [ - '--disable-blink-features=AutomationControlled', # Patches navigator.webdriver + '--disable-blink-features=AutomationControlled', '--disable-dev-shm-usage', '--no-sandbox', '--no-first-run', - '--no-default-browser-check' + '--no-default-browser-check', ] USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' @@ -42,3 +97,7 @@ LOGIN_TIMEOUT_MINUTES 
= 10 QUERY_TIMEOUT_SECONDS = 120 PAGE_LOAD_TIMEOUT = 30000 +UI_SHORT_TIMEOUT_MS = 5000 +UI_MEDIUM_TIMEOUT_MS = 10000 +UI_LONG_TIMEOUT_MS = 30000 +SOURCE_UPLOAD_TIMEOUT_SECONDS = 180 diff --git a/scripts/notebook_manager.py b/scripts/notebook_manager.py index e10e156..a346d64 100755 --- a/scripts/notebook_manager.py +++ b/scripts/notebook_manager.py @@ -1,64 +1,69 @@ #!/usr/bin/env python3 """ Notebook Library Management for NotebookLM -Manages a library of NotebookLM notebooks with metadata -Based on the MCP server implementation +Manages manual and remote-discovered NotebookLM notebooks. """ -import json import argparse -import uuid +import json import os -from pathlib import Path -from typing import Dict, List, Optional, Any +import re +import sys from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional + +from patchright.sync_api import sync_playwright + +sys.path.insert(0, str(Path(__file__).parent)) + +from auth_manager import AuthManager +from browser_utils import BrowserFactory +from config import LIBRARY_FILE, NOTEBOOKLM_GOOGLE_ACCOUNT_ENV +from notebook_navigation import explain_navigation_failure, normalize_notebooklm_url +from notebook_ui import NotebookUIError, add_source, create_notebook, discover_sources, list_remote_notebooks class NotebookLibrary: - """Manages a collection of NotebookLM notebooks with metadata""" + """Manages manual and remote-discovered NotebookLM notebooks.""" def __init__(self): - """Initialize the notebook library""" - # Store data within the skill directory - skill_dir = Path(__file__).parent.parent - self.data_dir = skill_dir / "data" - self.data_dir.mkdir(parents=True, exist_ok=True) - - self.library_file = self.data_dir / "library.json" + self.library_file = LIBRARY_FILE + self.library_file.parent.mkdir(parents=True, exist_ok=True) self.notebooks: Dict[str, Dict[str, Any]] = {} + self.remote_notebooks: Dict[str, Dict[str, Any]] = {} self.active_notebook_id: Optional[str] = None - 
- # Load existing library self._load_library() def _load_library(self): - """Load library from disk""" if self.library_file.exists(): try: - with open(self.library_file, 'r') as f: - data = json.load(f) - self.notebooks = data.get('notebooks', {}) - self.active_notebook_id = data.get('active_notebook_id') - print(f"📚 Loaded library with {len(self.notebooks)} notebooks") - except Exception as e: - print(f"⚠️ Error loading library: {e}") + with open(self.library_file, 'r') as handle: + data = json.load(handle) + self.notebooks = data.get('notebooks', {}) + self.remote_notebooks = data.get('remote_notebooks', {}) + self.active_notebook_id = data.get('active_notebook_id') + print( + f"📚 Loaded library with {len(self.notebooks)} manual notebooks and " + f"{len(self.remote_notebooks)} remote notebooks" + ) + except Exception as error: + print(f"⚠️ Error loading library: {error}") self.notebooks = {} + self.remote_notebooks = {} self.active_notebook_id = None else: self._save_library() def _save_library(self): - """Save library to disk""" - try: - data = { - 'notebooks': self.notebooks, - 'active_notebook_id': self.active_notebook_id, - 'updated_at': datetime.now().isoformat() - } - with open(self.library_file, 'w') as f: - json.dump(data, f, indent=2) - except Exception as e: - print(f"❌ Error saving library: {e}") + data = { + 'notebooks': self.notebooks, + 'remote_notebooks': self.remote_notebooks, + 'active_notebook_id': self.active_notebook_id, + 'updated_at': datetime.now().isoformat(), + } + with open(self.library_file, 'w') as handle: + json.dump(data, handle, indent=2, ensure_ascii=False) def add_notebook( self, @@ -68,82 +73,43 @@ def add_notebook( topics: List[str], content_types: Optional[List[str]] = None, use_cases: Optional[List[str]] = None, - tags: Optional[List[str]] = None + tags: Optional[List[str]] = None, ) -> Dict[str, Any]: - """ - Add a new notebook to the library - - Args: - url: NotebookLM notebook URL - name: Display name for the notebook - 
description: What's in this notebook - topics: Topics covered - content_types: Types of content (optional) - use_cases: When to use this notebook (optional) - tags: Additional tags for organization (optional) - - Returns: - The created notebook object - """ - # Generate ID from name - notebook_id = name.lower().replace(' ', '-').replace('_', '-') - - # Check for duplicates + notebook_id = _slugify_name(name) if notebook_id in self.notebooks: raise ValueError(f"Notebook with ID '{notebook_id}' already exists") - # Create notebook object + now = datetime.now().isoformat() notebook = { 'id': notebook_id, - 'url': url, + 'url': normalize_notebooklm_url(url), 'name': name, 'description': description, 'topics': topics, 'content_types': content_types or [], 'use_cases': use_cases or [], 'tags': tags or [], - 'created_at': datetime.now().isoformat(), - 'updated_at': datetime.now().isoformat(), + 'origin': 'manual', + 'created_at': now, + 'updated_at': now, 'use_count': 0, - 'last_used': None + 'last_used': None, } - - # Add to library self.notebooks[notebook_id] = notebook - - # Set as active if it's the first notebook - if len(self.notebooks) == 1: + if len(self.notebooks) == 1 and not self.active_notebook_id: self.active_notebook_id = notebook_id - self._save_library() - print(f"✅ Added notebook: {name} ({notebook_id})") return notebook def remove_notebook(self, notebook_id: str) -> bool: - """ - Remove a notebook from the library - - Args: - notebook_id: ID of notebook to remove - - Returns: - True if removed, False if not found - """ if notebook_id in self.notebooks: del self.notebooks[notebook_id] - - # Clear active if it was removed if self.active_notebook_id == notebook_id: - self.active_notebook_id = None - # Set new active if there are other notebooks - if self.notebooks: - self.active_notebook_id = list(self.notebooks.keys())[0] - + self.active_notebook_id = next(iter(self.notebooks.keys()), None) self._save_library() print(f"✅ Removed notebook: {notebook_id}") 
return True - print(f"⚠️ Notebook not found: {notebook_id}") return False @@ -156,24 +122,12 @@ def update_notebook( content_types: Optional[List[str]] = None, use_cases: Optional[List[str]] = None, tags: Optional[List[str]] = None, - url: Optional[str] = None + url: Optional[str] = None, ) -> Dict[str, Any]: - """ - Update notebook metadata - - Args: - notebook_id: ID of notebook to update - Other args: Fields to update (None = keep existing) - - Returns: - Updated notebook object - """ if notebook_id not in self.notebooks: raise ValueError(f"Notebook not found: {notebook_id}") notebook = self.notebooks[notebook_id] - - # Update fields if provided if name is not None: notebook['name'] = name if description is not None: @@ -187,132 +141,251 @@ def update_notebook( if tags is not None: notebook['tags'] = tags if url is not None: - notebook['url'] = url - + notebook['url'] = normalize_notebooklm_url(url) notebook['updated_at'] = datetime.now().isoformat() - self._save_library() print(f"✅ Updated notebook: {notebook['name']}") return notebook def get_notebook(self, notebook_id: str) -> Optional[Dict[str, Any]]: - """Get a specific notebook by ID""" return self.notebooks.get(notebook_id) + def get_remote_notebook(self, notebook_id: str) -> Optional[Dict[str, Any]]: + return self.remote_notebooks.get(notebook_id) + def list_notebooks(self) -> List[Dict[str, Any]]: - """List all notebooks in the library""" return list(self.notebooks.values()) - def search_notebooks(self, query: str) -> List[Dict[str, Any]]: - """ - Search notebooks by query + def list_remote(self) -> List[Dict[str, Any]]: + return list(self.remote_notebooks.values()) - Args: - query: Search query (searches name, description, topics, tags) - - Returns: - List of matching notebooks - """ + def search_notebooks(self, query: str) -> List[Dict[str, Any]]: query_lower = query.lower() results = [] - for notebook in self.notebooks.values(): - # Search in various fields searchable = [ notebook['name'].lower(), 
notebook['description'].lower(), ' '.join(notebook['topics']).lower(), ' '.join(notebook['tags']).lower(), - ' '.join(notebook.get('use_cases', [])).lower() + ' '.join(notebook.get('use_cases', [])).lower(), ] - if any(query_lower in field for field in searchable): results.append(notebook) - return results def select_notebook(self, notebook_id: str) -> Dict[str, Any]: - """ - Set a notebook as active - - Args: - notebook_id: ID of notebook to activate - - Returns: - The activated notebook - """ if notebook_id not in self.notebooks: raise ValueError(f"Notebook not found: {notebook_id}") - self.active_notebook_id = notebook_id self._save_library() - notebook = self.notebooks[notebook_id] print(f"✅ Activated notebook: {notebook['name']}") return notebook def get_active_notebook(self) -> Optional[Dict[str, Any]]: - """Get the currently active notebook""" if self.active_notebook_id: return self.notebooks.get(self.active_notebook_id) return None def increment_use_count(self, notebook_id: str) -> Dict[str, Any]: - """ - Increment usage counter for a notebook - - Args: - notebook_id: ID of notebook that was used - - Returns: - Updated notebook - """ if notebook_id not in self.notebooks: raise ValueError(f"Notebook not found: {notebook_id}") - notebook = self.notebooks[notebook_id] notebook['use_count'] += 1 notebook['last_used'] = datetime.now().isoformat() - self._save_library() return notebook + def sync_remote_notebooks(self, notebooks: List[Dict[str, Any]]) -> Dict[str, Any]: + synced_at = datetime.now().isoformat() + previous_ids = set(self.remote_notebooks.keys()) + incoming_ids = set() + for notebook in notebooks: + notebook_id = notebook['id'] + incoming_ids.add(notebook_id) + previous = self.remote_notebooks.get(notebook_id, {}) + created_at = previous.get('created_at') or synced_at + merged = { + **previous, + **notebook, + 'id': notebook_id, + 'origin': 'remote-sync', + 'created_at': created_at, + 'last_synced_at': synced_at, + } + 
self.remote_notebooks[notebook_id] = merged + + removed_ids = sorted(previous_ids - incoming_ids) + for notebook_id in removed_ids: + self.remote_notebooks[notebook_id]['remote_missing_at'] = synced_at + self.remote_notebooks[notebook_id]['remote_present'] = False + for notebook_id in incoming_ids: + self.remote_notebooks[notebook_id]['remote_present'] = True + + self._save_library() + return { + 'status': 'ok', + 'synced_at': synced_at, + 'count': len(notebooks), + 'new_ids': sorted(incoming_ids - previous_ids), + 'removed_ids': removed_ids, + 'notebooks': notebooks, + } + + def record_remote_notebook(self, notebook: Dict[str, Any], *, source: str = 'created') -> Dict[str, Any]: + notebook_id = notebook['id'] + now = datetime.now().isoformat() + previous = self.remote_notebooks.get(notebook_id, {}) + merged = { + **previous, + **notebook, + 'id': notebook_id, + 'origin': source, + 'created_at': previous.get('created_at') or now, + 'last_synced_at': now, + 'remote_present': True, + } + self.remote_notebooks[notebook_id] = merged + self._save_library() + return merged + + def resolve_notebook_url(self, notebook_id: Optional[str], notebook_url: Optional[str]) -> Optional[str]: + if notebook_url: + return normalize_notebooklm_url(notebook_url) + if notebook_id: + manual = self.get_notebook(notebook_id) + if manual: + return manual['url'] + remote = self.get_remote_notebook(notebook_id) + if remote: + return remote['url'] + active = self.get_active_notebook() + if active: + return active['url'] + return None + def get_stats(self) -> Dict[str, Any]: - """Get library statistics""" - total_notebooks = len(self.notebooks) total_topics = set() total_use_count = 0 - for notebook in self.notebooks.values(): total_topics.update(notebook['topics']) total_use_count += notebook['use_count'] - # Find most used most_used = None if self.notebooks: - most_used = max( - self.notebooks.values(), - key=lambda n: n['use_count'] - ) + most_used = max(self.notebooks.values(), key=lambda 
item: item['use_count']) return { - 'total_notebooks': total_notebooks, + 'total_notebooks': len(self.notebooks), + 'total_remote_notebooks': len(self.remote_notebooks), 'total_topics': len(total_topics), 'total_use_count': total_use_count, 'active_notebook': self.get_active_notebook(), 'most_used_notebook': most_used, - 'library_path': str(self.library_file) + 'library_path': str(self.library_file), } -def main(): - """Command-line interface for notebook management""" - parser = argparse.ArgumentParser(description='Manage NotebookLM library') +def _slugify_name(name: str) -> str: + slug = re.sub(r'[^a-z0-9]+', '-', name.lower()).strip('-') + return slug or 'notebook' + + + +def _split_csv(value: Optional[str]) -> Optional[List[str]]: + if not value: + return None + return [item.strip() for item in value.split(',') if item.strip()] + + + +def _remote_entry_from_created(result: Dict[str, Any]) -> Dict[str, Any]: + notebook_url = normalize_notebooklm_url(result['url']) + notebook_id = notebook_url.rstrip('/').split('/notebook/')[-1].split('?', 1)[0] + return { + 'id': notebook_id, + 'url': notebook_url, + 'title': result['title'], + 'subtitle': None, + 'visible_description': None, + 'visible_updated_at': None, + } + + + +def _print_notebooks(notebooks: List[Dict[str, Any]], *, active_id: Optional[str] = None, remote: bool = False): + if not notebooks: + if remote: + print("📚 No remote notebooks recorded") + else: + print("📚 Library is empty. 
Add notebooks with: notebook_manager.py add") + return + + label = "Remote notebooks" if remote else "Notebook Library" + print(f"\n📚 {label}:") + for notebook in notebooks: + notebook_id = notebook.get('id') + name = notebook.get('name') or notebook.get('title') + active = " [ACTIVE]" if notebook_id == active_id else "" + print(f"\n 📓 {name}{active}") + print(f" ID: {notebook_id}") + print(f" URL: {notebook.get('url')}") + subtitle = notebook.get('subtitle') or notebook.get('visible_updated_at') + if subtitle: + print(f" Summary: {subtitle}") + description = notebook.get('description') or notebook.get('visible_description') + if description: + print(f" Description: {description}") + if not remote: + print(f" Topics: {', '.join(notebook.get('topics', []))}") + print(f" Uses: {notebook.get('use_count', 0)}") + + + +def _run_browser_task(headless: bool, task): + playwright = None + context = None + try: + playwright = sync_playwright().start() + context = BrowserFactory.launch_persistent_context(playwright, headless=headless) + page = context.new_page() + return task(page) + finally: + if context: + try: + context.close() + except Exception: + pass + if playwright: + try: + playwright.stop() + except Exception: + pass + + + +def _handle_ui_error(error: NotebookUIError, preferred_email: Optional[str]) -> int: + details = error.details or {} + if details.get('status'): + for line in explain_navigation_failure(details, preferred_email=preferred_email): + print(line) + else: + print(f"❌ {error}") + if details.get('body_excerpt'): + print(" Body excerpt:") + for line in details['body_excerpt'].splitlines()[:12]: + print(f" {line}") + return 1 + + + +def main(): + parser = argparse.ArgumentParser(description='Manage NotebookLM library and remote notebooks') subparsers = parser.add_subparsers(dest='command', help='Commands') - # Add command - add_parser = subparsers.add_parser('add', help='Add a notebook') + add_parser = subparsers.add_parser('add', help='Add a notebook to 
the manual library') add_parser.add_argument('--url', required=True, help='NotebookLM URL') add_parser.add_argument('--name', required=True, help='Display name') add_parser.add_argument('--description', required=True, help='Description') @@ -320,91 +393,191 @@ def main(): add_parser.add_argument('--use-cases', help='Comma-separated use cases') add_parser.add_argument('--tags', help='Comma-separated tags') - # List command - subparsers.add_parser('list', help='List all notebooks') + subparsers.add_parser('list', help='List manual notebooks') - # Search command - search_parser = subparsers.add_parser('search', help='Search notebooks') + search_parser = subparsers.add_parser('search', help='Search manual notebooks') search_parser.add_argument('--query', required=True, help='Search query') - # Activate command - activate_parser = subparsers.add_parser('activate', help='Set active notebook') + activate_parser = subparsers.add_parser('activate', help='Set active manual notebook') activate_parser.add_argument('--id', required=True, help='Notebook ID') - # Remove command - remove_parser = subparsers.add_parser('remove', help='Remove a notebook') + remove_parser = subparsers.add_parser('remove', help='Remove a manual notebook') remove_parser.add_argument('--id', required=True, help='Notebook ID') - # Stats command subparsers.add_parser('stats', help='Show library statistics') + subparsers.add_parser('list-remote', help='Fetch and print accessible remote notebooks without saving') + + sync_parser = subparsers.add_parser('sync-remote', help='Fetch remote notebooks and save them into library.json') + sync_parser.add_argument('--show-browser', action='store_true', help='Show browser during sync') + + create_parser = subparsers.add_parser('create', help='Create a new NotebookLM notebook') + create_parser.add_argument('--name', help='Notebook title to set after creation') + create_parser.add_argument('--show-browser', action='store_true', help='Show browser during creation') + + 
add_source_parser = subparsers.add_parser('add-source', help='Add a source to a notebook') + add_source_parser.add_argument('--notebook-url', help='NotebookLM notebook URL') + add_source_parser.add_argument('--notebook-id', help='Notebook ID from manual or remote library') + add_source_group = add_source_parser.add_mutually_exclusive_group(required=True) + add_source_group.add_argument('--source-url', help='Source URL to import') + add_source_group.add_argument('--file', help='Local file path to upload') + add_source_parser.add_argument('--show-browser', action='store_true', help='Show browser during source import') + + discover_parser = subparsers.add_parser('discover-sources', help='Use Fast Research or Deep Research to discover sources in a notebook') + discover_parser.add_argument('--notebook-url', help='NotebookLM notebook URL') + discover_parser.add_argument('--notebook-id', help='Notebook ID from manual or remote library') + discover_parser.add_argument('--query', required=True, help='Research query to run inside NotebookLM Sources') + discover_parser.add_argument('--mode', choices=['fast-research', 'deep-research'], default='fast-research', help='Research mode') + discover_parser.add_argument('--no-import', action='store_true', help='Do not auto-import discovered sources') + discover_parser.add_argument('--show-browser', action='store_true', help='Show browser during research') args = parser.parse_args() - - # Initialize library library = NotebookLibrary() - # Execute command if args.command == 'add': - topics = [t.strip() for t in args.topics.split(',')] - use_cases = [u.strip() for u in args.use_cases.split(',')] if args.use_cases else None - tags = [t.strip() for t in args.tags.split(',')] if args.tags else None - notebook = library.add_notebook( url=args.url, name=args.name, description=args.description, - topics=topics, - use_cases=use_cases, - tags=tags + topics=[item.strip() for item in args.topics.split(',') if item.strip()], + 
use_cases=_split_csv(args.use_cases), + tags=_split_csv(args.tags), ) - print(json.dumps(notebook, indent=2)) - - elif args.command == 'list': - notebooks = library.list_notebooks() - if notebooks: - print("\n📚 Notebook Library:") - for notebook in notebooks: - active = " [ACTIVE]" if notebook['id'] == library.active_notebook_id else "" - print(f"\n 📓 {notebook['name']}{active}") - print(f" ID: {notebook['id']}") - print(f" Topics: {', '.join(notebook['topics'])}") - print(f" Uses: {notebook['use_count']}") - else: - print("📚 Library is empty. Add notebooks with: notebook_manager.py add") + print(json.dumps(notebook, indent=2, ensure_ascii=False)) + return 0 + + if args.command == 'list': + _print_notebooks(library.list_notebooks(), active_id=library.active_notebook_id) + return 0 - elif args.command == 'search': + if args.command == 'search': results = library.search_notebooks(args.query) if results: - print(f"\n🔍 Found {len(results)} notebooks:") - for notebook in results: - print(f"\n 📓 {notebook['name']} ({notebook['id']})") - print(f" {notebook['description']}") + _print_notebooks(results, active_id=library.active_notebook_id) else: print(f"🔍 No notebooks found for: {args.query}") + return 0 - elif args.command == 'activate': + if args.command == 'activate': notebook = library.select_notebook(args.id) print(f"Now using: {notebook['name']}") + return 0 - elif args.command == 'remove': + if args.command == 'remove': if library.remove_notebook(args.id): print("Notebook removed from library") + return 0 + return 1 - elif args.command == 'stats': + if args.command == 'stats': stats = library.get_stats() print("\n📊 Library Statistics:") - print(f" Total notebooks: {stats['total_notebooks']}") + print(f" Manual notebooks: {stats['total_notebooks']}") + print(f" Remote notebooks: {stats['total_remote_notebooks']}") print(f" Total topics: {stats['total_topics']}") print(f" Total uses: {stats['total_use_count']}") if stats['active_notebook']: print(f" Active: 
{stats['active_notebook']['name']}") if stats['most_used_notebook']: - print(f" Most used: {stats['most_used_notebook']['name']} ({stats['most_used_notebook']['use_count']} uses)") + print( + f" Most used: {stats['most_used_notebook']['name']} " + f"({stats['most_used_notebook']['use_count']} uses)" + ) print(f" Library path: {stats['library_path']}") + return 0 - else: - parser.print_help() + auth = AuthManager() + if not auth.is_authenticated(): + print("❌ NotebookLM authentication is missing. Run: python scripts/run.py auth_manager.py setup") + return 1 + + auth_info = auth.get_auth_info() + preferred_email = os.getenv(NOTEBOOKLM_GOOGLE_ACCOUNT_ENV) or auth_info.get('google_account_email') + + if args.command == 'list-remote': + try: + notebooks = _run_browser_task(True, lambda page: list_remote_notebooks(page, preferred_email=preferred_email)) + _print_notebooks(notebooks, remote=True) + print(json.dumps(notebooks, indent=2, ensure_ascii=False)) + return 0 + except NotebookUIError as error: + return _handle_ui_error(error, preferred_email) + + if args.command == 'sync-remote': + try: + notebooks = _run_browser_task(not args.show_browser, lambda page: list_remote_notebooks(page, preferred_email=preferred_email)) + result = library.sync_remote_notebooks(notebooks) + print(f"✅ Synced {result['count']} remote notebooks") + if result['new_ids']: + print(f" New IDs: {', '.join(result['new_ids'])}") + if result['removed_ids']: + print(f" Missing remotely now: {', '.join(result['removed_ids'])}") + print(json.dumps(result, indent=2, ensure_ascii=False)) + return 0 + except NotebookUIError as error: + return _handle_ui_error(error, preferred_email) + + if args.command == 'create': + try: + result = _run_browser_task( + not args.show_browser, + lambda page: create_notebook(page, name=args.name, preferred_email=preferred_email), + ) + library.record_remote_notebook(_remote_entry_from_created(result), source='created') + print("✅ Created notebook") + 
print(json.dumps(result, indent=2, ensure_ascii=False)) + return 0 + except NotebookUIError as error: + return _handle_ui_error(error, preferred_email) + + if args.command == 'add-source': + notebook_url = library.resolve_notebook_url(args.notebook_id, args.notebook_url) + if not notebook_url: + print("❌ Notebook URL not resolved. Pass --notebook-url or --notebook-id, or set an active notebook.") + return 1 + try: + result = _run_browser_task( + not args.show_browser, + lambda page: add_source( + page, + notebook_url, + source_url=args.source_url, + file_path=args.file, + preferred_email=preferred_email, + ), + ) + print("✅ Added source") + print(json.dumps(result, indent=2, ensure_ascii=False)) + return 0 + except NotebookUIError as error: + return _handle_ui_error(error, preferred_email) + + if args.command == 'discover-sources': + notebook_url = library.resolve_notebook_url(args.notebook_id, args.notebook_url) + if not notebook_url: + print("❌ Notebook URL not resolved. Pass --notebook-url or --notebook-id, or set an active notebook.") + return 1 + try: + result = _run_browser_task( + not args.show_browser, + lambda page: discover_sources( + page, + notebook_url, + query=args.query, + mode=args.mode, + auto_import=not args.no_import, + preferred_email=preferred_email, + progress_callback=lambda message: print(f" ⏳ {message}"), + ), + ) + print("✅ Discover sources completed") + print(json.dumps(result, indent=2, ensure_ascii=False)) + return 0 + except NotebookUIError as error: + return _handle_ui_error(error, preferred_email) + + parser.print_help() + return 0 if __name__ == "__main__": - main() \ No newline at end of file + sys.exit(main()) diff --git a/scripts/notebook_navigation.py b/scripts/notebook_navigation.py new file mode 100644 index 0000000..e9a4d74 --- /dev/null +++ b/scripts/notebook_navigation.py @@ -0,0 +1,427 @@ +#!/usr/bin/env python3 +""" +Navigation helpers for NotebookLM pages. 
+""" + +import re +import time +from typing import Any, Dict, List, Optional +from urllib.parse import urlparse + +from patchright.sync_api import Page + +from config import ( + HOME_READY_SELECTORS, + NOTEBOOKLM_APP_URL, + NOTEBOOKLM_PUBLIC_URL, + NOTEBOOK_READY_SELECTORS, +) + + +ACCOUNT_TILE_SELECTORS = [ + "[data-identifier]", + "li [data-identifier]", + "div[data-identifier]", +] + +PASSWORD_CHALLENGE_PATTERNS = [ + "signin/challenge/pwd", + "challenge/pwd", + "challenge/password", +] + + +def normalize_notebooklm_url(url: str) -> str: + """Normalize NotebookLM URLs while preserving the app host for notebook deep links.""" + normalized = url.rstrip("/") + if normalized.startswith("https://notebooklm.google.com"): + return normalized + + if is_notebook_url(normalized) and normalized.startswith("https://notebooklm.google"): + return normalized.replace("https://notebooklm.google", NOTEBOOKLM_APP_URL, 1) + + if normalized.startswith("https://notebooklm.google"): + return normalized.replace("https://notebooklm.google", NOTEBOOKLM_PUBLIC_URL, 1) + + return normalized + + +def is_google_auth_url(url: str) -> bool: + return "accounts.google.com" in (url or "") + + +def is_account_chooser_url(url: str) -> bool: + lower_url = (url or "").lower() + return is_google_auth_url(lower_url) and "accountchooser" in lower_url + + +def is_password_challenge_url(url: str) -> bool: + lower_url = (url or "").lower() + return is_google_auth_url(lower_url) and any(pattern in lower_url for pattern in PASSWORD_CHALLENGE_PATTERNS) + + +def is_notebook_url(url: str) -> bool: + return "/notebook/" in (url or "") + + +def is_notebooklm_app_url(url: str) -> bool: + return (url or "").startswith(NOTEBOOKLM_APP_URL) + + +def is_notebooklm_home_url(url: str) -> bool: + parsed = urlparse(url or "") + if parsed.netloc not in {"notebooklm.google.com", "notebooklm.google"}: + return False + path = parsed.path.rstrip("/") + return path in {"", "/"} + + +def list_account_choices(page: Page) -> 
List[str]: + """Return unique Google account emails shown on an account chooser page.""" + seen: List[str] = [] + + for selector in ACCOUNT_TILE_SELECTORS: + try: + locators = page.locator(selector) + count = locators.count() + except Exception: + continue + + for index in range(count): + try: + value = (locators.nth(index).get_attribute("data-identifier") or "").strip() + except Exception: + continue + if value and value not in seen: + seen.append(value) + + if seen: + break + + if seen: + return seen + + try: + body_text = page.locator("body").inner_text(timeout=3000) + except Exception: + body_text = "" + + for match in re.findall(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}", body_text): + if match not in seen: + seen.append(match) + + return seen + + +def _click_account_tile(page: Page, email: str) -> bool: + """Click the Google account chooser tile for the provided email.""" + selectors = [ + f'[data-identifier="{email}"]', + f'li [data-identifier="{email}"]', + f'div[data-identifier="{email}"]', + f'text="{email}"', + ] + + for selector in selectors: + try: + locator = page.locator(selector).first + if locator.count() == 0: + continue + locator.click(timeout=3000) + return True + except Exception: + try: + locator.evaluate("el => el.click()") + return True + except Exception: + continue + + return False + + +def maybe_choose_google_account( + page: Page, + preferred_email: Optional[str] = None, + timeout_ms: int = 10000, +) -> Dict[str, Any]: + """ + Auto-select a Google account on account chooser pages when possible. 
+ """ + if not is_account_chooser_url(page.url): + return {"status": "not-needed"} + + deadline = time.time() + timeout_ms / 1000 + emails: List[str] = [] + + while time.time() < deadline: + emails = list_account_choices(page) + if emails: + break + try: + page.wait_for_timeout(250) + except Exception: + time.sleep(0.25) + + if not emails: + return { + "status": "no-accounts-found", + "current_url": page.url, + } + + selected_email: Optional[str] = None + if preferred_email: + preferred_lower = preferred_email.lower() + for email in emails: + if email.lower() == preferred_lower: + selected_email = email + break + if not selected_email: + return { + "status": "preferred-account-missing", + "emails": emails, + "preferred_email": preferred_email, + "current_url": page.url, + } + elif len(emails) == 1: + selected_email = emails[0] + else: + return { + "status": "multiple-accounts", + "emails": emails, + "current_url": page.url, + } + + if not _click_account_tile(page, selected_email): + return { + "status": "account-click-failed", + "email": selected_email, + "emails": emails, + "current_url": page.url, + } + + deadline = time.time() + timeout_ms / 1000 + while time.time() < deadline: + current_url = page.url.rstrip("/") + if is_password_challenge_url(current_url): + return { + "status": "password-challenge", + "email": selected_email, + "emails": emails, + "current_url": current_url, + } + if not is_account_chooser_url(current_url): + return { + "status": "selected", + "email": selected_email, + "emails": emails, + "current_url": current_url, + } + try: + page.wait_for_timeout(250) + except Exception: + time.sleep(0.25) + + return { + "status": "account-selection-timeout", + "email": selected_email, + "emails": emails, + "current_url": page.url, + } + + +def wait_for_url_to_settle(page: Page, settle_ms: int = 1500, timeout_ms: int = 6000) -> str: + """Wait for SPA redirects to settle before classifying the page.""" + last_url = page.url.rstrip("/") + stable_since = 
time.time() + deadline = time.time() + timeout_ms / 1000 + + while time.time() < deadline: + try: + page.wait_for_timeout(250) + except Exception: + time.sleep(0.25) + + current_url = page.url.rstrip("/") + if current_url != last_url: + last_url = current_url + stable_since = time.time() + continue + + if (time.time() - stable_since) * 1000 >= settle_ms: + break + + return page.url.rstrip("/") + + +def wait_for_any_selector(page: Page, selectors: List[str], timeout_ms: int = 10000) -> Optional[str]: + """Wait until any selector becomes visible and return the selector.""" + deadline = time.time() + timeout_ms / 1000 + while time.time() < deadline: + for selector in selectors: + try: + locator = page.locator(selector).first + if locator.count() and locator.is_visible(): + return selector + except Exception: + continue + try: + page.wait_for_timeout(250) + except Exception: + time.sleep(0.25) + return None + + +def classify_notebooklm_state(page: Page) -> Dict[str, Any]: + """Classify the current page into NotebookLM, auth, or redirect states.""" + current_url = wait_for_url_to_settle(page) + if is_password_challenge_url(current_url): + return {"status": "password-challenge", "current_url": current_url} + if is_account_chooser_url(current_url): + return {"status": "account-chooser", "current_url": current_url} + if is_google_auth_url(current_url): + return {"status": "google-login", "current_url": current_url} + if is_notebooklm_app_url(current_url): + return {"status": "ok", "current_url": current_url} + return {"status": "unexpected-url", "current_url": current_url} + + +def open_home_page( + page: Page, + preferred_email: Optional[str] = None, + timeout_ms: int = 30000, +) -> Dict[str, Any]: + """Open NotebookLM home/list page and classify auth failures precisely.""" + page.goto(NOTEBOOKLM_APP_URL, wait_until="domcontentloaded", timeout=timeout_ms) + + chooser_result = maybe_choose_google_account(page, preferred_email=preferred_email) + if chooser_result["status"] 
not in {"not-needed", "selected"}: + chooser_result.setdefault("target_url", NOTEBOOKLM_APP_URL) + return chooser_result + + classified = classify_notebooklm_state(page) + if classified["status"] != "ok": + classified["target_url"] = NOTEBOOKLM_APP_URL + return classified + + if not wait_for_any_selector(page, HOME_READY_SELECTORS, timeout_ms=timeout_ms): + return { + "status": "home-not-ready", + "current_url": page.url.rstrip("/"), + "target_url": NOTEBOOKLM_APP_URL, + } + + return { + "status": "ok", + "current_url": page.url.rstrip("/"), + "target_url": NOTEBOOKLM_APP_URL, + } + + +def open_notebook_page( + page: Page, + notebook_url: str, + preferred_email: Optional[str] = None, + timeout_ms: int = 30000, +) -> Dict[str, Any]: + """ + Navigate to a NotebookLM notebook and classify failures precisely. + """ + target_url = normalize_notebooklm_url(notebook_url) + + for attempt in range(2): + if attempt == 1 and is_notebook_url(target_url): + page.goto(NOTEBOOKLM_APP_URL, wait_until="domcontentloaded", timeout=timeout_ms) + try: + page.wait_for_timeout(1000) + except Exception: + time.sleep(1) + + page.goto(target_url, wait_until="domcontentloaded", timeout=timeout_ms) + + chooser_result = maybe_choose_google_account(page, preferred_email=preferred_email) + if chooser_result["status"] not in {"not-needed", "selected"}: + chooser_result.setdefault("target_url", target_url) + return chooser_result + + classified = classify_notebooklm_state(page) + current_url = classified["current_url"] + if classified["status"] in {"google-login", "password-challenge", "account-chooser", "unexpected-url"}: + classified["target_url"] = target_url + return classified + + if is_notebook_url(target_url) and not is_notebook_url(current_url): + if attempt == 0: + continue + return { + "status": "home-redirect", + "current_url": current_url, + "target_url": target_url, + } + + if not wait_for_any_selector(page, NOTEBOOK_READY_SELECTORS, timeout_ms=timeout_ms): + return { + "status": 
"notebook-not-ready", + "current_url": current_url, + "target_url": target_url, + } + + return { + "status": "ok", + "current_url": current_url, + "target_url": target_url, + } + + return { + "status": "unknown", + "current_url": page.url.rstrip("/"), + "target_url": target_url, + } + + +def explain_navigation_failure(navigation: Dict[str, Any], preferred_email: Optional[str] = None) -> List[str]: + """Return user-facing failure lines for common NotebookLM navigation outcomes.""" + status = navigation.get("status") + lines: List[str] = [] + + if status == "google-login": + lines.append(" ❌ NotebookLM redirected to the Google login page") + lines.append(" Current browser profile is not fully signed in to NotebookLM") + elif status == "password-challenge": + lines.append(" ❌ Google sent the browser to a password challenge page") + if navigation.get("email"): + lines.append(f" Google account: {navigation['email']}") + elif status == "home-redirect": + lines.append(" ❌ Notebook URL redirected to the NotebookLM home page") + lines.append(" Likely cause: wrong Google account for this notebook, or the notebook is not shared to this account") + if preferred_email: + lines.append(f" Current Google account: {preferred_email}") + elif status == "multiple-accounts": + accounts = ", ".join(navigation.get("emails", [])) + lines.append(" ❌ Google showed an account chooser with multiple accounts") + if accounts: + lines.append(f" Available accounts: {accounts}") + elif status == "preferred-account-missing": + accounts = ", ".join(navigation.get("emails", [])) + lines.append(f" ❌ Preferred Google account not found: {navigation.get('preferred_email')}") + if accounts: + lines.append(f" Available accounts: {accounts}") + elif status == "account-click-failed": + lines.append(f" ❌ Failed to click Google account tile: {navigation.get('email')}") + elif status == "account-selection-timeout": + lines.append(f" ❌ Timed out after selecting Google account: {navigation.get('email')}") + elif 
status == "no-accounts-found": + lines.append(" ❌ Google account chooser opened, but no account tiles were detected") + elif status == "home-not-ready": + lines.append(" ❌ Entered NotebookLM, but the home page never became interactive") + elif status == "notebook-not-ready": + lines.append(" ❌ Entered the notebook URL, but the notebook shell never became interactive") + elif status == "account-chooser": + lines.append(" ❌ NotebookLM remained on the Google account chooser page") + elif status == "unexpected-url": + lines.append(" ❌ NotebookLM ended on an unexpected URL") + else: + lines.append(f" ❌ Failed to open NotebookLM ({status})") + + if navigation.get("current_url"): + lines.append(f" Final URL: {navigation['current_url']}") + return lines diff --git a/scripts/notebook_ui.py b/scripts/notebook_ui.py new file mode 100644 index 0000000..5e91921 --- /dev/null +++ b/scripts/notebook_ui.py @@ -0,0 +1,1123 @@ +#!/usr/bin/env python3 +""" +UI automation helpers for NotebookLM notebook discovery and management. 
+""" + +import os +import re +import tempfile +import time +import urllib.request +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional + +from patchright.sync_api import Page + +from config import ( + CREATE_NOTEBOOK_BUTTON_SELECTORS, + HOME_READY_SELECTORS, + NOTEBOOKLM_APP_URL, + NOTEBOOKLM_PROXY_BYPASS_ENV, + NOTEBOOKLM_PROXY_URL_ENV, + SOURCE_FILE_INPUT_SELECTORS, + SOURCE_PANEL_READY_SELECTORS, + SOURCE_UPLOAD_TIMEOUT_SECONDS, + SOURCE_URL_TEXTAREA_SELECTORS, + TITLE_INPUT_SELECTORS, + UI_MEDIUM_TIMEOUT_MS, +) +from notebook_navigation import open_home_page, open_notebook_page, wait_for_any_selector, wait_for_url_to_settle + + +REMOTE_NOTEBOOK_EXTRACTOR = """ +(elements, appUrl) => { + const clean = value => (value || '').replace(/\\s+/g, ' ').trim(); + return elements.map((element) => { + const titleElement = element.querySelector('.project-button-title'); + const title = clean(titleElement?.textContent || ''); + const titleId = titleElement?.id || ''; + const idMatch = titleId.match(/^project-(.+)-title$/); + const notebookId = idMatch ? idMatch[1] : null; + const emoji = clean(element.querySelector('.project-button-box-icon')?.textContent || ''); + const subtitleElement = element.querySelector('.project-button-subtitle'); + const subtitle = clean(subtitleElement?.textContent || ''); + const lines = (element.innerText || '').split(/\\n+/).map(clean).filter(Boolean); + const filtered = lines.filter(line => !['more_vert', emoji, title].includes(line)); + const fallbackSubtitle = subtitle || (filtered.length ? filtered[filtered.length - 1] : ''); + const descriptionLines = filtered.filter(line => line !== fallbackSubtitle); + return { + id: notebookId, + title, + url: notebookId ? `${appUrl}/notebook/${notebookId}` : null, + subtitle: fallbackSubtitle || null, + visible_description: descriptionLines.length ? descriptionLines.join(' | ') : null, + visible_updated_at: fallbackSubtitle && fallbackSubtitle.includes('·') ? 
clean(fallbackSubtitle.split('·')[0]) : null,
+      emoji: emoji || null,
+    };
+  }).filter(item => item.id && item.title);
+}
+"""
+
+
+SOURCE_ERROR_PATTERNS = [
+    r"upload failed",
+    r"failed to import",
+    r"couldn.?t import",
+    r"can.?t import",
+    r"unsupported",
+    r"try again",
+    r"not available",
+    r"unable to",
+]
+
+RESEARCH_QUERY_TEXTAREA_SELECTORS = [
+    'textarea[aria-label="Discover sources based on the inputted query"]',
+    'textarea.query-box-textarea',
+]
+
+RESEARCH_MODE_BUTTON_SELECTORS = [
+    'button.researcher-menu-trigger',
+]
+
+RESEARCH_SUBMIT_BUTTON_SELECTORS = [
+    'button.actions-enter-button-active',
+    'button[aria-label="Submit"]',
+]
+
+RESEARCH_MODE_LABELS = {
+    'fast-research': 'Fast Research',
+    'deep-research': 'Deep Research',
+}
+
+RESEARCH_TIMEOUT_SECONDS = 900
+RESEARCH_IMPORT_TIMEOUT_SECONDS = 180
+RESEARCH_COMPLETION_TOKENS = {
+    'view',
+    'delete',
+    'import',
+    'thumb_up',
+    'thumb_down',
+    'link',
+    'docs',
+    'stop',
+    'close',
+    'add',
+    'drive_pdf',
+    'folder_copy',
+}
+
+
+class NotebookUIError(RuntimeError):
+    """Structured UI automation error."""
+
+    def __init__(self, code: str, message: str, *, details: Optional[Dict[str, Any]] = None):
+        super().__init__(message)
+        self.code = code
+        self.details = details or {}
+
+
+def list_remote_notebooks(page: Page, preferred_email: Optional[str] = None) -> List[Dict[str, Any]]:
+    navigation = open_home_page(page, preferred_email=preferred_email)
+    if navigation["status"] != "ok":
+        raise NotebookUIError(navigation["status"], "Failed to open NotebookLM home page", details=navigation)
+
+    _load_all_notebook_cards(page)
+    cards = page.locator("div.my-projects-container project-button")
+    return cards.evaluate_all(REMOTE_NOTEBOOK_EXTRACTOR, NOTEBOOKLM_APP_URL)
+
+
+def create_notebook(page: Page, name: Optional[str] = None, preferred_email: Optional[str] = None) -> Dict[str, Any]:
+    navigation = open_home_page(page, preferred_email=preferred_email)
+    if navigation["status"] != "ok":
+        raise NotebookUIError(navigation["status"], "Failed to open NotebookLM home page", details=navigation)
+
+    selector = wait_for_any_selector(page, CREATE_NOTEBOOK_BUTTON_SELECTORS, timeout_ms=UI_MEDIUM_TIMEOUT_MS)
+    if not selector:
+        raise NotebookUIError("create-button-missing", "NotebookLM did not expose a create notebook action")
+
+    create_button = page.locator(selector).first
+    create_button.click(timeout=UI_MEDIUM_TIMEOUT_MS)
+
+    _wait_for_notebook_url(page)
+    title = _read_current_notebook_title(page)
+    rename_status = None
+    if name:
+        rename_status = rename_current_notebook(page, name)
+        title = rename_status["title"]
+
+    result = {
+        "status": "ok",
+        "url": wait_for_url_to_settle(page),
+        "title": title,
+    }
+    if rename_status and rename_status.get("status") != "ok":
+        result["rename_status"] = rename_status
+    return result
+
+
+def rename_current_notebook(page: Page, name: str) -> Dict[str, Any]:
+    input_locator = _find_first_visible(page, TITLE_INPUT_SELECTORS)
+    if not input_locator:
+        return {
+            "status": "rename-unavailable",
+            "title": _read_current_notebook_title(page),
+            "message": "Notebook title input is not editable in the current UI",
+        }
+
+    input_locator.fill(name, timeout=UI_MEDIUM_TIMEOUT_MS)
+    try:
+        input_locator.press("Enter")
+    except Exception:
+        pass
+
+    deadline = time.time() + 10
+    while time.time() < deadline:
+        try:
+            value = input_locator.input_value(timeout=1000).strip()
+        except Exception:
+            value = ""
+        if value == name:
+            return {
+                "status": "ok",
+                "title": value,
+            }
+        page.wait_for_timeout(250)
+
+    return {
+        "status": "rename-timeout",
+        "title": _read_current_notebook_title(page),
+        "message": "Notebook title did not settle to the requested value",
+    }
+
+
+def add_source(
+    page: Page,
+    notebook_url: str,
+    *,
+    source_url: Optional[str] = None,
+    file_path: Optional[str] = None,
+    preferred_email: Optional[str] = None,
+) -> Dict[str, Any]:
+    navigation = open_notebook_page(page, notebook_url, preferred_email=preferred_email)
+    if navigation["status"] != "ok":
+        raise NotebookUIError(navigation["status"], "Failed to open NotebookLM notebook", details=navigation)
+
+    baseline_count = get_source_count(page)
+    _ensure_source_panel_open(page)
+
+    temporary_file: Optional[Path] = None
+    source_kind = None
+    try:
+        if file_path:
+            upload_target = Path(file_path).expanduser().resolve()
+            if not upload_target.exists() or not upload_target.is_file():
+                raise NotebookUIError("file-not-found", f"Local file not found: {upload_target}")
+            source_kind = _classify_file_source(upload_target)
+            _upload_local_file(page, upload_target)
+        elif source_url:
+            if _looks_like_pdf_url(source_url):
+                temporary_file = _download_pdf(source_url)
+                source_kind = "pdf-url"
+                _upload_local_file(page, temporary_file)
+            else:
+                source_kind = _classify_url_source(source_url)
+                _insert_source_url(page, source_url)
+        else:
+            raise NotebookUIError("missing-source", "Either source_url or file_path is required")
+
+        final_count = _wait_for_source_count_increase(page, baseline_count)
+        return {
+            "status": "ok",
+            "notebook_url": wait_for_url_to_settle(page),
+            "source_kind": source_kind,
+            "source_count_before": baseline_count,
+            "source_count_after": final_count,
+            "title": _read_current_notebook_title(page),
+        }
+    finally:
+        if temporary_file and temporary_file.exists():
+            try:
+                temporary_file.unlink()
+            except Exception:
+                pass
+
+
+def discover_sources(
+    page: Page,
+    notebook_url: str,
+    *,
+    query: str,
+    mode: str = "fast-research",
+    auto_import: bool = True,
+    preferred_email: Optional[str] = None,
+    progress_callback: Optional[Callable[[str], None]] = None,
+) -> Dict[str, Any]:
+    normalized_mode = (mode or "fast-research").strip().lower()
+    if normalized_mode not in RESEARCH_MODE_LABELS:
+        raise NotebookUIError("research-mode-invalid", f"Unsupported research mode: {mode}")
+
+    navigation = open_notebook_page(page, notebook_url, preferred_email=preferred_email)
+    if navigation["status"] != "ok":
+        raise NotebookUIError(navigation["status"], "Failed to open NotebookLM notebook", details=navigation)
+
+    baseline_count = get_source_count(page)
+    _ensure_source_panel_open(page)
+    _ensure_research_controls_ready(page)
+    _fill_research_query(page, query)
+    selected_mode = _select_research_mode(page, normalized_mode)
+    _submit_research_query(page)
+
+    result = _wait_for_research_completion(
+        page,
+        requested_mode=normalized_mode,
+        progress_callback=progress_callback,
+    )
+    result.update(
+        {
+            "status": "ok",
+            "query": query,
+            "mode_requested": normalized_mode,
+            "mode_selected": selected_mode,
+            "notebook_url": wait_for_url_to_settle(page),
+            "title": _read_current_notebook_title(page),
+            "source_count_before": baseline_count,
+            "auto_import_requested": auto_import,
+        }
+    )
+
+    if auto_import:
+        import_result = _import_research_results(
+            page,
+            notebook_url,
+            baseline_count,
+            preferred_email=preferred_email,
+            progress_callback=progress_callback,
+        )
+        result["import_result"] = import_result
+        result["source_count_after"] = import_result["source_count_after"]
+    else:
+        result["source_count_after"] = baseline_count
+
+    return result
+
+
+def get_source_count(page: Page) -> int:
+    body_text = _safe_body_text(page)
+    explicit_count = _extract_explicit_source_count(body_text)
+
+    try:
+        checkboxes = page.locator("input[type='checkbox']")
+        count = checkboxes.count()
+        labels = []
+        for index in range(count):
+            try:
+                aria = (checkboxes.nth(index).get_attribute("aria-label") or "").strip()
+            except Exception:
+                aria = ""
+            if aria and aria.lower() != "select all sources":
+                labels.append(aria)
+        checkbox_count = len(labels)
+    except Exception:
+        checkbox_count = 0
+
+    candidates = re.findall(r"\((\d+)\)", body_text)
+    candidate_count = max(int(value) for value in candidates) if candidates else 0
+
+    return max(explicit_count, candidate_count, checkbox_count)
+
+
+def _extract_explicit_source_count(body_text: str) -> int:
+    explicit = re.findall(r"(\d+)\s+source(?:s)?", body_text.lower())
+    if explicit:
+        return max(int(value) for value in explicit)
+    return 0
+
+
+def _ensure_research_controls_ready(page: Page):
+    query_box = _find_first_visible(page, RESEARCH_QUERY_TEXTAREA_SELECTORS)
+    if query_box:
+        return
+    raise NotebookUIError(
+        "research-controls-missing",
+        "NotebookLM did not expose the research query controls in the Sources view",
+        details={"body_excerpt": _safe_body_text(page)[:1200]},
+    )
+
+
+def _fill_research_query(page: Page, query: str):
+    query_box = _find_first_visible(page, RESEARCH_QUERY_TEXTAREA_SELECTORS)
+    if not query_box:
+        raise NotebookUIError("research-query-input-missing", "NotebookLM did not expose the research query input")
+
+    try:
+        query_box.click(timeout=UI_MEDIUM_TIMEOUT_MS)
+        query_box.press("Control+A")
+        query_box.press("Backspace")
+    except Exception:
+        pass
+
+    try:
+        query_box.fill("")
+    except Exception:
+        pass
+
+    try:
+        query_box.type(query, delay=20)
+    except Exception as error:
+        raise NotebookUIError("research-query-input-failed", f"Failed to type research query: {error}") from error
+
+    page.wait_for_timeout(600)
+
+
+def _select_research_mode(page: Page, mode: str) -> str:
+    label = RESEARCH_MODE_LABELS[mode]
+    mode_button = _find_research_mode_button(page)
+    if not mode_button:
+        raise NotebookUIError("research-mode-control-missing", "NotebookLM did not expose the Fast Research / Deep Research selector")
+
+    current_text = _safe_locator_text(mode_button)
+    if label.lower() in current_text.lower():
+        return mode
+
+    try:
+        mode_button.click(timeout=UI_MEDIUM_TIMEOUT_MS)
+    except Exception as error:
+        raise NotebookUIError("research-mode-open-failed", f"Failed to open the research mode menu: {error}") from error
+
+    option = page.get_by_role("menuitem", name=label).first
+    if option.count() == 0:
+        raise NotebookUIError(
+            f"{mode}-control-missing",
+            f"NotebookLM did not expose the {label} option",
+            details={"body_excerpt": _safe_body_text(page)[:1200]},
+        )
+
+    try:
+        option.click(timeout=UI_MEDIUM_TIMEOUT_MS)
+    except Exception as error:
+        raise NotebookUIError("research-mode-select-failed", f"Failed to select {label}: {error}") from error
+
+    deadline = time.time() + 8
+    while time.time() < deadline:
+        current_text = _safe_locator_text(_find_research_mode_button(page))
+        if label.lower() in current_text.lower():
+            return mode
+        page.wait_for_timeout(250)
+
+    raise NotebookUIError("research-mode-not-applied", f"NotebookLM did not switch the selector to {label}")
+
+
+def _submit_research_query(page: Page):
+    submit_button = _find_research_submit_button(page)
+    if not submit_button:
+        raise NotebookUIError("research-submit-missing", "NotebookLM did not expose an enabled research submit action")
+
+    try:
+        submit_button.click(timeout=UI_MEDIUM_TIMEOUT_MS)
+    except Exception as error:
+        raise NotebookUIError("research-submit-failed", f"Failed to submit the research query: {error}") from error
+
+    deadline = time.time() + 12
+    while time.time() < deadline:
+        body_text = _safe_body_text(page)
+        progress = _classify_research_progress(body_text)
+        if progress["phase"] in {"running", "completed"}:
+            return
+        page.wait_for_timeout(500)
+
+    raise NotebookUIError(
+        "research-submit-not-started",
+        "NotebookLM did not start the research workflow after query submission",
+        details={"body_excerpt": _safe_body_text(page)[:1200]},
+    )
+
+
+def _wait_for_research_completion(
+    page: Page,
+    *,
+    requested_mode: str,
+    progress_callback: Optional[Callable[[str], None]] = None,
+) -> Dict[str, Any]:
+    deadline = time.time() + RESEARCH_TIMEOUT_SECONDS
+    progress_updates: List[str] = []
+    last_progress_key = None
+    inferred_mode = requested_mode
+    last_body_text = ""
+
+    while time.time() < deadline:
+        body_text = _safe_body_text(page)
+        last_body_text = body_text
+        progress = _classify_research_progress(body_text)
+        if progress.get("mode") == "deep-research":
+            inferred_mode = "deep-research"
+        elif progress.get("mode") == "fast-research" and inferred_mode != "deep-research":
+            inferred_mode = "fast-research"
+
+        progress_key = progress.get("key")
+        progress_message = progress.get("message")
+        if progress_message and progress_key != last_progress_key:
+            progress_updates.append(progress_message)
+            if progress_callback:
+                progress_callback(progress_message)
+            last_progress_key = progress_key
+
+        if progress["phase"] == "completed" or _research_results_ready(body_text):
+            visible_results = _extract_visible_results(body_text)
+            discovered_count = _extract_discovered_source_count(body_text)
+            more_sources_count = _extract_more_sources_count(body_text)
+            deep_research_details = None
+            if inferred_mode == "deep-research":
+                deep_research_details = _extract_deep_research_details(page)
+                if deep_research_details.get("report_title") and not visible_results:
+                    visible_results = [deep_research_details["report_title"]]
+                if deep_research_details.get("cited_count") is not None:
+                    discovered_count = deep_research_details["cited_count"]
+            if not visible_results and discovered_count is None and more_sources_count is None:
+                raise NotebookUIError(
+                    "research-results-missing",
+                    "NotebookLM finished the research workflow but did not expose visible source results",
+                    details={"body_excerpt": body_text[:1600]},
+                )
+            result = {
+                "research_status": "completed",
+                "progress_updates": progress_updates,
+                "mode_observed": inferred_mode,
+                "completion_message": progress_message or "Research completed",
+                "discovered_count": discovered_count,
+                "more_sources_count": more_sources_count,
+                "visible_results": visible_results,
+            }
+            if deep_research_details:
+                result["deep_research_details"] = deep_research_details
+            return result
+
+        if progress["phase"] == "error":
+            raise NotebookUIError(
+                progress["key"],
+                progress_message or "NotebookLM reported a research failure",
+                details={"body_excerpt": body_text[:1600]},
+            )
+
+        page.wait_for_timeout(2000)
+
+    raise NotebookUIError(
+        "research-timeout",
+        "NotebookLM did not finish the research workflow before timeout",
+        details={
+            "body_excerpt": last_body_text[:1600],
+            "progress_updates": progress_updates,
+        },
+    )
+
+
+def _import_research_results(
+    page: Page,
+    notebook_url: str,
+    baseline_count: int,
+    *,
+    preferred_email: Optional[str] = None,
+    progress_callback: Optional[Callable[[str], None]] = None,
+) -> Dict[str, Any]:
+    import_button = page.get_by_role("button", name="Import").first
+    if import_button.count() == 0 or not import_button.is_visible():
+        raise NotebookUIError(
+            "research-import-unavailable",
+            "NotebookLM research results were generated, but no Import action was available",
+            details={"body_excerpt": _safe_body_text(page)[:1600]},
+        )
+
+    try:
+        import_button.click(timeout=UI_MEDIUM_TIMEOUT_MS)
+    except Exception as error:
+        raise NotebookUIError("research-import-click-failed", f"Failed to click Import: {error}") from error
+
+    if progress_callback:
+        progress_callback("Import clicked")
+
+    deadline = time.time() + RESEARCH_IMPORT_TIMEOUT_SECONDS
+    stable_count = None
+    stable_rounds = 0
+    last_body_text = ""
+
+    while time.time() < deadline:
+        page.wait_for_timeout(2000)
+        navigation = open_notebook_page(page, notebook_url, preferred_email=preferred_email)
+        if navigation["status"] != "ok":
+            raise NotebookUIError(navigation["status"], "Failed to reopen the notebook after Import", details=navigation)
+
+        current_count = get_source_count(page)
+        last_body_text = _safe_body_text(page)
+        if current_count > baseline_count:
+            if current_count == stable_count:
+                stable_rounds += 1
+            else:
+                stable_count = current_count
+                stable_rounds = 1
+            if stable_rounds >= 2:
+                if progress_callback:
+                    progress_callback(f"Import persisted with {current_count} sources")
+                return {
+                    "status": "ok",
+                    "source_count_before": baseline_count,
+                    "source_count_after": current_count,
+                }
+
+    raise NotebookUIError(
+        "research-import-not-persisted",
+        "NotebookLM showed Import, but the added sources did not persist before timeout",
+        details={"body_excerpt": last_body_text[:1600]},
+    )
+
+
+def _find_research_mode_button(page: Page):
+    return _find_first_visible(page, RESEARCH_MODE_BUTTON_SELECTORS)
+
+
+def _find_research_submit_button(page: Page):
+    for selector in RESEARCH_SUBMIT_BUTTON_SELECTORS:
+        try:
+            locator = page.locator(selector).first
+            if locator.count() and locator.is_visible():
+                disabled = (locator.get_attribute("disabled") or "").strip().lower()
+                aria_disabled = (locator.get_attribute("aria-disabled") or "").strip().lower()
+                classes = (locator.get_attribute("class") or "").strip().lower()
+                if disabled or aria_disabled == "true":
+                    continue
+                if selector == 'button[aria-label="Submit"]' and 'disabled' in classes and 'active' not in classes:
+                    continue
+                return locator
+        except Exception:
+            continue
+    return None
+
+
+def _classify_research_progress(body_text: str) -> Dict[str, Any]:
+    lower_text = body_text.lower()
+
+    if "deep research completed!" in lower_text:
+        return {"phase": "completed", "key": "deep-research-completed", "message": "Deep Research completed!", "mode": "deep-research"}
+    if "fast research completed!" in lower_text:
+        return {"phase": "completed", "key": "fast-research-completed", "message": "Fast Research completed!", "mode": "fast-research"}
+
+    step_match = re.search(r"step\s+(\d+)\s*/\s*(\d+)\s+complete", body_text, re.IGNORECASE)
+    if step_match:
+        current_step = int(step_match.group(1))
+        total_steps = int(step_match.group(2))
+        return {
+            "phase": "running",
+            "key": f"deep-step-{current_step}-of-{total_steps}",
+            "message": f"Step {current_step}/{total_steps} Complete",
+            "mode": "deep-research",
+        }
+
+    if "planning... please stay on this page" in lower_text:
+        return {"phase": "running", "key": "planning-stay", "message": "Planning... Please stay on this page", "mode": "deep-research"}
+    if "planning... feel free to leave" in lower_text:
+        return {"phase": "running", "key": "planning-leave", "message": "Planning... Feel free to leave", "mode": "deep-research"}
+    if "researching websites..." in lower_text:
+        return {"phase": "running", "key": "researching-websites", "message": "Researching Websites...", "mode": None}
+    if "analyzing results..." in lower_text:
+        return {"phase": "running", "key": "analyzing-results", "message": "Analyzing Results...", "mode": "deep-research"}
+
+    for pattern in SOURCE_ERROR_PATTERNS:
+        match = re.search(pattern, lower_text)
+        if match:
+            return {"phase": "error", "key": "research-ui-error", "message": f"NotebookLM reported: {match.group(0)}", "mode": None}
+
+    return {"phase": "idle", "key": "idle", "message": None, "mode": None}
+
+
+def _research_results_ready(body_text: str) -> bool:
+    lower_text = body_text.lower()
+    return "import" in lower_text and (
+        "sources discovered" in lower_text
+        or "more sources" in lower_text
+        or "thumb_up" in lower_text
+        or "thumb_down" in lower_text
+        or "view" in lower_text
+    )
+
+
+def _extract_discovered_source_count(body_text: str) -> Optional[int]:
+    match = re.search(r"(\d+)\s+source(?:s)?\s+discovered", body_text, re.IGNORECASE)
+    if match:
+        return int(match.group(1))
+    return None
+
+
+def _extract_more_sources_count(body_text: str) -> Optional[int]:
+    match = re.search(r"(\d+)\s+more\s+source(?:s)?", body_text, re.IGNORECASE)
+    if match:
+        return int(match.group(1))
+    return None
+
+
+def _extract_visible_results(body_text: str) -> List[str]:
+    lines = [line.strip() for line in body_text.splitlines() if line.strip()]
+    start_index = None
+    for index, line in enumerate(lines):
+        lowered = line.lower()
+        if "completed!" in lowered or "sources discovered" in lowered:
+            start_index = index + 1
+            break
+
+    if start_index is None:
+        return []
+
+    results: List[str] = []
+    for line in lines[start_index:]:
+        lowered = line.lower()
+        if lowered in RESEARCH_COMPLETION_TOKENS:
+            continue
+        if lowered.startswith("saved sources will appear here"):
+            break
+        if lowered.startswith("click add source above"):
+            break
+        if lowered.startswith("import or delete results before"):
+            break
+        if lowered.startswith("select all sources"):
+            break
+        if re.fullmatch(r"\d+\s+more\s+source(?:s)?", lowered):
+            continue
+        if re.fullmatch(r"\d+\s+source(?:s)?\s+discovered", lowered):
+            continue
+        if re.fullmatch(r"top\s+\d+\s+source(?:s)?", lowered):
+            continue
+        results.append(line)
+        if len(results) >= 12:
+            break
+    return results
+
+
+def _extract_deep_research_details(page: Page) -> Dict[str, Any]:
+    if not _open_deep_research_detailed_view(page):
+        return {}
+
+    report_title = _safe_locator_text(page.locator('.deep-research-report-header-text-title').first)
+    report_subtitle = _safe_locator_text(page.locator('.deep-research-report-header-text-subtitle').first)
+    cited_count, not_cited_count = _extract_deep_research_tab_counts(page)
+
+    details = {
+        "report_title": report_title or None,
+        "report_subtitle": report_subtitle or None,
+        "cited_count": cited_count,
+        "not_cited_count": not_cited_count,
+        "cited_sources": _extract_deep_research_source_entries(page, tab_name="Cited in Report"),
+    }
+
+    if not_cited_count is not None and not_cited_count > 0:
+        details["not_cited_sources"] = _extract_deep_research_source_entries(page, tab_name="Not cited")
+    else:
+        details["not_cited_sources"] = []
+
+    if cited_count is not None:
+        try:
+            page.get_by_role("tab", name=re.compile(r"^Cited in Report", re.IGNORECASE)).first.click(timeout=UI_MEDIUM_TIMEOUT_MS)
+            page.wait_for_timeout(500)
+        except Exception:
+            pass
+
+    return details
+
+
+def _open_deep_research_detailed_view(page: Page) -> bool:
+    container = page.locator('.deep-research-sources-container').first
+    if container.count() and container.is_visible():
+        return True
+
+    view_button = page.get_by_role('button', name='View sources').first
+    if view_button.count() == 0 or not view_button.is_visible():
+        return False
+
+    try:
+        view_button.click(timeout=UI_MEDIUM_TIMEOUT_MS)
+    except Exception as error:
+        raise NotebookUIError('deep-research-view-open-failed', f'Failed to open Deep Research detailed sources view: {error}') from error
+
+    deadline = time.time() + 10
+    while time.time() < deadline:
+        if container.count() and container.is_visible():
+            return True
+        page.wait_for_timeout(250)
+
+    raise NotebookUIError(
+        'deep-research-view-missing',
+        'Deep Research completed, but NotebookLM did not open the detailed sources view',
+        details={'body_excerpt': _safe_body_text(page)[:1600]},
+    )
+
+
+def _extract_deep_research_tab_counts(page: Page) -> tuple[Optional[int], Optional[int]]:
+    cited_count = None
+    not_cited_count = None
+    tabs = page.locator('.deep-research-sources-tabs-group [role="tab"]')
+    for index in range(tabs.count()):
+        text = _safe_locator_text(tabs.nth(index))
+        if not text:
+            continue
+        match = re.search(r'(\d+)', text)
+        if 'cited in report' in text.lower() and match:
+            cited_count = int(match.group(1))
+        elif 'not cited' in text.lower() and match:
+            not_cited_count = int(match.group(1))
+    return cited_count, not_cited_count
+
+
+def _extract_deep_research_source_entries(page: Page, *, tab_name: str) -> List[Dict[str, Any]]:
+    tab = page.get_by_role('tab', name=re.compile(rf'^{re.escape(tab_name)}', re.IGNORECASE)).first
+    if tab.count() == 0:
+        return []
+
+    try:
+        tab.click(timeout=UI_MEDIUM_TIMEOUT_MS)
+        page.wait_for_timeout(800)
+    except Exception as error:
+        raise NotebookUIError('deep-research-tab-open-failed', f'Failed to open the {tab_name} tab: {error}') from error
+
+    _scroll_deep_research_sources(page)
+    containers = page.locator('.deep-research-sources-container .source-container')
+    if containers.count() == 0:
+        return []
+
+    return containers.evaluate_all(
+        """
+        (nodes) => nodes.map((node) => {
+            const clean = (value) => (value || '').replace(/\s+/g, ' ').trim();
+            const title = clean(node.querySelector('.source-text-header-title')?.textContent || '');
+            const subtitle = clean(node.querySelector('.source-text-subtitle')?.textContent || '');
+            const header = clean(node.querySelector('.source-text-header')?.textContent || '');
+            return {
+                title: title || header.replace(/\s*open_in_new\s*/g, '').trim() || null,
+                snippet: subtitle || null,
+            };
+        }).filter(item => item.title)
+        """
+    )
+
+
+def _scroll_deep_research_sources(page: Page):
+    last_count = -1
+    stable_rounds = 0
+    deadline = time.time() + 20
+    while time.time() < deadline and stable_rounds < 3:
+        count = page.locator('.deep-research-sources-container .source-container').count()
+        if count == last_count:
+            stable_rounds += 1
+        else:
+            last_count = count
+            stable_rounds = 0
+        page.mouse.wheel(0, 3000)
+        page.wait_for_timeout(800)
+    page.mouse.wheel(0, -3000)
+    page.wait_for_timeout(400)
+
+
+def _safe_locator_text(locator) -> str:
+    if not locator:
+        return ""
+    try:
+        return (locator.inner_text(timeout=1000) or "").strip()
+    except Exception:
+        return ""
+
+
+def _load_all_notebook_cards(page: Page):
+    stable_rounds = 0
+    last_count = -1
+    deadline = time.time() + 20
+    while time.time() < deadline and stable_rounds < 3:
+        count = page.locator("div.my-projects-container project-button").count()
+        if count == last_count:
+            stable_rounds += 1
+        else:
+            stable_rounds = 0
+            last_count = count
+
+        page.mouse.wheel(0, 3000)
+        page.wait_for_timeout(1200)
+
+    page.mouse.wheel(0, -3000)
+    page.wait_for_timeout(400)
+
+
+def _wait_for_notebook_url(page: Page):
+    deadline = time.time() + 30
+    while time.time() < deadline:
+        current_url = wait_for_url_to_settle(page, settle_ms=800, timeout_ms=1500)
+        if "/notebook/" in current_url:
+            if wait_for_any_selector(page, HOME_READY_SELECTORS + TITLE_INPUT_SELECTORS, timeout_ms=UI_MEDIUM_TIMEOUT_MS):
+                return
+        page.wait_for_timeout(250)
+    raise NotebookUIError("create-navigation-timeout", "NotebookLM did not navigate to the new notebook in time")
+
+
+def _read_current_notebook_title(page: Page) -> str:
+    input_locator = _find_first_visible(page, TITLE_INPUT_SELECTORS)
+    if input_locator:
+        try:
+            value = input_locator.input_value(timeout=1000).strip()
+            if value:
+                return value
+        except Exception:
+            pass
+
+    body_text = _safe_body_text(page)
+    for line in body_text.splitlines():
+        stripped = line.strip()
+        if stripped and stripped not in {"settings", "PRO", "Sources", "Chat", "Studio", "more_vert"}:
+            if "source" not in stripped.lower() and stripped not in {"Create notebook"}:
+                return stripped
+    return "Untitled notebook"
+
+
+def _ensure_source_panel_open(page: Page):
+    if _source_panel_is_open(page):
+        return
+
+    openers = [
+        page.locator('button[aria-label=" Opens the upload source dialog"]').first,
+        page.get_by_role("button", name="Upload a source").first,
+        page.get_by_role("button", name="Upload files").first,
+    ]
+    for locator in openers:
+        try:
+            if locator.count() and locator.is_visible():
+                locator.click(timeout=UI_MEDIUM_TIMEOUT_MS)
+                page.wait_for_timeout(600)
+                if _source_panel_is_open(page):
+                    return
+        except Exception:
+            continue
+
+    try:
+        sources_tab = page.get_by_role("tab", name="Sources").first
+        if sources_tab.count() and sources_tab.is_visible():
+            sources_tab.click(timeout=UI_MEDIUM_TIMEOUT_MS)
+            page.wait_for_timeout(800)
+            if _source_panel_is_open(page):
+                return
+        add_source_button = page.get_by_role("button", name="Add source").first
+        if add_source_button.count() and add_source_button.is_visible():
+            add_source_button.click(timeout=UI_MEDIUM_TIMEOUT_MS)
+            page.wait_for_timeout(800)
+            if _source_panel_is_open(page):
+                return
+    except Exception:
+        pass
+
+    raise NotebookUIError("source-panel-unavailable", "NotebookLM did not open the source upload panel")
+
+
+def _insert_source_url(page: Page, source_url: str):
+    textarea = _find_first_visible(page, SOURCE_URL_TEXTAREA_SELECTORS)
+    if not textarea:
+        try:
+            page.get_by_role("button", name="Websites").first.click(timeout=UI_MEDIUM_TIMEOUT_MS)
+        except Exception:
+            pass
+        textarea = _find_first_visible(page, SOURCE_URL_TEXTAREA_SELECTORS)
+    if not textarea:
+        raise NotebookUIError("source-url-input-missing", "NotebookLM did not expose the URL source input")
+
+    textarea.fill(source_url, timeout=UI_MEDIUM_TIMEOUT_MS)
+    insert_button = page.get_by_role("button", name="Insert").first
+    if insert_button.count() == 0:
+        raise NotebookUIError("source-url-insert-missing", "NotebookLM did not expose the URL insert action")
+    insert_button.click(timeout=UI_MEDIUM_TIMEOUT_MS)
+
+
+def _upload_local_file(page: Page, file_path: Path):
+    input_locator = _find_first_existing(page, SOURCE_FILE_INPUT_SELECTORS)
+    if not input_locator:
+        try:
+            page.get_by_role("button", name="Upload files").first.click(timeout=UI_MEDIUM_TIMEOUT_MS)
+        except Exception:
+            pass
+        input_locator = _find_first_existing(page, SOURCE_FILE_INPUT_SELECTORS)
+    if not input_locator:
+        raise NotebookUIError("source-file-input-missing", "NotebookLM did not expose the local file upload input")
+
+    input_locator.set_input_files(str(file_path))
+
+
+def _wait_for_source_count_increase(page: Page, baseline_count: int) -> int:
+    deadline = time.time() + SOURCE_UPLOAD_TIMEOUT_SECONDS
+    last_error = None
+    stable_count = None
+    stable_rounds = 0
+    while time.time() < deadline:
+        current_count = get_source_count(page)
+        if current_count > baseline_count:
+            if current_count == stable_count:
+                stable_rounds += 1
+            else:
+                stable_count = current_count
+                stable_rounds = 1
+            if stable_rounds >= 2:
+                return current_count
+
+        body_text = _safe_body_text(page).lower()
+        for pattern in SOURCE_ERROR_PATTERNS:
+            match = re.search(pattern, body_text)
+            if match:
+                last_error = match.group(0)
+                break
+        if last_error:
+            raise NotebookUIError(
+                "source-upload-failed",
+                f"NotebookLM reported a source import failure: {last_error}",
+                details={"body_excerpt": _safe_body_text(page)[:1200]},
+            )
+
+        page.wait_for_timeout(1000)
+
+    raise NotebookUIError(
+        "source-processing-timeout",
+        "NotebookLM did not confirm the source upload/import before timeout",
+        details={"body_excerpt": _safe_body_text(page)[:1200]},
+    )
+
+
+def _find_first_visible(page: Page, selectors: List[str]):
+    for selector in selectors:
+        try:
+            locator = page.locator(selector).first
+            if locator.count() and locator.is_visible():
+                return locator
+        except Exception:
+            continue
+    return None
+
+
+def _source_panel_is_open(page: Page) -> bool:
+    try:
+        for button_name in ["Upload files", "Websites", "Drive", "Copied text"]:
+            locator = page.get_by_role("button", name=button_name).first
+            if locator.count() and locator.is_visible():
+                return True
+    except Exception:
+        pass
+
+    return bool(wait_for_any_selector(page, SOURCE_PANEL_READY_SELECTORS, timeout_ms=1200))
+
+
+def _find_first_existing(page: Page, selectors: List[str]):
+    for selector in selectors:
+        try:
+            locator = page.locator(selector).first
+            if locator.count():
+                return locator
+        except Exception:
+            continue
+    return None
+
+
+def _looks_like_pdf_url(source_url: str) -> bool:
+    return ".pdf" in source_url.lower().split("#", 1)[0].split("?", 1)[0]
+
+
+def _download_pdf(source_url: str) -> Path:
+    headers = {
+        "User-Agent": "Mozilla/5.0",
+        "Accept": "application/pdf,application/octet-stream;q=0.9,*/*;q=0.8",
+    }
+    request = urllib.request.Request(source_url, headers=headers)
+    proxy_url = (
+        os.getenv(NOTEBOOKLM_PROXY_URL_ENV)
+        or os.getenv("HTTPS_PROXY")
+        or os.getenv("https_proxy")
+        or os.getenv("HTTP_PROXY")
+        or os.getenv("http_proxy")
+        or os.getenv("ALL_PROXY")
+        or os.getenv("all_proxy")
+    )
+    proxies = {}
+    if proxy_url:
+        proxies["http"] = proxy_url
+        proxies["https"] = proxy_url
+    bypass = os.getenv(NOTEBOOKLM_PROXY_BYPASS_ENV) or os.getenv("NO_PROXY") or os.getenv("no_proxy")
+    if bypass:
+        for host in [item.strip() for item in bypass.split(',') if item.strip()]:
+            proxies[host] = None
+    opener = urllib.request.build_opener(urllib.request.ProxyHandler(proxies))
+    try:
+        with opener.open(request, timeout=60) as response:
+            content_type = (response.headers.get("Content-Type") or "").lower()
+            if "pdf" not in content_type and not _looks_like_pdf_url(source_url):
+                raise NotebookUIError(
+                    "pdf-download-not-pdf",
+                    f"URL did not return a PDF document (Content-Type: {content_type or 'unknown'})",
+                )
+            with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as handle:
+                handle.write(response.read())
+            return Path(handle.name)
+    except NotebookUIError:
+        raise
+    except Exception as error:
+        raise NotebookUIError("pdf-download-failed", f"Failed to download PDF URL: {error}") from error
+
+
+def _classify_url_source(source_url: str) -> str:
+    lowered = source_url.lower()
+    if "youtube.com/" in lowered or "youtu.be/" in lowered:
+        return "youtube-url"
+    if "docs.google.com/document/" in lowered:
+        return "google-docs-url"
+    if "docs.google.com/presentation/" in lowered:
+        return "google-slides-url"
+    return "web-url"
+
+
+def _classify_file_source(file_path: Path) -> str:
+    suffix = file_path.suffix.lower()
+    if suffix == ".pdf":
+        return "local-pdf"
+    return f"local-file:{suffix or 'unknown'}"
+
+
+def _safe_body_text(page: Page) -> str:
+    try:
+        return page.locator("body").inner_text(timeout=3000)
+    except Exception:
+        return ""
diff --git a/scripts/session_manager.py b/scripts/session_manager.py
new file mode 100644
index 0000000..0613583
--- /dev/null
+++ b/scripts/session_manager.py
@@ -0,0 +1,757 @@
+#!/usr/bin/env python3
+"""
+Persistent session manager for NotebookLM.
+
+This module keeps a long-lived background process alive so multiple CLI
+invocations can reuse the same Playwright + BrowserContext + Page objects.
+"""
+
+from __future__ import annotations
+
+import argparse
+import atexit
+import json
+import os
+import signal
+import socket
+import subprocess
+import sys
+import tempfile
+import time
+import uuid
+from contextlib import contextmanager
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+from patchright.sync_api import sync_playwright
+
+try:
+    import fcntl  # type: ignore
+except ImportError:  # pragma: no cover - non-Unix fallback
+    fcntl = None
+
+# Add parent directory to path
+sys.path.insert(0, str(Path(__file__).parent))
+
+from auth_manager import AuthManager
+from browser_session import BrowserSession
+from config import DATA_DIR, QUERY_TIMEOUT_SECONDS
+from notebook_navigation import normalize_notebooklm_url
+
+
+SESSIONS_FILE = DATA_DIR / "sessions.json"
+SESSIONS_LOCK_FILE = DATA_DIR / "sessions.lock"
+RUNTIME_DIR = DATA_DIR / "session_runtime"
+SOCKET_FILE = RUNTIME_DIR / "session_manager.sock"
+PID_FILE = RUNTIME_DIR / "session_manager.pid"
+DEFAULT_IDLE_TIMEOUT_SECONDS = int(os.getenv("NOTEBOOKLM_SESSION_IDLE_TIMEOUT", "1800"))
+DEFAULT_CLIENT_TIMEOUT_SECONDS = max(QUERY_TIMEOUT_SECONDS + 90, 210)
+DAEMON_START_TIMEOUT_SECONDS = 12
+ACTIVE_STATUSES = {"active", "busy"}
+TERMINAL_STATUSES = {"closed", "expired", "orphaned", "error"}
+
+
+class SessionManagerError(RuntimeError):
+    """Raised when a session manager command fails."""
+
+
+def utcnow_iso() -> str:
+    return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
+
+
+@contextmanager
+def file_lock(path: Path):
+    path.parent.mkdir(parents=True, exist_ok=True)
+    with open(path, "a+", encoding="utf-8") as handle:
+        if fcntl is not None:
+            fcntl.flock(handle.fileno(), fcntl.LOCK_EX)
+        try:
+            yield
+        finally:
+            if fcntl is not None:
+                fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
+
+
+class SessionMetadataStore:
+    """Persists serializable session metadata to data/sessions.json."""
+
+    def __init__(self, path: Path = SESSIONS_FILE):
+        self.path = path
+        DATA_DIR.mkdir(parents=True, exist_ok=True)
+
+    def _default_state(self) -> Dict[str, Any]:
+        return {
+            "version": 1,
+            "updated_at": utcnow_iso(),
+            "daemon": {
+                "pid": None,
+                "started_at": None,
+                "socket": str(SOCKET_FILE),
+                "idle_timeout_seconds": DEFAULT_IDLE_TIMEOUT_SECONDS,
+                "status": "stopped",
+            },
+            "sessions": {},
+        }
+
+    def load(self) -> Dict[str, Any]:
+        if not self.path.exists():
+            return self._default_state()
+
+        with file_lock(SESSIONS_LOCK_FILE):
+            try:
+                with open(self.path, "r", encoding="utf-8") as handle:
+                    data = json.load(handle)
+            except Exception:
+                return self._default_state()
+
+        if not isinstance(data, dict):
+            return self._default_state()
+        data.setdefault("version", 1)
+        data.setdefault("updated_at", utcnow_iso())
+        data.setdefault("daemon", self._default_state()["daemon"])
+        data.setdefault("sessions", {})
+        return data
+
+    def save(self, data: Dict[str, Any]):
+        data["updated_at"] = utcnow_iso()
+        self.path.parent.mkdir(parents=True, exist_ok=True)
+        with file_lock(SESSIONS_LOCK_FILE):
+            with tempfile.NamedTemporaryFile("w", delete=False, dir=str(self.path.parent), encoding="utf-8") as handle:
+                json.dump(data, handle, indent=2, ensure_ascii=False)
+                handle.flush()
+                os.fsync(handle.fileno())
+                tmp_path = Path(handle.name)
+            os.replace(tmp_path, self.path)
+
+    def list_sessions(self) -> List[Dict[str, Any]]:
+        data = self.load()
+        sessions = list(data.get("sessions", {}).values())
+        sessions.sort(key=lambda item: item.get("last_activity", ""), reverse=True)
+        return sessions
+
+    def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
+        data = self.load()
+        return data.get("sessions", {}).get(session_id)
+
+    def upsert_session(self, session: Dict[str, Any]):
+        data = self.load()
+        data["sessions"][session["session_id"]] = session
+        self.save(data)
+
+    def update_session(self, session_id: str, **updates):
+        data = self.load()
+        session = data.get("sessions", {}).get(session_id)
+        if not session:
+            return None
+        session.update(updates)
+        data["sessions"][session_id] = session
+        self.save(data)
+        return session
+
+    def set_daemon(self, *, pid: Optional[int], started_at: Optional[str], status: str, idle_timeout_seconds: int):
+        data = self.load()
+        data["daemon"] = {
+            "pid": pid,
+            "started_at": started_at,
+            "socket": str(SOCKET_FILE),
+            "idle_timeout_seconds": idle_timeout_seconds,
+            "status": status,
+        }
+        self.save(data)
+
+    def mark_runtime_sessions_orphaned(self, reason: str = "session daemon is not running anymore") -> int:
+        data = self.load()
+        changed = 0
+        for session in data.get("sessions", {}).values():
+            if session.get("status") in ACTIVE_STATUSES:
+                session["status"] = "orphaned"
+                session["last_error"] = reason
+                changed += 1
+        if changed:
+            daemon = data.get("daemon", {})
+            daemon["status"] = "stopped"
+            daemon["pid"] = None
+            data["daemon"] = daemon
+            self.save(data)
+        return changed
+
+
+class SessionManagerClient:
+    """Client for sending commands to the background daemon."""
+
+    def __init__(self, socket_path: Path = SOCKET_FILE):
+        self.socket_path = socket_path
+        self.store = SessionMetadataStore()
+
+    def is_daemon_running(self) -> bool:
+        if not self.socket_path.exists():
+            return False
+        try:
+            self._send({"command": "ping"}, timeout_seconds=5)
+            return True
+        except Exception:
+            return False
+
+    def reconcile_runtime_state(self) -> int:
+        if self.is_daemon_running():
+            return 0
+        self._cleanup_stale_runtime_files()
+        return self.store.mark_runtime_sessions_orphaned()
+
+    def ensure_daemon(self):
+        if self.is_daemon_running():
+            return
+
+        self._cleanup_stale_runtime_files()
+
+        cmd = [sys.executable, str(Path(__file__).resolve()), "_serve"]
+        subprocess.Popen(
+            cmd,
+            stdout=subprocess.DEVNULL,
+            stderr=subprocess.DEVNULL,
+            stdin=subprocess.DEVNULL,
+            start_new_session=True,
+            cwd=str(Path(__file__).parent.parent),
+        )
+
+        deadline = time.time() + DAEMON_START_TIMEOUT_SECONDS
+        last_error = None
+        while time.time() < deadline:
+            try:
+                if self.is_daemon_running():
+                    return
+            except Exception as error:
+                last_error = error
+            time.sleep(0.2)
+
+        raise SessionManagerError(
+            f"Failed to start session daemon within {DAEMON_START_TIMEOUT_SECONDS}s"
+            + (f": {last_error}" if last_error else "")
+        )
+
+    def request(self, command: str, *, auto_start: bool = False, timeout_seconds: Optional[int] = None, **payload) -> Dict[str, Any]:
+        if auto_start:
+            self.ensure_daemon()
+        else:
+            self.reconcile_runtime_state()
+            if not self.is_daemon_running():
+                raise SessionManagerError("Session daemon is not running")
+
+        request = {"command": command, **payload}
+        response = self._send(request, timeout_seconds=timeout_seconds or DEFAULT_CLIENT_TIMEOUT_SECONDS)
+        if not response.get("ok"):
+            raise SessionManagerError(response.get("error", "Unknown session manager error"))
+        return response["result"]
+
+    def _cleanup_stale_runtime_files(self):
+        if SOCKET_FILE.exists():
+            try:
+                SOCKET_FILE.unlink()
+            except OSError:
+                pass
+        if PID_FILE.exists():
+            try:
+                PID_FILE.unlink()
+            except OSError:
+                pass
+
+    def _send(self, request: Dict[str, Any], timeout_seconds: int) -> Dict[str, Any]:
+        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
+            sock.settimeout(timeout_seconds)
+            sock.connect(str(self.socket_path))
+            payload = (json.dumps(request, ensure_ascii=False) + "\n").encode("utf-8")
+            sock.sendall(payload)
+            sock.shutdown(socket.SHUT_WR)
+
+            chunks: List[bytes] = []
+            while True:
+                chunk = sock.recv(65536)
+                if not chunk:
+                    break
+                chunks.append(chunk)
+
+        raw = b"".join(chunks).decode("utf-8").strip()
+        if not raw:
+            raise SessionManagerError("Empty response from session daemon")
+        return json.loads(raw)
+
+
+def ask_via_session_manager(session_id: str, question: str) -> Dict[str, Any]:
+    """Public helper for ask_question.py."""
+    client = SessionManagerClient()
+    return client.request(
+        "ask",
session_id=session_id, + question=question, + timeout_seconds=DEFAULT_CLIENT_TIMEOUT_SECONDS, + ) + + +class NotebookSessionDaemon: + """Long-lived process that keeps Playwright + BrowserContext alive.""" + + def __init__(self, idle_timeout_seconds: int = DEFAULT_IDLE_TIMEOUT_SECONDS): + self.idle_timeout_seconds = idle_timeout_seconds + self.store = SessionMetadataStore() + self.auth = AuthManager() + self.playwright = None + self.context = None + self.sessions: Dict[str, BrowserSession] = {} + self.running = True + self.started_at = utcnow_iso() + self.pid = os.getpid() + self.preferred_email = self.auth.get_auth_info().get("google_account_email") + + def run(self): + DATA_DIR.mkdir(parents=True, exist_ok=True) + RUNTIME_DIR.mkdir(parents=True, exist_ok=True) + + self.store.mark_runtime_sessions_orphaned("session daemon restarted before live sessions could be reused") + self.store.set_daemon( + pid=self.pid, + started_at=self.started_at, + status="running", + idle_timeout_seconds=self.idle_timeout_seconds, + ) + + self._install_signal_handlers() + atexit.register(self._cleanup_runtime_files) + + if SOCKET_FILE.exists(): + SOCKET_FILE.unlink() + + with open(PID_FILE, "w", encoding="utf-8") as handle: + handle.write(str(self.pid)) + + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server: + server.bind(str(SOCKET_FILE)) + os.chmod(SOCKET_FILE, 0o600) + server.listen(16) + server.settimeout(1.0) + + while self.running: + try: + conn, _ = server.accept() + except socket.timeout: + self.gc_sessions() + continue + except KeyboardInterrupt: + break + + with conn: + response = self._handle_connection(conn) + conn.sendall((json.dumps(response, ensure_ascii=False) + "\n").encode("utf-8")) + + self.shutdown() + + def _handle_connection(self, conn: socket.socket) -> Dict[str, Any]: + chunks: List[bytes] = [] + while True: + chunk = conn.recv(65536) + if not chunk: + break + chunks.append(chunk) + raw = b"".join(chunks).decode("utf-8").strip() + if not raw: + 
return {"ok": False, "error": "Empty request"} + + try: + request = json.loads(raw) + result = self._dispatch(request) + return {"ok": True, "result": result} + except Exception as error: + return {"ok": False, "error": str(error)} + + def _dispatch(self, request: Dict[str, Any]) -> Dict[str, Any]: + command = request.get("command") + + if command == "ping": + return { + "status": "ok", + "pid": self.pid, + "started_at": self.started_at, + "session_count": len(self.sessions), + "idle_timeout_seconds": self.idle_timeout_seconds, + } + if command == "create": + notebook_url = request.get("notebook_url") + if not notebook_url: + raise SessionManagerError("Missing notebook_url") + return self.create_session(notebook_url) + if command == "ask": + return self.ask_session(request.get("session_id"), request.get("question")) + if command == "list": + return self.list_sessions() + if command == "info": + return self.info_session(request.get("session_id")) + if command == "reset": + return self.reset_session(request.get("session_id")) + if command == "close": + return self.close_session(request.get("session_id"), reason="closed") + if command == "gc": + return self.gc_sessions() + + raise SessionManagerError(f"Unsupported command: {command}") + + def _install_signal_handlers(self): + def handle_signal(signum, _frame): + self.running = False + + for signum in (signal.SIGTERM, signal.SIGINT): + try: + signal.signal(signum, handle_signal) + except Exception: + pass + + def _cleanup_runtime_files(self): + for path in (SOCKET_FILE, PID_FILE): + if path.exists(): + try: + path.unlink() + except OSError: + pass + + def _ensure_browser(self): + if self.context is not None: + return + if not self.auth.is_authenticated(): + raise SessionManagerError("NotebookLM authentication is missing. 
Run: python scripts/run.py auth_manager.py setup") + + self.playwright = sync_playwright().start() + from browser_utils import BrowserFactory # local import to avoid import cycles during daemon spawn + + self.context = BrowserFactory.launch_persistent_context( + self.playwright, + headless=True, + ) + self.preferred_email = self.auth.get_auth_info().get("google_account_email") + + def _build_metadata(self, session: BrowserSession, *, status: str, last_error: Optional[str] = None) -> Dict[str, Any]: + return { + "session_id": session.id, + "notebook_url": session.notebook_url, + "created_at": datetime.fromtimestamp(session.created_at, tz=timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z"), + "last_activity": datetime.fromtimestamp(session.last_activity, tz=timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z"), + "message_count": session.message_count, + "status": status, + "last_error": last_error, + "daemon_pid": self.pid, + "idle_timeout_seconds": self.idle_timeout_seconds, + } + + def _get_live_session(self, session_id: Optional[str]) -> BrowserSession: + if not session_id: + raise SessionManagerError("Missing session_id") + session = self.sessions.get(session_id) + if session is None: + metadata = self.store.get_session(session_id) + if metadata and metadata.get("status") == "orphaned": + raise SessionManagerError( + "Session exists in metadata but its live browser process is gone. Create a new session instead." 
+ ) + raise SessionManagerError(f"Live session not found: {session_id}") + return session + + def create_session(self, notebook_url: str) -> Dict[str, Any]: + self.gc_sessions() + self._ensure_browser() + + session_id = f"session-{uuid.uuid4().hex[:12]}" + normalized_url = normalize_notebooklm_url(notebook_url) + session = BrowserSession( + session_id=session_id, + context=self.context, + notebook_url=normalized_url, + preferred_email=self.preferred_email, + ) + self.sessions[session_id] = session + metadata = self._build_metadata(session, status="active") + self.store.upsert_session(metadata) + return metadata + + def ask_session(self, session_id: Optional[str], question: Optional[str]) -> Dict[str, Any]: + if not question: + raise SessionManagerError("Missing question") + session = self._get_live_session(session_id) + + self.store.update_session(session.id, status="busy", last_error=None) + result = session.ask(question) + if result.get("status") != "success": + error_message = result.get("error", "Unknown NotebookLM error") + self.store.update_session( + session.id, + status="active", + last_error=error_message, + last_activity=datetime.fromtimestamp(session.last_activity, tz=timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z"), + message_count=session.message_count, + ) + raise SessionManagerError(error_message) + + metadata = self._build_metadata(session, status="active") + self.store.upsert_session(metadata) + return { + **metadata, + "question": question, + "answer": result["answer"], + } + + def list_sessions(self) -> Dict[str, Any]: + self.gc_sessions() + sessions = [] + for record in self.store.list_sessions(): + record = dict(record) + record["live"] = record.get("session_id") in self.sessions + sessions.append(record) + return { + "count": len(sessions), + "sessions": sessions, + "daemon_pid": self.pid, + "idle_timeout_seconds": self.idle_timeout_seconds, + } + + def info_session(self, session_id: Optional[str]) -> Dict[str, Any]: + 
if not session_id: + raise SessionManagerError("Missing session_id") + self.gc_sessions() + record = self.store.get_session(session_id) + if not record: + raise SessionManagerError(f"Session not found: {session_id}") + record = dict(record) + record["live"] = session_id in self.sessions + return record + + def reset_session(self, session_id: Optional[str]) -> Dict[str, Any]: + session = self._get_live_session(session_id) + notebook_url = session.notebook_url + created_at = session.created_at + session.close() + + replacement = BrowserSession( + session_id=session.id, + context=self.context, + notebook_url=notebook_url, + preferred_email=self.preferred_email, + ) + replacement.created_at = created_at + self.sessions[session.id] = replacement + metadata = self._build_metadata(replacement, status="active") + self.store.upsert_session(metadata) + return metadata + + def close_session(self, session_id: Optional[str], *, reason: str) -> Dict[str, Any]: + if not session_id: + raise SessionManagerError("Missing session_id") + session = self.sessions.pop(session_id, None) + record = self.store.get_session(session_id) + if session is None and record is None: + raise SessionManagerError(f"Session not found: {session_id}") + + if session is not None: + session.close() + record = self._build_metadata(session, status=reason) + elif record is not None: + record = dict(record) + record["status"] = reason + + record["closed_at"] = utcnow_iso() + record["last_error"] = None if reason == "closed" else record.get("last_error") + self.store.upsert_session(record) + return record + + def gc_sessions(self) -> Dict[str, Any]: + expired_ids: List[str] = [] + for session_id, session in list(self.sessions.items()): + if session.is_expired(self.idle_timeout_seconds): + expired_ids.append(session_id) + self.close_session(session_id, reason="expired") + + return { + "expired_count": len(expired_ids), + "expired_session_ids": expired_ids, + "remaining_live_sessions": len(self.sessions), + } + + 
def shutdown(self): + for session in list(self.sessions.values()): + try: + session.close() + except Exception: + pass + self.sessions.clear() + + if self.context is not None: + try: + self.context.close() + except Exception: + pass + self.context = None + + if self.playwright is not None: + try: + self.playwright.stop() + except Exception: + pass + self.playwright = None + + self.store.set_daemon( + pid=None, + started_at=self.started_at, + status="stopped", + idle_timeout_seconds=self.idle_timeout_seconds, + ) + self._cleanup_runtime_files() + + +def _print_session_list(result: Dict[str, Any]): + sessions = result.get("sessions", []) + if not sessions: + print("📭 No sessions recorded") + return + + print("\n🧠 NotebookLM Sessions:") + for session in sessions: + live_mark = " live" if session.get("live") else " stored" + print(f"\n • {session['session_id']} [{session['status']}{live_mark}]") + print(f" Notebook: {session['notebook_url']}") + print(f" Created: {session['created_at']}") + print(f" Last activity: {session['last_activity']}") + print(f" Messages: {session.get('message_count', 0)}") + if session.get("last_error"): + print(f" Last error: {session['last_error']}") + + +def _print_session_info(result: Dict[str, Any]): + print(f"Session ID: {result['session_id']}") + print(f"Status: {result['status']}") + print(f"Live: {'yes' if result.get('live') else 'no'}") + print(f"Notebook: {result['notebook_url']}") + print(f"Created: {result['created_at']}") + print(f"Last activity: {result['last_activity']}") + print(f"Messages: {result.get('message_count', 0)}") + if result.get("closed_at"): + print(f"Closed at: {result['closed_at']}") + if result.get("last_error"): + print(f"Last error: {result['last_error']}") + + +def main(): + parser = argparse.ArgumentParser(description="Manage persistent NotebookLM sessions") + subparsers = parser.add_subparsers(dest="command", help="Commands") + + create_parser = subparsers.add_parser("create", help="Create a persistent 
NotebookLM session") + create_parser.add_argument("--notebook-url", required=True, help="NotebookLM notebook URL") + + ask_parser = subparsers.add_parser("ask", help="Ask a question through an existing session") + ask_parser.add_argument("--session-id", required=True, help="Persistent session ID") + ask_parser.add_argument("--question", required=True, help="Question to ask") + + subparsers.add_parser("list", help="List recorded sessions") + + info_parser = subparsers.add_parser("info", help="Show one session") + info_parser.add_argument("--session-id", required=True, help="Persistent session ID") + + reset_parser = subparsers.add_parser("reset", help="Reset a live session by recreating its page") + reset_parser.add_argument("--session-id", required=True, help="Persistent session ID") + + close_parser = subparsers.add_parser("close", help="Close a live session") + close_parser.add_argument("--session-id", required=True, help="Persistent session ID") + + subparsers.add_parser("gc", help="Collect expired live sessions") + subparsers.add_parser("_serve", help=argparse.SUPPRESS) + + args = parser.parse_args() + + if args.command == "_serve": + daemon = NotebookSessionDaemon() + daemon.run() + return 0 + + client = SessionManagerClient() + client.reconcile_runtime_state() + store = SessionMetadataStore() + + try: + if args.command == "create": + result = client.request("create", auto_start=True, notebook_url=args.notebook_url) + print(f"✅ Created session: {result['session_id']}") + print(json.dumps(result, indent=2, ensure_ascii=False)) + return 0 + + if args.command == "ask": + result = client.request( + "ask", + session_id=args.session_id, + question=args.question, + timeout_seconds=DEFAULT_CLIENT_TIMEOUT_SECONDS, + ) + print("\n" + "=" * 60) + print(f"Session: {result['session_id']}") + print(f"Question: {result['question']}") + print("=" * 60) + print() + print(result["answer"]) + print() + print("=" * 60) + return 0 + + if args.command == "list": + if 
client.is_daemon_running():
+                result = client.request("list")
+            else:
+                records = store.list_sessions()
+                result = {
+                    "count": len(records),
+                    "sessions": [{**item, "live": False} for item in records],
+                }
+            _print_session_list(result)
+            return 0
+
+        if args.command == "info":
+            if client.is_daemon_running():
+                result = client.request("info", session_id=args.session_id)
+            else:
+                record = store.get_session(args.session_id)
+                if not record:
+                    raise SessionManagerError(f"Session not found: {args.session_id}")
+                result = {**record, "live": False}
+            _print_session_info(result)
+            return 0
+
+        if args.command == "reset":
+            result = client.request("reset", session_id=args.session_id)
+            print(f"✅ Reset session: {result['session_id']}")
+            print(json.dumps(result, indent=2, ensure_ascii=False))
+            return 0
+
+        if args.command == "close":
+            result = client.request("close", session_id=args.session_id)
+            print(f"✅ Closed session: {result['session_id']}")
+            print(json.dumps(result, indent=2, ensure_ascii=False))
+            return 0
+
+        if args.command == "gc":
+            if client.is_daemon_running():
+                result = client.request("gc")
+            else:
+                result = {
+                    "expired_count": 0,
+                    "expired_session_ids": [],
+                    "remaining_live_sessions": 0,
+                    "note": "Session daemon is not running; only persisted metadata is available.",
+                }
+            print(json.dumps(result, indent=2, ensure_ascii=False))
+            return 0
+
+        parser.print_help()
+        return 0
+
+    except SessionManagerError as error:
+        print(f"❌ {error}")
+        return 1
+
+
+if __name__ == "__main__":
+    sys.exit(main())
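
The client and daemon added above speak a one-shot, newline-delimited JSON protocol over a Unix socket: the client writes one request, half-closes its write side with `shutdown(socket.SHUT_WR)`, and reads until EOF, while the daemon reads until EOF before sending its single reply. The framing can be sketched without the real daemon; this minimal, self-contained example uses `socket.socketpair()` in place of the daemon's `AF_UNIX` socket, and `serve_once` is a hypothetical stand-in for `_handle_connection`, not part of the module:

```python
import json
import socket
import threading

def serve_once(server_sock: socket.socket) -> None:
    # Mirror the daemon side: read until EOF (client half-closed), then reply once.
    chunks = []
    while True:
        chunk = server_sock.recv(65536)
        if not chunk:
            break
        chunks.append(chunk)
    request = json.loads(b"".join(chunks).decode("utf-8").strip())
    reply = {"ok": True, "result": {"echo": request["command"]}}
    server_sock.sendall((json.dumps(reply) + "\n").encode("utf-8"))
    server_sock.close()

# socketpair() stands in for connecting to the daemon's Unix socket file.
client_sock, server_sock = socket.socketpair()
worker = threading.Thread(target=serve_once, args=(server_sock,))
worker.start()

# Mirror SessionManagerClient._send: write, half-close, then read until EOF.
client_sock.sendall((json.dumps({"command": "ping"}) + "\n").encode("utf-8"))
client_sock.shutdown(socket.SHUT_WR)
raw = b""
while True:
    chunk = client_sock.recv(65536)
    if not chunk:
        break
    raw += chunk
worker.join()
response = json.loads(raw.decode("utf-8").strip())
print(response["result"]["echo"])  # ping
```

Half-closing the write side is what lets each end treat EOF as end-of-message, so no length prefix or streaming delimiter parser is needed for this one-request-per-connection design.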