Skip to content

Commit 2b8d23f

Browse files
committed
[ES|QL] Fix agg_metric_double sub-block serialization
Fixes serializing error when sub-blocks are ConstantNullBlocks. Related to #137279
1 parent fafb162 commit 2b8d23f

File tree

6 files changed

+88
-31
lines changed

6 files changed

+88
-31
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9205000
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
esql_resolve_fields_response_used,9204000
1+
esql_aggregate_metric_double_serialize_null_block_fix,9205000

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleArrayBlock.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.compute.data;
99

10+
import org.elasticsearch.TransportVersion;
1011
import org.elasticsearch.common.io.stream.StreamInput;
1112
import org.elasticsearch.common.io.stream.StreamOutput;
1213
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -25,6 +26,10 @@ public final class AggregateMetricDoubleArrayBlock extends AbstractNonThreadSafe
2526
private final IntBlock countBlock;
2627
private final int positionCount;
2728

29+
private static final TransportVersion ESQL_AGGREGATE_METRIC_DOUBLE_SERIALIZE_NULL_BLOCK_FIX = TransportVersion.fromName(
30+
"esql_aggregate_metric_double_serialize_null_block_fix"
31+
);
32+
2833
public AggregateMetricDoubleArrayBlock(DoubleBlock minBlock, DoubleBlock maxBlock, DoubleBlock sumBlock, IntBlock countBlock) {
2934
this.minBlock = minBlock;
3035
this.maxBlock = maxBlock;
@@ -235,8 +240,14 @@ public AggregateMetricDoubleBlock expand() {
235240

236241
@Override
237242
public void writeTo(StreamOutput out) throws IOException {
238-
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
239-
block.writeTo(out);
243+
if (out.getTransportVersion().supports(ESQL_AGGREGATE_METRIC_DOUBLE_SERIALIZE_NULL_BLOCK_FIX)) {
244+
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
245+
Block.writeTypedBlock(block, out);
246+
}
247+
} else {
248+
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
249+
block.writeTo(out);
250+
}
240251
}
241252
}
242253

@@ -248,10 +259,17 @@ public static Block readFrom(StreamInput in) throws IOException {
248259
IntBlock countBlock = null;
249260
BlockStreamInput blockStreamInput = (BlockStreamInput) in;
250261
try {
251-
minBlock = DoubleBlock.readFrom(blockStreamInput);
252-
maxBlock = DoubleBlock.readFrom(blockStreamInput);
253-
sumBlock = DoubleBlock.readFrom(blockStreamInput);
254-
countBlock = IntBlock.readFrom(blockStreamInput);
262+
if (blockStreamInput.getTransportVersion().supports(ESQL_AGGREGATE_METRIC_DOUBLE_SERIALIZE_NULL_BLOCK_FIX)) {
263+
minBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput);
264+
maxBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput);
265+
sumBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput);
266+
countBlock = (IntBlock) Block.readTypedBlock(blockStreamInput);
267+
} else {
268+
minBlock = DoubleBlock.readFrom(blockStreamInput);
269+
maxBlock = DoubleBlock.readFrom(blockStreamInput);
270+
sumBlock = DoubleBlock.readFrom(blockStreamInput);
271+
countBlock = IntBlock.readFrom(blockStreamInput);
272+
}
255273
AggregateMetricDoubleArrayBlock result = new AggregateMetricDoubleArrayBlock(minBlock, maxBlock, sumBlock, countBlock);
256274
success = true;
257275
return result;
Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,62 @@
77

88
package org.elasticsearch.compute.data;
99

10+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
1011
import org.elasticsearch.compute.test.ComputeTestCase;
1112
import org.elasticsearch.compute.test.TestBlockFactory;
1213
import org.elasticsearch.core.Releasables;
1314

15+
import java.io.IOException;
1416
import java.util.List;
1517

16-
public class AggregateMetricDoubleBlockEqualityTests extends ComputeTestCase {
18+
import static org.elasticsearch.compute.test.BlockTestUtils.randomAggregateMetricDoubleLiteral;
19+
import static org.hamcrest.Matchers.equalTo;
20+
21+
public class AggregateMetricDoubleBlockTests extends ComputeTestCase {
1722

1823
static final BlockFactory blockFactory = TestBlockFactory.getNonBreakingInstance();
1924

20-
public void testEmptyBlock() {
25+
public void testPopulatedBlockSerialization() throws IOException {
26+
int elementCount = randomIntBetween(1, 100);
27+
AggregateMetricDoubleBlockBuilder builder = blockFactory.newAggregateMetricDoubleBlockBuilder(elementCount);
28+
for (int i = 0; i < elementCount; i++) {
29+
if (randomBoolean()) {
30+
builder.appendNull();
31+
} else {
32+
builder.appendLiteral(randomAggregateMetricDoubleLiteral(false));
33+
}
34+
}
35+
AggregateMetricDoubleBlock block = builder.build();
36+
Block deserializedBlock = serializationRoundTrip(block);
37+
assertThat(deserializedBlock, equalTo(block));
38+
Releasables.close(block, deserializedBlock);
39+
}
40+
41+
public void testNullSerialization() throws IOException {
42+
// sub-blocks can be constant null, those should serialize correctly too
43+
int elementCount = randomIntBetween(1, 100);
44+
45+
Block block = new AggregateMetricDoubleArrayBlock(
46+
(DoubleBlock) blockFactory.newConstantNullBlock(elementCount),
47+
(DoubleBlock) blockFactory.newConstantNullBlock(elementCount),
48+
(DoubleBlock) blockFactory.newConstantNullBlock(elementCount),
49+
(IntBlock) blockFactory.newConstantNullBlock(elementCount)
50+
);
51+
52+
Block deserializedBlock = serializationRoundTrip(block);
53+
assertThat(deserializedBlock, equalTo(block));
54+
Releasables.close(block, deserializedBlock);
55+
}
56+
57+
private Block serializationRoundTrip(Block block) throws IOException {
58+
BytesStreamOutput out = new BytesStreamOutput();
59+
Block.writeTypedBlock(block, out);
60+
try (BlockStreamInput input = new BlockStreamInput(out.bytes().streamInput(), blockFactory)) {
61+
return Block.readTypedBlock(input);
62+
}
63+
}
64+
65+
public void testEmptyBlockEquality() {
2166
// all these "empty" blocks should be equivalent
2267
var partialMetricBuilder = blockFactory.newAggregateMetricDoubleBlockBuilder(0);
2368
for (var subBuilder : List.of(

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/ExtractorTests.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.apache.lucene.util.BytesRef;
1414
import org.elasticsearch.common.breaker.CircuitBreaker;
1515
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
16-
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral;
1716
import org.elasticsearch.compute.data.Block;
1817
import org.elasticsearch.compute.data.BlockFactory;
1918
import org.elasticsearch.compute.data.BlockUtils;
@@ -30,6 +29,7 @@
3029
import java.util.List;
3130
import java.util.function.Supplier;
3231

32+
import static org.elasticsearch.compute.test.BlockTestUtils.randomAggregateMetricDoubleLiteral;
3333
import static org.hamcrest.Matchers.equalTo;
3434
import static org.hamcrest.Matchers.greaterThan;
3535

@@ -54,15 +54,15 @@ public static Iterable<Object[]> parameters() {
5454
"regular aggregate_metric_double",
5555
e,
5656
TopNEncoder.DEFAULT_UNSORTABLE,
57-
() -> randomAggregateMetricDouble(true)
57+
() -> randomAggregateMetricDoubleLiteral(true)
5858
)
5959
);
6060
cases.add(
6161
valueTestCase(
62-
"aggregate_metric_double with nulls",
62+
"aggregate_metric_double that can have nulls",
6363
e,
6464
TopNEncoder.DEFAULT_UNSORTABLE,
65-
() -> randomAggregateMetricDouble(false)
65+
() -> randomAggregateMetricDoubleLiteral(false)
6666
)
6767
);
6868
}
@@ -247,16 +247,4 @@ public void testInKey() {
247247

248248
assertThat(result.build(), equalTo(value));
249249
}
250-
251-
public static AggregateMetricDoubleLiteral randomAggregateMetricDouble(boolean allMetrics) {
252-
if (allMetrics) {
253-
return new AggregateMetricDoubleLiteral(randomDouble(), randomDouble(), randomDouble(), randomInt());
254-
}
255-
return new AggregateMetricDoubleLiteral(
256-
randomBoolean() ? randomDouble() : null,
257-
randomBoolean() ? randomDouble() : null,
258-
randomBoolean() ? randomDouble() : null,
259-
randomBoolean() ? randomInt() : null
260-
);
261-
}
262250
}

x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.apache.lucene.util.BytesRef;
1111
import org.elasticsearch.compute.data.AggregateMetricDoubleBlock;
1212
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
13+
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral;
1314
import org.elasticsearch.compute.data.Block;
1415
import org.elasticsearch.compute.data.BlockFactory;
1516
import org.elasticsearch.compute.data.BlockUtils;
@@ -65,12 +66,7 @@ public static Object randomValue(ElementType e) {
6566
case DOUBLE -> randomDouble();
6667
case BYTES_REF -> new BytesRef(randomRealisticUnicodeOfCodepointLengthBetween(0, 5)); // TODO: also test spatial WKB
6768
case BOOLEAN -> randomBoolean();
68-
case AGGREGATE_METRIC_DOUBLE -> new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral(
69-
randomDouble(),
70-
randomDouble(),
71-
randomDouble(),
72-
randomNonNegativeInt()
73-
);
69+
case AGGREGATE_METRIC_DOUBLE -> randomAggregateMetricDoubleLiteral(false);
7470
case DOC -> new BlockUtils.Doc(
7571
randomIntBetween(0, 255), // Shard ID should be small and non-negative.
7672
randomInt(),
@@ -397,6 +393,15 @@ public static ExponentialHistogram randomExponentialHistogram() {
397393
return histo;
398394
}
399395

396+
public static AggregateMetricDoubleLiteral randomAggregateMetricDoubleLiteral(boolean allMetrics) {
397+
return new AggregateMetricDoubleLiteral(
398+
allMetrics || randomBoolean() ? randomDouble() : null,
399+
allMetrics || randomBoolean() ? randomDouble() : null,
400+
allMetrics || randomBoolean() ? randomDouble() : null,
401+
allMetrics || randomBoolean() ? randomNonNegativeInt() : null
402+
);
403+
}
404+
400405
private static int dedupe(Map<BytesRef, Integer> dedupe, BytesRefVector.Builder bytes, BytesRef v) {
401406
Integer current = dedupe.get(v);
402407
if (current != null) {

0 commit comments

Comments
 (0)