Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade Kafka to 3.9.0 #2247

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.network.ConnectionMode;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.network.SocketServerConfigs;
Expand Down Expand Up @@ -282,7 +282,7 @@ public Map<Object, Object> buildConfig() {
}
if (_trustStore != null || _sslPort > 0) {
try {
props.putAll(TestSslUtils.createSslConfig(false, true, Mode.SERVER, _trustStore, "server" + _nodeId));
props.putAll(TestSslUtils.createSslConfig(false, true, ConnectionMode.SERVER, _trustStore, "server" + _nodeId));
// Switch interbroker to ssl
props.put(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.network.ConnectionMode;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.test.TestSslUtils;
Expand Down Expand Up @@ -58,7 +58,7 @@ protected void setSecurityConfigs(Properties clientProps, String certAlias) {
clientProps.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.name);
clientProps.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
try {
clientProps.putAll(TestSslUtils.createSslConfig(true, true, Mode.CLIENT, trustStoreFile, certAlias));
clientProps.putAll(TestSslUtils.createSslConfig(true, true, ConnectionMode.CLIENT, trustStoreFile, certAlias));
} catch (Exception e) {
throw new IllegalStateException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.zookeeper.client.ZKClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -548,7 +548,7 @@ public static KafkaZkClient createKafkaZkClient(String connectString,
org.apache.kafka.common.utils.Time.class, String.class, ZKClientConfig.class,
String.class, String.class, boolean.class, boolean.class);
kafkaZkClient = (KafkaZkClient) kafka38PlusMet.invoke(null, connectString, zkSecurityEnabled, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
Integer.MAX_VALUE, new SystemTime(), zkClientName, zkClientConfig, metricGroup,
Integer.MAX_VALUE, Time.SYSTEM, zkClientName, zkClientConfig, metricGroup,
metricType, false, true);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
LOG.debug("Unable to find apply method in KafkaZkClient for Kafka 3.8+.", e);
Expand All @@ -560,7 +560,7 @@ Integer.MAX_VALUE, new SystemTime(), zkClientName, zkClientConfig, metricGroup,
org.apache.kafka.common.utils.Time.class, String.class, ZKClientConfig.class,
String.class, String.class, boolean.class);
kafkaZkClient = (KafkaZkClient) kafka31PlusMet.invoke(null, connectString, zkSecurityEnabled, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
Integer.MAX_VALUE, new SystemTime(), zkClientName, zkClientConfig, metricGroup,
Integer.MAX_VALUE, Time.SYSTEM, zkClientName, zkClientConfig, metricGroup,
metricType, false);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
LOG.debug("Unable to find apply method in KafkaZkClient for Kafka 3.1+.", e);
Expand All @@ -574,7 +574,7 @@ Integer.MAX_VALUE, new SystemTime(), zkClientName, zkClientConfig, metricGroup,
org.apache.kafka.common.utils.Time.class, String.class, String.class, Option.class,
Option.class);
kafkaZkClient = (KafkaZkClient) kafka31MinusMet.invoke(null, connectString, zkSecurityEnabled, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
Integer.MAX_VALUE, new SystemTime(), metricGroup, metricType, zkClientName, zkConfig);
Integer.MAX_VALUE, Time.SYSTEM, metricGroup, metricType, zkClientName, zkConfig);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
LOG.debug("Unable to find apply method in KafkaZkClient for Kafka 3.1-.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -172,7 +171,7 @@ public AnomalyDetectorManager(KafkaCruiseControl kafkaCruiseControl, Time time,
_selfHealingFixGenerationTimer = new HashMap<>();
cachedValues().forEach(anomalyType -> _selfHealingFixGenerationTimer.put(anomalyType, new Timer()));
// Add anomaly detector state
_anomalyDetectorState = new AnomalyDetectorState(new SystemTime(), _anomalyNotifier, 10, null);
_anomalyDetectorState = new AnomalyDetectorState(Time.SYSTEM, _anomalyNotifier, 10, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -81,7 +80,7 @@ public class SelfHealingNotifier implements AnomalyNotifier {
protected final Map<Boolean, Map<Integer, Long>> _latestFailedBrokersByAutoFixTriggered;

public SelfHealingNotifier() {
this(new SystemTime());
this(Time.SYSTEM);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.regex.Pattern;
import junit.framework.AssertionFailedError;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -106,7 +106,7 @@ private GoalOptimizer createGoalOptimizer(Properties overrideProps) {
props.putAll(overrideProps);
KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(props);

return new GoalOptimizer(config, EasyMock.mock(LoadMonitor.class), new SystemTime(), new MetricRegistry(),
return new GoalOptimizer(config, EasyMock.mock(LoadMonitor.class), Time.SYSTEM, new MetricRegistry(),
EasyMock.mock(Executor.class), EasyMock.mock(AdminClient.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.easymock.EasyMock;


Expand Down Expand Up @@ -56,7 +56,7 @@ public static void main(String[] argv) throws Exception {
// Instantiate the components.
GoalOptimizer goalOptimizer = new GoalOptimizer(config,
null,
new SystemTime(),
Time.SYSTEM,
new MetricRegistry(),
EasyMock.mock(Executor.class),
EasyMock.mock(AdminClient.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.easymock.EasyMock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -140,7 +140,7 @@ static boolean executeGoalsFor(BalancingConstraint constraint,
props.setProperty(AnalyzerConfig.TOPICS_EXCLUDED_FROM_PARTITION_MOVEMENT_CONFIG, stringJoiner.toString());
GoalOptimizer goalOptimizer = new GoalOptimizer(new KafkaCruiseControlConfig(constraint.setProps(props)),
null,
new SystemTime(),
Time.SYSTEM,
new MetricRegistry(),
EasyMock.mock(Executor.class),
EasyMock.mock(AdminClient.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.easymock.EasyMock;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -112,7 +112,7 @@ public void testRackIdMapper() throws Exception {
_goalToTest.configure(kafkaCruiseControlConfig.mergedConfigValues());
GoalOptimizer goalOptimizer = new GoalOptimizer(kafkaCruiseControlConfig,
null,
new SystemTime(),
Time.SYSTEM,
new MetricRegistry(),
EasyMock.mock(Executor.class),
EasyMock.mock(AdminClient.class));
Expand Down Expand Up @@ -176,7 +176,7 @@ public void testWithoutRackIdMapper() throws Exception {
_goalToTest.configure(kafkaCruiseControlConfig.mergedConfigValues());
GoalOptimizer goalOptimizer = new GoalOptimizer(kafkaCruiseControlConfig,
null,
new SystemTime(),
Time.SYSTEM,
new MetricRegistry(),
EasyMock.mock(Executor.class),
EasyMock.mock(AdminClient.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.linkedin.kafka.cruisecontrol.monitor.ModelGeneration;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.easymock.EasyMock;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -114,7 +114,7 @@ public void testRemoveDisks() throws KafkaCruiseControlException {
_goalToTest.configure(_kafkaCruiseControlConfig.mergedConfigValues());
GoalOptimizer goalOptimizer = new GoalOptimizer(_kafkaCruiseControlConfig,
null,
new SystemTime(),
Time.SYSTEM,
new MetricRegistry(),
EasyMock.mock(Executor.class),
EasyMock.mock(AdminClient.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.junit.BeforeClass;
import org.junit.Test;

Expand Down Expand Up @@ -65,7 +65,7 @@ public static void setup() {
Integer.toString(MOCK_DEFAULT_CONCURRENCY.get(ConcurrencyType.LEADERSHIP_BROKER)));
properties.put(ExecutorConfig.NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG,
Integer.toString(MOCK_DEFAULT_CONCURRENCY.get(ConcurrencyType.INTRA_BROKER_REPLICA)));
taskManager = new ExecutionTaskManager(null, new MetricRegistry(), new SystemTime(),
taskManager = new ExecutionTaskManager(null, new MetricRegistry(), Time.SYSTEM,
new KafkaCruiseControlConfig(properties));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.zookeeper.client.ZKClientConfig;
import org.easymock.Capture;
Expand Down Expand Up @@ -780,7 +779,7 @@ private void executeAndVerifyProposals(KafkaZkClient kafkaZkClient,
EasyMock.replay(mockUserTaskInfo, mockExecutorNotifier, mockLoadMonitor, mockAnomalyDetectorManager);
}
MetricRegistry metricRegistry = new MetricRegistry();
Executor executor = new Executor(configs, new SystemTime(), metricRegistry, null, mockExecutorNotifier,
Executor executor = new Executor(configs, Time.SYSTEM, metricRegistry, null, mockExecutorNotifier,
mockAnomalyDetectorManager);
executor.setUserTaskManager(mockUserTaskManager);
Map<TopicPartition, Integer> replicationFactors = new HashMap<>();
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ org.gradle.daemon=false
org.gradle.parallel=false
org.gradle.jvmargs=-Xms512m -Xmx512m
scalaVersion=2.13.13
kafkaVersion=3.8.0
kafkaVersion=3.9.0
zookeeperVersion=3.9.3
nettyVersion=4.1.114.Final
jettyVersion=9.4.56.v20240826
Expand Down
Loading