diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml
index d07d37c5..891908ac 100644
--- a/.github/workflows/docker-image.yml
+++ b/.github/workflows/docker-image.yml
@@ -1,20 +1,24 @@
-name: Docker Image CI
+name: Docker Image Publish CI (Docker Hub and ECR)
on:
- push:
- branches: [ "master" ]
- pull_request:
- branches: [ "master" ]
+ release:
+ types: [created]
jobs:
-
build:
-
runs-on: ubuntu-latest
-
+ outputs:
+ git_tag_output: ${{ steps.tag_type.outputs.is_release_tag }}
steps:
- name: Checkout
uses: actions/checkout@v4
+ - name: Determine tag type
+ id: tag_type
+ run: echo "::set-output name=is_release_tag::$(echo ${GITHUB_REF#refs/tags/} | grep -E '^v[0-9]+\.[0-9]+\.[0-9]+$' || echo 'false')"
+ - name: Set version
+ id: version
+ run: |
+ echo "VERSION=${GITHUB_REF#refs/tags/}" >> $GITHUB_ENV
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
@@ -25,11 +29,42 @@ jobs:
DOCKER_PASSWORD: ${{secrets.DOCKER_PASSWORD}}
run:
docker login -u $DOCKER_USER -p $DOCKER_PASSWORD
- - name: Build and push
- uses: docker/build-push-action@v5
+ - name: Build and push dockerhub
+ uses: docker/build-push-action@v5
with:
context: .
platforms: linux/amd64,linux/arm64
push: true
- tags: onedatashare/transfer_service:latest
+ tags: onedatashare/transfer_service:${{ env.VERSION }}
+
+ push-to-ecr:
+ needs: build
+ runs-on: ubuntu-latest
+ steps:
+ - name: Determine tag type
+ id: tag_type
+ run: echo "::set-output name=is_release_tag::$(echo ${GITHUB_REF#refs/tags/} | grep -E '^v[0-9]+\.[0-9]+\.[0-9]+$' || echo 'false')"
+ - name: Set version
+ id: version
+ run: |
+ echo "VERSION=${GITHUB_REF#refs/tags/}" >> $GITHUB_ENV
+ - name: Setup AWS ECR Details
+ uses: aws-actions/configure-aws-credentials@v3
+ with:
+ aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
+ aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
+ aws-region: us-east-1
+ - name: Login to Amazon ECR
+ id: login-pf-aws-ecr
+ uses: aws-actions/amazon-ecr-login@v2
+
+ - name: Build and push the tagged docker image to Amazon ECR
+ env:
+ ECR_REGISTRY: ${{ steps.login-pf-aws-ecr.outputs.registry }}
+ ECR_REPOSITORY: onedatashare/transfer_service
+ IMAGE_TAG: ${{ env.VERSION }}
+ run: |
+ docker pull onedatashare/transfer_service:$IMAGE_TAG
+ docker tag onedatashare/transfer_service:$IMAGE_TAG $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG
+ docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
index 54411a54..fcc57b88 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,54 +1,27 @@
-FROM maven:3.9.5-amazoncorretto-21 AS build
+FROM maven:3.9.7-amazoncorretto-21 AS build
COPY src /home/app/src
COPY pom.xml /home/app
RUN mvn -f /home/app/pom.xml clean package -DskipTests
# Final Image
-FROM amazoncorretto:21-alpine-jdk
+FROM amazoncorretto:21-alpine3.18-jdk
+RUN apk update
RUN apk --no-cache add python3-dev py3-pip build-base gcc linux-headers
-RUN pip3 install pmeter-ods==1.0.8
+
+RUN pip install pmeter-ods
COPY --from=build /home/app/target/ods-transfer-service-0.0.1-SNAPSHOT.jar /usr/local/lib/ods-transfer-service-0.0.1-SNAPSHOT.jar
ENV PIP_ROOT_USER_ACTION=ignore
-ENV NODE_NAME="${NODE_NAME}"
-ENV USER_NAME="${USER_NAME}"
-ENV APP_NAME="${USER_NAME}"-"${NODE_NAME}"
-
-ENV CONNECTOR_QUEUE="${APP_NAME}"
-ENV ODS_GDRIVE_CLIENT_ID="${ODS_GDRIVE_CLIENT_ID}"
-ENV ODS_GDRIVE_CLIENT_SECRET="${ODS_GDRIVE_CLIENT_SECRET}"
-ENV ODS_GDRIVE_PROJECT_ID="onedatashare-dev"
-ENV EUREKA_URI="${EUREKA_URI}"
-ENV EUREKA_PASS="${EUREKA_PASS}"
-ENV EUREKA_USER="${EUREKA_USER}"
-ENV FOLDER_WITH_CERTS="${FOLDER_WITH_CERTS}"
-COPY ${FOLDER_WITH_CERTS} /certs/
-ENV COCKROACH_URI="${COCKROACH_URI}"
-ENV COCKROACH_USER="${COCKROACH_USER}"
-ENV COCKROACH_PASS="${COCKROACH_PASS}"
-ENV RMQ_ADDRESS="amqps://b-0e720b16-3ea7-4227-ad65-6cce3704121c.mq.us-east-2.amazonaws.com:5671"
-
-#use ODS user for your private queue.
-#create creds through aws console
-ENV AMPQ_USER="${AMPQ_USER}"
-ENV AMPQ_PWD="${AMPQ_PWD}"
#change to monitor the active NIC
ENV PMETER_CLI_OPTIONS="-NS"
ENV PMETER_NIC_INTERFACE="${PMETER_NIC_INTERFACE:-eth0}"
-ENV INFLUX_ORG="${INFLUX_ORG}"
-ENV INFLUX_BUCKET="${USER_NAME}"
-ENV INFLUX_TOKEN="${INFLUX_TOKEN}"
-ENV INFLUX_URI="https://influxdb.onedatashare.org"
ENV ENABLE_PMETER="true"
ENV PMETER_CRON_EXP="*/15 * * * * *"
-
-ENV OPTIMIZER_URL="${OPTIMIZER_URL}"
-ENV OPTIMIZER_ENABLE="${OPTIMIZER_ENABLE}"
-ENV SPRING_PROFILE="${SPRING_PROFILE:-hsql}"
+ENV SPRING_PROFILES_ACTIVE=aws,virtual,cockroach
ENV PATH "/home/ods/.local/bin:${PATH}"
@@ -56,4 +29,4 @@ RUN mkdir -p $HOME/.pmeter/
RUN touch $HOME/.pmeter/transfer_service_pmeter_measure.txt
EXPOSE 8092
-ENTRYPOINT ["java", "-Dspring.profiles.active=hsql","-jar", "/usr/local/lib/ods-transfer-service-0.0.1-SNAPSHOT.jar"]
\ No newline at end of file
+ENTRYPOINT ["java","-jar", "/usr/local/lib/ods-transfer-service-0.0.1-SNAPSHOT.jar"]
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index a5dd7ab6..1413894c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,7 +16,7 @@
<java.version>21</java.version>
- <spring-cloud.version>2023.0.0-RC1</spring-cloud.version>
+ <spring-cloud.version>2023.0.2</spring-cloud.version>
@@ -36,12 +36,43 @@
<enabled>false</enabled>
+ <repository>
+ <id>private-repository</id>
+ <name>Hazelcast Private Repository</name>
+ <url>https://repository.hazelcast.com/release/</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
+ <dependency>
+ <groupId>com.hazelcast</groupId>
+ <artifactId>hazelcast-enterprise</artifactId>
+ <version>5.5.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.cloud</groupId>
+ <artifactId>spring-cloud-starter-vault-config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.vault</groupId>
+ <artifactId>spring-vault-core</artifactId>
+ <version>3.1.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.cloud</groupId>
+ <artifactId>spring-cloud-starter-bootstrap</artifactId>
+ <version>4.1.3</version>
+ </dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
@@ -52,10 +83,6 @@
<artifactId>commons-pool2</artifactId>
<version>2.11.1</version>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
<groupId>com.box</groupId>
<artifactId>box-java-sdk</artifactId>
@@ -130,20 +157,25 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
+ <dependency>
+ <groupId>org.jsoup</groupId>
+ <artifactId>jsoup</artifactId>
+ <version>1.17.2</version>
+ </dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
@@ -157,6 +189,16 @@
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>auth</artifactId>
+ <version>2.25.67</version>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>sts</artifactId>
+ <version>2.25.67</version>
+ </dependency>
<groupId>jakarta.servlet</groupId>
<artifactId>jakarta.servlet-api</artifactId>
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/Enum/MessageType.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/Enum/MessageType.java
new file mode 100644
index 00000000..60f7a005
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/Enum/MessageType.java
@@ -0,0 +1,5 @@
+package org.onedatashare.transferservice.odstransferservice.Enum;
+
+public enum MessageType {
+ CARBON_AVG_REQUEST, TRANSFER_JOB_REQUEST, APPLICATION_PARAM_CHANGE, STOP_JOB_REQUEST, CARBON_IP_REQUEST
+}
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/OdsTransferService.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/OdsTransferService.java
index 0a890310..c6b8bcfb 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/OdsTransferService.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/OdsTransferService.java
@@ -1,6 +1,5 @@
package org.onedatashare.transferservice.odstransferservice;
-import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/BatchConfig.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/config/BatchConfig.java
index 826e235c..1d0d6ecb 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/BatchConfig.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/config/BatchConfig.java
@@ -7,38 +7,40 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.retry.backoff.BackOffPolicy;
+import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.concurrent.TimeUnit;
@Configuration
public class BatchConfig {
-// @Bean
-// public JobLauncher jobLauncher(JobRepository jobRepository) {
-// TaskExecutorJobLauncher taskExecutorJobLauncher = new TaskExecutorJobLauncher();
-// taskExecutorJobLauncher.setJobRepository(jobRepository);
-// return taskExecutorJobLauncher;
-// }
-
- @Bean
- public Set<Long> jobIds() {
- return new HashSet<>();
- }
-
@Bean
public PlatformTransactionManager transactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
@Bean
- public JobLauncher asyncJobLauncher(JobRepository jobRepository) {
+ public JobLauncher jobLauncher(JobRepository jobRepository) {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(jobRepository);
- jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
+ SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
+ taskExecutor.setConcurrencyLimit(4);
+ jobLauncher.setTaskExecutor(taskExecutor);
return jobLauncher;
}
+
+
+ @Bean
+ public BackOffPolicy backOffPolicy() {
+ ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
+ backOffPolicy.setInitialInterval(TimeUnit.SECONDS.toMillis(5));
+ backOffPolicy.setMultiplier(2.0);
+ backOffPolicy.setMaxInterval(TimeUnit.DAYS.toMillis(1));
+ return backOffPolicy;
+ }
+
}
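
Note: the new BackOffPolicy bean is not consumed anywhere in this diff; presumably it feeds a RetryTemplate elsewhere, much as the deleted MetricsConfig built one around a FixedBackOffPolicy. A minimal wiring sketch, assuming a bounded SimpleRetryPolicy (the attempt cap is an illustrative choice, not code from this PR):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

// Sketch only: wires the ExponentialBackOffPolicy bean above (5s initial interval,
// x2 multiplier, capped at one day) into a RetryTemplate with bounded attempts.
class RetryTemplateWiringSketch {
    @Bean
    public RetryTemplate retryTemplate(BackOffPolicy backOffPolicy) {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setBackOffPolicy(backOffPolicy);
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(5)); // 5 attempts is illustrative
        return retryTemplate;
    }
}
```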
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/HazelcastClientConfig.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/config/HazelcastClientConfig.java
new file mode 100644
index 00000000..e3cc56a1
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/config/HazelcastClientConfig.java
@@ -0,0 +1,105 @@
+package org.onedatashare.transferservice.odstransferservice.config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.hazelcast.client.HazelcastClient;
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.collection.IQueue;
+import com.hazelcast.config.IndexType;
+import com.hazelcast.config.SSLConfig;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.HazelcastJsonValue;
+import com.hazelcast.map.IMap;
+import org.onedatashare.transferservice.odstransferservice.service.VaultSSLService;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
+import org.springframework.core.env.Environment;
+import org.springframework.vault.core.VaultTemplate;
+
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+@Configuration
+public class HazelcastClientConfig {
+
+ private final Environment env;
+ private final ObjectMapper objectMapper;
+ private final VaultSSLService vaultSslService;
+
+ public HazelcastClientConfig(Environment environment, ObjectMapper objectMapper, VaultTemplate vaultTemplate, VaultSSLService vaultSSLService) {
+ this.env = environment;
+ this.objectMapper = objectMapper;
+ this.vaultSslService = vaultSSLService;
+ }
+
+ @Value("${spring.application.name}")
+ private String appName;
+
+ @Bean
+ @Qualifier("clientConfig")
+ @Profile("local")
+ public ClientConfig devClientConfig(SSLConfig sslConfig) {
+ ClientConfig clientConfig = new ClientConfig();
+ clientConfig.setClusterName("dev-scheduler-cluster");
+ clientConfig.getNetworkConfig().setSSLConfig(sslConfig);
+ clientConfig.setInstanceName(this.appName);
+ return clientConfig;
+ }
+
+ @Bean
+ @Qualifier("clientConfig")
+ @Profile({"prod", "eks", "ec2",})
+ public ClientConfig prodClientConfig(SSLConfig sslConfig) {
+ ClientConfig clientConfig = new ClientConfig();
+ clientConfig.setClusterName("prod-scheduler-cluster");
+ clientConfig.getNetworkConfig().setSSLConfig(sslConfig);
+ clientConfig.getNetworkConfig().addAddress(env.getProperty("hz.ipaddr", "localhost"));
+ return clientConfig;
+ }
+
+ @Bean
+ public SSLConfig sslConfig() {
+ Properties properties = new Properties();
+ properties.setProperty("protocol", "TLSv1.2");
+ properties.setProperty("mutualAuthentication", "OPTIONAL");
+ properties.setProperty("trustStore", this.vaultSslService.getStorePath().toAbsolutePath().toString());
+ properties.setProperty("trustStorePassword", env.getProperty("hz.keystore.password", "changeit"));
+ properties.setProperty("trustStoreType", "PKCS12");
+ properties.setProperty("keyMaterialDuration", this.vaultSslService.getStoreDuration().toString());
+ properties.setProperty("validateIdentity", "false");
+
+ SSLConfig sslConfig = new SSLConfig();
+ sslConfig.setEnabled(true);
+ sslConfig.setProperties(properties);
+ return sslConfig;
+ }
+
+ @Bean
+ public HazelcastInstance hazelcastInstance(ClientConfig clientConfig) {
+ clientConfig.addLabel(this.env.getProperty("spring.application.name"));
+ return HazelcastClient.newHazelcastClient(clientConfig);
+ }
+
+ @Bean
+ public IMap fileTransferNodeRegistrationMap(@Qualifier("hazelcastInstance") HazelcastInstance hazelcastInstance) {
+ return hazelcastInstance.getMap("file-transfer-node-map");
+ }
+
+ @Bean
+ public IMap fileTransferScheduleMap(@Qualifier("hazelcastInstance") HazelcastInstance hazelcastInstance) {
+ return hazelcastInstance.getMap("file-transfer-schedule-map");
+ }
+
+ @Bean
+ public IMap carbonIntensityMap(@Qualifier("hazelcastInstance") HazelcastInstance hazelcastInstance) {
+ return hazelcastInstance.getMap("carbon-intensity-map");
+ }
+
+ @Bean
+ public IQueue<HazelcastJsonValue> messageQueue(@Qualifier("hazelcastInstance") HazelcastInstance hazelcastInstance) {
+ return hazelcastInstance.getQueue(appName);
+ }
+}
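
The messageQueue bean above replaces the deleted RabbitMQ listener as the node's inbound channel. A minimal consumer sketch, assuming a single dedicated thread and the MessageHandler interface introduced later in this diff:

```java
import com.hazelcast.collection.IQueue;
import com.hazelcast.core.HazelcastJsonValue;

// Sketch only: drains the per-node IQueue and hands each JSON message to a handler.
// Thread management and error policy here are assumptions, not code from this PR.
class QueueConsumerSketch implements Runnable {
    private final IQueue<HazelcastJsonValue> messageQueue;
    private final MessageHandler handler; // assumes same package as MessageHandler

    QueueConsumerSketch(IQueue<HazelcastJsonValue> messageQueue, MessageHandler handler) {
        this.messageQueue = messageQueue;
        this.handler = handler;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                HazelcastJsonValue msg = messageQueue.take(); // blocks until a message arrives
                handler.messageHandler(msg);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shut down cleanly
            } catch (Exception e) {
                // keep consuming; real code would log the failure
            }
        }
    }
}
```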
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/MetricsConfig.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/config/MetricsConfig.java
deleted file mode 100644
index e952bdcd..00000000
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/MetricsConfig.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package org.onedatashare.transferservice.odstransferservice.config;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import lombok.Data;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.retry.backoff.FixedBackOffPolicy;
-import org.springframework.retry.support.RetryTemplate;
-
-@Configuration
-@Data
-public class MetricsConfig {
- @Bean
- public ObjectMapper pmeterMapper() {
- ObjectMapper objectMapper = new ObjectMapper();
- objectMapper.registerModule(new JavaTimeModule());
- objectMapper.configure(SerializationFeature.WRITE_DATE_KEYS_AS_TIMESTAMPS, false);
- return objectMapper;
- }
-
- @Bean
- public RetryTemplate retryTemplateForReaderAndWriter() {
- RetryTemplate retryTemplate = new RetryTemplate();
- FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
- fixedBackOffPolicy.setBackOffPeriod(2000l);
- retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
- return retryTemplate;
- }
-}
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/OptimizerConfig.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/config/OptimizerConfig.java
deleted file mode 100644
index b164d4f6..00000000
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/OptimizerConfig.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package org.onedatashare.transferservice.odstransferservice.config;
-
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.web.client.RestTemplateBuilder;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.web.client.RestTemplate;
-import org.springframework.web.util.DefaultUriBuilderFactory;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-@Configuration
-public class OptimizerConfig {
-
- @Value("${optimizer.url}")
- private String optimizerUrl;
-
- @Bean
- public RestTemplate optimizerTemplate() {
- return new RestTemplateBuilder()
- .uriTemplateHandler(new DefaultUriBuilderFactory(optimizerUrl))
- .build();
- }
-
- @Bean(name ="optimizerTaskExecutor")
- public Executor optimizerTaskExecutor(){
- return Executors.newVirtualThreadPerTaskExecutor();
- }
-}
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java
deleted file mode 100644
index be81ad00..00000000
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.onedatashare.transferservice.odstransferservice.config;
-
-import com.google.gson.*;
-import org.springframework.amqp.core.*;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import java.util.Date;
-import java.util.Locale;
-
-@Configuration
-public class RabbitMQConfig {
-
- @Value("${ods.rabbitmq.queue}")
- String queueName;
-
- @Value("${ods.rabbitmq.exchange}")
- String exchange;
-
- @Value("${ods.rabbitmq.routingkey}")
- String routingKey;
-
- @Bean
- public Gson gson() {
- GsonBuilder builder = new GsonBuilder()
- .registerTypeAdapter(Date.class, (JsonDeserializer) (json, typeOfT, context) -> new Date(json.getAsJsonPrimitive().getAsLong()));
- return builder.create();
- }
-
- @Bean
- Queue userQueue(){
- //String name, boolean durable, boolean exclusive, boolean autoDelete
- return new Queue(this.queueName, true, false, false);
- }
-
- @Bean
- public DirectExchange exchange(){
- return new DirectExchange(exchange);
- }
-
- @Bean
- public Binding binding(DirectExchange exchange, Queue userQueue){
- return BindingBuilder.bind(userQueue)
- .to(exchange)
- .with(routingKey);
- }
-}
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/constant/DataInfluxConstants.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/constant/DataInfluxConstants.java
index bee32170..03d38624 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/constant/DataInfluxConstants.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/constant/DataInfluxConstants.java
@@ -2,6 +2,7 @@
public class DataInfluxConstants {
public static final String NETWORK_INTERFACE = "interface";
+ public static final String IS_RUNNING = "isRunning";
public static final String ODS_USER = "ods_user";
public static final String TRANSFER_NODE_NAME = "transfer_node_name";
public static final String ACTIVE_CORE_COUNT = "active_core_count";
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/constant/ODSConstants.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/constant/ODSConstants.java
index b91d561f..a9088f41 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/constant/ODSConstants.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/constant/ODSConstants.java
@@ -6,6 +6,7 @@ public class ODSConstants {
public static final String TIME = "time";
public static final String SOURCE_HOST = "sourceURI";
public static final String SOURCE_PORT = "sourcePort";
+ public static final String CARBON_SCORE_SOURCE = "sourceCarbonScore";
public static final String SOURCE_BASE_PATH = "sourceBasePath";
public static final String DEST_BASE_PATH = "destBasePath";
public static final String FILE_COUNT = "fileCount";
@@ -15,6 +16,7 @@ public class ODSConstants {
public static final String DEST_CREDENTIAL_TYPE = "destCredentialType";
public static final String DEST_HOST = "destURI";
public static final String DEST_PORT = "destPort";
+ public static final String CARBON_SCORE_DEST = "destCarbonScore";
public static final String CHUNK_SIZE = "chunkSize";
public static final String JOB_UUID = "jobUuid";
public static final String OWNER_ID = "ownerId";
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java
deleted file mode 100644
index 46118159..00000000
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package org.onedatashare.transferservice.odstransferservice.consumer;
-
-
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.onedatashare.transferservice.odstransferservice.Enum.EndpointType;
-import org.onedatashare.transferservice.odstransferservice.model.EntityInfo;
-import org.onedatashare.transferservice.odstransferservice.model.TransferJobRequest;
-import org.onedatashare.transferservice.odstransferservice.model.optimizer.TransferApplicationParams;
-import org.onedatashare.transferservice.odstransferservice.pools.ThreadPoolManager;
-import org.onedatashare.transferservice.odstransferservice.service.JobControl;
-import org.onedatashare.transferservice.odstransferservice.service.JobParamService;
-import org.onedatashare.transferservice.odstransferservice.service.VfsExpander;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.amqp.core.Message;
-import org.springframework.amqp.core.Queue;
-import org.springframework.amqp.rabbit.annotation.RabbitListener;
-import org.springframework.batch.core.JobParameters;
-import org.springframework.batch.core.JobParametersBuilder;
-import org.springframework.batch.core.launch.JobLauncher;
-import org.springframework.stereotype.Service;
-
-import java.util.ArrayList;
-import java.util.List;
-
-@Service
-public class RabbitMQConsumer {
-
- private final ObjectMapper objectMapper;
- private final ThreadPoolManager threadPoolManager;
- Logger logger = LoggerFactory.getLogger(RabbitMQConsumer.class);
-
- JobControl jc;
-
- JobLauncher jobLauncher;
-
- JobParamService jobParamService;
-
- Queue userQueue;
-
- VfsExpander vfsExpander;
-
- public RabbitMQConsumer(VfsExpander vfsExpander, Queue userQueue, JobParamService jobParamService, JobLauncher asyncJobLauncher, JobControl jc, ThreadPoolManager threadPoolManager) {
- this.vfsExpander = vfsExpander;
- this.userQueue = userQueue;
- this.jobParamService = jobParamService;
- this.jobLauncher = asyncJobLauncher;
- this.jc = jc;
- this.objectMapper = new ObjectMapper();
- this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true);
- this.objectMapper.setDefaultPropertyInclusion(JsonInclude.Include.ALWAYS);
- this.threadPoolManager = threadPoolManager;
- }
-
- @RabbitListener(queues = "#{userQueue}")
- public void consumeDefaultMessage(final Message message) {
- String jsonStr = new String(message.getBody());
-
- logger.info("Message recv: {}", jsonStr);
- try {
- TransferJobRequest request = objectMapper.readValue(jsonStr, TransferJobRequest.class);
- logger.info("Job Recieved: {}", request.toString());
-
- if (request.getSource().getType().equals(EndpointType.vfs)) {
- List<EntityInfo> fileExpandedList = vfsExpander.expandDirectory(request.getSource().getInfoList(), request.getSource().getFileSourcePath());
- request.getSource().setInfoList(new ArrayList<>(fileExpandedList));
- }
- JobParameters parameters = jobParamService.translate(new JobParametersBuilder(), request);
- jc.setRequest(request);
- jobLauncher.run(jc.concurrentJobDefinition(), parameters);
-
- return;
- } catch (Exception e) {
- logger.error("Failed to parse jsonStr: {} to TransferJobRequest.java", jsonStr);
- }
- try {
- TransferApplicationParams params = objectMapper.readValue(jsonStr, TransferApplicationParams.class);
- logger.info("Parsed TransferApplicationParams: {}", params);
- this.threadPoolManager.applyOptimizer(params.getConcurrency(), params.getParallelism());
- } catch (Exception e) {
- logger.error("Did not apply transfer params due to parsing message failure");
- }
- }
-}
\ No newline at end of file
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/JobMonitor.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/JobMonitor.java
index 3db70aff..15554efd 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/JobMonitor.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/JobMonitor.java
@@ -1,11 +1,10 @@
package org.onedatashare.transferservice.odstransferservice.controller;
-import org.onedatashare.transferservice.odstransferservice.constant.ODSConstants;
import org.onedatashare.transferservice.odstransferservice.model.BatchJobData;
+import org.onedatashare.transferservice.odstransferservice.service.JobControl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
-import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
@@ -13,7 +12,8 @@
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
-import java.util.*;
+import java.util.List;
+import java.util.Optional;
@RequestMapping("/api/v1/job")
@@ -21,13 +21,13 @@
public class JobMonitor {
private final JobExplorer jobExplorer;
- private Set<Long> jobIds;
+ private final JobControl jobControl;
Logger logger = LoggerFactory.getLogger(JobMonitor.class);
- public JobMonitor(JobExplorer jobExplorer, Set<Long> jobIds) {
+ public JobMonitor(JobExplorer jobExplorer, JobControl jobControl) {
+ this.jobControl = jobControl;
this.jobExplorer = jobExplorer;
- this.jobIds = jobIds;
}
@GetMapping("/execution")
@@ -44,30 +44,10 @@ public ResponseEntity getJobExecution(@RequestParam("jobId") Optio
}
}
- @GetMapping("/ids")
- public ResponseEntity<List<Long>> getJobIdsRun() {
- logger.info("Listing Job Ids");
- return ResponseEntity.ok(new ArrayList<>(this.jobIds));
+ @GetMapping("/latest")
+ public ResponseEntity<BatchJobData> getLatestJobExecution() {
+ JobExecution jobExecution = this.jobControl.getLatestJobExecution();
+ if(jobExecution == null) {return ResponseEntity.ok(null);}
+ return ResponseEntity.ok(BatchJobData.convertFromJobExecution(jobExecution));
}
-
-// @GetMapping("/uuid")
-// public ResponseEntity<List<UUID>> getJobExec(@RequestParam Optional<List<Long>> jobIds){
-// List<UUID> jobUuids = new ArrayList<>();
-// if(jobIds.isPresent()){
-// for(Long jobId: jobIds.get()){
-// JobExecution jobExecution = this.jobExplorer.getJobExecution(jobId);
-// JobParameters jobParameters = jobExecution.getJobParameters();
-// String jobUuid = jobParameters.getString(ODSConstants.JOB_UUID);
-// jobUuids.add(UUID.fromString(jobUuid));
-// }
-// }else{
-// for(Long jobId : this.jobIds){
-// JobExecution jobExecution = this.jobExplorer.getJobExecution(jobId);
-// JobParameters jobParameters = jobExecution.getJobParameters();
-// String jobUuid = jobParameters.getString(ODSConstants.JOB_UUID);
-// jobUuids.add(UUID.fromString(jobUuid));
-// }
-// }
-// return ResponseEntity.ok(jobUuids);
-// }
}
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java
index fcb2bce3..be31f3d2 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java
@@ -1,20 +1,8 @@
package org.onedatashare.transferservice.odstransferservice.controller;
-import org.onedatashare.transferservice.odstransferservice.Enum.EndpointType;
-import org.onedatashare.transferservice.odstransferservice.model.EntityInfo;
import org.onedatashare.transferservice.odstransferservice.model.TransferJobRequest;
import org.onedatashare.transferservice.odstransferservice.service.JobControl;
-import org.onedatashare.transferservice.odstransferservice.service.JobParamService;
-import org.onedatashare.transferservice.odstransferservice.service.VfsExpander;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
-import org.springframework.batch.core.JobParameters;
-import org.springframework.batch.core.JobParametersBuilder;
-import org.springframework.batch.core.launch.JobLauncher;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.annotation.Async;
import org.springframework.web.bind.annotation.RequestBody;
@@ -22,10 +10,6 @@
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
/**
* Transfer controller with to initiate transfer request
@@ -34,33 +18,17 @@
@RequestMapping("/api/v1/transfer")
public class TransferController {
- Logger logger = LoggerFactory.getLogger(TransferController.class);
-
- @Autowired
- JobControl jc;
-
- @Autowired
- JobLauncher asyncJobLauncher;
+ JobControl jobControl;
- @Autowired
- JobParamService jobParamService;
-
- @Autowired
- VfsExpander vfsExpander;
+ public TransferController(JobControl jobControl) {
+ this.jobControl = jobControl;
+ }
@RequestMapping(value = "/start", method = RequestMethod.POST)
@Async
- public ResponseEntity<String> start(@RequestBody TransferJobRequest request) throws Exception {
- logger.info("Controller Entry point");
- if (request.getSource().getType().equals(EndpointType.vfs)) {
- List<EntityInfo> fileExpandedList = vfsExpander.expandDirectory(request.getSource().getInfoList(), request.getSource().getFileSourcePath());
- request.getSource().setInfoList(new ArrayList<>(fileExpandedList));
- }
- JobParameters parameters = jobParamService.translate(new JobParametersBuilder(), request);
- jc.setRequest(request);
- Job job = jc.concurrentJobDefinition();
- JobExecution jobExecution = asyncJobLauncher.run(job, parameters);
- return ResponseEntity.status(HttpStatus.OK).body("Your batch job has been submitted with \n ID: " + jobExecution.getJobId());
+ public ResponseEntity<Long> start(@RequestBody TransferJobRequest request) throws Exception {
+ JobExecution jobExecution = this.jobControl.runJob(request);
+ return ResponseEntity.ok(jobExecution.getJobId());
}
}
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/message/MessageHandler.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/message/MessageHandler.java
new file mode 100644
index 00000000..29dd65f6
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/message/MessageHandler.java
@@ -0,0 +1,9 @@
+package org.onedatashare.transferservice.odstransferservice.message;
+
+import com.hazelcast.core.HazelcastJsonValue;
+
+import java.io.IOException;
+
+public interface MessageHandler {
+ void messageHandler(HazelcastJsonValue jsonMsg) throws IOException;
+}
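
Three implementations of this interface follow (stop requests, application params, transfer jobs), mirroring the MessageType enum. A plausible dispatcher sketch, under the assumption that each queue message carries a type discriminator; the field name "type" is a guess, not confirmed by this diff:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hazelcast.core.HazelcastJsonValue;
import org.onedatashare.transferservice.odstransferservice.Enum.MessageType;

import java.io.IOException;
import java.util.Map;

// Sketch only: routes a queue message to the handler registered for its MessageType.
class MessageDispatcherSketch {
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Map<MessageType, MessageHandler> handlers;

    MessageDispatcherSketch(Map<MessageType, MessageHandler> handlers) {
        this.handlers = handlers;
    }

    void dispatch(HazelcastJsonValue jsonMsg) throws IOException {
        // Assumption: the payload embeds its MessageType under a "type" key.
        MessageType type = MessageType.valueOf(
                objectMapper.readTree(jsonMsg.getValue()).get("type").asText());
        MessageHandler handler = handlers.get(type);
        if (handler != null) {
            handler.messageHandler(jsonMsg);
        }
    }
}
```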
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/message/StopJobRequestHandler.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/message/StopJobRequestHandler.java
new file mode 100644
index 00000000..880fba4b
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/message/StopJobRequestHandler.java
@@ -0,0 +1,46 @@
+package org.onedatashare.transferservice.odstransferservice.message;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.hazelcast.core.HazelcastJsonValue;
+import org.onedatashare.transferservice.odstransferservice.model.StopJobRequest;
+import org.onedatashare.transferservice.odstransferservice.service.JobControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.explore.JobExplorer;
+import org.springframework.batch.core.launch.JobExecutionNotRunningException;
+import org.springframework.batch.core.launch.JobOperator;
+import org.springframework.batch.core.launch.NoSuchJobExecutionException;
+import org.springframework.stereotype.Service;
+
+import java.io.IOException;
+import java.util.Set;
+
+@Service
+public class StopJobRequestHandler implements MessageHandler {
+
+ private final ObjectMapper objectMapper;
+ private final JobOperator jobOperator;
+ private final JobExplorer jobExplorer;
+ private final Logger logger;
+
+ public StopJobRequestHandler(JobExplorer jobExplorer, JobOperator jobOperator, ObjectMapper objectMapper) {
+ this.jobOperator = jobOperator;
+ this.objectMapper = objectMapper;
+ this.jobExplorer = jobExplorer;
+ this.logger = LoggerFactory.getLogger(StopJobRequestHandler.class);
+ }
+
+ @Override
+ public void messageHandler(HazelcastJsonValue jsonMsg) throws IOException {
+ StopJobRequest stopJobRequest = this.objectMapper.readValue(jsonMsg.getValue(), StopJobRequest.class);
+ Set<JobExecution> jobExecutionSet = this.jobExplorer.findRunningJobExecutions(stopJobRequest.getJobUuid().toString());
+ for (JobExecution jobExecution : jobExecutionSet) {
+ try {
+ jobOperator.stop(jobExecution.getId());
+ } catch (NoSuchJobExecutionException | JobExecutionNotRunningException e) {
+ logger.error("Was unable to stop job: {} with error message: {}", jobExecution, e.getMessage());
+ }
+ }
+ }
+}
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/message/TransferApplicationParamHandler.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/message/TransferApplicationParamHandler.java
new file mode 100644
index 00000000..0efbf52b
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/message/TransferApplicationParamHandler.java
@@ -0,0 +1,31 @@
+package org.onedatashare.transferservice.odstransferservice.message;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.hazelcast.core.HazelcastJsonValue;
+import org.onedatashare.transferservice.odstransferservice.model.TransferApplicationParams;
+import org.onedatashare.transferservice.odstransferservice.pools.ThreadPoolContract;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+@Service
+public class TransferApplicationParamHandler implements MessageHandler {
+
+ private final ObjectMapper messageObjectMapper;
+ private final ThreadPoolContract threadPool;
+ Logger logger = LoggerFactory.getLogger(TransferApplicationParamHandler.class);
+
+ public TransferApplicationParamHandler(ObjectMapper messageObjectMapper, ThreadPoolContract threadPool) {
+ this.messageObjectMapper = messageObjectMapper;
+ this.threadPool = threadPool;
+ }
+
+ @Override
+ public void messageHandler(HazelcastJsonValue jsonMsg) throws JsonProcessingException {
+ String jsonStr = jsonMsg.getValue();
+ TransferApplicationParams params = messageObjectMapper.readValue(jsonStr, TransferApplicationParams.class);
+ logger.info("Parsed TransferApplicationParams: {}", params);
+ this.threadPool.applyOptimizer(params.getConcurrency(), params.getParallelism());
+ }
+}
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/message/TransferJobRequestHandler.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/message/TransferJobRequestHandler.java
new file mode 100644
index 00000000..dbbdcba0
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/message/TransferJobRequestHandler.java
@@ -0,0 +1,50 @@
+package org.onedatashare.transferservice.odstransferservice.message;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.hazelcast.core.HazelcastJsonValue;
+import org.onedatashare.transferservice.odstransferservice.model.EntityInfo;
+import org.onedatashare.transferservice.odstransferservice.model.TransferJobRequest;
+import org.onedatashare.transferservice.odstransferservice.service.JobControl;
+import org.onedatashare.transferservice.odstransferservice.service.expanders.ExpanderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+@Service
+public class TransferJobRequestHandler implements MessageHandler {
+
+ private final ObjectMapper objectMapper;
+ private final JobControl jobControl;
+ private final ExpanderFactory expanderFactory;
+
+ Logger logger = LoggerFactory.getLogger(TransferJobRequestHandler.class);
+
+ public TransferJobRequestHandler(ObjectMapper messageObjectMapper, JobControl jobControl, ExpanderFactory expanderFactory) {
+ this.objectMapper = messageObjectMapper;
+ this.jobControl = jobControl;
+ this.expanderFactory = expanderFactory;
+ }
+
+ @Override
+ public void messageHandler(HazelcastJsonValue jsonMessage) throws JsonProcessingException {
+ String jsonStr = jsonMessage.getValue();
+ TransferJobRequest request = null;
+ try {
+ request = objectMapper.readValue(jsonStr, TransferJobRequest.class);
+ } catch (JsonProcessingException e) {
+ logger.error("Failed to parse Transfer Job Request: {}", jsonStr);
+ return;
+ }
+ logger.info("Job Received: {}", request.toString());
+ List<EntityInfo> fileInfo = expanderFactory.getExpander(request.getSource());
+ request.getSource().setInfoList(fileInfo);
+ try {
+ this.jobControl.runJob(request);
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ }
+ }
+}
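
For a sense of the producer side, a hedged sketch of publishing a transfer request onto the node's queue; the JSON field values are illustrative, not a schema guarantee from this PR:

```java
import com.hazelcast.collection.IQueue;
import com.hazelcast.core.HazelcastJsonValue;

// Sketch only: serializes a transfer request and offers it to the per-node queue
// that TransferJobRequestHandler ultimately consumes.
class TransferRequestProducerSketch {
    void submit(IQueue<HazelcastJsonValue> messageQueue) throws InterruptedException {
        String json = """
                {
                  "ownerId": "user@example.com",
                  "source": {"type": "vfs", "credId": "local-cred", "infoList": []},
                  "destination": {"type": "vfs", "credId": "remote-cred"}
                }""";
        messageQueue.put(new HazelcastJsonValue(json)); // blocks if the queue is full
    }
}
```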
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/AWSSinglePutRequestMetaData.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/AWSSinglePutRequestMetaData.java
deleted file mode 100644
index bc361574..00000000
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/AWSSinglePutRequestMetaData.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package org.onedatashare.transferservice.odstransferservice.model;
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.SneakyThrows;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.stream.Collectors;
-
-@Getter
-@Setter
-public class AWSSinglePutRequestMetaData {
- private Queue<DataChunk> dataChunkPriorityQueue;
- Logger logger = LoggerFactory.getLogger(AWSSinglePutRequestMetaData.class);
-
- public AWSSinglePutRequestMetaData(){
- this.dataChunkPriorityQueue = new ConcurrentLinkedQueue<DataChunk>();
- }
- public void addChunk(DataChunk chunk){
- this.dataChunkPriorityQueue.add(chunk);
- }
- public void addAllChunks(List<? extends DataChunk> chunks){
- this.dataChunkPriorityQueue.addAll(chunks);
- }
-
- @SneakyThrows
- public InputStream condenseListToOneStream(long size){
- MessageDigest md = MessageDigest.getInstance("SHA-256");
- byte[] data = new byte[Long.valueOf(size).intValue()];
- ByteBuffer buffer = ByteBuffer.wrap(data);
- List<DataChunk> list = this.dataChunkPriorityQueue.stream().sorted(new DataChunkComparator()).collect(Collectors.toList());
- for(DataChunk currentChunk : list){
- logger.info("Processing chunk {}", currentChunk);
- buffer.put(currentChunk.getData());
- md.update(currentChunk.getData());
- }
- String output = String.format("%032X", new BigInteger(1, md.digest()));
- logger.info(String.valueOf(output));
- this.dataChunkPriorityQueue.clear();
- return new ByteArrayInputStream(buffer.array());
- }
-
- public void clear(){
- this.dataChunkPriorityQueue.clear();
- }
-}
\ No newline at end of file
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/BoxSmallFileUpload.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/BoxSmallFileUpload.java
deleted file mode 100644
index ad7f3764..00000000
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/BoxSmallFileUpload.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package org.onedatashare.transferservice.odstransferservice.model;
-
-import lombok.Getter;
-import lombok.Setter;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.PriorityQueue;
-
-/**
- * This class is used to Buffer all of the data from a small file. So any file less than 20MB according to the box api
- */
-@Getter
-@Setter
-public class BoxSmallFileUpload {
- private PriorityQueue<DataChunk> dataChunkPriorityQueue;
-
- public BoxSmallFileUpload(){
- this.dataChunkPriorityQueue = new PriorityQueue<>(new DataChunkComparator());
- }
-
- public void addAllChunks(List<? extends DataChunk> chunks){
- this.dataChunkPriorityQueue.addAll(chunks);
- }
-
- public InputStream condenseListToOneStream(long size){
- byte[] data = new byte[Long.valueOf(size).intValue()];//we know this file will always be <= 20MB
- ByteBuffer buffer = ByteBuffer.wrap(data);
- for(DataChunk chunk : this.dataChunkPriorityQueue){
- buffer.put(chunk.getData());
- }
- this.dataChunkPriorityQueue.clear();
- return new ByteArrayInputStream(buffer.array());
- }
-}
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/CarbonIntensityMapKey.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/CarbonIntensityMapKey.java
new file mode 100644
index 00000000..40038a2e
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/CarbonIntensityMapKey.java
@@ -0,0 +1,18 @@
+package org.onedatashare.transferservice.odstransferservice.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+import java.util.UUID;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class CarbonIntensityMapKey {
+ String ownerId;
+ String transferNodeName;
+ UUID jobUuid;
+ LocalDateTime timeMeasuredAt;
+}
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/CarbonIpEntry.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/CarbonIpEntry.java
new file mode 100644
index 00000000..5595f6d0
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/CarbonIpEntry.java
@@ -0,0 +1,11 @@
+package org.onedatashare.transferservice.odstransferservice.model;
+
+import lombok.Data;
+
+@Data
+public class CarbonIpEntry {
+ String ip;
+ int carbonIntensity;
+ double lat;
+ double lon;
+}
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/CarbonMeasurement.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/CarbonMeasurement.java
new file mode 100644
index 00000000..6853c123
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/CarbonMeasurement.java
@@ -0,0 +1,17 @@
+package org.onedatashare.transferservice.odstransferservice.model;
+
+import lombok.Data;
+
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.UUID;
+
+@Data
+public class CarbonMeasurement {
+
+ List<CarbonIpEntry> traceRouteCarbon;
+ String ownerId;
+ String transferNodeName;
+ UUID jobUuid;
+ LocalDateTime timeMeasuredAt;
+}
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/EntityInfo.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/EntityInfo.java
index c01faa88..433514cb 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/EntityInfo.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/EntityInfo.java
@@ -1,5 +1,7 @@
package org.onedatashare.transferservice.odstransferservice.model;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -14,4 +16,14 @@ public class EntityInfo {
private String path;
private long size;
private int chunkSize;
+
+ @Override
+ public String toString(){
+ ObjectMapper objectMapper = new ObjectMapper();
+ try {
+ return objectMapper.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
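
One design note on the override above: it allocates a fresh ObjectMapper on every toString() call. ObjectMapper is thread-safe once configured, so a shared static instance would avoid the per-call cost; a sketch, not part of this PR:

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

// Sketch only: reuse one mapper instead of constructing one per toString() call.
class EntityInfoToStringSketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public String toString() {
        try {
            return MAPPER.writeValueAsString(this);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}
```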
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/FileBuffer.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/FileBuffer.java
deleted file mode 100644
index d68d459d..00000000
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/FileBuffer.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.onedatashare.transferservice.odstransferservice.model;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.PriorityQueue;
-
-public class FileBuffer {
- private PriorityQueue<DataChunk> dataChunkPriorityQueue;
-
- public FileBuffer(){
- this.dataChunkPriorityQueue = new PriorityQueue<>(new DataChunkComparator());
- }
- public void addChunk(DataChunk chunk){
- this.dataChunkPriorityQueue.add(chunk);
- }
- public void addAllChunks(List<? extends DataChunk> chunks){
- this.dataChunkPriorityQueue.addAll(chunks);
- }
-
- public InputStream condenseListToOneStream(long size){
- byte[] data = new byte[Long.valueOf(size).intValue()];
- ByteBuffer buffer = ByteBuffer.wrap(data);
- for(DataChunk chunk : this.dataChunkPriorityQueue){
- buffer.put(chunk.getData());
- }
- return new ByteArrayInputStream(buffer.array());
- }
-
- public void clear(){
- this.dataChunkPriorityQueue.clear();
- }
-}
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/FileTransferNodeMetaData.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/FileTransferNodeMetaData.java
new file mode 100644
index 00000000..013573c2
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/FileTransferNodeMetaData.java
@@ -0,0 +1,23 @@
+package org.onedatashare.transferservice.odstransferservice.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+@Data
+@AllArgsConstructor
+@Builder
+public class FileTransferNodeMetaData implements Serializable {
+
+ //ods metrics
+ String odsOwner;
+ String nodeName;
+ UUID nodeUuid;
+ Boolean runningJob;
+ Boolean online;
+ long jobId;
+ UUID jobUuid;
+}
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/SmallFileUpload.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/SmallFileUpload.java
new file mode 100644
index 00000000..fe16efdb
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/SmallFileUpload.java
@@ -0,0 +1,42 @@
+package org.onedatashare.transferservice.odstransferservice.model;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.List;
+import java.util.PriorityQueue;
+
+@Getter
+@Setter
+public class SmallFileUpload {
+
+ private PriorityQueue<DataChunk> dataChunkPriorityQueue;
+
+ public SmallFileUpload(){
+ this.dataChunkPriorityQueue = new PriorityQueue<>(new DataChunkComparator());
+ }
+
+ public void addAllChunks(List<? extends DataChunk> chunks){
+ this.dataChunkPriorityQueue.addAll(chunks);
+ }
+
+ public InputStream condenseListToOneStream(){
+ int totalLength = this.dataChunkPriorityQueue.stream().mapToInt(chunk -> chunk.getData().length).sum();
+ byte[] combinedBytes = new byte[totalLength];
+
+ int currentIndex = 0;
+ // A PriorityQueue's iterator does not follow comparator order, so drain with poll()
+ // to concatenate chunks in their intended sequence.
+ while (!dataChunkPriorityQueue.isEmpty()) {
+ byte[] byteArray = dataChunkPriorityQueue.poll().getData();
+ System.arraycopy(byteArray, 0, combinedBytes, currentIndex, byteArray.length);
+ currentIndex += byteArray.length;
+ }
+
+ return new ByteArrayInputStream(combinedBytes);
+ }
+
+ public void clearBuffer(){
+ this.dataChunkPriorityQueue.clear();
+ }
+}
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/StopJobRequest.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/StopJobRequest.java
new file mode 100644
index 00000000..b6f18c16
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/StopJobRequest.java
@@ -0,0 +1,12 @@
+package org.onedatashare.transferservice.odstransferservice.model;
+
+import lombok.Data;
+
+import java.util.UUID;
+
+@Data
+public class StopJobRequest {
+ UUID jobUuid;
+ Integer jobId;
+ String ownerId;
+}
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/optimizer/TransferApplicationParams.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/TransferApplicationParams.java
similarity index 94%
rename from src/main/java/org/onedatashare/transferservice/odstransferservice/model/optimizer/TransferApplicationParams.java
rename to src/main/java/org/onedatashare/transferservice/odstransferservice/model/TransferApplicationParams.java
index bec5c9b3..2d88ccac 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/optimizer/TransferApplicationParams.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/TransferApplicationParams.java
@@ -1,4 +1,4 @@
-package org.onedatashare.transferservice.odstransferservice.model.optimizer;
+package org.onedatashare.transferservice.odstransferservice.model;
import lombok.Data;
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/TransferJobRequest.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/TransferJobRequest.java
index 0064299e..2c2b6ffb 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/TransferJobRequest.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/TransferJobRequest.java
@@ -6,8 +6,7 @@
import org.onedatashare.transferservice.odstransferservice.model.credential.AccountEndpointCredential;
import org.onedatashare.transferservice.odstransferservice.model.credential.OAuthEndpointCredential;
-
-import java.util.ArrayList;
+import java.util.List;
import java.util.UUID;
@Data
@@ -15,10 +14,13 @@
@NoArgsConstructor
public class TransferJobRequest {
- @NonNull private String ownerId;
+ @NonNull
+ private String ownerId;
private int connectionBufferSize;
- @NonNull private Source source;
- @NonNull private Destination destination;
+ @NonNull
+ private Source source;
+ @NonNull
+ private Destination destination;
private TransferOptions options;
@JsonInclude(JsonInclude.Include.NON_NULL)
private UUID jobUuid;
@@ -28,7 +30,8 @@ public class TransferJobRequest {
@AllArgsConstructor
@NoArgsConstructor
public static class Destination {
- @NonNull private EndpointType type;
+ @NonNull
+ private EndpointType type;
String credId;
private AccountEndpointCredential vfsDestCredential;
private OAuthEndpointCredential oauthDestCredential;
@@ -39,11 +42,13 @@ public static class Destination {
@AllArgsConstructor
@NoArgsConstructor
public static class Source {
- @NonNull private EndpointType type;
+ @NonNull
+ private EndpointType type;
String credId;
private AccountEndpointCredential vfsSourceCredential;
private OAuthEndpointCredential oauthSourceCredential;
private String fileSourcePath;
- @NonNull private ArrayList<EntityInfo> infoList;
+ @NonNull
+ private List<EntityInfo> infoList;
}
}
\ No newline at end of file
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/credential/AccountEndpointCredential.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/credential/AccountEndpointCredential.java
index c816c33c..1777e80b 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/credential/AccountEndpointCredential.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/credential/AccountEndpointCredential.java
@@ -8,7 +8,7 @@
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
-public class AccountEndpointCredential extends EndpointCredential{
+public class AccountEndpointCredential extends EndpointCredential {
private String uri; //the hostname and port to reach the server
private String username; //this should be the username for the client
@ToString.Exclude
@@ -18,12 +18,11 @@ public class AccountEndpointCredential extends EndpointCredential{
public static String[] uriFormat(AccountEndpointCredential credential, EndpointType type) {
String noTypeUri = "";
- if(type.equals(EndpointType.sftp)){
+ if (type.equals(EndpointType.sftp)) {
noTypeUri = credential.getUri().replaceFirst("sftp://", "");
- }else if(type.equals(EndpointType.ftp)){
+ } else if (type.equals(EndpointType.ftp)) {
noTypeUri = credential.getUri().replaceFirst("ftp://", "");
- }
- else{
+ } else {
noTypeUri = credential.getUri().replaceFirst("http://", "");
}
return noTypeUri.split(":");
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/metrics/CarbonScore.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/metrics/CarbonScore.java
new file mode 100644
index 00000000..0ad44096
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/metrics/CarbonScore.java
@@ -0,0 +1,14 @@
+package org.onedatashare.transferservice.odstransferservice.model.metrics;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CarbonScore {
+ public int avgCarbon;
+
+ public CarbonScore(){
+ this.avgCarbon = 0;
+ }
+}
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/metrics/DataInflux.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/metrics/DataInflux.java
index 5eada8ed..3cb4af20 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/metrics/DataInflux.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/metrics/DataInflux.java
@@ -19,144 +19,147 @@ public class DataInflux {
@JsonProperty(value = NETWORK_INTERFACE)
@Column(name = NETWORK_INTERFACE)
- private String networkInterface;
+ private String networkInterface = "";
@JsonProperty(value = ODS_USER)
@Column(name = ODS_USER, tag = true)
- private String odsUser;
+ private String odsUser = "";
@JsonProperty(value = TRANSFER_NODE_NAME)
@Column(name = TRANSFER_NODE_NAME, tag = true)
- private String transferNodeName;
+ private String transferNodeName = "";
@JsonProperty(value = ACTIVE_CORE_COUNT)
@Column(name = ACTIVE_CORE_COUNT)
- private Integer coreCount;
+ private Integer coreCount = Runtime.getRuntime().availableProcessors();
@JsonProperty(value = CPU_FREQUENCY_MAX)
@Column(name = CPU_FREQUENCY_MAX)
- private Double cpu_frequency_max;
+ private Double cpu_frequency_max = 0.0;
@JsonProperty(value = CPU_FREQUENCY_CURRENT)
@Column(name = CPU_FREQUENCY_CURRENT)
- private Double cpu_frequency_current;
+ private Double cpu_frequency_current = 0.0;
@JsonProperty(value = CPU_FREQUENCY_MIN)
@Column(name = CPU_FREQUENCY_MIN)
- private Double cpu_frequency_min;
+ private Double cpu_frequency_min = 0.0;
@JsonProperty(value = CPU_ARCHITECTURE)
@Column(name = CPU_ARCHITECTURE)
- private String cpuArchitecture;
+ private String cpuArchitecture = "";
@JsonProperty(value = PACKET_LOSS_RATE)
@Column(name = PACKET_LOSS_RATE)
- private Double packetLossRate;
+ private Double packetLossRate = 0.0;
//NIC values
@JsonProperty(value = BYTES_SENT)
@Column(name = BYTES_SENT)
- private Long bytesSent;
+ private Long bytesSent = 0L;
@JsonProperty(value = BYTES_RECEIVED)
@Column(name = BYTES_RECEIVED)
- private Long bytesReceived;
+ private Long bytesReceived = 0L;
@JsonProperty(value = PACKETS_SENT)
@Column(name = PACKETS_SENT)
- private Long packetSent;
+ private Long packetSent = 0L;
@JsonProperty(value = PACKETS_RECEIVED)
@Column(name = PACKETS_RECEIVED)
- private Long packetReceived;
+ private Long packetReceived = 0L;
@JsonProperty(value = DROP_IN)
@Column(name = DROP_IN)
- private Long dropin;
+ private Long dropin = 0L;
@JsonProperty(value = DROP_OUT)
@Column(name = DROP_OUT)
- private Long dropout;
+ private Long dropout = 0L;
@JsonProperty(value = NIC_MTU)
@Column(name = NIC_MTU)
- private Integer nicMtu;
+ private Integer nicMtu = 0;
@JsonProperty(value = NIC_SPEED)
@Column(name = NIC_SPEED)
- private Integer nicSpeed;
+ private Integer nicSpeed = 0;
@JsonProperty(value = LATENCY)
@Column(name = LATENCY)
- private Double latency;
+ private Double latency = 0.0;
@JsonProperty(value = RTT)
@Column(name = RTT)
- private Double rtt;
+ private Double rtt = 0.0;
@Column(name = SOURCE_RTT)
- private Double sourceRtt;
+ private Double sourceRtt = 0.0;
@Column(name = SOURCE_LATENCY)
- private Double sourceLatency;
+ private Double sourceLatency = 0.0;
@Column(name = DESTINATION_RTT)
- private Double destinationRtt;
+ private Double destinationRtt = 0.0;
@Column(name = DEST_LATENCY)
- private Double destLatency;
+ private Double destLatency = 0.0;
@JsonProperty(value = ERROR_IN)
@Column(name = ERROR_IN)
- private Long errin;
+ private Long errin = 0L;
@JsonProperty(value = ERROR_OUT)
@Column(name = ERROR_OUT)
- private Long errout;
+ private Long errout = 0L;
//Job Values
@Column(name = JOB_ID, tag = true)
- private Long jobId;
+ private Long jobId = 0L;
@Column(name = READ_THROUGHPUT)
- private Double readThroughput;
+ private Double readThroughput = 0.0;
@Column(name = WRITE_THROUGHPUT)
- private Double writeThroughput;
+ private Double writeThroughput = 0.0;
@Column(name = BYTES_UPLOADED)
- private Long bytesWritten;
+ private Long bytesWritten = 0L;
@Column(name = BYTES_DOWNLOADED)
- private Long bytesRead;
+ private Long bytesRead = 0L;
@Column(name = CONCURRENCY)
- private Integer concurrency;
+ private Integer concurrency = 0;
@Column(name = PARALLELISM)
- private Integer parallelism;
+ private Integer parallelism = 0;
@Column(name = PIPELINING)
- private Integer pipelining;
+ private Integer pipelining = 0;
@Column(name = MEMORY)
- private Long memory;
+ private Long memory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
@Column(name = MAX_MEMORY)
- private Long maxMemory;
+ private Long maxMemory = Runtime.getRuntime().maxMemory();
@Column(name = FREE_MEMORY)
- private Long freeMemory;
+ private Long freeMemory = Runtime.getRuntime().freeMemory();
@Column(name = ALLOCATED_MEMORY)
- private Long allocatedMemory;
+ private Long allocatedMemory = Runtime.getRuntime().totalMemory();
@Column(name = JOB_SIZE)
- private Long jobSize;
+ private Long jobSize = 0L;
@Column(name = AVERAGE_FILE_SIZE)
- private Long avgFileSize;
+ private Long avgFileSize = 0L;
@Column(name = SOURCE_TYPE, tag = true)
- private String sourceType;
+ private String sourceType = "";
@Column(name = SOURCE_CRED_ID, tag = true)
- private String sourceCredId;
+ private String sourceCredId = "";
@Column(name = DESTINATION_TYPE, tag = true)
- private String destType;
+ private String destType = "";
@Column(name = DESTINATION_CRED_IT, tag = true)
- private String destCredId;
+ private String destCredId = "";
@Column(name = CHUNK_SIZE)
- private Long chunksize;
+ private Long chunksize = 0L;
@Column(name = JOB_UUID, tag = true)
private UUID jobUuid;
+
+ @Column(name = IS_RUNNING)
+ private Boolean isRunning = false;
}
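The move from null-initialized boxed fields to zero-value defaults matters for the write path: influxdb-client-java's measurement mapper omits null fields when serializing a POJO to line protocol, so sparse metrics previously produced points with missing columns. A minimal sketch of the annotation pattern, assuming the influxdb-client-java annotations (measurement and column names here are illustrative):

    import com.influxdb.annotations.Column;
    import com.influxdb.annotations.Measurement;

    @Measurement(name = "transfer_data")
    class PointSketch {
        @Column(name = "transfer_node_name", tag = true)
        String transferNodeName = ""; // tags are indexed strings; empty string beats null
        @Column(name = "rtt")
        Double rtt = 0.0;             // a null field would simply be dropped from the point
    }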
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/optimizer/OptimizerCreateRequest.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/optimizer/OptimizerCreateRequest.java
deleted file mode 100644
index 37488f10..00000000
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/optimizer/OptimizerCreateRequest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.onedatashare.transferservice.odstransferservice.model.optimizer;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-import java.util.UUID;
-
-@Data
-public class OptimizerCreateRequest {
- String nodeId;
- int maxConcurrency;
- int maxParallelism;
- int maxPipelining;
- int maxChunkSize;
- String optimizerType;
- long fileCount;
- Long jobId;
- String dbType;
- String jobUuid;
- String userId;
-
- public OptimizerCreateRequest(String userId,String nodeId, int maxConcurrency, int maxParallelism, int maxPipelining, String optimizerType, long fileCount, long jobId, String dbType, String jobUuid) {
- this.userId = userId;
- this.maxConcurrency = maxConcurrency;
- this.maxChunkSize = Integer.MAX_VALUE;
- this.maxParallelism = maxParallelism;
- this.maxPipelining = maxPipelining;
- this.nodeId = nodeId;
- this.optimizerType = optimizerType;
- this.fileCount = fileCount;
- this.jobId = jobId;
- this.dbType = dbType;
- this.jobUuid = jobUuid;
- }
-}
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/optimizer/OptimizerDeleteRequest.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/optimizer/OptimizerDeleteRequest.java
deleted file mode 100644
index 5f47d53e..00000000
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/optimizer/OptimizerDeleteRequest.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package org.onedatashare.transferservice.odstransferservice.model.optimizer;
-
-import lombok.Data;
-
-@Data
-public class OptimizerDeleteRequest {
- private String nodeId;
-
- public OptimizerDeleteRequest(String nodeId) {
- this.nodeId = nodeId;
- }
-}
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/pools/ThreadPoolContract.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/pools/ThreadPoolContract.java
new file mode 100644
index 00000000..04886902
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/pools/ThreadPoolContract.java
@@ -0,0 +1,13 @@
+package org.onedatashare.transferservice.odstransferservice.pools;
+
+import org.springframework.core.task.TaskExecutor;
+
+public interface ThreadPoolContract {
+ TaskExecutor createExecutor(int threadCount, String prefix);
+ void applyOptimizer(int concurrency, int parallelism);
+ void clearPools();
+ int concurrencyCount();
+ int parallelismCount();
+ TaskExecutor stepPool(int threadCount);
+ TaskExecutor parallelPool(int threadCount, String filePath);
+}
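The two implementations that follow are both registered under the bean name "threadPool" but behind mutually exclusive Spring profiles ("platform" and "virtual"), so exactly one is active per deployment and callers depend only on this interface. A minimal injection sketch (the consumer class and task body are illustrative):

    import org.springframework.core.task.TaskExecutor;
    import org.springframework.stereotype.Service;

    @Service
    class PoolConsumerSketch {
        private final ThreadPoolContract threadPool; // resolves to whichever profile is active

        PoolConsumerSketch(ThreadPoolContract threadPool) {
            this.threadPool = threadPool;
        }

        void demo() {
            TaskExecutor step = threadPool.stepPool(4);
            step.execute(() -> System.out.println("running on " + Thread.currentThread()));
        }
    }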
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/pools/ThreadPoolManager.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/pools/ThreadPoolManager.java
deleted file mode 100644
index 74c32d00..00000000
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/pools/ThreadPoolManager.java
+++ /dev/null
@@ -1,179 +0,0 @@
-package org.onedatashare.transferservice.odstransferservice.pools;
-
-import lombok.Getter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.core.task.SimpleAsyncTaskExecutor;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Service;
-
-import java.util.HashMap;
-
-import static org.onedatashare.transferservice.odstransferservice.constant.ODSConstants.PARALLEL_POOL_PREFIX;
-import static org.onedatashare.transferservice.odstransferservice.constant.ODSConstants.STEP_POOL_PREFIX;
-
-@Service
-public class ThreadPoolManager {
-
- @Getter
- HashMap<String, SimpleAsyncTaskExecutor> executorHashmap;
- HashMap<String, ThreadPoolTaskExecutor> platformThreadMap;
-
- Logger logger = LoggerFactory.getLogger(ThreadPoolManager.class);
-
- public ThreadPoolManager() {
- this.executorHashmap = new HashMap<>();
- this.platformThreadMap = new HashMap<>();
- }
-
- public ThreadPoolTaskExecutor createPlatformThreads(int corePoolSize, String prefix) {
- ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- executor.setPrestartAllCoreThreads(true);
-// executor.setQueueCapacity(1);
- executor.setAllowCoreThreadTimeOut(false);
- executor.setCorePoolSize(corePoolSize);
-// executor.setMaxPoolSize(corePoolSize);
- executor.setThreadNamePrefix(prefix);
- executor.initialize();
- if (this.executorHashmap == null) {
- this.executorHashmap = new HashMap<>();
- }
- logger.info("Created ThreadPoolTaskExecutor: Prefix:{} with size:{}", prefix, corePoolSize);
- this.platformThreadMap.put(prefix, executor);
- return executor;
- }
-
- public SimpleAsyncTaskExecutor createVirtualThreadExecutor(int corePoolSize, String prefix) {
- SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
- executor.setThreadNamePrefix(prefix);
- executor.setVirtualThreads(true);
- executor.setConcurrencyLimit(corePoolSize);
- if (this.executorHashmap == null) {
- this.executorHashmap = new HashMap<>();
- }
- logger.info("Created a SimpleAsyncTaskExecutor: Prefix:{} with size:{}", prefix, corePoolSize);
- this.executorHashmap.put(prefix, executor);
- return executor;
- }
-
- /**
- * @param concurrency
- * @param parallel
- */
- public void applyOptimizer(int concurrency, int parallel) {
- for (String key : this.executorHashmap.keySet()) {
- SimpleAsyncTaskExecutor pool = this.executorHashmap.get(key);
- if (key.contains(STEP_POOL_PREFIX)) {
- logger.info("Changing {} pool size from {} to {}", pool.getThreadNamePrefix(), pool.getConcurrencyLimit(), concurrency);
- if (concurrency > 0 && concurrency != pool.getConcurrencyLimit()) {
- pool.setConcurrencyLimit(concurrency);
- logger.info("Set {} pool size to {}", pool.getThreadNamePrefix(), concurrency);
- }
- }
- if (key.contains(PARALLEL_POOL_PREFIX)) {
- logger.info("Changing {} pool size from {} to {}", pool.getThreadNamePrefix(), pool.getConcurrencyLimit(), parallel);
- if (parallel > 0 && parallel != pool.getConcurrencyLimit()) {
- pool.setConcurrencyLimit(parallel);
- logger.info("Set {} pool size to {}", pool.getThreadNamePrefix(), parallel);
- }
- }
- }
-
- for (String key : this.platformThreadMap.keySet()) {
- ThreadPoolTaskExecutor pool = this.platformThreadMap.get(key);
- if (key.contains(STEP_POOL_PREFIX)) {
- logger.info("Changing {} pool size from {} to {}", pool.getThreadNamePrefix(), pool.getCorePoolSize(), concurrency);
- if (concurrency > 0 && concurrency != pool.getCorePoolSize()) {
- pool.setCorePoolSize(concurrency);
- logger.info("Set {} pool size to {}", pool.getThreadNamePrefix(), concurrency);
- }
- }
- if (key.contains(PARALLEL_POOL_PREFIX)) {
- logger.info("Changing {} pool size from {} to {}", pool.getThreadNamePrefix(), pool.getCorePoolSize(), parallel);
- if (parallel > 0 && parallel != pool.getCorePoolSize()) {
- pool.setCorePoolSize(parallel);
- logger.info("Set {} pool size to {}", pool.getThreadNamePrefix(), parallel);
-
- }
- }
- }
- }
-
- public void clearJobPool() {
- for (String key : this.platformThreadMap.keySet()) {
- ThreadPoolTaskExecutor pool = this.platformThreadMap.get(key);
- pool.shutdown();
- logger.info("Shutting ThreadPoolTaskExecutor down {}", pool.getThreadNamePrefix());
- }
- for (String key : this.executorHashmap.keySet()) {
- SimpleAsyncTaskExecutor pool = this.executorHashmap.get(key);
- pool.close();
- logger.info("Shutting SimpleAsyncTaskExec down {}", pool.getThreadNamePrefix());
- }
- this.executorHashmap.clear();
- this.platformThreadMap.clear();
- logger.info("Cleared all thread pools");
- }
-
- // public SimpleAsyncTaskExecutor sequentialThreadPool() {
-// return this.createVirtualThreadExecutor(1, SEQUENTIAL_POOL_PREFIX);
-// }
-//
- public SimpleAsyncTaskExecutor stepTaskExecutorVirtual(int threadCount) {
- SimpleAsyncTaskExecutor te = this.executorHashmap.get(STEP_POOL_PREFIX);
- if (te == null) {
- return this.createVirtualThreadExecutor(threadCount, STEP_POOL_PREFIX);
- }
- return te;
- }
-
- public ThreadPoolTaskExecutor stepTaskExecutorPlatform(int threadCount) {
- ThreadPoolTaskExecutor te = this.platformThreadMap.get(STEP_POOL_PREFIX);
- if (te == null) {
- return this.createPlatformThreads(threadCount, STEP_POOL_PREFIX);
- }
- return te;
- }
-
- public SimpleAsyncTaskExecutor parallelThreadPoolVirtual(int threadCount, String fileName) {
- SimpleAsyncTaskExecutor te = this.executorHashmap.get(PARALLEL_POOL_PREFIX);
- if (te == null) {
- te = this.createVirtualThreadExecutor(threadCount, PARALLEL_POOL_PREFIX);
- }
- return te;
- }
-
- public ThreadPoolTaskExecutor parallelThreadPoolPlatform(int threadCount, String fileName) {
- return this.createPlatformThreads(threadCount, new StringBuilder().append(fileName).append("-").append(PARALLEL_POOL_PREFIX).toString());
- }
-
- public Integer concurrencyCount() {
- SimpleAsyncTaskExecutor threadPoolManager = this.executorHashmap.get(STEP_POOL_PREFIX);
- if (threadPoolManager == null) {
- return 0;
- }
- return threadPoolManager.getConcurrencyLimit();
- }
-
- public Integer parallelismCount() {
- int parallelism = 0;
- for (String key : this.executorHashmap.keySet()) {
- if (key.contains(PARALLEL_POOL_PREFIX)) {
- parallelism = this.executorHashmap.get(key).getConcurrencyLimit();
- if (parallelism > 0) {
- return parallelism;
- }
- }
- }
- for (String key : this.platformThreadMap.keySet()) {
- if (key.contains(PARALLEL_POOL_PREFIX)) {
- parallelism = this.platformThreadMap.get(key).getCorePoolSize();
- if (parallelism > 0) {
- return parallelism;
- }
- }
- }
- return parallelism;
- }
-
-}
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/pools/ThreadPoolManagerPlatform.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/pools/ThreadPoolManagerPlatform.java
new file mode 100644
index 00000000..f5fbe593
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/pools/ThreadPoolManagerPlatform.java
@@ -0,0 +1,113 @@
+package org.onedatashare.transferservice.odstransferservice.pools;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Profile;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+
+import static org.onedatashare.transferservice.odstransferservice.constant.ODSConstants.PARALLEL_POOL_PREFIX;
+import static org.onedatashare.transferservice.odstransferservice.constant.ODSConstants.STEP_POOL_PREFIX;
+
+@Service("threadPool")
+@Profile("platform")
+public class ThreadPoolManagerPlatform implements ThreadPoolContract {
+ HashMap<String, ThreadPoolTaskExecutor> platformThreadMap;
+ Logger logger = LoggerFactory.getLogger(ThreadPoolManagerPlatform.class);
+
+ public ThreadPoolManagerPlatform() {
+ this.platformThreadMap = new HashMap<>();
+ }
+
+ @Override
+ public ThreadPoolTaskExecutor createExecutor(int threadCount, String prefix) {
+ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+ executor.setAllowCoreThreadTimeOut(true);
+ if (threadCount > 0) {
+ executor.setCorePoolSize(threadCount);
+ }
+ executor.setPrestartAllCoreThreads(true);
+ executor.setThreadNamePrefix(prefix);
+ executor.initialize();
+ if (this.platformThreadMap == null) {
+ this.platformThreadMap = new HashMap<>();
+ }
+ logger.info("Created ThreadPoolTaskExecutor: Prefix:{} with size:{}", prefix, threadCount);
+ this.platformThreadMap.put(prefix, executor);
+ return executor;
+ }
+
+ @Override
+ public void applyOptimizer(int concurrency, int parallelism) {
+ for (String key : this.platformThreadMap.keySet()) {
+ ThreadPoolTaskExecutor pool = this.platformThreadMap.get(key);
+ if (key.contains(STEP_POOL_PREFIX)) {
+ if (concurrency > 0 && concurrency != pool.getPoolSize()) {
+ pool.setCorePoolSize(concurrency);
+ logger.info("Set {} pool size to {}", pool.getThreadNamePrefix(), concurrency);
+ }
+ }
+ if (key.contains(PARALLEL_POOL_PREFIX)) {
+ logger.info("Changing {} pool size from {} to {}", pool.getThreadNamePrefix(), pool.getPoolSize(), parallelism);
+ if (parallelism > 0 && parallelism != pool.getPoolSize()) {
+ pool.setCorePoolSize(parallelism);
+ logger.info("Set {} pool size to {}", pool.getThreadNamePrefix(), parallelism);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void clearPools() {
+ for (String key : this.platformThreadMap.keySet()) {
+ ThreadPoolTaskExecutor pe = this.platformThreadMap.get(key);
+ pe.shutdown();
+ }
+ this.platformThreadMap.clear();
+ }
+
+ @Override
+ public int concurrencyCount() {
+ ThreadPoolTaskExecutor pe = this.platformThreadMap.get(STEP_POOL_PREFIX);
+ if (pe == null) {
+ return 0;
+ }
+ return pe.getCorePoolSize();
+ }
+
+ @Override
+ public int parallelismCount() {
+ for (String key : this.platformThreadMap.keySet()) {
+ if (key.contains(PARALLEL_POOL_PREFIX)) {
+ ThreadPoolTaskExecutor threadPoolManager = this.platformThreadMap.get(key);
+ if (threadPoolManager != null) {
+ int parallelismCount = threadPoolManager.getCorePoolSize();
+ if (parallelismCount != 0) {
+ return parallelismCount;
+ }
+ }
+ }
+ }
+ return 0;
+ }
+
+ @Override
+ public ThreadPoolTaskExecutor stepPool(int threadCount) {
+ ThreadPoolTaskExecutor te = this.platformThreadMap.get(STEP_POOL_PREFIX);
+ if (te == null) {
+ return this.createExecutor(threadCount, STEP_POOL_PREFIX);
+ }
+ return te;
+ }
+
+ @Override
+ public ThreadPoolTaskExecutor parallelPool(int threadCount, String filePath) {
+ ThreadPoolTaskExecutor te = this.platformThreadMap.get(PARALLEL_POOL_PREFIX + filePath);
+ if (te == null) {
+ te = this.createExecutor(threadCount, PARALLEL_POOL_PREFIX + filePath);
+ }
+ return te;
+ }
+}
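applyOptimizer() above works because ThreadPoolTaskExecutor forwards setCorePoolSize() to the live java.util.concurrent.ThreadPoolExecutor even after initialize(), so a pool can be resized mid-transfer without being rebuilt. A standalone sketch:

    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

    public class ResizeSketch {
        public static void main(String[] args) {
            ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
            pool.setCorePoolSize(2);
            pool.initialize();
            pool.setCorePoolSize(8); // applied immediately to the underlying executor
            System.out.println(pool.getCorePoolSize()); // 8
            pool.shutdown();
        }
    }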
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/pools/ThreadPoolManagerVirtual.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/pools/ThreadPoolManagerVirtual.java
new file mode 100644
index 00000000..af125590
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/pools/ThreadPoolManagerVirtual.java
@@ -0,0 +1,116 @@
+package org.onedatashare.transferservice.odstransferservice.pools;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Profile;
+import org.springframework.core.task.SimpleAsyncTaskExecutor;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+
+import static org.onedatashare.transferservice.odstransferservice.constant.ODSConstants.PARALLEL_POOL_PREFIX;
+import static org.onedatashare.transferservice.odstransferservice.constant.ODSConstants.STEP_POOL_PREFIX;
+
+@Service("threadPool")
+@Profile("virtual")
+public class ThreadPoolManagerVirtual implements ThreadPoolContract {
+
+ HashMap<String, SimpleAsyncTaskExecutor> executorHashmap;
+
+ Logger logger = LoggerFactory.getLogger(ThreadPoolManagerVirtual.class);
+
+ public ThreadPoolManagerVirtual() {
+ this.executorHashmap = new HashMap<>();
+ }
+
+
+ @Override
+ public SimpleAsyncTaskExecutor createExecutor(int threadCount, String prefix) {
+ SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
+ executor.setThreadNamePrefix(prefix);
+ executor.setVirtualThreads(true);
+ if (threadCount > 0) {
+ executor.setConcurrencyLimit(threadCount);
+ }
+ if (this.executorHashmap == null) {
+ this.executorHashmap = new HashMap<>();
+ }
+ logger.info("Created a SimpleAsyncTaskExecutor: Prefix:{} with size:{}", prefix, threadCount);
+ this.executorHashmap.put(prefix, executor);
+ return executor;
+ }
+
+ /**
+ * Resizes the step pool to {@code concurrency} and every parallel pool to {@code parallel}.
+ */
+ @Override
+ public void applyOptimizer(int concurrency, int parallel) {
+ SimpleAsyncTaskExecutor stepPool = this.executorHashmap.get(STEP_POOL_PREFIX);
+ if (stepPool != null) {
+ if (concurrency > 0 && concurrency != stepPool.getConcurrencyLimit()) {
+ stepPool.setConcurrencyLimit(concurrency);
+ logger.info("Set {} pool size to {}", stepPool.getThreadNamePrefix(), concurrency);
+ }
+ }
+ for (String key : this.executorHashmap.keySet()) {
+ if (key.contains(PARALLEL_POOL_PREFIX)) {
+ SimpleAsyncTaskExecutor parallelPool = this.executorHashmap.get(key);
+ if (parallelPool != null) {
+ if (parallel > 0 && parallel != parallelPool.getConcurrencyLimit()) {
+ parallelPool.setConcurrencyLimit(parallel);
+ logger.info("Set {} pool size to {}", parallelPool.getThreadNamePrefix(), parallel);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void clearPools() {
+ for (String key : this.executorHashmap.keySet()) {
+ SimpleAsyncTaskExecutor pool = this.executorHashmap.get(key);
+ pool.close();
+ logger.info("Shutting SimpleAsyncTaskExec down {}", pool.getThreadNamePrefix());
+ }
+ this.executorHashmap.clear();
+ logger.info("Cleared all thread pools");
+ }
+
+ @Override
+ public SimpleAsyncTaskExecutor stepPool(int threadCount) {
+ SimpleAsyncTaskExecutor te = this.executorHashmap.get(STEP_POOL_PREFIX);
+ if (te == null) {
+ return this.createExecutor(threadCount, STEP_POOL_PREFIX);
+ }
+ return te;
+ }
+
+ @Override
+ public SimpleAsyncTaskExecutor parallelPool(int threadCount, String filePath) {
+ SimpleAsyncTaskExecutor te = this.executorHashmap.get(PARALLEL_POOL_PREFIX + filePath);
+ if (te == null) {
+ te = this.createExecutor(threadCount, PARALLEL_POOL_PREFIX + filePath);
+ }
+ return te;
+ }
+
+ public int concurrencyCount() {
+ SimpleAsyncTaskExecutor threadPoolManager = this.executorHashmap.get(STEP_POOL_PREFIX);
+ if (threadPoolManager == null) {
+ return 0;
+ }
+ return threadPoolManager.getConcurrencyLimit();
+ }
+
+ public int parallelismCount() {
+ for (String key : this.executorHashmap.keySet()) {
+ if (key.contains(PARALLEL_POOL_PREFIX)) {
+ SimpleAsyncTaskExecutor executor = this.executorHashmap.get(key);
+ if (executor != null) {
+ return executor.getConcurrencyLimit();
+ }
+ }
+ }
+ return 0;
+ }
+}
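Unlike the platform variant, SimpleAsyncTaskExecutor never pools threads: with virtual threads enabled it starts a fresh virtual thread per task, and the concurrency limit acts as a throttle on in-flight tasks rather than a pool size, which is why applyOptimizer() adjusts setConcurrencyLimit() here. A standalone sketch (assumes Spring Framework 6.1+ on Java 21):

    import org.springframework.core.task.SimpleAsyncTaskExecutor;

    public class VirtualSketch {
        public static void main(String[] args) throws InterruptedException {
            SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
            executor.setVirtualThreads(true);
            executor.setConcurrencyLimit(2); // at most two tasks in flight at once
            for (int i = 0; i < 4; i++) {
                int id = i;
                executor.execute(() ->
                        System.out.println("task " + id + " on " + Thread.currentThread()));
            }
            Thread.sleep(500); // crude wait so the demo's virtual threads can finish
            executor.close();
        }
    }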
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/CarbonJobMeasure.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/CarbonJobMeasure.java
new file mode 100644
index 00000000..9d90f5ca
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/CarbonJobMeasure.java
@@ -0,0 +1,129 @@
+package org.onedatashare.transferservice.odstransferservice.service;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.hazelcast.core.HazelcastJsonValue;
+import com.hazelcast.map.IMap;
+import com.hazelcast.query.Predicate;
+import com.hazelcast.query.PredicateBuilder;
+import com.hazelcast.query.Predicates;
+import jakarta.annotation.PostConstruct;
+import org.onedatashare.transferservice.odstransferservice.Enum.EndpointType;
+import org.onedatashare.transferservice.odstransferservice.model.CarbonIpEntry;
+import org.onedatashare.transferservice.odstransferservice.model.CarbonMeasurement;
+import org.onedatashare.transferservice.odstransferservice.model.TransferJobRequest;
+import org.onedatashare.transferservice.odstransferservice.utility.ODSUtility;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+@Service
+public class CarbonJobMeasure {
+
+ private final IMap<UUID, HazelcastJsonValue> carbonIntensityMap;
+ private final IMap<UUID, HazelcastJsonValue> fileTransferScheduleMap;
+ private final PredicateBuilder.EntryObject entryObj;
+ private final PmeterParser pmeterParser;
+ private final ObjectMapper objectMapper;
+ private final Logger logger = LoggerFactory.getLogger(CarbonJobMeasure.class);
+ private boolean odsConnector;
+
+ @Value("${spring.application.name}")
+ private String appName;
+
+ @Value("${ods.user}")
+ private String odsUser;
+
+ public CarbonJobMeasure(IMap<UUID, HazelcastJsonValue> carbonIntensityMap, IMap<UUID, HazelcastJsonValue> fileTransferScheduleMap, PmeterParser pmeterParser, ObjectMapper objectMapper) {
+ this.carbonIntensityMap = carbonIntensityMap;
+ this.fileTransferScheduleMap = fileTransferScheduleMap;
+ this.entryObj = Predicates.newPredicateBuilder().getEntryObject();
+ this.pmeterParser = pmeterParser;
+ this.objectMapper = objectMapper;
+ this.odsConnector = false;
+ }
+
+ @PostConstruct
+ public void init() {
+ //set ODS Connector
+ if (this.odsUser.equals("OneDataShare") || this.appName.equals("ODSTransferService")) {
+ this.odsConnector = true;
+ }
+
+ }
+
+ public List<TransferJobRequest> getPotentialJobsFromMap() {
+ Predicate potentialJobs;
+ if (this.odsConnector) {
+ logger.info("{}} Querying Hazelcast for jobs", this.appName);
+ potentialJobs = this.entryObj.get("transferNodeName").equal("");
+ } else {
+ logger.info("ODS Connector: {} Querying Hazelcast for jobs", this.appName);
+ potentialJobs = this.entryObj.get("transferNodeName").equal(appName).or(this.entryObj.get("source.credId").equal(appName)).or(this.entryObj.get("destination.credId").equal(appName)).or(this.entryObj.get("ownerId").equal(this.odsUser));
+ }
+
+ Collection<HazelcastJsonValue> jsonJobs = this.fileTransferScheduleMap.values(potentialJobs);
+ return jsonJobs.stream().map(hazelcastJsonValue -> {
+ try {
+ return this.objectMapper.readValue(hazelcastJsonValue.getValue(), TransferJobRequest.class);
+ } catch (JsonProcessingException e) {
+ logger.error("Json Processing Exception: {}\n With message: {}", e, e.getMessage());
+ }
+ return null;
+ }).collect(Collectors.toList());
+ }
+
+ @Scheduled(cron = "0 * * * * *")
+ public void measureCarbonOfPotentialJobs() {
+ List<TransferJobRequest> potentialJobs = getPotentialJobsFromMap();
+ logger.info("Potential jobs from ODS to run: {}", potentialJobs);
+ potentialJobs.forEach(transferJobRequest -> {
+ try {
+ String sourceIp = "";
+ if (transferJobRequest.getSource().getVfsSourceCredential() != null) {
+ sourceIp = ODSUtility.uriFromEndpointCredential(transferJobRequest.getSource().getVfsSourceCredential(), transferJobRequest.getSource().getType());
+ } else {
+ sourceIp = ODSUtility.uriFromEndpointCredential(transferJobRequest.getSource().getOauthSourceCredential(), transferJobRequest.getSource().getType());
+ }
+ String destIp = "";
+ if (transferJobRequest.getDestination().getVfsDestCredential() != null) {
+ destIp = ODSUtility.uriFromEndpointCredential(transferJobRequest.getDestination().getVfsDestCredential(), transferJobRequest.getDestination().getType());
+ } else {
+ destIp = ODSUtility.uriFromEndpointCredential(transferJobRequest.getDestination().getOauthDestCredential(), transferJobRequest.getDestination().getType());
+ }
+ List<CarbonIpEntry> totalEntries = new ArrayList<>();
+ if (!transferJobRequest.getSource().getType().equals(EndpointType.vfs)) {
+ totalEntries.addAll(this.pmeterParser.carbonPerIp(sourceIp));
+ }
+ if (!transferJobRequest.getDestination().getType().equals(EndpointType.vfs)) {
+ totalEntries.addAll(this.pmeterParser.carbonPerIp(destIp));
+ }
+ CarbonMeasurement carbonMeasurement = new CarbonMeasurement();
+ carbonMeasurement.setTimeMeasuredAt(LocalDateTime.now());
+ carbonMeasurement.setJobUuid(transferJobRequest.getJobUuid());
+ carbonMeasurement.setOwnerId(transferJobRequest.getOwnerId());
+ carbonMeasurement.setTransferNodeName(transferJobRequest.getTransferNodeName());
+ carbonMeasurement.setTraceRouteCarbon(totalEntries);
+ HazelcastJsonValue jsonValue = new HazelcastJsonValue(this.objectMapper.writeValueAsString(carbonMeasurement));
+ UUID randomUUID = UUID.randomUUID();
+ this.carbonIntensityMap.put(randomUUID, jsonValue);
+ logger.info("Created Carbon entry with Key={} and Value={}", randomUUID, jsonValue.getValue());
+ } catch (JsonProcessingException e) {
+ logger.error("Failed to parse job: {} \n Error received: \t {}", transferJobRequest.toString(), e.getMessage());
+ } catch (IOException e) {
+ logger.error("Failed to measure ip: {} \n Error received: \t {}", transferJobRequest.toString(), e);
+ }
+ });
+ }
+
+}
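The map query above matches on attributes inside HazelcastJsonValue entries; Hazelcast parses the stored JSON and resolves attribute paths such as transferNodeName or source.credId directly. A minimal sketch of the "unclaimed job" predicate built in getPotentialJobsFromMap():

    import com.hazelcast.query.PredicateBuilder;
    import com.hazelcast.query.Predicates;

    public class PredicateSketch {
        public static void main(String[] args) {
            PredicateBuilder.EntryObject e = Predicates.newPredicateBuilder().getEntryObject();
            // Matches entries whose "transferNodeName" JSON attribute is empty,
            // i.e. jobs that no transfer node has claimed yet.
            PredicateBuilder unclaimed = e.get("transferNodeName").equal("");
            System.out.println(unclaimed);
        }
    }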
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/FileTransferNodeRegistrationService.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/FileTransferNodeRegistrationService.java
new file mode 100644
index 00000000..9040d059
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/FileTransferNodeRegistrationService.java
@@ -0,0 +1,63 @@
+package org.onedatashare.transferservice.odstransferservice.service;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.HazelcastJsonValue;
+import com.hazelcast.map.IMap;
+import jakarta.annotation.PostConstruct;
+import lombok.SneakyThrows;
+import org.onedatashare.transferservice.odstransferservice.constant.ODSConstants;
+import org.onedatashare.transferservice.odstransferservice.model.FileTransferNodeMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.core.env.Environment;
+import org.springframework.stereotype.Service;
+
+import java.util.UUID;
+
+@Service
+public class FileTransferNodeRegistrationService {
+
+ private final IMap<String, HazelcastJsonValue> fileTransferNodeRegistrationMap;
+ private final UUID nodeUuid;
+ private final String appName;
+ private final String odsOwner;
+ private final ObjectMapper objectMapper;
+ private final Logger logger = LoggerFactory.getLogger(FileTransferNodeRegistrationService.class);
+
+ public FileTransferNodeRegistrationService(HazelcastInstance hazelcastInstance, IMap<String, HazelcastJsonValue> fileTransferNodeRegistrationMap, Environment environment, ObjectMapper objectMapper) {
+ this.fileTransferNodeRegistrationMap = fileTransferNodeRegistrationMap;
+ this.nodeUuid = hazelcastInstance.getLocalEndpoint().getUuid();
+ this.appName = environment.getProperty("spring.application.name");
+ this.odsOwner = environment.getProperty("ods.user");
+ this.objectMapper = objectMapper;
+ }
+
+ @SneakyThrows
+ @PostConstruct
+ public void init() {
+ this.updateRegistrationInHazelcast(null);
+ }
+
+ public void updateRegistrationInHazelcast(JobExecution jobExecution) throws JsonProcessingException {
+ var metaDataBuilder = FileTransferNodeMetaData.builder();
+ if (jobExecution == null) {
+ metaDataBuilder.jobId(-1L);
+ metaDataBuilder.runningJob(false);
+ metaDataBuilder.jobUuid(new UUID(0, 0));
+ } else {
+ metaDataBuilder.jobId(jobExecution.getJobId());
+ metaDataBuilder.runningJob(jobExecution.isRunning());
+ metaDataBuilder.jobUuid(UUID.fromString(jobExecution.getJobParameters().getString(ODSConstants.JOB_UUID)));
+ }
+ metaDataBuilder.online(true);
+ metaDataBuilder.nodeName(this.appName);
+ metaDataBuilder.odsOwner(this.odsOwner);
+ metaDataBuilder.nodeUuid(this.nodeUuid);
+ String jsonValue = this.objectMapper.writeValueAsString(metaDataBuilder.build());
+ logger.info("Registering node: {}", jsonValue);
+ this.fileTransferNodeRegistrationMap.put(this.appName, new HazelcastJsonValue(jsonValue));
+ }
+}
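The read side is the mirror image: any cluster member can fetch a node's registration by application name and deserialize the JSON value back into FileTransferNodeMetaData. A hypothetical lookup sketch (the helper and key are illustrative, not part of this diff):

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.hazelcast.core.HazelcastJsonValue;
    import com.hazelcast.map.IMap;
    import org.onedatashare.transferservice.odstransferservice.model.FileTransferNodeMetaData;

    class RegistrationLookupSketch {
        static FileTransferNodeMetaData lookup(IMap<String, HazelcastJsonValue> registrations,
                                               ObjectMapper mapper, String nodeName) throws JsonProcessingException {
            HazelcastJsonValue raw = registrations.get(nodeName);
            return raw == null ? null : mapper.readValue(raw.getValue(), FileTransferNodeMetaData.class);
        }
    }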
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/HazelcastConsumer.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/HazelcastConsumer.java
new file mode 100644
index 00000000..a858aca9
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/HazelcastConsumer.java
@@ -0,0 +1,82 @@
+package org.onedatashare.transferservice.odstransferservice.service;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.hazelcast.collection.IQueue;
+import com.hazelcast.core.HazelcastJsonValue;
+import org.onedatashare.transferservice.odstransferservice.Enum.MessageType;
+import org.onedatashare.transferservice.odstransferservice.message.StopJobRequestHandler;
+import org.onedatashare.transferservice.odstransferservice.message.TransferApplicationParamHandler;
+import org.onedatashare.transferservice.odstransferservice.message.TransferJobRequestHandler;
+import org.onedatashare.transferservice.odstransferservice.pools.ThreadPoolContract;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.task.TaskExecutor;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+
+import java.io.IOException;
+
+@Service
+public class HazelcastConsumer {
+
+ private final IQueue<HazelcastJsonValue> messageQueue;
+ private final ObjectMapper objectMapper;
+ private final TransferJobRequestHandler transferJobRequestHandler;
+ private final TransferApplicationParamHandler transferParamApplicationHandler;
+ private final Logger logger;
+ private final StopJobRequestHandler stopJobRequestHandler;
+ private final TaskExecutor executor;
+
+ public HazelcastConsumer(ThreadPoolContract threadPoolContract, StopJobRequestHandler stopJobRequestHandler, IQueue<HazelcastJsonValue> messageQueue, ObjectMapper objectMapper, TransferJobRequestHandler transferJobRequestHandler, TransferApplicationParamHandler transferApplicationParamHandler) {
+ this.messageQueue = messageQueue;
+ this.transferJobRequestHandler = transferJobRequestHandler;
+ this.objectMapper = objectMapper;
+ this.transferParamApplicationHandler = transferApplicationParamHandler;
+ this.logger = LoggerFactory.getLogger(HazelcastConsumer.class);
+ this.stopJobRequestHandler = stopJobRequestHandler;
+ this.executor = threadPoolContract.createExecutor(10, "HazelcastConsumer");
+ }
+
+
+ @Scheduled(cron = "0/5 * * * * *")
+ public void runConsumer() throws JsonProcessingException {
+ HazelcastJsonValue jsonMsg = this.messageQueue.poll();
+ if (jsonMsg == null) return;
+ JsonNode jsonNode = this.objectMapper.readTree(jsonMsg.getValue());
+ logger.info("Got Msg: {}", jsonNode.toPrettyString());
+ String type = ((ObjectNode) jsonNode).get("type").asText();
+ ((ObjectNode) jsonNode).remove("type");
+ HazelcastJsonValue properJsonMsg = new HazelcastJsonValue(jsonNode.toString());
+ this.executor.execute(() -> {
+ switch (MessageType.valueOf(type)) {
+ case MessageType.TRANSFER_JOB_REQUEST:
+ try {
+ this.transferJobRequestHandler.messageHandler(properJsonMsg);
+ } catch (JsonProcessingException e) {
+ logger.error("Failed to parse json in TransferJobReqeust Message Handler: {} \n Error: {}", properJsonMsg, e.getMessage());
+ }
+ break;
+
+ case MessageType.APPLICATION_PARAM_CHANGE:
+ try {
+ this.transferParamApplicationHandler.messageHandler(properJsonMsg);
+ } catch (JsonProcessingException e) {
+ logger.error("Failed to parse json in TransferParam Message Handler: {} \n Error: {}", properJsonMsg, e.getMessage());
+ }
+ break;
+
+ case MessageType.STOP_JOB_REQUEST:
+ try {
+ this.stopJobRequestHandler.messageHandler(properJsonMsg);
+ } catch (IOException e) {
+ logger.error("Failed to parse json in Stop Job Message Handler: {} \n Error: {}", properJsonMsg, e.getMessage());
+ }
+ break;
+ }
+ });
+ }
+
+}
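A producer-side sketch of the envelope this consumer expects: a JSON object whose "type" discriminator is read and stripped before the remaining payload is dispatched to the matching handler (payload fields here are illustrative):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;
    import com.hazelcast.collection.IQueue;
    import com.hazelcast.core.HazelcastJsonValue;

    class EnvelopeSketch {
        static void publish(IQueue<HazelcastJsonValue> queue, ObjectMapper mapper) {
            ObjectNode envelope = mapper.createObjectNode();
            envelope.put("type", "TRANSFER_JOB_REQUEST"); // read and removed by runConsumer()
            envelope.put("jobUuid", "...");               // remaining fields form the handler payload
            queue.offer(new HazelcastJsonValue(envelope.toString()));
        }
    }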
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/InfluxCache.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/InfluxCache.java
index c314dc35..705a2948 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/InfluxCache.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/InfluxCache.java
@@ -2,7 +2,7 @@
import org.onedatashare.transferservice.odstransferservice.constant.ODSConstants;
import org.onedatashare.transferservice.odstransferservice.model.JobMetric;
-import org.onedatashare.transferservice.odstransferservice.pools.ThreadPoolManager;
+import org.onedatashare.transferservice.odstransferservice.pools.ThreadPoolContract;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepExecution;
@@ -26,7 +26,7 @@
@Service
public class InfluxCache {
- private final ThreadPoolManager threadPoolManager;
+ private final ThreadPoolContract threadPool;
public ConcurrentHashMap<Long, JobMetric> threadCache; //stores a JobMetric that represents everything that thread has processed for the step. Thus each JobMetric is an aggregate of what has happened
Logger logger = LoggerFactory.getLogger(InfluxCache.class);
@@ -36,8 +36,8 @@ public enum ThroughputType {
WRITER
}
- public InfluxCache(ThreadPoolManager threadPoolManager) {
- this.threadPoolManager = threadPoolManager;
+ public InfluxCache(ThreadPoolContract threadPool) {
+ this.threadPool = threadPool;
this.threadCache = new ConcurrentHashMap<>();
}
@@ -47,8 +47,8 @@ public void addMetric(long threadId, StepExecution stepExecution, long totalByte
prevMetric = new JobMetric();
prevMetric.setThreadId(threadId);
prevMetric.setStepExecution(stepExecution);
- prevMetric.setConcurrency(this.threadPoolManager.concurrencyCount());
- prevMetric.setParallelism(this.threadPoolManager.parallelismCount());
+ prevMetric.setConcurrency(this.threadPool.concurrencyCount());
+ prevMetric.setParallelism(this.threadPool.parallelismCount());
prevMetric.setPipelining(stepExecution.getJobParameters().getLong(PIPELINING).intValue());
prevMetric.setChunkSize(chunkSize);
this.threadCache.put(threadId, prevMetric);
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/DatabaseService/InfluxIOService.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/InfluxIOService.java
similarity index 68%
rename from src/main/java/org/onedatashare/transferservice/odstransferservice/service/DatabaseService/InfluxIOService.java
rename to src/main/java/org/onedatashare/transferservice/odstransferservice/service/InfluxIOService.java
index 2f39e73e..9012c52c 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/DatabaseService/InfluxIOService.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/InfluxIOService.java
@@ -1,4 +1,4 @@
-package org.onedatashare.transferservice.odstransferservice.service.DatabaseService;
+package org.onedatashare.transferservice.odstransferservice.service;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.WriteApi;
@@ -6,26 +6,26 @@
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.exceptions.InfluxException;
import com.influxdb.exceptions.UnprocessableEntityException;
+import jakarta.annotation.PostConstruct;
import org.onedatashare.transferservice.odstransferservice.model.metrics.DataInflux;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
-@Component
+@Service
public class InfluxIOService {
private final InfluxDBClient influxClient;
Logger logger = LoggerFactory.getLogger(InfluxIOService.class);
@Value("${ods.influx.bucket}")
- private String bucketName;
+ private String defaultInfluxBucket;
@Value("${ods.influx.org}")
String org;
+ Bucket bucket;
private WriteApi writeApi;
public InfluxIOService(InfluxDBClient influxClient) {
@@ -33,29 +33,27 @@ public InfluxIOService(InfluxDBClient influxClient) {
this.writeApi = this.influxClient.makeWriteApi();
}
- public void reconfigureBucketForNewJob(String ownerId) {
- logger.info("********* Reconfiguring the Bucket ***********");
- Bucket bucket;
- if (ownerId == null) {
- bucket = influxClient.getBucketsApi().findBucketByName(this.bucketName);
- } else {
- bucket = influxClient.getBucketsApi().findBucketByName(ownerId);
- }
+ @PostConstruct
+ public void postConstruct() {
+ this.reconfigureBucketForNewJob(this.defaultInfluxBucket);
+ }
+ public void reconfigureBucketForNewJob(String bucketName) {
+ logger.info("********* Reconfiguring the Bucket to Owner {}***********", bucketName);
+ bucket = influxClient.getBucketsApi().findBucketByName(bucketName);
if (bucket == null) {
- logger.info("Creating the Influx bucket name={}, org={}", ownerId, org);
+ logger.info("Creating the Influx bucket name={}, org={}", bucketName, org);
try {
- bucket = this.influxClient.getBucketsApi().createBucket(ownerId, org);
+ bucket = this.influxClient.getBucketsApi().createBucket(bucketName, org);
} catch (UnprocessableEntityException ignored) {
}
}
- this.writeApi = this.influxClient.makeWriteApi();
}
public void insertDataPoint(DataInflux point) {
try {
- writeApi.writeMeasurement(WritePrecision.MS, point);
+ writeApi.writeMeasurement(this.bucket.getName(), this.org, WritePrecision.MS, point);
} catch (InfluxException exception) {
logger.error("Exception occurred while pushing measurement to influx: " + exception.getMessage());
}
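Switching to the explicit writeMeasurement(bucket, org, ...) overload means every point is written to whichever bucket was resolved for the current owner, instead of the client's default bucket. A minimal standalone sketch (URL, token, org, and bucket name are placeholders):

    import com.influxdb.client.InfluxDBClient;
    import com.influxdb.client.InfluxDBClientFactory;
    import com.influxdb.client.WriteApi;
    import com.influxdb.client.domain.WritePrecision;
    import org.onedatashare.transferservice.odstransferservice.model.metrics.DataInflux;

    public class InfluxWriteSketch {
        public static void main(String[] args) {
            try (InfluxDBClient client = InfluxDBClientFactory
                    .create("http://localhost:8086", "my-token".toCharArray())) {
                WriteApi writeApi = client.makeWriteApi();
                DataInflux point = new DataInflux(); // zero-value defaults from the model above
                writeApi.writeMeasurement("owner-bucket", "my-org", WritePrecision.MS, point);
            } // closing the client flushes and closes the WriteApi
        }
    }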
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobCompletionListener.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobCompletionListener.java
new file mode 100644
index 00000000..c3f21cde
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobCompletionListener.java
@@ -0,0 +1,65 @@
+package org.onedatashare.transferservice.odstransferservice.service;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.onedatashare.transferservice.odstransferservice.pools.ThreadPoolContract;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.JobExecutionListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Service;
+
+import java.time.Duration;
+import java.util.Set;
+
+
+@Service
+public class JobCompletionListener implements JobExecutionListener {
+ private final ThreadPoolContract threadPool;
+ private Set<Long> jobIds;
+ Logger logger = LoggerFactory.getLogger(JobCompletionListener.class);
+
+ ConnectionBag connectionBag;
+
+ MetricsCollector metricsCollector;
+
+ @Autowired
+ FileTransferNodeRegistrationService fileTransferNodeRegistrationService;
+
+ public JobCompletionListener(MetricsCollector metricsCollector, ConnectionBag connectionBag, ThreadPoolContract threadPool, Set<Long> jobIds) {
+ this.metricsCollector = metricsCollector;
+ this.connectionBag = connectionBag;
+ this.threadPool = threadPool;
+ this.jobIds = jobIds;
+ }
+
+
+ @Override
+ @Async
+ public void beforeJob(JobExecution jobExecution) {
+ logger.info("*****Job Execution start Time***** : {} with jobId={}", jobExecution.getStartTime(), jobExecution.getJobId());
+ this.jobIds.add(jobExecution.getJobId());
+ try {
+ this.fileTransferNodeRegistrationService.updateRegistrationInHazelcast(jobExecution);
+ } catch (JsonProcessingException e) {
+ logger.error("Failed to update status of FTN inside of Hazelcast for job start. Exception \n {}", e.getMessage());
+ }
+ }
+
+ @Override
+ @Async
+ public void afterJob(JobExecution jobExecution) {
+ logger.info("*****Job Execution End Time**** : {}", jobExecution.getEndTime());
+ logger.info("Total Job Time in seconds: {}", Duration.between(jobExecution.getStartTime(), jobExecution.getEndTime()).toSeconds());
+ connectionBag.closePools();
+ this.threadPool.clearPools();
+ System.gc();
+ try {
+ this.fileTransferNodeRegistrationService.updateRegistrationInHazelcast(jobExecution);
+ } catch (JsonProcessingException e) {
+ logger.error("Failed to update status of FTN inside of Hazelcast for job end. Exception \n {}", e.getMessage());
+ }
+ }
+}
+
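For orientation, JobExecutionListener callbacks attach when the Job is built; a minimal wiring sketch (jobRepository, step, and listener names are assumed; JobControl below does the real wiring):

    Job job = new JobBuilder("transferJob", jobRepository)
            .listener(jobCompletionListener)
            .start(step)
            .build();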
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java
index b4e19ce4..cf62267e 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java
@@ -3,55 +3,31 @@
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
-import org.onedatashare.transferservice.odstransferservice.Enum.EndpointType;
import org.onedatashare.transferservice.odstransferservice.model.DataChunk;
-import org.onedatashare.transferservice.odstransferservice.model.EntityInfo;
import org.onedatashare.transferservice.odstransferservice.model.TransferJobRequest;
-import org.onedatashare.transferservice.odstransferservice.pools.ThreadPoolManager;
-import org.onedatashare.transferservice.odstransferservice.service.DatabaseService.InfluxIOService;
-import org.onedatashare.transferservice.odstransferservice.service.cron.MetricsCollector;
-import org.onedatashare.transferservice.odstransferservice.service.listner.JobCompletionListener;
-import org.onedatashare.transferservice.odstransferservice.service.step.AmazonS3.AmazonS3LargeFileWriter;
-import org.onedatashare.transferservice.odstransferservice.service.step.AmazonS3.AmazonS3Reader;
-import org.onedatashare.transferservice.odstransferservice.service.step.AmazonS3.AmazonS3SmallFileWriter;
-import org.onedatashare.transferservice.odstransferservice.service.step.box.BoxReader;
-import org.onedatashare.transferservice.odstransferservice.service.step.box.BoxWriterLargeFile;
-import org.onedatashare.transferservice.odstransferservice.service.step.box.BoxWriterSmallFile;
-import org.onedatashare.transferservice.odstransferservice.service.step.dropbox.DropBoxChunkedWriter;
-import org.onedatashare.transferservice.odstransferservice.service.step.dropbox.DropBoxReader;
-import org.onedatashare.transferservice.odstransferservice.service.step.ftp.FTPReader;
-import org.onedatashare.transferservice.odstransferservice.service.step.ftp.FTPWriter;
-import org.onedatashare.transferservice.odstransferservice.service.step.googleDrive.GDriveReader;
-import org.onedatashare.transferservice.odstransferservice.service.step.googleDrive.GDriveResumableWriter;
-import org.onedatashare.transferservice.odstransferservice.service.step.googleDrive.GDriveSimpleWriter;
-import org.onedatashare.transferservice.odstransferservice.service.step.http.HttpReader;
-import org.onedatashare.transferservice.odstransferservice.service.step.scp.SCPReader;
-import org.onedatashare.transferservice.odstransferservice.service.step.scp.SCPWriter;
-import org.onedatashare.transferservice.odstransferservice.service.step.sftp.SFTPReader;
-import org.onedatashare.transferservice.odstransferservice.service.step.sftp.SFTPWriter;
-import org.onedatashare.transferservice.odstransferservice.service.step.vfs.VfsReader;
-import org.onedatashare.transferservice.odstransferservice.service.step.vfs.VfsWriter;
+import org.onedatashare.transferservice.odstransferservice.pools.ThreadPoolContract;
+import org.onedatashare.transferservice.odstransferservice.service.step.ReaderWriterFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.batch.core.Job;
+import org.springframework.batch.core.*;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.flow.Flow;
+import org.springframework.batch.core.launch.JobLauncher;
+import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
+import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
+import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
-import org.springframework.batch.item.ItemReader;
-import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import java.util.List;
import java.util.stream.Collectors;
-import static org.onedatashare.transferservice.odstransferservice.constant.ODSConstants.FIVE_MB;
-import static org.onedatashare.transferservice.odstransferservice.constant.ODSConstants.TWENTY_MB;
-
@Service
@NoArgsConstructor
@@ -59,12 +35,10 @@
@Setter
public class JobControl {
- public TransferJobRequest request;
-
Logger logger = LoggerFactory.getLogger(JobControl.class);
@Autowired
- VfsExpander vfsExpander;
+ ReaderWriterFactory readerWriterFactory;
@Autowired
JobRepository jobRepository;
@@ -76,26 +50,28 @@ public class JobControl {
JobCompletionListener jobCompletionListener;
@Autowired
- MetricsCollector metricsCollector;
+ PlatformTransactionManager platformTransactionManager;
+
+ @Autowired
+ InfluxIOService influxIOService;
@Autowired
- InfluxCache influxCache;
+ ThreadPoolContract threadPool;
@Autowired
- PlatformTransactionManager platformTransactionManager;
+ BackOffPolicy backOffPolicy;
@Autowired
- InfluxIOService influxIOService;
+ JobLauncher jobLauncher;
@Autowired
- ThreadPoolManager threadPoolManager;
-
- private List<Flow> createConcurrentFlow(List<EntityInfo> infoList, String basePath) {
- if (this.request.getSource().getType().equals(EndpointType.vfs)) {
- infoList = vfsExpander.expandDirectory(infoList, basePath);
- logger.info("File list: {}", infoList);
- }
- return infoList.stream().map(file -> {
+ JobParamService jobParamService;
+
+ JobExecution latestJobExecution;
+
+ private List<Flow> createConcurrentFlow(TransferJobRequest request) {
+ String basePath = request.getSource().getFileSourcePath();
+ return request.getSource().getInfoList().stream().map(file -> {
String idForStep = "";
if (!file.getId().isEmpty()) {
idForStep = file.getId();
@@ -103,118 +79,28 @@ private List createConcurrentFlow(List infoList, String basePa
idForStep = file.getPath();
}
SimpleStepBuilder<DataChunk, DataChunk> stepBuilder = new StepBuilder(idForStep, this.jobRepository)
- .chunk(this.request.getOptions().getPipeSize(), this.platformTransactionManager);
+ .chunk(request.getOptions().getPipeSize(), this.platformTransactionManager);
stepBuilder
- .reader(getRightReader(request.getSource().getType(), file))
- .writer(getRightWriter(request.getDestination().getType(), file));
- if (this.request.getOptions().getParallelThreadCount() > 0) {
- stepBuilder.taskExecutor(threadPoolManager.parallelThreadPoolVirtual(request.getOptions().getParallelThreadCount() * request.getOptions().getConcurrencyThreadCount(), file.getPath()));
+ .reader(readerWriterFactory.getRightReader(request.getSource(), file, request.getOptions()))
+ .writer(readerWriterFactory.getRightWriter(request.getDestination(), file));
+ if (request.getOptions().getParallelThreadCount() > 0) {
+ stepBuilder.taskExecutor(threadPool.parallelPool(request.getOptions().getParallelThreadCount(), file.getPath()));
}
- stepBuilder.throttleLimit(64);
+ stepBuilder.faultTolerant()
+ .backOffPolicy(this.backOffPolicy);
return new FlowBuilder<Flow>(basePath + idForStep)
.start(stepBuilder.build()).build();
}).collect(Collectors.toList());
}
- protected ItemReader<DataChunk> getRightReader(EndpointType type, EntityInfo fileInfo) {
- switch (type) {
- case http:
- HttpReader hr = new HttpReader(fileInfo, request.getSource().getVfsSourceCredential());
- hr.setPool(connectionBag.getHttpReaderPool());
- return hr;
- case vfs:
- VfsReader vfsReader = new VfsReader(request.getSource().getVfsSourceCredential(), fileInfo);
- return vfsReader;
- case sftp:
- SFTPReader sftpReader = new SFTPReader(request.getSource().getVfsSourceCredential(), fileInfo, request.getOptions().getPipeSize());
- sftpReader.setPool(connectionBag.getSftpReaderPool());
- return sftpReader;
- case ftp:
- FTPReader ftpReader = new FTPReader(request.getSource().getVfsSourceCredential(), fileInfo);
- ftpReader.setPool(connectionBag.getFtpReaderPool());
- return ftpReader;
- case s3:
- AmazonS3Reader amazonS3Reader = new AmazonS3Reader(request.getSource().getVfsSourceCredential(), fileInfo);
- amazonS3Reader.setPool(connectionBag.getS3ReaderPool());
- return amazonS3Reader;
- case box:
- BoxReader boxReader = new BoxReader(request.getSource().getOauthSourceCredential(), fileInfo);
- boxReader.setMaxRetry(this.request.getOptions().getRetry());
- return boxReader;
- case dropbox:
- DropBoxReader dropBoxReader = new DropBoxReader(request.getSource().getOauthSourceCredential(), fileInfo);
- return dropBoxReader;
- case scp:
- SCPReader reader = new SCPReader(fileInfo);
- reader.setPool(connectionBag.getSftpReaderPool());
- return reader;
- case gdrive:
- GDriveReader dDriveReader = new GDriveReader(request.getSource().getOauthSourceCredential(), fileInfo);
- return dDriveReader;
- }
- return null;
- }
-
- protected ItemWriter<DataChunk> getRightWriter(EndpointType type, EntityInfo fileInfo) {
- switch (type) {
- case vfs:
- VfsWriter vfsWriter = new VfsWriter(request.getDestination().getVfsDestCredential(), fileInfo, this.metricsCollector, this.influxCache);
- return vfsWriter;
- case sftp:
- SFTPWriter sftpWriter = new SFTPWriter(request.getDestination().getVfsDestCredential(), this.metricsCollector, this.influxCache);
- sftpWriter.setPool(connectionBag.getSftpWriterPool());
- return sftpWriter;
- case ftp:
- FTPWriter ftpWriter = new FTPWriter(request.getDestination().getVfsDestCredential(), fileInfo, this.metricsCollector, this.influxCache);
- ftpWriter.setPool(connectionBag.getFtpWriterPool());
- return ftpWriter;
- case s3:
- if (fileInfo.getSize() < TWENTY_MB) {
- AmazonS3SmallFileWriter amazonS3SmallFileWriter = new AmazonS3SmallFileWriter(request.getDestination().getVfsDestCredential(), fileInfo, this.metricsCollector, this.influxCache);
- amazonS3SmallFileWriter.setPool(connectionBag.getS3WriterPool());
- return amazonS3SmallFileWriter;
- } else {
- AmazonS3LargeFileWriter amazonS3LargeFileWriter = new AmazonS3LargeFileWriter(request.getDestination().getVfsDestCredential(), fileInfo, this.metricsCollector, this.influxCache);
- amazonS3LargeFileWriter.setPool(connectionBag.getS3WriterPool());
- return amazonS3LargeFileWriter;
- }
- case box:
- if (fileInfo.getSize() < TWENTY_MB) {
- BoxWriterSmallFile boxWriterSmallFile = new BoxWriterSmallFile(request.getDestination().getOauthDestCredential(), fileInfo, this.metricsCollector, this.influxCache);
- return boxWriterSmallFile;
- } else {
- BoxWriterLargeFile boxWriterLargeFile = new BoxWriterLargeFile(request.getDestination().getOauthDestCredential(), fileInfo, this.metricsCollector, this.influxCache);
- return boxWriterLargeFile;
- }
- case dropbox:
- DropBoxChunkedWriter dropBoxChunkedWriter = new DropBoxChunkedWriter(request.getDestination().getOauthDestCredential(), this.metricsCollector, this.influxCache);
- return dropBoxChunkedWriter;
- case scp:
- SCPWriter scpWriter = new SCPWriter(fileInfo, this.metricsCollector, this.influxCache);
- scpWriter.setPool(connectionBag.getSftpWriterPool());
- return scpWriter;
- case gdrive:
- if (fileInfo.getSize() < FIVE_MB) {
- GDriveSimpleWriter writer = new GDriveSimpleWriter(request.getDestination().getOauthDestCredential(), fileInfo);
- return writer;
- } else {
- GDriveResumableWriter writer = new GDriveResumableWriter(request.getDestination().getOauthDestCredential(), fileInfo);
- writer.setPool(connectionBag.getGoogleDriveWriterPool());
- return writer;
- }
- }
- return null;
- }
-
- public Job concurrentJobDefinition() {
- JobBuilder jobBuilder = new JobBuilder(this.request.getJobUuid().toString(), this.jobRepository);
- connectionBag.preparePools(this.request);
- List<Flow> flows = createConcurrentFlow(request.getSource().getInfoList(), request.getSource().getFileSourcePath());
- this.influxIOService.reconfigureBucketForNewJob(this.request.getOwnerId());
+ public Job concurrentJobDefinition(TransferJobRequest request) {
+ JobBuilder jobBuilder = new JobBuilder(request.getJobUuid().toString(), this.jobRepository);
+ connectionBag.preparePools(request);
+ List<Flow> flows = createConcurrentFlow(request);
+ this.influxIOService.reconfigureBucketForNewJob(request.getOwnerId());
Flow[] fl = new Flow[flows.size()];
Flow f = new FlowBuilder("splitFlow")
-// .split(this.threadPoolManager.stepTaskExecutorVirtual(this.request.getOptions().getConcurrencyThreadCount()))
- .split(this.threadPoolManager.stepTaskExecutorVirtual(this.request.getOptions().getConcurrencyThreadCount()))
+ .split(this.threadPool.stepPool(request.getOptions().getConcurrencyThreadCount()))
.add(flows.toArray(fl))
.build();
return jobBuilder
@@ -224,4 +110,11 @@ public Job concurrentJobDefinition() {
.build();
}
+ public JobExecution runJob(TransferJobRequest transferJobRequest) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
+ Job job = this.concurrentJobDefinition(transferJobRequest);
+ JobParameters jobParameters = this.jobParamService.translate(new JobParametersBuilder(), transferJobRequest);
+ this.latestJobExecution = this.jobLauncher.run(job, jobParameters);
+ return this.latestJobExecution;
+ }
+
}
\ No newline at end of file
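Note: the hunk above threads the `TransferJobRequest` through as an explicit parameter instead of reading it from a field, and adds `runJob()` as the single launch entry point. A minimal sketch of a caller follows; the class name `JobControl` and the listener wiring are assumptions for illustration, not part of this diff:

```java
import org.onedatashare.transferservice.odstransferservice.model.TransferJobRequest;
import org.springframework.batch.core.JobExecution;
import org.springframework.stereotype.Component;

// Hypothetical consumer that drives the new entry point.
@Component
public class TransferRequestListener {

    private final JobControl jobControl; // assumed name of the class refactored above

    public TransferRequestListener(JobControl jobControl) {
        this.jobControl = jobControl;
    }

    public void onTransferRequest(TransferJobRequest request) throws Exception {
        // runJob() builds the concurrent job, translates the request into
        // JobParameters via JobParamService, and launches it, returning the
        // JobExecution for status tracking.
        JobExecution execution = jobControl.runJob(request);
        System.out.println("Launched job " + request.getJobUuid()
                + " -> " + execution.getStatus());
    }
}
```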
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobParamService.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobParamService.java
index 2f52c965..3cfaf054 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobParamService.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobParamService.java
@@ -5,11 +5,13 @@
import org.onedatashare.transferservice.odstransferservice.model.TransferJobRequest;
import org.onedatashare.transferservice.odstransferservice.model.credential.AccountEndpointCredential;
import org.onedatashare.transferservice.odstransferservice.model.credential.EndpointCredential;
-import org.onedatashare.transferservice.odstransferservice.utility.S3Utility;
+import org.onedatashare.transferservice.odstransferservice.model.metrics.CarbonScore;
+import org.onedatashare.transferservice.odstransferservice.utility.ODSUtility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.net.URI;
@@ -22,6 +24,14 @@ public class JobParamService {
Logger logger = LoggerFactory.getLogger(JobParamService.class);
+ @Value("${spring.application.name}")
+ private String appName;
+
+ PmeterParser pmeterParser;
+
+ public JobParamService(PmeterParser pmeterParser) {
+ this.pmeterParser = pmeterParser;
+ }
/**
* Adds essentially the whole request, minus sensitive credentials, to the Job Params table.
@@ -52,7 +62,7 @@ public JobParameters translate(JobParametersBuilder builder, TransferJobRequest
builder.addLong(PIPELINING, (long) request.getOptions().getPipeSize());
builder.addString(COMPRESS, String.valueOf(request.getOptions().getCompress()));
builder.addLong(RETRY, (long) request.getOptions().getRetry());
- builder.addString(APP_NAME, System.getenv("APP_NAME"));
+ builder.addString(APP_NAME, this.appName);
builder.addString(OPTIMIZER, request.getOptions().getOptimizer());
builder.addLong(FILE_COUNT, (long) request.getSource().getInfoList().size());
long totalSize = 0L;
@@ -69,47 +79,30 @@ public JobParameters translate(JobParametersBuilder builder, TransferJobRequest
//adding the source host and source port to use for RTT & Latency measurements.
if (request.getSource().getVfsSourceCredential() != null) {
- builder.addString(SOURCE_HOST, this.uriFromEndpointCredential(request.getSource().getVfsSourceCredential(), sourceType));
+ String sourceIp = ODSUtility.uriFromEndpointCredential(request.getSource().getVfsSourceCredential(), sourceType);
+ builder.addString(SOURCE_HOST, sourceIp);
builder.addLong(SOURCE_PORT, (long) this.portFromEndpointCredential(request.getSource().getVfsSourceCredential(), sourceType));
+ CarbonScore score = this.pmeterParser.carbonAverageTraceRoute(sourceIp);
+ logger.info("Source Carbon Score: {}", score.avgCarbon);
+ builder.addLong(CARBON_SCORE_SOURCE, (long) score.avgCarbon);
} else if (request.getSource().getOauthSourceCredential() != null) {
- builder.addString(SOURCE_HOST, this.uriFromEndpointCredential(request.getSource().getOauthSourceCredential(), sourceType));
+ builder.addString(SOURCE_HOST, ODSUtility.uriFromEndpointCredential(request.getSource().getOauthSourceCredential(), sourceType));
builder.addLong(SOURCE_PORT, (long) this.portFromEndpointCredential(request.getSource().getOauthSourceCredential(), sourceType));
}
if (request.getDestination().getVfsDestCredential() != null) {
- builder.addString(DEST_HOST, this.uriFromEndpointCredential(request.getDestination().getVfsDestCredential(), destType));
+ String destIp = ODSUtility.uriFromEndpointCredential(request.getDestination().getVfsDestCredential(), destType);
+ builder.addString(DEST_HOST, destIp);
builder.addLong(DEST_PORT, (long) this.portFromEndpointCredential(request.getDestination().getVfsDestCredential(), destType));
+ CarbonScore score = this.pmeterParser.carbonAverageTraceRoute(destIp);
+ logger.info("Destination Carbon Score: {}", score.avgCarbon);
+ builder.addLong(CARBON_SCORE_DEST, (long) score.avgCarbon);
} else if (request.getDestination().getOauthDestCredential() != null) {
- builder.addString(DEST_HOST, this.uriFromEndpointCredential(request.getDestination().getOauthDestCredential(), destType));
+ builder.addString(DEST_HOST, ODSUtility.uriFromEndpointCredential(request.getDestination().getOauthDestCredential(), destType));
builder.addLong(DEST_PORT, (long) this.portFromEndpointCredential(request.getDestination().getOauthDestCredential(), destType));
}
-
return builder.toJobParameters();
}
- public String uriFromEndpointCredential(EndpointCredential credential, EndpointType type) {
- AccountEndpointCredential ac;
- switch (type) {
- case ftp:
- case sftp:
- case scp:
- case http:
- ac = (AccountEndpointCredential) credential;
- URI uri = URI.create(ac.getUri());
- return uri.getHost();
- case s3:
- ac = (AccountEndpointCredential) credential;
- URI s3Uri = URI.create(S3Utility.constructS3URI(ac.getUri(), ""));
- return s3Uri.getHost();
- case box:
- return "box.com";
- case dropbox:
- return "dropbox.com";
- case gdrive:
- return "drive.google.com";
- default:
- return "";
- }
- }
public int portFromEndpointCredential(EndpointCredential credential, EndpointType type) {
switch (type) {
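The hunk above also records source and destination carbon scores as job parameters. A hedged sketch of reading them back in a later step or listener, assuming `CARBON_SCORE_SOURCE`/`CARBON_SCORE_DEST` are `String` keys defined in `ODSConstants`, as the unqualified constants in `translate()` suggest:

```java
import org.onedatashare.transferservice.odstransferservice.constant.ODSConstants;
import org.springframework.batch.core.JobParameters;

// Illustrative helper only; not part of this diff.
public final class CarbonParams {

    private CarbonParams() {}

    // Returns 0 when the parameter is absent (e.g., OAuth endpoints or the
    // pmeter carbon toggle being off skip the measurement).
    public static long sourceScore(JobParameters params) {
        Long score = params.getLong(ODSConstants.CARBON_SCORE_SOURCE);
        return score == null ? 0L : score;
    }

    public static long destScore(JobParameters params) {
        Long score = params.getLong(ODSConstants.CARBON_SCORE_DEST);
        return score == null ? 0L : score;
    }
}
```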
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/cron/MetricsCollector.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/MetricsCollector.java
similarity index 94%
rename from src/main/java/org/onedatashare/transferservice/odstransferservice/service/cron/MetricsCollector.java
rename to src/main/java/org/onedatashare/transferservice/odstransferservice/service/MetricsCollector.java
index 2f68c69c..e330f9b9 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/cron/MetricsCollector.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/MetricsCollector.java
@@ -1,4 +1,4 @@
-package org.onedatashare.transferservice.odstransferservice.service.cron;
+package org.onedatashare.transferservice.odstransferservice.service;
import lombok.Getter;
import lombok.Setter;
@@ -6,10 +6,6 @@
import org.onedatashare.transferservice.odstransferservice.constant.ODSConstants;
import org.onedatashare.transferservice.odstransferservice.model.JobMetric;
import org.onedatashare.transferservice.odstransferservice.model.metrics.DataInflux;
-import org.onedatashare.transferservice.odstransferservice.service.DatabaseService.InfluxIOService;
-import org.onedatashare.transferservice.odstransferservice.service.InfluxCache;
-import org.onedatashare.transferservice.odstransferservice.service.LatencyRtt;
-import org.onedatashare.transferservice.odstransferservice.service.PmeterParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobParameters;
@@ -85,7 +81,7 @@ public void collectAndSave() {
long maxMem = Runtime.getRuntime().maxMemory();
JobMetric currentAggregateMetric = influxCache.aggregateMetric(); //this metrics throughput is the throughput of the whole map in influxCache.
DataInflux lastPmeterData;
- if (this.metrics.size() < 1) {
+ if (this.metrics.isEmpty()) {
this.metrics.add(new DataInflux());
lastPmeterData = metrics.get(metrics.size() - 1);
} else {
@@ -134,6 +130,9 @@ public void collectAndSave() {
lastPmeterData.setJobSize(jobParameters.getLong(ODSConstants.JOB_SIZE));
lastPmeterData.setAvgFileSize(jobParameters.getLong(ODSConstants.FILE_SIZE_AVG));
lastPmeterData.setOdsUser(jobParameters.getString(ODSConstants.OWNER_ID));
+ lastPmeterData.setIsRunning(currentAggregateMetric.getStepExecution().getJobExecution().isRunning());
+ } else {
+ lastPmeterData.setIsRunning(false);
}
log.info(lastPmeterData.toString());
this.influxCache.clearCache();
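The new `isRunning` flag mirrors the batch execution state onto the pmeter data point, falling back to `false` whenever no aggregate metric exists (i.e., the node is idle between jobs). The guard reduces to this illustrative helper, not part of the diff:

```java
import org.springframework.batch.core.JobExecution;

// Sketch of the null-safe running-state check added in collectAndSave().
final class RunningState {

    private RunningState() {}

    static boolean isRunning(JobExecution execution) {
        // No execution (no aggregate metric) means no job is in flight.
        return execution != null && execution.isRunning();
    }
}
```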
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/OptimizerService.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/OptimizerService.java
deleted file mode 100644
index a5fa5457..00000000
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/OptimizerService.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package org.onedatashare.transferservice.odstransferservice.service;
-
-import org.onedatashare.transferservice.odstransferservice.model.optimizer.OptimizerCreateRequest;
-import org.onedatashare.transferservice.odstransferservice.model.optimizer.OptimizerDeleteRequest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.MediaType;
-import org.springframework.scheduling.annotation.Async;
-import org.springframework.stereotype.Service;
-import org.springframework.web.client.RestClientException;
-import org.springframework.web.client.RestTemplate;
-
-import java.util.concurrent.CompletableFuture;
-
-@Service
-public class OptimizerService {
-
- @Autowired
- RestTemplate optimizerTemplate;
-
- @Value("${spring.application.name}")
- String appName;
-
- HttpHeaders headers;
-
- Logger logger = LoggerFactory.getLogger(OptimizerService.class);
-
- public OptimizerService() {
- headers = new HttpHeaders();
- headers.setContentType(MediaType.APPLICATION_JSON);
- }
-
- @Async("optimizerTaskExecutor")
- public void createOptimizerBlocking(OptimizerCreateRequest optimizerCreateRequest) throws RestClientException {
- optimizerCreateRequest.setNodeId(this.appName);
- logger.info("Sending OptimizerCreateRequest {}", optimizerCreateRequest);
- HttpEntity<OptimizerCreateRequest> createRequestHttpEntity = new HttpEntity<>(optimizerCreateRequest, this.headers);
- logger.info(createRequestHttpEntity.getBody().toString());
- this.optimizerTemplate.postForLocation("/optimizer/create", createRequestHttpEntity, Void.class);
- CompletableFuture.completedFuture(null);
- }
-
- @Async("optimizerTaskExecutor")
- public void deleteOptimizerBlocking(OptimizerDeleteRequest optimizerDeleteRequest) {
- optimizerDeleteRequest.setNodeId(this.appName);
- try {
- this.optimizerTemplate.postForObject("/optimizer/delete", new HttpEntity<>(optimizerDeleteRequest, this.headers), Void.class);
- } catch (RestClientException e) {
- logger.error("Failed to Delete optimizer. {}", optimizerDeleteRequest);
- }
- logger.info("Deleted {}", optimizerDeleteRequest);
- CompletableFuture.completedFuture(null);
- }
-}
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/PmeterParser.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/PmeterParser.java
index 07c534d4..564f7c3d 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/PmeterParser.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/PmeterParser.java
@@ -1,24 +1,33 @@
package org.onedatashare.transferservice.odstransferservice.service;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import jakarta.annotation.PostConstruct;
+import lombok.SneakyThrows;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.PumpStreamHandler;
+import org.onedatashare.transferservice.odstransferservice.model.CarbonIpEntry;
+import org.onedatashare.transferservice.odstransferservice.model.metrics.CarbonScore;
import org.onedatashare.transferservice.odstransferservice.model.metrics.DataInflux;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.Socket;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
@Service
public class PmeterParser {
@@ -26,17 +35,22 @@ public class PmeterParser {
private final String MEASURE = "measure";
private final ByteArrayOutputStream outputStream;
private final PumpStreamHandler streamHandler;
- private final DefaultExecutor executor;
+ private final DefaultExecutor pmeterExecutor;
private final ExecuteWatchdog watchDog;
+ @Value("${pmeter.nic}")
+ public String pmeterNic;
+
Logger logger = LoggerFactory.getLogger(PmeterParser.class);
+ @Value("${pmeter.carbon.path}")
+ String pmeterCarbonPath;
- @Value("${pmeter.report.path}")
- String pmeterReportPath;
+ @Value("${pmeter.carbon.map}")
+ String pmeterCarbonMapPath;
- @Value("${pmeter.interface}")
- String pmeterNic;
+ @Value("${pmeter.report.path}")
+ String pmeterMetricsPath;
@Value("${ods.user}")
String odsUser;
@@ -46,34 +60,43 @@ public class PmeterParser {
@Value("${pmeter.options}")
String pmeterOptions;
+
+ @Value("${pmeter.carbon.toggle}")
+ private boolean toggle;
+
ObjectMapper pmeterMapper;
private CommandLine cmdLine;
- @PostConstruct
- public void init() {
- this.cmdLine = CommandLine.parse(
- String.format("pmeter " + MEASURE + " %s --user %s --measure %s %s --file_name %s",
- pmeterNic, odsUser,
- measureCount, pmeterOptions, pmeterReportPath));
- }
- public PmeterParser(ObjectMapper pmeterMapper) {
+ public PmeterParser(Environment environment) {
this.outputStream = new ByteArrayOutputStream();
this.streamHandler = new PumpStreamHandler(outputStream);
- this.executor = new DefaultExecutor();
-
+ this.pmeterExecutor = new DefaultExecutor();
this.watchDog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
- executor.setWatchdog(watchDog);
- executor.setStreamHandler(streamHandler);
+ pmeterExecutor.setWatchdog(watchDog);
+ pmeterExecutor.setStreamHandler(streamHandler);
+
+ this.pmeterMapper = new ObjectMapper();
+ this.pmeterMapper.registerModule(new JavaTimeModule());
+ this.pmeterMapper.configure(SerializationFeature.WRITE_DATE_KEYS_AS_TIMESTAMPS, false);
- this.pmeterMapper = pmeterMapper;
+
+ }
+
+ @PostConstruct
+ public void init() throws IOException {
+ if (this.pmeterNic == null || this.pmeterNic.isEmpty()) {
+ this.pmeterNic = this.discoverActiveNetworkInterface();
+ }
+ logger.info("Interface used for monitoring: {}", this.pmeterNic);
+ this.cmdLine = CommandLine.parse(String.format("pmeter " + MEASURE + " %s --user %s --measure %s %s --file_name %s", this.pmeterNic, odsUser, measureCount, pmeterOptions, pmeterMetricsPath));
}
public void runPmeter() {
try {
- executor.execute(cmdLine);
+ pmeterExecutor.execute(cmdLine);
} catch (IOException e) {
logger.error("Failed in executing pmeter script:\n " + cmdLine);
e.printStackTrace();
@@ -81,7 +104,7 @@ public void runPmeter() {
}
public List<DataInflux> parsePmeterOutput() throws IOException {
- Path path = Paths.get(pmeterReportPath);
+ Path path = Paths.get(pmeterMetricsPath);
List<String> allLines = Files.readAllLines(path);
List<DataInflux> ret = new ArrayList<>();
for (String line : allLines) {
@@ -92,4 +115,67 @@ public List parsePmeterOutput() throws IOException {
path.toFile().createNewFile();
return ret;
}
+
+ public CarbonScore carbonAverageTraceRoute(String ip) {
+ // e.g.: pmeter carbon 129.114.108.45
+ if (!this.toggle || ip == null || ip.isEmpty()) return new CarbonScore();
+
+ CommandLine carbonCmd = CommandLine.parse(String.format("pmeter carbon %s", ip));
+ try {
+ DefaultExecutor carbonExecutor = new DefaultExecutor();
+ carbonExecutor.execute(carbonCmd);
+ Path filePath = Paths.get(this.pmeterCarbonPath);
+ List<String> lines = Files.readAllLines(filePath);
+ CarbonScore score = new CarbonScore();
+ for (String line : lines) {
+ score = this.pmeterMapper.readValue(line, CarbonScore.class);
+ break;
+ }
+ filePath.toFile().delete();
+ filePath.toFile().createNewFile();
+ return score;
+ } catch (IOException e) {
+ logger.error("Error Carbon Average Trace Route:\n {}", e.getMessage());
+ return new CarbonScore();
+ }
+ }
+
+ public List<CarbonIpEntry> carbonPerIp(String ip) throws IOException {
+ if (ip == null || ip.isEmpty()) return new ArrayList<>();
+ CommandLine carbonCmd = CommandLine.parse(String.format("pmeter carbon %s --save_per_ip=True", ip));
+ DefaultExecutor carbonExecutor = new DefaultExecutor();
+ carbonExecutor.execute(carbonCmd);
+ Path filePath = Paths.get(this.pmeterCarbonMapPath);
+ List<String> lines = Files.readAllLines(filePath);
+ String lastLine = lines.getLast();
+ Map<String, CarbonIpEntry> measurement = this.pmeterMapper.readValue(lastLine, new TypeReference