From 88e8d9a15d4f1690a6df1c1cabb355936f327aaa Mon Sep 17 00:00:00 2001 From: Qin Meijie Date: Fri, 8 Jun 2018 13:06:15 +0800 Subject: [PATCH 1/3] Add new service cache listener to let users know what instances have changed, while preserving backward compatibility. --- .../details/ServiceCacheEventListener.java | 50 ++++++++ .../x/discovery/details/ServiceCacheImpl.java | 109 +++++++++++++----- .../curator/x/discovery/TestServiceCache.java | 80 +++++++++++++ 3 files changed, 207 insertions(+), 32 deletions(-) create mode 100644 curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java new file mode 100644 index 0000000000..1f783f39e6 --- /dev/null +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.discovery.details; + +import org.apache.curator.x.discovery.ServiceInstance; + +/** + * Listener for events (addition/update/deletion) that happen to a service cache + */ +public interface ServiceCacheEventListener extends ServiceCacheListener +{ + + /** + * Called when a new cache is added. + * + * @param added instance added + */ + public void cacheAdded(ServiceInstance added); + + /** + * Called when a cache is deleted. + * + * @param deleted instance deleted + */ + public void cacheDeleted(ServiceInstance deleted); + + /** + * Called when a cache is updated. + * + * @param old old instance + * @param updated updated instance + */ + public void cacheUpdated(ServiceInstance old, ServiceInstance updated); +} diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java index b8f39d5de4..df6696a897 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java @@ -93,7 +93,7 @@ public void start() throws Exception cache.start(true); for ( ChildData childData : cache.getCurrentData() ) { - addInstance(childData, true); + addInstanceOnlyIfAbsent(childData); } discovery.cacheOpened(this); } @@ -146,40 +146,72 @@ public void removeListener(ServiceCacheListener listener) @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - boolean notifyListeners = false; - switch ( event.getType() ) + final Tuple tuple; + switch ( event.getType() ) { case CHILD_ADDED: + tuple = addOrUpdateInstance(event.getData()); + listenerContainer.forEach( + new Function() + { + @Override + public Void apply(ServiceCacheListener listener) + { + listener.cacheChanged(); + + if(listener instanceof ServiceCacheEventListener) { + ((ServiceCacheEventListener) listener).cacheAdded(tuple.newInstance); + } + + return null; + } + } + ); + break; case CHILD_UPDATED: { - addInstance(event.getData(), false); - notifyListeners = true; + tuple = addOrUpdateInstance(event.getData()); + listenerContainer.forEach( + new Function() + { + @Override + public Void apply(ServiceCacheListener listener) + { + listener.cacheChanged(); + + if(listener instanceof ServiceCacheEventListener) { + ((ServiceCacheEventListener) listener).cacheUpdated(tuple.oldInstance, tuple.newInstance); + } + + return null; + } + } + ); break; } case CHILD_REMOVED: { - instances.remove(instanceIdFromData(event.getData())); - notifyListeners = true; + final ServiceInstance serviceInstance = instances.remove(instanceIdFromData(event.getData())); + listenerContainer.forEach( + new Function() + { + @Override + public Void apply(ServiceCacheListener listener) + { + listener.cacheChanged(); + + if(listener instanceof ServiceCacheEventListener) { + ((ServiceCacheEventListener) listener).cacheDeleted(serviceInstance); + } + + return null; + } + } + ); break; } } - - if ( notifyListeners ) - { - listenerContainer.forEach - ( - new Function() - { - @Override - public Void apply(ServiceCacheListener listener) - { - listener.cacheChanged(); - return null; - } - } - ); - } } private String instanceIdFromData(ChildData childData) @@ -187,18 +219,31 @@ private String instanceIdFromData(ChildData childData) return ZKPaths.getNodeFromPath(childData.getPath()); } - private void addInstance(ChildData childData, boolean onlyIfAbsent) throws Exception + private void addInstanceOnlyIfAbsent(ChildData childData) throws Exception { String instanceId = instanceIdFromData(childData); ServiceInstance serviceInstance = discovery.getSerializer().deserialize(childData.getData()); - if ( onlyIfAbsent ) - { - instances.putIfAbsent(instanceId, serviceInstance); - } - else - { - instances.put(instanceId, serviceInstance); - } + + instances.putIfAbsent(instanceId, serviceInstance); + cache.clearDataBytes(childData.getPath(), childData.getStat().getVersion()); + } + + private Tuple addOrUpdateInstance(ChildData childData) throws Exception + { + String instanceId = instanceIdFromData(childData); + ServiceInstance serviceInstance = discovery.getSerializer().deserialize(childData.getData()); + final Tuple result = new Tuple<>(instances.put(instanceId, serviceInstance), serviceInstance); cache.clearDataBytes(childData.getPath(), childData.getStat().getVersion()); + return result; + } + + private static class Tuple { + public final ServiceInstance oldInstance; + public final ServiceInstance newInstance; + + private Tuple(final ServiceInstance oldInstance, final ServiceInstance newInstance) { + this.oldInstance = oldInstance; + this.newInstance = newInstance; + } } } diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java index fda5c26965..65b7cb943f 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java @@ -28,6 +28,7 @@ import org.apache.curator.test.ExecuteCalledWatchingExecutorService; import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.x.discovery.details.ServiceCacheEventListener; import org.apache.curator.x.discovery.details.ServiceCacheListener; import org.testng.Assert; import org.testng.annotations.Test; @@ -39,6 +40,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; public class TestServiceCache extends BaseClassForTests { @@ -310,4 +312,82 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) } } } + + @Test + public void testServiceCacheEventListener() throws Exception + { + List closeables = Lists.newArrayList(); + try + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + closeables.add(client); + client.start(); + + ServiceDiscovery discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/discovery").client(client).build(); + closeables.add(discovery); + discovery.start(); + + ServiceCache cache = discovery.serviceCacheBuilder().name("test").build(); + closeables.add(cache); + + final CountDownLatch latch = new CountDownLatch(6); + + final AtomicBoolean notifyError = new AtomicBoolean(false); + ServiceCacheListener listener = new ServiceCacheEventListener() + { + @Override + public void cacheAdded(final ServiceInstance added) { + latch.countDown(); + + notifyError.compareAndSet(false,added == null); + } + + @Override + public void cacheDeleted(final ServiceInstance deleted) { + latch.countDown(); + + notifyError.compareAndSet(false,deleted == null); + } + + @Override + public void cacheUpdated(final ServiceInstance before, final ServiceInstance after) { + latch.countDown(); + + notifyError.compareAndSet(false, !"before".equals(before.getPayload())); + notifyError.compareAndSet(false, !"after".equals(after.getPayload())); + } + + @Override + public void cacheChanged() + { + latch.countDown(); + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + + } + }; + cache.addListener(listener); + cache.start(); + + ServiceInstance instance = ServiceInstance.builder().payload("before").name("test").port(10064).build(); + discovery.registerService(instance); + instance = ServiceInstance.builder().id(instance.getId()).payload("after").name("test").port(10064).build(); + discovery.updateService(instance); + discovery.unregisterService(instance); + Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); + + Assert.assertFalse(notifyError.get()); + } + finally + { + Collections.reverse(closeables); + for ( Closeable c : closeables ) + { + CloseableUtils.closeQuietly(c); + } + } + } } From 2ebd456930d5970380d68af9c6b7150a2cf24abf Mon Sep 17 00:00:00 2001 From: randgalt Date: Sun, 24 Jun 2018 10:21:04 -0500 Subject: [PATCH 2/3] wip --- .../curator/x/discovery/ServiceCache.java | 12 +- .../details/ServiceCacheEventListener.java | 10 +- .../x/discovery/details/ServiceCacheImpl.java | 140 ++++++++++-------- .../details/ServiceCacheListenerWrapper.java | 53 +++++++ .../curator/x/discovery/TestServiceCache.java | 13 +- 5 files changed, 150 insertions(+), 78 deletions(-) create mode 100644 curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java index a122d692a4..0214bf8aa8 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java @@ -20,6 +20,7 @@ import org.apache.curator.framework.listen.Listenable; import org.apache.curator.x.discovery.details.InstanceProvider; +import org.apache.curator.x.discovery.details.ServiceCacheEventListener; import org.apache.curator.x.discovery.details.ServiceCacheListener; import java.io.Closeable; import java.util.List; @@ -33,12 +34,19 @@ public interface ServiceCache extends Closeable, Listenable> getInstances(); + List> getInstances(); /** * The cache must be started before use * * @throws Exception errors */ - public void start() throws Exception; + void start() throws Exception; + + /** + * Returns the listenable container over the newer {@link org.apache.curator.x.discovery.details.ServiceCacheEventListener} + * + * @return listenable + */ + Listenable> getCacheEventListenable(); } diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java index 1f783f39e6..e6345c447f 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java @@ -18,27 +18,27 @@ */ package org.apache.curator.x.discovery.details; +import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.x.discovery.ServiceInstance; /** * Listener for events (addition/update/deletion) that happen to a service cache */ -public interface ServiceCacheEventListener extends ServiceCacheListener +public interface ServiceCacheEventListener extends ConnectionStateListener { - /** * Called when a new cache is added. * * @param added instance added */ - public void cacheAdded(ServiceInstance added); + void cacheAdded(ServiceInstance added); /** * Called when a cache is deleted. * * @param deleted instance deleted */ - public void cacheDeleted(ServiceInstance deleted); + void cacheDeleted(ServiceInstance deleted); /** * Called when a cache is updated. @@ -46,5 +46,5 @@ public interface ServiceCacheEventListener extends ServiceCacheListener * @param old old instance * @param updated updated instance */ - public void cacheUpdated(ServiceInstance old, ServiceInstance updated); + void cacheUpdated(ServiceInstance old, ServiceInstance updated); } diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java index ffaf1a499b..47449e8ba1 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.curator.framework.listen.Listenable; import org.apache.curator.utils.CloseableExecutorService; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; @@ -36,6 +37,7 @@ import org.apache.curator.x.discovery.ServiceInstance; import java.io.IOException; import java.util.List; +import java.util.Objects; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; @@ -45,9 +47,9 @@ public class ServiceCacheImpl implements ServiceCache, PathChildrenCacheListener { - private final ListenerContainer listenerContainer = new ListenerContainer(); + private final ListenerContainer> listenerContainer = new ListenerContainer<>(); private final ServiceDiscoveryImpl discovery; - private final AtomicReference state = new AtomicReference(State.LATENT); + private final AtomicReference state = new AtomicReference<>(State.LATENT); private final PathChildrenCache cache; private final ConcurrentMap> instances = Maps.newConcurrentMap(); @@ -125,10 +127,10 @@ public void close() throws IOException listenerContainer.forEach ( - new Function() + new Function, Void>() { @Override - public Void apply(ServiceCacheListener listener) + public Void apply(ServiceCacheEventListener listener) { discovery.getClient().getConnectionStateListenable().removeListener(listener); return null; @@ -142,99 +144,115 @@ public Void apply(ServiceCacheListener listener) discovery.cacheClosed(this); } + @Override + public Listenable> getCacheEventListenable() + { + return listenerContainer; + } + @Override public void addListener(ServiceCacheListener listener) { - listenerContainer.addListener(listener); - discovery.getClient().getConnectionStateListenable().addListener(listener); + ServiceCacheListenerWrapper wrapped = ServiceCacheListenerWrapper.wrap(listener); + listenerContainer.addListener(wrapped); + discovery.getClient().getConnectionStateListenable().addListener(wrapped); } @Override public void addListener(ServiceCacheListener listener, Executor executor) { - listenerContainer.addListener(listener, executor); - discovery.getClient().getConnectionStateListenable().addListener(listener, executor); + ServiceCacheListenerWrapper wrapped = ServiceCacheListenerWrapper.wrap(listener); + listenerContainer.addListener(wrapped, executor); + discovery.getClient().getConnectionStateListenable().addListener(wrapped, executor); } @Override - public void removeListener(ServiceCacheListener listener) + public void removeListener(final ServiceCacheListener listener) { - listenerContainer.removeListener(listener); - discovery.getClient().getConnectionStateListenable().removeListener(listener); + listenerContainer.forEach + ( + new Function, Void>() + { + @Override + public Void apply(ServiceCacheEventListener eventListener) + { + if ( Objects.equals(ServiceCacheListenerWrapper.unwrap(eventListener), listener) ) + { + listenerContainer.removeListener(eventListener); + discovery.getClient().getConnectionStateListenable().removeListener(eventListener); + } + return null; + } + } + ); } @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - final Tuple tuple; switch ( event.getType() ) { case CHILD_ADDED: - tuple = addOrUpdateInstance(event.getData()); - listenerContainer.forEach( - new Function() + { + final Tuple tuple = addOrUpdateInstance(event.getData()); + listenerContainer.forEach + ( + new Function, Void>() { @Override - public Void apply(ServiceCacheListener listener) + public Void apply(ServiceCacheEventListener listener) { - listener.cacheChanged(); + listener.cacheAdded(tuple.newInstance); + return null; + } + } + ); + break; + } - if ( listener instanceof ServiceCacheEventListener ) + case CHILD_UPDATED: + { + final Tuple tuple = addOrUpdateInstance(event.getData()); + listenerContainer.forEach + ( + new Function, Void>() + { + @Override + public Void apply(ServiceCacheEventListener listener) + { + if ( tuple.oldInstance != null ) { - //noinspection unchecked - ((ServiceCacheEventListener) listener).cacheAdded(tuple.newInstance); + listener.cacheUpdated(tuple.oldInstance, tuple.newInstance); + } + else + { + listener.cacheAdded(tuple.newInstance); } - return null; } } ); break; - case CHILD_UPDATED: - { - tuple = addOrUpdateInstance(event.getData()); - listenerContainer.forEach( - new Function() - { - @Override - public Void apply(ServiceCacheListener listener) - { - listener.cacheChanged(); - - if( listener instanceof ServiceCacheEventListener ) - { - //noinspection unchecked - ((ServiceCacheEventListener) listener).cacheUpdated(tuple.oldInstance, tuple.newInstance); - } - - return null; - } - } - ); - break; } case CHILD_REMOVED: { final ServiceInstance serviceInstance = instances.remove(instanceIdFromData(event.getData())); - listenerContainer.forEach( - new Function() - { - @Override - public Void apply(ServiceCacheListener listener) - { - listener.cacheChanged(); - - if ( listener instanceof ServiceCacheEventListener ) - { - //noinspection unchecked - ((ServiceCacheEventListener) listener).cacheDeleted(serviceInstance); - } - - return null; - } - } - ); + if ( serviceInstance != null ) + { + listenerContainer.forEach + ( + new Function, Void>() + { + @Override + public Void apply(ServiceCacheEventListener listener) + { + listener.cacheDeleted(serviceInstance); + return null; + } + } + ); + } break; } } diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java new file mode 100644 index 0000000000..6f14178194 --- /dev/null +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java @@ -0,0 +1,53 @@ +package org.apache.curator.x.discovery.details; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.x.discovery.ServiceInstance; + +class ServiceCacheListenerWrapper implements ServiceCacheEventListener +{ + private final ServiceCacheListener listener; + + static ServiceCacheListenerWrapper wrap(ServiceCacheListener listener) + { + return new ServiceCacheListenerWrapper<>(listener); + } + + static ServiceCacheListener unwrap(ServiceCacheEventListener eventListener) + { + if ( eventListener instanceof ServiceCacheListenerWrapper ) + { + return ((ServiceCacheListenerWrapper)eventListener).listener; + } + return null; + } + + private ServiceCacheListenerWrapper(ServiceCacheListener listener) + { + this.listener = listener; + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + listener.stateChanged(client, newState); + } + + @Override + public void cacheAdded(ServiceInstance added) + { + listener.cacheChanged(); + } + + @Override + public void cacheDeleted(ServiceInstance deleted) + { + listener.cacheChanged(); + } + + @Override + public void cacheUpdated(ServiceInstance old, ServiceInstance updated) + { + listener.cacheChanged(); + } +} diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java index 65b7cb943f..70b41e32e7 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java @@ -330,10 +330,10 @@ public void testServiceCacheEventListener() throws Exception ServiceCache cache = discovery.serviceCacheBuilder().name("test").build(); closeables.add(cache); - final CountDownLatch latch = new CountDownLatch(6); + final CountDownLatch latch = new CountDownLatch(3); final AtomicBoolean notifyError = new AtomicBoolean(false); - ServiceCacheListener listener = new ServiceCacheEventListener() + ServiceCacheEventListener listener = new ServiceCacheEventListener() { @Override public void cacheAdded(final ServiceInstance added) { @@ -357,19 +357,12 @@ public void cacheUpdated(final ServiceInstance before, final ServiceInst notifyError.compareAndSet(false, !"after".equals(after.getPayload())); } - @Override - public void cacheChanged() - { - latch.countDown(); - } - @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { - } }; - cache.addListener(listener); + cache.getCacheEventListenable().addListener(listener); cache.start(); ServiceInstance instance = ServiceInstance.builder().payload("before").name("test").port(10064).build(); From c0c0ecad59af4ca20c277790bb0e877cfb3fb5bd Mon Sep 17 00:00:00 2001 From: randgalt Date: Sun, 24 Jun 2018 10:48:36 -0500 Subject: [PATCH 3/3] CURATOR-470 Introduces an alternate cache listener, ServiceCacheEventListener, that gives more detail about changes to the cache as opposed to the original version which merely denotes a change. --- .../details/ServiceCacheEventListener.java | 9 +- .../x/discovery/details/ServiceCacheImpl.java | 156 ++++++++---------- .../details/ServiceCacheListenerWrapper.java | 53 ------ .../curator/x/discovery/TestServiceCache.java | 43 +++-- 4 files changed, 95 insertions(+), 166 deletions(-) delete mode 100644 curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java index e6345c447f..7a3b570b61 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java @@ -18,30 +18,29 @@ */ package org.apache.curator.x.discovery.details; -import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.x.discovery.ServiceInstance; /** * Listener for events (addition/update/deletion) that happen to a service cache */ -public interface ServiceCacheEventListener extends ConnectionStateListener +public interface ServiceCacheEventListener { /** - * Called when a new cache is added. + * Called when a new instance is added. * * @param added instance added */ void cacheAdded(ServiceInstance added); /** - * Called when a cache is deleted. + * Called when an instance is deleted. * * @param deleted instance deleted */ void cacheDeleted(ServiceInstance deleted); /** - * Called when a cache is updated. + * Called when an instance is updated. * * @param old old instance * @param updated updated instance diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java index 47449e8ba1..715182ba93 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java @@ -37,7 +37,6 @@ import org.apache.curator.x.discovery.ServiceInstance; import java.io.IOException; import java.util.List; -import java.util.Objects; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; @@ -47,7 +46,8 @@ public class ServiceCacheImpl implements ServiceCache, PathChildrenCacheListener { - private final ListenerContainer> listenerContainer = new ListenerContainer<>(); + private final ListenerContainer> eventListenerContainer = new ListenerContainer<>(); + private final ListenerContainer listenerContainer = new ListenerContainer<>(); private final ServiceDiscoveryImpl discovery; private final AtomicReference state = new AtomicReference<>(State.LATENT); private final PathChildrenCache cache; @@ -126,18 +126,19 @@ public void close() throws IOException Preconditions.checkState(state.compareAndSet(State.STARTED, State.STOPPED), "Already closed or has not been started"); listenerContainer.forEach - ( - new Function, Void>() + ( + new Function() + { + @Override + public Void apply(ServiceCacheListener listener) { - @Override - public Void apply(ServiceCacheEventListener listener) - { - discovery.getClient().getConnectionStateListenable().removeListener(listener); - return null; - } + discovery.getClient().getConnectionStateListenable().removeListener(listener); + return null; } - ); + } + ); listenerContainer.clear(); + eventListenerContainer.clear(); CloseableUtils.closeQuietly(cache); @@ -147,115 +148,98 @@ public Void apply(ServiceCacheEventListener listener) @Override public Listenable> getCacheEventListenable() { - return listenerContainer; + return eventListenerContainer; } @Override public void addListener(ServiceCacheListener listener) { - ServiceCacheListenerWrapper wrapped = ServiceCacheListenerWrapper.wrap(listener); - listenerContainer.addListener(wrapped); - discovery.getClient().getConnectionStateListenable().addListener(wrapped); + listenerContainer.addListener(listener); + discovery.getClient().getConnectionStateListenable().addListener(listener); } @Override public void addListener(ServiceCacheListener listener, Executor executor) { - ServiceCacheListenerWrapper wrapped = ServiceCacheListenerWrapper.wrap(listener); - listenerContainer.addListener(wrapped, executor); - discovery.getClient().getConnectionStateListenable().addListener(wrapped, executor); + listenerContainer.addListener(listener, executor); + discovery.getClient().getConnectionStateListenable().addListener(listener, executor); } @Override - public void removeListener(final ServiceCacheListener listener) + public void removeListener(ServiceCacheListener listener) { - listenerContainer.forEach - ( - new Function, Void>() - { - @Override - public Void apply(ServiceCacheEventListener eventListener) - { - if ( Objects.equals(ServiceCacheListenerWrapper.unwrap(eventListener), listener) ) - { - listenerContainer.removeListener(eventListener); - discovery.getClient().getConnectionStateListenable().removeListener(eventListener); - } - return null; - } - } - ); + listenerContainer.removeListener(listener); + discovery.getClient().getConnectionStateListenable().removeListener(listener); } @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { + boolean notifyListeners = false; switch ( event.getType() ) { case CHILD_ADDED: + case CHILD_UPDATED: { - final Tuple tuple = addOrUpdateInstance(event.getData()); - listenerContainer.forEach - ( - new Function, Void>() - { - @Override - public Void apply(ServiceCacheEventListener listener) - { - listener.cacheAdded(tuple.newInstance); - return null; - } - } - ); + notifyListeners = true; + applyTuple(addOrUpdateInstance(event.getData())); break; } - case CHILD_UPDATED: + case CHILD_REMOVED: { - final Tuple tuple = addOrUpdateInstance(event.getData()); - listenerContainer.forEach - ( - new Function, Void>() - { - @Override - public Void apply(ServiceCacheEventListener listener) - { - if ( tuple.oldInstance != null ) - { - listener.cacheUpdated(tuple.oldInstance, tuple.newInstance); - } - else - { - listener.cacheAdded(tuple.newInstance); - } - return null; - } - } - ); + notifyListeners = true; + final ServiceInstance serviceInstance = instances.remove(instanceIdFromData(event.getData())); + applyTuple(new Tuple(serviceInstance, null)); break; } + } - case CHILD_REMOVED: + if ( notifyListeners ) + { + listenerContainer.forEach + ( + new Function() + { + @Override + public Void apply(ServiceCacheListener listener) + { + listener.cacheChanged(); + return null; + } + } + ); + } + } + + private void applyTuple(final Tuple tuple) + { + eventListenerContainer.forEach + ( + new Function, Void>() { - final ServiceInstance serviceInstance = instances.remove(instanceIdFromData(event.getData())); - if ( serviceInstance != null ) + @Override + public Void apply(ServiceCacheEventListener listener) { - listenerContainer.forEach - ( - new Function, Void>() + if ( tuple.oldInstance != null ) + { + if ( tuple.newInstance != null ) + { + listener.cacheUpdated(tuple.oldInstance, tuple.newInstance); + } + else { - @Override - public Void apply(ServiceCacheEventListener listener) - { - listener.cacheDeleted(serviceInstance); - return null; - } + listener.cacheDeleted(tuple.oldInstance); } - ); + } + else if ( tuple.newInstance != null ) + { + listener.cacheAdded(tuple.newInstance); + } + return null; } - break; } - } + ); } private String instanceIdFromData(ChildData childData) @@ -282,8 +266,8 @@ private Tuple addOrUpdateInstance(ChildData childData) throws Exception } private static class Tuple { - public final ServiceInstance oldInstance; - public final ServiceInstance newInstance; + private final ServiceInstance oldInstance; + private final ServiceInstance newInstance; private Tuple(final ServiceInstance oldInstance, final ServiceInstance newInstance) { this.oldInstance = oldInstance; diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java deleted file mode 100644 index 6f14178194..0000000000 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java +++ /dev/null @@ -1,53 +0,0 @@ -package org.apache.curator.x.discovery.details; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.x.discovery.ServiceInstance; - -class ServiceCacheListenerWrapper implements ServiceCacheEventListener -{ - private final ServiceCacheListener listener; - - static ServiceCacheListenerWrapper wrap(ServiceCacheListener listener) - { - return new ServiceCacheListenerWrapper<>(listener); - } - - static ServiceCacheListener unwrap(ServiceCacheEventListener eventListener) - { - if ( eventListener instanceof ServiceCacheListenerWrapper ) - { - return ((ServiceCacheListenerWrapper)eventListener).listener; - } - return null; - } - - private ServiceCacheListenerWrapper(ServiceCacheListener listener) - { - this.listener = listener; - } - - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) - { - listener.stateChanged(client, newState); - } - - @Override - public void cacheAdded(ServiceInstance added) - { - listener.cacheChanged(); - } - - @Override - public void cacheDeleted(ServiceInstance deleted) - { - listener.cacheChanged(); - } - - @Override - public void cacheUpdated(ServiceInstance old, ServiceInstance updated) - { - listener.cacheChanged(); - } -} diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java index 70b41e32e7..509df7b539 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java @@ -44,6 +44,8 @@ public class TestServiceCache extends BaseClassForTests { + private final Timing timing = new Timing(); + @Test public void testInitialLoad() throws Exception { @@ -106,8 +108,6 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) @Test public void testViaProvider() throws Exception { - Timing timing = new Timing(); - List closeables = Lists.newArrayList(); try { @@ -330,36 +330,33 @@ public void testServiceCacheEventListener() throws Exception ServiceCache cache = discovery.serviceCacheBuilder().name("test").build(); closeables.add(cache); - final CountDownLatch latch = new CountDownLatch(3); - - final AtomicBoolean notifyError = new AtomicBoolean(false); + final Semaphore latch = new Semaphore(0); + final AtomicBoolean validation = new AtomicBoolean(true); ServiceCacheEventListener listener = new ServiceCacheEventListener() { @Override - public void cacheAdded(final ServiceInstance added) { - latch.countDown(); - - notifyError.compareAndSet(false,added == null); - } - - @Override - public void cacheDeleted(final ServiceInstance deleted) { - latch.countDown(); + public void cacheAdded(final ServiceInstance added) + { + latch.release(); - notifyError.compareAndSet(false,deleted == null); + validation.compareAndSet(true,added != null); } @Override - public void cacheUpdated(final ServiceInstance before, final ServiceInstance after) { - latch.countDown(); + public void cacheDeleted(final ServiceInstance deleted) + { + latch.release(); - notifyError.compareAndSet(false, !"before".equals(before.getPayload())); - notifyError.compareAndSet(false, !"after".equals(after.getPayload())); + validation.compareAndSet(true,deleted != null); } @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) + public void cacheUpdated(final ServiceInstance before, final ServiceInstance after) { + latch.release(); + + validation.compareAndSet(true, "before".equals(before.getPayload())); + validation.compareAndSet(true, "after".equals(after.getPayload())); } }; cache.getCacheEventListenable().addListener(listener); @@ -367,12 +364,14 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) ServiceInstance instance = ServiceInstance.builder().payload("before").name("test").port(10064).build(); discovery.registerService(instance); + Assert.assertTrue(timing.acquireSemaphore(latch)); instance = ServiceInstance.builder().id(instance.getId()).payload("after").name("test").port(10064).build(); discovery.updateService(instance); + Assert.assertTrue(timing.acquireSemaphore(latch)); discovery.unregisterService(instance); - Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); + Assert.assertTrue(timing.acquireSemaphore(latch)); - Assert.assertFalse(notifyError.get()); + Assert.assertTrue(validation.get()); } finally {