Skip to content

Commit

Permalink
cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
arunpandianp committed Feb 16, 2024
1 parent 0166ff6 commit 991fc86
Show file tree
Hide file tree
Showing 5 changed files with 4 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ protected BatchDataflowWorker(

this.memoryMonitor = MemoryMonitor.fromOptions(options);
this.statusPages =
WorkerStatusPages.create(options, DEFAULT_STATUS_PORT, this.memoryMonitor, () -> true);
WorkerStatusPages.create(DEFAULT_STATUS_PORT, this.memoryMonitor, () -> true);

if (!DataflowRunner.hasExperiment(options, "disable_debug_capture")) {
this.debugCaptureManager =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,7 @@ public class StreamingDataflowWorker {
this.executorSupplier = executorSupplier;
this.windmillServiceEnabled = options.isEnableStreamingEngine();
this.memoryMonitor = MemoryMonitor.fromOptions(options);
this.statusPages =
WorkerStatusPages.create(options, DEFAULT_STATUS_PORT, memoryMonitor, () -> true);
this.statusPages = WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor, () -> true);
if (windmillServiceEnabled) {
this.debugCaptureManager =
new DebugCapture.Manager(options, statusPages.getDebugCapturePages());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.status.DebugCapture.Capturable;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
import org.apache.beam.sdk.util.construction.Environments;
Expand Down Expand Up @@ -58,7 +57,6 @@ public class WorkerStatusPages {

// Install the default servlets (threadz, healthz, heapz, jfrz, statusz)
addServlet(threadzServlet);

addServlet(new HealthzServlet(healthyIndicator));
addServlet(new HeapzServlet(memoryMonitor));
if (Environments.getJavaVersion() != Environments.JavaVersion.java8) {
Expand All @@ -74,10 +72,7 @@ public class WorkerStatusPages {
}

public static WorkerStatusPages create(
DataflowWorkerHarnessOptions options,
int defaultStatusPort,
MemoryMonitor memoryMonitor,
BooleanSupplier healthyIndicator) {
int defaultStatusPort, MemoryMonitor memoryMonitor, BooleanSupplier healthyIndicator) {
int statusPort = defaultStatusPort;
if (System.getProperties().containsKey("status_port")) {
statusPort = Integer.parseInt(System.getProperty("status_port"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() {
: randomlySelectNextStub(windmillServiceStubs));
}

synchronized ImmutableSet<HostAndPort> getDispatcherEndpoints() {
ImmutableSet<HostAndPort> getDispatcherEndpoints() {
return dispatcherStubs.get().dispatcherEndpoints();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import static org.hamcrest.Matchers.containsString;

import java.util.function.BooleanSupplier;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.eclipse.jetty.server.LocalConnector;
import org.eclipse.jetty.server.Server;
import org.junit.After;
Expand All @@ -47,8 +45,6 @@ public class WorkerStatusPagesTest {
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
DataflowWorkerHarnessOptions options =
PipelineOptionsFactory.create().as(DataflowWorkerHarnessOptions.class);
wsp = new WorkerStatusPages(server, mockMemoryMonitor, mockHealthyIndicator);
server.addConnector(connector);
wsp.start();
Expand Down Expand Up @@ -78,8 +74,6 @@ public void testHealthzHealthy() throws Exception {

@Test
public void testHealthzUnhealthy() throws Exception {
DataflowWorkerHarnessOptions options =
PipelineOptionsFactory.create().as(DataflowWorkerHarnessOptions.class);
// set up WorkerStatusPages that respond unhealthy status on "healthz"
wsp.stop();
wsp = new WorkerStatusPages(server, mockMemoryMonitor, () -> false);
Expand Down

0 comments on commit 991fc86

Please sign in to comment.