diff --git a/collector/lib/CollectorConfig.cpp b/collector/lib/CollectorConfig.cpp index 9170483e14..f1f367735b 100644 --- a/collector/lib/CollectorConfig.cpp +++ b/collector/lib/CollectorConfig.cpp @@ -82,6 +82,8 @@ PathEnvVar tls_client_cert_path("ROX_COLLECTOR_TLS_CLIENT_CERT"); PathEnvVar tls_client_key_path("ROX_COLLECTOR_TLS_CLIENT_KEY"); BoolEnvVar disable_process_arguments("ROX_COLLECTOR_NO_PROCESS_ARGUMENTS", false); + +IntEnvVar grpc_wait_time("ROX_COLLECTOR_GRPC_WAIT_TIME", 30); } // namespace constexpr bool CollectorConfig::kTurnOffScrape; @@ -113,6 +115,7 @@ void CollectorConfig::InitCollectorConfig(CollectorArgs* args) { enable_introspection_ = enable_introspection.value(); track_send_recv_ = track_send_recv.value(); disable_process_arguments_ = disable_process_arguments.value(); + grpc_wait_time_ = grpc_wait_time.value(); for (const auto& syscall : kSyscalls) { syscalls_.emplace_back(syscall); diff --git a/collector/lib/CollectorConfig.h b/collector/lib/CollectorConfig.h index 0bb8eaeb04..f9bbccf8b3 100644 --- a/collector/lib/CollectorConfig.h +++ b/collector/lib/CollectorConfig.h @@ -147,6 +147,7 @@ class CollectorConfig { unsigned int GetSinspTotalBufferSize() const { return sinsp_total_buffer_size_; } unsigned int GetSinspThreadCacheSize() const { return sinsp_thread_cache_size_; } bool DisableProcessArguments() const { return disable_process_arguments_; } + int GRPCWaitTime() const { return grpc_wait_time_; } static std::pair CheckConfiguration(const char* config, Json::Value* root); @@ -207,6 +208,7 @@ class CollectorConfig { double connection_stats_error_; unsigned int connection_stats_window_; int64_t per_container_rate_limit_ = 1024; + int grpc_wait_time_; // URL to the GRPC server std::optional grpc_server_; diff --git a/collector/lib/NetworkStatusNotifier.cpp b/collector/lib/NetworkStatusNotifier.cpp index 27efaa6ade..c2fcfcb2d2 100644 --- a/collector/lib/NetworkStatusNotifier.cpp +++ b/collector/lib/NetworkStatusNotifier.cpp @@ -150,7 +150,8 @@ void NetworkStatusNotifier::Stop() { void NetworkStatusNotifier::WaitUntilWriterStarted(IDuplexClientWriter* writer, int wait_time_seconds) { if (!writer->WaitUntilStarted(std::chrono::seconds(wait_time_seconds))) { - CLOG(ERROR) << "Failed to establish network connection info stream."; + CLOG(ERROR) << "Unable to establish network connection info stream."; + CLOG(FATAL) << "Failed to communicate with Sensor."; return; } @@ -218,7 +219,7 @@ bool NetworkStatusNotifier::UpdateAllConnsAndEndpoints() { } void NetworkStatusNotifier::RunSingle(IDuplexClientWriter* writer) { - WaitUntilWriterStarted(writer, 10); + WaitUntilWriterStarted(writer, config_.GRPCWaitTime()); ConnMap old_conn_state; AdvertisedEndpointMap old_cep_state; diff --git a/collector/lib/SignalServiceClient.cpp b/collector/lib/SignalServiceClient.cpp index e43c7e1f95..3475f2bb42 100644 --- a/collector/lib/SignalServiceClient.cpp +++ b/collector/lib/SignalServiceClient.cpp @@ -29,8 +29,8 @@ bool SignalServiceClient::EstablishGRPCStreamSingle() { // stream writer context_ = MakeUnique(); writer_ = DuplexClient::CreateWithReadsIgnored(&SignalService::Stub::AsyncPushSignals, channel_, context_.get()); - if (!writer_->WaitUntilStarted(std::chrono::seconds(30))) { - CLOG(ERROR) << "Signal stream not ready after 30 seconds. Retrying ..."; + if (!writer_->WaitUntilStarted(std::chrono::seconds(config_.GRPCWaitTime()))) { + CLOG(ERROR) << "Signal stream not ready after " << config_.GRPCWaitTime() << " seconds. Retrying ..."; CLOG(ERROR) << "Error message: " << writer_->FinishNow().error_message(); writer_.reset(); return true; diff --git a/collector/lib/SignalServiceClient.h b/collector/lib/SignalServiceClient.h index 668989f609..2dade9829d 100644 --- a/collector/lib/SignalServiceClient.h +++ b/collector/lib/SignalServiceClient.h @@ -13,6 +13,7 @@ #include "api/v1/signal.pb.h" #include "internalapi/sensor/signal_iservice.grpc.pb.h" +#include "CollectorConfig.h" #include "DuplexGRPC.h" #include "SignalHandler.h" #include "StoppableThread.h" @@ -35,8 +36,8 @@ class SignalServiceClient : public ISignalServiceClient { using SignalService = sensor::SignalService; using SignalStreamMessage = sensor::SignalStreamMessage; - explicit SignalServiceClient(std::shared_ptr channel) - : channel_(std::move(channel)), stream_active_(false) {} + explicit SignalServiceClient(std::shared_ptr channel, CollectorConfig& config) + : channel_(std::move(channel)), stream_active_(false), config_(config) {} void Start(); void Stop(); @@ -57,6 +58,8 @@ class SignalServiceClient : public ISignalServiceClient { std::unique_ptr context_; std::unique_ptr> writer_; + CollectorConfig& config_; + bool first_write_; }; diff --git a/collector/lib/system-inspector/Service.cpp b/collector/lib/system-inspector/Service.cpp index f0867df6d6..2de41aa0ea 100644 --- a/collector/lib/system-inspector/Service.cpp +++ b/collector/lib/system-inspector/Service.cpp @@ -52,7 +52,7 @@ void Service::Init(const CollectorConfig& config, std::shared_ptr