diff --git a/pyproject.toml b/pyproject.toml index 1407af2..2544b7e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ dependencies = [ "psutil>=7.0.0", "python-dotenv>=1.1.1", "requests>=2.32.4", + "jupyter-collaboration>=4.0.0", ] [project.scripts] diff --git a/scribe/notebook/notebook_server.py b/scribe/notebook/notebook_server.py index 36cf625..5a75dde 100644 --- a/scribe/notebook/notebook_server.py +++ b/scribe/notebook/notebook_server.py @@ -6,6 +6,7 @@ from pathlib import Path from datetime import datetime from typing import Dict, Optional +from functools import partial import sys import os @@ -18,6 +19,10 @@ from . import notebook_sever_handlers as _handlers +# Used as a transient notebook comment to indicate that a cell is running. +# It is used to give users visual feedback that something is happening. +RUNNING_COMMENT = "# \u23f3 Running this cell...\n" + # Request/Response models as simple dicts for Tornado handlers @dataclass @@ -68,6 +73,11 @@ def __init__(self, **kwargs): # Note: notebooks_path will be set up in initialize() after config is parsed self.notebooks_path = None + # RTC state (lazy init) + self._ydoc_ext = None + self._rtc_initialized = False + self._session_ydocs = {} + def initialize(self, argv=None): """Initialize the server after parsing configuration.""" # Call parent initialization first to parse config @@ -112,14 +122,14 @@ def _setup_notebooks_directory(self) -> Path: f"No write permission for notebooks directory: {notebooks_path}" ) - print(f"๐Ÿ“ Notebooks directory: {notebooks_path}") + print(f"\U0001f4c1 Notebooks directory: {notebooks_path}") return notebooks_path except Exception as e: error_msg = ( f"Failed to set up notebooks directory '{self.notebooks_dir}': {str(e)}" ) - print(f"โŒ {error_msg}") + print(f"\u274c {error_msg}") raise ValueError(error_msg) from e def init_webapp(self): @@ -147,6 +157,209 @@ def init_webapp(self): if hasattr(self, "shutdown_check_callback"): self.shutdown_check_callback.start() + # โ”€โ”€ RTC helpers โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + + def _try_init_rtc(self): + """Lazily initialize RTC by finding the YDocExtension. Cached after first call.""" + if self._rtc_initialized: + return self._ydoc_ext + + self._rtc_initialized = True + try: + from jupyter_server_ydoc.app import YDocExtension + + ext_apps = self.extension_manager.extension_apps.get( + "jupyter_server_ydoc" + ) + if ext_apps: + # extension_apps is a dict of name -> list of ExtensionApp instances + # or name -> ExtensionApp depending on version + if isinstance(ext_apps, list): + self._ydoc_ext = ext_apps[0] if ext_apps else None + else: + self._ydoc_ext = ext_apps + + if self._ydoc_ext is not None: + print("RTC: jupyter-collaboration extension found", file=sys.stderr) + else: + print( + "RTC: jupyter-collaboration extension not loaded, using file I/O", + file=sys.stderr, + ) + except Exception as e: + print( + f"RTC: Could not initialize ({e}), using file I/O", + file=sys.stderr, + ) + self._ydoc_ext = None + + return self._ydoc_ext + + async def _get_ydoc_for_session(self, session_id): + """Get (or create) the YNotebook for a session. Returns None if RTC unavailable.""" + if session_id in self._session_ydocs: + return self._session_ydocs[session_id] + + ydoc_ext = self._try_init_rtc() + if ydoc_ext is None: + return None + + session = self.sessions.get(session_id) + if not session: + return None + + try: + from jupyter_server_ydoc.utils import encode_file_path, room_id_from_encoded_path + from jupyter_server_ydoc.rooms import DocumentRoom + from jupyter_server_ydoc.websocketserver import RoomNotFound + + # Get file_id_manager and index the notebook file + file_id_manager = self.web_app.settings["file_id_manager"] + nb_path = session.notebook_path + + try: + relative_path = str(nb_path.relative_to(Path.cwd())) + except ValueError: + relative_path = str(nb_path) + + file_id = file_id_manager.index(relative_path) + if file_id is None: + print(f"RTC: Could not index file {relative_path}", file=sys.stderr) + return None + + encoded_path = encode_file_path("json", "notebook", file_id) + room_id = room_id_from_encoded_path(encoded_path) + + # Check if room already exists + if ydoc_ext.ywebsocket_server.room_exists(room_id): + room = await ydoc_ext.ywebsocket_server.get_room(room_id) + else: + # Create the room (same pattern as YDocWebSocketHandler.prepare) + from jupyter_server_ydoc.stores import SQLiteYStore + from logging import getLogger + + def _exception_logger(exception, log): + log.error( + f"Document Room Exception (room_id={room_id}): ", + exc_info=exception, + ) + return True + + file = ydoc_ext.file_loaders[file_id] + updates_file_path = f".notebook:{file_id}.y" + ystore_class = partial( + ydoc_ext.ystore_class, config=ydoc_ext.config + ) + ystore = ystore_class( + path=updates_file_path, + log=self.log, + ) + + room = DocumentRoom( + room_id, + "json", + "notebook", + file, + self.event_logger, + ystore, + self.log, + exception_handler=_exception_logger, + save_delay=ydoc_ext.document_save_delay, + ) + + await ydoc_ext.ywebsocket_server.start_room(room) + ydoc_ext.ywebsocket_server.add_room(room_id, room) + await room.initialize() + + ynotebook = room._document + self._session_ydocs[session_id] = ynotebook + return ynotebook + + except Exception as e: + print(f"RTC: Failed to get Y-document ({e}), using file I/O", file=sys.stderr) + return None + + def _output_to_ymap(self, output_dict): + """Convert a Scribe output dict to a pycrdt.Map for the Y-document.""" + try: + from pycrdt import Map, Text, Array + + output_type = output_dict.get("output_type") + if output_type == "stream": + ymap = Map() + ymap["output_type"] = output_type + ymap["name"] = output_dict["name"] + ytext = Text() + ytext += output_dict["text"] + ymap["text"] = ytext + return ymap + elif output_type == "execute_result": + ymap = Map() + ymap["output_type"] = output_type + data_map = Map() + for k, v in output_dict.get("data", {}).items(): + data_map[k] = v + ymap["data"] = data_map + meta_map = Map() + for k, v in output_dict.get("metadata", {}).items(): + meta_map[k] = v + ymap["metadata"] = meta_map + if "execution_count" in output_dict: + ymap["execution_count"] = output_dict["execution_count"] + return ymap + elif output_type == "display_data": + ymap = Map() + ymap["output_type"] = output_type + data_map = Map() + for k, v in output_dict.get("data", {}).items(): + data_map[k] = v + ymap["data"] = data_map + meta_map = Map() + for k, v in output_dict.get("metadata", {}).items(): + meta_map[k] = v + ymap["metadata"] = meta_map + return ymap + elif output_type == "error": + ymap = Map() + ymap["output_type"] = output_type + ymap["ename"] = output_dict["ename"] + ymap["evalue"] = output_dict["evalue"] + traceback_arr = Array() + for line in output_dict.get("traceback", []): + traceback_arr.append(line) + ymap["traceback"] = traceback_arr + return ymap + else: + return None + except Exception: + return None + + async def _set_cell_source(self, session_id, cell_index, source_text): + """Set the source of a cell. Uses Y-document if available, else file I/O.""" + ynotebook = self._session_ydocs.get(session_id) + if ynotebook is not None: + try: + ycell = ynotebook.ycells[cell_index] + ysource = ycell["source"] + ysource.clear() + ysource += source_text + return + except Exception as e: + print(f"RTC: Failed to set cell source ({e}), falling back to file I/O", file=sys.stderr) + + # Fallback: file I/O + session = self.sessions.get(session_id) + if not session: + return + with open(session.notebook_path, "r") as f: + nb = nbformat.read(f, as_version=nbformat.NO_CONVERT) + if cell_index < len(nb.cells): + nb.cells[cell_index].source = source_text + with open(session.notebook_path, "w") as f: + nbformat.write(clean_notebook_for_save(nb), f) + + # โ”€โ”€ Session lifecycle โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + async def start_session( self, experiment_name=None, existing_notebook_path=None, fork_prev_notebook=True ): @@ -275,73 +488,44 @@ async def start_session( ) self.sessions[session_id] = scribe_session + # Initialize Y-document room (loads file from disk into Y-doc) + ynotebook = await self._get_ydoc_for_session(session_id) + """STEP 3: If we have an existing notebook, execute all code cells to restore state """ restoration_results = [] if existing_notebook_path: - # Read the notebook - with open(nb_path, "r") as f: - nb = nbformat.read(f, as_version=nbformat.NO_CONVERT) - - if nb.cells: - # Use the same approach for both fork and resume: update cells in place + if ynotebook is not None: + # RTC restoration: iterate Y-cells, execute code cells, update Y-doc + num_cells = ynotebook.cell_number print( - f"{'Restoring' if fork_prev_notebook else 'Resuming'} notebook with {len(nb.cells)} cells..." + f"{'Restoring' if fork_prev_notebook else 'Resuming'} notebook with {num_cells} cells (RTC)..." ) - for i, cell in enumerate(nb.cells): - if cell.cell_type == "code" and cell.source.strip(): + for i in range(num_cells): + cell_dict = ynotebook.get_cell(i) + if cell_dict.get("cell_type") == "code" and cell_dict.get("source", "").strip(): try: - # Clear existing outputs in the cell - cell.outputs = [] + ycell = ynotebook.ycells[i] + + # Clear existing outputs + ycell["outputs"].clear() - # Execute the cell print(f"Executing cell {i + 1} (code cell)...") - # Get next execution count scribe_session.execution_count += 1 execution_count = scribe_session.execution_count - cell.execution_count = execution_count + ycell["execution_count"] = execution_count - # Execute and collect outputs + source = cell_dict["source"] outputs = [] async for output in self._execute_and_stream( - session_id, cell.source + session_id, source ): outputs.append(output) + ymap = self._output_to_ymap(output) + if ymap is not None: + ycell["outputs"].append(ymap) - # Convert output to nbformat and add to cell - if output["output_type"] == "stream": - cell_output = nbformat.v4.new_output( - output_type="stream", - name=output["name"], - text=output["text"], - ) - elif output["output_type"] == "execute_result": - cell_output = nbformat.v4.new_output( - output_type="execute_result", - data=output["data"], - metadata=output.get("metadata", {}), - execution_count=output.get("execution_count"), - ) - elif output["output_type"] == "display_data": - cell_output = nbformat.v4.new_output( - output_type="display_data", - data=output["data"], - metadata=output.get("metadata", {}), - ) - elif output["output_type"] == "error": - cell_output = nbformat.v4.new_output( - output_type="error", - ename=output["ename"], - evalue=output["evalue"], - traceback=output["traceback"], - ) - else: - continue - - cell.outputs.append(cell_output) - - # Check if there were any errors in the outputs errors = [ o for o in outputs if o.get("output_type") == "error" ] @@ -357,7 +541,7 @@ async def start_session( } ) else: - print(f"โœ“ Cell {i + 1} executed successfully") + print(f"\u2713 Cell {i + 1} executed successfully") restoration_results.append( {"cell": i + 1, "status": "success"} ) @@ -368,11 +552,93 @@ async def start_session( restoration_results.append( {"cell": i + 1, "status": "error", "error": error_msg} ) - # Continue with other cells even if one fails + else: + # File I/O restoration (original path) + with open(nb_path, "r") as f: + nb = nbformat.read(f, as_version=nbformat.NO_CONVERT) - # Save the notebook with updated outputs - with open(nb_path, "w") as f: - nbformat.write(clean_notebook_for_save(nb), f) + if nb.cells: + print( + f"{'Restoring' if fork_prev_notebook else 'Resuming'} notebook with {len(nb.cells)} cells..." + ) + + for i, cell in enumerate(nb.cells): + if cell.cell_type == "code" and cell.source.strip(): + try: + cell.outputs = [] + print(f"Executing cell {i + 1} (code cell)...") + + scribe_session.execution_count += 1 + execution_count = scribe_session.execution_count + cell.execution_count = execution_count + + outputs = [] + async for output in self._execute_and_stream( + session_id, cell.source + ): + outputs.append(output) + + if output["output_type"] == "stream": + cell_output = nbformat.v4.new_output( + output_type="stream", + name=output["name"], + text=output["text"], + ) + elif output["output_type"] == "execute_result": + cell_output = nbformat.v4.new_output( + output_type="execute_result", + data=output["data"], + metadata=output.get("metadata", {}), + execution_count=output.get("execution_count"), + ) + elif output["output_type"] == "display_data": + cell_output = nbformat.v4.new_output( + output_type="display_data", + data=output["data"], + metadata=output.get("metadata", {}), + ) + elif output["output_type"] == "error": + cell_output = nbformat.v4.new_output( + output_type="error", + ename=output["ename"], + evalue=output["evalue"], + traceback=output["traceback"], + ) + else: + continue + + cell.outputs.append(cell_output) + + errors = [ + o for o in outputs if o.get("output_type") == "error" + ] + if errors: + error_msg = f"Cell {i + 1} executed with errors: {errors[0]['ename']}: {errors[0]['evalue']}" + print(f"ERROR: {error_msg}") + restoration_results.append( + { + "cell": i + 1, + "status": "error", + "error": error_msg, + "traceback": errors[0].get("traceback", []), + } + ) + else: + print(f"\u2713 Cell {i + 1} executed successfully") + restoration_results.append( + {"cell": i + 1, "status": "success"} + ) + + except Exception as e: + error_msg = f"Failed to execute cell {i + 1}: {str(e)}" + print(f"ERROR: {error_msg}") + restoration_results.append( + {"cell": i + 1, "status": "error", "error": error_msg} + ) + + # Save the notebook with updated outputs + with open(nb_path, "w") as f: + nbformat.write(clean_notebook_for_save(nb), f) result = { "session_id": session_id, @@ -405,15 +671,28 @@ async def add_markdown_cell(self, session_id: str, content: str): if not session: raise ValueError(f"Session {session_id} not found") - # Read notebook + # Try RTC first + ynotebook = await self._get_ydoc_for_session(session_id) + if ynotebook is not None: + try: + ynotebook.append_cell( + { + "cell_type": "markdown", + "source": content, + "metadata": {}, + } + ) + return ynotebook.cell_number + except Exception as e: + print(f"RTC: Failed to add markdown cell ({e}), falling back to file I/O", file=sys.stderr) + + # File I/O fallback with open(session.notebook_path, "r") as f: nb = nbformat.read(f, as_version=nbformat.NO_CONVERT) - # Add markdown cell cell = nbformat.v4.new_markdown_cell(source=content) nb.cells.append(cell) - # Write back with open(session.notebook_path, "w") as f: nbformat.write(clean_notebook_for_save(nb), f) @@ -429,15 +708,31 @@ async def _add_pending_cell(self, session_id: str, code: str) -> int: if not session: raise ValueError(f"Session {session_id} not found") - # Read notebook - with open(session.notebook_path, "r") as f: - nb = nbformat.read(f, as_version=nbformat.NO_CONVERT) - # Get next execution count session.execution_count += 1 execution_count = session.execution_count - # Create new cell with pending status + # Try RTC first + ynotebook = await self._get_ydoc_for_session(session_id) + if ynotebook is not None: + try: + ynotebook.append_cell( + { + "cell_type": "code", + "source": RUNNING_COMMENT + code, + "metadata": {"execution_status": "pending"}, + "execution_count": execution_count, + "outputs": [], + } + ) + return ynotebook.cell_number - 1 + except Exception as e: + print(f"RTC: Failed to add pending cell ({e}), falling back to file I/O", file=sys.stderr) + + # File I/O fallback + with open(session.notebook_path, "r") as f: + nb = nbformat.read(f, as_version=nbformat.NO_CONVERT) + cell = nbformat.v4.new_code_cell( source=code, outputs=[], @@ -445,11 +740,9 @@ async def _add_pending_cell(self, session_id: str, code: str) -> int: metadata={"execution_status": "pending"}, ) - # Append cell nb.cells.append(cell) cell_index = len(nb.cells) - 1 - # Write back immediately with open(session.notebook_path, "w") as f: nbformat.write(clean_notebook_for_save(nb), f) @@ -463,7 +756,20 @@ async def _update_cell_output( if not session: return - # Read notebook + # Try RTC first + ynotebook = self._session_ydocs.get(session_id) + if ynotebook is not None: + try: + ycell = ynotebook.ycells[cell_index] + ymap = self._output_to_ymap(output) + if ymap is not None: + ycell["outputs"].append(ymap) + ycell["execution_state"] = "busy" + return + except Exception as e: + print(f"RTC: Failed to update cell output ({e}), falling back to file I/O", file=sys.stderr) + + # File I/O fallback with open(session.notebook_path, "r") as f: nb = nbformat.read(f, as_version=nbformat.NO_CONVERT) @@ -472,7 +778,6 @@ async def _update_cell_output( cell = nb.cells[cell_index] - # Convert output dict to nbformat output if output["output_type"] == "stream": cell_output = nbformat.v4.new_output( output_type="stream", name=output["name"], text=output["text"] @@ -500,13 +805,9 @@ async def _update_cell_output( else: return - # Append output cell.outputs.append(cell_output) - - # Update status cell.metadata["execution_status"] = status - # Write back with open(session.notebook_path, "w") as f: nbformat.write(clean_notebook_for_save(nb), f) @@ -637,6 +938,11 @@ async def execute_code_in_kernel( session_id, cell_index, error_output, status="error" ) + finally: + # Strip running comment if RTC was used + if cell_index is not None and session_id in self._session_ydocs: + await self._set_cell_source(session_id, cell_index, code) + return { "outputs": outputs, "execution_count": execution_count, @@ -649,14 +955,24 @@ async def _update_cell_status(self, session_id: str, cell_index: int, status: st if not session: return - # Read notebook + # Try RTC first + ynotebook = self._session_ydocs.get(session_id) + if ynotebook is not None: + try: + ycell = ynotebook.ycells[cell_index] + ycell["execution_state"] = "idle" + ycell["metadata"]["execution_status"] = status + return + except Exception as e: + print(f"RTC: Failed to update cell status ({e}), falling back to file I/O", file=sys.stderr) + + # File I/O fallback with open(session.notebook_path, "r") as f: nb = nbformat.read(f, as_version=nbformat.NO_CONVERT) if cell_index < len(nb.cells): nb.cells[cell_index].metadata["execution_status"] = status - # Write back with open(session.notebook_path, "w") as f: nbformat.write(clean_notebook_for_save(nb), f) @@ -672,11 +988,94 @@ async def edit_and_execute_cell( if not session: raise ValueError(f"Session {session_id} not found") - # Read notebook + # Try RTC first + ynotebook = self._session_ydocs.get(session_id) + if ynotebook is not None: + try: + # Find code cells in Y-doc + code_cell_indices = [] + for i in range(ynotebook.cell_number): + cell_dict = ynotebook.get_cell(i) + if cell_dict.get("cell_type") == "code": + code_cell_indices.append(i) + + if not code_cell_indices: + raise ValueError("No code cells found in notebook") + + # Handle negative indexing + idx = cell_index + if idx < 0: + idx = len(code_cell_indices) + idx + + if idx < 0 or idx >= len(code_cell_indices): + raise ValueError( + f"Cell index {cell_index} out of range. Notebook has {len(code_cell_indices)} code cells." + ) + + actual_index = code_cell_indices[idx] + ycell = ynotebook.ycells[actual_index] + + # Update source with running comment + ysource = ycell["source"] + ysource.clear() + ysource += RUNNING_COMMENT + new_code + + # Clear outputs + ycell["outputs"].clear() + + # Update execution count and state + session.execution_count += 1 + execution_count = session.execution_count + ycell["execution_count"] = execution_count + ycell["execution_state"] = "busy" + ycell["metadata"]["execution_status"] = "pending" + + # Execute and stream outputs + outputs = [] + try: + async for output in self._execute_and_stream(session_id, new_code): + outputs.append(output) + ymap = self._output_to_ymap(output) + if ymap is not None: + ycell["outputs"].append(ymap) + + ycell["execution_state"] = "idle" + ycell["metadata"]["execution_status"] = "complete" + + except Exception as e: + error_output = { + "output_type": "error", + "ename": type(e).__name__, + "evalue": str(e), + "traceback": [f"Error during execution: {str(e)}"], + } + outputs.append(error_output) + ymap = self._output_to_ymap(error_output) + if ymap is not None: + ycell["outputs"].append(ymap) + ycell["execution_state"] = "idle" + ycell["metadata"]["execution_status"] = "error" + + finally: + # Strip running comment + await self._set_cell_source(session_id, actual_index, new_code) + + return { + "cell_index": cell_index, + "actual_notebook_index": actual_index, + "outputs": outputs, + "execution_count": execution_count, + } + + except ValueError: + raise + except Exception as e: + print(f"RTC: Failed to edit cell ({e}), falling back to file I/O", file=sys.stderr) + + # File I/O fallback with open(session.notebook_path, "r") as f: nb = nbformat.read(f, as_version=nbformat.NO_CONVERT) - # Find code cells only code_cells = [ (i, cell) for i, cell in enumerate(nb.cells) if cell.cell_type == "code" ] @@ -684,7 +1083,6 @@ async def edit_and_execute_cell( if not code_cells: raise ValueError("No code cells found in notebook") - # Handle negative indexing for convenience if cell_index < 0: cell_index = len(code_cells) + cell_index @@ -693,38 +1091,30 @@ async def edit_and_execute_cell( f"Cell index {cell_index} out of range. Notebook has {len(code_cells)} code cells." ) - # Get the actual notebook index and cell actual_index, cell = code_cells[cell_index] - # Update the cell source and clear outputs cell.source = new_code cell.outputs = [] - # Get next execution count session.execution_count += 1 execution_count = session.execution_count cell.execution_count = execution_count cell.metadata["execution_status"] = "pending" - # Save the updated cell immediately with open(session.notebook_path, "w") as f: nbformat.write(clean_notebook_for_save(nb), f) - # Execute and stream outputs outputs = [] try: async for output in self._execute_and_stream(session_id, new_code): outputs.append(output) - # Update notebook with each output as it arrives await self._update_cell_output( session_id, actual_index, output, status="running" ) - # Mark as complete await self._update_cell_status(session_id, actual_index, "complete") except Exception as e: - # If error occurs, mark cell as error error_output = { "output_type": "error", "ename": type(e).__name__, @@ -753,6 +1143,8 @@ async def shutdown_session(self, session_id: str): sm = self.web_app.settings["session_manager"] await sm.delete_session(session.jupyter_session_id) + # Clean up RTC cache (don't clean up the room itself; extension handles that) + self._session_ydocs.pop(session_id, None) # Clean up our session tracking del self.sessions[session_id] @@ -782,7 +1174,7 @@ async def check_auto_shutdown(self): # Remove inactive sessions for session_id in inactive_sessions: print( - f"๐Ÿงน Cleaning up inactive session {session_id} (idle for {self.auto_shutdown_minutes}+ minutes)" + f"\U0001f9f9 Cleaning up inactive session {session_id} (idle for {self.auto_shutdown_minutes}+ minutes)" ) try: await self.shutdown_session(session_id) @@ -798,7 +1190,7 @@ async def check_auto_shutdown(self): if idle_minutes >= self.auto_shutdown_minutes: print( - f"\nโฐ Auto-shutdown: Server idle for {int(idle_minutes)} minutes" + f"\n\u23f0 Auto-shutdown: Server idle for {int(idle_minutes)} minutes" ) print(" Shutting down...")