diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java new file mode 100644 index 0000000000..1f783f39e6 --- /dev/null +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.discovery.details; + +import org.apache.curator.x.discovery.ServiceInstance; + +/** + * Listener for events (addition/update/deletion) that happen to a service cache + */ +public interface ServiceCacheEventListener extends ServiceCacheListener +{ + + /** + * Called when a new cache is added. + * + * @param added instance added + */ + public void cacheAdded(ServiceInstance added); + + /** + * Called when a cache is deleted. + * + * @param deleted instance deleted + */ + public void cacheDeleted(ServiceInstance deleted); + + /** + * Called when a cache is updated. + * + * @param old old instance + * @param updated updated instance + */ + public void cacheUpdated(ServiceInstance old, ServiceInstance updated); +} diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java index b8f39d5de4..df6696a897 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java @@ -93,7 +93,7 @@ public void start() throws Exception cache.start(true); for ( ChildData childData : cache.getCurrentData() ) { - addInstance(childData, true); + addInstanceOnlyIfAbsent(childData); } discovery.cacheOpened(this); } @@ -146,40 +146,72 @@ public void removeListener(ServiceCacheListener listener) @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - boolean notifyListeners = false; - switch ( event.getType() ) + final Tuple tuple; + switch ( event.getType() ) { case CHILD_ADDED: + tuple = addOrUpdateInstance(event.getData()); + listenerContainer.forEach( + new Function() + { + @Override + public Void apply(ServiceCacheListener listener) + { + listener.cacheChanged(); + + if(listener instanceof ServiceCacheEventListener) { + ((ServiceCacheEventListener) listener).cacheAdded(tuple.newInstance); + } + + return null; + } + } + ); + break; case CHILD_UPDATED: { - addInstance(event.getData(), false); - notifyListeners = true; + tuple = addOrUpdateInstance(event.getData()); + listenerContainer.forEach( + new Function() + { + @Override + public Void apply(ServiceCacheListener listener) + { + listener.cacheChanged(); + + if(listener instanceof ServiceCacheEventListener) { + ((ServiceCacheEventListener) listener).cacheUpdated(tuple.oldInstance, tuple.newInstance); + } + + return null; + } + } + ); break; } case CHILD_REMOVED: { - instances.remove(instanceIdFromData(event.getData())); - notifyListeners = true; + final ServiceInstance serviceInstance = instances.remove(instanceIdFromData(event.getData())); + listenerContainer.forEach( + new Function() + { + @Override + public Void apply(ServiceCacheListener listener) + { + listener.cacheChanged(); + + if(listener instanceof ServiceCacheEventListener) { + ((ServiceCacheEventListener) listener).cacheDeleted(serviceInstance); + } + + return null; + } + } + ); break; } } - - if ( notifyListeners ) - { - listenerContainer.forEach - ( - new Function() - { - @Override - public Void apply(ServiceCacheListener listener) - { - listener.cacheChanged(); - return null; - } - } - ); - } } private String instanceIdFromData(ChildData childData) @@ -187,18 +219,31 @@ private String instanceIdFromData(ChildData childData) return ZKPaths.getNodeFromPath(childData.getPath()); } - private void addInstance(ChildData childData, boolean onlyIfAbsent) throws Exception + private void addInstanceOnlyIfAbsent(ChildData childData) throws Exception { String instanceId = instanceIdFromData(childData); ServiceInstance serviceInstance = discovery.getSerializer().deserialize(childData.getData()); - if ( onlyIfAbsent ) - { - instances.putIfAbsent(instanceId, serviceInstance); - } - else - { - instances.put(instanceId, serviceInstance); - } + + instances.putIfAbsent(instanceId, serviceInstance); + cache.clearDataBytes(childData.getPath(), childData.getStat().getVersion()); + } + + private Tuple addOrUpdateInstance(ChildData childData) throws Exception + { + String instanceId = instanceIdFromData(childData); + ServiceInstance serviceInstance = discovery.getSerializer().deserialize(childData.getData()); + final Tuple result = new Tuple<>(instances.put(instanceId, serviceInstance), serviceInstance); cache.clearDataBytes(childData.getPath(), childData.getStat().getVersion()); + return result; + } + + private static class Tuple { + public final ServiceInstance oldInstance; + public final ServiceInstance newInstance; + + private Tuple(final ServiceInstance oldInstance, final ServiceInstance newInstance) { + this.oldInstance = oldInstance; + this.newInstance = newInstance; + } } } diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java index fda5c26965..65b7cb943f 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java @@ -28,6 +28,7 @@ import org.apache.curator.test.ExecuteCalledWatchingExecutorService; import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.x.discovery.details.ServiceCacheEventListener; import org.apache.curator.x.discovery.details.ServiceCacheListener; import org.testng.Assert; import org.testng.annotations.Test; @@ -39,6 +40,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; public class TestServiceCache extends BaseClassForTests { @@ -310,4 +312,82 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) } } } + + @Test + public void testServiceCacheEventListener() throws Exception + { + List closeables = Lists.newArrayList(); + try + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + closeables.add(client); + client.start(); + + ServiceDiscovery discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/discovery").client(client).build(); + closeables.add(discovery); + discovery.start(); + + ServiceCache cache = discovery.serviceCacheBuilder().name("test").build(); + closeables.add(cache); + + final CountDownLatch latch = new CountDownLatch(6); + + final AtomicBoolean notifyError = new AtomicBoolean(false); + ServiceCacheListener listener = new ServiceCacheEventListener() + { + @Override + public void cacheAdded(final ServiceInstance added) { + latch.countDown(); + + notifyError.compareAndSet(false,added == null); + } + + @Override + public void cacheDeleted(final ServiceInstance deleted) { + latch.countDown(); + + notifyError.compareAndSet(false,deleted == null); + } + + @Override + public void cacheUpdated(final ServiceInstance before, final ServiceInstance after) { + latch.countDown(); + + notifyError.compareAndSet(false, !"before".equals(before.getPayload())); + notifyError.compareAndSet(false, !"after".equals(after.getPayload())); + } + + @Override + public void cacheChanged() + { + latch.countDown(); + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + + } + }; + cache.addListener(listener); + cache.start(); + + ServiceInstance instance = ServiceInstance.builder().payload("before").name("test").port(10064).build(); + discovery.registerService(instance); + instance = ServiceInstance.builder().id(instance.getId()).payload("after").name("test").port(10064).build(); + discovery.updateService(instance); + discovery.unregisterService(instance); + Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); + + Assert.assertFalse(notifyError.get()); + } + finally + { + Collections.reverse(closeables); + for ( Closeable c : closeables ) + { + CloseableUtils.closeQuietly(c); + } + } + } }