We’re designing a per-user rate limiter for a single-process application. Each user should be allowed bursts up to a fixed capacity and refill at a steady rate over time.
Every user must be isolated from other users. It should also work correctly under multi-threaded access.
To keep the scope focused, the design is in-memory and single-machine: no persistence, no distributed coordination.
The API is minimal:
can_access(user_id) -> bool
If it returns True, the request is allowed. Otherwise, it’s rejected.
Now let’s look at some ideas you might try first.
Sleep-based throttling
Block threads with sleep() when the limit is exceeded. The problem is that this ties up valuable threads, offers no burst support, makes fairness hard to reason about, and does not scale under load.
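To make the failure mode concrete, here is a minimal sketch of the rejected approach. The function name and the mutable `last_call` holder are hypothetical, chosen just for illustration:

```python
import time

def sleep_throttle(min_interval: float, last_call: list) -> None:
    """Naive throttle: sleep until min_interval has passed since the last call.

    Instead of rejecting the request, this blocks the calling thread,
    so every throttled request ties up a thread for the whole wait.
    """
    now = time.monotonic()
    wait = min_interval - (now - last_call[0])
    if wait > 0:
        time.sleep(wait)  # the thread is stuck here, doing nothing useful
    last_call[0] = time.monotonic()

start = time.monotonic()
last = [start]
sleep_throttle(0.1, last)  # called immediately after the "last call": blocks ~0.1 s
print(time.monotonic() - start >= 0.09)  # True
```

Under load, every rejected-but-sleeping request occupies a thread for the full wait, which is exactly why the design below rejects immediately instead.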
What must always be true
Before designing classes, define correctness rules.
- Requests from a user never exceed the configured thresholds
- Concurrent access must not corrupt state
- Rate limits are enforced smoothly over time
- One user’s rate limit must not affect any other user
Choosing the Algorithm: Token Bucket
The token bucket model is simple:
- Each user has a bucket.
- The bucket has a maximum capacity.
- Tokens are added continuously at a fixed rate.
- Each request consumes one token.
- If no tokens remain, reject.
- The model supports bursts of traffic up to the bucket capacity.
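A quick numeric walk-through of these rules (the capacity and rate values are arbitrary, picked for illustration):

```python
capacity = 5        # bucket holds at most 5 tokens
refill_rate = 2.0   # tokens added per second
tokens = 5.0        # bucket starts full

# Burst: 5 back-to-back requests all succeed, draining the bucket.
allowed = 0
for _ in range(5):
    if tokens >= 1:
        tokens -= 1
        allowed += 1
print(allowed)  # 5

# A 6th immediate request is rejected: no tokens remain.
print(tokens >= 1)  # False

# After 1.5 s of idle time the bucket regains 1.5 * 2.0 = 3 tokens.
tokens = min(capacity, tokens + 1.5 * refill_rate)
print(tokens)  # 3.0
```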
State Model
Bucket
- capacity: int
- refill_rate: float (tokens per second)
- tokens: float
- last_refill: float (monotonic time of the last refill)
- lock: threading.Lock
RateLimiter
- capacity: int
- refill_rate: float
- buckets: Dict[user_id, Bucket]
- lock: threading.Lock
Bucket
Represents one user’s rate state. Its lock synchronises access from multiple threads.
RateLimiter
Manages buckets per user. Global lock protects bucket creation and dictionary mutation.
Why are we using monotonic time?
Rate limiting depends on elapsed time. Wall-clock time (time.time()) can jump backwards due to NTP synchronisation, manual clock adjustments, and so on. time.monotonic() never decreases, is immune to system clock changes, and is designed for measuring durations.
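A tiny demonstration of the property we rely on. Note that a monotonic reading is only meaningful as a difference between two readings; its absolute value has no fixed epoch:

```python
import time

a = time.monotonic()
time.sleep(0.01)
b = time.monotonic()

# The second reading can never be earlier than the first, even if the
# system clock was adjusted while we slept.
elapsed = b - a
print(b >= a)       # True: monotonic time never decreases
print(elapsed > 0)  # True
```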
How state changes
Refill
Refill is computed lazily during request.
time_elapsed = now - last_refill
refill_amount = time_elapsed * refill_rate
tokens = min(capacity, tokens + refill_amount)
last_refill = now
No background thread. No timers. Time accumulation happens only when needed.
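The refill step can be checked in isolation as a small pure function. `refill` here is a hypothetical helper for verifying the arithmetic, not part of the final classes:

```python
def refill(tokens: float, capacity: int, refill_rate: float,
           last_refill: float, now: float) -> tuple[float, float]:
    """Lazy refill: credit tokens for elapsed time, capped at capacity."""
    elapsed = now - last_refill
    tokens = min(capacity, tokens + elapsed * refill_rate)
    return tokens, now

# 1.5 s elapsed at 4 tokens/s credits 6 tokens: 2 + 6 = 8, under the cap.
print(refill(2.0, 10, 4.0, 100.0, 101.5))  # (8.0, 101.5)

# A long idle period cannot overfill the bucket: the result clamps to 10.
print(refill(2.0, 10, 4.0, 100.0, 200.0))  # (10, 200.0)
```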
Request
Under bucket lock:
- Refill
- If tokens ≥ 1:
- tokens -= 1
- allow
- Else:
- reject
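The decision step on its own, with refill and locking stripped out. This is a hypothetical pure version for clarity; the real method below combines it with the refill under the bucket lock:

```python
def try_consume(tokens: float) -> tuple[bool, float]:
    """Decision step only: allow and decrement if at least one token remains."""
    if tokens >= 1:
        return True, tokens - 1
    return False, tokens

print(try_consume(2.5))  # (True, 1.5): allowed, one token consumed
print(try_consume(0.4))  # (False, 0.4): rejected, state unchanged
```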
Concurrency Design
Two levels of locking.
1. Global lock (RateLimiter)
Used only for creating buckets and accessing the dictionary. It is released immediately after the bucket is retrieved. This prevents duplicate bucket creation and dictionary race conditions.
2. Per-bucket lock
Protects tokens and last_refill. It prevents double consumption, negative token counts, and refill race conditions. Without this lock, two threads could both see tokens ≥ 1 and both decrement, leaving the count negative.
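The race can be replayed deterministically by hand-interleaving the two threads' reads and writes on a single shared count:

```python
# One token left; both "threads" run the check before either decrements.
tokens = 1.0

t1_check = tokens >= 1   # thread 1: passes
t2_check = tokens >= 1   # thread 2: also passes, same stale value

tokens = tokens - 1      # thread 1 decrements -> 0.0
tokens = tokens - 1      # thread 2 decrements -> -1.0

# Two requests were allowed with only one token, and the count went negative.
print(t1_check and t2_check)  # True
print(tokens)                 # -1.0
```

Holding the per-bucket lock across both the check and the decrement makes the two steps atomic, which rules out this interleaving.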
Minimal Class Diagram
classDiagram
class Bucket {
capacity: int
refill_rate: float
tokens: float
last_refill: float
try_consume()
_refill()
}
class RateLimiter {
buckets: Dict[int, Bucket]
can_access(user_id)
}
RateLimiter o-- Bucket
Implementation (Python)
import threading
import time


class Bucket:
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = float(capacity)
        self.last_refill = time.monotonic()
        self.lock = threading.Lock()

    def _refill(self):
        now = time.monotonic()
        delta = now - self.last_refill
        if delta <= 0:
            return
        refill_amount = delta * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + refill_amount)
        self.last_refill = now

    def try_consume(self) -> bool:
        with self.lock:
            self._refill()
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False


class RateLimiter:
    def __init__(self, refill_rate: float, capacity: int) -> None:
        self.refill_rate = refill_rate
        self.capacity = capacity
        self.buckets = {}
        self.lock = threading.Lock()

    def can_access(self, user_id: int) -> bool:
        with self.lock:
            if user_id not in self.buckets:
                self.buckets[user_id] = Bucket(
                    self.capacity,
                    self.refill_rate,
                )
            bucket = self.buckets[user_id]
        return bucket.try_consume()
Limits and Extensions
Memory growth
Buckets for inactive users stay in memory indefinitely. You may add eviction logic (LRU or TTL).
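One possible shape for a TTL sweep. This is a sketch under simplifying assumptions: the dict maps user id to a last-activity monotonic timestamp rather than to full Bucket objects, and in the real limiter the sweep would run under the global lock:

```python
def evict_idle(buckets: dict, ttl: float, now: float) -> None:
    """Remove entries whose last activity is older than `ttl` seconds."""
    stale = [uid for uid, last in buckets.items() if now - last > ttl]
    for uid in stale:
        del buckets[uid]

# Toy state: user id -> last_refill timestamp (monotonic seconds).
buckets = {1: 100.0, 2: 500.0}
evict_idle(buckets, ttl=300.0, now=550.0)
print(sorted(buckets))  # [2]: user 1 was idle for 450 s and got evicted
```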
Distributed systems
This design is single-process only. For distributed rate limiting, use Redis with Lua scripts, centralized token storage, or sharded per-user ownership.