Skip to content

Commit

Permalink
Fixes ehcache#3169: Register only those CacheEvents with correspondin…
Browse files Browse the repository at this point in the history
…g listener
  • Loading branch information
SamuelBussmann committed Jul 18, 2024
1 parent 155fb3d commit 14ec4e1
Show file tree
Hide file tree
Showing 16 changed files with 131 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,12 @@ public synchronized void removeEventListener(StoreEventListener<K, V> eventListe
}
delegate.removeEventListener(eventListener);
}

@Override
public void listenerModified() {
delegate.listenerModified();
}

@Override
public void addEventFilter(StoreEventFilter<K, V> eventFilter) {
delegate.addEventFilter(eventFilter);
Expand Down
10 changes: 9 additions & 1 deletion ehcache-api/src/main/java/org/ehcache/event/EventType.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package org.ehcache.event;

import java.util.EnumSet;
import java.util.Set;

/**
* The different event types.
*/
Expand Down Expand Up @@ -44,6 +47,11 @@ public enum EventType {
/**
* Represents an existing {@link org.ehcache.Cache.Entry cache entry} being updated for a given key
*/
UPDATED,
UPDATED;

private static final Set<EventType> ALL_EVENT_TYPES = EnumSet.allOf(EventType.class);

public static Set<EventType> allAsSet() {
return ALL_EVENT_TYPES;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ public void removeEventListener(StoreEventListener<K, V> eventListener) {
// Do nothing
}

@Override
public void listenerModified() {
// Do nothing
}

@Override
public void addEventFilter(StoreEventFilter<K, V> eventFilter) {
// Do nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package org.ehcache.core.spi.store.events;

import java.util.Set;

import org.ehcache.core.events.StoreEventDispatcher;
import org.ehcache.event.EventType;

/**
* Interface used to register on a {@link StoreEventSource} to get notified of events happening to mappings the
Expand All @@ -36,4 +39,15 @@ public interface StoreEventListener<K, V> {
* @param event the actual {@link StoreEvent}
*/
void onEvent(StoreEvent<K, V> event);

/**
* Specify which Events this Listener is handling.
* <p>
* Defaults return is all values of {@link EventType}
*
* @return Set of the {@link EventType} this listener handles.
*/
default Set<EventType> getEventTypes() {
return EventType.allAsSet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,9 @@ public interface StoreEventSource<K, V> {
* @return {@code true} if ordering is on, {@code false} otherwise
*/
boolean isEventOrdering();

/**
* Indicates that a listener was modified
*/
void listenerModified();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class CacheEventDispatcherImpl<K, V> implements CacheEventDispatcher<K, V
.collect(toMap(identity(), t -> new CopyOnWriteArrayList<>(), (a, b) -> { throw new AssertionError(); }, () -> new EnumMap<>(EventType.class))));
private final Map<EventType, List<EventListenerWrapper<K, V>>> asyncListenersList = unmodifiableMap(allOf(EventType.class).stream()
.collect(toMap(identity(), t -> new CopyOnWriteArrayList<>(), (a, b) -> { throw new AssertionError(); }, () -> new EnumMap<>(EventType.class))));
private final Set<EventType> registeredEventTypes = EnumSet.noneOf(EventType.class);

private final StoreEventListener<K, V> eventListener = new StoreListener();

Expand Down Expand Up @@ -117,6 +119,8 @@ private synchronized void registerCacheEventListener(EventListenerWrapper<K, V>
storeEventSource.setEventOrdering(true);
}

registeredEventTypes.addAll(wrapper.getEventTypes()); // add EventType of new wrapper to list or relevant EntryTypes

switch (wrapper.getFiringMode()) {
case ASYNCHRONOUS:
wrapper.getEventTypes().forEach(type -> asyncListenersList.get(type).add(wrapper));
Expand All @@ -133,6 +137,8 @@ private synchronized void registerCacheEventListener(EventListenerWrapper<K, V>

if (firstListener) {
storeEventSource.addEventListener(eventListener);
} else {
storeEventSource.listenerModified();
}
}

Expand Down Expand Up @@ -164,6 +170,8 @@ public synchronized void deregisterCacheEventListener(CacheEventListener<? supe
throw new IllegalStateException("Unknown cache event listener: " + listener);
}

refreshRegisteredEventTypes();

if (!allListeners().findAny().isPresent()) {
storeEventSource.setSynchronous(false);
storeEventSource.setEventOrdering(false);
Expand All @@ -178,6 +186,14 @@ public synchronized void deregisterCacheEventListener(CacheEventListener<? supe
}
}

private void refreshRegisteredEventTypes() {
// collect all registered EventTypes
EnumSet<EventType> newRegisteredEventTypes = EnumSet.noneOf(EventType.class);
allListeners().forEach(listener -> newRegisteredEventTypes.addAll(listener.getEventTypes()));
// drop irrelevant EventTypes
registeredEventTypes.retainAll(newRegisteredEventTypes);
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -264,8 +280,12 @@ public void onEvent(StoreEvent<K, V> event) {
throw new AssertionError("Unexpected StoreEvent value: " + event.getType());
}
}
}

@Override
public Set<EventType> getEventTypes() {
return registeredEventTypes;
}
}
/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import org.ehcache.core.events.StoreEventSink;
import org.ehcache.core.spi.store.events.StoreEventFilter;
import org.ehcache.core.spi.store.events.StoreEventListener;
import org.ehcache.event.EventType;

import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArraySet;
Expand Down Expand Up @@ -77,6 +79,7 @@ public void evicted(Object key, Supplier<Object> value) {
private final Set<StoreEventFilter<K, V>> filters = new CopyOnWriteArraySet<>();
private final Set<StoreEventListener<K, V>> listeners = new CopyOnWriteArraySet<>();
private final BlockingQueue<FireableStoreEventHolder<K, V>>[] orderedQueues;
private final Set<EventType> relevantEventTypes = EnumSet.noneOf(EventType.class);
private volatile boolean ordered = false;

protected AbstractStoreEventDispatcher(int dispatcherConcurrency) {
Expand All @@ -92,6 +95,16 @@ protected AbstractStoreEventDispatcher(int dispatcherConcurrency) {
}
}

private void computeRelevantEventTypes() {
// collect all EventTypes the listeners are interested in.
for (StoreEventListener<K, V> listener : listeners) {
relevantEventTypes.addAll(listener.getEventTypes());
}
if (relevantEventTypes.isEmpty()) { // mocks are empty -> handle all types
relevantEventTypes.addAll(EnumSet.allOf(EventType.class));
}
}

protected Set<StoreEventListener<K, V>> getListeners() {
return listeners;
}
Expand All @@ -104,14 +117,25 @@ protected BlockingQueue<FireableStoreEventHolder<K, V>>[] getOrderedQueues() {
return orderedQueues;
}

protected Set<EventType> getRelevantEventTypes() {
return relevantEventTypes;
}

@Override
public void addEventListener(StoreEventListener<K, V> eventListener) {
listeners.add(eventListener);
computeRelevantEventTypes(); // refresh
}

@Override
public void removeEventListener(StoreEventListener<K, V> eventListener) {
listeners.remove(eventListener);
computeRelevantEventTypes(); // refresh
}

@Override
public void listenerModified() {
computeRelevantEventTypes(); // refresh
}

@Override
Expand Down Expand Up @@ -151,6 +175,6 @@ public void reset(StoreEventSink<K, V> eventSink) {

@Override
public StoreEventSink<K, V> eventSink() {
return new InvocationScopedEventSink<>(getFilters(), isEventOrdering(), getOrderedQueues(), getListeners());
return new InvocationScopedEventSink<>(getFilters(), isEventOrdering(), getOrderedQueues(), getListeners(), getRelevantEventTypes());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ class FudgingInvocationScopedEventSink<K, V> extends InvocationScopedEventSink<K

FudgingInvocationScopedEventSink(Set<StoreEventFilter<K, V>> filters, boolean ordered,
BlockingQueue<FireableStoreEventHolder<K, V>>[] orderedQueues,
Set<StoreEventListener<K, V>> listeners) {
super(filters, ordered, orderedQueues, listeners);
Set<StoreEventListener<K, V>> listeners,
Set<EventType> relevantEventTypes) {
super(filters, ordered, orderedQueues, listeners, relevantEventTypes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package org.ehcache.impl.internal.events;

import org.ehcache.event.EventType;
import org.ehcache.core.spi.store.events.StoreEventFilter;
import org.ehcache.core.spi.store.events.StoreEventListener;
import static org.ehcache.impl.internal.events.StoreEvents.createEvent;
import static org.ehcache.impl.internal.events.StoreEvents.evictEvent;
import static org.ehcache.impl.internal.events.StoreEvents.expireEvent;
import static org.ehcache.impl.internal.events.StoreEvents.removeEvent;
import static org.ehcache.impl.internal.events.StoreEvents.updateEvent;

import java.util.ArrayDeque;
import java.util.Deque;
Expand All @@ -27,11 +29,9 @@
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

import static org.ehcache.impl.internal.events.StoreEvents.createEvent;
import static org.ehcache.impl.internal.events.StoreEvents.evictEvent;
import static org.ehcache.impl.internal.events.StoreEvents.expireEvent;
import static org.ehcache.impl.internal.events.StoreEvents.removeEvent;
import static org.ehcache.impl.internal.events.StoreEvents.updateEvent;
import org.ehcache.core.spi.store.events.StoreEventFilter;
import org.ehcache.core.spi.store.events.StoreEventListener;
import org.ehcache.event.EventType;

/**
* InvocationScopedEventSink
Expand All @@ -44,13 +44,16 @@ class InvocationScopedEventSink<K, V> implements CloseableStoreEventSink<K, V> {
private final Set<StoreEventListener<K, V>> listeners;
private final Deque<FireableStoreEventHolder<K, V>> events = new ArrayDeque<>(4);

private final Set<EventType> relevantEventTypes;

InvocationScopedEventSink(Set<StoreEventFilter<K, V>> filters, boolean ordered,
BlockingQueue<FireableStoreEventHolder<K, V>>[] orderedQueues,
Set<StoreEventListener<K, V>> listeners) {
BlockingQueue<FireableStoreEventHolder<K, V>>[] orderedQueues, Set<StoreEventListener<K, V>> listeners,
Set<EventType> relevantEventTypes) {
this.filters = filters;
this.ordered = ordered;
this.orderedQueues = orderedQueues;
this.listeners = listeners;
this.relevantEventTypes = relevantEventTypes;
}

@Override
Expand Down Expand Up @@ -98,7 +101,8 @@ protected boolean acceptEvent(EventType type, K key, V oldValue, V newValue) {
return false;
}
}
return true;
// at least one listener is interested in this event
return relevantEventTypes.contains(type);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public StoreEventSink<K, V> eventSink() {
} else {
StoreEventSink<K, V> eventSink = tlEventSink.get();
if (eventSink == null) {
eventSink = new FudgingInvocationScopedEventSink<>(getFilters(), isEventOrdering(), getOrderedQueues(), getListeners());
eventSink = new FudgingInvocationScopedEventSink<>(getFilters(), isEventOrdering(), getOrderedQueues(), getListeners(), getRelevantEventTypes());
tlEventSink.set(eventSink);
usageDepth.set(0);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ public void setSynchronous(boolean synchronous) throws IllegalArgumentException
public boolean isEventOrdering() {
return false;
}

@Override
public void listenerModified() {
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.junit.Test;
import org.mockito.InOrder;

import java.util.EnumSet;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
Expand Down Expand Up @@ -53,7 +54,7 @@ public void setUp() {
storeEventListeners.add(listener);
@SuppressWarnings({"unchecked", "rawtypes"})
BlockingQueue<FireableStoreEventHolder<String, String>>[] blockingQueues = new BlockingQueue[] { new ArrayBlockingQueue<FireableStoreEventHolder<String, String>>(10) };
eventSink = new FudgingInvocationScopedEventSink<>(new HashSet<>(), false, blockingQueues, storeEventListeners);
eventSink = new FudgingInvocationScopedEventSink<>(new HashSet<>(), false, blockingQueues, storeEventListeners, EnumSet.allOf(EventType.class));
}

@Test
Expand All @@ -63,6 +64,7 @@ public void testEvictedDifferentKeyNoImpact() {
eventSink.close();

InOrder inOrder = inOrder(listener);
inOrder.verify(listener, times(2)).getEventTypes();
inOrder.verify(listener).onEvent(argThat(createdMatcher));
inOrder.verify(listener).onEvent(argThat(evictedMatcher));
verifyNoMoreInteractions(listener);
Expand All @@ -75,6 +77,7 @@ public void testEvictedSameKeyAfterUpdateReplacesWithEvictCreate() {
eventSink.close();

InOrder inOrder = inOrder(listener);
inOrder.verify(listener, times(3)).getEventTypes();
inOrder.verify(listener).onEvent(argThat(evictedMatcher));
inOrder.verify(listener).onEvent(argThat(createdMatcher));
verifyNoMoreInteractions(listener);
Expand All @@ -88,6 +91,7 @@ public void testEvictedSameKeyAfterCreateFudgesExpiryToo() {
eventSink.close();

InOrder inOrder = inOrder(listener);
inOrder.verify(listener, times(4)).getEventTypes();
inOrder.verify(listener).onEvent(argThat(evictedMatcher));
inOrder.verify(listener).onEvent(argThat(createdMatcher));
verifyNoMoreInteractions(listener);
Expand All @@ -102,6 +106,7 @@ public void testEvictedSameKeyAfterUpdateReplacesWithEvictCreateEvenWithMultiple
eventSink.close();

InOrder inOrder = inOrder(listener);
inOrder.verify(listener, times(5)).getEventTypes();
inOrder.verify(listener, times(3)).onEvent(argThat(evictedMatcher));
inOrder.verify(listener).onEvent(argThat(createdMatcher));
verifyNoMoreInteractions(listener);
Expand All @@ -117,6 +122,7 @@ public void testEvictedSameKeyAfterCreateFudgesExpiryTooEvenWithMultipleEvictsIn
eventSink.close();

InOrder inOrder = inOrder(listener);
inOrder.verify(listener, times(6)).getEventTypes();
inOrder.verify(listener, times(3)).onEvent(argThat(evictedMatcher));
inOrder.verify(listener).onEvent(argThat(createdMatcher));
verifyNoMoreInteractions(listener);
Expand All @@ -130,6 +136,7 @@ public void testEvictedKeyDoesNotFudgeOlderEvents() {
eventSink.close();

InOrder inOrder = inOrder(listener);
inOrder.verify(listener, times(3)).getEventTypes();
Matcher<StoreEvent<String, String>> updatedMatcher = eventType(EventType.UPDATED);
inOrder.verify(listener).onEvent(argThat(updatedMatcher));
inOrder.verify(listener).onEvent(argThat(createdMatcher));
Expand Down
Loading

0 comments on commit 14ec4e1

Please sign in to comment.