Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Configure maxDataLength for it-length-prefixed in PeerStreams #2954

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion packages/kad-dht/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
"it-all": "^3.0.6",
"it-drain": "^3.0.7",
"it-length": "^3.0.6",
"it-length-prefixed": "^9.1.0",
"it-length-prefixed": "^10.0.0",
"it-map": "^3.1.1",
"it-merge": "^3.0.5",
"it-parallel": "^3.0.8",
Expand Down
2 changes: 1 addition & 1 deletion packages/libp2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@
"delay": "^6.0.0",
"it-all": "^3.0.6",
"it-drain": "^3.0.7",
"it-length-prefixed": "^9.1.0",
"it-length-prefixed": "^10.0.0",
"it-map": "^3.1.1",
"it-pair": "^2.0.6",
"it-stream-types": "^2.0.2",
Expand Down
2 changes: 1 addition & 1 deletion packages/multistream-select/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
},
"dependencies": {
"@libp2p/interface": "^2.5.0",
"it-length-prefixed": "^9.1.0",
"it-length-prefixed": "^10.0.0",
"it-length-prefixed-stream": "^1.2.0",
"it-stream-types": "^2.0.2",
"p-defer": "^4.0.1",
Expand Down
2 changes: 1 addition & 1 deletion packages/protocol-autonat/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
"aegir": "^45.0.5",
"it-all": "^3.0.6",
"it-drain": "^3.0.7",
"it-length-prefixed": "^9.1.0",
"it-length-prefixed": "^10.0.0",
"it-pipe": "^3.0.1",
"it-pushable": "^3.2.3",
"p-retry": "^6.2.1",
Expand Down
2 changes: 1 addition & 1 deletion packages/protocol-identify/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
"@libp2p/logger": "^5.1.8",
"aegir": "^45.0.5",
"delay": "^6.0.0",
"it-length-prefixed": "^9.1.0",
"it-length-prefixed": "^10.0.0",
"it-pair": "^2.0.6",
"it-pushable": "^3.2.3",
"protons": "^7.6.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
"@libp2p/peer-collections": "^6.0.17",
"@libp2p/peer-id": "^5.0.12",
"@libp2p/utils": "^6.5.1",
"it-length-prefixed": "^9.1.0",
"it-length-prefixed": "^10.0.0",
"it-pipe": "^3.0.1",
"it-pushable": "^3.2.3",
"multiformats": "^13.3.1",
Expand Down
13 changes: 11 additions & 2 deletions packages/pubsub/src/peer-streams.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import { TypedEventEmitter } from '@libp2p/interface'
import { closeSource } from '@libp2p/utils/close-source'
import * as lp from 'it-length-prefixed'
import { type decode } from 'it-length-prefixed'
import { pipe } from 'it-pipe'
import { pushable } from 'it-pushable'
import { Uint8ArrayList } from 'uint8arraylist'
import type { ComponentLogger, Logger, Stream, PeerId, PeerStreamEvents } from '@libp2p/interface'
import type { Pushable } from 'it-pushable'
// import { type DecoderOptions as LpDecoderOptions } from 'it-length-prefixed' // Enable this when DecoderOptions will be exported from it-length-prefixed
acul71 marked this conversation as resolved.
Show resolved Hide resolved
type LpDecoderOptions = NonNullable<Parameters<typeof decode>[1]>

export interface PeerStreamsInit {
id: PeerId
Expand All @@ -16,6 +19,11 @@ export interface PeerStreamsComponents {
logger: ComponentLogger
}

// Define the DecodeOptions type locally
interface DecoderOptions extends LpDecoderOptions {
// other custom options we might want for `attachInboundStream`
}

/**
* Thin wrapper around a peer's inbound / outbound pubsub streams
*/
Expand Down Expand Up @@ -86,7 +94,8 @@ export class PeerStreams extends TypedEventEmitter<PeerStreamEvents> {
/**
* Attach a raw inbound stream and setup a read stream
*/
attachInboundStream (stream: Stream): AsyncIterable<Uint8ArrayList> {
// attachInboundStream (stream: Stream): AsyncIterable<Uint8ArrayList> {
attachInboundStream (stream: Stream, decoderOptions?: DecoderOptions): AsyncIterable<Uint8ArrayList> {
const abortListener = (): void => {
closeSource(stream.source, this.log)
}
Expand All @@ -102,7 +111,7 @@ export class PeerStreams extends TypedEventEmitter<PeerStreamEvents> {
this._rawInboundStream = stream
this.inboundStream = pipe(
this._rawInboundStream,
(source) => lp.decode(source)
(source) => lp.decode(source, decoderOptions)
)

this.dispatchEvent(new CustomEvent('stream:inbound'))
Expand Down
70 changes: 70 additions & 0 deletions packages/pubsub/test/peer-streams.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { generateKeyPair } from '@libp2p/crypto/keys'
import { type PeerId } from '@libp2p/interface'
import { defaultLogger } from '@libp2p/logger'
import { peerIdFromPrivateKey } from '@libp2p/peer-id'
import { expect } from 'aegir/chai'
import * as lp from 'it-length-prefixed'
import { pipe } from 'it-pipe'
import { Uint8ArrayList } from 'uint8arraylist'
import { PeerStreams } from '../src/peer-streams.js'
import { ConnectionPair } from './utils/index.js'

describe('PeerStreams large message handling', () => {
let otherPeerId: PeerId

beforeEach(async () => {
otherPeerId = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
})

it('should receive messages larger than internal MAX_DATA_LENGTH when maxDataLength is set', async () => {
const messageSize = 6 * 1024 * 1024 // 6MB
const largeMessage = new Uint8ArrayList(new Uint8Array(messageSize).fill(65)) // Fill with "A"

// Get both ends of the duplex stream
const [connA, connB] = ConnectionPair()

// Use connB as the inbound (reading) side
const inboundStream = await connB.newStream(['a-protocol'])
// Use connA as the outbound (writing) side
const outboundStream = await connA.newStream(['a-protocol'])

// Create PeerStreams with increased maxDataLength
const peer = new PeerStreams(
{ logger: defaultLogger() },
{ id: otherPeerId, protocol: 'a-protocol' }
)

// Attach the inbound stream on the reading end
const inbound = peer.attachInboundStream(inboundStream, { maxDataLength: messageSize })

// Simulate sending data from the outbound side
await pipe(
[largeMessage],
(source) => lp.encode(source, { maxDataLength: messageSize }),
async function * (source) {
for await (const chunk of source) {
yield chunk
}
},
outboundStream.sink
)

// Close the outbound writer so the reader knows no more data is coming
await outboundStream.closeWrite()

// Collect received messages
const receivedMessages: Uint8ArrayList[] = []
for await (const msg of inbound) {
receivedMessages.push(msg)
}

// Check if received correctly
expect(receivedMessages).to.have.lengthOf(1)
expect(receivedMessages[0].byteLength).to.equal(messageSize)
// Check that the content of the sent and received messages are identical
const data = receivedMessages[0].slice()
const input = largeMessage.slice()
expect(data.length).to.equal(input.length)
expect(data).to.deep.equal(input)
})
})
2 changes: 1 addition & 1 deletion packages/transport-webrtc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
"any-signal": "^4.1.1",
"detect-browser": "^5.3.0",
"get-port": "^7.1.0",
"it-length-prefixed": "^9.1.0",
"it-length-prefixed": "^10.0.0",
"it-protobuf-stream": "^1.1.5",
"it-pushable": "^3.2.3",
"it-stream-types": "^2.0.2",
Expand Down
Loading