Skip to content

Atlas search lookups #325

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open

Atlas search lookups #325

wants to merge 10 commits into from

Conversation

WaVEV
Copy link
Collaborator

@WaVEV WaVEV commented Jun 24, 2025

This PR adds the initial implementation of the Atlas operator.

Task:

  • Operators
  • Combinable
  • Vector search
  • Score
  • Docs
  • EmbeddedDocument operator

@WaVEV WaVEV force-pushed the atlas-search-lookups branch from 449b6a3 to ca8a7cf Compare June 26, 2025 02:56
@@ -207,9 +243,36 @@ def _build_aggregation_pipeline(self, ids, group):
pipeline.append({"$unset": "_id"})
return pipeline

def _compound_searches_queries(self, search_replacements):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to preserve this function for the future, probably want to make hybrid search and this part of the code could be useful. I know that it is weird, check the replacement len as 1 and then iterate over it. Also the exception could be raised before this point. Let me know if you want me to refactor this code.

@WaVEV WaVEV force-pushed the atlas-search-lookups branch 3 times, most recently from 9935b25 to a467a57 Compare July 12, 2025 23:32
@WaVEV WaVEV changed the title [WIP] Atlas search lookups Atlas search lookups Jul 14, 2025
@WaVEV WaVEV force-pushed the atlas-search-lookups branch 4 times, most recently from ea2118b to 206b554 Compare July 21, 2025 19:29
Comment on lines 82 to 86
def _tear_down(self, model):
collection = self._get_collection(model)
for search_indexes in collection.list_search_indexes():
collection.drop_search_index(search_indexes["name"])
collection.delete_many({})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a comment explaining why this is necessary?

Copy link
Collaborator Author

@WaVEV WaVEV Jul 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Between test the data persist, is this the way to get rid of it? or I am missing something? in the same test class

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I need because TransactionTestCase. it does not wrap each test in a transaction that gets rolled back. But not 100% sure.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TransactionTestCase, and TestCase when transactions aren't supported, use flush to clear the database between tests. flush uses delete_many(), so yes, it's necessary to clean up the indexes but not the collection. I think create_search_index could add the cleanup collection.drop_search_index(search_indexes["name"]) (or something similar), so that the list_search_indexes() isn't needed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will try to fix it. But If I remove this line, some test fails because the data from the previous test is still in the collection.

Comment on lines 94 to 110
self.create_search_index(
Article,
"equals_headline_index",
{
"mappings": {
"dynamic": False,
"fields": {"headline": {"type": "token"}, "number": {"type": "number"}},
}
},
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we do the index creation/teardown in setupClass? (I would guess indexes aren't modified by any tests?)

def test_constant_score(self):
constant_score = SearchScoreOption({"constant": {"value": 10}})
qs = Article.objects.annotate(score=SearchExists(path="body", score=constant_score))
self.wait_for_assertion(lambda: self.assertCountEqual(qs.all(), [self.article]))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I like that wait_for_assertion is a relatively generic API, it really seems like a lot of boilerplate with lambda, all(), ... We may want to think about possibly providing some public test class mixin with assertion helpers for users (which we could also use in this file).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to do something like you mention and I didn't find a solution, but I will try again.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well I tried some delayed assert, it is not perfect but usable.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More generally, what's the reason the query needs to be fetched this way? Executing the same query a few times in a row doesn't return the correct results until some time?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 . At some point, Atlas will have synchronized the new data. Then, the query will retrieve it, so we need to wait until the new objects are available.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any MongoDB documentation about this? I don't see any mention of have to retry in the example at https://www.mongodb.com/docs/atlas/atlas-search/tutorial/. It seems unbelievable from a usability perspective. How are querysets going to be used outside of tests? Do we need to document a special pattern? There is no distinction between "no results" and "query hasn't synced yet"?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 I summon @Jibola to avoid saying something that is not true. What I tried to tell is when a new index is created or data added there is a little time between it get indexed. If I do a query immediately after a new index, it will retrieve nothing, but If I wait a second the value will be pulled correctly. So, this delay that indexes needs, I don't know if it is documented but I got the idea from langchain

Maybe only the index creation needs time, but I don't know. 😬.
For Docarray the same was done:
https://github.com/docarray/docarray/blob/main/tests/index/mongo_atlas/__init__.py#L32

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whew, that makes a lot more sense than the previous theory! Depending on how long the waiting could take, we may want to consider having SchemaEditor.add_index() do the waiting, since Django migrations assume all operations run synchronously, since a data migration that follows a schema migration assumes that the previous operations have completed. (If not, it would be a caveat to document.) If we do have schema editor wait, you could use it to create the indexes in tests. If not, I guess waiting after test index creation is the way go.

Copy link
Collaborator Author

@WaVEV WaVEV Jul 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, I found the docs. it says: This means that data inserted into a MongoDB collection and indexed by Atlas Search will not be available immediately for $search queries.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whew, that makes a lot more sense than the previous theory! Depending on how long the waiting could take, we may want to consider having SchemaEditor.add_index() do the waiting, since Django migrations assume all operations run synchronously, since a data migration that follows a schema migration assumes that the previous operations have completed. (If not, it would be a caveat to document.) If we do have schema editor wait, you could use it to create the indexes in tests. If not, I guess waiting after test index creation is the way go.

Totally get the confusion here! It bamboozled me too the first time I ran into the problem.

I would say that having the SchemaEditor wait is not a bad idea! In practice, I don't see many scenarios (please inform me if otherwise!) where someone makes a migration and within 5 seconds begins iterating -- outside of tests -- but I would want it "flaggable" if at all possible.

@WaVEV WaVEV force-pushed the atlas-search-lookups branch from 206b554 to 5b83202 Compare July 22, 2025 02:31
@WaVEV WaVEV force-pushed the atlas-search-lookups branch 2 times, most recently from 4135c7f to 456028d Compare July 22, 2025 03:36
@WaVEV WaVEV force-pushed the atlas-search-lookups branch from 456028d to 65f22e6 Compare July 22, 2025 05:16
@WaVEV WaVEV marked this pull request as ready for review July 24, 2025 19:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants