背景
要在agent中实现一个类似于ftp数据源端限速的功能,保证agent在运行过程中长期读取磁盘的时候,不会对磁盘造成压力,影响其它业务。
前言
说起限流,运维的小伙伴或多或少都知道点,来了20个流量,只能处理15个,则设置限流阈值为15,剩余的5个直接返回错误,这就是最简单理解的限流。然而限速呢?
相比限流直接了当的给一个错误的返回值,限速要做的不仅不能返回错误,而且要在一定的时间段内进行响应,否则可能造成超时。
方案探索
sleep?
当我要开始实现这个功能的时候,第一个也是想到了 sleep ,限速无非就是对性能做一个限制,那 sleep 不就是对性能最大的损耗吗?然而 sleep 多少呢?sleep 的时间太久,不就造成超时了么?而且当某时刻磁盘读写并不频繁的时候,默认都使用 sleep,无疑是在白白浪费生命!
令牌桶?
此时我又想起了 nginx 限流使用的令牌桶,每个令牌能够处理一个请求,那我能不能每个令牌就读取指定量的数据呢?这样在一定时间范围内,读取的数据量的大小不就是固定的吗?
进一步思考之后,如果 1s 有60个令牌,每个令牌能读取1M的数据,那理论上不就刚好达到限速 60M/s, 想象固然是美好的,再一想,如果这1s中前100ms就把60个令牌用完了,那瞬时值将达到 60M/100ms,即就是 60M/0.1s,那也就是 600M/s 。 what? 说好的限速呢???
方案确定
虽然上述令牌桶的方案看起来并不完善,但是,却是一个非常好的思路。
对于限速值的表达分为两部分,一个是数据量的大小,第二是获取指定数据量所用的时间。所以令牌桶方案的破绽就在于时间范围是错误的,本应该在1s的时间范围获取的数据量在0.1s内获取完了,所以造成了假限速的现象。
所以,至关重要的就是,我们不仅需要限制读取数据量的大小,而且要均衡到一定的时间片上。
为了让均衡的值更为准确,我们直接使用时间区间的大小作为 token 的数量,假设 1s 分为了 10 个 100ms, 那每 0.5s 就有 5 个 token,每 0.1s 就只有一个 token。我们把时间分的越细微,那么 token 分配的值将会愈发准确。
我们假设,当前进程有两个 goroutine, 共用一个限速器。则两个 goroutine 之间是共同使用 token 总数的,我们来人为走一下这个逻辑:
假设程序启动的时间戳为0ms,每个 token 可以读取 256K 大小的数据。
goA(goroutineA) 在 0.1s 的时候请求读取数据,那时间戳相减,可以得到 token 的数量为 1 个,则 goA 就可以读取 256K 以内的数据量
goB 在 0.15s 的时候请求读取数据,时间戳相减,可以得到 token 的数量为 0.5 个,则 goB 就可以读取 128K 以内的数据量
经过上面的流程我们大致可以按照指定的速率进行限制。
方案细化
进过上面逻辑的梳理,我们依旧会发现两个问题:
- 如果 goA 一次性读取了 300K 的数据怎么办?
- 如果 goB 在 0.4 s 的时候才开始读取数据,是不是有 0.3s 也就是 3个 token 直接可用,是不是也就可以直接读取 3 * 256K 的数据量 ?
第一个问题:
goA 读取了300K的数据,超过了一个整 token,我们自然要对数据流进行控制,怎么控制,还是 sleep,哈哈,又回到了 sleep,但是此刻,不在是死板的指定数值,而是动态sleep,sleep的时间只要能够产生指定的token数量就行,所以我们 sleep 了 (300K-256K)/256K * 0.1 s = 0.017s,怎么样,这个 sleep 的耗时看起来靠谱吧!
第二个问题:
时间区间很大,token 一次用光不就又造成了假限速的现象,这里我们把 时间区间 细化为 「窗口区间」,最多只能连续两个区间公用,其实就是废弃了已经过期的token,能够保证我们的上限值尽量准确,所以 「窗口区间」 值约小,限速值越准,但是势必会造成过多的计算开销(窗口越小,读取的数据量将会越少,读取的次数将会增多,会多次调用限速的逻辑),所以该值需要权衡使用。
LimitReader
golang代码中定义了 LimitReader,相比较普通的 Read
函数,这里的实现则需要获取到 token,才能正常返回,代码如下:
func NewLimitReaderWithLimiter(rl *ratelimiter.RateLimiter, src io.Reader) *LimitReader {
return &LimitReader{
Src: src,
Limiter: rl,
}
}
// LimitReader reads stream with RateLimiter.
type LimitReader struct {
Src io.Reader
Limiter *ratelimiter.RateLimiter
}
func (lr *LimitReader) Read(p []byte) (n int, err error) {
n, e := lr.Src.Read(p)
if e != nil && e != io.EOF {
return n, e
}
if n > 0 {
lr.Limiter.AcquireBlocking(int64(n))
}
return n, e
}
方案实现
具体的代码实现来自 阿里 蜻蜓团队,开源代码,具体位置见:Dragonfly/pkg/ratelimiter/ratelimiter.go
我自己做了测试,数据为671M的一个文件,实现一个复制文件的操作,效果还是比较可观的:
copy time is: 945.288114ms //直接使用 io.Copy 操作的耗时
limit copy time is: 1m8.478830526s //限速为 10MB 的耗时
代码实现
阿里团队的代码确实风格强悍,这里我也贴出最为重要的 ratelimiter,最为重要的当然是 acquire
和 createTokens
的实现,代码做了部分的删减:
// RateLimiter is used for limiting the rate of transporting.
type RateLimiter struct {
capacity int64
bucket int64
rate int64
ratePerWindow int64
window int64
last int64
mu sync.Mutex
}
// NewRateLimiter creates a RateLimiter instance.
// rate: how many tokens are generated per second. 0 represents that don't limit the rate.
// window: generating tokens interval (millisecond, [1,1000]).
// The production of rate and window should be division by 1000.
func NewRateLimiter(rate int64, window int64) *RateLimiter {
rl := new(RateLimiter)
rl.capacity = rate
rl.bucket = 0
rl.rate = rate
rl.setWindow(window)
rl.computeRatePerWindow()
rl.last = time.Now().UnixNano()
return rl
}
// AcquireBlocking acquires tokens. It will be blocking unit the bucket has enough required
// number of tokens.
func (rl *RateLimiter) AcquireBlocking(token int64) int64 {
return rl.acquire(token, true)
}
func (rl *RateLimiter) acquire(token int64, blocking bool) int64 {
if rl.capacity <= 0 || token < 1 {
return token
}
tmpCapacity := util.Max(rl.capacity, token)
var process func() int64
process = func() int64 {
now := time.Now().UnixNano()
newTokens := rl.createTokens(now)
curTotal := util.Min(newTokens+rl.bucket, tmpCapacity)
if curTotal >= token {
rl.bucket = curTotal - token
rl.last = now
return token
}
if blocking {
rl.blocking(token - curTotal)
return process()
}
return -1
}
rl.mu.Lock()
defer rl.mu.Unlock()
return process()
}
func (rl *RateLimiter) createTokens(timeNano int64) int64 {
diff := timeNano - rl.last
if diff < time.Millisecond.Nanoseconds() {
return 0
}
return diff / (rl.window * time.Millisecond.Nanoseconds()) * rl.ratePerWindow
}
func (rl *RateLimiter) blocking(requiredToken int64) {
if requiredToken <= 0 {
return
}
windowCount := int64(util.Max(requiredToken/rl.ratePerWindow, 1))
time.Sleep(time.Duration(windowCount * rl.window * time.Millisecond.Nanoseconds()))
}
具体使用的例子
//全局限速器
var TotalLimit *ratelimiter.RateLimiter
func init() {
TotalLimit = ratelimiter.NewRateLimiter(ratelimiter.TransRate(10*MB), 2)
}
func copyFileWithLimit(source, dest string) bool {
//一系列打开文件的操作
//xxxxx
buf := make([]byte, 256*1024)
limitReader := limitreader.NewLimitReaderWithLimiter(TotalLimit, srcFile)
_, copyErr := io.CopyBuffer(destFile, limitReader, buf)
if copyErr != nil {
log.Println(copyErr.Error())
return false
} else {
return true
}
}
这是一个单线程对文件拷贝的限速例子,如果要限制多个线程的进行整体限速,只需要使用同一个 totalLimiter
即可。
个人示例例的全部的代码可以在zhuxinquan/LimitReader.git看到。
done~