Skip to content

Commit 9f1b7de

Browse files
committed
feat: 将 release 函数移动到 QuotaResponse 结构体中
1 parent ca989d3 commit 9f1b7de

File tree

8 files changed

+58
-62
lines changed

8 files changed

+58
-62
lines changed

pkg/flow/quota/assist.go

+15-11
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ func (f *FlowQuotaAssistant) GetQuota(commonRequest *data.CommonRateLimitRequest
241241
Code: model.QuotaResultOk,
242242
Info: Disabled,
243243
}
244-
return model.QuotaFutureWithResponse(resp, nil), nil
244+
return model.QuotaFutureWithResponse(resp), nil
245245
}
246246
windows, err := f.lookupRateLimitWindow(commonRequest)
247247
if err != nil {
@@ -253,33 +253,37 @@ func (f *FlowQuotaAssistant) GetQuota(commonRequest *data.CommonRateLimitRequest
253253
Code: model.QuotaResultOk,
254254
Info: RuleNotExists,
255255
}
256-
return model.QuotaFutureWithResponse(resp, nil), nil
256+
return model.QuotaFutureWithResponse(resp), nil
257257
}
258258
var maxWaitMs int64 = 0
259-
var releaseFuncs = make([]func(), 0, len(windows))
259+
var releaseFuncs = make([]model.ReleaseFunc, 0, len(windows))
260260
for _, window := range windows {
261261
window.Init()
262-
quotaResult, releaseFunc := window.AllocateQuotaWithRelease(commonRequest)
263-
if releaseFunc != nil {
264-
releaseFuncs = append(releaseFuncs, releaseFunc)
262+
quotaResult := window.AllocateQuota(commonRequest)
263+
if quotaResult == nil {
264+
continue
265+
}
266+
for i := range quotaResult.ReleaseFuncs {
267+
releaseFuncs = append(releaseFuncs, quotaResult.ReleaseFuncs[i])
265268
}
266269
// 触发限流,提前返回
267270
if quotaResult.Code == model.QuotaResultLimited {
268271
// 先释放资源
269272
for i := range releaseFuncs {
270-
releaseFuncs[i]()
273+
releaseFuncs[i](0)
271274
}
272-
return model.QuotaFutureWithResponse(quotaResult, nil), nil
275+
return model.QuotaFutureWithResponse(quotaResult), nil
273276
}
274277
// 未触发限流,记录令牌桶的最大排队时间
275278
if quotaResult.WaitMs > maxWaitMs {
276279
maxWaitMs = quotaResult.WaitMs
277280
}
278281
}
279282
return model.QuotaFutureWithResponse(&model.QuotaResponse{
280-
Code: model.QuotaResultOk,
281-
WaitMs: maxWaitMs,
282-
}, releaseFuncs), nil
283+
Code: model.QuotaResultOk,
284+
WaitMs: maxWaitMs,
285+
ReleaseFuncs: releaseFuncs,
286+
}), nil
283287
}
284288

285289
// lookupRateLimitWindow 计算限流窗口

pkg/flow/quota/window.go

-9
Original file line numberDiff line numberDiff line change
@@ -608,15 +608,6 @@ func (r *RateLimitWindow) AllocateQuota(commonRequest *data.CommonRateLimitReque
608608
return r.trafficShapingBucket.GetQuota(curTimeMs, commonRequest.Token)
609609
}
610610

611-
// AllocateQuotaWithRelease 分配配额,并返回释放资源函数
612-
func (r *RateLimitWindow) AllocateQuotaWithRelease(commonRequest *data.CommonRateLimitRequest) (*model.QuotaResponse, func()) {
613-
nowMilli := model.CurrentMillisecond()
614-
atomic.StoreInt64(&r.lastAccessTimeMilli, nowMilli)
615-
// 获取服务端时间
616-
curTimeMs := r.toServerTimeMilli(nowMilli)
617-
return r.trafficShapingBucket.GetQuotaWithRelease(curTimeMs, commonRequest.Token)
618-
}
619-
620611
// GetLastAccessTimeMilli 获取最近访问时间
621612
func (r *RateLimitWindow) GetLastAccessTimeMilli() int64 {
622613
return atomic.LoadInt64(&r.lastAccessTimeMilli)

pkg/model/quota.go

+16-12
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,8 @@ const (
156156
QuotaResultLimited QuotaResultCode = -1
157157
)
158158

159+
type ReleaseFunc func()
160+
159161
// QuotaResponse 配额查询应答.
160162
type QuotaResponse struct {
161163
// 配额分配的返回码
@@ -164,27 +166,27 @@ type QuotaResponse struct {
164166
Info string
165167
// 需要等待的时间段
166168
WaitMs int64
169+
// 释放资源函数
170+
ReleaseFuncs []ReleaseFunc
167171
}
168172

169173
// QuotaFutureImpl 异步获取配额的future.
170174
type QuotaFutureImpl struct {
171-
resp *QuotaResponse
172-
deadlineCtx context.Context
173-
cancel context.CancelFunc
174-
releaseFuncs []func()
175+
resp *QuotaResponse
176+
deadlineCtx context.Context
177+
cancel context.CancelFunc
175178
}
176179

177-
func QuotaFutureWithResponse(resp *QuotaResponse, releaseFuncs []func()) *QuotaFutureImpl {
180+
func QuotaFutureWithResponse(resp *QuotaResponse) *QuotaFutureImpl {
178181
var deadlineCtx context.Context
179182
var cancel context.CancelFunc
180183
if resp.WaitMs > 0 {
181184
deadlineCtx, cancel = context.WithTimeout(context.Background(), time.Duration(resp.WaitMs)*time.Millisecond)
182185
}
183186
return &QuotaFutureImpl{
184-
resp: resp,
185-
deadlineCtx: deadlineCtx,
186-
cancel: cancel,
187-
releaseFuncs: releaseFuncs,
187+
resp: resp,
188+
deadlineCtx: deadlineCtx,
189+
cancel: cancel,
188190
}
189191
}
190192

@@ -209,10 +211,12 @@ func (q *QuotaFutureImpl) Get() *QuotaResponse {
209211
return q.resp
210212
}
211213

212-
// Release 释放资源,仅用于并发数限流的场景.
214+
// Release 释放资源,仅用于并发数限流/CPU限流场景
213215
func (q *QuotaFutureImpl) Release() {
214-
for i := range q.releaseFuncs {
215-
q.releaseFuncs[i]()
216+
if q.resp != nil {
217+
for i := range q.resp.ReleaseFuncs {
218+
q.resp.ReleaseFuncs[i]()
219+
}
216220
}
217221
}
218222

pkg/plugin/ratelimiter/ratelimiter.go

-2
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@ type ServiceRateLimiter interface {
3535
type QuotaBucket interface {
3636
// GetQuota 在令牌桶/漏桶中进行单个配额的划扣,并返回本次分配的结果
3737
GetQuota(curTimeMs int64, token uint32) *model.QuotaResponse
38-
// GetQuotaWithRelease 判断限流结果,并返回配额释放函数(对并发数限流、CPU自适应限流有用)
39-
GetQuotaWithRelease(curTimeMs int64, token uint32) (*model.QuotaResponse, func())
4038
// Release 释放配额(仅对于并发数限流有用)
4139
Release()
4240
// OnRemoteUpdate 远程配额更新

plugin/ratelimiter/bbr/README.md

+2-4
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@ BBR 的源码实现可参考:
1212

1313

1414
# 插件设计
15-
本插件将 BBR 限流器适配成 `QuotaBucket` 接口(主要实现 `GetQuotaWithRelease` 判断限流方法),以及 `ServiceRateLimiter` 接口(实现 `InitQuota` 初始化方法)。
16-
17-
由于 BBR 限流需要记录请求通过数、当前并发数、请求耗时,因此没有复用原来 `QuotaBucket` 接口中的 `GetQuota` 方法,而是新增了一个方法 `GetQuotaWithRelease`,该方法相比于 `GetQuota` 方法,返回参数中多了一个 `func()`,供业务方在业务逻辑处理完成后调用。
15+
本插件将 BBR 限流器适配成 `QuotaBucket` 接口(主要实现 `GetQuota` 判断限流方法),以及 `ServiceRateLimiter` 接口(实现 `InitQuota` 初始化方法)。
1816

1917
由于 CPU 使用率指标为实例单机指标,因此 CPU 限流只适用于单机限流,不适用于分布式限流,未实现分布式限流器需要实现的接口。
2018

@@ -29,7 +27,7 @@ bucket: 桶数,BBR 会把 window 分成多个 bucket,沿时间轴向前滑
2927
这三个入参,从 `apitraffic.Rule` 结构体中解析,直接使用了结构体中的 `MaxAmount``ValidDuration``Precision` 字段
3028

3129

32-
## 判断限流 GetQuotaWithRelease
30+
## 判断限流 GetQuota
3331
调用了 BBR 的 `Allow()` 方法
3432

3533
其内部执行 `shouldDrop()` 方法,其执行流程如下:

plugin/ratelimiter/bbr/bucket.go

+25-14
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,32 @@
1+
/**
2+
* Tencent is pleased to support the open source community by making polaris-go available.
3+
*
4+
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
5+
*
6+
* Licensed under the BSD 3-Clause License (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* https://opensource.org/licenses/BSD-3-Clause
11+
*
12+
* Unless required by applicable law or agreed to in writing, software distributed
13+
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations under the License.
16+
*/
17+
118
package bbr
219

320
import (
4-
"sort"
5-
621
"github.com/polarismesh/polaris-go/pkg/model"
722
"github.com/polarismesh/polaris-go/pkg/plugin/ratelimiter"
823
"github.com/polarismesh/polaris-go/plugin/ratelimiter/bbr/core"
24+
"sort"
925

1026
apitraffic "github.com/polarismesh/specification/source/go/api/v1/traffic_manage"
1127
)
1228

1329
var (
14-
allowResp = &model.QuotaResponse{
15-
Code: model.QuotaResultOk,
16-
}
1730
denyResp = &model.QuotaResponse{
1831
Code: model.QuotaResultLimited,
1932
}
@@ -26,20 +39,18 @@ type BBRQuotaBucket struct {
2639

2740
// GetQuota 获取限额
2841
func (b *BBRQuotaBucket) GetQuota(_ int64, _ uint32) *model.QuotaResponse {
29-
return nil
30-
}
31-
32-
// GetQuotaWithRelease 判断是否限流,并返回释放资源函数
33-
func (b *BBRQuotaBucket) GetQuotaWithRelease(_ int64, _ uint32) (*model.QuotaResponse, func()) {
3442
release, allow := b.BBR.Allow()
3543
if allow {
36-
return allowResp, release
44+
return &model.QuotaResponse{
45+
Code: model.QuotaResultOk,
46+
ReleaseFuncs: []model.ReleaseFunc{release},
47+
}
3748
}
38-
return denyResp, nil
49+
return denyResp
3950
}
4051

41-
// Release 释放资源
42-
func (b *BBRQuotaBucket) Release() {
52+
// Release 释放配额(仅对于并发数限流有用)
53+
func (l *BBRQuotaBucket) Release() {
4354

4455
}
4556

plugin/ratelimiter/reject/bucket.go

-5
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,6 @@ func (q *QuotaBucketReject) GetQuota(curTimeMs int64, token uint32) *model.Quota
3131
return q.bucket.Allocate(curTimeMs, token)
3232
}
3333

34-
// GetQuotaWithRelease 在令牌桶/漏桶中进行单个配额的划扣,并返回本次分配的结果
35-
func (q *QuotaBucketReject) GetQuotaWithRelease(curTimeMs int64, token uint32) (*model.QuotaResponse, func()) {
36-
return q.bucket.Allocate(curTimeMs, token), nil
37-
}
38-
3934
// Release 释放配额(仅对于并发数限流有用)
4035
func (q *QuotaBucketReject) Release() {
4136
q.bucket.Release()

plugin/ratelimiter/unirate/bucket_leaky.go

-5
Original file line numberDiff line numberDiff line change
@@ -156,11 +156,6 @@ func (l *LeakyBucket) GetQuota(curTimeMs int64, token uint32) *model.QuotaRespon
156156
return l.allocateQuota()
157157
}
158158

159-
// GetQuotaWithRelease 在令牌桶/漏桶中进行单个配额的划扣,并返回本次分配的结果
160-
func (l *LeakyBucket) GetQuotaWithRelease(_ int64, _ uint32) (*model.QuotaResponse, func()) {
161-
return l.allocateQuota(), nil
162-
}
163-
164159
// Release 释放配额(仅对于并发数限流有用)
165160
func (l *LeakyBucket) Release() {
166161

0 commit comments

Comments
 (0)