From 8f75340570b3577669af85ef711bdd598eb20319 Mon Sep 17 00:00:00 2001 From: SravyaPo <99841571+SravyaPo@users.noreply.github.com> Date: Mon, 6 Mar 2023 17:14:38 -0500 Subject: [PATCH 01/18] intial deadletter queue setup changes --- .../config/RabbitMQConfig.java | 36 ++++++++++++++++++- .../consumer/RabbitMQConsumer.java | 19 +++++++++- .../listner/JobCompletionListener.java | 20 +++++++++++ src/main/resources/application.properties | 3 ++ 4 files changed, 76 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java index be81ad00..40934cfc 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java @@ -7,7 +7,9 @@ import org.springframework.context.annotation.Configuration; import java.util.Date; +import java.util.HashMap; import java.util.Locale; +import java.util.Map; @Configuration public class RabbitMQConfig { @@ -21,6 +23,15 @@ public class RabbitMQConfig { @Value("${ods.rabbitmq.routingkey}") String routingKey; + @Value("${ods.rabbitmq.dead-letter-exchange}") + private String deadLetterExchange; + + @Value("${ods.rabbitmq.dead-letter-routing-key}") + private String deadLetterRoutingKey; + + @Value("${ods.rabbitmq.dead-letter-queue}") + private String deadLetterQueueName; + @Bean public Gson gson() { GsonBuilder builder = new GsonBuilder() @@ -30,19 +41,42 @@ public Gson gson() { @Bean Queue userQueue(){ - //String name, boolean durable, boolean exclusive, boolean autoDelete + // Map args = new HashMap<>(); +// args.put("x-dead-letter-exchange", deadLetterExchange); +// args.put("x-dead-letter-routing-key", deadLetterRoutingKey); +// //String name, boolean durable, boolean exclusive, boolean autoDelete, args return new Queue(this.queueName, true, false, false); } + @Bean + public Queue deadLetterQueue() { + return new Queue(this.deadLetterQueueName, true,false, false); + } + + @Bean public DirectExchange exchange(){ return new DirectExchange(exchange); } + + @Bean + public DirectExchange deadLetterExchange(){ + return new DirectExchange(exchange); + } + @Bean public Binding binding(DirectExchange exchange, Queue userQueue){ return BindingBuilder.bind(userQueue) .to(exchange) .with(routingKey); } + + + @Bean + public Binding deadLetterBinding(){ + return BindingBuilder.bind(deadLetterQueue()) + .to(deadLetterExchange()) + .with(deadLetterRoutingKey); + } } diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java index e5afc255..dbbaf366 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java @@ -4,25 +4,29 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.onedatashare.transferservice.odstransferservice.Enum.EndpointType; import org.onedatashare.transferservice.odstransferservice.model.EntityInfo; import org.onedatashare.transferservice.odstransferservice.model.TransferJobRequest; +import org.onedatashare.transferservice.odstransferservice.model.DeadLetterQueueData; import org.onedatashare.transferservice.odstransferservice.model.optimizer.TransferApplicationParams; import org.onedatashare.transferservice.odstransferservice.pools.ThreadPoolManager; import org.onedatashare.transferservice.odstransferservice.service.DatabaseService.CrudService; +import org.onedatashare.transferservice.odstransferservice.service.DeadLetterQueueService; import org.onedatashare.transferservice.odstransferservice.service.JobControl; import org.onedatashare.transferservice.odstransferservice.service.JobParamService; import org.onedatashare.transferservice.odstransferservice.service.VfsExpander; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.util.ArrayList; @@ -47,6 +51,17 @@ public class RabbitMQConsumer { VfsExpander vfsExpander; + DeadLetterQueueService deadLetterQueueService; + + @Value("${ods.rabbitmq.dead-letter-exchange}") + private String deadLetterExchange; + + @Value("${ods.rabbitmq.dead-letter-routing-key}") + private String deadLetterRoutingKey; + + @Autowired + AmqpTemplate rmqTemplate; + public RabbitMQConsumer(VfsExpander vfsExpander, Queue userQueue, JobParamService jobParamService, JobLauncher asyncJobLauncher, JobControl jc, CrudService crudService, ThreadPoolManager threadPoolManager) { this.vfsExpander = vfsExpander; this.userQueue = userQueue; @@ -89,6 +104,8 @@ public void consumeDefaultMessage(final Message message) { this.threadPoolManager.applyOptimizer(params.getConcurrency(), params.getParallelism()); } catch (JsonProcessingException e) { logger.info("Did not apply transfer params due to parsing message failure"); + DeadLetterQueueData failedMessage = deadLetterQueueService.getDataFromMesaage(message, e); + rmqTemplate.convertAndSend(deadLetterExchange,deadLetterRoutingKey,failedMessage); e.printStackTrace(); } } diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java index 6a92a8b7..47b7ee31 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java @@ -1,16 +1,21 @@ package org.onedatashare.transferservice.odstransferservice.service.listner; import org.onedatashare.transferservice.odstransferservice.constant.ODSConstants; +import org.onedatashare.transferservice.odstransferservice.model.DeadLetterQueueData; import org.onedatashare.transferservice.odstransferservice.model.optimizer.OptimizerCreateRequest; import org.onedatashare.transferservice.odstransferservice.model.optimizer.OptimizerDeleteRequest; import org.onedatashare.transferservice.odstransferservice.pools.ThreadPoolManager; import org.onedatashare.transferservice.odstransferservice.service.ConnectionBag; +import org.onedatashare.transferservice.odstransferservice.service.DeadLetterQueueService; import org.onedatashare.transferservice.odstransferservice.service.OptimizerService; import org.onedatashare.transferservice.odstransferservice.service.cron.MetricsCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.AmqpTemplate; +import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.listener.JobExecutionListenerSupport; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; @@ -45,6 +50,17 @@ public class JobCompletionListener extends JobExecutionListenerSupport { int maxPipe; boolean optimizerEnable; + DeadLetterQueueService deadLetterQueueService; + + @Value("${ods.rabbitmq.dead-letter-exchange}") + private String deadLetterExchange; + + @Value("${ods.rabbitmq.dead-letter-routing-key}") + private String deadLetterRoutingKey; + + @Autowired + AmqpTemplate rmqTemplate; + public JobCompletionListener(ThreadPoolManager threadPoolManager, OptimizerService optimizerService, MetricsCollector metricsCollector, ConnectionBag connectionBag) { this.threadPoolManager = threadPoolManager; this.optimizerService = optimizerService; @@ -84,6 +100,10 @@ public void afterJob(JobExecution jobExecution) { this.optimizerService.deleteOptimizerBlocking(new OptimizerDeleteRequest(appName)); this.optimizerEnable = false; } + if(jobExecution.getExitStatus().equals(ExitStatus.FAILED)){ + DeadLetterQueueData failedMessage = deadLetterQueueService.getDataFromJobExecution(jobExecution.getJobParameters(), jobExecution.getFailureExceptions()); + rmqTemplate.convertAndSend(deadLetterExchange,deadLetterRoutingKey,failedMessage); + } } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index f5ef659b..6c182d58 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -32,6 +32,9 @@ spring.batch.job.enabled=false #RabitMQ ods.rabbitmq.exchange=ods.exchange +ods.rabbitmq.dead-letter-exchange=ods.dead-letter-exchange +ods.rabbitmq.dead-letter-routing-key= ods.dead-letter-routing-key +ods.rabbitmq.dead-letter-queue= ods.dead-letter-queue #for vfs nodes this should be the APP_NAME which is always lowercase. ods.rabbitmq.queue=${CONNECTOR_QUEUE:transferQueue} ods.rabbitmq.routingkey=${CONNECTOR_QUEUE:ods.routing} From 74b372a1f7c176f0a395abd7cb34703f00da9d8f Mon Sep 17 00:00:00 2001 From: SravyaPo <99841571+SravyaPo@users.noreply.github.com> Date: Mon, 6 Mar 2023 17:17:34 -0500 Subject: [PATCH 02/18] intial deadletter queue setup changes --- .../consumer/RabbitMQConsumer.java | 1 + .../model/DeadLetterQueueData.java | 9 +++++++ .../service/DeadLetterQueueService.java | 27 +++++++++++++++++++ 3 files changed, 37 insertions(+) create mode 100644 src/main/java/org/onedatashare/transferservice/odstransferservice/model/DeadLetterQueueData.java create mode 100644 src/main/java/org/onedatashare/transferservice/odstransferservice/service/DeadLetterQueueService.java diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java index dbbaf366..e7af264d 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.onedatashare.transferservice.odstransferservice.Enum.EndpointType; import org.onedatashare.transferservice.odstransferservice.model.EntityInfo; diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/DeadLetterQueueData.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/DeadLetterQueueData.java new file mode 100644 index 00000000..4c90dbd0 --- /dev/null +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/DeadLetterQueueData.java @@ -0,0 +1,9 @@ +package org.onedatashare.transferservice.odstransferservice.model; + +import java.util.List; + +public class DeadLetterQueueData { + public List failureException; + + +} diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/DeadLetterQueueService.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/DeadLetterQueueService.java new file mode 100644 index 00000000..36168b98 --- /dev/null +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/DeadLetterQueueService.java @@ -0,0 +1,27 @@ +package org.onedatashare.transferservice.odstransferservice.service; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.onedatashare.transferservice.odstransferservice.model.DeadLetterQueueData; +import org.springframework.amqp.core.Message; +import org.springframework.batch.core.JobParameters; + +import java.util.ArrayList; +import java.util.List; + +public class DeadLetterQueueService { + + public DeadLetterQueueData getDataFromMesaage(Message message, JsonProcessingException exception){ + DeadLetterQueueData data = new DeadLetterQueueData(); + data.failureException = new ArrayList<>(); + data.failureException.add(exception); + return data; + } + + public DeadLetterQueueData getDataFromJobExecution(JobParameters jobParameters, List failureExceptions){ + DeadLetterQueueData data = new DeadLetterQueueData(); + data.failureException = new ArrayList<>(failureExceptions); + return data; + } + + +} From 6ba00732ce8404ef554546119f91e9d6e955e655 Mon Sep 17 00:00:00 2001 From: SravyaPo <99841571+SravyaPo@users.noreply.github.com> Date: Sun, 12 Mar 2023 15:41:48 -0400 Subject: [PATCH 03/18] changes --- .../consumer/RabbitMQConsumer.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java index e7af264d..637786ec 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java @@ -76,10 +76,17 @@ public RabbitMQConsumer(VfsExpander vfsExpander, Queue userQueue, JobParamServic this.objectMapper.setDefaultPropertyInclusion(JsonInclude.Include.ALWAYS); } + @RabbitListener(queues = "ods.dead-letter-queue") + public void handleFailedMessage(String failedMessage) { + // handle failed message here + } + @RabbitListener(queues = "#{userQueue}") public void consumeDefaultMessage(final Message message) { String jsonStr = new String(message.getBody()); logger.info("Message recv: {}", jsonStr); + JobParameters parameters = null; + List failureException = new ArrayList<>(); try { TransferJobRequest request = objectMapper.readValue(jsonStr, TransferJobRequest.class); logger.info("Job Recieved: {}",request.toString()); @@ -88,15 +95,17 @@ public void consumeDefaultMessage(final Message message) { request.getSource().setInfoList(new ArrayList<>(fileExpandedList)); } try { - JobParameters parameters = jobParamService.translate(new JobParametersBuilder(), request); + parameters = jobParamService.translate(new JobParametersBuilder(), request); crudService.insertBeforeTransfer(request); jc.setRequest(request); asyncJobLauncher.run(jc.concurrentJobDefinition(), parameters); return; } catch (Exception e) { + failureException.add(e); e.printStackTrace(); } } catch (JsonProcessingException e) { + failureException.add(e); logger.debug("Failed to parse jsonStr:{} to TransferJobRequest.java", jsonStr); } try { @@ -105,7 +114,8 @@ public void consumeDefaultMessage(final Message message) { this.threadPoolManager.applyOptimizer(params.getConcurrency(), params.getParallelism()); } catch (JsonProcessingException e) { logger.info("Did not apply transfer params due to parsing message failure"); - DeadLetterQueueData failedMessage = deadLetterQueueService.getDataFromMesaage(message, e); + failureException.add(e); + DeadLetterQueueData failedMessage = deadLetterQueueService.convertDataToDLQ( parameters, failureException, null); rmqTemplate.convertAndSend(deadLetterExchange,deadLetterRoutingKey,failedMessage); e.printStackTrace(); } From 5ecff8fe6874eddd8ef2673b45c87ac87a72bb3e Mon Sep 17 00:00:00 2001 From: SravyaPo <99841571+SravyaPo@users.noreply.github.com> Date: Sun, 12 Mar 2023 15:42:11 -0400 Subject: [PATCH 04/18] changes --- .../model/DeadLetterQueueData.java | 36 ++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/DeadLetterQueueData.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/DeadLetterQueueData.java index 4c90dbd0..54c323bb 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/DeadLetterQueueData.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/DeadLetterQueueData.java @@ -1,9 +1,43 @@ package org.onedatashare.transferservice.odstransferservice.model; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.StepExecution; + +import java.io.Serializable; +import java.util.Collection; import java.util.List; -public class DeadLetterQueueData { +public class DeadLetterQueueData implements Serializable { public List failureException; + public JobParameters jobParameters; + + public String failureString; + + public Collection stepExecutions; + + public JobParameters getJobParameters() { + return jobParameters; + } + + public void setJobParameters(JobParameters jobParameters) { + this.jobParameters = jobParameters; + } + public List getFailureException() { + return failureException; + } + + public void setFailureException(List failureException) { + this.failureException = failureException; + } + + public Collection getStepExecutions() { + return stepExecutions; + } + + public void setStepExecutions(Collection stepExecutions) { + this.stepExecutions = stepExecutions; + } } + From c0145c9fea08fe76cf17ef39251130d33ea46e4b Mon Sep 17 00:00:00 2001 From: SravyaPo <99841571+SravyaPo@users.noreply.github.com> Date: Sun, 12 Mar 2023 15:42:41 -0400 Subject: [PATCH 05/18] changes --- .../service/DeadLetterQueueService.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/DeadLetterQueueService.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/DeadLetterQueueService.java index 36168b98..331784b8 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/DeadLetterQueueService.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/DeadLetterQueueService.java @@ -4,24 +4,22 @@ import org.onedatashare.transferservice.odstransferservice.model.DeadLetterQueueData; import org.springframework.amqp.core.Message; import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.StepExecution; +import org.springframework.stereotype.Service; import java.util.ArrayList; +import java.util.Collection; import java.util.List; +@Service public class DeadLetterQueueService { - public DeadLetterQueueData getDataFromMesaage(Message message, JsonProcessingException exception){ - DeadLetterQueueData data = new DeadLetterQueueData(); - data.failureException = new ArrayList<>(); - data.failureException.add(exception); - return data; - } - - public DeadLetterQueueData getDataFromJobExecution(JobParameters jobParameters, List failureExceptions){ + public DeadLetterQueueData convertDataToDLQ(JobParameters jobParameters, List failureExceptions, Collection stepExecutions){ DeadLetterQueueData data = new DeadLetterQueueData(); data.failureException = new ArrayList<>(failureExceptions); + data.jobParameters = jobParameters; + data.stepExecutions = new ArrayList<>(stepExecutions); return data; } - } From 115c2c70721fe05558fb972ad86ce03b15eb1679 Mon Sep 17 00:00:00 2001 From: SravyaPo <99841571+SravyaPo@users.noreply.github.com> Date: Sun, 12 Mar 2023 15:42:57 -0400 Subject: [PATCH 06/18] changes --- .../service/listner/JobCompletionListener.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java index 47b7ee31..138ab9cb 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java @@ -50,6 +50,7 @@ public class JobCompletionListener extends JobExecutionListenerSupport { int maxPipe; boolean optimizerEnable; + @Autowired DeadLetterQueueService deadLetterQueueService; @Value("${ods.rabbitmq.dead-letter-exchange}") @@ -100,8 +101,8 @@ public void afterJob(JobExecution jobExecution) { this.optimizerService.deleteOptimizerBlocking(new OptimizerDeleteRequest(appName)); this.optimizerEnable = false; } - if(jobExecution.getExitStatus().equals(ExitStatus.FAILED)){ - DeadLetterQueueData failedMessage = deadLetterQueueService.getDataFromJobExecution(jobExecution.getJobParameters(), jobExecution.getFailureExceptions()); + if(jobExecution.getExitStatus().getExitCode()== "FAILED"){ + DeadLetterQueueData failedMessage = deadLetterQueueService.convertDataToDLQ(jobExecution.getJobParameters(), jobExecution.getFailureExceptions(), jobExecution.getStepExecutions()); rmqTemplate.convertAndSend(deadLetterExchange,deadLetterRoutingKey,failedMessage); } } From d0b2db2d8f5c0b560b44b55e21b69512d84853f0 Mon Sep 17 00:00:00 2001 From: Jacob Goldverg Date: Sun, 12 Mar 2023 15:44:44 -0400 Subject: [PATCH 07/18] optimizer bug when the string is empty --- .../service/listner/JobCompletionListener.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java index 138ab9cb..6d8d3fad 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java @@ -77,7 +77,7 @@ public void beforeJob(JobExecution jobExecution) { long fileCount = jobExecution.getJobParameters().getLong(ODSConstants.FILE_COUNT); String optimizerType = jobExecution.getJobParameters().getString(ODSConstants.OPTIMIZER); if(optimizerType != null){ - if(!optimizerType.equals("None")) { + if(!optimizerType.equals("None") && !optimizerType.isEmpty()) { OptimizerCreateRequest createRequest = new OptimizerCreateRequest(appName, maxConc, maxParallel, maxPipe, optimizerType, fileCount); optimizerService.createOptimizerBlocking(createRequest); this.optimizerEnable = true; From 6f4378c519a283ddd4c9929ebe27a40eb59da840 Mon Sep 17 00:00:00 2001 From: Jacob Goldverg Date: Sun, 12 Mar 2023 15:54:34 -0400 Subject: [PATCH 08/18] we throw a fatal job crashing exception when the destination directory cant be created --- .../odstransferservice/service/step/vfs/VfsWriter.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/vfs/VfsWriter.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/vfs/VfsWriter.java index 20408e4b..29146d38 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/vfs/VfsWriter.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/vfs/VfsWriter.java @@ -53,13 +53,11 @@ public ExitStatus afterStep(StepExecution stepExecution) throws IOException { return stepExecution.getExitStatus(); } - public void prepareDirectories() { + public void prepareDirectories() throws IOException{ try { Files.createDirectories(Paths.get(this.destinationPath)); } catch (FileAlreadyExistsException fileAlreadyExistsException) { logger.warn("Already have the file with this path \t" + this.filePath.toString()); - } catch (IOException e) { - e.printStackTrace(); } } From 5053c4bbfb7e025b967ef1ec68777508b027219e Mon Sep 17 00:00:00 2001 From: SravyaPo <99841571+SravyaPo@users.noreply.github.com> Date: Sun, 12 Mar 2023 17:20:59 -0400 Subject: [PATCH 09/18] able to push to deadLetterQueue --- .../config/RabbitMQConfig.java | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java index 40934cfc..2774e0d6 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java @@ -23,9 +23,6 @@ public class RabbitMQConfig { @Value("${ods.rabbitmq.routingkey}") String routingKey; - @Value("${ods.rabbitmq.dead-letter-exchange}") - private String deadLetterExchange; - @Value("${ods.rabbitmq.dead-letter-routing-key}") private String deadLetterRoutingKey; @@ -41,9 +38,6 @@ public Gson gson() { @Bean Queue userQueue(){ - // Map args = new HashMap<>(); -// args.put("x-dead-letter-exchange", deadLetterExchange); -// args.put("x-dead-letter-routing-key", deadLetterRoutingKey); // //String name, boolean durable, boolean exclusive, boolean autoDelete, args return new Queue(this.queueName, true, false, false); } @@ -59,12 +53,6 @@ public DirectExchange exchange(){ return new DirectExchange(exchange); } - - @Bean - public DirectExchange deadLetterExchange(){ - return new DirectExchange(exchange); - } - @Bean public Binding binding(DirectExchange exchange, Queue userQueue){ return BindingBuilder.bind(userQueue) @@ -74,9 +62,9 @@ public Binding binding(DirectExchange exchange, Queue userQueue){ @Bean - public Binding deadLetterBinding(){ - return BindingBuilder.bind(deadLetterQueue()) - .to(deadLetterExchange()) + public Binding deadLetterBinding(DirectExchange exchange, Queue deadLetterQueue){ + return BindingBuilder.bind(deadLetterQueue) + .to(exchange) .with(deadLetterRoutingKey); } } From 5098fdb00c2d4072c59640d8738e2b705f0dd8af Mon Sep 17 00:00:00 2001 From: SravyaPo <99841571+SravyaPo@users.noreply.github.com> Date: Sun, 12 Mar 2023 17:21:14 -0400 Subject: [PATCH 10/18] able to push to deadLetterQueue --- .../consumer/RabbitMQConsumer.java | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java index 637786ec..8cbd600a 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java @@ -23,6 +23,7 @@ import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.launch.JobLauncher; @@ -30,13 +31,15 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import java.awt.*; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @Service public class RabbitMQConsumer { - private final ObjectMapper objectMapper; + private final ObjectMapper objectMapper; private final ThreadPoolManager threadPoolManager; Logger logger = LoggerFactory.getLogger(RabbitMQConsumer.class); @@ -54,16 +57,15 @@ public class RabbitMQConsumer { DeadLetterQueueService deadLetterQueueService; - @Value("${ods.rabbitmq.dead-letter-exchange}") + @Value("${ods.rabbitmq.exchange}") private String deadLetterExchange; @Value("${ods.rabbitmq.dead-letter-routing-key}") private String deadLetterRoutingKey; - @Autowired AmqpTemplate rmqTemplate; - public RabbitMQConsumer(VfsExpander vfsExpander, Queue userQueue, JobParamService jobParamService, JobLauncher asyncJobLauncher, JobControl jc, CrudService crudService, ThreadPoolManager threadPoolManager) { + public RabbitMQConsumer(VfsExpander vfsExpander, Queue userQueue, JobParamService jobParamService, JobLauncher asyncJobLauncher, JobControl jc, CrudService crudService, ThreadPoolManager threadPoolManager, AmqpTemplate rmqTemplate, DeadLetterQueueService deadLetterQueueService) { this.vfsExpander = vfsExpander; this.userQueue = userQueue; this.jobParamService = jobParamService; @@ -74,12 +76,18 @@ public RabbitMQConsumer(VfsExpander vfsExpander, Queue userQueue, JobParamServic this.objectMapper = new ObjectMapper(); this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); this.objectMapper.setDefaultPropertyInclusion(JsonInclude.Include.ALWAYS); + this.rmqTemplate = rmqTemplate; + this.deadLetterQueueService = deadLetterQueueService; } - @RabbitListener(queues = "ods.dead-letter-queue") - public void handleFailedMessage(String failedMessage) { - // handle failed message here - } + //Can be used in future for processing the messages from DeadLetterQueue +// @RabbitListener(queues = "ods.dead-letter-queue") +// public void handleFailedMessage( Message message) { +// Object payload = new SimpleMessageConverter().fromMessage(message); +// if (payload instanceof DeadLetterQueueData) { +// DeadLetterQueueData myObject = (DeadLetterQueueData) payload; +// } +// } @RabbitListener(queues = "#{userQueue}") public void consumeDefaultMessage(final Message message) { From df43a06a6bbd99438a246d029d833452dc61b0f2 Mon Sep 17 00:00:00 2001 From: SravyaPo <99841571+SravyaPo@users.noreply.github.com> Date: Sun, 12 Mar 2023 17:21:26 -0400 Subject: [PATCH 11/18] able to push to deadLetterQueue --- .../model/DeadLetterQueueData.java | 30 ++----------------- 1 file changed, 2 insertions(+), 28 deletions(-) diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/DeadLetterQueueData.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/DeadLetterQueueData.java index 54c323bb..e5e3ab0f 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/DeadLetterQueueData.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/DeadLetterQueueData.java @@ -1,5 +1,6 @@ package org.onedatashare.transferservice.odstransferservice.model; +import lombok.Data; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.StepExecution; @@ -7,37 +8,10 @@ import java.util.Collection; import java.util.List; +@Data public class DeadLetterQueueData implements Serializable { public List failureException; - public JobParameters jobParameters; - - public String failureString; - public Collection stepExecutions; - - public JobParameters getJobParameters() { - return jobParameters; - } - - public void setJobParameters(JobParameters jobParameters) { - this.jobParameters = jobParameters; - } - - public List getFailureException() { - return failureException; - } - - public void setFailureException(List failureException) { - this.failureException = failureException; - } - - public Collection getStepExecutions() { - return stepExecutions; - } - - public void setStepExecutions(Collection stepExecutions) { - this.stepExecutions = stepExecutions; - } } From dccaff1334ee122110d96226232f842ada16e5ef Mon Sep 17 00:00:00 2001 From: SravyaPo <99841571+SravyaPo@users.noreply.github.com> Date: Sun, 12 Mar 2023 17:21:44 -0400 Subject: [PATCH 12/18] able to push to deadLetterQueue --- .../service/listner/JobCompletionListener.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java index 6d8d3fad..3622115b 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java @@ -50,24 +50,24 @@ public class JobCompletionListener extends JobExecutionListenerSupport { int maxPipe; boolean optimizerEnable; - @Autowired DeadLetterQueueService deadLetterQueueService; - @Value("${ods.rabbitmq.dead-letter-exchange}") + @Value("${ods.rabbitmq.exchange}") private String deadLetterExchange; @Value("${ods.rabbitmq.dead-letter-routing-key}") private String deadLetterRoutingKey; - @Autowired AmqpTemplate rmqTemplate; - public JobCompletionListener(ThreadPoolManager threadPoolManager, OptimizerService optimizerService, MetricsCollector metricsCollector, ConnectionBag connectionBag) { + public JobCompletionListener(ThreadPoolManager threadPoolManager, OptimizerService optimizerService, MetricsCollector metricsCollector, ConnectionBag connectionBag, AmqpTemplate rmqTemplate, DeadLetterQueueService deadLetterQueueService) { this.threadPoolManager = threadPoolManager; this.optimizerService = optimizerService; this.metricsCollector = metricsCollector; this.connectionBag = connectionBag; this.optimizerEnable = false; + this.rmqTemplate = rmqTemplate; + this.deadLetterQueueService = deadLetterQueueService; } @@ -101,7 +101,8 @@ public void afterJob(JobExecution jobExecution) { this.optimizerService.deleteOptimizerBlocking(new OptimizerDeleteRequest(appName)); this.optimizerEnable = false; } - if(jobExecution.getExitStatus().getExitCode()== "FAILED"){ + String exitCode = jobExecution.getExitStatus().getExitCode(); + if(!exitCode.equals("EXECUTING") && !exitCode.equals("COMPLETED")){ DeadLetterQueueData failedMessage = deadLetterQueueService.convertDataToDLQ(jobExecution.getJobParameters(), jobExecution.getFailureExceptions(), jobExecution.getStepExecutions()); rmqTemplate.convertAndSend(deadLetterExchange,deadLetterRoutingKey,failedMessage); } From 7a8470cf2032300d866c31e200f1455aeace48f0 Mon Sep 17 00:00:00 2001 From: SravyaPo <99841571+SravyaPo@users.noreply.github.com> Date: Sun, 12 Mar 2023 17:29:31 -0400 Subject: [PATCH 13/18] reverting some unwanted changes --- .../service/listner/JobCompletionListener.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java index 3622115b..30d96bdc 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java @@ -12,10 +12,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; -import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.listener.JobExecutionListenerSupport; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; From 72fcd0d2beca17b098a056168646be89f0229155 Mon Sep 17 00:00:00 2001 From: SravyaPo <99841571+SravyaPo@users.noreply.github.com> Date: Sun, 12 Mar 2023 17:29:45 -0400 Subject: [PATCH 14/18] reverting some unwanted changes --- .../odstransferservice/service/DeadLetterQueueService.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/DeadLetterQueueService.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/DeadLetterQueueService.java index 331784b8..95897327 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/DeadLetterQueueService.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/DeadLetterQueueService.java @@ -1,8 +1,6 @@ package org.onedatashare.transferservice.odstransferservice.service; -import com.fasterxml.jackson.core.JsonProcessingException; import org.onedatashare.transferservice.odstransferservice.model.DeadLetterQueueData; -import org.springframework.amqp.core.Message; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.StepExecution; import org.springframework.stereotype.Service; From 34def78830b7efebeae742ab3aea0bd06a03cac8 Mon Sep 17 00:00:00 2001 From: SravyaPo <99841571+SravyaPo@users.noreply.github.com> Date: Sun, 12 Mar 2023 17:29:59 -0400 Subject: [PATCH 15/18] reverting some unwanted changes --- .../odstransferservice/consumer/RabbitMQConsumer.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java index 8cbd600a..793e7a59 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java @@ -23,23 +23,19 @@ import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.launch.JobLauncher; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; -import java.awt.*; -import java.io.IOException; import java.util.ArrayList; import java.util.List; @Service public class RabbitMQConsumer { - private final ObjectMapper objectMapper; + private final ObjectMapper objectMapper; private final ThreadPoolManager threadPoolManager; Logger logger = LoggerFactory.getLogger(RabbitMQConsumer.class); @@ -80,7 +76,7 @@ public RabbitMQConsumer(VfsExpander vfsExpander, Queue userQueue, JobParamServic this.deadLetterQueueService = deadLetterQueueService; } - //Can be used in future for processing the messages from DeadLetterQueue + //Uncomment and use in future for processing the messages from DeadLetterQueue // @RabbitListener(queues = "ods.dead-letter-queue") // public void handleFailedMessage( Message message) { // Object payload = new SimpleMessageConverter().fromMessage(message); From b411219811016701d65a167376727ab38c6a59ed Mon Sep 17 00:00:00 2001 From: SravyaPo <99841571+SravyaPo@users.noreply.github.com> Date: Sun, 12 Mar 2023 17:30:10 -0400 Subject: [PATCH 16/18] reverting some unwanted changes --- .../odstransferservice/config/RabbitMQConfig.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java index 2774e0d6..64b7798a 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java @@ -7,9 +7,6 @@ import org.springframework.context.annotation.Configuration; import java.util.Date; -import java.util.HashMap; -import java.util.Locale; -import java.util.Map; @Configuration public class RabbitMQConfig { @@ -38,7 +35,7 @@ public Gson gson() { @Bean Queue userQueue(){ -// //String name, boolean durable, boolean exclusive, boolean autoDelete, args + //String name, boolean durable, boolean exclusive, boolean autoDelete, args return new Queue(this.queueName, true, false, false); } From 52ede76135b84ac43f416de046c1df679c14f90d Mon Sep 17 00:00:00 2001 From: SravyaPo <99841571+SravyaPo@users.noreply.github.com> Date: Sun, 12 Mar 2023 17:30:49 -0400 Subject: [PATCH 17/18] reverting some unwanted changes --- src/main/resources/application.properties | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 6c182d58..810b25d0 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -32,7 +32,6 @@ spring.batch.job.enabled=false #RabitMQ ods.rabbitmq.exchange=ods.exchange -ods.rabbitmq.dead-letter-exchange=ods.dead-letter-exchange ods.rabbitmq.dead-letter-routing-key= ods.dead-letter-routing-key ods.rabbitmq.dead-letter-queue= ods.dead-letter-queue #for vfs nodes this should be the APP_NAME which is always lowercase. From 53308ba2f0295bbdb112f2726fd529b6e588884c Mon Sep 17 00:00:00 2001 From: SravyaPo <99841571+SravyaPo@users.noreply.github.com> Date: Sun, 12 Mar 2023 17:33:29 -0400 Subject: [PATCH 18/18] reverting some unwanted changes --- .../odstransferservice/config/RabbitMQConfig.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java index 64b7798a..502cf6da 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java @@ -7,6 +7,7 @@ import org.springframework.context.annotation.Configuration; import java.util.Date; +import java.util.Locale; @Configuration public class RabbitMQConfig { @@ -35,7 +36,7 @@ public Gson gson() { @Bean Queue userQueue(){ - //String name, boolean durable, boolean exclusive, boolean autoDelete, args + //String name, boolean durable, boolean exclusive, boolean autoDelete return new Queue(this.queueName, true, false, false); }