diff --git a/SKILL.md b/SKILL.md index 2be7e16..fe8d9b1 100755 --- a/SKILL.md +++ b/SKILL.md @@ -106,8 +106,59 @@ python scripts/run.py notebook_manager.py remove --id notebook-id ``` ### Quick Workflow -1. Check library: `python scripts/run.py notebook_manager.py list` -2. Ask question: `python scripts/run.py ask_question.py --question "..." --notebook-id ID` +```bash +notebook_manager.py sync # discover all notebooks +notebook_manager.py find "topic" # find by name or source +ask_question.py --question "..." --notebook-id ID # ask a question +``` + +### Step 3: Manage Notebooks (Web-Backed) + +```bash +# Discover all notebooks from web +python scripts/run.py notebook_manager.py sync +python scripts/run.py notebook_manager.py sync --deep # also scrape source names +python scripts/run.py notebook_manager.py sync --deep --stale # only re-scrape changed notebooks + +# Search notebooks (instant, searches names + source filenames) +python scripts/run.py notebook_manager.py find "eigenform" + +# Import a notebook by URL (auto-discovers title + sources) +python scripts/run.py notebook_manager.py import --url "https://notebooklm.google.com/notebook/..." 
+ +# Create, rename, delete notebooks on web +python scripts/run.py notebook_manager.py create --title "New Research" +python scripts/run.py notebook_manager.py rename --id ID --title "Better Name" +python scripts/run.py notebook_manager.py delete --id ID --confirm + +# Local library operations (instant, no browser) +python scripts/run.py notebook_manager.py list [--format json] +python scripts/run.py notebook_manager.py activate --id ID +python scripts/run.py notebook_manager.py exclude --id ID +python scripts/run.py notebook_manager.py stats +``` + +### Step 3b: Manage Sources + +```bash +# List sources (writes to library, then displays from library) +python scripts/run.py source_manager.py list --notebook-id ID + +# Add sources +python scripts/run.py source_manager.py add-text --text "Your content" --notebook-id ID +python scripts/run.py source_manager.py add-text --from-file /path/to/notes.txt --notebook-id ID +python scripts/run.py source_manager.py add-file --file /path/to/file.pdf --notebook-id ID +python scripts/run.py source_manager.py add-website --urls "https://example.com" --notebook-id ID + +# Read, rename, remove sources +python scripts/run.py source_manager.py read --name "Source Name" --notebook-id ID +python scripts/run.py source_manager.py rename --name "Old Name" --title "New Name" --notebook-id ID +python scripts/run.py source_manager.py remove --name "Source Name" --notebook-id ID + +# Select specific sources for querying (deselect others) +python scripts/run.py source_manager.py select --names "source1,source2" --notebook-id ID +python scripts/run.py source_manager.py select --all --notebook-id ID +``` ### Step 4: Ask Questions @@ -116,10 +167,10 @@ python scripts/run.py notebook_manager.py remove --id notebook-id python scripts/run.py ask_question.py --question "Your question here" # Query specific notebook -python scripts/run.py ask_question.py --question "..." --notebook-id notebook-id +python scripts/run.py ask_question.py --question "..." 
--notebook-id ID -# Query with notebook URL directly -python scripts/run.py ask_question.py --question "..." --notebook-url "https://..." +# Query only specific sources (others excluded) +python scripts/run.py ask_question.py --question "..." --sources "source1,source2" --notebook-id ID # Show browser for debugging python scripts/run.py ask_question.py --question "..." --show-browser @@ -152,17 +203,37 @@ python scripts/run.py auth_manager.py clear # Clear authentication ### Notebook Management (`notebook_manager.py`) ```bash -python scripts/run.py notebook_manager.py add --url URL --name NAME --description DESC --topics TOPICS -python scripts/run.py notebook_manager.py list -python scripts/run.py notebook_manager.py search --query QUERY +# Web-backed +python scripts/run.py notebook_manager.py sync [--deep] [--deep --stale] [--library-only] [--force] +python scripts/run.py notebook_manager.py import --url URL +python scripts/run.py notebook_manager.py create --title "..." +python scripts/run.py notebook_manager.py rename --id ID --title "..." +python scripts/run.py notebook_manager.py delete --id ID --confirm +# Local library +python scripts/run.py notebook_manager.py list [--format json] +python scripts/run.py notebook_manager.py find "query" python scripts/run.py notebook_manager.py activate --id ID -python scripts/run.py notebook_manager.py remove --id ID +python scripts/run.py notebook_manager.py exclude --id ID python scripts/run.py notebook_manager.py stats ``` ### Question Interface (`ask_question.py`) ```bash -python scripts/run.py ask_question.py --question "..." [--notebook-id ID] [--notebook-url URL] [--show-browser] +python scripts/run.py ask_question.py --question "..." [--notebook-id ID] [--sources "s1,s2"] [--show-browser] +``` + +### Source Management (`source_manager.py`) +```bash +python scripts/run.py source_manager.py list [--notebook-id ID] +python scripts/run.py source_manager.py add-text --text "..." 
def select_sources_for_query(page, source_names: list) -> None:
    """Restrict the next query to the requested sources.

    Opens the "Sources" tab, ticks the checkbox of every source matching one
    of ``source_names`` (case-insensitive substring match in either
    direction; the first matching source wins for each requested name),
    unticks every other source, then switches back to the "Chat" tab.

    Args:
        page: Browser page object (Playwright-style API) showing a notebook.
        source_names: Names or name fragments of the sources to query.
            Blank entries are ignored.
    """
    from config import SOURCE_ITEM_SELECTOR

    # A blank fragment is a substring of every label and would silently
    # select the first source, so drop blanks before matching.
    wanted = [name.strip().lower() for name in source_names if name and name.strip()]
    print(f" 🎯 Selecting {len(wanted)} source(s)...")

    page.get_by_text("Sources", exact=True).first.click()
    time.sleep(2)

    # Source labels as shown in the panel, in page order.
    all_sources = []
    for el in page.query_selector_all(SOURCE_ITEM_SELECTOR):
        label = el.get_attribute("aria-label") or ""
        if label:
            all_sources.append(label)

    # Resolve each requested fragment to the first source it matches.
    selected = set()
    for fragment in wanted:
        for src in all_sources:
            src_lower = src.lower()
            if fragment in src_lower or src_lower in fragment:
                selected.add(src)
                break

    if selected:
        for src_name in all_sources:
            # Labels are arbitrary user text (file names, titles); escape them
            # so quotes or backslashes cannot break the attribute selector.
            escaped = src_name.replace("\\", "\\\\").replace('"', '\\"')
            checkbox = page.query_selector(f'input[aria-label="{escaped}"]')
            if not checkbox:
                continue
            if checkbox.is_checked() != (src_name in selected):
                checkbox.click(force=True)
                time.sleep(0.3)

        for src_name in selected:
            print(f" ✓ {src_name}")
    else:
        # Unticking everything would make the question run against no
        # sources at all; keep whatever is currently selected instead.
        print(" ⚠️ No sources matched the requested names; leaving the selection unchanged")

    page.get_by_text("Chat", exact=True).first.click()
    time.sleep(2)
"button.source-stretched-button" +MORE_VERT_SELECTOR = 'button:has-text("more_vert")' +REMOVE_MENU_SELECTOR = "[role='menuitem']:has-text('Remove')" +DELETE_DIALOG_SELECTOR = "mat-dialog-container" +WEBSITE_URL_INPUT_SELECTOR = "[formcontrolname='urls']" +COPIED_TEXT_INPUT_SELECTOR = "textarea[placeholder='Paste text here']" + +# Notebook Management Selectors (homepage) +NOTEBOOK_CARD_SELECTOR = "mat-card.project-button-card" +NOTEBOOK_LINK_SELECTOR = "a[href*='/notebook/']" +NOTEBOOK_MENU_SELECTOR = "button[aria-label='Project Actions Menu']" +NOTEBOOK_TITLE_FC_SELECTOR = "[formcontrolname='title']" +NOTEBOOKLM_HOME_URL = "https://notebooklm.google.com/" + # Browser Configuration BROWSER_ARGS = [ '--disable-blink-features=AutomationControlled', # Patches navigator.webdriver diff --git a/scripts/notebook_manager.py b/scripts/notebook_manager.py index e10e156..7edad0e 100755 --- a/scripts/notebook_manager.py +++ b/scripts/notebook_manager.py @@ -1,410 +1,928 @@ #!/usr/bin/env python3 """ Notebook Library Management for NotebookLM -Manages a library of NotebookLM notebooks with metadata -Based on the MCP server implementation +Manages a library of NotebookLM notebooks with metadata. +Supports both local library operations and web-backed operations +(sync, create, rename, delete) via browser automation. 
""" import json import argparse -import uuid -import os +import re +import sys +import time from pathlib import Path from typing import Dict, List, Optional, Any from datetime import datetime +sys.path.insert(0, str(Path(__file__).parent)) + class NotebookLibrary: - """Manages a collection of NotebookLM notebooks with metadata""" + """Manages a collection of NotebookLM notebooks with metadata.""" def __init__(self): - """Initialize the notebook library""" - # Store data within the skill directory - skill_dir = Path(__file__).parent.parent - self.data_dir = skill_dir / "data" - self.data_dir.mkdir(parents=True, exist_ok=True) + from config import DATA_DIR, LIBRARY_FILE - self.library_file = self.data_dir / "library.json" + self.data_dir = DATA_DIR + self.data_dir.mkdir(parents=True, exist_ok=True) + self.library_file = LIBRARY_FILE self.notebooks: Dict[str, Dict[str, Any]] = {} self.active_notebook_id: Optional[str] = None - - # Load existing library + self.excluded_ids: List[str] = [] self._load_library() def _load_library(self): - """Load library from disk""" if self.library_file.exists(): try: - with open(self.library_file, 'r') as f: + with open(self.library_file, "r") as f: data = json.load(f) - self.notebooks = data.get('notebooks', {}) - self.active_notebook_id = data.get('active_notebook_id') + self.notebooks = data.get("notebooks", {}) + self.active_notebook_id = data.get("active_notebook_id") + self.excluded_ids = data.get("excluded_ids", []) print(f"๐Ÿ“š Loaded library with {len(self.notebooks)} notebooks") except Exception as e: print(f"โš ๏ธ Error loading library: {e}") self.notebooks = {} self.active_notebook_id = None + self.excluded_ids = [] else: self._save_library() def _save_library(self): - """Save library to disk""" try: data = { - 'notebooks': self.notebooks, - 'active_notebook_id': self.active_notebook_id, - 'updated_at': datetime.now().isoformat() + "notebooks": self.notebooks, + "active_notebook_id": self.active_notebook_id, + 
"excluded_ids": self.excluded_ids, + "updated_at": datetime.now().isoformat(), } - with open(self.library_file, 'w') as f: + with open(self.library_file, "w") as f: json.dump(data, f, indent=2) except Exception as e: print(f"โŒ Error saving library: {e}") + @staticmethod + def _make_id(name: str, url: str = "") -> str: + slug = name.lower().replace(" ", "-").replace("_", "-")[:50] + # Append short UUID suffix from URL for uniqueness + match = re.search(r"/notebook/([a-f0-9-]+)", url) + if match: + slug += "-" + match.group(1)[:8] + return slug + + @staticmethod + def _extract_notebook_id_from_url(url: str) -> Optional[str]: + """Extract the UUID from a NotebookLM URL.""" + match = re.search(r"/notebook/([a-f0-9-]+)", url) + return match.group(1) if match else None + + # ------------------------------------------------------------------ # + # Local library operations + # ------------------------------------------------------------------ # + def add_notebook( self, url: str, name: str, - description: str, - topics: List[str], - content_types: Optional[List[str]] = None, - use_cases: Optional[List[str]] = None, - tags: Optional[List[str]] = None + description: str = "", + topics: Optional[List[str]] = None, + source_count: int = 0, ) -> Dict[str, Any]: - """ - Add a new notebook to the library - - Args: - url: NotebookLM notebook URL - name: Display name for the notebook - description: What's in this notebook - topics: Topics covered - content_types: Types of content (optional) - use_cases: When to use this notebook (optional) - tags: Additional tags for organization (optional) - - Returns: - The created notebook object - """ - # Generate ID from name - notebook_id = name.lower().replace(' ', '-').replace('_', '-') - - # Check for duplicates + notebook_id = self._make_id(name, url) if notebook_id in self.notebooks: - raise ValueError(f"Notebook with ID '{notebook_id}' already exists") + # Update existing + return self.update_notebook( + notebook_id, + name=name, + 
description=description, + topics=topics, + url=url, + source_count=source_count, + ) - # Create notebook object notebook = { - 'id': notebook_id, - 'url': url, - 'name': name, - 'description': description, - 'topics': topics, - 'content_types': content_types or [], - 'use_cases': use_cases or [], - 'tags': tags or [], - 'created_at': datetime.now().isoformat(), - 'updated_at': datetime.now().isoformat(), - 'use_count': 0, - 'last_used': None + "id": notebook_id, + "url": url, + "name": name, + "description": description, + "topics": topics or [], + "source_count": source_count, + "created_at": datetime.now().isoformat(), + "updated_at": datetime.now().isoformat(), + "use_count": 0, + "last_used": None, } - - # Add to library self.notebooks[notebook_id] = notebook - - # Set as active if it's the first notebook if len(self.notebooks) == 1: self.active_notebook_id = notebook_id - self._save_library() - - print(f"โœ… Added notebook: {name} ({notebook_id})") return notebook def remove_notebook(self, notebook_id: str) -> bool: - """ - Remove a notebook from the library - - Args: - notebook_id: ID of notebook to remove - - Returns: - True if removed, False if not found - """ if notebook_id in self.notebooks: del self.notebooks[notebook_id] - - # Clear active if it was removed if self.active_notebook_id == notebook_id: - self.active_notebook_id = None - # Set new active if there are other notebooks - if self.notebooks: - self.active_notebook_id = list(self.notebooks.keys())[0] - + self.active_notebook_id = ( + list(self.notebooks.keys())[0] if self.notebooks else None + ) self._save_library() - print(f"โœ… Removed notebook: {notebook_id}") + print(f"โœ… Removed from library: {notebook_id}") return True - - print(f"โš ๏ธ Notebook not found: {notebook_id}") + print(f"โš ๏ธ Not in library: {notebook_id}") return False - def update_notebook( - self, - notebook_id: str, - name: Optional[str] = None, - description: Optional[str] = None, - topics: Optional[List[str]] = None, - 
def find_notebooks(self, query: str) -> List[Dict[str, Any]]:
    """Case-insensitive substring search over the library.

    A notebook matches when the query occurs in its name, description and
    topics (joined by single spaces), or failing that in one of its stored
    source names. Each hit is a shallow copy of the notebook dict with an
    extra ``_matched_source`` key: ``None`` for a metadata hit, otherwise
    the first source name that contained the query.
    """
    needle = query.lower()
    matches: List[Dict[str, Any]] = []

    for entry in self.notebooks.values():
        fields = (
            entry["name"],
            entry.get("description", ""),
            " ".join(entry.get("topics", [])),
        )
        haystack = " ".join(text.lower() for text in fields)

        if needle in haystack:
            hit_source = None
        else:
            # Fall back to the scraped source names; first hit wins.
            hit_source = next(
                (src for src in entry.get("sources", []) if needle in src.lower()),
                None,
            )
            if hit_source is None:
                continue

        matches.append({**entry, "_matched_source": hit_source})

    return matches
def exclude_notebook(self, notebook_id: str) -> bool:
    """Remove a notebook from the library and stop sync from re-adding it.

    The exclusion list is compared against web notebook UUIDs during sync,
    so the UUID is taken from the notebook's stored URL *before* the entry
    is removed. When the notebook is not in the library, or its URL holds no
    UUID, ``notebook_id`` itself is recorded on the assumption that the
    caller passed a UUID.

    Args:
        notebook_id: Library ID of the notebook, or a raw web UUID.

    Returns:
        Always ``True``.
    """
    # Look the entry up first: once removed, its URL (and UUID) is gone.
    entry = self.notebooks.get(notebook_id)
    web_uuid = None
    if entry is not None:
        web_uuid = self._extract_notebook_id_from_url(entry.get("url", ""))

    self.remove_notebook(notebook_id)

    if not web_uuid:
        web_uuid = notebook_id  # Assume it's already a UUID
    if web_uuid not in self.excluded_ids:
        self.excluded_ids.append(web_uuid)
    self._save_library()
    print(f"✅ Excluded: {notebook_id} (won't be re-imported by sync)")
    return True
def _run_browser_op(self, operation, headless=True):
    """Run ``operation(page)`` inside a managed browser session.

    Checks authentication, starts the browser, opens a page, calls
    ``operation`` with it and always closes the context and stops the
    driver afterwards.

    Args:
        operation: Callable taking the page object; its return value is
            passed through to the caller.
        headless: Launch the browser without a visible window. When
            ``False`` the viewport is set to 1200x800.

    Returns:
        Whatever ``operation`` returns, or ``None`` when not authenticated
        or when launching the browser or running ``operation`` raised.
    """
    from auth_manager import AuthManager
    from browser_utils import BrowserFactory
    from patchright.sync_api import sync_playwright

    auth = AuthManager()
    if not auth.is_authenticated():
        print("⚠️ Not authenticated. Run: python auth_manager.py setup")
        return None

    playwright = None
    context = None
    try:
        playwright = sync_playwright().start()
        context = BrowserFactory.launch_persistent_context(
            playwright, headless=headless
        )
        page = context.new_page()
        if not headless:
            page.set_viewport_size({"width": 1200, "height": 800})
        return operation(page)
    except Exception as e:
        print(f" ❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return None
    finally:
        # Cleanup is best-effort, but only ordinary errors are suppressed:
        # a bare ``except`` would also swallow KeyboardInterrupt/SystemExit.
        if context:
            try:
                context.close()
            except Exception as close_err:
                print(f" ⚠️ Could not close browser context: {close_err}")
        if playwright:
            try:
                playwright.stop()
            except Exception as stop_err:
                print(f" ⚠️ Could not stop browser driver: {stop_err}")
const link = card.querySelector('a[href*="/notebook/"]') + || card.parentElement?.closest('a[href*="/notebook/"]') + || card.parentElement?.querySelector('a[href*="/notebook/"]'); + if (!link) continue; + const href = link.getAttribute('href'); + if (!href) continue; + + // Get text content, filtering noise + const text = card.innerText; + const lines = text.split('\\n') + .map(l => l.trim()) + .filter(l => l && l !== 'more_vert' && l !== 'public' && l !== 'add'); + + // Title: first line with 4+ chars, not a date/source line + let title = 'Untitled notebook'; + for (const line of lines) { + // Remove emojis + const clean = line.replace(/[\\u{1F300}-\\u{1FAFF}\\u{2702}-\\u{27B0}]/gu, '').trim(); + if (clean.length >= 4 + && !clean.match(/source/i) + && !clean.match(/^(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\\s/)) { + title = clean; + break; + } + } + + // Source count + let sourceCount = 0; + for (const line of lines) { + const m = line.match(/(\\d+)\\s+source/); + if (m) { sourceCount = parseInt(m[1]); break; } + } + + const fullUrl = href.startsWith('/') + ? 'https://notebooklm.google.com' + href : href; + const uuidMatch = href.match(/\\/notebook\\/([a-f0-9-]+)/); + + notebooks.push({ + title, + url: fullUrl, + uuid: uuidMatch ? uuidMatch[1] : '', + source_count: sourceCount + }); + } catch(e) { continue; } + } + return notebooks; + }""") + return results - subparsers = parser.add_subparsers(dest='command', help='Commands') + def _find_notebook_card_by_uuid(self, page, uuid: str): + """Find a notebook card on the homepage by UUID. 
Returns the mat-card element.""" + from config import NOTEBOOK_CARD_SELECTOR + + # The link lives INSIDE the mat-card, so link.closest() finds it + card_index = page.evaluate("""(uuid) => { + const links = document.querySelectorAll('a[href*="/notebook/"]'); + for (const link of links) { + if (link.href.includes(uuid)) { + const card = link.closest('mat-card.project-button-card') + || link.parentElement; + if (!card) continue; + const allCards = document.querySelectorAll('mat-card.project-button-card'); + for (let i = 0; i < allCards.length; i++) { + if (allCards[i] === card) return i; + } + } + } + return -1; + }""", uuid) - # Add command - add_parser = subparsers.add_parser('add', help='Add a notebook') - add_parser.add_argument('--url', required=True, help='NotebookLM URL') - add_parser.add_argument('--name', required=True, help='Display name') - add_parser.add_argument('--description', required=True, help='Description') - add_parser.add_argument('--topics', required=True, help='Comma-separated topics') - add_parser.add_argument('--use-cases', help='Comma-separated use cases') - add_parser.add_argument('--tags', help='Comma-separated tags') + if card_index < 0: + return None - # List command - subparsers.add_parser('list', help='List all notebooks') + cards = page.query_selector_all(NOTEBOOK_CARD_SELECTOR) + if card_index < len(cards): + return cards[card_index] + return None - # Search command - search_parser = subparsers.add_parser('search', help='Search notebooks') - search_parser.add_argument('--query', required=True, help='Search query') + def _deep_scrape_notebook_sources(self, page, nb_id: str, nb: Dict[str, Any]) -> List[str]: + """Open a notebook, click Sources tab, scrape source names, update library.""" + from config import SOURCE_ITEM_SELECTOR - # Activate command - activate_parser = subparsers.add_parser('activate', help='Set active notebook') - activate_parser.add_argument('--id', required=True, help='Notebook ID') + url = nb.get("url", "") + 
def sync_from_web(self, library_only=False, force=False, headless=True,
                  deep=False, stale_only=False):
    """Sync notebooks from the NotebookLM web UI into the local library.

    Args:
        library_only: Only refresh notebooks already in the library; web
            notebooks that are not imported yet are skipped.
        force: Ignore the exclusion list.
        headless: Passed through to the browser launcher.
        deep: After the homepage sync, open each library notebook and
            scrape its source names.
        stale_only: With ``deep``, skip notebooks whose stored source list
            is non-empty and as long as their source count.

    Returns:
        ``{"added": int, "updated": int, "skipped": int}``, or ``None`` when
        the browser operation could not run or the homepage did not load.
    """

    def _library_id_for(uuid):
        """Return the library ID whose stored URL carries *uuid*, else None."""
        for nid, nb in self.notebooks.items():
            if self._extract_notebook_id_from_url(nb.get("url", "")) == uuid:
                return nid
        return None

    def _op(page):
        if not self._navigate_home(page):
            return None

        print(" 🔄 Scraping notebooks from web...")
        web_notebooks = self._scrape_notebooks(page)
        print(f" Found {len(web_notebooks)} notebooks on web")

        added, updated, skipped = 0, 0, 0
        for wnb in web_notebooks:
            uuid = wnb["uuid"]

            # Check exclusions
            if not force and uuid in self.excluded_ids:
                skipped += 1
                continue

            # One lookup serves both the library-only filter and the
            # update-vs-add decision.
            existing_id = _library_id_for(uuid)

            # In library-only mode, skip notebooks not already imported
            if library_only and existing_id is None:
                skipped += 1
                continue

            if existing_id is not None:
                self.update_notebook(
                    existing_id,
                    name=wnb["title"],
                    source_count=wnb["source_count"],
                    url=wnb["url"],
                )
                updated += 1
            else:
                self.add_notebook(
                    url=wnb["url"],
                    name=wnb["title"],
                    source_count=wnb["source_count"],
                )
                added += 1

        self._save_library()
        print(
            f" ✅ Sync complete: {added} added, {updated} updated, {skipped} skipped"
        )

        # Deep scrape: open each notebook and scrape source names
        if deep:
            print("\n 🔬 Deep scrape: fetching source names per notebook...")
            deep_count = 0
            # Iterate over a snapshot of the library entries.
            for nid, nb in list(self.notebooks.items()):
                if stale_only:
                    scraped_sources = nb.get("sources", [])
                    if scraped_sources and len(scraped_sources) == nb.get("source_count", 0):
                        continue
                try:
                    self._deep_scrape_notebook_sources(page, nid, nb)
                except Exception as e:
                    # One unreachable notebook must not abort the run and
                    # throw away the sync summary computed above.
                    print(f" ⚠️ Deep scrape failed for {nb.get('name', nid)}: {e}")
                    continue
                deep_count += 1
            print(f" ✅ Deep scrape complete: {deep_count} notebooks scraped")

        return {"added": added, "updated": updated, "skipped": skipped}

    return self._run_browser_op(_op, headless=headless)
try the page header text + if title == "Untitled notebook": + header = page.query_selector("h1, [class*='heading']") + if header: + text = header.inner_text().strip() + if text and "Create Audio" not in text: + title = text + + # Count sources + from config import SOURCE_ITEM_SELECTOR + + # Click Sources tab to count + try: + sources_tab = page.get_by_text("Sources", exact=True).first + sources_tab.click() + time.sleep(2) + source_count = len( + page.query_selector_all(SOURCE_ITEM_SELECTOR) + ) + except: + source_count = 0 + + print(f" ๐Ÿ““ Title: {title}") + print(f" ๐Ÿ“Ž Sources: {source_count}") + + nb = self.add_notebook( + url=url, name=title, source_count=source_count + ) + nb_id = nb["id"] + + # Activate it + self.select_notebook(nb_id) + print(f" โœ… Imported and activated: {nb_id}") + return nb + + return self._run_browser_op(_op, headless=headless) + + def create_notebook_web(self, title: str, headless=True) -> Optional[Dict[str, Any]]: + """Create a new notebook on the web and add to library.""" + + def _op(page): + if not self._navigate_home(page): + return None + + print(f" โž• Creating notebook: {title}") + create_btn = page.query_selector( + "button[aria-label='Create new notebook']" + ) + if not create_btn: + print(" โŒ Could not find Create button") + return None + create_btn.click() + time.sleep(5) + + # Wait for the redirect to the actual notebook URL + try: + page.wait_for_url(re.compile(r"/notebook/[a-f0-9]{8}-"), timeout=15000) + except: + pass + time.sleep(2) + new_url = page.url + print(f" ๐Ÿ““ URL: {new_url}") + + # Rename it if title isn't "Untitled notebook" + if title and title != "Untitled notebook": + # Go back to homepage to rename via menu + if not self._navigate_home(page): + return None + + # Find the new untitled notebook and rename it + from config import NOTEBOOK_CARD_SELECTOR, NOTEBOOK_MENU_SELECTOR + + cards = page.query_selector_all(NOTEBOOK_CARD_SELECTOR) + for card in cards: + try: + text = card.inner_text() + href = 
card.evaluate( + "e => e.closest('a')?.getAttribute('href') || ''" + ) + # Match by URL + uuid = self._extract_notebook_id_from_url(new_url) + if uuid and uuid in href: + menu_btn = card.query_selector(NOTEBOOK_MENU_SELECTOR) + if menu_btn: + menu_btn.click() + time.sleep(1) + edit_item = page.query_selector( + "[role='menuitem']:has-text('Edit title')" + ) + if edit_item: + edit_item.click() + time.sleep(2) + from config import NOTEBOOK_TITLE_FC_SELECTOR + + title_input = page.locator( + NOTEBOOK_TITLE_FC_SELECTOR + ) + title_input.fill(title) + time.sleep(1) + page.get_by_role( + "button", name="Save" + ).first.click() + time.sleep(2) + print(f" โœ๏ธ Renamed to: {title}") + break + except: + continue + + nb = self.add_notebook(url=new_url, name=title, source_count=0) + self.select_notebook(nb["id"]) + print(f" โœ… Created and activated: {nb['id']}") + return nb + + return self._run_browser_op(_op, headless=headless) + + def rename_notebook_web( + self, notebook_id: str, new_title: str, headless=True + ) -> bool: + """Rename a notebook on the web and update library.""" + nb = self.get_notebook(notebook_id) + if not nb: + print(f"โŒ Notebook not found in library: {notebook_id}") + return False + + uuid = self._extract_notebook_id_from_url(nb.get("url", "")) + if not uuid: + print(f"โŒ Could not extract UUID from URL: {nb.get('url')}") + return False + + def _op(page): + if not self._navigate_home(page): + return False + + print(f" โœ๏ธ Renaming: {nb['name']} โ†’ {new_title}") + + from config import NOTEBOOK_MENU_SELECTOR, NOTEBOOK_TITLE_FC_SELECTOR + + card = self._find_notebook_card_by_uuid(page, uuid) + if not card: + print(" โŒ Notebook not found on web") + return False + + menu_btn = card.query_selector(NOTEBOOK_MENU_SELECTOR) + menu_btn.click() + time.sleep(1) + + edit_item = page.query_selector( + "[role='menuitem']:has-text('Edit title')" + ) + edit_item.click() + time.sleep(2) + + title_input = page.locator(NOTEBOOK_TITLE_FC_SELECTOR) + 
title_input.fill(new_title) + time.sleep(1) + + page.get_by_role("button", name="Save").first.click() + time.sleep(2) - # Execute command - if args.command == 'add': - topics = [t.strip() for t in args.topics.split(',')] - use_cases = [u.strip() for u in args.use_cases.split(',')] if args.use_cases else None - tags = [t.strip() for t in args.tags.split(',')] if args.tags else None + self.update_notebook(notebook_id, name=new_title) + print(f" โœ… Renamed!") + return True + + result = self._run_browser_op(_op, headless=headless) + return result if result else False + + def delete_notebook_web( + self, notebook_id: str, confirm: bool = False, headless=True + ) -> bool: + """Delete a notebook from the web and remove from library.""" + nb = self.get_notebook(notebook_id) + if not nb: + print(f"โŒ Notebook not found in library: {notebook_id}") + return False + + if not confirm: + print(f'\nโš ๏ธ This will PERMANENTLY DELETE "{nb["name"]}"') + print(" Run again with --confirm to proceed") + return False + + uuid = self._extract_notebook_id_from_url(nb.get("url", "")) + if not uuid: + print(f"โŒ Could not extract UUID from URL: {nb.get('url')}") + return False + + def _op(page): + if not self._navigate_home(page): + return False + + print(f" ๐Ÿ—‘๏ธ Deleting: {nb['name']}") + + from config import ( + NOTEBOOK_MENU_SELECTOR, + DELETE_DIALOG_SELECTOR, + ) + + card = self._find_notebook_card_by_uuid(page, uuid) + if not card: + print(" โŒ Notebook not found on web") + return False + + menu_btn = card.query_selector(NOTEBOOK_MENU_SELECTOR) + menu_btn.click() + time.sleep(1) + + del_item = page.query_selector( + "[role='menuitem']:has-text('Delete')" + ) + del_item.click() + time.sleep(2) + + dialog = page.query_selector(DELETE_DIALOG_SELECTOR) + if dialog: + del_btn = dialog.query_selector("button:has-text('Delete')") + if del_btn: + del_btn.click(force=True) + time.sleep(3) + + self.remove_notebook(notebook_id) + print(f" โœ… Deleted from web and library") + return True 
+ + result = self._run_browser_op(_op, headless=headless) + return result if result else False + + +# ------------------------------------------------------------------ # +# CLI +# ------------------------------------------------------------------ # + + +def main(): + parser = argparse.ArgumentParser(description="Manage NotebookLM library") + subparsers = parser.add_subparsers(dest="command", help="Commands") + + # --- Local operations --- + add_p = subparsers.add_parser("add", help="Add a notebook to library") + add_p.add_argument("--url", required=True, help="NotebookLM URL") + add_p.add_argument("--name", required=True, help="Display name") + add_p.add_argument("--description", default="", help="Description") + add_p.add_argument("--topics", help="Comma-separated topics") + + list_p = subparsers.add_parser("list", help="List all notebooks in library") + list_p.add_argument("--format", choices=["text", "json"], default="text", help="Output format") + + find_p = subparsers.add_parser("find", help="Fuzzy search notebooks") + find_p.add_argument("query", help="Search query") + + act_p = subparsers.add_parser("activate", help="Set active notebook") + act_p.add_argument("--id", required=True, help="Notebook ID") + + rm_p = subparsers.add_parser("remove", help="Remove from library (local only)") + rm_p.add_argument("--id", required=True, help="Notebook ID") + + excl_p = subparsers.add_parser("exclude", help="Remove + prevent sync re-import") + excl_p.add_argument("--id", required=True, help="Notebook ID") + + subparsers.add_parser("stats", help="Show library statistics") + + # --- Web-backed operations --- + sync_p = subparsers.add_parser("sync", help="Sync notebooks from web") + sync_p.add_argument( + "--library-only", action="store_true", help="Only refresh existing entries" + ) + sync_p.add_argument( + "--force", action="store_true", help="Ignore exclusion list" + ) + sync_p.add_argument( + "--deep", action="store_true", + help="Also scrape source names from each 
notebook" + ) + sync_p.add_argument( + "--stale", action="store_true", + help="With --deep, only re-scrape notebooks where source count changed" + ) + sync_p.add_argument("--show-browser", action="store_true") + + imp_p = subparsers.add_parser("import", help="Import notebook by URL") + imp_p.add_argument("--url", required=True, help="NotebookLM URL") + imp_p.add_argument("--show-browser", action="store_true") + + create_p = subparsers.add_parser("create", help="Create new notebook on web") + create_p.add_argument("--title", required=True, help="Notebook title") + create_p.add_argument("--show-browser", action="store_true") + + ren_p = subparsers.add_parser("rename", help="Rename notebook on web") + ren_p.add_argument("--id", required=True, help="Notebook ID") + ren_p.add_argument("--title", required=True, help="New title") + ren_p.add_argument("--show-browser", action="store_true") + + del_p = subparsers.add_parser("delete", help="Delete notebook from web") + del_p.add_argument("--id", required=True, help="Notebook ID") + del_p.add_argument( + "--confirm", action="store_true", help="Confirm permanent deletion" + ) + del_p.add_argument("--show-browser", action="store_true") + + args = parser.parse_args() + library = NotebookLibrary() - notebook = library.add_notebook( + if args.command == "add": + topics = [t.strip() for t in args.topics.split(",")] if args.topics else [] + nb = library.add_notebook( url=args.url, name=args.name, description=args.description, topics=topics, - use_cases=use_cases, - tags=tags ) - print(json.dumps(notebook, indent=2)) + print(f"โœ… Added: {nb['id']}") + print(json.dumps(nb, indent=2)) - elif args.command == 'list': + elif args.command == "list": notebooks = library.list_notebooks() - if notebooks: - print("\n๐Ÿ“š Notebook Library:") - for notebook in notebooks: - active = " [ACTIVE]" if notebook['id'] == library.active_notebook_id else "" - print(f"\n ๐Ÿ““ {notebook['name']}{active}") - print(f" ID: {notebook['id']}") - print(f" 
Topics: {', '.join(notebook['topics'])}") - print(f" Uses: {notebook['use_count']}") + if args.format == "json": + print(json.dumps(notebooks, indent=2)) + elif notebooks: + print(f"\n๐Ÿ“š Notebook Library ({len(notebooks)}):") + for nb in notebooks: + active = " โ˜…" if nb["id"] == library.active_notebook_id else "" + homepage_count = nb.get("source_count", 0) + scraped_sources = nb.get("sources", []) + scraped_len = len(scraped_sources) + + # Staleness check: homepage count vs deep-scraped list length + stale_marker = "" + stale_detail = "" + if scraped_sources and scraped_len != homepage_count: + stale_marker = " โ˜…" + stale_detail = ( + f" (โš ๏ธ stale โ€” count was {scraped_len}, now {homepage_count})" + ) + + print(f"\n ๐Ÿ““ {nb['name']}{active}") + print(f" ID: {nb['id']}") + print(f" Sources: {homepage_count}{stale_detail}") + if nb.get("description"): + print(f" {nb['description'][:80]}") else: - print("๐Ÿ“š Library is empty. Add notebooks with: notebook_manager.py add") + print("๐Ÿ“š Library empty. 
Run: notebook_manager.py sync") - elif args.command == 'search': - results = library.search_notebooks(args.query) + elif args.command == "find": + results = library.find_notebooks(args.query) if results: print(f"\n๐Ÿ” Found {len(results)} notebooks:") - for notebook in results: - print(f"\n ๐Ÿ““ {notebook['name']} ({notebook['id']})") - print(f" {notebook['description']}") + for nb in results: + active = " โ˜…" if nb["id"] == library.active_notebook_id else "" + print(f" ๐Ÿ““ {nb['name']}{active} ({nb['id']})") + matched_src = nb.get("_matched_source") + if matched_src: + print(f" โ†ณ source match: \"{matched_src}\"") else: - print(f"๐Ÿ” No notebooks found for: {args.query}") + print(f"๐Ÿ” No notebooks match: {args.query}") - elif args.command == 'activate': - notebook = library.select_notebook(args.id) - print(f"Now using: {notebook['name']}") + elif args.command == "activate": + nb = library.select_notebook(args.id) + print(f"Now using: {nb['name']}") - elif args.command == 'remove': - if library.remove_notebook(args.id): - print("Notebook removed from library") + elif args.command == "remove": + library.remove_notebook(args.id) - elif args.command == 'stats': + elif args.command == "exclude": + library.exclude_notebook(args.id) + + elif args.command == "stats": stats = library.get_stats() print("\n๐Ÿ“Š Library Statistics:") - print(f" Total notebooks: {stats['total_notebooks']}") - print(f" Total topics: {stats['total_topics']}") - print(f" Total uses: {stats['total_use_count']}") - if stats['active_notebook']: + print(f" Notebooks: {stats['total_notebooks']}") + print(f" Topics: {stats['total_topics']}") + print(f" Uses: {stats['total_use_count']}") + print(f" Excluded: {stats['excluded_count']}") + if stats["active_notebook"]: print(f" Active: {stats['active_notebook']['name']}") - if stats['most_used_notebook']: - print(f" Most used: {stats['most_used_notebook']['name']} ({stats['most_used_notebook']['use_count']} uses)") - print(f" Library path: 
{stats['library_path']}") + if stats["most_used_notebook"]: + print( + f" Most used: {stats['most_used_notebook']['name']} ({stats['most_used_notebook']['use_count']})" + ) + + elif args.command == "sync": + headless = not getattr(args, "show_browser", False) + library.sync_from_web( + library_only=args.library_only, + force=args.force, + headless=headless, + deep=args.deep, + stale_only=args.stale, + ) + + elif args.command == "import": + headless = not getattr(args, "show_browser", False) + library.import_notebook(args.url, headless=headless) + + elif args.command == "create": + headless = not getattr(args, "show_browser", False) + library.create_notebook_web(args.title, headless=headless) + + elif args.command == "rename": + headless = not getattr(args, "show_browser", False) + library.rename_notebook_web(args.id, args.title, headless=headless) + + elif args.command == "delete": + headless = not getattr(args, "show_browser", False) + library.delete_notebook_web( + args.id, confirm=args.confirm, headless=headless + ) else: parser.print_help() if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/scripts/source_manager.py b/scripts/source_manager.py new file mode 100644 index 0000000..0b4e9ff --- /dev/null +++ b/scripts/source_manager.py @@ -0,0 +1,866 @@ +#!/usr/bin/env python3 +""" +Source Management for NotebookLM +Manages sources (documents, websites, text) within NotebookLM notebooks via browser automation. 
+""" + +import argparse +import re +import sys +import time +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Optional + +from patchright.sync_api import sync_playwright + +sys.path.insert(0, str(Path(__file__).parent)) + +from auth_manager import AuthManager +from notebook_manager import NotebookLibrary +from browser_utils import BrowserFactory, StealthUtils +from config import ( + SOURCE_ITEM_SELECTOR, + MORE_VERT_SELECTOR, + REMOVE_MENU_SELECTOR, + DELETE_DIALOG_SELECTOR, + WEBSITE_URL_INPUT_SELECTOR, + COPIED_TEXT_INPUT_SELECTOR, +) + + +class SourceManager: + """Manages sources within a NotebookLM notebook via browser automation.""" + + def __init__(self, notebook_url: str, headless: bool = True): + self.notebook_url = notebook_url + self.headless = headless + + def _open_sources_tab(self, page) -> bool: + """Navigate to notebook and open the Sources tab.""" + try: + print(" ๐ŸŒ Opening notebook...") + page.goto(self.notebook_url, wait_until="domcontentloaded") + page.wait_for_url( + re.compile(r"^https://notebooklm\.google\.com/"), timeout=15000 + ) + time.sleep(3) + + print(" ๐Ÿ“‘ Opening Sources tab...") + sources_tab = page.get_by_text("Sources", exact=True).first + sources_tab.click() + time.sleep(2) + return True + except Exception as e: + print(f" โŒ Failed to open Sources tab: {e}") + return False + + def _get_source_names(self, page) -> List[str]: + """Read all source names from the Sources panel.""" + elements = page.query_selector_all(SOURCE_ITEM_SELECTOR) + names = [] + for el in elements: + label = el.get_attribute("aria-label") or "" + if label: + names.append(label) + return names + + def _update_library_sources(self, page) -> None: + """Scrape current source names and write them back to the library entry.""" + names = self._get_source_names(page) + library = NotebookLibrary() + my_uuid = library._extract_notebook_id_from_url(self.notebook_url) + if not my_uuid: + return + + for nid, nb in 
library.notebooks.items(): + nb_uuid = library._extract_notebook_id_from_url(nb.get("url", "")) + if nb_uuid == my_uuid: + nb["sources"] = names + nb["source_count"] = len(names) + nb["sources_scraped_at"] = datetime.now().isoformat() + nb["updated_at"] = datetime.now().isoformat() + library._save_library() + return + + def _find_source_index(self, page, name: str) -> Optional[int]: + """Find a source by name. Exact match first, then case-insensitive substring.""" + sources = page.query_selector_all(SOURCE_ITEM_SELECTOR) + # Exact match + for i, el in enumerate(sources): + label = el.get_attribute("aria-label") or "" + if label == name: + return i + # Case-insensitive substring + name_lower = name.lower() + for i, el in enumerate(sources): + label = el.get_attribute("aria-label") or "" + if name_lower in label.lower(): + print(f" โš ๏ธ Fuzzy match: '{label}'") + return i + return None + + def _wait_for_source_count(self, page, expected_count: int, timeout: int = 60) -> bool: + """Poll until source count reaches expected value or timeout.""" + deadline = time.time() + timeout + while time.time() < deadline: + current = len(page.query_selector_all(SOURCE_ITEM_SELECTOR)) + if current >= expected_count: + return True + time.sleep(2) + return False + + def _run_browser_op(self, operation): + """Shared browser lifecycle: auth check, launch, run operation, cleanup.""" + auth = AuthManager() + if not auth.is_authenticated(): + print("โš ๏ธ Not authenticated. 
Run: python auth_manager.py setup") + return None + + playwright = None + context = None + try: + playwright = sync_playwright().start() + context = BrowserFactory.launch_persistent_context( + playwright, headless=self.headless + ) + page = context.new_page() + if not self.headless: + page.set_viewport_size({"width": 1200, "height": 800}) + + if not self._open_sources_tab(page): + return None + + return operation(page) + + except Exception as e: + print(f" โŒ Error: {e}") + import traceback + traceback.print_exc() + return None + finally: + if context: + try: + context.close() + except: + pass + if playwright: + try: + playwright.stop() + except: + pass + + # ------------------------------------------------------------------ # + # Public API + # ------------------------------------------------------------------ # + + def list_sources(self) -> Optional[List[Dict[str, str]]]: + """List all sources in the notebook. Writes to library first, prints from library.""" + + def _op(page): + # Scrape and persist to library + self._update_library_sources(page) + + # Read back from library for display + library = NotebookLibrary() + my_uuid = library._extract_notebook_id_from_url(self.notebook_url) + names = [] + if my_uuid: + for nb in library.notebooks.values(): + nb_uuid = library._extract_notebook_id_from_url(nb.get("url", "")) + if nb_uuid == my_uuid: + names = nb.get("sources", []) + break + + print(f"\n๐Ÿ“š Sources ({len(names)}):") + for i, name in enumerate(names): + print(f" [{i}] {name}") + return [{"name": n, "index": i} for i, n in enumerate(names)] + + return self._run_browser_op(_op) + + def add_source_text(self, text: str) -> bool: + """Add a copied-text source to the notebook.""" + + def _op(page): + initial_count = len(page.query_selector_all(SOURCE_ITEM_SELECTOR)) + print(f" ๐Ÿ“ Adding text source ({len(text)} chars)...") + + # Open Add Source dialog + page.get_by_role( + "button", name=re.compile("add source", re.I) + ).first.click() + time.sleep(2) + + # 
Click "Copied text" + page.get_by_text("Copied text", exact=True).first.click() + time.sleep(2) + + # Fill the textarea + textarea = page.query_selector(COPIED_TEXT_INPUT_SELECTOR) + if not textarea: + print(" โŒ Could not find text input") + return False + textarea.fill(text) + time.sleep(1) + + # Click Insert + insert_btn = page.query_selector('button:has-text("Insert")') + if not insert_btn: + print(" โŒ Could not find Insert button") + return False + insert_btn.click() + print(" โณ Waiting for processing...") + + if self._wait_for_source_count(page, initial_count + 1): + self._update_library_sources(page) + names = self._get_source_names(page) + print(f" โœ… Source added! Now {len(names)} sources.") + return True + else: + print(" โŒ Timeout waiting for source to be added") + return False + + result = self._run_browser_op(_op) + return result if result is not None else False + + def add_source_website(self, urls: List[str]) -> bool: + """Add website/YouTube URL sources to the notebook.""" + for url in urls: + if not url.startswith(("http://", "https://")): + print(f" โŒ Invalid URL: {url}") + return False + + def _op(page): + initial_count = len(page.query_selector_all(SOURCE_ITEM_SELECTOR)) + print(f" ๐Ÿ”— Adding {len(urls)} URL source(s)...") + + for i, url in enumerate(urls): + if i > 0: + # Re-open the add source dialog for subsequent URLs + page.get_by_role( + "button", name=re.compile("add source", re.I) + ).first.click() + time.sleep(2) + page.get_by_text("Websites", exact=True).first.click() + time.sleep(2) + else: + page.get_by_role( + "button", name=re.compile("add source", re.I) + ).first.click() + time.sleep(2) + page.get_by_text("Websites", exact=True).first.click() + time.sleep(2) + + # Use Angular FormControl selector (triggers reactive form validation) + fc = page.locator("[formcontrolname='urls']") + fc.fill(url) + time.sleep(1) + + # Click Insert button (the actual submit for URL sources) + insert_btn = page.get_by_role("button", 
name="Insert") + try: + insert_btn.wait_for(state="visible", timeout=5000) + if insert_btn.is_enabled(): + insert_btn.click() + else: + print(f" โŒ Insert button disabled for: {url}") + continue + except: + print(f" โŒ Could not find Insert button for: {url}") + continue + + print(f" โณ Processing ({i + 1}/{len(urls)}): {url}") + self._wait_for_source_count(page, initial_count + i + 1, timeout=60) + + final_count = len(page.query_selector_all(SOURCE_ITEM_SELECTOR)) + added = final_count - initial_count + if added > 0: + self._update_library_sources(page) + print(f" โœ… {added} source(s) added! Now {final_count} sources.") + return True + else: + print(" โŒ No sources were added") + return False + + result = self._run_browser_op(_op) + return result if result is not None else False + + def add_source_file(self, file_path: str) -> bool: + """Upload a file as a source to the notebook.""" + path = Path(file_path) + if not path.exists(): + print(f" โŒ File not found: {file_path}") + return False + + def _op(page): + initial_count = len(page.query_selector_all(SOURCE_ITEM_SELECTOR)) + print(f" ๐Ÿ“ Uploading: {path.name}") + + # Open Add Source dialog + page.get_by_role( + "button", name=re.compile("add source", re.I) + ).first.click() + time.sleep(2) + + # Click "Upload files" and handle file chooser + upload_btn = page.get_by_text("Upload files", exact=True).first + with page.expect_file_chooser() as fc_info: + upload_btn.click() + file_chooser = fc_info.value + file_chooser.set_files(str(path)) + print(" โณ Waiting for processing...") + + if self._wait_for_source_count(page, initial_count + 1): + self._update_library_sources(page) + names = self._get_source_names(page) + print(f" โœ… File uploaded! 
Now {len(names)} sources.") + return True + else: + print(" โŒ Timeout waiting for file to be processed") + return False + + result = self._run_browser_op(_op) + return result if result is not None else False + + def read_source(self, source_name: str) -> Optional[str]: + """Read a source's content (AI summary + document text).""" + + def _op(page): + idx = self._find_source_index(page, source_name) + if idx is None: + print(f" โŒ Source not found: {source_name}") + return None + + sources = page.query_selector_all(SOURCE_ITEM_SELECTOR) + label = sources[idx].get_attribute("aria-label") or source_name + print(f" ๐Ÿ“– Reading: {label}") + + # Click the source to open its detail view + sources[idx].click() + time.sleep(4) + + # Extract the source guide content from the left panel + # The source detail replaces the sources list with guide + content + content_parts = [] + + # Try to get the source guide summary + guide = page.query_selector(".source-guide, [class*='source-guide']") + if guide: + content_parts.append("=== Source Guide ===") + content_parts.append(guide.inner_text()) + + # Get the full panel text as fallback/supplement + # The Sources panel shows the source content after clicking + panel = page.query_selector( + "source-detail, [class*='source-detail'], [class*='source-content']" + ) + if panel: + content_parts.append("\n=== Source Content ===") + content_parts.append(panel.inner_text()) + + # If no specific selectors matched, grab the whole left panel + if not content_parts: + # The sources column is the first major section + try: + # Get all text from the sources/detail area + left_panel = page.evaluate("""() => { + const el = document.querySelector('notebook'); + if (!el) return ''; + // Get the first column's text + const cols = el.querySelectorAll('[class*="column"], [class*="panel"]'); + for (const col of cols) { + const text = col.innerText; + if (text.length > 200) return text; + } + return ''; + }""") + if left_panel: + 
content_parts.append(left_panel) + except: + pass + + if not content_parts: + # Last resort: dump visible body text and look for the source content + body_text = page.inner_text("body") + # The source name appears followed by its content + if label in body_text: + start = body_text.index(label) + content_parts.append(body_text[start : start + 5000]) + + content = "\n".join(content_parts).strip() + if content: + print(f" โœ… Read {len(content)} characters") + return content + else: + print(" โŒ Could not extract source content") + return None + + return self._run_browser_op(_op) + + def rename_source(self, source_name: str, new_name: str) -> bool: + """Rename a source in the notebook.""" + + def _op(page): + idx = self._find_source_index(page, source_name) + if idx is None: + print(f" โŒ Source not found: {source_name}") + return False + + sources = page.query_selector_all(SOURCE_ITEM_SELECTOR) + label = sources[idx].get_attribute("aria-label") or source_name + print(f" โœ๏ธ Renaming: {label} โ†’ {new_name}") + + # Hover to reveal more_vert menu + sources[idx].hover() + time.sleep(1) + + # Find closest more_vert button + target_box = sources[idx].bounding_box() + more_btns = [] + for mb in page.query_selector_all(MORE_VERT_SELECTOR): + try: + if mb.is_visible(): + box = mb.bounding_box() + if box and box["x"] < 400: + more_btns.append((mb, box)) + except: + pass + + if not more_btns: + print(" โŒ Could not find source menu button") + return False + + best = min(more_btns, key=lambda x: abs(x[1]["y"] - target_box["y"])) + best[0].click() + time.sleep(1) + + # Click "Rename source" + rename_item = page.query_selector( + "[role='menuitem']:has-text('Rename')" + ) + if not rename_item: + print(" โŒ Could not find Rename option") + return False + rename_item.click() + time.sleep(2) + + # Fill the rename input in the dialog + dialog = page.query_selector(DELETE_DIALOG_SELECTOR) + if dialog: + name_input = dialog.query_selector( + "input, [formcontrolname]" + ) + if 
name_input: + name_input.fill(new_name) + time.sleep(1) + save_btn = dialog.query_selector( + "button:has-text('Save'), button:has-text('Rename')" + ) + if save_btn: + save_btn.click(force=True) + time.sleep(2) + self._update_library_sources(page) + print(f" โœ… Renamed!") + return True + + print(" โŒ Could not complete rename") + return False + + result = self._run_browser_op(_op) + return result if result is not None else False + + def select_sources(self, names: List[str]) -> bool: + """Select only the specified sources (deselect all others). + + This controls which sources NotebookLM uses when answering questions. + """ + + def _op(page): + all_sources = self._get_source_names(page) + if not all_sources: + print(" โŒ No sources found") + return False + + # Resolve names to match (fuzzy) + selected = set() + for name in names: + name_lower = name.lower() + for src in all_sources: + if name_lower in src.lower() or src.lower() in name_lower: + selected.add(src) + break + + if not selected: + print(f" โŒ No sources matched: {names}") + return False + + print(f" ๐ŸŽฏ Selecting {len(selected)}/{len(all_sources)} sources:") + for s in selected: + print(f" โœ“ {s}") + + # Toggle checkboxes + for src_name in all_sources: + checkbox = page.query_selector(f"input[aria-label='{src_name}']") + if not checkbox: + continue + + is_checked = checkbox.is_checked() + should_check = src_name in selected + + if is_checked and not should_check: + checkbox.click(force=True) + time.sleep(0.3) + elif not is_checked and should_check: + checkbox.click(force=True) + time.sleep(0.3) + + print(f" โœ… Source selection updated") + return True + + result = self._run_browser_op(_op) + return result if result is not None else False + + def select_all_sources(self) -> bool: + """Select all sources (reset to default).""" + + def _op(page): + # Click "Select all sources" checkbox + select_all = page.query_selector( + "input[aria-label='Select all sources']" + ) + if select_all: + if not 
select_all.is_checked(): + select_all.click(force=True) + time.sleep(1) + print(" โœ… All sources selected") + return True + print(" โŒ Could not find Select All checkbox") + return False + + result = self._run_browser_op(_op) + return result if result is not None else False + + def remove_source(self, source_name: str) -> bool: + """Remove a source from the notebook.""" + + def _op(page): + idx = self._find_source_index(page, source_name) + if idx is None: + print(f" โŒ Source not found: {source_name}") + return False + + sources = page.query_selector_all(SOURCE_ITEM_SELECTOR) + label = sources[idx].get_attribute("aria-label") or source_name + initial_count = len(sources) + print(f" ๐Ÿ—‘๏ธ Removing: {label}") + + # Hover to reveal more_vert menu + sources[idx].hover() + time.sleep(1) + + # Find the more_vert button closest to this source + target_box = sources[idx].bounding_box() + more_btns = [] + for mb in page.query_selector_all(MORE_VERT_SELECTOR): + try: + if mb.is_visible(): + box = mb.bounding_box() + if box and box["x"] < 400: + more_btns.append((mb, box)) + except: + pass + + if not more_btns: + print(" โŒ Could not find source menu button") + return False + + # Click the one closest vertically to the target source + best = min(more_btns, key=lambda x: abs(x[1]["y"] - target_box["y"])) + best[0].click() + time.sleep(1) + + # Click "Remove source" + remove_item = page.query_selector(REMOVE_MENU_SELECTOR) + if not remove_item: + print(" โŒ Could not find Remove option") + return False + remove_item.click() + time.sleep(2) + + # Confirm deletion inside the dialog + dialog = page.query_selector(DELETE_DIALOG_SELECTOR) + if dialog: + delete_btn = dialog.query_selector('button:has-text("Delete")') + if delete_btn: + delete_btn.click(force=True) + time.sleep(3) + else: + print(" โŒ Could not find Delete confirmation button") + return False + else: + # Fallback: force-click any visible Delete button + delete_btn = 
page.query_selector('button:has-text("Delete")') + if delete_btn: + delete_btn.click(force=True) + time.sleep(3) + else: + print(" โŒ No confirmation dialog found") + return False + + # Verify deletion + final_count = len(page.query_selector_all(SOURCE_ITEM_SELECTOR)) + if final_count < initial_count: + self._update_library_sources(page) + print(f" โœ… Removed! Now {final_count} sources.") + return True + else: + print(" โŒ Source count unchanged after deletion") + return False + + result = self._run_browser_op(_op) + return result if result is not None else False + + def run_test(self) -> bool: + """Integration test: add text source, read it, remove it.""" + print("\n๐Ÿงช Running source management integration test...") + + # Step 1: List initial sources + print("\n--- Step 1: List sources ---") + initial = self.list_sources() + if initial is None: + print("โŒ FAIL: Could not list sources") + return False + initial_count = len(initial) + print(f" Initial count: {initial_count}") + + # Step 2: Add a text source + print("\n--- Step 2: Add text source ---") + test_text = ( + "INTEGRATION TEST SOURCE - This is a temporary source created by " + "the source_manager.py integration test. It will be deleted shortly. 
# ------------------------------------------------------------------ #
# CLI
# ------------------------------------------------------------------ #


def resolve_notebook_url(args) -> Optional[str]:
    """Resolve the notebook URL to operate on, matching ask_question.py.

    Resolution order:
      1. ``args.notebook_url`` when set.
      2. ``args.notebook_id`` looked up with ``NotebookLibrary.get_notebook``.
      3. The library's active notebook.

    Args:
        args: Parsed argparse namespace; ``notebook_url`` and ``notebook_id``
            are optional attributes.

    Returns:
        The notebook URL, or None when nothing could be resolved. In the
        None case an explanatory message (and, when the library is not
        empty, the list of known notebooks) has already been printed.
    """
    explicit_url = getattr(args, "notebook_url", None)
    if explicit_url:
        return explicit_url

    library = NotebookLibrary()

    notebook_id = getattr(args, "notebook_id", None)
    if notebook_id:
        notebook = library.get_notebook(notebook_id)
        if notebook:
            return notebook["url"]
        print(f"❌ Notebook '{notebook_id}' not found")
        return None

    # No explicit selection: fall back to the active notebook.
    active = library.get_active_notebook()
    if active:
        print(f"📚 Using active notebook: {active['name']}")
        return active["url"]

    # Nothing usable: show what is available so the user can pick one.
    notebooks = library.list_notebooks()
    if notebooks:
        print("\n📚 Available notebooks:")
        for nb in notebooks:
            mark = " [ACTIVE]" if nb.get("id") == library.active_notebook_id else ""
            print(f" {nb['id']}: {nb['name']}{mark}")
        print("\nSpecify with --notebook-id or --notebook-url")
    else:
        print("❌ No notebooks in library. Add one first with notebook_manager.py")
    return None


def add_common_args(parser):
    """Add notebook selection and browser args shared by all subcommands."""
    parser.add_argument("--notebook-url", help="NotebookLM notebook URL")
    parser.add_argument("--notebook-id", help="Notebook ID from library")
    parser.add_argument("--show-browser", action="store_true", help="Show browser window")


def _split_csv(raw):
    """Split a comma-separated CLI value into stripped, non-empty items."""
    return [item.strip() for item in (raw or "").split(",") if item.strip()]


def _build_parser():
    """Build the argument parser with one subcommand per source operation."""
    parser = argparse.ArgumentParser(description="Manage NotebookLM sources")
    subparsers = parser.add_subparsers(dest="command", help="Commands")

    # list
    list_parser = subparsers.add_parser("list", help="List sources in a notebook")
    add_common_args(list_parser)

    # add-text
    add_text_parser = subparsers.add_parser("add-text", help="Add copied text as source")
    add_text_parser.add_argument("--text", help="Text content to add")
    add_text_parser.add_argument(
        "--from-file", help="Read text from a local file instead"
    )
    add_common_args(add_text_parser)

    # add-website
    add_web_parser = subparsers.add_parser(
        "add-website", help="Add website/YouTube URLs as sources"
    )
    add_web_parser.add_argument(
        "--urls", required=True, help="Comma-separated URLs"
    )
    add_common_args(add_web_parser)

    # add-file
    add_file_parser = subparsers.add_parser("add-file", help="Upload a file as source")
    add_file_parser.add_argument("--file", required=True, help="Path to file")
    add_common_args(add_file_parser)

    # read
    read_parser = subparsers.add_parser("read", help="Read a source's content")
    read_parser.add_argument("--name", required=True, help="Source name")
    add_common_args(read_parser)

    # rename
    rename_parser = subparsers.add_parser("rename", help="Rename a source")
    rename_parser.add_argument("--name", required=True, help="Current source name")
    rename_parser.add_argument("--title", required=True, help="New name")
    add_common_args(rename_parser)

    # select
    select_parser = subparsers.add_parser(
        "select", help="Select specific sources for querying"
    )
    select_parser.add_argument(
        "--names", help="Comma-separated source names to select (others deselected)"
    )
    select_parser.add_argument(
        "--all", action="store_true", dest="select_all", help="Select all sources"
    )
    add_common_args(select_parser)

    # remove
    remove_parser = subparsers.add_parser("remove", help="Remove a source")
    remove_parser.add_argument("--name", required=True, help="Source name")
    add_common_args(remove_parser)

    # test
    test_parser = subparsers.add_parser(
        "test", help="Run integration test (add/read/remove)"
    )
    add_common_args(test_parser)

    return parser


def main():
    """Parse the command line and run the requested source operation.

    Returns:
        Process exit code: 0 when the operation reports success, 1 on any
        failure (no subcommand, unresolved notebook, invalid input, or a
        falsy result from the manager).
    """
    parser = _build_parser()
    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        return 1

    # Resolve notebook
    notebook_url = resolve_notebook_url(args)
    if not notebook_url:
        return 1

    # Validate and normalise inputs first, so bad input fails without
    # constructing the SourceManager.
    text = None
    urls = None
    names = None
    if args.command == "add-text":
        text = args.text
        if not text and args.from_file:
            path = Path(args.from_file)
            # is_file() also rejects directories, which exists() would let
            # through to an uncaught IsADirectoryError.
            if not path.is_file():
                print(f"❌ File not found: {args.from_file}")
                return 1
            try:
                # Explicit encoding: the default depends on the locale.
                text = path.read_text(encoding="utf-8")
            except (OSError, UnicodeDecodeError) as exc:
                print(f"❌ Could not read {args.from_file}: {exc}")
                return 1
        if not text:
            print("❌ Provide --text or --from-file")
            return 1
    elif args.command == "add-website":
        # Drop blanks produced by trailing or doubled commas.
        urls = _split_csv(args.urls)
        if not urls:
            print("❌ Provide at least one URL in --urls")
            return 1
    elif args.command == "select" and not args.select_all:
        names = _split_csv(args.names)
        if not names:
            print("❌ Provide --names or --all")
            return 1

    mgr = SourceManager(notebook_url, headless=not args.show_browser)

    if args.command == "list":
        return 0 if mgr.list_sources() is not None else 1

    if args.command == "add-text":
        return 0 if mgr.add_source_text(text) else 1

    if args.command == "add-website":
        return 0 if mgr.add_source_website(urls) else 1

    if args.command == "add-file":
        return 0 if mgr.add_source_file(args.file) else 1

    if args.command == "read":
        content = mgr.read_source(args.name)
        if not content:
            return 1
        print("\n" + "=" * 60)
        print(content)
        print("=" * 60)
        return 0

    if args.command == "rename":
        return 0 if mgr.rename_source(args.name, args.title) else 1

    if args.command == "select":
        # --all takes precedence when both --all and --names are given.
        if args.select_all:
            return 0 if mgr.select_all_sources() else 1
        return 0 if mgr.select_sources(names) else 1

    if args.command == "remove":
        return 0 if mgr.remove_source(args.name) else 1

    if args.command == "test":
        return 0 if mgr.run_test() else 1

    return 1


if __name__ == "__main__":
    sys.exit(main())