From dc7426f9283f763b2fa0f568d71c99e844cc0890 Mon Sep 17 00:00:00 2001
From: Arun Pandian <pandiana@google.com>
Date: Thu, 23 Jan 2025 02:15:21 -0800
Subject: [PATCH] [Dataflow Streaming] fix max thread time calculation (#33686)

---
 .../worker/util/BoundedQueueExecutor.java     |  9 +--
 .../worker/util/BoundedQueueExecutorTest.java | 56 ++++++++++++++++++-
 2 files changed, 60 insertions(+), 5 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
index 9286be84ceaa..9905c0ae5b5b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
@@ -30,6 +30,7 @@
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
 public class BoundedQueueExecutor {
+
   private final ThreadPoolExecutor executor;
   private final long maximumBytesOutstanding;
 
@@ -54,17 +55,17 @@ public class BoundedQueueExecutor {
   private long totalTimeMaxActiveThreadsUsed;
 
   public BoundedQueueExecutor(
-      int maximumPoolSize,
+      int initialMaximumPoolSize,
       long keepAliveTime,
       TimeUnit unit,
       int maximumElementsOutstanding,
       long maximumBytesOutstanding,
       ThreadFactory threadFactory) {
-    this.maximumPoolSize = maximumPoolSize;
+    this.maximumPoolSize = initialMaximumPoolSize;
     executor =
         new ThreadPoolExecutor(
-            maximumPoolSize,
-            maximumPoolSize,
+            initialMaximumPoolSize,
+            initialMaximumPoolSize,
             keepAliveTime,
             unit,
             new LinkedBlockingQueue<>(),
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
index df82200986ad..af1519f8a84a 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
@@ -50,6 +50,7 @@
 // released (2.11.0)
 @SuppressWarnings("unused")
 public class BoundedQueueExecutorTest {
+
   private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
   private static final int DEFAULT_MAX_THREADS = 2;
   private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60;
@@ -247,7 +248,8 @@ public void testRecordTotalTimeMaxActiveThreadsUsed() throws Exception {
   }
 
   @Test
-  public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeUpdated() throws Exception {
+  public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeIsIncreased()
+      throws Exception {
     CountDownLatch processStart1 = new CountDownLatch(1);
     CountDownLatch processStart2 = new CountDownLatch(1);
     CountDownLatch processStart3 = new CountDownLatch(1);
@@ -287,6 +289,58 @@ public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeUpdated()
     executor.shutdown();
   }
 
+  @Test
+  public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeIsReduced()
+      throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStop1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStop2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch processStop3 = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, processStop3);
+
+    // Initial state.
+    assertEquals(0, executor.activeCount());
+    assertEquals(2, executor.getMaximumPoolSize());
+
+    // m1 is accepted.
+    executor.execute(m1, 1);
+    processStart1.await();
+    assertEquals(1, executor.activeCount());
+    assertEquals(2, executor.getMaximumPoolSize());
+    assertEquals(0L, executor.allThreadsActiveTime());
+
+    processStop1.countDown();
+    while (executor.activeCount() != 0) {
+      // Waiting for all threads to be ended.
+      Thread.sleep(200);
+    }
+
+    // Reduce max pool size to 1
+    executor.setMaximumPoolSize(1, 105);
+
+    assertEquals(0, executor.activeCount());
+    executor.execute(m2, 1);
+    processStart2.await();
+    Thread.sleep(100);
+    assertEquals(1, executor.activeCount());
+    assertEquals(1, executor.getMaximumPoolSize());
+    processStop2.countDown();
+
+    while (executor.activeCount() != 0) {
+      // Waiting for all threads to be ended.
+      Thread.sleep(200);
+    }
+
+    // allThreadsActiveTime() should be recorded
+    // since when the second task was running it reached the new max pool size.
+    assertThat(executor.allThreadsActiveTime(), greaterThan(0L));
+    executor.shutdown();
+  }
+
   @Test
   public void testRenderSummaryHtml() {
     String expectedSummaryHtml =