Status: Active
Last Updated: 2026-05-15
Complete reference for the notebooklm Python library.
See also:
- Architecture Guide for structural overview, capability protocols, and transport design.
- RPC Development Guide for custom RPC design, protocols, and mock assertions.
import asyncio
from notebooklm import NotebookLMClient
async def main():
# Create client from saved authentication
async with NotebookLMClient.from_storage() as client:
# List notebooks
notebooks = await client.notebooks.list()
print(f"Found {len(notebooks)} notebooks")
# Create a new notebook
nb = await client.notebooks.create("My Research")
print(f"Created: {nb.id}")
# Add sources
await client.sources.add_url(nb.id, "https://example.com/article")
# Ask a question
result = await client.chat.ask(nb.id, "Summarize the main points")
print(result.answer)
# Generate a podcast
status = await client.artifacts.generate_audio(nb.id)
await client.artifacts.wait_for_completion(nb.id, status.task_id)
output_path = await client.artifacts.download_audio(nb.id, "podcast.mp3")
print(f"Audio saved to: {output_path}")
asyncio.run(main())

NotebookLMClient is async re-entrant on a single event loop. You can freely await multiple operations concurrently via asyncio.gather or asyncio.TaskGroup:
notebooks, sources = await asyncio.gather(
client.notebooks.list(),
client.sources.list(notebook_id),
)

The client is not thread-safe. Do not share a NotebookLMClient across threads or across multiple event loops. Create one client per loop. A loop-affinity guard raises a clear RuntimeError on the authed POST hot path if you do — see the Concurrency contract section below for the full guarantees, non-guarantees, and production patterns.
If we ever provide thread-safety, it will be a versioned, opt-in API change. Do not assume it.
The client must be used as an async context manager to properly manage HTTP connections:
# Canonical idiom (v0.5.0+) - no `await` on `from_storage`.
async with NotebookLMClient.from_storage() as client:
...
# Legacy idiom (deprecated, removed in v1.0) - works but emits
# DeprecationWarning. Drop the `await` to migrate.
async with await NotebookLMClient.from_storage() as client:
...
# Manual management - still works; the await emits DeprecationWarning.
# Migrate to `async with NotebookLMClient.from_storage()` instead.
client = await NotebookLMClient.from_storage()
await client.__aenter__()
try:
...
finally:
await client.__aexit__(None, None, None)

The client requires valid Google session cookies obtained via browser login:
# From storage file (recommended) — use as an async context manager:
async with NotebookLMClient.from_storage() as client:
...
async with NotebookLMClient.from_storage("/path/to/storage_state.json") as client:
...
# From a named profile
async with NotebookLMClient.from_storage(profile="work") as client:
...
# From AuthTokens directly
from notebooklm import AuthTokens
auth = AuthTokens(
cookies={"SID": "...", "HSID": "..."}, # (other cookies elided for brevity)
csrf_token="...",
session_id="..."
)
client = NotebookLMClient(auth)
# AuthTokens also supports profiles (AuthTokens.from_storage is async)
auth = await AuthTokens.from_storage(profile="work")

Building a storage state from existing browser cookies ([cookies] extra):
Install with the optional cookies extra to pull cookies from a locally installed browser via rookiepy — useful for headless environments where you cannot run Playwright (full extras matrix: docs/installation.md#optional-extras-matrix):
pip install "notebooklm-py[cookies]"

import json
import os
import rookiepy
from notebooklm import NotebookLMClient
from notebooklm.auth import (
REQUIRED_COOKIE_DOMAINS,
convert_rookiepy_cookies_to_storage_state,
)
# Pull Google cookies from Chrome (or .firefox(), .edge(), .safari(), .load() for auto-detect).
# REQUIRED_COOKIE_DOMAINS mirrors the CLI's extraction set so rotation, media
# downloads, and Drive flows all have the cookies they need.
raw = rookiepy.chrome(domains=list(REQUIRED_COOKIE_DOMAINS))
storage_state = convert_rookiepy_cookies_to_storage_state(raw)
# Persist for future runs; restrict to owner-only on POSIX since this file holds auth cookies
storage_path = "/path/to/storage_state.json"
with open(storage_path, "w") as f:
json.dump(storage_state, f)
if os.name != "nt":
os.chmod(storage_path, 0o600)
async with NotebookLMClient.from_storage(storage_path) as client:
notebooks = await client.notebooks.list()

convert_rookiepy_cookies_to_storage_state(rookiepy_cookies) converts the
cookie list returned by rookiepy into the storage-state format
NotebookLMClient.from_storage() expects:
- Key remap: `http_only` → `httpOnly`, `expires=None` → `expires=-1` (Playwright's session-cookie convention), `sameSite="None"`.
- Filtering: cookies missing `name` / `value` / `domain`, or from domains outside the auth allowlist (regional Google ccTLDs + `REQUIRED_COOKIE_DOMAINS` ∪ `OPTIONAL_COOKIE_DOMAINS`), are silently skipped.
- Return: `{"cookies": [...], "origins": []}` — drop straight into `storage_state.json`.
Cookie extraction (and Google-account selection) happens in the
rookiepy.<browser>(...) call: the storage state reflects whichever Google
account is currently active in the source browser. To pick up cookies for
optional surfaces (YouTube, Docs, MyAccount, Mail), extend the rookiepy
domains= argument with OPTIONAL_COOKIE_DOMAINS (or a label-specific
subset via OPTIONAL_COOKIE_DOMAINS_BY_LABEL) — both imported from
notebooklm.auth alongside REQUIRED_COOKIE_DOMAINS. The CLI equivalent
is notebooklm login --browser-cookies <browser> [--include-domains youtube,docs,...].
Environment Variable Support:
The library respects these environment variables for authentication:
| Variable | Description |
|---|---|
| `NOTEBOOKLM_HOME` | Base directory for config files (default: `~/.notebooklm`) |
| `NOTEBOOKLM_PROFILE` | Active profile name (default: `default`) |
| `NOTEBOOKLM_AUTH_JSON` | Inline auth JSON - no file needed (for CI/CD) |
Precedence (highest to lowest):
1. Explicit `path` argument to `from_storage()`
2. `NOTEBOOKLM_AUTH_JSON` environment variable
3. Explicit `profile` argument to `from_storage(profile="work")`
4. `NOTEBOOKLM_PROFILE` environment variable (resolves to `~/.notebooklm/profiles/<name>/storage_state.json`)
5. Active profile from `~/.notebooklm/active_profile`
6. `~/.notebooklm/profiles/default/storage_state.json`
7. `~/.notebooklm/storage_state.json` (legacy fallback)
CI/CD Example:
import os
# Set auth JSON from environment (e.g., GitHub Actions secret)
os.environ["NOTEBOOKLM_AUTH_JSON"] = '{"cookies": [...]}'
# Client automatically uses the env var
async with NotebookLMClient.from_storage() as client:
notebooks = await client.notebooks.list()

The library raises RPCError for API failures:
from notebooklm import RPCError
try:
result = await client.notebooks.create("Test")
except RPCError as e:
print(f"RPC failed: {e}")
# Common causes:
# - Session expired (re-run `notebooklm login`)
# - Rate limited (wait and retry)
# - Invalid parameters

All library exceptions inherit from NotebookLMError. RPC/protocol-level
failures live under RPCError; per-domain failures live under
NotebookError, SourceError, ArtifactError, etc. (NetworkError is
deliberately outside RPCError — it represents transport-level failures
that happen before any RPC is dispatched.) The "not found" exceptions
sit at the intersection — they're catchable as any of NotFoundError
(cross-domain umbrella), RPCError, or the domain base:
| Exception | Catchable as |
|---|---|
| `NotebookNotFoundError` | NotFoundError, RPCError, NotebookError, NotebookLMError |
| `SourceNotFoundError` | NotFoundError, RPCError, SourceError, NotebookLMError |
| `ArtifactNotFoundError` | NotFoundError, RPCError, ArtifactError, NotebookLMError |
| `NoteNotFoundError` | NotFoundError, RPCError, NoteError, NotebookLMError |
| `MindMapNotFoundError` | NotFoundError, RPCError, MindMapError, NotebookLMError |
| `ArtifactFeatureUnavailableError` | RPCError, ArtifactError, NotebookLMError |
| `SourceTimeoutError` | WaitTimeoutError, TimeoutError, SourceError, NotebookLMError |
| `ArtifactTimeoutError` | WaitTimeoutError, TimeoutError, ArtifactError, NotebookLMError |
| `ResearchTimeoutError` | WaitTimeoutError, TimeoutError, ResearchError, NotebookLMError |
MindMapNotFoundError is raised by the client.mind_maps mutation paths
(rename) on a missing target (issue #1291). NoteNotFoundError (with its
NoteError domain base) was introduced as the prerequisite type for the
note not-found work in v0.8.0 (issue #1346); as of v0.8.0 it is raised by
client.notes.get(...) on a missing note.
Use the table to pick the right level of catch. As of v0.8.0 (the #1247
flip), client.sources.get(...), client.artifacts.get(...),
client.notes.get(...), and client.mind_maps.get(...) raise the matching
*NotFoundError (SourceNotFoundError / ArtifactNotFoundError /
NoteNotFoundError / MindMapNotFoundError) on a missing entity — matching
client.notebooks.get(...), which raises NotebookNotFoundError. The previous
None-on-miss return (deprecated with a DeprecationWarning through v0.7.0) is
gone; migrate any if result is None: check to try/except <Resource>NotFoundError, or use the paired get_or_none(...) (below) for the
sanctioned None-on-miss contract. See deprecations.md and
issue #1247. client.mind_maps.get(...) was the last
namespace in the #1247 cohort without a runway; use client.mind_maps.get_or_none(...)
for the warning-free None-on-miss contract. If you genuinely want
None-on-miss after the flip, every namespace now offers a paired
get_or_none(...) (client.notebooks.get_or_none(nb_id),
client.sources.get_or_none(nb_id, source_id), and likewise for artifacts,
notes, and mind_maps) — the sanctioned, warning-free None-on-miss lookup.
It returns None for a genuine absence and re-raises transport, auth, and
decode faults rather than swallowing them. (The one documented carve-out is
artifacts, which inherits client.artifacts.list(...)'s deliberate
partial-availability behavior: a transport failure of the mind-map sub-fetch is
logged and the studio artifacts that loaded are still returned — see ADR-0019
Rule 3.) The workflows that
already raise SourceNotFoundError are client.sources.get_fulltext(...) and
client.sources.wait_until_ready(...). Artifact-download workflows raise
ArtifactNotFoundError when a requested artifact ID is not in the listing.
Artifact generation workflows may raise ArtifactFeatureUnavailableError
when NotebookLM accepts the RPC but returns no generation task for a specific
artifact feature. For infographic generation, a null CREATE_ARTIFACT result
is reported this way instead of surfacing as schema drift or a failed
GenerationStatus.
client.artifacts.wait_for_completion(...) raises
ArtifactPendingTimeoutError when a task stays queued and never reaches
in_progress, or ArtifactInProgressTimeoutError when it starts but does not
finish before timeout. Both subclass ArtifactTimeoutError and built-in
TimeoutError. The exception exposes task_id, notebook_id,
timeout_seconds, last_status, stalled_phase, status_history, and
status_transitions so callers can retry, fail soft, or log upstream queueing
patterns without parsing the message.
The CLI defaults to longer wait budgets for media generation (audio: 1200s,
video: 1800s, cinematic-video: 3600s). In Python, pass the same budget
explicitly with wait_for_completion(..., timeout=...).
WaitTimeoutError (added in v0.7.0) is the cross-domain umbrella for every
wait_* / polling timeout. It mixes in the built-in TimeoutError, so
existing except TimeoutError clauses keep working unchanged, and it is the
common base of SourceTimeoutError, ArtifactTimeoutError (and its
ArtifactPendingTimeoutError / ArtifactInProgressTimeoutError subclasses),
and ResearchTimeoutError. Catch it once to handle a wait timeout from any
domain in a single clause:
from notebooklm import WaitTimeoutError
try:
ready = await client.sources.wait_until_ready(nb_id, src_id)
status = await client.artifacts.wait_for_completion(nb_id, task_id)
result = await client.research.wait_for_completion(nb_id, research_task_id)
except WaitTimeoutError as exc:
# Catches SourceTimeoutError, ArtifactTimeoutError, ResearchTimeoutError.
log.warning("wait timed out: %s", exc)

ResearchAPI.wait_for_completion previously raised the bare built-in
TimeoutError; it now raises ResearchTimeoutError, which is a
WaitTimeoutError (and therefore still a TimeoutError), so the change is
backward-compatible. The poll cadence keyword on that method is now
initial_interval= (matching the source/artifact waiters); the old interval=
keyword still works with a DeprecationWarning and is removed in v0.8.0 — see
deprecations.
NotFoundError is the cross-domain umbrella. Catch it to handle any
"resource not found" case uniformly:
from notebooklm import NotFoundError
try:
notebook = await client.notebooks.get(nb_id)
source = await client.sources.wait_until_ready(nb_id, src_id)
await client.artifacts.download_audio(nb_id, dest, audio_id)
except NotFoundError as e:
# Catches NotebookNotFoundError, SourceNotFoundError,
# and ArtifactNotFoundError uniformly.
print(f"Missing resource: {e}")

Methods that raise a *NotFoundError on not-found include every namespace
get() (as of v0.8.0 — client.notebooks.get, client.sources.get,
client.artifacts.get, client.notes.get, client.mind_maps.get),
client.sources.get_fulltext, client.sources.wait_until_ready, and the
artifact download paths. For a None-on-miss lookup that does not trigger the
umbrella, use the paired get_or_none(...).
Python checks except clauses top to bottom. To get distinct handlers for
"missing resource" vs other RPC failures, list the specific subclass first:
from notebooklm import (
ArtifactNotFoundError,
NotebookNotFoundError,
RPCError,
SourceNotFoundError,
)
try:
fulltext = await client.sources.get_fulltext(notebook_id, source_id)
except SourceNotFoundError:
# Specific handler runs first.
...
except RPCError:
# Catches every other RPC failure: auth, rate limit, decode, etc.
...

v0.6.0 BREAKING CHANGE. Before v0.6.0, only `NotebookNotFoundError` mixed in `RPCError`; `SourceNotFoundError` and `ArtifactNotFoundError` did not. In 0.5.x, `except RPCError` did NOT catch a missing source or artifact, so a downstream `except SourceNotFoundError` / `except ArtifactNotFoundError` clause caught it instead. In 0.6.0, `except RPCError` now catches all three uniformly — if it's listed first, any downstream `*NotFoundError` clauses become unreachable. Reorder your `except` clauses to put the specific exceptions first.
Automatic Refresh: The client automatically refreshes CSRF tokens when authentication errors are detected. This happens transparently during any API call - you don't need to handle it manually.
When an RPC call fails with an auth error (HTTP 401/403 or auth-related message):
- The client fetches fresh tokens from the NotebookLM homepage
- Waits briefly to avoid rate limiting
- Retries the failed request automatically
Manual Refresh: For proactive refresh (e.g., before a long-running operation):
async with NotebookLMClient.from_storage() as client:
# Manually refresh CSRF token and session ID
await client.refresh_auth()

Note: If your session cookies have fully expired (not just CSRF tokens), you'll need to re-run notebooklm login.
Probe-then-retry for create operations. When a network or server error (5xx / 429 / connection drop) interrupts a create call, the client surfaces the failure immediately rather than blindly retrying. For the methods listed below, the client then probes the server to discover whether the resource was already created before attempting a retry. This prevents duplicate resources when the server accepted the request but the response was lost in transit. The probe runs automatically — no opt-in keyword is required.
The following methods are idempotent under retry:
| Method | Probe |
|---|---|
| `client.notebooks.create(title)` | Snapshot notebook IDs before, list after a transport failure, return the single new notebook with the matching title (or raise on ambiguity). |
| `client.sources.add_url(notebook_id, url)` | List the notebook's sources, return the existing source whose url exactly matches. |
| `client.sources.add_url(notebook_id, youtube_url)` | Same probe via canonical YouTube URL. |
client.sources.add_text(notebook_id, title, content) is not retry-safe: text sources lack a reliable server-side dedupe key (titles aren't unique; content isn't exposed in the source list). The default behavior is unchanged from previous releases. If you want explicit failure rather than possible silent duplication on retry, opt in:
from notebooklm import NonIdempotentRetryError
try:
await client.sources.add_text(nb_id, "Title", "Content", idempotent=True)
except NonIdempotentRetryError:
# Embed a UUID in the title and dedupe client-side instead.
...

client.sources.add_file(...) and client.sources.add_drive(...) are now also covered by the probe-then-create wrapper: the create RPC runs with disable_internal_retries=True and, on transport failure, the wrapper probes the server-side source list (via idempotent_create) before deciding whether to retry — so transient failures no longer produce duplicate sources. See _source/add.py (SourceAddService.add_drive) and _source/upload.py (SourceUploadPipeline.register_file_source) for the implementation.
This section is the canonical answer to "is NotebookLMClient safe to use from
multiple coroutines / threads / processes / event loops?" The concurrency model
documented here has been hardened to support high-concurrency programmatic
clients (long-running agents, parallel asyncio.gather over many notebooks,
multi-process fleets).
If you only read one subsection, read Non-guarantees — the guard rails are narrow.
Per-loop async safety. A NotebookLMClient instance is bound to the
event loop on which it was opened. A loop-affinity guard checks
the active loop on the authed POST hot path — rpc_call() →
query_post() → _perform_authed_post() — and raises a clear RuntimeError
when the instance is re-used from a different loop. Scope limitation: the
guard fires on the hot path only. ChatAPI.ask adds its own
assert_bound_loop() check as its first statement, so cross-loop chat raises the
same friendly loop-affinity RuntimeError. One cold path remains:
- `close()` awaits `save_cookies` + `aclose` and never routes through `_perform_authed_post` or a loop guard; a cross-loop close gets a deep asyncio `RuntimeError` — opaque, not the friendly loop-affinity message.
Best practice: one client per loop, full stop.
Refresh deduplication. Concurrent RPCs that all
trigger a token refresh share a single underlying refresh attempt via
_refresh_lock + asyncio.shield. Waiter cancellation does not kill the
shared refresh task; the next caller in line picks up the finished tokens.
Request-ID monotonicity. next_reqid() returns a monotonic
sequence across concurrent coroutines on the same client. Guarded by
_reqid_lock.
Per-attempt and across-attempt auth snapshot atomicity.
_auth_snapshot_lock serializes AuthSnapshot reads against the
refresh-side mutation block — without this, a token refresh that
completed between the URL-build step and the POST step could produce a
URL stitched together from a mix of pre- and post-refresh credentials
(stale session_id, fresh authuser, etc.), which Google rejects with
an opaque auth error. _build_url consumes the snapshot rather than
reading live session_id / authuser / account_email fields, so the
URL and the headers come from a single consistent auth tuple. (This
obsoletes the warning in the older "Concurrency model" subsection
above.)
Idempotent create RPCs. The following calls are
idempotent under retry via probe-then-create (when idempotent=True,
which is the default):
- `client.notebooks.create(title)`
- `client.sources.add_url(notebook_id, url)` (YouTube URLs are auto-detected and routed through the YouTube source pathway internally)
client.sources.add_text(notebook_id, title, content) is declared
non-idempotent: text sources lack a reliable server-side dedupe key
(Google permits duplicate titles, and content is not exposed in source
listings). With idempotent=True it raises NonIdempotentRetryError. If
you set disable_internal_retries=True on the client, the probe-then-retry
wrapper is skipped entirely and the caller is responsible for retry
semantics.
Cancellation safety. Several paths are now shielded against cancellation:
- `close()` is shielded; Ctrl-C during shutdown will not leak the underlying `httpx.AsyncClient`.
- `refresh_auth()` runs the shared refresh task under `asyncio.shield`; cancelling a waiter does not kill the shared refresh.
- Upload finalize is shielded; on cancel signal we issue a best-effort Scotty (Google's internal resumable upload service) cancel to release the server-side upload slot.
- `notes.create` shields the `UPDATE_NOTE` finalize step and cleans up the partial note on cancel.
- `wait_for_sources` cancels sibling pollers on the first poller's failure rather than letting them race to emit error messages.
- `wait_for_completion` uses a leader/follower polling-dedupe registry with a shielded leader task — follower cancellation does not kill the leader's poll.
Idempotent file uploads. SourcesAPI.add_file closes its file handle
under a TOCTOU-safe path and gates concurrent uploads via the
max_concurrent_uploads semaphore so a large fan-out can't exhaust the
per-process file descriptor limit.
NOT thread-safe. A NotebookLMClient instance must not be shared
across OS threads. The internal locks (_refresh_lock, _reqid_lock,
_auth_snapshot_lock) are asyncio.Lock instances and do not protect
against concurrent OS-thread access. If you need a client per thread,
construct one per thread.
NOT reusable across event loops. Per the loop-affinity guard above,
the hot path raises RuntimeError when an instance is re-used on a
different loop. Cold paths (next_reqid(), close()) raise an opaque
asyncio RuntimeError instead — same outcome, less helpful message.
ChatAPI._cache is per-instance. Chat-conversation IDs cached
inside a NotebookLMClient (on the client.chat sub-client) are not
shared across clients in the same process and never persisted across
processes. Two clients pointed at the same notebook will not share
follow-up context.
Cookies in storage are eventually-consistent across processes. When
multiple processes share a storage path, an OS-level file lock plus a
snapshot/delta merge (see docs/auth-cookie-lifecycle.md §3.4) keep concurrent
writers from corrupting the file. They may, however, observe brief
staleness — a write committed by process A may not be visible to a
sibling read in process B until the next refresh cycle. Within a single
process, in-process dedupe ensures only one keepalive task runs per
canonicalized storage path.
One client per app, dependency-injected. A NotebookLMClient is
designed to be a long-lived process resource. In FastAPI, attach it to
the app lifespan:
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends, Request
from notebooklm import NotebookLMClient
@asynccontextmanager
async def lifespan(app: FastAPI):
async with NotebookLMClient.from_storage() as client:
app.state.notebooklm = client
yield
# client.close() happens via __aexit__
def get_client(request: Request) -> NotebookLMClient:
return request.app.state.notebooklm
app = FastAPI(lifespan=lifespan)
@app.get("/notebooks")
async def list_notebooks(client: NotebookLMClient = Depends(get_client)):
return await client.notebooks.list()

Constraint: FastAPI runs on a single event loop per worker, so one client
per worker is correct. If you run multiple Uvicorn workers, each worker
owns its own client. Do not stash a NotebookLMClient on a
process-global outside the lifespan — multi-worker servers fork the
process and you will end up with the same client object referencing
different event loops.
ConnectionLimits tuning. The HTTP pool defaults
(max_connections=100, max_keepalive_connections=50,
keepalive_expiry=30.0) are sized for typical batchexecute fan-out: a
few dozen concurrent RPCs against a single host with keep-alives held
for an interactive session. Tune via notebooklm.types.ConnectionLimits:
from notebooklm import NotebookLMClient
from notebooklm.types import ConnectionLimits
limits = ConnectionLimits(
max_connections=200, # widen the pool for a heavy worker
max_keepalive_connections=100,
keepalive_expiry=60.0,
)
client = NotebookLMClient(auth, limits=limits, max_concurrent_rpcs=64)

For single-request CLI workloads the defaults are wasteful but harmless.
max_concurrent_rpcs knob. A semaphore at
_perform_authed_post caps simultaneous in-flight RPC POSTs. Default
16 — well below the default pool size so short-lived helper requests
(refresh GETs, upload preflights) still have pool headroom. Pass None
to opt out entirely (e.g. when an external rate-limiter handles
back-pressure). The backoff for 429 / 5xx retries is held inside the
semaphore for a circuit-breaker effect: a slow request keeps its slot
while it waits, so the gate naturally throttles fan-out when the server
is unhappy.
Worst-case slot hold time:
| Path | Bound | Default |
|---|---|---|
| 429 retry loop | `rate_limit_max_retries × MAX_RETRY_AFTER_SECONDS` | 3 × 300 = 900s |
| 5xx / network retry loop | `server_error_max_retries × 30s` (capped backoff) | 3 × 30 = 90s |
If your workload's tail latency is sensitive, lower
rate_limit_max_retries or tighten the semaphore — slot hold time on
the 429 path is the load-bearing variable.
Constraint (enforced at construction): max_concurrent_rpcs ≤ ConnectionLimits.max_connections. A higher RPC ceiling than the pool
capacity would let the semaphore admit requests the pool can't fulfill,
producing opaque httpx.PoolTimeout errors instead of clean
back-pressure. The NotebookLMClient.__init__ / from_storage()
constructor raises ValueError if this constraint is violated. The
semaphore floor (max_concurrent_rpcs ≥ 1 when not None) is enforced
inside Session.
max_concurrent_uploads knob. Default 4. Gates
file-upload streaming independently from the RPC throttle because
uploads use their own httpx.AsyncClient (Scotty endpoint) and do not
share the RPC connection pool. The motivation is FD exhaustion: each
in-flight upload holds one open file descriptor for the duration of the
upload, so an unbounded fan-out blows the per-process FD limit. None
resolves to the default (4); truly unbounded uploads are intentionally
not supported. Must be ≥ 1 when set explicitly.
Rate-limit retry defaults. rate_limit_max_retries=3,
server_error_max_retries=3. The 429 path honors the Retry-After
header when parseable (clamped at MAX_RETRY_AFTER_SECONDS = 300s);
when the header is absent or unparseable, the loop falls back to
exponential backoff min(2^attempt, 30) seconds with ±20% jitter
(where attempt starts at 0, so the first retry sleeps ~1 s ± 20%
before doubling), matching the 5xx path. Set either to 0 to restore
the pre-retry-loop behavior of raising RateLimitError / ServerError
immediately.
Observability hooks. The client exposes stdlib-only observability so applications can choose their own metrics backend:
from notebooklm import NotebookLMClient, correlation_id
events = []
async with NotebookLMClient.from_storage(on_rpc_event=events.append) as client:
with correlation_id("batch-import-42"):
await client.notebooks.list()
snapshot = client.metrics_snapshot()
print(snapshot.rpc_calls_succeeded, snapshot.rpc_queue_wait_seconds_max)

on_rpc_event receives an RpcTelemetryEvent for each logical RPC
completion. metrics_snapshot() returns cumulative counters for RPC
success/failure, retry counts, semaphore queue waits, upload queue waits,
and internal lock wait time. The package does not depend on Prometheus or
OpenTelemetry; forward these values to whichever backend your service uses.
Graceful shutdown. Long-lived services can stop admitting new client operations and wait for in-flight operations before closing:
await client.close(drain=True, drain_timeout=30.0)

client.drain(timeout=...) is also available when your framework owns
transport shutdown separately. Once drain starts, new operations raise
RuntimeError; if the timeout expires, the client remains in draining mode.
close(drain=True, ...) still closes the transport after a drain timeout and
then re-raises the timeout.
Upload-timeout configuration. client.sources.add_file(...)
and the related upload entry points accept an upload_timeout argument
that is decoupled from the global timeout. A long-running upload of
a large file should not have to widen the global HTTP timeout to
succeed; pass upload_timeout=600.0 (or larger) to the relevant call
sites instead.
For a service that handles multiple NotebookLM tenants (different
AuthTokens, typically one per user), spin up one
NotebookLMClient per tenant. There is no cross-tenant
ChatAPI._cache bleed (the cache is per-instance), and the
loop-affinity guard plus the per-instance refresh state means tenants
cannot accidentally observe each other's auth.
Cookie storage paths must be canonicalized so two clients pointing at the same logical storage file don't run racing keepalive loops; the keepalive code path handles this automatically.
These validations run in NotebookLMClient.__init__ /
NotebookLMClient.from_storage(). All raise ValueError:
- `max_concurrent_rpcs ≤ ConnectionLimits.max_connections` when both are set (skipped when either is `None`).
- `max_concurrent_rpcs ≥ 1` when not `None`.
- `max_concurrent_uploads ≥ 1` when not `None`.
- `rate_limit_max_retries ≥ 0`.
- `server_error_max_retries ≥ 0`.
- `keepalive` must be `None` or a positive finite number; values below `keepalive_min_interval` (default 60s) are clamped up to that floor.
Kernel owns the httpx.AsyncClient; NotebookLMClient constructs the
runtime graph and owns the public surface. Per the
ADR-0010 split, Kernel.__init__ in
src/notebooklm/_kernel.py constructs the httpx.AsyncClient and is
responsible for closing it on aclose(). _runtime/init.py constructs
the collaborator bundle, RuntimeTransport, middleware chain, and
RpcExecutor, then binds them into ClientComposed. The supporting state
(metrics, drain bookkeeping, request-id counter, transport plumbing,
conversation cache, etc.) is split across single-responsibility runtime
and kernel collaborator modules such as notebooklm._rpc_executor,
notebooklm._transport_drain, and notebooklm._transport_errors. The
split is internal — module-level constants and helpers live in canonical
seam modules (_runtime_config, _runtime_helpers, _error_injection,
_request_types, _transport_errors, _streaming_post) and are imported
from those modules directly. The historical notebooklm._core
compatibility shim was removed in v0.5.0.
| Module | Owns | Notes |
|---|---|---|
_client_composed |
ClientComposed: bound runtime holder for transport, executor, middleware chain metadata, and the collaborator bundle. |
The composition root binds this once; public methods read the bound collaborators from the client. |
_kernel |
Concrete Kernel transport core; owns the httpx.AsyncClient (constructed in Kernel.__init__, closed in Kernel.aclose()) and the cookie jar. |
Pure transport surface (see Kernel Protocol in _runtime_contracts). |
_runtime_init |
Client composition root helpers: constructor validation, collaborator construction, RuntimeTransport, middleware chain, and RpcExecutor wiring. |
NotebookLMClient calls this during construction and stores the result directly. |
_runtime_transport |
Authenticated transport leg used by RpcExecutor and the middleware chain terminal. |
Routes through Kernel.post and centralizes request-envelope materialization. |
_runtime_config |
Module-level constants: DEFAULT_TIMEOUT, DEFAULT_CHAT_TIMEOUT, DEFAULT_KEEPALIVE_MIN_INTERVAL, DEFAULT_MAX_CONCURRENT_RPCS, DEFAULT_MAX_CONCURRENT_UPLOADS, CORE_LOGGER_NAME, normalize_max_concurrent_uploads. |
Pure constants; importable without side effects. |
_runtime_helpers |
is_auth_error, AUTH_ERROR_PATTERNS, _resolve_keepalive_interval. |
Cross-seam pure helpers; behaviour-bearing (and therefore unit-tested). |
_error_injection |
ERROR_INJECT_ENV_VAR, _get_error_injection_mode, _refuse_synthetic_error_outside_test_context. |
Env-var resolver + startup guard for the synthetic-error harness. |
_runtime_auth |
AuthRefreshCoordinator: refresh-task lifecycle, refresh lock, AuthSnapshot rotation. |
Lazy asyncio.Lock construction; never instantiated outside a running loop. |
_conversation_cache |
Per-instance true-LRU _conversation_cache for ChatAPI continuity; bounds the conversation count and the turns retained per conversation. |
Pure in-process state; not shared across client instances. |
_cookie_persistence |
Cookie-jar → storage-state serialization, __Secure-1PSIDTS rotation. |
Exposes a SaveCookiesToStorage Protocol host. |
_transport_drain |
TransportDrainTracker: in-flight transport counters, _TransportOperationToken, lazy asyncio.Condition powering client.drain(...). |
Construction is event-loop-agnostic; the Condition is allocated on first use. |
_runtime_lifecycle |
ClientLifecycle: loop-affinity guard, aclose plumbing, keepalive task wiring. |
Client lifecycle collaborator. |
_client_metrics |
ClientMetrics: ClientMetricsSnapshot counters, _metrics_lock, on_rpc_event callback, queue-wait recorders. |
__init__ is event-loop-agnostic; emit_rpc_event is async and intentionally awaits the user callback (back-pressure). |
_polling_registry |
Pending-poll registry shared by long-running artifact generations. | Used by artifacts to coordinate and cancel pending polls. |
_reqid_counter |
ReqidCounter: monotonic _reqid for the chat backend, lazy asyncio.Lock for concurrent ChatAPI.ask callers. |
Baseline _value=100000, default step=100000 — both are chat-API contract values; do not change. |
_rpc_executor |
RPC dispatch executor; exposes DecodeResponse Protocol so callers can be unit-tested against a stub. |
NotebookLMClient.rpc_call dispatches here directly. |
_request_types |
AuthSnapshot, BuildRequest, BuildRequestResult, and request materialization helpers. |
Shared request Interface for RPC, chat, auth refresh, and the chain terminal. |
_transport_errors |
Transport exceptions, Retry-After parsing, and raw Kernel.post error mapping. |
Keeps terminal error mapping out of Kernel callers and lets the middleware chain consume a narrow exception Interface. |
_streaming_post |
Streaming POST helper with the response-size cap. | Keeps low-level buffered HTTP read behavior local to the Kernel.post implementation. |
Feature APIs depend on narrow per-capability Protocols defined in
notebooklm._runtime_contracts rather than on a broad runtime facade.
ChatAPI, ArtifactsAPI, and SourceUploadPipeline each take
their direct collaborators by keyword-only constructor argument. The
feature-local composite-runtime Protocols (ChatRuntime,
ArtifactsRuntime, UploadRuntime) and their adapter dataclasses that
previously bundled three collaborators apiece were retired once it was
clear they only hid three stable collaborators with one production
satisfier.
See ADR-0013 and
docs/architecture.md for the rationale and the
post-v0.5.0 collaborator graph.
If you previously imported from notebooklm._core modules, see
docs/refactor-history.md for the
Tier 12 → Tier 13 rename table. The notebooklm._core compatibility
shim was removed in v0.5.0; first-party callers should import directly
from the canonical seam modules (_runtime_config, _runtime_helpers,
_request_types, _transport_errors, _streaming_post, _error_injection,
_transport_drain, etc.).
Main client class providing access to all APIs.
class NotebookLMClient:
notebooks: NotebooksAPI # Notebook operations
sources: SourcesAPI # Source management
artifacts: ArtifactsAPI # Artifact operations (audio, video, reports, etc.)
chat: ChatAPI # Conversations
research: ResearchAPI # Web/Drive research
notes: NotesAPI # User notes
settings: SettingsAPI # User settings (language, etc.)
sharing: SharingAPI # Notebook sharing
labels: LabelsAPI # Source labels (topic grouping)
auth: AuthTokens # Current authentication tokens
is_connected: bool # Connection state
@classmethod
def from_storage(
cls, path: str | None = None, timeout: float = 30.0,
profile: str | None = None,
keepalive: float | None = None,
keepalive_min_interval: float = 60.0,
rate_limit_max_retries: int = 3,
server_error_max_retries: int = 3,
limits: ConnectionLimits | None = None,
max_concurrent_uploads: int | None = DEFAULT_MAX_CONCURRENT_UPLOADS, # 4
max_concurrent_rpcs: int | None = DEFAULT_MAX_CONCURRENT_RPCS, # 16
upload_timeout: httpx.Timeout | None = None,
on_rpc_event: Callable[[RpcTelemetryEvent], object] | None = None,
chat_timeout: float | None = DEFAULT_CHAT_TIMEOUT, # 180
) -> "_FromStorageContext":
# Returns an awaitable async-context-manager wrapper. Use as
# `async with NotebookLMClient.from_storage(...) as client:`.
# Awaiting it directly (legacy) emits DeprecationWarning;
# removed in v1.0.
def __init__(
self, auth: AuthTokens, timeout: float = 30.0,
storage_path: Path | None = None,
keepalive: float | None = None,
keepalive_min_interval: float = 60.0,
rate_limit_max_retries: int = 3,
server_error_max_retries: int = 3,
limits: ConnectionLimits | None = None,
max_concurrent_uploads: int | None = DEFAULT_MAX_CONCURRENT_UPLOADS, # 4
max_concurrent_rpcs: int | None = DEFAULT_MAX_CONCURRENT_RPCS, # 16
upload_timeout: httpx.Timeout | None = None,
on_rpc_event: Callable[[RpcTelemetryEvent], object] | None = None,
cookie_saver: CookieSaver | None = None,
cookie_rotator: CookieRotator | None = None,
chat_timeout: float | None = DEFAULT_CHAT_TIMEOUT, # 180
):
async def refresh_auth(self) -> AuthTokens:
async def rpc_call(
self,
method: RPCMethod,
params: list[Any],
allow_null: bool = False,
*,
disable_internal_retries: bool = False,
) -> Any:
RPCMethod is imported from notebooklm.rpc for raw-RPC calls; Any is
typing.Any. The default-shape call (client.rpc_call(method, params))
forwards to the underlying RpcExecutor.rpc_call with its canonical
defaults.
Removed in v0.6.0. The three previously-deprecated kwargs (source_path, _is_retry, operation_variant) were removed after their v0.5.0 deprecation cycle. The default-shape call (client.rpc_call(method, params)) is unchanged. There is no public replacement for the internal-only _is_retry / operation_variant kwargs; callers that need a non-"/" source_path should request a typed sub-client method rather than reach across this wrapper. See docs/deprecations.md for the canonical removal table.
Long-lived clients: pass keepalive=<seconds> to spawn a background task
that periodically pokes accounts.google.com and persists any rotated
__Secure-1PSIDTS cookie to storage_state.json. This keeps a worker /
agent / long-running async with block from silently staling out. Disabled
by default (keepalive=None). Values below keepalive_min_interval (default
60.0) are clamped up to that floor. See Cookie freshness for long-running
/ unattended use
for the full layered story.
Retry behavior: the client retries transient failures transparently.
- server_error_max_retries (default 3) retries HTTP 5xx and network-layer httpx.RequestError (timeouts, connect errors) with exponential backoff capped at 30 seconds (min(2 ** attempt, 30), plus ±20% jitter to desynchronize concurrent retries). Set to 0 to disable.
- rate_limit_max_retries (default 3) retries HTTP 429 responses. Each retry sleeps for the server's Retry-After value when parseable; otherwise the loop falls back to the same capped-exponential-backoff schedule used for 5xx (min(2 ** attempt, 30) seconds with ±20% jitter) so the positive default is still useful when Google omits the hint. Set to 0 to raise RateLimitError immediately (e.g. when the calling code implements its own bespoke back-off policy). Mutating create RPCs (notebooks.create, sources.add_url) opt out of this loop via disable_internal_retries so the API-layer idempotent_create probe-then-retry wrapper can own recovery for mutating calls.
- limits accepts a ConnectionLimits dataclass to tune the underlying httpx connection pool. The default (ConnectionLimits()) sets max_connections=100, max_keepalive_connections=50, keepalive_expiry=30.0 — sized for typical batchexecute fan-out. Widen for heavy concurrent workloads such as FastAPI/Django services that share one client across many requests.
from notebooklm import ConnectionLimits, NotebookLMClient
# Default ``rate_limit_max_retries=3`` is on; widen the pool for a heavy worker
async with NotebookLMClient.from_storage(
limits=ConnectionLimits(max_connections=200, max_keepalive_connections=100),
) as client:
...
# Opt out of automatic 429 retries (e.g. for a bespoke back-off layer)
async with NotebookLMClient.from_storage(rate_limit_max_retries=0) as client:
...
v0.7.0 breaking change — delete() / rename() returns (issues #1211, #1255). Applies to notebooks, sources, artifacts, notes, and mind_maps:
- delete() returns None (was a hardcoded True). True → None flips truthy → falsy, so "if await client.X.delete(id): ..." no longer enters its block — drop the if and call delete() for its effect.
- delete() is idempotent: deleting an already-absent target succeeds (returns None) and does not raise *NotFoundError; real failures (403/5xx/auth/transport) still raise. Use get() first to assert existence.
- rename() returns the renamed object and raises *NotFoundError (MindMapNotFoundError for mind maps) on a missing target. Pass return_object=False to skip the hydrate re-fetch and return None. For notebooks / sources / artifacts, missing-target detection rides on that hydrate re-fetch, so return_object=False also skips it (a missing target does not raise under the opt-out). Mind maps are the exception: they detect absence via a content/list lookup before dispatching the rename RPC (never a transport 404), so mind_maps.rename raises MindMapNotFoundError on a missing target even with return_object=False.
CLI equivalent: Notebook Commands — notebooklm list, create, delete, rename, summary.
| Method | Parameters | Returns | Description |
|---|---|---|---|
list() |
- | list[Notebook] |
List all notebooks |
create(title) |
title: str |
Notebook |
Create a notebook |
get(notebook_id) |
notebook_id: str |
Notebook |
Get notebook details |
delete(notebook_id) |
notebook_id: str |
None |
Delete a notebook (idempotent; returns None whether or not it existed) |
rename(notebook_id, new_title) |
notebook_id: str, new_title: str |
Notebook |
Rename a notebook (re-fetched; raises NotebookNotFoundError if missing) |
get_description(notebook_id) |
notebook_id: str |
NotebookDescription |
Get AI summary and topics |
get_metadata(notebook_id) |
notebook_id: str |
NotebookMetadata |
Get notebook metadata and sources |
get_summary(notebook_id) |
notebook_id: str |
str |
Get raw summary text |
share(notebook_id, public=True, artifact_id=None) |
notebook_id: str, bool, str | None |
dict |
Deprecated; use client.sharing.set_public() for notebook-level public sharing |
get_share_url(notebook_id, artifact_id=None) |
notebook_id: str, str | None |
str |
Get a share URL |
remove_from_recent(notebook_id) |
notebook_id: str |
None |
Remove from recently viewed |
get_raw(notebook_id) |
notebook_id: str |
Any |
Get raw API response data |
Example:
# List all notebooks
notebooks = await client.notebooks.list()
for nb in notebooks:
print(f"{nb.id}: {nb.title} ({nb.sources_count} sources)")
# Create and rename
nb = await client.notebooks.create("Draft")
nb = await client.notebooks.rename(nb.id, "Final Version")
# Get AI-generated description (parsed with suggested topics)
desc = await client.notebooks.get_description(nb.id)
print(desc.summary)
for topic in desc.suggested_topics:
print(f" - {topic.question}")
# Get raw summary text (unparsed)
summary = await client.notebooks.get_summary(nb.id)
print(summary)
# Get metadata for automation or exports
metadata = await client.notebooks.get_metadata(nb.id)
print(metadata.title)
# Enable public sharing and fetch the URL
await client.sharing.set_public(nb.id, public=True)
url = await client.notebooks.get_share_url(nb.id)
print(url)
get_summary vs get_description:
- get_summary() returns the raw summary text string
- get_description() returns a NotebookDescription object with the parsed summary and a list of SuggestedTopic objects for suggested questions
CLI equivalent: Source Commands — notebooklm source add, list, get, fulltext, guide, rename, refresh, delete, wait.
| Method | Parameters | Returns | Description |
|---|---|---|---|
list(notebook_id, strict=False) |
notebook_id: str, strict: bool = False |
list[Source] |
List sources |
get(notebook_id, source_id) |
str, str |
Source | None |
Get source details (returns None if not found) |
get_fulltext(notebook_id, source_id, *, output_format="text") |
str, str, *, output_format: Literal["text", "markdown"] |
SourceFulltext |
Get full content; "markdown" requires the optional markdownify extra |
get_guide(notebook_id, source_id) |
str, str |
SourceGuide |
Get AI-generated summary + keywords. Use attribute access (guide.summary); legacy guide["summary"] dict-subscript still works (with a DeprecationWarning) until v0.8.0. |
add_url(notebook_id, url, *, wait=False, wait_timeout=120.0) |
str, str, *, bool, float |
Source |
Add URL source (autodetects YouTube URLs and routes them appropriately). wait / wait_timeout are keyword-only (the positional-wait shim was removed in v0.7.0). |
add_text(notebook_id, title, content, *, wait=False, wait_timeout=120.0, idempotent=False) |
str, str, str, *, bool, float, bool |
Source |
Add text content. wait / wait_timeout are keyword-only (the positional-wait shim was removed in v0.7.0). |
add_file(notebook_id, file_path, mime_type=None, *, wait=False, wait_timeout=120.0, title=None, on_progress=None) |
str, str | Path, str | None, *, bool, float, str | None, Callable | None |
Source |
Upload file. mime_type is a supported parameter — it overrides filename-extension inference to set the resumable-upload content-type header (omit it to infer from the extension). wait / wait_timeout are keyword-only (the positional-wait shim was removed in v0.7.0). title sets the display name via a post-upload UPDATE_SOURCE and forces a brief registration wait even when wait=False. on_progress(bytes_sent, total_bytes) may be sync or async. |
add_drive(notebook_id, file_id, title, mime_type="application/vnd.google-apps.document", *, wait=False, wait_timeout=120.0) |
str, str, str, str, *, bool, float |
Source |
Add Google Drive doc. mime_type defaults to Google Docs; override for Slides/Sheets/PDF via DriveMimeType (see notebooklm.types). wait / wait_timeout are keyword-only (the positional-wait shim was removed in v0.7.0). |
rename(notebook_id, source_id, new_title, *, return_object=True) |
str, str, str, *, bool |
Source | None |
Rename source (prefers the UPDATE_SOURCE echo, else re-fetched; raises SourceNotFoundError if missing). return_object=False returns None without hydrating. |
refresh(notebook_id, source_id) |
str, str |
bool |
Refresh URL/Drive source |
check_freshness(notebook_id, source_id) |
str, str |
bool |
Check whether the source is still fresh (True = up to date; False = needs refresh) |
delete(notebook_id, source_id) |
str, str |
None |
Delete source (idempotent; returns None whether or not it existed) |
wait_until_ready(notebook_id, source_id, timeout=120.0, ...) |
str, str, float, ... |
Source |
Poll until status == READY (fully processed). Raises SourceTimeoutError/SourceProcessingError/SourceNotFoundError. |
wait_until_registered(notebook_id, source_id, timeout=30.0, ...) |
str, str, float, ... |
Source |
Poll until the source is visible server-side (any non-ERROR status). Completes quickly (seconds for typical sources); intended for narrow follow-up RPCs (e.g. UPDATE_SOURCE) that only require registration, not full processing. |
wait_for_sources(notebook_id, source_ids, timeout=120.0, **kwargs) |
str, list[str], float, ... |
list[Source] |
Wait for multiple sources to become ready in parallel. Per-source timeout; **kwargs are forwarded to wait_until_ready. |
Example:
from pathlib import Path
# Add various source types
await client.sources.add_url(nb_id, "https://example.com/article")
await client.sources.add_url(nb_id, "https://youtube.com/watch?v=...") # YouTube URLs autodetected
await client.sources.add_text(nb_id, "My Notes", "Content here...")
await client.sources.add_file(nb_id, Path("./document.pdf"))
# Upload a file with a custom display title (rename happens after upload via
# UPDATE_SOURCE — a brief registration wait runs even when wait=False so the
# rename can land). The mime_type kwarg is optional: omit it to infer the
# content-type from the filename extension, or pass it to override inference.
await client.sources.add_file(nb_id, Path("./document.pdf"), title="Q4 Strategy Memo")
# Wait for several uploads to finish processing in parallel
ids = [
(await client.sources.add_url(nb_id, "https://example.com/a")).id,
(await client.sources.add_url(nb_id, "https://example.com/b")).id,
]
ready = await client.sources.wait_for_sources(nb_id, ids, timeout=180)
# Narrow wait: only block until the source is visible server-side (not fully
# processed). Use this before follow-up RPCs like UPDATE_SOURCE.
registered = await client.sources.wait_until_registered(nb_id, ids[0])
# List and manage
sources = await client.sources.list(nb_id)
for src in sources:
print(f"{src.id}: {src.title} ({src.kind})")
await client.sources.rename(nb_id, src.id, "Better Title")
await client.sources.refresh(nb_id, src.id) # Re-fetch URL content
# Check if a source needs refreshing (content changed)
is_fresh = await client.sources.check_freshness(nb_id, src.id)
if not is_fresh:
await client.sources.refresh(nb_id, src.id)
# Get full indexed content (what NotebookLM uses for answers)
fulltext = await client.sources.get_fulltext(nb_id, src.id)
print(f"Content ({fulltext.char_count} chars): {fulltext.content[:500]}...")
# Get AI-generated summary and keywords (returns a typed SourceGuide)
guide = await client.sources.get_guide(nb_id, src.id)
print(f"Summary: {guide.summary}")
print(f"Keywords: {guide.keywords}")
# Legacy guide["summary"] dict-subscript still works (with a
# DeprecationWarning) until v0.8.0; prefer attribute access.
CLI equivalent: Artifact Commands — notebooklm artifact list, get, rename, delete, export, poll, wait. Generation methods map to Generate Commands (notebooklm generate <type>); download methods map to Download Commands (notebooklm download <type>).
| Method | Parameters | Returns | Description |
|---|---|---|---|
list(notebook_id, artifact_type=None) |
str, ArtifactType | None |
list[Artifact] |
List artifacts |
get(notebook_id, artifact_id) |
str, str |
Artifact | None |
Get artifact details (returns None if not found) |
delete(notebook_id, artifact_id) |
str, str |
None |
Delete artifact (idempotent; returns None whether or not it existed) |
rename(notebook_id, artifact_id, new_title, *, return_object=True) |
str, str, str, *, bool |
Artifact | None |
Rename artifact (re-fetched; raises ArtifactNotFoundError if missing). return_object=False skips the re-fetch and returns None. |
poll_status(notebook_id, task_id) |
str, str |
GenerationStatus |
Check generation status |
wait_for_completion(notebook_id, task_id, ...) |
str, str, ... |
GenerationStatus |
Wait for generation. Pass on_status_change(status) for sync or async progress callbacks. |
retry_failed(notebook_id, artifact_id) |
str, str |
GenerationStatus |
Retry a failed Studio artifact in place (the UI "Retry"). Same artifact_id preserved; accepted → status="in_progress"; a synchronous refusal (rate limit / quota / not-retryable) raises RateLimitError/RPCError. See below. |
CLI equivalent: notebooklm artifact list --type <audio|video|slide-deck|quiz|flashcard|infographic|data-table|mind-map|report> (see Artifact Commands).
| Method | Parameters | Returns | Description |
|---|---|---|---|
list_audio(notebook_id) |
str |
list[Artifact] |
List audio overview artifacts |
list_video(notebook_id) |
str |
list[Artifact] |
List video overview artifacts |
list_reports(notebook_id) |
str |
list[Artifact] |
List report artifacts (Briefing Doc, Study Guide, Blog Post) |
list_quizzes(notebook_id) |
str |
list[Artifact] |
List quiz artifacts |
list_flashcards(notebook_id) |
str |
list[Artifact] |
List flashcard artifacts |
list_infographics(notebook_id) |
str |
list[Artifact] |
List infographic artifacts |
list_slide_decks(notebook_id) |
str |
list[Artifact] |
List slide deck artifacts |
list_data_tables(notebook_id) |
str |
list[Artifact] |
List data table artifacts |
CLI equivalent: Generate Commands — notebooklm generate audio, video, slide-deck, quiz, flashcards, infographic, data-table, mind-map, report.
| Method | Parameters | Returns | Description |
|---|---|---|---|
generate_audio(...) |
See below | GenerationStatus |
Generate podcast |
generate_video(...) |
See below | GenerationStatus |
Generate video |
generate_report(...) |
See below | GenerationStatus |
Generate report |
generate_quiz(...) |
See below | GenerationStatus |
Generate quiz |
generate_flashcards(...) |
See below | GenerationStatus |
Generate flashcards |
generate_slide_deck(...) |
See below | GenerationStatus |
Generate slide deck |
generate_infographic(...) |
See below | GenerationStatus |
Generate infographic |
generate_data_table(...) |
See below | GenerationStatus |
Generate data table |
generate_mind_map(...) |
See below | MindMapResult |
Generate a mind map and persist it as a note. Use attribute access (result.mind_map, result.note_id); legacy result["mind_map"] dict-subscript still works (with a DeprecationWarning) until v0.8.0. |
CLI equivalent: notebooklm artifact retry <artifact_id> -n <notebook_id> [--json] [--wait].
retry_failed(notebook_id, artifact_id) re-runs generation for an
already-failed artifact in place — the UI "Retry" action. The artifact is
not deleted first; the same artifact_id is preserved and returned as the task
id, so poll_status() / wait_for_completion() keep working against it.
It follows the ADR-0019 "async kickoff" contract: an accepted retry returns
GenerationStatus(status="in_progress"), while a synchronous refusal
(USER_DISPLAYABLE_ERROR — rate limit, quota, or a non-retryable artifact)
raises the underlying RateLimitError / RPCError rather than returning a
status="failed" handle. (As a brand-new method it is born on the right side
of the contract; the generate_* / revise_slide methods still swallow such
refusals into status="failed" until v0.8.0.) A retry can itself fail again
provider-side — observed by later polling as a terminal failed status — so
callers decide whether to re-invoke.
status = await client.artifacts.retry_failed(nb_id, failed_artifact_id)
# status.task_id == failed_artifact_id, status.status == "in_progress"
final = await client.artifacts.wait_for_completion(nb_id, status.task_id)
# Auto-retry on a rate-limited refusal with the public helper. Because
# retry_failed RAISES RateLimitError (rather than returning a rate-limited
# status), with_rate_limit_retry now also catches that exception, backs off,
# and re-raises if the budget is exhausted.
from notebooklm.artifacts import with_rate_limit_retry
status = await with_rate_limit_retry(
lambda: client.artifacts.retry_failed(nb_id, failed_artifact_id),
max_retries=3,
)
CLI equivalent: Download Commands — notebooklm download audio, video, slide-deck, infographic, report, mind-map, data-table, quiz, flashcards.
| Method | Parameters | Returns | Description |
|---|---|---|---|
download_audio(notebook_id, output_path, artifact_id=None) |
str, str, str |
str |
Download audio to file (MP4/MP3) |
download_video(notebook_id, output_path, artifact_id=None) |
str, str, str |
str |
Download video to file (MP4) |
download_infographic(notebook_id, output_path, artifact_id=None) |
str, str, str |
str |
Download infographic to file (PNG) |
download_slide_deck(notebook_id, output_path, artifact_id=None, output_format="pdf") |
str, str, str, str |
str |
Download slide deck as PDF or PPTX (output_format: "pdf" or "pptx") |
download_report(notebook_id, output_path, artifact_id=None) |
str, str, str |
str |
Download report as Markdown (.md) |
download_mind_map(notebook_id, output_path, artifact_id=None) |
str, str, str |
str |
Download mind map as JSON (.json) |
download_data_table(notebook_id, output_path, artifact_id=None) |
str, str, str |
str |
Download data table as CSV (.csv) |
download_quiz(notebook_id, output_path, artifact_id=None, output_format="json") |
str, str, str, str |
str |
Download quiz (json/markdown/html) |
download_flashcards(notebook_id, output_path, artifact_id=None, output_format="json") |
str, str, str, str |
str |
Download flashcards (json/markdown/html) |
Download Methods:
# Download the most recent completed audio overview
path = await client.artifacts.download_audio(nb_id, "podcast.mp4")
# Download a specific audio artifact by ID
path = await client.artifacts.download_audio(nb_id, "podcast.mp4", artifact_id="abc123")
# Download video overview
path = await client.artifacts.download_video(nb_id, "video.mp4")
# Download infographic
path = await client.artifacts.download_infographic(nb_id, "infographic.png")
# Download slide deck as PDF
path = await client.artifacts.download_slide_deck(nb_id, "./slides.pdf")
# Returns: "./slides.pdf"
# Download report as Markdown
path = await client.artifacts.download_report(nb_id, "./study-guide.md")
# Extracts markdown content from Briefing Doc, Study Guide, Blog Post, etc.
# Download mind map as JSON
path = await client.artifacts.download_mind_map(nb_id, "./concept-map.json")
# JSON structure: {"name": "Topic", "children": [{"name": "Subtopic", ...}]}
# Download data table as CSV
path = await client.artifacts.download_data_table(nb_id, "./data.csv")
# CSV uses UTF-8 with BOM encoding for Excel compatibility
# Download quiz as JSON (default)
path = await client.artifacts.download_quiz(nb_id, "quiz.json")
# Download quiz as markdown with answers marked
path = await client.artifacts.download_quiz(nb_id, "quiz.md", output_format="markdown")
# Download flashcards as JSON (normalizes f/b to front/back)
path = await client.artifacts.download_flashcards(nb_id, "cards.json")
# Download flashcards as markdown
path = await client.artifacts.download_flashcards(nb_id, "cards.md", output_format="markdown")
Notes:
- If artifact_id is not specified, downloads the first completed artifact of that type
- Raises ValueError if no completed artifact is found
- Some URLs require browser-based download (handled automatically)
- Report downloads extract the markdown content from the artifact
- Mind map downloads return a JSON tree structure with name and children fields
- Data table downloads parse the complex rich-text format into CSV rows/columns
- Quiz/flashcard formats: json (structured), markdown (readable), html (raw)
- Downloads automatically use the storage path from from_storage(path=...) or the resolved profile for cookie authentication
Export artifacts to Google Docs or Google Sheets.
CLI equivalent: notebooklm artifact export <id> --title TEXT --type [docs|sheets] (see Artifact Commands).
| Method | Parameters | Returns | Description |
|---|---|---|---|
export_report(notebook_id, artifact_id, title="Export", export_type=ExportType.DOCS) |
str, str, str, ExportType |
Any |
Export report to Google Docs/Sheets |
export_data_table(notebook_id, artifact_id, title="Export") |
str, str, str |
Any |
Export data table to Google Sheets |
export(notebook_id, artifact_id=None, content=None, title="Export", export_type=ExportType.DOCS) |
str, str | None, str | None, str, ExportType |
Any |
Generic export to Docs/Sheets. All trailing parameters are optional with defaults; pass content=... to export inline content without a pre-existing artifact. |
Export Types (ExportType enum):
- ExportType.DOCS (1): Export to Google Docs
- ExportType.SHEETS (2): Export to Google Sheets
from notebooklm import ExportType
# Export a report to Google Docs
result = await client.artifacts.export_report(
nb_id,
artifact_id="report_123",
title="My Briefing Doc",
export_type=ExportType.DOCS
)
# result contains the Google Docs URL
# Export a data table to Google Sheets
result = await client.artifacts.export_data_table(
nb_id,
artifact_id="table_456",
title="Research Data"
)
# result contains the Google Sheets URL
# Generic export (e.g., export any artifact to Sheets). All trailing
# parameters have defaults: `artifact_id=None`, `content=None`,
# `title="Export"`, `export_type=ExportType.DOCS`. Supply `content=...`
# instead of `artifact_id=...` to export inline text without a pre-existing
# artifact.
result = await client.artifacts.export(
nb_id,
artifact_id="artifact_789",
title="Exported Content",
export_type=ExportType.SHEETS
)
Generation Methods:
When language is omitted, artifact generation defaults to "en" (the
historical default). Pass language=None to read NOTEBOOKLM_HL and fall back
to "en" if unset, or pass a concrete code such as language="ko" to force
that language.
from notebooklm import (
AudioFormat,
AudioLength,
VideoFormat,
VideoStyle,
ReportFormat,
QuizQuantity,
QuizDifficulty,
)
# Audio (podcast)
status = await client.artifacts.generate_audio(
notebook_id,
source_ids=None, # List of source IDs (None = all)
instructions="...", # Custom instructions
audio_format=AudioFormat.DEEP_DIVE, # DEEP_DIVE, BRIEF, CRITIQUE, DEBATE
audio_length=AudioLength.DEFAULT, # SHORT, DEFAULT, LONG
language="en"
)
# Video
status = await client.artifacts.generate_video(
notebook_id,
source_ids=None,
instructions="...",
video_format=VideoFormat.EXPLAINER, # EXPLAINER, BRIEF, CINEMATIC
video_style=VideoStyle.AUTO_SELECT, # AUTO_SELECT, CLASSIC, WHITEBOARD, KAWAII, ANIME, etc.
language="en"
)
# Report
status = await client.artifacts.generate_report(
notebook_id,
report_format=ReportFormat.STUDY_GUIDE, # BRIEFING_DOC, STUDY_GUIDE, BLOG_POST, CUSTOM
source_ids=None,
language="en",
custom_prompt=None, # Used with ReportFormat.CUSTOM
extra_instructions="..." # Optional append for built-in formats
)
# Quiz
status = await client.artifacts.generate_quiz(
notebook_id,
source_ids=None,
instructions="...",
quantity=QuizQuantity.MORE, # FEWER, STANDARD, MORE (MORE aliases STANDARD)
difficulty=QuizDifficulty.MEDIUM, # EASY, MEDIUM, HARD
)
Rate-limit retry for generation:
from notebooklm.artifacts import with_rate_limit_retry
status = await with_rate_limit_retry(
lambda: client.artifacts.generate_audio(
notebook_id,
instructions="focus on the counterarguments",
),
max_retries=3,
)
Waiting for Completion:
from notebooklm import ArtifactTimeoutError
# Start generation
status = await client.artifacts.generate_audio(nb_id)
try:
# Wait with polling. Use higher timeouts for media jobs:
# audio=1200s, video=1800s, cinematic-video=3600s.
final = await client.artifacts.wait_for_completion(
nb_id,
status.task_id,
timeout=1200, # Max wait time in seconds
initial_interval=5 # Initial seconds between polls
)
except ArtifactTimeoutError as exc:
print(exc.stalled_phase, exc.last_status, exc.status_history)
raise
if final.is_complete:
path = await client.artifacts.download_audio(nb_id, "podcast.mp3")
print(f"Saved to: {path}")
else:
print(f"Failed or timed out: {final.status}")CLI equivalent: Chat Commands — notebooklm ask, configure, history.
| Method | Parameters | Returns | Description |
|---|---|---|---|
ask(notebook_id, question, ...) |
str, str, ... |
AskResult |
Ask a question |
configure(notebook_id, ...) |
str, ... |
None |
Set chat persona |
get_history(notebook_id, limit=100, conversation_id=None) |
str, int, str |
list[tuple[str, str]] |
Get Q&A pairs from most recent conversation |
get_conversation_id(notebook_id) |
str |
str | None |
Get most recent conversation ID from server |
delete_conversation(notebook_id, conversation_id) |
str, str |
bool |
DESTRUCTIVE. Permanently delete a server-side conversation (web UI's "Delete history" action). The next ask() with no conversation_id then starts a brand-new conversation. |
save_answer_as_note(notebook_id, ask_result, *, title=None) |
str, AskResult, str | None |
Note |
Save a chat answer as a citation-rich note (issue #660) — the resulting note's [N] markers remain interactive hover-anchored citations in the NotebookLM web UI. Owns the saved-from-chat workflow on ChatAPI (the data owner). Raises ValueError if ask_result.references is empty. When title is None, derives f"Chat: {ask_result.answer[:50].strip().replace(chr(10), ' ')}". |
ask() Parameters:
async def ask(
notebook_id: str,
question: str,
source_ids: list[str] | None = None, # Limit to specific sources (None = all)
conversation_id: str | None = None, # Continue existing conversation
) -> AskResult:Conversation semantics (issue #659):
- `conversation_id=None` matches the web UI's default: the server attaches the question to your current conversation on this notebook (or creates one if none exists). Repeated `ask()` calls without `conversation_id` extend the same conversation; they do not start fresh ones. The SDK fetches the server-recorded conversation_id via `hPTbtc` after each new-conversation ask and surfaces it on `AskResult.conversation_id`, so passing it back as `conversation_id=` for follow-ups works as expected.
- `conversation_id=<existing-id>` is a follow-up: the question is appended to the named conversation.
- To force a brand-new conversation, call `client.chat.delete_conversation(notebook_id, last_conversation_id)` first — the server then has nothing to extend and the next null-conv `ask()` starts a fresh thread. This is destructive: deleted turns are not recoverable. The method mirrors the web UI's "Delete history" button (`J7Gthc` RPC) and is the same primitive the CLI's `notebooklm ask --new` is built on.
Example:
from notebooklm import ChatGoal, ChatResponseLength
# Ask questions (uses all sources)
result = await client.chat.ask(nb_id, "What are the main themes?")
print(result.answer)
print(result.conversation_id) # server-recorded id, fetched via hPTbtc
# Access source references (cited in answer as [1], [2], etc.)
for ref in result.references:
print(f"Citation {ref.citation_number}: Source {ref.source_id}")
# Ask using only specific sources
result = await client.chat.ask(
nb_id,
"Summarize the key points",
source_ids=["src_001", "src_002"]
)
# Continue conversation explicitly (or omit conversation_id — same effect
# while the most-recent conversation on the notebook stays unchanged).
result = await client.chat.ask(
nb_id,
"Can you elaborate on the first point?",
conversation_id=result.conversation_id
)
# Force a fresh conversation (destructive — turns are not recoverable).
# Mirrors the web UI's "Delete history" button.
last_conv_id = await client.chat.get_conversation_id(nb_id)
if last_conv_id:
await client.chat.delete_conversation(nb_id, last_conv_id)
result = await client.chat.ask(nb_id, "Start fresh — what are the themes?")
assert result.turn_number == 1
# Configure persona
await client.chat.configure(
nb_id,
goal=ChatGoal.LEARNING_GUIDE,
response_length=ChatResponseLength.LONGER,
custom_prompt="Focus on practical applications"
)
# Save a chat answer as a citation-rich note (preserves [N] hover links).
# This is the canonical owner of the saved-from-chat workflow — the data
# owner (`ChatAPI`) persists, so the answer text and references stay
# adjacent to the call that produced them.
result = await client.chat.ask(nb_id, "What fruits are mentioned?")
if result.references:
note = await client.chat.save_answer_as_note(
nb_id, result, title="Fruit Citations"
)
# The NotebookLM server may auto-generate a "smart" title for
# citation-rich notes; note.title reflects what the server stored.CLI equivalent: Research Commands (notebooklm research status, wait) plus notebooklm source add-research (Source: add-research) for the combined start-and-import workflow.
| Method | Parameters | Returns | Description |
|---|---|---|---|
start(notebook_id, query, source, mode) |
str, str, str="web", str="fast" |
ResearchStart | None |
Start research (mode: "fast" or "deep"); raises ValidationError on invalid source/mode |
poll(notebook_id, task_id=None) |
str, str | None = None |
ResearchTask |
Check research status |
wait_for_completion(notebook_id, task_id=None, *, timeout=1800, initial_interval=5) |
str, str | None, float, float |
ResearchTask |
Wait for research to complete, pinning the discovered task ID between polls. Raises ResearchTimeoutError (a WaitTimeoutError/TimeoutError). The legacy interval= keyword is a deprecated alias for initial_interval= (to be removed in v0.8.0). |
import_sources(notebook_id, task_id, sources) |
str, str, Sequence[dict[str, Any] | ResearchSource] |
list[dict] |
Import findings. Accepts plain dicts or the typed ResearchSource objects from poll().sources. |
Typed returns (since v0.7.0).
`start`/`poll`/`wait_for_completion` return the typed dataclasses `ResearchStart`/`ResearchTask` (whose `.sources` are `ResearchSource` objects), with `.status` a `ResearchStatus` str-enum (`status == "completed"` still holds). Legacy dict-subscript (`result["status"]`/`result["sources"][0]["url"]`) keeps working with a `DeprecationWarning` until v0.8.0; prefer attribute access. `import_sources` still accepts `list[dict]` or `ResearchSource` objects, so feeding `result.sources` straight back in works.
Method Signatures:
async def start(
notebook_id: str,
query: str,
source: str = "web", # "web" or "drive"
mode: str = "fast", # "fast" or "deep" (deep only for web)
) -> ResearchStart | None:
"""
Returns: a ResearchStart with .task_id / .report_id / .notebook_id /
.query / .mode, or None if the RPC returns an empty/unexpected payload.
Legacy result["task_id"] dict-subscript still works (with a
DeprecationWarning) until v0.8.0.
Raises: ValidationError if source/mode combination is invalid
"""
async def poll(notebook_id: str, task_id: str | None = None) -> ResearchTask:
"""
Returns a ResearchTask for the selected research task. If task_id is None,
selects the latest research task and emits an ambiguity warning if multiple
tasks are in-flight. When task_id is supplied but no in-flight task matches,
returns ResearchTask.not_found(task_id) — status NOT_FOUND, a typed
poll-observed-absence sentinel (does not raise); the unfiltered empty poll
stays NO_RESEARCH. Attributes:
- task_id: str — task/report identifier
- status: ResearchStatus — COMPLETED | FAILED | IN_PROGRESS | NO_RESEARCH | NOT_FOUND
(a str enum; == "completed" still holds)
- query: str — original research query
- sources: tuple[ResearchSource, ...]
- summary: str — summary text when present
- report: str — deep-research report markdown when present
- tasks: tuple[ResearchTask, ...] — ALL parsed tasks visible at this poll
Legacy dict-subscript (result["status"], result["sources"][0]["url"]) keeps
working with a DeprecationWarning until v0.8.0; prefer attribute access.
Each ResearchSource exposes:
- url, title
- result_type: int — 1=web, 2=drive, 5=deep-research report entry
- research_task_id: str — task/report ID that produced this source
- report_markdown: str — deep-research report markdown (for type-5 entries)
"""
async def wait_for_completion(
notebook_id: str,
task_id: str | None = None,
*,
timeout: float = 1800,
initial_interval: float = 5, # canonical poll-cadence keyword
interval: float = 5, # DEPRECATED alias for initial_interval (removed in v0.8.0)
) -> ResearchTask:
"""
Loops on poll() until research returns "completed" / "failed" or the
timeout expires. "no_research" returns immediately only before a task_id
is known; when a task_id is supplied or discovered, transient
"no_research" polls are retried. Once a concrete task_id is returned,
later polls reuse it as the discriminator so concurrent research tasks in
the same notebook cannot cross-wire results.
Returns: the final poll() ResearchTask.
Raises:
- ResearchTimeoutError on timeout (a WaitTimeoutError and a built-in
TimeoutError, so `except TimeoutError` / `except WaitTimeoutError`
both catch it).
- ValueError for invalid timeout or non-positive poll interval.
- TypeError if both the deprecated `interval=` and `initial_interval=`
are passed.
"""
async def import_sources(
notebook_id: str,
task_id: str,
sources: Sequence[dict[str, Any] | ResearchSource],
) -> list[dict]:
"""
sources: a sequence of dicts (with 'url' and 'title' keys) OR the typed
ResearchSource objects from poll().sources — both are accepted and
coerced. Deep-research entries may also carry 'report_markdown',
'result_type', and 'research_task_id'.
Returns: list of imported sources with 'id' and 'title'.
Raises:
- ValidationError if `sources` contains entries from more than one
research task (`research_task_id` mismatch). Import each task's
sources in a separate call.
Caveats:
- The API response can under-report — fewer items may come back than
were actually imported. After this call, re-list with
`client.sources.list(notebook_id)` to verify the final source set.
- Entries without a `url` and without a complete report (`title` +
`report_markdown` + `result_type == 5`) are skipped with a warning.
"""Example:
# Start research and capture the task_id discriminator (typed ResearchStart)
result = await client.research.start(nb_id, "AI safety regulations")
if result is None:
raise RuntimeError("Research start returned None")
task_id = result.task_id
# If you launch multiple concurrent research tasks on the same notebook
# (web vs drive, fast vs deep), always pass the task_id to poll() so the
# poll resolves to the intended task. Without it, poll() returns the
# "latest task" and emits an ambiguity warning when multiple are in flight.
# Wait until complete (always pass task_id for unambiguous targeting)
status = await client.research.wait_for_completion(
nb_id,
task_id=task_id,
timeout=1800,
initial_interval=5, # canonical keyword; `interval=` is a deprecated alias
)
# `status` is a typed ResearchTask; `.sources` are ResearchSource objects,
# which import_sources accepts directly.
imported = await client.research.import_sources(nb_id, task_id, list(status.sources)[:5])
print(f"Imported {len(imported)} sources")Unified surface over NotebookLM's two mind-map kinds (issue #1256): the
note-backed kind (JSON tree stored as a note) and the newer interactive
kind (a studio artifact, internally type 4 / variant 4, created by the web GUI).
Each operation dispatches to the correct backend; you work with MindMap /
MindMapKind and never see the split.
| Method | Args | Returns | Description |
|---|---|---|---|
list(notebook_id) |
str |
list[MindMap] |
Both kinds, as distinct MindMap entries. MindMap.tree is populated for note-backed entries but None for interactive ones (None = not fetched, not empty — see below) |
list_note_backed(notebook_id) |
str |
list[MindMap] |
Note-backed entries only (every kind is NOTE_BACKED, tree populated, deleted rows excluded), via a single GET_NOTES_AND_MIND_MAPS RPC — no LIST_ARTIFACTS. Use list() for the union with interactive maps |
get(notebook_id, mind_map_id) |
str, str |
MindMap | None |
Single mind map by id. Deprecated None-on-miss (warns in v0.7.0, raises MindMapNotFoundError in v0.8.0 — issues #1247/#1358); use get_or_none() for the warning-free optional lookup |
get_or_none(notebook_id, mind_map_id) |
str, str |
MindMap | None |
Sanctioned None-on-miss lookup (silent — no deprecation warning) |
generate(notebook_id, source_ids=None, *, kind, language="en", instructions=None, wait=True) |
… | MindMap |
Note-backed (sync) or interactive (CREATE_ARTIFACT + poll). A null CREATE_ARTIFACT raises ArtifactFeatureUnavailableError (a subclass of ArtifactError) |
rename(notebook_id, mind_map_id, new_title, *, kind=None, return_object=True) |
… | MindMap | None |
UPDATE_NOTE / RENAME_ARTIFACT by kind (re-fetched; raises MindMapNotFoundError if missing). return_object=False returns None. |
delete(notebook_id, mind_map_id, *, kind=None) |
… | None |
DELETE_NOTE / DELETE_ARTIFACT by kind (idempotent — deleting an already-absent map returns None, for both kind=None and a supplied kind) |
get_tree(notebook_id, mind_map_id, *, kind=None) |
… | dict | None |
The {"name","children"} node tree; None for a missing or not-yet-populated map (derived read — does not police existence). The explicit kind=INTERACTIVE path delegates absence detection to the RPC (a missing id's value is server-dependent — None today) |
MindMap is a frozen value: id, notebook_id, title, kind (MindMapKind.NOTE_BACKED / INTERACTIVE), created_at, and tree. generate(..., wait=True) returns tree populated for both kinds (interactive maps are polled to completion, then their tree is fetched). list(...) populates tree only for note-backed entries (parsed for free from the listed note content); interactive entries carry tree=None ("not fetched", not "empty" — fetching each would cost a separate GET_INTERACTIVE_HTML), so call get_tree(..., kind=INTERACTIVE) to fetch an individual interactive tree. When kind is omitted from rename/delete/get_tree, the backing is auto-detected (one extra list call).
maps = await client.mind_maps.list(nb_id)
for mm in maps:
print(mm.id, mm.title, mm.kind.value)
# Generate the interactive (web-GUI) kind and poll to completion:
mm = await client.mind_maps.generate(nb_id, kind=MindMapKind.INTERACTIVE)
tree = await client.mind_maps.get_tree(nb_id, mm.id, kind=mm.kind)
await client.mind_maps.rename(nb_id, mm.id, "Renamed", kind=mm.kind)
await client.mind_maps.delete(nb_id, mm.id, kind=mm.kind)In the CLI, mind maps are handled as a type within the existing groups (matching
audio/video/quiz): artifact list --type mind-map, artifact rename,
artifact delete, generate mind-map, and download mind-map.
The kind-specific `artifacts.generate_mind_map()` / `notes.list_mind_maps()` / `notes.delete_mind_map()` remain fully supported for the note-backed kind — they are not deprecated. `client.mind_maps.*` is the unified surface that also reaches the interactive kind; use whichever fits.
CLI equivalent: Note Commands — notebooklm note list, create, get, save, rename, delete.
| Method | Parameters | Returns | Description |
|---|---|---|---|
list(notebook_id) |
str |
list[Note] |
List text notes (excludes mind maps) |
create(notebook_id, title="New Note", content="") |
str, str, str |
Note |
Create plain-text note (no citation anchors) |
get(notebook_id, note_id) |
str, str |
Optional[Note] |
Get note by ID |
update(notebook_id, note_id, content, title) |
str, str, str, str |
None |
Update note content and title |
delete(notebook_id, note_id) |
str, str |
None |
Delete note (idempotent; returns None whether or not it existed) |
list_mind_maps(notebook_id) |
str |
list[Any] |
List mind maps in the notebook |
delete_mind_map(notebook_id, mind_map_id) |
str, str |
None |
Delete a mind map (idempotent; returns None whether or not it existed) |
Example:
# Create and manage plain-text notes
note = await client.notes.create(nb_id, title="Meeting Notes", content="Discussion points...")
notes = await client.notes.list(nb_id)
# Update a note
await client.notes.update(nb_id, note.id, "Updated content", "New Title")
# Delete a note
await client.notes.delete(nb_id, note.id)
# Save a chat answer as a citation-rich note (preserves [N] hover links).
# Use ``client.chat.save_answer_as_note(...)`` — the chat-owned canonical
# method (the former ``client.notes.create_from_chat(...)`` forwarder was
# removed in v0.7.0).
result = await client.chat.ask(nb_id, "What fruits are mentioned?")
if result.references:
note = await client.chat.save_answer_as_note(nb_id, result, title="Fruit Citations")
# Note: the NotebookLM server may auto-generate a "smart" title for
# citation-rich notes; note.title reflects what the server stored.Mind Maps:
Mind maps are stored internally using the same structure as notes but contain JSON data with hierarchical node information. The list() method excludes mind maps automatically, while list_mind_maps() returns only mind maps.
# List all mind maps in a notebook
mind_maps = await client.notes.list_mind_maps(nb_id)
for mm in mind_maps:
mm_id = mm[0] # Mind map ID is at index 0
print(f"Mind map: {mm_id}")
# Delete a mind map
await client.notes.delete_mind_map(nb_id, mind_map_id)
Note: Mind maps are detected by checking whether the content contains the `"children":` or `"nodes":` keys, which indicate a JSON mind-map data structure.
Two mind-map kinds (issue #1256): NotebookLM has two distinct mind-map objects — the note-backed kind above (list_mind_maps()), and the newer interactive kind the web GUI now creates (a studio artifact, internally type 4 / variant 4). Both are first-class: the interactive kind appears in client.artifacts.list(ArtifactType.MIND_MAP) (and Artifact.is_interactive_mind_map distinguishes the backing), download_mind_map exports either kind's JSON tree, and the unified client.mind_maps surface generates/reads/renames/deletes both behind a MindMapKind discriminator. The notes.*_mind_map helpers here remain fully supported for the note-backed kind.
CLI equivalent: Language Commands — notebooklm language get, set, list. Account limits and tier do not yet have a dedicated CLI surface; use notebooklm status for context.
| Method | Parameters | Returns | Description |
|---|---|---|---|
get_output_language() |
none | Optional[str] |
Get current output language setting |
get_account_limits() |
none | AccountLimits |
Get account-level limits such as max notebooks and sources per notebook |
get_account_tier() |
none | AccountTier |
Get current NotebookLM subscription tier |
set_output_language(language) |
str |
Optional[str] |
Set output language for artifact generation |
Example:
# Get current language setting
lang = await client.settings.get_output_language()
print(f"Current language: {lang}") # e.g., "en", "ja", "zh_Hans"
# Get server-reported account limits
limits = await client.settings.get_account_limits()
print(f"Notebook limit: {limits.notebook_limit}")
# Get current NotebookLM subscription tier
tier = await client.settings.get_account_tier()
print(f"Account tier: {tier.plan_name or tier.tier}")
# Set language for artifact generation
result = await client.settings.set_output_language("ja") # Japanese
print(f"Language set to: {result}")Important: Language is a GLOBAL setting that affects all notebooks in your account. The tier string is internal NotebookLM metadata; use get_account_limits() for quota decisions because the raw tier name may not match the active notebook/source limits. Supported languages include:
- `en` (English), `ja` (日本語), `zh_Hans` (中文简体), `zh_Hant` (中文繁體)
- `ko` (한국어), `es` (Español), `fr` (Français), `de` (Deutsch), `pt_BR` (Português)
- And over 70 other languages
CLI equivalent: Share Commands — notebooklm share status, public, view-level, add, update, remove.
| Method | Parameters | Returns | Description |
|---|---|---|---|
get_status(notebook_id) |
str |
ShareStatus |
Get current sharing configuration |
set_public(notebook_id, public) |
str, bool |
ShareStatus |
Enable/disable public link sharing |
set_view_level(notebook_id, level) |
str, ShareViewLevel |
ShareStatus |
Set what viewers can access |
add_user(notebook_id, email, permission, notify, welcome_message) |
str, str, SharePermission, bool, str |
ShareStatus |
Share with a user |
update_user(notebook_id, email, permission) |
str, str, SharePermission |
ShareStatus |
Update user's permission |
remove_user(notebook_id, email) |
str, str |
ShareStatus |
Remove user's access |
Example:
from notebooklm import SharePermission, ShareViewLevel
# Get current sharing status
status = await client.sharing.get_status(notebook_id)
print(f"Public: {status.is_public}")
print(f"Users: {[u.email for u in status.shared_users]}")
# Enable public sharing (anyone with link)
status = await client.sharing.set_public(notebook_id, True)
print(f"Share URL: {status.share_url}")
# Set view level (what viewers can access)
await client.sharing.set_view_level(notebook_id, ShareViewLevel.CHAT_ONLY)
# Share with specific users
status = await client.sharing.add_user(
notebook_id,
"colleague@example.com",
SharePermission.VIEWER,
notify=True,
welcome_message="Check out my research!"
)
# Update user permission
status = await client.sharing.update_user(
notebook_id,
"colleague@example.com",
SharePermission.EDITOR
)
# Remove user access
status = await client.sharing.remove_user(notebook_id, "colleague@example.com")
# Disable public sharing
status = await client.sharing.set_public(notebook_id, False)Permission Levels:
- `SharePermission.OWNER` - Full control (read-only, cannot be assigned)
- `SharePermission.EDITOR` - Can edit notebook content
- `SharePermission.VIEWER` - Read-only access
View Levels:
- `ShareViewLevel.FULL_NOTEBOOK` - Viewers can access chat, sources, and notes
- `ShareViewLevel.CHAT_ONLY` - Viewers can only access the chat interface
CLI equivalent: Label Commands — notebooklm label list, sources, generate, create, rename, emoji, add, remove, delete.
Source labels group a notebook's sources into topic buckets. A label is a
standalone, notebook-scoped entity: membership is many-to-many (a source can
belong to multiple labels), and a label owns a list of source IDs — the source
carries no back-reference. The dataclass is Label (importable as
from notebooklm import Label).
| Method | Parameters | Returns | Description |
|---|---|---|---|
list(notebook_id) |
str |
list[Label] |
List all labels in a notebook (with source membership) |
get(notebook_id, label_id) |
str, str |
Label |
Get a label by id; raises LabelNotFoundError on a miss |
get_or_none(notebook_id, label_id) |
str, str |
Label | None |
Get a label by id, returning None when absent |
sources(notebook_id, label_id) |
str, str |
list[Source] |
Expand a label to its Source objects (group-as-collection accessor); raises LabelNotFoundError if absent |
generate(notebook_id, *, scope="unlabeled") |
str, *, Literal["all", "unlabeled"] |
list[Label] |
AI-group sources into topic labels (the UI's "Reorganize"). scope="unlabeled" (default, safe) labels only unlabeled sources; scope="all" is destructive — it wipes and regenerates every label with new ids. Returns the full post-op set. |
create(notebook_id, name, emoji="") |
str, str, str |
Label |
Create an empty, manually-named label. Locates the new label by id-diff; raises LabelError on an ambiguous concurrent create |
rename(notebook_id, label_id, name, *, return_object=True) |
str, str, str, *, bool |
Label | None |
Rename a label (preserves the existing emoji). Raises LabelNotFoundError if missing |
set_emoji(notebook_id, label_id, emoji, *, return_object=True) |
str, str, str, *, bool |
Label | None |
Set a label's emoji |
update(notebook_id, label_id, *, name=None, emoji=None, return_object=True) |
str, str, *, str | None, str | None, bool |
Label | None |
Set name and/or emoji. Raises ValueError if both are None; raises LabelNotFoundError if the label is missing (in both return_object modes) |
add_sources(notebook_id, label_id, source_ids, *, return_object=True) |
str, str, list[str], *, bool |
Label | None |
Add source(s) to a label. Appends — existing members survive and overlap with other labels is allowed. One RPC per id (deduped); not atomic across ids. Raises ValueError on an empty list |
remove_sources(notebook_id, label_id, source_ids, *, return_object=True) |
str, str, list[str], *, bool |
Label | None |
Un-assign source(s) from a label only — the sources survive in the notebook, and a source in another label stays there. Removing a non-member is a no-op. One RPC per id (deduped). Raises ValueError on an empty list |
delete(notebook_id, label_ids) |
str, str | list[str] |
None |
Delete one or more labels (batch). Idempotent — an absent target is a no-op returning None. Deleting a label does not delete its sources |
For rename/set_emoji/update/add_sources/remove_sources, return_object=False returns
None without re-hydrating, but the existence preflight still runs and raises
LabelNotFoundError on a missing target.
Example:
from notebooklm import Label
# AI-group the notebook's unlabeled sources into topic labels (safe default)
labels = await client.labels.generate(nb_id)
for label in labels:
print(f"{label.id}: {label.emoji or ''}{label.name} ({len(label.source_ids)} sources)")
# Destructive re-label: wipes and regenerates EVERY label with new ids
labels = await client.labels.generate(nb_id, scope="all")
# Create an empty, manually-named label
papers = await client.labels.create(nb_id, "Papers", emoji="📄")
# Add sources (append — does not remove them from any other label)
await client.labels.add_sources(nb_id, papers.id, [source_id])
# Expand a label to its Source objects
members = await client.labels.sources(nb_id, papers.id)
for src in members:
print(f"{src.id}: {src.title}")
# Read with raise-on-miss vs None-on-miss
label = await client.labels.get(nb_id, papers.id) # raises LabelNotFoundError
maybe = await client.labels.get_or_none(nb_id, "missing") # -> None
# Rename (emoji preserved) and re-emoji
await client.labels.rename(nb_id, papers.id, "Research Papers")
await client.labels.set_emoji(nb_id, papers.id, "📚")
# Delete (idempotent; sources become unlabeled, not deleted)
await client.labels.delete(nb_id, papers.id)Note:
`add_sources` appends; `remove_sources` un-assigns the source from the label only (it is not deleted from the notebook, and stays in any other label it belongs to). Both issue one `UPDATE_LABEL` per id (the wire honours only the first id per call) and are not atomic across ids.
@dataclass
class Notebook:
id: str
title: str
created_at: Optional[datetime] # creation time (tz-aware UTC)
sources_count: int
is_owner: bool
modified_at: Optional[datetime] # last-modified time (tz-aware UTC)@dataclass
class Source:
id: str
title: Optional[str]
url: Optional[str]
created_at: Optional[datetime]
status: int # 1=processing, 2=ready, 3=error, 5=preparing (defaults to READY)
@property
def kind(self) -> SourceType:
"""Get source type as SourceType enum."""
@property
def is_ready(self) -> bool:
"""status == SourceStatus.READY"""
@property
def is_processing(self) -> bool:
"""status == SourceStatus.PROCESSING"""
@property
def is_error(self) -> bool:
"""status == SourceStatus.ERROR"""Removed in v0.5.0:
`Source.source_type` was replaced by `Source.kind`. See stability.md → Removed in v0.5.0.
Type Identification:
Use the .kind property to identify source types. It returns a SourceType enum which is also a str, enabling both enum and string comparisons:
from notebooklm import SourceType
# Enum comparison (recommended)
if source.kind == SourceType.PDF:
print("This is a PDF")
# String comparison (also works)
if source.kind == "pdf":
print("This is a PDF")
# Use in f-strings
print(f"Type: {source.kind}") # "Type: pdf"@dataclass
class Label:
id: str
name: str
notebook_id: Optional[str] = None
emoji: Optional[str] = None
source_ids: list[str] = field(default_factory=list) # empty for a new labelA source Label describes source membership only (no artifact members).
Importable as from notebooklm import Label. See
LabelsAPI.
@dataclass
class Artifact:
id: str
title: str
_artifact_type: int # Internal type code; field order matters. Access via .kind.
status: int # 1=processing, 2=pending, 3=completed, 4=failed
created_at: Optional[datetime]
url: Optional[str]
_variant: int | None = None # Internal variant for type-4 artifacts (1=flashcards, 2=quiz, 4=interactive mind map).
@property
def kind(self) -> ArtifactType:
"""Get artifact type as ArtifactType enum."""
@property
def is_completed(self) -> bool:
"""Check if artifact generation is complete."""
@property
def is_quiz(self) -> bool:
"""Check if this is a quiz artifact."""
@property
def is_flashcards(self) -> bool:
"""Check if this is a flashcards artifact."""
@property
def report_subtype(self) -> str | None:
"""Title-derived report subtype: 'briefing_doc', 'study_guide',
'blog_post', or 'report' for type-2 artifacts; None otherwise.
Use this instead of parsing titles in caller code.
"""Note on _artifact_type / _variant: these are private (leading-underscore) fields with repr=False and are part of the dataclass for from_api_response() round-tripping. Always consume them via the public .kind, .is_quiz, .is_flashcards, and .report_subtype accessors.
Removed in v0.5.0:
`Artifact.artifact_type` and `Artifact.variant` were replaced by `Artifact.kind` plus `.is_quiz`/`.is_flashcards`. See stability.md → Removed in v0.5.0.
Type Identification:
Use the .kind property to identify artifact types. It returns an ArtifactType enum which is also a str:
from notebooklm import ArtifactType
# Enum comparison (recommended)
if artifact.kind == ArtifactType.AUDIO:
print("This is an audio overview")
# String comparison (also works)
if artifact.kind == "audio":
print("This is an audio overview")
# Check specific types
if artifact.is_quiz:
print("This is a quiz")
elif artifact.is_flashcards:
print("This is a flashcard deck")Returned by poll_status, wait_for_completion, and most artifact generation methods (generate_audio, generate_video, generate_report, generate_quiz, generate_flashcards, generate_slide_deck, generate_infographic, generate_data_table). Note that generate_mind_map returns a dict[str, Any] instead — the mind map is delivered as JSON inline rather than polled.
@dataclass
class GenerationStatus:
task_id: str # Same value as Artifact.id once complete
status: GenerationState # str-Enum: "pending" | "in_progress" | "completed" | "failed" | "not_found" | "removed" | "unknown"
url: str | None = None # Populated for media artifacts when status == "completed"
error: str | None = None
error_code: str | None = None # e.g. "USER_DISPLAYABLE_ERROR" for rate limits
metadata: dict[str, Any] | None = None
@property
def is_complete(self) -> bool:
"""Check if generation is complete."""
@property
def is_failed(self) -> bool:
"""Check if generation failed."""
@property
def is_in_progress(self) -> bool:
"""Check if generation is in progress."""
@property
def is_pending(self) -> bool:
"""Check if generation is pending."""
@property
def is_not_found(self) -> bool:
"""Check if the artifact is absent from the poll response.
Distinct from ``is_pending``: a *pending* artifact exists in the
artifact list and is queued, while *not_found* means the artifact
has either not yet appeared (brief lag after creation) or was
silently removed server-side (e.g. after a daily-quota rejection).
``wait_for_completion`` treats a sustained run of ``not_found``
responses as a *removal* — see its ``max_not_found`` parameter and
``is_removed``.
"""
@property
def is_removed(self) -> bool:
"""Check if the artifact was delisted by the server.
Set by ``wait_for_completion`` when an artifact disappears from the
listing for a *sustained* run of polls (``max_not_found``). The absence
must be sustained: a transient/flapping omission where the artifact
reappears resets the not-found window, so a still-progressing artifact is
never fabricated into a terminal *removed* and instead polls through to
completion (or timeout). Kept *distinct* from ``is_failed``: a *failed*
artifact still exists in the listing with a terminal FAILED status,
whereas a *removed* artifact vanished from the listing and stayed gone —
typically a daily-quota rejection, occasionally a longer-lived server-
side omission. Branch on this when a delisting and a real terminal
failure warrant different handling.
"""
@property
def is_rate_limited(self) -> bool:
"""Check if generation failed (or was removed) due to rate limiting."""
status is a GenerationState(str, Enum) (importable from notebooklm and
notebooklm.types), so it remains a str for every existing use — status == "completed", status in {...}, f"{status}", and json.dumps all keep
working unchanged. Prefer the .is_* predicates (status.is_complete,
status.is_failed, …) over raw string comparison for new code; the raw
status == "completed" form stays supported.
| GenerationState member | Value | Emitted by |
|---|---|---|
| PENDING | "pending" | poll / generation parsers (also the default when no status code is reported yet) |
| IN_PROGRESS | "in_progress" | poll / generation parsers |
| COMPLETED | "completed" | poll / generation parsers |
| FAILED | "failed" | poll / generation parsers; synthesized rate-limit retry events |
| NOT_FOUND | "not_found" | poll_status when the artifact is absent from the list |
| UNKNOWN | "unknown" | unrecognized status codes (future-proofing) |
| REMOVED | "removed" | wait_for_completion after a sustained delisting |
Note: because status is now typed GenerationState, constructing GenerationStatus(..., status="completed") with a bare string literal is a mypy type error under strict settings — pass a member (GenerationState.COMPLETED) instead. This only affects callers who build GenerationStatus themselves; the library's own producers already do. All reading comparisons (status == "completed") remain valid because GenerationState subclasses str.
url semantics: poll_status populates url for media artifact types (audio, video, infographic, slide-deck PDF) as soon as the server reports the asset as ready. Slide decks expose the PDF URL here; for the editable PowerPoint, use client.artifacts.download_slide_deck(..., output_format="pptx") instead.
status = await client.artifacts.generate_audio(notebook_id)
final = await client.artifacts.wait_for_completion(notebook_id, status.task_id)
if final.is_complete and final.url:
# Stream the asset directly instead of re-fetching artifact metadata
...
@dataclass
class AskResult:
answer: str # The answer text with inline citations [1], [2], etc.
conversation_id: str # ID for follow-up questions
turn_number: int # Turn number in conversation
is_follow_up: bool # Whether this was a follow-up question
references: list[ChatReference] # Source references cited in the answer
raw_response: str # First 1000 chars of raw API response
@dataclass
class ChatReference:
source_id: str # UUID of the source
citation_number: int | None # Citation number in answer (1, 2, etc.)
cited_text: str | None # Actual text passage being cited
start_char: int | None # Start position in source content
end_char: int | None # End position in source content
chunk_id: str | None # ID of the chunk / internal chunk ID (for debugging)
passage_id: str | None # ID of the passage
answer_start_char: int | None # Start character offset in the answer
answer_end_char: int | None # End character offset in the answer
score: float | None # Citation score or relevance
Important: The cited_text field often contains only a snippet or section header, not the full quoted passage. The start_char/end_char positions reference NotebookLM's internal chunked index, which does not directly correspond to positions in the raw fulltext returned by get_fulltext().
Use SourceFulltext.find_citation_context() to locate citations in the fulltext:
fulltext = await client.sources.get_fulltext(notebook_id, ref.source_id)
matches = fulltext.find_citation_context(ref.cited_text) # Returns list[(context, position)]
if matches:
context, pos = matches[0] # First match
if len(matches) > 1:
print(f"Warning: {len(matches)} matches found, using first")
else:
context = None # Not found - may occur if source was modified
Tip: Cache fulltext when processing multiple citations from the same source to avoid repeated API calls.
@dataclass
class ShareStatus:
notebook_id: str # The notebook ID
is_public: bool # Whether publicly accessible
access: ShareAccess # RESTRICTED or ANYONE_WITH_LINK
view_level: ShareViewLevel # FULL_NOTEBOOK or CHAT_ONLY
shared_users: list[SharedUser] # List of users with access
share_url: str | None # Public URL if is_public=True
@dataclass
class SharedUser:
email: str # User's email address
permission: SharePermission # OWNER, EDITOR, or VIEWER
display_name: str | None # User's display name
avatar_url: str | None # URL to user's avatar image
Returned by client.settings.get_account_limits(). Use these fields for
quota decisions in preference to the raw tier string — the server-reported
limits are what NotebookLM actually enforces.
@dataclass(frozen=True)
class AccountLimits:
notebook_limit: int | None = None # Max notebooks the account can hold
source_limit: int | None = None # Max sources per notebook
raw_limits: tuple[Any, ...] = () # Untouched RPC payload for forensic use
Returned by client.settings.get_account_tier(). Raw tier metadata from
NotebookLM's homepage tier RPC. plan_name is the user-facing label when
available (e.g. "Google AI Pro"); tier is the internal identifier
("NOTEBOOKLM_TIER_STANDARD", "NOTEBOOKLM_TIER_PLUS", "NOTEBOOKLM_TIER_PRO", "NOTEBOOKLM_TIER_PRO_DASHER_END_USER", "NOTEBOOKLM_TIER_ULTRA").
@dataclass(frozen=True)
class AccountTier:
tier: str | None = None # Internal tier identifier
plan_name: str | None = None # User-facing plan label
tier = await client.settings.get_account_tier()
label = tier.plan_name or tier.tier or "unknown"
@dataclass
class SourceFulltext:
source_id: str # UUID of the source
title: str # Source title
content: str # Full indexed text content
url: str | None # Original URL (if applicable)
char_count: int # Character count
@property
def kind(self) -> SourceType:
"""Get source type as SourceType enum."""
def find_citation_context(
self,
cited_text: str,
context_chars: int = 200,
) -> list[tuple[str, int]]:
"""Search for citation text, return list of (context, position) tuples."""
Removed in v0.5.0: SourceFulltext.source_type was replaced by SourceFulltext.kind. See stability.md → Removed in v0.5.0.
Type Identification:
Like Source, use the .kind property to get the source type:
fulltext = await client.sources.get_fulltext(nb_id, source_id)
print(f"Content type: {fulltext.kind}") # "pdf", "web_page", etc.
class AudioFormat(Enum):
DEEP_DIVE = 1 # In-depth discussion
BRIEF = 2 # Quick summary
CRITIQUE = 3 # Critical analysis
DEBATE = 4 # Two-sided debate
class AudioLength(Enum):
SHORT = 1
DEFAULT = 2
LONG = 3
class VideoFormat(Enum):
EXPLAINER = 1
BRIEF = 2
CINEMATIC = 3
class VideoStyle(Enum):
AUTO_SELECT = 1
CUSTOM = 2
CLASSIC = 3
WHITEBOARD = 4
KAWAII = 5
ANIME = 6
WATERCOLOR = 7
RETRO_PRINT = 8
HERITAGE = 9
PAPER_CRAFT = 10
class QuizQuantity(Enum):
FEWER = 1
STANDARD = 2
MORE = 2 # Alias of STANDARD used by the CLI/web UI
class QuizDifficulty(Enum):
EASY = 1
MEDIUM = 2
HARD = 3
class ReportFormat(str, Enum):
BRIEFING_DOC = "briefing_doc"
STUDY_GUIDE = "study_guide"
BLOG_POST = "blog_post"
CUSTOM = "custom"
class InfographicOrientation(Enum):
LANDSCAPE = 1
PORTRAIT = 2
SQUARE = 3
class InfographicDetail(Enum):
CONCISE = 1
STANDARD = 2
DETAILED = 3
class SlideDeckFormat(Enum):
DETAILED_DECK = 1
PRESENTER_SLIDES = 2
class SlideDeckLength(Enum):
DEFAULT = 1
SHORT = 2
class ExportType(Enum):
DOCS = 1 # Export to Google Docs
SHEETS = 2 # Export to Google Sheets
class ShareAccess(Enum):
RESTRICTED = 0 # Only explicitly shared users
ANYONE_WITH_LINK = 1 # Public link access
class ShareViewLevel(Enum):
FULL_NOTEBOOK = 0 # Chat + sources + notes
CHAT_ONLY = 1 # Chat interface only
class SharePermission(Enum):
OWNER = 1 # Full control (read-only, cannot assign)
EDITOR = 2 # Can edit notebook
VIEWER = 3 # Read-only access
class SourceType(str, Enum):
"""Source types - use with source.kind property.
This is a str enum, enabling both enum and string comparisons:
source.kind == SourceType.PDF # True
source.kind == "pdf" # Also True
"""
GOOGLE_DOCS = "google_docs"
GOOGLE_SLIDES = "google_slides"
GOOGLE_SPREADSHEET = "google_spreadsheet"
PDF = "pdf"
PASTED_TEXT = "pasted_text"
WEB_PAGE = "web_page"
GOOGLE_DRIVE_AUDIO = "google_drive_audio"
GOOGLE_DRIVE_VIDEO = "google_drive_video"
YOUTUBE = "youtube"
MARKDOWN = "markdown"
DOCX = "docx"
CSV = "csv"
EPUB = "epub"
IMAGE = "image"
MEDIA = "media"
UNKNOWN = "unknown"
class ArtifactType(str, Enum):
"""Artifact types - use with artifact.kind property.
This is a str enum that hides internal variant complexity.
Quizzes and flashcards are distinguished automatically.
"""
AUDIO = "audio"
VIDEO = "video"
REPORT = "report"
QUIZ = "quiz"
FLASHCARDS = "flashcards"
MIND_MAP = "mind_map"
INFOGRAPHIC = "infographic"
SLIDE_DECK = "slide_deck"
DATA_TABLE = "data_table"
UNKNOWN = "unknown"
class SourceStatus(Enum):
PROCESSING = 1 # Source is being processed (indexing content)
READY = 2 # Source is ready for use
ERROR = 3 # Source processing failed
PREPARING = 5 # Source is being prepared/uploaded (pre-processing stage)
Usage Example:
from notebooklm import SourceType, ArtifactType
# List sources by type using .kind property
sources = await client.sources.list(nb_id)
for src in sources:
if src.kind == SourceType.PDF:
print(f"PDF: {src.title}")
elif src.kind == SourceType.MEDIA:
print(f"Audio/Video: {src.title}")
elif src.kind == SourceType.IMAGE:
print(f"Image (OCR'd): {src.title}")
elif src.kind == SourceType.UNKNOWN:
print(f"Unknown type: {src.title}")
# List artifacts by type using .kind property
artifacts = await client.artifacts.list(nb_id)
for art in artifacts:
if art.kind == ArtifactType.AUDIO:
print(f"Audio: {art.title}")
elif art.kind == ArtifactType.VIDEO:
print(f"Video: {art.title}")
elif art.kind == ArtifactType.QUIZ:
print(f"Quiz: {art.title}")
class ChatGoal(Enum):
DEFAULT = 1 # General purpose
CUSTOM = 2 # Uses custom_prompt
LEARNING_GUIDE = 3 # Educational focus
class ChatResponseLength(Enum):
DEFAULT = 1
LONGER = 4
SHORTER = 5
class ChatMode(Enum):
"""Predefined chat modes for common use cases (service-level enum)."""
DEFAULT = "default" # General purpose
LEARNING_GUIDE = "learning_guide" # Educational focus
CONCISE = "concise" # Brief responses
DETAILED = "detailed" # Verbose responses
ChatGoal vs ChatMode:
- ChatGoal is an RPC-level enum used with client.chat.configure() for low-level API configuration
- ChatMode is a service-level enum providing predefined configurations for common use cases
For undocumented features, you can make raw RPC calls:
from notebooklm.rpc import RPCMethod
async with NotebookLMClient.from_storage() as client:
# Each RPCMethod member has its own params shape (a nested list) and
# source_path; mirror the higher-level APIs when in doubt.
result = await client.rpc_call(
RPCMethod.CREATE_NOTEBOOK,
params=["My Notebook", None, None, [2], [1]],
)
Google rate-limits aggressive API usage. For artifact-generation methods, use the shared generation retry helper:
from notebooklm.artifacts import with_rate_limit_retry
status = await with_rate_limit_retry(
lambda: client.artifacts.generate_audio(notebook_id),
max_retries=3,
)
The chat endpoint supports streaming (internal implementation):
# Standard (non-streaming) - recommended
result = await client.chat.ask(nb_id, "Question")
print(result.answer)
# Streaming is handled internally by the library
# The ask() method returns the complete response
The following public APIs are available under the top-level notebooklm namespaces for logging, research citation processing, and metadata-aware capability implementations.
Locates the surrounding paragraph/passage of source text for a specific ChatReference citation. Since chat streaming returns only the matching citation fragment, this helper performs a single round-trip to pull the full source text and extract the surrounding context.
async def resolve_chat_reference_passage(
client: NotebookLMClient,
notebook_id: str,
reference: ChatReference,
context_chars: int = 200,
) -> str:
"""Return the surrounding source-text passage for a chat citation."""
Example:
from notebooklm import resolve_chat_reference_passage
ask_result = await client.chat.ask(notebook_id, "Explain quantum computing")
first_ref = ask_result.references[0]
passage = await resolve_chat_reference_passage(
client, notebook_id, first_ref, context_chars=150
)
print(f"Context: {passage}")
These helpers live in notebooklm.artifacts and can be used with any
artifact-generation callable that returns GenerationStatus.
async def with_rate_limit_retry(
generate_fn: Callable[[], Awaitable[GenerationStatus | None]],
*,
max_retries: int,
initial_delay: float = 60.0,
max_delay: float = 300.0,
multiplier: float = 2.0,
sleep: Callable[[float], Awaitable[Any]] | None = None,
on_retry: Callable[[RateLimitRetryEvent], object | Awaitable[object]] | None = None,
) -> GenerationStatus | None:
"""Run an artifact-generation callable with rate-limit retry."""
sleep lets tests or schedulers provide their own async wait function.
on_retry receives a RateLimitRetryEvent before each retry sleep.
Example:
from notebooklm.artifacts import with_rate_limit_retry
status = await with_rate_limit_retry(
lambda: client.artifacts.generate_video(notebook_id),
max_retries=3,
)
These are free/pure functions provided in the notebooklm.research module to inspect research reports and parse, normalize, or filter citations.
def normalize_url(url: str) -> str:
"""Normalize source/report URLs for citation matching."""
def extract_report_urls(report: str) -> set[str]:
"""Extract normalized URLs from research report markdown/text."""
def select_cited_sources(
sources: Sequence[dict[str, Any] | ResearchSource],
report: str,
) -> CitedSourceSelection:
"""Return research sources cited by the completed report.
Falls back to the original source list if no cited URLs are resolved.
"""
Example:
from notebooklm.research import select_cited_sources
status = await client.research.wait_for_completion(notebook_id, task_id=task_id)
# Filter only the sources that were explicitly cited in the report markdown
selection = select_cited_sources(status.sources, status.report)
print(f"Total sources: {len(status.sources)}")
print(f"Cited sources: {len(selection.sources)}")
Used to configure logging and tag asynchronous execution paths with a persistent correlation ID for tracking requests across concurrency seams.
def configure_logging() -> None:
"""Initialize package logging with redactors and correlation support."""
def get_request_id() -> str | None:
"""Return the current correlation id, or None if unset."""
def set_request_id(req_id: str | None = None) -> Token[str | None]:
"""Set the correlation id for this Task/context, returning a ContextVar Token."""
def reset_request_id(token: Token[str | None]) -> None:
"""Restore the correlation id to its previous value."""
correlation_id is an async-safe context manager that manages correlation ID state.
import logging
from notebooklm import correlation_id
logger = logging.getLogger(__name__)
with correlation_id("my-custom-flow-id"):
# All logging statements within this block are tagged with the ID
logger.info("Starting RPC call...")
Decomposed Protocols introduced in ADR-0013 to decouple service facades from target domain runtimes.
from typing import Protocol
class NotebookSourceLister(Protocol):
"""Structural source-listing dependency shared across feature APIs."""
async def list(self, notebook_id: str, *, strict: bool = False) -> list[Source]:
"""List sources for a notebook."""
from typing import Protocol
class NotebookSourceIdProvider(Protocol):
"""Structural source-id dependency needed by chat and artifact generation."""
async def get_source_ids(self, notebook_id: str) -> list[str]:
"""Return source IDs for a notebook."""