问题分解:
- 定义速率限制,例如每秒 1000 条消息
- 只要消息数量较少,就可以正常(并及时)处理消息
超过速率限制
- 如果超出速率限制(例如告诉客户
请稍后再试)
- 开销相当低
我正在通过一个简单地将通道组合成循环的解决方案来解决这个问题。
一种常见的速率限制算法称为Token bucket。您有一个固定大小的令牌桶,并以固定的速率添加令牌。只要有令牌,就可以发送消息。
桶的大小决定了“突发性”(你能多快赶上最大速率),而速率决定了最大平均速率。这些将是我们代码的参数。
让我们创建一个以给定速率发送消息(不管是什么)的通道。 (#1)
(defn rate-chan [burstiness rate]
(let [c (chan burstiness) ;; bucket size is buffer size
delta (/ 1000 rate)]
(go
(while true
(>! c :go) ;; send a token, will block if bucket is full
(<! (timeout delta)))) ;; wait a little
c))
现在我们需要一个按速率限制另一个通道的通道。 (#2)
(defn limit-chan [in rc]
(let [c (chan)]
(go
(while true
(<! rc) ;; wait for token
(>! c (<! in)))) ;; pass message along
c))
现在我们可以在没有消息等待的情况下使用这些默认通道:
(defn chan-with-default [in]
(let [c (chan)]
(go
(while true
;; take from in, or if not available, pass useful message
(>! c (alts! [in] :default :rate-exceeded))))
c))
现在我们有了解决问题的所有方法。
(def rchan (-> (chan)
(limit-chan (rate-chan 100 1000))
(chan-with-default)))
就#4 而言,这不是绝对最快的解决方案。但它使用可组合部件,并且可能足够快。如果您希望它更快,您可以创建一个循环来完成所有这些(而不是将其分解为更小的函数)。最快的方法是自己实现interfaces。