Skip to content

Commit d6a4eb9

Browse files
committed
[FLINK-34417] Log Job ID via MDC
1 parent 4171b98 commit d6a4eb9

File tree

31 files changed

+1148
-258
lines changed

31 files changed

+1148
-258
lines changed

docs/content.zh/docs/deployment/advanced/logging.md

+15
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,21 @@ Flink 中的日志记录是使用 [SLF4J](http://www.slf4j.org/) 日志接口实
4040

4141
<a name="configuring-log4j-2"></a>
4242

43+
### Structured logging
44+
45+
Flink adds the following fields to [MDC](https://www.slf4j.org/api/org/slf4j/MDC.html) of most of the relevant log messages (experimental feature):
46+
- Job ID
47+
- key: `flink-job-id`
48+
- format: string
49+
- length 32
50+
51+
This is most useful in environments with structured logging and allows you to quickly filter the relevant logs.
52+
53+
The MDC is propagated by slf4j to the logging backend which usually adds it to the log records automatically (e.g. in [log4j json layout](https://logging.apache.org/log4j/2.x/manual/json-template-layout.html)).
54+
Alternatively, it can be configured explicitly - [log4j pattern layout](https://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/PatternLayout.html) might look like this:
55+
56+
`[%-32X{flink-job-id}] %c{0} %m%n`.
57+
4358
## 配置 Log4j 2
4459

4560
Log4j 2 是通过 property 配置文件进行配置的。

docs/content/docs/deployment/advanced/logging.md

+14
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,20 @@ This allows you to use any logging framework that supports SLF4J, without having
3838
By default, [Log4j 2](https://logging.apache.org/log4j/2.x/index.html) is used as the underlying logging framework.
3939

4040

41+
### Structured logging
42+
43+
Flink adds the following fields to [MDC](https://www.slf4j.org/api/org/slf4j/MDC.html) of most of the relevant log messages (experimental feature):
44+
- Job ID
45+
- key: `flink-job-id`
46+
- format: string
47+
- length 32
48+
49+
This is most useful in environments with structured logging and allows you to quickly filter the relevant logs.
50+
51+
The MDC is propagated by slf4j to the logging backend which usually adds it to the log records automatically (e.g. in [log4j json layout](https://logging.apache.org/log4j/2.x/manual/json-template-layout.html)).
52+
Alternatively, it can be configured explicitly - [log4j pattern layout](https://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/PatternLayout.html) might look like this:
53+
54+
`[%-32X{flink-job-id}] %c{0} %m%n`.
4155

4256
## Configuring Log4j 2
4357

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.util;
20+
21+
import java.util.Collections;
22+
import java.util.Map;
23+
import java.util.concurrent.Executor;
24+
25+
import static org.apache.flink.util.Preconditions.checkNotNull;
26+
27+
class MdcAwareExecutor<T extends Executor> implements Executor {
28+
protected final Map<String, String> contextData;
29+
protected final T delegate;
30+
31+
protected MdcAwareExecutor(T delegate, Map<String, String> contextData) {
32+
this.delegate = checkNotNull(delegate);
33+
this.contextData = Collections.unmodifiableMap(checkNotNull(contextData));
34+
}
35+
36+
public void execute(Runnable command) {
37+
delegate.execute(MdcUtils.wrapRunnable(contextData, command));
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.util;
20+
21+
import java.util.ArrayList;
22+
import java.util.Collection;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.concurrent.Callable;
26+
import java.util.concurrent.ExecutionException;
27+
import java.util.concurrent.ExecutorService;
28+
import java.util.concurrent.Future;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.TimeoutException;
31+
32+
import static org.apache.flink.util.MdcUtils.wrapCallable;
33+
import static org.apache.flink.util.MdcUtils.wrapRunnable;
34+
35+
class MdcAwareExecutorService<S extends ExecutorService> extends MdcAwareExecutor<S>
36+
implements ExecutorService {
37+
38+
public MdcAwareExecutorService(S delegate, Map<String, String> contextData) {
39+
super(delegate, contextData);
40+
}
41+
42+
@Override
43+
public void shutdown() {
44+
delegate.shutdown();
45+
}
46+
47+
@Override
48+
public List<Runnable> shutdownNow() {
49+
return delegate.shutdownNow();
50+
}
51+
52+
@Override
53+
public boolean isShutdown() {
54+
return delegate.isShutdown();
55+
}
56+
57+
@Override
58+
public boolean isTerminated() {
59+
return delegate.isTerminated();
60+
}
61+
62+
@Override
63+
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
64+
return delegate.awaitTermination(timeout, unit);
65+
}
66+
67+
@Override
68+
public <T> Future<T> submit(Callable<T> task) {
69+
return delegate.submit(wrapCallable(contextData, task));
70+
}
71+
72+
@Override
73+
public <T> Future<T> submit(Runnable task, T result) {
74+
return delegate.submit(wrapRunnable(contextData, task), result);
75+
}
76+
77+
@Override
78+
public Future<?> submit(Runnable task) {
79+
return delegate.submit(wrapRunnable(contextData, task));
80+
}
81+
82+
@Override
83+
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
84+
throws InterruptedException {
85+
return delegate.invokeAll(wrapCallables(tasks));
86+
}
87+
88+
@Override
89+
public <T> List<Future<T>> invokeAll(
90+
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
91+
throws InterruptedException {
92+
return delegate.invokeAll(wrapCallables(tasks), timeout, unit);
93+
}
94+
95+
@Override
96+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
97+
throws InterruptedException, ExecutionException {
98+
return delegate.invokeAny(wrapCallables(tasks));
99+
}
100+
101+
@Override
102+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
103+
throws InterruptedException, ExecutionException, TimeoutException {
104+
return delegate.invokeAny(wrapCallables(tasks), timeout, unit);
105+
}
106+
107+
private <T> List<Callable<T>> wrapCallables(Collection<? extends Callable<T>> tasks) {
108+
List<Callable<T>> list = new ArrayList<>(tasks.size());
109+
for (Callable<T> task : tasks) {
110+
list.add(wrapCallable(contextData, task));
111+
}
112+
return list;
113+
}
114+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.util;
20+
21+
import java.util.Map;
22+
import java.util.concurrent.Callable;
23+
import java.util.concurrent.ScheduledExecutorService;
24+
import java.util.concurrent.ScheduledFuture;
25+
import java.util.concurrent.TimeUnit;
26+
27+
import static org.apache.flink.util.MdcUtils.wrapCallable;
28+
import static org.apache.flink.util.MdcUtils.wrapRunnable;
29+
30+
class MdcAwareScheduledExecutorService extends MdcAwareExecutorService<ScheduledExecutorService>
31+
implements ScheduledExecutorService {
32+
33+
public MdcAwareScheduledExecutorService(
34+
ScheduledExecutorService delegate, Map<String, String> contextData) {
35+
super(delegate, contextData);
36+
}
37+
38+
@Override
39+
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
40+
return delegate.schedule(wrapRunnable(contextData, command), delay, unit);
41+
}
42+
43+
@Override
44+
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
45+
return delegate.schedule(wrapCallable(contextData, callable), delay, unit);
46+
}
47+
48+
@Override
49+
public ScheduledFuture<?> scheduleAtFixedRate(
50+
Runnable command, long initialDelay, long period, TimeUnit unit) {
51+
return delegate.scheduleAtFixedRate(
52+
wrapRunnable(contextData, command), initialDelay, period, unit);
53+
}
54+
55+
@Override
56+
public ScheduledFuture<?> scheduleWithFixedDelay(
57+
Runnable command, long initialDelay, long delay, TimeUnit unit) {
58+
return delegate.scheduleWithFixedDelay(
59+
wrapRunnable(contextData, command), initialDelay, delay, unit);
60+
}
61+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.util;
20+
21+
import org.apache.flink.api.common.JobID;
22+
23+
import org.slf4j.MDC;
24+
25+
import java.util.Collections;
26+
import java.util.Map;
27+
import java.util.concurrent.Callable;
28+
import java.util.concurrent.Executor;
29+
import java.util.concurrent.ExecutorService;
30+
import java.util.concurrent.ScheduledExecutorService;
31+
32+
import static org.apache.flink.util.Preconditions.checkArgument;
33+
34+
/** Utility class to manage common Flink attributes in {@link MDC} (only {@link JobID} ATM). */
35+
public class MdcUtils {
36+
37+
public static final String JOB_ID = "flink-job-id";
38+
39+
/**
40+
* Replace MDC contents with the provided one and return a closeable object that can be used to
41+
* restore the original MDC.
42+
*
43+
* @param context to put into MDC
44+
*/
45+
public static MdcCloseable withContext(Map<String, String> context) {
46+
final Map<String, String> orig = MDC.getCopyOfContextMap();
47+
MDC.setContextMap(context);
48+
return () -> MDC.setContextMap(orig);
49+
}
50+
51+
/** {@link AutoCloseable } that restores the {@link MDC} contents on close. */
52+
public interface MdcCloseable extends AutoCloseable {
53+
@Override
54+
void close();
55+
}
56+
57+
/**
58+
* Wrap the given {@link Runnable} so that the given data is added to {@link MDC} before its
59+
* execution and removed afterward.
60+
*/
61+
public static Runnable wrapRunnable(Map<String, String> contextData, Runnable command) {
62+
return () -> {
63+
try (MdcCloseable ctx = withContext(contextData)) {
64+
command.run();
65+
}
66+
};
67+
}
68+
69+
/**
70+
* Wrap the given {@link Callable} so that the given data is added to {@link MDC} before its
71+
* execution and removed afterward.
72+
*/
73+
public static <T> Callable<T> wrapCallable(
74+
Map<String, String> contextData, Callable<T> command) {
75+
return () -> {
76+
try (MdcCloseable ctx = withContext(contextData)) {
77+
return command.call();
78+
}
79+
};
80+
}
81+
82+
/**
83+
* Wrap the given {@link Executor} so that the given {@link JobID} is added before it executes
84+
* any submitted commands and removed afterward.
85+
*/
86+
public static Executor scopeToJob(JobID jobID, Executor executor) {
87+
checkArgument(!(executor instanceof MdcAwareExecutor));
88+
return new MdcAwareExecutor<>(executor, asContextData(jobID));
89+
}
90+
91+
/**
92+
* Wrap the given {@link ExecutorService} so that the given {@link JobID} is added before it
93+
* executes any submitted commands and removed afterward.
94+
*/
95+
public static ExecutorService scopeToJob(JobID jobID, ExecutorService delegate) {
96+
checkArgument(!(delegate instanceof MdcAwareExecutorService));
97+
return new MdcAwareExecutorService<>(delegate, asContextData(jobID));
98+
}
99+
100+
/**
101+
* Wrap the given {@link ScheduledExecutorService} so that the given {@link JobID} is added
102+
* before it executes any submitted commands and removed afterward.
103+
*/
104+
public static ScheduledExecutorService scopeToJob(JobID jobID, ScheduledExecutorService ses) {
105+
checkArgument(!(ses instanceof MdcAwareScheduledExecutorService));
106+
return new MdcAwareScheduledExecutorService(ses, asContextData(jobID));
107+
}
108+
109+
public static Map<String, String> asContextData(JobID jobID) {
110+
return Collections.singletonMap(JOB_ID, jobID.toHexString());
111+
}
112+
}

0 commit comments

Comments
 (0)