11import * as Y from 'yjs'
2- import * as redis from 'redis'
2+ import { createClient , defineScript , commandOptions } from 'redis'
33import * as map from 'lib0/map'
44import * as decoding from 'lib0/decoding'
55import * as awarenessProtocol from 'y-protocols/awareness'
@@ -97,7 +97,6 @@ export const createApiClient = async (store, { redisPrefix, redisUrl, enableAwar
9797}
9898
9999export class Api {
100- /** @type {import('@redis/client').RedisClientType<any, any, any> & { addMessage: (key: string, message: Buffer) => Promise<any>, xDelIfEmpty: (key: string) => Promise<any> } } */
101100 redis
102101 /**
103102 * @param {import('./storage.js').AbstractStorage } store
@@ -140,12 +139,11 @@ export class Api {
140139 redis.call("EXPIRE", KEYS[1], ${ ROOM_STREAM_TTL } )
141140 `
142141
143- /** @type {import('@redis/client').RedisClientType & { addMessage: (key: string, message: Buffer) => Promise<any>, xDelIfEmpty: (key: string) => Promise<any> } } */
144- this . redis = redis . createClient ( {
142+ this . redis = createClient ( {
145143 url,
146144 // scripting: https://github.com/redis/node-redis/#lua-scripts
147145 scripts : {
148- addMessage : redis . defineScript ( {
146+ addMessage : defineScript ( {
149147 NUMBER_OF_KEYS : 1 ,
150148 SCRIPT : addScript ,
151149 /**
@@ -162,7 +160,7 @@ export class Api {
162160 return x
163161 }
164162 } ) ,
165- xDelIfEmpty : redis . defineScript ( {
163+ xDelIfEmpty : defineScript ( {
166164 NUMBER_OF_KEYS : 1 ,
167165 SCRIPT : `
168166 if redis.call("XLEN", KEYS[1]) == 0 then
@@ -196,7 +194,7 @@ export class Api {
196194 return [ ]
197195 }
198196 const reads = await this . redis . xRead (
199- redis . commandOptions ( { returnBuffers : true } ) ,
197+ commandOptions ( { returnBuffers : true } ) ,
200198 streams ,
201199 { BLOCK : 1000 , COUNT : 1000 }
202200 )
@@ -244,7 +242,7 @@ export class Api {
244242 * @param {string } docid
245243 */
246244 async getDoc ( room , docid ) {
247- const ms = extractMessagesFromStreamReply ( await this . redis . xRead ( redis . commandOptions ( { returnBuffers : true } ) , { key : computeRedisRoomStreamName ( room , docid , this . prefix ) , id : '0' } ) , this . prefix )
245+ const ms = extractMessagesFromStreamReply ( await this . redis . xRead ( commandOptions ( { returnBuffers : true } ) , { key : computeRedisRoomStreamName ( room , docid , this . prefix ) , id : '0' } ) , this . prefix )
248246 const docMessages = ms . get ( room ) ?. get ( docid ) || null
249247 if ( docMessages ?. messages ) logApi ( `processing messages of length: ${ docMessages ?. messages . length } in room: ${ room } ` )
250248 const docstate = await this . store . retrieveDoc ( room , docid )
@@ -292,7 +290,7 @@ export class Api {
292290 * @param {string } docid
293291 */
294292 async getRedisLastId ( room , docid ) {
295- const ms = extractMessagesFromStreamReply ( await this . redis . xRead ( redis . commandOptions ( { returnBuffers : true } ) , { key : computeRedisRoomStreamName ( room , docid , this . prefix ) , id : '0' } ) , this . prefix )
293+ const ms = extractMessagesFromStreamReply ( await this . redis . xRead ( commandOptions ( { returnBuffers : true } ) , { key : computeRedisRoomStreamName ( room , docid , this . prefix ) , id : '0' } ) , this . prefix )
296294 const docMessages = ms . get ( room ) ?. get ( docid ) || null
297295 return docMessages ?. lastId . toString ( ) || '0'
298296 }
@@ -342,7 +340,6 @@ export class Api {
342340 const streamlen = await this . redis . xLen ( task . stream )
343341 if ( streamlen === 0 ) {
344342 await this . redis . multi ( )
345- // @ts -expect-error custom script on multi
346343 . xDelIfEmpty ( task . stream )
347344 . xAck ( this . redisWorkerStreamName , this . redisWorkerGroupName , task . id )
348345 . xDel ( this . redisWorkerStreamName , task . id )
0 commit comments