Skip to content

IKC-439 Remove config readers for clusters and groups from YAML kouncil configuration file #423

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: IKC-436
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
- name: Checkout project
uses: actions/checkout@v2
- name: Cache local Maven repository
uses: actions/cache@v2
uses: actions/cache@v3
with:
path: ~/.m2/repository
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
- name: Checkout project
uses: actions/checkout@v2
- name: Cache local Maven repository
uses: actions/cache@v2
uses: actions/cache@v3
with:
path: ~/.m2/repository
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
- name: Checkout project
uses: actions/checkout@v2
- name: Cache local Maven repository
uses: actions/cache@v2
uses: actions/cache@v3
with:
path: ~/.m2/repository
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import lombok.extern.slf4j.Slf4j;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.valves.rewrite.RewriteValve;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CustomTomcatConfiguration implements WebServerFactoryCustomizer<TomcatServletWebServerFactory> {

Expand All @@ -29,7 +31,7 @@ protected synchronized void startInternal() throws LifecycleException {

parse(buffer);
} catch (IOException e) {
e.printStackTrace();
log.error("Rewrite config exception={}", e.getMessage(), e);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import lombok.Data;
import lombok.RequiredArgsConstructor;
Expand All @@ -38,15 +40,11 @@
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
* @deprecated will be removed in the future.
*/
@Deprecated
@Component
@Data
@Slf4j
@Component
@RequiredArgsConstructor
@ConfigurationProperties(prefix = "kouncil")
@Slf4j
public class ClusterConfigReader {

protected static final String SPECIAL_CHARS = "[^a-zA-Z0-9\\s]";
Expand All @@ -66,74 +64,97 @@ public class ClusterConfigReader {
public void init() {
List<Cluster> clustersToSave = new ArrayList<>();
Iterable<Cluster> all = repository.findAll();
//get saved cluster names to check if some clusters are not saved already
List<String> clusterNames = StreamSupport.stream(all.spliterator(), false).map(Cluster::getName).toList();
//get saved cluster names to check if some clusters are saved already
Map<String, Cluster> clustersMap = StreamSupport.stream(all.spliterator(), false).collect(Collectors.toMap(Cluster::getName, cluster -> cluster));

if (clusters != null) {
initializeAdvancedConfig(clustersToSave, clusterNames);
initializeAdvancedConfig(clustersToSave, clustersMap);
} else {
initializeSimpleConfig(clustersToSave, clusterNames);
initializeSimpleConfig(clustersToSave, clustersMap);
}

repository.saveAll(clustersToSave);
}

private void initializeAdvancedConfig(List<Cluster> clustersToSave, List<String> clusterNames) {
clusters.forEach(clusterConfig -> {
if (!clusterNames.contains(clusterConfig.getName())) {
Cluster cluster = new Cluster();
cluster.setName(clusterConfig.getName());
private void initializeAdvancedConfig(List<Cluster> clustersToSave, Map<String, Cluster> clustersMap) {
this.clusters.forEach(clusterConfig -> {
Cluster cluster = new Cluster();
String clusterName = clusterConfig.getName();
cluster.setName(clusterName);

//Global JMX properties
cluster.setGlobalJmxPort(clusterConfig.getJmxPort());
cluster.setGlobalJmxUser(clusterConfig.getJmxUser());
cluster.setGlobalJmxPassword(clusterConfig.getJmxPassword());
//Global JMX properties
cluster.setGlobalJmxPort(clusterConfig.getJmxPort());
cluster.setGlobalJmxUser(clusterConfig.getJmxUser());
cluster.setGlobalJmxPassword(clusterConfig.getJmxPassword());

cluster.setClusterSecurityConfig(new ClusterSecurityConfig());
cluster.getClusterSecurityConfig().setAuthenticationMethod(ClusterAuthenticationMethod.NONE);
KafkaProperties kafkaProperties = clusterConfig.getKafka();
cluster.setClusterSecurityConfig(new ClusterSecurityConfig());
cluster.getClusterSecurityConfig().setAuthenticationMethod(ClusterAuthenticationMethod.NONE);
KafkaProperties kafkaProperties = clusterConfig.getKafka();

if (kafkaProperties.getSecurity().getProtocol() != null) {
setClusterSSLConfig(cluster.getClusterSecurityConfig(), kafkaProperties.getSsl(), kafkaProperties.getSecurity().getProtocol());
}
setClusterSASLConfig(cluster.getClusterSecurityConfig(), clusterConfig.getBrokers());
setBrokers(cluster, clusterConfig);
setClusterSchemaRegistry(cluster, clusterConfig);
if (kafkaProperties.getSecurity().getProtocol() != null) {
setClusterSSLConfig(cluster.getClusterSecurityConfig(), kafkaProperties.getSsl(), kafkaProperties.getSecurity().getProtocol());
}
setClusterSASLConfig(cluster.getClusterSecurityConfig(), clusterConfig.getBrokers());
setBrokers(cluster, clusterConfig);
setClusterSchemaRegistry(cluster, clusterConfig);

if (!clustersMap.containsKey(clusterName)) {
clustersToSave.add(cluster);
} else {
log.warn("Cluster with name={} already exists", clusterConfig.getName());
log.warn("Cluster with name={} already exists", clusterName);
if (!cluster.equals(clustersMap.get(clusterName))) {
log.warn("Cluster config changed");

cluster.setId(clustersMap.get(clusterName).getId());
if (cluster.getSchemaRegistry() != null && clustersMap.get(clusterName).getSchemaRegistry() != null) {
cluster.getSchemaRegistry().setId(clustersMap.get(clusterName).getSchemaRegistry().getId());
}

clustersToSave.add(cluster);
}
}
});
}

private void initializeSimpleConfig(List<Cluster> clustersToSave, List<String> clusterNames) {
private void initializeSimpleConfig(List<Cluster> clustersToSave, Map<String, Cluster> clustersMap) {
for (String initialBootstrapServer : initialBootstrapServers) {
String clusterId = sanitizeClusterId(initialBootstrapServer);
if (!clusterNames.contains(clusterId)) {
if (initialBootstrapServer.contains(HOST_PORT_SEPARATOR)) {
Cluster cluster = new Cluster();
cluster.setName(clusterId);
cluster.setClusterSecurityConfig(new ClusterSecurityConfig());
cluster.getClusterSecurityConfig().setAuthenticationMethod(ClusterAuthenticationMethod.NONE);
cluster.setBrokers(new HashSet<>());

Broker broker = new Broker();
broker.setBootstrapServer(initialBootstrapServer);
cluster.getBrokers().add(broker);

if (isNotBlank(schemaRegistryUrl)) {
SchemaRegistry schemaRegistry = new SchemaRegistry();
schemaRegistry.setUrl(schemaRegistryUrl);
cluster.setSchemaRegistry(schemaRegistry);
}

clustersToSave.add(cluster);
} else {
throw new KouncilRuntimeException(format("Could not parse bootstrap server %s", initialBootstrapServer));
Cluster cluster = new Cluster();
if (initialBootstrapServer.contains(HOST_PORT_SEPARATOR)) {
cluster.setName(clusterId);
cluster.setClusterSecurityConfig(new ClusterSecurityConfig());
cluster.getClusterSecurityConfig().setAuthenticationMethod(ClusterAuthenticationMethod.NONE);
cluster.setBrokers(new HashSet<>());

Broker broker = new Broker();
broker.setBootstrapServer(initialBootstrapServer);
cluster.getBrokers().add(broker);

if (isNotBlank(schemaRegistryUrl)) {
SchemaRegistry schemaRegistry = new SchemaRegistry();
schemaRegistry.setUrl(schemaRegistryUrl);
cluster.setSchemaRegistry(schemaRegistry);
}

} else {
throw new KouncilRuntimeException(format("Could not parse bootstrap server %s", initialBootstrapServer));
}

if (!clustersMap.containsKey(clusterId)) {
log.info("Adding cluster with name={}", clusterId);
clustersToSave.add(cluster);
} else {
log.warn("Cluster with name={} already exists", clusterId);
if (cluster.equals(clustersMap.get(clusterId))) {
log.warn("Cluster config changed");

cluster.setId(clustersMap.get(clusterId).getId());
if (cluster.getSchemaRegistry() != null && clustersMap.get(clusterId).getSchemaRegistry() != null) {
cluster.getSchemaRegistry().setId(clustersMap.get(clusterId).getSchemaRegistry().getId());
}
clustersToSave.add(cluster);
}
}
}
}
Expand Down Expand Up @@ -183,11 +204,15 @@ private void setClusterSSLConfig(ClusterSecurityConfig clusterSecurityConfig, Ss

if (ssl != null) {
try {
clusterSecurityConfig.setKeystoreLocation(ssl.getKeyStoreLocation().getFile().getAbsolutePath());
if (ssl.getKeyStoreLocation() != null) {
clusterSecurityConfig.setKeystoreLocation(ssl.getKeyStoreLocation().getFile().getAbsolutePath());
}
clusterSecurityConfig.setKeystorePassword(ssl.getKeyStorePassword());
clusterSecurityConfig.setKeyPassword(ssl.getKeyPassword());

clusterSecurityConfig.setTruststoreLocation(ssl.getTrustStoreLocation().getFile().getAbsolutePath());
if (ssl.getTrustStoreLocation() != null) {
clusterSecurityConfig.setTruststoreLocation(ssl.getTrustStoreLocation().getFile().getAbsolutePath());
}
clusterSecurityConfig.setTruststorePassword(ssl.getTrustStorePassword());
} catch (IOException e) {
throw new KouncilRuntimeException(e);
Expand Down Expand Up @@ -226,10 +251,14 @@ private void setSchemaRegistrySSL(SchemaRegistrySecurityConfig schemaRegistrySec
schemaRegistrySecurityConfig.setKeyPassword(schemaRegistrySSL.getKeyPassword());
schemaRegistrySecurityConfig.setKeystorePassword(schemaRegistrySSL.getKeyStorePassword());
schemaRegistrySecurityConfig.setKeystoreType(StoreType.valueOf(schemaRegistrySSL.getKeyStoreType()));
schemaRegistrySecurityConfig.setKeystoreLocation(schemaRegistrySSL.getKeyStoreLocation().getFile().getAbsolutePath());
if (schemaRegistrySSL.getKeyStoreLocation() != null) {
schemaRegistrySecurityConfig.setKeystoreLocation(schemaRegistrySSL.getKeyStoreLocation().getFile().getAbsolutePath());
}

schemaRegistrySecurityConfig.setTruststoreType(StoreType.valueOf(schemaRegistrySSL.getTrustStoreType()));
schemaRegistrySecurityConfig.setTruststoreLocation(schemaRegistrySSL.getTrustStoreLocation().getFile().getAbsolutePath());
if (schemaRegistrySSL.getTrustStoreLocation() != null) {
schemaRegistrySecurityConfig.setTruststoreLocation(schemaRegistrySSL.getTrustStoreLocation().getFile().getAbsolutePath());
}
schemaRegistrySecurityConfig.setTruststorePassword(schemaRegistrySSL.getTrustStorePassword());
} catch (IOException e) {
throw new KouncilRuntimeException(e);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.method.configuration.EnableGlobalMethodSecurity;
import org.springframework.context.annotation.DependsOn;
import org.springframework.security.config.annotation.method.configuration.EnableMethodSecurity;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
Expand All @@ -23,7 +24,6 @@
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "kouncil.auth", name = "active-provider", havingValue = "inmemory")
@EnableMethodSecurity(jsr250Enabled = true, securedEnabled = true)
@DependsOn({"userGroupsConfigReader"})
public class InMemoryWebSecurityConfig {

private final InMemoryUserManager inMemoryUserManager;
Expand Down
Loading