Skip to content

feat: Configure maxDataLength for it-length-prefixed in PeerStreams #2954

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions packages/pubsub/src/peer-streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import { pushable } from 'it-pushable'
import { Uint8ArrayList } from 'uint8arraylist'
import type { ComponentLogger, Logger, Stream, PeerId, PeerStreamEvents } from '@libp2p/interface'
import type { DecoderOptions as LpDecoderOptions } from 'it-length-prefixed'
import type { Pushable } from 'it-pushable'

export interface PeerStreamsInit {
Expand All @@ -16,6 +17,10 @@
logger: ComponentLogger
}

export interface DecoderOptions extends LpDecoderOptions {
// other custom options we might want for `attachInboundStream`
}

/**
* Thin wrapper around a peer's inbound / outbound pubsub streams
*/
Expand Down Expand Up @@ -86,7 +91,7 @@
/**
* Attach a raw inbound stream and setup a read stream
*/
attachInboundStream (stream: Stream): AsyncIterable<Uint8ArrayList> {
attachInboundStream (stream: Stream, decoderOptions?: DecoderOptions): AsyncIterable<Uint8ArrayList> {
const abortListener = (): void => {
closeSource(stream.source, this.log)
}
Expand All @@ -102,7 +107,7 @@
this._rawInboundStream = stream
this.inboundStream = pipe(
this._rawInboundStream,
(source) => lp.decode(source)
(source) => lp.decode(source, decoderOptions)
)

this.dispatchEvent(new CustomEvent('stream:inbound'))
Expand All @@ -123,13 +128,11 @@
this._rawOutboundStream = stream
this.outboundStream = pushable<Uint8ArrayList>({
onEnd: (shouldEmit) => {
// close writable side of the stream
if (this._rawOutboundStream != null) { // eslint-disable-line @typescript-eslint/prefer-optional-chain
this._rawOutboundStream.closeWrite()
.catch(err => {
this.log('error closing outbound stream', err)
})
}
// close writable side of the stream if it exists
this._rawOutboundStream?.closeWrite()
.catch(err => {
this.log('error closing outbound stream', err)

Check warning on line 134 in packages/pubsub/src/peer-streams.ts

View check run for this annotation

Codecov / codecov/patch

packages/pubsub/src/peer-streams.ts#L134

Added line #L134 was not covered by tests
})

this._rawOutboundStream = undefined
this.outboundStream = undefined
Expand Down
65 changes: 65 additions & 0 deletions packages/pubsub/test/peer-streams.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { generateKeyPair } from '@libp2p/crypto/keys'
import { type PeerId } from '@libp2p/interface'
import { defaultLogger } from '@libp2p/logger'
import { peerIdFromPrivateKey } from '@libp2p/peer-id'
import { expect } from 'aegir/chai'
import * as lp from 'it-length-prefixed'
import { pipe } from 'it-pipe'
import { Uint8ArrayList } from 'uint8arraylist'
import { PeerStreams } from '../src/peer-streams.js'
import { ConnectionPair } from './utils/index.js'

describe('peer-streams', () => {
let otherPeerId: PeerId

beforeEach(async () => {
otherPeerId = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
})

it('should receive messages larger than internal MAX_DATA_LENGTH when maxDataLength is set', async () => {
const messageSize = 6 * 1024 * 1024 // 6MB
const largeMessage = new Uint8ArrayList(new Uint8Array(messageSize).fill(65)) // Fill with "A"

// Get both ends of the duplex stream
const [connA, connB] = ConnectionPair()

// Use connB as the inbound (reading) side
const inboundStream = await connB.newStream(['a-protocol'])
// Use connA as the outbound (writing) side
const outboundStream = await connA.newStream(['a-protocol'])

// Create PeerStreams with increased maxDataLength
const peer = new PeerStreams(
{ logger: defaultLogger() },
{ id: otherPeerId, protocol: 'a-protocol' }
)

// Attach the inbound stream on the reading end
const inbound = peer.attachInboundStream(inboundStream, { maxDataLength: messageSize })

// Simulate sending data from the outbound side
await pipe(
[largeMessage],
(source) => lp.encode(source, { maxDataLength: messageSize }),
outboundStream.sink
)

// Close the outbound writer so the reader knows no more data is coming
await outboundStream.closeWrite()

// Collect received messages
const receivedMessages: Uint8ArrayList[] = []
for await (const msg of inbound) {
receivedMessages.push(msg)
}

// Check if received correctly
expect(receivedMessages).to.have.lengthOf(1)
expect(receivedMessages[0].byteLength).to.equal(messageSize)
// Check that the content of the sent and received messages are identical
const data = receivedMessages[0].slice()
const input = largeMessage.slice()
expect(data.length).to.equal(input.length)
expect(data).to.deep.equal(input)
})
})
Loading