Improve appender rate limiting: now specified as ncalls / window-msecs

This commit is contained in:
Peter Taoussanis 2013-11-29 18:31:33 +07:00
parent 39e568f4fc
commit 5ac604d66d
8 changed files with 33 additions and 36 deletions

View File

@ -82,7 +82,8 @@
(def example-config
"APPENDERS
An appender is a map with keys:
:doc, :min-level, :enabled?, :async?, :limit-per-msecs, :fn
:doc, :min-level, :enabled?, :async?, :fn,
:rate-limit ([ncalls-limit window-ms] form).
An appender's fn takes a single map with keys:
:level, :throwable
@ -132,14 +133,14 @@
:appenders
{:standard-out
{:doc "Prints to *out*/*err*. Enabled by default."
:min-level nil :enabled? true :async? false :limit-per-msecs nil
:min-level nil :enabled? true :async? false :rate-limit nil
:fn (fn [{:keys [error? default-output]}]
(binding [*out* (if error? *err* *out*)]
(str-println default-output)))}
:spit
{:doc "Spits to `(:spit-filename :shared-appender-config)` file."
:min-level nil :enabled? false :async? false :limit-per-msecs nil
:min-level nil :enabled? false :async? false :rate-limit nil
:fn (fn [{:keys [ap-config default-output]}]
(when-let [filename (:spit-filename ap-config)]
(try (spit filename default-output :append true)
@ -154,38 +155,39 @@
(defn- wrap-appender-fn
"Wraps compile-time appender fn with additional runtime capabilities
controlled by compile-time config."
[{apfn :fn :keys [async? limit-per-msecs] :as appender}]
(let [limit-per-msecs (or (:max-message-per-msecs appender)
limit-per-msecs)] ; Backwards comp
[{apfn :fn :keys [async? rate-limit] :as appender}]
(let [rate-limit (or rate-limit ; Backwards comp:
(if-let [x (:max-message-per-msecs appender)] [1 x]
(when-let [x (:limit-per-msecs appender)] [1 x])))]
(assert (or (nil? rate-limit) (vector? rate-limit)))
(->> ; Wrapping applies per appender, bottom-to-top
apfn
;; Rate limit support
((fn [apfn]
(if-not limit-per-msecs apfn
(let [timers (atom {})] ; {:hash last-appended-time-msecs ...}
(fn [{ns :ns [x1 & _] :args :as apfn-args}]
(let [now (System/currentTimeMillis)
hash (str ns "/" x1) ; TODO Alternatives?
limit? (fn [last-msecs]
(and last-msecs (<= (- now last-msecs)
limit-per-msecs)))]
(when-not (limit? (@timers hash))
(apfn apfn-args)
(swap! timers assoc hash now))
(when (< (rand) 0.001) ; Occasionally garbage collect
(when-let [expired-timers (->> (keys @timers)
(remove #(limit? (@timers %)))
(seq))]
(apply swap! timers dissoc expired-timers)))))))))
;; Compile-time:
(if-not rate-limit apfn
(let [[ncalls-limit window-ms] rate-limit
limiter-any (utils/rate-limiter ncalls-limit window-ms)
;; This is a little hand-wavy but it's a decent general
;; strategy and helps us from making this overly complex to
;; configure:
limiter-specific (utils/rate-limiter (quot ncalls-limit 4)
window-ms)]
(fn [{:keys [ns args] :as apfn-args}]
;; Runtime: (test smaller limit 1st):
(when-not (or (limiter-specific (str ns args)) (limiter-any))
(apfn apfn-args)))))))
;; Async (agent) support
((fn [apfn]
;; Compile-time:
(if-not async? apfn
(let [agent (agent nil :error-mode :continue)]
(fn [apfn-args] (send-off agent (fn [_] (apfn apfn-args)))))))))))
(fn [apfn-args] ; Runtime:
(send-off agent (fn [_] (apfn apfn-args)))))))))))
(defn- make-timestamp-fn
"Returns a unary fn that formats instants using given pattern string and an

View File

@ -11,8 +11,6 @@
"timestamps, etc.")
:min-level :debug
:enabled? true
:async? false
:limit-per-msecs nil
:prefix-fn :ns
:fn (fn [{:keys [level prefix throwable message]}]
(if throwable

View File

@ -41,6 +41,6 @@
"Needs :irc config map in :shared-appender-config, e.g.:
{:host \"irc.example.org\" :port 6667 :nick \"logger\"
:name \"My Logger\" :chan \"#logs\"")
:min-level :info :enabled? true :async? false :limit-per-msecs nil
:min-level :info :enabled? true
:prefix-fn (fn [{:keys [level]}] (-> level name str/upper-case))
:fn appender-fn})

View File

@ -42,5 +42,5 @@
:server {:host \"127.0.0.1\"
:port 27017}}")
:min-level :warn :enabled? true :async? true
:max-message-per-msecs 1000 ; 1 entry / sec
:rate-limit [1 1000] ; 1 entry / sec
:fn appender-fn})

View File

@ -11,7 +11,7 @@
^{:host \"mail.isp.net\" :user \"jsmith\" :pass \"sekrat!!1\"}
{:from \"Bob's logger <me@draines.com>\" :to \"foo@example.com\"}")
:min-level :error :enabled? true :async? true
:limit-per-msecs (* 1000 60 10) ; 1 subject / 10 mins
:rate-limit [5 (* 1000 60 2)] ; 5 calls / 2 mins
:fn (fn [{:keys [ap-config prefix throwable args]}]
(when-let [postal-config (:postal ap-config)]
(let [[subject & body] args]

View File

@ -74,6 +74,4 @@
:backlog 5}")
:min-level nil
:enabled? true
:async? false
:limit-per-msecs nil
:fn appender-fn})

View File

@ -44,6 +44,5 @@
"Needs :socket config map in :shared-appender-config, e.g.:
{:listen-addr :all
:port 9000}")
:min-level :trace :enabled? true :async? false
:max-message-per-msecs nil ; no rate limit by default
:min-level :trace :enabled? true
:fn appender-fn})

View File

@ -30,7 +30,7 @@
(swap! cache assoc args cv)
@dv)))))))))
(defn rate-limit
(defn rate-limiter
"Returns a `(fn [& [id]])` that returns either `nil` (limit okay) or number of
msecs until next rate limit window (rate limited)."
[ncalls-limit window-ms]