Skip to content

Commit a447096

Browse files
JunRuiLee authored and zhuzhurk committed
[FLINK-33985][runtime] Support obtain all partitions existing in cluster through ShuffleMaster.
This closes apache#24553.
1 parent ec1311c commit a447096

27 files changed

+556
-0
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java

+7
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
4646
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
4747
import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
48+
import org.apache.flink.runtime.shuffle.ShuffleMetrics;
4849
import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
4950
import org.apache.flink.util.Preconditions;
5051

@@ -198,6 +199,12 @@ public Collection<ResultPartitionID> getPartitionsOccupyingLocalResources() {
198199
return resultPartitionManager.getUnreleasedPartitions();
199200
}
200201

202+
@Override
203+
public Optional<ShuffleMetrics> getMetricsIfPartitionOccupyingLocalResource(
204+
ResultPartitionID partitionId) {
205+
return resultPartitionManager.getMetricsOfPartition(partitionId);
206+
}
207+
201208
// --------------------------------------------------------------------------------------------
202209
// Create Output Writers and Input Readers
203210
// --------------------------------------------------------------------------------------------

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java

+4
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,10 @@ public ResultPartitionType getPartitionType() {
215215
return partitionType;
216216
}
217217

218+
public ResultPartitionBytesCounter getResultPartitionBytes() {
219+
return resultPartitionBytes;
220+
}
221+
218222
// ------------------------------------------------------------------------
219223

220224
@Override

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java

+16
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.apache.flink.runtime.io.network.partition;
2020

2121
import org.apache.flink.annotation.VisibleForTesting;
22+
import org.apache.flink.runtime.shuffle.DefaultShuffleMetrics;
23+
import org.apache.flink.runtime.shuffle.ShuffleMetrics;
2224
import org.apache.flink.util.CollectionUtil;
2325
import org.apache.flink.util.concurrent.ScheduledExecutor;
2426

@@ -293,4 +295,18 @@ public Collection<ResultPartitionID> getUnreleasedPartitions() {
293295
return registeredPartitions.keySet();
294296
}
295297
}
298+
299+
public Optional<ShuffleMetrics> getMetricsOfPartition(ResultPartitionID partitionId) {
300+
synchronized (registeredPartitions) {
301+
final ResultPartition partition = registeredPartitions.get(partitionId);
302+
303+
if (partition == null) {
304+
return Optional.empty();
305+
}
306+
307+
return Optional.of(
308+
new DefaultShuffleMetrics(
309+
partition.getResultPartitionBytes().createSnapshot()));
310+
}
311+
}
296312
}

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTracker.java

+8
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ public interface TaskExecutorPartitionTracker
4444
*/
4545
void stopTrackingAndReleaseJobPartitionsFor(JobID producingJobId);
4646

47+
/**
48+
* Returns the {@link TaskExecutorPartitionInfo} of every partition tracked for the given job.
49+
*
50+
* @param producingJobId the id of the job that produced the partitions
51+
* @return the information of the tracked partitions
52+
*/
53+
Collection<TaskExecutorPartitionInfo> getTrackedPartitionsFor(JobID producingJobId);
54+
4755
/** Promotes the given partitions. */
4856
void promoteJobPartitions(Collection<ResultPartitionID> partitionsToPromote);
4957

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImpl.java

+15
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import java.util.Set;
3636
import java.util.stream.Collectors;
3737

38+
import static java.util.stream.Collectors.toList;
39+
3840
/**
3941
* Utility for tracking partitions and issuing release calls to task executors and shuffle masters.
4042
*/
@@ -83,6 +85,19 @@ public void stopTrackingAndReleaseJobPartitionsFor(JobID producingJobId) {
8385
shuffleEnvironment.releasePartitionsLocally(partitionsForJob);
8486
}
8587

88+
@Override
89+
public Collection<TaskExecutorPartitionInfo> getTrackedPartitionsFor(JobID producingJobId) {
90+
return partitionTable.getTrackedPartitions(producingJobId).stream()
91+
.map(
92+
partitionId -> {
93+
final PartitionInfo<JobID, TaskExecutorPartitionInfo> partitionInfo =
94+
partitionInfos.get(partitionId);
95+
Preconditions.checkNotNull(partitionInfo);
96+
return partitionInfo.getMetaInfo();
97+
})
98+
.collect(toList());
99+
}
100+
86101
@Override
87102
public void promoteJobPartitions(Collection<ResultPartitionID> partitionsToPromote) {
88103
LOG.debug("Promoting Job Partitions {}", partitionsToPromote);

flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java

+24
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
import org.apache.flink.runtime.scheduler.SchedulerNG;
9191
import org.apache.flink.runtime.shuffle.JobShuffleContext;
9292
import org.apache.flink.runtime.shuffle.JobShuffleContextImpl;
93+
import org.apache.flink.runtime.shuffle.PartitionWithMetrics;
9394
import org.apache.flink.runtime.shuffle.ShuffleMaster;
9495
import org.apache.flink.runtime.slots.ResourceRequirement;
9596
import org.apache.flink.runtime.state.KeyGroupRange;
@@ -115,9 +116,11 @@
115116

116117
import java.io.IOException;
117118
import java.net.InetSocketAddress;
119+
import java.util.ArrayList;
118120
import java.util.Collection;
119121
import java.util.HashMap;
120122
import java.util.HashSet;
123+
import java.util.List;
121124
import java.util.Map;
122125
import java.util.Objects;
123126
import java.util.Optional;
@@ -957,6 +960,27 @@ public CompletableFuture<?> stopTrackingAndReleasePartitions(
957960
return future;
958961
}
959962

963+
@Override
964+
public CompletableFuture<Collection<PartitionWithMetrics>>
965+
getAllPartitionWithMetricsOnTaskManagers() {
966+
final List<CompletableFuture<Collection<PartitionWithMetrics>>> allFutures =
967+
new ArrayList<>();
968+
registeredTaskManagers
969+
.values()
970+
.forEach(
971+
taskManager ->
972+
allFutures.add(
973+
taskManager
974+
.getTaskExecutorGateway()
975+
.getPartitionWithMetrics(jobGraph.getJobID())));
976+
return FutureUtils.combineAll(allFutures)
977+
.thenApply(
978+
partitions ->
979+
partitions.stream()
980+
.flatMap(Collection::stream)
981+
.collect(Collectors.toList()));
982+
}
983+
960984
@Override
961985
public CompletableFuture<Acknowledge> notifyNewBlockedNodes(Collection<BlockedNode> newNodes) {
962986
blocklistHandler.addNewBlockedNodes(newNodes);

flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java

+7
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.flink.runtime.rpc.FencedRpcGateway;
4747
import org.apache.flink.runtime.rpc.RpcTimeout;
4848
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
49+
import org.apache.flink.runtime.shuffle.PartitionWithMetrics;
4950
import org.apache.flink.runtime.slots.ResourceRequirement;
5051
import org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload;
5152
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
@@ -55,6 +56,7 @@
5556
import javax.annotation.Nullable;
5657

5758
import java.util.Collection;
59+
import java.util.Collections;
5860
import java.util.concurrent.CompletableFuture;
5961

6062
/** {@link JobMaster} rpc gateway interface. */
@@ -301,6 +303,11 @@ CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
301303
CompletableFuture<?> stopTrackingAndReleasePartitions(
302304
Collection<ResultPartitionID> partitionIds);
303305

306+
default CompletableFuture<Collection<PartitionWithMetrics>>
307+
getAllPartitionWithMetricsOnTaskManagers() {
308+
return CompletableFuture.completedFuture(Collections.emptyList());
309+
}
310+
304311
/**
305312
* Read current {@link JobResourceRequirements job resource requirements}.
306313
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.shuffle;
20+
21+
import static org.apache.flink.util.Preconditions.checkNotNull;
22+
23+
/** Default {@link PartitionWithMetrics} implementation. */
24+
public class DefaultPartitionWithMetrics implements PartitionWithMetrics {
25+
private final ShuffleDescriptor shuffleDescriptor;
26+
private final ShuffleMetrics partitionMetrics;
27+
28+
public DefaultPartitionWithMetrics(
29+
ShuffleDescriptor shuffleDescriptor, ShuffleMetrics partitionMetrics) {
30+
this.shuffleDescriptor = checkNotNull(shuffleDescriptor);
31+
this.partitionMetrics = checkNotNull(partitionMetrics);
32+
}
33+
34+
@Override
35+
public ShuffleMetrics getPartitionMetrics() {
36+
return partitionMetrics;
37+
}
38+
39+
@Override
40+
public ShuffleDescriptor getPartition() {
41+
return shuffleDescriptor;
42+
}
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.shuffle;
20+
21+
import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
22+
23+
import static org.apache.flink.util.Preconditions.checkNotNull;
24+
25+
/** Default {@link ShuffleMetrics} implementation. */
26+
public class DefaultShuffleMetrics implements ShuffleMetrics {
27+
private final ResultPartitionBytes partitionBytes;
28+
29+
public DefaultShuffleMetrics(ResultPartitionBytes partitionBytes) {
30+
this.partitionBytes = checkNotNull(partitionBytes);
31+
}
32+
33+
@Override
34+
public ResultPartitionBytes getPartitionBytes() {
35+
return partitionBytes;
36+
}
37+
}

flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/JobShuffleContext.java

+6
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,10 @@ public interface JobShuffleContext {
3939
*/
4040
CompletableFuture<?> stopTrackingAndReleasePartitions(
4141
Collection<ResultPartitionID> partitionIds);
42+
43+
/**
44+
* Retrieves a collection containing descriptions and metrics of existing result partitions from
45+
* all TaskManagers.
46+
*
* @return a future holding the collected {@link PartitionWithMetrics}
*/
47+
CompletableFuture<Collection<PartitionWithMetrics>> getAllPartitionWithMetricsOnTaskManagers();
4248
}

flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/JobShuffleContextImpl.java

+6
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,10 @@ public CompletableFuture<?> stopTrackingAndReleasePartitions(
4949
Collection<ResultPartitionID> partitionIds) {
5050
return jobMasterGateway.stopTrackingAndReleasePartitions(partitionIds);
5151
}
52+
53+
@Override
54+
public CompletableFuture<Collection<PartitionWithMetrics>>
55+
getAllPartitionWithMetricsOnTaskManagers() {
56+
return jobMasterGateway.getAllPartitionWithMetricsOnTaskManagers();
57+
}
5258
}

flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java

+22
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131

3232
import javax.annotation.Nullable;
3333

34+
import java.util.Collection;
35+
import java.util.HashMap;
36+
import java.util.Map;
3437
import java.util.Optional;
3538
import java.util.concurrent.CompletableFuture;
3639

@@ -58,6 +61,8 @@ public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor>
5861

5962
@Nullable private final TieredInternalShuffleMaster tieredInternalShuffleMaster;
6063

64+
private final Map<JobID, JobShuffleContext> jobShuffleContexts = new HashMap<>();
65+
6166
public NettyShuffleMaster(Configuration conf) {
6267
checkNotNull(conf);
6368
buffersPerInputChannel =
@@ -165,4 +170,21 @@ private boolean isHybridShuffleNewModeEnabled(Configuration conf) {
165170
|| conf.get(BATCH_SHUFFLE_MODE) == ALL_EXCHANGES_HYBRID_SELECTIVE)
166171
&& conf.get(NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE);
167172
}
173+
174+
@Override
175+
public CompletableFuture<Collection<PartitionWithMetrics>> getAllPartitionWithMetrics(
176+
JobID jobId) {
177+
return checkNotNull(jobShuffleContexts.get(jobId))
178+
.getAllPartitionWithMetricsOnTaskManagers();
179+
}
180+
181+
@Override
182+
public void registerJob(JobShuffleContext context) {
183+
jobShuffleContexts.put(context.getJobId(), context);
184+
}
185+
186+
@Override
187+
public void unregisterJob(JobID jobId) {
188+
jobShuffleContexts.remove(jobId);
189+
}
168190
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.shuffle;
20+
21+
import java.io.Serializable;
22+
23+
/** Interface representing the description and metrics of a result partition. */
24+
public interface PartitionWithMetrics extends Serializable {
25+
/** Returns the metrics of the result partition. */
ShuffleMetrics getPartitionMetrics();
26+
27+
/** Returns the {@link ShuffleDescriptor} describing the result partition. */
ShuffleDescriptor getPartition();
28+
}

flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java

+13
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.io.IOException;
3434
import java.util.Collection;
3535
import java.util.List;
36+
import java.util.Optional;
3637

3738
/**
3839
* Interface for the implementation of shuffle service local environment.
@@ -162,6 +163,18 @@ List<P> createResultPartitionWriters(
162163
*/
163164
Collection<ResultPartitionID> getPartitionsOccupyingLocalResources();
164165

166+
/**
167+
* Get metrics of the partition if it still occupies some resources locally and have not been
168+
* released yet.
169+
*
170+
* @param partitionId the partition id
171+
* @return An Optional of {@link ShuffleMetrics}, if found, of the given partition
172+
*/
173+
default Optional<ShuffleMetrics> getMetricsIfPartitionOccupyingLocalResource(
174+
ResultPartitionID partitionId) {
175+
return Optional.empty();
176+
}
177+
165178
/**
166179
* Factory method for the {@link InputGate InputGates} to consume result partitions.
167180
*

0 commit comments

Comments
 (0)