api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java
@@ -7,11 +7,13 @@
import io.kafbat.ui.connect.ApiClient;
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
import io.kafbat.ui.connect.model.Connector;
+import io.kafbat.ui.connect.model.ConnectorExpand;
import io.kafbat.ui.connect.model.ConnectorPlugin;
import io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse;
import io.kafbat.ui.connect.model.ConnectorStatus;
import io.kafbat.ui.connect.model.ConnectorTask;
import io.kafbat.ui.connect.model.ConnectorTopics;
+import io.kafbat.ui.connect.model.ExpandedConnector;
import io.kafbat.ui.connect.model.NewConnector;
import io.kafbat.ui.connect.model.TaskStatus;
import io.kafbat.ui.exception.KafkaConnectConflictResponseException;
@@ -221,13 +223,17 @@ public Mono<ResponseEntity<Map<String, ConnectorTopics>>> getConnectorTopicsWith
}

@Override
-public Mono<List<String>> getConnectors(String search) throws WebClientResponseException {
-return withRetryOnConflictOrRebalance(super.getConnectors(search));
+public Mono<Map<String, ExpandedConnector>> getConnectors(
+String search, List<ConnectorExpand> expand
+) throws WebClientResponseException {
+return withRetryOnConflictOrRebalance(super.getConnectors(search, expand));
}

@Override
-public Mono<ResponseEntity<List<String>>> getConnectorsWithHttpInfo(String search) throws WebClientResponseException {
-return withRetryOnConflictOrRebalance(super.getConnectorsWithHttpInfo(search));
+public Mono<ResponseEntity<Map<String, ExpandedConnector>>> getConnectorsWithHttpInfo(
+String search, List<ConnectorExpand> expand
+) throws WebClientResponseException {
+return withRetryOnConflictOrRebalance(super.getConnectorsWithHttpInfo(search, expand));
}

@Override
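A note on the endpoint this client now wraps: Kafka Connect's GET /connectors accepts an expand query parameter (info, status) and then returns a map keyed by connector name instead of a bare name list, so a single request replaces the previous per-connector fan-out. Below is a minimal usage sketch against the retrying client; the variable names and the printed fields are illustrative assumptions, not code from this PR.

// Sketch: list every connector with its info and status in one round-trip.
// "client" is assumed to be a wired RetryingKafkaConnectClient; the models
// (ExpandedConnector, ConnectorExpand) are the generated ones from this PR.
Mono<Map<String, ExpandedConnector>> response =
    client.getConnectors(null, List.of(ConnectorExpand.INFO, ConnectorExpand.STATUS));

// Both expansions were requested, so getStatus() is populated for each entry.
response.subscribe(connectors ->
    connectors.forEach((name, connector) ->
        System.out.printf("%s has %d task(s)%n",
            name, connector.getStatus().getTasks().size())));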
api/src/main/java/io/kafbat/ui/config/ClustersProperties.java
@@ -227,7 +227,6 @@ public enum LogLevel {
@AllArgsConstructor
public static class CacheProperties {
boolean enabled = true;
-Duration connectCacheExpiry = Duration.ofMinutes(1);
Duration connectClusterCacheExpiry = Duration.ofHours(24);
}

api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java
@@ -66,8 +66,12 @@ public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, Stri
.build();

return validateAccess(context)
-.thenReturn(ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName)))
-.doOnEach(sig -> audit(context, sig));
+.thenReturn(
+ResponseEntity.ok(
+kafkaConnectService.getConnectors(getCluster(clusterName), connectName)
+.flatMapMany(m -> Flux.fromIterable(m.keySet()))
+)
+).doOnEach(sig -> audit(context, sig));
}

@Override
39 changes: 39 additions & 0 deletions api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java
@@ -4,6 +4,7 @@
import io.kafbat.ui.connect.model.ClusterInfo;
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
import io.kafbat.ui.connect.model.ConnectorTask;
+import io.kafbat.ui.connect.model.ExpandedConnector;
import io.kafbat.ui.connect.model.NewConnector;
import io.kafbat.ui.model.ConnectDTO;
import io.kafbat.ui.model.ConnectorDTO;
@@ -14,10 +15,15 @@
import io.kafbat.ui.model.ConnectorTaskStatusDTO;
import io.kafbat.ui.model.FullConnectorInfoDTO;
import io.kafbat.ui.model.TaskDTO;
+import io.kafbat.ui.model.TaskIdDTO;
import io.kafbat.ui.model.TaskStatusDTO;
import io.kafbat.ui.model.connect.InternalConnectorInfo;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;

@@ -43,6 +49,39 @@ ConnectorPluginConfigValidationResponseDTO fromClient(
io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse
connectorPluginConfigValidationResponse);

+default InternalConnectorInfo fromClient(String connect, ExpandedConnector connector, @Nullable List<String> topics) {
+Objects.requireNonNull(connector.getInfo());
+Objects.requireNonNull(connector.getStatus());
+List<TaskDTO> tasks = List.of();
+
+if (connector.getInfo().getTasks() != null
+&& connector.getStatus().getTasks() != null
+) {
+Map<Integer, TaskIdDTO> taskIds = connector.getInfo().getTasks()
+.stream().map(t -> new TaskIdDTO().task(t.getTask()).connector(t.getConnector()))
+.collect(Collectors.toMap(
+TaskIdDTO::getTask,
+t -> t
+));
+
+tasks = connector.getStatus().getTasks().stream()
+.map(s ->
+new TaskDTO().status(fromClient(s)).id(taskIds.get(s.getId()))
+).toList();
+}
+
+ConnectorDTO connectorDto = fromClient(connector.getInfo())
+.connect(connect)
+.status(fromClient(connector.getStatus().getConnector()));
+
+return InternalConnectorInfo.builder()
+.connector(connectorDto)
+.config(connector.getInfo().getConfig())
+.tasks(tasks)
+.topics(topics)
+.build();
+}

default ConnectDTO toKafkaConnect(
ClustersProperties.ConnectCluster connect,
List<InternalConnectorInfo> connectors,
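The new fromClient(String, ExpandedConnector, List<String>) default method above replaces the old per-connector zip of separate info, config, tasks and topics calls. Here is a sketch of the intended call pattern, folding one expanded listing into the internal model; the helper below is hypothetical and not part of the mapper.

// Hypothetical helper, not in this PR: "mapper" would be the MapStruct-generated
// KafkaConnectMapper bean. Topics are passed as null when the caller does not
// need them, mirroring getConnectConnectors in KafkaConnectService.
static List<InternalConnectorInfo> toInternal(KafkaConnectMapper mapper,
                                              String connectName,
                                              Map<String, ExpandedConnector> expanded) {
  return expanded.values().stream()
      .map(connector -> mapper.fromClient(connectName, connector, null))
      .toList();
}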
api/src/main/java/io/kafbat/ui/service/KafkaConfigSanitizer.java
@@ -19,7 +19,7 @@
import org.springframework.stereotype.Component;

@Component
-class KafkaConfigSanitizer {
+public class KafkaConfigSanitizer {

private static final String SANITIZED_VALUE = "******";

113 changes: 44 additions & 69 deletions api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
@@ -5,9 +5,11 @@
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
import io.kafbat.ui.connect.model.ClusterInfo;
+import io.kafbat.ui.connect.model.ConnectorExpand;
import io.kafbat.ui.connect.model.ConnectorStatus;
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
import io.kafbat.ui.connect.model.ConnectorTopics;
+import io.kafbat.ui.connect.model.ExpandedConnector;
import io.kafbat.ui.connect.model.TaskStatus;
import io.kafbat.ui.exception.ConnectorOffsetsResetException;
import io.kafbat.ui.exception.NotFoundException;
@@ -24,6 +26,7 @@
import io.kafbat.ui.model.KafkaCluster;
import io.kafbat.ui.model.NewConnectorDTO;
import io.kafbat.ui.model.TaskDTO;
+import io.kafbat.ui.model.TaskIdDTO;
import io.kafbat.ui.model.connect.InternalConnectorInfo;
import io.kafbat.ui.service.index.KafkaConnectNgramFilter;
import io.kafbat.ui.util.ReactiveFailover;
@@ -32,7 +35,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
-import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -47,8 +49,6 @@ public class KafkaConnectService {
private final KafkaConnectMapper kafkaConnectMapper;
private final KafkaConfigSanitizer kafkaConfigSanitizer;
private final ClustersProperties clustersProperties;

-private final AsyncCache<ConnectCacheKey, List<InternalConnectorInfo>> cachedConnectors;
private final AsyncCache<String, ClusterInfo> cacheClusterInfo;

public KafkaConnectService(KafkaConnectMapper kafkaConnectMapper,
@@ -57,9 +57,6 @@ public KafkaConnectService(KafkaConnectMapper kafkaConnectMapper,
this.kafkaConnectMapper = kafkaConnectMapper;
this.kafkaConfigSanitizer = kafkaConfigSanitizer;
this.clustersProperties = clustersProperties;
-this.cachedConnectors = Caffeine.newBuilder()
-.expireAfterWrite(clustersProperties.getCache().getConnectCacheExpiry())
-.buildAsync();
this.cacheClusterInfo = Caffeine.newBuilder()
.expireAfterWrite(clustersProperties.getCache().getConnectClusterCacheExpiry())
.buildAsync();
@@ -74,9 +71,10 @@ public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
Flux.fromIterable(connects).flatMap(c ->
getClusterInfo(cluster, c.getName()).map(ci -> Tuples.of(c, ci))
).flatMap(tuple -> (
-getConnectConnectorsFromCache(new ConnectCacheKey(cluster, tuple.getT1()))
+getConnectConnectors(cluster, tuple.getT1())
+.collectList()
.map(connectors ->
-kafkaConnectMapper.toKafkaConnect(tuple.getT1(), connectors, tuple.getT2(), withStats)
+kafkaConnectMapper.toKafkaConnect(tuple.getT1(), connectors, tuple.getT2(), true)
)
)
)
@@ -85,29 +83,17 @@
return Flux.fromIterable(connectClusters.orElse(List.of()))
.flatMap(c ->
getClusterInfo(cluster, c.getName()).map(info ->
-kafkaConnectMapper.toKafkaConnect(c, List.of(), info, withStats)
+kafkaConnectMapper.toKafkaConnect(c, List.of(), info, false)
)
);
}
}

-private Mono<List<InternalConnectorInfo>> getConnectConnectorsFromCache(ConnectCacheKey key) {
-if (clustersProperties.getCache().isEnabled()) {
-return Mono.fromFuture(
-cachedConnectors.get(key, (t, e) ->
-getConnectConnectors(t.cluster(), t.connect()).collectList().toFuture()
-)
-);
-} else {
-return getConnectConnectors(key.cluster(), key.connect()).collectList();
-}
-}

private Mono<ClusterInfo> getClusterInfo(KafkaCluster cluster, String connectName) {
return Mono.fromFuture(cacheClusterInfo.get(connectName, (t, e) ->
api(cluster, connectName).mono(KafkaConnectClientApi::getClusterInfo)
.onErrorResume(th -> {
-log.error("Error on collecting cluster info" + th.getMessage(), th);
+log.error("Error on collecting cluster info", th);
return Mono.just(new ClusterInfo());
}).toFuture()
));
@@ -116,17 +102,11 @@ private Mono<ClusterInfo> getClusterInfo(KafkaCluster cluster, String connectNam
private Flux<InternalConnectorInfo> getConnectConnectors(
KafkaCluster cluster,
ClustersProperties.ConnectCluster connect) {
-return getConnectorNamesWithErrorsSuppress(cluster, connect.getName()).flatMap(connectorName ->
-Mono.zip(
-getConnector(cluster, connect.getName(), connectorName),
-getConnectorTasks(cluster, connect.getName(), connectorName).collectList()
-).map(tuple ->
-InternalConnectorInfo.builder()
-.connector(tuple.getT1())
-.config(null)
-.tasks(tuple.getT2())
-.topics(null)
-.build()
+return getConnectorsWithErrorsSuppress(cluster, connect.getName()).flatMapMany(connectors ->
+Flux.fromStream(
+connectors.values().stream().map(c ->
+kafkaConnectMapper.fromClient(connect.getName(), c, null)
+)
+)
);
}
@@ -135,21 +115,20 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
@Nullable final String search, Boolean fts) {
return getConnects(cluster, false)
.flatMap(connect ->
-getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
-.flatMap(connectorName ->
-Mono.zip(
-getConnector(cluster, connect.getName(), connectorName),
-getConnectorConfig(cluster, connect.getName(), connectorName),
-getConnectorTasks(cluster, connect.getName(), connectorName).collectList(),
-getConnectorTopics(cluster, connect.getName(), connectorName)
-).map(tuple ->
-InternalConnectorInfo.builder()
-.connector(tuple.getT1())
-.config(tuple.getT2())
-.tasks(tuple.getT3())
-.topics(tuple.getT4().getTopics())
-.build())))
-.map(kafkaConnectMapper::fullConnectorInfo)
+getConnectorsWithErrorsSuppress(cluster, connect.getName())
+.flatMapMany(connectors ->
+Flux.fromIterable(connectors.entrySet())
+.flatMap(e ->
+getConnectorTopics(
+cluster,
+connect.getName(),
+e.getKey()
+).map(topics ->
+kafkaConnectMapper.fromClient(connect.getName(), e.getValue(), topics.getTopics())
+)
+)
+)
+).map(kafkaConnectMapper::fullConnectorInfo)
.collectList()
.map(lst -> filterConnectors(lst, search, fts))
.flatMapMany(Flux::fromIterable);
@@ -165,14 +144,6 @@ private List<FullConnectorInfoDTO> filterConnectors(
return filter.find(search);
}

-private Stream<String> getStringsForSearch(FullConnectorInfoDTO fullConnectorInfo) {
-return Stream.of(
-fullConnectorInfo.getName(),
-fullConnectorInfo.getConnect(),
-fullConnectorInfo.getStatus().getState().getValue(),
-fullConnectorInfo.getType().getValue());
-}

public Mono<ConnectorTopics> getConnectorTopics(KafkaCluster cluster, String connectClusterName,
String connectorName) {
return api(cluster, connectClusterName)
@@ -183,15 +154,17 @@ public Mono<ConnectorTopics> getConnectorTopics(KafkaCluster cluster, String con
.onErrorResume(Exception.class, e -> Mono.just(new ConnectorTopics().topics(List.of())));
}

-public Flux<String> getConnectorNames(KafkaCluster cluster, String connectName) {
+public Mono<Map<String, ExpandedConnector>> getConnectors(KafkaCluster cluster, String connectName) {
return api(cluster, connectName)
-.mono(client -> client.getConnectors(null))
-.flatMapMany(Flux::fromIterable);
+.mono(client ->
+client.getConnectors(null, List.of(ConnectorExpand.INFO, ConnectorExpand.STATUS))
+);
}

// returns empty flux if there was an error communicating with Connect
-public Flux<String> getConnectorNamesWithErrorsSuppress(KafkaCluster cluster, String connectName) {
-return getConnectorNames(cluster, connectName).onErrorComplete();
+public Mono<Map<String, ExpandedConnector>> getConnectorsWithErrorsSuppress(
+KafkaCluster cluster, String connectName) {
+return getConnectors(cluster, connectName).onErrorComplete();
}

public Mono<ConnectorDTO> createConnector(KafkaCluster cluster, String connectName,
@@ -216,8 +189,8 @@

private Mono<Boolean> connectorExists(KafkaCluster cluster, String connectName,
String connectorName) {
-return getConnectorNames(cluster, connectName)
-.any(name -> name.equals(connectorName));
+return getConnectors(cluster, connectName)
+.map(m -> m.containsKey(connectorName));
}

public Mono<ConnectorDTO> getConnector(KafkaCluster cluster, String connectName,
@@ -306,8 +279,11 @@ private Mono<Void> restartTasks(KafkaCluster cluster, String connectName,
return getConnectorTasks(cluster, connectName, connectorName)
.filter(taskFilter)
.flatMap(t ->
-restartConnectorTask(cluster, connectName, connectorName, t.getId().getTask()))
-.then();
+restartConnectorTask(
+cluster, connectName, connectorName,
+Optional.ofNullable(t.getId()).map(TaskIdDTO::getTask).orElseThrow()
+)
+).then();
}

public Flux<TaskDTO> getConnectorTasks(KafkaCluster cluster, String connectName, String connectorName) {
@@ -318,8 +294,9 @@ public Flux<TaskDTO> getConnectorTasks(KafkaCluster cluster, String connectName,
.map(kafkaConnectMapper::fromClient)
.flatMap(task ->
client
-.getConnectorTaskStatus(connectorName, task.getId().getTask())
-.onErrorResume(WebClientResponseException.NotFound.class, e -> Mono.empty())
+.getConnectorTaskStatus(connectorName,
+Optional.ofNullable(task.getId()).map(TaskIdDTO::getTask).orElseThrow()
+).onErrorResume(WebClientResponseException.NotFound.class, e -> Mono.empty())
.map(kafkaConnectMapper::fromClient)
.map(task::status)
));
@@ -372,6 +349,4 @@ public Mono<Void> resetConnectorOffsets(KafkaCluster cluster, String connectName
.formatted(connectorName, connectName));
});
}

-record ConnectCacheKey(KafkaCluster cluster, ClustersProperties.ConnectCluster connect) {}
}
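For reference, the wire-level request underneath the reworked getConnectors is a single expanded listing rather than one request per connector. Below is a standalone sketch using Spring WebClient; the worker URL and the untyped response shape are assumptions for illustration, not part of this PR.

import java.util.Map;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.web.reactive.function.client.WebClient;

// Roughly GET /connectors?expand=info&expand=status against an assumed local
// worker; each value carries "info" (config, tasks, type) and "status"
// (connector and task states), which is what ExpandedConnector models.
public class ExpandedConnectorsProbe {
  public static void main(String[] args) {
    WebClient http = WebClient.create("http://localhost:8083");
    Map<String, Object> connectors = http.get()
        .uri(b -> b.path("/connectors")
            .queryParam("expand", "info")
            .queryParam("expand", "status")
            .build())
        .retrieve()
        .bodyToMono(new ParameterizedTypeReference<Map<String, Object>>() {})
        .block();
    System.out.println(connectors);
  }
}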