Skip to content

Commit

Permalink
MINOR: Cleanup connect runtime module (#18074)
Browse files Browse the repository at this point in the history
Reviewers: Mickael Maison <[email protected]>
  • Loading branch information
wernerdv authored Dec 10, 2024
1 parent f57fd2d commit 45835a0
Show file tree
Hide file tree
Showing 21 changed files with 87 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,7 @@ CreateConnectorRequest parseConnectorConfigurationFile(String filePath) throws I

File connectorConfigurationFile = Paths.get(filePath).toFile();
try {
Map<String, String> connectorConfigs = objectMapper.readValue(
connectorConfigurationFile,
new TypeReference<Map<String, String>>() { });
Map<String, String> connectorConfigs = objectMapper.readValue(connectorConfigurationFile, new TypeReference<>() { });

if (!connectorConfigs.containsKey(NAME_CONFIG)) {
throw new ConnectException("Connector configuration at '" + filePath + "' is missing the mandatory '" + NAME_CONFIG + "' "
Expand All @@ -136,8 +134,7 @@ CreateConnectorRequest parseConnectorConfigurationFile(String filePath) throws I

try {
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
CreateConnectorRequest createConnectorRequest = objectMapper.readValue(connectorConfigurationFile,
new TypeReference<CreateConnectorRequest>() { });
CreateConnectorRequest createConnectorRequest = objectMapper.readValue(connectorConfigurationFile, new TypeReference<>() { });
if (createConnectorRequest.config().containsKey(NAME_CONFIG)) {
if (!createConnectorRequest.config().get(NAME_CONFIG).equals(createConnectorRequest.name())) {
throw new ConnectException("Connector name configuration in 'config' doesn't match the one specified in 'name' at '" + filePath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ public ConnectMetrics(String workerId, WorkerConfig config, Time time, String cl
.timeWindow(sampleWindowMs, TimeUnit.MILLISECONDS).recordLevel(
Sensor.RecordingLevel.forName(metricsRecordingLevel));

Map<String, Object> contextLabels = new HashMap<>();
contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
Map<String, Object> contextLabels = new HashMap<>(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
contextLabels.put(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, clusterId);
Object groupId = config.originals().get(DistributedConfig.GROUP_ID_CONFIG);
if (groupId != null) {
Expand Down Expand Up @@ -391,8 +390,7 @@ public Sensor sensor(String name, MetricConfig config, Sensor... parents) {
public synchronized Sensor sensor(String name, MetricConfig config, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
// We need to make sure that all sensor names are unique across all groups, so use the sensor prefix
Sensor result = metrics.sensor(sensorPrefix + name, config, Long.MAX_VALUE, recordingLevel, parents);
if (result != null)
sensorNames.add(result.name());
sensorNames.add(result.name());
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ private void commitTransaction() {

error = flushError.get();
if (error != null) {
recordCommitFailure(time.milliseconds() - started, null);
recordCommitFailure(time.milliseconds() - started);
offsetWriter.cancelFlush();
throw maybeWrapProducerSendException(
"Failed to flush offsets and/or records for task " + id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

/**
* Utility class that tracks the current state and the duration of time spent in each state.
* This class is threadsafe.
* This class is thread-safe.
*/
public class StateTracker {

Expand Down Expand Up @@ -60,7 +60,7 @@ public State currentState() {

/**
* An immutable record of the accumulated times at the most recent state change. This class is required to
* efficiently make {@link StateTracker} threadsafe.
* efficiently make {@link StateTracker} thread-safe.
*/
private static final class StateChange {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ protected void iteration() {

// Maybe commit
if (!committing && (context.isCommitRequested() || now >= nextCommit)) {
commitOffsets(now, false);
commitOffsets(now);
nextCommit = now + offsetCommitIntervalMs;
context.clearCommitRequest();
}
Expand Down Expand Up @@ -282,7 +282,7 @@ private void onCommitCompleted(Throwable error, long seqno, Map<TopicPartition,
log.error("{} Commit of offsets threw an unexpected exception for sequence number {}: {}",
this, seqno, committedOffsets, error);
commitFailures++;
recordCommitFailure(durationMillis, error);
recordCommitFailure(durationMillis);
} else {
log.debug("{} Finished offset commit successfully in {} ms for sequence number {}: {}",
this, durationMillis, seqno, committedOffsets);
Expand Down Expand Up @@ -396,8 +396,8 @@ private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean cl
}
}

private void commitOffsets(long now, boolean closing) {
commitOffsets(now, closing, consumer.assignment());
private void commitOffsets(long now) {
commitOffsets(now, false, consumer.assignment());
}

private void commitOffsets(long now, boolean closing, Collection<TopicPartition> topicPartitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,11 @@ public boolean commitOffsets() {
shouldFlush = offsetWriter.beginFlush(timeout - time.milliseconds(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.warn("{} Interrupted while waiting for previous offset flush to complete, cancelling", this);
recordCommitFailure(time.milliseconds() - started, e);
recordCommitFailure(time.milliseconds() - started);
return false;
} catch (TimeoutException e) {
log.warn("{} Timed out while waiting for previous offset flush to complete, cancelling", this);
recordCommitFailure(time.milliseconds() - started, e);
recordCommitFailure(time.milliseconds() - started);
return false;
}
if (!shouldFlush) {
Expand All @@ -292,7 +292,7 @@ public boolean commitOffsets() {
// any data
if (flushFuture == null) {
offsetWriter.cancelFlush();
recordCommitFailure(time.milliseconds() - started, null);
recordCommitFailure(time.milliseconds() - started);
return false;
}
try {
Expand All @@ -304,17 +304,17 @@ public boolean commitOffsets() {
} catch (InterruptedException e) {
log.warn("{} Flush of offsets interrupted, cancelling", this);
offsetWriter.cancelFlush();
recordCommitFailure(time.milliseconds() - started, e);
recordCommitFailure(time.milliseconds() - started);
return false;
} catch (ExecutionException e) {
log.error("{} Flush of offsets threw an unexpected exception: ", this, e);
offsetWriter.cancelFlush();
recordCommitFailure(time.milliseconds() - started, e);
recordCommitFailure(time.milliseconds() - started);
return false;
} catch (TimeoutException e) {
log.error("{} Timed out waiting to flush offsets to storage; will try again on next flush interval with latest offsets", this);
offsetWriter.cancelFlush();
recordCommitFailure(time.milliseconds() - started, null);
recordCommitFailure(time.milliseconds() - started);
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,17 +356,16 @@ protected void recordActiveTopic(String topic) {
* @param duration the length of time in milliseconds for the commit attempt to complete
*/
protected void recordCommitSuccess(long duration) {
taskMetricsGroup.recordCommit(duration, true, null);
taskMetricsGroup.recordCommit(duration, true);
}

/**
* Record that offsets have been committed.
*
* @param duration the length of time in milliseconds for the commit attempt to complete
* @param error the unexpected error that occurred; may be null in the case of timeouts or interruptions
*/
protected void recordCommitFailure(long duration, Throwable error) {
taskMetricsGroup.recordCommit(duration, false, error);
protected void recordCommitFailure(long duration) {
taskMetricsGroup.recordCommit(duration, false);
}

/**
Expand Down Expand Up @@ -434,7 +433,7 @@ void close() {
metricGroup.close();
}

void recordCommit(long duration, boolean success, Throwable error) {
void recordCommit(long duration, boolean success) {
if (success) {
commitTime.record(duration);
commitAttempts.record(1.0d);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1510,7 +1510,7 @@ public void restartConnectorAndTasks(RestartRequest request, Callback<ConnectorS
}
// Compute and send the response that this was accepted
Optional<RestartPlan> plan = buildRestartPlan(request);
if (!plan.isPresent()) {
if (plan.isEmpty()) {
callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null);
} else {
callback.onCompletion(null, plan.get().restartConnectorStateInfo());
Expand Down Expand Up @@ -1558,7 +1558,7 @@ void processRestartRequests() {
protected synchronized void doRestartConnectorAndTasks(RestartRequest request) {
String connectorName = request.connectorName();
Optional<RestartPlan> maybePlan = buildRestartPlan(request);
if (!maybePlan.isPresent()) {
if (maybePlan.isEmpty()) {
log.debug("Skipping restart of connector '{}' since no status is available: {}", connectorName, request);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ public WorkerGroupMember(DistributedConfig config,
.tags(metricsTags);
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);

Map<String, Object> contextLabels = new HashMap<>();
contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
Map<String, Object> contextLabels = new HashMap<>(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
contextLabels.put(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, config.kafkaClusterId());
contextLabels.put(WorkerConfig.CONNECT_GROUP_ID, config.getString(DistributedConfig.GROUP_ID_CONFIG));
MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, contextLabels);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.transforms.predicates.Predicate;

import java.io.PrintStream;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -53,29 +52,22 @@ private <P extends Predicate<?>> DocInfo(Class<P> predicateClass, String overvie
.sorted(Comparator.comparing(docInfo -> docInfo.predicateName))
.collect(Collectors.toList());

private static void printPredicateHtml(PrintStream out, DocInfo docInfo) {
out.println("<div id=\"" + docInfo.predicateName + "\">");

out.print("<h5>");
out.print("<a href=\"#" + docInfo.predicateName + "\">" + docInfo.predicateName + "</a>");
out.println("</h5>");

out.println(docInfo.overview);

out.println("<p/>");

out.println(docInfo.configDef.toHtml(6, key -> docInfo.predicateName + "_" + key));

out.println("</div>");
}

private static void printHtml(PrintStream out) {
private static String toHtml() {
StringBuilder b = new StringBuilder();
for (final DocInfo docInfo : PREDICATES) {
printPredicateHtml(out, docInfo);
b.append("<div id=\"" + docInfo.predicateName + "\">\n");
b.append("<h5>");
b.append("<a href=\"#" + docInfo.predicateName + "\">" + docInfo.predicateName + "</a>");
b.append("</h5>\n");
b.append(docInfo.overview + "\n");
b.append("<p/>\n");
b.append(docInfo.configDef.toHtml(6, key -> docInfo.predicateName + "_" + key) + "\n");
b.append("</div>\n");
}
return b.toString();
}

public static void main(String... args) {
printHtml(System.out);
System.out.println(toHtml());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.kafka.connect.transforms.TimestampRouter;
import org.apache.kafka.connect.transforms.ValueToKey;

import java.io.PrintStream;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -71,30 +70,23 @@ private DocInfo(String transformationName, String overview, ConfigDef configDef)
new DocInfo(ValueToKey.class.getName(), ValueToKey.OVERVIEW_DOC, ValueToKey.CONFIG_DEF)
);

private static void printTransformationHtml(PrintStream out, DocInfo docInfo) {
out.println("<div id=\"" + docInfo.transformationName + "\">");

out.print("<h5>");
out.print("<a href=\"#" + docInfo.transformationName + "\">" + docInfo.transformationName + "</a>");
out.println("</h5>");

out.println(docInfo.overview);

out.println("<p/>");

out.println(docInfo.configDef.toHtml(6, key -> docInfo.transformationName + "_" + key));

out.println("</div>");
}

private static void printHtml(PrintStream out) {
private static String toHtml() {
StringBuilder b = new StringBuilder();
for (final DocInfo docInfo : TRANSFORMATIONS) {
printTransformationHtml(out, docInfo);
b.append("<div id=\"" + docInfo.transformationName + "\">\n");
b.append("<h5>");
b.append("<a href=\"#" + docInfo.transformationName + "\">" + docInfo.transformationName + "</a>");
b.append("</h5>\n");
b.append(docInfo.overview + "\n");
b.append("<p/>\n");
b.append(docInfo.configDef.toHtml(6, key -> docInfo.transformationName + "_" + key) + "\n");
b.append("</div>\n");
}
return b.toString();
}

public static void main(String... args) {
printHtml(System.out);
System.out.println(toHtml());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ default void recordStage(Stage stage) {
}

default <V2> Callback<V2> chainStaging(Callback<V2> chained) {
return new Callback<V2>() {
return new Callback<>() {
@Override
public void recordStage(Stage stage) {
Callback.this.recordStage(stage);
Expand Down
Loading

0 comments on commit 45835a0

Please sign in to comment.