diff --git a/README.rst b/README.rst
index 9f76d6b5..c5da0942 100644
--- a/README.rst
+++ b/README.rst
@@ -158,6 +158,22 @@ See the full `Chat Message History`_ tutorial.
 .. _`Chat Message History`: https://github.com/googleapis/langchain-google-cloud-sql-pg-python/tree/main/docs/chat_message_history.ipynb
 
+LangGraph Checkpoint Usage
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Use ``PostgresSaver`` to save snapshots of the graph state at a given point in time.
+
+.. code:: python
+
+        from langchain_google_cloud_sql_pg import PostgresSaver, PostgresEngine
+
+        engine = PostgresEngine.from_instance("project-id", "region", "my-instance", "my-database")
+        checkpointer = PostgresSaver.create_sync(engine)
+
+See the full `Checkpoint`_ tutorial.
+
+.. _`Checkpoint`: https://github.com/googleapis/langchain-google-cloud-sql-pg-python/tree/main/docs/langgraph_checkpoint.ipynb
+
 Contributions
 ~~~~~~~~~~~~~
diff --git a/docs/langgraph_checkpoint.ipynb b/docs/langgraph_checkpoint.ipynb
new file mode 100644
index 00000000..c663f95d
--- /dev/null
+++ b/docs/langgraph_checkpoint.ipynb
@@ -0,0 +1,424 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "provenance": []
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3 (ipykernel)",
+      "language": "python"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Google Cloud SQL for PostgreSQL\n",
+        "\n",
+        "> [Cloud SQL](https://cloud.google.com/sql) is a fully managed relational database service that offers high performance, seamless integration, and impressive scalability. It offers MySQL, PostgreSQL, and SQL Server database engines.
Extend your database application to build AI-powered experiences leveraging Cloud SQL's LangChain integrations.\n",
+        "\n",
+        "This notebook goes over how to use `Cloud SQL for PostgreSQL` to store checkpoints with the `PostgresSaver` class.\n",
+        "\n",
+        "Learn more about the package on [GitHub](https://github.com/googleapis/langchain-google-cloud-sql-pg-python/).\n",
+        "\n",
+        "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/googleapis/langchain-google-cloud-sql-pg-python/blob/main/docs/langgraph_checkpoint.ipynb)"
+      ],
+      "metadata": {
+        "id": "xHq_1Zh8TfCz"
+      }
+    },
+    {
+      "metadata": {},
+      "cell_type": "markdown",
+      "source": [
+        "## Prerequisites\n",
+        "\n",
+        "This guide assumes familiarity with the following:\n",
+        "\n",
+        "- [LangGraph Persistence](https://langchain-ai.github.io/langgraph/concepts/persistence/)\n",
+        "- [PostgreSQL](https://www.postgresql.org/about/)\n",
+        "\n",
+        "When creating LangGraph agents, you can also set them up so that they persist their state. This allows you to do things like interact with an agent multiple times and have it remember previous interactions."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Before You Begin\n",
+        "\n",
+        "To run this notebook, you will need to do the following:\n",
+        "\n",
+        " * [Create a Google Cloud Project](https://developers.google.com/workspace/guides/create-project)\n",
+        " * [Enable the Cloud SQL Admin API.](https://console.cloud.google.com/marketplace/product/google/sqladmin.googleapis.com)\n",
+        " * [Create a Cloud SQL for PostgreSQL instance](https://cloud.google.com/sql/docs/postgres/create-instance)\n",
+        " * [Create a Cloud SQL database](https://cloud.google.com/sql/docs/postgres/create-manage-databases)\n",
+        " * [Add an IAM database user to the database](https://cloud.google.com/sql/docs/postgres/add-manage-iam-users#creating-a-database-user) (Optional)\n"
+      ],
+      "metadata": {
+        "id": "5NW0xqHRToAM"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### 🦜🔗 Library Installation\n",
+        "The integration lives in its own `langchain-google-cloud-sql-pg` package, so we need to install it."
+      ],
+      "metadata": {
+        "id": "Owc-OZpOT0Jy"
+      }
+    },
+    {
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "TpNTWLbZT1Dw",
+        "outputId": "a54015ba-0281-4955-e9e0-36d3125c4887"
+      },
+      "cell_type": "code",
+      "source": "%pip install --upgrade --quiet langchain-google-cloud-sql-pg[langgraph] langchain-google-vertexai langgraph",
+      "outputs": [],
+      "execution_count": null
+    },
+    {
+      "metadata": {},
+      "cell_type": "markdown",
+      "source": "**Colab only:** Uncomment the following cell to restart the kernel, or use the button to restart it. For Vertex AI Workbench you can restart the terminal using the button on top."
+ }, + { + "cell_type": "code", + "source": [ + "# # Automatically restart kernel after installs so that your environment can access the new packages\n", + "# import IPython\n", + "#\n", + "# app = IPython.Application.instance()\n", + "# app.kernel.do_shutdown(True)" + ], + "metadata": { + "id": "OQrizsZGT4xx" + }, + "outputs": [], + "execution_count": null + }, + { + "cell_type": "markdown", + "source": [ + "### 🔐 Authentication\n", + "Authenticate to Google Cloud as the IAM user logged into this notebook in order to access your Google Cloud Project.\n", + "\n", + "* If you are using Colab to run this notebook, use the cell below and continue.\n", + "* If you are using Vertex AI Workbench, check out the setup instructions [here](https://github.com/GoogleCloudPlatform/generative-ai/tree/main/setup-env)." + ], + "metadata": { + "id": "XpCaEQgvUPBQ" + } + }, + { + "cell_type": "code", + "source": [ + "from google.colab import auth\n", + "\n", + "auth.authenticate_user()" + ], + "metadata": { + "id": "WvydoWCKUOlM" + }, + "outputs": [], + "execution_count": null + }, + { + "cell_type": "markdown", + "source": [ + "### ☁ Set Your Google Cloud Project\n", + "Set your Google Cloud project so that you can leverage Google Cloud resources within this notebook.\n", + "\n", + "If you don't know your project ID, try the following:\n", + "\n", + "* Run `gcloud config list`.\n", + "* Run `gcloud projects list`.\n", + "* See the support page: [Locate the project ID](https://support.google.com/googleapi/answer/7014113)." 
+      ],
+      "metadata": {
+        "id": "nNK4c-n-Umq9"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# @markdown Please fill in the value below with your Google Cloud project ID and then run the cell.\n",
+        "\n",
+        "PROJECT_ID = \"my-project-id\"  # @param {type:\"string\"}\n",
+        "\n",
+        "# Set the project id\n",
+        "!gcloud config set project {PROJECT_ID}"
+      ],
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "5pCxbzAGUP9p",
+        "outputId": "625f4fea-75a7-41ad-fef0-96467df9f41b"
+      },
+      "outputs": [],
+      "execution_count": null
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### 💡 API Enablement\n",
+        "The `langchain-google-cloud-sql-pg` package requires that you [enable the Cloud SQL Admin API](https://console.cloud.google.com/flows/enableapi?apiid=sqladmin.googleapis.com) in your Google Cloud Project."
+      ],
+      "metadata": {
+        "id": "sfN2P8EkUydF"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# enable Cloud SQL Admin API\n",
+        "!gcloud services enable sqladmin.googleapis.com"
+      ],
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "HS5OmGusUnP4",
+        "outputId": "952fdfd9-279f-4246-83a7-bc35793869f2"
+      },
+      "outputs": [],
+      "execution_count": null
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Basic Usage"
+      ],
+      "metadata": {
+        "id": "j6Sd_BIHU_cV"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Set Cloud SQL database values\n",
+        "Find your database values in the [Cloud SQL Instances page](https://console.cloud.google.com/sql)."
+      ],
+      "metadata": {
+        "id": "Kd3WmpRRVGJH"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# @title Set Your Values Here { display-mode: \"form\" }\n",
+        "REGION = \"us-central1\"  # @param {type: \"string\"}\n",
+        "INSTANCE = \"my-postgresql-instance\"  # @param {type: \"string\"}\n",
+        "DATABASE = \"my-database\"  # @param {type: \"string\"}"
+      ],
+      "metadata": {
+        "id": "ASWTYxvGUzfR"
+      },
+      "outputs": [],
+      "execution_count": null
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### PostgresEngine Connection Pool\n",
+        "\n",
+        "One of the requirements for using Cloud SQL with the `PostgresSaver` is a `PostgresEngine` object. The `PostgresEngine` configures a connection pool to your Cloud SQL database, enabling successful connections from your application and following industry best practices.\n",
+        "\n",
+        "To create a `PostgresEngine` using `PostgresEngine.from_instance()` you need to provide only 4 things:\n",
+        "\n",
+        "1. `project_id` : Project ID of the Google Cloud Project where the Cloud SQL instance is located.\n",
+        "1. `region` : Region where the Cloud SQL instance is located.\n",
+        "1. `instance` : The name of the Cloud SQL instance.\n",
+        "1. `database` : The name of the database to connect to on the Cloud SQL instance.\n",
+        "\n",
+        "By default, [IAM database authentication](https://cloud.google.com/sql/docs/postgres/iam-authentication#iam-db-auth) will be used as the method of database authentication.
This library uses the IAM principal belonging to the [Application Default Credentials (ADC)](https://cloud.google.com/docs/authentication/application-default-credentials) sourced from the environment.\n",
+        "\n",
+        "For more information on IAM database authentication, please see:\n",
+        "* [Configure an instance for IAM database authentication](https://cloud.google.com/sql/docs/postgres/create-edit-iam-instances)\n",
+        "* [Manage users with IAM database authentication](https://cloud.google.com/sql/docs/postgres/add-manage-iam-users)\n",
+        "\n",
+        "Optionally, [built-in database authentication](https://cloud.google.com/sql/docs/postgres/built-in-authentication) using a username and password to access the Cloud SQL database can also be used. Just provide the optional `user` and `password` arguments to `PostgresEngine.from_instance()`:\n",
+        "\n",
+        "* `user` : Database user to use for built-in database authentication and login\n",
+        "* `password` : Database password to use for built-in database authentication and login."
+      ],
+      "metadata": {
+        "id": "5noUSRjIVtlO"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "from langchain_google_cloud_sql_pg import PostgresEngine\n",
+        "\n",
+        "engine = PostgresEngine.from_instance(\n",
+        "    project_id=PROJECT_ID, region=REGION, instance=INSTANCE, database=DATABASE\n",
+        ")"
+      ],
+      "metadata": {
+        "id": "uYh8ulGNVAus"
+      },
+      "outputs": [],
+      "execution_count": null
+    },
+    {
+      "metadata": {},
+      "cell_type": "markdown",
+      "source": [
+        "### Initialize a table\n",
+        "The `PostgresSaver` class requires a database table with a specific schema in order to persist the LangGraph agent state.\n",
+        "The `PostgresEngine` engine has a helper method `init_checkpoint_table()` that can be used to create a table with the proper schema for you."
+      ]
+    },
+    {
+      "metadata": {},
+      "cell_type": "code",
+      "source": [
+        "engine.init_checkpoint_table()  # Use table_name to customise the table name"
+      ],
+      "outputs": [],
+      "execution_count": null
+    },
+    {
+      "metadata": {},
+      "cell_type": "markdown",
+      "source": [
+        "#### Optional Tip: 💡\n",
+        "You can also specify a schema name by passing `schema_name` wherever you pass `table_name`. E.g.:\n",
+        "\n",
+        "```python\n",
+        "SCHEMA_NAME=\"my_schema\"\n",
+        "\n",
+        "engine.init_checkpoint_table(\n",
+        "    table_name=TABLE_NAME,\n",
+        "    schema_name=SCHEMA_NAME  # Default: \"public\"\n",
+        ")\n",
+        "```"
+      ]
+    },
+    {
+      "metadata": {},
+      "cell_type": "markdown",
+      "source": [
+        "### PostgresSaver\n",
+        "\n",
+        "To initialize the `PostgresSaver` class you need to provide only 4 things:\n",
+        "\n",
+        "1. `engine` : An instance of a `PostgresEngine` engine.\n",
+        "2. `table_name` : The name of the table within the Cloud SQL database to store the checkpoints (Default: \"checkpoints\").\n",
+        "3. `schema_name` : The name of the database schema containing the checkpoints table (Default: \"public\").\n",
+        "4. 
`serde`: Serializer for encoding/decoding checkpoints (Default: None)\n" + ] + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "### Example of Checkpointer methods" + }, + { + "metadata": {}, + "cell_type": "code", + "source": [ + "from langchain_google_cloud_sql_pg import PostgresSaver\n", + "\n", + "checkpointer = PostgresSaver.create_sync(\n", + " engine,\n", + " # table_name = TABLE_NAME,\n", + " # schema_name = SCHEMA_NAME,\n", + " # serde = None,\n", + ")" + ], + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "code", + "source": [ + "write_config = {\"configurable\": {\"thread_id\": \"1\", \"checkpoint_ns\": \"\"}}\n", + "read_config = {\"configurable\": {\"thread_id\": \"1\"}}\n", + "\n", + "checkpoint = {\n", + " \"v\": 1,\n", + " \"ts\": \"2024-07-31T20:14:19.804150+00:00\",\n", + " \"id\": \"1ef4f797-8335-6428-8001-8a1503f9b875\",\n", + " \"channel_values\": {\"my_key\": \"meow\", \"node\": \"node\"},\n", + " \"channel_versions\": {\"__start__\": 2, \"my_key\": 3, \"start:node\": 3, \"node\": 3},\n", + " \"versions_seen\": {\n", + " \"__input__\": {},\n", + " \"__start__\": {\"__start__\": 1},\n", + " \"node\": {\"start:node\": 2},\n", + " },\n", + " \"pending_sends\": [],\n", + "}\n", + "\n", + "# store checkpoint\n", + "checkpointer.put(write_config, checkpoint, {}, {})\n", + "# load checkpoint\n", + "checkpointer.get(read_config)" + ], + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "## 🔗 Adding persistence to the pre-built create react agent" + }, + { + "metadata": {}, + "cell_type": "code", + "source": [ + "from typing import Literal\n", + "\n", + "from langchain_core.tools import tool\n", + "from langchain_google_vertexai import ChatVertexAI\n", + "from langgraph.prebuilt import create_react_agent\n", + "import vertexai\n", + "\n", + "vertexai.init(project=PROJECT_ID, location=REGION)\n", + "\n", + "\n", + "@tool\n", + "def 
get_weather(city: Literal[\"nyc\", \"sf\"]):\n",
+        "    \"\"\"Get the weather for the given city.\"\"\"\n",
+        "    if city == \"nyc\":\n",
+        "        return \"It might be cloudy in nyc\"\n",
+        "    elif city == \"sf\":\n",
+        "        return \"It's always sunny in sf\"\n",
+        "    else:\n",
+        "        raise AssertionError(\"Unknown city\")\n",
+        "\n",
+        "\n",
+        "tools = [get_weather]\n",
+        "model = ChatVertexAI(project=PROJECT_ID, model_name=\"gemini-2.0-flash-exp\")\n",
+        "\n",
+        "graph = create_react_agent(model, tools=tools, checkpointer=checkpointer)\n",
+        "config = {\"configurable\": {\"thread_id\": \"2\"}}\n",
+        "res = graph.invoke({\"messages\": [(\"human\", \"what's the weather in sf\")]}, config)\n",
+        "print(res)"
+      ],
+      "outputs": [],
+      "execution_count": null
+    },
+    {
+      "metadata": {},
+      "cell_type": "code",
+      "source": [
+        "# Example of retrieving the latest checkpoint for this config\n",
+        "checkpoint = checkpointer.get(config)"
+      ],
+      "outputs": [],
+      "execution_count": null
+    }
+  ]
+}
diff --git a/noxfile.py b/noxfile.py
index 8fb4d61b..75eae944 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -41,7 +41,7 @@ def docs(session):
     """Build the docs for this library."""
-    session.install("-e", ".")
+    session.install("-e", ".[test]")
     session.install(
         # We need to pin to specific versions of the `sphinxcontrib-*` packages
         # which still support sphinx 4.x.
diff --git a/pyproject.toml b/pyproject.toml index 52f53087..63bb192d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,13 +39,17 @@ Repository = "https://github.com/googleapis/langchain-google-cloud-sql-pg-python Changelog = "https://github.com/googleapis/langchain-google-cloud-sql-pg-python/blob/main/CHANGELOG.md" [project.optional-dependencies] +langgraph = [ + "langgraph-checkpoint>=2.0.9, <3.0.0" +] test = [ "black[jupyter]==25.1.0", "isort==6.0.1", "mypy==1.15.0", "pytest-asyncio==0.25.3", "pytest==8.3.4", - "pytest-cov==6.0.0" + "pytest-cov==6.0.0", + "langgraph==0.2.74" ] [build-system] diff --git a/requirements.txt b/requirements.txt index 6a4889de..c52a1b3d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ numpy==2.2.3; python_version > "3.9" numpy== 2.0.2; python_version <= "3.9" pgvector==0.3.6 SQLAlchemy[asyncio]==2.0.38 +langgraph-checkpoint==2.0.10 \ No newline at end of file diff --git a/src/langchain_google_cloud_sql_pg/__init__.py b/src/langchain_google_cloud_sql_pg/__init__.py index f0d2eab0..ca8ab9ef 100644 --- a/src/langchain_google_cloud_sql_pg/__init__.py +++ b/src/langchain_google_cloud_sql_pg/__init__.py @@ -14,6 +14,7 @@ from . 
import indexes from .chat_message_history import PostgresChatMessageHistory +from .checkpoint import PostgresSaver from .engine import Column, PostgresEngine from .loader import PostgresDocumentSaver, PostgresLoader from .vectorstore import PostgresVectorStore @@ -27,5 +28,6 @@ "PostgresEngine", "PostgresLoader", "PostgresDocumentSaver", + "PostgresSaver", "__version__", ] diff --git a/src/langchain_google_cloud_sql_pg/async_checkpoint.py b/src/langchain_google_cloud_sql_pg/async_checkpoint.py new file mode 100644 index 00000000..560182f7 --- /dev/null +++ b/src/langchain_google_cloud_sql_pg/async_checkpoint.py @@ -0,0 +1,592 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import json
+from typing import Any, AsyncIterator, Iterator, Optional, Sequence, Tuple
+
+from langchain_core.runnables import RunnableConfig
+from langgraph.checkpoint.base import (
+    WRITES_IDX_MAP,
+    BaseCheckpointSaver,
+    ChannelVersions,
+    Checkpoint,
+    CheckpointMetadata,
+    CheckpointTuple,
+    get_checkpoint_id,
+)
+from langgraph.checkpoint.serde.base import SerializerProtocol
+from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer
+from langgraph.checkpoint.serde.types import TASKS
+from sqlalchemy import text
+from sqlalchemy.ext.asyncio import AsyncEngine
+
+from .engine import CHECKPOINTS_TABLE, PostgresEngine
+
+MetadataInput = Optional[dict[str, Any]]
+
+checkpoints_columns = [
+    "thread_id",
+    "checkpoint_ns",
+    "checkpoint_id",
+    "parent_checkpoint_id",
+    "type",
+    "checkpoint",
+    "metadata",
+]
+
+writes_columns = [
+    "thread_id",
+    "checkpoint_ns",
+    "checkpoint_id",
+    "task_id",
+    "idx",
+    "channel",
+    "type",
+    "blob",
+]
+
+
+class AsyncPostgresSaver(BaseCheckpointSaver[str]):
+    """Checkpoint saver that stores LangGraph checkpoints in Cloud SQL for PostgreSQL."""
+
+    __create_key = object()
+
+    jsonplus_serde = JsonPlusSerializer()
+
+    def __init__(
+        self,
+        key: object,
+        pool: AsyncEngine,
+        table_name: str = CHECKPOINTS_TABLE,
+        schema_name: str = "public",
+        serde: Optional[SerializerProtocol] = None,
+    ) -> None:
+        super().__init__(serde=serde)
+        if key != AsyncPostgresSaver.__create_key:
+            raise Exception(
+                "only create class through 'create' or 'create_sync' methods"
+            )
+        self.pool = pool
+        self.table_name = table_name
+        self.table_name_writes = f"{table_name}_writes"
+        self.schema_name = schema_name
+
+    @classmethod
+    async def create(
+        cls,
+        engine: PostgresEngine,
+        table_name: str = CHECKPOINTS_TABLE,
+        schema_name: str = "public",
+        serde: Optional[SerializerProtocol] = None,
+    ) -> "AsyncPostgresSaver":
+        """Create a new AsyncPostgresSaver instance.
+
+        Args:
+            engine (PostgresEngine): The PostgresEngine to use.
+            table_name (str): Custom table name to use (default: CHECKPOINTS_TABLE).
+            schema_name (str): The schema name where the table is located (default: "public").
+            serde (SerializerProtocol): Serializer for encoding/decoding checkpoints (default: None).
+
+        Raises:
+            IndexError: If the provided table does not have the required schema.
+
+        Returns:
+            AsyncPostgresSaver: A newly created instance of AsyncPostgresSaver.
+        """
+
+        checkpoints_table_schema = await engine._aload_table_schema(
+            table_name, schema_name
+        )
+        checkpoints_column_names = checkpoints_table_schema.columns.keys()
+
+        if not (all(x in checkpoints_column_names for x in checkpoints_columns)):
+            raise IndexError(
+                f"Table '{schema_name}.{table_name}' has incorrect schema. Got "
+                f"column names '{checkpoints_column_names}' but required column names "
+                f"'{checkpoints_columns}'.\nPlease create table with following schema:"
+                f"\nCREATE TABLE {schema_name}.{table_name} ("
+                "\n    thread_id TEXT NOT NULL,"
+                "\n    checkpoint_ns TEXT NOT NULL,"
+                "\n    checkpoint_id TEXT NOT NULL,"
+                "\n    parent_checkpoint_id TEXT,"
+                "\n    type TEXT,"
+                "\n    checkpoint JSONB NOT NULL,"
+                "\n    metadata JSONB NOT NULL"
+                "\n);"
+            )
+
+        checkpoint_writes_table_schema = await engine._aload_table_schema(
+            f"{table_name}_writes", schema_name
+        )
+        checkpoint_writes_column_names = checkpoint_writes_table_schema.columns.keys()
+
+        if not (all(x in checkpoint_writes_column_names for x in writes_columns)):
+            raise IndexError(
+                f"Table '{schema_name}.{table_name}_writes' has incorrect schema. 
Got "
+                f"column names '{checkpoint_writes_column_names}' but required column names "
+                f"'{writes_columns}'.\nPlease create table with following schema:"
+                f"\nCREATE TABLE {schema_name}.{table_name}_writes ("
+                "\n    thread_id TEXT NOT NULL,"
+                "\n    checkpoint_ns TEXT NOT NULL,"
+                "\n    checkpoint_id TEXT NOT NULL,"
+                "\n    task_id TEXT NOT NULL,"
+                "\n    idx INT NOT NULL,"
+                "\n    channel TEXT NOT NULL,"
+                "\n    type TEXT,"
+                "\n    blob JSONB NOT NULL"
+                "\n);"
+            )
+        return cls(cls.__create_key, engine._pool, table_name, schema_name, serde)
+
+    def _dump_writes(
+        self,
+        thread_id: str,
+        checkpoint_ns: str,
+        checkpoint_id: str,
+        task_id: str,
+        task_path: str,
+        writes: Sequence[tuple[str, Any]],
+    ) -> list[dict[str, Any]]:
+        return [
+            {
+                "thread_id": thread_id,
+                "checkpoint_ns": checkpoint_ns,
+                "checkpoint_id": checkpoint_id,
+                "task_id": task_id,
+                "task_path": task_path,
+                "idx": WRITES_IDX_MAP.get(channel, idx),
+                "channel": channel,
+                "type": self.serde.dumps_typed(value)[0],
+                "blob": self.serde.dumps_typed(value)[1],
+            }
+            for idx, (channel, value) in enumerate(writes)
+        ]
+
+    def _load_writes(
+        self, writes: list[tuple[bytes, bytes, bytes, bytes]]
+    ) -> list[tuple[str, str, Any]]:
+        return (
+            [
+                (
+                    tid.decode(),
+                    channel.decode(),
+                    self.serde.loads_typed((t.decode(), v)),
+                )
+                for tid, channel, t, v in writes
+            ]
+            if writes
+            else []
+        )
+
+    def _search_where(
+        self,
+        config: Optional[RunnableConfig],
+        filter: MetadataInput,
+        before: Optional[RunnableConfig] = None,
+    ) -> tuple[str, dict[str, Any]]:
+        """Return WHERE clause predicates for alist() given config, filter, before.
+
+        This method returns a tuple of a string and a dict. The string is the
+        parameterized WHERE clause predicate (including the WHERE keyword), e.g.
+        "WHERE thread_id = :thread_id AND checkpoint_ns = :checkpoint_ns". The
+        dict maps each named bind parameter to its value.
+        """
+        wheres = []
+        param_values = {}
+
+        # construct predicate for config filter
+        if config:
+            wheres.append("thread_id = :thread_id")
+            param_values.update({"thread_id": config["configurable"]["thread_id"]})
+            checkpoint_ns = config["configurable"].get("checkpoint_ns")
+            if checkpoint_ns is not None:
+                wheres.append("checkpoint_ns = :checkpoint_ns")
+                param_values.update({"checkpoint_ns": checkpoint_ns})
+
+            if checkpoint_id := get_checkpoint_id(config):
+                wheres.append("checkpoint_id = :checkpoint_id")
+                param_values.update({"checkpoint_id": checkpoint_id})
+
+        # construct predicate for metadata filter
+        if filter:
+            wheres.append("encode(metadata,'escape')::jsonb @> :metadata ")
+            param_values.update({"metadata": f"{json.dumps(filter)}"})
+
+        # construct predicate for `before`; use a distinct parameter name so it
+        # cannot collide with the `checkpoint_id` bind from the config filter
+        if before is not None:
+            wheres.append("checkpoint_id < :checkpoint_id_before")
+            param_values.update({"checkpoint_id_before": get_checkpoint_id(before)})
+
+        return (
+            "WHERE " + " AND ".join(wheres) if wheres else "",
+            param_values,
+        )
+
+    async def aput(
+        self,
+        config: RunnableConfig,
+        checkpoint: Checkpoint,
+        metadata: CheckpointMetadata,
+        new_versions: ChannelVersions,
+    ) -> RunnableConfig:
+        """Asynchronously store a checkpoint with its configuration and metadata.
+
+        Args:
+            config (RunnableConfig): Configuration for the checkpoint.
+            checkpoint (Checkpoint): The checkpoint to store.
+            metadata (CheckpointMetadata): Additional metadata for the checkpoint.
+            new_versions (ChannelVersions): New channel versions as of this write.
+
+        Returns:
+            RunnableConfig: Updated configuration after storing the checkpoint.
+ """ + configurable = config["configurable"] + thread_id = configurable.get("thread_id") + checkpoint_ns = configurable.get("checkpoint_ns") + checkpoint_id = configurable.get( + "checkpoint_id", configurable.get("thread_ts", None) + ) + + next_config: RunnableConfig = { + "configurable": { + "thread_id": thread_id, + "checkpoint_ns": checkpoint_ns, + "checkpoint_id": checkpoint["id"], + } + } + + query = f"""INSERT INTO "{self.schema_name}"."{self.table_name}" (thread_id, checkpoint_ns, checkpoint_id, parent_checkpoint_id, type, checkpoint, metadata) + VALUES (:thread_id, :checkpoint_ns, :checkpoint_id, :parent_checkpoint_id, :type, :checkpoint, :metadata) + ON CONFLICT (thread_id, checkpoint_ns, checkpoint_id) + DO UPDATE SET + checkpoint = EXCLUDED.checkpoint, + metadata = EXCLUDED.metadata; + """ + + async with self.pool.connect() as conn: + type_, serialized_checkpoint = self.serde.dumps_typed(checkpoint) + serialized_metadata = self.jsonplus_serde.dumps(metadata) + await conn.execute( + text(query), + { + "thread_id": thread_id, + "checkpoint_ns": checkpoint_ns, + "checkpoint_id": checkpoint["id"], + "parent_checkpoint_id": checkpoint_id, + "type": type_, + "checkpoint": serialized_checkpoint, + "metadata": serialized_metadata, + }, + ) + await conn.commit() + + return next_config + + async def aput_writes( + self, + config: RunnableConfig, + writes: Sequence[Tuple[str, Any]], + task_id: str, + task_path: str = "", + ) -> None: + """Asynchronously store intermediate writes linked to a checkpoint. + Args: + config (RunnableConfig): Configuration of the related checkpoint. + writes (List[Tuple[str, Any]]): List of writes to store. + task_id (str): Identifier for the task creating the writes. + task_path (str): Path of the task creating the writes. 
+ + Returns: + None + """ + upsert = f"""INSERT INTO "{self.schema_name}"."{self.table_name_writes}"(thread_id, checkpoint_ns, checkpoint_id, task_id, idx, channel, type, blob) + VALUES (:thread_id, :checkpoint_ns, :checkpoint_id, :task_id, :idx, :channel, :type, :blob) + ON CONFLICT (thread_id, checkpoint_ns, checkpoint_id, task_id, idx) DO UPDATE SET + channel = EXCLUDED.channel, + type = EXCLUDED.type, + blob = EXCLUDED.blob; + """ + insert = f"""INSERT INTO "{self.schema_name}"."{self.table_name_writes}"(thread_id, checkpoint_ns, checkpoint_id, task_id, idx, channel, type, blob) + VALUES (:thread_id, :checkpoint_ns, :checkpoint_id, :task_id, :idx, :channel, :type, :blob) + ON CONFLICT (thread_id, checkpoint_ns, checkpoint_id, task_id, idx) DO NOTHING + """ + query = upsert if all(w[0] in WRITES_IDX_MAP for w in writes) else insert + + params = self._dump_writes( + config["configurable"]["thread_id"], + config["configurable"]["checkpoint_ns"], + config["configurable"]["checkpoint_id"], + task_id, + task_path, + writes, + ) + + async with self.pool.connect() as conn: + await conn.execute( + text(query), + params, + ) + await conn.commit() + + async def alist( + self, + config: Optional[RunnableConfig], + *, + filter: Optional[dict[str, Any]] = None, + before: Optional[RunnableConfig] = None, + limit: Optional[int] = None, + ) -> AsyncIterator[CheckpointTuple]: + """Asynchronously list checkpoints that match the given criteria. + + Args: + config (Optional[RunnableConfig]): Base configuration for filtering checkpoints. + filter (Optional[Dict[str, Any]]): Additional filtering criteria for metadata. + before (Optional[RunnableConfig]): List checkpoints created before this configuration. + limit (Optional[int]): Maximum number of checkpoints to return. + + Returns: + AsyncIterator[CheckpointTuple]: Async iterator of matching checkpoint tuples. 
+ """ + SELECT = f""" + SELECT + thread_id, + checkpoint, + checkpoint_ns, + checkpoint_id, + parent_checkpoint_id, + metadata, + type, + ( + SELECT array_agg(array[cw.task_id::text::bytea, cw.channel::bytea, cw.type::bytea, cw.blob] order by cw.task_id, cw.idx) + FROM "{self.schema_name}"."{self.table_name_writes}" cw + where cw.thread_id = c.thread_id + AND cw.checkpoint_ns = c.checkpoint_ns + AND cw.checkpoint_id = c.checkpoint_id + ) AS pending_writes, + ( + SELECT array_agg(array[cw.type::bytea, cw.blob] order by cw.task_path, cw.task_id, cw.idx) + FROM "{self.schema_name}"."{self.table_name_writes}" cw + WHERE cw.thread_id = c.thread_id + AND cw.checkpoint_ns = c.checkpoint_ns + AND cw.checkpoint_id = c.parent_checkpoint_id + AND cw.channel = '{TASKS}' + ) AS pending_sends + FROM "{self.schema_name}"."{self.table_name}" c + """ + + where, args = self._search_where(config, filter, before) + query = SELECT + where + " ORDER BY checkpoint_id DESC" + if limit: + query += f" LIMIT {limit}" + + async with self.pool.connect() as conn: + result = await conn.execute(text(query), args) + while True: + row = result.fetchone() + if not row: + break + value = row._mapping + yield CheckpointTuple( + config={ + "configurable": { + "thread_id": value["thread_id"], + "checkpoint_ns": value["checkpoint_ns"], + "checkpoint_id": value["checkpoint_id"], + } + }, + checkpoint=self.serde.loads_typed( + (value["type"], value["checkpoint"]) + ), + metadata=( + self.jsonplus_serde.loads(value["metadata"]) # type: ignore + if value["metadata"] is not None + else {} + ), + parent_config=( + { + "configurable": { + "thread_id": value["thread_id"], + "checkpoint_ns": value["checkpoint_ns"], + "checkpoint_id": value["parent_checkpoint_id"], + } + } + if value["parent_checkpoint_id"] + else None + ), + pending_writes=self._load_writes(value["pending_writes"]), + ) + + async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]: + """Asynchronously fetch a checkpoint 
tuple using the given configuration. + + Args: + config (RunnableConfig): Configuration specifying which checkpoint to retrieve. + + Returns: + Optional[CheckpointTuple]: The requested checkpoint tuple, or None if not found. + """ + + SELECT = f""" + SELECT + thread_id, + checkpoint, + checkpoint_ns, + checkpoint_id, + parent_checkpoint_id, + metadata, + type, + ( + SELECT array_agg(array[cw.task_id::text::bytea, cw.channel::bytea, cw.type::bytea, cw.blob] order by cw.task_id, cw.idx) + FROM "{self.schema_name}"."{self.table_name_writes}" cw + where cw.thread_id = c.thread_id + AND cw.checkpoint_ns = c.checkpoint_ns + AND cw.checkpoint_id = c.checkpoint_id + ) AS pending_writes, + ( + SELECT array_agg(array[cw.type::bytea, cw.blob] order by cw.task_path, cw.task_id, cw.idx) + FROM "{self.schema_name}"."{self.table_name_writes}" cw + WHERE cw.thread_id = c.thread_id + AND cw.checkpoint_ns = c.checkpoint_ns + AND cw.checkpoint_id = c.parent_checkpoint_id + AND cw.channel = '{TASKS}' + ) AS pending_sends + FROM "{self.schema_name}"."{self.table_name}" c + """ + + thread_id = config["configurable"]["thread_id"] + checkpoint_id = get_checkpoint_id(config) + checkpoint_ns = config["configurable"].get("checkpoint_ns", "") + if checkpoint_id: + args = { + "thread_id": thread_id, + "checkpoint_ns": checkpoint_ns, + "checkpoint_id": checkpoint_id, + } + where = "WHERE thread_id = :thread_id AND checkpoint_ns = :checkpoint_ns AND checkpoint_id = :checkpoint_id" + else: + args = {"thread_id": thread_id, "checkpoint_ns": checkpoint_ns} + where = "WHERE thread_id = :thread_id AND checkpoint_ns = :checkpoint_ns ORDER BY checkpoint_id DESC LIMIT 1" + + async with self.pool.connect() as conn: + result = await conn.execute(text(SELECT + where), args) + row = result.fetchone() + if not row: + return None + value = row._mapping + return CheckpointTuple( + config={ + "configurable": { + "thread_id": value["thread_id"], + "checkpoint_ns": value["checkpoint_ns"], + "checkpoint_id": 
value["checkpoint_id"], + } + }, + checkpoint=self.serde.loads_typed((value["type"], value["checkpoint"])), + metadata=( + self.jsonplus_serde.loads(value["metadata"]) # type: ignore + if value["metadata"] is not None + else {} + ), + parent_config=( + { + "configurable": { + "thread_id": value["thread_id"], + "checkpoint_ns": value["checkpoint_ns"], + "checkpoint_id": value["parent_checkpoint_id"], + } + } + if value["parent_checkpoint_id"] + else None + ), + pending_writes=self._load_writes(value["pending_writes"]), + ) + + def put( + self, + config: RunnableConfig, + checkpoint: Checkpoint, + metadata: CheckpointMetadata, + new_versions: ChannelVersions, + ) -> RunnableConfig: + """Store a checkpoint with its configuration and metadata. + + Args: + config (RunnableConfig): Configuration for the checkpoint. + checkpoint (Checkpoint): The checkpoint to store. + metadata (CheckpointMetadata): Additional metadata for the checkpoint. + new_versions (ChannelVersions): New channel versions as of this write. + + Returns: + RunnableConfig: Updated configuration after storing the checkpoint. + """ + raise NotImplementedError( + "Sync methods are not implemented for AsyncPostgresSaver. Use PostgresSaver interface instead." + ) + + def put_writes( + self, + config: RunnableConfig, + writes: Sequence[Tuple[str, Any]], + task_id: str, + task_path: str = "", + ) -> None: + """Store intermediate writes linked to a checkpoint. + Args: + config (RunnableConfig): Configuration of the related checkpoint. + writes (List[Tuple[str, Any]]): List of writes to store. + task_id (str): Identifier for the task creating the writes. + task_path (str): Path of the task creating the writes. + + Returns: + None + """ + raise NotImplementedError( + "Sync methods are not implemented for AsyncPostgresSaver. Use PostgresSaver interface instead."
+ ) + + def list( + self, + config: Optional[RunnableConfig], + *, + filter: Optional[dict[str, Any]] = None, + before: Optional[RunnableConfig] = None, + limit: Optional[int] = None, + ) -> Iterator[CheckpointTuple]: + """List checkpoints that match the given criteria. + + Args: + config (Optional[RunnableConfig]): Base configuration for filtering checkpoints. + filter (Optional[Dict[str, Any]]): Additional filtering criteria for metadata. + before (Optional[RunnableConfig]): List checkpoints created before this configuration. + limit (Optional[int]): Maximum number of checkpoints to return. + + Returns: + Iterator[CheckpointTuple]: Iterator of matching checkpoint tuples. + """ + raise NotImplementedError( + "Sync methods are not implemented for AsyncPostgresSaver. Use PostgresSaver interface instead." + ) + + def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]: + """Fetch a checkpoint tuple using the given configuration. + + Args: + config (RunnableConfig): Configuration specifying which checkpoint to retrieve. + + Returns: + Optional[CheckpointTuple]: The requested checkpoint tuple, or None if not found. + """ + raise NotImplementedError( + "Sync methods are not implemented for AsyncPostgresSaver. Use PostgresSaver interface instead." + ) diff --git a/src/langchain_google_cloud_sql_pg/checkpoint.py b/src/langchain_google_cloud_sql_pg/checkpoint.py new file mode 100644 index 00000000..661261bc --- /dev/null +++ b/src/langchain_google_cloud_sql_pg/checkpoint.py @@ -0,0 +1,248 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, AsyncIterator, Iterator, Optional, Sequence, Tuple + +from langchain_core.runnables import RunnableConfig +from langgraph.checkpoint.base import ( + BaseCheckpointSaver, + ChannelVersions, + Checkpoint, + CheckpointMetadata, + CheckpointTuple, +) +from langgraph.checkpoint.serde.base import SerializerProtocol + +from .async_checkpoint import AsyncPostgresSaver +from .engine import CHECKPOINTS_TABLE, PostgresEngine + + +class PostgresSaver(BaseCheckpointSaver[str]): + """Checkpoint saver backed by a Cloud SQL for PostgreSQL database.""" + + __create_key = object() + + def __init__( + self, + key: object, + engine: PostgresEngine, + checkpoint: AsyncPostgresSaver, + table_name: str = CHECKPOINTS_TABLE, + schema_name: str = "public", + serde: Optional[SerializerProtocol] = None, + ) -> None: + super().__init__(serde=serde) + if key != PostgresSaver.__create_key: + raise Exception( + "Only create class through 'create' or 'create_sync' methods" + ) + self._engine = engine + self.__checkpoint = checkpoint + + @classmethod + async def create( + cls, + engine: PostgresEngine, + table_name: str = CHECKPOINTS_TABLE, + schema_name: str = "public", + serde: Optional[SerializerProtocol] = None, + ) -> "PostgresSaver": + """Create a new PostgresSaver instance. + Args: + engine (PostgresEngine): PgSQL engine to use. + table_name (str): Table name that stores the checkpoints (default: "checkpoints"). + schema_name (str): The schema name where the table is located (default: "public"). + serde (SerializerProtocol): Serializer for encoding/decoding checkpoints (default: None).
+ Raises: + IndexError: If the table provided does not contain required schema. + Returns: + PostgresSaver: A newly created instance of PostgresSaver. + """ + coro = AsyncPostgresSaver.create(engine, table_name, schema_name, serde) + checkpoint = await engine._run_as_async(coro) + return cls(cls.__create_key, engine, checkpoint) + + @classmethod + def create_sync( + cls, + engine: PostgresEngine, + table_name: str = CHECKPOINTS_TABLE, + schema_name: str = "public", + serde: Optional[SerializerProtocol] = None, + ) -> "PostgresSaver": + """Create a new PostgresSaver instance. + Args: + engine (PostgresEngine): PgSQL engine to use. + table_name (str): Table name that stores the checkpoints (default: "checkpoints"). + schema_name (str): The schema name where the table is located (default: "public"). + serde (SerializerProtocol): Serializer for encoding/decoding checkpoints (default: None). + Raises: + IndexError: If the table provided does not contain required schema. + Returns: + PostgresSaver: A newly created instance of PostgresSaver. + """ + coro = AsyncPostgresSaver.create(engine, table_name, schema_name, serde) + checkpoint = engine._run_as_sync(coro) + return cls(cls.__create_key, engine, checkpoint) + + async def alist( + self, + config: Optional[RunnableConfig], + filter: Optional[dict[str, Any]] = None, + before: Optional[RunnableConfig] = None, + limit: Optional[int] = None, + ) -> AsyncIterator[CheckpointTuple]: + """Asynchronously list checkpoints that match the given criteria + Args: + config (Optional[RunnableConfig]): Base configuration for filtering checkpoints. + filter (Optional[Dict[str, Any]]): Additional filtering criteria for metadata. + before (Optional[RunnableConfig]): List checkpoints created before this configuration. + limit (Optional[int]): Maximum number of checkpoints to return. + Returns: + AsyncIterator[CheckpointTuple]: Async iterator of matching checkpoint tuples. 
+ """ + iterator = self.__checkpoint.alist( + config=config, filter=filter, before=before, limit=limit + ) + while True: + try: + result = await self._engine._run_as_async(iterator.__anext__()) + yield result + except StopAsyncIteration: + break + + def list( + self, + config: Optional[RunnableConfig], + filter: Optional[dict[str, Any]] = None, + before: Optional[RunnableConfig] = None, + limit: Optional[int] = None, + ) -> Iterator[CheckpointTuple]: + """List checkpoints from PgSQL + Args: + config (RunnableConfig): The config to use for listing the checkpoints. + filter (Optional[Dict[str, Any]]): Additional filtering criteria for metadata. Defaults to None. + before (Optional[RunnableConfig]): If provided, only checkpoints before the specified checkpoint ID are returned. Defaults to None. + limit (Optional[int]): The maximum number of checkpoints to return. Defaults to None. + Yields: + Iterator[CheckpointTuple]: An iterator of checkpoint tuples. + """ + + iterator: AsyncIterator[CheckpointTuple] = self.__checkpoint.alist( + config=config, filter=filter, before=before, limit=limit + ) + while True: + try: + result = self._engine._run_as_sync(iterator.__anext__()) + yield result + except StopAsyncIteration: + break + + async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]: + """Asynchronously fetch a checkpoint tuple using the given configuration. + Args: + config (RunnableConfig): The config to use for retrieving the checkpoint. + Returns: + Optional[CheckpointTuple]: The retrieved checkpoint tuple, or None if no matching checkpoint was found. + """ + return await self._engine._run_as_async(self.__checkpoint.aget_tuple(config)) + + def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]: + """Get a checkpoint tuple from PgSQL. + Args: + config (RunnableConfig): The config to use for retrieving the checkpoint. 
+ Returns: + Optional[CheckpointTuple]: The retrieved checkpoint tuple, or None if no matching checkpoint was found. + """ + return self._engine._run_as_sync(self.__checkpoint.aget_tuple(config)) + + async def aput( + self, + config: RunnableConfig, + checkpoint: Checkpoint, + metadata: CheckpointMetadata, + new_versions: ChannelVersions, + ) -> RunnableConfig: + """Asynchronously store a checkpoint with its configuration and metadata. + Args: + config (RunnableConfig): The config to associate with the checkpoint. + checkpoint (Checkpoint): The checkpoint to save. + metadata (CheckpointMetadata): Additional metadata to save with the checkpoint. + new_versions (ChannelVersions): New channel versions as of this write. + Returns: + RunnableConfig: Updated configuration after storing the checkpoint. + """ + return await self._engine._run_as_async( + self.__checkpoint.aput(config, checkpoint, metadata, new_versions) + ) + + def put( + self, + config: RunnableConfig, + checkpoint: Checkpoint, + metadata: CheckpointMetadata, + new_versions: ChannelVersions, + ) -> RunnableConfig: + """Save a checkpoint to the database. + Args: + config (RunnableConfig): The config to associate with the checkpoint. + checkpoint (Checkpoint): The checkpoint to save. + metadata (CheckpointMetadata): Additional metadata to save with the checkpoint. + new_versions (ChannelVersions): New channel versions as of this write. + Returns: + RunnableConfig: Updated configuration after storing the checkpoint. + """ + return self._engine._run_as_sync( + self.__checkpoint.aput(config, checkpoint, metadata, new_versions) + ) + + async def aput_writes( + self, + config: RunnableConfig, + writes: Sequence[Tuple[str, Any]], + task_id: str, + task_path: str = "", + ) -> None: + """Asynchronously store intermediate writes linked to a checkpoint. + Args: + config (RunnableConfig): Configuration of the related checkpoint. + writes (List[Tuple[str, Any]]): List of writes to store. 
+ task_id (str): Identifier for the task creating the writes. + task_path (str): Path of the task creating the writes. + Returns: + None + """ + await self._engine._run_as_async( + self.__checkpoint.aput_writes(config, writes, task_id, task_path) + ) + + def put_writes( + self, + config: RunnableConfig, + writes: Sequence[tuple[str, Any]], + task_id: str, + task_path: str = "", + ) -> None: + """Store intermediate writes linked to a checkpoint. + Args: + config (RunnableConfig): Configuration of the related checkpoint. + writes (List[Tuple[str, Any]]): List of writes to store. + task_id (str): Identifier for the task creating the writes. + task_path (str): Path of the task creating the writes. + Returns: + None + """ + self._engine._run_as_sync( + self.__checkpoint.aput_writes(config, writes, task_id, task_path) + ) diff --git a/src/langchain_google_cloud_sql_pg/engine.py b/src/langchain_google_cloud_sql_pg/engine.py index e5a872a1..c40462b5 100644 --- a/src/langchain_google_cloud_sql_pg/engine.py +++ b/src/langchain_google_cloud_sql_pg/engine.py @@ -1,4 +1,4 @@ -# Copyright 2024 Google LLC +# Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -39,6 +39,8 @@ USER_AGENT = "langchain-google-cloud-sql-pg-python/" + __version__ +CHECKPOINTS_TABLE = "checkpoints" + async def _get_iam_principal_email( credentials: google.auth.credentials.Credentials, @@ -747,6 +749,87 @@ def init_document_table( ) ) + async def _ainit_checkpoint_table( + self, table_name: str = CHECKPOINTS_TABLE, schema_name: str = "public" + ) -> None: + """ + Create PgSQL tables to save checkpoints. + + Args: + schema_name (str): The schema name to store the checkpoint tables. + Default: "public". + table_name (str): The PgSQL database table name. + Default: "checkpoints". 
+ + Returns: + None + """ + create_checkpoints_table = f"""CREATE TABLE "{schema_name}"."{table_name}"( + thread_id TEXT NOT NULL, + checkpoint_ns TEXT NOT NULL DEFAULT '', + checkpoint_id TEXT NOT NULL, + parent_checkpoint_id TEXT, + type TEXT, + checkpoint BYTEA, + metadata BYTEA, + PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id) + );""" + + create_checkpoint_writes_table = f"""CREATE TABLE "{schema_name}"."{table_name + "_writes"}"( + thread_id TEXT NOT NULL, + checkpoint_ns TEXT NOT NULL DEFAULT '', + checkpoint_id TEXT NOT NULL, + task_id TEXT NOT NULL, + idx INTEGER NOT NULL, + channel TEXT NOT NULL, + type TEXT, + blob BYTEA NOT NULL, + task_path TEXT NOT NULL DEFAULT '', + PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx) + );""" + + async with self._pool.connect() as conn: + await conn.execute(text(create_checkpoints_table)) + await conn.execute(text(create_checkpoint_writes_table)) + await conn.commit() + + async def ainit_checkpoint_table( + self, table_name: str = CHECKPOINTS_TABLE, schema_name: str = "public" + ) -> None: + """Create PgSQL tables to store checkpoints. + + Args: + schema_name (str): The schema name to store checkpoint tables. + Default: "public". + table_name (str): The PgSQL database table name. + Default: "checkpoints". + + Returns: + None + """ + await self._run_as_async( + self._ainit_checkpoint_table( + table_name, + schema_name, + ) + ) + + def init_checkpoint_table( + self, table_name: str = CHECKPOINTS_TABLE, schema_name: str = "public" + ) -> None: + """Create Cloud SQL tables to store checkpoints. + + Args: + schema_name (str): The schema name to store checkpoint tables. + Default: "public". + table_name (str): The PgSQL database table name. + Default: "checkpoints".
+ + Returns: + None + """ + self._run_as_sync(self._ainit_checkpoint_table(table_name, schema_name)) + async def _aload_table_schema( self, table_name: str, @@ -765,7 +848,7 @@ async def _aload_table_schema( ) except InvalidRequestError as e: raise ValueError( - f"Table, '{schema_name}'.'{table_name}', does not exist: " + str(e) + f'Table, "{schema_name}"."{table_name}", does not exist: ' + str(e) ) table = Table(table_name, metadata, schema=schema_name) diff --git a/tests/test_async_checkpoint.py b/tests/test_async_checkpoint.py new file mode 100644 index 00000000..1cad46a5 --- /dev/null +++ b/tests/test_async_checkpoint.py @@ -0,0 +1,430 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +import re +import uuid +from typing import Any, List, Literal, Optional, Sequence, Tuple, Union + +import pytest +import pytest_asyncio +from langchain_core.callbacks import CallbackManagerForLLMRun +from langchain_core.language_models import BaseChatModel, LanguageModelInput +from langchain_core.messages import ( + AIMessage, + AnyMessage, + BaseMessage, + HumanMessage, + SystemMessage, + ToolCall, + ToolMessage, +) +from langchain_core.outputs import ChatGeneration, ChatResult +from langchain_core.runnables import RunnableConfig +from langgraph.checkpoint.base import ( + Checkpoint, + CheckpointMetadata, + create_checkpoint, + empty_checkpoint, +) +from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer +from langgraph.prebuilt import ( + ToolNode, + ValidationNode, + create_react_agent, + tools_condition, +) +from sqlalchemy import text +from sqlalchemy.engine.row import RowMapping + +from langchain_google_cloud_sql_pg.async_checkpoint import AsyncPostgresSaver +from langchain_google_cloud_sql_pg.engine import PostgresEngine + +write_config: RunnableConfig = {"configurable": {"thread_id": "1", "checkpoint_ns": ""}} +read_config: RunnableConfig = {"configurable": {"thread_id": "1"}} +thread_agent_config: RunnableConfig = {"configurable": {"thread_id": "123"}} + +project_id = os.environ["PROJECT_ID"] +region = os.environ["REGION"] +instance_id = os.environ["INSTANCE_ID"] +db_name = os.environ["DATABASE_ID"] +table_name = "checkpoint" + str(uuid.uuid4()) +table_name_writes = table_name + "_writes" + +checkpoint: Checkpoint = { + "v": 1, + "ts": "2024-07-31T20:14:19.804150+00:00", + "id": "1ef4f797-8335-6428-8001-8a1503f9b875", + "channel_values": {"my_key": "meow", "node": "node"}, + "channel_versions": { + "__start__": 2, + "my_key": 3, + "start:node": 3, + "node": 3, + }, + "versions_seen": { + "__input__": {}, + "__start__": {"__start__": 1}, + "node": {"start:node": 2}, + }, + "pending_sends": [], +} + + +class AnyStr(str): + def 
__init__(self, prefix: Union[str, re.Pattern] = "") -> None: + super().__init__() + self.prefix = prefix + + def __eq__(self, other: object) -> bool: + return isinstance(other, str) and ( + ( + other.startswith(self.prefix) + if isinstance(self.prefix, str) + else bool(self.prefix.match(other)) + ) + ) + + def __hash__(self) -> int: + return hash((str(self), self.prefix)) + + +def _AnyIdToolMessage(**kwargs: Any) -> ToolMessage: + """Create a tool message with an any id field.""" + message = ToolMessage(**kwargs) + message.id = AnyStr() + return message + + +async def aexecute(engine: PostgresEngine, query: str) -> None: + async with engine._pool.connect() as conn: + await conn.execute(text(query)) + await conn.commit() + + +async def afetch(engine: PostgresEngine, query: str) -> Sequence[RowMapping]: + async with engine._pool.connect() as conn: + result = await conn.execute(text(query)) + result_map = result.mappings() + result_fetch = result_map.fetchall() + return result_fetch + + +@pytest_asyncio.fixture +async def async_engine(): + async_engine = await PostgresEngine.afrom_instance( + project_id=project_id, + region=region, + instance=instance_id, + database=db_name, + ) + + yield async_engine + + await aexecute(async_engine, f'DROP TABLE IF EXISTS "{table_name}"') + await aexecute(async_engine, f'DROP TABLE IF EXISTS "{table_name_writes}"') + await async_engine.close() + await async_engine._connector.close_async() + + +@pytest_asyncio.fixture +async def checkpointer(async_engine): + await async_engine._ainit_checkpoint_table(table_name=table_name) + checkpointer = await AsyncPostgresSaver.create( + async_engine, + table_name, # serde=JsonPlusSerializer + ) + yield checkpointer + + +@pytest.mark.asyncio +async def test_checkpoint_async( + async_engine: PostgresEngine, + checkpointer: AsyncPostgresSaver, +) -> None: + test_config = { + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875", + } + } 
+ # Verify if updated configuration after storing the checkpoint is correct + next_config = await checkpointer.aput(write_config, checkpoint, {}, {}) + assert dict(next_config) == test_config + + # Verify if the checkpoint is stored correctly in the database + results = await afetch(async_engine, f'SELECT * FROM "{table_name}"') + assert len(results) == 1 + for row in results: + assert isinstance(row["thread_id"], str) + await aexecute(async_engine, f'TRUNCATE TABLE "{table_name}"') + + +@pytest.fixture +def test_data(): + """Fixture providing test data for checkpoint tests.""" + config_0: RunnableConfig = {"configurable": {"thread_id": "1", "checkpoint_ns": ""}} + config_1: RunnableConfig = { + "configurable": { + "thread_id": "thread-1", + # for backwards compatibility testing + "thread_ts": "1", + "checkpoint_ns": "", + } + } + config_2: RunnableConfig = { + "configurable": { + "thread_id": "thread-2", + "checkpoint_id": "2", + "checkpoint_ns": "", + } + } + config_3: RunnableConfig = { + "configurable": { + "thread_id": "thread-2", + "checkpoint_id": "2-inner", + "checkpoint_ns": "inner", + } + } + chkpnt_0: Checkpoint = { + "v": 1, + "ts": "2024-07-31T20:14:19.804150+00:00", + "id": "1ef4f797-8335-6428-8001-8a1503f9b875", + "channel_values": {"my_key": "meow", "node": "node"}, + "channel_versions": { + "__start__": 2, + "my_key": 3, + "start:node": 3, + "node": 3, + }, + "versions_seen": { + "__input__": {}, + "__start__": {"__start__": 1}, + "node": {"start:node": 2}, + }, + "pending_sends": [], + } + chkpnt_1: Checkpoint = empty_checkpoint() + chkpnt_2: Checkpoint = create_checkpoint(chkpnt_1, {}, 1) + chkpnt_3: Checkpoint = empty_checkpoint() + + metadata_1: CheckpointMetadata = { + "source": "input", + "step": 2, + "writes": {}, + "parents": 1, + } + metadata_2: CheckpointMetadata = { + "source": "loop", + "step": 1, + "writes": {"foo": "bar"}, + "parents": None, + } + metadata_3: CheckpointMetadata = {} + + return { + "configs": [config_0, config_1, 
config_2, config_3], + "checkpoints": [chkpnt_0, chkpnt_1, chkpnt_2, chkpnt_3], + "metadata": [metadata_1, metadata_2, metadata_3], + } + + +@pytest.mark.asyncio +async def test_checkpoint_aput_writes( + async_engine: PostgresEngine, + checkpointer: AsyncPostgresSaver, +) -> None: + config: RunnableConfig = { + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875", + } + } + + # Verify if the checkpoint writes are stored correctly in the database + writes: Sequence[Tuple[str, Any]] = [ + ("test_channel1", {}), + ("test_channel2", {}), + ] + await checkpointer.aput_writes(config, writes, task_id="1") + + results = await afetch(async_engine, f'SELECT * FROM "{table_name_writes}"') + assert len(results) == 2 + for row in results: + assert isinstance(row["task_id"], str) + await aexecute(async_engine, f'TRUNCATE TABLE "{table_name_writes}"') + + +@pytest.mark.asyncio +async def test_checkpoint_alist( + async_engine: PostgresEngine, + checkpointer: AsyncPostgresSaver, + test_data: dict[str, Any], +) -> None: + configs = test_data["configs"] + checkpoints = test_data["checkpoints"] + metadata = test_data["metadata"] + + await checkpointer.aput(configs[1], checkpoints[1], metadata[0], {}) + await checkpointer.aput(configs[2], checkpoints[2], metadata[1], {}) + await checkpointer.aput(configs[3], checkpoints[3], metadata[2], {}) + + # call method / assertions + query_1 = {"source": "input"} # search by 1 key + query_2 = { + "step": 1, + "writes": {"foo": "bar"}, + } # search by multiple keys + query_3: dict[str, Any] = {} # search by no keys, return all checkpoints + query_4 = {"source": "update", "step": 1} # no match + + search_results_1 = [c async for c in checkpointer.alist(None, filter=query_1)] + assert len(search_results_1) == 1 + assert search_results_1[0].metadata == metadata[0] + + search_results_2 = [c async for c in
checkpointer.alist(None, filter=query_2)] + assert len(search_results_2) == 1 + assert search_results_2[0].metadata == metadata[1] + + search_results_3 = [c async for c in checkpointer.alist(None, filter=query_3)] + assert len(search_results_3) == 3 + + search_results_4 = [c async for c in checkpointer.alist(None, filter=query_4)] + assert len(search_results_4) == 0 + + # search by config (defaults to checkpoints across all namespaces) + search_results_5 = [ + c async for c in checkpointer.alist({"configurable": {"thread_id": "thread-2"}}) + ] + assert len(search_results_5) == 2 + assert { + search_results_5[0].config["configurable"]["checkpoint_ns"], + search_results_5[1].config["configurable"]["checkpoint_ns"], + } == {"", "inner"} + + +class FakeToolCallingModel(BaseChatModel): + tool_calls: Optional[list[list[ToolCall]]] = None + index: int = 0 + tool_style: Literal["openai", "anthropic"] = "openai" + + def _generate( + self, + messages: List[BaseMessage], + stop: Optional[List[str]] = None, + run_manager: Optional[CallbackManagerForLLMRun] = None, + **kwargs: Any, + ) -> ChatResult: + """Top Level call""" + messages_string = "-".join( + [str(m.content) for m in messages if isinstance(m.content, str)] + ) + tool_calls = ( + self.tool_calls[self.index % len(self.tool_calls)] + if self.tool_calls + else [] + ) + message = AIMessage( + content=messages_string, + id=str(self.index), + tool_calls=tool_calls.copy(), + ) + self.index += 1 + return ChatResult(generations=[ChatGeneration(message=message)]) + + @property + def _llm_type(self) -> str: + return "fake-tool-call-model" + + +@pytest.mark.asyncio +async def test_checkpoint_with_agent( + checkpointer: AsyncPostgresSaver, +) -> None: + # from the tests in https://github.com/langchain-ai/langgraph/blob/909190cede6a80bb94a2d4cfe7dedc49ef0d4127/libs/langgraph/tests/test_prebuilt.py + model = FakeToolCallingModel() + + agent = create_react_agent(model, [], checkpointer=checkpointer) + inputs = [HumanMessage("hi?")] 
+ response = await agent.ainvoke( + {"messages": inputs}, config=thread_agent_config, debug=True + ) + expected_response = {"messages": inputs + [AIMessage(content="hi?", id="0")]} + assert response == expected_response + + def _AnyIdHumanMessage(**kwargs: Any) -> HumanMessage: + """Create a human message with an any id field.""" + message = HumanMessage(**kwargs) + message.id = AnyStr() + return message + + saved = await checkpointer.aget_tuple(thread_agent_config) + assert saved is not None + assert saved.checkpoint["channel_values"] == { + "messages": [ + _AnyIdHumanMessage(content="hi?"), + AIMessage(content="hi?", id="0"), + ], + "agent": "agent", + } + assert saved.metadata == { + "parents": {}, + "source": "loop", + "writes": {"agent": {"messages": [AIMessage(content="hi?", id="0")]}}, + "step": 1, + "thread_id": "123", + } + assert saved.pending_writes == [] + + +@pytest.mark.asyncio +async def test_checkpoint_aget_tuple( + checkpointer: AsyncPostgresSaver, + test_data: dict[str, Any], +) -> None: + configs = test_data["configs"] + checkpoints = test_data["checkpoints"] + metadata = test_data["metadata"] + + new_config = await checkpointer.aput(configs[1], checkpoints[1], metadata[0], {}) + + # Matching checkpoint + search_results_1 = await checkpointer.aget_tuple(new_config) + assert search_results_1.metadata == metadata[0] # type: ignore + + # No matching checkpoint + assert await checkpointer.aget_tuple(configs[0]) is None + + +@pytest.mark.asyncio +async def test_metadata( + checkpointer: AsyncPostgresSaver, + test_data: dict[str, Any], +) -> None: + config = await checkpointer.aput( + test_data["configs"][0], + test_data["checkpoints"][0], + {"my_key": "abc"}, # type: ignore + {}, + ) + assert (await checkpointer.aget_tuple(config)).metadata["my_key"] == "abc" # type: ignore + assert [c async for c in checkpointer.alist(None, filter={"my_key": "abc"})][ + 0 + ].metadata[ + "my_key" # type: ignore + ] == "abc" # type: ignore diff --git 
diff --git a/tests/test_checkpoint.py b/tests/test_checkpoint.py
new file mode 100644
index 00000000..3a9c5bd3
--- /dev/null
+++ b/tests/test_checkpoint.py
@@ -0,0 +1,355 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import uuid
+from typing import Any, Sequence, Tuple
+
+import pytest
+import pytest_asyncio
+from langchain_core.runnables import RunnableConfig
+from langgraph.checkpoint.base import (
+    Checkpoint,
+    CheckpointMetadata,
+    create_checkpoint,
+    empty_checkpoint,
+)
+from sqlalchemy import text
+from sqlalchemy.engine.row import RowMapping
+
+from langchain_google_cloud_sql_pg.checkpoint import PostgresSaver
+from langchain_google_cloud_sql_pg.engine import PostgresEngine
+
+write_config: RunnableConfig = {"configurable": {"thread_id": "1", "checkpoint_ns": ""}}
+read_config: RunnableConfig = {"configurable": {"thread_id": "1"}}
+
+project_id = os.environ["PROJECT_ID"]
+region = os.environ["REGION"]
+instance_id = os.environ["INSTANCE_ID"]
+db_name = os.environ["DATABASE_ID"]
+table_name = "checkpoint" + str(uuid.uuid4())
+table_name_writes = table_name + "_writes"
+table_name_async = "checkpoint" + str(uuid.uuid4())
+table_name_writes_async = table_name_async + "_writes"
+
+checkpoint: Checkpoint = {
+    "v": 1,
+    "ts": "2024-07-31T20:14:19.804150+00:00",
+    "id": "1ef4f797-8335-6428-8001-8a1503f9b875",
+    "channel_values": {"my_key": "meow", "node": "node"},
+    "channel_versions": {
+        "__start__": 2,
+        "my_key": 3,
+        "start:node": 3,
+        "node": 3,
+    },
+    "versions_seen": {
+        "__input__": {},
+        "__start__": {"__start__": 1},
+        "node": {"start:node": 2},
+    },
+    "pending_sends": [],
+}
+
+
+def get_env_var(key: str, desc: str) -> str:
+    v = os.environ.get(key)
+    if v is None:
+        raise ValueError(f"Must set env var {key} to: {desc}")
+    return v
+
+
+async def aexecute(engine: PostgresEngine, query: str) -> None:
+    async def run(engine, query):
+        async with engine._pool.connect() as conn:
+            await conn.execute(text(query))
+            await conn.commit()
+
+    await engine._run_as_async(run(engine, query))
+
+
+async def afetch(engine: PostgresEngine, query: str) -> Sequence[RowMapping]:
+    async def run(engine, query):
+        async with engine._pool.connect() as conn:
+            result = await conn.execute(text(query))
+            result_map = result.mappings()
+            result_fetch = result_map.fetchall()
+        return result_fetch
+
+    return await engine._run_as_async(run(engine, query))
+
+
+@pytest_asyncio.fixture
+async def engine():
+    engine = PostgresEngine.from_instance(
+        project_id=project_id,
+        region=region,
+        instance=instance_id,
+        database=db_name,
+    )
+    yield engine
+    # use default table
+    await aexecute(engine, f'DROP TABLE IF EXISTS "{table_name}"')
+    await aexecute(engine, f'DROP TABLE IF EXISTS "{table_name_writes}"')
+    await engine.close()
+    await engine._connector.close_async()
+
+
+@pytest_asyncio.fixture
+async def async_engine():
+    async_engine = await PostgresEngine.afrom_instance(
+        project_id=project_id,
+        region=region,
+        instance=instance_id,
+        database=db_name,
+    )
+    yield async_engine
+
+    await aexecute(async_engine, f'DROP TABLE IF EXISTS "{table_name_async}"')
+    await aexecute(async_engine, f'DROP TABLE IF EXISTS "{table_name_writes_async}"')
+    await async_engine.close()
+    await async_engine._connector.close_async()
+
+
+@pytest_asyncio.fixture
+def checkpointer(engine):
+    engine.init_checkpoint_table(table_name=table_name)
+    checkpointer = PostgresSaver.create_sync(engine, table_name)
+    yield checkpointer
+
+
+@pytest_asyncio.fixture
+async def async_checkpointer(async_engine):
+    await async_engine.ainit_checkpoint_table(table_name=table_name_async)
+    async_checkpointer = await PostgresSaver.create(async_engine, table_name_async)
+    yield async_checkpointer
+
+
+@pytest.mark.asyncio
+async def test_checkpoint_async(
+    async_engine: PostgresEngine,
+    async_checkpointer: PostgresSaver,
+) -> None:
+    test_config = {
+        "configurable": {
+            "thread_id": "1",
+            "checkpoint_ns": "",
+            "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875",
+        }
+    }
+    # Verify that the updated configuration returned after storing the checkpoint is correct
+    next_config = await async_checkpointer.aput(write_config, checkpoint, {}, {})
+    assert dict(next_config) == test_config
+
+    # Verify that the checkpoint is stored correctly in the database
+    results = await afetch(async_engine, f'SELECT * FROM "{table_name_async}"')
+    assert len(results) == 1
+    for row in results:
+        assert isinstance(row["thread_id"], str)
+    await aexecute(async_engine, f'TRUNCATE TABLE "{table_name_async}"')
+
+
+# Test put method for checkpoint
+@pytest.mark.asyncio
+async def test_checkpoint_sync(
+    engine: PostgresEngine,
+    checkpointer: PostgresSaver,
+) -> None:
+    test_config = {
+        "configurable": {
+            "thread_id": "1",
+            "checkpoint_ns": "",
+            "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875",
+        }
+    }
+    # Verify that the updated configuration returned after storing the checkpoint is correct
+    next_config = checkpointer.put(write_config, checkpoint, {}, {})
+    assert dict(next_config) == test_config
+
+    # Verify that the checkpoint is stored correctly in the database
+    results = await afetch(engine, f'SELECT * FROM "{table_name}"')
+    assert len(results) == 1
+    for row in results:
+        assert isinstance(row["thread_id"], str)
+    await aexecute(engine, f'TRUNCATE TABLE "{table_name}"')
+
+
+@pytest.mark.asyncio
+async def test_checkpoint_table_async(async_engine):
+    with pytest.raises(ValueError):
+        await PostgresSaver.create(engine=async_engine, table_name="doesnotexist")
+
+
+def test_checkpoint_table(engine: Any) -> None:
+    with pytest.raises(ValueError):
+        PostgresSaver.create_sync(engine=engine, table_name="doesnotexist")
+
+
+@pytest.fixture
+def test_data():
+    """Fixture providing test data for checkpoint tests."""
+    config_0: RunnableConfig = {"configurable": {"thread_id": "1", "checkpoint_ns": ""}}
+    config_1: RunnableConfig = {
+        "configurable": {
+            "thread_id": "thread-1",
+            # for backwards compatibility testing
+            "thread_ts": "1",
+            "checkpoint_ns": "",
+        }
+    }
+    config_2: RunnableConfig = {
+        "configurable": {
+            "thread_id": "thread-2",
+            "checkpoint_id": "2",
+            "checkpoint_ns": "",
+        }
+    }
+    config_3: RunnableConfig = {
+        "configurable": {
+            "thread_id": "thread-2",
+            "checkpoint_id": "2-inner",
+            "checkpoint_ns": "inner",
+        }
+    }
+    chkpnt_0: Checkpoint = {
+        "v": 1,
+        "ts": "2024-07-31T20:14:19.804150+00:00",
+        "id": "1ef4f797-8335-6428-8001-8a1503f9b875",
+        "channel_values": {"my_key": "meow", "node": "node"},
+        "channel_versions": {
+            "__start__": 2,
+            "my_key": 3,
+            "start:node": 3,
+            "node": 3,
+        },
+        "versions_seen": {
+            "__input__": {},
+            "__start__": {"__start__": 1},
+            "node": {"start:node": 2},
+        },
+        "pending_sends": [],
+    }
+    chkpnt_1: Checkpoint = empty_checkpoint()
+    chkpnt_2: Checkpoint = create_checkpoint(chkpnt_1, {}, 1)
+    chkpnt_3: Checkpoint = empty_checkpoint()
+
+    metadata_1: CheckpointMetadata = {
+        "source": "input",
+        "step": 2,
+        "writes": {},
+        "parents": 1,
+    }
+    metadata_2: CheckpointMetadata = {
+        "source": "loop",
+        "step": 1,
+        "writes": {"foo": "bar"},
+        "parents": None,
+    }
+    metadata_3: CheckpointMetadata = {}
+
+    return {
+        "configs": [config_0, config_1, config_2, config_3],
+        "checkpoints": [chkpnt_0, chkpnt_1, chkpnt_2, chkpnt_3],
+        "metadata": [metadata_1, metadata_2, metadata_3],
+    }
+
+
+@pytest.mark.asyncio
+async def test_checkpoint_put_writes(
+    engine: PostgresEngine,
+    checkpointer: PostgresSaver,
+) -> None:
+    config: RunnableConfig = {
+        "configurable": {
+            "thread_id": "1",
+            "checkpoint_ns": "",
+            "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875",
+        }
+    }
+
+    # Verify that the checkpoint writes are stored correctly in the database
+    writes: Sequence[Tuple[str, Any]] = [
+        ("test_channel1", {}),
+        ("test_channel2", {}),
+    ]
+    checkpointer.put_writes(config, writes, task_id="1")
+
+    results = await afetch(engine, f'SELECT * FROM "{table_name_writes}"')
+    assert len(results) == 2
+    for row in results:
+        assert isinstance(row["task_id"], str)
+    await aexecute(engine, f'TRUNCATE TABLE "{table_name_writes}"')
+
+
+def test_checkpoint_list(
+    checkpointer: PostgresSaver,
+    test_data: dict[str, Any],
+) -> None:
+    configs = test_data["configs"]
+    checkpoints = test_data["checkpoints"]
+    metadata = test_data["metadata"]
+
+    checkpointer.put(configs[1], checkpoints[1], metadata[0], {})
+    checkpointer.put(configs[2], checkpoints[2], metadata[1], {})
+    checkpointer.put(configs[3], checkpoints[3], metadata[2], {})
+
+    # call method / assertions
+    query_1 = {"source": "input"}  # search by 1 key
+    query_2 = {
+        "step": 1,
+        "writes": {"foo": "bar"},
+    }  # search by multiple keys
+    query_3: dict[str, Any] = {}  # search by no keys, return all checkpoints
+    query_4 = {"source": "update", "step": 1}  # no match
+
+    search_results_1 = list(checkpointer.list(None, filter=query_1))
+    assert len(search_results_1) == 1
+    assert search_results_1[0].metadata == metadata[0]
+    search_results_2 = list(checkpointer.list(None, filter=query_2))
+    assert len(search_results_2) == 1
+    assert search_results_2[0].metadata == metadata[1]
+
+    search_results_3 = list(checkpointer.list(None, filter=query_3))
+    assert len(search_results_3) == 3
+
+    search_results_4 = list(checkpointer.list(None, filter=query_4))
+    assert len(search_results_4) == 0
+
+    # search by config (defaults to checkpoints across all namespaces)
+    search_results_5 = list(
+        checkpointer.list({"configurable": {"thread_id": "thread-2"}})
+    )
+    assert len(search_results_5) == 2
+    assert {
+        search_results_5[0].config["configurable"]["checkpoint_ns"],
+        search_results_5[1].config["configurable"]["checkpoint_ns"],
+    } == {"", "inner"}
+
+
+def test_checkpoint_get_tuple(
+    checkpointer: PostgresSaver,
+    test_data: dict[str, Any],
+) -> None:
+    configs = test_data["configs"]
+    checkpoints = test_data["checkpoints"]
+    metadata = test_data["metadata"]
+
+    new_config = checkpointer.put(configs[1], checkpoints[1], metadata[0], {})
+
+    # Matching checkpoint
+    search_results_1 = checkpointer.get_tuple(new_config)
+    assert search_results_1.metadata == metadata[0]  # type: ignore
+
+    # No matching checkpoint
+    assert checkpointer.get_tuple(configs[0]) is None
diff --git a/tests/test_engine.py b/tests/test_engine.py
index eeff53e1..7883cf4b 100644
--- a/tests/test_engine.py
+++ b/tests/test_engine.py
@@ -1,4 +1,4 @@
-# Copyright 2024 Google LLC
+# Copyright 2025 Google LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -202,6 +202,7 @@ async def test_password(
         assert engine
         await aexecute(engine, "SELECT 1")
         PostgresEngine._connector = None
+        await engine.close()
 
     async def test_from_engine(
         self,
@@ -302,6 +303,41 @@ async def test_iam_account_override(
         assert engine
         await aexecute(engine, "SELECT 1")
         await engine.close()
+
+    async def test_ainit_checkpoint_writes_table(self, engine):
+        table_name = f"checkpoint{uuid.uuid4()}"
+        table_name_writes = f"{table_name}_writes"
+        await engine.ainit_checkpoint_table(table_name=table_name)
+        stmt = f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{table_name_writes}';"
+        results = await afetch(engine, stmt)
+        expected = [
+            {"column_name": "thread_id", "data_type": "text"},
+            {"column_name": "checkpoint_ns", "data_type": "text"},
+            {"column_name": "checkpoint_id", "data_type": "text"},
+            {"column_name": "task_id", "data_type": "text"},
+            {"column_name": "idx", "data_type": "integer"},
+            {"column_name": "channel", "data_type": "text"},
+            {"column_name": "type", "data_type": "text"},
+            {"column_name": "blob", "data_type": "bytea"},
+            {"column_name": "task_path", "data_type": "text"},
+        ]
+        for row in results:
+            assert row in expected
+        stmt = f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{table_name}';"
+        results = await afetch(engine, stmt)
+        expected = [
+            {"column_name": "thread_id", "data_type": "text"},
+            {"column_name": "checkpoint_ns", "data_type": "text"},
+            {"column_name": "checkpoint_id", "data_type": "text"},
+            {"column_name": "parent_checkpoint_id", "data_type": "text"},
+            {"column_name": "checkpoint", "data_type": "bytea"},
+            {"column_name": "metadata", "data_type": "bytea"},
+            {"column_name": "type", "data_type": "text"},
+        ]
+        for row in results:
+            assert row in expected
+        await aexecute(engine, f'DROP TABLE IF EXISTS "{table_name}"')
+        await aexecute(engine, f'DROP TABLE IF EXISTS "{table_name_writes}"')
 
 
 @pytest.mark.asyncio(scope="module")
 class TestEngineSync:
@@ -425,6 +461,7 @@ async def test_password(
         assert engine
         await aexecute(engine, "SELECT 1")
         PostgresEngine._connector = None
+        await engine.close()
 
     async def test_engine_constructor_key(
         self,
@@ -453,3 +490,38 @@ async def test_iam_account_override(
         assert engine
         await aexecute(engine, "SELECT 1")
         await engine.close()
+
+    async def test_init_checkpoints_table(self, engine):
+        table_name = f"checkpoint{uuid.uuid4()}"
+        table_name_writes = f"{table_name}_writes"
+        engine.init_checkpoint_table(table_name=table_name)
+        stmt = f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{table_name}';"
+        results = await afetch(engine, stmt)
+        expected = [
+            {"column_name": "thread_id", "data_type": "text"},
+            {"column_name": "checkpoint_ns", "data_type": "text"},
+            {"column_name": "checkpoint_id", "data_type": "text"},
+            {"column_name": "parent_checkpoint_id", "data_type": "text"},
+            {"column_name": "type", "data_type": "text"},
+            {"column_name": "checkpoint", "data_type": "bytea"},
+            {"column_name": "metadata", "data_type": "bytea"},
+        ]
+        for row in results:
+            assert row in expected
+        stmt = f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{table_name_writes}';"
+        results = await afetch(engine, stmt)
+        expected = [
+            {"column_name": "thread_id", "data_type": "text"},
+            {"column_name": "checkpoint_ns", "data_type": "text"},
+            {"column_name": "checkpoint_id", "data_type": "text"},
+            {"column_name": "task_id", "data_type": "text"},
+            {"column_name": "idx", "data_type": "integer"},
+            {"column_name": "channel", "data_type": "text"},
+            {"column_name": "type", "data_type": "text"},
+            {"column_name": "blob", "data_type": "bytea"},
+            {"column_name": "task_path", "data_type": "text"},
+        ]
+        for row in results:
+            assert row in expected
+        await aexecute(engine, f'DROP TABLE IF EXISTS "{table_name}"')
+        await aexecute(engine, f'DROP TABLE IF EXISTS "{table_name_writes}"')
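The `put`/`aput` tests in this patch all assert one invariant: the config the saver returns is the input config's `thread_id` and `checkpoint_ns` carried over, with the stored checkpoint's `id` filled in as `checkpoint_id`. A minimal, database-free sketch of that relationship, assuming only the dict shapes used in the tests above (the `next_config` helper here is illustrative, not part of the library):

```python
# Illustrative only: mirrors the config/checkpoint shapes asserted in
# test_checkpoint_sync / test_checkpoint_async. No Cloud SQL connection needed.

write_config = {"configurable": {"thread_id": "1", "checkpoint_ns": ""}}

checkpoint = {
    "v": 1,
    "ts": "2024-07-31T20:14:19.804150+00:00",
    "id": "1ef4f797-8335-6428-8001-8a1503f9b875",
}


def next_config(config: dict, checkpoint: dict) -> dict:
    """Build the config a checkpointer returns after storing `checkpoint`:
    thread_id/checkpoint_ns from the input config, checkpoint_id from the
    stored checkpoint's "id"."""
    c = config["configurable"]
    return {
        "configurable": {
            "thread_id": c["thread_id"],
            "checkpoint_ns": c.get("checkpoint_ns", ""),
            "checkpoint_id": checkpoint["id"],
        }
    }


result = next_config(write_config, checkpoint)
```

This is exactly the `test_config` the tests compare against `dict(next_config)`; reusing the returned config on the next `put` is what chains checkpoints to their parents.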