Make reactions async by queueing changes instead

Reactions now put themselves in a queue when they get a
callback from an atom. The queue is flushed before rendering, and
on deref on individual reactions.

This turns out to perform much better in many cases.
This commit is contained in:
Dan Holmsand 2015-09-30 08:56:06 +02:00
parent e39bb33ccc
commit 3fca2e5d41
9 changed files with 218 additions and 183 deletions

View File

@ -37,9 +37,8 @@
(dotimes [i (alength a)]
(let [c (aget a i)]
(when (true? (.' c :cljsIsDirty))
(if (true? (._check-clean (.' c :cljsRatom)))
(.! c :cljsIsDirty false)
(.' c forceUpdate))))))
(.' c forceUpdate)))))
(defn run-funs [a]
(dotimes [i (alength a)]
@ -68,7 +67,6 @@
(run-funs aq))))
(defonce render-queue (RenderQueue. (array) false (array)))
(set! ratom/render-queue render-queue)
(defn flush []
(.run-queue render-queue))
@ -87,6 +85,9 @@
(do-after-flush f)
(.schedule render-queue))
(defn schedule []
(.schedule render-queue))
;; Render helper
(defn is-reagent-component [c]
@ -104,6 +105,7 @@
(ratom/make-reaction run
:auto-run #(queue-render c)
:capture derefed
:async true
:no-cache true)))
res)
(ratom/run rat))))
@ -112,3 +114,27 @@
(some-> (.' c :cljsRatom)
ratom/dispose!)
(mark-rendered c))
(comment
(defn ratom-perf []
(dbg "ratom-perf")
(set! ratom/debug false)
(dotimes [_ 10]
(let [nite 100000
a (ratom/atom 0)
f (fn []
;; (with-let [x 1])
(quot @a 10))
mid (ratom/make-reaction f)
res (ratom/track! (fn []
;; @(track f)
(inc @mid)
))]
@res
(time (dotimes [x nite]
(swap! a inc)
(ratom/flush!)))
(ratom/dispose! res))))
(enable-console-print!)
(ratom-perf))

View File

@ -17,8 +17,11 @@
;;; Utilities
(defonce rea-sync-count 0)
(defn running []
(+ @-running
rea-sync-count
(count cached-reactions)))
(defn capture-derefed [f obj]
@ -78,7 +81,8 @@
(set! (.-watches this) (check-watches w (assoc w key f)))))
(defn- remove-w [this key]
(let [w (.-watches this)]
(let [w (.-watches this)
r -running]
(set! (.-watches this) (check-watches w (dissoc w key)))))
(defn- notify-w [this old new]
@ -94,6 +98,31 @@
(-write writer ">"))
;;; Queueing
(defonce rea-queue nil)
(def ^boolean ^:dynamic *flushing*)
(defn- rea-enqueue [r]
(when (nil? rea-queue)
(set! rea-queue (array))
;; Get around ugly circular dependency. TODO: Fix.
(js/reagent.impl.batching.schedule))
(.push rea-queue r))
(defn- run-queue [q]
(dotimes [i (alength q)]
(let [r (aget q i)]
(._try-run r))))
(defn flush! []
(when-some [q rea-queue]
(set! rea-queue nil)
(binding [*flushing* true]
(run-queue q))))
;;; Atom
(defprotocol IReactiveAtom)
@ -118,6 +147,9 @@
(set! state new-value)
(when-not (nil? watches)
(notify-w a old-value new-value))
;; Support deprecated sync reactions
(when-not (== 0 rea-sync-count)
(flush!))
new-value))
ISwap
@ -163,7 +195,8 @@
(set! (.-reaction obj) nil))
(when-not (nil? destroy)
(destroy x))
nil))
nil)
:async true)
v (-deref r)]
(set! cached-reactions (assoc cached-reactions key r))
(when-not (nil? obj)
@ -195,8 +228,10 @@
(Track. #(apply f args) [f args] nil))
(defn make-track! [f args]
(let [r (make-reaction #(-deref (make-track f args))
:auto-run :async)]
(let [t (make-track f args)
r (make-reaction #(-deref t)
:auto-run true
:async true)]
@r
r))
@ -304,14 +339,11 @@
(defprotocol IRunnable
(run [this]))
(def ^:const clean 0)
(def ^:const maybe-dirty 1)
(def ^:const dirty 2)
(defn- handle-reaction-change [this sender old new]
(._handle-change this sender old new))
(deftype Reaction [f ^:mutable state ^:mutable ^number dirtyness
(deftype Reaction [f ^:mutable state ^:mutable ^boolean dirty?
^:mutable watching ^:mutable watches
^:mutable auto-run on-set on-dispose ^boolean nocache?]
IAtom
@ -343,67 +375,46 @@
Object
(_peek-at [this]
(if (== dirtyness clean)
state
(binding [*ratom-context* nil]
(-deref this))))
(_check-clean [this]
(when (== dirtyness maybe-dirty)
(let [ar auto-run
len (arr-len watching)]
(set! auto-run nil)
(loop [i 0]
(when (< i len)
(let [w (aget watching i)]
(when (and (instance? Reaction w)
(false? (._check-clean w)))
(._try-run w this)))
(when (== dirtyness maybe-dirty)
(recur (inc i)))))
(set! auto-run ar))
(when (== dirtyness maybe-dirty)
(set! dirtyness clean)))
(== dirtyness clean))
(-deref this)))
(_handle-change [this sender oldval newval]
(let [old-dirty dirtyness
new-dirty (if (identical? oldval newval)
(if (instance? Reaction sender)
maybe-dirty clean)
dirty)]
(when (> new-dirty old-dirty)
(set! dirtyness new-dirty)
(when (== old-dirty clean)
(if-some [ar auto-run]
(when-not (and (identical? ar run)
(true? (._check-clean this)))
(ar this))
(notify-w this state state)))))
(when-not (identical? oldval newval)
(if *flushing*
(if-not (nil? auto-run)
(auto-run this)
(when-not dirty?
(set! dirty? true)
(run this)))
(do
(set! dirty? true)
(rea-enqueue this))))
nil)
(_update-watching [this derefed]
(let [wg watching]
(set! watching derefed)
(doseq [w derefed]
(when (or (nil? watching)
(== -1 (.indexOf watching w)))
(when (or (nil? wg)
(== -1 (.indexOf wg w)))
(-add-watch w this handle-reaction-change)))
(doseq [w watching]
(doseq [w wg]
(when (or (nil? derefed)
(== -1 (.indexOf derefed w)))
(-remove-watch w this)))
(set! watching derefed)
(-remove-watch w this))))
nil)
(_try-run [this other]
(if-not (nil? auto-run)
(auto-run this)
(when (and dirty? (not (nil? watching)))
(try
(if-some [ar auto-run]
(ar this)
(run this))
(run this)
(catch :default e
;; Just log error: it will most likely pop up again at deref time.
(when-not silent (error "Error in reaction:" e))
(set! dirtyness dirty)
(set! (.-dirtyness other) dirty)))
(set! state nil)
(notify-w this e nil)))))
nil)
IRunnable
@ -411,9 +422,9 @@
(let [oldstate state
res (capture-derefed f this)
derefed (-captured this)]
(set! dirty? false)
(when-not (arr-eq derefed watching)
(._update-watching this derefed))
(set! dirtyness clean)
(when-not nocache?
(set! state res)
;; Use = to determine equality from reactions, since
@ -425,32 +436,35 @@
IDeref
(-deref [this]
(._check-clean this)
(if (and (nil? *ratom-context*)
(nil? auto-run))
(do
(when-not (== dirtyness clean)
(let [oldstate state
newstate (f)]
(set! state newstate)
(when-not (or (nil? watches)
(= oldstate newstate))
(notify-w this oldstate newstate)))))
(if-not (and (nil? auto-run) (nil? *ratom-context*))
(do
(notify-deref-watcher! this)
(when-not (== dirtyness clean)
(run this))))
(when dirty?
(run this)))
(do
(when-not *flushing*
(flush!))
(when dirty?
(let [oldstate state]
(set! state (f))
(when-not (or (nil? watches)
(= oldstate state))
(notify-w this oldstate state))))))
state)
IDisposable
(dispose! [this]
(doseq [w watching]
(remove-watch w this))
(let [s state]
(let [s state
wg watching]
(set! watching nil)
(set! state nil)
(set! auto-run nil)
(set! dirtyness dirty)
(set! dirty? true)
(when (true? (.-sync this))
(set! (.-sync this) false)
(set! rea-sync-count (dec rea-sync-count)))
(doseq [w wg]
(remove-watch w this))
(when-not (nil? on-dispose)
(on-dispose s)))
nil)
@ -465,44 +479,24 @@
(-hash [this] (reaction-key this)))
;;; Queueing
;; Gets set up from batching
;; TODO: Refactor so that isn't needed
(defonce render-queue nil)
(def dirty-queue nil)
(defn enqueue [r]
(when (nil? dirty-queue)
(set! dirty-queue (array))
(.schedule render-queue))
(.push dirty-queue r))
(defn flush! []
(let [q dirty-queue]
(when-not (nil? q)
(set! dirty-queue nil)
(dotimes [i (alength q)]
(let [r (aget q i)]
(when-not (or (nil? (.-auto-run r))
(true? (._check-clean r)))
(run r)))))))
;; TOOD: Fix arguments
(defn make-reaction [f & {:keys [auto-run on-set on-dispose derefed no-cache
capture]}]
capture async]}]
(let [runner (case auto-run
true run
:async enqueue
auto-run)
derefs (if-some [c capture]
(-captured c)
derefed)
dirty (if (nil? derefs) dirty clean)
dirty (if (nil? derefs) true false)
nocache (if (nil? no-cache) false no-cache)
reaction (Reaction. f nil dirty nil nil
runner on-set on-dispose nocache)]
(when (and runner (not async))
(when-not debug
(warn "deprecated sync"))
(set! (.-sync reaction) true)
(set! rea-sync-count (inc rea-sync-count)))
(when-some [rid (some-> capture .-reaction-id)]
(set! (.-reaction-id reaction) rid))
(when-not (nil? derefed)
@ -515,7 +509,6 @@
reaction))
;;; wrap
(deftype Wrapper [^:mutable state callback ^:mutable ^boolean changed
@ -569,25 +562,3 @@
(Wrapper. value
(util/partial-ifn. callback-fn args nil)
false nil))
(comment
(defn ratom-perf []
(dbg "ratom-perf")
(set! debug false)
(dotimes [_ 10]
(let [nite 100000
a (atom 0)
f (fn []
;; (with-let [x 1])
(quot @a 10))
mid (make-reaction f)
res (make-reaction (fn []
;; @(track f)
(inc @mid))
:auto-run true)]
@res
(time (dotimes [x nite]
(swap! a inc)))
(dispose! res))))
(enable-console-print!)
(ratom-perf))

View File

@ -23,6 +23,7 @@
:color :#aaa})
(defn all-tests []
#_(test/run-tests 'reagenttest.testratomasync)
(test/run-all-tests #"reagenttest.test.*"))
(defmethod test/report [::test/default :summary] [m]

View File

@ -6,9 +6,16 @@
;; this repeats all the atom tests but using cursors instead
(defn running []
(defn fixture [f]
(set! rv/debug true)
(f)
(set! rv/debug false))
(t/use-fixtures :once fixture)
(defn running []
(rv/running))
(defn dispose [v] (rv/dispose! v))
(def testite 10)
@ -31,7 +38,7 @@
(is (= @out 2))
(reset! start 1)
(is (= @out 3))
(is (= @count 2))
(is (= @count 4))
(dispose const)
(is (= @start-base {:a {:b {:c 1}}}))
(is (= (running) runs))))
@ -193,19 +200,19 @@
:on-dispose #(reset! disposed-cns true))]
@cns
(is (= @res 2))
(is (= (+ 6 runs) (running)))
(is (= (+ 7 runs) (running)))
(is (= @count-b 1))
(is (= {:a 0 :b 0} @a-base))
(reset! a -1)
(is (= @res 1))
(is (= @disposed nil))
(is (= @count-b 2))
(is (= (+ 6 runs) (running)) "still running")
(is (= (+ 7 runs) (running)) "still running")
(is (= {:a -1 :b 0} @a-base))
(reset! a 2)
(is (= @res 1))
(is (= @disposed true))
(is (= (+ 4 runs) (running)) "less running count")
(is (= (+ 5 runs) (running)) "less running count")
(is (= {:a 2 :b 0} @a-base))
(reset! disposed nil)

View File

@ -4,8 +4,14 @@
[reagent.debug :refer-macros [dbg]]
[reagent.core :as r]))
(defn running []
(defn fixture [f]
(set! rv/debug true)
(f)
(set! rv/debug false))
(t/use-fixtures :once fixture)
(defn running []
(rv/running))
(def testite 10)
@ -49,7 +55,7 @@
(is (= @out 2))
(reset! start 1)
(is (= @out 3))
(is (= @count 2))
(is (= @count 4))
(dispose const)
(is (= (running) runs))))
@ -194,17 +200,17 @@
:on-dispose #(reset! disposed-cns true))]
@cns
(is (= @res 2))
(is (= (+ 4 runs) (running)))
(is (= (+ 5 runs) (running)))
(is (= @count-b 1))
(reset! a -1)
(is (= @res 1))
(is (= @disposed nil))
(is (= @count-b 2))
(is (= (+ 4 runs) (running)) "still running")
(is (= (+ 5 runs) (running)) "still running")
(reset! a 2)
(is (= @res 1))
(is (= @disposed true))
(is (= (+ 2 runs) (running)) "less running count")
(is (= (+ 3 runs) (running)) "less running count")
(reset! disposed nil)
(reset! a -1)

View File

@ -4,18 +4,24 @@
[reagent.debug :refer-macros [dbg]]
[reagent.core :as r]))
(defn running []
(defn fixture [f]
(set! rv/debug true)
(f)
(set! rv/debug false))
(t/use-fixtures :once fixture)
(defn running []
(rv/running))
(def testite 1)
(def testite 10)
(defn dispose [v]
(rv/dispose! v))
(defn sync [] (r/flush))
(defn ar [f] (rv/make-reaction f :auto-run :async))
(defn ar [f] (rv/track! f))
(deftest basic-ratom
(sync)
@ -31,15 +37,13 @@
@sv @c2 @comp)
const (ar (fn []
(reset! out @res)))]
(is (= @count 0))
@const
(is (= @count 1) "constrain ran")
(is (= @out 2))
(reset! start 1)
(is (= @count 1))
(sync)
(is (= @out 3))
(is (= @count 2))
(is (= @count 4))
(reset! start 2)
(dispose const)
(is (= (running) runs) "did dispose")
@ -53,13 +57,10 @@
c3-count (rv/atom 0)
c1 (reaction @start 1)
c2 (reaction @start)
c3 (rv/make-reaction
c3 (ar
(fn []
(swap! c3-count inc)
(+ @c1 @c2))
:auto-run :async)]
(is (= @c3-count 0) "t0")
(sync)
(+ @c1 @c2)))]
(is (= @c3 1))
(is (= @c3-count 1) "t1")
(swap! start inc)
@ -84,8 +85,6 @@
@!signal
;;update the counter
(swap! !counter inc)))]
(is (= 0 @!counter))
@co
(is (= 1 @!counter) "Constraint run on init")
(reset! !signal "foo")
(sync)
@ -93,7 +92,7 @@
"Counter auto updated")
(dispose co))
(let [!x (rv/atom 0)
!co (rv/make-reaction #(inc @!x) :auto-run :async)]
!co (ar #(inc @!x))]
@!co
(is (= 1 @!co) "CO has correct value on first deref")
(swap! !x inc)
@ -118,15 +117,17 @@
c (reaction
(swap! c-changed inc)
(+ 10 @a2))
res (ar (fn []
(if (< @a2 1) @b @c)))]
res (atom nil)
resr (ar (fn []
(reset! res
(if (< @a2 1) @b @c))))]
(is (= @res (+ 2 @a)))
(is (= @b-changed 1))
(is (= @c-changed 0))
(reset! a -1)
(is (= @b-changed 1))
@res
(sync)
(is (= @b-changed 2))
(is (= @c-changed 0))
@ -151,8 +152,9 @@
(is (= @res (+ 10 @a)))
(reset! a -1)
(sync)
(is (= @res (+ 2 @a)))
(dispose res)
(dispose resr)
(is (= runs (running))))))
(deftest maybe-broken
@ -198,42 +200,44 @@
disposed-c (rv/atom nil)
disposed-cns (rv/atom nil)
count-b (rv/atom 0)
count-c (atom 0)
b (rv/make-reaction (fn []
(swap! count-b inc)
(inc @a))
:on-dispose #(reset! disposed true))
c (rv/make-reaction #(if (< @a 1) (inc @b) (dec @a))
c (rv/make-reaction (fn []
(swap! count-c inc)
(if (< @a 1) (inc @b) (+ 100 @a)))
:on-dispose #(reset! disposed-c true))
res (rv/atom nil)
cns (rv/make-reaction #(reset! res @c)
:auto-run :async
:on-dispose #(reset! disposed-cns true))]
@cns
main (rv/make-reaction #(reset! res @c)
:on-dispose #(reset! disposed-cns true))
cns (rv/track! #(deref main))]
(is (= @res 2))
(is (= (+ 4 runs) (running)))
(is (= @count-b 1))
(is (= @count-c 1))
(reset! a -1)
(is (= @res 2))
(sync)
(is (= @res 1))
(is (= @disposed nil))
(is (= @count-b 2))
(is (= (+ 4 runs) (running)) "still running")
(is (= @count-c 2))
(reset! a 2)
(is (= @res 1) "unchanged")
(sync)
(is (= @res 1))
(is (= @disposed true))
(is (= (+ 2 runs) (running)) "less running count")
(is (= @res 102) "new value")
(is (= @count-c 3))
(is (= @disposed true) "should be disposed")
(reset! disposed nil)
(reset! a -1)
(sync)
;; This fails sometimes on node. I have no idea why.
(is (= 1 @res) "should be one again")
(is (= @disposed nil))
(reset! a 2)
(sync)
(is (= @res 1))
(is (= @res 102))
(is (= @disposed true))
(dispose cns)
(is (= @disposed-c true))
@ -245,7 +249,7 @@
(let [runs (running)
a (rv/atom 0)
b (rv/make-reaction #(+ 5 @a)
:auto-run :async
:auto-run true :async true
:on-set (fn [oldv newv]
(reset! a (+ 10 newv))))]
(sync)

View File

@ -5,6 +5,13 @@
[reagent.interop :refer-macros [.' .!]]
[reagent.core :as r]))
(defn fixture [f]
(set! rv/debug true)
(f)
(set! rv/debug false))
(t/use-fixtures :once fixture)
(defn running [] (rv/running))
(def isClient r/is-client)

View File

@ -4,8 +4,14 @@
[reagent.debug :refer-macros [dbg]]
[reagent.core :as r]))
(defn running []
(defn fixture [f]
(set! rv/debug true)
(f)
(set! rv/debug false))
(t/use-fixtures :once fixture)
(defn running []
(rv/running))
(def testite 10)
@ -13,7 +19,8 @@
(defn dispose [v]
(rv/dispose! v))
(defn sync [] (r/flush))
(defn sync []
(r/flush))
(enable-console-print!)
@ -38,7 +45,7 @@
(is (= @out 5))
(reset! start 1)
(is (= @out 8))
(is (= @count 2))
(is (= @count 4))
(dispose const)
(is (= (running) runs))))
@ -65,13 +72,13 @@
(is (= @count 1))
(sync)
(is (= @out 8))
(is (= @count 2))
(is (= @count 4))
(dispose const)
(swap! start inc)
(sync)
(is (= @count 2))
(is (= @count 4))
(is (= @const 11))
(is (= @count 3))
(is (= @count 5))
(is (= (running) runs))))
(deftest double-dependency

View File

@ -6,9 +6,15 @@
:refer [flush track track! dispose!] :refer-macros [with-let]]
[clojure.walk :as w]))
(defn fixture [f]
(set! rv/debug true)
(f)
(set! rv/debug false))
(t/use-fixtures :once fixture)
(defn running []
(r/flush)
(set! rv/debug true)
(rv/running))
(deftest basic-with-let