Introduce 'pre-receiver' for more intuitive message order

Includes tests and explanation of logic. Doesn't change any behavior, just
provides a new capability to be used when messages are received.
This commit is contained in:
Oskar Thoren 2017-12-16 16:55:47 +01:00
parent 3e940b5bd4
commit bb69995aa4
No known key found for this signature in database
GPG Key ID: 5128AB0637CD85AF
3 changed files with 127 additions and 0 deletions

View File

@ -0,0 +1,44 @@
(ns status-im.utils.pre-receiver
(:require-macros [cljs.core.async.macros :as async])
(:require [cljs.core.async :as async]
[taoensso.timbre :as log]))
;; See status-im.test.utils.pre-receiver for justification.
(defn- add-message-mock [{:keys [id clock-value] :as msg}]
(log/debug "add-message-mock:" id clock-value))
(defn- delay-message [msg out ms]
(async/go (async/<! (async/timeout ms))
(async/put! out msg)))
(defn- earliest-clock-value-seen? [seen id clock-value]
(->> seen
(filter (fn [[_ x]] (= x id)))
sort
ffirst
(= clock-value)))
(defn start!
"Starts a pre-receiver that returns channel to put messages on. Once
'delay-ms' (default 50ms) time has passed, calls add-fn on message."
[& [{:keys [delay-ms reorder? add-fn]
:or {delay-ms 50 reorder? true add-fn add-message-mock}}]]
(let [in-ch (async/chan)
mature-ch (async/chan)
seen (atom #{})]
(async/go-loop []
(let [{:keys [id clock-value] :as msg} (async/<! in-ch)]
(swap! seen conj [clock-value id])
(delay-message msg mature-ch delay-ms))
(recur))
(async/go-loop []
(let [{:keys [id clock-value] :as msg} (async/<! mature-ch)]
(if reorder?
(if (earliest-clock-value-seen? @seen id clock-value)
(do (swap! seen disj [clock-value id])
(add-fn msg))
(async/put! mature-ch msg))
(add-fn msg))
(recur)))
in-ch))

View File

@ -14,6 +14,7 @@
[status-im.test.utils.utils]
[status-im.test.utils.money]
[status-im.test.utils.clocks]
[status-im.test.utils.pre-receiver]
[status-im.test.utils.ethereum.eip681]
[status-im.test.utils.ethereum.core]
[status-im.test.utils.random]
@ -46,6 +47,7 @@
'status-im.test.utils.utils
'status-im.test.utils.money
'status-im.test.utils.clocks
'status-im.test.utils.pre-receiver
'status-im.test.utils.ethereum.eip681
'status-im.test.utils.ethereum.core
'status-im.test.utils.random

View File

@ -0,0 +1,81 @@
(ns status-im.test.utils.pre-receiver
(:require-macros [cljs.core.async.macros :as async])
(:require [cljs.test :refer-macros [deftest is testing async]]
[cljs.core.async :as async]
[status-im.utils.pre-receiver :as pre-receiver]))
;; The tests in clocks.cljs only ensure that the local clock value is respected
;; and that new messages are always appended correctly so we get a locally
;; consistent view.
;; Additionally, a desirable property to have is that two people talking to each
;; other have roughly the same ordering of messages. Example:
;; A and B are different chats with different chat identifiers. The sent
;; clock-value represents that client's truth, but The Network (Whisper, etc)
;; doesn't guarantee _delivery_ order. This means a client can receive the
;; messages in the following ordering.
(def messages [{:id "a" :clock-value 1 :payload "a1"}
{:id "a" :clock-value 2 :payload "a2"}
{:id "b" :clock-value 1 :payload "b1"}
{:id "a" :clock-value 4 :payload "a4"}
{:id "a" :clock-value 3 :payload "a3"}
{:id "b" :clock-value 2 :payload "b2"}])
;; Empirically speaking, "a4" arriving before "a3" happens when messages are
;; sent in quick succession, but the delay between these being delivered is
;; usually very small, i.e. <100ms.
;; Given this delivery order, we have a design decision to make. We can either
;; eagerly "commit" them, and thus update our local clock value to reflect the
;; order we see messages in. Alternatively, we can pause the commit/full receive
;; step and wait for some time for logically earlier messages arrive.
;; In 0.9.12 and earlier this is the behavior we had the former behavior, but
;; this breaks users expectation. The tests below showcases the latter behavior,
;; which can be turned on with a flag.
;; Invariant to maintain
(defn monotonic-increase? [received id clock-value]
(->> received
(filter (fn [[_ x]] (= x id)))
sort
last
first
((fn [v] (or (nil? v) (> clock-value v))))))
(defn add-message-test [received invariant? {:keys [id clock-value] :as msg}]
(when (not (monotonic-increase? @received id clock-value))
(println "add-message-test NOT earliest clock value seen!")
(println "add-message-test received:" (pr-str @received))
(println "add-message-test new:" id clock-value)
(is (not invariant?)))
(swap! received conj [clock-value id]))
(defn simulate! [{:keys [reorder? invariant? done]}]
(let [delay-ms 50
received (atom #{})
add-fn (partial add-message-test received invariant?)
in-ch (pre-receiver/start! {:delay-ms delay-ms
:reorder? reorder?
:add-fn add-fn})]
(doseq [msg messages]
(async/put! in-ch msg))
(async/go (async/<! (async/timeout (* delay-ms 2)))
(let [total (count messages)
poss (count @received)]
;;(println "received" poss "/" total "messages")
(reset! received #{})
(done)))))
(deftest pre-receiver
(testing "Pre-receiver with reorder - good case"
(async done
(simulate! {:reorder? true :invariant? true :done done})))
;; By setting invariant? to true this test will fail
(testing "Pre-receiver without reorder - bad case"
(async done
(simulate! {:reorder? false :invariant? false :done done}))))