From 38088f72e18caad8ad4b192d078f74ade2f60e93 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Tue, 10 Oct 2023 16:39:15 -0400 Subject: [PATCH 1/8] Added filter order optimizations Signed-off-by: Calum Murray --- .../kafka/broker/dispatcher/Filter.java | 4 ++ .../dispatcher/impl/RecordDispatcherImpl.java | 2 + .../filter/subscriptionsapi/AllFilter.java | 31 +++++++- .../filter/subscriptionsapi/AnyFilter.java | 32 +++++++-- .../subscriptionsapi/FilterCounter.java | 40 +++++++++++ .../subscriptionsapi/FilterListOptimizer.java | 71 +++++++++++++++++++ 6 files changed, 173 insertions(+), 7 deletions(-) create mode 100644 data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/FilterCounter.java create mode 100644 data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/FilterListOptimizer.java diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Filter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Filter.java index 67248bef94..7f006fc0c7 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Filter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Filter.java @@ -30,4 +30,8 @@ public interface Filter extends Predicate { static Filter noop() { return ce -> true; } + + default void close() { + return; + } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java index b62604d0e9..fc1706eb80 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java @@ -503,6 +503,8 @@ private void recordReceived(final ConsumerRecordContext recordContext) { public Future close() { this.closed.set(true); + this.filter.close(); + Metrics.searchEgressMeters( meterRegistry, consumerVerticleContext.getEgress().getReference()) .forEach(meterRegistry::remove); diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilter.java index e1a0ab87a0..f43137f88f 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilter.java @@ -18,28 +18,53 @@ import dev.knative.eventing.kafka.broker.dispatcher.Filter; import io.cloudevents.CloudEvent; import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class AllFilter implements Filter { - private final List filters; + private final List filters; private static final Logger logger = LoggerFactory.getLogger(AllFilter.class); + private final ArrayBlockingQueue indexSwapQueue; + + private final FilterListOptimizer filterListOptimizer; + + private final ReadWriteLock readWriteLock; + public AllFilter(List filters) { - this.filters = filters; + this.filters = filters.stream().map(FilterCounter::new).collect(Collectors.toList()); + this.indexSwapQueue = new ArrayBlockingQueue<>(1); + this.readWriteLock = new ReentrantReadWriteLock(); + this.filterListOptimizer = + new FilterListOptimizer(this.readWriteLock, this.indexSwapQueue, this.filters, logger); + this.filterListOptimizer.start(); } @Override public boolean test(CloudEvent cloudEvent) { logger.debug("Testing event against ALL filters. Event {}", cloudEvent); - for (Filter filter : filters) { + this.readWriteLock.readLock().lock(); + for (int i = 0; i < this.filters.size(); i++) { + Filter filter = this.filters.get(i).getFilter(); if (!filter.test(cloudEvent)) { + this.indexSwapQueue.offer(i); logger.debug("Test failed. Filter {} Event {}", filter, cloudEvent); + this.readWriteLock.readLock().unlock(); return false; } } logger.debug("Test ALL filters succeeded. Event {}", cloudEvent); + this.readWriteLock.readLock().unlock(); return true; } + + @Override + public void close() { + this.filterListOptimizer.interrupt(); + } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java index 2325300801..1083b78d4b 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java @@ -18,6 +18,10 @@ import dev.knative.eventing.kafka.broker.dispatcher.Filter; import io.cloudevents.CloudEvent; import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,23 +29,43 @@ public class AnyFilter implements Filter { private static final Logger logger = LoggerFactory.getLogger(AnyFilter.class); - private final List filters; + private final List filters; + + private final ArrayBlockingQueue indexSwapQueue; + + private final FilterListOptimizer filterListOptimizer; + + private final ReadWriteLock readWriteLock; public AnyFilter(List filters) { - this.filters = filters; + this.filters = filters.stream().map(FilterCounter::new).collect(Collectors.toList()); + this.indexSwapQueue = new ArrayBlockingQueue<>(1); + this.readWriteLock = new ReentrantReadWriteLock(); + this.filterListOptimizer = + new FilterListOptimizer(this.readWriteLock, this.indexSwapQueue, this.filters, logger); + this.filterListOptimizer.start(); } @Override public boolean test(CloudEvent cloudEvent) { logger.debug("Testing event against ANY filter. Event {}", cloudEvent); - - for (Filter filter : filters) { + this.readWriteLock.readLock().lock(); + for (int i = 0; i < this.filters.size(); i++) { + Filter filter = this.filters.get(i).getFilter(); if (filter.test(cloudEvent)) { + this.indexSwapQueue.offer(i); logger.debug("Test succeeded. Filter {} Event {}", filter, cloudEvent); + this.readWriteLock.readLock().unlock(); return true; } } logger.debug("Test failed. All filters failed. Event {}", cloudEvent); + this.readWriteLock.readLock().unlock(); return false; } + + @Override + public void close() { + this.filterListOptimizer.interrupt(); + } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/FilterCounter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/FilterCounter.java new file mode 100644 index 0000000000..2523d62663 --- /dev/null +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/FilterCounter.java @@ -0,0 +1,40 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi; + +import dev.knative.eventing.kafka.broker.dispatcher.Filter; + +public class FilterCounter { + private final Filter filter; + private int count; + + public FilterCounter(Filter filter) { + this.filter = filter; + this.count = 0; + } + + public Filter getFilter() { + return filter; + } + + public int getCount() { + return count; + } + + public int incrementCount() { + return this.count++; + } +} diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/FilterListOptimizer.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/FilterListOptimizer.java new file mode 100644 index 0000000000..d998451b7e --- /dev/null +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/FilterListOptimizer.java @@ -0,0 +1,71 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.locks.ReadWriteLock; +import org.slf4j.Logger; + +public class FilterListOptimizer extends Thread { + private final ReadWriteLock readWriteLock; + + private final ArrayBlockingQueue indexSwapQueue; + + private final List filters; + + private final Logger logger; + + public FilterListOptimizer( + ReadWriteLock readWriteLock, + ArrayBlockingQueue indexSwapQueue, + List filters, + Logger logger) { + this.filters = filters; + this.indexSwapQueue = indexSwapQueue; + this.readWriteLock = readWriteLock; + this.logger = logger; + } + + @Override + public void run() { + while (true) { + if (Thread.interrupted()) { + return; + } + try { + this.readWriteLock.readLock().lock(); + final int swapIndex = + this.indexSwapQueue.take(); // this is the only line that throws InterruptedException + if (swapIndex != 0 + && this.filters.get(swapIndex).incrementCount() + > 2 * this.filters.get(swapIndex - 1).getCount()) { + new Thread(() -> { + this.readWriteLock.writeLock().lock(); + Collections.swap(this.filters, swapIndex - 1, swapIndex); + this.readWriteLock.writeLock().unlock(); + }) + .start(); + } + this.readWriteLock.readLock().unlock(); + } catch (InterruptedException e) { + logger.debug("Filter optimizer thread was interrupted", e); + this.readWriteLock.readLock().unlock(); + } + } + } +} From 6a4af89ef2947a05df210530ac29900b30585558 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Fri, 13 Oct 2023 16:08:49 -0400 Subject: [PATCH 2/8] Made filter optimization with vertx periodic instead of threads Signed-off-by: Calum Murray --- .../impl/filter/AllFilterBenchmark.java | 28 +++++-- .../impl/filter/AnyFilterBenchmark.java | 25 +++++-- .../impl/filter/FilterBenchmark.java | 16 +++- .../kafka/broker/dispatcher/Filter.java | 6 ++ .../dispatcher/impl/RecordDispatcherImpl.java | 7 +- .../filter/subscriptionsapi/AllFilter.java | 73 ++++++++++++------- .../filter/subscriptionsapi/AnyFilter.java | 66 ++++++++++------- .../subscriptionsapi/FilterCounter.java | 9 ++- .../subscriptionsapi/FilterListOptimizer.java | 71 ------------------ .../filter/subscriptionsapi/NotFilter.java | 6 ++ .../main/ConsumerVerticleBuilder.java | 40 ++++++---- .../dispatcher/impl/RecordDispatcherTest.java | 12 +++ .../AbstractConsumerVerticleTest.java | 3 + .../subscriptionsapi/AllFilterTest.java | 35 ++++++--- .../subscriptionsapi/AnyFilterTest.java | 44 +++++++---- 15 files changed, 258 insertions(+), 183 deletions(-) delete mode 100644 data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/FilterListOptimizer.java diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AllFilterBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AllFilterBenchmark.java index eeef087af5..28d91af112 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AllFilterBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AllFilterBenchmark.java @@ -26,6 +26,7 @@ import java.util.Map; public class AllFilterBenchmark { + public static ExactFilter makeExactFilter() { return new ExactFilter(Map.of("id", "com.github.pull.create")); } @@ -50,7 +51,7 @@ public static class AllFilterWithExactFilter extends FilterBenchmark { @Override protected Filter createFilter() { - return new AllFilter(List.of(makeExactFilter())); + return new AllFilter(List.of(makeExactFilter()), vertx, FILTER_REORDER_TIME_MILLISECONDS); } @Override @@ -63,7 +64,10 @@ public static class AllFilterMatchAllSubFilters extends FilterBenchmark { @Override protected Filter createFilter() { - return new AllFilter(List.of(makeExactFilter(), makePrefixFilter(), makeSuffixFilter())); + return new AllFilter( + List.of(makeExactFilter(), makePrefixFilter(), makeSuffixFilter()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); } @Override @@ -76,7 +80,10 @@ public static class AllFilterFirstMatchEndOfArray extends FilterBenchmark { @Override protected Filter createFilter() { - return new AllFilter(List.of(makePrefixFilterNoMatch(), makeSuffixFilterNoMatch(), makeExactFilter())); + return new AllFilter( + List.of(makePrefixFilterNoMatch(), makeSuffixFilterNoMatch(), makeExactFilter()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); } @Override @@ -89,7 +96,10 @@ public static class AllFilterFirstMatchStartOfArray extends FilterBenchmark { @Override protected Filter createFilter() { - return new AllFilter(List.of(makeExactFilter(), makePrefixFilterNoMatch(), makeSuffixFilterNoMatch())); + return new AllFilter( + List.of(makeExactFilter(), makePrefixFilterNoMatch(), makeSuffixFilterNoMatch()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); } @Override @@ -102,7 +112,10 @@ public static class AllFilterOneNonMatchingFilterInMiddle extends FilterBenchmar @Override protected Filter createFilter() { - return new AllFilter(List.of(makeExactFilter(), makePrefixFilterNoMatch(), makePrefixFilter())); + return new AllFilter( + List.of(makeExactFilter(), makePrefixFilterNoMatch(), makePrefixFilter()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); } @Override @@ -115,7 +128,10 @@ public static class AllFilterNoMatchingFilters extends FilterBenchmark { @Override protected Filter createFilter() { - return new AllFilter(List.of(makePrefixFilterNoMatch(), makeSuffixFilterNoMatch())); + return new AllFilter( + List.of(makePrefixFilterNoMatch(), makeSuffixFilterNoMatch()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); } @Override diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AnyFilterBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AnyFilterBenchmark.java index 30a6cc3669..479d46c3ba 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AnyFilterBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AnyFilterBenchmark.java @@ -56,7 +56,7 @@ public static class AnyFilterWithExactFilterBenchmark extends FilterBenchmark { @Override protected Filter createFilter() { - return new AnyFilter(List.of(makeExactFilter())); + return new AnyFilter(List.of(makeExactFilter()), vertx, FILTER_REORDER_TIME_MILLISECONDS); } @Override @@ -69,7 +69,10 @@ public static class AnyFilterMatchAllSubfilters extends FilterBenchmark { @Override protected Filter createFilter() { - return new AnyFilter(List.of(makeExactFilter(), makePrefixFilter(), makeSuffixFilter())); + return new AnyFilter( + List.of(makeExactFilter(), makePrefixFilter(), makeSuffixFilter()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); } @Override @@ -82,7 +85,10 @@ public static class AnyFilterFirstMatchAtEnd extends FilterBenchmark { @Override protected Filter createFilter() { - return new AnyFilter(List.of(makePrefixFilterNoMatch(), makeSufficFilterNoMatch(), makeExactFilter())); + return new AnyFilter( + List.of(makePrefixFilterNoMatch(), makeSufficFilterNoMatch(), makeExactFilter()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); } @Override @@ -95,7 +101,10 @@ public static class AnyFilterFirstMatchAtStart extends FilterBenchmark { @Override protected Filter createFilter() { - return new AnyFilter(List.of(makeExactFilter(), makePrefixFilterNoMatch(), makeSufficFilterNoMatch())); + return new AnyFilter( + List.of(makeExactFilter(), makePrefixFilterNoMatch(), makeSufficFilterNoMatch()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); } @Override @@ -108,7 +117,8 @@ public static class AnyFilter2EventsMatch2DifferentFilters extends FilterBenchma @Override protected Filter createFilter() { - return new AnyFilter(List.of(makePrefixFilter(), makePrefixFilterNoMatch())); + return new AnyFilter( + List.of(makePrefixFilter(), makePrefixFilterNoMatch()), vertx, FILTER_REORDER_TIME_MILLISECONDS); } @Override @@ -120,7 +130,10 @@ protected CloudEvent createEvent() { public static class AnyFilter2EventsMatch2DifferentFiltersOneFilterMatchesNeither extends FilterBenchmark { @Override protected Filter createFilter() { - return new AnyFilter(List.of(makeSufficFilterNoMatch(), makePrefixFilter(), makePrefixFilterNoMatch())); + return new AnyFilter( + List.of(makeSufficFilterNoMatch(), makePrefixFilter(), makePrefixFilterNoMatch()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); } @Override diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/FilterBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/FilterBenchmark.java index 28ae301a4b..65fec21c3f 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/FilterBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/FilterBenchmark.java @@ -18,11 +18,12 @@ import dev.knative.eventing.kafka.broker.dispatcher.Filter; import io.cloudevents.CloudEvent; +import io.vertx.core.Vertx; import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.*; import org.openjdk.jmh.infra.Blackhole; -@BenchmarkMode(Mode.All) +@BenchmarkMode(Mode.Throughput) @Fork(1) @State(Scope.Thread) @OutputTimeUnit(TimeUnit.MICROSECONDS) @@ -30,6 +31,10 @@ public abstract class FilterBenchmark { Filter filter; CloudEvent cloudEvent; + public static final Vertx vertx = Vertx.vertx(); + + public static final long FILTER_REORDER_TIME_MILLISECONDS = 500; // 0.5 seconds + @Setup public void setupFilter() { this.filter = createFilter(); @@ -40,13 +45,20 @@ public void setupCloudEvent() { this.cloudEvent = createEvent(); } + @TearDown + public void teardown() { + this.filter.close(vertx); + } + protected abstract Filter createFilter(); protected abstract CloudEvent createEvent(); @Benchmark public void benchmarkFilterCreation(Blackhole bh) { - bh.consume(this.createFilter()); + final var filter = this.createFilter(); + filter.close(); + bh.consume(filter); } @Benchmark diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Filter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Filter.java index 7f006fc0c7..a9c82cf8a6 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Filter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Filter.java @@ -16,6 +16,7 @@ package dev.knative.eventing.kafka.broker.dispatcher; import io.cloudevents.CloudEvent; +import io.vertx.core.Vertx; import java.util.function.Predicate; /** @@ -34,4 +35,9 @@ static Filter noop() { default void close() { return; } + + default void close(Vertx vertx) { + return; + } + ; } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java index fc1706eb80..19b1c36b38 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java @@ -77,6 +77,8 @@ public class RecordDispatcherImpl implements RecordDispatcher { private static final String EKB_ERROR_PREFIX = "kne-"; private static final int KN_ERROR_DATA_MAX_BYTES = 1024; + private final Vertx vertx; + private final Filter filter; private final Function, Future>> subscriberSender; private final Function, Future>> dlsSender; @@ -103,6 +105,7 @@ public class RecordDispatcherImpl implements RecordDispatcher { * @param consumerTracer consumer tracer */ public RecordDispatcherImpl( + final Vertx vertx, final ConsumerVerticleContext consumerVerticleContext, final Filter filter, final CloudEventSender subscriberSender, @@ -111,6 +114,7 @@ public RecordDispatcherImpl( final RecordDispatcherListener recordDispatcherListener, final ConsumerTracer consumerTracer, final MeterRegistry meterRegistry) { + Objects.requireNonNull(vertx, "provide vertx"); Objects.requireNonNull(consumerVerticleContext, "provide consumerVerticleContext"); Objects.requireNonNull(filter, "provide filter"); Objects.requireNonNull(subscriberSender, "provide subscriberSender"); @@ -118,6 +122,7 @@ public RecordDispatcherImpl( Objects.requireNonNull(recordDispatcherListener, "provide offsetStrategy"); Objects.requireNonNull(responseHandler, "provide sinkResponseHandler"); + this.vertx = vertx; this.consumerVerticleContext = consumerVerticleContext; this.filter = filter; this.subscriberSender = composeSenderAndSinkHandler(subscriberSender, responseHandler, "subscriber"); @@ -503,7 +508,7 @@ private void recordReceived(final ConsumerRecordContext recordContext) { public Future close() { this.closed.set(true); - this.filter.close(); + this.filter.close(vertx); Metrics.searchEgressMeters( meterRegistry, consumerVerticleContext.getEgress().getReference()) diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilter.java index f43137f88f..bf95705a8b 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilter.java @@ -15,56 +15,73 @@ */ package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi; +import com.google.common.collect.ImmutableList; import dev.knative.eventing.kafka.broker.dispatcher.Filter; import io.cloudevents.CloudEvent; +import io.vertx.core.Vertx; +import java.util.Comparator; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class AllFilter implements Filter { - - private final List filters; private static final Logger logger = LoggerFactory.getLogger(AllFilter.class); - private final ArrayBlockingQueue indexSwapQueue; + private final AtomicReference> filters; - private final FilterListOptimizer filterListOptimizer; + private final long periodicTimerId; - private final ReadWriteLock readWriteLock; + private boolean shouldReorder; - public AllFilter(List filters) { - this.filters = filters.stream().map(FilterCounter::new).collect(Collectors.toList()); - this.indexSwapQueue = new ArrayBlockingQueue<>(1); - this.readWriteLock = new ReentrantReadWriteLock(); - this.filterListOptimizer = - new FilterListOptimizer(this.readWriteLock, this.indexSwapQueue, this.filters, logger); - this.filterListOptimizer.start(); + public AllFilter(List filters, Vertx vertx, long delayMilliseconds) { + logger.debug("Starting with timeout {}", delayMilliseconds); + this.periodicTimerId = vertx.setPeriodic(delayMilliseconds, this::reorder); + this.filters = new AtomicReference<>( + filters.stream().map(FilterCounter::new).collect(ImmutableList.toImmutableList())); } - @Override - public boolean test(CloudEvent cloudEvent) { + private void reorder(Long id) { + if (!this.shouldReorder) { + return; + } + logger.debug("Reordering filters!"); + this.filters.updateAndGet((filterCounters -> filterCounters.stream() + .sorted(Comparator.comparingInt(FilterCounter::getCount).reversed()) + .collect(ImmutableList.toImmutableList()))); + } + + private static boolean test( + CloudEvent cloudEvent, ImmutableList filters, Consumer shouldReorder) { logger.debug("Testing event against ALL filters. Event {}", cloudEvent); - this.readWriteLock.readLock().lock(); - for (int i = 0; i < this.filters.size(); i++) { - Filter filter = this.filters.get(i).getFilter(); - if (!filter.test(cloudEvent)) { - this.indexSwapQueue.offer(i); - logger.debug("Test failed. Filter {} Event {}", filter, cloudEvent); - this.readWriteLock.readLock().unlock(); + for (int i = 0; i < filters.size(); i++) { + final var filterCounter = filters.get(i); + if (!filterCounter.getFilter().test(cloudEvent)) { + shouldReorder.accept(i != 0); + filterCounter.incrementCount(); + logger.debug("Test failed. Filter {} Event {}", filterCounter.getFilter(), cloudEvent); return false; } } logger.debug("Test ALL filters succeeded. Event {}", cloudEvent); - this.readWriteLock.readLock().unlock(); return true; } + private void setShouldReorder(boolean shouldReorder) { + logger.debug("Filters should reorder!"); + this.shouldReorder = shouldReorder; + } + + @Override + public boolean test(CloudEvent cloudEvent) { + return AllFilter.test(cloudEvent, this.filters.get(), this::setShouldReorder); + } + @Override - public void close() { - this.filterListOptimizer.interrupt(); + public void close(Vertx vertx) { + logger.debug("Closing periodic reorder job"); + vertx.cancelTimer(this.periodicTimerId); + this.filters.get().forEach((f) -> f.getFilter().close(vertx)); } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java index 1083b78d4b..8c7b9d0265 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java @@ -15,13 +15,14 @@ */ package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi; +import com.google.common.collect.ImmutableList; import dev.knative.eventing.kafka.broker.dispatcher.Filter; import io.cloudevents.CloudEvent; +import io.vertx.core.Vertx; +import java.util.Comparator; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,43 +30,54 @@ public class AnyFilter implements Filter { private static final Logger logger = LoggerFactory.getLogger(AnyFilter.class); - private final List filters; + private final AtomicReference> filters; - private final ArrayBlockingQueue indexSwapQueue; + private final long periodicTimerId; - private final FilterListOptimizer filterListOptimizer; + private boolean shouldReorder; - private final ReadWriteLock readWriteLock; + public AnyFilter(List filters, Vertx vertx, long delayMilliseconds) { + this.periodicTimerId = vertx.setPeriodic(delayMilliseconds, this::reorder); + this.filters = new AtomicReference<>( + filters.stream().map(FilterCounter::new).collect(ImmutableList.toImmutableList())); + } - public AnyFilter(List filters) { - this.filters = filters.stream().map(FilterCounter::new).collect(Collectors.toList()); - this.indexSwapQueue = new ArrayBlockingQueue<>(1); - this.readWriteLock = new ReentrantReadWriteLock(); - this.filterListOptimizer = - new FilterListOptimizer(this.readWriteLock, this.indexSwapQueue, this.filters, logger); - this.filterListOptimizer.start(); + private void reorder(Long id) { + logger.debug("Reordering ANY filter!"); + this.filters.updateAndGet((filterCounters -> filterCounters.stream() + .sorted(Comparator.comparingInt(FilterCounter::getCount).reversed()) + .collect(ImmutableList.toImmutableList()))); } - @Override - public boolean test(CloudEvent cloudEvent) { + private static boolean test( + CloudEvent cloudEvent, ImmutableList filters, Consumer shouldReorder) { logger.debug("Testing event against ANY filter. Event {}", cloudEvent); - this.readWriteLock.readLock().lock(); - for (int i = 0; i < this.filters.size(); i++) { - Filter filter = this.filters.get(i).getFilter(); - if (filter.test(cloudEvent)) { - this.indexSwapQueue.offer(i); - logger.debug("Test succeeded. Filter {} Event {}", filter, cloudEvent); - this.readWriteLock.readLock().unlock(); + for (int i = 0; i < filters.size(); i++) { + final var filterCounter = filters.get(i); + if (filterCounter.getFilter().test(cloudEvent)) { + shouldReorder.accept(i != 0); + filterCounter.incrementCount(); + logger.debug("Test succeeded. Filter {} Event {}", filterCounter.getFilter(), cloudEvent); return true; } } logger.debug("Test failed. All filters failed. Event {}", cloudEvent); - this.readWriteLock.readLock().unlock(); return false; } + private void setShouldReorder(boolean shouldReorder) { + logger.debug("Filters should reorder!"); + this.shouldReorder = shouldReorder; + } + + @Override + public boolean test(CloudEvent cloudEvent) { + return AnyFilter.test(cloudEvent, this.filters.get(), this::setShouldReorder); + } + @Override - public void close() { - this.filterListOptimizer.interrupt(); + public void close(Vertx vertx) { + vertx.cancelTimer(this.periodicTimerId); + this.filters.get().forEach((f) -> f.getFilter().close(vertx)); } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/FilterCounter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/FilterCounter.java index 2523d62663..8169536547 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/FilterCounter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/FilterCounter.java @@ -16,14 +16,15 @@ package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi; import dev.knative.eventing.kafka.broker.dispatcher.Filter; +import java.util.concurrent.atomic.AtomicInteger; public class FilterCounter { private final Filter filter; - private int count; + private final AtomicInteger count; public FilterCounter(Filter filter) { this.filter = filter; - this.count = 0; + this.count = new AtomicInteger(0); } public Filter getFilter() { @@ -31,10 +32,10 @@ public Filter getFilter() { } public int getCount() { - return count; + return count.get(); } public int incrementCount() { - return this.count++; + return this.count.incrementAndGet(); } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/FilterListOptimizer.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/FilterListOptimizer.java deleted file mode 100644 index d998451b7e..0000000000 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/FilterListOptimizer.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi; - -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.locks.ReadWriteLock; -import org.slf4j.Logger; - -public class FilterListOptimizer extends Thread { - private final ReadWriteLock readWriteLock; - - private final ArrayBlockingQueue indexSwapQueue; - - private final List filters; - - private final Logger logger; - - public FilterListOptimizer( - ReadWriteLock readWriteLock, - ArrayBlockingQueue indexSwapQueue, - List filters, - Logger logger) { - this.filters = filters; - this.indexSwapQueue = indexSwapQueue; - this.readWriteLock = readWriteLock; - this.logger = logger; - } - - @Override - public void run() { - while (true) { - if (Thread.interrupted()) { - return; - } - try { - this.readWriteLock.readLock().lock(); - final int swapIndex = - this.indexSwapQueue.take(); // this is the only line that throws InterruptedException - if (swapIndex != 0 - && this.filters.get(swapIndex).incrementCount() - > 2 * this.filters.get(swapIndex - 1).getCount()) { - new Thread(() -> { - this.readWriteLock.writeLock().lock(); - Collections.swap(this.filters, swapIndex - 1, swapIndex); - this.readWriteLock.writeLock().unlock(); - }) - .start(); - } - this.readWriteLock.readLock().unlock(); - } catch (InterruptedException e) { - logger.debug("Filter optimizer thread was interrupted", e); - this.readWriteLock.readLock().unlock(); - } - } - } -} diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilter.java index 8d90034751..85be28735a 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilter.java @@ -17,6 +17,7 @@ import dev.knative.eventing.kafka.broker.dispatcher.Filter; import io.cloudevents.CloudEvent; +import io.vertx.core.Vertx; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,4 +39,9 @@ public boolean test(CloudEvent cloudEvent) { logger.debug("{}: Filter {} - Event {}", result, this.filter, cloudEvent); return passed; } + + @Override + public void close(Vertx vertx) { + this.filter.close(vertx); + } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java index b6a5f038cd..3819bc939a 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java @@ -70,6 +70,8 @@ public class ConsumerVerticleBuilder { private static final CloudEventSender NO_DEAD_LETTER_SINK_SENDER = CloudEventSender.noop("No dead letter sink set"); + private static final long FILTER_REORDER_TIME_MILLISECONDS = 1000 * 60 * 5; // 5 minutes + private final ConsumerVerticleContext consumerVerticleContext; public ConsumerVerticleBuilder(final ConsumerVerticleContext consumerVerticleContext) { @@ -108,8 +110,9 @@ private void build(final Vertx vertx, final ConsumerVerticle consumerVerticle, f final var recordDispatcher = new RecordDispatcherMutatorChain( new RecordDispatcherImpl( + vertx, consumerVerticleContext, - getFilter(), + getFilter(vertx), egressSubscriberSender, egressDeadLetterSender, responseHandler, @@ -179,10 +182,10 @@ public void onPartitionsAssigned(Collection partitions) { }; } - private Filter getFilter() { + private Filter getFilter(Vertx vertx) { // Dialected filters should override the attributes filter if (consumerVerticleContext.getEgress().getDialectedFilterCount() > 0) { - return getFilter(consumerVerticleContext.getEgress().getDialectedFilterList()); + return getFilter(consumerVerticleContext.getEgress().getDialectedFilterList(), vertx); } else if (consumerVerticleContext.getEgress().hasFilter()) { return new ExactFilter( consumerVerticleContext.getEgress().getFilter().getAttributesMap()); @@ -190,23 +193,34 @@ private Filter getFilter() { return Filter.noop(); } - private static Filter getFilter(List filters) { + private static Filter getFilter(List filters, Vertx vertx) { return new AllFilter( - filters.stream().map(ConsumerVerticleBuilder::getFilter).collect(Collectors.toList())); + filters.stream() + .map((DataPlaneContract.DialectedFilter filter) -> + ConsumerVerticleBuilder.getFilter(filter, vertx)) + .collect(Collectors.toList()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); } - private static Filter getFilter(DataPlaneContract.DialectedFilter filter) { + private static Filter getFilter(DataPlaneContract.DialectedFilter filter, Vertx vertx) { return switch (filter.getFilterCase()) { case EXACT -> new ExactFilter(filter.getExact().getAttributesMap()); case PREFIX -> new PrefixFilter(filter.getPrefix().getAttributesMap()); case SUFFIX -> new SuffixFilter(filter.getSuffix().getAttributesMap()); - case NOT -> new NotFilter(getFilter(filter.getNot().getFilter())); - case ANY -> new AnyFilter(filter.getAny().getFiltersList().stream() - .map(ConsumerVerticleBuilder::getFilter) - .collect(Collectors.toList())); - case ALL -> new AllFilter(filter.getAll().getFiltersList().stream() - .map(ConsumerVerticleBuilder::getFilter) - .collect(Collectors.toList())); + case NOT -> new NotFilter(getFilter(filter.getNot().getFilter(), vertx)); + case ANY -> new AnyFilter( + filter.getAny().getFiltersList().stream() + .map((DataPlaneContract.DialectedFilter f) -> ConsumerVerticleBuilder.getFilter(f, vertx)) + .collect(Collectors.toList()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); + case ALL -> new AllFilter( + filter.getAll().getFiltersList().stream() + .map((DataPlaneContract.DialectedFilter f) -> ConsumerVerticleBuilder.getFilter(f, vertx)) + .collect(Collectors.toList()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); case CESQL -> new CeSqlFilter(filter.getCesql().getExpression()); default -> Filter.noop(); }; diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java index 98ce91556d..4a9f768cb8 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java @@ -46,6 +46,7 @@ import io.micrometer.prometheus.PrometheusMeterRegistry; import io.vertx.core.Future; import io.vertx.core.MultiMap; +import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpVersion; import io.vertx.ext.web.client.impl.HttpResponseImpl; @@ -64,6 +65,7 @@ @ExtendWith(VertxExtension.class) public class RecordDispatcherTest { + Vertx vertx = Vertx.vertx(); private static final ConsumerVerticleContext resourceContext = FakeConsumerVerticleContext.get( FakeConsumerVerticleContext.get().getResource(), @@ -91,6 +93,7 @@ public void shouldNotSendToSubscriberNorToDeadLetterSinkIfValueDoesntMatch() { final RecordDispatcherListener receiver = offsetManagerMock(); final var dispatcherHandler = new RecordDispatcherImpl( + vertx, resourceContext, value -> false, CloudEventSender.noop("subscriber send called"), @@ -122,6 +125,7 @@ public void shouldSendOnlyToSubscriberIfValueMatches() { final RecordDispatcherListener receiver = offsetManagerMock(); final var dispatcherHandler = new RecordDispatcherImpl( + vertx, resourceContext, value -> true, new CloudEventSenderMock(record -> { @@ -161,6 +165,7 @@ public void shouldSendToDeadLetterSinkIfValueMatchesAndSubscriberSenderFails() { final RecordDispatcherListener receiver = offsetManagerMock(); final var dispatcherHandler = new RecordDispatcherImpl( + vertx, resourceContext, value -> true, new CloudEventSenderMock(record -> { @@ -201,6 +206,7 @@ public void shouldCallFailedToSendToDeadLetterSinkIfValueMatchesAndSubscriberAnd final RecordDispatcherListener receiver = offsetManagerMock(); final var dispatcherHandler = new RecordDispatcherImpl( + vertx, resourceContext, value -> true, new CloudEventSenderMock(record -> { @@ -244,6 +250,7 @@ public void failedEventsShouldBeEnhancedWithErrorExtensionsPriorToSendingToDls() String errorBody = "{ \"message\": \"bad bad things happened\" }"; final var dispatcherHandler = new RecordDispatcherImpl( + vertx, resourceContext, value -> true, new CloudEventSenderMock(record -> { @@ -307,6 +314,7 @@ public void failedEventsShouldBeEnhancedWithCustomHttpHeaders() { MultiMap.caseInsensitiveMultiMap().add(validErrorKey, "hello").add(invalidErrorKey, "nope"); final var dispatcherHandler = new RecordDispatcherImpl( + vertx, resourceContext, value -> true, new CloudEventSenderMock(record -> { @@ -369,6 +377,7 @@ public void failedEventsShouldBeEnhancedWithErrorExtensionsPriorToSendingToDlsBo String errorBodyTooLarge = errorBody + "QWERTY"; final var dispatcherHandler = new RecordDispatcherImpl( + vertx, resourceContext, value -> true, new CloudEventSenderMock(record -> { @@ -440,6 +449,7 @@ private HttpResponseImpl makeHttpResponseWithHeaders(int statusCode, Str final RecordDispatcherListener receiver = offsetManagerMock(); final var dispatcherHandler = new RecordDispatcherImpl( + vertx, resourceContext, value -> true, new CloudEventSenderMock(record -> { @@ -481,6 +491,7 @@ public void shouldCloseSinkResponseHandlerSubscriberSenderAndDeadLetterSinkSende when(deadLetterSender.close()).thenReturn(Future.succeededFuture()); final RecordDispatcher recordDispatcher = new RecordDispatcherImpl( + vertx, resourceContext, Filter.noop(), subscriberSender, @@ -507,6 +518,7 @@ public void shouldDiscardRecordIfInvalidCloudEvent() { final RecordDispatcherListener receiver = offsetManagerMock(); final var dispatcherHandler = new RecordDispatcherImpl( + vertx, resourceContext, Filter.noop(), CloudEventSender.noop("subscriber send called"), diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractConsumerVerticleTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractConsumerVerticleTest.java index cd6c69989d..44dc6160e5 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractConsumerVerticleTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractConsumerVerticleTest.java @@ -69,6 +69,7 @@ public abstract class AbstractConsumerVerticleTest { public void subscribedToTopic(final Vertx vertx, final VertxTestContext context) { final var consumer = new MockConsumer(OffsetResetStrategy.LATEST); final var recordDispatcher = new RecordDispatcherImpl( + vertx, resourceContext, value -> false, CloudEventSender.noop("subscriber send called"), @@ -116,6 +117,7 @@ public void onPartitionsAssigned(final Collection partitions) {} public void stop(final Vertx vertx, final VertxTestContext context) { final var consumer = new MockConsumer(OffsetResetStrategy.LATEST); final var recordDispatcher = new RecordDispatcherImpl( + vertx, resourceContext, value -> false, CloudEventSender.noop("subscriber send called"), @@ -199,6 +201,7 @@ public void shouldCloseEverything(final Vertx vertx, final VertxTestContext cont final var sinkClosed = new AtomicBoolean(false); final var recordDispatcher = new RecordDispatcherImpl( + vertx, resourceContext, ce -> true, new CloudEventSenderMock(record -> Future.succeededFuture(), () -> { diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilterTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilterTest.java index c77d6209a1..816794b3fd 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilterTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilterTest.java @@ -20,6 +20,7 @@ import dev.knative.eventing.kafka.broker.dispatcher.Filter; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; +import io.vertx.core.Vertx; import java.net.URI; import java.time.OffsetDateTime; import java.time.ZoneOffset; @@ -33,6 +34,8 @@ public class AllFilterTest { + static final Vertx vertx = Vertx.vertx(); + static final CloudEvent event = CloudEventBuilder.v1() .withId("123-42") .withDataContentType("application/cloudevents+json") @@ -47,29 +50,39 @@ public class AllFilterTest { @MethodSource(value = {"testCases"}) public void match(CloudEvent event, Filter filter, boolean shouldMatch) { assertThat(filter.test(event)).isEqualTo(shouldMatch); + filter.close(vertx); } static Stream testCases() { return Stream.of( - Arguments.of(event, new AllFilter(List.of(new ExactFilter(Map.of("id", "123-42")))), true), + Arguments.of(event, new AllFilter(List.of(new ExactFilter(Map.of("id", "123-42"))), vertx, 500), true), Arguments.of( event, - new AllFilter(List.of( - new ExactFilter(Map.of("id", "123-42")), - new ExactFilter(Map.of("source", "/api/some-source")))), + new AllFilter( + List.of( + new ExactFilter(Map.of("id", "123-42")), + new ExactFilter(Map.of("source", "/api/some-source"))), + vertx, + 500), true), Arguments.of( event, - new AllFilter(List.of( - new ExactFilter(Map.of("id", "123")), - new ExactFilter(Map.of("source", "/api/some-source")))), + new AllFilter( + List.of( + new ExactFilter(Map.of("id", "123")), + new ExactFilter(Map.of("source", "/api/some-source"))), + vertx, + 500), false), Arguments.of( event, - new AllFilter(List.of( - new ExactFilter(Map.of("id", "123-42")), - new ExactFilter(Map.of("source", "/api/something-else")))), + new AllFilter( + List.of( + new ExactFilter(Map.of("id", "123-42")), + new ExactFilter(Map.of("source", "/api/something-else"))), + vertx, + 500), false), - Arguments.of(event, new AllFilter(Collections.emptyList()), true)); + Arguments.of(event, new AllFilter(Collections.emptyList(), Vertx.vertx(), 500), true)); } } diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilterTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilterTest.java index c1564c4cb4..359799efc6 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilterTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilterTest.java @@ -20,6 +20,7 @@ import dev.knative.eventing.kafka.broker.dispatcher.Filter; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; +import io.vertx.core.Vertx; import java.net.URI; import java.time.OffsetDateTime; import java.time.ZoneOffset; @@ -33,6 +34,8 @@ public class AnyFilterTest { + static final Vertx vertx = Vertx.vertx(); + static final CloudEvent event = CloudEventBuilder.v1() .withId("123-42") .withDataContentType("application/cloudevents+json") @@ -47,35 +50,48 @@ public class AnyFilterTest { @MethodSource(value = {"testCases"}) public void match(CloudEvent event, Filter filter, boolean shouldMatch) { assertThat(filter.test(event)).isEqualTo(shouldMatch); + filter.close(vertx); } static Stream testCases() { return Stream.of( - Arguments.of(event, new AnyFilter(List.of(new ExactFilter(Map.of("id", "123-42")))), true), + Arguments.of(event, new AnyFilter(List.of(new ExactFilter(Map.of("id", "123-42"))), vertx, 500), true), Arguments.of( event, - new AnyFilter(List.of( - new ExactFilter(Map.of("id", "123-42")), - new ExactFilter(Map.of("source", "/api/some-source")))), + new AnyFilter( + List.of( + new ExactFilter(Map.of("id", "123-42")), + new ExactFilter(Map.of("source", "/api/some-source"))), + vertx, + 500), true), Arguments.of( event, - new AnyFilter(List.of( - new ExactFilter(Map.of("id", "123")), - new ExactFilter(Map.of("source", "/api/some-source")))), + new AnyFilter( + List.of( + new ExactFilter(Map.of("id", "123")), + new ExactFilter(Map.of("source", "/api/some-source"))), + vertx, + 500), true), Arguments.of( event, - new AnyFilter(List.of( - new ExactFilter(Map.of("id", "123-42")), - new ExactFilter(Map.of("source", "/api/something-else")))), + new AnyFilter( + List.of( + new ExactFilter(Map.of("id", "123-42")), + new ExactFilter(Map.of("source", "/api/something-else"))), + vertx, + 500), true), Arguments.of( event, - new AnyFilter(List.of( - new ExactFilter(Map.of("id", "123")), - new ExactFilter(Map.of("source", "/api/something-else")))), + new AnyFilter( + List.of( + new ExactFilter(Map.of("id", "123")), + new ExactFilter(Map.of("source", "/api/something-else"))), + vertx, + 500), false), - Arguments.of(event, new AnyFilter(Collections.emptyList()), false)); + Arguments.of(event, new AnyFilter(Collections.emptyList(), vertx, 500), false)); } } From 39581501f5be5f86023f1495570a839fc9a8fda3 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Fri, 13 Oct 2023 16:24:58 -0400 Subject: [PATCH 3/8] Small benchmark fixes Signed-off-by: Calum Murray --- data-plane/benchmarks/resources/filter-class-list.txt | 2 ++ .../kafka/broker/dispatcher/impl/filter/FilterBenchmark.java | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/data-plane/benchmarks/resources/filter-class-list.txt b/data-plane/benchmarks/resources/filter-class-list.txt index f050db4586..0307d60a2b 100644 --- a/data-plane/benchmarks/resources/filter-class-list.txt +++ b/data-plane/benchmarks/resources/filter-class-list.txt @@ -2,3 +2,5 @@ ExactFilterBenchmark NotFilterBenchmark PrefixFilterBenchmark SuffixFilterBenchmark +AnyFilterBenchmark +AllFilterBenchmark diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/FilterBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/FilterBenchmark.java index 65fec21c3f..0e648a4ffa 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/FilterBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/FilterBenchmark.java @@ -26,6 +26,8 @@ @BenchmarkMode(Mode.Throughput) @Fork(1) @State(Scope.Thread) +@Measurement(iterations = 3, time = 10) +@Warmup(iterations = 3, time = 5) @OutputTimeUnit(TimeUnit.MICROSECONDS) public abstract class FilterBenchmark { Filter filter; @@ -57,7 +59,7 @@ public void teardown() { @Benchmark public void benchmarkFilterCreation(Blackhole bh) { final var filter = this.createFilter(); - filter.close(); + filter.close(vertx); bh.consume(filter); } From a4792a849f706c6072cc220015a9fa2054397593 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Mon, 16 Oct 2023 11:02:14 -0400 Subject: [PATCH 4/8] fixed benchmarks Signed-off-by: Calum Murray --- .../impl/filter/AllFilterBenchmark.java | 2 +- .../impl/filter/AnyFilterBenchmark.java | 10 +++++----- .../dispatcher/impl/filter/FilterBenchmark.java | 16 +++++++++++----- .../impl/filter/subscriptionsapi/AnyFilter.java | 3 +++ 4 files changed, 20 insertions(+), 11 deletions(-) diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AllFilterBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AllFilterBenchmark.java index 28d91af112..4487ba61cd 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AllFilterBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AllFilterBenchmark.java @@ -28,7 +28,7 @@ public class AllFilterBenchmark { public static ExactFilter makeExactFilter() { - return new ExactFilter(Map.of("id", "com.github.pull.create")); + return new ExactFilter(Map.of("type", "com.github.pull.create")); } public static PrefixFilter makePrefixFilter() { diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AnyFilterBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AnyFilterBenchmark.java index 479d46c3ba..c1780008bc 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AnyFilterBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AnyFilterBenchmark.java @@ -33,7 +33,7 @@ public static CloudEvent event() { } public static ExactFilter makeExactFilter() { - return new ExactFilter(Map.of("id", "com.github.pull.create")); + return new ExactFilter(Map.of("type", "com.github.pull.create")); } public static PrefixFilter makePrefixFilter() { @@ -48,7 +48,7 @@ public static PrefixFilter makePrefixFilterNoMatch() { return new PrefixFilter(Map.of("type", "other.event")); } - public static SuffixFilter makeSufficFilterNoMatch() { + public static SuffixFilter makeSuffixFilterNoMatch() { return new SuffixFilter(Map.of("source", "qwertyuiop")); } @@ -86,7 +86,7 @@ public static class AnyFilterFirstMatchAtEnd extends FilterBenchmark { @Override protected Filter createFilter() { return new AnyFilter( - List.of(makePrefixFilterNoMatch(), makeSufficFilterNoMatch(), makeExactFilter()), + List.of(makePrefixFilterNoMatch(), makeSuffixFilterNoMatch(), makeExactFilter()), vertx, FILTER_REORDER_TIME_MILLISECONDS); } @@ -102,7 +102,7 @@ public static class AnyFilterFirstMatchAtStart extends FilterBenchmark { @Override protected Filter createFilter() { return new AnyFilter( - List.of(makeExactFilter(), makePrefixFilterNoMatch(), makeSufficFilterNoMatch()), + List.of(makeExactFilter(), makePrefixFilterNoMatch(), makeSuffixFilterNoMatch()), vertx, FILTER_REORDER_TIME_MILLISECONDS); } @@ -131,7 +131,7 @@ public static class AnyFilter2EventsMatch2DifferentFiltersOneFilterMatchesNeithe @Override protected Filter createFilter() { return new AnyFilter( - List.of(makeSufficFilterNoMatch(), makePrefixFilter(), makePrefixFilterNoMatch()), + List.of(makeSuffixFilterNoMatch(), makePrefixFilter(), makePrefixFilterNoMatch()), vertx, FILTER_REORDER_TIME_MILLISECONDS); } diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/FilterBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/FilterBenchmark.java index 0e648a4ffa..b6a178e708 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/FilterBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/FilterBenchmark.java @@ -33,21 +33,27 @@ public abstract class FilterBenchmark { Filter filter; CloudEvent cloudEvent; - public static final Vertx vertx = Vertx.vertx(); + Vertx vertx; - public static final long FILTER_REORDER_TIME_MILLISECONDS = 500; // 0.5 seconds + public static final long FILTER_REORDER_TIME_MILLISECONDS = 1000; // 1 seconds - @Setup + @TearDown + public void closeVertx() { + this.vertx.close(); + } + + @Setup(Level.Trial) public void setupFilter() { + this.vertx = Vertx.vertx(); this.filter = createFilter(); } - @Setup + @Setup(Level.Trial) public void setupCloudEvent() { this.cloudEvent = createEvent(); } - @TearDown + @TearDown(Level.Trial) public void teardown() { this.filter.close(vertx); } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java index 8c7b9d0265..e4813d2d8f 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java @@ -43,6 +43,9 @@ public AnyFilter(List filters, Vertx vertx, long delayMilliseconds) { } private void reorder(Long id) { + if (!this.shouldReorder) { + return; + } logger.debug("Reordering ANY filter!"); this.filters.updateAndGet((filterCounters -> filterCounters.stream() .sorted(Comparator.comparingInt(FilterCounter::getCount).reversed()) From 536a774ed1a62beff3ac6e0dd53415e02c877fa7 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Tue, 17 Oct 2023 19:33:05 -0400 Subject: [PATCH 5/8] further optimizations to base loop performance Signed-off-by: Calum Murray --- .../impl/filter/FilterBenchmark.java | 2 +- .../kafka/broker/dispatcher/Filter.java | 11 +++-- .../impl/filter/AttributesFilter.java | 14 ++++++ .../filter/subscriptionsapi/AllFilter.java | 43 +++++++++++------ .../filter/subscriptionsapi/AnyFilter.java | 47 ++++++++++++------- .../filter/subscriptionsapi/CeSqlFilter.java | 14 ++++++ .../subscriptionsapi/FilterCounter.java | 41 ---------------- .../filter/subscriptionsapi/NotFilter.java | 14 ++++++ 8 files changed, 111 insertions(+), 75 deletions(-) delete mode 100644 data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/FilterCounter.java diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/FilterBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/FilterBenchmark.java index b6a178e708..a3aaf04b86 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/FilterBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/FilterBenchmark.java @@ -35,7 +35,7 @@ public abstract class FilterBenchmark { Vertx vertx; - public static final long FILTER_REORDER_TIME_MILLISECONDS = 1000; // 1 seconds + public static final long FILTER_REORDER_TIME_MILLISECONDS = 10000; // 1 seconds @TearDown public void closeVertx() { diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Filter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Filter.java index a9c82cf8a6..aaac496ed9 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Filter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Filter.java @@ -32,12 +32,17 @@ static Filter noop() { return ce -> true; } - default void close() { - return; + default int getCount() { + return 0; + } + ; + + default int incrementCount() { + return 0; } + ; default void close(Vertx vertx) { return; } - ; } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AttributesFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AttributesFilter.java index 876fc5d1b3..b67be60dde 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AttributesFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AttributesFilter.java @@ -25,6 +25,7 @@ import java.net.URI; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -64,6 +65,8 @@ public AttributeEntry(String name, String expectedValue, Function attributes; + private final AtomicInteger count; + /** * All args constructor. * @@ -81,6 +84,7 @@ public AttributesFilter(final Map attributes) { } }))) .collect(Collectors.toUnmodifiableList()); + this.count = new AtomicInteger(0); } /** @@ -132,4 +136,14 @@ private static String getOrDefault(@Nullable final T s, final Function> filters; + private final AtomicReference> filters; + + private final AtomicInteger count; private final long periodicTimerId; @@ -38,8 +41,8 @@ public class AllFilter implements Filter { public AllFilter(List filters, Vertx vertx, long delayMilliseconds) { logger.debug("Starting with timeout {}", delayMilliseconds); this.periodicTimerId = vertx.setPeriodic(delayMilliseconds, this::reorder); - this.filters = new AtomicReference<>( - filters.stream().map(FilterCounter::new).collect(ImmutableList.toImmutableList())); + this.count = new AtomicInteger(0); + this.filters = new AtomicReference<>(filters.stream().collect(ImmutableList.toImmutableList())); } private void reorder(Long id) { @@ -48,28 +51,40 @@ private void reorder(Long id) { } logger.debug("Reordering filters!"); this.filters.updateAndGet((filterCounters -> filterCounters.stream() - .sorted(Comparator.comparingInt(FilterCounter::getCount).reversed()) + .sorted(Comparator.comparingInt(Filter::getCount).reversed()) .collect(ImmutableList.toImmutableList()))); } - private static boolean test( - CloudEvent cloudEvent, ImmutableList filters, Consumer shouldReorder) { + private static boolean test(CloudEvent cloudEvent, List filters, Consumer shouldReorder) { logger.debug("Testing event against ALL filters. Event {}", cloudEvent); - for (int i = 0; i < filters.size(); i++) { - final var filterCounter = filters.get(i); - if (!filterCounter.getFilter().test(cloudEvent)) { - shouldReorder.accept(i != 0); - filterCounter.incrementCount(); - logger.debug("Test failed. Filter {} Event {}", filterCounter.getFilter(), cloudEvent); + int i = 0; + for (final Filter filter : filters) { + if (!filter.test(cloudEvent)) { + int count = filter.incrementCount(); + if (i != 0 && count > 2 * filters.get(i - 1).getCount()) { + shouldReorder.accept(true); + } + filter.incrementCount(); + logger.debug("Test failed. Filter {} Event {}", filter, cloudEvent); return false; } + i++; } logger.debug("Test ALL filters succeeded. Event {}", cloudEvent); return true; } + @Override + public int getCount() { + return this.count.get(); + } + + @Override + public int incrementCount() { + return this.count.incrementAndGet(); + } + private void setShouldReorder(boolean shouldReorder) { - logger.debug("Filters should reorder!"); this.shouldReorder = shouldReorder; } @@ -82,6 +97,6 @@ public boolean test(CloudEvent cloudEvent) { public void close(Vertx vertx) { logger.debug("Closing periodic reorder job"); vertx.cancelTimer(this.periodicTimerId); - this.filters.get().forEach((f) -> f.getFilter().close(vertx)); + this.filters.get().forEach((f) -> f.close(vertx)); } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java index e4813d2d8f..7f3e8c45af 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java @@ -21,6 +21,7 @@ import io.vertx.core.Vertx; import java.util.Comparator; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import org.slf4j.Logger; @@ -30,7 +31,9 @@ public class AnyFilter implements Filter { private static final Logger logger = LoggerFactory.getLogger(AnyFilter.class); - private final AtomicReference> filters; + private final AtomicReference> filters; + + private final AtomicInteger count; private final long periodicTimerId; @@ -38,8 +41,8 @@ public class AnyFilter implements Filter { public AnyFilter(List filters, Vertx vertx, long delayMilliseconds) { this.periodicTimerId = vertx.setPeriodic(delayMilliseconds, this::reorder); - this.filters = new AtomicReference<>( - filters.stream().map(FilterCounter::new).collect(ImmutableList.toImmutableList())); + this.count = new AtomicInteger(0); + this.filters = new AtomicReference<>(filters.stream().collect(ImmutableList.toImmutableList())); } private void reorder(Long id) { @@ -47,29 +50,41 @@ private void reorder(Long id) { return; } logger.debug("Reordering ANY filter!"); - this.filters.updateAndGet((filterCounters -> filterCounters.stream() - .sorted(Comparator.comparingInt(FilterCounter::getCount).reversed()) - .collect(ImmutableList.toImmutableList()))); + this.filters.set(this.filters.get().stream() + .sorted(Comparator.comparingInt(Filter::getCount).reversed()) + .collect(ImmutableList.toImmutableList())); + this.shouldReorder = false; } - private static boolean test( - CloudEvent cloudEvent, ImmutableList filters, Consumer shouldReorder) { + private static boolean test(CloudEvent cloudEvent, ImmutableList filters, Consumer shouldReorder) { logger.debug("Testing event against ANY filter. Event {}", cloudEvent); - for (int i = 0; i < filters.size(); i++) { - final var filterCounter = filters.get(i); - if (filterCounter.getFilter().test(cloudEvent)) { - shouldReorder.accept(i != 0); - filterCounter.incrementCount(); - logger.debug("Test succeeded. Filter {} Event {}", filterCounter.getFilter(), cloudEvent); + int i = 0; + for (final Filter filter : filters) { + if (filter.test(cloudEvent)) { + int count = filter.incrementCount(); + if (i != 0 && count > 2 * filters.get(i - 1).getCount()) { + shouldReorder.accept(true); + } + logger.debug("Test succeeded. Filter {} Event {}", filter, cloudEvent); return true; } + i++; } logger.debug("Test failed. All filters failed. Event {}", cloudEvent); return false; } + @Override + public int getCount() { + return this.count.get(); + } + + @Override + public int incrementCount() { + return this.count.incrementAndGet(); + } + private void setShouldReorder(boolean shouldReorder) { - logger.debug("Filters should reorder!"); this.shouldReorder = shouldReorder; } @@ -81,6 +96,6 @@ public boolean test(CloudEvent cloudEvent) { @Override public void close(Vertx vertx) { vertx.cancelTimer(this.periodicTimerId); - this.filters.get().forEach((f) -> f.getFilter().close(vertx)); + this.filters.get().forEach((f) -> f.close(vertx)); } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/CeSqlFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/CeSqlFilter.java index df3c935d1c..a20e5b78bb 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/CeSqlFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/CeSqlFilter.java @@ -22,6 +22,7 @@ import io.cloudevents.sql.Expression; import io.cloudevents.sql.Parser; import io.cloudevents.sql.Type; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,12 +30,15 @@ public class CeSqlFilter implements Filter { private static final Logger logger = LoggerFactory.getLogger(CeSqlFilter.class); + private final AtomicInteger count; + private final Expression expression; private final EvaluationRuntime runtime; public CeSqlFilter(String sqlExpression) { this.expression = Parser.parseDefault(sqlExpression); this.runtime = EvaluationRuntime.getDefault(); + this.count = new AtomicInteger(0); } @Override @@ -54,4 +58,14 @@ public boolean test(CloudEvent cloudEvent) { return false; } } + + @Override + public int getCount() { + return this.count.get(); + } + + @Override + public int incrementCount() { + return this.count.incrementAndGet(); + } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/FilterCounter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/FilterCounter.java deleted file mode 100644 index 8169536547..0000000000 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/FilterCounter.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi; - -import dev.knative.eventing.kafka.broker.dispatcher.Filter; -import java.util.concurrent.atomic.AtomicInteger; - -public class FilterCounter { - private final Filter filter; - private final AtomicInteger count; - - public FilterCounter(Filter filter) { - this.filter = filter; - this.count = new AtomicInteger(0); - } - - public Filter getFilter() { - return filter; - } - - public int getCount() { - return count.get(); - } - - public int incrementCount() { - return this.count.incrementAndGet(); - } -} diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilter.java index 85be28735a..3d449cca08 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilter.java @@ -18,6 +18,7 @@ import dev.knative.eventing.kafka.broker.dispatcher.Filter; import io.cloudevents.CloudEvent; import io.vertx.core.Vertx; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,8 +28,11 @@ public class NotFilter implements Filter { private final Filter filter; + private final AtomicInteger count; + public NotFilter(Filter filter) { this.filter = filter; + this.count = new AtomicInteger(0); } @Override @@ -40,6 +44,16 @@ public boolean test(CloudEvent cloudEvent) { return passed; } + @Override + public int getCount() { + return this.count.get(); + } + + @Override + public int incrementCount() { + return this.count.incrementAndGet(); + } + @Override public void close(Vertx vertx) { this.filter.close(vertx); From 573874c6bc43649aec34a6f2d8239ae72e83fd27 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Wed, 18 Oct 2023 12:00:00 -0400 Subject: [PATCH 6/8] further improvements Signed-off-by: Calum Murray --- .../dispatcher/impl/filter/AttributesFilter.java | 8 ++++---- .../impl/filter/subscriptionsapi/AllFilter.java | 8 ++++---- .../impl/filter/subscriptionsapi/AnyFilter.java | 12 ++++++------ .../impl/filter/subscriptionsapi/CeSqlFilter.java | 8 ++++---- .../impl/filter/subscriptionsapi/NotFilter.java | 8 ++++---- 5 files changed, 22 insertions(+), 22 deletions(-) diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AttributesFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AttributesFilter.java index b67be60dde..99b490219c 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AttributesFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AttributesFilter.java @@ -65,7 +65,7 @@ public AttributeEntry(String name, String expectedValue, Function attributes; - private final AtomicInteger count; + private int count; /** * All args constructor. @@ -84,7 +84,7 @@ public AttributesFilter(final Map attributes) { } }))) .collect(Collectors.toUnmodifiableList()); - this.count = new AtomicInteger(0); + this.count = 0; } /** @@ -139,11 +139,11 @@ private static boolean isNotEmpty(final String value) { @Override public int getCount() { - return this.count.get(); + return this.count; } @Override public int incrementCount() { - return this.count.incrementAndGet(); + return this.count++; } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilter.java index 87d6e1932e..58a765efeb 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilter.java @@ -32,7 +32,7 @@ public class AllFilter implements Filter { private final AtomicReference> filters; - private final AtomicInteger count; + private int count; private final long periodicTimerId; @@ -41,7 +41,7 @@ public class AllFilter implements Filter { public AllFilter(List filters, Vertx vertx, long delayMilliseconds) { logger.debug("Starting with timeout {}", delayMilliseconds); this.periodicTimerId = vertx.setPeriodic(delayMilliseconds, this::reorder); - this.count = new AtomicInteger(0); + this.count = 0; this.filters = new AtomicReference<>(filters.stream().collect(ImmutableList.toImmutableList())); } @@ -76,12 +76,12 @@ private static boolean test(CloudEvent cloudEvent, List filters, Consume @Override public int getCount() { - return this.count.get(); + return this.count; } @Override public int incrementCount() { - return this.count.incrementAndGet(); + return this.count++; } private void setShouldReorder(boolean shouldReorder) { diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java index 7f3e8c45af..98f30250a5 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java @@ -33,7 +33,7 @@ public class AnyFilter implements Filter { private final AtomicReference> filters; - private final AtomicInteger count; + private int count; private final long periodicTimerId; @@ -41,7 +41,7 @@ public class AnyFilter implements Filter { public AnyFilter(List filters, Vertx vertx, long delayMilliseconds) { this.periodicTimerId = vertx.setPeriodic(delayMilliseconds, this::reorder); - this.count = new AtomicInteger(0); + this.count = 0; this.filters = new AtomicReference<>(filters.stream().collect(ImmutableList.toImmutableList())); } @@ -61,8 +61,8 @@ private static boolean test(CloudEvent cloudEvent, ImmutableList filters int i = 0; for (final Filter filter : filters) { if (filter.test(cloudEvent)) { - int count = filter.incrementCount(); - if (i != 0 && count > 2 * filters.get(i - 1).getCount()) { +// int count = filter.incrementCount(); + if (i != 0 && filter.incrementCount() > 2 * filters.get(i - 1).getCount()) { shouldReorder.accept(true); } logger.debug("Test succeeded. Filter {} Event {}", filter, cloudEvent); @@ -76,12 +76,12 @@ private static boolean test(CloudEvent cloudEvent, ImmutableList filters @Override public int getCount() { - return this.count.get(); + return this.count; } @Override public int incrementCount() { - return this.count.incrementAndGet(); + return this.count++; } private void setShouldReorder(boolean shouldReorder) { diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/CeSqlFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/CeSqlFilter.java index a20e5b78bb..f9649566e7 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/CeSqlFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/CeSqlFilter.java @@ -30,7 +30,7 @@ public class CeSqlFilter implements Filter { private static final Logger logger = LoggerFactory.getLogger(CeSqlFilter.class); - private final AtomicInteger count; + private int count; private final Expression expression; private final EvaluationRuntime runtime; @@ -38,7 +38,7 @@ public class CeSqlFilter implements Filter { public CeSqlFilter(String sqlExpression) { this.expression = Parser.parseDefault(sqlExpression); this.runtime = EvaluationRuntime.getDefault(); - this.count = new AtomicInteger(0); + this.count = 0; } @Override @@ -61,11 +61,11 @@ public boolean test(CloudEvent cloudEvent) { @Override public int getCount() { - return this.count.get(); + return this.count; } @Override public int incrementCount() { - return this.count.incrementAndGet(); + return this.count++; } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilter.java index 3d449cca08..291aa90399 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilter.java @@ -28,11 +28,11 @@ public class NotFilter implements Filter { private final Filter filter; - private final AtomicInteger count; + private int count; public NotFilter(Filter filter) { this.filter = filter; - this.count = new AtomicInteger(0); + this.count = 0; } @Override @@ -46,12 +46,12 @@ public boolean test(CloudEvent cloudEvent) { @Override public int getCount() { - return this.count.get(); + return this.count; } @Override public int incrementCount() { - return this.count.incrementAndGet(); + return this.count++; } @Override From 0fbd5586629e7a10e46484d421ee253efc207e0b Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Wed, 18 Oct 2023 13:50:36 -0400 Subject: [PATCH 7/8] Speed up filter creation time Signed-off-by: Calum Murray --- .../dispatcher/impl/filter/AllFilterBenchmark.java | 13 +++++++------ .../dispatcher/impl/filter/AnyFilterBenchmark.java | 13 +++++++------ .../impl/filter/subscriptionsapi/AllFilter.java | 4 ++-- .../impl/filter/subscriptionsapi/AnyFilter.java | 4 ++-- .../dispatcher/main/ConsumerVerticleBuilder.java | 7 ++++--- .../impl/filter/subscriptionsapi/AllFilterTest.java | 11 ++++++----- .../impl/filter/subscriptionsapi/AnyFilterTest.java | 13 +++++++------ 7 files changed, 35 insertions(+), 30 deletions(-) diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AllFilterBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AllFilterBenchmark.java index 4487ba61cd..2d8f89c337 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AllFilterBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AllFilterBenchmark.java @@ -16,6 +16,7 @@ package dev.knative.eventing.kafka.broker.dispatcher.impl.filter; +import com.google.common.collect.ImmutableList; import dev.knative.eventing.kafka.broker.dispatcher.Filter; import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.AllFilter; import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.ExactFilter; @@ -51,7 +52,7 @@ public static class AllFilterWithExactFilter extends FilterBenchmark { @Override protected Filter createFilter() { - return new AllFilter(List.of(makeExactFilter()), vertx, FILTER_REORDER_TIME_MILLISECONDS); + return new AllFilter(ImmutableList.of(makeExactFilter()), vertx, FILTER_REORDER_TIME_MILLISECONDS); } @Override @@ -65,7 +66,7 @@ public static class AllFilterMatchAllSubFilters extends FilterBenchmark { @Override protected Filter createFilter() { return new AllFilter( - List.of(makeExactFilter(), makePrefixFilter(), makeSuffixFilter()), + ImmutableList.of(makeExactFilter(), makePrefixFilter(), makeSuffixFilter()), vertx, FILTER_REORDER_TIME_MILLISECONDS); } @@ -81,7 +82,7 @@ public static class AllFilterFirstMatchEndOfArray extends FilterBenchmark { @Override protected Filter createFilter() { return new AllFilter( - List.of(makePrefixFilterNoMatch(), makeSuffixFilterNoMatch(), makeExactFilter()), + ImmutableList.of(makePrefixFilterNoMatch(), makeSuffixFilterNoMatch(), makeExactFilter()), vertx, FILTER_REORDER_TIME_MILLISECONDS); } @@ -97,7 +98,7 @@ public static class AllFilterFirstMatchStartOfArray extends FilterBenchmark { @Override protected Filter createFilter() { return new AllFilter( - List.of(makeExactFilter(), makePrefixFilterNoMatch(), makeSuffixFilterNoMatch()), + ImmutableList.of(makeExactFilter(), makePrefixFilterNoMatch(), makeSuffixFilterNoMatch()), vertx, FILTER_REORDER_TIME_MILLISECONDS); } @@ -113,7 +114,7 @@ public static class AllFilterOneNonMatchingFilterInMiddle extends FilterBenchmar @Override protected Filter createFilter() { return new AllFilter( - List.of(makeExactFilter(), makePrefixFilterNoMatch(), makePrefixFilter()), + ImmutableList.of(makeExactFilter(), makePrefixFilterNoMatch(), makePrefixFilter()), vertx, FILTER_REORDER_TIME_MILLISECONDS); } @@ -129,7 +130,7 @@ public static class AllFilterNoMatchingFilters extends FilterBenchmark { @Override protected Filter createFilter() { return new AllFilter( - List.of(makePrefixFilterNoMatch(), makeSuffixFilterNoMatch()), + ImmutableList.of(makePrefixFilterNoMatch(), makeSuffixFilterNoMatch()), vertx, FILTER_REORDER_TIME_MILLISECONDS); } diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AnyFilterBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AnyFilterBenchmark.java index c1780008bc..60b1427ad8 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AnyFilterBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AnyFilterBenchmark.java @@ -16,6 +16,7 @@ package dev.knative.eventing.kafka.broker.dispatcher.impl.filter; +import com.google.common.collect.ImmutableList; import dev.knative.eventing.kafka.broker.dispatcher.Filter; import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.*; import io.cloudevents.CloudEvent; @@ -56,7 +57,7 @@ public static class AnyFilterWithExactFilterBenchmark extends FilterBenchmark { @Override protected Filter createFilter() { - return new AnyFilter(List.of(makeExactFilter()), vertx, FILTER_REORDER_TIME_MILLISECONDS); + return new AnyFilter(ImmutableList.of(makeExactFilter()), vertx, FILTER_REORDER_TIME_MILLISECONDS); } @Override @@ -70,7 +71,7 @@ public static class AnyFilterMatchAllSubfilters extends FilterBenchmark { @Override protected Filter createFilter() { return new AnyFilter( - List.of(makeExactFilter(), makePrefixFilter(), makeSuffixFilter()), + ImmutableList.of(makeExactFilter(), makePrefixFilter(), makeSuffixFilter()), vertx, FILTER_REORDER_TIME_MILLISECONDS); } @@ -86,7 +87,7 @@ public static class AnyFilterFirstMatchAtEnd extends FilterBenchmark { @Override protected Filter createFilter() { return new AnyFilter( - List.of(makePrefixFilterNoMatch(), makeSuffixFilterNoMatch(), makeExactFilter()), + ImmutableList.of(makePrefixFilterNoMatch(), makeSuffixFilterNoMatch(), makeExactFilter()), vertx, FILTER_REORDER_TIME_MILLISECONDS); } @@ -102,7 +103,7 @@ public static class AnyFilterFirstMatchAtStart extends FilterBenchmark { @Override protected Filter createFilter() { return new AnyFilter( - List.of(makeExactFilter(), makePrefixFilterNoMatch(), makeSuffixFilterNoMatch()), + ImmutableList.of(makeExactFilter(), makePrefixFilterNoMatch(), makeSuffixFilterNoMatch()), vertx, FILTER_REORDER_TIME_MILLISECONDS); } @@ -118,7 +119,7 @@ public static class AnyFilter2EventsMatch2DifferentFilters extends FilterBenchma @Override protected Filter createFilter() { return new AnyFilter( - List.of(makePrefixFilter(), makePrefixFilterNoMatch()), vertx, FILTER_REORDER_TIME_MILLISECONDS); + ImmutableList.of(makePrefixFilter(), makePrefixFilterNoMatch()), vertx, FILTER_REORDER_TIME_MILLISECONDS); } @Override @@ -131,7 +132,7 @@ public static class AnyFilter2EventsMatch2DifferentFiltersOneFilterMatchesNeithe @Override protected Filter createFilter() { return new AnyFilter( - List.of(makeSuffixFilterNoMatch(), makePrefixFilter(), makePrefixFilterNoMatch()), + ImmutableList.of(makeSuffixFilterNoMatch(), makePrefixFilter(), makePrefixFilterNoMatch()), vertx, FILTER_REORDER_TIME_MILLISECONDS); } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilter.java index 58a765efeb..a7e2c827f7 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilter.java @@ -38,11 +38,11 @@ public class AllFilter implements Filter { private boolean shouldReorder; - public AllFilter(List filters, Vertx vertx, long delayMilliseconds) { + public AllFilter(ImmutableList filters, Vertx vertx, long delayMilliseconds) { logger.debug("Starting with timeout {}", delayMilliseconds); this.periodicTimerId = vertx.setPeriodic(delayMilliseconds, this::reorder); this.count = 0; - this.filters = new AtomicReference<>(filters.stream().collect(ImmutableList.toImmutableList())); + this.filters = new AtomicReference<>(filters); } private void reorder(Long id) { diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java index 98f30250a5..d22e403de5 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java @@ -39,10 +39,10 @@ public class AnyFilter implements Filter { private boolean shouldReorder; - public AnyFilter(List filters, Vertx vertx, long delayMilliseconds) { + public AnyFilter(ImmutableList filters, Vertx vertx, long delayMilliseconds) { this.periodicTimerId = vertx.setPeriodic(delayMilliseconds, this::reorder); this.count = 0; - this.filters = new AtomicReference<>(filters.stream().collect(ImmutableList.toImmutableList())); + this.filters = new AtomicReference<>(filters); } private void reorder(Long id) { diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java index 3819bc939a..5534d8dd53 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java @@ -17,6 +17,7 @@ import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue; +import com.google.common.collect.ImmutableList; import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; import dev.knative.eventing.kafka.broker.core.ReactiveKafkaConsumer; import dev.knative.eventing.kafka.broker.core.ReactiveKafkaProducer; @@ -198,7 +199,7 @@ private static Filter getFilter(List filters, filters.stream() .map((DataPlaneContract.DialectedFilter filter) -> ConsumerVerticleBuilder.getFilter(filter, vertx)) - .collect(Collectors.toList()), + .collect(ImmutableList.toImmutableList()), vertx, FILTER_REORDER_TIME_MILLISECONDS); } @@ -212,13 +213,13 @@ private static Filter getFilter(DataPlaneContract.DialectedFilter filter, Vertx case ANY -> new AnyFilter( filter.getAny().getFiltersList().stream() .map((DataPlaneContract.DialectedFilter f) -> ConsumerVerticleBuilder.getFilter(f, vertx)) - .collect(Collectors.toList()), + .collect(ImmutableList.toImmutableList()), vertx, FILTER_REORDER_TIME_MILLISECONDS); case ALL -> new AllFilter( filter.getAll().getFiltersList().stream() .map((DataPlaneContract.DialectedFilter f) -> ConsumerVerticleBuilder.getFilter(f, vertx)) - .collect(Collectors.toList()), + .collect(ImmutableList.toImmutableList()), vertx, FILTER_REORDER_TIME_MILLISECONDS); case CESQL -> new CeSqlFilter(filter.getCesql().getExpression()); diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilterTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilterTest.java index 816794b3fd..e5d84c8f02 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilterTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilterTest.java @@ -17,6 +17,7 @@ import static org.assertj.core.api.Assertions.assertThat; +import com.google.common.collect.ImmutableList; import dev.knative.eventing.kafka.broker.dispatcher.Filter; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; @@ -55,11 +56,11 @@ public void match(CloudEvent event, Filter filter, boolean shouldMatch) { static Stream testCases() { return Stream.of( - Arguments.of(event, new AllFilter(List.of(new ExactFilter(Map.of("id", "123-42"))), vertx, 500), true), + Arguments.of(event, new AllFilter(ImmutableList.of(new ExactFilter(Map.of("id", "123-42"))), vertx, 500), true), Arguments.of( event, new AllFilter( - List.of( + ImmutableList.of( new ExactFilter(Map.of("id", "123-42")), new ExactFilter(Map.of("source", "/api/some-source"))), vertx, @@ -68,7 +69,7 @@ static Stream testCases() { Arguments.of( event, new AllFilter( - List.of( + ImmutableList.of( new ExactFilter(Map.of("id", "123")), new ExactFilter(Map.of("source", "/api/some-source"))), vertx, @@ -77,12 +78,12 @@ static Stream testCases() { Arguments.of( event, new AllFilter( - List.of( + ImmutableList.of( new ExactFilter(Map.of("id", "123-42")), new ExactFilter(Map.of("source", "/api/something-else"))), vertx, 500), false), - Arguments.of(event, new AllFilter(Collections.emptyList(), Vertx.vertx(), 500), true)); + Arguments.of(event, new AllFilter(ImmutableList.of(), Vertx.vertx(), 500), true)); } } diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilterTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilterTest.java index 359799efc6..6e081990d0 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilterTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilterTest.java @@ -17,6 +17,7 @@ import static org.assertj.core.api.Assertions.assertThat; +import com.google.common.collect.ImmutableList; import dev.knative.eventing.kafka.broker.dispatcher.Filter; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; @@ -55,11 +56,11 @@ public void match(CloudEvent event, Filter filter, boolean shouldMatch) { static Stream testCases() { return Stream.of( - Arguments.of(event, new AnyFilter(List.of(new ExactFilter(Map.of("id", "123-42"))), vertx, 500), true), + Arguments.of(event, new AnyFilter(ImmutableList.of(new ExactFilter(Map.of("id", "123-42"))), vertx, 500), true), Arguments.of( event, new AnyFilter( - List.of( + ImmutableList.of( new ExactFilter(Map.of("id", "123-42")), new ExactFilter(Map.of("source", "/api/some-source"))), vertx, @@ -68,7 +69,7 @@ static Stream testCases() { Arguments.of( event, new AnyFilter( - List.of( + ImmutableList.of( new ExactFilter(Map.of("id", "123")), new ExactFilter(Map.of("source", "/api/some-source"))), vertx, @@ -77,7 +78,7 @@ static Stream testCases() { Arguments.of( event, new AnyFilter( - List.of( + ImmutableList.of( new ExactFilter(Map.of("id", "123-42")), new ExactFilter(Map.of("source", "/api/something-else"))), vertx, @@ -86,12 +87,12 @@ static Stream testCases() { Arguments.of( event, new AnyFilter( - List.of( + ImmutableList.of( new ExactFilter(Map.of("id", "123")), new ExactFilter(Map.of("source", "/api/something-else"))), vertx, 500), false), - Arguments.of(event, new AnyFilter(Collections.emptyList(), vertx, 500), false)); + Arguments.of(event, new AnyFilter(ImmutableList.of(), vertx, 500), false)); } } From 35bc70700b65637a1a196c0d26a55423cff9f2e5 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Wed, 18 Oct 2023 15:04:54 -0400 Subject: [PATCH 8/8] Final fixes Signed-off-by: Calum Murray --- .../dispatcher/impl/filter/AllFilterBenchmark.java | 1 - .../dispatcher/impl/filter/AnyFilterBenchmark.java | 5 +++-- .../dispatcher/impl/filter/AttributesFilter.java | 1 - .../impl/filter/subscriptionsapi/AllFilter.java | 3 +-- .../impl/filter/subscriptionsapi/AnyFilter.java | 10 ++++------ .../impl/filter/subscriptionsapi/CeSqlFilter.java | 1 - .../impl/filter/subscriptionsapi/NotFilter.java | 1 - .../dispatcher/main/ConsumerVerticleBuilder.java | 1 - .../impl/filter/subscriptionsapi/AllFilterTest.java | 7 ++++--- .../impl/filter/subscriptionsapi/AnyFilterTest.java | 7 ++++--- 10 files changed, 16 insertions(+), 21 deletions(-) diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AllFilterBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AllFilterBenchmark.java index 2d8f89c337..43f18305cf 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AllFilterBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AllFilterBenchmark.java @@ -23,7 +23,6 @@ import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.PrefixFilter; import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.SuffixFilter; import io.cloudevents.CloudEvent; -import java.util.List; import java.util.Map; public class AllFilterBenchmark { diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AnyFilterBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AnyFilterBenchmark.java index 60b1427ad8..b8347a0302 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AnyFilterBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AnyFilterBenchmark.java @@ -20,7 +20,6 @@ import dev.knative.eventing.kafka.broker.dispatcher.Filter; import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.*; import io.cloudevents.CloudEvent; -import java.util.List; import java.util.Map; public class AnyFilterBenchmark { @@ -119,7 +118,9 @@ public static class AnyFilter2EventsMatch2DifferentFilters extends FilterBenchma @Override protected Filter createFilter() { return new AnyFilter( - ImmutableList.of(makePrefixFilter(), makePrefixFilterNoMatch()), vertx, FILTER_REORDER_TIME_MILLISECONDS); + ImmutableList.of(makePrefixFilter(), makePrefixFilterNoMatch()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); } @Override diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AttributesFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AttributesFilter.java index 99b490219c..459400cb3f 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AttributesFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AttributesFilter.java @@ -25,7 +25,6 @@ import java.net.URI; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; import org.slf4j.Logger; diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilter.java index a7e2c827f7..da40763b9b 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilter.java @@ -21,7 +21,6 @@ import io.vertx.core.Vertx; import java.util.Comparator; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import org.slf4j.Logger; @@ -53,6 +52,7 @@ private void reorder(Long id) { this.filters.updateAndGet((filterCounters -> filterCounters.stream() .sorted(Comparator.comparingInt(Filter::getCount).reversed()) .collect(ImmutableList.toImmutableList()))); + this.shouldReorder = false; } private static boolean test(CloudEvent cloudEvent, List filters, Consumer shouldReorder) { @@ -64,7 +64,6 @@ private static boolean test(CloudEvent cloudEvent, List filters, Consume if (i != 0 && count > 2 * filters.get(i - 1).getCount()) { shouldReorder.accept(true); } - filter.incrementCount(); logger.debug("Test failed. Filter {} Event {}", filter, cloudEvent); return false; } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java index d22e403de5..24bb316d97 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java @@ -20,8 +20,6 @@ import io.cloudevents.CloudEvent; import io.vertx.core.Vertx; import java.util.Comparator; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import org.slf4j.Logger; @@ -50,9 +48,9 @@ private void reorder(Long id) { return; } logger.debug("Reordering ANY filter!"); - this.filters.set(this.filters.get().stream() + this.filters.updateAndGet((filterCounters -> filterCounters.stream() .sorted(Comparator.comparingInt(Filter::getCount).reversed()) - .collect(ImmutableList.toImmutableList())); + .collect(ImmutableList.toImmutableList()))); this.shouldReorder = false; } @@ -61,8 +59,8 @@ private static boolean test(CloudEvent cloudEvent, ImmutableList filters int i = 0; for (final Filter filter : filters) { if (filter.test(cloudEvent)) { -// int count = filter.incrementCount(); - if (i != 0 && filter.incrementCount() > 2 * filters.get(i - 1).getCount()) { + int count = filter.incrementCount(); + if (i != 0 && count > 2 * filters.get(i - 1).getCount()) { shouldReorder.accept(true); } logger.debug("Test succeeded. Filter {} Event {}", filter, cloudEvent); diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/CeSqlFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/CeSqlFilter.java index f9649566e7..d386d8bf5e 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/CeSqlFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/CeSqlFilter.java @@ -22,7 +22,6 @@ import io.cloudevents.sql.Expression; import io.cloudevents.sql.Parser; import io.cloudevents.sql.Type; -import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilter.java index 291aa90399..5dc898a44d 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilter.java @@ -18,7 +18,6 @@ import dev.knative.eventing.kafka.broker.dispatcher.Filter; import io.cloudevents.CloudEvent; import io.vertx.core.Vertx; -import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java index 5534d8dd53..17704efbef 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java @@ -62,7 +62,6 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilterTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilterTest.java index e5d84c8f02..c471dd2884 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilterTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilterTest.java @@ -25,8 +25,6 @@ import java.net.URI; import java.time.OffsetDateTime; import java.time.ZoneOffset; -import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.stream.Stream; import org.junit.jupiter.params.ParameterizedTest; @@ -56,7 +54,10 @@ public void match(CloudEvent event, Filter filter, boolean shouldMatch) { static Stream testCases() { return Stream.of( - Arguments.of(event, new AllFilter(ImmutableList.of(new ExactFilter(Map.of("id", "123-42"))), vertx, 500), true), + Arguments.of( + event, + new AllFilter(ImmutableList.of(new ExactFilter(Map.of("id", "123-42"))), vertx, 500), + true), Arguments.of( event, new AllFilter( diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilterTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilterTest.java index 6e081990d0..518af42caf 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilterTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilterTest.java @@ -25,8 +25,6 @@ import java.net.URI; import java.time.OffsetDateTime; import java.time.ZoneOffset; -import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.stream.Stream; import org.junit.jupiter.params.ParameterizedTest; @@ -56,7 +54,10 @@ public void match(CloudEvent event, Filter filter, boolean shouldMatch) { static Stream testCases() { return Stream.of( - Arguments.of(event, new AnyFilter(ImmutableList.of(new ExactFilter(Map.of("id", "123-42"))), vertx, 500), true), + Arguments.of( + event, + new AnyFilter(ImmutableList.of(new ExactFilter(Map.of("id", "123-42"))), vertx, 500), + true), Arguments.of( event, new AnyFilter(