Skip to content

Commit 8b85a28

Browse files
wilsones-berkeley, wilsones1991, yenfryherrerafeliz, stobrien89
authored
bugfix: Fix eventstream partial read (#3165)
* fix: ensure full payload is read in NonSeekableStreamDecodingEventStreamIterator

  Previously, readAndHashBytes() assumed that a single call to $stream->read($num) would return the full payload length. However, PSR-7 allows partial reads, and this behavior occurs in practice with non-seekable streams (e.g., cURL). This caused CRC mismatches and truncated payloads in streamed responses, especially for large final chunks, such as those returned by Bedrock's RetrieveAndGenerateStream API.

  This change updates readAndHashBytes() to read in a loop until all expected bytes are received or EOF is reached, ensuring proper CRC calculation and full payload integrity.

* Add unit test for bug in NonSeekableStreamDecodingEventStreamIterator

* chore: add changelog for bugfix in NonSeekableStreamDecodingEventStreamIterator

* tests: add event stream test cases

  - Add mocked events with proper headers and payload in order to test events that carry checksum calculation and make sure the whole processing works as expected.

* chore: add property

* chore: mock stream interface

  Co-authored-by: Sean O'Brien <[email protected]>

* chore: add empty lines

  Add empty lines at the end of file.

---------

Co-authored-by: Eric Wilson <[email protected]>
Co-authored-by: Yenfry Herrera Feliz <[email protected]>
Co-authored-by: Yenfry Herrera Feliz <[email protected]>
Co-authored-by: Sean O'Brien <[email protected]>
1 parent 768e005 commit 8b85a28

File tree

7 files changed

+164
-2
lines changed

7 files changed

+164
-2
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
[
2+
{
3+
"type": "bugfix",
4+
"category": "",
5+
"description": "Fixed an issue in NonSeekableStreamDecodingEventStreamIterator where partial reads from non-seekable streams could result in truncated payloads and CRC mismatches."
6+
}
7+
]

src/Api/Parser/NonSeekableStreamDecodingEventStreamIterator.php

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,21 @@ protected function readAndHashBytes($num): string
6464
while (!empty($this->tempBuffer) && $num > 0) {
6565
$byte = array_shift($this->tempBuffer);
6666
$bytes .= $byte;
67-
$num = $num - 1;
67+
$num -= 1;
68+
}
69+
70+
// Loop until we've read the expected number of bytes
71+
while ($num > 0 && !$this->stream->eof()) {
72+
$chunk = $this->stream->read($num);
73+
$chunkLen = strlen($chunk);
74+
$bytes .= $chunk;
75+
$num -= $chunkLen;
76+
77+
if ($chunkLen === 0) {
78+
break; // Prevent infinite loop on unexpected EOF
79+
}
6880
}
6981

70-
$bytes = $bytes . $this->stream->read($num);
7182
hash_update($this->hashContext, $bytes);
7283

7384
return $bytes;

tests/Api/Parser/NonSeekableStreamDecodingEventStreamIteratorTest.php

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,16 @@
44

55
use Aws\Api\Parser\NonSeekableStreamDecodingEventStreamIterator;
66
use GuzzleHttp\Psr7\NoSeekStream;
7+
use GuzzleHttp\Psr7\StreamDecoratorTrait;
78
use GuzzleHttp\Psr7\Utils;
9+
use Psr\Http\Message\StreamInterface;
810
use Yoast\PHPUnitPolyfills\TestCases\TestCase;
911

1012
class NonSeekableStreamDecodingEventStreamIteratorTest extends TestCase
1113
{
14+
const EVENT_STREAMS_DIR = __DIR__ . '/event-streams/';
15+
const CASES_FILE = self::EVENT_STREAMS_DIR . 'cases.json';
16+
1217
public function testFailOnNonSeekableStream()
1318
{
1419
$this->expectException(\InvalidArgumentException::class);
@@ -67,4 +72,71 @@ public function testValidReturnsTrueOnEOF()
6772
$iterator->next();
6873
$this->assertFalse($iterator->valid());
6974
}
75+
76+
/**
77+
* @param string $eventName
78+
* @param array $expected
79+
*
80+
* @dataProvider readAndHashBytesHandlesPartialReadsProvider
81+
*
82+
* @return void
83+
*/
84+
public function testReadAndHashBytesHandlesPartialReads(
85+
string $eventName,
86+
array $expected
87+
): void
88+
{
89+
$eventPath = self::EVENT_STREAMS_DIR . "events/$eventName";
90+
$eventStream = Utils::streamFor(
91+
base64_decode(
92+
file_get_contents($eventPath)
93+
)
94+
);
95+
$partialReadStream = $this->createMock(StreamInterface::class);
96+
$partialReadStream->method('isSeekable')->willReturn(false);
97+
$partialReadStream->method('isReadable')->willReturn(true);
98+
$partialReadStream->method('read')
99+
->willReturnCallback(function ($length) use ($eventStream) {
100+
if ($eventStream->eof()) {
101+
return '';
102+
}
103+
104+
$readLength = min($length, 20);
105+
return $eventStream->read($readLength);
106+
});
107+
$noSeekStreamDecodingEventStreamIterator = new NonSeekableStreamDecodingEventStreamIterator(
108+
$partialReadStream
109+
);
110+
$noSeekStreamDecodingEventStreamIterator->next();
111+
$event = $noSeekStreamDecodingEventStreamIterator->current();
112+
$this->assertEquals(
113+
$expected['headers'],
114+
$event['headers']
115+
);
116+
$decodedPayload = json_decode(
117+
$event['payload']->getContents(),
118+
true
119+
);
120+
$this->assertEquals(
121+
$expected['payload'],
122+
$decodedPayload
123+
);
124+
}
125+
126+
/**
127+
* @return \Generator
128+
*/
129+
public function readAndHashBytesHandlesPartialReadsProvider(): \Generator
130+
{
131+
$cases = json_decode(
132+
file_get_contents(self::CASES_FILE),
133+
true
134+
);
135+
foreach ($cases as $case) {
136+
yield $case['eventName'] => [
137+
'eventName' => $case['eventName'],
138+
'expected' => $case['expected'],
139+
];
140+
}
141+
}
70142
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
[
2+
{
3+
"eventName": "event-bedrock",
4+
"expected": {
5+
"headers": {
6+
":content-type": "application/json",
7+
":message-type": "event"
8+
},
9+
"payload": {
10+
"eventType": "chunk",
11+
"timestamp": 1764360403.788373,
12+
"requestId": "req-692a00d3c0796",
13+
"modelId": "anthropic.claude-3-sonnet-20240229-v1:0",
14+
"contentType": "application\/json",
15+
"body": {
16+
"type": "content_block_delta",
17+
"index": 0,
18+
"delta": {
19+
"type": "text_delta",
20+
"text": "The artificial intelligence revolution is transforming how we approach complex problems across industries."
21+
}
22+
}
23+
}
24+
}
25+
},
26+
{
27+
"eventName": "event-cloudwatch",
28+
"expected": {
29+
"headers": {
30+
":content-type": "application/json",
31+
":message-type": "event"
32+
},
33+
"payload": {
34+
"eventType": "logEvent",
35+
"timestamp": 1764360403.78797,
36+
"logGroup": "\/aws\/lambda\/my-function",
37+
"logStream": "2025\/11\/28\/[$LATEST]692a00d3c06a0",
38+
"events": [
39+
{
40+
"timestamp": 1764360403000,
41+
"message": "WARN: High memory usage detected",
42+
"id": "692a00d3c06ab"
43+
}
44+
]
45+
}
46+
}
47+
},
48+
{
49+
"eventName": "event-lambda",
50+
"expected": {
51+
"headers": {
52+
":content-type": "application/json",
53+
":message-type": "event"
54+
},
55+
"payload": {
56+
"eventType": "invoke",
57+
"timestamp": 1764360403.787594,
58+
"requestId": "lambda-692a00d3c048b",
59+
"functionName": "my-function",
60+
"statusCode": 200,
61+
"payload": {
62+
"message": "Function executed successfully",
63+
"result": 54,
64+
"executionTime": 4829
65+
}
66+
}
67+
}
68+
}
69+
]
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
AAACHwAAADdy59s+DTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsKICAgICJldmVudFR5cGUiOiAiY2h1bmsiLAogICAgInRpbWVzdGFtcCI6IDE3NjQzNjA0MDMuNzg4MzczLAogICAgInJlcXVlc3RJZCI6ICJyZXEtNjkyYTAwZDNjMDc5NiIsCiAgICAibW9kZWxJZCI6ICJhbnRocm9waWMuY2xhdWRlLTMtc29ubmV0LTIwMjQwMjI5LXYxOjAiLAogICAgImNvbnRlbnRUeXBlIjogImFwcGxpY2F0aW9uXC9qc29uIiwKICAgICJib2R5IjogewogICAgICAgICJ0eXBlIjogImNvbnRlbnRfYmxvY2tfZGVsdGEiLAogICAgICAgICJpbmRleCI6IDAsCiAgICAgICAgImRlbHRhIjogewogICAgICAgICAgICAidHlwZSI6ICJ0ZXh0X2RlbHRhIiwKICAgICAgICAgICAgInRleHQiOiAiVGhlIGFydGlmaWNpYWwgaW50ZWxsaWdlbmNlIHJldm9sdXRpb24gaXMgdHJhbnNmb3JtaW5nIGhvdyB3ZSBhcHByb2FjaCBjb21wbGV4IHByb2JsZW1zIGFjcm9zcyBpbmR1c3RyaWVzLiIKICAgICAgICB9CiAgICB9Cn2hDb/T
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
AAABoAAAADdm5DZVDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsKICAgICJldmVudFR5cGUiOiAibG9nRXZlbnQiLAogICAgInRpbWVzdGFtcCI6IDE3NjQzNjA0MDMuNzg3OTcsCiAgICAibG9nR3JvdXAiOiAiXC9hd3NcL2xhbWJkYVwvbXktZnVuY3Rpb24iLAogICAgImxvZ1N0cmVhbSI6ICIyMDI1XC8xMVwvMjhcL1skTEFURVNUXTY5MmEwMGQzYzA2YTAiLAogICAgImV2ZW50cyI6IFsKICAgICAgICB7CiAgICAgICAgICAgICJ0aW1lc3RhbXAiOiAxNzY0MzYwNDAzMDAwLAogICAgICAgICAgICAibWVzc2FnZSI6ICJXQVJOOiBIaWdoIG1lbW9yeSB1c2FnZSBkZXRlY3RlZCIsCiAgICAgICAgICAgICJpZCI6ICI2OTJhMDBkM2MwNmFiIgogICAgICAgIH0KICAgIF0KffY2M+w=
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
AAABbAAAADdKATOPDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsKICAgICJldmVudFR5cGUiOiAiaW52b2tlIiwKICAgICJ0aW1lc3RhbXAiOiAxNzY0MzYwNDAzLjc4NzU5NCwKICAgICJyZXF1ZXN0SWQiOiAibGFtYmRhLTY5MmEwMGQzYzA0OGIiLAogICAgImZ1bmN0aW9uTmFtZSI6ICJteS1mdW5jdGlvbiIsCiAgICAic3RhdHVzQ29kZSI6IDIwMCwKICAgICJwYXlsb2FkIjogewogICAgICAgICJtZXNzYWdlIjogIkZ1bmN0aW9uIGV4ZWN1dGVkIHN1Y2Nlc3NmdWxseSIsCiAgICAgICAgInJlc3VsdCI6IDU0LAogICAgICAgICJleGVjdXRpb25UaW1lIjogNDgyOQogICAgfQp9m4KtLA==

0 commit comments

Comments
 (0)