diff --git a/build.gradle b/build.gradle
index 0e5e13d..788b855 100644
--- a/build.gradle
+++ b/build.gradle
@@ -13,11 +13,28 @@ subprojects {
}
dependencies {
+ if (project.name != 'desm-core') {
+ implementation project(':desm-core')
+ }
+
+ // Testing
testImplementation platform('org.junit:junit-bom:5.10.0')
testImplementation 'org.junit.jupiter:junit-jupiter'
testImplementation 'org.mockito:mockito-core:5.17.0'
testImplementation 'org.mockito:mockito-junit-jupiter:5.17.0'
testImplementation 'org.mockito:mockito-inline:5.2.0'
+
+ // Lombok
+ compileOnly 'org.projectlombok:lombok:1.18.38'
+ annotationProcessor 'org.projectlombok:lombok:1.18.38'
+ testCompileOnly 'org.projectlombok:lombok:1.18.38'
+ testAnnotationProcessor 'org.projectlombok:lombok:1.18.38'
+
+ // Gson
+ implementation 'com.google.code.gson:gson:2.13.1'
+
+ // MQTT
+ implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
}
repositories {
diff --git a/desm-client/build.gradle b/desm-client/build.gradle
new file mode 100644
index 0000000..e69de29
diff --git a/desm-common/build.gradle b/desm-common/build.gradle
deleted file mode 100644
index f5c5c46..0000000
--- a/desm-common/build.gradle
+++ /dev/null
@@ -1,3 +0,0 @@
-dependencies {
- implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
-}
diff --git a/desm-common/src/main/java/org/example/Constant.java b/desm-common/src/main/java/org/example/Constant.java
deleted file mode 100644
index 5bf1c23..0000000
--- a/desm-common/src/main/java/org/example/Constant.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.example;
-
-public class Constant {
- public static final String MQTT_BROKER = "tcp://localhost:1883";
-
- // MQTT topics
- public static final String ENERGY_REQ_TOPIC = "energy/request";
-}
diff --git a/desm-common/src/main/java/org/example/dto/PowerRequest.java b/desm-common/src/main/java/org/example/dto/PowerRequest.java
deleted file mode 100644
index b8c9d61..0000000
--- a/desm-common/src/main/java/org/example/dto/PowerRequest.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.example.dto;
-
-public class PowerRequest {
- private final int power;
- private final long timestamp;
-
- public PowerRequest(int power, long timestamp) {
- this.power = power;
- this.timestamp = timestamp;
- }
-
- public int getPower() {
- return power;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-}
diff --git a/desm-common/src/main/java/org/example/handler/MqttExceptionHandler.java b/desm-common/src/main/java/org/example/handler/MqttExceptionHandler.java
deleted file mode 100644
index 82a963e..0000000
--- a/desm-common/src/main/java/org/example/handler/MqttExceptionHandler.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package org.example.handler;
-
-import org.eclipse.paho.client.mqttv3.MqttException;
-
-public class MqttExceptionHandler {
- public static void printErrorInfo(MqttException e) {
- System.out.printf("Reason: %d%n", e.getReasonCode());
- System.out.printf("Message: %s%n", e.getMessage());
- System.out.printf("LocalizedMessage: %s%n", e.getLocalizedMessage());
- System.out.printf("Cause: %s%n", e.getCause());
- }
-}
diff --git a/desm-core/build.gradle b/desm-core/build.gradle
new file mode 100644
index 0000000..e69de29
diff --git a/desm-core/src/main/java/org/example/desm/common/Constant.java b/desm-core/src/main/java/org/example/desm/common/Constant.java
new file mode 100644
index 0000000..a6ee589
--- /dev/null
+++ b/desm-core/src/main/java/org/example/desm/common/Constant.java
@@ -0,0 +1,14 @@
+package org.example.desm.common;
+
+public class Constant {
+ public static final String MQTT_BROKER_ADDRESS = "tcp://localhost:1883";
+
+ // MQTT topics
+ public static final String TOPIC_ENERGY_REQUEST = "energy/request";
+ public static final String TOPIC_CO2_MEASUREMENT = "co2/measurement";
+
+ // API endpoints
+ public static final String ENDPOINT_PLANTS = "/plants";
+ public static final String ENDPOINT_REGISTER_PLANT = "/register";
+ public static final String ENDPOINT_POLLUTION = "/pollution";
+}
diff --git a/desm-core/src/main/java/org/example/desm/common/MyReadWriteLock.java b/desm-core/src/main/java/org/example/desm/common/MyReadWriteLock.java
new file mode 100644
index 0000000..9535479
--- /dev/null
+++ b/desm-core/src/main/java/org/example/desm/common/MyReadWriteLock.java
@@ -0,0 +1,100 @@
+package org.example.desm.common;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * Personal implementation of a Read-Write Lock using a FIFO queue.
+ *
+ * This lock ensures that:
+ *
+ * - Multiple readers can hold the lock simultaneously if no writer is active or waiting before them.
+ * - Writers have exclusive access, blocking both readers and other writers.
+ * - Requests are processed in FIFO order, avoiding readers or writers starvation exploiting fairness.
+ *
+ *
+ * Each thread adds a request to a queue and waits until it reaches the head and the lock is available.
+ */
+public class MyReadWriteLock {
+ /**
+ * A support class that represents a request to acquire a lock, either read or write.
+ */
+ private static final class Request {
+ // Support class
+ }
+
+ private int activeReaders;
+ private boolean isWriting;
+ private final Queue queue;
+
+ public MyReadWriteLock() {
+ this.activeReaders = 0;
+ this.isWriting = false;
+ this.queue = new LinkedList<>();
+ }
+
+ /**
+ * Acquires the read lock. Multiple threads can acquire the read lock as long as:
+ *
+ * - There is no writer currently holding the lock.
+ * - The current read request is at the head of the queue.
+ *
+ *
+ * @throws InterruptedException if the thread is interrupted while waiting.
+ */
+ public synchronized void lockRead() throws InterruptedException {
+ Request request = new Request();
+ queue.add(request);
+
+ while (queue.peek() != request || isWriting) {
+ wait();
+ }
+
+ queue.poll();
+ activeReaders++;
+ notifyAll();
+ }
+
+ /**
+ * Releases the read lock. If no more readers are present, waiting writers
+ * may be notified.
+ */
+ public synchronized void unlockRead() {
+ activeReaders--;
+ if (activeReaders == 0) {
+ notifyAll();
+ }
+ }
+
+ /**
+ * Acquires the write lock. This method blocks until:
+ *
+ * - The current write request is at the head of the queue.
+ * - There are no active readers.
+ * - There is no other writer holding the lock.
+ *
+ *
+ * @throws InterruptedException if the thread is interrupted while waiting.
+ */
+ public synchronized void lockWrite() throws InterruptedException {
+ Request request = new Request();
+ queue.add(request);
+
+ while (queue.peek() != request || isWriting || activeReaders > 0) {
+ wait();
+ }
+
+ queue.poll();
+ isWriting = true;
+ notifyAll();
+ }
+
+ /**
+ * Releases the write lock. Notifies waiting readers or writers so they
+ * can attempt to acquire the lock.
+ */
+ public synchronized void unlockWrite() {
+ isWriting = false;
+ notifyAll();
+ }
+}
diff --git a/desm-core/src/main/java/org/example/desm/common/SlidingWindowBuffer.java b/desm-core/src/main/java/org/example/desm/common/SlidingWindowBuffer.java
new file mode 100644
index 0000000..6260e2d
--- /dev/null
+++ b/desm-core/src/main/java/org/example/desm/common/SlidingWindowBuffer.java
@@ -0,0 +1,48 @@
+package org.example.desm.common;
+
+import org.example.desm.common.simulator.Buffer;
+import org.example.desm.common.simulator.Measurement;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SlidingWindowBuffer implements Buffer {
+ private static final int WINDOW_SIZE = 8;
+ private static final int SLIDE_SIZE = WINDOW_SIZE / 2;
+
+ private final String plantId;
+ private final List buffer;
+ private final List computedAverages;
+
+ public SlidingWindowBuffer(String plantId) {
+ this.plantId = plantId;
+ this.buffer = new ArrayList<>();
+ this.computedAverages = new ArrayList<>();
+ }
+
+ @Override
+ public synchronized void addMeasurement(Measurement m) {
+ buffer.add(m);
+ if (buffer.size() == WINDOW_SIZE) {
+ Measurement computedAverage = computeAverage();
+ computedAverages.add(computedAverage);
+ buffer.subList(0, SLIDE_SIZE).clear();
+ }
+ }
+
+ @Override
+ public synchronized List readAllAndClean() {
+ List result = List.copyOf(computedAverages);
+ computedAverages.clear();
+ return result;
+ }
+
+ private Measurement computeAverage() {
+ double sum = 0;
+ for (Measurement measurement : buffer) {
+ sum += measurement.getValue();
+ }
+ double avg = sum / buffer.size();
+ return new Measurement(plantId, "CO2_AVG", avg, System.currentTimeMillis());
+ }
+}
\ No newline at end of file
diff --git a/desm-core/src/main/java/org/example/desm/common/model/AverageEmission.java b/desm-core/src/main/java/org/example/desm/common/model/AverageEmission.java
new file mode 100644
index 0000000..b0e8733
--- /dev/null
+++ b/desm-core/src/main/java/org/example/desm/common/model/AverageEmission.java
@@ -0,0 +1,14 @@
+package org.example.desm.common.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@NoArgsConstructor
+@AllArgsConstructor
+@Data
+public class AverageEmission {
+ private long startTimestamp;
+ private long endTimestamp;
+ private double average_co2;
+}
diff --git a/desm-core/src/main/java/org/example/desm/common/model/ErrorResponse.java b/desm-core/src/main/java/org/example/desm/common/model/ErrorResponse.java
new file mode 100644
index 0000000..6d9a706
--- /dev/null
+++ b/desm-core/src/main/java/org/example/desm/common/model/ErrorResponse.java
@@ -0,0 +1,14 @@
+package org.example.desm.common.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@NoArgsConstructor
+@AllArgsConstructor
+@Data
+public class ErrorResponse {
+ private int status;
+ private String error;
+ private String message;
+}
diff --git a/desm-core/src/main/java/org/example/desm/common/model/MeasurementsWrapper.java b/desm-core/src/main/java/org/example/desm/common/model/MeasurementsWrapper.java
new file mode 100644
index 0000000..2a45b3a
--- /dev/null
+++ b/desm-core/src/main/java/org/example/desm/common/model/MeasurementsWrapper.java
@@ -0,0 +1,16 @@
+package org.example.desm.common.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import org.example.desm.common.simulator.Measurement;
+
+import java.util.List;
+
+@NoArgsConstructor
+@AllArgsConstructor
+@Data
+public class MeasurementsWrapper {
+ private List measurements;
+}
diff --git a/desm-core/src/main/java/org/example/desm/common/model/PowerPlant.java b/desm-core/src/main/java/org/example/desm/common/model/PowerPlant.java
new file mode 100644
index 0000000..bba5b56
--- /dev/null
+++ b/desm-core/src/main/java/org/example/desm/common/model/PowerPlant.java
@@ -0,0 +1,14 @@
+package org.example.desm.common.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@NoArgsConstructor
+@AllArgsConstructor
+@Data
+public class PowerPlant {
+ private String id;
+ private String address;
+ private int port;
+}
diff --git a/desm-core/src/main/java/org/example/desm/common/model/PowerRequest.java b/desm-core/src/main/java/org/example/desm/common/model/PowerRequest.java
new file mode 100644
index 0000000..da5267e
--- /dev/null
+++ b/desm-core/src/main/java/org/example/desm/common/model/PowerRequest.java
@@ -0,0 +1,13 @@
+package org.example.desm.common.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@NoArgsConstructor
+@AllArgsConstructor
+@Data
+public class PowerRequest {
+ private int power;
+ private long timestamp;
+}
diff --git a/desm-core/src/main/java/org/example/desm/common/simulator/Buffer.java b/desm-core/src/main/java/org/example/desm/common/simulator/Buffer.java
new file mode 100644
index 0000000..e1c3414
--- /dev/null
+++ b/desm-core/src/main/java/org/example/desm/common/simulator/Buffer.java
@@ -0,0 +1,8 @@
+package org.example.desm.common.simulator;
+
+import java.util.List;
+
+public interface Buffer {
+ void addMeasurement(Measurement m);
+ List readAllAndClean();
+}
diff --git a/desm-core/src/main/java/org/example/desm/common/simulator/Measurement.java b/desm-core/src/main/java/org/example/desm/common/simulator/Measurement.java
new file mode 100644
index 0000000..4638e3a
--- /dev/null
+++ b/desm-core/src/main/java/org/example/desm/common/simulator/Measurement.java
@@ -0,0 +1,25 @@
+package org.example.desm.common.simulator;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@AllArgsConstructor
+@Data
+public class Measurement implements Comparable {
+ private String id;
+ private String type;
+ private double value;
+ private long timestamp;
+
+ @Override
+ public int compareTo(Measurement m) {
+ Long thisTimestamp = timestamp;
+ Long otherTimestamp = m.getTimestamp();
+ return thisTimestamp.compareTo(otherTimestamp);
+ }
+
+ @Override
+ public String toString(){
+ return value + " " + timestamp;
+ }
+}
diff --git a/desm-core/src/main/java/org/example/desm/common/simulator/PollutionSensor.java b/desm-core/src/main/java/org/example/desm/common/simulator/PollutionSensor.java
new file mode 100644
index 0000000..a4ac41e
--- /dev/null
+++ b/desm-core/src/main/java/org/example/desm/common/simulator/PollutionSensor.java
@@ -0,0 +1,37 @@
+package org.example.desm.common.simulator;
+
+public class PollutionSensor extends Simulator {
+ private static final int MEAN = 125000;
+ private static final int VARIANCE = 5000;
+
+ private static int ID = 1;
+
+ public PollutionSensor(String id, Buffer buffer) {
+ super(id, "CO2", buffer);
+ }
+
+ // Use this constructor to initialize the Pollution Sensor simulator in your project
+ public PollutionSensor(Buffer buffer) {
+ this("CO2-" + ID++, buffer);
+ }
+
+ @Override
+ public void run() {
+ double i = rnd.nextInt();
+ long waitingTime;
+ while(!stopCondition) {
+ double co2 = getCO2Value(i);
+ addMeasurement(co2);
+
+ waitingTime = 2000;
+ sensorSleep(waitingTime);
+
+ i += 0.2;
+ }
+ }
+
+ private double getCO2Value(double t) {
+ double gaussian = rnd.nextGaussian();
+ return MEAN + Math.sqrt(VARIANCE) * gaussian;
+ }
+}
diff --git a/desm-core/src/main/java/org/example/desm/common/simulator/Simulator.java b/desm-core/src/main/java/org/example/desm/common/simulator/Simulator.java
new file mode 100644
index 0000000..746cdfc
--- /dev/null
+++ b/desm-core/src/main/java/org/example/desm/common/simulator/Simulator.java
@@ -0,0 +1,48 @@
+package org.example.desm.common.simulator;
+
+import lombok.Getter;
+
+import java.util.Random;
+
+public abstract class Simulator extends Thread {
+ protected volatile boolean stopCondition = false;
+ protected Random rnd = new Random();
+ @Getter
+ private final Buffer buffer;
+ private final String id;
+ private final String type;
+
+ public Simulator(String id, String type, Buffer buffer) {
+ this.id = id;
+ this.type = type;
+ this.buffer = buffer;
+ }
+
+ public void stopMeGently() {
+ stopCondition = true;
+ }
+
+ protected void addMeasurement(double measurement) {
+ Measurement m = new Measurement(id, type, measurement, currentTime());
+ buffer.addMeasurement(m);
+ }
+
+ protected void sensorSleep(long milliseconds) {
+ try {
+ Thread.sleep(milliseconds);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public abstract void run();
+
+ private long currentTime() {
+ return System.currentTimeMillis();
+ }
+
+ public String getIdentifier() {
+ return id;
+ }
+}
+
diff --git a/desm-core/src/test/java/org/example/desm/common/MyReadWriteLockTest.java b/desm-core/src/test/java/org/example/desm/common/MyReadWriteLockTest.java
new file mode 100644
index 0000000..835e8fb
--- /dev/null
+++ b/desm-core/src/test/java/org/example/desm/common/MyReadWriteLockTest.java
@@ -0,0 +1,191 @@
+package org.example.desm.common;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+class MyReadWriteLockTest {
+ @Data
+ @AllArgsConstructor
+ private static class Value {
+ private int value;
+ }
+
+ MyReadWriteLock lock;
+ Value value;
+
+ final int initialValue = 0;
+
+ @BeforeEach
+ void setUp() {
+ lock = new MyReadWriteLock();
+ value = new Value(initialValue);
+ }
+
+ @Test
+ void multipleReaderSimultaneously() throws InterruptedException {
+ int readers = 10;
+ CountDownLatch latch = new CountDownLatch(readers);
+ CyclicBarrier barrier = new CyclicBarrier(readers);
+ Long[] readWaitTimes = new Long[readers];
+
+ for (int i = 0; i < readers; i++) {
+ int finalI = i;
+ new Thread(() -> {
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ fail(e);
+ }
+ readOperation(() -> {
+ int read = value.getValue();
+ System.out.printf("%s read: %d%n", Thread.currentThread().getName(), read);
+ // Simulate some work.
+ randomSleep(100);
+ assertEquals(initialValue, read);
+ latch.countDown();
+ }, readWaitTimes, finalI);
+ }).start();
+ }
+
+ assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
+
+ System.out.println();
+ printStats("Read lock", Arrays.asList(readWaitTimes));
+ }
+
+ @Test
+ void multipleWritersSimultaneously() throws InterruptedException {
+ int writers = 10;
+ CountDownLatch latch = new CountDownLatch(writers);
+ CyclicBarrier barrier = new CyclicBarrier(writers);
+ Long[] writeWaitTimes = new Long[writers];
+
+ for (int i = 0; i < writers; i++) {
+ int finalI = i;
+ new Thread(() -> {
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ fail(e);
+ }
+ writeOperation(() -> {
+ int val = value.getValue();
+ int write = val + 1;
+ // Simulate some work.
+ randomSleep(100);
+ value.setValue(write);
+ System.out.printf("%s write: %d%n", Thread.currentThread().getName(), write);
+ latch.countDown();
+ }, writeWaitTimes, finalI);
+ }).start();
+ }
+
+ assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
+ assertEquals(writers, value.getValue());
+
+ System.out.println();
+ printStats("Write lock", Arrays.asList(writeWaitTimes));
+ }
+
+ @Test
+ void multipleReaderAndWriterRandomArrivalTime() throws InterruptedException {
+ int readers = 10;
+ int writers = 10;
+ CountDownLatch readerLatch = new CountDownLatch(readers);
+ CountDownLatch writerLatch = new CountDownLatch(writers);
+ Long[] readWaitTimes = new Long[readers];
+ Long[] writeWaitTimes = new Long[writers];
+
+ for (int i = 0; i < readers; i++) {
+ int finalI = i;
+ new Thread(() -> {
+ // Random delay to simulate different arrival times of the threads.
+ randomSleep(1000);
+ readOperation(() -> {
+ int read = value.getValue();
+ int expected = (int) (writers - writerLatch.getCount());
+ System.out.printf("%s read: %d%n", Thread.currentThread().getName(), read);
+ // Simulate some work.
+ randomSleep(100);
+ assertEquals(expected, read);
+ readerLatch.countDown();
+ }, readWaitTimes, finalI);
+ }).start();
+ }
+
+ for (int i = 0; i < writers; i++) {
+ int finalI = i;
+ new Thread(() -> {
+ // Random delay to simulate different arrival times of the threads.
+ randomSleep(1000);
+ writeOperation(() -> {
+ int val = value.getValue();
+ int write = val + 1;
+ // Simulate some work.
+ randomSleep(100);
+ value.setValue(write);
+ System.out.printf("%s write: %d%n", Thread.currentThread().getName(), write);
+ writerLatch.countDown();
+ }, writeWaitTimes, finalI);
+ }).start();
+ }
+
+ assertTrue(readerLatch.await(10000, TimeUnit.MILLISECONDS));
+ assertTrue(writerLatch.await(10000, TimeUnit.MILLISECONDS));
+ assertEquals(writers, value.getValue());
+
+ System.out.println();
+ printStats("Read lock", Arrays.asList(readWaitTimes));
+ printStats("Write lock", Arrays.asList(writeWaitTimes));
+ }
+
+ private void readOperation(Runnable operation, Long[] readWaitTimes, int threadIndex) {
+ try {
+ long start = System.currentTimeMillis();
+ lock.lockRead();
+ long waited = System.currentTimeMillis() - start;
+ readWaitTimes[threadIndex] = waited;
+ System.out.printf("%s waited to acquire read lock: %dms%n", Thread.currentThread().getName(), waited);
+ operation.run();
+ lock.unlockRead();
+ } catch (InterruptedException ignored) {}
+ }
+
+ private void writeOperation(Runnable operation, Long[] writeWaitTimes, int threadIndex) {
+ try {
+ long start = System.currentTimeMillis();
+ lock.lockWrite();
+ long waited = System.currentTimeMillis() - start;
+ writeWaitTimes[threadIndex] = waited;
+ System.out.printf("%s waited to acquire write lock: %dms%n", Thread.currentThread().getName(), waited);
+ operation.run();
+ lock.unlockWrite();
+ } catch (InterruptedException ignored) {}
+ }
+
+ private void randomSleep(int maxSleepMs) {
+ try {
+ Thread.sleep((int) (Math.random() * maxSleepMs));
+ } catch (InterruptedException ignored) {}
+ }
+
+ private void printStats(String lockName, List waitTimes) {
+ long min = Collections.min(waitTimes);
+ long max = Collections.max(waitTimes);
+ double avg = waitTimes.stream().mapToLong(Long::longValue).average().orElse(0);
+ System.out.printf("%s wait times - min: %dms, max: %dms, avg: %.2fms%n", lockName, min, max, avg);
+ }
+}
\ No newline at end of file
diff --git a/desm-network/build.gradle b/desm-network/build.gradle
new file mode 100644
index 0000000..e69de29
diff --git a/desm-provider/build.gradle b/desm-provider/build.gradle
index d5c2f89..e69de29 100644
--- a/desm-provider/build.gradle
+++ b/desm-provider/build.gradle
@@ -1,5 +0,0 @@
-dependencies {
- implementation project(":desm-common")
- implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
- implementation 'com.google.code.gson:gson:2.13.1'
-}
diff --git a/desm-provider/src/main/java/org/example/RenewableEnergyProvider.java b/desm-provider/src/main/java/org/example/RenewableEnergyProvider.java
deleted file mode 100644
index d5d54d1..0000000
--- a/desm-provider/src/main/java/org/example/RenewableEnergyProvider.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package org.example;
-
-import org.eclipse.paho.client.mqttv3.IMqttClient;
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-import org.example.handler.MqttExceptionHandler;
-
-import java.util.Scanner;
-
-import static org.example.Constant.ENERGY_REQ_TOPIC;
-import static org.example.Constant.MQTT_BROKER;
-
-public class RenewableEnergyProvider {
- private final String clientId;
- private final IMqttClient client;
-
- private static IMqttClient defaultMqttClient() throws MqttException {
- // Use MemoryPersistence to avoid creation of new directory in the root folder by Paho MQTT
- String clientId = MqttClient.generateClientId();
- return new MqttClient(MQTT_BROKER, clientId, new MemoryPersistence());
- }
-
- public RenewableEnergyProvider(IMqttClient client) {
- this.clientId = client.getClientId();
- this.client = client;
- }
-
- public RenewableEnergyProvider() throws MqttException {
- this(defaultMqttClient());
- }
-
- public void start() {
- try {
- // MQTT connection option
- MqttConnectOptions options = new MqttConnectOptions();
- options.setCleanSession(true);
-
- // Connect the client
- System.out.printf("[%s] Connecting to broker %s.%n", clientId, MQTT_BROKER);
- client.connect(options);
- System.out.printf("[%s] Connected!%n", clientId);
-
- // Create a new thread to send power requests at regular intervals
- PeriodicPowerRequestSimulator simulator =
- new PeriodicPowerRequestSimulator(client, ENERGY_REQ_TOPIC);
- simulator.start();
-
- System.out.println("\n**********************************");
- System.out.println("*** Press a random key to exit ***");
- System.out.println("**********************************\n");
- Scanner command = new Scanner(System.in);
- command.nextLine();
- System.out.println("Disconnecting...");
- client.disconnect();
- System.out.println("Disconnected!");
- // Interrupt the simulator thread
- simulator.interrupt();
- } catch (MqttException e) {
- System.out.printf("Error on client '%s'.%n", clientId);
- MqttExceptionHandler.printErrorInfo(e);
- }
- }
-
- public static void main(String[] args) {
- try {
- RenewableEnergyProvider provider = new RenewableEnergyProvider();
- provider.start();
- } catch (MqttException e) {
- System.out.println("Initialization error in RenewableEnergyProvider!");
- MqttExceptionHandler.printErrorInfo(e);
- }
- }
-}
diff --git a/desm-provider/src/main/java/org/example/PeriodicPowerRequestSimulator.java b/desm-provider/src/main/java/org/example/desm/provider/PeriodicPowerRequestSimulator.java
similarity index 55%
rename from desm-provider/src/main/java/org/example/PeriodicPowerRequestSimulator.java
rename to desm-provider/src/main/java/org/example/desm/provider/PeriodicPowerRequestSimulator.java
index db90aee..5a93ff9 100644
--- a/desm-provider/src/main/java/org/example/PeriodicPowerRequestSimulator.java
+++ b/desm-provider/src/main/java/org/example/desm/provider/PeriodicPowerRequestSimulator.java
@@ -1,14 +1,17 @@
-package org.example;
+package org.example.desm.provider;
import com.google.gson.Gson;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.example.dto.PowerRequest;
-import org.example.handler.MqttExceptionHandler;
+import org.example.desm.common.model.PowerRequest;
import java.util.Random;
+/**
+ * This class simulates a periodic power request simulator that sends power requests
+ * to a specified MQTT topic at regular intervals.
+ */
public class PeriodicPowerRequestSimulator extends Thread {
private static final int DEFAULT_INTERVAL_MS = 10000;
private static final int DEFAULT_QOS = 2;
@@ -23,6 +26,14 @@ public class PeriodicPowerRequestSimulator extends Thread {
private final int qos;
private final int intervalMs;
+ /**
+ * Constructs {@link PeriodicPowerRequestSimulator} with custom parameters.
+ *
+ * @param client the MQTT client used to publish messages.
+ * @param topic the topic to publish energy requests to.
+ * @param qos the MQTT Quality of Service level.
+ * @param intervalMs the interval between requests in milliseconds.
+ */
public PeriodicPowerRequestSimulator(
IMqttClient client,
String topic,
@@ -35,10 +46,20 @@ public PeriodicPowerRequestSimulator(
this.intervalMs = intervalMs;
}
+ /**
+ * Constructs {@link PeriodicPowerRequestSimulator} with default parameters (QoS 2, interval 10 seconds).
+ *
+ * @param client the MQTT client used to publish messages.
+ * @param topic the topic to publish energy requests to.
+ */
public PeriodicPowerRequestSimulator(IMqttClient client, String topic) {
this(client, topic, DEFAULT_QOS, DEFAULT_INTERVAL_MS);
}
+ /**
+ * Continuously generates and publishes power requests until the thread is interrupted
+ * or the MQTT client disconnects.
+ */
@Override
public void run() {
while (client.isConnected()) {
@@ -47,10 +68,19 @@ public void run() {
}
}
+ /**
+ * Generates a random power request amount between {@code MIN_POWER} and {@code MAX_POWER}.
+ *
+ * @return a random power request amount.
+ */
private int getPowerRequestAmount() {
return random.nextInt((MAX_POWER - MIN_POWER) + 1) + MIN_POWER;
}
+ /**
+ * Generates a power request with a random power amount and current timestamp,
+ * then sends it to the configured topic.
+ */
private void task() {
int power = getPowerRequestAmount();
long timestamp = System.currentTimeMillis();
@@ -58,14 +88,23 @@ private void task() {
sendPowerRequest(powerRequest);
}
+ /**
+ * Sleeps for the specified interval in milliseconds.
+ */
private void sleeping() {
try {
Thread.sleep(intervalMs);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
System.out.println("Power request simulator interrupted.");
}
}
+ /**
+ * Sends an energy request message as a JSON object to the configured topic.
+ *
+ * @param powerRequest the power request containing the power amount and timestamp.
+ */
private void sendPowerRequest(PowerRequest powerRequest) {
String powerRequestJson = gson.toJson(powerRequest);
MqttMessage message = new MqttMessage(powerRequestJson.getBytes());
@@ -79,15 +118,11 @@ private void sendPowerRequest(PowerRequest powerRequest) {
);
try {
client.publish(topic, message);
- System.out.printf(
- "[%s] Published on topic %s the message: %s%n",
- client.getClientId(),
- topic,
- powerRequestJson
- );
+ System.out.printf("[%s] Published successfully.%n", client.getClientId());
} catch (MqttException e) {
- System.out.printf("Error publishing message: %s%n", e.getMessage());
- MqttExceptionHandler.printErrorInfo(e);
+ System.out.printf("[%s] Error publishing message.", client.getClientId());
+ System.out.printf("Message: %s%n", e.getMessage());
+ System.out.printf("Cause: %s%n", e.getCause());
}
}
}
diff --git a/desm-provider/src/main/java/org/example/desm/provider/RenewableEnergyProvider.java b/desm-provider/src/main/java/org/example/desm/provider/RenewableEnergyProvider.java
new file mode 100644
index 0000000..761e25a
--- /dev/null
+++ b/desm-provider/src/main/java/org/example/desm/provider/RenewableEnergyProvider.java
@@ -0,0 +1,97 @@
+package org.example.desm.provider;
+
+import org.eclipse.paho.client.mqttv3.IMqttClient;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import java.util.Scanner;
+
+import static org.example.desm.common.Constant.TOPIC_ENERGY_REQUEST;
+import static org.example.desm.common.Constant.MQTT_BROKER_ADDRESS;
+
+/**
+ * Renewable energy provider in the DESM system.
+ * Periodically publishes energy requests to an MQTT topic.
+ *
+ *
+ * This class establishes an MQTT connection and starts a simulator thread
+ * to broadcast power requests every 10 seconds.
+ *
+ */
+public class RenewableEnergyProvider {
+ private final IMqttClient client;
+
+ /**
+ * Constructs a {@link RenewableEnergyProvider} with a given MQTT client.
+ *
+ * @param client the MQTT client instance.
+ */
+ public RenewableEnergyProvider(IMqttClient client) {
+ this.client = client;
+ }
+
+ /**
+ * Constructs a {@link RenewableEnergyProvider} using a default MQTT client.
+ *
+ * @throws MqttException if the MQTT client cannot be created.
+ */
+ public RenewableEnergyProvider() throws MqttException {
+ String clientId = MqttClient.generateClientId();
+ this.client = new MqttClient(MQTT_BROKER_ADDRESS, clientId, new MemoryPersistence());
+ }
+
+ /**
+ * Starts the provider: connects to MQTT broker and begins publishing power requests.
+ * Waits for user input to terminate and cleanly shuts down the simulator and connection.
+ */
+ public void start() {
+ try {
+ // MQTT connection option
+ MqttConnectOptions options = new MqttConnectOptions();
+ options.setCleanSession(true);
+ options.setAutomaticReconnect(true);
+
+ // Connect the client
+ System.out.printf("[%s] Connecting to broker %s...%n", client.getClientId(), MQTT_BROKER_ADDRESS);
+ client.connect(options);
+ System.out.printf("[%s] Connected!%n", client.getClientId());
+
+ // Create a new thread to send power requests at regular intervals
+ PeriodicPowerRequestSimulator simulator =
+ new PeriodicPowerRequestSimulator(client, TOPIC_ENERGY_REQUEST);
+ simulator.start();
+
+ System.out.println("\n**********************************");
+ System.out.println("*** Press a random key to exit ***");
+ System.out.println("**********************************\n");
+ Scanner command = new Scanner(System.in);
+ command.nextLine();
+ System.out.println("Disconnecting...");
+ client.disconnect();
+ System.out.println("Disconnected!");
+
+ // Wait termination of the simulator thread
+ simulator.join();
+ } catch (MqttException e) {
+ System.out.printf("[%s] Error on client.%n", client.getClientId());
+ System.out.printf("Message: %s%n", e.getMessage());
+ System.out.printf("Cause: %s%n", e.getCause());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ System.out.println("Power request simulator interrupted.");
+ }
+ }
+
+ public static void main(String[] args) {
+ try {
+ RenewableEnergyProvider provider = new RenewableEnergyProvider();
+ provider.start();
+ } catch (MqttException e) {
+ System.out.println("Initialization error in RenewableEnergyProvider.");
+ System.out.printf("Message: %s%n", e.getMessage());
+ System.out.printf("Cause: %s%n", e.getCause());
+ }
+ }
+}
diff --git a/desm-provider/src/test/java/org/example/PeriodicPowerRequestSimulatorTest.java b/desm-provider/src/test/java/org/example/desm/provider/PeriodicPowerRequestSimulatorTest.java
similarity index 80%
rename from desm-provider/src/test/java/org/example/PeriodicPowerRequestSimulatorTest.java
rename to desm-provider/src/test/java/org/example/desm/provider/PeriodicPowerRequestSimulatorTest.java
index 5889c8d..f8db2fc 100644
--- a/desm-provider/src/test/java/org/example/PeriodicPowerRequestSimulatorTest.java
+++ b/desm-provider/src/test/java/org/example/desm/provider/PeriodicPowerRequestSimulatorTest.java
@@ -1,18 +1,16 @@
-package org.example;
+package org.example.desm.provider;
import com.google.gson.Gson;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.example.dto.PowerRequest;
-import org.example.handler.MqttExceptionHandler;
+import org.example.desm.common.model.PowerRequest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
-import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.junit.jupiter.api.Assertions.*;
@@ -21,7 +19,6 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -43,11 +40,12 @@ void setUp() {
}
@Test
- void testRunPublishesMessage() throws Exception {
+ void testRunPublishesMessage() throws MqttException, InterruptedException {
// Mock client isConnected to iterate while loop only once
when(client.isConnected()).thenReturn(true, false);
- simulator.run();
+ simulator.start();
+ simulator.join();
// Capture the message published to MQTT
verify(client, atLeastOnce()).publish(eq(topic), messageCaptor.capture());
@@ -71,7 +69,7 @@ void testRunPublishesMessage() throws Exception {
}
@Test
- void testRunException() throws MqttException {
+ void testRunException() throws MqttException, InterruptedException {
// Mock client isConnected to iterate while loop only once
when(client.isConnected()).thenReturn(true, false);
@@ -80,12 +78,10 @@ void testRunException() throws MqttException {
.when(client)
.publish(anyString(), any(MqttMessage.class));
+ simulator.start();
+ simulator.join();
- try (MockedStatic mockedHandler = mockStatic(MqttExceptionHandler.class)) {
- simulator.run();
-
- // Check if the exception handler was called
- mockedHandler.verify(() -> MqttExceptionHandler.printErrorInfo(any(MqttException.class)));
- }
+ // Verify that publish was called at least once
+ verify(client, atLeastOnce()).publish(anyString(), any(MqttMessage.class));
}
}
\ No newline at end of file
diff --git a/desm-provider/src/test/java/org/example/RenewableEnergyProviderTest.java b/desm-provider/src/test/java/org/example/desm/provider/RenewableEnergyProviderTest.java
similarity index 61%
rename from desm-provider/src/test/java/org/example/RenewableEnergyProviderTest.java
rename to desm-provider/src/test/java/org/example/desm/provider/RenewableEnergyProviderTest.java
index 0921634..74285e1 100644
--- a/desm-provider/src/test/java/org/example/RenewableEnergyProviderTest.java
+++ b/desm-provider/src/test/java/org/example/desm/provider/RenewableEnergyProviderTest.java
@@ -1,64 +1,66 @@
-package org.example;
+package org.example.desm.provider;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
-import org.example.handler.MqttExceptionHandler;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockedConstruction;
-import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
-import static org.example.Constant.ENERGY_REQ_TOPIC;
+import static org.example.desm.common.Constant.TOPIC_ENERGY_REQUEST;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mockConstruction;
-import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.verify;
@ExtendWith(MockitoExtension.class)
class RenewableEnergyProviderTest {
@Mock
- private IMqttClient client;
+ IMqttClient mqttClient;
- private RenewableEnergyProvider provider;
+ RenewableEnergyProvider provider;
@BeforeEach
void setUp() {
- provider = new RenewableEnergyProvider(client);
+ provider = new RenewableEnergyProvider(mqttClient);
}
@Test
- public void testStartSuccessfulConnection() throws MqttException {
+ public void testStartSuccessfulConnection() throws MqttException, InterruptedException {
+ // Mock the System.in to simulate user input
System.setIn(new java.io.ByteArrayInputStream("\n".getBytes()));
try (MockedConstruction construction =
mockConstruction(PeriodicPowerRequestSimulator.class, (mock, context) -> {
- assertEquals(client, context.arguments().get(0));
- assertEquals(ENERGY_REQ_TOPIC, context.arguments().get(1));
+ assertEquals(mqttClient, context.arguments().get(0));
+ assertEquals(TOPIC_ENERGY_REQUEST, context.arguments().get(1));
})) {
provider.start();
+ PeriodicPowerRequestSimulator simulator = construction.constructed().get(0);
+
// Verify that connect is called
- verify(client).connect(any(MqttConnectOptions.class));
+ verify(mqttClient).connect(any(MqttConnectOptions.class));
// Verify that exactly one simulator was created
assertEquals(1, construction.constructed().size());
- PeriodicPowerRequestSimulator simulator = construction.constructed().get(0);
-
// Verify that the simulator's start method was called
verify(simulator).start();
// Verify that the disconnect method was called on the client
- verify(client).disconnect();
+ verify(mqttClient).disconnect();
- // Verify that the simulator's interrupt method was called
- verify(simulator).interrupt();
+ // Verify that the simulator's join method was called
+ verify(simulator).join();
+
+ // Verify that the simulator is not still alive after the user input
+ assertFalse(simulator.isAlive());
}
}
@@ -68,21 +70,16 @@ public void testStartConnectionFailure() throws MqttException {
mockConstruction(PeriodicPowerRequestSimulator.class)) {
// Configure the mock to throw an exception when connect is called
doThrow(new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION))
- .when(client)
+ .when(mqttClient)
.connect(any(MqttConnectOptions.class));
- try(MockedStatic mockedHandler = mockStatic(MqttExceptionHandler.class)) {
- provider.start();
-
- // Verify that connect was attempted
- verify(client).connect(any(MqttConnectOptions.class));
+ provider.start();
- // Verify that no simulator was created
- assertEquals(0, construction.constructed().size());
+ // Verify that connect was attempted
+ verify(mqttClient).connect(any(MqttConnectOptions.class));
- // Check if the exception handler was called
- mockedHandler.verify(() -> MqttExceptionHandler.printErrorInfo(any(MqttException.class)));
- }
+ // Verify that no simulator was created
+ assertEquals(0, construction.constructed().size());
}
}
}
\ No newline at end of file
diff --git a/desm-server/build.gradle b/desm-server/build.gradle
new file mode 100644
index 0000000..73c9143
--- /dev/null
+++ b/desm-server/build.gradle
@@ -0,0 +1,11 @@
+plugins {
+ id 'org.springframework.boot' version '3.4.5'
+ id 'io.spring.dependency-management' version '1.1.7'
+}
+
+dependencies {
+ // Spring Boot dependencies
+ implementation 'org.springframework.boot:spring-boot-starter-web'
+ testImplementation 'org.springframework.boot:spring-boot-starter-test'
+ testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
+}
diff --git a/desm-server/src/main/java/org/example/desm/server/DesmApplication.java b/desm-server/src/main/java/org/example/desm/server/DesmApplication.java
new file mode 100644
index 0000000..20cb960
--- /dev/null
+++ b/desm-server/src/main/java/org/example/desm/server/DesmApplication.java
@@ -0,0 +1,13 @@
+package org.example.desm.server;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.web.bind.annotation.RestController;
+
+@SpringBootApplication
+@RestController
+public class DesmApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(DesmApplication.class, args);
+ }
+}
diff --git a/desm-server/src/main/java/org/example/desm/server/GlobalControllerAdvice.java b/desm-server/src/main/java/org/example/desm/server/GlobalControllerAdvice.java
new file mode 100644
index 0000000..9ee61e1
--- /dev/null
+++ b/desm-server/src/main/java/org/example/desm/server/GlobalControllerAdvice.java
@@ -0,0 +1,25 @@
+package org.example.desm.server;
+
+import org.example.desm.server.exception.ApiExceptions;
+import org.example.desm.common.model.ErrorResponse;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.ExceptionHandler;
+import org.springframework.web.bind.annotation.RestControllerAdvice;
+
+/**
+ * Global exception handler for the application.
+ * This class handles exceptions thrown by controllers and returns appropriate error responses.
+ */
+@RestControllerAdvice
+public class GlobalControllerAdvice {
+ /**
+ * Handles {@link ApiExceptions} thrown by controllers.
+ *
+ * @param ex the {@link ApiExceptions} instance.
+ * @return a {@link ResponseEntity} containing the error response.
+ */
+ @ExceptionHandler(ApiExceptions.class)
+ public ResponseEntity plantAlreadyExistHandler(ApiExceptions ex) {
+ return ResponseEntity.status(ex.getStatusCode()).body(ex.buildResponse());
+ }
+}
diff --git a/desm-server/src/main/java/org/example/desm/server/component/PollutionSubscriber.java b/desm-server/src/main/java/org/example/desm/server/component/PollutionSubscriber.java
new file mode 100644
index 0000000..25aa5c9
--- /dev/null
+++ b/desm-server/src/main/java/org/example/desm/server/component/PollutionSubscriber.java
@@ -0,0 +1,125 @@
+package org.example.desm.server.component;
+
+import org.example.desm.server.service.PollutionService;
+import com.google.gson.Gson;
+import jakarta.annotation.PostConstruct;
+import org.eclipse.paho.client.mqttv3.IMqttClient;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.example.desm.common.model.MeasurementsWrapper;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import static org.example.desm.common.Constant.TOPIC_CO2_MEASUREMENT;
+import static org.example.desm.common.Constant.MQTT_BROKER_ADDRESS;
+
+/**
+ * Subscriber component that listens for pollution measurements from the MQTT broker.
+ * It processes incoming messages and registers the measurements using the {@link PollutionService}.
+ *
+ * This component implements the {@link MqttCallback} interface to handle MQTT callback listener.
+ */
+@Component
+public class PollutionSubscriber implements MqttCallback {
+ private static final int DEFAULT_QOS = 2;
+
+ private final Gson gson = new Gson();
+
+ private final PollutionService pollutionService;
+ private final IMqttClient client;
+
+ /**
+ * Default constructor for the {@link PollutionSubscriber} component used for testing.
+ *
+ * @param pollutionService the service to register pollution measurements.
+ * @param client the MQTT client to connect to the broker.
+ */
+ public PollutionSubscriber(PollutionService pollutionService, IMqttClient client) {
+ this.pollutionService = pollutionService;
+ this.client = client;
+ }
+
+ /**
+ * {@link Autowired} {@link PollutionSubscriber} constructor for Spring's dependency injection.
+ *
+ * @param pollutionService the service to register pollution measurements.
+ * @throws MqttException if an error occurs while creating the MQTT client.
+ */
+ @Autowired
+ public PollutionSubscriber(PollutionService pollutionService) throws MqttException {
+ this.pollutionService = pollutionService;
+ String clientId = MqttClient.generateClientId();
+ // Use MemoryPersistence to avoid creation of new directory in the root folder by Paho MQTT
+ this.client = new MqttClient(MQTT_BROKER_ADDRESS, clientId, new MemoryPersistence());
+ }
+
+ /**
+ * Initializes the subscriber by connecting to the MQTT broker and subscribing to the Co2 measurement topic.
+ * This method is called after the bean has been created and dependencies have been injected.
+ */
+ @PostConstruct
+ public void init() {
+ try {
+ connect();
+ subscribe();
+ } catch (MqttException e) {
+ System.err.printf("[%s] Error on connecting or subscribing.", client.getClientId());
+ System.out.printf("Message: %s%n", e.getMessage());
+ System.out.printf("Cause: %s%n", e.getCause());
+ }
+ }
+
+ /**
+ * Connects to the MQTT broker with the specified options.
+ *
+ * @throws MqttException if an error occurs while connecting to the broker.
+ */
+ private void connect() throws MqttException {
+ // MQTT connection option
+ MqttConnectOptions options = new MqttConnectOptions();
+ options.setCleanSession(true);
+ options.setAutomaticReconnect(true);
+
+ // Connect the client
+ System.out.printf("[%s] Connecting to broker %s...%n", client.getClientId(), MQTT_BROKER_ADDRESS);
+ // Set the callback for the client
+ client.setCallback(this);
+ client.connect(options);
+ System.out.printf("[%s] Connected!%n", client.getClientId());
+ }
+
+ /**
+ * Subscribes to the Co2 measurement topic.
+ *
+ * @throws MqttException if an error occurs while subscribing to the topic.
+ */
+ private void subscribe() throws MqttException {
+ // Subscribe to the CO2 measurement topic
+ System.out.printf("[%s] Subscribing to topic %s...%n", client.getClientId(), TOPIC_CO2_MEASUREMENT);
+ client.subscribe(TOPIC_CO2_MEASUREMENT, DEFAULT_QOS);
+ System.out.printf("[%s] Subscribed successfully.%n", client.getClientId());
+ }
+
+ @Override
+ public void connectionLost(Throwable cause) {
+ System.err.printf("[%s] Connection lost!%n", client.getClientId());
+ // Automatic reconnect is enabled, so no need to handle reconnection
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage message) {
+ String payload = new String(message.getPayload());
+ MeasurementsWrapper measurements = gson.fromJson(payload, MeasurementsWrapper.class);
+ pollutionService.registerMeasurements(measurements.getMeasurements());
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ // Not used in subscriber
+ }
+}
\ No newline at end of file
diff --git a/desm-server/src/main/java/org/example/desm/server/controller/PollutionController.java b/desm-server/src/main/java/org/example/desm/server/controller/PollutionController.java
new file mode 100644
index 0000000..d61d015
--- /dev/null
+++ b/desm-server/src/main/java/org/example/desm/server/controller/PollutionController.java
@@ -0,0 +1,41 @@
+package org.example.desm.server.controller;
+
+import org.example.desm.server.service.PollutionService;
+import org.example.desm.common.model.AverageEmission;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import static org.example.desm.common.Constant.ENDPOINT_POLLUTION;
+
+/**
+ * Controller for handling pollution related requests.
+ */
+@RestController
+@RequestMapping(ENDPOINT_POLLUTION)
+public class PollutionController {
+ private final PollutionService pollutionService;
+
+ public PollutionController(PollutionService pollutionService) {
+ this.pollutionService = pollutionService;
+ }
+
+ /**
+ * Endpoint to get the average emission level between two timestamps.
+ *
+ * @param t1 Start timestamp
+ * @param t2 End timestamp
+ * @return the average emission data
+ */
+ @GetMapping
+ public ResponseEntity getAvgEmissionLevel(
+ @RequestParam long t1,
+ @RequestParam long t2
+ ) {
+ double average = pollutionService.getAvgEmissionLevel(t1, t2);
+ AverageEmission response = new AverageEmission(t1, t2, average);
+ return ResponseEntity.ok(response);
+ }
+}
diff --git a/desm-server/src/main/java/org/example/desm/server/controller/PowerPlantController.java b/desm-server/src/main/java/org/example/desm/server/controller/PowerPlantController.java
new file mode 100644
index 0000000..fc73b41
--- /dev/null
+++ b/desm-server/src/main/java/org/example/desm/server/controller/PowerPlantController.java
@@ -0,0 +1,55 @@
+package org.example.desm.server.controller;
+
+import org.example.desm.server.service.PowerPlantService;
+import org.example.desm.common.model.PowerPlant;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.List;
+
+import static org.example.desm.common.Constant.ENDPOINT_PLANTS;
+import static org.example.desm.common.Constant.ENDPOINT_REGISTER_PLANT;
+
+/**
+ * Controller for handling power plant related requests.
+ */
+@RestController
+@RequestMapping(ENDPOINT_PLANTS)
+public class PowerPlantController {
+ private final PowerPlantService powerPlantService;
+
+ public PowerPlantController(
+ PowerPlantService powerPlantService
+ ) {
+ this.powerPlantService = powerPlantService;
+ }
+
+ /**
+ * Endpoint to retrieve all registered power plants.
+ *
+ * @return a list of all power plants
+ */
+ @GetMapping
+ public ResponseEntity> getAllPlants() {
+ List plants = powerPlantService.getAllPlants();
+ return ResponseEntity.ok(plants);
+ }
+
+ /**
+ * Endpoint to register a new power plant.
+ *
+ * @param plant the power plant to register
+ * @return the updated list of all power plants
+ */
+ @PostMapping(ENDPOINT_REGISTER_PLANT)
+ public ResponseEntity> registerPlant(
+ @RequestBody PowerPlant plant
+ ) {
+ powerPlantService.registerPlant(plant);
+ return ResponseEntity.ok(powerPlantService.getAllPlants());
+ }
+}
diff --git a/desm-server/src/main/java/org/example/desm/server/exception/ApiExceptions.java b/desm-server/src/main/java/org/example/desm/server/exception/ApiExceptions.java
new file mode 100644
index 0000000..a1cbe2a
--- /dev/null
+++ b/desm-server/src/main/java/org/example/desm/server/exception/ApiExceptions.java
@@ -0,0 +1,43 @@
+package org.example.desm.server.exception;
+
+import org.example.desm.common.model.ErrorResponse;
+import org.springframework.http.HttpStatus;
+
+/**
+ * Base class for all API exceptions.
+ * This class extends {@link RuntimeException} and provides a method to build an {@link ErrorResponse} object.
+ * It also defines abstract methods to get the HTTP status code and error message.
+ */
+public abstract class ApiExceptions extends RuntimeException {
+ public ApiExceptions(String message) {
+ super(message);
+ }
+
+ /**
+ * Builds an {@link ErrorResponse} object using the status code, error message, and
+ * exception message.
+ *
+ * @return an {@link ErrorResponse} object.
+ */
+ public ErrorResponse buildResponse() {
+ return new ErrorResponse(
+ getStatusCode().value(),
+ getError(),
+ getMessage()
+ );
+ }
+
+ /**
+ * Method to get the HTTP status code.
+ *
+ * @return the HTTP status code.
+ */
+ public abstract HttpStatus getStatusCode();
+
+ /**
+ * Method to get the error message.
+ *
+ * @return the error message.
+ */
+ public abstract String getError();
+}
diff --git a/desm-server/src/main/java/org/example/desm/server/exception/PollutionReadOperationException.java b/desm-server/src/main/java/org/example/desm/server/exception/PollutionReadOperationException.java
new file mode 100644
index 0000000..71aecf1
--- /dev/null
+++ b/desm-server/src/main/java/org/example/desm/server/exception/PollutionReadOperationException.java
@@ -0,0 +1,10 @@
+package org.example.desm.server.exception;
+
+/**
+ * Custom {@link ReadOperationException} for handling errors related to pollution read operations.
+ */
+public class PollutionReadOperationException extends ReadOperationException {
+ public PollutionReadOperationException() {
+ super("Unable to retrieve pollution data at this time.");
+ }
+}
diff --git a/desm-server/src/main/java/org/example/desm/server/exception/PollutionWriteOperationException.java b/desm-server/src/main/java/org/example/desm/server/exception/PollutionWriteOperationException.java
new file mode 100644
index 0000000..a29cfbf
--- /dev/null
+++ b/desm-server/src/main/java/org/example/desm/server/exception/PollutionWriteOperationException.java
@@ -0,0 +1,10 @@
+package org.example.desm.server.exception;
+
+/**
+ * Custom {@link WriteOperationException} for handling errors related to pollution write operations.
+ */
+public class PollutionWriteOperationException extends WriteOperationException {
+ public PollutionWriteOperationException() {
+ super("Unable to save the pollution data at this time.");
+ }
+}
diff --git a/desm-server/src/main/java/org/example/desm/server/exception/PowerPlantAlreadyExistsException.java b/desm-server/src/main/java/org/example/desm/server/exception/PowerPlantAlreadyExistsException.java
new file mode 100644
index 0000000..7d96844
--- /dev/null
+++ b/desm-server/src/main/java/org/example/desm/server/exception/PowerPlantAlreadyExistsException.java
@@ -0,0 +1,22 @@
+package org.example.desm.server.exception;
+
+import org.springframework.http.HttpStatus;
+
+/**
+ * Custom {@link ApiExceptions} for handling the case when a power plant already exists.
+ */
+public class PowerPlantAlreadyExistsException extends ApiExceptions {
+ public PowerPlantAlreadyExistsException(String id) {
+ super("Power plant with ID " + id + " already exists.");
+ }
+
+ @Override
+ public HttpStatus getStatusCode() {
+ return HttpStatus.CONFLICT;
+ }
+
+ @Override
+ public String getError() {
+ return "Plant Already Exists";
+ }
+}
\ No newline at end of file
diff --git a/desm-server/src/main/java/org/example/desm/server/exception/PowerPlantReadOperationException.java b/desm-server/src/main/java/org/example/desm/server/exception/PowerPlantReadOperationException.java
new file mode 100644
index 0000000..a0ead9c
--- /dev/null
+++ b/desm-server/src/main/java/org/example/desm/server/exception/PowerPlantReadOperationException.java
@@ -0,0 +1,10 @@
+package org.example.desm.server.exception;
+
+/**
+ * Custom {@link ReadOperationException} for handling errors related to power plant read operations.
+ */
+public class PowerPlantReadOperationException extends ReadOperationException {
+ public PowerPlantReadOperationException() {
+ super("Unable to retrieve power plant data at this time.");
+ }
+}
diff --git a/desm-server/src/main/java/org/example/desm/server/exception/PowerPlantWriteOperationException.java b/desm-server/src/main/java/org/example/desm/server/exception/PowerPlantWriteOperationException.java
new file mode 100644
index 0000000..1e81d6a
--- /dev/null
+++ b/desm-server/src/main/java/org/example/desm/server/exception/PowerPlantWriteOperationException.java
@@ -0,0 +1,10 @@
+package org.example.desm.server.exception;
+
+/**
+ * Custom {@link WriteOperationException} for handling errors related to power plant write operations.
+ */
+public class PowerPlantWriteOperationException extends WriteOperationException {
+ public PowerPlantWriteOperationException() {
+ super("Unable to register the power plant at this time.");
+ }
+}
diff --git a/desm-server/src/main/java/org/example/desm/server/exception/ReadOperationException.java b/desm-server/src/main/java/org/example/desm/server/exception/ReadOperationException.java
new file mode 100644
index 0000000..f9c0c8b
--- /dev/null
+++ b/desm-server/src/main/java/org/example/desm/server/exception/ReadOperationException.java
@@ -0,0 +1,22 @@
+package org.example.desm.server.exception;
+
+import org.springframework.http.HttpStatus;
+
+/**
+ * Abstract custom {@link ApiExceptions} for handling errors related to read operations.
+ */
+public abstract class ReadOperationException extends ApiExceptions {
+ public ReadOperationException(String message) {
+ super(message);
+ }
+
+ @Override
+ public HttpStatus getStatusCode() {
+ return HttpStatus.SERVICE_UNAVAILABLE;
+ }
+
+ @Override
+ public String getError() {
+ return "Read Error";
+ }
+}
diff --git a/desm-server/src/main/java/org/example/desm/server/exception/WriteOperationException.java b/desm-server/src/main/java/org/example/desm/server/exception/WriteOperationException.java
new file mode 100644
index 0000000..5770b4a
--- /dev/null
+++ b/desm-server/src/main/java/org/example/desm/server/exception/WriteOperationException.java
@@ -0,0 +1,22 @@
+package org.example.desm.server.exception;
+
+import org.springframework.http.HttpStatus;
+
+/**
+ * Abstract custom {@link ApiExceptions} for handling errors related to write operations.
+ */
+public abstract class WriteOperationException extends ApiExceptions {
+ public WriteOperationException(String message) {
+ super(message);
+ }
+
+ @Override
+ public HttpStatus getStatusCode() {
+ return HttpStatus.SERVICE_UNAVAILABLE;
+ }
+
+ @Override
+ public String getError() {
+ return "Write Error";
+ }
+}
diff --git a/desm-server/src/main/java/org/example/desm/server/service/PollutionService.java b/desm-server/src/main/java/org/example/desm/server/service/PollutionService.java
new file mode 100644
index 0000000..7af7567
--- /dev/null
+++ b/desm-server/src/main/java/org/example/desm/server/service/PollutionService.java
@@ -0,0 +1,100 @@
+package org.example.desm.server.service;
+
+import org.example.desm.common.MyReadWriteLock;
+import org.example.desm.server.exception.PollutionReadOperationException;
+import org.example.desm.server.exception.PollutionWriteOperationException;
+import org.example.desm.common.simulator.Measurement;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * Service class responsible for managing pollution measurements.
+ *
+ * Internally uses a {@link TreeMap} to store {@link Measurement} instances grouped by timestamp.
+ * To ensure thread safety in a concurrent environment, a custom {@link MyReadWriteLock}
+ * is used instead of the {@code synchronized} statement on class methods.
+ *
+ * The read-write lock improves concurrency by allowing multiple threads to access the read method
+ * simultaneously, while still preventing data races during writes. This is more efficient than
+ * synchronizing all methods, which would force threads to wait even when only reading data.
+ */
+@Service
+public class PollutionService {
+ /**
+ * Uses a {@link TreeMap} to store measurements by their timestamp.
+ * This allows efficient range queries for time-based statistics such as average emission levels.
+ */
+ private final TreeMap> measurements = new TreeMap<>();
+
+ /**
+ * Custom ReadWriteLock used to ensure thread safety and more and more consistency.
+ *
+ * Read operations can happen concurrently, while write operations are exclusive.
+ * This avoids issues like {@link java.util.ConcurrentModificationException}
+ * when reading from the list during concurrent modifications.
+ */
+ private final MyReadWriteLock lock = new MyReadWriteLock();
+
+ /**
+ * Return the average emission level for a given time interval.
+ *
+ * Uses a read lock to ensure that the measurements map is not being modified during read access,
+ * preventing possible {@link java.util.ConcurrentModificationException}.
+ *
+ * @param t1 start of the time interval (inclusive)
+ * @param t2 end of the time interval (inclusive)
+ * @return the average emission level in the specified time interval
+ * @throws PollutionReadOperationException if the reading thread is interrupted.
+ */
+ public double getAvgEmissionLevel(long t1, long t2) {
+ try {
+ lock.lockRead();
+ // Either t1 or t2 is considered inclusive
+ NavigableMap> subMap =
+ measurements.subMap(t1, true, t2, true);
+ double sum = 0;
+ int count = 0;
+ for (List measurementsAtTime : subMap.values()) {
+ for (Measurement m : measurementsAtTime) {
+ sum += m.getValue();
+ count++;
+ }
+ }
+ return count == 0 ? 0 : sum / count;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PollutionReadOperationException();
+ } finally {
+ lock.unlockRead();
+ }
+ }
+
+ /**
+ * Registers a new pollution measurements in the map.
+ *
+ * Uses a write lock to ensure exclusive access while modifying the internal list.
+ *
+ * @param newMeasurements the list of pollution measurements to register.
+ * @throws PollutionWriteOperationException if the writing thread is interrupted.
+ */
+ public void registerMeasurements(List newMeasurements) {
+ try {
+ lock.lockWrite();
+ for (Measurement m : newMeasurements) {
+ // Use computeIfAbsent to ensure that we create a new list only if it doesn't exist
+ measurements
+ .computeIfAbsent(m.getTimestamp(), k -> new ArrayList<>())
+ .add(m);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PollutionWriteOperationException();
+ } finally {
+ lock.unlockWrite();
+ }
+ }
+}
diff --git a/desm-server/src/main/java/org/example/desm/server/service/PowerPlantService.java b/desm-server/src/main/java/org/example/desm/server/service/PowerPlantService.java
new file mode 100644
index 0000000..3ef676a
--- /dev/null
+++ b/desm-server/src/main/java/org/example/desm/server/service/PowerPlantService.java
@@ -0,0 +1,97 @@
+package org.example.desm.server.service;
+
+import org.example.desm.server.exception.PowerPlantAlreadyExistsException;
+import org.example.desm.server.exception.PowerPlantReadOperationException;
+import org.example.desm.server.exception.ReadOperationException;
+import org.example.desm.server.exception.PowerPlantWriteOperationException;
+import org.example.desm.common.model.PowerPlant;
+import org.example.desm.common.MyReadWriteLock;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Service class responsible for managing power plants.
+ *
+ * Internally uses a {@link Map} to store {@link PowerPlant} instances by their unique ID.
+ * To ensure thread safety in a concurrent environment, a custom {@link MyReadWriteLock}
+ * is used instead of the {@code synchronized} statement on class methods.
+ *
+ * The read-write lock improves concurrency by allowing multiple threads to access the read method
+ * simultaneously, while still preventing data races during writes. This is more efficient than
+ * synchronizing all methods, which would force threads to wait even when only reading data.
+ */
+@Service
+public class PowerPlantService {
+ /**
+ * Uses a {@link Map} to store power plants by their unique IDs.
+ * This allows efficient retrieval and management of power plants.
+ */
+ private final Map plants = new HashMap<>();
+
+ /**
+ * Custom ReadWriteLock used to ensure thread safety and more consistency.
+ *
+ * Read operations can happen concurrently, while write operations are exclusive.
+ * This avoids issues like {@link java.util.ConcurrentModificationException}
+ * when reading from the map during concurrent modifications.
+ */
+ private final MyReadWriteLock lock = new MyReadWriteLock();
+
+ /**
+ * Return the list of all registered power plants.
+ *
+ * Uses a read lock to ensure that the map is not being modified during read access,
+ * preventing possible {@link java.util.ConcurrentModificationException}.
+ *
+ * @return an immutable snapshot list of all current power plants.
+ * @throws ReadOperationException if the reading thread is interrupted.
+ */
+ public List getAllPlants() {
+ try {
+ lock.lockRead();
+ return getAllPlantsUnsafe();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PowerPlantReadOperationException();
+ } finally {
+ lock.unlockRead();
+ }
+ }
+
+ /**
+ * Registers a new power plant in the system.
+ *
+ * Uses a write lock to ensure exclusive access while modifying the internal map.
+ *
+ * @param plant the power plant to register.
+ * @throws PowerPlantAlreadyExistsException if a plant with the same ID already exists.
+ * @throws PowerPlantWriteOperationException if the reading thread is interrupted.
+ */
+ public void registerPlant(PowerPlant plant) {
+ try {
+ lock.lockWrite();
+ // Check if the plant already exists
+ if (plants.containsKey(plant.getId())) {
+ throw new PowerPlantAlreadyExistsException(plant.getId());
+ }
+ plants.put(plant.getId(), plant);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PowerPlantWriteOperationException();
+ } finally {
+ lock.unlockWrite();
+ }
+ }
+
+ /**
+ * Returns a list of all registered power plants without acquiring a lock.
+ *
+ * @return an immutable snapshot list of all current power plants.
+ */
+ private List getAllPlantsUnsafe() {
+ return List.copyOf(plants.values());
+ }
+}
diff --git a/desm-server/src/main/resources/application.properties b/desm-server/src/main/resources/application.properties
new file mode 100644
index 0000000..7412338
--- /dev/null
+++ b/desm-server/src/main/resources/application.properties
@@ -0,0 +1,2 @@
+spring.application.name=desm
+server.port=8080
\ No newline at end of file
diff --git a/desm-server/src/test/java/org/example/desm/server/DesmApplicationTest.java b/desm-server/src/test/java/org/example/desm/server/DesmApplicationTest.java
new file mode 100644
index 0000000..d480b9b
--- /dev/null
+++ b/desm-server/src/test/java/org/example/desm/server/DesmApplicationTest.java
@@ -0,0 +1,23 @@
+package org.example.desm.server;
+
+import org.example.desm.server.controller.PollutionController;
+import org.example.desm.server.controller.PowerPlantController;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+@SpringBootTest
+class DesmApplicationTest {
+ @Autowired
+ PowerPlantController powerPlantController;
+ @Autowired
+ PollutionController pollutionController;
+
+ @Test
+ void contextLoads() {
+ assertNotNull(powerPlantController);
+ assertNotNull(pollutionController);
+ }
+}
diff --git a/desm-server/src/test/java/org/example/desm/server/component/PollutionSubscriberTest.java b/desm-server/src/test/java/org/example/desm/server/component/PollutionSubscriberTest.java
new file mode 100644
index 0000000..687ea2e
--- /dev/null
+++ b/desm-server/src/test/java/org/example/desm/server/component/PollutionSubscriberTest.java
@@ -0,0 +1,82 @@
+package org.example.desm.server.component;
+
+import com.google.gson.Gson;
+import org.eclipse.paho.client.mqttv3.IMqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.example.desm.common.model.MeasurementsWrapper;
+import org.example.desm.common.simulator.Measurement;
+import org.example.desm.server.service.PollutionService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.List;
+
+import static org.example.desm.common.Constant.TOPIC_CO2_MEASUREMENT;
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
+
+@ExtendWith(MockitoExtension.class)
+class PollutionSubscriberTest {
+ @Mock
+ PollutionService pollutionService;
+
+ @Mock
+ IMqttClient mqttClient;
+
+ PollutionSubscriber pollutionSubscriber;
+
+ @BeforeEach
+ void setUp() {
+ pollutionSubscriber = new PollutionSubscriber(pollutionService, mqttClient);
+ }
+
+ @Test
+ void testInit() throws Exception {
+ doNothing().when(mqttClient).setCallback(any());
+ doNothing().when(mqttClient).connect(any());
+ doNothing().when(mqttClient).subscribe(anyString(), anyInt());
+
+ pollutionSubscriber.init();
+
+ verify(mqttClient).setCallback(pollutionSubscriber);
+ verify(mqttClient).connect(any(MqttConnectOptions.class));
+ verify(mqttClient).subscribe(eq(TOPIC_CO2_MEASUREMENT), anyInt());
+ }
+
+ @Test
+ void testMessageArrived() {
+ List measurements = List.of(
+ new Measurement("id1", "co2", 400, 1000),
+ new Measurement("id2", "co2", 200, 2000)
+ );
+
+ MeasurementsWrapper wrapper = new MeasurementsWrapper();
+ wrapper.setMeasurements(measurements);
+
+ Gson gson = new Gson();
+ String payload = gson.toJson(wrapper);
+
+ MqttMessage mqttMessage = new MqttMessage(payload.getBytes());
+
+ pollutionSubscriber.messageArrived("topic", mqttMessage);
+
+ @SuppressWarnings("unchecked")
+ ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class);
+ verify(pollutionService).registerMeasurements(captor.capture());
+
+ List captured = captor.getValue();
+
+ assertEquals(measurements.size(), captured.size());
+ assertEquals(measurements, captured);
+ }
+}
\ No newline at end of file
diff --git a/desm-server/src/test/java/org/example/desm/server/controller/PollutionControllerTest.java b/desm-server/src/test/java/org/example/desm/server/controller/PollutionControllerTest.java
new file mode 100644
index 0000000..984cb84
--- /dev/null
+++ b/desm-server/src/test/java/org/example/desm/server/controller/PollutionControllerTest.java
@@ -0,0 +1,47 @@
+package org.example.desm.server.controller;
+
+import org.example.desm.server.service.PollutionService;
+import com.google.gson.Gson;
+import org.example.desm.common.model.AverageEmission;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest;
+import org.springframework.test.context.bean.override.mockito.MockitoBean;
+import org.springframework.test.web.servlet.MockMvc;
+
+import static org.example.desm.common.Constant.ENDPOINT_POLLUTION;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+@WebMvcTest(PollutionController.class)
+class PollutionControllerTest {
+ @Autowired
+ MockMvc mockMvc;
+
+ @MockitoBean
+ PollutionService pollutionService;
+
+ Gson gson = new Gson();
+
+ @Test
+ void testGetAvgEmissionLevel() throws Exception {
+ long t1 = 1000;
+ long t2 = 2000;
+ double avg = 10.0;
+ AverageEmission response = new AverageEmission(t1, t2, avg);
+
+ when(pollutionService.getAvgEmissionLevel(any(Long.class), any(Long.class)))
+ .thenReturn(avg);
+
+ String expectedResponse = gson.toJson(response);
+
+ mockMvc.perform(get(ENDPOINT_POLLUTION)
+ .param("t1", String.valueOf(t1))
+ .param("t2", String.valueOf(t2)))
+ .andExpect(status().isOk())
+ .andExpect(content().json(expectedResponse));
+ }
+}
\ No newline at end of file
diff --git a/desm-server/src/test/java/org/example/desm/server/controller/PowerPlantControllerTest.java b/desm-server/src/test/java/org/example/desm/server/controller/PowerPlantControllerTest.java
new file mode 100644
index 0000000..d2ccd0d
--- /dev/null
+++ b/desm-server/src/test/java/org/example/desm/server/controller/PowerPlantControllerTest.java
@@ -0,0 +1,92 @@
+package org.example.desm.server.controller;
+
+import org.example.desm.server.exception.PowerPlantAlreadyExistsException;
+import org.example.desm.server.service.PowerPlantService;
+import com.google.gson.Gson;
+import org.example.desm.common.model.PowerPlant;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest;
+import org.springframework.http.MediaType;
+import org.springframework.test.context.bean.override.mockito.MockitoBean;
+import org.springframework.test.web.servlet.MockMvc;
+
+import java.util.List;
+
+import static org.example.desm.common.Constant.ENDPOINT_PLANTS;
+import static org.example.desm.common.Constant.ENDPOINT_REGISTER_PLANT;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+@WebMvcTest(PowerPlantController.class)
+class PowerPlantControllerTest {
+ @Autowired
+ MockMvc mockMvc;
+
+ @MockitoBean
+ PowerPlantService powerPlantService;
+
+ Gson gson = new Gson();
+
+ @Test
+ void testGetAllPlants() throws Exception {
+ List plants = List.of(
+ new PowerPlant("plant-1", "localhost", 0),
+ new PowerPlant("plant-2", "localhost", 1)
+ );
+
+ when(powerPlantService.getAllPlants()).thenReturn(plants);
+
+ String expectedResponse = gson.toJson(plants);
+
+ mockMvc.perform(get(ENDPOINT_PLANTS))
+ .andExpect(status().isOk())
+ .andExpect(jsonPath("$.size()").value(plants.size()))
+ .andExpect(content().json(expectedResponse));
+ }
+
+ @Test
+ void testRegisterPlant() throws Exception {
+ PowerPlant plant = new PowerPlant("plant-1", "localhost", 0);
+ PowerPlant newPlant = new PowerPlant("plant-2", "localhost", 1);
+ List updatedList = List.of(plant, newPlant);
+
+ doNothing().when(powerPlantService).registerPlant(any(PowerPlant.class));
+ when(powerPlantService.getAllPlants()).thenReturn(updatedList);
+
+ String requestBody = gson.toJson(newPlant);
+ String expectedResponse = gson.toJson(updatedList);
+
+ mockMvc.perform(
+ post(ENDPOINT_PLANTS + ENDPOINT_REGISTER_PLANT)
+ .contentType(MediaType.APPLICATION_JSON)
+ .content(requestBody))
+ .andExpect(status().isOk())
+ .andExpect(content().json(expectedResponse));
+ }
+
+ @Test
+ void testRegisterPlantAlreadyExistsError() throws Exception {
+ PowerPlant duplicate = new PowerPlant("plant-1", "localhost", 0);
+
+ PowerPlantAlreadyExistsException exception = new PowerPlantAlreadyExistsException(duplicate.getId());
+
+ doThrow(exception).when(powerPlantService).registerPlant(any(PowerPlant.class));
+
+ String requestBody = gson.toJson(duplicate);
+ String expectedResponse = gson.toJson(exception.buildResponse());
+
+ mockMvc.perform(post(ENDPOINT_PLANTS + ENDPOINT_REGISTER_PLANT)
+ .contentType(MediaType.APPLICATION_JSON)
+ .content(requestBody))
+ .andExpect(status().isConflict())
+ .andExpect(content().json(expectedResponse));
+ }
+}
\ No newline at end of file
diff --git a/desm-server/src/test/java/org/example/desm/server/service/PollutionServiceTest.java b/desm-server/src/test/java/org/example/desm/server/service/PollutionServiceTest.java
new file mode 100644
index 0000000..eb1f35d
--- /dev/null
+++ b/desm-server/src/test/java/org/example/desm/server/service/PollutionServiceTest.java
@@ -0,0 +1,121 @@
+package org.example.desm.server.service;
+
+import org.example.desm.common.simulator.Measurement;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class PollutionServiceTest {
+ PollutionService pollutionService;
+
+ @BeforeEach
+ void setUp() {
+ pollutionService = new PollutionService();
+ }
+
+ @Test
+ void testRegisterAndGetAverage() {
+ long now = 100000;
+
+ List measurements = List.of(
+ new Measurement("plant-1", "Co2", 5000.0, now - 2000),
+ new Measurement("plant-1", "Co2", 50.0, now - 1000),
+ new Measurement("plant-2", "Co2", 100.0, now),
+ new Measurement("plant-3", "Co2", 150.0, now + 1000)
+ );
+
+ pollutionService.registerMeasurements(measurements);
+
+ double avg = pollutionService.getAvgEmissionLevel(now - 1000, now + 1000);
+ assertEquals(100.0, avg, 0.001);
+ }
+
+ @Test
+ public void testGetAvgEmissionLevelWhenNoData() {
+ long now = 100000;
+ double avg = pollutionService.getAvgEmissionLevel(now - 1000, now + 1000);
+ assertEquals(0.0, avg);
+ }
+
+ @Test
+ public void testMeasurementOutsideInterval() {
+ long now = 100000;
+
+ List measurements = List.of(
+ new Measurement("plant-2", "Co2", 100.0, now - 5000),
+ new Measurement("plant-2", "Co2", 50.0, now - 10000)
+ );
+
+ pollutionService.registerMeasurements(measurements);
+
+ double avg = pollutionService.getAvgEmissionLevel(now - 1000, now + 1000);
+ assertEquals(0.0, avg);
+ }
+
+ @Test
+ void multipleGetAndRegistrationRandomArrivalTime() throws InterruptedException {
+ int get = 10;
+ int register = 10;
+ CountDownLatch getLatch = new CountDownLatch(get);
+ CountDownLatch registerLatch = new CountDownLatch(register);
+ int registrationCount = 3;
+ double[][] randomEmissions = new double[register][registrationCount];
+
+ for (int i = 0; i < register; i++) {
+ for (int j = 0; j < registrationCount; j++) {
+ randomEmissions[i][j] = Math.random() * 1000;
+ }
+ }
+
+ for (int i = 0; i < get; i++) {
+ new Thread(() -> {
+ // Random delay to simulate different arrival times of the threads.
+ randomSleep();
+ // Check if it does not throw an exception.
+ assertDoesNotThrow(() -> pollutionService.getAvgEmissionLevel(0, Long.MAX_VALUE));
+ getLatch.countDown();
+ }).start();
+ }
+
+ for (int i = 0; i < register; i++) {
+ int finalI = i;
+ new Thread(() -> {
+ // Random delay to simulate different arrival times of the threads.
+ randomSleep();
+ String id = "plant-" + Thread.currentThread().getName();
+ List measurements = new ArrayList<>();
+ for (int j = 0; j < registrationCount; j++) {
+ measurements.add(new Measurement(id, "Co2", randomEmissions[finalI][j], 100));
+ }
+ // Check if it does not throw an exception.
+ assertDoesNotThrow(() -> pollutionService.registerMeasurements(measurements));
+ registerLatch.countDown();
+ }).start();
+ }
+
+ assertTrue(getLatch.await(10000, TimeUnit.MILLISECONDS));
+ assertTrue(registerLatch.await(10000, TimeUnit.MILLISECONDS));
+
+ double sum = 0;
+ for (int i = 0; i < register; i++) {
+ for (int j = 0; j < registrationCount; j++) {
+ sum += randomEmissions[i][j];
+ }
+ }
+ double expectedAvg = sum / (register * registrationCount);
+ double avg = pollutionService.getAvgEmissionLevel(0, Long.MAX_VALUE);
+ assertEquals(expectedAvg, avg, 0.001);
+ }
+
+ private void randomSleep() {
+ try {
+ Thread.sleep((int) (Math.random() * 1000));
+ } catch (InterruptedException ignored) {}
+ }
+}
\ No newline at end of file
diff --git a/desm-server/src/test/java/org/example/desm/server/service/PowerPlantServiceTest.java b/desm-server/src/test/java/org/example/desm/server/service/PowerPlantServiceTest.java
new file mode 100644
index 0000000..d02c7c9
--- /dev/null
+++ b/desm-server/src/test/java/org/example/desm/server/service/PowerPlantServiceTest.java
@@ -0,0 +1,144 @@
+package org.example.desm.server.service;
+
+import org.example.desm.server.exception.PowerPlantAlreadyExistsException;
+import org.example.desm.common.model.PowerPlant;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class PowerPlantServiceTest {
+ PowerPlantService powerPlantService;
+
+ @BeforeEach
+ void setUp() {
+ powerPlantService = new PowerPlantService();
+ }
+
+ @Test
+ void testRegisterPowerPlant() {
+ PowerPlant plant = new PowerPlant("plant-1", "localhost", 0);
+
+ powerPlantService.registerPlant(plant);
+ List result = powerPlantService.getAllPlants();
+
+ assertEquals(1, result.size());
+ assertEquals(plant, result.get(0));
+ }
+
+ @Test
+ void testRegisterPlantAlreadyExistsThrowsException() {
+ PowerPlant plant = new PowerPlant("plant-1", "localhost", 0);
+ PowerPlant duplicate = new PowerPlant("plant-1", "localhost", 1);
+
+ powerPlantService.registerPlant(plant);
+
+ assertThrows(PowerPlantAlreadyExistsException.class, () ->
+ powerPlantService.registerPlant(duplicate));
+ }
+
+ @Test
+ void testRegisterAndGetAllPlants() {
+ List toRegister = List.of(
+ new PowerPlant("plant-1", "localhost", 0),
+ new PowerPlant("plant-2", "localhost", 1),
+ new PowerPlant("plant-3", "localhost", 2)
+ );
+
+ for (PowerPlant plant : toRegister) {
+ powerPlantService.registerPlant(plant);
+ }
+
+ List allPlants = new ArrayList<>(powerPlantService.getAllPlants());
+ allPlants.sort(Comparator.comparing(PowerPlant::getId));
+
+ assertEquals(toRegister.size(), allPlants.size());
+ for (int i = 0; i < allPlants.size(); i++) {
+ assertEquals(toRegister.get(i), allPlants.get(i));
+ }
+ }
+
+ @Test
+ void multipleGetAndRegistrationRandomArrivalTime() throws InterruptedException {
+ int get = 10;
+ int register = 10;
+ CountDownLatch getLatch = new CountDownLatch(get);
+ CountDownLatch registerLatch = new CountDownLatch(register);
+
+ for (int i = 0; i < get; i++) {
+ new Thread(() -> {
+ // Random delay to simulate different arrival times of the threads.
+ randomSleep();
+ // Check if it does not throw an exception like ConcurrentModificationException.
+ assertDoesNotThrow(() -> powerPlantService.getAllPlants());
+ getLatch.countDown();
+ }).start();
+ }
+
+ for (int i = 0; i < register; i++) {
+ int finalI = i;
+ new Thread(() -> {
+ // Random delay to simulate different arrival times of the threads.
+ randomSleep();
+ String id = "plant-" + Thread.currentThread().getName();
+ PowerPlant plant = new PowerPlant(id, "localhost", finalI);
+ // Check if it does not throw an exception like ConcurrentModificationException.
+ assertDoesNotThrow(() -> powerPlantService.registerPlant(plant));
+ registerLatch.countDown();
+ }).start();
+ }
+
+ assertTrue(getLatch.await(10000, TimeUnit.MILLISECONDS));
+ assertTrue(registerLatch.await(10000, TimeUnit.MILLISECONDS));
+
+ List allPlants = powerPlantService.getAllPlants();
+ assertEquals(register, allPlants.size());
+ }
+
+ @Test
+ void multipleRegistrationWithSameId() throws InterruptedException {
+ int register = 10;
+ CountDownLatch latchRegistration = new CountDownLatch(1);
+ CountDownLatch latchExceptions = new CountDownLatch(register - 1);
+ CyclicBarrier barrier = new CyclicBarrier(register);
+
+ for (int i = 0; i < register; i++) {
+ int finalI = i;
+ new Thread(() -> {
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ fail(e);
+ }
+ String id = "plant-1";
+ PowerPlant plant = new PowerPlant(id, "localhost", finalI);
+ try{
+ powerPlantService.registerPlant(plant);
+ latchRegistration.countDown();
+ } catch (PowerPlantAlreadyExistsException e) {
+ latchExceptions.countDown();
+ }
+ }).start();
+ }
+
+ assertTrue(latchRegistration.await(10000, TimeUnit.MILLISECONDS));
+ assertTrue(latchExceptions.await(10000, TimeUnit.MILLISECONDS));
+
+ List allPlants = powerPlantService.getAllPlants();
+ assertEquals(1, allPlants.size());
+ }
+
+ private void randomSleep() {
+ try {
+ Thread.sleep((int) (Math.random() * 1000));
+ } catch (InterruptedException ignored) {}
+ }
+}
+
diff --git a/settings.gradle b/settings.gradle
index 011c6ca..8ae2939 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1,4 +1,7 @@
rootProject.name = 'desm'
-include 'desm-common'
+include 'desm-client'
+include 'desm-core'
+include 'desm-network'
include 'desm-provider'
+include 'desm-server'