Skip to content

Commit 01fccd3

Browse files
authored
KAFKA-15186 AppInfo metrics don't contain the client-id (#20493)
All Kafka component register AppInfo metrics to track the application start time or commit-id etc. These metrics are useful for monitoring and debugging. However, the AppInfo doesn't provide client-id, which is an important information for custom metrics reporter. The AppInfoParser class registers a JMX MBean with the provided client-id, but when it adds metrics to the Metrics registry, the client-id is not included. This KIP aims to add the client-id as a tag. Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 07b786e commit 01fccd3

File tree

3 files changed

+111
-35
lines changed

3 files changed

+111
-35
lines changed

clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import java.io.InputStream;
2727
import java.lang.management.ManagementFactory;
28+
import java.util.Map;
2829
import java.util.Properties;
2930

3031
import javax.management.JMException;
@@ -68,7 +69,7 @@ public static synchronized void registerAppInfo(String prefix, String id, Metric
6869
AppInfo mBean = new AppInfo(nowMs);
6970
server.registerMBean(mBean, name);
7071

71-
registerMetrics(metrics, mBean); // prefix will be added later by JmxReporter
72+
registerMetrics(metrics, mBean, id); // prefix will be added later by JmxReporter
7273
} catch (JMException e) {
7374
log.warn("Error registering AppInfo mbean", e);
7475
}
@@ -81,31 +82,44 @@ public static synchronized void unregisterAppInfo(String prefix, String id, Metr
8182
if (server.isRegistered(name))
8283
server.unregisterMBean(name);
8384

84-
unregisterMetrics(metrics);
85+
unregisterMetrics(metrics, id);
8586
} catch (JMException e) {
8687
log.warn("Error unregistering AppInfo mbean", e);
8788
} finally {
8889
log.info("App info {} for {} unregistered", prefix, id);
8990
}
9091
}
9192

92-
private static MetricName metricName(Metrics metrics, String name) {
93-
return metrics.metricName(name, "app-info", "Metric indicating " + name);
93+
private static MetricName metricName(Metrics metrics, String name, Map<String, String> tags) {
94+
return metrics.metricName(name, "app-info", "Metric indicating " + name, tags);
9495
}
9596

96-
private static void registerMetrics(Metrics metrics, AppInfo appInfo) {
97-
if (metrics != null) {
98-
metrics.addMetric(metricName(metrics, "version"), (Gauge<String>) (config, now) -> appInfo.getVersion());
99-
metrics.addMetric(metricName(metrics, "commit-id"), (Gauge<String>) (config, now) -> appInfo.getCommitId());
100-
metrics.addMetric(metricName(metrics, "start-time-ms"), (Gauge<Long>) (config, now) -> appInfo.getStartTimeMs());
97+
private static void registerMetrics(Metrics metrics, AppInfo appInfo, String clientId) {
98+
if (metrics == null) return;
99+
// Most Kafka clients (producer/consumer/admin) set the client-id tag in the metrics config.
100+
// Although we don’t explicitly parse client-id here, these metrics are automatically tagged with client-id.
101+
metrics.addMetric(metricName(metrics, "version", Map.of()), (Gauge<String>) (config, now) -> appInfo.getVersion());
102+
metrics.addMetric(metricName(metrics, "commit-id", Map.of()), (Gauge<String>) (config, now) -> appInfo.getCommitId());
103+
metrics.addMetric(metricName(metrics, "start-time-ms", Map.of()), (Gauge<Long>) (config, now) -> appInfo.getStartTimeMs());
104+
// MirrorMaker/Worker doesn't set client-id tag into the metrics config, so we need to set it here.
105+
if (!metrics.config().tags().containsKey("client-id") && clientId != null) {
106+
metrics.addMetric(metricName(metrics, "version", Map.of("client-id", clientId)), (Gauge<String>) (config, now) -> appInfo.getVersion());
107+
metrics.addMetric(metricName(metrics, "commit-id", Map.of("client-id", clientId)), (Gauge<String>) (config, now) -> appInfo.getCommitId());
108+
metrics.addMetric(metricName(metrics, "start-time-ms", Map.of("client-id", clientId)), (Gauge<Long>) (config, now) -> appInfo.getStartTimeMs());
101109
}
102110
}
103111

104-
private static void unregisterMetrics(Metrics metrics) {
105-
if (metrics != null) {
106-
metrics.removeMetric(metricName(metrics, "version"));
107-
metrics.removeMetric(metricName(metrics, "commit-id"));
108-
metrics.removeMetric(metricName(metrics, "start-time-ms"));
112+
private static void unregisterMetrics(Metrics metrics, String clientId) {
113+
if (metrics == null) return;
114+
115+
metrics.removeMetric(metricName(metrics, "version", Map.of()));
116+
metrics.removeMetric(metricName(metrics, "commit-id", Map.of()));
117+
metrics.removeMetric(metricName(metrics, "start-time-ms", Map.of()));
118+
119+
if (!metrics.config().tags().containsKey("client-id") && clientId != null) {
120+
metrics.removeMetric(metricName(metrics, "version", Map.of("client-id", clientId)));
121+
metrics.removeMetric(metricName(metrics, "commit-id", Map.of("client-id", clientId)));
122+
metrics.removeMetric(metricName(metrics, "start-time-ms", Map.of("client-id", clientId)));
109123
}
110124
}
111125

clients/src/test/java/org/apache/kafka/common/utils/AppInfoParserTest.java

Lines changed: 62 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616
*/
1717
package org.apache.kafka.common.utils;
1818

19+
import org.apache.kafka.common.metrics.MetricConfig;
1920
import org.apache.kafka.common.metrics.Metrics;
2021

2122
import org.junit.jupiter.api.AfterEach;
2223
import org.junit.jupiter.api.BeforeEach;
2324
import org.junit.jupiter.api.Test;
2425

2526
import java.lang.management.ManagementFactory;
27+
import java.util.Map;
2628

2729
import javax.management.JMException;
2830
import javax.management.MBeanServer;
@@ -41,38 +43,49 @@ public class AppInfoParserTest {
4143
private static final String METRICS_PREFIX = "app-info-test";
4244
private static final String METRICS_ID = "test";
4345

44-
private Metrics metrics;
4546
private MBeanServer mBeanServer;
4647

4748
@BeforeEach
4849
public void setUp() {
49-
metrics = new Metrics(new MockTime(1));
5050
mBeanServer = ManagementFactory.getPlatformMBeanServer();
5151
}
5252

5353
@AfterEach
54-
public void tearDown() {
55-
metrics.close();
54+
public void tearDown() throws JMException {
55+
if (mBeanServer.isRegistered(expectedAppObjectName())) {
56+
mBeanServer.unregisterMBean(expectedAppObjectName());
57+
}
5658
}
5759

5860
@Test
5961
public void testRegisterAppInfoRegistersMetrics() throws JMException {
60-
registerAppInfo();
61-
registerAppInfoMultipleTimes();
62+
try (Metrics metrics = new Metrics(new MockTime(1))) {
63+
registerAppInfo(metrics);
64+
registerAppInfoMultipleTimes(metrics);
65+
AppInfoParser.unregisterAppInfo(METRICS_PREFIX, METRICS_ID, metrics);
66+
}
6267
}
6368

6469
@Test
6570
public void testUnregisterAppInfoUnregistersMetrics() throws JMException {
66-
registerAppInfo();
67-
AppInfoParser.unregisterAppInfo(METRICS_PREFIX, METRICS_ID, metrics);
68-
69-
assertFalse(mBeanServer.isRegistered(expectedAppObjectName()));
70-
assertNull(metrics.metric(metrics.metricName("commit-id", "app-info")));
71-
assertNull(metrics.metric(metrics.metricName("version", "app-info")));
72-
assertNull(metrics.metric(metrics.metricName("start-time-ms", "app-info")));
71+
try (Metrics metrics = new Metrics(new MockTime(1))) {
72+
registerAppInfo(metrics);
73+
AppInfoParser.unregisterAppInfo(METRICS_PREFIX, METRICS_ID, metrics);
74+
75+
assertFalse(mBeanServer.isRegistered(expectedAppObjectName()));
76+
assertNull(metrics.metric(metrics.metricName("commit-id", "app-info")));
77+
assertNull(metrics.metric(metrics.metricName("version", "app-info")));
78+
assertNull(metrics.metric(metrics.metricName("start-time-ms", "app-info")));
79+
80+
Map<String, String> idTag = Map.of("client-id", METRICS_ID);
81+
assertNull(metrics.metric(metrics.metricName("commit-id", "app-info", idTag)));
82+
assertNull(metrics.metric(metrics.metricName("version", "app-info", idTag)));
83+
assertNull(metrics.metric(metrics.metricName("start-time-ms", "app-info", idTag)));
84+
AppInfoParser.unregisterAppInfo(METRICS_PREFIX, METRICS_ID, metrics);
85+
}
7386
}
7487

75-
private void registerAppInfo() throws JMException {
88+
private void registerAppInfo(Metrics metrics) throws JMException {
7689
assertEquals(EXPECTED_COMMIT_VERSION, AppInfoParser.getCommitId());
7790
assertEquals(EXPECTED_VERSION, AppInfoParser.getVersion());
7891

@@ -82,9 +95,15 @@ private void registerAppInfo() throws JMException {
8295
assertEquals(EXPECTED_COMMIT_VERSION, metrics.metric(metrics.metricName("commit-id", "app-info")).metricValue());
8396
assertEquals(EXPECTED_VERSION, metrics.metric(metrics.metricName("version", "app-info")).metricValue());
8497
assertEquals(EXPECTED_START_MS, metrics.metric(metrics.metricName("start-time-ms", "app-info")).metricValue());
98+
99+
Map<String, String> idTag = Map.of("client-id", METRICS_ID);
100+
assertTrue(mBeanServer.isRegistered(expectedAppObjectName()));
101+
assertEquals(EXPECTED_COMMIT_VERSION, metrics.metric(metrics.metricName("commit-id", "app-info", idTag)).metricValue());
102+
assertEquals(EXPECTED_VERSION, metrics.metric(metrics.metricName("version", "app-info", idTag)).metricValue());
103+
assertEquals(EXPECTED_START_MS, metrics.metric(metrics.metricName("start-time-ms", "app-info", idTag)).metricValue());
85104
}
86105

87-
private void registerAppInfoMultipleTimes() throws JMException {
106+
private void registerAppInfoMultipleTimes(Metrics metrics) throws JMException {
88107
assertEquals(EXPECTED_COMMIT_VERSION, AppInfoParser.getCommitId());
89108
assertEquals(EXPECTED_VERSION, AppInfoParser.getVersion());
90109

@@ -95,9 +114,37 @@ private void registerAppInfoMultipleTimes() throws JMException {
95114
assertEquals(EXPECTED_COMMIT_VERSION, metrics.metric(metrics.metricName("commit-id", "app-info")).metricValue());
96115
assertEquals(EXPECTED_VERSION, metrics.metric(metrics.metricName("version", "app-info")).metricValue());
97116
assertEquals(EXPECTED_START_MS, metrics.metric(metrics.metricName("start-time-ms", "app-info")).metricValue());
117+
118+
Map<String, String> idTag = Map.of("client-id", METRICS_ID);
119+
assertEquals(EXPECTED_COMMIT_VERSION, metrics.metric(metrics.metricName("commit-id", "app-info", idTag)).metricValue());
120+
assertEquals(EXPECTED_VERSION, metrics.metric(metrics.metricName("version", "app-info", idTag)).metricValue());
121+
assertEquals(EXPECTED_START_MS, metrics.metric(metrics.metricName("start-time-ms", "app-info", idTag)).metricValue());
98122
}
99123

100124
private ObjectName expectedAppObjectName() throws MalformedObjectNameException {
101125
return new ObjectName(METRICS_PREFIX + ":type=app-info,id=" + METRICS_ID);
102126
}
127+
128+
@Test
129+
public void testClientIdWontAddRepeatedly() throws JMException {
130+
Map<String, String> tags = Map.of(
131+
"client-id", METRICS_ID,
132+
"other-tag", "tag-value",
133+
"another-tag", "another-value"
134+
);
135+
Metrics metrics = new Metrics(new MetricConfig().tags(tags), new MockTime(1));
136+
AppInfoParser.registerAppInfo(METRICS_PREFIX, METRICS_ID, metrics, EXPECTED_START_MS);
137+
138+
assertTrue(mBeanServer.isRegistered(expectedAppObjectName()));
139+
assertEquals(EXPECTED_COMMIT_VERSION, metrics.metric(metrics.metricName("commit-id", "app-info", tags)).metricValue());
140+
assertEquals(EXPECTED_VERSION, metrics.metric(metrics.metricName("version", "app-info", tags)).metricValue());
141+
assertEquals(EXPECTED_START_MS, metrics.metric(metrics.metricName("start-time-ms", "app-info", tags)).metricValue());
142+
143+
Map<String, String> idTag = Map.of("client-id", METRICS_ID);
144+
assertEquals(EXPECTED_COMMIT_VERSION, metrics.metric(metrics.metricName("commit-id", "app-info", idTag)).metricValue());
145+
assertEquals(EXPECTED_VERSION, metrics.metric(metrics.metricName("version", "app-info", idTag)).metricValue());
146+
assertEquals(EXPECTED_START_MS, metrics.metric(metrics.metricName("start-time-ms", "app-info", idTag)).metricValue());
147+
metrics.close();
148+
AppInfoParser.unregisterAppInfo(METRICS_PREFIX, METRICS_ID, metrics);
149+
}
103150
}

docs/upgrade.html

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,21 @@ <h5><a id="upgrade_420_notable" href="#upgrade_420_notable">Notable changes in 4
147147
and defaults to false.
148148
For further details, please refer to <a href="https://cwiki.apache.org/confluence/x/nyH1D">KIP-853</a>.
149149
</li>
150+
<li>
151+
The AppInfo metrics will deprecate the following metric names, which will be removed in Kafka 5.0:
152+
<ul>
153+
<li><code>[name=start-time-ms, group=app-info, description=Metric indicating start-time-ms, tags={}]</code></li>
154+
<li><code>[name=commit-id, group=app-info, description=Metric indicating commit-id, tags={}]</code></li>
155+
<li><code>[name=version, group=app-info, description=Metric indicating version, tags={}]</code></li>
156+
</ul>
157+
In addition, the <code>client-id</code> will be added to the tags of these metrics. The new metric names will be:
158+
<ul>
159+
<li><code>[name=start-time-ms, group=app-info, description=Metric indicating start-time-ms, tags={client-id=...}]</code></li>
160+
<li><code>[name=commit-id, group=app-info, description=Metric indicating commit-id, tags={client-id=...}]</code></li>
161+
<li><code>[name=version, group=app-info, description=Metric indicating version, tags={client-id=...}]</code></li>
162+
</ul>
163+
For further details, please refer to <a href="https://cwiki.apache.org/confluence/x/3gn0Ew">KIP-1120</a>.
164+
</li>
150165
</ul>
151166

152167
<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>
@@ -423,12 +438,12 @@ <h5><a id="upgrade_servers_400_notable" href="#upgrade_servers_400_notable">Nota
423438
<li>The <code>--whitelist</code> option was removed from the <code>kafka-console-consumer</code> command line tool.
424439
Please use <code>--include</code> instead.
425440
</li>
426-
<li>Redirections from the old tools packages have been removed:
427-
<code>kafka.admin.FeatureCommand</code>,
428-
<code>kafka.tools.ClusterTool</code>,
441+
<li>Redirections from the old tools packages have been removed:
442+
<code>kafka.admin.FeatureCommand</code>,
443+
<code>kafka.tools.ClusterTool</code>,
429444
<code>kafka.tools.EndToEndLatency</code>,
430-
<code>kafka.tools.StateChangeLogMerger</code>,
431-
<code>kafka.tools.StreamsResetter</code>,
445+
<code>kafka.tools.StateChangeLogMerger</code>,
446+
<code>kafka.tools.StreamsResetter</code>,
432447
<code>kafka.tools.JmxTool</code>.
433448
</li>
434449
<li>The <code>--authorizer</code>, <code>--authorizer-properties</code>, and <code>--zk-tls-config-file</code> options were removed from the <code>kafka-acls</code> command line tool.
@@ -492,7 +507,7 @@ <h5><a id="upgrade_servers_400_notable" href="#upgrade_servers_400_notable">Nota
492507
</li>
493508
<li>The deprecated <code>sendOffsetsToTransaction(Map&lt;TopicPartition, OffsetAndMetadata&gt;, String)</code> method has been removed from the Producer API.
494509
</li>
495-
<li>The default <code>linger.ms</code> changed from 0 to 5 in Apache Kafka 4.0 as the efficiency gains from larger batches typically result in
510+
<li>The default <code>linger.ms</code> changed from 0 to 5 in Apache Kafka 4.0 as the efficiency gains from larger batches typically result in
496511
similar or lower producer latency despite the increased linger.
497512
</li>
498513
</ul>

0 commit comments

Comments
 (0)