Skip to content

Commit 13da6aa

Browse files
doktoriclnardai
authored andcommitted
CB-17744 Integration consumption service with EFS. Adding the cloudprovider interaction. Tested with curl -X POST 'http://localhost:8099/consumption/api/v1/internal/consumption/schedule?accountId=default&initiatorUserCrn=crn:cdp:iam:us-west-1:hortonworks:user:[email protected]' \ -H 'Content-Type: application/json' \ -H 'x-cdp-actor-crn: crn:cdp:iam:us-west-1:altus:user:__internal__actor__' \ -d '{environmentCrn:crn:cdp:environments:us-west-1:hortonworks:environment:5302bdc9-9231-424a-9973-603ccc326bb7,monitoredResourceType:ENVIRONMENT,monitoredResourceCrn:crn:cdp:environments:us-west-1:hortonworks:environment:5302bdc9-9231-424a-9973-603ccc326bb7,cloudResourceId:fs-03fb582194c0b190f,consumptionType:ELASTIC_FILESYSTEM,monitoredResourceName:rdoktorics-310}'
1 parent d94fd1a commit 13da6aa

File tree

13 files changed

+327
-11
lines changed

13 files changed

+327
-11
lines changed

cloud-api/src/main/java/com/sequenceiq/cloudbreak/cloud/ConsumptionCalculator.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ public interface ConsumptionCalculator {
2222

2323
StorageType storageType();
2424

25-
CloudPlatform cloudPlatform();
25+
MeteringEventsProto.ServiceType.Value getMeteringServiceType();
26+
27+
MeteringEventsProto.ServiceFeature.Value getServiceFeature();
2628

29+
CloudPlatform cloudPlatform();
2730
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package com.sequenceiq.cloudbreak.cloud.aws.common.connector.resource;
2+
3+
import java.util.Date;
4+
5+
import javax.inject.Inject;
6+
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
import org.springframework.stereotype.Service;
10+
11+
import com.amazonaws.services.elasticfilesystem.model.AmazonElasticFileSystemException;
12+
import com.amazonaws.services.elasticfilesystem.model.DescribeFileSystemsRequest;
13+
import com.amazonaws.services.elasticfilesystem.model.DescribeFileSystemsResult;
14+
import com.sequenceiq.cloudbreak.cloud.aws.common.CommonAwsClient;
15+
import com.sequenceiq.cloudbreak.cloud.aws.common.client.AmazonEfsClient;
16+
import com.sequenceiq.cloudbreak.cloud.aws.common.view.AwsCredentialView;
17+
import com.sequenceiq.cloudbreak.cloud.exception.CloudConnectorException;
18+
import com.sequenceiq.cloudbreak.cloud.model.CloudCredential;
19+
20+
@Service
21+
public class AwsEfsCommonService {
22+
23+
private static final Logger LOGGER = LoggerFactory.getLogger(AwsEfsCommonService.class);
24+
25+
@Inject
26+
private CommonAwsClient awsClient;
27+
28+
public DescribeFileSystemsResult getEfsSize(CloudCredential cloudCredential, String region, Date startTime, Date endTime, String efsId) {
29+
try {
30+
AwsCredentialView credentialView = new AwsCredentialView(cloudCredential);
31+
AmazonEfsClient elasticFileSystemClient = awsClient.createElasticFileSystemClient(credentialView, region);
32+
DescribeFileSystemsRequest describeFileSystemsRequest = new DescribeFileSystemsRequest();
33+
describeFileSystemsRequest.setFileSystemId(efsId);
34+
DescribeFileSystemsResult describeFileSystemsResult = elasticFileSystemClient.describeFileSystems(describeFileSystemsRequest);
35+
LOGGER.info("Successfully queried efs for {} and timeframe from {} to {}. Returned number of datapoints: {}",
36+
efsId, startTime, endTime, describeFileSystemsResult.getFileSystems().stream().findFirst().get().getSizeInBytes());
37+
return describeFileSystemsResult;
38+
} catch (AmazonElasticFileSystemException e) {
39+
String message = String.format("Cannot get size for efs %s and timeframe from %s to %s. Reason: %s",
40+
efsId, startTime, endTime, e.getMessage());
41+
LOGGER.error(message, e);
42+
throw new CloudConnectorException(message, e);
43+
}
44+
}
45+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package com.sequenceiq.cloudbreak.cloud.aws.common.consumption;
2+
3+
import java.util.Optional;
4+
5+
import javax.inject.Inject;
6+
import javax.validation.ValidationException;
7+
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
import org.springframework.stereotype.Service;
11+
12+
import com.amazonaws.services.elasticfilesystem.model.DescribeFileSystemsResult;
13+
import com.amazonaws.services.elasticfilesystem.model.FileSystemDescription;
14+
import com.cloudera.thunderhead.service.metering.events.MeteringEventsProto;
15+
import com.sequenceiq.cloudbreak.cloud.ConsumptionCalculator;
16+
import com.sequenceiq.cloudbreak.cloud.aws.common.connector.resource.AwsEfsCommonService;
17+
import com.sequenceiq.cloudbreak.cloud.exception.CloudConnectorException;
18+
import com.sequenceiq.cloudbreak.cloud.model.CloudConsumption;
19+
import com.sequenceiq.cloudbreak.cloud.model.StorageSizeRequest;
20+
import com.sequenceiq.cloudbreak.cloud.model.StorageSizeResponse;
21+
import com.sequenceiq.cloudbreak.common.mappable.CloudPlatform;
22+
import com.sequenceiq.cloudbreak.common.mappable.StorageType;
23+
import com.sequenceiq.common.model.FileSystemType;
24+
25+
@Service
26+
public class AwsEFSConsumptionCalculator implements ConsumptionCalculator {
27+
28+
private static final StorageType EFS = StorageType.EFS;
29+
30+
private static final Logger LOGGER = LoggerFactory.getLogger(AwsEFSConsumptionCalculator.class);
31+
32+
private static final long NO_BYTE_IN_MB = 1000L * 1000L;
33+
34+
private static final int DATE_RANGE_WIDTH_IN_DAYS = 2;
35+
36+
@Inject
37+
private AwsEfsCommonService awsEfsCommonService;
38+
39+
@Override
40+
public void validate(CloudConsumption cloudConsumption) throws ValidationException {
41+
String storageLocation = cloudConsumption.getStorageLocation();
42+
if (storageLocation == null || !storageLocation.startsWith("fs-")) {
43+
throw new ValidationException(String.format("EFS id must start with 'fs-' if required file system type is '%s'!",
44+
FileSystemType.EFS.name()));
45+
}
46+
}
47+
48+
@Override
49+
public StorageSizeResponse calculate(StorageSizeRequest request) {
50+
DescribeFileSystemsResult result = awsEfsCommonService.getEfsSize(
51+
request.getCredential(),
52+
request.getRegion().value(),
53+
request.getStartTime(),
54+
request.getEndTime(),
55+
request.getObjectStoragePath());
56+
Optional<FileSystemDescription> latestFileSystemDescription = result.getFileSystems().stream().findFirst();
57+
if (latestFileSystemDescription.isPresent()) {
58+
FileSystemDescription fileSystemDescription = latestFileSystemDescription.get();
59+
LOGGER.debug("Gathered FileSystemDescription from EFS: {}", fileSystemDescription);
60+
return StorageSizeResponse.builder().withStorageInBytes(fileSystemDescription.getSizeInBytes().getValue()).build();
61+
} else {
62+
String message = String.format("No Efs were returned by efs id %s and timeframe from %s to %s",
63+
request.getObjectStoragePath(), request.getStartTime().toString(), request.getEndTime().toString());
64+
LOGGER.error(message);
65+
throw new CloudConnectorException(message);
66+
}
67+
}
68+
69+
@Override
70+
public String getObjectId(String objectId) {
71+
return objectId;
72+
}
73+
74+
@Override
75+
public MeteringEventsProto.StorageHeartbeat convertToStorageHeartbeat(CloudConsumption cloudConsumption, double sizeInBytes) {
76+
MeteringEventsProto.StorageHeartbeat.Builder storageHeartbeatBuilder = MeteringEventsProto.StorageHeartbeat.newBuilder();
77+
validate(cloudConsumption);
78+
MeteringEventsProto.Storage.Builder storageBuilder = MeteringEventsProto.Storage.newBuilder();
79+
storageBuilder.setId(getObjectId(cloudConsumption.getStorageLocation()));
80+
storageBuilder.setSizeInMB(sizeInBytes / NO_BYTE_IN_MB);
81+
storageBuilder.setType(MeteringEventsProto.StorageType.Value.EFS);
82+
storageHeartbeatBuilder.addStorages(storageBuilder.build());
83+
84+
MeteringEventsProto.MeteredResourceMetadata.Builder metaBuilder = MeteringEventsProto.MeteredResourceMetadata.newBuilder();
85+
metaBuilder.setEnvironmentCrn(cloudConsumption.getEnvironmentCrn());
86+
storageHeartbeatBuilder.setMeteredResourceMetadata(metaBuilder.build());
87+
88+
MeteringEventsProto.StorageHeartbeat ret = storageHeartbeatBuilder.build();
89+
LOGGER.debug("Converted StorageHeartbeat event: {}", ret);
90+
return ret;
91+
}
92+
93+
@Override
94+
public int dateRangeInDays() {
95+
return DATE_RANGE_WIDTH_IN_DAYS;
96+
}
97+
98+
@Override
99+
public StorageType storageType() {
100+
return EFS;
101+
}
102+
103+
@Override
104+
public MeteringEventsProto.ServiceType.Value getMeteringServiceType() {
105+
return MeteringEventsProto.ServiceType.Value.ENVIRONMENT;
106+
}
107+
108+
@Override
109+
public MeteringEventsProto.ServiceFeature.Value getServiceFeature() {
110+
return MeteringEventsProto.ServiceFeature.Value.FILE_STORAGE;
111+
}
112+
113+
@Override
114+
public CloudPlatform cloudPlatform() {
115+
return CloudPlatform.AWS;
116+
}
117+
}

cloud-aws-common/src/main/java/com/sequenceiq/cloudbreak/cloud/aws/common/consumption/AwsS3ConsumptionCalculator.java

+10
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,16 @@ public StorageType storageType() {
9898
return S3;
9999
}
100100

101+
@Override
102+
public MeteringEventsProto.ServiceType.Value getMeteringServiceType() {
103+
return MeteringEventsProto.ServiceType.Value.ENVIRONMENT;
104+
}
105+
106+
@Override
107+
public MeteringEventsProto.ServiceFeature.Value getServiceFeature() {
108+
return MeteringEventsProto.ServiceFeature.Value.OBJECT_STORAGE;
109+
}
110+
101111
@Override
102112
public CloudPlatform cloudPlatform() {
103113
return CloudPlatform.AWS;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package com.sequenceiq.cloudbreak.cloud.aws.common.consumption;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertThrows;
5+
import static org.mockito.Mockito.verify;
6+
import static org.mockito.Mockito.when;
7+
8+
import java.time.Instant;
9+
import java.time.temporal.ChronoUnit;
10+
import java.util.Date;
11+
import java.util.List;
12+
13+
import javax.validation.ValidationException;
14+
15+
import org.junit.jupiter.api.Test;
16+
import org.junit.jupiter.api.extension.ExtendWith;
17+
import org.junit.jupiter.params.ParameterizedTest;
18+
import org.junit.jupiter.params.provider.MethodSource;
19+
import org.mockito.InjectMocks;
20+
import org.mockito.Mock;
21+
import org.mockito.junit.jupiter.MockitoExtension;
22+
23+
import com.amazonaws.services.elasticfilesystem.model.DescribeFileSystemsResult;
24+
import com.amazonaws.services.elasticfilesystem.model.FileSystemDescription;
25+
import com.amazonaws.services.elasticfilesystem.model.FileSystemSize;
26+
import com.sequenceiq.cloudbreak.cloud.aws.common.connector.resource.AwsEfsCommonService;
27+
import com.sequenceiq.cloudbreak.cloud.exception.CloudConnectorException;
28+
import com.sequenceiq.cloudbreak.cloud.model.CloudConsumption;
29+
import com.sequenceiq.cloudbreak.cloud.model.Region;
30+
import com.sequenceiq.cloudbreak.cloud.model.StorageSizeRequest;
31+
import com.sequenceiq.cloudbreak.cloud.model.StorageSizeResponse;
32+
import com.sequenceiq.common.model.FileSystemType;
33+
34+
@ExtendWith(MockitoExtension.class)
35+
class AwsEFSConsumptionCalculatorTest {
36+
37+
private static final String BUCKET_NAME = "fs-123345";
38+
39+
private static final String REGION_NAME = "bucket-location";
40+
41+
private static final String ERROR_MESSAGE = "errormessage";
42+
43+
private static final double DOUBLE_ASSERT_EPSILON = 0.001;
44+
45+
private static final String EFS_OBJECT_PATH = "fs-12312";
46+
47+
private static final String ABFS_OBJECT_PATH = "abfs://[email protected]/PATH";
48+
49+
@Mock
50+
private AwsEfsCommonService awsEfsCommonService;
51+
52+
@InjectMocks
53+
private AwsEFSConsumptionCalculator underTest;
54+
55+
@Test
56+
public void getEfsDoesNotExistThrowsException() {
57+
Date startTime = Date.from(Instant.now().minus(42, ChronoUnit.MINUTES));
58+
Date endTime = Date.from(Instant.now());
59+
Region region = Region.region(REGION_NAME);
60+
StorageSizeRequest request = StorageSizeRequest.builder()
61+
.withObjectStoragePath(BUCKET_NAME)
62+
.withStartTime(startTime)
63+
.withEndTime(endTime)
64+
.withRegion(region)
65+
.build();
66+
67+
DescribeFileSystemsResult statisticsResult = new DescribeFileSystemsResult()
68+
.withFileSystems(List.of());
69+
when(awsEfsCommonService.getEfsSize(null, REGION_NAME, startTime, endTime, BUCKET_NAME)).thenReturn(statisticsResult);
70+
71+
CloudConnectorException ex = assertThrows(CloudConnectorException.class, () -> underTest.calculate(request));
72+
73+
verify(awsEfsCommonService).getEfsSize(null, REGION_NAME, startTime, endTime, BUCKET_NAME);
74+
assertEquals(String.format("No Efs were returned by efs id %s and timeframe from %s to %s",
75+
BUCKET_NAME, startTime, endTime), ex.getMessage());
76+
}
77+
78+
@Test
79+
public void getObjectStorageSizeOneDatapoint() {
80+
Date startTime = Date.from(Instant.now().minus(42, ChronoUnit.MINUTES));
81+
Date endTime = Date.from(Instant.now());
82+
Region region = Region.region(REGION_NAME);
83+
StorageSizeRequest request = StorageSizeRequest.builder()
84+
.withObjectStoragePath(BUCKET_NAME)
85+
.withStartTime(startTime)
86+
.withEndTime(endTime)
87+
.withRegion(region)
88+
.build();
89+
90+
FileSystemDescription description = new FileSystemDescription()
91+
.withCreationTime(Date.from(Instant.now()))
92+
.withSizeInBytes(new FileSystemSize().withValue(42L));
93+
DescribeFileSystemsResult statisticsResult = new DescribeFileSystemsResult()
94+
.withFileSystems(List.of(description));
95+
when(awsEfsCommonService.getEfsSize(null, REGION_NAME, startTime, endTime, BUCKET_NAME)).thenReturn(statisticsResult);
96+
97+
StorageSizeResponse result = underTest.calculate(request);
98+
99+
verify(awsEfsCommonService).getEfsSize(null, REGION_NAME, startTime, endTime, BUCKET_NAME);
100+
assertEquals(42.0, result.getStorageInBytes(), DOUBLE_ASSERT_EPSILON);
101+
}
102+
103+
@Test
104+
public void testEfsName() {
105+
assertEquals("fs-12312", underTest.getObjectId(EFS_OBJECT_PATH));
106+
}
107+
108+
@ParameterizedTest(name = "With requiredType={0} and storageLocation={1}, validation should succeed: {2}")
109+
@MethodSource("scenarios")
110+
public void testValidateCloudStorageType(FileSystemType requiredType, String storageLocation, boolean valid) {
111+
CloudConsumption cloudConsumption = CloudConsumption.builder().withStorageLocation(storageLocation).build();
112+
if (valid) {
113+
underTest.validate(cloudConsumption);
114+
} else {
115+
assertThrows(ValidationException.class, () -> underTest.validate(cloudConsumption));
116+
}
117+
}
118+
119+
static Object[][] scenarios() {
120+
return new Object[][]{
121+
{FileSystemType.EFS, EFS_OBJECT_PATH, true},
122+
{FileSystemType.EFS, ABFS_OBJECT_PATH, false},
123+
{FileSystemType.EFS, "", false},
124+
{FileSystemType.EFS, null, false},
125+
};
126+
}
127+
}

cloud-consumption-api/src/main/java/com/sequenceiq/consumption/api/v1/consumption/model/common/ConsumptionType.java

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ public enum ConsumptionType {
66

77
UNKNOWN(StorageType.UNKNOWN),
88
STORAGE(StorageType.S3),
9+
ELASTIC_FILESYSTEM(StorageType.EFS),
910
EBS(StorageType.EBS);
1011

1112
private final StorageType storageType;

cloud-consumption-api/src/main/java/com/sequenceiq/consumption/api/v1/consumption/model/request/CloudResourceConsumptionRequest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public class CloudResourceConsumptionRequest extends ConsumptionBaseRequest {
1919
private String cloudResourceId;
2020

2121
@NotNull
22-
@ApiModelProperty(value = ConsumptionModelDescription.CONSUMPTION_TYPE, allowableValues = "UNKNOWN,STORAGE,EBS", required = true)
22+
@ApiModelProperty(value = ConsumptionModelDescription.CONSUMPTION_TYPE, allowableValues = "UNKNOWN,STORAGE,EBS,ELASTIC_FILESYSTEM", required = true)
2323
private ConsumptionType consumptionType;
2424

2525
public ConsumptionType getConsumptionType() {

cloud-consumption/src/main/java/com/sequenceiq/consumption/endpoint/ConsumptionInternalV1Controller.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public ConsumptionExistenceResponse doesStorageConsumptionCollectionExist(@Accou
8181
@InternalOnly
8282
public void scheduleConsumptionCollection(@AccountId String accountId,
8383
@Valid @NotNull CloudResourceConsumptionRequest request,
84-
@ValidCrn(resource = {CrnResourceDescriptor.USER, CrnResourceDescriptor.MACHINE_USER}) @NotEmpty String initiatorUserCrn) {
84+
@ValidCrn(resource = {CrnResourceDescriptor.USER, CrnResourceDescriptor.MACHINE_USER}) @InitiatorUserCrn @NotEmpty String initiatorUserCrn) {
8585
LOGGER.info("Registering consumption collection for resource with CRN [{}] and location [{}]",
8686
request.getMonitoredResourceCrn(), request.getCloudResourceId());
8787
scheduleCloudResourceConsumption(request, request.getConsumptionType());

cloud-consumption/src/main/java/com/sequenceiq/consumption/flow/consumption/storage/handler/SendConsumptionEventHandler.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,10 @@ public Selectable executeOperation(HandlerEvent<StorageConsumptionCollectionHand
5656
if (storage != null && consumptionCalculator.isPresent()) {
5757
MeteringEventsProto.StorageHeartbeat heartbeat = consumptionCalculator.get()
5858
.convertToStorageHeartbeat(cloudConsumption, storage.getStorageInBytes());
59-
meteringEventProcessor.storageHeartbeat(heartbeat, MeteringEventsProto.ServiceType.Value.ENVIRONMENT);
59+
meteringEventProcessor.storageHeartbeat(heartbeat,
60+
consumptionCalculator.get().getMeteringServiceType(),
61+
consumptionCalculator.get().getServiceFeature()
62+
);
6063
LOGGER.debug("StorageHeartbeat event was successfully sent for Consumption with CRN [{}]", resourceCrn);
6164

6265
return StorageConsumptionCollectionEvent.builder()

cloud-consumption/src/test/java/com/sequenceiq/consumption/flow/consumption/storage/handler/SendConsumptionEventHandlerTest.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,16 @@ public void testDoAccept() {
9595
MeteringEventsProto.StorageHeartbeat heartbeat = MeteringEventsProto.StorageHeartbeat.newBuilder().build();
9696

9797
when(awsS3ConsumptionCalculator.convertToStorageHeartbeat(any(CloudConsumption.class), eq(storageResult.getStorageInBytes()))).thenReturn(heartbeat);
98-
doNothing().when(meteringEventProcessor).storageHeartbeat(heartbeat, MeteringEventsProto.ServiceType.Value.ENVIRONMENT);
98+
when(awsS3ConsumptionCalculator.getMeteringServiceType()).thenReturn(MeteringEventsProto.ServiceType.Value.ENVIRONMENT);
99+
when(awsS3ConsumptionCalculator.getServiceFeature()).thenReturn(MeteringEventsProto.ServiceFeature.Value.OBJECT_STORAGE);
100+
doNothing().when(meteringEventProcessor).storageHeartbeat(heartbeat,
101+
MeteringEventsProto.ServiceType.Value.ENVIRONMENT,
102+
MeteringEventsProto.ServiceFeature.Value.OBJECT_STORAGE);
99103
StorageConsumptionCollectionEvent result = (StorageConsumptionCollectionEvent) underTest.doAccept(new HandlerEvent<>(new Event<>(event)));
100104

101105
verify(awsS3ConsumptionCalculator).convertToStorageHeartbeat(any(CloudConsumption.class), eq(storageResult.getStorageInBytes()));
102-
verify(meteringEventProcessor).storageHeartbeat(heartbeat, MeteringEventsProto.ServiceType.Value.ENVIRONMENT);
106+
verify(meteringEventProcessor).storageHeartbeat(heartbeat, MeteringEventsProto.ServiceType.Value.ENVIRONMENT,
107+
MeteringEventsProto.ServiceFeature.Value.OBJECT_STORAGE);
103108
assertEquals(CRN, result.getResourceCrn());
104109
assertEquals(ID, result.getResourceId());
105110
assertEquals(STORAGE_CONSUMPTION_COLLECTION_FINISH_EVENT.selector(), result.selector());

common/src/main/java/com/sequenceiq/cloudbreak/common/mappable/StorageType.java

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ public enum StorageType {
44

55
UNKNOWN(CloudPlatform.MOCK),
66
S3(CloudPlatform.AWS),
7+
EFS(CloudPlatform.AWS),
78
EBS(CloudPlatform.AWS);
89

910
private final CloudPlatform cloudPlatform;

0 commit comments

Comments
 (0)