-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adding Azure CosmosDB Mongo vCore as a datastore. (#379)
* Adding mongo vCore as a datastore * updating the readme file
- Loading branch information
1 parent
13a0b03
commit b3459b1
Showing
8 changed files
with
828 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,277 @@ | ||
import logging | ||
import os | ||
|
||
import certifi | ||
import numpy as np | ||
import pymongo | ||
|
||
from pymongo.mongo_client import MongoClient | ||
from abc import ABC, abstractmethod | ||
|
||
from typing import Dict, List, Optional | ||
from datetime import datetime | ||
from datastore.datastore import DataStore | ||
from models.models import ( | ||
DocumentChunk, | ||
DocumentMetadataFilter, | ||
DocumentChunkWithScore, | ||
DocumentMetadataFilter, | ||
QueryResult, | ||
QueryWithEmbedding, | ||
) | ||
from services.date import to_unix_timestamp | ||
|
||
|
||
# Read environment variables for CosmosDB Mongo vCore | ||
AZCOSMOS_API = os.environ.get("AZCOSMOS_API", "mongo-vcore") | ||
AZCOSMOS_CONNSTR = os.environ.get("AZCOSMOS_CONNSTR") | ||
AZCOSMOS_DATABASE_NAME = os.environ.get("AZCOSMOS_DATABASE_NAME") | ||
AZCOSMOS_CONTAINER_NAME = os.environ.get("AZCOSMOS_CONTAINER_NAME") | ||
assert AZCOSMOS_API is not None | ||
assert AZCOSMOS_CONNSTR is not None | ||
assert AZCOSMOS_DATABASE_NAME is not None | ||
assert AZCOSMOS_CONTAINER_NAME is not None | ||
|
||
# OpenAI Ada Embeddings Dimension | ||
VECTOR_DIMENSION = 1536 | ||
|
||
|
||
# Abstract class similar to the original data store that allows API level abstraction | ||
class AzureCosmosDBStoreApi(ABC): | ||
@abstractmethod | ||
async def ensure(self, num_lists, similarity): | ||
raise NotImplementedError | ||
|
||
@abstractmethod | ||
async def upsert_core(self, docId: str, chunks: List[DocumentChunk]) -> List[str]: | ||
raise NotImplementedError | ||
|
||
@abstractmethod | ||
async def query_core(self, query: QueryWithEmbedding) -> List[DocumentChunkWithScore]: | ||
raise NotImplementedError | ||
|
||
@abstractmethod | ||
async def drop_container(self): | ||
raise NotImplementedError | ||
|
||
@abstractmethod | ||
async def delete_filter(self, filter: DocumentMetadataFilter): | ||
raise NotImplementedError | ||
|
||
@abstractmethod | ||
async def delete_ids(self, ids: List[str]): | ||
raise NotImplementedError | ||
|
||
@abstractmethod | ||
async def delete_document_ids(self, documentIds: List[str]): | ||
raise NotImplementedError | ||
|
||
|
||
class MongoStoreApi(AzureCosmosDBStoreApi): | ||
def __init__(self, mongoClient: MongoClient): | ||
self.mongoClient = mongoClient | ||
|
||
@staticmethod | ||
def _get_metadata_filter(filter: DocumentMetadataFilter) -> dict: | ||
returnedFilter: dict = {} | ||
if filter.document_id is not None: | ||
returnedFilter["document_id"] = filter.document_id | ||
if filter.author is not None: | ||
returnedFilter["metadata.author"] = filter.author | ||
if filter.start_date is not None: | ||
returnedFilter["metadata.created_at"] = {"$gt": datetime.fromisoformat(filter.start_date)} | ||
if filter.end_date is not None: | ||
returnedFilter["metadata.created_at"] = {"$lt": datetime.fromisoformat(filter.end_date)} | ||
if filter.source is not None: | ||
returnedFilter["metadata.source"] = filter.source | ||
if filter.source_id is not None: | ||
returnedFilter["metadata.source_id"] = filter.source_id | ||
return returnedFilter | ||
|
||
async def ensure(self, num_lists, similarity): | ||
assert self.mongoClient.is_mongos | ||
self.collection = self.mongoClient[AZCOSMOS_DATABASE_NAME][AZCOSMOS_CONTAINER_NAME] | ||
|
||
indexes = self.collection.index_information() | ||
if indexes.get("embedding_cosmosSearch") is None: | ||
# Ensure the vector index exists. | ||
indexDefs: List[any] = [ | ||
{ | ||
"name": "embedding_cosmosSearch", | ||
"key": {"embedding": "cosmosSearch"}, | ||
"cosmosSearchOptions": { | ||
"kind": "vector-ivf", | ||
"numLists": num_lists, | ||
"similarity": similarity, | ||
"dimensions": VECTOR_DIMENSION, | ||
}, | ||
} | ||
] | ||
self.mongoClient[AZCOSMOS_DATABASE_NAME].command("createIndexes", AZCOSMOS_CONTAINER_NAME, | ||
indexes=indexDefs) | ||
|
||
async def upsert_core(self, docId: str, chunks: List[DocumentChunk]) -> List[str]: | ||
# Until nested doc embedding support is done, treat each chunk as a separate doc. | ||
doc_ids: List[str] = [] | ||
for chunk in chunks: | ||
finalDocChunk: dict = { | ||
"_id": f"doc:{docId}:chunk:{chunk.id}", | ||
"document_id": docId, | ||
'embedding': chunk.embedding, | ||
"text": chunk.text, | ||
"metadata": chunk.metadata.__dict__ | ||
} | ||
|
||
if chunk.metadata.created_at is not None: | ||
finalDocChunk["metadata"]["created_at"] = datetime.fromisoformat(chunk.metadata.created_at) | ||
self.collection.insert_one(finalDocChunk) | ||
doc_ids.append(finalDocChunk["_id"]) | ||
return doc_ids | ||
|
||
async def query_core(self, query: QueryWithEmbedding) -> List[DocumentChunkWithScore]: | ||
pipeline = [ | ||
{ | ||
"$search": { | ||
"cosmosSearch": { | ||
"vector": query.embedding, | ||
"path": "embedding", | ||
"k": query.top_k}, | ||
"returnStoredSource": True} | ||
}, | ||
{ | ||
"$project": { | ||
"similarityScore": { | ||
"$meta": "searchScore" | ||
}, | ||
"document": "$$ROOT" | ||
} | ||
} | ||
] | ||
|
||
# TODO: Add in match filter (once it can be satisfied). | ||
# Perform vector search | ||
query_results: List[DocumentChunkWithScore] = [] | ||
for aggResult in self.collection.aggregate(pipeline): | ||
finalMetadata = aggResult["document"]["metadata"] | ||
if finalMetadata["created_at"] is not None: | ||
finalMetadata["created_at"] = datetime.isoformat(finalMetadata["created_at"]) | ||
result = DocumentChunkWithScore( | ||
id=aggResult["_id"], | ||
score=aggResult["similarityScore"], | ||
text=aggResult["document"]["text"], | ||
metadata=finalMetadata | ||
) | ||
query_results.append(result) | ||
return query_results | ||
|
||
async def drop_container(self): | ||
self.collection.drop() | ||
|
||
async def delete_filter(self, filter: DocumentMetadataFilter): | ||
delete_filter = self._get_metadata_filter(filter) | ||
self.collection.delete_many(delete_filter) | ||
|
||
async def delete_ids(self, ids: List[str]): | ||
self.collection.delete_many({"_id": {"$in": ids}}) | ||
|
||
async def delete_document_ids(self, documentIds: List[str]): | ||
self.collection.delete_many({"document_id": {"$in": documentIds}}) | ||
|
||
|
||
# Datastore implementation. | ||
""" | ||
A class representing a memory store for Azure CosmosDB DataStore, currently only supports Mongo vCore | ||
""" | ||
class AzureCosmosDBDataStore(DataStore): | ||
def __init__(self, cosmosStore: AzureCosmosDBStoreApi): | ||
self.cosmosStore = cosmosStore | ||
|
||
""" | ||
Creates a new datastore based on the Cosmos Api provided in the environment variables, | ||
only supports Mongo vCore for now | ||
Args: | ||
numLists (int) : This integer is the number of clusters that the inverted file (IVF) index | ||
uses to group the vector data. We recommend that numLists is set to | ||
documentCount/1000 for up to 1 million documents and to sqrt(documentCount) | ||
for more than 1 million documents. Using a numLists value of 1 is akin to | ||
performing brute-force search, which has limited performance. | ||
similarity (str) : Similarity metric to use with the IVF index. Possible options are COS (cosine distance), | ||
L2 (Euclidean distance), and IP (inner product). | ||
""" | ||
@staticmethod | ||
async def create(num_lists, similarity) -> DataStore: | ||
|
||
# Create underlying data store based on the API definition. | ||
# Right now this only supports Mongo, but set up to support more. | ||
apiStore: AzureCosmosDBStoreApi = None | ||
if AZCOSMOS_API == "mongo-vcore": | ||
mongoClient = MongoClient(AZCOSMOS_CONNSTR) | ||
apiStore = MongoStoreApi(mongoClient) | ||
else: | ||
raise NotImplementedError | ||
|
||
await apiStore.ensure(num_lists, similarity) | ||
store = AzureCosmosDBDataStore(apiStore) | ||
return store | ||
|
||
async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]: | ||
""" | ||
Takes in a list of list of document chunks and inserts them into the database. | ||
Return a list of document ids. | ||
""" | ||
# Initialize a list of ids to return | ||
doc_ids: List[str] = [] | ||
for doc_id, chunk_list in chunks.items(): | ||
returnedIds = await self.cosmosStore.upsert_core(doc_id, chunk_list) | ||
for returnedId in returnedIds: | ||
doc_ids.append(returnedId) | ||
return doc_ids | ||
|
||
async def _query( | ||
self, | ||
queries: List[QueryWithEmbedding], | ||
) -> List[QueryResult]: | ||
""" | ||
Takes in a list of queries with embeddings and filters and | ||
returns a list of query results with matching document chunks and scores. | ||
""" | ||
# Prepare query responses and results object | ||
results: List[QueryResult] = [] | ||
|
||
# Gather query results in a pipeline | ||
logging.info(f"Gathering {len(queries)} query results", flush=True) | ||
for query in queries: | ||
logging.info(f"Query: {query.query}") | ||
query_results = await self.cosmosStore.query_core(query) | ||
|
||
# Add to overall results | ||
results.append(QueryResult(query=query.query, results=query_results)) | ||
return results | ||
|
||
async def delete( | ||
self, | ||
ids: Optional[List[str]] = None, | ||
filter: Optional[DocumentMetadataFilter] = None, | ||
delete_all: Optional[bool] = None, | ||
) -> bool: | ||
""" | ||
Removes vectors by ids, filter, or everything in the datastore. | ||
Returns whether the operation was successful. | ||
""" | ||
if delete_all: | ||
# fast path - truncate/delete all items. | ||
await self.cosmosStore.drop_container() | ||
return True | ||
|
||
if filter: | ||
if filter.document_id is not None: | ||
await self.cosmosStore.delete_document_ids([filter.document_id]) | ||
else: | ||
await self.cosmosStore.delete_filter(filter) | ||
|
||
if ids: | ||
await self.cosmosStore.delete_ids(ids) | ||
|
||
return True |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
# Azure Cosmos DB | ||
|
||
[Azure Cosmos DB](https://azure.microsoft.com/en-us/products/cosmos-db/) Azure Cosmos DB is a fully managed NoSQL and relational database for modern app development. Using Azure Cosmos DB for MongoDB vCore, you can store vector embeddings in your documents and perform [vector similarity search](https://learn.microsoft.com/azure/cosmos-db/mongodb/vcore/vector-search) on a fully managed MongoDB-compatible database service. | ||
|
||
Learn more about Azure Cosmos DB for MongoDB vCore [here](https://learn.microsoft.com/azure/cosmos-db/mongodb/vcore/). If you don't have an Azure account, you can start setting one up [here](https://azure.microsoft.com/). | ||
|
||
## Environment variables | ||
|
||
| Name | Required | Description | Default | | ||
| ---------------------------- | -------- |-------------------------------------------------------------------------| ------------------- | | ||
| `DATASTORE` | Yes | Datastore name, set to `azurecosmosdb` | | | ||
| `BEARER_TOKEN` | Yes | Secret token | | | ||
| `OPENAI_API_KEY` | Yes | OpenAI API key | | | ||
| `AZCOSMOS_API` | Yes | Name of the API you're connecting to. Currently supported `mongo-vcore` | | | ||
| `AZCOSMOS_CONNSTR` | Yes | The connection string to your account. | | | ||
| `AZCOSMOS_DATABASE_NAME` | Yes | The database where the data is stored/queried | | | ||
| `AZCOSMOS_CONTAINER_NAME` | Yes | The container where the data is stored/queried | | | ||
|
||
## Indexing | ||
On first insert, the datastore will create the collection and index if necessary on the field `embedding`. Currently hybrid search is not yet supported. |
Oops, something went wrong.