-
Notifications
You must be signed in to change notification settings - Fork 3.9k
Push notification listener #4194
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
- Preparation step for processing custom push notifications - Push notification can appear out-of band in-between executed commands - Current Connection implementation does not support out of band Push notifications - Meaning it will crash if "CLIENT TRACKING ON is enabled" on regular Jedis Connection and "invalidation" push event is triggered This commit provides a way to register push handler for the connection which process incoming push messages, before actual command is executed. To preserve backward compatibility unprocessed push messages are forward to application logic as before. - By default Connection will start with NOOP push handler which marks any incoming push event as processed and skips it - On subcsribe/psubscribe a dedicated push handler is registered which propagates to the app only supported push vents such as (message, subscribe, unsubscribe ...) - CacheConection is refactored to use a push handler handling "invalidate" push events only, and skipping any other
This commit adds a new PushHandlerChain class that implements the Chain of Responsibility pattern for Redis RESP3 push message handling. Key features: - Allows composing multiple PushHandlers in a processing chain - Push events propagate through the complete chain in sequence - Events marked as not processed are propagated to the client application - Provides both constructor-based and fluent builder API for chain creation - Includes predefined handlers for common use cases (CONSUME_ALL, PROPAGATE_ALL) - Supports immutable chain transformations via methods like then(), The chain approach provides a flexible way to handle different types of push messages (invalidations, pub/sub, etc.) with specialized handlers while maintaining a clean separation of concerns. Example usage: PushHandlerChain chain = PushHandlerChain.of(loggingHandler) .then(invalidationHandler) .then(PushHandlerChain.PROPAGATE_PUB_SUB_PUSH_HANDLER);
- code clean up - added relaxed timeout configuration - fix unit tests
Register PushInvalidateConsumer after cache is initialised
ConenctionFactory should be rebound before triggering the disposal of Idle connection, so that any newly creaetd are using the proper hostname
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good!! there are couple of things not entirely clear to me. Just to clear them on my mind i will summarize as i understand;
all push messages goes through all the consumers in the chain,
in current impl, they go through;
CONSUME_ALL_HANDLER
PUBSUB_ONLY_HANDLER
MaintenanceEventConsumer
ListenerNotificationConsumer
- helps to plug customer app push listenersPushInvalidateConsumer
(in case of CSC -CacheConnection
-)
what i don't like is each push messages visit all consumers no matter what. And the last one in the list, it kind of decides to set if data handed back to command execution or not.
so we expect each consumer to do whatever it wants to do with message and set the context appropriately.
this setup brings both flexibility and chaotic behaviour as well, because if someone shows up and marks it as they wish, they challenge to other consumer implementations plugged previously.
i think no push messages, other than pub/sub ones should be handed back to command executer anyway.
(and at some point, even they are not going to following the command response path in the case we introduce a way to customer app handle pub/sub messages via push listeners).
IMHO there should be something separate from letting client received/notified of push messages as well. May be with PUBSUB_ONLY_HANDLER
functions the same way with ListenerNotificationConsumer
but only for pub/sub messages.
let me try to explain the way i see it;
- a consumer chooses if each message propagates further to next consumer or not, and returns it as a value of consume result.
Each consumer along the chain should receive only what is let by its preceding consumers. this secures if messages should be seen or not, by any other consumer. Also this is more efficient in case there is a load of invalidation and/or pubsub messages.
interface PushConsumer {
bool consume(PushMessageContext);
}
- no messages other than pub/sub ones handed back to customer app (or received by command executor). isProcessed flag in context still remains the same to identify if message is chosen to be returned to the app
- each push message initializes with default
isProcessed
=true and there is no need for aCONSUME_ALL_HANDLER
. Then current naming of flag "isProcessed" might be suboptimal in this case. Alternatives would beforwardToClient
orisRequesterBound
maintenanceEventHandler.addListener(new ConnectionRebindHandler()); | ||
} | ||
|
||
if (TimeoutOptions.isRelaxedTimeoutEnabled(config.getTimeoutOptions().getRelaxedTimeout())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are we sure relaxedTimeout s are something that makes sense as standalone item? All this should be considered part of hitless upgrade setup and shouldnt be some standalone item in the configuration.
} | ||
|
||
|
||
private static class MaintenanceEventConsumer implements PushConsumer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: these types(ListenerNotificationConsumer
, MaintenanceEventConsumer
, ..) better be located in their own files for the sake of readability, maintainability etc..
May be as package private.
} | ||
} | ||
|
||
private boolean isPubSubType(String type) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: not sure it adds any value to have this as a method.
import java.lang.reflect.Field; | ||
import java.lang.reflect.Modifier; | ||
|
||
public class ReflectionTestUtils { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets move this to test folder
public static boolean isRelaxedTimeoutEnabled(Duration relaxedTimeout) { | ||
return relaxedTimeout != null && !relaxedTimeout.equals(DISABLED_TIMEOUT); | ||
} | ||
|
||
public static boolean isRelaxedTimeoutDisabled(Duration relaxedTimeout) { | ||
return relaxedTimeout == null || relaxedTimeout.equals(DISABLED_TIMEOUT); | ||
} | ||
|
||
public static boolean isRelaxedTimeoutDisabled(int relaxedTimeout) { | ||
return relaxedTimeout == DISABLED_TIMEOUT_MS; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we really need all of this? first one should be enough, likely to be the optimal one.
personal POV; if there is nothing planned to built/add more on TimeoutOptions
at the moment, i'd say this options class itself is an overkill.
@@ -211,22 +214,52 @@ private static List<KeyValue> processMapKeyValueReply(final RedisInputStream is) | |||
default: | |||
final List<KeyValue> ret = new ArrayList<>(num); | |||
for (int i = 0; i < num; i++) { | |||
ret.add(new KeyValue(process(is), process(is))); | |||
ret.add(new KeyValue(process(is, null), process(is,null))); | |||
} | |||
return ret; | |||
} | |||
} | |||
|
|||
public static Object read(final RedisInputStream is) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this method remains just for test purposes?
// or push-event is not handled and need to be propagated to application | ||
Object reply; | ||
do { | ||
reply = process(is, pushConsumer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
its only here that we expect a push message (instance of PushConsumerContext
) can be returned. so lets return a wrapper type that contains if its a push message or command response. Then, we wont need to use class type checking like within isPush
.
* @author Ivo Gaydajiev | ||
* @since 6.1 | ||
*/ | ||
public interface PushHandler { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same for MaintenanceEventHandler
@Override | ||
public void onRebind(HostAndPort target) { | ||
HostAndPort previous = rebindTarget.getAndSet(target); | ||
if (previous != target) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it possible to trigger same handler with multiple rebind on different targets?
not sure how it should be but if we rely on host+port equality, lets use equals
.
} | ||
} | ||
|
||
private static int safeToInt(long millis) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: move it to some util class.. may be a new one like NumberUtils
and which also exposes DoublePrecision
methods and retire DoublePrecision
as well.
Issue : If Maintenace notifications are received during blocking command, relaxTimeout is enforced instead of infinit timeout. Fix: Introduce dedicated relax timeout setting for blocking commands. It will fall back to infinit timeout if not set
Description
This PR enhances Jedis and UnifiedJedis with improved support for RESP3 push messages and introduces a configurable option for relaxed timeout behavior.
These changes are part of a broader effort to support more robust and controlled upgrade and maintenance scenarios. Specifically, they lay the groundwork for:
The goal is to increase resilience and flexibility in production environments that require connection stability during topology changes or Redis upgrades.
Backround
Starting with RESP3, Redis servers can send push messages to clients out-of-band (i.e., outside the standard request/response flow). See the RESP3 Push Specification for more details.
In the current state of the client:
CLIENT TRACKING ON
is enabled in a context other than CSC, or if other RESP3 push events are received.Changes Introduced