1
1
import sys
2
2
from abc import ABC
3
3
from asyncio import IncompleteReadError , StreamReader , TimeoutError
4
- from typing import Callable , List , Optional , Protocol , Union
4
+ from typing import Awaitable , Callable , List , Optional , Protocol , Union
5
+
6
+ from redis .maintenance_events import (
7
+ NodeMigratedEvent ,
8
+ NodeMigratingEvent ,
9
+ NodeMovingEvent ,
10
+ )
5
11
6
12
if sys .version_info .major >= 3 and sys .version_info .minor >= 11 :
7
13
from asyncio import timeout as async_timeout
@@ -158,48 +164,122 @@ async def read_response(
158
164
raise NotImplementedError ()
159
165
160
166
161
- _INVALIDATION_MESSAGE = [b"invalidate" , "invalidate" ]
167
+ _INVALIDATION_MESSAGE = (b"invalidate" , "invalidate" )
168
+ _MOVING_MESSAGE = (b"MOVING" , "MOVING" )
169
+ _MIGRATING_MESSAGE = (b"MIGRATING" , "MIGRATING" )
170
+ _MIGRATED_MESSAGE = (b"MIGRATED" , "MIGRATED" )
171
+ _FAILING_OVER_MESSAGE = (b"FAILING_OVER" , "FAILING_OVER" )
172
+ _FAILED_OVER_MESSAGE = (b"FAILED_OVER" , "FAILED_OVER" )
173
+
174
+ _MAINTENANCE_MESSAGES = (
175
+ * _MIGRATING_MESSAGE ,
176
+ * _MIGRATED_MESSAGE ,
177
+ * _FAILING_OVER_MESSAGE ,
178
+ * _FAILED_OVER_MESSAGE ,
179
+ )
162
180
163
181
164
182
class PushNotificationsParser (Protocol ):
165
183
"""Protocol defining RESP3-specific parsing functionality"""
166
184
167
185
pubsub_push_handler_func : Callable
168
186
invalidation_push_handler_func : Optional [Callable ] = None
187
+ node_moving_push_handler_func : Optional [Callable ] = None
188
+ maintenance_push_handler_func : Optional [Callable ] = None
169
189
170
190
def handle_pubsub_push_response (self , response ):
171
191
"""Handle pubsub push responses"""
172
192
raise NotImplementedError ()
173
193
174
194
def handle_push_response (self , response , ** kwargs ):
175
- if response [0 ] not in _INVALIDATION_MESSAGE :
195
+ msg_type = response [0 ]
196
+ if msg_type not in (
197
+ * _INVALIDATION_MESSAGE ,
198
+ * _MAINTENANCE_MESSAGES ,
199
+ * _MOVING_MESSAGE ,
200
+ ):
176
201
return self .pubsub_push_handler_func (response )
177
- if self .invalidation_push_handler_func :
202
+ if msg_type in _INVALIDATION_MESSAGE and self .invalidation_push_handler_func :
178
203
return self .invalidation_push_handler_func (response )
204
+ if msg_type in _MOVING_MESSAGE and self .node_moving_push_handler_func :
205
+ # TODO: PARSE latest format when available
206
+ host , port = response [2 ].decode ().split (":" )
207
+ ttl = response [1 ]
208
+ id = 1 # Hardcoded value until the notification starts including the id
209
+ notification = NodeMovingEvent (id , host , port , ttl )
210
+ return self .node_moving_push_handler_func (notification )
211
+ if msg_type in _MAINTENANCE_MESSAGES and self .maintenance_push_handler_func :
212
+ if msg_type in _MIGRATING_MESSAGE :
213
+ # TODO: PARSE latest format when available
214
+ ttl = response [1 ]
215
+ id = 2 # Hardcoded value until the notification starts including the id
216
+ notification = NodeMigratingEvent (id , ttl )
217
+ elif msg_type in _MIGRATED_MESSAGE :
218
+ # TODO: PARSE latest format when available
219
+ id = 3 # Hardcoded value until the notification starts including the id
220
+ notification = NodeMigratedEvent (id )
221
+ else :
222
+ notification = None
223
+ if notification is not None :
224
+ return self .maintenance_push_handler_func (notification )
225
+ else :
226
+ return None
179
227
180
228
def set_pubsub_push_handler (self , pubsub_push_handler_func ):
181
229
self .pubsub_push_handler_func = pubsub_push_handler_func
182
230
183
231
def set_invalidation_push_handler (self , invalidation_push_handler_func ):
184
232
self .invalidation_push_handler_func = invalidation_push_handler_func
185
233
234
+ def set_node_moving_push_handler (self , node_moving_push_handler_func ):
235
+ self .node_moving_push_handler_func = node_moving_push_handler_func
236
+
237
+ def set_maintenance_push_handler (self , maintenance_push_handler_func ):
238
+ self .maintenance_push_handler_func = maintenance_push_handler_func
239
+
186
240
187
241
class AsyncPushNotificationsParser (Protocol ):
188
242
"""Protocol defining async RESP3-specific parsing functionality"""
189
243
190
244
pubsub_push_handler_func : Callable
191
245
invalidation_push_handler_func : Optional [Callable ] = None
246
+ node_moving_push_handler_func : Optional [Callable [..., Awaitable [None ]]] = None
247
+ maintenance_push_handler_func : Optional [Callable [..., Awaitable [None ]]] = None
192
248
193
249
async def handle_pubsub_push_response (self , response ):
194
250
"""Handle pubsub push responses asynchronously"""
195
251
raise NotImplementedError ()
196
252
197
253
async def handle_push_response (self , response , ** kwargs ):
198
254
"""Handle push responses asynchronously"""
199
- if response [0 ] not in _INVALIDATION_MESSAGE :
255
+ msg_type = response [0 ]
256
+ if msg_type not in (
257
+ * _INVALIDATION_MESSAGE ,
258
+ * _MAINTENANCE_MESSAGES ,
259
+ * _MOVING_MESSAGE ,
260
+ ):
200
261
return await self .pubsub_push_handler_func (response )
201
- if self .invalidation_push_handler_func :
262
+ if msg_type in _INVALIDATION_MESSAGE and self .invalidation_push_handler_func :
202
263
return await self .invalidation_push_handler_func (response )
264
+ if msg_type in _MOVING_MESSAGE and self .node_moving_push_handler_func :
265
+ # push notification from enterprise cluster for node moving
266
+ # TODO: PARSE latest format when available
267
+ host , port = response [2 ].split (":" )
268
+ ttl = response [1 ]
269
+ id = 1 # Hardcoded value for async parser
270
+ notification = NodeMovingEvent (id , host , port , ttl )
271
+ return await self .node_moving_push_handler_func (notification )
272
+ if msg_type in _MAINTENANCE_MESSAGES and self .maintenance_push_handler_func :
273
+ if msg_type in _MIGRATING_MESSAGE :
274
+ # TODO: PARSE latest format when available
275
+ ttl = response [1 ]
276
+ id = 2 # Hardcoded value for async parser
277
+ notification = NodeMigratingEvent (id , ttl )
278
+ elif msg_type in _MIGRATED_MESSAGE :
279
+ # TODO: PARSE latest format when available
280
+ id = 3 # Hardcoded value for async parser
281
+ notification = NodeMigratedEvent (id )
282
+ return await self .maintenance_push_handler_func (notification )
203
283
204
284
def set_pubsub_push_handler (self , pubsub_push_handler_func ):
205
285
"""Set the pubsub push handler function"""
@@ -209,6 +289,12 @@ def set_invalidation_push_handler(self, invalidation_push_handler_func):
209
289
"""Set the invalidation push handler function"""
210
290
self .invalidation_push_handler_func = invalidation_push_handler_func
211
291
292
+ def set_node_moving_push_handler (self , node_moving_push_handler_func ):
293
+ self .node_moving_push_handler_func = node_moving_push_handler_func
294
+
295
+ def set_maintenance_push_handler (self , maintenance_push_handler_func ):
296
+ self .maintenance_push_handler_func = maintenance_push_handler_func
297
+
212
298
213
299
class _AsyncRESPBase (AsyncBaseParser ):
214
300
"""Base class for async resp parsing"""
0 commit comments