diff --git a/.github/workflows/schedule_reporter.yml b/.github/workflows/schedule_reporter.yml index 3549f373..8a974d66 100644 --- a/.github/workflows/schedule_reporter.yml +++ b/.github/workflows/schedule_reporter.yml @@ -20,6 +20,10 @@ on: jobs: run_reporter: - uses: googleapis/langchain-google-alloydb-pg-python/.github/workflows/cloud_build_failure_reporter.yml@main + permissions: + issues: 'write' + checks: 'read' + contents: 'read' + uses: googleapis/langchain-google-alloydb-pg-python/.github/workflows/cloud_build_failure_reporter.yml@074f9932a8099256ff210771473badbd2156713b with: trigger_names: "pg-integration-test-nightly,pg-continuous-test-on-merge" diff --git a/.kokoro/requirements.txt b/.kokoro/requirements.txt index cd183f45..01481136 100644 --- a/.kokoro/requirements.txt +++ b/.kokoro/requirements.txt @@ -276,9 +276,9 @@ jeepney==0.8.0 \ # via # keyring # secretstorage -jinja2==3.1.5 \ - --hash=sha256:8fefff8dc3034e27bb80d67c671eb8a9bc424c0ef4c0826edbff304cceff43bb \ - --hash=sha256:aba0f4dc9ed8013c424088f68a5c226f7d6097ed89b246d7749c2ec4175c6adb +jinja2==3.1.6 \ + --hash=sha256:0137fb05990d35f1275a587e9aee6d56da821fc83491a0fb838183be43f66d6d \ + --hash=sha256:85ece4451f492d0c13c5dd7c13a64681a86afae63a5f347908daf103ce6d2f67 # via gcp-releasetool keyring==24.3.1 \ --hash=sha256:c3327b6ffafc0e8befbdb597cacdb4928ffe5c1212f7645f186e6d9957a898db \ diff --git a/README.rst b/README.rst index 9f76d6b5..c5da0942 100644 --- a/README.rst +++ b/README.rst @@ -158,6 +158,22 @@ See the full `Chat Message History`_ tutorial. .. _`Chat Message History`: https://github.com/googleapis/langchain-google-cloud-sql-pg-python/tree/main/docs/chat_message_history.ipynb +Langgraph Checkpoint Usage +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Use ``PostgresSaver`` to save snapshots of the graph state at a given point in time. + +.. 
code:: python + + from langchain_google_cloud_sql_pg import PostgresSaver, PostgresEngine + + engine = PostgresEngine.from_instance("project-id", "region", "my-instance", "my-database") + checkpoint = PostgresSaver.create_sync(engine) + +See the full `Checkpoint`_ tutorial. + +.. _`Checkpoint`: https://github.com/googleapis/langchain-google-cloud-sql-pg-python/tree/main/docs/langgraph_checkpoint.ipynb + +Contributions +~~~~~~~~~~~~~ diff --git a/docs/langgraph_checkpoint.ipynb b/docs/langgraph_checkpoint.ipynb new file mode 100644 index 00000000..c663f95d --- /dev/null +++ b/docs/langgraph_checkpoint.ipynb @@ -0,0 +1,424 @@ +{ + "nbformat": 4, + "nbformat_minor": 0, + "metadata": { + "colab": { + "provenance": [] + }, + "kernelspec": { + "name": "python3", + "display_name": "Python 3 (ipykernel)", + "language": "python" + }, + "language_info": { + "name": "python" + } + }, + "cells": [ + { + "cell_type": "markdown", + "source": [ + "# Google Cloud SQL for PostgreSQL\n", + "\n", + "> [Cloud SQL](https://cloud.google.com/sql) is a fully managed relational database service that offers high performance, seamless integration, and impressive scalability. It offers MySQL, PostgreSQL, and SQL Server database engines. 
Extend your database application to build AI-powered experiences leveraging Cloud SQL's Langchain integrations.\n", + "\n", + "This notebook goes over how to use `Cloud SQL for PostgreSQL` to store checkpoints with the PostgresSaver class.\n", + "\n", + "Learn more about the package on [GitHub](https://github.com/googleapis/langchain-google-cloud-sql-pg-python/).\n", + "\n", + "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/googleapis/langchain-google-cloud-sql-pg-python/blob/main/docs/langgraph_checkpoint.ipynb)" + ], + "metadata": { + "id": "xHq_1Zh8TfCz" + } + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "## Prerequisites\n", + "\n", + "This guide assumes familiarity with the following:\n", + "\n", + "- [LangGraph Persistence](https://langchain-ai.github.io/langgraph/concepts/persistence/)\n", + "- [Postgresql](https://www.postgresql.org/about/)\n", + "\n", + "When creating LangGraph agents, you can also set them up so that they persist their state. This allows you to do things like interact with an agent multiple times and have it remember previous interactions." 
+ ] + }, + { + "cell_type": "markdown", + "source": [ + "## Before You Begin\n", + "\n", + "To run this notebook, you will need to do the following:\n", + "\n", + " * [Create a Google Cloud Project](https://developers.google.com/workspace/guides/create-project)\n", + " * [Enable the Cloud SQL Admin API.](https://console.cloud.google.com/marketplace/product/google/sqladmin.googleapis.com)\n", + " * [Create a Cloud SQL for PostgreSQL instance](https://cloud.google.com/sql/docs/postgres/create-instance)\n", + " * [Create a Cloud SQL database](https://cloud.google.com/sql/docs/mysql/create-manage-databases)\n", + " * [Add an IAM database user to the database](https://cloud.google.com/sql/docs/postgres/add-manage-iam-users#creating-a-database-user) (Optional)\n" + ], + "metadata": { + "id": "5NW0xqHRToAM" + } + }, + { + "cell_type": "markdown", + "source": [ + "### 🦜🔗 Library Installation\n", + "The integration lives in its own `langchain-google-cloud-sql-pg` package, so we need to install it." + ], + "metadata": { + "id": "Owc-OZpOT0Jy" + } + }, + { + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "TpNTWLbZT1Dw", + "outputId": "a54015ba-0281-4955-e9e0-36d3125c4887" + }, + "cell_type": "code", + "source": "%pip install --upgrade --quiet langchain-google-cloud-sql-pg[langgraph] langchain-google-vertexai langgraph", + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "**Colab only:** Uncomment the following cell to restart the kernel or use the button to restart the kernel. For Vertex AI Workbench you can restart the terminal using the button on top." 
+ }, + { + "cell_type": "code", + "source": [ + "# # Automatically restart kernel after installs so that your environment can access the new packages\n", + "# import IPython\n", + "#\n", + "# app = IPython.Application.instance()\n", + "# app.kernel.do_shutdown(True)" + ], + "metadata": { + "id": "OQrizsZGT4xx" + }, + "outputs": [], + "execution_count": null + }, + { + "cell_type": "markdown", + "source": [ + "### 🔐 Authentication\n", + "Authenticate to Google Cloud as the IAM user logged into this notebook in order to access your Google Cloud Project.\n", + "\n", + "* If you are using Colab to run this notebook, use the cell below and continue.\n", + "* If you are using Vertex AI Workbench, check out the setup instructions [here](https://github.com/GoogleCloudPlatform/generative-ai/tree/main/setup-env)." + ], + "metadata": { + "id": "XpCaEQgvUPBQ" + } + }, + { + "cell_type": "code", + "source": [ + "from google.colab import auth\n", + "\n", + "auth.authenticate_user()" + ], + "metadata": { + "id": "WvydoWCKUOlM" + }, + "outputs": [], + "execution_count": null + }, + { + "cell_type": "markdown", + "source": [ + "### ☁ Set Your Google Cloud Project\n", + "Set your Google Cloud project so that you can leverage Google Cloud resources within this notebook.\n", + "\n", + "If you don't know your project ID, try the following:\n", + "\n", + "* Run `gcloud config list`.\n", + "* Run `gcloud projects list`.\n", + "* See the support page: [Locate the project ID](https://support.google.com/googleapi/answer/7014113)." 
+ ], + "metadata": { + "id": "nNK4c-n-Umq9" + } + }, + { + "cell_type": "code", + "source": [ + "# @markdown Please fill in the value below with your Google Cloud project ID and then run the cell.\n", + "\n", + "PROJECT_ID = \"my-project-id\" # @param {type:\"string\"}\n", + "\n", + "# Set the project id\n", + "!gcloud config set project {PROJECT_ID}" + ], + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "5pCxbzAGUP9p", + "outputId": "625f4fea-75a7-41ad-fef0-96467df9f41b" + }, + "outputs": [], + "execution_count": null + }, + { + "cell_type": "markdown", + "source": [ + "### 💡 API Enablement\n", + "The `langchain-google-cloud-sql-pg` package requires that you [enable the Cloud SQL Admin API](https://console.cloud.google.com/flows/enableapi?apiid=sqladmin.googleapis.com) in your Google Cloud Project." + ], + "metadata": { + "id": "sfN2P8EkUydF" + } + }, + { + "cell_type": "code", + "source": [ + "# enable Cloud SQL Admin API\n", + "!gcloud services enable sqladmin.googleapis.com" + ], + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "HS5OmGusUnP4", + "outputId": "952fdfd9-279f-4246-83a7-bc35793869f2" + }, + "outputs": [], + "execution_count": null + }, + { + "cell_type": "markdown", + "source": [ + "## Basic Usage" + ], + "metadata": { + "id": "j6Sd_BIHU_cV" + } + }, + { + "cell_type": "markdown", + "source": [ + "### Set Cloud SQL database values\n", + "Find your database values, in the [Cloud SQL Instances page](https://console.cloud.google.com/sql?_ga=2.223735448.2062268965.1707700487-2088871159.1707257687)." 
+ ], + "metadata": { + "id": "Kd3WmpRRVGJH" + } + }, + { + "cell_type": "code", + "source": [ + "# @title Set Your Values Here { display-mode: \"form\" }\n", + "REGION = \"us-central1\" # @param {type: \"string\"}\n", + "INSTANCE = \"my-postgresql-instance\" # @param {type: \"string\"}\n", + "DATABASE = \"my-database\" # @param {type: \"string\"}" + ], + "metadata": { + "id": "ASWTYxvGUzfR" + }, + "outputs": [], + "execution_count": null + }, + { + "cell_type": "markdown", + "source": [ + "### PostgresEngine Connection Pool\n", + "\n", + "One of the requirements and arguments to establish Cloud SQL as a PostgresSaver is a `PostgresEngine` object. The `PostgresEngine` configures a connection pool to your Cloud SQL database, enabling successful connections from your application and following industry best practices.\n", + "\n", + "To create a `PostgresEngine` using `PostgresEngine.from_instance()` you need to provide only 4 things:\n", + "\n", + "1. `project_id` : Project ID of the Google Cloud Project where the Cloud SQL instance is located.\n", + "1. `region` : Region where the Cloud SQL instance is located.\n", + "1. `instance` : The name of the Cloud SQL instance.\n", + "1. `database` : The name of the database to connect to on the Cloud SQL instance.\n", + "\n", + "By default, [IAM database authentication](https://cloud.google.com/sql/docs/postgres/iam-authentication#iam-db-auth) will be used as the method of database authentication. 
This library uses the IAM principal belonging to the [Application Default Credentials (ADC)](https://cloud.google.com/docs/authentication/application-default-credentials) sourced from the environment.\n", + "\n", + "For more information on IAM database authentication please see:\n", + "* [Configure an instance for IAM database authentication](https://cloud.google.com/sql/docs/postgres/create-edit-iam-instances)\n", + "* [Manage users with IAM database authentication](https://cloud.google.com/sql/docs/postgres/add-manage-iam-users)\n", + "\n", + "Optionally, [built-in database authentication](https://cloud.google.com/sql/docs/postgres/built-in-authentication) using a username and password to access the Cloud SQL database can also be used. Just provide the optional `user` and `password` arguments to `PostgresEngine.from_instance()`:\n", + "\n", + "* `user` : Database user to use for built-in database authentication and login\n", + "* `password` : Database password to use for built-in database authentication and login." + ], + "metadata": { + "id": "5noUSRjIVtlO" + } + }, + { + "cell_type": "code", + "source": [ + "from langchain_google_cloud_sql_pg import PostgresEngine\n", + "\n", + "engine = PostgresEngine.from_instance(\n", + " project_id=PROJECT_ID, region=REGION, instance=INSTANCE, database=DATABASE\n", + ")" + ], + "metadata": { + "id": "uYh8ulGNVAus" + }, + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "### Initialize a table\n", + "The `PostgresSaver` class requires a database table with a specific schema in order to persist the LangGraph agent state.\n", + "The `PostgresEngine` engine has a helper method `init_checkpoint_table()` that can be used to create a table with the proper schema for you." 
+ ] + }, + { + "metadata": {}, + "cell_type": "code", + "source": [ + "engine.init_checkpoint_table() # Use table_name to customise the table name" + ], + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "#### Optional Tip: 💡\n", + "You can also specify a schema name by passing `schema_name` wherever you pass `table_name`. Eg:\n", + "\n", + "```python\n", + "SCHEMA_NAME=\"my_schema\"\n", + "\n", + "engine.init_checkpoint_table(\n", + " table_name=TABLE_NAME,\n", + " schema_name=SCHEMA_NAME # Default: \"public\"\n", + ")\n", + "```" + ] + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "### PostgresSaver\n", + "\n", + "To initialize the `PostgresSaver` class you need to provide only 4 things:\n", + "\n", + "1. `engine` - An instance of a `PostgresEngine` engine.\n", + "2. `table_name` : The name of the table within the Cloud SQL database to store the checkpoints (Default: \"checkpoints\").\n", + "3. `schema_name` : The name of the database schema containing the checkpoints table (Default: \"public\")\n", + "4. 
`serde`: Serializer for encoding/decoding checkpoints (Default: None)\n" + ] + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "### Example of Checkpointer methods" + }, + { + "metadata": {}, + "cell_type": "code", + "source": [ + "from langchain_google_cloud_sql_pg import PostgresSaver\n", + "\n", + "checkpointer = PostgresSaver.create_sync(\n", + " engine,\n", + " # table_name = TABLE_NAME,\n", + " # schema_name = SCHEMA_NAME,\n", + " # serde = None,\n", + ")" + ], + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "code", + "source": [ + "write_config = {\"configurable\": {\"thread_id\": \"1\", \"checkpoint_ns\": \"\"}}\n", + "read_config = {\"configurable\": {\"thread_id\": \"1\"}}\n", + "\n", + "checkpoint = {\n", + " \"v\": 1,\n", + " \"ts\": \"2024-07-31T20:14:19.804150+00:00\",\n", + " \"id\": \"1ef4f797-8335-6428-8001-8a1503f9b875\",\n", + " \"channel_values\": {\"my_key\": \"meow\", \"node\": \"node\"},\n", + " \"channel_versions\": {\"__start__\": 2, \"my_key\": 3, \"start:node\": 3, \"node\": 3},\n", + " \"versions_seen\": {\n", + " \"__input__\": {},\n", + " \"__start__\": {\"__start__\": 1},\n", + " \"node\": {\"start:node\": 2},\n", + " },\n", + " \"pending_sends\": [],\n", + "}\n", + "\n", + "# store checkpoint\n", + "checkpointer.put(write_config, checkpoint, {}, {})\n", + "# load checkpoint\n", + "checkpointer.get(read_config)" + ], + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "## 🔗 Adding persistence to the pre-built create react agent" + }, + { + "metadata": {}, + "cell_type": "code", + "source": [ + "from typing import Literal\n", + "\n", + "from langchain_core.tools import tool\n", + "from langchain_google_vertexai import ChatVertexAI\n", + "from langgraph.prebuilt import create_react_agent\n", + "import vertexai\n", + "\n", + "vertexai.init(project=PROJECT_ID, location=REGION)\n", + "\n", + "\n", + "@tool\n", + "def 
get_weather(city: Literal[\"nyc\", \"sf\"]):\n", + " if city == \"nyc\":\n", + " return \"It might be cloudy in nyc\"\n", + " elif city == \"sf\":\n", + " return \"It's always sunny in sf\"\n", + " else:\n", + " raise AssertionError(\"Unknown city\")\n", + "\n", + "\n", + "tools = [get_weather]\n", + "model = ChatVertexAI(project_name=PROJECT_ID, model_name=\"gemini-2.0-flash-exp\")\n", + "\n", + "graph = create_react_agent(model, tools=tools, checkpointer=checkpointer)\n", + "config = {\"configurable\": {\"thread_id\": \"2\"}}\n", + "res = graph.invoke({\"messages\": [(\"human\", \"what's the weather in sf\")]}, config)\n", + "print(res)" + ], + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "code", + "source": [ + "# Example of resulting checkpoint config\n", + "checkpoint = checkpointer.get(config)" + ], + "outputs": [], + "execution_count": null + } + ] +} diff --git a/noxfile.py b/noxfile.py index 8fb4d61b..75eae944 100644 --- a/noxfile.py +++ b/noxfile.py @@ -41,7 +41,7 @@ def docs(session): """Build the docs for this library.""" - session.install("-e", ".") + session.install("-e", ".[test]") session.install( # We need to pin to specific versions of the `sphinxcontrib-*` packages # which still support sphinx 4.x. 
diff --git a/pyproject.toml b/pyproject.toml index de0f1aca..f773a589 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,7 +12,8 @@ authors = [ dependencies = [ "cloud-sql-python-connector[asyncpg] >= 1.10.0, <2.0.0", "langchain-core>=0.2.36, <1.0.0 ", - "numpy>=1.24.4, <2.0.0", + "numpy>=1.24.4, <3.0.0; python_version > '3.9'", + "numpy>=1.24.4, <=2.0.2; python_version <= '3.9'", "pgvector>=0.2.5, <1.0.0", "SQLAlchemy[asyncio]>=2.0.25, <3.0.0" ] @@ -38,11 +39,14 @@ Repository = "https://github.com/googleapis/langchain-google-cloud-sql-pg-python Changelog = "https://github.com/googleapis/langchain-google-cloud-sql-pg-python/blob/main/CHANGELOG.md" [project.optional-dependencies] +langgraph = [ + "langgraph-checkpoint>=2.0.9, <3.0.0" +] test = [ "black[jupyter]==25.1.0", - "isort==6.0.0", - "mypy==1.13.0", - "pytest-asyncio==0.24.0", + "isort==6.0.1", + "mypy==1.15.0", + "pytest-asyncio==0.25.3", "pytest==8.3.4", "pytest-cov==6.0.0", "langchain-tests==0.3.12" diff --git a/requirements.txt b/requirements.txt index 79e8db67..c52a1b3d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,7 @@ -cloud-sql-python-connector[asyncpg]==1.14.0 -langchain-core==0.3.22 -numpy==1.26.4 +cloud-sql-python-connector[asyncpg]==1.17.0 +langchain-core==0.3.40 +numpy==2.2.3; python_version > "3.9" +numpy== 2.0.2; python_version <= "3.9" pgvector==0.3.6 -SQLAlchemy[asyncio]==2.0.36 +SQLAlchemy[asyncio]==2.0.38 +langgraph-checkpoint==2.0.10 \ No newline at end of file diff --git a/samples/index_tuning_sample/requirements.txt b/samples/index_tuning_sample/requirements.txt index 9351541e..19489979 100644 --- a/samples/index_tuning_sample/requirements.txt +++ b/samples/index_tuning_sample/requirements.txt @@ -1,3 +1,3 @@ -langchain-community==0.3.16 -langchain-google-cloud-sql-pg==0.11.1 -langchain-google-vertexai==2.0.12 +langchain-community==0.3.18 +langchain-google-cloud-sql-pg==0.12.1 +langchain-google-vertexai==2.0.14 diff --git 
a/samples/langchain_on_vertexai/requirements.txt b/samples/langchain_on_vertexai/requirements.txt index 090f6c2e..153755af 100644 --- a/samples/langchain_on_vertexai/requirements.txt +++ b/samples/langchain_on_vertexai/requirements.txt @@ -1,5 +1,5 @@ -google-cloud-aiplatform[reasoningengine,langchain]==1.79.0 -google-cloud-resource-manager==1.14.0 -langchain-community==0.3.16 -langchain-google-cloud-sql-pg==0.11.1 -langchain-google-vertexai==2.0.12 +google-cloud-aiplatform[reasoningengine,langchain]==1.81.0 +google-cloud-resource-manager==1.14.1 +langchain-community==0.3.18 +langchain-google-cloud-sql-pg==0.12.1 +langchain-google-vertexai==2.0.14 diff --git a/samples/requirements.txt b/samples/requirements.txt index 090f6c2e..153755af 100644 --- a/samples/requirements.txt +++ b/samples/requirements.txt @@ -1,5 +1,5 @@ -google-cloud-aiplatform[reasoningengine,langchain]==1.79.0 -google-cloud-resource-manager==1.14.0 -langchain-community==0.3.16 -langchain-google-cloud-sql-pg==0.11.1 -langchain-google-vertexai==2.0.12 +google-cloud-aiplatform[reasoningengine,langchain]==1.81.0 +google-cloud-resource-manager==1.14.1 +langchain-community==0.3.18 +langchain-google-cloud-sql-pg==0.12.1 +langchain-google-vertexai==2.0.14 diff --git a/src/langchain_google_cloud_sql_pg/__init__.py b/src/langchain_google_cloud_sql_pg/__init__.py index f0d2eab0..ca8ab9ef 100644 --- a/src/langchain_google_cloud_sql_pg/__init__.py +++ b/src/langchain_google_cloud_sql_pg/__init__.py @@ -14,6 +14,7 @@ from . 
import indexes from .chat_message_history import PostgresChatMessageHistory +from .checkpoint import PostgresSaver from .engine import Column, PostgresEngine from .loader import PostgresDocumentSaver, PostgresLoader from .vectorstore import PostgresVectorStore @@ -27,5 +28,6 @@ "PostgresEngine", "PostgresLoader", "PostgresDocumentSaver", + "PostgresSaver", "__version__", ] diff --git a/src/langchain_google_cloud_sql_pg/async_checkpoint.py b/src/langchain_google_cloud_sql_pg/async_checkpoint.py new file mode 100644 index 00000000..560182f7 --- /dev/null +++ b/src/langchain_google_cloud_sql_pg/async_checkpoint.py @@ -0,0 +1,592 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import json +from typing import Any, AsyncIterator, Iterator, Optional, Sequence, Tuple + +from langchain_core.runnables import RunnableConfig +from langgraph.checkpoint.base import ( + WRITES_IDX_MAP, + BaseCheckpointSaver, + ChannelVersions, + Checkpoint, + CheckpointMetadata, + CheckpointTuple, + get_checkpoint_id, +) +from langgraph.checkpoint.serde.base import SerializerProtocol +from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer +from langgraph.checkpoint.serde.types import TASKS +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncEngine + +from .engine import CHECKPOINTS_TABLE, PostgresEngine + +MetadataInput = Optional[dict[str, Any]] + +checkpoints_columns = [ + "thread_id", + "checkpoint_ns", + "checkpoint_id", + "parent_checkpoint_id", + "type", + "checkpoint", + "metadata", +] + +writes_columns = [ + "thread_id", + "checkpoint_ns", + "checkpoint_id", + "task_id", + "idx", + "channel", + "type", + "blob", +] + + +class AsyncPostgresSaver(BaseCheckpointSaver[str]): + """Checkpoint stored in PgSQL""" + + __create_key = object() + + jsonplus_serde = JsonPlusSerializer() + + def __init__( + self, + key: object, + pool: AsyncEngine, + table_name: str = CHECKPOINTS_TABLE, + schema_name: str = "public", + serde: Optional[SerializerProtocol] = None, + ) -> None: + super().__init__(serde=serde) + if key != AsyncPostgresSaver.__create_key: + raise Exception( + "only create class through 'create' or 'create_sync' methods" + ) + self.pool = pool + self.table_name = table_name + self.table_name_writes = f"{table_name}_writes" + self.schema_name = schema_name + + @classmethod + async def create( + cls, + engine: PostgresEngine, + table_name: str = CHECKPOINTS_TABLE, + schema_name: str = "public", + serde: Optional[SerializerProtocol] = None, + ) -> "AsyncPostgresSaver": + """Create a new AsyncPostgresSaver instance. + + Args: + engine (PostgresEngine): PostgresEngine engine to use. 
+ schema_name (str): The schema name where the table is located (default: "public"). + serde (SerializerProtocol): Serializer for encoding/decoding checkpoints (default: None). + table_name (str): Custom table name to use (default: CHECKPOINTS_TABLE). + + Raises: + IndexError: If the table provided does not contain required schema. + + Returns: + AsyncPostgresSaver: A newly created instance of AsyncPostgresSaver. + """ + + checkpoints_table_schema = await engine._aload_table_schema( + table_name, schema_name + ) + checkpoints_column_names = checkpoints_table_schema.columns.keys() + + if not (all(x in checkpoints_column_names for x in checkpoints_columns)): + raise IndexError( + f"Table checkpoints.'{schema_name}' has incorrect schema. Got " + f"column names '{checkpoints_column_names}' but required column names " + f"'{checkpoints_columns}'.\nPlease create table with following schema:" + f"\nCREATE TABLE {schema_name}.checkpoints (" + "\n thread_id TEXT NOT NULL," + "\n checkpoint_ns TEXT NOT NULL," + "\n checkpoint_id TEXT NOT NULL," + "\n parent_checkpoint_id TEXT," + "\n type TEXT," + "\n checkpoint JSONB NOT NULL," + "\n metadata JSONB NOT NULL" + "\n);" + ) + + checkpoint_writes_table_schema = await engine._aload_table_schema( + f"{table_name}_writes", schema_name + ) + checkpoint_writes_column_names = checkpoint_writes_table_schema.columns.keys() + + if not (all(x in checkpoint_writes_column_names for x in writes_columns)): + raise IndexError( + f"Table checkpoint_writes.'{schema_name}' has incorrect schema. 
Got " + f"column names '{checkpoint_writes_column_names}' but required column names " + f"'{writes_columns}'.\nPlease create table with following schema:" + f"\nCREATE TABLE {schema_name}.checkpoint_writes (" + "\n thread_id TEXT NOT NULL," + "\n checkpoint_ns TEXT NOT NULL," + "\n checkpoint_id TEXT NOT NULL," + "\n task_id TEXT NOT NULL," + "\n idx INT NOT NULL," + "\n channel TEXT NOT NULL," + "\n type TEXT," + "\n blob JSONB NOT NULL" + "\n);" + ) + return cls(cls.__create_key, engine._pool, table_name, schema_name, serde) + + def _dump_writes( + self, + thread_id: str, + checkpoint_ns: str, + checkpoint_id: str, + task_id: str, + task_path: str, + writes: Sequence[tuple[str, Any]], + ) -> list[dict[str, Any]]: + return [ + { + "thread_id": thread_id, + "checkpoint_ns": checkpoint_ns, + "checkpoint_id": checkpoint_id, + "task_id": task_id, + "task_path": task_path, + "idx": WRITES_IDX_MAP.get(channel, idx), + "channel": channel, + "type": self.serde.dumps_typed(value)[0], + "blob": self.serde.dumps_typed(value)[1], + } + for idx, (channel, value) in enumerate(writes) + ] + + def _load_writes( + self, writes: list[tuple[bytes, bytes, bytes, bytes]] + ) -> list[tuple[str, str, Any]]: + return ( + [ + ( + tid.decode(), + channel.decode(), + self.serde.loads_typed((t.decode(), v)), + ) + for tid, channel, t, v in writes + ] + if writes + else [] + ) + + def _search_where( + self, + config: Optional[RunnableConfig], + filter: MetadataInput, + before: Optional[RunnableConfig] = None, + ) -> tuple[str, dict[str, Any]]: + """Return WHERE clause predicates for alist() given config, filter, before. + + This method returns a tuple of a string and a tuple of values. The string + is the parameterized WHERE clause predicate (including the WHERE keyword): + "WHERE column1 = $1 AND column2 IS $2". The list of values contains the + values for each of the corresponding parameters. 
+ """ + wheres = [] + param_values = {} + + # construct predicate for config filter + if config: + wheres.append("thread_id = :thread_id") + param_values.update({"thread_id": config["configurable"]["thread_id"]}) + checkpoint_ns = config["configurable"].get("checkpoint_ns") + if checkpoint_ns is not None: + wheres.append("checkpoint_ns = :checkpoint_ns") + param_values.update({"checkpoint_ns": checkpoint_ns}) + + if checkpoint_id := get_checkpoint_id(config): + wheres.append("checkpoint_id = :checkpoint_id") + param_values.update({"checkpoint_id": checkpoint_id}) + + # construct predicate for metadata filter + if filter: + wheres.append("encode(metadata,'escape')::jsonb @> :metadata ") + param_values.update({"metadata": f"{json.dumps(filter)}"}) + + # construct predicate for `before` + if before is not None: + wheres.append("checkpoint_id < :checkpoint_id") + param_values.update({"checkpoint_id": get_checkpoint_id(before)}) + + return ( + "WHERE " + " AND ".join(wheres) if wheres else "", + param_values, + ) + + async def aput( + self, + config: RunnableConfig, + checkpoint: Checkpoint, + metadata: CheckpointMetadata, + new_versions: ChannelVersions, + ) -> RunnableConfig: + """Asynchronously store a checkpoint with its configuration and metadata. + + Args: + config (RunnableConfig): Configuration for the checkpoint. + checkpoint (Checkpoint): The checkpoint to store. + metadata (CheckpointMetadata): Additional metadata for the checkpoint. + new_versions (ChannelVersions): New channel versions as of this write. + + Returns: + RunnableConfig: Updated configuration after storing the checkpoint. 
+ """ + configurable = config["configurable"] + thread_id = configurable.get("thread_id") + checkpoint_ns = configurable.get("checkpoint_ns") + checkpoint_id = configurable.get( + "checkpoint_id", configurable.get("thread_ts", None) + ) + + next_config: RunnableConfig = { + "configurable": { + "thread_id": thread_id, + "checkpoint_ns": checkpoint_ns, + "checkpoint_id": checkpoint["id"], + } + } + + query = f"""INSERT INTO "{self.schema_name}"."{self.table_name}" (thread_id, checkpoint_ns, checkpoint_id, parent_checkpoint_id, type, checkpoint, metadata) + VALUES (:thread_id, :checkpoint_ns, :checkpoint_id, :parent_checkpoint_id, :type, :checkpoint, :metadata) + ON CONFLICT (thread_id, checkpoint_ns, checkpoint_id) + DO UPDATE SET + checkpoint = EXCLUDED.checkpoint, + metadata = EXCLUDED.metadata; + """ + + async with self.pool.connect() as conn: + type_, serialized_checkpoint = self.serde.dumps_typed(checkpoint) + serialized_metadata = self.jsonplus_serde.dumps(metadata) + await conn.execute( + text(query), + { + "thread_id": thread_id, + "checkpoint_ns": checkpoint_ns, + "checkpoint_id": checkpoint["id"], + "parent_checkpoint_id": checkpoint_id, + "type": type_, + "checkpoint": serialized_checkpoint, + "metadata": serialized_metadata, + }, + ) + await conn.commit() + + return next_config + + async def aput_writes( + self, + config: RunnableConfig, + writes: Sequence[Tuple[str, Any]], + task_id: str, + task_path: str = "", + ) -> None: + """Asynchronously store intermediate writes linked to a checkpoint. + Args: + config (RunnableConfig): Configuration of the related checkpoint. + writes (List[Tuple[str, Any]]): List of writes to store. + task_id (str): Identifier for the task creating the writes. + task_path (str): Path of the task creating the writes. 
+ + Returns: + None + """ + upsert = f"""INSERT INTO "{self.schema_name}"."{self.table_name_writes}"(thread_id, checkpoint_ns, checkpoint_id, task_id, idx, channel, type, blob) + VALUES (:thread_id, :checkpoint_ns, :checkpoint_id, :task_id, :idx, :channel, :type, :blob) + ON CONFLICT (thread_id, checkpoint_ns, checkpoint_id, task_id, idx) DO UPDATE SET + channel = EXCLUDED.channel, + type = EXCLUDED.type, + blob = EXCLUDED.blob; + """ + insert = f"""INSERT INTO "{self.schema_name}"."{self.table_name_writes}"(thread_id, checkpoint_ns, checkpoint_id, task_id, idx, channel, type, blob) + VALUES (:thread_id, :checkpoint_ns, :checkpoint_id, :task_id, :idx, :channel, :type, :blob) + ON CONFLICT (thread_id, checkpoint_ns, checkpoint_id, task_id, idx) DO NOTHING + """ + query = upsert if all(w[0] in WRITES_IDX_MAP for w in writes) else insert + + params = self._dump_writes( + config["configurable"]["thread_id"], + config["configurable"]["checkpoint_ns"], + config["configurable"]["checkpoint_id"], + task_id, + task_path, + writes, + ) + + async with self.pool.connect() as conn: + await conn.execute( + text(query), + params, + ) + await conn.commit() + + async def alist( + self, + config: Optional[RunnableConfig], + *, + filter: Optional[dict[str, Any]] = None, + before: Optional[RunnableConfig] = None, + limit: Optional[int] = None, + ) -> AsyncIterator[CheckpointTuple]: + """Asynchronously list checkpoints that match the given criteria. + + Args: + config (Optional[RunnableConfig]): Base configuration for filtering checkpoints. + filter (Optional[Dict[str, Any]]): Additional filtering criteria for metadata. + before (Optional[RunnableConfig]): List checkpoints created before this configuration. + limit (Optional[int]): Maximum number of checkpoints to return. + + Returns: + AsyncIterator[CheckpointTuple]: Async iterator of matching checkpoint tuples. 
+ """ + SELECT = f""" + SELECT + thread_id, + checkpoint, + checkpoint_ns, + checkpoint_id, + parent_checkpoint_id, + metadata, + type, + ( + SELECT array_agg(array[cw.task_id::text::bytea, cw.channel::bytea, cw.type::bytea, cw.blob] order by cw.task_id, cw.idx) + FROM "{self.schema_name}"."{self.table_name_writes}" cw + where cw.thread_id = c.thread_id + AND cw.checkpoint_ns = c.checkpoint_ns + AND cw.checkpoint_id = c.checkpoint_id + ) AS pending_writes, + ( + SELECT array_agg(array[cw.type::bytea, cw.blob] order by cw.task_path, cw.task_id, cw.idx) + FROM "{self.schema_name}"."{self.table_name_writes}" cw + WHERE cw.thread_id = c.thread_id + AND cw.checkpoint_ns = c.checkpoint_ns + AND cw.checkpoint_id = c.parent_checkpoint_id + AND cw.channel = '{TASKS}' + ) AS pending_sends + FROM "{self.schema_name}"."{self.table_name}" c + """ + + where, args = self._search_where(config, filter, before) + query = SELECT + where + " ORDER BY checkpoint_id DESC" + if limit: + query += f" LIMIT {limit}" + + async with self.pool.connect() as conn: + result = await conn.execute(text(query), args) + while True: + row = result.fetchone() + if not row: + break + value = row._mapping + yield CheckpointTuple( + config={ + "configurable": { + "thread_id": value["thread_id"], + "checkpoint_ns": value["checkpoint_ns"], + "checkpoint_id": value["checkpoint_id"], + } + }, + checkpoint=self.serde.loads_typed( + (value["type"], value["checkpoint"]) + ), + metadata=( + self.jsonplus_serde.loads(value["metadata"]) # type: ignore + if value["metadata"] is not None + else {} + ), + parent_config=( + { + "configurable": { + "thread_id": value["thread_id"], + "checkpoint_ns": value["checkpoint_ns"], + "checkpoint_id": value["parent_checkpoint_id"], + } + } + if value["parent_checkpoint_id"] + else None + ), + pending_writes=self._load_writes(value["pending_writes"]), + ) + + async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]: + """Asynchronously fetch a checkpoint 
tuple using the given configuration. + + Args: + config (RunnableConfig): Configuration specifying which checkpoint to retrieve. + + Returns: + Optional[CheckpointTuple]: The requested checkpoint tuple, or None if not found. + """ + + SELECT = f""" + SELECT + thread_id, + checkpoint, + checkpoint_ns, + checkpoint_id, + parent_checkpoint_id, + metadata, + type, + ( + SELECT array_agg(array[cw.task_id::text::bytea, cw.channel::bytea, cw.type::bytea, cw.blob] order by cw.task_id, cw.idx) + FROM "{self.schema_name}"."{self.table_name_writes}" cw + where cw.thread_id = c.thread_id + AND cw.checkpoint_ns = c.checkpoint_ns + AND cw.checkpoint_id = c.checkpoint_id + ) AS pending_writes, + ( + SELECT array_agg(array[cw.type::bytea, cw.blob] order by cw.task_path, cw.task_id, cw.idx) + FROM "{self.schema_name}"."{self.table_name_writes}" cw + WHERE cw.thread_id = c.thread_id + AND cw.checkpoint_ns = c.checkpoint_ns + AND cw.checkpoint_id = c.parent_checkpoint_id + AND cw.channel = '{TASKS}' + ) AS pending_sends + FROM "{self.schema_name}"."{self.table_name}" c + """ + + thread_id = config["configurable"]["thread_id"] + checkpoint_id = get_checkpoint_id(config) + checkpoint_ns = config["configurable"].get("checkpoint_ns", "") + if checkpoint_id: + args = { + "thread_id": thread_id, + "checkpoint_ns": checkpoint_ns, + "checkpoint_id": checkpoint_id, + } + where = "WHERE thread_id = :thread_id AND checkpoint_ns = :checkpoint_ns AND checkpoint_id = :checkpoint_id" + else: + args = {"thread_id": thread_id, "checkpoint_ns": checkpoint_ns} + where = "WHERE thread_id = :thread_id AND checkpoint_ns = :checkpoint_ns ORDER BY checkpoint_id DESC LIMIT 1" + + async with self.pool.connect() as conn: + result = await conn.execute(text(SELECT + where), args) + row = result.fetchone() + if not row: + return None + value = row._mapping + return CheckpointTuple( + config={ + "configurable": { + "thread_id": value["thread_id"], + "checkpoint_ns": value["checkpoint_ns"], + "checkpoint_id": 
value["checkpoint_id"], + } + }, + checkpoint=self.serde.loads_typed((value["type"], value["checkpoint"])), + metadata=( + self.jsonplus_serde.loads(value["metadata"]) # type: ignore + if value["metadata"] is not None + else {} + ), + parent_config=( + { + "configurable": { + "thread_id": value["thread_id"], + "checkpoint_ns": value["checkpoint_ns"], + "checkpoint_id": value["parent_checkpoint_id"], + } + } + if value["parent_checkpoint_id"] + else None + ), + pending_writes=self._load_writes(value["pending_writes"]), + ) + + def put( + self, + config: RunnableConfig, + checkpoint: Checkpoint, + metadata: CheckpointMetadata, + new_versions: ChannelVersions, + ) -> RunnableConfig: + """Asynchronously store a checkpoint with its configuration and metadata. + + Args: + config (RunnableConfig): Configuration for the checkpoint. + checkpoint (Checkpoint): The checkpoint to store. + metadata (CheckpointMetadata): Additional metadata for the checkpoint. + new_versions (ChannelVersions): New channel versions as of this write. + + Returns: + RunnableConfig: Updated configuration after storing the checkpoint. + """ + raise NotImplementedError( + "Sync methods are not implemented for AsyncPostgresSaver. Use PostgresSaver interface instead." + ) + + def put_writes( + self, + config: RunnableConfig, + writes: Sequence[Tuple[str, Any]], + task_id: str, + task_path: str = "", + ) -> None: + """Asynchronously store intermediate writes linked to a checkpoint. + Args: + config (RunnableConfig): Configuration of the related checkpoint. + writes (List[Tuple[str, Any]]): List of writes to store. + task_id (str): Identifier for the task creating the writes. + task_path (str): Path of the task creating the writes. + + Returns: + None + """ + raise NotImplementedError( + "Sync methods are not implemented for AsyncPostgresSaver. Use PostgresSaver interface instead." 
+ ) + + def list( + self, + config: Optional[RunnableConfig], + *, + filter: Optional[dict[str, Any]] = None, + before: Optional[RunnableConfig] = None, + limit: Optional[int] = None, + ) -> Iterator[CheckpointTuple]: + """Asynchronously list checkpoints that match the given criteria. + + Args: + config (Optional[RunnableConfig]): Base configuration for filtering checkpoints. + filter (Optional[Dict[str, Any]]): Additional filtering criteria for metadata. + before (Optional[RunnableConfig]): List checkpoints created before this configuration. + limit (Optional[int]): Maximum number of checkpoints to return. + + Returns: + AsyncIterator[CheckpointTuple]: Async iterator of matching checkpoint tuples. + """ + raise NotImplementedError( + "Sync methods are not implemented for AsyncPostgresSaver. Use PostgresSaver interface instead." + ) + + def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]: + """Asynchronously fetch a checkpoint tuple using the given configuration. + + Args: + config (RunnableConfig): Configuration specifying which checkpoint to retrieve. + + Returns: + Optional[CheckpointTuple]: The requested checkpoint tuple, or None if not found. + """ + raise NotImplementedError( + "Sync methods are not implemented for AsyncPostgresSaver. Use PostgresSaver interface instead." 
+ ) diff --git a/src/langchain_google_cloud_sql_pg/async_vectorstore.py b/src/langchain_google_cloud_sql_pg/async_vectorstore.py index 0aa4bc07..75333674 100644 --- a/src/langchain_google_cloud_sql_pg/async_vectorstore.py +++ b/src/langchain_google_cloud_sql_pg/async_vectorstore.py @@ -15,6 +15,7 @@ # TODO: Remove below import when minimum supported Python version is 3.10 from __future__ import annotations +import copy import json import uuid from typing import Any, Callable, Iterable, Optional, Sequence @@ -37,6 +38,36 @@ QueryOptions, ) +COMPARISONS_TO_NATIVE = { + "$eq": "=", + "$ne": "!=", + "$lt": "<", + "$lte": "<=", + "$gt": ">", + "$gte": ">=", +} + +SPECIAL_CASED_OPERATORS = { + "$in", + "$nin", + "$between", + "$exists", +} + +TEXT_OPERATORS = { + "$like", + "$ilike", +} + +LOGICAL_OPERATORS = {"$and", "$or", "$not"} + +SUPPORTED_OPERATORS = ( + set(COMPARISONS_TO_NATIVE) + .union(TEXT_OPERATORS) + .union(LOGICAL_OPERATORS) + .union(SPECIAL_CASED_OPERATORS) +) + class AsyncPostgresVectorStore(VectorStore): """Google Cloud SQL for PostgreSQL Vector Store class""" @@ -253,7 +284,7 @@ async def __aadd_embeddings( values_stmt = "VALUES (:id, :content, :embedding" # Add metadata - extra = metadata + extra = copy.deepcopy(metadata) for metadata_column in self.metadata_columns: if metadata_column in metadata: values_stmt += f", :{metadata_column}" @@ -327,7 +358,7 @@ async def aget_by_ids(self, ids: Sequence[str]) -> list[Document]: Document( page_content=row[self.content_column], metadata=metadata, - id=str(row[self.id_column]), + id=row[self.id_column], ) ) ) @@ -537,7 +568,7 @@ async def __query_collection( self, embedding: list[float], k: Optional[int] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> Sequence[RowMapping]: """Perform similarity search query on the vector store table.""" @@ -555,6 +586,8 @@ async def __query_collection( column_names = ", ".join(f'"{col}"' for col in columns) + if 
filter and isinstance(filter, dict): + filter = self._create_filter_clause(filter) filter = f"WHERE {filter}" if filter else "" stmt = f"SELECT {column_names}, {search_function}({self.embedding_column}, '{embedding}') as distance FROM \"{self.schema_name}\".\"{self.table_name}\" {filter} ORDER BY {self.embedding_column} {operator} '{embedding}' LIMIT {k};" if self.index_query_options: @@ -576,7 +609,7 @@ async def asimilarity_search( self, query: str, k: Optional[int] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[Document]: """Return docs selected by similarity search on query.""" @@ -601,7 +634,7 @@ async def asimilarity_search_with_score( self, query: str, k: Optional[int] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[tuple[Document, float]]: """Return docs and distance scores selected by similarity search on query.""" @@ -615,7 +648,7 @@ async def asimilarity_search_by_vector( self, embedding: list[float], k: Optional[int] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[Document]: """Return docs selected by vector similarity search.""" @@ -629,7 +662,7 @@ async def asimilarity_search_with_score_by_vector( self, embedding: list[float], k: Optional[int] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[tuple[Document, float]]: """Return docs and distance scores selected by vector similarity search.""" @@ -651,7 +684,7 @@ async def asimilarity_search_with_score_by_vector( Document( page_content=row[self.content_column], metadata=metadata, - id=str(row[self.id_column]), + id=row[self.id_column], ), row["distance"], ) @@ -665,7 +698,7 @@ async def amax_marginal_relevance_search( k: Optional[int] = None, fetch_k: Optional[int] = None, lambda_mult: Optional[float] = None, - filter: Optional[str] = None, + 
filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[Document]: """Return docs selected using the maximal marginal relevance.""" @@ -686,7 +719,7 @@ async def amax_marginal_relevance_search_by_vector( k: Optional[int] = None, fetch_k: Optional[int] = None, lambda_mult: Optional[float] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[Document]: """Return docs selected using the maximal marginal relevance.""" @@ -709,7 +742,7 @@ async def amax_marginal_relevance_search_with_score_by_vector( k: Optional[int] = None, fetch_k: Optional[int] = None, lambda_mult: Optional[float] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[tuple[Document, float]]: """Return docs and distance scores selected using the maximal marginal relevance.""" @@ -742,7 +775,7 @@ async def amax_marginal_relevance_search_with_score_by_vector( Document( page_content=row[self.content_column], metadata=metadata, - id=str(row[self.id_column]), + id=row[self.id_column], ), row["distance"], ) @@ -815,6 +848,194 @@ async def is_valid_index( return bool(len(results) == 1) + def _handle_field_filter( + self, + field: str, + value: Any, + ) -> str: + """Create a filter for a specific field. + Args: + field: name of field + value: value to filter + If provided as is then this will be an equality filter + If provided as a dictionary then this will be a filter, the key + will be the operator and the value will be the value to filter by + Returns: + sql where query as a string + """ + if not isinstance(field, str): + raise ValueError( + f"field should be a string but got: {type(field)} with value: {field}" + ) + + if field.startswith("$"): + raise ValueError( + f"Invalid filter condition. 
Expected a field but got an operator: " + f"{field}" + ) + + # Allow [a-zA-Z0-9_], disallow $ for now until we support escape characters + if not field.isidentifier(): + raise ValueError( + f"Invalid field name: {field}. Expected a valid identifier." + ) + + if isinstance(value, dict): + # This is a filter specification + if len(value) != 1: + raise ValueError( + "Invalid filter condition. Expected a value which " + "is a dictionary with a single key that corresponds to an operator " + f"but got a dictionary with {len(value)} keys. The first few " + f"keys are: {list(value.keys())[:3]}" + ) + operator, filter_value = list(value.items())[0] + # Verify that that operator is an operator + if operator not in SUPPORTED_OPERATORS: + raise ValueError( + f"Invalid operator: {operator}. " + f"Expected one of {SUPPORTED_OPERATORS}" + ) + else: # Then we assume an equality operator + operator = "$eq" + filter_value = value + + if operator in COMPARISONS_TO_NATIVE: + # Then we implement an equality filter + # native is trusted input + if isinstance(filter_value, str): + filter_value = f"'{filter_value}'" + native = COMPARISONS_TO_NATIVE[operator] + return f"({field} {native} {filter_value})" + elif operator == "$between": + # Use AND with two comparisons + low, high = filter_value + + return f"({field} BETWEEN {low} AND {high})" + elif operator in {"$in", "$nin", "$like", "$ilike"}: + # We'll do force coercion to text + if operator in {"$in", "$nin"}: + for val in filter_value: + if not isinstance(val, (str, int, float)): + raise NotImplementedError( + f"Unsupported type: {type(val)} for value: {val}" + ) + + if isinstance(val, bool): # b/c bool is an instance of int + raise NotImplementedError( + f"Unsupported type: {type(val)} for value: {val}" + ) + + if operator in {"$in"}: + values = str(tuple(val for val in filter_value)) + return f"({field} IN {values})" + elif operator in {"$nin"}: + values = str(tuple(val for val in filter_value)) + return f"({field} NOT IN {values})" 
+ elif operator in {"$like"}: + return f"({field} LIKE '{filter_value}')" + elif operator in {"$ilike"}: + return f"({field} ILIKE '{filter_value}')" + else: + raise NotImplementedError() + elif operator == "$exists": + if not isinstance(filter_value, bool): + raise ValueError( + "Expected a boolean value for $exists " + f"operator, but got: {filter_value}" + ) + else: + if filter_value: + return f"({field} IS NOT NULL)" + else: + return f"({field} IS NULL)" + else: + raise NotImplementedError() + + def _create_filter_clause(self, filters: Any) -> str: + """Create LangChain filter representation to matching SQL where clauses + Args: + filters: Dictionary of filters to apply to the query. + Returns: + String containing the sql where query. + """ + + if not isinstance(filters, dict): + raise ValueError( + f"Invalid type: Expected a dictionary but got type: {type(filters)}" + ) + if len(filters) == 1: + # The only operators allowed at the top level are $AND, $OR, and $NOT + # First check if an operator or a field + key, value = list(filters.items())[0] + if key.startswith("$"): + # Then it's an operator + if key.lower() not in ["$and", "$or", "$not"]: + raise ValueError( + f"Invalid filter condition. Expected $and, $or or $not " + f"but got: {key}" + ) + else: + # Then it's a field + return self._handle_field_filter(key, filters[key]) + + if key.lower() == "$and" or key.lower() == "$or": + if not isinstance(value, list): + raise ValueError( + f"Expected a list, but got {type(value)} for value: {value}" + ) + op = key[1:].upper() # Extract the operator + filter_clause = [self._create_filter_clause(el) for el in value] + if len(filter_clause) > 1: + return f"({f' {op} '.join(filter_clause)})" + elif len(filter_clause) == 1: + return filter_clause[0] + else: + raise ValueError( + "Invalid filter condition. 
Expected a dictionary " + "but got an empty dictionary" + ) + elif key.lower() == "$not": + if isinstance(value, list): + not_conditions = [ + self._create_filter_clause(item) for item in value + ] + not_stmts = [f"NOT {condition}" for condition in not_conditions] + return f"({' AND '.join(not_stmts)})" + elif isinstance(value, dict): + not_ = self._create_filter_clause(value) + return f"(NOT {not_})" + else: + raise ValueError( + f"Invalid filter condition. Expected a dictionary " + f"or a list but got: {type(value)}" + ) + else: + raise ValueError( + f"Invalid filter condition. Expected $and, $or or $not " + f"but got: {key}" + ) + elif len(filters) > 1: + # Then all keys have to be fields (they cannot be operators) + for key in filters.keys(): + if key.startswith("$"): + raise ValueError( + f"Invalid filter condition. Expected a field but got: {key}" + ) + # These should all be fields and combined using an $and operator + and_ = [self._handle_field_filter(k, v) for k, v in filters.items()] + if len(and_) > 1: + return f"({' AND '.join(and_)})" + elif len(and_) == 1: + return and_[0] + else: + raise ValueError( + "Invalid filter condition. Expected a dictionary " + "but got an empty dictionary" + ) + else: + return "" + def get_by_ids(self, ids: Sequence[str]) -> list[Document]: raise NotImplementedError( "Sync methods are not implemented for AsyncPostgresVectorStore. Use PostgresVectorStore interface instead." 
@@ -824,7 +1045,7 @@ def similarity_search( self, query: str, k: Optional[int] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[Document]: raise NotImplementedError( @@ -906,7 +1127,7 @@ def similarity_search_with_score( self, query: str, k: Optional[int] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[tuple[Document, float]]: raise NotImplementedError( @@ -917,7 +1138,7 @@ def similarity_search_by_vector( self, embedding: list[float], k: Optional[int] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[Document]: raise NotImplementedError( @@ -928,7 +1149,7 @@ def similarity_search_with_score_by_vector( self, embedding: list[float], k: Optional[int] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[tuple[Document, float]]: raise NotImplementedError( @@ -941,7 +1162,7 @@ def max_marginal_relevance_search( k: Optional[int] = None, fetch_k: Optional[int] = None, lambda_mult: Optional[float] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[Document]: raise NotImplementedError( @@ -954,7 +1175,7 @@ def max_marginal_relevance_search_by_vector( k: Optional[int] = None, fetch_k: Optional[int] = None, lambda_mult: Optional[float] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[Document]: raise NotImplementedError( @@ -967,7 +1188,7 @@ def max_marginal_relevance_search_with_score_by_vector( k: Optional[int] = None, fetch_k: Optional[int] = None, lambda_mult: Optional[float] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[tuple[Document, float]]: raise NotImplementedError( diff --git 
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, AsyncIterator, Iterator, Optional, Sequence, Tuple

from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.base import (
    BaseCheckpointSaver,
    ChannelVersions,
    Checkpoint,
    CheckpointMetadata,
    CheckpointTuple,
)
from langgraph.checkpoint.serde.base import SerializerProtocol

from .async_checkpoint import AsyncPostgresSaver
from .engine import CHECKPOINTS_TABLE, PostgresEngine


class PostgresSaver(BaseCheckpointSaver[str]):
    """Checkpoint saver backed by a PgSQL table.

    Thin sync/async facade over :class:`AsyncPostgresSaver`; all database work
    is delegated through the engine's event loop helpers.
    """

    # Private token forcing construction through create()/create_sync().
    __create_key = object()

    def __init__(
        self,
        key: object,
        engine: PostgresEngine,
        checkpoint: AsyncPostgresSaver,
        table_name: str = CHECKPOINTS_TABLE,
        schema_name: str = "public",
        serde: Optional[SerializerProtocol] = None,
    ) -> None:
        """Internal constructor; use ``create`` or ``create_sync`` instead.

        Args:
            key: Private creation token (must be the class-internal key).
            engine (PostgresEngine): Engine used to run async work.
            checkpoint (AsyncPostgresSaver): The delegate that does the real work.
            table_name (str): Checkpoints table name (informational; the delegate
                is already bound to it).
            schema_name (str): Schema of the checkpoints table (informational).
            serde (Optional[SerializerProtocol]): Serializer for checkpoints.

        Raises:
            Exception: If constructed without the private key.
        """
        super().__init__(serde=serde)
        if key != PostgresSaver.__create_key:
            raise Exception(
                "only create class through 'create' or 'create_sync' methods"
            )
        self._engine = engine
        self.__checkpoint = checkpoint

    @classmethod
    async def create(
        cls,
        engine: PostgresEngine,
        table_name: str = CHECKPOINTS_TABLE,
        schema_name: str = "public",
        serde: Optional[SerializerProtocol] = None,
    ) -> "PostgresSaver":
        """Create a new PostgresSaver instance.

        Args:
            engine (PostgresEngine): PgSQL engine to use.
            table_name (str): Table name that stores the checkpoints (default: "checkpoints").
            schema_name (str): The schema name where the table is located (default: "public").
            serde (SerializerProtocol): Serializer for encoding/decoding checkpoints (default: None).

        Raises:
            IndexError: If the table provided does not contain required schema.

        Returns:
            PostgresSaver: A newly created instance of PostgresSaver.
        """
        coro = AsyncPostgresSaver.create(engine, table_name, schema_name, serde)
        checkpoint = await engine._run_as_async(coro)
        # Bugfix: forward table/schema/serde so a user-supplied serializer is
        # not silently dropped at the wrapper level.
        return cls(cls.__create_key, engine, checkpoint, table_name, schema_name, serde)

    @classmethod
    def create_sync(
        cls,
        engine: PostgresEngine,
        table_name: str = CHECKPOINTS_TABLE,
        schema_name: str = "public",
        serde: Optional[SerializerProtocol] = None,
    ) -> "PostgresSaver":
        """Create a new PostgresSaver instance (blocking).

        Args:
            engine (PostgresEngine): PgSQL engine to use.
            table_name (str): Table name that stores the checkpoints (default: "checkpoints").
            schema_name (str): The schema name where the table is located (default: "public").
            serde (SerializerProtocol): Serializer for encoding/decoding checkpoints (default: None).

        Raises:
            IndexError: If the table provided does not contain required schema.

        Returns:
            PostgresSaver: A newly created instance of PostgresSaver.
        """
        coro = AsyncPostgresSaver.create(engine, table_name, schema_name, serde)
        checkpoint = engine._run_as_sync(coro)
        # Bugfix: forward table/schema/serde (see create()).
        return cls(cls.__create_key, engine, checkpoint, table_name, schema_name, serde)

    async def alist(
        self,
        config: Optional[RunnableConfig],
        filter: Optional[dict[str, Any]] = None,
        before: Optional[RunnableConfig] = None,
        limit: Optional[int] = None,
    ) -> AsyncIterator[CheckpointTuple]:
        """Asynchronously list checkpoints that match the given criteria.

        Args:
            config (Optional[RunnableConfig]): Base configuration for filtering checkpoints.
            filter (Optional[dict[str, Any]]): Additional filtering criteria for metadata.
            before (Optional[RunnableConfig]): List checkpoints created before this configuration.
            limit (Optional[int]): Maximum number of checkpoints to return.

        Returns:
            AsyncIterator[CheckpointTuple]: Async iterator of matching checkpoint tuples.
        """
        iterator = self.__checkpoint.alist(
            config=config, filter=filter, before=before, limit=limit
        )
        # Pull items one at a time so each step runs on the engine's loop.
        while True:
            try:
                result = await self._engine._run_as_async(iterator.__anext__())
                yield result
            except StopAsyncIteration:
                break

    def list(
        self,
        config: Optional[RunnableConfig],
        filter: Optional[dict[str, Any]] = None,
        before: Optional[RunnableConfig] = None,
        limit: Optional[int] = None,
    ) -> Iterator[CheckpointTuple]:
        """List checkpoints from PgSQL.

        Args:
            config (RunnableConfig): The config to use for listing the checkpoints.
            filter (Optional[dict[str, Any]]): Additional filtering criteria for metadata. Defaults to None.
            before (Optional[RunnableConfig]): If provided, only checkpoints before the specified checkpoint ID are returned. Defaults to None.
            limit (Optional[int]): The maximum number of checkpoints to return. Defaults to None.

        Yields:
            Iterator[CheckpointTuple]: An iterator of checkpoint tuples.
        """
        iterator: AsyncIterator[CheckpointTuple] = self.__checkpoint.alist(
            config=config, filter=filter, before=before, limit=limit
        )
        while True:
            try:
                result = self._engine._run_as_sync(iterator.__anext__())
                yield result
            except StopAsyncIteration:
                break

    async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
        """Asynchronously fetch a checkpoint tuple using the given configuration.

        Args:
            config (RunnableConfig): The config to use for retrieving the checkpoint.

        Returns:
            Optional[CheckpointTuple]: The retrieved checkpoint tuple, or None if no matching checkpoint was found.
        """
        return await self._engine._run_as_async(self.__checkpoint.aget_tuple(config))

    def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
        """Get a checkpoint tuple from PgSQL.

        Args:
            config (RunnableConfig): The config to use for retrieving the checkpoint.

        Returns:
            Optional[CheckpointTuple]: The retrieved checkpoint tuple, or None if no matching checkpoint was found.
        """
        return self._engine._run_as_sync(self.__checkpoint.aget_tuple(config))

    async def aput(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
        new_versions: ChannelVersions,
    ) -> RunnableConfig:
        """Asynchronously store a checkpoint with its configuration and metadata.

        Args:
            config (RunnableConfig): The config to associate with the checkpoint.
            checkpoint (Checkpoint): The checkpoint to save.
            metadata (CheckpointMetadata): Additional metadata to save with the checkpoint.
            new_versions (ChannelVersions): New channel versions as of this write.

        Returns:
            RunnableConfig: Updated configuration after storing the checkpoint.
        """
        return await self._engine._run_as_async(
            self.__checkpoint.aput(config, checkpoint, metadata, new_versions)
        )

    def put(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
        new_versions: ChannelVersions,
    ) -> RunnableConfig:
        """Save a checkpoint to the database.

        Args:
            config (RunnableConfig): The config to associate with the checkpoint.
            checkpoint (Checkpoint): The checkpoint to save.
            metadata (CheckpointMetadata): Additional metadata to save with the checkpoint.
            new_versions (ChannelVersions): New channel versions as of this write.

        Returns:
            RunnableConfig: Updated configuration after storing the checkpoint.
        """
        return self._engine._run_as_sync(
            self.__checkpoint.aput(config, checkpoint, metadata, new_versions)
        )

    async def aput_writes(
        self,
        config: RunnableConfig,
        writes: Sequence[Tuple[str, Any]],
        task_id: str,
        task_path: str = "",
    ) -> None:
        """Asynchronously store intermediate writes linked to a checkpoint.

        Args:
            config (RunnableConfig): Configuration of the related checkpoint.
            writes (Sequence[Tuple[str, Any]]): List of writes to store.
            task_id (str): Identifier for the task creating the writes.
            task_path (str): Path of the task creating the writes.

        Returns:
            None
        """
        await self._engine._run_as_async(
            self.__checkpoint.aput_writes(config, writes, task_id, task_path)
        )

    def put_writes(
        self,
        config: RunnableConfig,
        writes: Sequence[Tuple[str, Any]],
        task_id: str,
        task_path: str = "",
    ) -> None:
        """Store intermediate writes linked to a checkpoint.

        Args:
            config (RunnableConfig): Configuration of the related checkpoint.
            writes (Sequence[Tuple[str, Any]]): List of writes to store.
            task_id (str): Identifier for the task creating the writes.
            task_path (str): Path of the task creating the writes.

        Returns:
            None
        """
        self._engine._run_as_sync(
            self.__checkpoint.aput_writes(config, writes, task_id, task_path)
        )
@@ -39,6 +39,8 @@ USER_AGENT = "langchain-google-cloud-sql-pg-python/" + __version__ +CHECKPOINTS_TABLE = "checkpoints" + async def _get_iam_principal_email( credentials: google.auth.credentials.Credentials, @@ -416,7 +418,7 @@ def _run_as_sync(self, coro: Awaitable[T]) -> T: async def close(self) -> None: """Dispose of connection pool""" - await self._pool.dispose() + await self._run_as_async(self._pool.dispose()) async def _ainit_vectorstore_table( self, @@ -747,6 +749,87 @@ def init_document_table( ) ) + async def _ainit_checkpoint_table( + self, table_name: str = CHECKPOINTS_TABLE, schema_name: str = "public" + ) -> None: + """ + Create PgSQL tables to save checkpoints. + + Args: + schema_name (str): The schema name to store the checkpoint tables. + Default: "public". + table_name (str): The PgSQL database table name. + Default: "checkpoints". + + Returns: + None + """ + create_checkpoints_table = f"""CREATE TABLE "{schema_name}"."{table_name}"( + thread_id TEXT NOT NULL, + checkpoint_ns TEXT NOT NULL DEFAULT '', + checkpoint_id TEXT NOT NULL, + parent_checkpoint_id TEXT, + type TEXT, + checkpoint BYTEA, + metadata BYTEA, + PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id) + );""" + + create_checkpoint_writes_table = f"""CREATE TABLE "{schema_name}"."{table_name + "_writes"}"( + thread_id TEXT NOT NULL, + checkpoint_ns TEXT NOT NULL DEFAULT '', + checkpoint_id TEXT NOT NULL, + task_id TEXT NOT NULL, + idx INTEGER NOT NULL, + channel TEXT NOT NULL, + type TEXT, + blob BYTEA NOT NULL, + task_path TEXT NOT NULL DEFAULT '', + PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx) + );""" + + async with self._pool.connect() as conn: + await conn.execute(text(create_checkpoints_table)) + await conn.execute(text(create_checkpoint_writes_table)) + await conn.commit() + + async def ainit_checkpoint_table( + self, table_name: str = CHECKPOINTS_TABLE, schema_name: str = "public" + ) -> None: + """Create PgSQL tables to save checkpoints.
+ + Args: + schema_name (str): The schema name to store checkpoint tables. + Default: "public". + table_name (str): The PgSQL database table name. + Default: "checkpoints". + + Returns: + None + """ + await self._run_as_async( + self._ainit_checkpoint_table( + table_name, + schema_name, + ) + ) + + def init_checkpoint_table( + self, table_name: str = CHECKPOINTS_TABLE, schema_name: str = "public" + ) -> None: + """Create Cloud SQL tables to store checkpoints. + + Args: + schema_name (str): The schema name to store checkpoint tables. + Default: "public". + table_name (str): The PgSQL database table name. + Default: "checkpoints". + + Returns: + None + """ + self._run_as_sync(self._ainit_checkpoint_table(table_name, schema_name)) + async def _aload_table_schema( self, table_name: str, @@ -765,7 +848,7 @@ async def _aload_table_schema( ) except InvalidRequestError as e: raise ValueError( - f"Table, '{schema_name}'.'{table_name}', does not exist: " + str(e) + f'Table, "{schema_name}"."{table_name}", does not exist: ' + str(e) ) table = Table(table_name, metadata, schema=schema_name) diff --git a/src/langchain_google_cloud_sql_pg/vectorstore.py b/src/langchain_google_cloud_sql_pg/vectorstore.py index e59deedf..f5333fd6 100644 --- a/src/langchain_google_cloud_sql_pg/vectorstore.py +++ b/src/langchain_google_cloud_sql_pg/vectorstore.py @@ -551,7 +551,7 @@ async def asimilarity_search( self, query: str, k: Optional[int] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[Document]: """Return docs selected by similarity search on query.""" @@ -563,7 +563,7 @@ def similarity_search( self, query: str, k: Optional[int] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[Document]: """Return docs selected by similarity search on query.""" @@ -586,7 +586,7 @@ async def asimilarity_search_with_score( self, query: str, k: Optional[int] = None, - filter: 
Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[tuple[Document, float]]: """Return docs and distance scores selected by similarity search on query.""" @@ -598,7 +598,7 @@ def similarity_search_with_score( self, query: str, k: Optional[int] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[tuple[Document, float]]: """Return docs and distance scores selected by similarity search on query.""" @@ -610,7 +610,7 @@ async def asimilarity_search_by_vector( self, embedding: list[float], k: Optional[int] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[Document]: """Return docs selected by vector similarity search.""" @@ -622,7 +622,7 @@ def similarity_search_by_vector( self, embedding: list[float], k: Optional[int] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[Document]: """Return docs selected by vector similarity search.""" @@ -634,7 +634,7 @@ async def asimilarity_search_with_score_by_vector( self, embedding: list[float], k: Optional[int] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[tuple[Document, float]]: """Return docs and distance scores selected by vector similarity search.""" @@ -648,7 +648,7 @@ def similarity_search_with_score_by_vector( self, embedding: list[float], k: Optional[int] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[tuple[Document, float]]: """Return docs and distance scores selected by similarity search on vector.""" @@ -664,7 +664,7 @@ async def amax_marginal_relevance_search( k: Optional[int] = None, fetch_k: Optional[int] = None, lambda_mult: Optional[float] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> 
list[Document]: """Return docs selected using the maximal marginal relevance.""" @@ -680,7 +680,7 @@ def max_marginal_relevance_search( k: Optional[int] = None, fetch_k: Optional[int] = None, lambda_mult: Optional[float] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[Document]: """Return docs selected using the maximal marginal relevance.""" @@ -696,7 +696,7 @@ async def amax_marginal_relevance_search_by_vector( k: Optional[int] = None, fetch_k: Optional[int] = None, lambda_mult: Optional[float] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[Document]: """Return docs selected using the maximal marginal relevance.""" @@ -712,7 +712,7 @@ def max_marginal_relevance_search_by_vector( k: Optional[int] = None, fetch_k: Optional[int] = None, lambda_mult: Optional[float] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[Document]: """Return docs selected using the maximal marginal relevance.""" @@ -728,7 +728,7 @@ async def amax_marginal_relevance_search_with_score_by_vector( k: Optional[int] = None, fetch_k: Optional[int] = None, lambda_mult: Optional[float] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[tuple[Document, float]]: """Return docs and distance scores selected using the maximal marginal relevance.""" @@ -744,7 +744,7 @@ def max_marginal_relevance_search_with_score_by_vector( k: Optional[int] = None, fetch_k: Optional[int] = None, lambda_mult: Optional[float] = None, - filter: Optional[str] = None, + filter: Optional[dict] | Optional[str] = None, **kwargs: Any, ) -> list[tuple[Document, float]]: """Return docs and distance scores selected using the maximal marginal relevance.""" diff --git a/tests/metadata_filtering_data.py b/tests/metadata_filtering_data.py new file mode 100644 index 
00000000..21cbaddd --- /dev/null +++ b/tests/metadata_filtering_data.py @@ -0,0 +1,251 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +METADATAS = [ + { + "name": "Wireless Headphones", + "code": "WH001", + "price": 149.99, + "is_available": True, + "release_date": "2023-10-26", + "tags": ["audio", "wireless", "electronics"], + "dimensions": [18.5, 7.2, 21.0], + "inventory_location": [101, 102], + "available_quantity": 50, + }, + { + "name": "Ergonomic Office Chair", + "code": "EC002", + "price": 299.00, + "is_available": True, + "release_date": "2023-08-15", + "tags": ["furniture", "office", "ergonomic"], + "dimensions": [65.0, 60.0, 110.0], + "inventory_location": [201], + "available_quantity": 10, + }, + { + "name": "Stainless Steel Water Bottle", + "code": "WB003", + "price": 25.50, + "is_available": False, + "release_date": "2024-01-05", + "tags": ["hydration", "eco-friendly", "kitchen"], + "dimensions": [7.5, 7.5, 25.0], + "available_quantity": 0, + }, + { + "name": "Smart Fitness Tracker", + "code": "FT004", + "price": 79.95, + "is_available": True, + "release_date": "2023-11-12", + "tags": ["fitness", "wearable", "technology"], + "dimensions": [2.0, 1.0, 25.0], + "inventory_location": [401], + "available_quantity": 100, + }, +] + +FILTERING_TEST_CASES = [ + # These tests only involve equality checks + ( + {"code": "FT004"}, + ["FT004"], + ), + # String field + ( + # check name + {"name": "Smart Fitness Tracker"}, + ["FT004"], 
+ ), + # Boolean fields + ( + {"is_available": True}, + ["WH001", "FT004", "EC002"], + ), + # And semantics for top level filtering + ( + {"code": "WH001", "is_available": True}, + ["WH001"], + ), + # These involve equality checks and other operators + # like $ne, $gt, $gte, $lt, $lte + ( + {"available_quantity": {"$eq": 10}}, + ["EC002"], + ), + ( + {"available_quantity": {"$ne": 0}}, + ["WH001", "FT004", "EC002"], + ), + ( + {"available_quantity": {"$gt": 60}}, + ["FT004"], + ), + ( + {"available_quantity": {"$gte": 50}}, + ["WH001", "FT004"], + ), + ( + {"available_quantity": {"$lt": 5}}, + ["WB003"], + ), + ( + {"available_quantity": {"$lte": 10}}, + ["WB003", "EC002"], + ), + # Repeat all the same tests with name (string column) + ( + {"code": {"$eq": "WH001"}}, + ["WH001"], + ), + ( + {"code": {"$ne": "WB003"}}, + ["WH001", "FT004", "EC002"], + ), + # And also gt, gte, lt, lte relying on lexicographical ordering + ( + {"name": {"$gt": "Wireless Headphones"}}, + [], + ), + ( + {"name": {"$gte": "Wireless Headphones"}}, + ["WH001"], + ), + ( + {"name": {"$lt": "Smart Fitness Tracker"}}, + ["EC002"], + ), + ( + {"name": {"$lte": "Smart Fitness Tracker"}}, + ["FT004", "EC002"], + ), + ( + {"is_available": {"$eq": True}}, + ["WH001", "FT004", "EC002"], + ), + ( + {"is_available": {"$ne": True}}, + ["WB003"], + ), + # Test float column. 
+ ( + {"price": {"$gt": 200.0}}, + ["EC002"], + ), + ( + {"price": {"$gte": 149.99}}, + ["WH001", "EC002"], + ), + ( + {"price": {"$lt": 50.0}}, + ["WB003"], + ), + ( + {"price": {"$lte": 79.95}}, + ["FT004", "WB003"], + ), + # These involve usage of AND, OR and NOT operators + ( + {"$or": [{"code": "WH001"}, {"code": "EC002"}]}, + ["WH001", "EC002"], + ), + ( + {"$or": [{"code": "WH001"}, {"available_quantity": 10}]}, + ["WH001", "EC002"], + ), + ( + {"$and": [{"code": "WH001"}, {"code": "EC002"}]}, + [], + ), + # Test for $not operator + ( + {"$not": {"code": "WB003"}}, + ["WH001", "FT004", "EC002"], + ), + ( + {"$not": [{"code": "WB003"}]}, + ["WH001", "FT004", "EC002"], + ), + ( + {"$not": {"available_quantity": 0}}, + ["WH001", "FT004", "EC002"], + ), + ( + {"$not": [{"available_quantity": 0}]}, + ["WH001", "FT004", "EC002"], + ), + ( + {"$not": {"is_available": True}}, + ["WB003"], + ), + ( + {"$not": [{"is_available": True}]}, + ["WB003"], + ), + ( + {"$not": {"price": {"$gt": 150.0}}}, + ["WH001", "FT004", "WB003"], + ), + ( + {"$not": [{"price": {"$gt": 150.0}}]}, + ["WH001", "FT004", "WB003"], + ), + # These involve special operators like $in, $nin, $between + # Test between + ( + {"available_quantity": {"$between": (40, 60)}}, + ["WH001"], + ), + # Test in + ( + {"name": {"$in": ["Smart Fitness Tracker", "Stainless Steel Water Bottle"]}}, + ["FT004", "WB003"], + ), + # With numeric fields + ( + {"available_quantity": {"$in": [0, 10]}}, + ["WB003", "EC002"], + ), + # Test nin + ( + {"name": {"$nin": ["Smart Fitness Tracker", "Stainless Steel Water Bottle"]}}, + ["WH001", "EC002"], + ), + ## with numeric fields + ( + {"available_quantity": {"$nin": [50, 0, 10]}}, + ["FT004"], + ), + # These involve special operators like $like, $ilike that + # may be specified to certain databases. 
+ ( + {"name": {"$like": "Wireless%"}}, + ["WH001"], + ), + ( + {"name": {"$like": "%less%"}}, # matches "Wireless Headphones" and "Stainless Steel Water Bottle" + ["WH001", "WB003"], + ), + # These involve the special operator $exists + ( + {"tags": {"$exists": False}}, + [], + ), + ( + {"inventory_location": {"$exists": False}}, + ["WB003"], + ), +] diff --git a/tests/test_async_checkpoint.py b/tests/test_async_checkpoint.py new file mode 100644 index 00000000..1cad46a5 --- /dev/null +++ b/tests/test_async_checkpoint.py @@ -0,0 +1,430 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ +import os +import re +import uuid +from typing import Any, List, Literal, Optional, Sequence, Tuple, Union + +import pytest +import pytest_asyncio +from langchain_core.callbacks import CallbackManagerForLLMRun +from langchain_core.language_models import BaseChatModel, LanguageModelInput +from langchain_core.messages import ( + AIMessage, + AnyMessage, + BaseMessage, + HumanMessage, + SystemMessage, + ToolCall, + ToolMessage, +) +from langchain_core.outputs import ChatGeneration, ChatResult +from langchain_core.runnables import RunnableConfig +from langgraph.checkpoint.base import ( + Checkpoint, + CheckpointMetadata, + create_checkpoint, + empty_checkpoint, +) +from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer +from langgraph.prebuilt import ( + ToolNode, + ValidationNode, + create_react_agent, + tools_condition, +) +from sqlalchemy import text +from sqlalchemy.engine.row import RowMapping + +from langchain_google_cloud_sql_pg.async_checkpoint import AsyncPostgresSaver +from langchain_google_cloud_sql_pg.engine import PostgresEngine + +write_config: RunnableConfig = {"configurable": {"thread_id": "1", "checkpoint_ns": ""}} +read_config: RunnableConfig = {"configurable": {"thread_id": "1"}} +thread_agent_config: RunnableConfig = {"configurable": {"thread_id": "123"}} + +project_id = os.environ["PROJECT_ID"] +region = os.environ["REGION"] +instance_id = os.environ["INSTANCE_ID"] +db_name = os.environ["DATABASE_ID"] +table_name = "checkpoint" + str(uuid.uuid4()) +table_name_writes = table_name + "_writes" + +checkpoint: Checkpoint = { + "v": 1, + "ts": "2024-07-31T20:14:19.804150+00:00", + "id": "1ef4f797-8335-6428-8001-8a1503f9b875", + "channel_values": {"my_key": "meow", "node": "node"}, + "channel_versions": { + "__start__": 2, + "my_key": 3, + "start:node": 3, + "node": 3, + }, + "versions_seen": { + "__input__": {}, + "__start__": {"__start__": 1}, + "node": {"start:node": 2}, + }, + "pending_sends": [], +} + + +class AnyStr(str): + def 
__init__(self, prefix: Union[str, re.Pattern] = "") -> None: + super().__init__() + self.prefix = prefix + + def __eq__(self, other: object) -> bool: + return isinstance(other, str) and ( + ( + other.startswith(self.prefix) + if isinstance(self.prefix, str) + else bool(self.prefix.match(other)) + ) + ) + + def __hash__(self) -> int: + return hash((str(self), self.prefix)) + + +def _AnyIdToolMessage(**kwargs: Any) -> ToolMessage: + """Create a tool message with an any id field.""" + message = ToolMessage(**kwargs) + message.id = AnyStr() + return message + + +async def aexecute(engine: PostgresEngine, query: str) -> None: + async with engine._pool.connect() as conn: + await conn.execute(text(query)) + await conn.commit() + + +async def afetch(engine: PostgresEngine, query: str) -> Sequence[RowMapping]: + async with engine._pool.connect() as conn: + result = await conn.execute(text(query)) + result_map = result.mappings() + result_fetch = result_map.fetchall() + return result_fetch + + +@pytest_asyncio.fixture +async def async_engine(): + async_engine = await PostgresEngine.afrom_instance( + project_id=project_id, + region=region, + instance=instance_id, + database=db_name, + ) + + yield async_engine + + await aexecute(async_engine, f'DROP TABLE IF EXISTS "{table_name}"') + await aexecute(async_engine, f'DROP TABLE IF EXISTS "{table_name_writes}"') + await async_engine.close() + await async_engine._connector.close_async() + + +@pytest_asyncio.fixture +async def checkpointer(async_engine): + await async_engine._ainit_checkpoint_table(table_name=table_name) + checkpointer = await AsyncPostgresSaver.create( + async_engine, + table_name, # serde=JsonPlusSerializer + ) + yield checkpointer + + +@pytest.mark.asyncio +async def test_checkpoint_async( + async_engine: PostgresEngine, + checkpointer: AsyncPostgresSaver, +) -> None: + test_config = { + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875", + } + } 
+ # Verify if updated configuration after storing the checkpoint is correct + next_config = await checkpointer.aput(write_config, checkpoint, {}, {}) + assert dict(next_config) == test_config + + # Verify if the checkpoint is stored correctly in the database + results = await afetch(async_engine, f'SELECT * FROM "{table_name}"') + assert len(results) == 1 + for row in results: + assert isinstance(row["thread_id"], str) + await aexecute(async_engine, f'TRUNCATE TABLE "{table_name}"') + + +@pytest.fixture +def test_data(): + """Fixture providing test data for checkpoint tests.""" + config_0: RunnableConfig = {"configurable": {"thread_id": "1", "checkpoint_ns": ""}} + config_1: RunnableConfig = { + "configurable": { + "thread_id": "thread-1", + # for backwards compatibility testing + "thread_ts": "1", + "checkpoint_ns": "", + } + } + config_2: RunnableConfig = { + "configurable": { + "thread_id": "thread-2", + "checkpoint_id": "2", + "checkpoint_ns": "", + } + } + config_3: RunnableConfig = { + "configurable": { + "thread_id": "thread-2", + "checkpoint_id": "2-inner", + "checkpoint_ns": "inner", + } + } + chkpnt_0: Checkpoint = { + "v": 1, + "ts": "2024-07-31T20:14:19.804150+00:00", + "id": "1ef4f797-8335-6428-8001-8a1503f9b875", + "channel_values": {"my_key": "meow", "node": "node"}, + "channel_versions": { + "__start__": 2, + "my_key": 3, + "start:node": 3, + "node": 3, + }, + "versions_seen": { + "__input__": {}, + "__start__": {"__start__": 1}, + "node": {"start:node": 2}, + }, + "pending_sends": [], + } + chkpnt_1: Checkpoint = empty_checkpoint() + chkpnt_2: Checkpoint = create_checkpoint(chkpnt_1, {}, 1) + chkpnt_3: Checkpoint = empty_checkpoint() + + metadata_1: CheckpointMetadata = { + "source": "input", + "step": 2, + "writes": {}, + "parents": 1, + } + metadata_2: CheckpointMetadata = { + "source": "loop", + "step": 1, + "writes": {"foo": "bar"}, + "parents": None, + } + metadata_3: CheckpointMetadata = {} + + return { + "configs": [config_0, config_1, 
config_2, config_3], + "checkpoints": [chkpnt_0, chkpnt_1, chkpnt_2, chkpnt_3], + "metadata": [metadata_1, metadata_2, metadata_3], + } + + +@pytest.mark.asyncio +async def test_checkpoint_aput_writes( + async_engine: PostgresEngine, + checkpointer: AsyncPostgresSaver, +) -> None: + config: RunnableConfig = { + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875", + } + } + + # Verify if the checkpoint writes are stored correctly in the database + writes: Sequence[Tuple[str, Any]] = [ + ("test_channel1", {}), + ("test_channel2", {}), + ] + await checkpointer.aput_writes(config, writes, task_id="1") + + results = await afetch(async_engine, f'SELECT * FROM "{table_name_writes}"') + assert len(results) == 2 + for row in results: + assert isinstance(row["task_id"], str) + await aexecute(async_engine, f'TRUNCATE TABLE "{table_name_writes}"') + + +@pytest.mark.asyncio +async def test_checkpoint_alist( + async_engine: PostgresEngine, + checkpointer: AsyncPostgresSaver, + test_data: dict[str, Any], +) -> None: + configs = test_data["configs"] + checkpoints = test_data["checkpoints"] + metadata = test_data["metadata"] + + await checkpointer.aput(configs[1], checkpoints[1], metadata[0], {}) + await checkpointer.aput(configs[2], checkpoints[2], metadata[1], {}) + await checkpointer.aput(configs[3], checkpoints[3], metadata[2], {}) + + # call method / assertions + query_1 = {"source": "input"} # search by 1 key + query_2 = { + "step": 1, + "writes": {"foo": "bar"}, + } # search by multiple keys + query_3: dict[str, Any] = {} # search by no keys, return all checkpoints + query_4 = {"source": "update", "step": 1} # no match + + search_results_1 = [c async for c in checkpointer.alist(None, filter=query_1)] + assert len(search_results_1) == 1 + print(metadata[0]) + print(search_results_1[0].metadata) + assert search_results_1[0].metadata == metadata[0] + + search_results_2 = [c async for c in 
checkpointer.alist(None, filter=query_2)] + assert len(search_results_2) == 1 + assert search_results_2[0].metadata == metadata[1] + + search_results_3 = [c async for c in checkpointer.alist(None, filter=query_3)] + assert len(search_results_3) == 3 + + search_results_4 = [c async for c in checkpointer.alist(None, filter=query_4)] + assert len(search_results_4) == 0 + + # search by config (defaults to checkpoints across all namespaces) + search_results_5 = [ + c async for c in checkpointer.alist({"configurable": {"thread_id": "thread-2"}}) + ] + assert len(search_results_5) == 2 + assert { + search_results_5[0].config["configurable"]["checkpoint_ns"], + search_results_5[1].config["configurable"]["checkpoint_ns"], + } == {"", "inner"} + + +class FakeToolCallingModel(BaseChatModel): + tool_calls: Optional[list[list[ToolCall]]] = None + index: int = 0 + tool_style: Literal["openai", "anthropic"] = "openai" + + def _generate( + self, + messages: List[BaseMessage], + stop: Optional[List[str]] = None, + run_manager: Optional[CallbackManagerForLLMRun] = None, + **kwargs: Any, + ) -> ChatResult: + """Top Level call""" + messages_string = "-".join( + [str(m.content) for m in messages if isinstance(m.content, str)] + ) + tool_calls = ( + self.tool_calls[self.index % len(self.tool_calls)] + if self.tool_calls + else [] + ) + message = AIMessage( + content=messages_string, + id=str(self.index), + tool_calls=tool_calls.copy(), + ) + self.index += 1 + return ChatResult(generations=[ChatGeneration(message=message)]) + + @property + def _llm_type(self) -> str: + return "fake-tool-call-model" + + +@pytest.mark.asyncio +async def test_checkpoint_with_agent( + checkpointer: AsyncPostgresSaver, +) -> None: + # from the tests in https://github.com/langchain-ai/langgraph/blob/909190cede6a80bb94a2d4cfe7dedc49ef0d4127/libs/langgraph/tests/test_prebuilt.py + model = FakeToolCallingModel() + + agent = create_react_agent(model, [], checkpointer=checkpointer) + inputs = [HumanMessage("hi?")] 
+ response = await agent.ainvoke( + {"messages": inputs}, config=thread_agent_config, debug=True + ) + expected_response = {"messages": inputs + [AIMessage(content="hi?", id="0")]} + assert response == expected_response + + def _AnyIdHumanMessage(**kwargs: Any) -> HumanMessage: + """Create a human message with an any id field.""" + message = HumanMessage(**kwargs) + message.id = AnyStr() + return message + + saved = await checkpointer.aget_tuple(thread_agent_config) + assert saved is not None + assert saved.checkpoint["channel_values"] == { + "messages": [ + _AnyIdHumanMessage(content="hi?"), + AIMessage(content="hi?", id="0"), + ], + "agent": "agent", + } + assert saved.metadata == { + "parents": {}, + "source": "loop", + "writes": {"agent": {"messages": [AIMessage(content="hi?", id="0")]}}, + "step": 1, + "thread_id": "123", + } + assert saved.pending_writes == [] + + +@pytest.mark.asyncio +async def test_checkpoint_aget_tuple( + checkpointer: AsyncPostgresSaver, + test_data: dict[str, Any], +) -> None: + configs = test_data["configs"] + checkpoints = test_data["checkpoints"] + metadata = test_data["metadata"] + + new_config = await checkpointer.aput(configs[1], checkpoints[1], metadata[0], {}) + + # Matching checkpoint + search_results_1 = await checkpointer.aget_tuple(new_config) + assert search_results_1.metadata == metadata[0] # type: ignore + + # No matching checkpoint + assert await checkpointer.aget_tuple(configs[0]) is None + + +@pytest.mark.asyncio +async def test_metadata( + checkpointer: AsyncPostgresSaver, + test_data: dict[str, Any], +) -> None: + config = await checkpointer.aput( + test_data["configs"][0], + test_data["checkpoints"][0], + {"my_key": "abc"}, # type: ignore + {}, + ) + assert (await checkpointer.aget_tuple(config)).metadata["my_key"] == "abc" # type: ignore + assert [c async for c in checkpointer.alist(None, filter={"my_key": "abc"})][ + 0 + ].metadata[ + "my_key" # type: ignore + ] == "abc" # type: ignore diff --git 
a/tests/test_async_vectorstore_search.py b/tests/test_async_vectorstore_search.py index fae5e964..418dbbad 100644 --- a/tests/test_async_vectorstore_search.py +++ b/tests/test_async_vectorstore_search.py @@ -19,6 +19,7 @@ import pytest_asyncio from langchain_core.documents import Document from langchain_core.embeddings import DeterministicFakeEmbedding +from metadata_filtering_data import FILTERING_TEST_CASES, METADATAS from sqlalchemy import text from langchain_google_cloud_sql_pg import Column, PostgresEngine @@ -27,6 +28,7 @@ DEFAULT_TABLE = "test_table" + str(uuid.uuid4()).replace("-", "_") CUSTOM_TABLE = "test_table_custom" + str(uuid.uuid4()).replace("-", "_") +CUSTOM_FILTER_TABLE = "test_table_custom_filter" + str(uuid.uuid4()).replace("-", "_") VECTOR_SIZE = 768 sync_method_exception_str = "Sync methods are not implemented for AsyncPostgresVectorStore. Use PostgresVectorStore interface instead." @@ -38,7 +40,9 @@ docs = [ Document(page_content=texts[i], metadata=metadatas[i]) for i in range(len(texts)) ] - +filter_docs = [ + Document(page_content=texts[i], metadata=METADATAS[i]) for i in range(len(texts)) +] embeddings = [embeddings_service.embed_query("foo") for i in range(len(texts))] @@ -87,6 +91,7 @@ async def engine(self, db_project, db_region, db_instance, db_name): yield engine await aexecute(engine, f"DROP TABLE IF EXISTS {DEFAULT_TABLE}") await aexecute(engine, f"DROP TABLE IF EXISTS {CUSTOM_TABLE}") + await aexecute(engine, f"DROP TABLE IF EXISTS {CUSTOM_FILTER_TABLE}") await engine.close() @pytest_asyncio.fixture(scope="class") @@ -129,6 +134,42 @@ async def vs_custom(self, engine): await vs_custom.aadd_documents(docs, ids=ids) yield vs_custom + @pytest_asyncio.fixture(scope="class") + async def vs_custom_filter(self, engine): + await engine._ainit_vectorstore_table( + CUSTOM_FILTER_TABLE, + VECTOR_SIZE, + metadata_columns=[ + Column("name", "TEXT"), + Column("code", "TEXT"), + Column("price", "FLOAT"), + Column("is_available", "BOOLEAN"), + 
Column("tags", "TEXT[]"), + Column("inventory_location", "INTEGER[]"), + Column("available_quantity", "INTEGER", nullable=True), + ], + id_column="langchain_id", + store_metadata=False, + ) + + vs_custom_filter = await AsyncPostgresVectorStore.create( + engine, + embedding_service=embeddings_service, + table_name=CUSTOM_FILTER_TABLE, + metadata_columns=[ + "name", + "code", + "price", + "is_available", + "tags", + "inventory_location", + "available_quantity", + ], + id_column="langchain_id", + ) + await vs_custom_filter.aadd_documents(filter_docs, ids=ids) + yield vs_custom_filter + async def test_asimilarity_search(self, vs): results = await vs.asimilarity_search("foo", k=1) assert len(results) == 1 @@ -287,3 +328,16 @@ def test_get_by_ids(self, vs): test_ids = [ids[0]] with pytest.raises(Exception, match=sync_method_exception_str): vs.get_by_ids(ids=test_ids) + + @pytest.mark.parametrize("test_filter, expected_ids", FILTERING_TEST_CASES) + async def test_vectorstore_with_metadata_filters( + self, + vs_custom_filter, + test_filter, + expected_ids, + ): + """Test end to end construction and search.""" + docs = await vs_custom_filter.asimilarity_search( + "meow", k=5, filter=test_filter + ) + assert [doc.metadata["code"] for doc in docs] == expected_ids, test_filter diff --git a/tests/test_checkpoint.py b/tests/test_checkpoint.py new file mode 100644 index 00000000..3a9c5bd3 --- /dev/null +++ b/tests/test_checkpoint.py @@ -0,0 +1,355 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import uuid +from typing import Any, Sequence, Tuple + +import pytest +import pytest_asyncio +from langchain_core.runnables import RunnableConfig +from langgraph.checkpoint.base import ( + Checkpoint, + CheckpointMetadata, + create_checkpoint, + empty_checkpoint, +) +from sqlalchemy import text +from sqlalchemy.engine.row import RowMapping + +from langchain_google_cloud_sql_pg.checkpoint import PostgresSaver +from langchain_google_cloud_sql_pg.engine import PostgresEngine + +write_config: RunnableConfig = {"configurable": {"thread_id": "1", "checkpoint_ns": ""}} +read_config: RunnableConfig = {"configurable": {"thread_id": "1"}} + +project_id = os.environ["PROJECT_ID"] +region = os.environ["REGION"] +instance_id = os.environ["INSTANCE_ID"] +db_name = os.environ["DATABASE_ID"] +table_name = "checkpoint" + str(uuid.uuid4()) +table_name_writes = table_name + "_writes" +table_name_async = "checkpoint" + str(uuid.uuid4()) +table_name_writes_async = table_name_async + "_writes" + +checkpoint: Checkpoint = { + "v": 1, + "ts": "2024-07-31T20:14:19.804150+00:00", + "id": "1ef4f797-8335-6428-8001-8a1503f9b875", + "channel_values": {"my_key": "meow", "node": "node"}, + "channel_versions": { + "__start__": 2, + "my_key": 3, + "start:node": 3, + "node": 3, + }, + "versions_seen": { + "__input__": {}, + "__start__": {"__start__": 1}, + "node": {"start:node": 2}, + }, + "pending_sends": [], +} + + +def get_env_var(key: str, desc: str) -> str: + v = os.environ.get(key) + if v is None: + raise ValueError(f"Must set env var {key} to: {desc}") + return v + + +async def aexecute(engine: PostgresEngine, query: str) -> None: + async def run(engine, query): + async with engine._pool.connect() as conn: + await conn.execute(text(query)) + await conn.commit() + + await engine._run_as_async(run(engine, query)) + + +async def afetch(engine: PostgresEngine, query: str) -> 
Sequence[RowMapping]: + async def run(engine, query): + async with engine._pool.connect() as conn: + result = await conn.execute(text(query)) + result_map = result.mappings() + result_fetch = result_map.fetchall() + return result_fetch + + return await engine._run_as_async(run(engine, query)) + + +@pytest_asyncio.fixture +async def engine(): + engine = PostgresEngine.from_instance( + project_id=project_id, + region=region, + instance=instance_id, + database=db_name, + ) + yield engine + # use default table + await aexecute(engine, f'DROP TABLE IF EXISTS "{table_name}"') + await aexecute(engine, f'DROP TABLE IF EXISTS "{table_name_writes}"') + await engine.close() + await engine._connector.close_async() + + +@pytest_asyncio.fixture +async def async_engine(): + async_engine = await PostgresEngine.afrom_instance( + project_id=project_id, + region=region, + instance=instance_id, + database=db_name, + ) + yield async_engine + + await aexecute(async_engine, f'DROP TABLE IF EXISTS "{table_name_async}"') + await aexecute(async_engine, f'DROP TABLE IF EXISTS "{table_name_writes_async}"') + await async_engine.close() + await async_engine._connector.close_async() + + +@pytest_asyncio.fixture +def checkpointer(engine): + engine.init_checkpoint_table(table_name=table_name) + checkpointer = PostgresSaver.create_sync(engine, table_name) + yield checkpointer + + +@pytest_asyncio.fixture +async def async_checkpointer(async_engine): + await async_engine.ainit_checkpoint_table(table_name=table_name_async) + async_checkpointer = await PostgresSaver.create(async_engine, table_name_async) + yield async_checkpointer + + +@pytest.mark.asyncio +async def test_checkpoint_async( + async_engine: PostgresEngine, + async_checkpointer: PostgresSaver, +) -> None: + test_config = { + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875", + } + } + # Verify if updated configuration after storing the checkpoint is correct + next_config 
= await async_checkpointer.aput(write_config, checkpoint, {}, {}) + assert dict(next_config) == test_config + + # Verify if the checkpoint is stored correctly in the database + results = await afetch(async_engine, f'SELECT * FROM "{table_name_async}"') + assert len(results) == 1 + for row in results: + assert isinstance(row["thread_id"], str) + await aexecute(async_engine, f'TRUNCATE TABLE "{table_name_async}"') + + +# Test put method for checkpoint +@pytest.mark.asyncio +async def test_checkpoint_sync( + engine: PostgresEngine, + checkpointer: PostgresSaver, +) -> None: + test_config = { + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875", + } + } + # Verify if updated configuration after storing the checkpoint is correct + next_config = checkpointer.put(write_config, checkpoint, {}, {}) + assert dict(next_config) == test_config + + # Verify if the checkpoint is stored correctly in the database + results = await afetch(engine, f'SELECT * FROM "{table_name}"') + assert len(results) == 1 + for row in results: + assert isinstance(row["thread_id"], str) + await aexecute(engine, f'TRUNCATE TABLE "{table_name}"') + + +@pytest.mark.asyncio +async def test_chat_table_async(async_engine): + with pytest.raises(ValueError): + await PostgresSaver.create(engine=async_engine, table_name="doesnotexist") + + +def test_checkpoint_table(engine: Any) -> None: + with pytest.raises(ValueError): + PostgresSaver.create_sync(engine=engine, table_name="doesnotexist") + + +@pytest.fixture +def test_data(): + """Fixture providing test data for checkpoint tests.""" + config_0: RunnableConfig = {"configurable": {"thread_id": "1", "checkpoint_ns": ""}} + config_1: RunnableConfig = { + "configurable": { + "thread_id": "thread-1", + # for backwards compatibility testing + "thread_ts": "1", + "checkpoint_ns": "", + } + } + config_2: RunnableConfig = { + "configurable": { + "thread_id": "thread-2", + "checkpoint_id": "2", + 
"checkpoint_ns": "", + } + } + config_3: RunnableConfig = { + "configurable": { + "thread_id": "thread-2", + "checkpoint_id": "2-inner", + "checkpoint_ns": "inner", + } + } + chkpnt_0: Checkpoint = { + "v": 1, + "ts": "2024-07-31T20:14:19.804150+00:00", + "id": "1ef4f797-8335-6428-8001-8a1503f9b875", + "channel_values": {"my_key": "meow", "node": "node"}, + "channel_versions": { + "__start__": 2, + "my_key": 3, + "start:node": 3, + "node": 3, + }, + "versions_seen": { + "__input__": {}, + "__start__": {"__start__": 1}, + "node": {"start:node": 2}, + }, + "pending_sends": [], + } + chkpnt_1: Checkpoint = empty_checkpoint() + chkpnt_2: Checkpoint = create_checkpoint(chkpnt_1, {}, 1) + chkpnt_3: Checkpoint = empty_checkpoint() + + metadata_1: CheckpointMetadata = { + "source": "input", + "step": 2, + "writes": {}, + "parents": 1, + } + metadata_2: CheckpointMetadata = { + "source": "loop", + "step": 1, + "writes": {"foo": "bar"}, + "parents": None, + } + metadata_3: CheckpointMetadata = {} + + return { + "configs": [config_0, config_1, config_2, config_3], + "checkpoints": [chkpnt_0, chkpnt_1, chkpnt_2, chkpnt_3], + "metadata": [metadata_1, metadata_2, metadata_3], + } + + +@pytest.mark.asyncio +async def test_checkpoint_put_writes( + engine: PostgresEngine, + checkpointer: PostgresSaver, +) -> None: + config: RunnableConfig = { + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875", + } + } + + # Verify if the checkpoint writes are stored correctly in the database + writes: Sequence[Tuple[str, Any]] = [ + ("test_channel1", {}), + ("test_channel2", {}), + ] + checkpointer.put_writes(config, writes, task_id="1") + + results = await afetch(engine, f'SELECT * FROM "{table_name_writes}"') + assert len(results) == 2 + for row in results: + assert isinstance(row["task_id"], str) + await aexecute(engine, f'TRUNCATE TABLE "{table_name_writes}"') + + +def test_checkpoint_list( + checkpointer: PostgresSaver, + 
test_data: dict[str, Any], +) -> None: + configs = test_data["configs"] + checkpoints = test_data["checkpoints"] + metadata = test_data["metadata"] + + checkpointer.put(configs[1], checkpoints[1], metadata[0], {}) + checkpointer.put(configs[2], checkpoints[2], metadata[1], {}) + checkpointer.put(configs[3], checkpoints[3], metadata[2], {}) + + # call method / assertions + query_1 = {"source": "input"} # search by 1 key + query_2 = { + "step": 1, + "writes": {"foo": "bar"}, + } # search by multiple keys + query_3: dict[str, Any] = {} # search by no keys, return all checkpoints + query_4 = {"source": "update", "step": 1} # no match + + search_results_1 = list(checkpointer.list(None, filter=query_1)) + assert len(search_results_1) == 1 + assert search_results_1[0].metadata == metadata[0] + search_results_2 = list(checkpointer.list(None, filter=query_2)) + assert len(search_results_2) == 1 + assert search_results_2[0].metadata == metadata[1] + + search_results_3 = list(checkpointer.list(None, filter=query_3)) + assert len(search_results_3) == 3 + + search_results_4 = list(checkpointer.list(None, filter=query_4)) + assert len(search_results_4) == 0 + + # search by config (defaults to checkpoints across all namespaces) + search_results_5 = list( + checkpointer.list({"configurable": {"thread_id": "thread-2"}}) + ) + assert len(search_results_5) == 2 + assert { + search_results_5[0].config["configurable"]["checkpoint_ns"], + search_results_5[1].config["configurable"]["checkpoint_ns"], + } == {"", "inner"} + + +def test_checkpoint_get_tuple( + checkpointer: PostgresSaver, + test_data: dict[str, Any], +) -> None: + configs = test_data["configs"] + checkpoints = test_data["checkpoints"] + metadata = test_data["metadata"] + + new_config = checkpointer.put(configs[1], checkpoints[1], metadata[0], {}) + + # Matching checkpoint + search_results_1 = checkpointer.get_tuple(new_config) + assert search_results_1.metadata == metadata[0] # type: ignore + + # No matching checkpoint + 
assert checkpointer.get_tuple(configs[0]) is None diff --git a/tests/test_engine.py b/tests/test_engine.py index 1c2653bf..7883cf4b 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -1,4 +1,4 @@ -# Copyright 2024 Google LLC +# Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -130,7 +130,9 @@ async def test_init_table(self, engine): id = str(uuid.uuid4()) content = "coffee" embedding = await embeddings_service.aembed_query(content) - stmt = f"INSERT INTO {DEFAULT_TABLE} (langchain_id, content, embedding) VALUES ('{id}', '{content}','{embedding}');" + # Note: DeterministicFakeEmbedding generates a numpy array, converting to list a list of float values + embedding_string = [float(dimension) for dimension in embedding] + stmt = f"INSERT INTO {DEFAULT_TABLE} (langchain_id, content, embedding) VALUES ('{id}', '{content}','{embedding_string}');" await aexecute(engine, stmt) async def test_init_table_custom(self, engine): @@ -200,6 +202,7 @@ async def test_password( assert engine await aexecute(engine, "SELECT 1") PostgresEngine._connector = None + await engine.close() async def test_from_engine( self, @@ -300,6 +303,41 @@ async def test_iam_account_override( await aexecute(engine, "SELECT 1") await engine.close() + async def test_ainit_checkpoint_writes_table(self, engine): + table_name = f"checkpoint{uuid.uuid4()}" + table_name_writes = f"{table_name}_writes" + await engine.ainit_checkpoint_table(table_name=table_name) + stmt = f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{table_name_writes}';" + results = await afetch(engine, stmt) + expected = [ + {"column_name": "thread_id", "data_type": "text"}, + {"column_name": "checkpoint_ns", "data_type": "text"}, + {"column_name": "checkpoint_id", "data_type": "text"}, + {"column_name": "task_id", "data_type": "text"}, + {"column_name": "idx", "data_type": 
"integer"}, + {"column_name": "channel", "data_type": "text"}, + {"column_name": "type", "data_type": "text"}, + {"column_name": "blob", "data_type": "bytea"}, + {"column_name": "task_path", "data_type": "text"}, + ] + for row in results: + assert row in expected + stmt = f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{table_name}';" + results = await afetch(engine, stmt) + expected = [ + {"column_name": "thread_id", "data_type": "text"}, + {"column_name": "checkpoint_ns", "data_type": "text"}, + {"column_name": "checkpoint_id", "data_type": "text"}, + {"column_name": "parent_checkpoint_id", "data_type": "text"}, + {"column_name": "checkpoint", "data_type": "bytea"}, + {"column_name": "metadata", "data_type": "bytea"}, + {"column_name": "type", "data_type": "text"}, + ] + for row in results: + assert row in expected + await aexecute(engine, f'DROP TABLE IF EXISTS "{table_name}"') + await aexecute(engine, f'DROP TABLE IF EXISTS "{table_name_writes}"') + @pytest.mark.asyncio(scope="module") class TestEngineSync: @@ -350,7 +388,9 @@ async def test_init_table(self, engine): id = str(uuid.uuid4()) content = "coffee" embedding = await embeddings_service.aembed_query(content) - stmt = f"INSERT INTO {DEFAULT_TABLE_SYNC} (langchain_id, content, embedding) VALUES ('{id}', '{content}','{embedding}');" + # Note: DeterministicFakeEmbedding generates a numpy array, converting to list a list of float values + embedding_string = [float(dimension) for dimension in embedding] + stmt = f"INSERT INTO {DEFAULT_TABLE_SYNC} (langchain_id, content, embedding) VALUES ('{id}', '{content}','{embedding_string}');" await aexecute(engine, stmt) async def test_init_table_custom(self, engine): @@ -421,6 +461,7 @@ async def test_password( assert engine await aexecute(engine, "SELECT 1") PostgresEngine._connector = None + await engine.close() async def test_engine_constructor_key( self, @@ -449,3 +490,38 @@ async def test_iam_account_override( assert engine 
await aexecute(engine, "SELECT 1") await engine.close() + + async def test_init_checkpoints_table(self, engine): + table_name = f"checkpoint{uuid.uuid4()}" + table_name_writes = f"{table_name}_writes" + engine.init_checkpoint_table(table_name=table_name) + stmt = f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{table_name}';" + results = await afetch(engine, stmt) + expected = [ + {"column_name": "thread_id", "data_type": "text"}, + {"column_name": "checkpoint_ns", "data_type": "text"}, + {"column_name": "checkpoint_id", "data_type": "text"}, + {"column_name": "parent_checkpoint_id", "data_type": "text"}, + {"column_name": "type", "data_type": "text"}, + {"column_name": "checkpoint", "data_type": "bytea"}, + {"column_name": "metadata", "data_type": "bytea"}, + ] + for row in results: + assert row in expected + stmt = f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{table_name_writes}';" + results = await afetch(engine, stmt) + expected = [ + {"column_name": "thread_id", "data_type": "text"}, + {"column_name": "checkpoint_ns", "data_type": "text"}, + {"column_name": "checkpoint_id", "data_type": "text"}, + {"column_name": "task_id", "data_type": "text"}, + {"column_name": "idx", "data_type": "integer"}, + {"column_name": "channel", "data_type": "text"}, + {"column_name": "type", "data_type": "text"}, + {"column_name": "blob", "data_type": "bytea"}, + {"column_name": "task_path", "data_type": "text"}, + ] + for row in results: + assert row in expected + await aexecute(engine, f'DROP TABLE IF EXISTS "{table_name}"') + await aexecute(engine, f'DROP TABLE IF EXISTS "{table_name_writes}"') diff --git a/tests/test_vectorstore_search.py b/tests/test_vectorstore_search.py index 2141d951..5d53b918 100644 --- a/tests/test_vectorstore_search.py +++ b/tests/test_vectorstore_search.py @@ -19,6 +19,7 @@ import pytest_asyncio from langchain_core.documents import Document from langchain_core.embeddings import 
DeterministicFakeEmbedding +from metadata_filtering_data import FILTERING_TEST_CASES, METADATAS from sqlalchemy import text from langchain_google_cloud_sql_pg import Column, PostgresEngine, PostgresVectorStore @@ -27,6 +28,10 @@ DEFAULT_TABLE = "test_table" + str(uuid.uuid4()).replace("-", "_") CUSTOM_TABLE = "test_table_custom" + str(uuid.uuid4()).replace("-", "_") CUSTOM_TABLE_SYNC = "test_table_sync" + str(uuid.uuid4()).replace("-", "_") +CUSTOM_FILTER_TABLE = "test_table_custom_filter" + str(uuid.uuid4()).replace("-", "_") +CUSTOM_FILTER_TABLE_SYNC = "test_table_custom_filter_sync" + str(uuid.uuid4()).replace( + "-", "_" +) VECTOR_SIZE = 768 embeddings_service = DeterministicFakeEmbedding(size=VECTOR_SIZE) @@ -37,7 +42,9 @@ docs = [ Document(page_content=texts[i], metadata=metadatas[i]) for i in range(len(texts)) ] - +filter_docs = [ + Document(page_content=texts[i], metadata=METADATAS[i]) for i in range(len(texts)) +] embeddings = [embeddings_service.embed_query("foo") for i in range(len(texts))] @@ -88,6 +95,7 @@ async def engine(self, db_project, db_region, db_instance, db_name): ) yield engine await aexecute(engine, f"DROP TABLE IF EXISTS {DEFAULT_TABLE}") + await aexecute(engine, f"DROP TABLE IF EXISTS {CUSTOM_FILTER_TABLE}") await engine.close() @pytest_asyncio.fixture(scope="class") @@ -142,6 +150,43 @@ async def vs_custom(self, engine_sync): vs_custom.add_documents(docs, ids=ids) yield vs_custom + @pytest_asyncio.fixture(scope="class") + async def vs_custom_filter(self, engine): + await engine.ainit_vectorstore_table( + CUSTOM_FILTER_TABLE, + VECTOR_SIZE, + metadata_columns=[ + Column("name", "TEXT"), + Column("code", "TEXT"), + Column("price", "FLOAT"), + Column("is_available", "BOOLEAN"), + Column("tags", "TEXT[]"), + Column("inventory_location", "INTEGER[]"), + Column("available_quantity", "INTEGER", nullable=True), + ], + id_column="langchain_id", + store_metadata=False, + overwrite_existing=True, + ) + + vs_custom_filter = await 
PostgresVectorStore.create( + engine, + embedding_service=embeddings_service, + table_name=CUSTOM_FILTER_TABLE, + metadata_columns=[ + "name", + "code", + "price", + "is_available", + "tags", + "inventory_location", + "available_quantity", + ], + id_column="langchain_id", + ) + await vs_custom_filter.aadd_documents(filter_docs, ids=ids) + yield vs_custom_filter + async def test_asimilarity_search(self, vs): results = await vs.asimilarity_search("foo", k=1) assert len(results) == 1 @@ -240,6 +285,19 @@ async def test_aget_by_ids_custom_vs(self, vs_custom): assert results[0] == Document(page_content="foo", id=ids[0]) + @pytest.mark.parametrize("test_filter, expected_ids", FILTERING_TEST_CASES) + async def test_vectorstore_with_metadata_filters( + self, + vs_custom_filter, + test_filter, + expected_ids, + ): + """Test end to end construction and search.""" + docs = await vs_custom_filter.asimilarity_search( + "meow", k=5, filter=test_filter + ) + assert [doc.metadata["code"] for doc in docs] == expected_ids, test_filter + class TestVectorStoreSearchSync: @pytest.fixture(scope="module") @@ -268,6 +326,7 @@ async def engine_sync(self, db_project, db_region, db_instance, db_name): ) yield engine await aexecute(engine, f"DROP TABLE IF EXISTS {CUSTOM_TABLE_SYNC}") + await aexecute(engine, f"DROP TABLE IF EXISTS {CUSTOM_FILTER_TABLE_SYNC}") await engine.close() @pytest.fixture(scope="class") @@ -297,6 +356,44 @@ def vs_custom(self, engine_sync): vs_custom.add_documents(docs, ids=ids) yield vs_custom + @pytest_asyncio.fixture(scope="class") + async def vs_custom_filter_sync(self, engine_sync): + engine_sync.init_vectorstore_table( + CUSTOM_FILTER_TABLE_SYNC, + VECTOR_SIZE, + metadata_columns=[ + Column("name", "TEXT"), + Column("code", "TEXT"), + Column("price", "FLOAT"), + Column("is_available", "BOOLEAN"), + Column("tags", "TEXT[]"), + Column("inventory_location", "INTEGER[]"), + Column("available_quantity", "INTEGER", nullable=True), + ], + id_column="langchain_id", + 
store_metadata=False, + overwrite_existing=True, + ) + + vs_custom_filter_sync = await PostgresVectorStore.create( + engine_sync, + embedding_service=embeddings_service, + table_name=CUSTOM_FILTER_TABLE_SYNC, + metadata_columns=[ + "name", + "code", + "price", + "is_available", + "tags", + "inventory_location", + "available_quantity", + ], + id_column="langchain_id", + ) + + vs_custom_filter_sync.add_documents(filter_docs, ids=ids) + yield vs_custom_filter_sync + def test_similarity_search(self, vs_custom): results = vs_custom.similarity_search("foo", k=1) assert len(results) == 1 @@ -349,3 +446,15 @@ def test_get_by_ids_custom_vs(self, vs_custom): results = vs_custom.get_by_ids(ids=test_ids) assert results[0] == Document(page_content="foo", id=ids[0]) + + @pytest.mark.parametrize("test_filter, expected_ids", FILTERING_TEST_CASES) + async def test_sync_vectorstore_with_metadata_filters( + self, + vs_custom_filter_sync, + test_filter, + expected_ids, + ): + """Test end to end construction and search.""" + + docs = vs_custom_filter_sync.similarity_search("meow", k=5, filter=test_filter) + assert [doc.metadata["code"] for doc in docs] == expected_ids, test_filter