New scheduler implementation

* simpler and more REPL-friendly scheduler implementation
This commit is contained in:
Teemu Patja 2017-03-03 23:24:31 +02:00
parent 5cdc524c1b
commit 22d1fcc517
No known key found for this signature in database
GPG Key ID: F5B7035E6580FD4C
2 changed files with 23 additions and 45 deletions

View File

@ -45,7 +45,8 @@
[prismatic/plumbing "0.5.3"] [prismatic/plumbing "0.5.3"]
[cljsjs/moment "2.17.1-0"] [cljsjs/moment "2.17.1-0"]
[org.clojure/tools.nrepl "0.2.12"] [org.clojure/tools.nrepl "0.2.12"]
[com.cemerick/piggieback "0.2.2-SNAPSHOT"]] [com.cemerick/piggieback "0.2.2-SNAPSHOT"]
[jarohen/chime "0.2.0"]]
:min-lein-version "2.0.0" :min-lein-version "2.0.0"
:source-paths ["src/clj" "src/cljc"] :source-paths ["src/clj" "src/cljc"]

View File

@ -6,9 +6,10 @@
[commiteth.db.bounties :as db-bounties] [commiteth.db.bounties :as db-bounties]
[commiteth.bounties :as bounties] [commiteth.bounties :as bounties]
[clojure.tools.logging :as log] [clojure.tools.logging :as log]
[mount.core :as mount]) [mount.core :as mount]
(:import [sun.misc ThreadGroupUtils] [clj-time.core :as t]
[java.lang.management ManagementFactory])) [clj-time.periodic :refer [periodic-seq]]
[chime :refer [chime-at]]))
(defn update-issue-contract-address (defn update-issue-contract-address
"For each pending deployment: gets transaction receipt, updates db "For each pending deployment: gets transaction receipt, updates db
@ -125,48 +126,24 @@
current-balance-eth current-balance-eth
current-balance-eth-str)))))) current-balance-eth-str))))))
(def scheduler-thread-name "SCHEDULER_THREAD")
(defn get-thread-by-name (defn run-periodic-tasks [time]
[name] (do
(let [root (ThreadGroupUtils/getRootThreadGroup) (log/debug "run-periodic-tasks" time)
threads-count (.getThreadCount (ManagementFactory/getThreadMXBean)) (update-issue-contract-address)
threads ^"[Ljava.lang.Thread;" (make-array Thread threads-count)] (update-confirm-hash)
(.enumerate root threads true) (update-payout-receipt)
(first (filter #(= name (.getName %)) threads)))) (self-sign-bounty)
(update-balances)))
(defn every
[ms tasks]
(.start (new Thread
(fn []
(while (not (.isInterrupted (Thread/currentThread)))
(do (try
(Thread/sleep ms)
(catch InterruptedException _
(.interrupt (Thread/currentThread))))
(doseq [task tasks]
(try (task)
(catch Exception e (log/error e)))))))
scheduler-thread-name)))
(defn stop-scheduler []
(when-let [scheduler (get-thread-by-name scheduler-thread-name)]
(log/debug "Stopping scheduler thread")
(.interrupt scheduler)))
(defn restart-scheduler [ms tasks]
(stop-scheduler)
(log/debug "Starting scheduler thread")
(while (get-thread-by-name scheduler-thread-name)
(log/debug "Waiting")
(Thread/sleep 1))
(every ms tasks))
(mount/defstate scheduler (mount/defstate scheduler
:start (restart-scheduler 60000 :start (let [every-minute (rest
[update-issue-contract-address (periodic-seq (t/now)
update-confirm-hash (-> 1 t/minutes)))
update-payout-receipt stop-fn (chime-at every-minute run-periodic-tasks)]
self-sign-bounty (log/info "started scheduler")
update-balances]) stop-fn)
:stop (stop-scheduler)) :stop (do
(log/info "stopping scheduler")
(scheduler)))