Skip to content

Commit d6891f2

Browse files
committed
Remove unused code for pre-computed hashes in exchanges
1 parent 187d53e commit d6891f2

31 files changed

+91
-268
lines changed

core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import io.trino.operator.BucketPartitionFunction;
2525
import io.trino.operator.HashGenerator;
2626
import io.trino.operator.PartitionFunction;
27-
import io.trino.operator.PrecomputedHashGenerator;
2827
import io.trino.operator.output.SkewedPartitionRebalancer;
2928
import io.trino.spi.Page;
3029
import io.trino.spi.type.Type;
@@ -37,7 +36,6 @@
3736
import java.io.Closeable;
3837
import java.util.HashSet;
3938
import java.util.List;
40-
import java.util.Optional;
4139
import java.util.Set;
4240
import java.util.concurrent.atomic.AtomicInteger;
4341
import java.util.concurrent.atomic.AtomicLong;
@@ -94,7 +92,6 @@ public LocalExchange(
9492
PartitioningHandle partitioning,
9593
List<Integer> partitionChannels,
9694
List<Type> partitionChannelTypes,
97-
Optional<Integer> partitionHashChannel,
9895
DataSize maxBufferedBytes,
9996
TypeOperators typeOperators,
10097
DataSize writerScalingMinDataProcessed,
@@ -159,8 +156,7 @@ else if (isScaledWriterHashDistribution(partitioning)) {
159156
partitioning,
160157
partitionCount,
161158
partitionChannels,
162-
partitionChannelTypes,
163-
partitionHashChannel);
159+
partitionChannelTypes);
164160
return new ScaleWriterPartitioningExchanger(
165161
asPageConsumers(sources),
166162
memoryManager,
@@ -187,8 +183,7 @@ else if (partitioning.equals(FIXED_HASH_DISTRIBUTION) || partitioning.getCatalog
187183
partitioning,
188184
bufferCount,
189185
partitionChannels,
190-
partitionChannelTypes,
191-
partitionHashChannel);
186+
partitionChannelTypes);
192187
return new PartitioningExchanger(
193188
asPageConsumers(sources),
194189
memoryManager,
@@ -242,19 +237,12 @@ private static PartitionFunction createPartitionFunction(
242237
PartitioningHandle partitioning,
243238
int partitionCount,
244239
List<Integer> partitionChannels,
245-
List<Type> partitionChannelTypes,
246-
Optional<Integer> partitionHashChannel)
240+
List<Type> partitionChannelTypes)
247241
{
248242
checkArgument(Integer.bitCount(partitionCount) == 1, "partitionCount must be a power of 2");
249243

250244
if (isSystemPartitioning(partitioning)) {
251-
HashGenerator hashGenerator;
252-
if (partitionHashChannel.isPresent()) {
253-
hashGenerator = new PrecomputedHashGenerator(partitionHashChannel.get());
254-
}
255-
else {
256-
hashGenerator = createChannelsHashGenerator(partitionChannelTypes, Ints.toArray(partitionChannels), typeOperators);
257-
}
245+
HashGenerator hashGenerator = createChannelsHashGenerator(partitionChannelTypes, Ints.toArray(partitionChannels), typeOperators);
258246
return new LocalPartitionGenerator(hashGenerator, partitionCount);
259247
}
260248

core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java

Lines changed: 24 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -566,40 +566,30 @@ public LocalExecutionPlan plan(
566566
}
567567

568568
// We can convert the symbols directly into channels, because the root must be a sink and therefore the layout is fixed
569-
List<Integer> partitionChannels;
570-
List<Optional<NullableValue>> partitionConstants;
571-
List<Type> partitionChannelTypes;
572-
if (partitioningScheme.getHashColumn().isPresent()) {
573-
partitionChannels = ImmutableList.of(outputLayout.indexOf(partitioningScheme.getHashColumn().get()));
574-
partitionConstants = ImmutableList.of(Optional.empty());
575-
partitionChannelTypes = ImmutableList.of(BIGINT);
576-
}
577-
else {
578-
partitionChannels = partitioningScheme.getPartitioning().getArguments().stream()
579-
.map(argument -> {
580-
if (argument.isConstant()) {
581-
return -1;
582-
}
583-
return outputLayout.indexOf(argument.getColumn());
584-
})
585-
.collect(toImmutableList());
586-
partitionConstants = partitioningScheme.getPartitioning().getArguments().stream()
587-
.map(argument -> {
588-
if (argument.isConstant()) {
589-
return Optional.of(argument.getConstant());
590-
}
591-
return Optional.<NullableValue>empty();
592-
})
593-
.collect(toImmutableList());
594-
partitionChannelTypes = partitioningScheme.getPartitioning().getArguments().stream()
595-
.map(argument -> {
596-
if (argument.isConstant()) {
597-
return argument.getConstant().getType();
598-
}
599-
return argument.getColumn().type();
600-
})
601-
.collect(toImmutableList());
602-
}
569+
List<Integer> partitionChannels = partitioningScheme.getPartitioning().getArguments().stream()
570+
.map(argument -> {
571+
if (argument.isConstant()) {
572+
return -1;
573+
}
574+
return outputLayout.indexOf(argument.getColumn());
575+
})
576+
.collect(toImmutableList());
577+
List<Optional<NullableValue>> partitionConstants = partitioningScheme.getPartitioning().getArguments().stream()
578+
.map(argument -> {
579+
if (argument.isConstant()) {
580+
return Optional.of(argument.getConstant());
581+
}
582+
return Optional.<NullableValue>empty();
583+
})
584+
.collect(toImmutableList());
585+
List<Type> partitionChannelTypes = partitioningScheme.getPartitioning().getArguments().stream()
586+
.map(argument -> {
587+
if (argument.isConstant()) {
588+
return argument.getConstant().getType();
589+
}
590+
return argument.getColumn().type();
591+
})
592+
.collect(toImmutableList());
603593

604594
PartitionFunction partitionFunction;
605595
Optional<SkewedPartitionRebalancer> skewedPartitionRebalancer = Optional.empty();
@@ -3706,7 +3696,6 @@ private PhysicalOperation createLocalMerge(ExchangeNode node, LocalExecutionPlan
37063696
node.getPartitioningScheme().getPartitioning().getHandle(),
37073697
ImmutableList.of(),
37083698
ImmutableList.of(),
3709-
Optional.empty(),
37103699
maxLocalExchangeBufferSize,
37113700
typeOperators,
37123701
getWriterScalingMinDataProcessed(session),
@@ -3761,8 +3750,6 @@ else if (context.getDriverInstanceCount().isPresent()) {
37613750
List<Integer> partitionChannels = node.getPartitioningScheme().getPartitioning().getArguments().stream()
37623751
.map(argument -> node.getOutputSymbols().indexOf(argument.getColumn()))
37633752
.collect(toImmutableList());
3764-
Optional<Integer> hashChannel = node.getPartitioningScheme().getHashColumn()
3765-
.map(symbol -> node.getOutputSymbols().indexOf(symbol));
37663753
List<Type> partitionChannelTypes = partitionChannels.stream()
37673754
.map(types::get)
37683755
.collect(toImmutableList());
@@ -3783,7 +3770,6 @@ else if (context.getDriverInstanceCount().isPresent()) {
37833770
node.getPartitioningScheme().getPartitioning().getHandle(),
37843771
partitionChannels,
37853772
partitionChannelTypes,
3786-
hashChannel,
37873773
maxLocalExchangeBufferSize,
37883774
typeOperators,
37893775
getWriterScalingMinDataProcessed(session),

core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -722,7 +722,6 @@ else if (isUsePreferredWritePartitioning(session)) {
722722
partitioningScheme = Optional.of(new PartitioningScheme(
723723
Partitioning.create(FIXED_HASH_DISTRIBUTION, partitionFunctionArguments),
724724
outputLayout,
725-
Optional.empty(),
726725
false,
727726
Optional.empty(),
728727
maxWritersNodesCount));
@@ -1015,7 +1014,6 @@ else if (isUsePreferredWritePartitioning(session)) {
10151014
partitioningScheme = Optional.of(new PartitioningScheme(
10161015
Partitioning.create(FIXED_HASH_DISTRIBUTION, partitionFunctionArguments),
10171016
outputLayout,
1018-
Optional.empty(),
10191017
false,
10201018
Optional.empty(),
10211019
maxWritersNodesCount));

core/trino-main/src/main/java/io/trino/sql/planner/NodePartitioningManager.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ public PartitionFunction getPartitionFunction(
8989
if (partitioningHandle.getConnectorHandle() instanceof SystemPartitioningHandle) {
9090
return ((SystemPartitioningHandle) partitioningHandle.getConnectorHandle()).getPartitionFunction(
9191
partitionChannelTypes,
92-
partitioningScheme.getHashColumn().isPresent(),
9392
bucketToPartition,
9493
typeOperators);
9594
}
@@ -111,7 +110,6 @@ public PartitionFunction getPartitionFunction(Session session, PartitioningSchem
111110
if (partitioningHandle.getConnectorHandle() instanceof SystemPartitioningHandle handle) {
112111
return handle.getPartitionFunction(
113112
partitionChannelTypes,
114-
partitioningScheme.getHashColumn().isPresent(),
115113
bucketToPartition,
116114
typeOperators);
117115
}

core/trino-main/src/main/java/io/trino/sql/planner/PartitioningScheme.java

Lines changed: 4 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ public class PartitioningScheme
3131
{
3232
private final Partitioning partitioning;
3333
private final List<Symbol> outputLayout;
34-
private final Optional<Symbol> hashColumn;
3534
private final boolean replicateNullsAndAny;
3635
private final Optional<int[]> bucketToPartition;
3736
private final Optional<Integer> partitionCount;
@@ -41,18 +40,6 @@ public PartitioningScheme(Partitioning partitioning, List<Symbol> outputLayout)
4140
this(
4241
partitioning,
4342
outputLayout,
44-
Optional.empty(),
45-
false,
46-
Optional.empty(),
47-
Optional.empty());
48-
}
49-
50-
public PartitioningScheme(Partitioning partitioning, List<Symbol> outputLayout, Optional<Symbol> hashColumn)
51-
{
52-
this(
53-
partitioning,
54-
outputLayout,
55-
hashColumn,
5643
false,
5744
Optional.empty(),
5845
Optional.empty());
@@ -62,7 +49,6 @@ public PartitioningScheme(Partitioning partitioning, List<Symbol> outputLayout,
6249
public PartitioningScheme(
6350
@JsonProperty("partitioning") Partitioning partitioning,
6451
@JsonProperty("outputLayout") List<Symbol> outputLayout,
65-
@JsonProperty("hashColumn") Optional<Symbol> hashColumn,
6652
@JsonProperty("replicateNullsAndAny") boolean replicateNullsAndAny,
6753
@JsonProperty("bucketToPartition") Optional<int[]> bucketToPartition,
6854
@JsonProperty("partitionCount") Optional<Integer> partitionCount)
@@ -74,10 +60,6 @@ public PartitioningScheme(
7460
checkArgument(ImmutableSet.copyOf(outputLayout).containsAll(columns),
7561
"Output layout (%s) don't include all partition columns (%s)", outputLayout, columns);
7662

77-
this.hashColumn = requireNonNull(hashColumn, "hashColumn is null");
78-
hashColumn.ifPresent(column -> checkArgument(outputLayout.contains(column),
79-
"Output layout (%s) don't include hash column (%s)", outputLayout, column));
80-
8163
checkArgument(!replicateNullsAndAny || columns.size() <= 1, "Must have at most one partitioning column when nullPartition is REPLICATE.");
8264
this.replicateNullsAndAny = replicateNullsAndAny;
8365
this.bucketToPartition = requireNonNull(bucketToPartition, "bucketToPartition is null");
@@ -99,12 +81,6 @@ public List<Symbol> getOutputLayout()
9981
return outputLayout;
10082
}
10183

102-
@JsonProperty
103-
public Optional<Symbol> getHashColumn()
104-
{
105-
return hashColumn;
106-
}
107-
10884
@JsonProperty
10985
public boolean isReplicateNullsAndAny()
11086
{
@@ -125,18 +101,18 @@ public Optional<Integer> getPartitionCount()
125101

126102
public PartitioningScheme withBucketToPartition(Optional<int[]> bucketToPartition)
127103
{
128-
return new PartitioningScheme(partitioning, outputLayout, hashColumn, replicateNullsAndAny, bucketToPartition, partitionCount);
104+
return new PartitioningScheme(partitioning, outputLayout, replicateNullsAndAny, bucketToPartition, partitionCount);
129105
}
130106

131107
public PartitioningScheme withPartitioningHandle(PartitioningHandle partitioningHandle)
132108
{
133109
Partitioning newPartitioning = partitioning.withAlternativePartitioningHandle(partitioningHandle);
134-
return new PartitioningScheme(newPartitioning, outputLayout, hashColumn, replicateNullsAndAny, bucketToPartition, partitionCount);
110+
return new PartitioningScheme(newPartitioning, outputLayout, replicateNullsAndAny, bucketToPartition, partitionCount);
135111
}
136112

137113
public PartitioningScheme withPartitionCount(Optional<Integer> partitionCount)
138114
{
139-
return new PartitioningScheme(partitioning, outputLayout, hashColumn, replicateNullsAndAny, bucketToPartition, partitionCount);
115+
return new PartitioningScheme(partitioning, outputLayout, replicateNullsAndAny, bucketToPartition, partitionCount);
140116
}
141117

142118
public PartitioningScheme translateOutputLayout(List<Symbol> newOutputLayout)
@@ -147,11 +123,7 @@ public PartitioningScheme translateOutputLayout(List<Symbol> newOutputLayout)
147123

148124
Partitioning newPartitioning = partitioning.translate(symbol -> newOutputLayout.get(outputLayout.indexOf(symbol)));
149125

150-
Optional<Symbol> newHashSymbol = hashColumn
151-
.map(outputLayout::indexOf)
152-
.map(newOutputLayout::get);
153-
154-
return new PartitioningScheme(newPartitioning, newOutputLayout, newHashSymbol, replicateNullsAndAny, bucketToPartition, partitionCount);
126+
return new PartitioningScheme(newPartitioning, newOutputLayout, replicateNullsAndAny, bucketToPartition, partitionCount);
155127
}
156128

157129
@Override
@@ -183,7 +155,6 @@ public String toString()
183155
return toStringHelper(this)
184156
.add("partitioning", partitioning)
185157
.add("outputLayout", outputLayout)
186-
.add("hashChannel", hashColumn)
187158
.add("replicateNullsAndAny", replicateNullsAndAny)
188159
.add("bucketToPartition", bucketToPartition)
189160
.add("partitionCount", partitionCount)

core/trino-main/src/main/java/io/trino/sql/planner/PlanFragmenter.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,6 @@ private SubPlan reassignPartitioningHandleIfNecessaryHelper(Session session, Sub
224224
new PartitioningScheme(
225225
newOutputPartitioning,
226226
outputPartitioningScheme.getOutputLayout(),
227-
outputPartitioningScheme.getHashColumn(),
228227
outputPartitioningScheme.isReplicateNullsAndAny(),
229228
outputPartitioningScheme.getBucketToPartition(),
230229
outputPartitioningScheme.getPartitionCount()),

core/trino-main/src/main/java/io/trino/sql/planner/SystemPartitioningHandle.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import com.fasterxml.jackson.annotation.JsonProperty;
1818
import io.trino.operator.BucketPartitionFunction;
1919
import io.trino.operator.PartitionFunction;
20-
import io.trino.operator.PrecomputedHashGenerator;
2120
import io.trino.spi.Page;
2221
import io.trino.spi.connector.BucketFunction;
2322
import io.trino.spi.connector.ConnectorPartitioningHandle;
@@ -136,60 +135,55 @@ public String toString()
136135
return partitioning.toString();
137136
}
138137

139-
public PartitionFunction getPartitionFunction(List<Type> partitionChannelTypes, boolean isHashPrecomputed, int[] bucketToPartition, TypeOperators typeOperators)
138+
public PartitionFunction getPartitionFunction(List<Type> partitionChannelTypes, int[] bucketToPartition, TypeOperators typeOperators)
140139
{
141140
requireNonNull(partitionChannelTypes, "partitionChannelTypes is null");
142141
requireNonNull(bucketToPartition, "bucketToPartition is null");
143142

144-
BucketFunction bucketFunction = function.createBucketFunction(partitionChannelTypes, isHashPrecomputed, bucketToPartition.length, typeOperators);
143+
BucketFunction bucketFunction = function.createBucketFunction(partitionChannelTypes, bucketToPartition.length, typeOperators);
145144
return new BucketPartitionFunction(bucketFunction, bucketToPartition);
146145
}
147146

148147
public enum SystemPartitionFunction
149148
{
150149
SINGLE {
151150
@Override
152-
public BucketFunction createBucketFunction(List<Type> partitionChannelTypes, boolean isHashPrecomputed, int bucketCount, TypeOperators typeOperators)
151+
public BucketFunction createBucketFunction(List<Type> partitionChannelTypes, int bucketCount, TypeOperators typeOperators)
153152
{
154153
checkArgument(bucketCount == 1, "Single partition can only have one bucket");
155154
return new SingleBucketFunction();
156155
}
157156
},
158157
HASH {
159158
@Override
160-
public BucketFunction createBucketFunction(List<Type> partitionChannelTypes, boolean isHashPrecomputed, int bucketCount, TypeOperators typeOperators)
159+
public BucketFunction createBucketFunction(List<Type> partitionChannelTypes, int bucketCount, TypeOperators typeOperators)
161160
{
162-
if (isHashPrecomputed) {
163-
return new HashBucketFunction(new PrecomputedHashGenerator(0), bucketCount);
164-
}
165-
166161
return new HashBucketFunction(createPagePrefixHashGenerator(partitionChannelTypes, typeOperators), bucketCount);
167162
}
168163
},
169164
ROUND_ROBIN {
170165
@Override
171-
public BucketFunction createBucketFunction(List<Type> partitionChannelTypes, boolean isHashPrecomputed, int bucketCount, TypeOperators typeOperators)
166+
public BucketFunction createBucketFunction(List<Type> partitionChannelTypes, int bucketCount, TypeOperators typeOperators)
172167
{
173168
return new RoundRobinBucketFunction(bucketCount);
174169
}
175170
},
176171
BROADCAST {
177172
@Override
178-
public BucketFunction createBucketFunction(List<Type> partitionChannelTypes, boolean isHashPrecomputed, int bucketCount, TypeOperators typeOperators)
173+
public BucketFunction createBucketFunction(List<Type> partitionChannelTypes, int bucketCount, TypeOperators typeOperators)
179174
{
180175
throw new UnsupportedOperationException();
181176
}
182177
},
183178
UNKNOWN {
184179
@Override
185-
public BucketFunction createBucketFunction(List<Type> partitionChannelTypes, boolean isHashPrecomputed, int bucketCount, TypeOperators typeOperators)
180+
public BucketFunction createBucketFunction(List<Type> partitionChannelTypes, int bucketCount, TypeOperators typeOperators)
186181
{
187182
throw new UnsupportedOperationException();
188183
}
189184
};
190185

191186
public abstract BucketFunction createBucketFunction(List<Type> partitionChannelTypes,
192-
boolean isHashPrecomputed,
193187
int bucketCount,
194188
TypeOperators typeOperators);
195189

0 commit comments

Comments
 (0)