Skip to content

Commit

Permalink
fix: respect dial signal and expose protocol negotiation timeouts (#2956
Browse files Browse the repository at this point in the history
)

Only fall back to the configured upgrade timeout if no abort signal
was passed to the dial, otherwise let the user decide when to cancel.

Also expose the inbound/outbound protocol negotiation timeout settings.
  • Loading branch information
achingbrain authored Feb 13, 2025
1 parent 7655833 commit f9345a7
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 16 deletions.
19 changes: 19 additions & 0 deletions packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ export interface ConnectionManagerInit {
* When a new outbound connection is opened, the upgrade process (e.g.
* protect, encrypt, multiplex etc) must complete within this number of ms.
*
* Does not apply if an abort signal is passed to the `.dial` method.
*
* @default 3000
*/
outboundUpgradeTimeout?: number
Expand All @@ -83,9 +85,26 @@ export interface ConnectionManagerInit {
* Protocol negotiation must complete within this number of ms
*
* @default 2000
* @deprecated use outboundStreamProtocolNegotiationTimeout or inboundStreamProtocolNegotiationTimeout instead
*/
protocolNegotiationTimeout?: number

/**
* Outbound protocol negotiation must complete within this number of ms.
*
* Does not apply if an abort signal is passed to the `.dial` method.
*
* @default 2000
*/
outboundStreamProtocolNegotiationTimeout?: number

/**
* Inbound protocol negotiation must complete within this number of ms
*
* @default 2000
*/
inboundStreamProtocolNegotiationTimeout?: number

/**
* Multiaddr resolvers to use when dialling
*/
Expand Down
5 changes: 4 additions & 1 deletion packages/libp2p/src/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export class Libp2p<T extends ServiceMap = ServiceMap> extends TypedEventEmitter
public components: Components & T
private readonly log: Logger

// eslint-disable-next-line complexity
constructor (init: Libp2pInit<T> & { peerId: PeerId }) {
super()

Expand Down Expand Up @@ -116,7 +117,9 @@ export class Libp2p<T extends ServiceMap = ServiceMap> extends TypedEventEmitter
connectionEncrypters: (init.connectionEncrypters ?? []).map((fn, index) => this.configureComponent(`connection-encryption-${index}`, fn(this.components))),
streamMuxers: (init.streamMuxers ?? []).map((fn, index) => this.configureComponent(`stream-muxers-${index}`, fn(this.components))),
inboundUpgradeTimeout: init.connectionManager?.inboundUpgradeTimeout,
outboundUpgradeTimeout: init.connectionManager?.outboundUpgradeTimeout
outboundUpgradeTimeout: init.connectionManager?.outboundUpgradeTimeout,
inboundStreamProtocolNegotiationTimeout: init.connectionManager?.inboundStreamProtocolNegotiationTimeout ?? init.connectionManager?.protocolNegotiationTimeout,
outboundStreamProtocolNegotiationTimeout: init.connectionManager?.outboundStreamProtocolNegotiationTimeout ?? init.connectionManager?.protocolNegotiationTimeout
})

// Setup the transport manager
Expand Down
24 changes: 9 additions & 15 deletions packages/libp2p/src/upgrader.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { InvalidMultiaddrError, TooManyInboundProtocolStreamsError, TooManyOutboundProtocolStreamsError, LimitedConnectionError, setMaxListeners, InvalidPeerIdError } from '@libp2p/interface'
import * as mss from '@libp2p/multistream-select'
import { peerIdFromString } from '@libp2p/peer-id'
import { anySignal } from 'any-signal'
import { CustomProgressEvent } from 'progress-events'
import { createConnection } from './connection/index.js'
import { PROTOCOL_NEGOTIATION_TIMEOUT, UPGRADE_TIMEOUT } from './connection-manager/constants.js'
Expand Down Expand Up @@ -252,10 +251,13 @@ export class DefaultUpgrader implements Upgrader {
let muxerFactory: StreamMuxerFactory | undefined
let cryptoProtocol

const upgradeTimeoutSignal = AbortSignal.timeout(direction === 'inbound' ? this.inboundUpgradeTimeout : this.outboundUpgradeTimeout)
const signal = anySignal([upgradeTimeoutSignal, opts.signal])
setMaxListeners(Infinity, upgradeTimeoutSignal, signal)
opts.signal = signal
if (opts.signal == null) {
maConn.log('no abort signal was passed while trying to upgrade connection, falling back to default timeout')

const upgradeTimeoutSignal = AbortSignal.timeout(direction === 'inbound' ? this.inboundUpgradeTimeout : this.outboundUpgradeTimeout)
setMaxListeners(Infinity, upgradeTimeoutSignal)
opts.signal = upgradeTimeoutSignal
}

this.components.metrics?.trackMultiaddrConnection(maConn)

Expand Down Expand Up @@ -284,14 +286,8 @@ export class DefaultUpgrader implements Upgrader {
remotePeer,
protocol: cryptoProtocol
} = await (direction === 'inbound'
? this._encryptInbound(protectedConn, {
...opts,
signal
})
: this._encryptOutbound(protectedConn, {
...opts,
signal
})
? this._encryptInbound(protectedConn, opts)
: this._encryptOutbound(protectedConn, opts)
))

const maConn: MultiaddrConnection = {
Expand Down Expand Up @@ -343,8 +339,6 @@ export class DefaultUpgrader implements Upgrader {
} catch (err: any) {
maConn.log.error('failed to upgrade inbound connection %s %a - %e', direction === 'inbound' ? 'from' : 'to', maConn.remoteAddr, err)
throw err
} finally {
signal.clear()
}

await this.shouldBlockConnection(direction === 'inbound' ? 'denyInboundUpgradedConnection' : 'denyOutboundUpgradedConnection', remotePeer, maConn)
Expand Down

0 comments on commit f9345a7

Please sign in to comment.