Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@ public ShardSearchPhaseAPMMetrics(MeterRegistry meterRegistry) {
}

@Override
public void onCanMatchPhase(long tookInNanos) {
recordPhaseLatency(canMatchPhaseMetric, tookInNanos);
public void onCanMatchPhase(Map<String, Object> searchRequestAttributes, long tookInNanos) {
canMatchPhaseMetric.record(TimeUnit.NANOSECONDS.toMillis(tookInNanos), searchRequestAttributes);
}

@Override
public void onDfsPhase(SearchContext searchContext, long tookInNanos) {
recordPhaseLatency(dfsPhaseMetric, tookInNanos);
SearchExecutionContext searchExecutionContext = searchContext.getSearchExecutionContext();
Long timeRangeFilterFromMillis = searchExecutionContext.getTimeRangeFilterFromMillis();
recordPhaseLatency(dfsPhaseMetric, tookInNanos, searchContext.request(), timeRangeFilterFromMillis);
}

@Override
Expand All @@ -80,10 +82,6 @@ public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
recordPhaseLatency(fetchPhaseMetric, tookInNanos, searchContext.request(), timeRangeFilterFromMillis);
}

private static void recordPhaseLatency(LongHistogram histogramMetric, long tookInNanos) {
histogramMetric.record(TimeUnit.NANOSECONDS.toMillis(tookInNanos));
}

private static void recordPhaseLatency(
LongHistogram histogramMetric,
long tookInNanos,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.transport.TransportRequest;

import java.util.List;
import java.util.Map;

/**
* An listener for search, fetch and context events.
Expand Down Expand Up @@ -91,9 +92,10 @@ default void onFailedDfsPhase(SearchContext searchContext) {}
* Executed after the can-match phase successfully finished.
* Note: this is not invoked if the can match phase execution failed.
*
* @param searchRequestAttributes the attributes of the search request
* @param tookInNanos the number of nanoseconds the can-match execution took
*/
default void onCanMatchPhase(long tookInNanos) {}
default void onCanMatchPhase(Map<String, Object> searchRequestAttributes, long tookInNanos) {}

/**
* Executed when a new reader context was created
Expand Down Expand Up @@ -246,10 +248,10 @@ public void onDfsPhase(SearchContext searchContext, long tookInNanos) {
}

@Override
public void onCanMatchPhase(long tookInNanos) {
public void onCanMatchPhase(Map<String, Object> searchRequestAttributes, long tookInNanos) {
for (SearchOperationListener listener : listeners) {
try {
listener.onCanMatchPhase(tookInNanos);
listener.onCanMatchPhase(searchRequestAttributes, tookInNanos);
} catch (Exception e) {
logger.warn(() -> "onCanMatchPhase listener [" + listener + "] failed", e);
}
Expand Down
15 changes: 14 additions & 1 deletion server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.action.search.CanMatchNodeRequest;
import org.elasticsearch.action.search.CanMatchNodeResponse;
import org.elasticsearch.action.search.OnlinePrewarmingService;
import org.elasticsearch.action.search.SearchRequestAttributesExtractor;
import org.elasticsearch.action.search.SearchShardTask;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.TransportActions;
Expand Down Expand Up @@ -1982,6 +1983,7 @@ public AliasFilter buildAliasFilter(ProjectState state, String index, Set<Resolv
public void canMatch(CanMatchNodeRequest request, ActionListener<CanMatchNodeResponse> listener) {
var shardLevelRequests = request.getShardLevelRequests();
final List<CanMatchNodeResponse.ResponseOrFailure> responses = new ArrayList<>(shardLevelRequests.size());
Map<String, Object> searchRequestAttributes = null;
for (var shardLevelRequest : shardLevelRequests) {
long shardCanMatchStartTimeInNanos = System.nanoTime();
ShardSearchRequest shardSearchRequest = request.createShardSearchRequest(shardLevelRequest);
Expand All @@ -1992,7 +1994,18 @@ public void canMatch(CanMatchNodeRequest request, ActionListener<CanMatchNodeRes
CanMatchContext canMatchContext = createCanMatchContext(shardSearchRequest);
CanMatchShardResponse canMatchShardResponse = canMatch(canMatchContext, true);
responses.add(new CanMatchNodeResponse.ResponseOrFailure(canMatchShardResponse));
indexShard.getSearchOperationListener().onCanMatchPhase(System.nanoTime() - shardCanMatchStartTimeInNanos);

if (searchRequestAttributes == null) {
// All of the shards in the request are for the same search, so the attributes will be the same for all shards.
searchRequestAttributes = SearchRequestAttributesExtractor.extractAttributes(
shardSearchRequest,
canMatchContext.getTimeRangeFilterFromMillis(),
shardSearchRequest.nowInMillis()
);
}

indexShard.getSearchOperationListener()
.onCanMatchPhase(searchRequestAttributes, System.nanoTime() - shardCanMatchStartTimeInNanos);
} catch (Exception e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you restore the try catch please and the TODO? Sadly, more work is required to remove it, around removing dead code from the can match serialization layer, add a transport version for it etc. Feel free to take that on as a follow-up if you wish, I can share more details about it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need to restore the TODO before responses.add(...)

// TODO remove the exception handling as it's now in canMatch itself

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is restored based on looking at the full diff between main and this branch (actually the full diff shows the TODO as unmodified and in the try block, not the catch block). Sorry the commit history messed up when I did a bad rebase.

responses.add(new CanMatchNodeResponse.ResponseOrFailure(e));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public void testMetricsDfsQueryThenFetch() {
);
final List<Measurement> dfsMeasurements = getTestTelemetryPlugin().getLongHistogramMeasurement(DFS_SEARCH_PHASE_METRIC);
assertEquals(num_primaries, dfsMeasurements.size());
assertAttributes(dfsMeasurements, false, false);
final List<Measurement> queryMeasurements = getTestTelemetryPlugin().getLongHistogramMeasurement(QUERY_SEARCH_PHASE_METRIC);
assertEquals(num_primaries, queryMeasurements.size());
final List<Measurement> fetchMeasurements = getTestTelemetryPlugin().getLongHistogramMeasurement(FETCH_SEARCH_PHASE_METRIC);
Expand All @@ -117,8 +118,10 @@ public void testMetricsDfsQueryThenFetchSystem() {
);
final List<Measurement> dfsMeasurements = getTestTelemetryPlugin().getLongHistogramMeasurement(DFS_SEARCH_PHASE_METRIC);
assertEquals(0, dfsMeasurements.size()); // DFS phase not done for index with single shard
assertAttributes(dfsMeasurements, true, false);
final List<Measurement> queryMeasurements = getTestTelemetryPlugin().getLongHistogramMeasurement(QUERY_SEARCH_PHASE_METRIC);
assertEquals(1, queryMeasurements.size());
assertAttributes(queryMeasurements, true, false);
final List<Measurement> fetchMeasurements = getTestTelemetryPlugin().getLongHistogramMeasurement(FETCH_SEARCH_PHASE_METRIC);
assertEquals(1, fetchMeasurements.size());
assertAttributes(fetchMeasurements, true, false);
Expand Down Expand Up @@ -368,6 +371,17 @@ public void testTimeRangeFilterAllResults() {
});
final List<Measurement> canMatchMeasurements = getTestTelemetryPlugin().getLongHistogramMeasurement(CAN_MATCH_SEARCH_PHASE_METRIC);
assertEquals(num_primaries, canMatchMeasurements.size());
for (Measurement measurement : canMatchMeasurements) {
Map<String, Object> attributes = measurement.attributes();
assertEquals(5, attributes.size());
assertEquals("user", attributes.get("target"));
assertEquals("hits_only", attributes.get("query_type"));
assertEquals("_score", attributes.get("sort"));
assertEquals(false, attributes.get(SearchRequestAttributesExtractor.SYSTEM_THREAD_ATTRIBUTE_NAME));
// the range query was rewritten to one without bounds: we do track the time range filter from value but we don't set
// the time range filter field because no range query is executed at the shard level.
assertEquals("older_than_14_days", attributes.get("time_range_filter_from"));
}
final List<Measurement> queryMeasurements = getTestTelemetryPlugin().getLongHistogramMeasurement(QUERY_SEARCH_PHASE_METRIC);
// the two docs are at most spread across two shards, other shards are empty and get filtered out
assertThat(queryMeasurements.size(), Matchers.lessThanOrEqualTo(2));
Expand Down