Skip to content

Commit ce2f7b9

Browse files
authored
Fix: Process multiple received streams concurrently. (#13)
1 parent c53b6b8 commit ce2f7b9

File tree

2 files changed

+8
-6
lines changed

2 files changed

+8
-6
lines changed

lib/redis.client.ts

+4-3
Original file line numberDiff line numberDiff line change
@@ -260,9 +260,10 @@ export class RedisStreamClient extends ClientProxy {
260260
// if BLOCK time ended, and results are null, listen again.
261261
if (!results) return this.listenOnStreams();
262262

263-
const [key, messages] = results[0];
264-
265-
await this.notifyHandlers(key, messages);
263+
for (let result of results) {
264+
let [stream, messages] = result;
265+
await this.notifyHandlers(stream, messages);
266+
}
266267

267268
return this.listenOnStreams();
268269
} catch (error) {

lib/redis.server.ts

+4-3
Original file line numberDiff line numberDiff line change
@@ -268,9 +268,10 @@ export class RedisStreamStrategy
268268
// if BLOCK time ended, and results are null, listen again.
269269
if (!results) return this.listenOnStreams();
270270

271-
const [key, messages] = results[0];
272-
273-
await this.notifyHandlers(key, messages);
271+
for (let result of results) {
272+
let [stream, messages] = result;
273+
await this.notifyHandlers(stream, messages);
274+
}
274275

275276
return this.listenOnStreams();
276277
} catch (error) {

0 commit comments

Comments
 (0)