Skip to content

Commit 6537419

Browse files
szabolcs-horvatholeewere
authored andcommitted
CB-17954 Refactor databus-connector module to use streaming-common
The code of databus-connector and streaming-common is the same or very similar in many places, so it makes sense to reuse code from streaming-common in databus-connector.
1 parent b233af1 commit 6537419

File tree

35 files changed

+295
-371
lines changed

35 files changed

+295
-371
lines changed

common/src/main/java/com/sequenceiq/cloudbreak/telemetry/databus/AbstractDatabusStreamConfiguration.java

+22-2
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,23 @@
33
import java.util.HashMap;
44
import java.util.Map;
55

6-
public abstract class AbstractDatabusStreamConfiguration {
6+
import com.sequenceiq.cloudbreak.telemetry.streaming.CommonStreamingConfiguration;
7+
8+
public abstract class AbstractDatabusStreamConfiguration extends CommonStreamingConfiguration {
79

810
private final boolean enabled;
911

1012
private final String dbusAppName;
1113

1214
private final String dbusStreamName;
1315

14-
public AbstractDatabusStreamConfiguration(boolean enabled, String dbusAppName, String dbusStreamName) {
16+
private final boolean streamingEnabled;
17+
18+
public AbstractDatabusStreamConfiguration(boolean enabled, String dbusAppName, String dbusStreamName, boolean streamingEnabled) {
1519
this.enabled = enabled;
1620
this.dbusAppName = dbusAppName;
1721
this.dbusStreamName = dbusStreamName;
22+
this.streamingEnabled = streamingEnabled;
1823
}
1924

2025
public boolean isEnabled() {
@@ -29,6 +34,11 @@ public String getDbusStreamName() {
2934
return dbusStreamName;
3035
}
3136

37+
@Override
38+
public boolean isStreamingEnabled() {
39+
return streamingEnabled;
40+
}
41+
3242
public abstract String getDbusServiceName();
3343

3444
public abstract String getDbusAppNameKey();
@@ -39,4 +49,14 @@ public Map<String, String> getDbusConfigs() {
3949
map.put(String.format("dbus%sAppName", getDbusServiceName()), dbusAppName);
4050
return map;
4151
}
52+
53+
@Override
54+
public String toString() {
55+
return "AbstractDatabusStreamConfiguration{" +
56+
"enabled=" + enabled +
57+
", dbusAppName='" + dbusAppName + '\'' +
58+
", dbusStreamName='" + dbusStreamName + '\'' +
59+
", streamingEnabled=" + streamingEnabled +
60+
'}';
61+
}
4262
}

common/src/main/java/com/sequenceiq/cloudbreak/telemetry/logcollection/ClusterLogsCollectionConfiguration.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ public class ClusterLogsCollectionConfiguration extends AbstractDatabusStreamCon
1111
public ClusterLogsCollectionConfiguration(
1212
@Value("${cluster.logs.collection.enabled:false}") boolean enabled,
1313
@Value("${cluster.logs.collection.dbus.app.name:}") String dbusAppName,
14-
@Value("${cluster.logs.collection.dbus.stream.name:LogCollection}") String dbusStreamName) {
15-
super(enabled, dbusAppName, dbusStreamName);
14+
@Value("${cluster.logs.collection.dbus.stream.name:LogCollection}") String dbusStreamName,
15+
@Value("${cluster.logs.collection.streaming-enabled:false}") boolean streamingEnabled) {
16+
super(enabled, dbusAppName, dbusStreamName, streamingEnabled);
1617
}
1718

1819
@Override

common/src/main/java/com/sequenceiq/cloudbreak/telemetry/messagebroker/MessageBrokerConfiguration.java

+21-2
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,23 @@ public class MessageBrokerConfiguration extends AbstractDatabusStreamConfigurati
1212

1313
private final String processor;
1414

15+
private final int numberOfWorkers;
16+
17+
private final int queueSizeLimit;
18+
1519
public MessageBrokerConfiguration(@Value("${telemetry.usage.messagebroker.enabled}") boolean enabled,
1620
@Value("${telemetry.usage.messagebroker.dbus-app-name}") String dbusAppName,
1721
@Value("${telemetry.usage.messagebroker.dbus-stream-name}") String dbusStreamName,
22+
@Value("${telemetry.usage.messagebroker.streaming-enabled}") boolean streamingEnabled,
1823
@Value("${telemetry.usage.messagebroker.headers.origin}") String origin,
19-
@Value("${telemetry.usage.messagebroker.headers.processor}") String processor) {
20-
super(enabled, dbusAppName, dbusStreamName);
24+
@Value("${telemetry.usage.messagebroker.headers.processor}") String processor,
25+
@Value("${telemetry.usage.messagebroker.workers:1}") int numberOfWorkers,
26+
@Value("${telemetry.usage.messagebroker.queueSizeLimit:2000}") int queueSizeLimit) {
27+
super(enabled, dbusAppName, dbusStreamName, streamingEnabled);
2128
this.origin = origin;
2229
this.processor = processor;
30+
this.numberOfWorkers = numberOfWorkers;
31+
this.queueSizeLimit = queueSizeLimit;
2332
}
2433

2534
@Override
@@ -32,6 +41,16 @@ public String getDbusAppNameKey() {
3241
return "usage-events-app";
3342
}
3443

44+
@Override
45+
public int getNumberOfWorkers() {
46+
return numberOfWorkers;
47+
}
48+
49+
@Override
50+
public int getQueueSizeLimit() {
51+
return queueSizeLimit;
52+
}
53+
3554
public String getOrigin() {
3655
return origin;
3756
}

common/src/main/java/com/sequenceiq/cloudbreak/telemetry/metering/MeteringConfiguration.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ public class MeteringConfiguration extends AbstractDatabusStreamConfiguration {
1010

1111
public MeteringConfiguration(@Value("${metering.enabled:false}") boolean enabled,
1212
@Value("${metering.dbus.app.name:}") String dbusAppName,
13-
@Value("${metering.dbus.stream.name:Metering}") String dbusStreamName) {
14-
super(enabled, dbusAppName, dbusStreamName);
13+
@Value("${metering.dbus.stream.name:Metering}") String dbusStreamName,
14+
@Value("${metering.consumption.enabled:false}") boolean streamingEnabled) {
15+
super(enabled, dbusAppName, dbusStreamName, streamingEnabled);
1516
}
1617

1718
@Override
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package com.sequenceiq.cloudbreak.telemetry.metering;
22

3+
import javax.annotation.PostConstruct;
4+
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
37
import org.springframework.beans.factory.annotation.Value;
48
import org.springframework.stereotype.Component;
59

@@ -8,24 +12,36 @@
812
@Component
913
public class MeteringConsumptionConfiguration extends AbstractDatabusStreamConfiguration {
1014

15+
private static final Logger LOGGER = LoggerFactory.getLogger(MeteringConsumptionConfiguration.class);
16+
1117
private final MeteringConfiguration meteringConfiguration;
1218

13-
private final boolean consumptionEnabled;
19+
private final int numberOfWorkers;
20+
21+
private final int queueSizeLimit;
1422

1523
public MeteringConsumptionConfiguration(MeteringConfiguration meteringConfiguration,
16-
@Value("${metering.consumption.enabled:false}") boolean consumptionEnabled) {
17-
super(meteringConfiguration.isEnabled(), meteringConfiguration.getDbusAppName(), meteringConfiguration.getDbusStreamName());
24+
@Value("${telemetry.usage.messagebroker.workers:1}") int numberOfWorkers,
25+
@Value("${telemetry.usage.messagebroker.queueSizeLimit:2000}") int queueSizeLimit) {
26+
super(meteringConfiguration.isEnabled(), meteringConfiguration.getDbusAppName(),
27+
meteringConfiguration.getDbusStreamName(), meteringConfiguration.isStreamingEnabled());
1828
this.meteringConfiguration = meteringConfiguration;
19-
this.consumptionEnabled = consumptionEnabled;
29+
this.numberOfWorkers = numberOfWorkers;
30+
this.queueSizeLimit = queueSizeLimit;
2031
}
2132

22-
public boolean isConsumptionEnabled() {
23-
return consumptionEnabled;
33+
@PostConstruct
34+
public void init() {
35+
if (isEnabled()) {
36+
LOGGER.info("Consumption is enabled. Configuration: {}", this);
37+
} else {
38+
LOGGER.info("Consumption is disabled. Configuration: {}", this);
39+
}
2440
}
2541

2642
@Override
2743
public boolean isEnabled() {
28-
return meteringConfiguration.isEnabled() && consumptionEnabled;
44+
return meteringConfiguration.isEnabled() && isStreamingEnabled();
2945
}
3046

3147
@Override
@@ -37,4 +53,23 @@ public String getDbusServiceName() {
3753
public String getDbusAppNameKey() {
3854
return meteringConfiguration.getDbusAppNameKey();
3955
}
56+
57+
@Override
58+
public int getNumberOfWorkers() {
59+
return numberOfWorkers;
60+
}
61+
62+
@Override
63+
public int getQueueSizeLimit() {
64+
return queueSizeLimit;
65+
}
66+
67+
@Override
68+
public String toString() {
69+
return "MeteringConsumptionConfiguration{" +
70+
"meteringConfiguration=" + meteringConfiguration +
71+
", numberOfWorkers=" + numberOfWorkers +
72+
", queueSizeLimit=" + queueSizeLimit +
73+
"} " + super.toString();
74+
}
4075
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.sequenceiq.cloudbreak.telemetry.streaming;
2+
3+
public abstract class CommonStreamingConfiguration {
4+
5+
private static final int DEFAULT_NUMBER_OF_WORKERS = 1;
6+
7+
private static final int DEFAULT_QUEUE_SIZE_LIMIT = 2000;
8+
9+
public boolean isStreamingEnabled() {
10+
return false;
11+
}
12+
13+
public int getNumberOfWorkers() {
14+
return DEFAULT_NUMBER_OF_WORKERS;
15+
}
16+
17+
public int getQueueSizeLimit() {
18+
return DEFAULT_QUEUE_SIZE_LIMIT;
19+
}
20+
}

common/src/main/java/com/sequenceiq/cloudbreak/telemetry/support/SupportBundleConfiguration.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ public class SupportBundleConfiguration extends AbstractDatabusStreamConfigurati
1111
public SupportBundleConfiguration(
1212
@Value("${cluster.support.bundle.enabled:false}") boolean enabled,
1313
@Value("${cluster.support.bundle.dbus.app.name:}") String dbusAppName,
14-
@Value("${cluster.support.bundle.dbus.stream.name:UnifiedDiagnostics}") String dbusStreamName) {
15-
super(enabled, dbusAppName, dbusStreamName);
14+
@Value("${cluster.support.bundle.dbus.stream.name:UnifiedDiagnostics}") String dbusStreamName,
15+
@Value("${cluster.support.bundle.streaming-enabled:false}") boolean streamingEnabled) {
16+
super(enabled, dbusAppName, dbusStreamName, streamingEnabled);
1617
}
1718

1819
@Override

common/src/main/resources/common-config.yml

+1
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ telemetry:
101101
queueSizeLimit: 2000
102102
dbus-app-name: manowar_dev-mow-UsageEvents
103103
dbus-stream-name: manowar_dev-mow-UsageEvents
104+
streaming-enabled: false
104105
headers:
105106
origin: CONTROL_PLANE
106107
processor: NO_EDH

common/src/test/java/com/sequenceiq/cloudbreak/telemetry/fluent/FluentConfigServiceTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ public class FluentConfigServiceTest {
4545
@Before
4646
public void setUp() {
4747
MeteringConfiguration meteringConfiguration =
48-
new MeteringConfiguration(false, "dbusApp", "dbusStream");
48+
new MeteringConfiguration(false, "dbusApp", "dbusStream", false);
4949
ClusterLogsCollectionConfiguration logCollectionConfig =
50-
new ClusterLogsCollectionConfiguration(false, null, null);
50+
new ClusterLogsCollectionConfiguration(false, null, null, false);
5151
TelemetryConfiguration telemetryConfiguration =
5252
new TelemetryConfiguration(null, meteringConfiguration, logCollectionConfig, null, null);
5353
underTest = new FluentConfigService(new S3ConfigGenerator(), new AdlsGen2ConfigGenerator(), new GcsConfigGenerator(),

common/src/test/java/com/sequenceiq/cloudbreak/telemetry/metering/MeteringConfigServiceTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public class MeteringConfigServiceTest {
2424

2525
@Before
2626
public void setUp() {
27-
MeteringConfiguration meteringConfiguration = new MeteringConfiguration(true, "app", "stream");
27+
MeteringConfiguration meteringConfiguration = new MeteringConfiguration(true, "app", "stream", false);
2828
TelemetryComponentUpgradeConfiguration meteringAgentConfig = new TelemetryComponentUpgradeConfiguration();
2929
meteringAgentConfig.setDesiredDate("2021-01-01");
3030
TelemetryUpgradeConfiguration upgradeConfigs = new TelemetryUpgradeConfiguration();

core/src/test/java/com/sequenceiq/cloudbreak/controller/validation/diagnostics/DiagnosticsCollectionValidatorTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class DiagnosticsCollectionValidatorTest {
3636
@BeforeEach
3737
public void setUp() {
3838
underTest = new DiagnosticsCollectionValidator(
39-
new SupportBundleConfiguration(false, null, null), entitlementService);
39+
new SupportBundleConfiguration(false, null, null, false), entitlementService);
4040
}
4141

4242
@Test

core/src/test/java/com/sequenceiq/cloudbreak/converter/v4/stacks/TelemetryConverterTest.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ public class TelemetryConverterTest {
5151
@Before
5252
public void setUp() {
5353
AltusDatabusConfiguration altusDatabusConfiguration = new AltusDatabusConfiguration(DATABUS_ENDPOINT, DATABUS_S3_BUCKET, true, "****", "****");
54-
MeteringConfiguration meteringConfiguration = new MeteringConfiguration(true, "app", "stream");
55-
ClusterLogsCollectionConfiguration logCollectionConfig = new ClusterLogsCollectionConfiguration(true, "app", "stream");
54+
MeteringConfiguration meteringConfiguration = new MeteringConfiguration(true, "app", "stream", false);
55+
ClusterLogsCollectionConfiguration logCollectionConfig = new ClusterLogsCollectionConfiguration(true, "app", "stream", false);
5656
MonitoringConfiguration monitoringConfig = new MonitoringConfiguration();
5757
monitoringConfig.setEnabled(true);
5858
TelemetryConfiguration telemetryConfiguration =
@@ -230,8 +230,8 @@ public void testConvertFromEnvAndSdxResponseWithDefaultDisabled() {
230230
// GIVEN
231231
SdxClusterResponse sdxClusterResponse = null;
232232
AltusDatabusConfiguration altusDatabusConfiguration = new AltusDatabusConfiguration(DATABUS_ENDPOINT, DATABUS_S3_BUCKET, false, "", null);
233-
MeteringConfiguration meteringConfiguration = new MeteringConfiguration(true, null, null);
234-
ClusterLogsCollectionConfiguration logCollectionConfig = new ClusterLogsCollectionConfiguration(true, null, null);
233+
MeteringConfiguration meteringConfiguration = new MeteringConfiguration(true, null, null, false);
234+
ClusterLogsCollectionConfiguration logCollectionConfig = new ClusterLogsCollectionConfiguration(true, null, null, false);
235235
MonitoringConfiguration monitoringConfig = new MonitoringConfiguration();
236236
monitoringConfig.setEnabled(true);
237237
TelemetryConfiguration telemetryConfiguration =
@@ -307,8 +307,8 @@ public void testConvertFromEnvAndSdxResponseWithWADisabledGlobally() {
307307
sdxClusterResponse.setCrn("crn:cdp:cloudbreak:us-west-1:someone:sdxcluster:sdxId");
308308
sdxClusterResponse.setName("sdxName");
309309
AltusDatabusConfiguration altusDatabusConfiguration = new AltusDatabusConfiguration(DATABUS_ENDPOINT, DATABUS_S3_BUCKET, false, "", null);
310-
MeteringConfiguration meteringConfiguration = new MeteringConfiguration(true, null, null);
311-
ClusterLogsCollectionConfiguration logCollectionConfig = new ClusterLogsCollectionConfiguration(true, null, null);
310+
MeteringConfiguration meteringConfiguration = new MeteringConfiguration(true, null, null, false);
311+
ClusterLogsCollectionConfiguration logCollectionConfig = new ClusterLogsCollectionConfiguration(true, null, null, false);
312312
MonitoringConfiguration monitoringConfig = new MonitoringConfiguration();
313313
monitoringConfig.setEnabled(true);
314314
TelemetryConfiguration telemetryConfiguration =

databus-connector/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ buildscript {
1313
dependencies {
1414
implementation project(':grpc-common')
1515
implementation project(':auth-connector')
16+
implementation project(':streaming-common')
1617

1718
implementation group: 'org.springframework.boot', name: 'spring-boot-starter-jersey', version: springBootVersion
1819
implementation group: 'net.bytebuddy', name: 'byte-buddy', version: '1.10.11'

databus-connector/src/main/java/com/sequenceiq/cloudbreak/sigmadbus/SigmaDatabusClient.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@
2626
import com.sequenceiq.cloudbreak.logger.MDCBuilder;
2727
import com.sequenceiq.cloudbreak.logger.MdcContext;
2828
import com.sequenceiq.cloudbreak.sigmadbus.config.SigmaDatabusConfig;
29-
import com.sequenceiq.cloudbreak.sigmadbus.model.DatabusRecordProcessingException;
3029
import com.sequenceiq.cloudbreak.sigmadbus.model.DatabusRequest;
3130
import com.sequenceiq.cloudbreak.sigmadbus.model.DatabusRequestContext;
31+
import com.sequenceiq.cloudbreak.streaming.model.StreamProcessingException;
3232
import com.sequenceiq.cloudbreak.telemetry.databus.AbstractDatabusStreamConfiguration;
3333

3434
import io.grpc.ManagedChannel;
@@ -62,9 +62,9 @@ public SigmaDatabusClient(Tracer tracer,
6262
/**
6363
* Upload data into databus. If the payload is larger than 1 MB, the data will be uploaded to cloudera S3.
6464
* @param request databus record payload input
65-
* @throws DatabusRecordProcessingException error during databus record processing
65+
* @throws StreamProcessingException error during databus record processing
6666
*/
67-
public void putRecord(DatabusRequest request) throws DatabusRecordProcessingException {
67+
public void putRecord(DatabusRequest request) throws StreamProcessingException {
6868
ManagedChannelWrapper channelWrapper = getMessageWrapper();
6969
DbusProto.PutRecordRequest recordRequest = convert(request, databusStreamConfiguration);
7070
String requestId = MDCBuilder.getOrGenerateRequestId();
@@ -85,11 +85,11 @@ public void putRecord(DatabusRequest request) throws DatabusRecordProcessingExce
8585
LOGGER.debug("Dbus record can be uploaded to s3 [record id: {}], [s3 url: {}], [stream: {}]", recordId, s3BucketUrl, streamName);
8686
uploadRecordToS3(s3BucketUrl, request, recordId);
8787
} else {
88-
throw new DatabusRecordProcessingException(String.format("Cannot process record to Sigma Databus. [stream: %s]", streamName));
88+
throw new StreamProcessingException(String.format("Cannot process record to Sigma Databus. [stream: %s]", streamName));
8989
}
9090
}
9191

92-
private void uploadRecordToS3(String s3Url, DatabusRequest request, String recordId) throws DatabusRecordProcessingException {
92+
private void uploadRecordToS3(String s3Url, DatabusRequest request, String recordId) throws StreamProcessingException {
9393
String payload = getPayload(request);
9494
try {
9595
HttpClient client = HttpClient.newHttpClient();
@@ -103,10 +103,10 @@ private void uploadRecordToS3(String s3Url, DatabusRequest request, String recor
103103
if (Response.Status.Family.SUCCESSFUL.equals(statusFamily) || Response.Status.Family.REDIRECTION.equals(statusFamily)) {
104104
LOGGER.debug("Databus record with id {} successfully uploaded to s3.", recordId);
105105
} else {
106-
throw new DatabusRecordProcessingException(String.format("S3 upload failed for databus record with id %s", recordId));
106+
throw new StreamProcessingException(String.format("S3 upload failed for databus record with id %s", recordId));
107107
}
108108
} catch (URISyntaxException | IOException | InterruptedException e) {
109-
throw new DatabusRecordProcessingException(String.format("Error during uploading record with id %s to s3", recordId), e);
109+
throw new StreamProcessingException(String.format("Error during uploading record with id %s to s3", recordId), e);
110110
}
111111
}
112112

databus-connector/src/main/java/com/sequenceiq/cloudbreak/sigmadbus/model/DatabusRecordProcessingException.java

-16
This file was deleted.

0 commit comments

Comments
 (0)