Skip to content

Commit

Permalink
《Eureka 源码解析 —— StringCache》
Browse files Browse the repository at this point in the history
  • Loading branch information
YunaiV committed Oct 19, 2017
1 parent 7f868f9 commit 2a2b05c
Show file tree
Hide file tree
Showing 10 changed files with 173 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,8 @@ public interface EurekaInstanceConfig {
* address. The exact field values will depend on the implementation details of the corresponding
* implementing DataCenterInfo types.
*
* // TODO 芋艿,亚马逊,暂时跳过
*
* @return an ordered list of fields that should be used to preferentially
* resolve this instance's default address, empty String[] for default.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public class DiscoveryClient implements EurekaClient {
private final AtomicLong fetchRegistryGeneration;
private final ApplicationInfoManager applicationInfoManager;
private final InstanceInfo instanceInfo;
/**
/**sync
* 获取哪些区域( Region )集合的注册信息
*/
private final AtomicReference<String> remoteRegionsToFetch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,10 @@ public interface EurekaClientConfig {
/**
* Indicates whether the client is only interested in the registry information for a single VIP.
*
* TODO 芋艿:后面研究下
* 只获得一个 `vipAddress` 对应的应用实例们的注册信息。
*
* {@link AbstractVIPResource}
* {@link com.netflix.discovery.shared.transport.EurekaHttpClient#getVip(String, String...)}
*
* @return the address of the VIP (name:port).
* <code>null</code> if single VIP interest is not present.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,18 @@ class InstanceInfoReplicator implements Runnable {
*/
private final AtomicBoolean started;

private final RateLimiter rateLimiter; // 限流相关,跳过
private final int burstSize; // 限流相关,跳过
private final int allowedRatePerMinute; // 限流相关,跳过
/**
* RateLimiter
*/
private final RateLimiter rateLimiter;
/**
* 令牌桶上限,默认:2
*/
private final int burstSize;
/**
* 令牌再装平均速率,默认:60 * 2 / 30 = 4
*/
private final int allowedRatePerMinute;

InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
this.discoveryClient = discoveryClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,18 @@
*/
public class RateLimiter {

/**
* 速率单位转换成毫秒
*/
private final long rateToMsConversion;

/**
* 消耗令牌数
*/
private final AtomicInteger consumedTokens = new AtomicInteger();
/**
* 最后填充令牌的时间
*/
private final AtomicLong lastRefillTime = new AtomicLong(0);

@Deprecated
Expand All @@ -47,17 +56,24 @@ public RateLimiter() {

public RateLimiter(TimeUnit averageRateUnit) {
switch (averageRateUnit) {
case SECONDS:
case SECONDS: // 秒级
rateToMsConversion = 1000;
break;
case MINUTES:
case MINUTES: // 分钟级
rateToMsConversion = 60 * 1000;
break;
default:
throw new IllegalArgumentException("TimeUnit of " + averageRateUnit + " is not supported");
}
}

/**
* 获取令牌( Token )
*
* @param burstSize 令牌桶上限
* @param averageRate 令牌再装平均速率
* @return 是否获取成功
*/
public boolean acquire(int burstSize, long averageRate) {
return acquire(burstSize, averageRate, System.currentTimeMillis());
}
Expand All @@ -67,24 +83,33 @@ public boolean acquire(int burstSize, long averageRate, long currentTimeMillis)
return true;
}

// 填充 令牌
refillToken(burstSize, averageRate, currentTimeMillis);
// 消费 令牌
return consumeToken(burstSize);
}

private void refillToken(int burstSize, long averageRate, long currentTimeMillis) {
// 获得 最后填充令牌的时间
long refillTime = lastRefillTime.get();
// 获得 过去多少毫秒
long timeDelta = currentTimeMillis - refillTime;

// 计算 可填充最大令牌数量
long newTokens = timeDelta * averageRate / rateToMsConversion;
if (newTokens > 0) {
// 计算 新的填充令牌的时间
long newRefillTime = refillTime == 0
? currentTimeMillis
: refillTime + newTokens * rateToMsConversion / averageRate;
// CAS 保证有且仅有一个线程进入填充
if (lastRefillTime.compareAndSet(refillTime, newRefillTime)) {
while (true) {
while (true) { // 死循环,直到成功
// 计算 填充令牌后的已消耗令牌数量
int currentLevel = consumedTokens.get();
int adjustedLevel = Math.min(currentLevel, burstSize); // In case burstSize decreased
int newLevel = (int) Math.max(0, adjustedLevel - newTokens);
// CAS 避免和正在消费令牌的线程冲突
if (consumedTokens.compareAndSet(currentLevel, newLevel)) {
return;
}
Expand All @@ -94,11 +119,13 @@ private void refillToken(int burstSize, long averageRate, long currentTimeMillis
}

private boolean consumeToken(int burstSize) {
while (true) {
while (true) { // 死循环,直到没有令牌,或者获取令牌成功
// 没有令牌
int currentLevel = consumedTokens.get();
if (currentLevel >= burstSize) {
return false;
}
// CAS 避免和正在消费令牌或者填充令牌的线程冲突
if (consumedTokens.compareAndSet(currentLevel, currentLevel + 1)) {
return true;
}
Expand All @@ -109,4 +136,11 @@ public void reset() {
consumedTokens.set(0);
lastRefillTime.set(0);
}

public static void main(String[] args) {
RateLimiter limiter = new RateLimiter(TimeUnit.SECONDS);
limiter.acquire(10, 500L);
limiter.acquire(10, 500L);
limiter.acquire(10, 500L);
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package com.netflix.discovery.util;

import org.apache.commons.lang.math.RandomUtils;

import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.*;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand Down Expand Up @@ -70,4 +71,61 @@ public int size() {
public static String intern(String original) {
return INSTANCE.cachedValueOf(original);
}

public static void main(String[] args) {
if (true) {
String s = "qqq";
INSTANCE.cachedValueOf(s);

s = null;
System.gc();

System.out.println(INSTANCE.cache.get("qqq").get());

return;
}

int size = 10000000;
List<String> results = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
// results.add(String.valueOf(i));
results.add(String.valueOf(RandomUtils.nextInt(10)));
}
int scene = 1;
if (scene == 1) {
// String str = null;
long now = System.currentTimeMillis();
String str = null;
for (String result : results) {
str = result;
}
System.out.println(String.format("%s 消耗时间 %d", str, System.currentTimeMillis() - now));
// 59 44 39 100W + 全部不同
// 48 100W + 随机数(10个)
} else if (scene == 2) {
long now = System.currentTimeMillis();
String str = null;
for (String result : results) {
str = result.intern();
}
System.out.println(String.format("%s 消耗时间 %d", str, System.currentTimeMillis() - now));
// 53676 100W + 全部不同
// 2837 100W + 随机数(10个)
} else {
// 预热
StringCache cache = new StringCache();
results.forEach(cache::cachedValueOf);

long now = System.currentTimeMillis();
String str = null;
for (String result : results) {
str = cache.cachedValueOf(result);
}
System.out.println(String.format("%s 消耗时间 %d", str, System.currentTimeMillis() - now));
// 677 100W + 全部不同
// 499 100W + 随机数(10个)
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@

package com.netflix.eureka;

import com.netflix.appinfo.AbstractEurekaIdentity;
import com.netflix.appinfo.EurekaClientIdentity;
import com.netflix.discovery.util.RateLimiter;
import com.netflix.eureka.util.EurekaMonitors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Singleton;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.*;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
Expand All @@ -34,13 +36,6 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.netflix.appinfo.AbstractEurekaIdentity;
import com.netflix.appinfo.EurekaClientIdentity;
import com.netflix.eureka.util.EurekaMonitors;
import com.netflix.discovery.util.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Rate limiting filter, with configurable threshold above which non-privileged clients
* will be dropped. This feature enables cutting off non-standard and potentially harmful clients
Expand Down Expand Up @@ -95,7 +90,24 @@ public class RateLimitingFilter implements Filter {

private static final Pattern TARGET_RE = Pattern.compile("^.*/apps(/[^/]*)?$");

enum Target {FullFetch, DeltaFetch, Application, Other}
/**
* 目标类型
*/
enum Target {
/**
* 全量获取注册信息
*/
FullFetch,
/**
* 增量获取注册信息
*/
DeltaFetch,
/**
*
*/
Application,
Other
}

/**
* Includes both full and delta fetches.
Expand Down Expand Up @@ -129,16 +141,21 @@ public void init(FilterConfig filterConfig) throws ServletException {

@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
// 获得 Target
Target target = getTarget(request);

// Other Target ,不做限流
if (target == Target.Other) {
chain.doFilter(request, response);
return;
}

HttpServletRequest httpRequest = (HttpServletRequest) request;

// 判断是否被限流
if (isRateLimited(httpRequest, target)) {
// TODO[0012]:监控相关,跳过
incrementStats(target);
// 如果开启限流,返回 503 状态码
if (serverConfig.isRateLimiterEnabled()) {
((HttpServletResponse) response).setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
return;
Expand Down Expand Up @@ -173,10 +190,12 @@ private static Target getTarget(ServletRequest request) {
}

private boolean isRateLimited(HttpServletRequest request, Target target) {
// 判断是否特权应用
if (isPrivileged(request)) {
logger.debug("Privileged {} request", target);
return false;
}
// 判断是否被超载( 限流 )
if (isOverloaded(target)) {
logger.debug("Overloaded {} request; discarding it", target);
return true;
Expand All @@ -186,21 +205,23 @@ private boolean isRateLimited(HttpServletRequest request, Target target) {
}

private boolean isPrivileged(HttpServletRequest request) {
// 是否对标准客户端开启限流
if (serverConfig.isRateLimiterThrottleStandardClients()) {
return false;
}
// 以请求头( "DiscoveryIdentity-Name" ) 判断是否在标准客户端名集合内
Set<String> privilegedClients = serverConfig.getRateLimiterPrivilegedClients();
String clientName = request.getHeader(AbstractEurekaIdentity.AUTH_NAME_HEADER_KEY);
return privilegedClients.contains(clientName) || DEFAULT_PRIVILEGED_CLIENTS.contains(clientName);
}

private boolean isOverloaded(Target target) {
int maxInWindow = serverConfig.getRateLimiterBurstSize();
int fetchWindowSize = serverConfig.getRateLimiterRegistryFetchAverageRate();
int maxInWindow = serverConfig.getRateLimiterBurstSize(); // 10
int fetchWindowSize = serverConfig.getRateLimiterRegistryFetchAverageRate(); // 500
boolean overloaded = !registryFetchRateLimiter.acquire(maxInWindow, fetchWindowSize);

if (target == Target.FullFetch) {
int fullFetchWindowSize = serverConfig.getRateLimiterFullFetchAverageRate();
int fullFetchWindowSize = serverConfig.getRateLimiterFullFetchAverageRate(); // 100
overloaded |= !registryFullFetchRateLimiter.acquire(maxInWindow, fullFetchWindowSize);
}
return overloaded;
Expand Down
Loading

0 comments on commit 2a2b05c

Please sign in to comment.