Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changes/nextrelease/fix-eventstream-partial-read.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[
{
"type": "bugfix",
"category": "",
"description": "Fixed an issue in NonSeekableStreamDecodingEventStreamIterator where partial reads from non-seekable streams could result in truncated payloads and CRC mismatches."
}
]
15 changes: 13 additions & 2 deletions src/Api/Parser/NonSeekableStreamDecodingEventStreamIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,21 @@ protected function readAndHashBytes($num): string
while (!empty($this->tempBuffer) && $num > 0) {
$byte = array_shift($this->tempBuffer);
$bytes .= $byte;
$num = $num - 1;
$num -= 1;
}

// Loop until we've read the expected number of bytes
while ($num > 0 && !$this->stream->eof()) {
$chunk = $this->stream->read($num);
$chunkLen = strlen($chunk);
$bytes .= $chunk;
$num -= $chunkLen;

if ($chunkLen === 0) {
break; // Prevent infinite loop on unexpected EOF
}
}

$bytes = $bytes . $this->stream->read($num);
hash_update($this->hashContext, $bytes);

return $bytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@

use Aws\Api\Parser\NonSeekableStreamDecodingEventStreamIterator;
use GuzzleHttp\Psr7\NoSeekStream;
use GuzzleHttp\Psr7\StreamDecoratorTrait;
use GuzzleHttp\Psr7\Utils;
use Psr\Http\Message\StreamInterface;
use Yoast\PHPUnitPolyfills\TestCases\TestCase;

class NonSeekableStreamDecodingEventStreamIteratorTest extends TestCase
{
const EVENT_STREAMS_DIR = __DIR__ . '/event-streams/';
const CASES_FILE = self::EVENT_STREAMS_DIR . 'cases.json';

public function testFailOnNonSeekableStream()
{
$this->expectException(\InvalidArgumentException::class);
Expand Down Expand Up @@ -67,4 +72,71 @@ public function testValidReturnsTrueOnEOF()
$iterator->next();
$this->assertFalse($iterator->valid());
}

/**
* @param string $eventName
* @param array $expected
*
* @dataProvider readAndHashBytesHandlesPartialReadsProvider
*
* @return void
*/
public function testReadAndHashBytesHandlesPartialReads(
string $eventName,
array $expected
): void
{
$eventPath = self::EVENT_STREAMS_DIR . "events/$eventName";
$eventStream = Utils::streamFor(
base64_decode(
file_get_contents($eventPath)
)
);
$partialReadStream = $this->createMock(StreamInterface::class);
$partialReadStream->method('isSeekable')->willReturn(false);
$partialReadStream->method('isReadable')->willReturn(true);
$partialReadStream->method('read')
->willReturnCallback(function ($length) use ($eventStream) {
if ($eventStream->eof()) {
return '';
}

$readLength = min($length, 20);
return $eventStream->read($readLength);
});
$noSeekStreamDecodingEventStreamIterator = new NonSeekableStreamDecodingEventStreamIterator(
$partialReadStream
);
$noSeekStreamDecodingEventStreamIterator->next();
$event = $noSeekStreamDecodingEventStreamIterator->current();
$this->assertEquals(
$expected['headers'],
$event['headers']
);
$decodedPayload = json_decode(
$event['payload']->getContents(),
true
);
$this->assertEquals(
$expected['payload'],
$decodedPayload
);
}

/**
* @return \Generator
*/
public function readAndHashBytesHandlesPartialReadsProvider(): \Generator
{
$cases = json_decode(
file_get_contents(self::CASES_FILE),
true
);
foreach ($cases as $case) {
yield $case['eventName'] => [
'eventName' => $case['eventName'],
'expected' => $case['expected'],
];
}
}
}
69 changes: 69 additions & 0 deletions tests/Api/Parser/event-streams/cases.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
[
{
"eventName": "event-bedrock",
"expected": {
"headers": {
":content-type": "application/json",
":message-type": "event"
},
"payload": {
"eventType": "chunk",
"timestamp": 1764360403.788373,
"requestId": "req-692a00d3c0796",
"modelId": "anthropic.claude-3-sonnet-20240229-v1:0",
"contentType": "application\/json",
"body": {
"type": "content_block_delta",
"index": 0,
"delta": {
"type": "text_delta",
"text": "The artificial intelligence revolution is transforming how we approach complex problems across industries."
}
}
}
}
},
{
"eventName": "event-cloudwatch",
"expected": {
"headers": {
":content-type": "application/json",
":message-type": "event"
},
"payload": {
"eventType": "logEvent",
"timestamp": 1764360403.78797,
"logGroup": "\/aws\/lambda\/my-function",
"logStream": "2025\/11\/28\/[$LATEST]692a00d3c06a0",
"events": [
{
"timestamp": 1764360403000,
"message": "WARN: High memory usage detected",
"id": "692a00d3c06ab"
}
]
}
}
},
{
"eventName": "event-lambda",
"expected": {
"headers": {
":content-type": "application/json",
":message-type": "event"
},
"payload": {
"eventType": "invoke",
"timestamp": 1764360403.787594,
"requestId": "lambda-692a00d3c048b",
"functionName": "my-function",
"statusCode": 200,
"payload": {
"message": "Function executed successfully",
"result": 54,
"executionTime": 4829
}
}
}
}
]
1 change: 1 addition & 0 deletions tests/Api/Parser/event-streams/events/event-bedrock
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
AAACHwAAADdy59s+DTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsKICAgICJldmVudFR5cGUiOiAiY2h1bmsiLAogICAgInRpbWVzdGFtcCI6IDE3NjQzNjA0MDMuNzg4MzczLAogICAgInJlcXVlc3RJZCI6ICJyZXEtNjkyYTAwZDNjMDc5NiIsCiAgICAibW9kZWxJZCI6ICJhbnRocm9waWMuY2xhdWRlLTMtc29ubmV0LTIwMjQwMjI5LXYxOjAiLAogICAgImNvbnRlbnRUeXBlIjogImFwcGxpY2F0aW9uXC9qc29uIiwKICAgICJib2R5IjogewogICAgICAgICJ0eXBlIjogImNvbnRlbnRfYmxvY2tfZGVsdGEiLAogICAgICAgICJpbmRleCI6IDAsCiAgICAgICAgImRlbHRhIjogewogICAgICAgICAgICAidHlwZSI6ICJ0ZXh0X2RlbHRhIiwKICAgICAgICAgICAgInRleHQiOiAiVGhlIGFydGlmaWNpYWwgaW50ZWxsaWdlbmNlIHJldm9sdXRpb24gaXMgdHJhbnNmb3JtaW5nIGhvdyB3ZSBhcHByb2FjaCBjb21wbGV4IHByb2JsZW1zIGFjcm9zcyBpbmR1c3RyaWVzLiIKICAgICAgICB9CiAgICB9Cn2hDb/T
1 change: 1 addition & 0 deletions tests/Api/Parser/event-streams/events/event-cloudwatch
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
AAABoAAAADdm5DZVDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsKICAgICJldmVudFR5cGUiOiAibG9nRXZlbnQiLAogICAgInRpbWVzdGFtcCI6IDE3NjQzNjA0MDMuNzg3OTcsCiAgICAibG9nR3JvdXAiOiAiXC9hd3NcL2xhbWJkYVwvbXktZnVuY3Rpb24iLAogICAgImxvZ1N0cmVhbSI6ICIyMDI1XC8xMVwvMjhcL1skTEFURVNUXTY5MmEwMGQzYzA2YTAiLAogICAgImV2ZW50cyI6IFsKICAgICAgICB7CiAgICAgICAgICAgICJ0aW1lc3RhbXAiOiAxNzY0MzYwNDAzMDAwLAogICAgICAgICAgICAibWVzc2FnZSI6ICJXQVJOOiBIaWdoIG1lbW9yeSB1c2FnZSBkZXRlY3RlZCIsCiAgICAgICAgICAgICJpZCI6ICI2OTJhMDBkM2MwNmFiIgogICAgICAgIH0KICAgIF0KffY2M+w=
1 change: 1 addition & 0 deletions tests/Api/Parser/event-streams/events/event-lambda
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
AAABbAAAADdKATOPDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsKICAgICJldmVudFR5cGUiOiAiaW52b2tlIiwKICAgICJ0aW1lc3RhbXAiOiAxNzY0MzYwNDAzLjc4NzU5NCwKICAgICJyZXF1ZXN0SWQiOiAibGFtYmRhLTY5MmEwMGQzYzA0OGIiLAogICAgImZ1bmN0aW9uTmFtZSI6ICJteS1mdW5jdGlvbiIsCiAgICAic3RhdHVzQ29kZSI6IDIwMCwKICAgICJwYXlsb2FkIjogewogICAgICAgICJtZXNzYWdlIjogIkZ1bmN0aW9uIGV4ZWN1dGVkIHN1Y2Nlc3NmdWxseSIsCiAgICAgICAgInJlc3VsdCI6IDU0LAogICAgICAgICJleGVjdXRpb25UaW1lIjogNDgyOQogICAgfQp9m4KtLA==