diff --git a/conf/default-config.json b/conf/default-config.json index c5de0d87b..bfa6bb020 100644 --- a/conf/default-config.json +++ b/conf/default-config.json @@ -41,4 +41,4 @@ "operator_type": "public", "enable_remote_config": true, "uid_instance_id_prefix": "local-operator" -} +} \ No newline at end of file diff --git a/src/main/java/com/uid2/operator/Main.java b/src/main/java/com/uid2/operator/Main.java index c4fe54c7e..46d9f6bc4 100644 --- a/src/main/java/com/uid2/operator/Main.java +++ b/src/main/java/com/uid2/operator/Main.java @@ -114,7 +114,9 @@ public Main(Vertx vertx, JsonObject config) throws Exception { this.clientSideTokenGenerate = config.getBoolean(Const.Config.EnableClientSideTokenGenerate, false); this.validateServiceLinks = config.getBoolean(Const.Config.ValidateServiceLinks, false); this.encryptedCloudFilesEnabled = config.getBoolean(Const.Config.EncryptedFiles, false); - this.shutdownHandler = new OperatorShutdownHandler(Duration.ofHours(12), Duration.ofHours(config.getInteger(Const.Config.SaltsExpiredShutdownHours, 12)), Clock.systemUTC(), new ShutdownService()); + this.shutdownHandler = new OperatorShutdownHandler(Duration.ofHours(12), + Duration.ofHours(config.getInteger(Const.Config.SaltsExpiredShutdownHours, 12)), Duration.ofDays(7), + Clock.systemUTC(), new ShutdownService()); this.uidInstanceIdProvider = new UidInstanceIdProvider(config); String coreAttestUrl = this.config.getString(Const.Config.CoreAttestUrlProp); @@ -314,20 +316,19 @@ private ICloudStorage wrapCloudStorageForOptOut(ICloudStorage cloudStorage) { if (config.getBoolean(Const.Config.OptOutS3PathCompatProp)) { LOGGER.warn("Using S3 Path Compatibility Conversion: log -> delta, snapshot -> partition"); return new PathConversionWrapper( - cloudStorage, - in -> { - String out = in.replace("log", "delta") - .replace("snapshot", "partition"); - LOGGER.debug("S3 path forward convert: " + in + " -> " + out); - return out; - }, - in -> { - String out = in.replace("delta", "log") - .replace("partition", "snapshot"); - LOGGER.debug("S3 path backward convert: " + in + " -> " + out); - return out; - } - ); + cloudStorage, + in -> { + String out = in.replace("log", "delta") + .replace("snapshot", "partition"); + LOGGER.debug("S3 path forward convert: " + in + " -> " + out); + return out; + }, + in -> { + String out = in.replace("delta", "log") + .replace("partition", "snapshot"); + LOGGER.debug("S3 path backward convert: " + in + " -> " + out); + return out; + }); } else { return cloudStorage; } @@ -352,8 +353,10 @@ private void run() throws Exception { fs.add(createStoreVerticles()); CompositeFuture.all(fs).onComplete(ar -> { - if (ar.failed()) compositePromise.fail(new Exception(ar.cause())); - else compositePromise.complete(); + if (ar.failed()) + compositePromise.fail(new Exception(ar.cause())); + else + compositePromise.complete(); }); compositePromise.future() @@ -403,20 +406,24 @@ private Future createStoreVerticles() throws Exception { if (clientSideTokenGenerate) { fs.add(createAndDeployRotatingStoreVerticle("site", siteProvider, "site_refresh_ms")); - fs.add(createAndDeployRotatingStoreVerticle("client_side_keypairs", clientSideKeypairProvider, "client_side_keypairs_refresh_ms")); + fs.add(createAndDeployRotatingStoreVerticle("client_side_keypairs", clientSideKeypairProvider, + "client_side_keypairs_refresh_ms")); } if (validateServiceLinks) { fs.add(createAndDeployRotatingStoreVerticle("service", serviceProvider, "service_refresh_ms")); - fs.add(createAndDeployRotatingStoreVerticle("service_link", serviceLinkProvider, "service_link_refresh_ms")); + fs.add(createAndDeployRotatingStoreVerticle("service_link", serviceLinkProvider, + "service_link_refresh_ms")); } if (encryptedCloudFilesEnabled) { - fs.add(createAndDeployRotatingStoreVerticle("cloud_encryption_keys", cloudEncryptionKeyProvider, "cloud_encryption_keys_refresh_ms")); + fs.add(createAndDeployRotatingStoreVerticle("cloud_encryption_keys", cloudEncryptionKeyProvider, + "cloud_encryption_keys_refresh_ms")); } if (useRemoteConfig) { - fs.add(createAndDeployRotatingStoreVerticle("runtime_config", (RuntimeConfigStore) configStore, Const.Config.ConfigScanPeriodMsProp)); + fs.add(createAndDeployRotatingStoreVerticle("runtime_config", (RuntimeConfigStore) configStore, + Const.Config.ConfigScanPeriodMsProp)); } fs.add(createAndDeployRotatingStoreVerticle("auth", clientKeyProvider, "auth_refresh_ms")); fs.add(createAndDeployRotatingStoreVerticle("keyset", keysetProvider, "keyset_refresh_ms")); @@ -424,15 +431,17 @@ private Future createStoreVerticles() throws Exception { fs.add(createAndDeployRotatingStoreVerticle("salt", saltProvider, "salt_refresh_ms")); fs.add(createAndDeployCloudSyncStoreVerticle("optout", fsOptOut, optOutCloudSync)); CompositeFuture.all(fs).onComplete(ar -> { - if (ar.failed()) promise.fail(new Exception(ar.cause())); - else promise.complete(); + if (ar.failed()) + promise.fail(new Exception(ar.cause())); + else + promise.complete(); }); - return promise.future(); } - private Future createAndDeployRotatingStoreVerticle(String name, IMetadataVersionedStore store, String storeRefreshConfigMs) { + private Future createAndDeployRotatingStoreVerticle(String name, IMetadataVersionedStore store, + String storeRefreshConfigMs) { final int intervalMs = config.getInteger(storeRefreshConfigMs, 10000); RotatingStoreVerticle rotatingStoreVerticle = new RotatingStoreVerticle(name, intervalMs, store); @@ -440,14 +449,16 @@ private Future createAndDeployRotatingStoreVerticle(String name, IMetada } private Future createAndDeployCloudSyncStoreVerticle(String name, ICloudStorage fsCloud, - ICloudSync cloudSync) { + ICloudSync cloudSync) { CloudSyncVerticle cloudSyncVerticle = new CloudSyncVerticle(name, fsCloud, fsLocal, cloudSync, config); return vertx.deployVerticle(cloudSyncVerticle) - .onComplete(v -> setupTimerEvent(cloudSyncVerticle.eventRefresh())); + .onComplete(v -> setupTimerEvent(cloudSyncVerticle.eventRefresh())); } private Future createAndDeployStatsCollector() { - StatsCollectorVerticle statsCollectorVerticle = new StatsCollectorVerticle(60000, config.getInteger(Const.Config.MaxInvalidPaths, 50), config.getInteger(Const.Config.MaxVersionBucketsPerSite, 50)); + StatsCollectorVerticle statsCollectorVerticle = new StatsCollectorVerticle(60000, + config.getInteger(Const.Config.MaxInvalidPaths, 50), + config.getInteger(Const.Config.MaxVersionBucketsPerSite, 50)); Future result = vertx.deployVerticle(statsCollectorVerticle); _statsCollectorQueue = statsCollectorVerticle; return result; @@ -466,31 +477,33 @@ private static Vertx createVertx() { ObjectName objectName = new ObjectName("uid2.operator:type=jmx,name=AdminApi"); MBeanServer server = ManagementFactory.getPlatformMBeanServer(); server.registerMBean(AdminApi.instance, objectName); - } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException | MalformedObjectNameException e) { + } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException + | MalformedObjectNameException e) { LOGGER.error("mBean initialisation failed {}", e.getMessage(), e); System.exit(-1); } final int portOffset = Utils.getPortOffset(); VertxPrometheusOptions prometheusOptions = new VertxPrometheusOptions() - .setStartEmbeddedServer(true) - .setEmbeddedServerOptions(new HttpServerOptions().setPort(Const.Port.PrometheusPortForOperator + portOffset)) - .setEnabled(true); + .setStartEmbeddedServer(true) + .setEmbeddedServerOptions( + new HttpServerOptions().setPort(Const.Port.PrometheusPortForOperator + portOffset)) + .setEnabled(true); MicrometerMetricsOptions metricOptions = new MicrometerMetricsOptions() - .setPrometheusOptions(prometheusOptions) - .setLabels(EnumSet.of(Label.HTTP_METHOD, Label.HTTP_CODE, Label.HTTP_PATH)) - .setJvmMetricsEnabled(true) - .setEnabled(true); + .setPrometheusOptions(prometheusOptions) + .setLabels(EnumSet.of(Label.HTTP_METHOD, Label.HTTP_CODE, Label.HTTP_PATH)) + .setJvmMetricsEnabled(true) + .setEnabled(true); setupMetrics(metricOptions); final int threadBlockedCheckInterval = Utils.isProductionEnvironment() - ? 60 * 1000 - : 3600 * 1000; + ? 60 * 1000 + : 3600 * 1000; VertxOptions vertxOptions = new VertxOptions() - .setMetricsOptions(metricOptions) - .setBlockedThreadCheckInterval(threadBlockedCheckInterval); + .setMetricsOptions(metricOptions) + .setBlockedThreadCheckInterval(threadBlockedCheckInterval); return Vertx.vertx(vertxOptions); } @@ -505,32 +518,35 @@ private static void setupMetrics(MicrometerMetricsOptions metricOptions) { // see also https://micrometer.io/docs/registry/prometheus prometheusRegistry.config() - // providing common renaming for prometheus metric, e.g. "hello.world" to "hello_world" - .meterFilter(new PrometheusRenameFilter()) - .meterFilter(MeterFilter.replaceTagValues(Label.HTTP_PATH.toString(), - actualPath -> HTTPPathMetricFilter.filterPath(actualPath, Endpoints.pathSet()))) - // Don't record metrics for 404s. - .meterFilter(MeterFilter.deny(id -> - id.getName().startsWith(MetricsDomain.HTTP_SERVER.getPrefix()) && - Objects.equals(id.getTag(Label.HTTP_CODE.toString()), "404"))) - .meterFilter(new MeterFilter() { - private final String httpServerResponseTime = MetricsDomain.HTTP_SERVER.getPrefix() + MetricsNaming.v4Names().getHttpResponseTime(); - - @Override - public DistributionStatisticConfig configure(Meter.Id id, DistributionStatisticConfig config) { - if (id.getName().equals(httpServerResponseTime)) { - return DistributionStatisticConfig.builder() - .percentiles(0.90, 0.95, 0.99) - .build() - .merge(config); + // providing common renaming for prometheus metric, e.g. "hello.world" to + // "hello_world" + .meterFilter(new PrometheusRenameFilter()) + .meterFilter(MeterFilter.replaceTagValues(Label.HTTP_PATH.toString(), + actualPath -> HTTPPathMetricFilter.filterPath(actualPath, Endpoints.pathSet()))) + // Don't record metrics for 404s. + .meterFilter( + MeterFilter.deny(id -> id.getName().startsWith(MetricsDomain.HTTP_SERVER.getPrefix()) && + Objects.equals(id.getTag(Label.HTTP_CODE.toString()), "404"))) + .meterFilter(new MeterFilter() { + private final String httpServerResponseTime = MetricsDomain.HTTP_SERVER.getPrefix() + + MetricsNaming.v4Names().getHttpResponseTime(); + + @Override + public DistributionStatisticConfig configure(Meter.Id id, DistributionStatisticConfig config) { + if (id.getName().equals(httpServerResponseTime)) { + return DistributionStatisticConfig.builder() + .percentiles(0.90, 0.95, 0.99) + .build() + .merge(config); + } + return config; } - return config; - } - }) - // adding common labels - .commonTags("application", "uid2-operator"); + }) + // adding common labels + .commonTags("application", "uid2-operator"); - // wire my monitoring system to global static state, see also https://micrometer.io/docs/concepts + // wire my monitoring system to global static state, see also + // https://micrometer.io/docs/concepts Metrics.addRegistry(prometheusRegistry); } @@ -555,14 +571,19 @@ private void createVertxEventLoopsMetric() { .register(Metrics.globalRegistry); } - private Map.Entry createUidClients(Vertx vertx, String attestationUrl, String clientApiToken, Handler> responseWatcher) throws Exception { - AttestationResponseHandler attestationResponseHandler = getAttestationTokenRetriever(vertx, attestationUrl, clientApiToken, responseWatcher); - UidCoreClient coreClient = new UidCoreClient(clientApiToken, CloudUtils.defaultProxy, attestationResponseHandler, this.encryptedCloudFilesEnabled, this.uidInstanceIdProvider); - UidOptOutClient optOutClient = new UidOptOutClient(clientApiToken, CloudUtils.defaultProxy, attestationResponseHandler, this.uidInstanceIdProvider); + private Map.Entry createUidClients(Vertx vertx, String attestationUrl, + String clientApiToken, Handler> responseWatcher) throws Exception { + AttestationResponseHandler attestationResponseHandler = getAttestationTokenRetriever(vertx, attestationUrl, + clientApiToken, responseWatcher); + UidCoreClient coreClient = new UidCoreClient(clientApiToken, CloudUtils.defaultProxy, + attestationResponseHandler, this.encryptedCloudFilesEnabled, this.uidInstanceIdProvider); + UidOptOutClient optOutClient = new UidOptOutClient(clientApiToken, CloudUtils.defaultProxy, + attestationResponseHandler, this.uidInstanceIdProvider); return new AbstractMap.SimpleEntry<>(coreClient, optOutClient); } - private AttestationResponseHandler getAttestationTokenRetriever(Vertx vertx, String attestationUrl, String clientApiToken, Handler> responseWatcher) throws Exception { + private AttestationResponseHandler getAttestationTokenRetriever(Vertx vertx, String attestationUrl, + String clientApiToken, Handler> responseWatcher) throws Exception { String enclavePlatform = this.config.getString(Const.Config.EnclavePlatformProp); String operatorType = this.config.getString(Const.Config.OperatorTypeProp, ""); @@ -587,14 +608,17 @@ private AttestationResponseHandler getAttestationTokenRetriever(Vertx vertx, Str break; case "azure-cc": LOGGER.info("creating uid core client with azure cc attestation protocol"); - String maaServerBaseUrl = this.config.getString(Const.Config.MaaServerBaseUrlProp, "https://sharedeus.eus.attest.azure.net"); + String maaServerBaseUrl = this.config.getString(Const.Config.MaaServerBaseUrlProp, + "https://sharedeus.eus.attest.azure.net"); attestationProvider = AttestationFactory.getAzureCCAttestation(maaServerBaseUrl); break; default: - throw new IllegalArgumentException(String.format("enclave_platform is providing the wrong value: %s", enclavePlatform)); + throw new IllegalArgumentException( + String.format("enclave_platform is providing the wrong value: %s", enclavePlatform)); } - return new AttestationResponseHandler(vertx, attestationUrl, clientApiToken, operatorType, this.appVersion, attestationProvider, responseWatcher, CloudUtils.defaultProxy, this.uidInstanceIdProvider); + return new AttestationResponseHandler(vertx, attestationUrl, clientApiToken, operatorType, this.appVersion, + attestationProvider, responseWatcher, CloudUtils.defaultProxy, this.uidInstanceIdProvider); } private IOperatorKeyRetriever createOperatorKeyRetriever() throws Exception { @@ -617,7 +641,8 @@ private IOperatorKeyRetriever createOperatorKeyRetriever() throws Exception { return OperatorKeyRetrieverFactory.getGcpOperatorKeyRetriever(secretVersionName); } default: { - throw new IllegalArgumentException(String.format("enclave_platform is providing the wrong value: %s", enclavePlatform)); + throw new IllegalArgumentException( + String.format("enclave_platform is providing the wrong value: %s", enclavePlatform)); } } } diff --git a/src/main/java/com/uid2/operator/vertx/OperatorShutdownHandler.java b/src/main/java/com/uid2/operator/vertx/OperatorShutdownHandler.java index 84075fb03..1928db48e 100644 --- a/src/main/java/com/uid2/operator/vertx/OperatorShutdownHandler.java +++ b/src/main/java/com/uid2/operator/vertx/OperatorShutdownHandler.java @@ -16,17 +16,28 @@ public class OperatorShutdownHandler { private static final Logger LOGGER = LoggerFactory.getLogger(OperatorShutdownHandler.class); private static final int SALT_FAILURE_LOG_INTERVAL_MINUTES = 10; + private static final int KEYSET_KEY_FAILURE_LOG_INTERVAL_MINUTES = 10; private final Duration attestShutdownWaitTime; private final Duration saltShutdownWaitTime; + private final Duration keysetKeyShutdownWaitTime; private final AtomicReference attestFailureStartTime = new AtomicReference<>(null); private final AtomicReference saltFailureStartTime = new AtomicReference<>(null); + private final AtomicReference keysetKeyFailureStartTime = new AtomicReference<>(null); private final AtomicReference lastSaltFailureLogTime = new AtomicReference<>(null); + private final AtomicReference lastKeysetKeyFailureLogTime = new AtomicReference<>(null); private final Clock clock; private final ShutdownService shutdownService; - public OperatorShutdownHandler(Duration attestShutdownWaitTime, Duration saltShutdownWaitTime, Clock clock, ShutdownService shutdownService) { + public OperatorShutdownHandler(Duration attestShutdownWaitTime, Duration saltShutdownWaitTime, Clock clock, + ShutdownService shutdownService) { + this(attestShutdownWaitTime, saltShutdownWaitTime, Duration.ofDays(7), clock, shutdownService); + } + + public OperatorShutdownHandler(Duration attestShutdownWaitTime, Duration saltShutdownWaitTime, + Duration keysetKeyShutdownWaitTime, Clock clock, ShutdownService shutdownService) { this.attestShutdownWaitTime = attestShutdownWaitTime; this.saltShutdownWaitTime = saltShutdownWaitTime; + this.keysetKeyShutdownWaitTime = keysetKeyShutdownWaitTime; this.clock = clock; this.shutdownService = shutdownService; } @@ -54,6 +65,40 @@ public void logSaltFailureAtInterval() { } } + public void handleKeysetKeyRefreshResponse(Boolean success) { + if (success) { + Instant previousFailureTime = keysetKeyFailureStartTime.getAndSet(null); + if (previousFailureTime != null) { + Duration failureDuration = Duration.between(previousFailureTime, clock.instant()); + // can remove later + LOGGER.info("keyset keys sync recovered after {} ({}d {}h {}m). shutdown timer reset.", + failureDuration, + failureDuration.toDays(), + failureDuration.toHoursPart(), + failureDuration.toMinutesPart()); + } + } else { + logKeysetKeyFailureAtInterval(); + Instant t = keysetKeyFailureStartTime.get(); + if (t == null) { + keysetKeyFailureStartTime.set(clock.instant()); + LOGGER.warn( + "keyset keys sync started failing. shutdown timer started (will shutdown in 7 days if not recovered)"); + } else if (Duration.between(t, clock.instant()).compareTo(this.keysetKeyShutdownWaitTime) > 0) { + LOGGER.error("keyset keys have been failing to sync for too long. shutting down operator"); + this.shutdownService.Shutdown(1); + } + } + } + + public void logKeysetKeyFailureAtInterval() { + Instant t = lastKeysetKeyFailureLogTime.get(); + if (t == null || clock.instant().isAfter(t.plus(KEYSET_KEY_FAILURE_LOG_INTERVAL_MINUTES, ChronoUnit.MINUTES))) { + LOGGER.error("keyset keys sync failing"); + lastKeysetKeyFailureLogTime.set(Instant.now()); + } + } + public void handleAttestResponse(Pair response) { if (response.left() == AttestationResponseCode.AttestationFailure) { LOGGER.error("core attestation failed with AttestationFailure, shutting down operator, core response: {}", response.right()); diff --git a/src/test/java/com/uid2/operator/OperatorShutdownHandlerTest.java b/src/test/java/com/uid2/operator/OperatorShutdownHandlerTest.java index 10a00b813..881da4341 100644 --- a/src/test/java/com/uid2/operator/OperatorShutdownHandlerTest.java +++ b/src/test/java/com/uid2/operator/OperatorShutdownHandlerTest.java @@ -32,18 +32,19 @@ public class OperatorShutdownHandlerTest { private AutoCloseable mocks; - @Mock private Clock clock; - @Mock private ShutdownService shutdownService; + @Mock + private Clock clock; + @Mock + private ShutdownService shutdownService; private OperatorShutdownHandler operatorShutdownHandler; - - @BeforeEach void beforeEach() { mocks = MockitoAnnotations.openMocks(this); when(clock.instant()).thenAnswer(i -> Instant.now()); doThrow(new RuntimeException()).when(shutdownService).Shutdown(1); - this.operatorShutdownHandler = new OperatorShutdownHandler(Duration.ofHours(12), Duration.ofHours(12), clock, shutdownService); + this.operatorShutdownHandler = new OperatorShutdownHandler(Duration.ofHours(12), Duration.ofHours(12), clock, + shutdownService); } @AfterEach @@ -59,11 +60,14 @@ void shutdownOnAttestFailure(VertxTestContext testContext) { // Revoke auth try { - this.operatorShutdownHandler.handleAttestResponse(Pair.of(AttestationResponseCode.AttestationFailure, "Unauthorized")); + this.operatorShutdownHandler + .handleAttestResponse(Pair.of(AttestationResponseCode.AttestationFailure, "Unauthorized")); } catch (RuntimeException e) { verify(shutdownService).Shutdown(1); String message = logWatcher.list.get(0).getFormattedMessage(); - Assertions.assertEquals("core attestation failed with AttestationFailure, shutting down operator, core response: Unauthorized", logWatcher.list.get(0).getFormattedMessage()); + Assertions.assertEquals( + "core attestation failed with AttestationFailure, shutting down operator, core response: Unauthorized", + logWatcher.list.get(0).getFormattedMessage()); testContext.completeNow(); } } @@ -81,7 +85,8 @@ void shutdownOnAttestFailedTooLong(VertxTestContext testContext) { this.operatorShutdownHandler.handleAttestResponse(Pair.of(AttestationResponseCode.RetryableFailure, "")); } catch (RuntimeException e) { verify(shutdownService).Shutdown(1); - Assertions.assertTrue(logWatcher.list.get(0).getFormattedMessage().contains("core attestation has been in failed state for too long. shutting down operator")); + Assertions.assertTrue(logWatcher.list.get(0).getFormattedMessage() + .contains("core attestation has been in failed state for too long. shutting down operator")); testContext.completeNow(); } } @@ -119,8 +124,10 @@ void shutdownOnSaltsExpiredTooLong(VertxTestContext testContext) { }); Assertions.assertAll("Expired Salts Log Messages", () -> verify(shutdownService).Shutdown(1), - () -> Assertions.assertTrue(logWatcher.list.get(1).getFormattedMessage().contains("all salts are expired")), - () -> Assertions.assertTrue(logWatcher.list.get(2).getFormattedMessage().contains("salts have been in expired state for too long. shutting down operator")), + () -> Assertions + .assertTrue(logWatcher.list.get(1).getFormattedMessage().contains("all salts are expired")), + () -> Assertions.assertTrue(logWatcher.list.get(2).getFormattedMessage() + .contains("salts have been in expired state for too long. shutting down operator")), () -> Assertions.assertEquals(3, logWatcher.list.size())); testContext.completeNow(); @@ -166,4 +173,97 @@ void saltsLogErrorAtInterval(VertxTestContext testContext) { testContext.completeNow(); } + + @Test + void shutdownOnKeysetKeyFailedTooLong(VertxTestContext testContext) { + ListAppender logWatcher = new ListAppender<>(); + logWatcher.start(); + ((Logger) LoggerFactory.getLogger(OperatorShutdownHandler.class)).addAppender(logWatcher); + + this.operatorShutdownHandler.handleKeysetKeyRefreshResponse(false); + Assertions.assertTrue(logWatcher.list.get(0).getFormattedMessage().contains("keyset keys sync failing")); + Assertions + .assertTrue(logWatcher.list.get(1).getFormattedMessage().contains("keyset keys sync started failing")); + + when(clock.instant()).thenAnswer(i -> Instant.now().plus(7, ChronoUnit.DAYS).plusSeconds(60)); + + Assertions.assertThrows(RuntimeException.class, () -> { + this.operatorShutdownHandler.handleKeysetKeyRefreshResponse(false); + }); + + Assertions.assertAll("Keyset Key Failure Log Messages", + () -> verify(shutdownService).Shutdown(1), + () -> Assertions + .assertTrue(logWatcher.list.get(2).getFormattedMessage().contains("keyset keys sync failing")), + () -> Assertions.assertTrue(logWatcher.list.get(3).getFormattedMessage() + .contains("keyset keys have been failing to sync for too long. shutting down operator")), + () -> Assertions.assertEquals(4, logWatcher.list.size())); + + testContext.completeNow(); + } + + @Test + void keysetKeyRecoverOnSuccess(VertxTestContext testContext) { + ListAppender logWatcher = new ListAppender<>(); + logWatcher.start(); + ((Logger) LoggerFactory.getLogger(OperatorShutdownHandler.class)).addAppender(logWatcher); + + this.operatorShutdownHandler.handleKeysetKeyRefreshResponse(false); + Assertions.assertTrue(logWatcher.list.get(0).getFormattedMessage().contains("keyset keys sync failing")); + Assertions + .assertTrue(logWatcher.list.get(1).getFormattedMessage().contains("keyset keys sync started failing")); + + when(clock.instant()).thenAnswer(i -> Instant.now().plus(3, ChronoUnit.DAYS)); + this.operatorShutdownHandler.handleKeysetKeyRefreshResponse(true); + Assertions.assertTrue(logWatcher.list.get(2).getFormattedMessage().contains("keyset keys sync recovered")); + Assertions.assertTrue(logWatcher.list.get(2).getFormattedMessage().contains("shutdown timer reset")); + + when(clock.instant()).thenAnswer(i -> Instant.now().plus(8, ChronoUnit.DAYS)); + assertDoesNotThrow(() -> { + this.operatorShutdownHandler.handleKeysetKeyRefreshResponse(false); + }); + + verify(shutdownService, never()).Shutdown(anyInt()); + testContext.completeNow(); + } + + @Test + void keysetKeyLogErrorAtInterval(VertxTestContext testContext) { + ListAppender logWatcher = new ListAppender<>(); + logWatcher.start(); + ((Logger) LoggerFactory.getLogger(OperatorShutdownHandler.class)).addAppender(logWatcher); + + this.operatorShutdownHandler.handleKeysetKeyRefreshResponse(false); + Assertions.assertTrue(logWatcher.list.get(0).getFormattedMessage().contains("keyset keys sync failing")); + Assertions + .assertTrue(logWatcher.list.get(1).getFormattedMessage().contains("keyset keys sync started failing")); + + when(clock.instant()).thenAnswer(i -> Instant.now().plus(9, ChronoUnit.MINUTES)); + this.operatorShutdownHandler.handleKeysetKeyRefreshResponse(false); + Assertions.assertEquals(2, logWatcher.list.size()); // No new logs within 10 min interval + + when(clock.instant()).thenAnswer(i -> Instant.now().plus(11, ChronoUnit.MINUTES)); + this.operatorShutdownHandler.handleKeysetKeyRefreshResponse(false); + Assertions.assertTrue(logWatcher.list.get(2).getFormattedMessage().contains("keyset keys sync failing")); + Assertions.assertEquals(3, logWatcher.list.size()); // One new log after 10 min interval + + testContext.completeNow(); + } + + @Test + void keysetKeyNoShutdownWhenAlwaysSuccessful(VertxTestContext testContext) { + ListAppender logWatcher = new ListAppender<>(); + logWatcher.start(); + ((Logger) LoggerFactory.getLogger(OperatorShutdownHandler.class)).addAppender(logWatcher); + + this.operatorShutdownHandler.handleKeysetKeyRefreshResponse(true); + when(clock.instant()).thenAnswer(i -> Instant.now().plus(1, ChronoUnit.HOURS)); + this.operatorShutdownHandler.handleKeysetKeyRefreshResponse(true); + when(clock.instant()).thenAnswer(i -> Instant.now().plus(3, ChronoUnit.HOURS)); + this.operatorShutdownHandler.handleKeysetKeyRefreshResponse(true); + + Assertions.assertEquals(0, logWatcher.list.size()); + verify(shutdownService, never()).Shutdown(anyInt()); + testContext.completeNow(); + } }