From 4f2266cf8435ebdec0520362d1e799b16e1a5713 Mon Sep 17 00:00:00 2001
From: sinsy <550569627@qq.com>
Date: Fri, 26 Jan 2024 14:54:52 +0800
Subject: [PATCH 1/3] feat: peak ewma lb

---
 .../spi/PeakEWMALoadBalancer.java             | 145 ++++++++++++++++++
 1 file changed, 145 insertions(+)
 create mode 100644 shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java

diff --git a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java
new file mode 100644
index 000000000000..18e9c01c4caf
--- /dev/null
+++ b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.loadbalancer.spi;
+
+import org.apache.shenyu.loadbalancer.entity.Upstream;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * <p>
+ * PeakEwmaLoadBalance is designed to converge quickly when encountering slow endpoints.
+ * It is quick to react to latency spikes recovering only cautiously.Peak EWMA takes
+ * history into account,so that slow behavior is penalized relative to the
+ * supplied `decayTime`.
+ * if there are multiple invokers and the same cost,then randomly called,which doesn't care
+ * about weight.
+ * <p>
+ * Inspiration drawn from:
+ * https://github.com/twitter/finagle/blob/1bc837c4feafc0096e43c0e98516a8e1c50c4421
+ * /finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala
+ *
+ * https://github.com/apache/dubbo-spi-extensions/blob/efd18a63468f817a7581fea44e9e2e3f35d9c9ba
+ * /dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/main/java/org/apache
+ * /dubbo/rpc/cluster/loadbalance/PeakEwmaLoadBalance.java#L46
+ */
+public class PeakEWMALoadBalancer extends AbstractLoadBalancer {
+
+    private static final double PENALTY = Long.MAX_VALUE >> 16;
+
+    private static final double ZERO_COST = 1E-6;
+
+    private static double decayTime = 10000;
+
+    @Override
+    protected Upstream doSelect(List<Upstream> upstreamList, String ip) {
+        double minResponse = Double.MAX_VALUE;
+
+        List<Integer> selectInvokerIndexList = new ArrayList<>(upstreamList.size());
+
+        for (int i = 0; i < upstreamList.size(); i++) {
+            Metric metric = new Metric(upstreamList.get(i));
+            double estimateResponse = metric.getCost();
+
+            if (estimateResponse < minResponse) {
+                selectInvokerIndexList.clear();
+                selectInvokerIndexList.add(i);
+                minResponse = estimateResponse;
+            } else if (estimateResponse == minResponse) {
+                selectInvokerIndexList.add(i);
+            }
+        }
+
+        return upstreamList.get(selectInvokerIndexList.get(ThreadLocalRandom.current().nextInt(selectInvokerIndexList.size())));
+    }
+
+    protected static class Metric {
+
+        /**
+         *  last timestamp in Millis we observed an runningTime
+         */
+        private volatile long lastUpdateTime;
+
+        /**
+         * ewma of rtt, sensitive to peaks.
+         */
+        private volatile double cost;
+
+        private Upstream upstream;
+
+        private long invokeOffset;
+
+        private long invokeElapsedOffset;
+
+        //lock for get and set cost
+        ReentrantLock ewmaLock = new ReentrantLock();
+
+        public Metric(Upstream upstream) {
+            this.upstream = upstream;
+            this.lastUpdateTime = System.currentTimeMillis();
+            this.cost = 0.0;
+            this.invokeOffset = 0;
+            this.invokeElapsedOffset = 0;
+        }
+
+        private void observe() {
+            double rtt = 0;
+            long succeed = this.upstream.getSucceeded().get() - this.invokeOffset;
+            if (succeed != 0) {
+                rtt = (this.upstream.getSucceededElapsed().get() * 1.0 - this.invokeElapsedOffset) / succeed;
+            }
+
+            final long currentTime = System.currentTimeMillis();
+            long td = Math.max(currentTime - lastUpdateTime, 0);
+            double w = Math.exp(-td / decayTime);
+            if (rtt > cost) {
+                cost = rtt;
+            } else {
+                cost = cost * w + rtt * (1.0 - w);
+            }
+
+            lastUpdateTime = currentTime;
+
+//            invokeOffset = upstream.getTotal();
+//            invokeElapsedOffset = upstream.getTotalElapsed();
+
+        }
+
+        private double getCost() {
+            ewmaLock.lock();
+            observe();
+            int active = 0;
+            if (upstream.isHealthy()) {
+                active = 1;
+            }
+
+            ewmaLock.unlock();
+
+            double costTemp = cost;
+
+            //If we don't have any latency history, we penalize the host on the first probe.
+            return (costTemp < ZERO_COST && active != 0) ? PENALTY + active : costTemp * (active + 1);
+        }
+
+    }
+
+
+}

From d819f27c2e43a16e150816e530d3cae9b80d0e58 Mon Sep 17 00:00:00 2001
From: sinsy <550569627@qq.com>
Date: Wed, 31 Jan 2024 10:51:35 +0800
Subject: [PATCH 2/3] add spi

---
 .../shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java       | 6 ++++--
 .../shenyu/org.apache.shenyu.loadbalancer.spi.LoadBalancer  | 3 ++-
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java
index 18e9c01c4caf..e4f580a79fce 100644
--- a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java
+++ b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java
@@ -18,6 +18,7 @@
 package org.apache.shenyu.loadbalancer.spi;
 
 import org.apache.shenyu.loadbalancer.entity.Upstream;
+import org.apache.shenyu.spi.Join;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -41,13 +42,14 @@
  * /dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/main/java/org/apache
  * /dubbo/rpc/cluster/loadbalance/PeakEwmaLoadBalance.java#L46
  */
+@Join
 public class PeakEWMALoadBalancer extends AbstractLoadBalancer {
 
     private static final double PENALTY = Long.MAX_VALUE >> 16;
 
     private static final double ZERO_COST = 1E-6;
 
-    private static double decayTime = 10000;
+    private static final double DECAY_TIME = 600;
 
     @Override
     protected Upstream doSelect(List<Upstream> upstreamList, String ip) {
@@ -109,7 +111,7 @@ private void observe() {
 
             final long currentTime = System.currentTimeMillis();
             long td = Math.max(currentTime - lastUpdateTime, 0);
-            double w = Math.exp(-td / decayTime);
+            double w = Math.exp( -td / DECAY_TIME);
             if (rtt > cost) {
                 cost = rtt;
             } else {
diff --git a/shenyu-loadbalancer/src/main/resources/META-INF/shenyu/org.apache.shenyu.loadbalancer.spi.LoadBalancer b/shenyu-loadbalancer/src/main/resources/META-INF/shenyu/org.apache.shenyu.loadbalancer.spi.LoadBalancer
index 109f1f482b1c..f0a096ccf51a 100644
--- a/shenyu-loadbalancer/src/main/resources/META-INF/shenyu/org.apache.shenyu.loadbalancer.spi.LoadBalancer
+++ b/shenyu-loadbalancer/src/main/resources/META-INF/shenyu/org.apache.shenyu.loadbalancer.spi.LoadBalancer
@@ -18,4 +18,5 @@ roundRobin=org.apache.shenyu.loadbalancer.spi.RoundRobinLoadBalancer
 hash=org.apache.shenyu.loadbalancer.spi.HashLoadBalancer
 leastActive=org.apache.shenyu.loadbalancer.spi.LeastActiveLoadBalance
 p2c=org.apache.shenyu.loadbalancer.spi.P2cLoadBalancer
-shortestResponse=org.apache.shenyu.loadbalancer.spi.ShortestResponseLoadBalancer
\ No newline at end of file
+shortestResponse=org.apache.shenyu.loadbalancer.spi.ShortestResponseLoadBalancer
+peakEWMA=org.apache.shenyu.loadbalancer.spi.PeakEWMALoadBalancer
\ No newline at end of file

From 93e619c5e027016722016ce7276732f61532884d Mon Sep 17 00:00:00 2001
From: sinsy <550569627@qq.com>
Date: Wed, 31 Jan 2024 15:48:38 +0800
Subject: [PATCH 3/3] change get metric way

---
 .../spi/PeakEWMALoadBalancer.java             | 30 +++++++++----------
 1 file changed, 14 insertions(+), 16 deletions(-)

diff --git a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java
index e4f580a79fce..73c2128574ad 100644
--- a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java
+++ b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java
@@ -22,6 +22,8 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -51,14 +53,21 @@ public class PeakEWMALoadBalancer extends AbstractLoadBalancer {
 
     private static final double DECAY_TIME = 600;
 
+    private Map<Upstream, Metric> upstreamMetricMap = new ConcurrentHashMap<>();
+
     @Override
     protected Upstream doSelect(List<Upstream> upstreamList, String ip) {
         double minResponse = Double.MAX_VALUE;
 
         List<Integer> selectInvokerIndexList = new ArrayList<>(upstreamList.size());
-
+        Metric metric;
         for (int i = 0; i < upstreamList.size(); i++) {
-            Metric metric = new Metric(upstreamList.get(i));
+            if (upstreamMetricMap.containsKey(upstreamList.get(i))) {
+                metric = upstreamMetricMap.get(upstreamList.get(i));
+            } else {
+                metric = new Metric(upstreamList.get(i));
+                upstreamMetricMap.put(upstreamList.get(i), metric);
+            }
             double estimateResponse = metric.getCost();
 
             if (estimateResponse < minResponse) {
@@ -87,10 +96,6 @@ protected static class Metric {
 
         private Upstream upstream;
 
-        private long invokeOffset;
-
-        private long invokeElapsedOffset;
-
         //lock for get and set cost
         ReentrantLock ewmaLock = new ReentrantLock();
 
@@ -98,20 +103,16 @@ public Metric(Upstream upstream) {
             this.upstream = upstream;
             this.lastUpdateTime = System.currentTimeMillis();
             this.cost = 0.0;
-            this.invokeOffset = 0;
-            this.invokeElapsedOffset = 0;
         }
 
         private void observe() {
             double rtt = 0;
-            long succeed = this.upstream.getSucceeded().get() - this.invokeOffset;
-            if (succeed != 0) {
-                rtt = (this.upstream.getSucceededElapsed().get() * 1.0 - this.invokeElapsedOffset) / succeed;
-            }
+
+            rtt = Math.max(this.upstream.getResponseStamp() - this.upstream.getLastPicked(), rtt);
 
             final long currentTime = System.currentTimeMillis();
             long td = Math.max(currentTime - lastUpdateTime, 0);
-            double w = Math.exp( -td / DECAY_TIME);
+            double w = Math.exp(-td / DECAY_TIME);
             if (rtt > cost) {
                 cost = rtt;
             } else {
@@ -120,8 +121,6 @@ private void observe() {
 
             lastUpdateTime = currentTime;
 
-//            invokeOffset = upstream.getTotal();
-//            invokeElapsedOffset = upstream.getTotalElapsed();
 
         }
 
@@ -143,5 +142,4 @@ private double getCost() {
 
     }
 
-
 }