diff --git a/changelog.d/23922_websocket_source_reconnect.fix.md b/changelog.d/23922_websocket_source_reconnect.fix.md new file mode 100644 index 0000000000000..a70b859079d02 --- /dev/null +++ b/changelog.d/23922_websocket_source_reconnect.fix.md @@ -0,0 +1,3 @@ +Fixed the `websocket` source entering a "zombie" state when the `connect_timeout_secs` threshold was reached with multiple sources running. The connection timeout is now applied per connect attempt with indefinite retries, rather than as a total timeout limit. + +authors: benjamin-awd diff --git a/src/common/websocket.rs b/src/common/websocket.rs index 7abb8c589cec6..dab85c0a922be 100644 --- a/src/common/websocket.rs +++ b/src/common/websocket.rs @@ -40,6 +40,8 @@ pub enum WebSocketError { DnsError { source: dns::DnsError }, #[snafu(display("No addresses returned."))] NoAddresses, + #[snafu(display("Connection attempt timed out"))] + ConnectionTimedOut, } #[derive(Clone)] @@ -138,6 +140,41 @@ impl WebSocketConnector { } } + /// Connects with exponential backoff, applying a timeout to each individual connection attempt. + /// This will retry forever until a connection is established. + pub(crate) async fn connect_backoff_with_timeout( + &self, + timeout_duration: Duration, + ) -> WebSocketStream> { + let mut backoff = ExponentialBackoff::default(); + + loop { + match time::timeout(timeout_duration, self.connect()).await { + Ok(Ok(ws_stream)) => { + emit!(WebSocketConnectionEstablished {}); + return ws_stream; + } + Ok(Err(error)) => { + emit!(WebSocketConnectionFailedError { + error: Box::new(error) + }); + } + Err(_) => { + emit!(WebSocketConnectionFailedError { + error: Box::new(WebSocketError::ConnectionTimedOut), + }); + } + } + + time::sleep( + backoff + .next() + .expect("backoff iterator always returns some value"), + ) + .await; + } + } + #[cfg(feature = "sinks-websocket")] pub(crate) async fn healthcheck(&self) -> crate::Result<()> { self.connect().await.map(|_| ()).map_err(Into::into) diff --git a/src/internal_events/websocket.rs b/src/internal_events/websocket.rs index f08453f0f3585..f4d1cc1553541 100644 --- a/src/internal_events/websocket.rs +++ b/src/internal_events/websocket.rs @@ -45,6 +45,7 @@ impl InternalEvent for WebSocketConnectionFailedError { ); counter!( "component_errors_total", + "protocol" => PROTOCOL, "error_code" => "websocket_connection_failed", "error_type" => error_type::CONNECTION_FAILED, "stage" => error_stage::SENDING, @@ -232,6 +233,7 @@ impl InternalEvent for WebSocketSendError<'_> { ); counter!( "component_errors_total", + "protocol" => PROTOCOL, "error_code" => "websocket_send_error", "error_type" => error_type::CONNECTION_FAILED, "stage" => error_stage::PROCESSING, diff --git a/src/sources/websocket/source.rs b/src/sources/websocket/source.rs index da515f21f3e37..cf3c75d40f534 100644 --- a/src/sources/websocket/source.rs +++ b/src/sources/websocket/source.rs @@ -21,10 +21,9 @@ use crate::{ common::websocket::{PingInterval, WebSocketConnector, is_closed}, config::SourceContext, internal_events::{ - ConnectionOpen, OpenGauge, PROTOCOL, WebSocketBytesReceived, WebSocketConnectionError, - WebSocketConnectionEstablished, WebSocketConnectionFailedError, - WebSocketConnectionShutdown, WebSocketKind, WebSocketMessageReceived, - WebSocketReceiveError, WebSocketSendError, + ConnectionOpen, OpenGauge, PROTOCOL, WebSocketBytesReceived, + WebSocketConnectionFailedError, WebSocketConnectionShutdown, WebSocketKind, + WebSocketMessageReceived, WebSocketReceiveError, WebSocketSendError, }, sources::websocket::config::WebSocketConfig, vector_lib::codecs::StreamDecodingError, @@ -297,23 +296,12 @@ impl WebSocketSource { async fn try_create_sink_and_stream( &self, ) -> Result<(WebSocketSink, WebSocketStream), WebSocketSourceError> { - let connect_future = self.params.connector.connect_backoff(); - let timeout = self.config.connect_timeout_secs; - - let ws_stream = match time::timeout(timeout, connect_future).await { - Ok(ws) => ws, - Err(_) => { - emit!(WebSocketConnectionError { - error: TungsteniteError::Io(std::io::Error::new( - std::io::ErrorKind::TimedOut, - "Connection attempt timed out", - )) - }); - return Err(WebSocketSourceError::ConnectTimeout); - } - }; + let ws_stream = self + .params + .connector + .connect_backoff_with_timeout(self.config.connect_timeout_secs) + .await; - emit!(WebSocketConnectionEstablished {}); let (sink, stream) = ws_stream.split(); Ok((Box::pin(sink), Box::pin(stream)))