Skip to content

Commit 6d192e7

Browse files
committed
Only update new vectors
1 parent b446eb3 commit 6d192e7

File tree

4 files changed

+65
-4
lines changed

4 files changed

+65
-4
lines changed

plugins/qdrant/superduper_qdrant/qdrant.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def __init__(
9191
config_dict["timeout"] = 60 # 60 seconds timeout
9292
self.client = QdrantClient(**config_dict)
9393

94-
logging.info('Found these collections in the qdrant collection:')
94+
logging.info('Found these collections in the qdrant connection:')
9595
collections = self.client.get_collections().collections
9696
for collection in collections:
9797
logging.info(f" - {collection.name}")
@@ -122,6 +122,45 @@ def initialize(self):
122122
def __len__(self):
123123
return self.client.get_collection(self.identifier).vectors_count
124124

125+
def list(self):
126+
"""List all ids in the index."""
127+
if not self.client.collection_exists(self.identifier):
128+
return []
129+
130+
ids: t.List[str] = []
131+
next_page: t.Optional[models.ScrollCursor] = None
132+
limit = max(256, self.batch_size) # reasonable paging size
133+
134+
# Reuse the same retry pattern as upserts
135+
retry = Retry(exception_types=QDRANT_RETRY_EXCEPTIONS)
136+
137+
@retry
138+
def _do_scroll(offset):
139+
return self.client.scroll(
140+
collection_name=self.identifier,
141+
limit=limit,
142+
with_payload=[ID_PAYLOAD_KEY],
143+
with_vectors=False,
144+
offset=offset,
145+
)
146+
147+
while True:
148+
records, next_page = _do_scroll(next_page)
149+
150+
if not records:
151+
break
152+
153+
for rec in records:
154+
# We store original IDs in the payload under ID_PAYLOAD_KEY
155+
if rec.payload and ID_PAYLOAD_KEY in rec.payload:
156+
ids.append(rec.payload[ID_PAYLOAD_KEY])
157+
158+
if next_page is None:
159+
break
160+
161+
return ids
162+
163+
125164
def _create_collection(self):
126165
measure = (
127166
self.measure.name

superduper/backends/base/vector_search.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,11 @@ def db(self) -> 'Datalayer':
187187
def db(self, value: 'Datalayer'):
188188
self._db = value
189189

190+
@abstractmethod
191+
def list(self) -> t.List[str]:
192+
"""List all ids in the index."""
193+
pass
194+
190195
@classmethod
191196
def from_component(cls, index: 'VectorIndex'):
192197
"""Create a vector searcher from a vector index.

superduper/backends/local/vector_search.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,26 @@ def initialize(self):
5050
if t.is_component and t.cls is not None:
5151
if issubclass(t.cls, VectorIndex):
5252
components.append(t.identifier)
53+
5354
for component in components:
5455
try:
5556
for identifier in self.db.show(component):
5657
try:
5758
vector_index = self.db.load(component, identifier=identifier)
5859
self.put_component(component, vector_index.uuid)
59-
vectors = vector_index.get_vectors()
60-
vectors = [VectorItem(**vector) for vector in vectors]
61-
self.get_tool(vector_index.uuid).add(vectors)
60+
all_ids = vector_index.list()
61+
tool = self.get_tool(vector_index.uuid)
62+
deployed_ids = tool.list()
63+
to_deploy_ids = list(set(all_ids) - set(deployed_ids))
64+
if to_deploy_ids:
65+
vectors = vector_index.get_vectors(ids=to_deploy_ids)
66+
vectors = [VectorItem(**vector) for vector in vectors]
67+
self.get_tool(vector_index.uuid).add(vectors)
68+
else:
69+
logging.info(
70+
'Skipping since have already deployed vectors for '
71+
f'component: {component}, vector_index: {vector_index.uuid}'
72+
)
6273

6374
except FileNotFoundError:
6475
logging.error(
@@ -191,6 +202,9 @@ def _setup(self, h, index):
191202
self.index = index
192203
self.lookup = dict(zip(index, range(len(index))))
193204

205+
def list(self):
206+
return self.index if self.index is not None else []
207+
194208
def describe(self):
195209
"""Describe the vector index.
196210

superduper/components/vector_index.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ def setup(self):
8585
)
8686
return self
8787

88+
def list(self):
89+
return self.db[self.indexing_listener.outputs].ids()
90+
8891
@ensure_setup
8992
def get_vectors(self, ids: t.Sequence[str] | None = None):
9093
"""Get vectors from the vector index.

0 commit comments

Comments
 (0)