diff --git a/docs/vector_store.ipynb b/docs/vector_store.ipynb index cf2814fe..74bc3f54 100644 --- a/docs/vector_store.ipynb +++ b/docs/vector_store.ipynb @@ -585,10 +585,49 @@ "all_texts = [\"Apples and oranges\", \"Cars and airplanes\", \"Pineapple\", \"Train\", \"Banana\"]\n", "metadatas = [{\"len\": len(t)} for t in all_texts]\n", "ids = [str(uuid.uuid4()) for _ in all_texts]\n", - "await custom_store.aadd_texts(all_texts, metadatas=metadatas, ids=ids)\n", + "await custom_store.aadd_texts(all_texts, metadatas=metadatas, ids=ids)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### For v0.15.0+\n", + "\n", + "**Important Update:** Support for string filters has been deprecated. Please use dictionaries to add filters." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Use filter on search\n", + "docs = await custom_store.asimilarity_search_by_vector(\n", + " query_vector, filter={\"len\": {\"$gte\": 6}}\n", + ")\n", "\n", + "print(docs)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### For v0.14 and under\n", + "\n", + "You can make use of the string filters to filter on metadata" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ "# Use filter on search\n", - "docs = await custom_store.asimilarity_search_by_vector(query_vector, filter=\"len >= 6\")\n", + "docs = await custom_store.asimilarity_search(query, filter=\"len >= 6\")\n", "\n", "print(docs)" ] diff --git a/pyproject.toml b/pyproject.toml index 22b65ebb..8e691bda 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,11 +11,9 @@ authors = [ dependencies = [ "cloud-sql-python-connector[asyncpg] >= 1.10.0, <2.0.0", - "langchain-core>=0.2.36, <1.0.0 ", "numpy>=1.24.4, <3.0.0; python_version >= '3.11'", "numpy>=1.24.4, <=2.2.6; python_version == '3.10'", "numpy>=1.24.4, <=2.0.2; python_version <= '3.9'", - 
"SQLAlchemy[asyncio]>=2.0.25, <3.0.0", "langchain-postgres>=0.0.15", ] diff --git a/requirements.txt b/requirements.txt index 9332ba72..ac28b2c2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,6 @@ cloud-sql-python-connector[asyncpg]==1.18.2 -langchain-core==0.3.68 numpy==2.3.1; python_version >= "3.11" numpy==2.2.6; python_version == "3.10" numpy==2.0.2; python_version <= "3.9" -SQLAlchemy[asyncio]==2.0.41 langgraph==0.6.0 langchain-postgres==0.0.15 diff --git a/src/langchain_google_cloud_sql_pg/__init__.py b/src/langchain_google_cloud_sql_pg/__init__.py index f2e3dc87..34bf7514 100644 --- a/src/langchain_google_cloud_sql_pg/__init__.py +++ b/src/langchain_google_cloud_sql_pg/__init__.py @@ -13,6 +13,11 @@ # limitations under the License. from langchain_postgres import Column +from langchain_postgres.v2.hybrid_search_config import ( + HybridSearchConfig, + reciprocal_rank_fusion, + weighted_sum_ranking, +) from . import indexes from .chat_message_history import PostgresChatMessageHistory @@ -31,5 +36,8 @@ "PostgresLoader", "PostgresDocumentSaver", "PostgresSaver", + "HybridSearchConfig", + "reciprocal_rank_fusion", + "weighted_sum_ranking", "__version__", ] diff --git a/src/langchain_google_cloud_sql_pg/async_vectorstore.py b/src/langchain_google_cloud_sql_pg/async_vectorstore.py index 0cde1f8d..d40470f3 100644 --- a/src/langchain_google_cloud_sql_pg/async_vectorstore.py +++ b/src/langchain_google_cloud_sql_pg/async_vectorstore.py @@ -15,1187 +15,11 @@ # TODO: Remove below import when minimum supported Python version is 3.10 from __future__ import annotations -import copy -import json -import uuid -from typing import Any, Callable, Iterable, Optional, Sequence +from langchain_postgres.v2.async_vectorstore import AsyncPGVectorStore -import numpy as np -from langchain_core.documents import Document -from langchain_core.embeddings import Embeddings -from langchain_core.vectorstores import VectorStore, utils -from sqlalchemy import text -from 
sqlalchemy.engine.row import RowMapping -from sqlalchemy.ext.asyncio import AsyncEngine -from .engine import PostgresEngine -from .indexes import ( - DEFAULT_DISTANCE_STRATEGY, - DEFAULT_INDEX_NAME_SUFFIX, - BaseIndex, - DistanceStrategy, - ExactNearestNeighbor, - QueryOptions, -) - -COMPARISONS_TO_NATIVE = { - "$eq": "=", - "$ne": "!=", - "$lt": "<", - "$lte": "<=", - "$gt": ">", - "$gte": ">=", -} - -SPECIAL_CASED_OPERATORS = { - "$in", - "$nin", - "$between", - "$exists", -} - -TEXT_OPERATORS = { - "$like", - "$ilike", -} - -LOGICAL_OPERATORS = {"$and", "$or", "$not"} - -SUPPORTED_OPERATORS = ( - set(COMPARISONS_TO_NATIVE) - .union(TEXT_OPERATORS) - .union(LOGICAL_OPERATORS) - .union(SPECIAL_CASED_OPERATORS) -) - - -class AsyncPostgresVectorStore(VectorStore): +class AsyncPostgresVectorStore(AsyncPGVectorStore): """Google Cloud SQL for PostgreSQL Vector Store class""" - __create_key = object() - - def __init__( - self, - key: object, - pool: AsyncEngine, - embedding_service: Embeddings, - table_name: str, - schema_name: str = "public", - content_column: str = "content", - embedding_column: str = "embedding", - metadata_columns: list[str] = [], - id_column: str = "langchain_id", - metadata_json_column: Optional[str] = "langchain_metadata", - distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, - k: int = 4, - fetch_k: int = 20, - lambda_mult: float = 0.5, - index_query_options: Optional[QueryOptions] = None, - ): - """AsyncPostgresVectorStore constructor. - Args: - key (object): Prevent direct constructor usage. - pool (PostgresEngine): Connection pool engine for managing connections to Postgres database. - embedding_service (Embeddings): Text embedding model to use. - table_name (str): Name of the existing table or the table to be created. - schema_name (str, optional): Database schema name of the table. Defaults to "public". - content_column (str): Column that represent a Document's page_content. Defaults to "content". 
- embedding_column (str): Column for embedding vectors. The embedding is generated from the document value. Defaults to "embedding". - metadata_columns (list[str]): Column(s) that represent a document's metadata. - id_column (str): Column that represents the Document's id. Defaults to "langchain_id". - metadata_json_column (str): Column to store metadata as JSON. Defaults to "langchain_metadata". - distance_strategy (DistanceStrategy): Distance strategy to use for vector similarity search. Defaults to COSINE_DISTANCE. - k (int): Number of Documents to return from search. Defaults to 4. - fetch_k (int): Number of Documents to fetch to pass to MMR algorithm. - lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5. - index_query_options (QueryOptions): Index query option. - - - Raises: - Exception: If called directly by user. - """ - if key != AsyncPostgresVectorStore.__create_key: - raise Exception( - "Only create class through 'create' or 'create_sync' methods!" 
- ) - - self.pool = pool - self.embedding_service = embedding_service - self.table_name = table_name - self.schema_name = schema_name - self.content_column = content_column - self.embedding_column = embedding_column - self.metadata_columns = metadata_columns - self.id_column = id_column - self.metadata_json_column = metadata_json_column - self.distance_strategy = distance_strategy - self.k = k - self.fetch_k = fetch_k - self.lambda_mult = lambda_mult - self.index_query_options = index_query_options - - @classmethod - async def create( - cls, - engine: PostgresEngine, - embedding_service: Embeddings, - table_name: str, - schema_name: str = "public", - content_column: str = "content", - embedding_column: str = "embedding", - metadata_columns: list[str] = [], - ignore_metadata_columns: Optional[list[str]] = None, - id_column: str = "langchain_id", - metadata_json_column: Optional[str] = "langchain_metadata", - distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, - k: int = 4, - fetch_k: int = 20, - lambda_mult: float = 0.5, - index_query_options: Optional[QueryOptions] = None, - ) -> AsyncPostgresVectorStore: - """Create a new AsyncPostgresVectorStore instance. - - Args: - engine (PostgresEngine): Connection pool engine for managing connections to Cloud SQL for PostgreSQL database. - embedding_service (Embeddings): Text embedding model to use. - table_name (str): Name of an existing table or table to be created. - schema_name (str, optional): Database schema name of the table. Defaults to "public". - content_column (str): Column that represent a Document's page_content. Defaults to "content". - embedding_column (str): Column for embedding vectors. The embedding is generated from the document value. Defaults to "embedding". - metadata_columns (list[str]): Column(s) that represent a document's metadata. - ignore_metadata_columns (list[str]): Column(s) to ignore in pre-existing tables for a document's metadata. Can not be used with metadata_columns. 
Defaults to None. - id_column (str): Column that represents the Document's id. Defaults to "langchain_id". - metadata_json_column (str): Column to store metadata as JSON. Defaults to "langchain_metadata". - distance_strategy (DistanceStrategy): Distance strategy to use for vector similarity search. Defaults to COSINE_DISTANCE. - k (int): Number of Documents to return from search. Defaults to 4. - fetch_k (int): Number of Documents to fetch to pass to MMR algorithm. - lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5. - index_query_options (QueryOptions): Index query option. - - Returns: - AsyncPostgresVectorStore - """ - if metadata_columns and ignore_metadata_columns: - raise ValueError( - "Can not use both metadata_columns and ignore_metadata_columns." - ) - # Get field type information - async with engine._pool.connect() as conn: - result = await conn.execute( - text( - f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{table_name}'AND table_schema = '{schema_name}'" - ) - ) - result_map = result.mappings() - results = result_map.fetchall() - - columns = {} - for field in results: - columns[field["column_name"]] = field["data_type"] - - # Check columns - if id_column not in columns: - raise ValueError(f"Id column, {id_column}, does not exist.") - if content_column not in columns: - raise ValueError(f"Content column, {content_column}, does not exist.") - content_type = columns[content_column] - if content_type != "text" and "char" not in content_type: - raise ValueError( - f"Content column, {content_column}, is type, {content_type}. It must be a type of character string." 
- ) - if embedding_column not in columns: - raise ValueError(f"Embedding column, {embedding_column}, does not exist.") - if columns[embedding_column] != "USER-DEFINED": - raise ValueError( - f"Embedding column, {embedding_column}, is not type Vector." - ) - - metadata_json_column = ( - None if metadata_json_column not in columns else metadata_json_column - ) - - # If using metadata_columns check to make sure column exists - for column in metadata_columns: - if column not in columns: - raise ValueError(f"Metadata column, {column}, does not exist.") - - # If using ignore_metadata_columns, filter out known columns and set known metadata columns - all_columns = columns - if ignore_metadata_columns: - for column in ignore_metadata_columns: - del all_columns[column] - - del all_columns[id_column] - del all_columns[content_column] - del all_columns[embedding_column] - metadata_columns = [k for k in all_columns.keys()] - - return cls( - cls.__create_key, - engine._pool, - embedding_service, - table_name, - schema_name, - content_column, - embedding_column, - metadata_columns, - id_column, - metadata_json_column, - distance_strategy, - k, - fetch_k, - lambda_mult, - index_query_options, - ) - - @property - def embeddings(self) -> Embeddings: - return self.embedding_service - - async def __aadd_embeddings( - self, - texts: Iterable[str], - embeddings: list[list[float]], - metadatas: Optional[list[dict]] = None, - ids: Optional[list] = None, - **kwargs: Any, - ) -> list[str]: - """Add embeddings to the table. - - Raises: - :class:`InvalidTextRepresentationError `: if the `ids` data type does not match that of the `id_column`. 
- """ - if not ids: - ids = [str(uuid.uuid4()) for _ in texts] - else: - # This is done to fill in any missing ids - ids = [id if id is not None else str(uuid.uuid4()) for id in ids] - if not metadatas: - metadatas = [{} for _ in texts] - # Insert embeddings - for id, content, embedding, metadata in zip(ids, texts, embeddings, metadatas): - metadata_col_names = ( - ", " + ", ".join(f'"{col}"' for col in self.metadata_columns) - if len(self.metadata_columns) > 0 - else "" - ) - insert_stmt = f'INSERT INTO "{self.schema_name}"."{self.table_name}"("{self.id_column}", "{self.content_column}", "{self.embedding_column}"{metadata_col_names}' - values = { - "langchain_id": id, - "content": content, - "embedding": str([float(dimension) for dimension in embedding]), - } - values_stmt = "VALUES (:langchain_id, :content, :embedding" - - # Add metadata - extra = copy.deepcopy(metadata) - for metadata_column in self.metadata_columns: - if metadata_column in metadata: - values_stmt += f", :{metadata_column}" - values[metadata_column] = metadata[metadata_column] - del extra[metadata_column] - else: - values_stmt += ",null" - - # Add JSON column and/or close statement - insert_stmt += ( - f""", "{self.metadata_json_column}")""" - if self.metadata_json_column - else ")" - ) - if self.metadata_json_column: - values_stmt += ", :extra)" - values["extra"] = json.dumps(extra) - else: - values_stmt += ")" - - upsert_stmt = f' ON CONFLICT ("{self.id_column}") DO UPDATE SET "{self.content_column}" = EXCLUDED."{self.content_column}", "{self.embedding_column}" = EXCLUDED."{self.embedding_column}"' - - if self.metadata_json_column: - upsert_stmt += f', "{self.metadata_json_column}" = EXCLUDED."{self.metadata_json_column}"' - - for column in self.metadata_columns: - upsert_stmt += f', "{column}" = EXCLUDED."{column}"' - - upsert_stmt += ";" - - query = insert_stmt + values_stmt + upsert_stmt - async with self.pool.connect() as conn: - await conn.execute(text(query), values) - await 
conn.commit() - - return ids - - async def aget_by_ids(self, ids: Sequence[str]) -> list[Document]: - """Get documents by ids.""" - - quoted_ids = [f"'{id_val}'" for id_val in ids] - id_list_str = ", ".join(quoted_ids) - - columns = self.metadata_columns + [ - self.id_column, - self.content_column, - ] - if self.metadata_json_column: - columns.append(self.metadata_json_column) - - column_names = ", ".join(f'"{col}"' for col in columns) - - query = f'SELECT {column_names} FROM "{self.schema_name}"."{self.table_name}" WHERE "{self.id_column}" IN ({id_list_str});' - - async with self.pool.connect() as conn: - result = await conn.execute(text(query)) - result_map = result.mappings() - results = result_map.fetchall() - - documents = [] - for row in results: - metadata = ( - row[self.metadata_json_column] - if self.metadata_json_column and row[self.metadata_json_column] - else {} - ) - for col in self.metadata_columns: - metadata[col] = row[col] - documents.append( - ( - Document( - page_content=row[self.content_column], - metadata=metadata, - id=str(row[self.id_column]), - ) - ) - ) - - return documents - - async def aadd_texts( - self, - texts: Iterable[str], - metadatas: Optional[list[dict]] = None, - ids: Optional[list] = None, - **kwargs: Any, - ) -> list[str]: - """Embed texts and add to the table. - - Raises: - :class:`InvalidTextRepresentationError `: if the `ids` data type does not match that of the `id_column`. - """ - embeddings = self.embedding_service.embed_documents(list(texts)) - ids = await self.__aadd_embeddings( - texts, embeddings, metadatas=metadatas, ids=ids, **kwargs - ) - return ids - - async def aadd_documents( - self, - documents: list[Document], - ids: Optional[list] = None, - **kwargs: Any, - ) -> list[str]: - """Embed documents and add to the table. - - Raises: - :class:`InvalidTextRepresentationError `: if the `ids` data type does not match that of the `id_column`. 
- """ - texts = [doc.page_content for doc in documents] - metadatas = [doc.metadata for doc in documents] - if not ids: - ids = [doc.id for doc in documents] - ids = await self.aadd_texts(texts, metadatas=metadatas, ids=ids, **kwargs) - return ids - - async def adelete( - self, - ids: Optional[list] = None, - **kwargs: Any, - ) -> Optional[bool]: - """Delete records from the table. - - Raises: - :class:`InvalidTextRepresentationError `: if the `ids` data type does not match that of the `id_column`. - """ - if not ids: - return False - - id_list = ", ".join([f"'{id}'" for id in ids]) - query = f'DELETE FROM "{self.schema_name}"."{self.table_name}" WHERE {self.id_column} in ({id_list})' - async with self.pool.connect() as conn: - await conn.execute(text(query)) - await conn.commit() - return True - - @classmethod - async def afrom_texts( # type: ignore[override] - cls: type[AsyncPostgresVectorStore], - texts: list[str], - embedding: Embeddings, - engine: PostgresEngine, - table_name: str, - schema_name: str = "public", - metadatas: Optional[list[dict]] = None, - ids: Optional[list] = None, - content_column: str = "content", - embedding_column: str = "embedding", - metadata_columns: list[str] = [], - ignore_metadata_columns: Optional[list[str]] = None, - id_column: str = "langchain_id", - metadata_json_column: str = "langchain_metadata", - distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, - k: int = 4, - fetch_k: int = 20, - lambda_mult: float = 0.5, - index_query_options: Optional[QueryOptions] = None, - **kwargs: Any, - ) -> AsyncPostgresVectorStore: - """Create an AsyncPostgresVectorStore instance from texts. - - Args: - texts (list[str]): Texts to add to the vector store. - embedding (Embeddings): Text embedding model to use. - engine (PostgresEngine): Connection pool engine for managing connections to Postgres database. - table_name (str): Name of the existing table or the table to be created. 
- schema_name (str, optional): Database schema name of the table. Defaults to "public". - metadatas (Optional[list[dict]]): List of metadatas to add to table records. - ids: (Optional[list[str]]): List of IDs to add to table records. - content_column (str): Column that represent a Document’s page_content. Defaults to "content". - embedding_column (str): Column for embedding vectors. The embedding is generated from the document value. Defaults to "embedding". - metadata_columns (list[str]): Column(s) that represent a document's metadata. - ignore_metadata_columns (list[str]): Column(s) to ignore in pre-existing tables for a document's metadata. Can not be used with metadata_columns. Defaults to None. - id_column (str): Column that represents the Document's id. Defaults to "langchain_id". - metadata_json_column (str): Column to store metadata as JSON. Defaults to "langchain_metadata". - distance_strategy (DistanceStrategy): Distance strategy to use for vector similarity search. Defaults to COSINE_DISTANCE. - k (int): Number of Documents to return from search. Defaults to 4. - fetch_k (int): Number of Documents to fetch to pass to MMR algorithm. - lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5. - index_query_options (QueryOptions): Index query option. - - Raises: - :class:`InvalidTextRepresentationError `: if the `ids` data type does not match that of the `id_column`. 
- - Returns: - AsyncPostgresVectorStore - """ - vs = await cls.create( - engine, - embedding, - table_name, - schema_name, - content_column, - embedding_column, - metadata_columns, - ignore_metadata_columns, - id_column, - metadata_json_column, - distance_strategy, - k, - fetch_k, - lambda_mult, - index_query_options, - ) - await vs.aadd_texts(texts, metadatas=metadatas, ids=ids, **kwargs) - return vs - - @classmethod - async def afrom_documents( # type: ignore[override] - cls: type[AsyncPostgresVectorStore], - documents: list[Document], - embedding: Embeddings, - engine: PostgresEngine, - table_name: str, - schema_name: str = "public", - ids: Optional[list] = None, - content_column: str = "content", - embedding_column: str = "embedding", - metadata_columns: list[str] = [], - ignore_metadata_columns: Optional[list[str]] = None, - id_column: str = "langchain_id", - metadata_json_column: str = "langchain_metadata", - distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, - k: int = 4, - fetch_k: int = 20, - lambda_mult: float = 0.5, - index_query_options: Optional[QueryOptions] = None, - **kwargs: Any, - ) -> AsyncPostgresVectorStore: - """Create an AsyncPostgresVectorStore instance from documents. - - Args: - documents (list[Document]): Documents to add to the vector store. - embedding (Embeddings): Text embedding model to use. - engine (PostgresEngine): Connection pool engine for managing connections to Postgres database. - table_name (str): Name of the existing table or the table to be created. - schema_name (str, optional): Database schema name of the table. Defaults to "public". - metadatas (Optional[list[dict]]): List of metadatas to add to table records. - ids: (Optional[list[str]]): List of IDs to add to table records. - content_column (str): Column that represent a Document's page_content. Defaults to "content". - embedding_column (str): Column for embedding vectors. The embedding is generated from the document value. Defaults to "embedding". 
- metadata_columns (list[str]): Column(s) that represent a document's metadata. - ignore_metadata_columns (list[str]): Column(s) to ignore in pre-existing tables for a document's metadata. Can not be used with metadata_columns. Defaults to None. - id_column (str): Column that represents the Document's id. Defaults to "langchain_id". - metadata_json_column (str): Column to store metadata as JSON. Defaults to "langchain_metadata". - distance_strategy (DistanceStrategy): Distance strategy to use for vector similarity search. Defaults to COSINE_DISTANCE. - k (int): Number of Documents to return from search. Defaults to 4. - fetch_k (int): Number of Documents to fetch to pass to MMR algorithm. - lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5. - index_query_options (QueryOptions): Index query option. - - Raises: - :class:`InvalidTextRepresentationError `: if the `ids` data type does not match that of the `id_column`. 
- - Returns: - AsyncPostgresVectorStore - """ - vs = await cls.create( - engine, - embedding, - table_name, - schema_name, - content_column, - embedding_column, - metadata_columns, - ignore_metadata_columns, - id_column, - metadata_json_column, - distance_strategy, - k, - fetch_k, - lambda_mult, - index_query_options, - ) - texts = [doc.page_content for doc in documents] - metadatas = [doc.metadata for doc in documents] - await vs.aadd_texts(texts, metadatas=metadatas, ids=ids, **kwargs) - return vs - - async def __query_collection( - self, - embedding: list[float], - k: Optional[int] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> Sequence[RowMapping]: - """Perform similarity search query on the vector store table.""" - k = k if k else self.k - operator = self.distance_strategy.operator - search_function = self.distance_strategy.search_function - - columns = self.metadata_columns + [ - self.id_column, - self.content_column, - self.embedding_column, - ] - if self.metadata_json_column: - columns.append(self.metadata_json_column) - - column_names = ", ".join(f'"{col}"' for col in columns) - - if filter and isinstance(filter, dict): - filter = self._create_filter_clause(filter) - filter = f"WHERE {filter}" if filter else "" - embedding_string = f"'{[float(dimension) for dimension in embedding]}'" - stmt = f'SELECT {column_names}, {search_function}({self.embedding_column}, {embedding_string}) as distance FROM "{self.schema_name}"."{self.table_name}" {filter} ORDER BY {self.embedding_column} {operator} {embedding_string} LIMIT {k};' - if self.index_query_options: - async with self.pool.connect() as conn: - await conn.execute( - text(f"SET LOCAL {self.index_query_options.to_string()};") - ) - result = await conn.execute(text(stmt)) - result_map = result.mappings() - results = result_map.fetchall() - else: - async with self.pool.connect() as conn: - result = await conn.execute(text(stmt)) - result_map = result.mappings() - results = 
result_map.fetchall() - return results - - async def asimilarity_search( - self, - query: str, - k: Optional[int] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[Document]: - """Return docs selected by similarity search on query.""" - embedding = self.embedding_service.embed_query(text=query) - - return await self.asimilarity_search_by_vector( - embedding=embedding, k=k, filter=filter, **kwargs - ) - - def _select_relevance_score_fn(self) -> Callable[[float], float]: - """Select a relevance function based on distance strategy.""" - # Calculate distance strategy provided in - # vectorstore constructor - if self.distance_strategy == DistanceStrategy.COSINE_DISTANCE: - return self._cosine_relevance_score_fn - if self.distance_strategy == DistanceStrategy.INNER_PRODUCT: - return self._max_inner_product_relevance_score_fn - elif self.distance_strategy == DistanceStrategy.EUCLIDEAN: - return self._euclidean_relevance_score_fn - - async def asimilarity_search_with_score( - self, - query: str, - k: Optional[int] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[tuple[Document, float]]: - """Return docs and distance scores selected by similarity search on query.""" - embedding = self.embedding_service.embed_query(query) - docs = await self.asimilarity_search_with_score_by_vector( - embedding=embedding, k=k, filter=filter, **kwargs - ) - return docs - - async def asimilarity_search_by_vector( - self, - embedding: list[float], - k: Optional[int] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[Document]: - """Return docs selected by vector similarity search.""" - docs_and_scores = await self.asimilarity_search_with_score_by_vector( - embedding=embedding, k=k, filter=filter, **kwargs - ) - - return [doc for doc, _ in docs_and_scores] - - async def asimilarity_search_with_score_by_vector( - self, - embedding: list[float], - k: Optional[int] = None, - filter: 
Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[tuple[Document, float]]: - """Return docs and distance scores selected by vector similarity search.""" - results = await self.__query_collection( - embedding=embedding, k=k, filter=filter, **kwargs - ) - - documents_with_scores = [] - for row in results: - metadata = ( - row[self.metadata_json_column] - if self.metadata_json_column and row[self.metadata_json_column] - else {} - ) - for col in self.metadata_columns: - metadata[col] = row[col] - documents_with_scores.append( - ( - Document( - page_content=row[self.content_column], - metadata=metadata, - id=str(row[self.id_column]), - ), - row["distance"], - ) - ) - - return documents_with_scores - - async def amax_marginal_relevance_search( - self, - query: str, - k: Optional[int] = None, - fetch_k: Optional[int] = None, - lambda_mult: Optional[float] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[Document]: - """Return docs selected using the maximal marginal relevance.""" - embedding = self.embedding_service.embed_query(text=query) - - return await self.amax_marginal_relevance_search_by_vector( - embedding=embedding, - k=k, - fetch_k=fetch_k, - lambda_mult=lambda_mult, - filter=filter, - **kwargs, - ) - - async def amax_marginal_relevance_search_by_vector( - self, - embedding: list[float], - k: Optional[int] = None, - fetch_k: Optional[int] = None, - lambda_mult: Optional[float] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[Document]: - """Return docs selected using the maximal marginal relevance.""" - docs_and_scores = ( - await self.amax_marginal_relevance_search_with_score_by_vector( - embedding, - k=k, - fetch_k=fetch_k, - lambda_mult=lambda_mult, - filter=filter, - **kwargs, - ) - ) - - return [result[0] for result in docs_and_scores] - - async def amax_marginal_relevance_search_with_score_by_vector( - self, - embedding: list[float], - k: Optional[int] = None, - 
fetch_k: Optional[int] = None, - lambda_mult: Optional[float] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[tuple[Document, float]]: - """Return docs and distance scores selected using the maximal marginal relevance.""" - results = await self.__query_collection( - embedding=embedding, k=fetch_k, filter=filter, **kwargs - ) - - k = k if k else self.k - fetch_k = fetch_k if fetch_k else self.fetch_k - lambda_mult = lambda_mult if lambda_mult else self.lambda_mult - embedding_list = [json.loads(row[self.embedding_column]) for row in results] - mmr_selected = utils.maximal_marginal_relevance( - np.array(embedding, dtype=np.float32), - embedding_list, - k=k, - lambda_mult=lambda_mult, - ) - - documents_with_scores = [] - for row in results: - metadata = ( - row[self.metadata_json_column] - if self.metadata_json_column and row[self.metadata_json_column] - else {} - ) - for col in self.metadata_columns: - metadata[col] = row[col] - documents_with_scores.append( - ( - Document( - page_content=row[self.content_column], - metadata=metadata, - id=str(row[self.id_column]), - ), - row["distance"], - ) - ) - - return [r for i, r in enumerate(documents_with_scores) if i in mmr_selected] - - async def aapply_vector_index( - self, - index: BaseIndex, - name: Optional[str] = None, - concurrently: bool = False, - ) -> None: - """Create an index on the vector store table.""" - if isinstance(index, ExactNearestNeighbor): - await self.adrop_vector_index() - return - - filter = f"WHERE ({index.partial_indexes})" if index.partial_indexes else "" - params = "WITH " + index.index_options() - function = index.distance_strategy.index_function - if name is None: - if index.name == None: - index.name = self.table_name + DEFAULT_INDEX_NAME_SUFFIX - name = index.name - stmt = f'CREATE INDEX {"CONCURRENTLY" if concurrently else ""} {name} ON "{self.schema_name}"."{self.table_name}" USING {index.index_type} ({self.embedding_column} {function}) {params} 
{filter};' - if concurrently: - async with self.pool.connect() as conn: - await conn.execute(text("COMMIT")) - await conn.execute(text(stmt)) - else: - async with self.pool.connect() as conn: - await conn.execute(text(stmt)) - await conn.commit() - - async def areindex(self, index_name: Optional[str] = None) -> None: - """Re-index the vector store table.""" - index_name = index_name or self.table_name + DEFAULT_INDEX_NAME_SUFFIX - query = f"REINDEX INDEX {index_name};" - async with self.pool.connect() as conn: - await conn.execute(text(query)) - await conn.commit() - - async def adrop_vector_index( - self, - index_name: Optional[str] = None, - ) -> None: - """Drop the vector index.""" - index_name = index_name or self.table_name + DEFAULT_INDEX_NAME_SUFFIX - query = f"DROP INDEX IF EXISTS {index_name};" - async with self.pool.connect() as conn: - await conn.execute(text(query)) - await conn.commit() - - async def is_valid_index( - self, - index_name: Optional[str] = None, - ) -> bool: - """Check if index exists in the table.""" - index_name = index_name or self.table_name + DEFAULT_INDEX_NAME_SUFFIX - stmt = f""" - SELECT tablename, indexname - FROM pg_indexes - WHERE tablename = '{self.table_name}' AND schemaname = '{self.schema_name}' AND indexname = '{index_name}'; - """ - async with self.pool.connect() as conn: - result = await conn.execute(text(stmt)) - result_map = result.mappings() - results = result_map.fetchall() - - return bool(len(results) == 1) - - def _handle_field_filter( - self, - field: str, - value: Any, - ) -> str: - """Create a filter for a specific field. 
- Args: - field: name of field - value: value to filter - If provided as is then this will be an equality filter - If provided as a dictionary then this will be a filter, the key - will be the operator and the value will be the value to filter by - Returns: - sql where query as a string - """ - if not isinstance(field, str): - raise ValueError( - f"field should be a string but got: {type(field)} with value: {field}" - ) - - if field.startswith("$"): - raise ValueError( - f"Invalid filter condition. Expected a field but got an operator: " - f"{field}" - ) - - # Allow [a-zA-Z0-9_], disallow $ for now until we support escape characters - if not field.isidentifier(): - raise ValueError( - f"Invalid field name: {field}. Expected a valid identifier." - ) - - if isinstance(value, dict): - # This is a filter specification - if len(value) != 1: - raise ValueError( - "Invalid filter condition. Expected a value which " - "is a dictionary with a single key that corresponds to an operator " - f"but got a dictionary with {len(value)} keys. The first few " - f"keys are: {list(value.keys())[:3]}" - ) - operator, filter_value = list(value.items())[0] - # Verify that that operator is an operator - if operator not in SUPPORTED_OPERATORS: - raise ValueError( - f"Invalid operator: {operator}. 
" - f"Expected one of {SUPPORTED_OPERATORS}" - ) - else: # Then we assume an equality operator - operator = "$eq" - filter_value = value - - if operator in COMPARISONS_TO_NATIVE: - # Then we implement an equality filter - # native is trusted input - if isinstance(filter_value, str): - filter_value = f"'{filter_value}'" - native = COMPARISONS_TO_NATIVE[operator] - return f"({field} {native} {filter_value})" - elif operator == "$between": - # Use AND with two comparisons - low, high = filter_value - - return f"({field} BETWEEN {low} AND {high})" - elif operator in {"$in", "$nin", "$like", "$ilike"}: - # We'll do force coercion to text - if operator in {"$in", "$nin"}: - for val in filter_value: - if not isinstance(val, (str, int, float)): - raise NotImplementedError( - f"Unsupported type: {type(val)} for value: {val}" - ) - - if isinstance(val, bool): # b/c bool is an instance of int - raise NotImplementedError( - f"Unsupported type: {type(val)} for value: {val}" - ) - - if operator in {"$in"}: - values = str(tuple(val for val in filter_value)) - return f"({field} IN {values})" - elif operator in {"$nin"}: - values = str(tuple(val for val in filter_value)) - return f"({field} NOT IN {values})" - elif operator in {"$like"}: - return f"({field} LIKE '{filter_value}')" - elif operator in {"$ilike"}: - return f"({field} ILIKE '{filter_value}')" - else: - raise NotImplementedError() - elif operator == "$exists": - if not isinstance(filter_value, bool): - raise ValueError( - "Expected a boolean value for $exists " - f"operator, but got: {filter_value}" - ) - else: - if filter_value: - return f"({field} IS NOT NULL)" - else: - return f"({field} IS NULL)" - else: - raise NotImplementedError() - - def _create_filter_clause(self, filters: Any) -> str: - """Create LangChain filter representation to matching SQL where clauses - Args: - filters: Dictionary of filters to apply to the query. - Returns: - String containing the sql where query. 
- """ - - if not isinstance(filters, dict): - raise ValueError( - f"Invalid type: Expected a dictionary but got type: {type(filters)}" - ) - if len(filters) == 1: - # The only operators allowed at the top level are $AND, $OR, and $NOT - # First check if an operator or a field - key, value = list(filters.items())[0] - if key.startswith("$"): - # Then it's an operator - if key.lower() not in ["$and", "$or", "$not"]: - raise ValueError( - f"Invalid filter condition. Expected $and, $or or $not " - f"but got: {key}" - ) - else: - # Then it's a field - return self._handle_field_filter(key, filters[key]) - - if key.lower() == "$and" or key.lower() == "$or": - if not isinstance(value, list): - raise ValueError( - f"Expected a list, but got {type(value)} for value: {value}" - ) - op = key[1:].upper() # Extract the operator - filter_clause = [self._create_filter_clause(el) for el in value] - if len(filter_clause) > 1: - return f"({f' {op} '.join(filter_clause)})" - elif len(filter_clause) == 1: - return filter_clause[0] - else: - raise ValueError( - "Invalid filter condition. Expected a dictionary " - "but got an empty dictionary" - ) - elif key.lower() == "$not": - if isinstance(value, list): - not_conditions = [ - self._create_filter_clause(item) for item in value - ] - not_stmts = [f"NOT {condition}" for condition in not_conditions] - return f"({' AND '.join(not_stmts)})" - elif isinstance(value, dict): - not_ = self._create_filter_clause(value) - return f"(NOT {not_})" - else: - raise ValueError( - f"Invalid filter condition. Expected a dictionary " - f"or a list but got: {type(value)}" - ) - else: - raise ValueError( - f"Invalid filter condition. Expected $and, $or or $not " - f"but got: {key}" - ) - elif len(filters) > 1: - # Then all keys have to be fields (they cannot be operators) - for key in filters.keys(): - if key.startswith("$"): - raise ValueError( - f"Invalid filter condition. 
Expected a field but got: {key}" - ) - # These should all be fields and combined using an $and operator - and_ = [self._handle_field_filter(k, v) for k, v in filters.items()] - if len(and_) > 1: - return f"({' AND '.join(and_)})" - elif len(and_) == 1: - return and_[0] - else: - raise ValueError( - "Invalid filter condition. Expected a dictionary " - "but got an empty dictionary" - ) - else: - return "" - - def get_by_ids(self, ids: Sequence[str]) -> list[Document]: - raise NotImplementedError( - "Sync methods are not implemented for AsyncPostgresVectorStore. Use PostgresVectorStore interface instead." - ) - - def similarity_search( - self, - query: str, - k: Optional[int] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[Document]: - raise NotImplementedError( - "Sync methods are not implemented for AsyncPostgresVectorStore. Use PostgresVectorStore interface instead." - ) - - def add_texts( - self, - texts: Iterable[str], - metadatas: Optional[list[dict]] = None, - ids: Optional[list] = None, - **kwargs: Any, - ) -> list[str]: - raise NotImplementedError( - "Sync methods are not implemented for AsyncPostgresVectorStore. Use PostgresVectorStore interface instead." - ) - - def add_documents( - self, - documents: list[Document], - ids: Optional[list] = None, - **kwargs: Any, - ) -> list[str]: - raise NotImplementedError( - "Sync methods are not implemented for AsyncPostgresVectorStore. Use PostgresVectorStore interface instead." - ) - - def delete( - self, - ids: Optional[list] = None, - **kwargs: Any, - ) -> Optional[bool]: - raise NotImplementedError( - "Sync methods are not implemented for AsyncPostgresVectorStore. Use PostgresVectorStore interface instead." 
- ) - - @classmethod - def from_texts( # type: ignore[override] - cls: type[AsyncPostgresVectorStore], - texts: list[str], - embedding: Embeddings, - engine: PostgresEngine, - table_name: str, - metadatas: Optional[list[dict]] = None, - ids: Optional[list] = None, - content_column: str = "content", - embedding_column: str = "embedding", - metadata_columns: list[str] = [], - ignore_metadata_columns: Optional[list[str]] = None, - id_column: str = "langchain_id", - metadata_json_column: str = "langchain_metadata", - **kwargs: Any, - ) -> AsyncPostgresVectorStore: - raise NotImplementedError( - "Sync methods are not implemented for AsyncPostgresVectorStore. Use PostgresVectorStore interface instead." - ) - - @classmethod - def from_documents( # type: ignore[override] - cls: type[AsyncPostgresVectorStore], - documents: list[Document], - embedding: Embeddings, - engine: PostgresEngine, - table_name: str, - ids: Optional[list] = None, - content_column: str = "content", - embedding_column: str = "embedding", - metadata_columns: list[str] = [], - ignore_metadata_columns: Optional[list[str]] = None, - id_column: str = "langchain_id", - metadata_json_column: str = "langchain_metadata", - **kwargs: Any, - ) -> AsyncPostgresVectorStore: - raise NotImplementedError( - "Sync methods are not implemented for AsyncPostgresVectorStore. Use PostgresVectorStore interface instead." - ) - - def similarity_search_with_score( - self, - query: str, - k: Optional[int] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[tuple[Document, float]]: - raise NotImplementedError( - "Sync methods are not implemented for AsyncPostgresVectorStore. Use PostgresVectorStore interface instead." 
- ) - - def similarity_search_by_vector( - self, - embedding: list[float], - k: Optional[int] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[Document]: - raise NotImplementedError( - "Sync methods are not implemented for AsyncPostgresVectorStore. Use PostgresVectorStore interface instead." - ) - - def similarity_search_with_score_by_vector( - self, - embedding: list[float], - k: Optional[int] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[tuple[Document, float]]: - raise NotImplementedError( - "Sync methods are not implemented for AsyncPostgresVectorStore. Use PostgresVectorStore interface instead." - ) - - def max_marginal_relevance_search( - self, - query: str, - k: Optional[int] = None, - fetch_k: Optional[int] = None, - lambda_mult: Optional[float] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[Document]: - raise NotImplementedError( - "Sync methods are not implemented for AsyncPostgresVectorStore. Use PostgresVectorStore interface instead." - ) - - def max_marginal_relevance_search_by_vector( - self, - embedding: list[float], - k: Optional[int] = None, - fetch_k: Optional[int] = None, - lambda_mult: Optional[float] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[Document]: - raise NotImplementedError( - "Sync methods are not implemented for AsyncPostgresVectorStore. Use PostgresVectorStore interface instead." - ) - - def max_marginal_relevance_search_with_score_by_vector( - self, - embedding: list[float], - k: Optional[int] = None, - fetch_k: Optional[int] = None, - lambda_mult: Optional[float] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[tuple[Document, float]]: - raise NotImplementedError( - "Sync methods are not implemented for AsyncPostgresVectorStore. Use PostgresVectorStore interface instead." 
- ) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) diff --git a/src/langchain_google_cloud_sql_pg/vectorstore.py b/src/langchain_google_cloud_sql_pg/vectorstore.py index f5333fd6..75598b85 100644 --- a/src/langchain_google_cloud_sql_pg/vectorstore.py +++ b/src/langchain_google_cloud_sql_pg/vectorstore.py @@ -15,12 +15,12 @@ # TODO: Remove below import when minimum supported Python version is 3.10 from __future__ import annotations -from typing import Any, Callable, Iterable, Optional, Sequence +from typing import Optional -import numpy as np -from langchain_core.documents import Document from langchain_core.embeddings import Embeddings -from langchain_core.vectorstores import VectorStore +from langchain_postgres import PGVectorStore + +from langchain_google_cloud_sql_pg import HybridSearchConfig from .async_vectorstore import AsyncPostgresVectorStore from .engine import PostgresEngine @@ -32,41 +32,22 @@ ) -class PostgresVectorStore(VectorStore): +class PostgresVectorStore(PGVectorStore): """Google Cloud SQL for PostgreSQL Vector Store class""" - __create_key = object() - - def __init__( - self, key: object, engine: PostgresEngine, vs: AsyncPostgresVectorStore - ): - """PostgresVectorStore constructor. - Args: - key (object): Prevent direct constructor usage. - engine (PostgresEngine): Connection pool engine for managing connections to Postgres database. - vs (AsyncPostgresVectorstore): The async only VectorStore implementation - - Raises: - Exception: If called directly by user. - """ - if key != PostgresVectorStore.__create_key: - raise Exception( - "Only create class through 'create' or 'create_sync' methods!" 
- ) - - self._engine = engine - self.__vs = vs + _engine: PostgresEngine + __vs: AsyncPostgresVectorStore @classmethod async def create( cls, - engine: PostgresEngine, + engine: PostgresEngine, # type: ignore embedding_service: Embeddings, table_name: str, schema_name: str = "public", content_column: str = "content", embedding_column: str = "embedding", - metadata_columns: list[str] = [], + metadata_columns: Optional[list[str]] = None, ignore_metadata_columns: Optional[list[str]] = None, id_column: str = "langchain_id", metadata_json_column: Optional[str] = "langchain_metadata", @@ -75,6 +56,7 @@ async def create( fetch_k: int = 20, lambda_mult: float = 0.5, index_query_options: Optional[QueryOptions] = None, + hybrid_search_config: Optional[HybridSearchConfig] = None, ) -> PostgresVectorStore: """Create a new PostgresVectorStore instance. @@ -94,6 +76,7 @@ async def create( fetch_k (int): Number of Documents to fetch to pass to MMR algorithm. lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5. index_query_options (QueryOptions): Index query option. + hybrid_search_config (HybridSearchConfig): Hybrid search configuration. Defaults to None. 
Returns: PostgresVectorStore @@ -102,32 +85,33 @@ async def create( engine, embedding_service, table_name, - schema_name, - content_column, - embedding_column, - metadata_columns, - ignore_metadata_columns, - id_column, - metadata_json_column, - distance_strategy, - k, - fetch_k, - lambda_mult, - index_query_options, + schema_name=schema_name, + content_column=content_column, + embedding_column=embedding_column, + metadata_columns=metadata_columns, + ignore_metadata_columns=ignore_metadata_columns, + metadata_json_column=metadata_json_column, + id_column=id_column, + distance_strategy=distance_strategy, + k=k, + fetch_k=fetch_k, + lambda_mult=lambda_mult, + index_query_options=index_query_options, + hybrid_search_config=hybrid_search_config, ) vs = await engine._run_as_async(coro) - return cls(cls.__create_key, engine, vs) + return cls(cls._PGVectorStore__create_key, engine, vs) # type: ignore @classmethod def create_sync( cls, - engine: PostgresEngine, + engine: PostgresEngine, # type: ignore embedding_service: Embeddings, table_name: str, schema_name: str = "public", content_column: str = "content", embedding_column: str = "embedding", - metadata_columns: list[str] = [], + metadata_columns: Optional[list[str]] = None, ignore_metadata_columns: Optional[list[str]] = None, id_column: str = "langchain_id", metadata_json_column: str = "langchain_metadata", @@ -136,6 +120,7 @@ def create_sync( fetch_k: int = 20, lambda_mult: float = 0.5, index_query_options: Optional[QueryOptions] = None, + hybrid_search_config: Optional[HybridSearchConfig] = None, ) -> PostgresVectorStore: """Create a new PostgresVectorStore instance. @@ -155,6 +140,7 @@ def create_sync( fetch_k (int): Number of Documents to fetch to pass to MMR algorithm. lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5. index_query_options (QueryOptions): Index query option. 
+ hybrid_search_config (HybridSearchConfig): Hybrid search configuration. Defaults to None. Returns: PostgresVectorStore @@ -163,661 +149,19 @@ def create_sync( engine, embedding_service, table_name, - schema_name, - content_column, - embedding_column, - metadata_columns, - ignore_metadata_columns, - id_column, - metadata_json_column, - distance_strategy, - k, - fetch_k, - lambda_mult, - index_query_options, + schema_name=schema_name, + content_column=content_column, + embedding_column=embedding_column, + metadata_columns=metadata_columns, + ignore_metadata_columns=ignore_metadata_columns, + metadata_json_column=metadata_json_column, + id_column=id_column, + distance_strategy=distance_strategy, + k=k, + fetch_k=fetch_k, + lambda_mult=lambda_mult, + index_query_options=index_query_options, + hybrid_search_config=hybrid_search_config, ) vs = engine._run_as_sync(coro) - return cls(cls.__create_key, engine, vs) - - @property - def embeddings(self) -> Embeddings: - return self.__vs.embedding_service - - async def aadd_texts( - self, - texts: Iterable[str], - metadatas: Optional[list[dict]] = None, - ids: Optional[list] = None, - **kwargs: Any, - ) -> list[str]: - """Embed texts and add to the table. - - Raises: - :class:`InvalidTextRepresentationError `: if the `ids` data type does not match that of the `id_column`. - """ - return await self._engine._run_as_async( - self.__vs.aadd_texts(texts, metadatas, ids, **kwargs) - ) - - def add_texts( - self, - texts: Iterable[str], - metadatas: Optional[list[dict]] = None, - ids: Optional[list] = None, - **kwargs: Any, - ) -> list[str]: - """Embed texts and add to the table. - - Raises: - :class:`InvalidTextRepresentationError `: if the `ids` data type does not match that of the `id_column`. 
- """ - return self._engine._run_as_sync( - self.__vs.aadd_texts(texts, metadatas, ids, **kwargs) - ) - - async def aadd_documents( - self, - documents: list[Document], - ids: Optional[list] = None, - **kwargs: Any, - ) -> list[str]: - """Embed documents and add to the table. - - Raises: - :class:`InvalidTextRepresentationError `: if the `ids` data type does not match that of the `id_column`. - """ - return await self._engine._run_as_async( - self.__vs.aadd_documents(documents, ids, **kwargs) - ) - - def add_documents( - self, - documents: list[Document], - ids: Optional[list] = None, - **kwargs: Any, - ) -> list[str]: - """Embed documents and add to the table. - - Raises: - :class:`InvalidTextRepresentationError `: if the `ids` data type does not match that of the `id_column`. - """ - return self._engine._run_as_sync( - self.__vs.aadd_documents(documents, ids, **kwargs) - ) - - async def adelete( - self, - ids: Optional[list] = None, - **kwargs: Any, - ) -> Optional[bool]: - """Delete records from the table. - - Raises: - :class:`InvalidTextRepresentationError `: if the `ids` data type does not match that of the `id_column`. - """ - return await self._engine._run_as_async(self.__vs.adelete(ids, **kwargs)) - - def delete( - self, - ids: Optional[list] = None, - **kwargs: Any, - ) -> Optional[bool]: - """Delete records from the table. - - Raises: - :class:`InvalidTextRepresentationError `: if the `ids` data type does not match that of the `id_column`. 
- """ - return self._engine._run_as_sync(self.__vs.adelete(ids, **kwargs)) - - @classmethod - async def afrom_texts( # type: ignore[override] - cls: type[PostgresVectorStore], - texts: list[str], - embedding: Embeddings, - engine: PostgresEngine, - table_name: str, - schema_name: str = "public", - metadatas: Optional[list[dict]] = None, - ids: Optional[list] = None, - content_column: str = "content", - embedding_column: str = "embedding", - metadata_columns: list[str] = [], - ignore_metadata_columns: Optional[list[str]] = None, - id_column: str = "langchain_id", - metadata_json_column: str = "langchain_metadata", - distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, - k: int = 4, - fetch_k: int = 20, - lambda_mult: float = 0.5, - index_query_options: Optional[QueryOptions] = None, - ) -> PostgresVectorStore: - """Create an PostgresVectorStore instance from texts. - - Args: - texts (list[str]): Texts to add to the vector store. - embedding (Embeddings): Text embedding model to use. - engine (PostgresEngine): Connection pool engine for managing connections to Postgres database. - table_name (str): Name of the existing table or the table to be created. - schema_name (str, optional): Database schema name of the table. Defaults to "public". - metadatas (Optional[list[dict]]): List of metadatas to add to table records. - ids: (Optional[list]): List of IDs to add to table records. - content_column (str): Column that represent a Document’s page_content. Defaults to "content". - embedding_column (str): Column for embedding vectors. The embedding is generated from the document value. Defaults to "embedding". - metadata_columns (list[str]): Column(s) that represent a document's metadata. - ignore_metadata_columns (list[str]): Column(s) to ignore in pre-existing tables for a document's metadata. Can not be used with metadata_columns. Defaults to None. - id_column (str): Column that represents the Document's id. Defaults to "langchain_id". 
- metadata_json_column (str): Column to store metadata as JSON. Defaults to "langchain_metadata". - distance_strategy (DistanceStrategy): Distance strategy to use for vector similarity search. Defaults to COSINE_DISTANCE. - k (int): Number of Documents to return from search. Defaults to 4. - fetch_k (int): Number of Documents to fetch to pass to MMR algorithm. - lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5. - index_query_options (QueryOptions): Index query option. - - Raises: - :class:`InvalidTextRepresentationError `: if the `ids` data type does not match that of the `id_column`. - - Returns: - PostgresVectorStore - """ - vs = await cls.create( - engine, - embedding, - table_name, - schema_name, - content_column, - embedding_column, - metadata_columns, - ignore_metadata_columns, - id_column, - metadata_json_column, - distance_strategy, - k, - fetch_k, - lambda_mult, - index_query_options, - ) - await vs.aadd_texts(texts, metadatas=metadatas, ids=ids) - return vs - - @classmethod - async def afrom_documents( # type: ignore[override] - cls: type[PostgresVectorStore], - documents: list[Document], - embedding: Embeddings, - engine: PostgresEngine, - table_name: str, - schema_name: str = "public", - ids: Optional[list] = None, - content_column: str = "content", - embedding_column: str = "embedding", - metadata_columns: list[str] = [], - ignore_metadata_columns: Optional[list[str]] = None, - id_column: str = "langchain_id", - metadata_json_column: str = "langchain_metadata", - distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, - k: int = 4, - fetch_k: int = 20, - lambda_mult: float = 0.5, - index_query_options: Optional[QueryOptions] = None, - ) -> PostgresVectorStore: - """Create an PostgresVectorStore instance from documents. - - Args: - documents (list[Document]): Documents to add to the vector store. 
- embedding (Embeddings): Text embedding model to use. - engine (PostgresEngine): Connection pool engine for managing connections to Postgres database. - table_name (str): Name of the existing table or the table to be created. - schema_name (str, optional): Database schema name of the table. Defaults to "public". - metadatas (Optional[list[dict]]): List of metadatas to add to table records. - ids: (Optional[list]): List of IDs to add to table records. - content_column (str): Column that represent a Document’s page_content. Defaults to "content". - embedding_column (str): Column for embedding vectors. The embedding is generated from the document value. Defaults to "embedding". - metadata_columns (list[str]): Column(s) that represent a document's metadata. - ignore_metadata_columns (list[str]): Column(s) to ignore in pre-existing tables for a document's metadata. Can not be used with metadata_columns. Defaults to None. - id_column (str): Column that represents the Document's id. Defaults to "langchain_id". - metadata_json_column (str): Column to store metadata as JSON. Defaults to "langchain_metadata". - distance_strategy (DistanceStrategy): Distance strategy to use for vector similarity search. Defaults to COSINE_DISTANCE. - k (int): Number of Documents to return from search. Defaults to 4. - fetch_k (int): Number of Documents to fetch to pass to MMR algorithm. - lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5. - index_query_options (QueryOptions): Index query option. - - Raises: - :class:`InvalidTextRepresentationError `: if the `ids` data type does not match that of the `id_column`. 
- - Returns: - PostgresVectorStore - """ - vs = await cls.create( - engine, - embedding, - table_name, - schema_name, - content_column, - embedding_column, - metadata_columns, - ignore_metadata_columns, - id_column, - metadata_json_column, - distance_strategy, - k, - fetch_k, - lambda_mult, - index_query_options, - ) - await vs.aadd_documents(documents, ids=ids) - return vs - - @classmethod - def from_texts( # type: ignore[override] - cls: type[PostgresVectorStore], - texts: list[str], - embedding: Embeddings, - engine: PostgresEngine, - table_name: str, - schema_name: str = "public", - metadatas: Optional[list[dict]] = None, - ids: Optional[list] = None, - content_column: str = "content", - embedding_column: str = "embedding", - metadata_columns: list[str] = [], - ignore_metadata_columns: Optional[list[str]] = None, - id_column: str = "langchain_id", - metadata_json_column: str = "langchain_metadata", - distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, - k: int = 4, - fetch_k: int = 20, - lambda_mult: float = 0.5, - index_query_options: Optional[QueryOptions] = None, - ) -> PostgresVectorStore: - """Create an PostgresVectorStore instance from texts. - - Args: - texts (list[str]): Texts to add to the vector store. - embedding (Embeddings): Text embedding model to use. - engine (PostgresEngine): Connection pool engine for managing connections to Postgres database. - table_name (str): Name of the existing table or the table to be created. - schema_name (str, optional): Database schema name of the table. Defaults to "public". - metadatas (Optional[list[dict]]): List of metadatas to add to table records. - ids: (Optional[list]): List of IDs to add to table records. - content_column (str): Column that represent a Document’s page_content. Defaults to "content". - embedding_column (str): Column for embedding vectors. The embedding is generated from the document value. Defaults to "embedding". 
- metadata_columns (list[str]): Column(s) that represent a document's metadata. - ignore_metadata_columns (list[str]): Column(s) to ignore in pre-existing tables for a document's metadata. Can not be used with metadata_columns. Defaults to None. - id_column (str): Column that represents the Document's id. Defaults to "langchain_id". - metadata_json_column (str): Column to store metadata as JSON. Defaults to "langchain_metadata". - distance_strategy (DistanceStrategy): Distance strategy to use for vector similarity search. Defaults to COSINE_DISTANCE. - k (int): Number of Documents to return from search. Defaults to 4. - fetch_k (int): Number of Documents to fetch to pass to MMR algorithm. - lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5. - index_query_options (QueryOptions): Index query option. - - Raises: - :class:`InvalidTextRepresentationError `: if the `ids` data type does not match that of the `id_column`. 
- - Returns: - PostgresVectorStore - """ - vs = cls.create_sync( - engine, - embedding, - table_name, - schema_name, - content_column, - embedding_column, - metadata_columns, - ignore_metadata_columns, - id_column, - metadata_json_column, - distance_strategy, - k, - fetch_k, - lambda_mult, - index_query_options, - ) - vs.add_texts(texts, metadatas=metadatas, ids=ids) - return vs - - @classmethod - def from_documents( # type: ignore[override] - cls: type[PostgresVectorStore], - documents: list[Document], - embedding: Embeddings, - engine: PostgresEngine, - table_name: str, - schema_name: str = "public", - ids: Optional[list] = None, - content_column: str = "content", - embedding_column: str = "embedding", - metadata_columns: list[str] = [], - ignore_metadata_columns: Optional[list[str]] = None, - id_column: str = "langchain_id", - metadata_json_column: str = "langchain_metadata", - distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, - k: int = 4, - fetch_k: int = 20, - lambda_mult: float = 0.5, - index_query_options: Optional[QueryOptions] = None, - ) -> PostgresVectorStore: - """Create an PostgresVectorStore instance from documents. - - Args: - documents (list[Document]): Documents to add to the vector store. - embedding (Embeddings): Text embedding model to use. - engine (PostgresEngine): Connection pool engine for managing connections to Postgres database. - table_name (str): Name of the existing table or the table to be created. - schema_name (str, optional): Database schema name of the table. Defaults to "public". - metadatas (Optional[list[dict]]): List of metadatas to add to table records. - ids: (Optional[list]): List of IDs to add to table records. - content_column (str): Column that represent a Document’s page_content. Defaults to "content". - embedding_column (str): Column for embedding vectors. The embedding is generated from the document value. Defaults to "embedding". 
- metadata_columns (list[str]): Column(s) that represent a document's metadata. - ignore_metadata_columns (list[str]): Column(s) to ignore in pre-existing tables for a document's metadata. Can not be used with metadata_columns. Defaults to None. - id_column (str): Column that represents the Document's id. Defaults to "langchain_id". - metadata_json_column (str): Column to store metadata as JSON. Defaults to "langchain_metadata". - distance_strategy (DistanceStrategy): Distance strategy to use for vector similarity search. Defaults to COSINE_DISTANCE. - k (int): Number of Documents to return from search. Defaults to 4. - fetch_k (int): Number of Documents to fetch to pass to MMR algorithm. - lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5. - index_query_options (QueryOptions): Index query option. - - Raises: - :class:`InvalidTextRepresentationError `: if the `ids` data type does not match that of the `id_column`. 
- - Returns: - PostgresVectorStore - """ - vs = cls.create_sync( - engine, - embedding, - table_name, - schema_name, - content_column, - embedding_column, - metadata_columns, - ignore_metadata_columns, - id_column, - metadata_json_column, - distance_strategy, - k, - fetch_k, - lambda_mult, - index_query_options, - ) - vs.add_documents(documents, ids=ids) - return vs - - async def asimilarity_search( - self, - query: str, - k: Optional[int] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[Document]: - """Return docs selected by similarity search on query.""" - return await self._engine._run_as_async( - self.__vs.asimilarity_search(query, k, filter, **kwargs) - ) - - def similarity_search( - self, - query: str, - k: Optional[int] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[Document]: - """Return docs selected by similarity search on query.""" - return self._engine._run_as_sync( - self.__vs.asimilarity_search(query, k, filter, **kwargs) - ) - - # Required for (a)similarity_search_with_relevance_scores - def _select_relevance_score_fn(self) -> Callable[[float], float]: - """Select a relevance function based on distance strategy.""" - # Calculate distance strategy provided in vectorstore constructor - if self.__vs.distance_strategy == DistanceStrategy.COSINE_DISTANCE: - return self._cosine_relevance_score_fn - if self.__vs.distance_strategy == DistanceStrategy.INNER_PRODUCT: - return self._max_inner_product_relevance_score_fn - elif self.__vs.distance_strategy == DistanceStrategy.EUCLIDEAN: - return self._euclidean_relevance_score_fn - - async def asimilarity_search_with_score( - self, - query: str, - k: Optional[int] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[tuple[Document, float]]: - """Return docs and distance scores selected by similarity search on query.""" - return await self._engine._run_as_async( - 
self.__vs.asimilarity_search_with_score(query, k, filter, **kwargs) - ) - - def similarity_search_with_score( - self, - query: str, - k: Optional[int] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[tuple[Document, float]]: - """Return docs and distance scores selected by similarity search on query.""" - return self._engine._run_as_sync( - self.__vs.asimilarity_search_with_score(query, k, filter, **kwargs) - ) - - async def asimilarity_search_by_vector( - self, - embedding: list[float], - k: Optional[int] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[Document]: - """Return docs selected by vector similarity search.""" - return await self._engine._run_as_async( - self.__vs.asimilarity_search_by_vector(embedding, k, filter, **kwargs) - ) - - def similarity_search_by_vector( - self, - embedding: list[float], - k: Optional[int] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[Document]: - """Return docs selected by vector similarity search.""" - return self._engine._run_as_sync( - self.__vs.asimilarity_search_by_vector(embedding, k, filter, **kwargs) - ) - - async def asimilarity_search_with_score_by_vector( - self, - embedding: list[float], - k: Optional[int] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[tuple[Document, float]]: - """Return docs and distance scores selected by vector similarity search.""" - return await self._engine._run_as_async( - self.__vs.asimilarity_search_with_score_by_vector( - embedding, k, filter, **kwargs - ) - ) - - def similarity_search_with_score_by_vector( - self, - embedding: list[float], - k: Optional[int] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[tuple[Document, float]]: - """Return docs and distance scores selected by similarity search on vector.""" - return self._engine._run_as_sync( - self.__vs.asimilarity_search_with_score_by_vector( - 
embedding, k, filter, **kwargs - ) - ) - - async def amax_marginal_relevance_search( - self, - query: str, - k: Optional[int] = None, - fetch_k: Optional[int] = None, - lambda_mult: Optional[float] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[Document]: - """Return docs selected using the maximal marginal relevance.""" - return await self._engine._run_as_async( - self.__vs.amax_marginal_relevance_search( - query, k, fetch_k, lambda_mult, filter, **kwargs - ) - ) - - def max_marginal_relevance_search( - self, - query: str, - k: Optional[int] = None, - fetch_k: Optional[int] = None, - lambda_mult: Optional[float] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[Document]: - """Return docs selected using the maximal marginal relevance.""" - return self._engine._run_as_sync( - self.__vs.amax_marginal_relevance_search( - query, k, fetch_k, lambda_mult, filter, **kwargs - ) - ) - - async def amax_marginal_relevance_search_by_vector( - self, - embedding: list[float], - k: Optional[int] = None, - fetch_k: Optional[int] = None, - lambda_mult: Optional[float] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[Document]: - """Return docs selected using the maximal marginal relevance.""" - return await self._engine._run_as_async( - self.__vs.amax_marginal_relevance_search_by_vector( - embedding, k, fetch_k, lambda_mult, filter, **kwargs - ) - ) - - def max_marginal_relevance_search_by_vector( - self, - embedding: list[float], - k: Optional[int] = None, - fetch_k: Optional[int] = None, - lambda_mult: Optional[float] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[Document]: - """Return docs selected using the maximal marginal relevance.""" - return self._engine._run_as_sync( - self.__vs.amax_marginal_relevance_search_by_vector( - embedding, k, fetch_k, lambda_mult, filter, **kwargs - ) - ) - - async def 
amax_marginal_relevance_search_with_score_by_vector( - self, - embedding: list[float], - k: Optional[int] = None, - fetch_k: Optional[int] = None, - lambda_mult: Optional[float] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[tuple[Document, float]]: - """Return docs and distance scores selected using the maximal marginal relevance.""" - return await self._engine._run_as_async( - self.__vs.amax_marginal_relevance_search_with_score_by_vector( - embedding, k, fetch_k, lambda_mult, filter, **kwargs - ) - ) - - def max_marginal_relevance_search_with_score_by_vector( - self, - embedding: list[float], - k: Optional[int] = None, - fetch_k: Optional[int] = None, - lambda_mult: Optional[float] = None, - filter: Optional[dict] | Optional[str] = None, - **kwargs: Any, - ) -> list[tuple[Document, float]]: - """Return docs and distance scores selected using the maximal marginal relevance.""" - return self._engine._run_as_sync( - self.__vs.amax_marginal_relevance_search_with_score_by_vector( - embedding, k, fetch_k, lambda_mult, filter, **kwargs - ) - ) - - async def aapply_vector_index( - self, - index: BaseIndex, - name: Optional[str] = None, - concurrently: bool = False, - ) -> None: - """Create an index on the vector store table.""" - return await self._engine._run_as_async( - self.__vs.aapply_vector_index(index, name, concurrently) - ) - - def apply_vector_index( - self, - index: BaseIndex, - name: Optional[str] = None, - concurrently: bool = False, - ) -> None: - """Create an index on the vector store table.""" - return self._engine._run_as_sync( - self.__vs.aapply_vector_index(index, name, concurrently) - ) - - async def areindex(self, index_name: Optional[str] = None) -> None: - """Re-index the vector store table.""" - return await self._engine._run_as_async(self.__vs.areindex(index_name)) - - def reindex(self, index_name: Optional[str] = None) -> None: - """Re-index the vector store table.""" - return 
self._engine._run_as_sync(self.__vs.areindex(index_name)) - - async def adrop_vector_index( - self, - index_name: Optional[str] = None, - ) -> None: - """Drop the vector index.""" - return await self._engine._run_as_async( - self.__vs.adrop_vector_index(index_name) - ) - - def drop_vector_index( - self, - index_name: Optional[str] = None, - ) -> None: - """Drop the vector index.""" - return self._engine._run_as_sync(self.__vs.adrop_vector_index(index_name)) - - async def ais_valid_index( - self, - index_name: Optional[str] = None, - ) -> bool: - """Check if index exists in the table.""" - return await self._engine._run_as_async(self.__vs.is_valid_index(index_name)) - - def is_valid_index( - self, - index_name: Optional[str] = None, - ) -> bool: - """Check if index exists in the table.""" - return self._engine._run_as_sync(self.__vs.is_valid_index(index_name)) - - async def aget_by_ids(self, ids: Sequence[str]) -> list[Document]: - """Get documents by ids.""" - return await self._engine._run_as_async(self.__vs.aget_by_ids(ids=ids)) - - def get_by_ids(self, ids: Sequence[str]) -> list[Document]: - """Get documents by ids.""" - return self._engine._run_as_sync(self.__vs.aget_by_ids(ids=ids)) + return cls(cls._PGVectorStore__create_key, engine, vs) # type: ignore diff --git a/tests/test_async_vectorstore.py b/tests/test_async_vectorstore.py index 12fb6506..d0e85d0b 100644 --- a/tests/test_async_vectorstore.py +++ b/tests/test_async_vectorstore.py @@ -28,7 +28,7 @@ DEFAULT_TABLE = "test_table" + str(uuid.uuid4()) DEFAULT_TABLE_SYNC = "test_table_sync" + str(uuid.uuid4()) -CUSTOM_TABLE = "test-table-custom" + str(uuid.uuid4()) +CUSTOM_TABLE = "table-custom" + str(uuid.uuid4()) VECTOR_SIZE = 768 embeddings_service = DeterministicFakeEmbedding(size=VECTOR_SIZE) diff --git a/tests/test_async_vectorstore_from_methods.py b/tests/test_async_vectorstore_from_methods.py index 59274f6a..529675c2 100644 --- a/tests/test_async_vectorstore_from_methods.py +++ 
b/tests/test_async_vectorstore_from_methods.py @@ -29,9 +29,7 @@ DEFAULT_TABLE = "test_table" + str(uuid.uuid4()).replace("-", "_") DEFAULT_TABLE_SYNC = "test_table_sync" + str(uuid.uuid4()).replace("-", "_") CUSTOM_TABLE = "test_table_custom" + str(uuid.uuid4()).replace("-", "_") -CUSTOM_TABLE_WITH_INT_ID = "test_table_custom_with_int_it" + str(uuid.uuid4()).replace( - "-", "_" -) +CUSTOM_TABLE_WITH_INT_ID = "custom_int" + str(uuid.uuid4()).replace("-", "_") VECTOR_SIZE = 768 diff --git a/tests/test_async_vectorstore_index.py b/tests/test_async_vectorstore_index.py index 68bc4e72..d45e114f 100644 --- a/tests/test_async_vectorstore_index.py +++ b/tests/test_async_vectorstore_index.py @@ -14,7 +14,6 @@ import os -import sys import uuid import pytest @@ -23,7 +22,10 @@ from langchain_core.embeddings import DeterministicFakeEmbedding from sqlalchemy import text -from langchain_google_cloud_sql_pg import PostgresEngine +from langchain_google_cloud_sql_pg import ( # type: ignore + HybridSearchConfig, + PostgresEngine, +) from langchain_google_cloud_sql_pg.async_vectorstore import AsyncPostgresVectorStore from langchain_google_cloud_sql_pg.indexes import ( DEFAULT_INDEX_NAME_SUFFIX, @@ -32,9 +34,11 @@ IVFFlatIndex, ) -DEFAULT_TABLE = "test_table" + str(uuid.uuid4()).replace("-", "_") -CUSTOM_TABLE = "test_table_custom" + str(uuid.uuid4()).replace("-", "_") -DEFAULT_INDEX_NAME = DEFAULT_TABLE + DEFAULT_INDEX_NAME_SUFFIX +UUID_STR = str(uuid.uuid4()).replace("-", "_") +DEFAULT_TABLE = "table" + UUID_STR +SIMPLE_TABLE = "simple" + UUID_STR +DEFAULT_HYBRID_TABLE = "hybrid" + UUID_STR +DEFAULT_INDEX_NAME = DEFAULT_INDEX_NAME_SUFFIX + UUID_STR VECTOR_SIZE = 768 embeddings_service = DeterministicFakeEmbedding(size=VECTOR_SIZE) @@ -90,11 +94,15 @@ async def engine(self, db_project, db_region, db_instance, db_name): ) yield engine await aexecute(engine, f"DROP TABLE IF EXISTS {DEFAULT_TABLE}") + await aexecute(engine, f"DROP TABLE IF EXISTS {DEFAULT_HYBRID_TABLE}") + await 
aexecute(engine, f"DROP TABLE IF EXISTS {SIMPLE_TABLE}") await engine.close() @pytest_asyncio.fixture(scope="class") async def vs(self, engine): - await engine._ainit_vectorstore_table(DEFAULT_TABLE, VECTOR_SIZE) + await engine._ainit_vectorstore_table( + DEFAULT_TABLE, VECTOR_SIZE, overwrite_existing=True + ) vs = await AsyncPostgresVectorStore.create( engine, embedding_service=embeddings_service, @@ -105,9 +113,26 @@ async def vs(self, engine): await vs.adrop_vector_index() yield vs - async def test_aapply_vector_index(self, vs): + async def test_apply_default_name_vector_index(self, engine): + await engine._ainit_vectorstore_table( + SIMPLE_TABLE, VECTOR_SIZE, overwrite_existing=True + ) + vs = await AsyncPostgresVectorStore.create( + engine, + embedding_service=embeddings_service, + table_name=SIMPLE_TABLE, + ) + await vs.aadd_texts(texts, ids=ids) + await vs.adrop_vector_index() index = HNSWIndex() await vs.aapply_vector_index(index) + assert await vs.is_valid_index() + await vs.adrop_vector_index() + + async def test_aapply_vector_index(self, vs): + await vs.adrop_vector_index(DEFAULT_INDEX_NAME) + index = HNSWIndex(name=DEFAULT_INDEX_NAME) + await vs.aapply_vector_index(index) assert await vs.is_valid_index(DEFAULT_INDEX_NAME) await vs.adrop_vector_index() @@ -115,18 +140,21 @@ async def test_areindex(self, vs): if not await vs.is_valid_index(DEFAULT_INDEX_NAME): index = HNSWIndex() await vs.aapply_vector_index(index) - await vs.areindex() + await vs.areindex(DEFAULT_INDEX_NAME) await vs.areindex(DEFAULT_INDEX_NAME) assert await vs.is_valid_index(DEFAULT_INDEX_NAME) await vs.adrop_vector_index() async def test_dropindex(self, vs): - await vs.adrop_vector_index() + await vs.adrop_vector_index(DEFAULT_INDEX_NAME) result = await vs.is_valid_index(DEFAULT_INDEX_NAME) assert not result async def test_aapply_vector_index_ivfflat(self, vs): - index = IVFFlatIndex(distance_strategy=DistanceStrategy.EUCLIDEAN) + await vs.adrop_vector_index(DEFAULT_INDEX_NAME) + index 
= IVFFlatIndex( + name=DEFAULT_INDEX_NAME, distance_strategy=DistanceStrategy.EUCLIDEAN + ) await vs.aapply_vector_index(index, concurrently=True) assert await vs.is_valid_index(DEFAULT_INDEX_NAME) index = IVFFlatIndex( @@ -136,8 +164,55 @@ async def test_aapply_vector_index_ivfflat(self, vs): await vs.aapply_vector_index(index) assert await vs.is_valid_index("secondindex") await vs.adrop_vector_index("secondindex") - await vs.adrop_vector_index() + await vs.adrop_vector_index(DEFAULT_INDEX_NAME) async def test_is_valid_index(self, vs): is_valid = await vs.is_valid_index("invalid_index") assert is_valid == False + + async def test_aapply_hybrid_search_index_table_without_tsv_column( + self, engine, vs + ): + # overwriting vs to get a hybrid vs + tsv_index_name = "index_without_tsv_column_" + UUID_STR + vs = await AsyncPostgresVectorStore.create( + engine, + embedding_service=embeddings_service, + table_name=DEFAULT_TABLE, + hybrid_search_config=HybridSearchConfig(index_name=tsv_index_name), + ) + is_valid_index = await vs.is_valid_index(tsv_index_name) + assert is_valid_index == False + await vs.aapply_hybrid_search_index() + assert await vs.is_valid_index(tsv_index_name) + await vs.adrop_vector_index(tsv_index_name) + is_valid_index = await vs.is_valid_index(tsv_index_name) + assert is_valid_index == False + + async def test_aapply_hybrid_search_index_table_with_tsv_column(self, engine): + tsv_index_name = "index_with_tsv_column_" + UUID_STR + config = HybridSearchConfig( + tsv_column="tsv_column", + tsv_lang="pg_catalog.english", + index_name=tsv_index_name, + ) + await engine._ainit_vectorstore_table( + DEFAULT_HYBRID_TABLE, + VECTOR_SIZE, + hybrid_search_config=config, + ) + vs = await AsyncPostgresVectorStore.create( + engine, + embedding_service=embeddings_service, + table_name=DEFAULT_HYBRID_TABLE, + hybrid_search_config=config, + ) + is_valid_index = await vs.is_valid_index(tsv_index_name) + assert is_valid_index == False + await 
vs.aapply_hybrid_search_index() + assert await vs.is_valid_index(tsv_index_name) + await vs.areindex(tsv_index_name) + assert await vs.is_valid_index(tsv_index_name) + await vs.adrop_vector_index(tsv_index_name) + is_valid_index = await vs.is_valid_index(tsv_index_name) + assert is_valid_index == False diff --git a/tests/test_async_vectorstore_search.py b/tests/test_async_vectorstore_search.py index 418dbbad..7e4effdf 100644 --- a/tests/test_async_vectorstore_search.py +++ b/tests/test_async_vectorstore_search.py @@ -22,15 +22,22 @@ from metadata_filtering_data import FILTERING_TEST_CASES, METADATAS from sqlalchemy import text -from langchain_google_cloud_sql_pg import Column, PostgresEngine +from langchain_google_cloud_sql_pg import ( # type: ignore + Column, + HybridSearchConfig, + PostgresEngine, + reciprocal_rank_fusion, + weighted_sum_ranking, +) from langchain_google_cloud_sql_pg.async_vectorstore import AsyncPostgresVectorStore from langchain_google_cloud_sql_pg.indexes import DistanceStrategy, HNSWQueryOptions DEFAULT_TABLE = "test_table" + str(uuid.uuid4()).replace("-", "_") CUSTOM_TABLE = "test_table_custom" + str(uuid.uuid4()).replace("-", "_") -CUSTOM_FILTER_TABLE = "test_table_custom_filter" + str(uuid.uuid4()).replace("-", "_") +CUSTOM_FILTER_TABLE = "custom_filter" + str(uuid.uuid4()).replace("-", "_") +HYBRID_SEARCH_TABLE1 = "hybrid1" + str(uuid.uuid4()).replace("-", "_") +HYBRID_SEARCH_TABLE2 = "hybrid2" + str(uuid.uuid4()).replace("-", "_") VECTOR_SIZE = 768 -sync_method_exception_str = "Sync methods are not implemented for AsyncPostgresVectorStore. Use PostgresVectorStore interface instead." embeddings_service = DeterministicFakeEmbedding(size=VECTOR_SIZE) @@ -45,6 +52,19 @@ ] embeddings = [embeddings_service.embed_query("foo") for i in range(len(texts))] +# Documents designed for hybrid search testing +hybrid_docs_content = { + "hs_doc_apple_fruit": "An apple is a sweet and edible fruit produced by an apple tree. 
Apples are very common.", + "hs_doc_apple_tech": "Apple Inc. is a multinational technology company. Their latest tech is amazing.", + "hs_doc_orange_fruit": "The orange is the fruit of various citrus species. Oranges are tasty.", + "hs_doc_generic_tech": "Technology drives innovation in the modern world. Tech is evolving.", + "hs_doc_unrelated_cat": "A fluffy cat sat on a mat quietly observing a mouse.", +} +hybrid_docs = [ + Document(page_content=content, metadata={"doc_id_key": key}) + for key, content in hybrid_docs_content.items() +] + def get_env_var(key: str, desc: str) -> str: v = os.environ.get(key) @@ -92,6 +112,8 @@ async def engine(self, db_project, db_region, db_instance, db_name): await aexecute(engine, f"DROP TABLE IF EXISTS {DEFAULT_TABLE}") await aexecute(engine, f"DROP TABLE IF EXISTS {CUSTOM_TABLE}") await aexecute(engine, f"DROP TABLE IF EXISTS {CUSTOM_FILTER_TABLE}") + await aexecute(engine, f"DROP TABLE IF EXISTS {HYBRID_SEARCH_TABLE1}") + await aexecute(engine, f"DROP TABLE IF EXISTS {HYBRID_SEARCH_TABLE2}") await engine.close() @pytest_asyncio.fixture(scope="class") @@ -170,11 +192,54 @@ async def vs_custom_filter(self, engine): await vs_custom_filter.aadd_documents(filter_docs, ids=ids) yield vs_custom_filter + @pytest_asyncio.fixture(scope="class") + async def vs_hybrid_search_with_tsv_column(self, engine): + hybrid_search_config = HybridSearchConfig( + tsv_column="mycontent_tsv", + tsv_lang="pg_catalog.english", + fts_query="my_fts_query", + fusion_function=reciprocal_rank_fusion, + fusion_function_parameters={ + "rrf_k": 60, + "fetch_top_k": 10, + }, + ) + await engine._ainit_vectorstore_table( + HYBRID_SEARCH_TABLE1, + VECTOR_SIZE, + id_column=Column("myid", "TEXT"), + content_column="mycontent", + embedding_column="myembedding", + metadata_columns=[ + Column("page", "TEXT"), + Column("source", "TEXT"), + Column("doc_id_key", "TEXT"), + ], + metadata_json_column="mymetadata", # ignored + store_metadata=False, + 
hybrid_search_config=hybrid_search_config, + ) + + vs_custom = await AsyncPostgresVectorStore.create( + engine, + embedding_service=embeddings_service, + table_name=HYBRID_SEARCH_TABLE1, + id_column="myid", + content_column="mycontent", + embedding_column="myembedding", + metadata_json_column="mymetadata", + metadata_columns=["doc_id_key"], + index_query_options=HNSWQueryOptions(ef_search=1), + hybrid_search_config=hybrid_search_config, + ) + await vs_custom.aadd_documents(hybrid_docs) + yield vs_custom + async def test_asimilarity_search(self, vs): results = await vs.asimilarity_search("foo", k=1) assert len(results) == 1 assert results == [Document(page_content="foo", id=ids[0])] - results = await vs.asimilarity_search("foo", k=1, filter="content = 'bar'") + results = await vs.asimilarity_search("foo", k=1, filter={"content": "bar"}) assert results == [Document(page_content="bar", id=ids[1])] async def test_asimilarity_search_score(self, vs): @@ -242,7 +307,7 @@ async def test_amax_marginal_relevance_search(self, vs): results = await vs.amax_marginal_relevance_search("bar") assert results[0] == Document(page_content="bar", id=ids[1]) results = await vs.amax_marginal_relevance_search( - "bar", filter="content = 'boo'" + "bar", filter={"content": "boo"} ) assert results[0] == Document(page_content="boo", id=ids[3]) @@ -268,7 +333,7 @@ async def test_similarity_search(self, vs_custom): assert len(results) == 1 assert results == [Document(page_content="foo", id=ids[0])] results = await vs_custom.asimilarity_search( - "foo", k=1, filter="mycontent = 'bar'" + "foo", k=1, filter={"mycontent": "bar"} ) assert results == [Document(page_content="bar", id=ids[1])] @@ -291,7 +356,7 @@ async def test_max_marginal_relevance_search(self, vs_custom): results = await vs_custom.amax_marginal_relevance_search("bar") assert results[0] == Document(page_content="bar", id=ids[1]) results = await vs_custom.amax_marginal_relevance_search( - "bar", filter="mycontent = 'boo'" + "bar", 
filter={"mycontent": "boo"} ) assert results[0] == Document(page_content="boo", id=ids[3]) @@ -326,7 +391,7 @@ async def test_aget_by_ids_custom_vs(self, vs_custom): def test_get_by_ids(self, vs): test_ids = [ids[0]] - with pytest.raises(Exception, match=sync_method_exception_str): + with pytest.raises(Exception): vs.get_by_ids(ids=test_ids) @pytest.mark.parametrize("test_filter, expected_ids", FILTERING_TEST_CASES) @@ -341,3 +406,319 @@ async def test_vectorstore_with_metadata_filters( "meow", k=5, filter=test_filter ) assert [doc.metadata["code"] for doc in docs] == expected_ids, test_filter + + async def test_asimilarity_hybrid_search_rrk(self, vs): + results = await vs.asimilarity_search( + "foo", + k=1, + hybrid_search_config=HybridSearchConfig( + fusion_function=reciprocal_rank_fusion + ), + ) + assert len(results) == 1 + assert results == [Document(page_content="foo", id=ids[0])] + + results = await vs.asimilarity_search( + "bar", + k=1, + filter={"content": {"$ne": "baz"}}, + hybrid_search_config=HybridSearchConfig( + fusion_function=reciprocal_rank_fusion, + fusion_function_parameters={ + "rrf_k": 100, + "fetch_top_k": 10, + }, + primary_top_k=1, + secondary_top_k=1, + ), + ) + assert results == [Document(page_content="bar", id=ids[1])] + + async def test_hybrid_search_weighted_sum_default( + self, vs_hybrid_search_with_tsv_column + ): + """Test hybrid search with default weighted sum (0.5 vector, 0.5 FTS).""" + query = "apple" # Should match "apple" in FTS and vector + + # The vs_hybrid_search_with_tsv_column instance is already configured for hybrid search. + # Default fusion is weighted_sum_ranking with 0.5/0.5 weights. + # fts_query will default to the main query. 
+ results_with_scores = ( + await vs_hybrid_search_with_tsv_column.asimilarity_search_with_score( + query, k=3 + ) + ) + + assert len(results_with_scores) > 1 + result_ids = [doc.metadata["doc_id_key"] for doc, score in results_with_scores] + + # Expect "hs_doc_apple_fruit" and "hs_doc_apple_tech" to be highly ranked. + assert "hs_doc_apple_fruit" in result_ids + + # Scores should be floats (fused scores) + for doc, score in results_with_scores: + assert isinstance(score, float) + + # Check if sorted by score (descending for weighted_sum_ranking with positive scores) + assert results_with_scores[0][1] >= results_with_scores[1][1] + + async def test_hybrid_search_weighted_sum_vector_bias( + self, vs_hybrid_search_with_tsv_column + ): + """Test weighted sum with higher weight for vector results.""" + query = "Apple Inc technology" # More specific for vector similarity + + config = HybridSearchConfig( + tsv_column="mycontent_tsv", # Must match table setup + fusion_function_parameters={ + "primary_results_weight": 0.8, # Vector bias + "secondary_results_weight": 0.2, + }, + # fts_query will default to main query + ) + results = await vs_hybrid_search_with_tsv_column.asimilarity_search( + query, k=2, hybrid_search_config=config + ) + result_ids = [doc.metadata["doc_id_key"] for doc in results] + + assert len(result_ids) > 0 + assert result_ids[0] == "hs_doc_orange_fruit" + + async def test_hybrid_search_weighted_sum_fts_bias( + self, vs_hybrid_search_with_tsv_column + ): + """Test weighted sum with higher weight for FTS results.""" + query = "fruit common tasty" # Strong FTS signal for fruit docs + + config = HybridSearchConfig( + tsv_column="mycontent_tsv", + fusion_function=weighted_sum_ranking, + fusion_function_parameters={ + "primary_results_weight": 0.01, + "secondary_results_weight": 0.99, # FTS bias + }, + ) + results = await vs_hybrid_search_with_tsv_column.asimilarity_search( + query, k=2, hybrid_search_config=config + ) + result_ids = 
[doc.metadata["doc_id_key"] for doc in results] + + assert len(result_ids) == 2 + assert "hs_doc_apple_fruit" in result_ids + + async def test_hybrid_search_reciprocal_rank_fusion( + self, vs_hybrid_search_with_tsv_column + ): + """Test hybrid search with Reciprocal Rank Fusion.""" + query = "technology company" + + # Configure RRF. primary_top_k and secondary_top_k control inputs to fusion. + # fusion_function_parameters.fetch_top_k controls output count from RRF. + config = HybridSearchConfig( + tsv_column="mycontent_tsv", + fusion_function=reciprocal_rank_fusion, + primary_top_k=3, # How many dense results to consider + secondary_top_k=3, # How many sparse results to consider + fusion_function_parameters={ + "rrf_k": 60, + "fetch_top_k": 2, + }, # RRF specific params + ) + # The `k` in asimilarity_search here is the final desired number of results, + # which should align with fusion_function_parameters.fetch_top_k for RRF. + results = await vs_hybrid_search_with_tsv_column.asimilarity_search( + query, k=2, hybrid_search_config=config + ) + result_ids = [doc.metadata["doc_id_key"] for doc in results] + + assert len(result_ids) == 2 + # "hs_doc_apple_tech" (FTS: technology, company; Vector: Apple Inc technology) + # "hs_doc_generic_tech" (FTS: technology; Vector: Technology drives innovation) + # RRF should combine these ranks. "hs_doc_apple_tech" is likely higher. + assert "hs_doc_apple_tech" in result_ids + assert result_ids[0] == "hs_doc_apple_tech" # Stronger combined signal + + async def test_hybrid_search_explicit_fts_query( + self, vs_hybrid_search_with_tsv_column + ): + """Test hybrid search when fts_query in HybridSearchConfig is different from main query.""" + main_vector_query = "Apple Inc." 
# For vector search + fts_specific_query = "fruit" # For FTS + + config = HybridSearchConfig( + tsv_column="mycontent_tsv", + fts_query=fts_specific_query, # Override FTS query + fusion_function_parameters={ # Using default weighted_sum_ranking + "primary_results_weight": 0.5, + "secondary_results_weight": 0.5, + }, + ) + results = await vs_hybrid_search_with_tsv_column.asimilarity_search( + main_vector_query, k=2, hybrid_search_config=config + ) + result_ids = [doc.metadata["doc_id_key"] for doc in results] + + # Vector search for "Apple Inc.": hs_doc_apple_tech + # FTS search for "fruit": hs_doc_apple_fruit, hs_doc_orange_fruit + # Combined: hs_doc_apple_fruit (strong FTS) and hs_doc_apple_tech (strong vector) are candidates. + # "hs_doc_apple_fruit" might get a boost if "Apple Inc." vector has some similarity to "apple fruit" doc. + assert len(result_ids) > 0 + assert ( + "hs_doc_apple_fruit" in result_ids + or "hs_doc_apple_tech" in result_ids + or "hs_doc_orange_fruit" in result_ids + ) + + async def test_hybrid_search_with_filter(self, vs_hybrid_search_with_tsv_column): + """Test hybrid search with a metadata filter applied.""" + query = "apple" + # Filter to only include "tech" related apple docs using metadata + # Assuming metadata_columns=["doc_id_key"] was set up for vs_hybrid_search_with_tsv_column + doc_filter = {"doc_id_key": {"$eq": "hs_doc_apple_tech"}} + + config = HybridSearchConfig( + tsv_column="mycontent_tsv", + ) + results = await vs_hybrid_search_with_tsv_column.asimilarity_search( + query, k=2, filter=doc_filter, hybrid_search_config=config + ) + result_ids = [doc.metadata["doc_id_key"] for doc in results] + + assert len(results) == 1 + assert result_ids[0] == "hs_doc_apple_tech" + + async def test_hybrid_search_fts_empty_results( + self, vs_hybrid_search_with_tsv_column + ): + """Test when FTS query yields no results, should fall back to vector search.""" + vector_query = "apple" + no_match_fts_query = "zzyyxx_gibberish_term_for_fts_nomatch" 
+ + config = HybridSearchConfig( + tsv_column="mycontent_tsv", + fts_query=no_match_fts_query, + fusion_function_parameters={ + "primary_results_weight": 0.6, + "secondary_results_weight": 0.4, + }, + ) + results = await vs_hybrid_search_with_tsv_column.asimilarity_search( + vector_query, k=2, hybrid_search_config=config + ) + result_ids = [doc.metadata["doc_id_key"] for doc in results] + + # Expect results based purely on vector search for "apple" + assert len(result_ids) > 0 + assert "hs_doc_apple_fruit" in result_ids or "hs_doc_apple_tech" in result_ids + # With DeterministicFakeEmbedding the top-ranked document is "hs_doc_unrelated_cat" + assert results[0].metadata["doc_id_key"].startswith("hs_doc_unrelated_cat") + + async def test_hybrid_search_vector_empty_results_effectively( + self, vs_hybrid_search_with_tsv_column + ): + """Test when vector query is very dissimilar to docs, should rely on FTS.""" + # This is hard to guarantee with fake embeddings, but we try. + # A better way might be to use a filter that excludes all docs for the vector part, + # but filters are applied to both. + vector_query_far_off = "supercalifragilisticexpialidocious_vector_nomatch" + fts_query_match = "orange fruit" # Should match hs_doc_orange_fruit + + config = HybridSearchConfig( + tsv_column="mycontent_tsv", + fts_query=fts_query_match, + fusion_function_parameters={ + "primary_results_weight": 0.4, + "secondary_results_weight": 0.6, + }, + ) + results = await vs_hybrid_search_with_tsv_column.asimilarity_search( + vector_query_far_off, k=1, hybrid_search_config=config + ) + result_ids = [doc.metadata["doc_id_key"] for doc in results] + + # NOTE(review): with the fake embeddings the fused top result is "hs_doc_generic_tech", not the FTS match + assert len(result_ids) == 1 + assert result_ids[0] == "hs_doc_generic_tech" + + async def test_hybrid_search_without_tsv_column(self, engine): + """Test hybrid search without a TSV column.""" + # This is hard to guarantee with fake embeddings, but we try. 
+ # A better way might be to use a filter that excludes all docs for the vector part, + # but filters are applied to both. + vector_query_far_off = "apple iphone tech is better designed than macs" + fts_query_match = "apple fruit" + + config = HybridSearchConfig( + tsv_column="mycontent_tsv", + fts_query=fts_query_match, + fusion_function_parameters={ + "primary_results_weight": 0.1, + "secondary_results_weight": 0.9, + }, + ) + await engine._ainit_vectorstore_table( + HYBRID_SEARCH_TABLE2, + VECTOR_SIZE, + id_column=Column("myid", "TEXT"), + content_column="mycontent", + embedding_column="myembedding", + metadata_columns=[ + Column("page", "TEXT"), + Column("source", "TEXT"), + Column("doc_id_key", "TEXT"), + ], + store_metadata=False, + hybrid_search_config=config, + ) + + vs_with_tsv_column = await AsyncPostgresVectorStore.create( + engine, + embedding_service=embeddings_service, + table_name=HYBRID_SEARCH_TABLE2, + id_column="myid", + content_column="mycontent", + embedding_column="myembedding", + metadata_columns=["doc_id_key"], + index_query_options=HNSWQueryOptions(ef_search=1), + hybrid_search_config=config, + ) + await vs_with_tsv_column.aadd_documents(hybrid_docs) + + config = HybridSearchConfig( + tsv_column="", # no TSV column + fts_query=fts_query_match, + fusion_function_parameters={ + "primary_results_weight": 0.9, + "secondary_results_weight": 0.1, + }, + ) + vs_without_tsv_column = await AsyncPostgresVectorStore.create( + engine, + embedding_service=embeddings_service, + table_name=HYBRID_SEARCH_TABLE2, + id_column="myid", + content_column="mycontent", + embedding_column="myembedding", + metadata_columns=["doc_id_key"], + index_query_options=HNSWQueryOptions(ef_search=1), + hybrid_search_config=config, + ) + + results_with_tsv_column = await vs_with_tsv_column.asimilarity_search( + vector_query_far_off, k=1, hybrid_search_config=config + ) + results_without_tsv_column = await vs_without_tsv_column.asimilarity_search( + vector_query_far_off, k=1, 
hybrid_search_config=config + ) + result_ids_with_tsv_column = [ + doc.metadata["doc_id_key"] for doc in results_with_tsv_column + ] + result_ids_without_tsv_column = [ + doc.metadata["doc_id_key"] for doc in results_without_tsv_column + ] + + # Both stores should rank "hs_doc_apple_tech" first for this query + assert len(result_ids_with_tsv_column) == 1 + assert len(result_ids_without_tsv_column) == 1 + assert result_ids_with_tsv_column[0] == "hs_doc_apple_tech" + assert result_ids_without_tsv_column[0] == "hs_doc_apple_tech" diff --git a/tests/test_engine.py b/tests/test_engine.py index eaf1b617..4a34c575 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -27,16 +27,18 @@ from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.pool import NullPool -from langchain_google_cloud_sql_pg import Column, PostgresEngine +from langchain_google_cloud_sql_pg import Column, HybridSearchConfig, PostgresEngine DEFAULT_TABLE = "test_table" + str(uuid.uuid4()).replace("-", "_") CUSTOM_TABLE = "test_table_custom" + str(uuid.uuid4()).replace("-", "_") INT_ID_CUSTOM_TABLE = "test_table_custom_int_id" + str(uuid.uuid4()).replace("-", "_") +HYBRID_SEARCH_TABLE = "hybrid" + str(uuid.uuid4()).replace("-", "_") DEFAULT_TABLE_SYNC = "test_table" + str(uuid.uuid4()).replace("-", "_") CUSTOM_TABLE_SYNC = "test_table_custom" + str(uuid.uuid4()).replace("-", "_") INT_ID_CUSTOM_TABLE_SYNC = "test_table_custom_int_id" + str(uuid.uuid4()).replace( "-", "_" ) +HYBRID_SEARCH_TABLE_SYNC = "hybrid_sync" + str(uuid.uuid4()).replace("-", "_") VECTOR_SIZE = 768 embeddings_service = DeterministicFakeEmbedding(size=VECTOR_SIZE) @@ -120,6 +122,7 @@ async def engine(self, db_project, db_region, db_instance, db_name): await aexecute(engine, f'DROP TABLE "{CUSTOM_TABLE}"') await aexecute(engine, f'DROP TABLE "{DEFAULT_TABLE}"') await aexecute(engine, f'DROP TABLE "{INT_ID_CUSTOM_TABLE}"') + await aexecute(engine, f'DROP TABLE "{HYBRID_SEARCH_TABLE}"') await engine.close() async def 
test_engine_args(self, engine): @@ -360,6 +363,31 @@ async def test_ainit_checkpoint_writes_table(self, engine): await aexecute(engine, f'DROP TABLE IF EXISTS "{table_name}"') await aexecute(engine, f'DROP TABLE IF EXISTS "{table_name_writes}"') + async def test_init_table_hybrid_search(self, engine): + await engine.ainit_vectorstore_table( + HYBRID_SEARCH_TABLE, + VECTOR_SIZE, + id_column="uuid", + content_column="my-content", + embedding_column="my_embedding", + metadata_columns=[Column("page", "TEXT"), Column("source", "TEXT")], + store_metadata=True, + hybrid_search_config=HybridSearchConfig(), + ) + stmt = f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{HYBRID_SEARCH_TABLE}';" + results = await afetch(engine, stmt) + expected = [ + {"column_name": "uuid", "data_type": "uuid"}, + {"column_name": "my_embedding", "data_type": "USER-DEFINED"}, + {"column_name": "langchain_metadata", "data_type": "json"}, + {"column_name": "my-content", "data_type": "text"}, + {"column_name": "my-content_tsv", "data_type": "tsvector"}, + {"column_name": "page", "data_type": "text"}, + {"column_name": "source", "data_type": "text"}, + ] + for row in results: + assert row in expected + @pytest.mark.asyncio(scope="module") class TestEngineSync: @@ -403,6 +431,7 @@ async def engine(self, db_project, db_region, db_instance, db_name): await aexecute(engine, f'DROP TABLE "{CUSTOM_TABLE_SYNC}"') await aexecute(engine, f'DROP TABLE "{DEFAULT_TABLE_SYNC}"') await aexecute(engine, f'DROP TABLE "{INT_ID_CUSTOM_TABLE_SYNC}"') + await aexecute(engine, f'DROP TABLE "{HYBRID_SEARCH_TABLE_SYNC}"') await engine.close() async def test_init_table(self, engine): @@ -547,3 +576,28 @@ async def test_init_checkpoints_table(self, engine): assert row in expected await aexecute(engine, f'DROP TABLE IF EXISTS "{table_name}"') await aexecute(engine, f'DROP TABLE IF EXISTS "{table_name_writes}"') + + async def test_init_table_hybrid_search(self, engine): + 
engine.init_vectorstore_table( + HYBRID_SEARCH_TABLE_SYNC, + VECTOR_SIZE, + id_column="uuid", + content_column="my-content", + embedding_column="my_embedding", + metadata_columns=[Column("page", "TEXT"), Column("source", "TEXT")], + store_metadata=True, + hybrid_search_config=HybridSearchConfig(), + ) + stmt = f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{HYBRID_SEARCH_TABLE_SYNC}';" + results = await afetch(engine, stmt) + expected = [ + {"column_name": "uuid", "data_type": "uuid"}, + {"column_name": "my_embedding", "data_type": "USER-DEFINED"}, + {"column_name": "langchain_metadata", "data_type": "json"}, + {"column_name": "my-content", "data_type": "text"}, + {"column_name": "my-content_tsv", "data_type": "tsvector"}, + {"column_name": "page", "data_type": "text"}, + {"column_name": "source", "data_type": "text"}, + ] + for row in results: + assert row in expected diff --git a/tests/test_standard_test_suite.py b/tests/test_standard_test_suite.py index 19c77128..2a853bdd 100644 --- a/tests/test_standard_test_suite.py +++ b/tests/test_standard_test_suite.py @@ -23,8 +23,8 @@ from langchain_google_cloud_sql_pg import Column, PostgresEngine, PostgresVectorStore -DEFAULT_TABLE = "test_table_standard_test_suite" + str(uuid.uuid4()) -DEFAULT_TABLE_SYNC = "test_table_sync_standard_test_suite" + str(uuid.uuid4()) +DEFAULT_TABLE = "test_table" + str(uuid.uuid4()) +DEFAULT_TABLE_SYNC = "test_table_sync" + str(uuid.uuid4()) def get_env_var(key: str, desc: str) -> str: diff --git a/tests/test_vectorstore.py b/tests/test_vectorstore.py index 7995cd63..4e82cab6 100644 --- a/tests/test_vectorstore.py +++ b/tests/test_vectorstore.py @@ -31,7 +31,7 @@ DEFAULT_TABLE = "test_table" + str(uuid.uuid4()) DEFAULT_TABLE_SYNC = "test_table_sync" + str(uuid.uuid4()) -CUSTOM_TABLE = "test-table-custom" + str(uuid.uuid4()) +CUSTOM_TABLE = "custom" + str(uuid.uuid4()) VECTOR_SIZE = 768 embeddings_service = DeterministicFakeEmbedding(size=VECTOR_SIZE) 
diff --git a/tests/test_vectorstore_from_methods.py b/tests/test_vectorstore_from_methods.py index fadf8fc1..5d054dfb 100644 --- a/tests/test_vectorstore_from_methods.py +++ b/tests/test_vectorstore_from_methods.py @@ -29,10 +29,8 @@ DEFAULT_TABLE = "test_table" + str(uuid.uuid4()).replace("-", "_") DEFAULT_TABLE_SYNC = "test_table_sync" + str(uuid.uuid4()).replace("-", "_") CUSTOM_TABLE = "test_table_custom" + str(uuid.uuid4()).replace("-", "_") -CUSTOM_TABLE_WITH_INT_ID = "test_table_with_int_id" + str(uuid.uuid4()).replace( - "-", "_" -) -CUSTOM_TABLE_WITH_INT_ID_SYNC = "test_table_with_int_id" + str(uuid.uuid4()).replace( +CUSTOM_TABLE_WITH_INT_ID = "test_table_int_id" + str(uuid.uuid4()).replace("-", "_") +CUSTOM_TABLE_WITH_INT_ID_SYNC = "test_table_int_id" + str(uuid.uuid4()).replace( "-", "_" ) VECTOR_SIZE = 768 diff --git a/tests/test_vectorstore_index.py b/tests/test_vectorstore_index.py index cb797219..72a99b00 100644 --- a/tests/test_vectorstore_index.py +++ b/tests/test_vectorstore_index.py @@ -31,8 +31,8 @@ IVFFlatIndex, ) -DEFAULT_TABLE = "test_table" + str(uuid.uuid4()).replace("-", "_") -CUSTOM_TABLE = "test_table_custom" + str(uuid.uuid4()).replace("-", "_") +DEFAULT_TABLE = "table" + str(uuid.uuid4()).replace("-", "_") +CUSTOM_TABLE = "custom" + str(uuid.uuid4()).replace("-", "_") DEFAULT_INDEX_NAME = DEFAULT_TABLE + DEFAULT_INDEX_NAME_SUFFIX VECTOR_SIZE = 768 @@ -120,7 +120,7 @@ async def test_areindex(self, vs): if not vs.is_valid_index(DEFAULT_INDEX_NAME): index = HNSWIndex() vs.apply_vector_index(index) - vs.reindex() + vs.reindex(DEFAULT_INDEX_NAME) vs.reindex(DEFAULT_INDEX_NAME) assert vs.is_valid_index(DEFAULT_INDEX_NAME) vs.drop_vector_index(DEFAULT_INDEX_NAME) @@ -201,7 +201,7 @@ async def test_areindex(self, vs): if not await vs.ais_valid_index(DEFAULT_INDEX_NAME): index = HNSWIndex() await vs.aapply_vector_index(index) - await vs.areindex() + await vs.areindex(DEFAULT_INDEX_NAME) await vs.areindex(DEFAULT_INDEX_NAME) assert await 
vs.ais_valid_index(DEFAULT_INDEX_NAME) await vs.adrop_vector_index(DEFAULT_INDEX_NAME) diff --git a/tests/test_vectorstore_search.py b/tests/test_vectorstore_search.py index ae1341ed..963bc41b 100644 --- a/tests/test_vectorstore_search.py +++ b/tests/test_vectorstore_search.py @@ -22,16 +22,21 @@ from metadata_filtering_data import FILTERING_TEST_CASES, METADATAS, NEGATIVE_TEST_CASES from sqlalchemy import text -from langchain_google_cloud_sql_pg import Column, PostgresEngine, PostgresVectorStore +from langchain_google_cloud_sql_pg import ( # type: ignore + Column, + HybridSearchConfig, + PostgresEngine, + PostgresVectorStore, + reciprocal_rank_fusion, + weighted_sum_ranking, +) from langchain_google_cloud_sql_pg.indexes import DistanceStrategy, HNSWQueryOptions -DEFAULT_TABLE = "test_table" + str(uuid.uuid4()).replace("-", "_") -CUSTOM_TABLE = "test_table_custom" + str(uuid.uuid4()).replace("-", "_") -CUSTOM_TABLE_SYNC = "test_table_sync" + str(uuid.uuid4()).replace("-", "_") -CUSTOM_FILTER_TABLE = "test_table_custom_filter" + str(uuid.uuid4()).replace("-", "_") -CUSTOM_FILTER_TABLE_SYNC = "test_table_custom_filter_sync" + str(uuid.uuid4()).replace( - "-", "_" -) +DEFAULT_TABLE = "default" + str(uuid.uuid4()).replace("-", "_") +CUSTOM_TABLE = "custom" + str(uuid.uuid4()).replace("-", "_") +CUSTOM_TABLE_SYNC = "custom_sync" + str(uuid.uuid4()).replace("-", "_") +CUSTOM_FILTER_TABLE = "custom_filter" + str(uuid.uuid4()).replace("-", "_") +CUSTOM_FILTER_TABLE_SYNC = "custom_filter_sync" + str(uuid.uuid4()).replace("-", "_") VECTOR_SIZE = 768 embeddings_service = DeterministicFakeEmbedding(size=VECTOR_SIZE) @@ -191,7 +196,7 @@ async def test_asimilarity_search(self, vs): results = await vs.asimilarity_search("foo", k=1) assert len(results) == 1 assert results == [Document(page_content="foo", id=ids[0])] - results = await vs.asimilarity_search("foo", k=1, filter="content = 'bar'") + results = await vs.asimilarity_search("foo", k=1, filter={"content": "bar"}) assert 
results == [Document(page_content="bar", id=ids[1])] async def test_asimilarity_search_score(self, vs): @@ -252,7 +257,7 @@ async def test_amax_marginal_relevance_search(self, vs): results = await vs.amax_marginal_relevance_search("bar") assert results[0] == Document(page_content="bar", id=ids[1]) results = await vs.amax_marginal_relevance_search( - "bar", filter="content = 'boo'" + "bar", filter={"content": "boo"} ) assert results[0] == Document(page_content="boo", id=ids[3]) @@ -298,6 +303,37 @@ async def test_vectorstore_with_metadata_filters( ) assert [doc.metadata["code"] for doc in docs] == expected_ids, test_filter + async def test_asimilarity_hybrid_search(self, vs): + results = await vs.asimilarity_search( + "foo", k=1, hybrid_search_config=HybridSearchConfig() + ) + assert len(results) == 1 + assert results == [Document(page_content="foo", id=ids[0])] + + results = await vs.asimilarity_search( + "bar", + k=1, + hybrid_search_config=HybridSearchConfig(), + ) + assert results[0] == Document(page_content="bar", id=ids[1]) + + results = await vs.asimilarity_search( + "foo", + k=1, + filter={"content": {"$ne": "baz"}}, + hybrid_search_config=HybridSearchConfig( + fusion_function=weighted_sum_ranking, + fusion_function_parameters={ + "primary_results_weight": 0.1, + "secondary_results_weight": 0.9, + "fetch_top_k": 10, + }, + primary_top_k=1, + secondary_top_k=1, + ), + ) + assert results == [Document(page_content="foo", id=ids[0])] + class TestVectorStoreSearchSync: @pytest.fixture(scope="module") @@ -398,7 +434,7 @@ def test_similarity_search(self, vs_custom): results = vs_custom.similarity_search("foo", k=1) assert len(results) == 1 assert results == [Document(page_content="foo", id=ids[0])] - results = vs_custom.similarity_search("foo", k=1, filter="mycontent = 'bar'") + results = vs_custom.similarity_search("foo", k=1, filter={"mycontent": "bar"}) assert results == [Document(page_content="bar", id=ids[1])] def test_similarity_search_score(self, vs_custom): 
@@ -420,7 +456,7 @@ def test_max_marginal_relevance_search(self, vs_custom): results = vs_custom.max_marginal_relevance_search("bar") assert results[0] == Document(page_content="bar", id=ids[1]) results = vs_custom.max_marginal_relevance_search( - "bar", filter="mycontent = 'boo'" + "bar", filter={"mycontent": "boo"} ) assert results[0] == Document(page_content="boo", id=ids[3]) @@ -465,3 +501,27 @@ def test_metadata_filter_negative_tests(self, vs_custom_filter_sync, test_filter docs = vs_custom_filter_sync.similarity_search( "meow", k=5, filter=test_filter ) + + def test_similarity_hybrid_search(self, vs_custom): + results = vs_custom.similarity_search( + "foo", k=1, hybrid_search_config=HybridSearchConfig() + ) + assert len(results) == 1 + assert results == [Document(page_content="foo", id=ids[0])] + + results = vs_custom.similarity_search( + "bar", + k=1, + hybrid_search_config=HybridSearchConfig(), + ) + assert results == [Document(page_content="bar", id=ids[1])] + + results = vs_custom.similarity_search( + "foo", + k=1, + filter={"mycontent": {"$ne": "baz"}}, + hybrid_search_config=HybridSearchConfig( + fusion_function=reciprocal_rank_fusion + ), + ) + assert results == [Document(page_content="foo", id=ids[0])]