-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrate_limiter.py
More file actions
508 lines (432 loc) · 16.1 KB
/
rate_limiter.py
File metadata and controls
508 lines (432 loc) · 16.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
# SPDX-License-Identifier: AGPL-3.0-or-later
# Copyright (c) 2025 Web4 Contributors
#
# Hardbound - Rate Limiting Infrastructure
# https://github.com/dp-web4/web4
"""
Rate Limiter: Prevent resource exhaustion and abuse.
Rate limiting protects against:
1. Request flooding (too many requests/second)
2. LCT creation abuse (Sybil attacks)
3. ATP drain attacks (rapid budget consumption)
4. Audit trail flooding
Uses token bucket algorithm with per-LCT and per-action limits.
"""
import sqlite3
import time
from datetime import datetime, timezone, timedelta
from dataclasses import dataclass
from typing import Optional, Dict, List, TYPE_CHECKING
from enum import Enum
from pathlib import Path
if TYPE_CHECKING:
from .ledger import Ledger
class RateLimitScope(Enum):
    """Granularity at which a rate limit rule is applied."""

    # One shared budget for the whole team.
    GLOBAL = "global"
    # A separate budget for each member (LCT).
    PER_LCT = "per_lct"
    # A separate budget for each action type.
    PER_ACTION = "per_action"
@dataclass
class RateLimitRule:
    """Declarative description of a single rate limit.

    Attributes are documented inline; ``to_dict`` produces a plain-dict
    view with the scope flattened to its string value.
    """

    # Identifier of the rule.
    name: str
    # Granularity the rule is enforced at (global / per LCT / per action).
    scope: RateLimitScope
    # Maximum requests in one window.
    max_requests: int
    # Size of the time window, in seconds.
    window_seconds: int
    # Extra requests allowed in a burst on top of ``max_requests``.
    burst_allowance: int = 0
    # Cooldown, in seconds, applied after the limit has been hit.
    cooldown_seconds: int = 0

    def to_dict(self) -> dict:
        """Return the rule as a dict, with ``scope`` given as its enum value."""
        return dict(
            name=self.name,
            scope=self.scope.value,
            max_requests=self.max_requests,
            window_seconds=self.window_seconds,
            burst_allowance=self.burst_allowance,
            cooldown_seconds=self.cooldown_seconds,
        )
# Default rate limit rules, keyed by rule name.
DEFAULT_RULES = {
    rule.name: rule
    for rule in (
        # Request rate limits: 60 requests per minute per member,
        # with 10 extra allowed in a burst.
        RateLimitRule(
            name="r6_requests",
            scope=RateLimitScope.PER_LCT,
            max_requests=60,
            window_seconds=60,
            burst_allowance=10,
        ),
        # 1000 requests per minute team-wide.
        RateLimitRule(
            name="global_requests",
            scope=RateLimitScope.GLOBAL,
            max_requests=1000,
            window_seconds=60,
            burst_allowance=100,
        ),
        # LCT creation limits (anti-Sybil): 10 new LCTs per hour,
        # followed by a 1 minute cooldown once the limit is hit.
        RateLimitRule(
            name="lct_creation",
            scope=RateLimitScope.GLOBAL,
            max_requests=10,
            window_seconds=3600,
            burst_allowance=2,
            cooldown_seconds=60,
        ),
        # Audit entry limits (anti-flooding): 100 entries per minute per member.
        RateLimitRule(
            name="audit_entries",
            scope=RateLimitScope.PER_LCT,
            max_requests=100,
            window_seconds=60,
            burst_allowance=20,
        ),
        # Proposal limits: 5 proposals per hour.
        RateLimitRule(
            name="proposals",
            scope=RateLimitScope.PER_LCT,
            max_requests=5,
            window_seconds=3600,
            burst_allowance=1,
        ),
        # ATP operations: 30 per minute.
        RateLimitRule(
            name="atp_operations",
            scope=RateLimitScope.PER_LCT,
            max_requests=30,
            window_seconds=60,
            burst_allowance=5,
        ),
        # Authentication attempts: 5 failed attempts in a 5 minute window,
        # then a 5 minute lockout.
        RateLimitRule(
            name="auth_attempts",
            scope=RateLimitScope.PER_LCT,
            max_requests=5,
            window_seconds=300,
            burst_allowance=0,
            cooldown_seconds=300,
        ),
    )
}
@dataclass
class RateLimitResult:
    """Outcome of evaluating one rate limit check."""

    # Whether the request may proceed.
    allowed: bool
    # Requests still available in the current window.
    remaining: int
    # Seconds until the window resets.
    reset_seconds: int
    # Seconds the caller should wait before retrying when blocked.
    retry_after: int = 0
    # Name of the rule that produced this result.
    rule_name: str = ""
    # Short human-readable explanation of the outcome.
    reason: str = ""
class TokenBucket:
    """
    Token bucket for rate limiting.

    Each bucket has:
    - tokens: Current available tokens (float, refilled lazily)
    - last_update: Wall-clock time (``time.time()``) of the last refill
    - max_tokens: Maximum tokens (capacity)
    - refill_rate: Tokens added per second

    The bucket starts full. Tokens are topped up on demand, each time
    ``consume`` or ``available`` is used, based on the time elapsed since
    ``last_update``.
    """

    def __init__(self, max_tokens: int, refill_rate: float):
        """
        Create a full bucket.

        Args:
            max_tokens: Capacity of the bucket
            refill_rate: Tokens added per second
        """
        self.max_tokens = max_tokens
        self.refill_rate = refill_rate
        self.tokens = float(max_tokens)
        self.last_update = time.time()

    def consume(self, tokens: int = 1) -> bool:
        """
        Try to consume tokens.

        Args:
            tokens: Number of tokens to take

        Returns:
            True if tokens consumed, False if insufficient
        """
        self._refill()
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

    def _refill(self) -> None:
        """Refill tokens based on elapsed time, capped at ``max_tokens``."""
        now = time.time()
        # time.time() follows the system clock, which can be stepped
        # backwards (NTP correction, manual change). A negative elapsed
        # time must not drain the bucket, so it is clamped to zero.
        elapsed = max(0.0, now - self.last_update)
        self.tokens = min(
            self.max_tokens,
            self.tokens + elapsed * self.refill_rate
        )
        # Always re-anchor on "now" so refilling resumes immediately
        # after a backwards clock step instead of stalling until the
        # clock catches up with the old timestamp.
        self.last_update = now

    @property
    def available(self) -> int:
        """Get available tokens (whole tokens, after refilling)."""
        self._refill()
        return int(self.tokens)
class RateLimiter:
    """
    Rate limiting with token bucket algorithm.

    Every limited key (global, per-LCT or per-action, depending on the
    rule's scope) gets its own TokenBucket whose capacity is
    ``max_requests + burst_allowance`` and whose refill rate is
    ``max_requests / window_seconds``. Rules with ``cooldown_seconds``
    additionally lock the key out for that long once the bucket runs dry.

    Bucket and cooldown state is kept in memory on this instance; the
    ``rate_limits`` table is created in the ledger database but is not
    read or written by the code in this class.
    """

    def __init__(self, ledger: 'Ledger', rules: Optional[Dict[str, RateLimitRule]] = None):
        """
        Initialize rate limiter.

        Args:
            ledger: Ledger for persistence (its ``db_path`` is used)
            rules: Custom rules (uses defaults if None). An empty dict is
                falsy and therefore also falls back to the defaults.
        """
        self.ledger = ledger
        # Shallow copy: the dict is private to this instance, the rule
        # objects themselves are shared with DEFAULT_RULES.
        self.rules = rules or DEFAULT_RULES.copy()
        self._buckets: Dict[str, TokenBucket] = {}
        self._cooldowns: Dict[str, float] = {}  # key -> cooldown_end_time
        self._ensure_table()

    def _ensure_table(self):
        """Create rate limit tracking table."""
        # "with connection" only commits / rolls back the transaction; it
        # does not close the connection, so close it explicitly.
        conn = sqlite3.connect(self.ledger.db_path)
        try:
            with conn:
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS rate_limits (
                        key TEXT PRIMARY KEY,
                        rule_name TEXT NOT NULL,
                        request_count INTEGER NOT NULL,
                        window_start TEXT NOT NULL,
                        cooldown_until TEXT
                    )
                """)
        finally:
            conn.close()

    def _get_bucket_key(self, rule_name: str, lct_id: Optional[str] = None,
                        action: Optional[str] = None) -> str:
        """Generate bucket key based on the scope of the named rule."""
        rule = self.rules.get(rule_name)
        if not rule:
            return f"unknown:{rule_name}"

        if rule.scope == RateLimitScope.GLOBAL:
            return f"global:{rule_name}"
        elif rule.scope == RateLimitScope.PER_LCT:
            return f"lct:{lct_id or 'unknown'}:{rule_name}"
        elif rule.scope == RateLimitScope.PER_ACTION:
            return f"action:{action or 'unknown'}:{rule_name}"
        return f"unknown:{rule_name}"

    def _get_bucket(self, key: str, rule: RateLimitRule) -> TokenBucket:
        """
        Get or create token bucket for key.

        The rule is only consulted when the bucket is first created; an
        existing bucket is returned unchanged.
        """
        if key not in self._buckets:
            max_tokens = rule.max_requests + rule.burst_allowance
            # Floor the window at one second (same floor check_adhoc
            # applies) so a zero window cannot raise ZeroDivisionError.
            window_seconds = max(1, rule.window_seconds)
            refill_rate = rule.max_requests / window_seconds
            self._buckets[key] = TokenBucket(max_tokens, refill_rate)
        return self._buckets[key]

    @staticmethod
    def _window_reset_seconds(bucket: TokenBucket, window_seconds: int) -> int:
        """Scale the window by the consumed fraction of the bucket (0 for an empty-capacity bucket)."""
        if bucket.max_tokens <= 0:
            return 0
        return int(window_seconds * (1 - bucket.tokens / bucket.max_tokens))

    @staticmethod
    def _seconds_per_token(bucket: TokenBucket, fallback: int) -> int:
        """
        Whole seconds needed to refill one token.

        Returns ``fallback`` when the bucket never refills, and at least 1
        otherwise so a denied caller is never told to retry after 0 seconds.
        """
        if bucket.refill_rate <= 0:
            return fallback
        return max(1, int(1 / bucket.refill_rate))

    def check(
        self,
        rule_name: str,
        lct_id: Optional[str] = None,
        action: Optional[str] = None,
        consume: bool = True
    ) -> RateLimitResult:
        """
        Check rate limit and optionally consume a token.

        Args:
            rule_name: Name of the rule to check
            lct_id: LCT ID for per-LCT rules
            action: Action type for per-action rules
            consume: If True, consume a token on success

        Returns:
            RateLimitResult with allowed status and details
        """
        rule = self.rules.get(rule_name)
        if rule is None:
            # NOTE: unknown rules fail open (request is allowed).
            return RateLimitResult(
                allowed=True,
                remaining=999,
                reset_seconds=0,
                rule_name=rule_name,
                reason="Unknown rule - allowing"
            )

        key = self._get_bucket_key(rule_name, lct_id, action)

        # Check cooldown
        cooldown_end = self._cooldowns.get(key)
        if cooldown_end is not None:
            now = time.time()
            if now < cooldown_end:
                # Never report 0 while still locked out.
                retry_after = max(1, int(cooldown_end - now))
                return RateLimitResult(
                    allowed=False,
                    remaining=0,
                    reset_seconds=retry_after,
                    retry_after=retry_after,
                    rule_name=rule_name,
                    reason=f"In cooldown for {retry_after} seconds"
                )
            # Cooldown has expired; forget it.
            del self._cooldowns[key]

        bucket = self._get_bucket(key, rule)

        if not consume:
            # Just checking, don't consume
            remaining = bucket.available
            return RateLimitResult(
                allowed=remaining > 0,
                remaining=remaining,
                reset_seconds=self._window_reset_seconds(bucket, rule.window_seconds),
                rule_name=rule_name,
                reason="OK" if remaining > 0 else "Would exceed limit"
            )

        if bucket.consume(1):
            return RateLimitResult(
                allowed=True,
                remaining=bucket.available,
                reset_seconds=self._window_reset_seconds(bucket, rule.window_seconds),
                rule_name=rule_name,
                reason="OK"
            )

        # Apply cooldown if configured
        if rule.cooldown_seconds > 0:
            self._cooldowns[key] = time.time() + rule.cooldown_seconds

        per_token = self._seconds_per_token(bucket, rule.window_seconds)  # Time for 1 token
        return RateLimitResult(
            allowed=False,
            remaining=0,
            reset_seconds=per_token,
            retry_after=per_token + rule.cooldown_seconds,
            rule_name=rule_name,
            reason="Rate limit exceeded"
        )

    def get_status(self, rule_name: str, lct_id: Optional[str] = None,
                   action: Optional[str] = None) -> dict:
        """
        Get current rate limit status without consuming.

        Returns:
            Status dict with current limits and usage. For an unknown
            rule, ``rule`` is None and ``max`` is 0.
        """
        result = self.check(rule_name, lct_id, action, consume=False)
        rule = self.rules.get(rule_name)

        return {
            "rule": rule.to_dict() if rule else None,
            "remaining": result.remaining,
            "max": (rule.max_requests + rule.burst_allowance) if rule else 0,
            "reset_seconds": result.reset_seconds,
            # A non-consuming check only sets retry_after while in cooldown.
            "in_cooldown": result.retry_after > 0,
            "allowed": result.allowed
        }

    def add_rule(self, rule: RateLimitRule):
        """Add or update a rate limit rule (buckets already created keep their old settings)."""
        self.rules[rule.name] = rule

    def check_adhoc(
        self,
        key: str,
        max_count: int,
        window_ms: int,
        consume: bool = True
    ) -> RateLimitResult:
        """
        Check an ad-hoc rate limit (not from predefined rules).

        SECURITY FIX (CQ-5): This method allows PolicyEntity to enforce
        rate limits defined in policy rules rather than only using
        predefined RateLimitRule objects.

        The bucket is stored under ``key`` as given, in the same map as
        the rule-based buckets, and is sized from ``max_count`` /
        ``window_ms`` only on the first call for that key.

        Args:
            key: Unique key for this rate limit
            max_count: Maximum requests in window
            window_ms: Window size in milliseconds (floored to whole
                seconds, minimum 1 second)
            consume: If True, consume a token on success

        Returns:
            RateLimitResult with allowed status
        """
        # Create a temporary rule for this ad-hoc limit
        window_seconds = max(1, window_ms // 1000)
        temp_rule = RateLimitRule(
            name=key,
            scope=RateLimitScope.GLOBAL,
            max_requests=max_count,
            window_seconds=window_seconds,
            burst_allowance=0,
            cooldown_seconds=0
        )

        # Get or create bucket for this key
        bucket = self._get_bucket(key, temp_rule)

        if not consume:
            remaining = bucket.available
            return RateLimitResult(
                allowed=remaining > 0,
                remaining=remaining,
                reset_seconds=self._window_reset_seconds(bucket, window_seconds),
                rule_name=key,
                reason="OK" if remaining > 0 else "Would exceed limit"
            )

        if bucket.consume(1):
            return RateLimitResult(
                allowed=True,
                remaining=bucket.available,
                reset_seconds=self._window_reset_seconds(bucket, window_seconds),
                rule_name=key,
                reason="OK"
            )

        per_token = self._seconds_per_token(bucket, window_seconds)
        return RateLimitResult(
            allowed=False,
            remaining=0,
            reset_seconds=per_token,
            retry_after=per_token,
            rule_name=key,
            reason="Rate limit exceeded"
        )

    def reset(self, rule_name: str, lct_id: Optional[str] = None,
              action: Optional[str] = None):
        """Reset rate limit for a specific key (admin action)."""
        key = self._get_bucket_key(rule_name, lct_id, action)
        # Dropping the bucket means the next check starts from a full one.
        self._buckets.pop(key, None)
        self._cooldowns.pop(key, None)
class RateLimitedTeam:
    """
    Mixin that gives Team operations a rate limit guard.

    The host class is expected to set ``self._rate_limiter``.

    Usage:
        class Team(RateLimitedTeam):
            def __init__(self, ...):
                self._rate_limiter = RateLimiter(self.ledger)

            def add_member(self, lct_id, ...):
                self._check_rate_limit("lct_creation", lct_id)
                # ... actual implementation
    """

    _rate_limiter: RateLimiter

    def _check_rate_limit(
        self,
        rule_name: str,
        lct_id: Optional[str] = None,
        action: Optional[str] = None,
        auto_raise: bool = True
    ) -> RateLimitResult:
        """
        Run a (consuming) rate limit check ahead of an operation.

        Args:
            rule_name: Rule to check
            lct_id: LCT for per-LCT rules
            action: Action for per-action rules
            auto_raise: If True, raise instead of returning a denied result

        Returns:
            The RateLimitResult from the limiter

        Raises:
            RateLimitExceeded: If limit exceeded and auto_raise=True
        """
        outcome = self._rate_limiter.check(rule_name, lct_id, action)

        # Hand the result back unless it is a denial the caller wants raised.
        if outcome.allowed or not auto_raise:
            return outcome

        message = (
            f"Rate limit exceeded for {rule_name}: {outcome.reason}. "
            f"Retry after {outcome.retry_after} seconds."
        )
        raise RateLimitExceeded(message)
class RateLimitExceeded(Exception):
    """Raised when an operation is denied because its rate limit was hit."""
if __name__ == "__main__":
    # Small self-demo: list the default rules, then exercise a bucket.
    banner = "=" * 60

    print(banner)
    print("Rate Limiter - Default Rules")
    print(banner)

    for rule_name, rule_def in DEFAULT_RULES.items():
        print(f"\n{rule_name}:")
        print(f" Scope: {rule_def.scope.value}")
        print(f" Limit: {rule_def.max_requests}/{rule_def.window_seconds}s")
        print(f" Burst: +{rule_def.burst_allowance}")
        if rule_def.cooldown_seconds:
            print(f" Cooldown: {rule_def.cooldown_seconds}s")

    print("\n" + banner)
    print("Token Bucket Demo")
    print(banner)

    # Demo bucket: capacity 5, refilling 1 token/second.
    demo_bucket = TokenBucket(max_tokens=5, refill_rate=1.0)
    print("\nBucket: 5 tokens max, 1 token/second refill")
    print(f"Initial tokens: {demo_bucket.available}")

    # Seven attempts against a five-token bucket, so the last ones fail.
    for attempt in range(1, 8):
        outcome = "OK" if demo_bucket.consume(1) else "FAILED"
        print(f"Consume #{attempt}: {outcome}, remaining={demo_bucket.available}")

    # Give the bucket time to refill a few tokens.
    print("\nWaiting 3 seconds...")
    time.sleep(3)
    print(f"After 3s: {demo_bucket.available} tokens")