Skip to content

Commit 29e01e1

Browse files
authored
Ensure vectors are always included in reindex actions (#130834)
This change modifies reindex behavior to always include vector fields, even if the target index omits embeddings from _source. This prepares for scenarios where embeddings may be automatically excluded (#130382).
1 parent 950e129 commit 29e01e1

File tree

12 files changed

+244
-24
lines changed

12 files changed

+244
-24
lines changed

docs/changelog/130834.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 130834
2+
summary: Ensure vectors are always included in reindex actions
3+
area: Vector Search
4+
type: enhancement
5+
issues: []

modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractAsyncBulkByScrollAction.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.elasticsearch.script.Script;
4545
import org.elasticsearch.script.ScriptService;
4646
import org.elasticsearch.search.builder.SearchSourceBuilder;
47+
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
4748
import org.elasticsearch.search.sort.SortBuilder;
4849
import org.elasticsearch.threadpool.ThreadPool;
4950

@@ -119,6 +120,7 @@ public abstract class AbstractAsyncBulkByScrollAction<
119120
BulkByScrollTask task,
120121
boolean needsSourceDocumentVersions,
121122
boolean needsSourceDocumentSeqNoAndPrimaryTerm,
123+
boolean needsVectors,
122124
Logger logger,
123125
ParentTaskAssigningClient client,
124126
ThreadPool threadPool,
@@ -131,6 +133,7 @@ public abstract class AbstractAsyncBulkByScrollAction<
131133
task,
132134
needsSourceDocumentVersions,
133135
needsSourceDocumentSeqNoAndPrimaryTerm,
136+
needsVectors,
134137
logger,
135138
client,
136139
client,
@@ -146,6 +149,7 @@ public abstract class AbstractAsyncBulkByScrollAction<
146149
BulkByScrollTask task,
147150
boolean needsSourceDocumentVersions,
148151
boolean needsSourceDocumentSeqNoAndPrimaryTerm,
152+
boolean needsVectors,
149153
Logger logger,
150154
ParentTaskAssigningClient searchClient,
151155
ParentTaskAssigningClient bulkClient,
@@ -173,7 +177,7 @@ public abstract class AbstractAsyncBulkByScrollAction<
173177
bulkRetry = new Retry(BackoffPolicy.wrap(backoffPolicy, worker::countBulkRetry), threadPool);
174178
scrollSource = buildScrollableResultSource(
175179
backoffPolicy,
176-
prepareSearchRequest(mainRequest, needsSourceDocumentVersions, needsSourceDocumentSeqNoAndPrimaryTerm)
180+
prepareSearchRequest(mainRequest, needsSourceDocumentVersions, needsSourceDocumentSeqNoAndPrimaryTerm, needsVectors)
177181
);
178182
scriptApplier = Objects.requireNonNull(buildScriptApplier(), "script applier must not be null");
179183
}
@@ -186,7 +190,8 @@ public abstract class AbstractAsyncBulkByScrollAction<
186190
static <Request extends AbstractBulkByScrollRequest<Request>> SearchRequest prepareSearchRequest(
187191
Request mainRequest,
188192
boolean needsSourceDocumentVersions,
189-
boolean needsSourceDocumentSeqNoAndPrimaryTerm
193+
boolean needsSourceDocumentSeqNoAndPrimaryTerm,
194+
boolean needsVectors
190195
) {
191196
var preparedSearchRequest = new SearchRequest(mainRequest.getSearchRequest());
192197

@@ -205,6 +210,16 @@ static <Request extends AbstractBulkByScrollRequest<Request>> SearchRequest prep
205210
sourceBuilder.version(needsSourceDocumentVersions);
206211
sourceBuilder.seqNoAndPrimaryTerm(needsSourceDocumentSeqNoAndPrimaryTerm);
207212

213+
if (needsVectors) {
214+
// always include vectors in the response unless explicitly set
215+
var fetchSource = sourceBuilder.fetchSource();
216+
if (fetchSource == null) {
217+
sourceBuilder.fetchSource(FetchSourceContext.FETCH_ALL_SOURCE);
218+
} else if (fetchSource.excludeVectors() == null) {
219+
sourceBuilder.excludeVectors(false);
220+
}
221+
}
222+
208223
/*
209224
* Do not open scroll if max docs <= scroll size and not resuming on version conflicts
210225
*/

modules/reindex/src/main/java/org/elasticsearch/reindex/AsyncDeleteByQueryAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public AsyncDeleteByQueryAction(
3434
ScriptService scriptService,
3535
ActionListener<BulkByScrollResponse> listener
3636
) {
37-
super(task, false, true, logger, client, threadPool, request, listener, scriptService, null);
37+
super(task, false, true, false, logger, client, threadPool, request, listener, scriptService, null);
3838
}
3939

4040
@Override

modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<Re
237237
*/
238238
request.getDestination().versionType() != VersionType.INTERNAL,
239239
false,
240+
true,
240241
logger,
241242
searchClient,
242243
bulkClient,

modules/reindex/src/main/java/org/elasticsearch/reindex/TransportUpdateByQueryAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<Up
124124
// use sequence number powered optimistic concurrency control unless requested
125125
request.getSearchRequest().source() != null && Boolean.TRUE.equals(request.getSearchRequest().source().version()),
126126
true,
127+
true,
127128
logger,
128129
client,
129130
threadPool,

modules/reindex/src/main/java/org/elasticsearch/reindex/remote/RemoteRequestBuilders.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
2121
import org.elasticsearch.common.xcontent.XContentHelper;
2222
import org.elasticsearch.core.TimeValue;
23+
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
2324
import org.elasticsearch.search.sort.FieldSortBuilder;
2425
import org.elasticsearch.search.sort.SortBuilder;
2526
import org.elasticsearch.xcontent.NamedXContentRegistry;
@@ -140,13 +141,33 @@ static Request initialSearch(SearchRequest searchRequest, BytesReference query,
140141
}
141142
}
142143

143-
if (searchRequest.source().fetchSource() != null) {
144-
entity.field("_source", searchRequest.source().fetchSource());
145-
} else {
144+
var fetchSource = searchRequest.source().fetchSource();
145+
if (fetchSource == null) {
146146
if (remoteVersion.onOrAfter(Version.fromId(1000099))) {
147147
// Versions before 1.0 don't support `"_source": true` so we have to ask for the source as a stored field.
148148
entity.field("_source", true);
149149
}
150+
} else {
151+
if (remoteVersion.onOrAfter(Version.V_9_1_0) || fetchSource.excludeVectors() == null) {
152+
entity.field("_source", fetchSource);
153+
} else {
154+
// Versions before 9.1.0 don't support "exclude_vectors" so we need to manually convert.
155+
if (fetchSource.includes().length == 0 && fetchSource.excludes().length == 0) {
156+
if (remoteVersion.onOrAfter(Version.fromId(1000099))) {
157+
// Versions before 1.0 don't support `"_source": true` so we have to ask for the source as a stored field.
158+
entity.field("_source", true);
159+
}
160+
} else {
161+
entity.startObject("_source");
162+
if (fetchSource.includes().length > 0) {
163+
entity.field(FetchSourceContext.INCLUDES_FIELD.getPreferredName(), fetchSource.includes());
164+
}
165+
if (fetchSource.excludes().length > 0) {
166+
entity.field(FetchSourceContext.EXCLUDES_FIELD.getPreferredName(), fetchSource.excludes());
167+
}
168+
entity.endObject();
169+
}
170+
}
150171
}
151172

152173
entity.endObject();

modules/reindex/src/test/java/org/elasticsearch/reindex/AsyncBulkByScrollActionTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -876,15 +876,15 @@ public void done(TimeValue extraKeepAlive) {}
876876
}
877877

878878
public void testEnableScrollByDefault() {
879-
var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false);
879+
var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false, false);
880880
assertThat(preparedSearchRequest.scroll(), notNullValue());
881881
}
882882

883883
public void testEnableScrollWhenMaxDocsIsGreaterThenScrollSize() {
884884
testRequest.setMaxDocs(between(101, 1000));
885885
testRequest.getSearchRequest().source().size(100);
886886

887-
var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false);
887+
var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false, false);
888888

889889
assertThat(preparedSearchRequest.scroll(), notNullValue());
890890
}
@@ -893,7 +893,7 @@ public void testDisableScrollWhenMaxDocsIsLessThenScrollSize() {
893893
testRequest.setMaxDocs(between(1, 100));
894894
testRequest.getSearchRequest().source().size(100);
895895

896-
var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false);
896+
var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false, false);
897897

898898
assertThat(preparedSearchRequest.scroll(), nullValue());
899899
}
@@ -903,7 +903,7 @@ public void testEnableScrollWhenProceedOnVersionConflict() {
903903
testRequest.getSearchRequest().source().size(100);
904904
testRequest.setAbortOnVersionConflict(false);
905905

906-
var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false);
906+
var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false, false);
907907

908908
assertThat(preparedSearchRequest.scroll(), notNullValue());
909909
}
@@ -943,6 +943,7 @@ private class DummyAsyncBulkByScrollAction extends AbstractAsyncBulkByScrollActi
943943
testTask,
944944
randomBoolean(),
945945
randomBoolean(),
946+
randomBoolean(),
946947
AsyncBulkByScrollActionTests.this.logger,
947948
new ParentTaskAssigningClient(client, localNode, testTask),
948949
client.threadPool(),

modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexBasicTests.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
package org.elasticsearch.reindex;
1111

1212
import org.elasticsearch.action.index.IndexRequestBuilder;
13+
import org.elasticsearch.common.settings.Settings;
14+
import org.elasticsearch.index.IndexSettings;
1315
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
1416
import org.elasticsearch.index.reindex.BulkByScrollResponse;
1517
import org.elasticsearch.index.reindex.ReindexRequestBuilder;
@@ -22,7 +24,10 @@
2224
import java.util.stream.Collectors;
2325

2426
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
27+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
2528
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
29+
import static org.hamcrest.Matchers.anyOf;
30+
import static org.hamcrest.Matchers.equalTo;
2631
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
2732
import static org.hamcrest.Matchers.hasSize;
2833
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -175,4 +180,60 @@ public void testReindexFromComplexDateMathIndexName() throws Exception {
175180
assertHitCount(prepareSearch(destIndexName).setSize(0), 4);
176181
}
177182

183+
public void testReindexIncludeVectors() throws Exception {
184+
var resp1 = prepareCreate("test").setSettings(
185+
Settings.builder().put(IndexSettings.INDEX_MAPPING_SOURCE_SYNTHETIC_VECTORS_SETTING.getKey(), true).build()
186+
).setMapping("foo", "type=dense_vector,similarity=l2_norm", "bar", "type=sparse_vector").get();
187+
assertAcked(resp1);
188+
189+
var resp2 = prepareCreate("test_reindex").setSettings(
190+
Settings.builder().put(IndexSettings.INDEX_MAPPING_SOURCE_SYNTHETIC_VECTORS_SETTING.getKey(), true).build()
191+
).setMapping("foo", "type=dense_vector,similarity=l2_norm", "bar", "type=sparse_vector").get();
192+
assertAcked(resp2);
193+
194+
indexRandom(
195+
true,
196+
prepareIndex("test").setId("1").setSource("foo", List.of(3f, 2f, 1.5f), "bar", Map.of("token_1", 4f, "token_2", 7f))
197+
);
198+
199+
var searchResponse = prepareSearch("test").get();
200+
try {
201+
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo(1L));
202+
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
203+
var sourceMap = searchResponse.getHits().getAt(0).getSourceAsMap();
204+
assertThat(sourceMap.size(), equalTo(0));
205+
} finally {
206+
searchResponse.decRef();
207+
}
208+
209+
// Copy all the docs
210+
ReindexRequestBuilder copy = reindex().source("test").destination("test_reindex").refresh(true);
211+
var reindexResponse = copy.get();
212+
assertThat(reindexResponse, matcher().created(1));
213+
214+
searchResponse = prepareSearch("test_reindex").get();
215+
try {
216+
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo(1L));
217+
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
218+
var sourceMap = searchResponse.getHits().getAt(0).getSourceAsMap();
219+
assertThat(sourceMap.size(), equalTo(0));
220+
} finally {
221+
searchResponse.decRef();
222+
}
223+
224+
searchResponse = prepareSearch("test_reindex").setExcludeVectors(false).get();
225+
try {
226+
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo(1L));
227+
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
228+
var sourceMap = searchResponse.getHits().getAt(0).getSourceAsMap();
229+
assertThat(sourceMap.get("foo"), anyOf(equalTo(List.of(3f, 2f, 1.5f)), equalTo(List.of(3d, 2d, 1.5d))));
230+
assertThat(
231+
sourceMap.get("bar"),
232+
anyOf(equalTo(Map.of("token_1", 4f, "token_2", 7f)), equalTo(Map.of("token_1", 4d, "token_2", 7d)))
233+
);
234+
} finally {
235+
searchResponse.decRef();
236+
}
237+
}
238+
178239
}

modules/reindex/src/test/java/org/elasticsearch/reindex/UpdateByQueryBasicTests.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
package org.elasticsearch.reindex;
1111

1212
import org.elasticsearch.action.index.IndexRequestBuilder;
13+
import org.elasticsearch.common.settings.Settings;
14+
import org.elasticsearch.index.IndexSettings;
1315
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
1416
import org.elasticsearch.index.reindex.BulkByScrollResponse;
1517
import org.elasticsearch.index.reindex.UpdateByQueryRequestBuilder;
@@ -23,7 +25,10 @@
2325
import java.util.stream.Collectors;
2426

2527
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
28+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
2629
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
30+
import static org.hamcrest.Matchers.anyOf;
31+
import static org.hamcrest.Matchers.equalTo;
2732
import static org.hamcrest.Matchers.hasSize;
2833

2934
public class UpdateByQueryBasicTests extends ReindexTestCase {
@@ -150,4 +155,54 @@ public void testMissingSources() {
150155
.get();
151156
assertThat(response, matcher().updated(0).slices(hasSize(0)));
152157
}
158+
159+
public void testUpdateByQueryIncludeVectors() throws Exception {
160+
var resp1 = prepareCreate("test").setSettings(
161+
Settings.builder().put(IndexSettings.INDEX_MAPPING_SOURCE_SYNTHETIC_VECTORS_SETTING.getKey(), true).build()
162+
).setMapping("foo", "type=dense_vector,similarity=l2_norm", "bar", "type=sparse_vector").get();
163+
assertAcked(resp1);
164+
165+
indexRandom(
166+
true,
167+
prepareIndex("test").setId("1").setSource("foo", List.of(3.0f, 2.0f, 1.5f), "bar", Map.of("token_1", 4f, "token_2", 7f))
168+
);
169+
170+
var searchResponse = prepareSearch("test").get();
171+
try {
172+
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo(1L));
173+
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
174+
var sourceMap = searchResponse.getHits().getAt(0).getSourceAsMap();
175+
assertThat(sourceMap.size(), equalTo(0));
176+
} finally {
177+
searchResponse.decRef();
178+
}
179+
180+
// Copy all the docs
181+
var updateByQueryResponse = updateByQuery().source("test").refresh(true).get();
182+
assertThat(updateByQueryResponse, matcher().updated(1L));
183+
184+
searchResponse = prepareSearch("test").get();
185+
try {
186+
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo(1L));
187+
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
188+
var sourceMap = searchResponse.getHits().getAt(0).getSourceAsMap();
189+
assertThat(sourceMap.size(), equalTo(0));
190+
} finally {
191+
searchResponse.decRef();
192+
}
193+
194+
searchResponse = prepareSearch("test").setExcludeVectors(false).get();
195+
try {
196+
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo(1L));
197+
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
198+
var sourceMap = searchResponse.getHits().getAt(0).getSourceAsMap();
199+
assertThat(sourceMap.get("foo"), anyOf(equalTo(List.of(3f, 2f, 1.5f)), equalTo(List.of(3d, 2d, 1.5d))));
200+
assertThat(
201+
sourceMap.get("bar"),
202+
anyOf(equalTo(Map.of("token_1", 4f, "token_2", 7f)), equalTo(Map.of("token_1", 4d, "token_2", 7d)))
203+
);
204+
} finally {
205+
searchResponse.decRef();
206+
}
207+
}
153208
}

0 commit comments

Comments
 (0)