From 22d1fcc517aab6b3828e1c6b562984439ae35b90 Mon Sep 17 00:00:00 2001 From: Teemu Patja Date: Fri, 3 Mar 2017 23:24:31 +0200 Subject: [PATCH] New scheduler implementation * simpler and more REPL-friendly scheduler implementation --- project.clj | 3 +- src/clj/commiteth/scheduler.clj | 65 +++++++++++---------------------- 2 files changed, 23 insertions(+), 45 deletions(-) diff --git a/project.clj b/project.clj index bc5cb3e..2290c08 100644 --- a/project.clj +++ b/project.clj @@ -45,7 +45,8 @@ [prismatic/plumbing "0.5.3"] [cljsjs/moment "2.17.1-0"] [org.clojure/tools.nrepl "0.2.12"] - [com.cemerick/piggieback "0.2.2-SNAPSHOT"]] + [com.cemerick/piggieback "0.2.2-SNAPSHOT"] + [jarohen/chime "0.2.0"]] :min-lein-version "2.0.0" :source-paths ["src/clj" "src/cljc"] diff --git a/src/clj/commiteth/scheduler.clj b/src/clj/commiteth/scheduler.clj index 1fdbacd..188404f 100644 --- a/src/clj/commiteth/scheduler.clj +++ b/src/clj/commiteth/scheduler.clj @@ -6,9 +6,10 @@ [commiteth.db.bounties :as db-bounties] [commiteth.bounties :as bounties] [clojure.tools.logging :as log] - [mount.core :as mount]) - (:import [sun.misc ThreadGroupUtils] - [java.lang.management ManagementFactory])) + [mount.core :as mount] + [clj-time.core :as t] + [clj-time.periodic :refer [periodic-seq]] + [chime :refer [chime-at]])) (defn update-issue-contract-address "For each pending deployment: gets transaction receipt, updates db @@ -125,48 +126,24 @@ current-balance-eth current-balance-eth-str)))))) -(def scheduler-thread-name "SCHEDULER_THREAD") -(defn get-thread-by-name - [name] - (let [root (ThreadGroupUtils/getRootThreadGroup) - threads-count (.getThreadCount (ManagementFactory/getThreadMXBean)) - threads ^"[Ljava.lang.Thread;" (make-array Thread threads-count)] - (.enumerate root threads true) - (first (filter #(= name (.getName %)) threads)))) +(defn run-periodic-tasks [time] + (do + (log/debug "run-periodic-tasks" time) + (update-issue-contract-address) + (update-confirm-hash) + (update-payout-receipt) + (self-sign-bounty) + (update-balances))) -(defn every - [ms tasks] - (.start (new Thread - (fn [] - (while (not (.isInterrupted (Thread/currentThread))) - (do (try - (Thread/sleep ms) - (catch InterruptedException _ - (.interrupt (Thread/currentThread)))) - (doseq [task tasks] - (try (task) - (catch Exception e (log/error e))))))) - scheduler-thread-name))) - -(defn stop-scheduler [] - (when-let [scheduler (get-thread-by-name scheduler-thread-name)] - (log/debug "Stopping scheduler thread") - (.interrupt scheduler))) - -(defn restart-scheduler [ms tasks] - (stop-scheduler) - (log/debug "Starting scheduler thread") - (while (get-thread-by-name scheduler-thread-name) - (log/debug "Waiting") - (Thread/sleep 1)) - (every ms tasks)) (mount/defstate scheduler - :start (restart-scheduler 60000 - [update-issue-contract-address - update-confirm-hash - update-payout-receipt - self-sign-bounty - update-balances]) - :stop (stop-scheduler)) + :start (let [every-minute (rest + (periodic-seq (t/now) + (-> 1 t/minutes))) + stop-fn (chime-at every-minute run-periodic-tasks)] + (log/info "started scheduler") + stop-fn) + :stop (do + (log/info "stopping scheduler") + (scheduler)))