Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix!: rename "transient" connections to "limited"
Browse files Browse the repository at this point in the history
To better align with [[email protected]](https://github.com/libp2p/go-libp2p/releases/tag/v0.34.0)
rename "transient" connections to "limited".

BREAKING CHANGE: There are three breaking API changes:
  * Connections have an optional `.limits` property
  * The `runOnTransientConnection` property of `libp2p.handle` and `libp2p.dialProtocol` has been renamed to `runOnLimitedConnection`
  * The `notifyOnTransient` property of `libp2p.register` has been renamed `notifyOnLimitedConnection`

Refs: #2622
achingbrain committed Jul 31, 2024
1 parent 944935f commit f24a06e
Showing 34 changed files with 160 additions and 111 deletions.
30 changes: 15 additions & 15 deletions packages/integration-tests/test/circuit-relay.node.ts
Original file line number Diff line number Diff line change
@@ -90,7 +90,7 @@ const echoService = (components: EchoServiceComponents): unknown => {
stream, stream
)
}, {
runOnTransientConnection: true
runOnLimitedConnection: true
})
},
stop () {}
@@ -560,7 +560,7 @@ describe('circuit-relay', () => {
// open hop stream and try to connect to remote
const stream = await local.dialProtocol(ma, RELAY_V2_HOP_CODEC, {
runOnTransientConnection: true
runOnLimitedConnection: true
})
const hopStream = pbStream(stream).pb(HopMessage)
@@ -671,7 +671,7 @@ describe('circuit-relay', () => {
// connection from local through relay should be marked transient
const connections = remote.getConnections(local.peerId)
expect(connections).to.have.lengthOf(1)
expect(connections).to.have.nested.property('[0].transient', true)
expect(connections).to.have.nested.property('[0].limits').that.is.not.null()
})

it('should not open streams on a transient connection', async () => {
@@ -697,7 +697,7 @@ describe('circuit-relay', () => {
await remote.handle(protocol, ({ stream }) => {
void pipe(stream, stream)
}, {
runOnTransientConnection: false
runOnLimitedConnection: false
})

// discover relay and make reservation
@@ -712,7 +712,7 @@ describe('circuit-relay', () => {
expect(connection).to.have.property('transient', true)

await expect(connection.newStream('/my-protocol/1.0.0', {
runOnTransientConnection: false
runOnLimitedConnection: false
}))
.to.eventually.be.rejected.with.property('code', 'ERR_TRANSIENT_CONNECTION')
})
@@ -724,7 +724,7 @@ describe('circuit-relay', () => {
await remote.handle(protocol, ({ stream }) => {
void pipe(stream, stream)
}, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

// discover relay and make reservation
@@ -739,7 +739,7 @@ describe('circuit-relay', () => {
expect(connection).to.have.property('transient', true)

await expect(connection.newStream('/my-protocol/1.0.0', {
runOnTransientConnection: true
runOnLimitedConnection: true
}))
.to.eventually.be.ok()
})
@@ -912,15 +912,15 @@ describe('circuit-relay', () => {
} catch {}
})
}, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

// dial the remote from the local through the relay
const ma = getRelayAddress(remote)

try {
const stream = await local.dialProtocol(ma, protocol, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

await stream.sink(async function * () {
@@ -1056,7 +1056,7 @@ describe('circuit-relay', () => {
const ma = getRelayAddress(remote)

const stream = await local.dialProtocol(ma, ECHO_PROTOCOL, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

// write more than the default data limit
@@ -1075,7 +1075,7 @@ describe('circuit-relay', () => {
const ma = getRelayAddress(remote)

const stream = await local.dialProtocol(ma, ECHO_PROTOCOL, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

let finished = false
@@ -1107,21 +1107,21 @@ describe('circuit-relay', () => {
expect(finish - start).to.be.greaterThan(defaultDurationLimit)
})

it('should not mark an outgoing connection as transient', async () => {
it('should not mark an outgoing connection as limited', async () => {
const ma = getRelayAddress(remote)

const connection = await local.dial(ma)
expect(connection).to.have.property('transient', false)
expect(connection).to.not.have.property('limits')
})

it('should not mark an incoming connection as transient', async () => {
it('should not mark an incoming connection as limited', async () => {
const ma = getRelayAddress(remote)

await local.dial(ma)

const connections = remote.getConnections(local.peerId)
expect(connections).to.have.lengthOf(1)
expect(connections).to.have.nested.property('[0].transient', false)
expect(connections).to.not.have.nested.property('[0].limits')
})
})
})
10 changes: 5 additions & 5 deletions packages/integration-tests/test/dcutr.node.ts
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ describe('dcutr', () => {
async function waitForOnlyDirectConnections (): Promise<void> {
await pRetry(async () => {
const connections = libp2pA.getConnections(libp2pB.peerId)
const onlyDirect = connections.filter(conn => !conn.transient)
const onlyDirect = connections.filter(conn => conn.limits == null)

if (onlyDirect.length === connections.length) {
// all connections are direct
@@ -109,8 +109,8 @@ describe('dcutr', () => {
const relayedAddress = multiaddr(`/ip4/127.0.0.1/tcp/${RELAY_PORT}/p2p/${relay.peerId}/p2p-circuit/p2p/${libp2pB.peerId}`)
const connection = await libp2pA.dial(relayedAddress)

// connection should be transient
expect(connection).to.have.property('transient', true)
// connection should be limited
expect(connection).to.have.property('limited', true)

// wait for DCUtR unilateral upgrade
await waitForOnlyDirectConnections()
@@ -166,8 +166,8 @@ describe('dcutr', () => {
const relayedAddress = multiaddr(`/ip4/127.0.0.1/tcp/${RELAY_PORT}/p2p/${relay.peerId}/p2p-circuit/p2p/${libp2pB.peerId}`)
const connection = await libp2pA.dial(relayedAddress)

// connection should be transient
expect(connection).to.have.property('transient', true)
// connection should be limited
expect(connection).to.have.property('limited', true)

// wait for DCUtR unilateral upgrade
await waitForOnlyDirectConnections()
2 changes: 1 addition & 1 deletion packages/integration-tests/test/ping.spec.ts
Original file line number Diff line number Diff line change
@@ -76,7 +76,7 @@ describe('ping', () => {
stream
)
}, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

const latency = await nodes[0].services.ping.ping(nodes[1].getMultiaddrs())
5 changes: 2 additions & 3 deletions packages/interface-compliance-tests/src/mocks/connection.ts
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ import { Uint8ArrayList } from 'uint8arraylist'
import { mockMultiaddrConnection } from './multiaddr-connection.js'
import { mockMuxer } from './muxer.js'
import { mockRegistrar } from './registrar.js'
import type { AbortOptions, ComponentLogger, Logger, MultiaddrConnection, Connection, Stream, Direction, ConnectionTimeline, ConnectionStatus, PeerId, StreamMuxer, StreamMuxerFactory, NewStreamOptions } from '@libp2p/interface'
import type { AbortOptions, ComponentLogger, Logger, MultiaddrConnection, Connection, Stream, Direction, ConnectionTimeline, ConnectionStatus, PeerId, StreamMuxer, StreamMuxerFactory, NewStreamOptions, ConnectionLimits } from '@libp2p/interface'
import type { Registrar } from '@libp2p/interface-internal'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Duplex, Source } from 'it-stream-types'
@@ -41,7 +41,7 @@ class MockConnection implements Connection {
public status: ConnectionStatus
public streams: Stream[]
public tags: string[]
public transient: boolean
public limits?: ConnectionLimits
public log: Logger

private readonly muxer: StreamMuxer
@@ -64,7 +64,6 @@ class MockConnection implements Connection {
this.tags = []
this.muxer = muxer
this.maConn = maConn
this.transient = false
this.logger = logger
this.log = logger.forComponent(this.id)
}
6 changes: 4 additions & 2 deletions packages/interface-internal/src/registrar/index.ts
Original file line number Diff line number Diff line change
@@ -30,9 +30,11 @@ export interface StreamHandlerOptions {
/**
* If true, allow this protocol to run on limited connections (e.g.
* connections with data or duration limits such as circuit relay
* connections) (default: false)
* connections)
*
* @default false
*/
runOnTransientConnection?: boolean
runOnLimitedConnection?: boolean
}

export interface StreamHandlerRecord {
24 changes: 16 additions & 8 deletions packages/interface/src/connection/index.ts
Original file line number Diff line number Diff line change
@@ -185,12 +185,16 @@ export interface NewStreamOptions extends AbortOptions {
maxOutboundStreams?: number

/**
* Opt-in to running over a transient connection - one that has time/data limits
* placed on it.
* Opt-in to running over a limited connection - one that has restrictions
* on the amount of data that may be transferred or how long it may be open for.
*
* These limits are typically enforced by a relay server, if the protocol
* will be transferring a lot of data or the stream will be open for a long time
* consider upgrading to a direct connection before opening the stream.
*
* @default false
*/
runOnTransientConnection?: boolean
runOnLimitedConnection?: boolean

/**
* By default when negotiating a protocol the dialer writes then protocol name
@@ -222,6 +226,11 @@ export interface NewStreamOptions extends AbortOptions {

export type ConnectionStatus = 'open' | 'closing' | 'closed'

export interface ConnectionLimits {
bytes?: number
seconds?: number
}

/**
* A Connection is a high-level representation of a connection
* to a remote peer that may have been secured by encryption and
@@ -280,12 +289,11 @@ export interface Connection {
status: ConnectionStatus

/**
* A transient connection is one that is not expected to be open for very long
* or one that cannot transfer very much data, such as one being used as a
* circuit relay connection. Protocols need to explicitly opt-in to being run
* over transient connections.
* If present, this connection has limits applied to it, perhaps by an
* intermediate relay. Once the limits have been reached the connection will
* be closed by the relay.
*/
transient: boolean
limits?: ConnectionLimits

/**
* Create a new stream on this connection and negotiate one of the passed protocols
2 changes: 1 addition & 1 deletion packages/interface/src/index.ts
Original file line number Diff line number Diff line change
@@ -331,7 +331,7 @@ export interface IsDialableOptions extends AbortOptions {
* because that protocol would not be allowed to run over a data/time limited
* connection.
*/
runOnTransientConnection?: boolean
runOnLimitedConnection?: boolean
}

export type TransportManagerDialProgressEvents =
6 changes: 3 additions & 3 deletions packages/interface/src/stream-handler/index.ts
Original file line number Diff line number Diff line change
@@ -21,10 +21,10 @@ export interface StreamHandlerOptions {
maxOutboundStreams?: number

/**
* Opt-in to running over a transient connection - one that has time/data limits
* placed on it.
* Opt-in to running over connections with limits on how much data can be
* transferred or how long it can be open for.
*/
runOnTransientConnection?: boolean
runOnLimitedConnection?: boolean
}

export interface StreamHandlerRecord {
7 changes: 4 additions & 3 deletions packages/interface/src/topology/index.ts
Original file line number Diff line number Diff line change
@@ -27,12 +27,13 @@ export interface Topology {
filter?: TopologyFilter

/**
* If true, invoke `onConnect` for this topology on transient (e.g. short-lived
* and/or data-limited) connections
* If true, invoke `onConnect` for this topology on limited connections, e.g.
* ones with limits on how much data can be transferred or how long they can
* be open for.
*
* @default false
*/
notifyOnTransient?: boolean
notifyOnLimitedConnection?: boolean

/**
* Invoked when a new connection is opened to a peer that supports the
9 changes: 2 additions & 7 deletions packages/interface/src/transport/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Connection, MultiaddrConnection } from '../connection/index.js'
import type { Connection, ConnectionLimits, MultiaddrConnection } from '../connection/index.js'
import type { TypedEventTarget } from '../event-target.js'
import type { AbortOptions } from '../index.js'
import type { StreamMuxerFactory } from '../stream-muxer/index.js'
@@ -104,12 +104,7 @@ export interface UpgraderOptions<ConnectionUpgradeEvents extends ProgressEvent =
skipEncryption?: boolean
skipProtection?: boolean
muxerFactory?: StreamMuxerFactory

/**
* The passed MultiaddrConnection has limits place on duration and/or data
* transfer amounts so is not expected to be open for very long.
*/
transient?: boolean
limits?: ConnectionLimits
}

export type InboundConnectionUpgradeEvents =
2 changes: 1 addition & 1 deletion packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
@@ -471,7 +471,7 @@ export class DialQueue {
try {
const addresses = await this.calculateMultiaddrs(undefined, new Set(multiaddr.map(ma => ma.toString())), options)

if (options.runOnTransientConnection === false) {
if (options.runOnLimitedConnection === false) {
// return true if any resolved multiaddrs are not relay addresses
return addresses.find(addr => {
return !Circuit.matches(addr.multiaddr)
4 changes: 2 additions & 2 deletions packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
@@ -505,10 +505,10 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
if (peerId != null && options.force !== true) {
this.log('dial %p', peerId)
const existingConnection = this.getConnections(peerId)
.find(conn => !conn.transient)
.find(conn => conn.limits == null)

if (existingConnection != null) {
this.log('had an existing non-transient connection to %p', peerId)
this.log('had an existing non-limited connection to %p', peerId)

options.onProgress?.(new CustomProgressEvent('dial-queue:already-connected'))
return existingConnection
10 changes: 5 additions & 5 deletions packages/libp2p/src/connection/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { connectionSymbol, CodeError, setMaxListeners } from '@libp2p/interface'
import type { AbortOptions, Logger, ComponentLogger, Direction, Connection, Stream, ConnectionTimeline, ConnectionStatus, NewStreamOptions, PeerId } from '@libp2p/interface'
import type { AbortOptions, Logger, ComponentLogger, Direction, Connection, Stream, ConnectionTimeline, ConnectionStatus, NewStreamOptions, PeerId, ConnectionLimits } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'

const CLOSE_TIMEOUT = 500
@@ -16,7 +16,7 @@ interface ConnectionInit {
timeline: ConnectionTimeline
multiplexer?: string
encryption?: string
transient?: boolean
limits?: ConnectionLimits
logger: ComponentLogger
}

@@ -45,7 +45,7 @@ export class ConnectionImpl implements Connection {
public multiplexer?: string
public encryption?: string
public status: ConnectionStatus
public transient: boolean
public limits?: ConnectionLimits
public readonly log: Logger

/**
@@ -86,7 +86,7 @@ export class ConnectionImpl implements Connection {
this.timeline = init.timeline
this.multiplexer = init.multiplexer
this.encryption = init.encryption
this.transient = init.transient ?? false
this.limits = init.limits
this.log = init.logger.forComponent(`libp2p:connection:${this.direction}:${this.id}`)

if (this.remoteAddr.getPeerId() == null) {
@@ -127,7 +127,7 @@ export class ConnectionImpl implements Connection {
protocols = [protocols]
}

if (this.transient && options?.runOnTransientConnection !== true) {
if (this.limits != null && options?.runOnLimitedConnection !== true) {
throw new CodeError('Cannot open protocol stream on transient connection', 'ERR_TRANSIENT_CONNECTION')
}

2 changes: 1 addition & 1 deletion packages/libp2p/src/registrar.ts
Original file line number Diff line number Diff line change
@@ -230,7 +230,7 @@ export class DefaultRegistrar implements Registrar {
}

for (const topology of topologies.values()) {
if (connection.transient && topology.notifyOnTransient !== true) {
if (connection.limits != null && topology.notifyOnLimitedConnection !== true) {
continue
}

14 changes: 7 additions & 7 deletions packages/libp2p/src/upgrader.ts
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@ import { createConnection } from './connection/index.js'
import { INBOUND_UPGRADE_TIMEOUT } from './connection-manager/constants.js'
import { codes } from './errors.js'
import { DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_OUTBOUND_STREAMS } from './registrar.js'
import type { Libp2pEvents, AbortOptions, ComponentLogger, MultiaddrConnection, Connection, Stream, ConnectionProtector, NewStreamOptions, ConnectionEncrypter, SecuredConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, PeerStore, StreamMuxer, StreamMuxerFactory, Upgrader, UpgraderOptions } from '@libp2p/interface'
import type { Libp2pEvents, AbortOptions, ComponentLogger, MultiaddrConnection, Connection, Stream, ConnectionProtector, NewStreamOptions, ConnectionEncrypter, SecuredConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, PeerStore, StreamMuxer, StreamMuxerFactory, Upgrader, UpgraderOptions, ConnectionLimits } from '@libp2p/interface'
import type { ConnectionManager, Registrar } from '@libp2p/interface-internal'

const DEFAULT_PROTOCOL_SELECT_TIMEOUT = 30000
@@ -18,7 +18,7 @@ interface CreateConnectionOptions {
upgradedConn: MultiaddrConnection
remotePeer: PeerId
muxerFactory?: StreamMuxerFactory
transient?: boolean
limits?: ConnectionLimits
}

interface OnStreamOptions {
@@ -243,7 +243,7 @@ export class DefaultUpgrader implements Upgrader {
upgradedConn,
muxerFactory,
remotePeer,
transient: opts?.transient
limits: opts?.limits
})
} finally {
signal.removeEventListener('abort', onAbort)
@@ -342,7 +342,7 @@ export class DefaultUpgrader implements Upgrader {
upgradedConn,
muxerFactory,
remotePeer,
transient: opts?.transient
limits: opts?.limits
})
}

@@ -357,7 +357,7 @@ export class DefaultUpgrader implements Upgrader {
upgradedConn,
remotePeer,
muxerFactory,
transient
limits
} = opts

let muxer: StreamMuxer | undefined
@@ -578,7 +578,7 @@ export class DefaultUpgrader implements Upgrader {
timeline: maConn.timeline,
multiplexer: muxer?.protocol,
encryption: cryptoProtocol,
transient,
limits,
logger: this.components.logger,
newStream: newStream ?? errConnectionNotMultiplexed,
getStreams: () => { if (muxer != null) { return muxer.streams } else { return [] } },
@@ -617,7 +617,7 @@ export class DefaultUpgrader implements Upgrader {
const { connection, stream, protocol } = opts
const { handler, options } = this.components.registrar.getHandler(protocol)

if (connection.transient && options.runOnTransientConnection !== true) {
if (connection.limits != null && options.runOnLimitedConnection !== true) {
throw new CodeError('Cannot open protocol stream on transient connection', 'ERR_TRANSIENT_CONNECTION')
}

4 changes: 3 additions & 1 deletion packages/libp2p/test/connection-manager/index.spec.ts
Original file line number Diff line number Diff line change
@@ -527,7 +527,9 @@ describe('Connection Manager', () => {
const addr = multiaddr(`/ip4/123.123.123.123/tcp/123/p2p/${targetPeer}`)

const existingConnection = stubInterface<Connection>({
transient: true
limits: {
bytes: 100
}
})
const newConnection = stubInterface<Connection>()

6 changes: 3 additions & 3 deletions packages/libp2p/test/core/core.spec.ts
Original file line number Diff line number Diff line change
@@ -57,15 +57,15 @@ describe('core', () => {
})

await expect(libp2p.isDialable(multiaddr('/dns4/example.com/tls/ws'), {
runOnTransientConnection: false
runOnLimitedConnection: false
})).to.eventually.be.true()

await expect(libp2p.isDialable(multiaddr('/dns4/example.com/tls/ws/p2p/12D3KooWSExt8hTzoaHEhn435BTK6BPNSY1LpTc1j2o9Gw53tXE1/p2p-circuit/p2p/12D3KooWSExt8hTzoaHEhn435BTK6BPNSY1LpTc1j2o9Gw53tXE2'), {
runOnTransientConnection: true
runOnLimitedConnection: true
})).to.eventually.be.true()

await expect(libp2p.isDialable(multiaddr('/dns4/example.com/tls/ws/p2p/12D3KooWSExt8hTzoaHEhn435BTK6BPNSY1LpTc1j2o9Gw53tXE1/p2p-circuit/p2p/12D3KooWSExt8hTzoaHEhn435BTK6BPNSY1LpTc1j2o9Gw53tXE2'), {
runOnTransientConnection: false
runOnLimitedConnection: false
})).to.eventually.be.false()
})
})
22 changes: 15 additions & 7 deletions packages/libp2p/test/registrar/registrar.spec.ts
Original file line number Diff line number Diff line change
@@ -221,7 +221,9 @@ describe('registrar topologies', () => {
const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId))

// connection is transient
conn.transient = true
conn.limits = {
bytes: 100
}

// return connection from connection manager
connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([conn])
@@ -265,14 +267,16 @@ describe('registrar topologies', () => {
const remotePeerId = await createEd25519PeerId()
const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId))

// connection is transient
conn.transient = true
// connection is limited
conn.limits = {
bytes: 100
}

// return connection from connection manager
connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([conn])

const topology: Topology = {
notifyOnTransient: true,
notifyOnLimitedConnection: true,
onConnect: () => {
onConnectDefer.resolve()
}
@@ -298,7 +302,7 @@ describe('registrar topologies', () => {
let callCount = 0

const topology: Topology = {
notifyOnTransient: true,
notifyOnLimitedConnection: true,
onConnect: () => {
callCount++

@@ -314,10 +318,14 @@ describe('registrar topologies', () => {
// setup connections before registrar
const remotePeerId = await createEd25519PeerId()
const transientConnection = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId))
transientConnection.transient = true
transientConnection.limits = {
bytes: 100
}

const nonTransientConnection = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId))
nonTransientConnection.transient = false
nonTransientConnection.limits = {
bytes: 100
}

// return connection from connection manager
connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([
2 changes: 1 addition & 1 deletion packages/protocol-dcutr/README.md
Original file line number Diff line number Diff line change
@@ -63,7 +63,7 @@ await node.dial(ma)
while (true) {
const connections = node.getConnections()

if (connections.find(conn => conn.transient === false)) {
if (connections.find(conn => conn.limits == null)) {
console.info('have direct connection')
break
} else {
11 changes: 6 additions & 5 deletions packages/protocol-dcutr/src/dcutr.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { CodeError, ERR_INVALID_MESSAGE, serviceDependencies } from '@libp2p/interface'
import { type Multiaddr, multiaddr } from '@multiformats/multiaddr'
import { Circuit } from '@multiformats/multiaddr-matcher'
import delay from 'delay'
import { pbStream } from 'it-protobuf-stream'
import { HolePunch } from './pb/message.js'
@@ -70,9 +71,9 @@ export class DefaultDCUtRService implements Startable {
// register for notifications of when peers that support DCUtR connect
// nb. requires the identify service to be enabled
this.topologyId = await this.registrar.register(multicodec, {
notifyOnTransient: true,
notifyOnLimitedConnection: true,
onConnect: (peerId, connection) => {
if (!connection.transient) {
if (!Circuit.exactMatch(connection.remoteAddr)) {
// the connection is already direct, no upgrade is required
return
}
@@ -97,7 +98,7 @@ export class DefaultDCUtRService implements Startable {
}, {
maxInboundStreams: this.maxInboundStreams,
maxOutboundStreams: this.maxOutboundStreams,
runOnTransientConnection: true
runOnLimitedConnection: true
})

this.started = true
@@ -140,7 +141,7 @@ export class DefaultDCUtRService implements Startable {
// 1. B opens a stream to A using the /libp2p/dcutr protocol.
stream = await relayedConnection.newStream([multicodec], {
signal: options.signal,
runOnTransientConnection: true
runOnLimitedConnection: true
})

const pb = pbStream(stream, {
@@ -256,7 +257,7 @@ export class DefaultDCUtRService implements Startable {
force: true
})

if (connection.transient) {
if (!Circuit.exactMatch(connection.remoteAddr)) {
throw new Error('Could not open a new, non-transient, connection')
}

2 changes: 1 addition & 1 deletion packages/protocol-dcutr/src/index.ts
Original file line number Diff line number Diff line change
@@ -40,7 +40,7 @@
* while (true) {
* const connections = node.getConnections()
*
* if (connections.find(conn => conn.transient === false)) {
* if (connections.find(conn => conn.limits == null)) {
* console.info('have direct connection')
* break
* } else {
2 changes: 1 addition & 1 deletion packages/protocol-identify/src/identify-push.ts
Original file line number Diff line number Diff line change
@@ -82,7 +82,7 @@ export class IdentifyPush extends AbstractIdentify implements Startable, Identif
try {
stream = await connection.newStream(self.protocol, {
signal,
runOnTransientConnection: self.runOnTransientConnection
runOnLimitedConnection: self.runOnLimitedConnection
})

const pb = pbStream(stream, {
2 changes: 1 addition & 1 deletion packages/protocol-identify/src/identify.ts
Original file line number Diff line number Diff line change
@@ -53,7 +53,7 @@ export class Identify extends AbstractIdentify implements Startable, IdentifyInt
try {
stream = await connection.newStream(this.protocol, {
...options,
runOnTransientConnection: this.runOnTransientConnection
runOnLimitedConnection: this.runOnLimitedConnection
})

const pb = pbStream(stream, {
2 changes: 1 addition & 1 deletion packages/protocol-identify/src/index.ts
Original file line number Diff line number Diff line change
@@ -99,7 +99,7 @@ export interface IdentifyInit {
*
* @default true
*/
runOnTransientConnection?: boolean
runOnLimitedConnection?: boolean

/**
* Whether to automatically run identify on newly opened connections
8 changes: 4 additions & 4 deletions packages/protocol-identify/src/utils.ts
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@ export const defaultValues = {
maxMessageSize: MAX_IDENTIFY_MESSAGE_SIZE,
runOnConnectionOpen: true,
runOnSelfUpdate: true,
runOnTransientConnection: true,
runOnLimitedConnection: true,
concurrency: MAX_PUSH_CONCURRENCY
}

@@ -207,7 +207,7 @@ export abstract class AbstractIdentify implements Startable {
protected readonly maxMessageSize: number
protected readonly maxObservedAddresses: number
protected readonly events: TypedEventTarget<Libp2pEvents>
protected readonly runOnTransientConnection: boolean
protected readonly runOnLimitedConnection: boolean
protected readonly log: Logger

constructor (components: IdentifyComponents, init: AbstractIdentifyInit) {
@@ -225,7 +225,7 @@ export abstract class AbstractIdentify implements Startable {
this.maxOutboundStreams = init.maxOutboundStreams ?? defaultValues.maxOutboundStreams
this.maxMessageSize = init.maxMessageSize ?? defaultValues.maxMessageSize
this.maxObservedAddresses = init.maxObservedAddresses ?? defaultValues.maxObservedAddresses
this.runOnTransientConnection = init.runOnTransientConnection ?? defaultValues.runOnTransientConnection
this.runOnLimitedConnection = init.runOnLimitedConnection ?? defaultValues.runOnLimitedConnection

// Store self host metadata
this.host = {
@@ -257,7 +257,7 @@ export abstract class AbstractIdentify implements Startable {
}, {
maxInboundStreams: this.maxInboundStreams,
maxOutboundStreams: this.maxOutboundStreams,
runOnTransientConnection: this.runOnTransientConnection
runOnLimitedConnection: this.runOnLimitedConnection
})

this.started = true
2 changes: 1 addition & 1 deletion packages/protocol-perf/src/index.ts
Original file line number Diff line number Diff line change
@@ -83,7 +83,7 @@ export interface PerfInit {
protocolName?: string
maxInboundStreams?: number
maxOutboundStreams?: number
runOnTransientConnection?: boolean
runOnLimitedConnection?: boolean

/**
* Data sent/received will be sent in chunks of this size (default: 64KiB)
6 changes: 3 additions & 3 deletions packages/protocol-perf/src/perf-service.ts
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ export class Perf implements Startable, PerfInterface {
private readonly writeBlockSize: number
private readonly maxInboundStreams: number
private readonly maxOutboundStreams: number
private readonly runOnTransientConnection: boolean
private readonly runOnLimitedConnection: boolean

constructor (components: PerfComponents, init: PerfInit = {}) {
this.components = components
@@ -25,7 +25,7 @@ export class Perf implements Startable, PerfInterface {
this.databuf = new ArrayBuffer(this.writeBlockSize)
this.maxInboundStreams = init.maxInboundStreams ?? MAX_INBOUND_STREAMS
this.maxOutboundStreams = init.maxOutboundStreams ?? MAX_OUTBOUND_STREAMS
this.runOnTransientConnection = init.runOnTransientConnection ?? RUN_ON_TRANSIENT_CONNECTION
this.runOnLimitedConnection = init.runOnLimitedConnection ?? RUN_ON_TRANSIENT_CONNECTION
}

readonly [Symbol.toStringTag] = '@libp2p/perf'
@@ -38,7 +38,7 @@ export class Perf implements Startable, PerfInterface {
}, {
maxInboundStreams: this.maxInboundStreams,
maxOutboundStreams: this.maxOutboundStreams,
runOnTransientConnection: this.runOnTransientConnection
runOnLimitedConnection: this.runOnLimitedConnection
})
this.started = true
}
2 changes: 1 addition & 1 deletion packages/protocol-ping/src/index.ts
Original file line number Diff line number Diff line change
@@ -35,7 +35,7 @@ export interface PingServiceInit {
protocolPrefix?: string
maxInboundStreams?: number
maxOutboundStreams?: number
runOnTransientConnection?: boolean
runOnLimitedConnection?: boolean

/**
* How long we should wait for a ping response
8 changes: 4 additions & 4 deletions packages/protocol-ping/src/ping.ts
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@ export class PingService implements Startable, PingServiceInterface {
private readonly timeout: number
private readonly maxInboundStreams: number
private readonly maxOutboundStreams: number
private readonly runOnTransientConnection: boolean
private readonly runOnLimitedConnection: boolean
private readonly log: Logger

constructor (components: PingServiceComponents, init: PingServiceInit = {}) {
@@ -27,7 +27,7 @@ export class PingService implements Startable, PingServiceInterface {
this.timeout = init.timeout ?? TIMEOUT
this.maxInboundStreams = init.maxInboundStreams ?? MAX_INBOUND_STREAMS
this.maxOutboundStreams = init.maxOutboundStreams ?? MAX_OUTBOUND_STREAMS
this.runOnTransientConnection = init.runOnTransientConnection ?? true
this.runOnLimitedConnection = init.runOnLimitedConnection ?? true

this.handleMessage = this.handleMessage.bind(this)
}
@@ -38,7 +38,7 @@ export class PingService implements Startable, PingServiceInterface {
await this.components.registrar.handle(this.protocol, this.handleMessage, {
maxInboundStreams: this.maxInboundStreams,
maxOutboundStreams: this.maxOutboundStreams,
runOnTransientConnection: this.runOnTransientConnection
runOnLimitedConnection: this.runOnLimitedConnection
})
this.started = true
}
@@ -96,7 +96,7 @@ export class PingService implements Startable, PingServiceInterface {
try {
stream = await connection.newStream(this.protocol, {
...options,
runOnTransientConnection: this.runOnTransientConnection
runOnLimitedConnection: this.runOnLimitedConnection
})

onAbort = () => {
4 changes: 2 additions & 2 deletions packages/transport-circuit-relay-v2/src/server/index.ts
Original file line number Diff line number Diff line change
@@ -144,7 +144,7 @@ class CircuitRelayServer extends TypedEventEmitter<RelayServerEvents> implements
}, {
maxInboundStreams: this.maxInboundHopStreams,
maxOutboundStreams: this.maxOutboundHopStreams,
runOnTransientConnection: true
runOnLimitedConnection: true
})

this.reservationStore.start()
@@ -383,7 +383,7 @@ class CircuitRelayServer extends TypedEventEmitter<RelayServerEvents> implements
this.log('starting circuit relay v2 stop request to %s', connection.remotePeer)
const stream = await connection.newStream([RELAY_V2_STOP_CODEC], {
maxOutboundStreams: this.maxOutboundStopStreams,
runOnTransientConnection: true
runOnLimitedConnection: true
})
const pbstr = pbStream(stream)
const stopstr = pbstr.pb(StopMessage)
39 changes: 36 additions & 3 deletions packages/transport-circuit-relay-v2/src/transport/transport.ts
Original file line number Diff line number Diff line change
@@ -149,7 +149,7 @@
}, {
maxInboundStreams: this.maxInboundStopStreams,
maxOutboundStreams: this.maxOutboundStopStreams,
runOnTransientConnection: true
runOnLimitedConnection: true
})

await start(this.discovery, this.reservationStore)
@@ -269,8 +269,25 @@
})

this.log('new outbound relayed connection %a', maConn.remoteAddr)

let expires = Date.now() + (status.limit?.duration ?? Infinity) * 1000

if (status.limit?.duration === 0) {
expires = Infinity
}

const bytes = Number(status.limit?.data) ?? Infinity

Check warning on line 280 in packages/transport-circuit-relay-v2/src/transport/transport.ts

Codecov / codecov/patch

packages/transport-circuit-relay-v2/src/transport/transport.ts#L272-L280

Added lines #L272 - L280 were not covered by tests
return await this.upgrader.upgradeOutbound(maConn, {
transient: status.limit != null,
limits: {
get bytes (): number {
// TODO: decrement me
return bytes
},
get seconds (): number {
return (Date.now() - expires) / 1000
}
},

Check warning on line 290 in packages/transport-circuit-relay-v2/src/transport/transport.ts

Codecov / codecov/patch

packages/transport-circuit-relay-v2/src/transport/transport.ts#L282-L290

Added lines #L282 - L290 were not covered by tests
onProgress
})
} catch (err: any) {
@@ -384,9 +401,25 @@
logger: this.logger
})

let expires = Date.now() + (request.limit.duration ?? Infinity) * 1000

if (request.limit.duration === 0) {
expires = Infinity

Check warning on line 407 in packages/transport-circuit-relay-v2/src/transport/transport.ts

Codecov / codecov/patch

packages/transport-circuit-relay-v2/src/transport/transport.ts#L407

Added line #L407 was not covered by tests
}

const bytes = Number(request.limit?.data) ?? Infinity

this.log('new inbound relayed connection %a', maConn.remoteAddr)
await this.upgrader.upgradeInbound(maConn, {
transient: request.limit != null
limits: {
get bytes (): number {
// TODO: decrement me
return bytes
},

Check warning on line 418 in packages/transport-circuit-relay-v2/src/transport/transport.ts

Codecov / codecov/patch

packages/transport-circuit-relay-v2/src/transport/transport.ts#L416-L418

Added lines #L416 - L418 were not covered by tests
get seconds (): number {
return (Date.now() - expires) / 1000
}

Check warning on line 421 in packages/transport-circuit-relay-v2/src/transport/transport.ts

Codecov / codecov/patch

packages/transport-circuit-relay-v2/src/transport/transport.ts#L420-L421

Added lines #L420 - L421 were not covered by tests
}
})
this.log('%s connection %a upgraded', 'inbound', maConn.remoteAddr)
}
Original file line number Diff line number Diff line change
@@ -72,7 +72,7 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa

const stream = await connection.newStream(SIGNALING_PROTO_ID, {
signal,
runOnTransientConnection: true
runOnLimitedConnection: true
})

const messageStream = pbStream(stream).pb(Message)
Original file line number Diff line number Diff line change
@@ -108,7 +108,7 @@ export class WebRTCTransport implements Transport<WebRTCDialEvents>, Startable {
await this.components.registrar.handle(SIGNALING_PROTO_ID, (data: IncomingStreamData) => {
this._onProtocol(data).catch(err => { this.log.error('failed to handle incoming connect from %p', data.connection.remotePeer, err) })
}, {
runOnTransientConnection: true
runOnLimitedConnection: true
})
this._started = true
}
12 changes: 6 additions & 6 deletions packages/transport-webrtc/test/basics.spec.ts
Original file line number Diff line number Diff line change
@@ -71,7 +71,7 @@ describe('basics', () => {
await remoteNode.handle(echo, (info) => {
streamHandler(info)
}, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

const connection = await localNode.dial(remoteAddr)
@@ -138,7 +138,7 @@ describe('basics', () => {

// open a stream on the echo protocol
const stream = await connection.newStream(echo, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

// send and receive some data
@@ -170,7 +170,7 @@ describe('basics', () => {

// open a stream on the echo protocol
const stream = await connection.newStream(echo, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

// close for reading
@@ -204,7 +204,7 @@ describe('basics', () => {

// open a stream on the echo protocol
const stream = await connection.newStream(echo, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

// close for reading
@@ -241,7 +241,7 @@ describe('basics', () => {

// open a stream on the echo protocol
const stream = await connection.newStream(echo, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

// close the write end immediately
@@ -302,7 +302,7 @@ describe('basics', () => {

// open a stream on the echo protocol
const stream = await connection.newStream(echo, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

// keep the remote write end open, this should delay the FIN_ACK reply to the local stream

0 comments on commit f24a06e

Please sign in to comment.