Skip to content

Commit 70975b2

Browse files
barakbndannycranmer
authored andcommitted
[FLINK-34429][flink-kubernetes] Setting annotations on internal service.
[FLINK-34429][docs] Updating configuration to include kubernetes.internal-service.annotations key. [FLINK-34429][flink-kubernetes] Kubernetes side parameters utility to retrieve INTERNAL_SERVICE_ANNOTATIONS from configuration [FLINK-34429][flink-kubernetes] Test for pulling Internal-Service Annotations parameter (KubernetesJobManagerParameters) [FLINK-34429][flink-kubernetes] Testing that internal-service is created with configured annotations
1 parent a9d9bab commit 70975b2

File tree

7 files changed

+39
-1
lines changed

7 files changed

+39
-1
lines changed

docs/layouts/shortcodes/generated/kubernetes_config_configuration.html

+6
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,12 @@
110110
<td>Boolean</td>
111111
<td>Whether to enable HostNetwork mode. The HostNetwork allows the pod could use the node network namespace instead of the individual pod network namespace. Please note that the JobManager service account should have the permission to update Kubernetes service.</td>
112112
</tr>
113+
<tr>
114+
<td><h5>kubernetes.internal-service.annotations</h5></td>
115+
<td style="word-wrap: break-word;">(none)</td>
116+
<td>Map</td>
117+
<td>The user-specified annotations that are set to the internal Service. The value should be in the form of a1:v1,a2:v2</td>
118+
</tr>
113119
<tr>
114120
<td><h5>kubernetes.jobmanager.annotations</h5></td>
115121
<td style="word-wrap: break-word;">(none)</td>

flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java

+8
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,14 @@ public class KubernetesConfigOptions {
377377
"The user-specified annotations that are set to the rest Service. The value should be "
378378
+ "in the form of a1:v1,a2:v2");
379379

380+
public static final ConfigOption<Map<String, String>> INTERNAL_SERVICE_ANNOTATIONS =
381+
key("kubernetes.internal-service.annotations")
382+
.mapType()
383+
.noDefaultValue()
384+
.withDescription(
385+
"The user-specified annotations that are set to the internal Service. The value should be "
386+
+ "in the form of a1:v1,a2:v2");
387+
380388
/**
381389
* Defines the configuration key of that external resource in Kubernetes. This is used as a
382390
* suffix in an actual config.

flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java

+6
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,12 @@ public Map<String, String> getRestServiceAnnotations() {
118118
.orElse(Collections.emptyMap());
119119
}
120120

121+
public Map<String, String> getInternalServiceAnnotations() {
122+
return flinkConfig
123+
.getOptional(KubernetesConfigOptions.INTERNAL_SERVICE_ANNOTATIONS)
124+
.orElse(Collections.emptyMap());
125+
}
126+
121127
public int getJobManagerMemoryMB() {
122128
return clusterSpecification.getMasterMemoryMB();
123129
}

flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/HeadlessClusterIPService.java

+1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public Service buildUpInternalService(
5050
.withNewMetadata()
5151
.withName(serviceName)
5252
.withLabels(kubernetesJobManagerParameters.getCommonLabels())
53+
.withAnnotations(kubernetesJobManagerParameters.getInternalServiceAnnotations())
5354
.endMetadata()
5455
.withNewSpec()
5556
.withClusterIP(HEADLESS_CLUSTER_IP)

flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java

+1
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ protected void setupFlinkConfig() {
6363
ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX + k, v));
6464
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_LABELS, userLabels);
6565
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS, userAnnotations);
66+
this.flinkConfig.set(KubernetesConfigOptions.INTERNAL_SERVICE_ANNOTATIONS, userAnnotations);
6667
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_NODE_SELECTOR, nodeSelector);
6768
this.flinkConfig.set(
6869
JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(JOB_MANAGER_MEMORY));

flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ void testBuildAccompanyingKubernetesResources() throws IOException {
6969
final Map<String, String> expectedLabels = getCommonLabels();
7070
assertThat(internalService.getMetadata().getLabels()).isEqualTo(expectedLabels);
7171

72+
assertThat(internalService.getMetadata().getAnnotations()).isEqualTo(userAnnotations);
73+
7274
assertThat(internalService.getSpec().getType()).isNull();
7375
assertThat(internalService.getSpec().getClusterIP()).isEqualTo("None");
7476

flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ void testGetJobManagerAnnotations() {
9696
}
9797

9898
@Test
99-
void testGetServiceAnnotations() {
99+
void testGetRestServiceAnnotations() {
100100
final Map<String, String> expectedAnnotations = new HashMap<>();
101101
expectedAnnotations.put("a1", "v1");
102102
expectedAnnotations.put("a2", "v2");
@@ -109,6 +109,20 @@ void testGetServiceAnnotations() {
109109
assertThat(resultAnnotations).isEqualTo(expectedAnnotations);
110110
}
111111

112+
@Test
113+
void testGetInternalServiceAnnotations() {
114+
final Map<String, String> expectedAnnotations = new HashMap<>();
115+
expectedAnnotations.put("a1", "v1");
116+
expectedAnnotations.put("a2", "v2");
117+
118+
flinkConfig.set(KubernetesConfigOptions.INTERNAL_SERVICE_ANNOTATIONS, expectedAnnotations);
119+
120+
final Map<String, String> resultAnnotations =
121+
kubernetesJobManagerParameters.getInternalServiceAnnotations();
122+
123+
assertThat(resultAnnotations).isEqualTo(expectedAnnotations);
124+
}
125+
112126
@Test
113127
void testGetJobManagerMemoryMB() {
114128
assertThat(kubernetesJobManagerParameters.getJobManagerMemoryMB())

0 commit comments

Comments
 (0)