Finish jail-call deduplication

This commit is contained in:
janherich 2017-10-24 12:51:32 +02:00 committed by Roman Volosovskyi
parent ef27444ba8
commit bf71df17b5
6 changed files with 215 additions and 68 deletions

View File

@ -1,6 +1,6 @@
(ns status-im.native-module.impl.module
(:require-macros
[cljs.core.async.macros :refer [go-loop go]])
[cljs.core.async.macros :as async :refer [go-loop go]])
(:require [status-im.components.react :as r]
[re-frame.core :refer [dispatch]]
[taoensso.timbre :as log]
@ -9,6 +9,8 @@
[status-im.utils.platform :as p]
[status-im.utils.scheduler :as scheduler]
[status-im.utils.types :as types]
[status-im.utils.transducers :as transducers]
[status-im.utils.async :as async-util]
[status-im.react-native.js-dependencies :as rn-dependencies]
[status-im.native-module.module :as module]))
@ -28,7 +30,7 @@
(swap! calls conj args))
(defn call-module [f]
;(log/debug :call-module f)
;;(log/debug :call-module f)
(if @module-initialized?
(f)
(store-call f)))
@ -52,9 +54,9 @@
(defn init-jail []
(when status
(call-module
(fn []
(let [init-js (str js-res/status-js "I18n.locale = '" rn-dependencies/i18n.locale "';")]
(.initJail status init-js #(log/debug "jail initialized")))))))
(fn []
(let [init-js (str js-res/status-js "I18n.locale = '" rn-dependencies/i18n.locale "';")]
(.initJail status init-js #(log/debug "jail initialized")))))))
(defonce listener-initialized (atom false))
@ -71,7 +73,6 @@
(when status
(call-module #(.moveToInternalStorage status on-result))))
(defn stop-node []
(when status
(call-module #(.stopNode status))))
@ -126,79 +127,76 @@
(defn execute-call [{:keys [jail-id path params callback]}]
(when status
(call-module
#(do
(log/debug :call-jail :jail-id jail-id)
(log/debug :call-jail :path path)
(log/debug :call-jail :params params)
;; this debug message can contain sensitive info
#_(log/debug :call-jail :params params)
(let [params' (update params :context assoc
:debug js/goog.DEBUG
:locale rn-dependencies/i18n.locale)
cb (fn [r]
(let [{:keys [result] :as r'} (types/json->clj r)
{:keys [messages]} result]
(log/debug r')
(doseq [{:keys [type message]} messages]
(log/debug (str "VM console(" type ") - " message)))
(callback r')))]
(.callJail status jail-id (types/clj->json path) (types/clj->json params') cb))))))
#(do
(log/debug :call-jail :jail-id jail-id)
(log/debug :call-jail :path path)
;; this debug message can contain sensitive info
#_(log/debug :call-jail :params params)
(let [params' (update params :context assoc
:debug js/goog.DEBUG
:locale rn-dependencies/i18n.locale)
cb (fn [r]
(let [{:keys [result] :as r'} (types/json->clj r)
{:keys [messages]} result]
(log/debug r')
(doseq [{:keys [type message]} messages]
(log/debug (str "VM console(" type ") - " message)))
(callback r')))]
(.callJail status jail-id (types/clj->json path) (types/clj->json params') cb))))))
;; TODO(rasom): temporal solution, should be fixed on status-go side
(def check-raw-calls-interval 400)
(def interval-between-calls 100)
;; contains all calls to jail before with duplicates
(def raw-jail-calls (atom '()))
;; contains only calls that passed duplication check
(def jail-calls (atom '()))
;; We want the mainting (time) windowed queue of all calls to the jail
;; in order to de-duplicate certain type of calls made in rapid succession
;; where it's beneficial to only execute the last call of that type.
;;
;; The reason why is to improve performance and user feedback, for example
;; when making command argument suggestion lookups, everytime the command
;; input changes (so the user types/deletes a character), we need to fetch
;; new suggestions.
;; However the process of asynchronously fetching and displaying them
;; is unfortunately not instant, so without de-duplication, given that user
;; typed N characters in rapid succession, N percievable suggestion updates
;; will be performed after user already stopped typing, which gives
;; impression of slow, unresponsive UI.
;;
;; With de-duplication in some timeframe (set to 400ms currently), only
;; the last suggestion call for given jail-id jail-path combination is
;; made, and the UI feedback is much better + we save some unnecessary
;; calls to jail.
(defn remove-duplicate-calls
"Removes duplicates by [jail path] keys, remains the last one."
[[all-keys calls] {:keys [jail-id path] :as call}]
(if (and (contains? all-keys [jail-id path])
(not (#{:subscription :preview} (last path))))
[all-keys calls]
[(conj all-keys [jail-id path])
(conj calls call)]))
(def ^:private queue-flush-time 400)
(defn check-raw-calls-loop!
"Only the last call with [jail path] key is added to jail-calls list
from raw-jail-calls"
[]
(go-loop [_ nil]
(let [[_ new-calls] (reduce remove-duplicate-calls [#{} '()] @raw-jail-calls)]
(reset! raw-jail-calls '())
(swap! jail-calls (fn [old-calls]
(concat new-calls old-calls))))
(recur (<! (timeout check-raw-calls-interval)))))
(def ^:private call-queue (async/chan))
(def ^:private deduplicated-calls (async/chan))
(defn execute-calls-loop!
"Calls to jail are executed ne by one with interval-between-calls,
which reduces chances of response shuffling"
[]
(go-loop [_ nil]
(let [next-call (first @jail-calls)]
(swap! jail-calls rest)
(when next-call
(execute-call next-call)))
(recur (<! (timeout interval-between-calls)))))
(async-util/chunked-pipe! call-queue deduplicated-calls queue-flush-time)
(check-raw-calls-loop!)
(execute-calls-loop!)
(defn compare-calls
"Used as comparator deciding which calls should be de-duplicated.
Whenever we fetch suggestions, we only want to issue the last call
done in the `queue-flush-time` window, for all other calls, we have
de-duplicate based on call identity"
[{:keys [jail-id path] :as call}]
(if (= :suggestions (last path))
[jail-id path]
call))
(go-loop []
(doseq [call (sequence (transducers/last-distinct-by compare-calls) (<! deduplicated-calls))]
(execute-call call))
(recur))
(defn call-jail [call]
(swap! raw-jail-calls conj call))
;; TODO(rasom): end of sick magic, should be removed ^
(async/put! call-queue call))
(defn call-function!
[{:keys [chat-id function callback] :as opts}]
(let [path [:functions function]
params (select-keys opts [:parameters :context])]
(call-jail
{:jail-id chat-id
:path path
:params params
:callback (or callback #(dispatch [:received-bot-response {:chat-id chat-id} %]))})))
{:jail-id chat-id
:path path
:params params
:callback (or callback #(dispatch [:received-bot-response {:chat-id chat-id} %]))})))
(defn set-soft-input-mode [mode]
(when status

View File

@ -0,0 +1,25 @@
(ns status-im.utils.async
"Utility namespace containing `core.async` helper constructs"
(:require-macros [cljs.core.async.macros :as async])
(:require [cljs.core.async :as async]))
(defn chunked-pipe!
"Connects input channel to the output channel with time-based chunking.
`flush-time` parameter decides for how long we are waiting to accumulate
value from input channel in a vector before it's put on the output channel.
When `flush-time` interval elapses and there are no values accumulated, nothing
is put on the output channel till the next input arrives, which is then put on
the output channel immediately (wrapped in a vector).
When input channel is closed, output channel is closed as well and go-loop exits."
[input-ch output-ch flush-time]
(async/go-loop [acc []
flush? false]
(if flush?
(do (async/put! output-ch acc)
(recur [] false))
(let [[v ch] (async/alts! [input-ch (async/timeout flush-time)])]
(if (= ch input-ch)
(if v
(recur (conj acc v) (and (seq acc) flush?))
(async/close! output-ch))
(recur acc (seq acc)))))))

View File

@ -0,0 +1,26 @@
(ns status-im.utils.transducers
"Utility namespace containing various usefull transducers")
(defn last-distinct-by
"Just like regular `distinct`, but you provide function
computing the distinctness of input elements and when
duplicate elements are removed, the last, not the first
one is removed."
[compare-fn]
(fn [rf]
(let [accumulated-input (volatile! {:seen {}
:input []})]
(fn
([] (rf))
([result]
(reduce rf result (:input @accumulated-input)))
([result input]
(let [compare-value (compare-fn input)]
(if-let [previous-duplicate-index (get-in @accumulated-input [:seen compare-value])]
(do (vswap! accumulated-input assoc-in [:input previous-duplicate-index] input)
result)
(do (vswap! accumulated-input (fn [{previous-input :input :as accumulated-input}]
(-> accumulated-input
(update :seen assoc compare-value (count previous-input))
(update :input conj input))))
result))))))))

View File

@ -16,7 +16,9 @@
[status-im.test.utils.erc20]
[status-im.test.utils.random]
[status-im.test.utils.gfycat.core]
[status-im.test.utils.signing-phrase.core]))
[status-im.test.utils.signing-phrase.core]
[status-im.test.utils.transducers]
[status-im.test.utils.async]))
(enable-console-print!)
@ -27,6 +29,7 @@
(set! goog.DEBUG false)
(doo-tests
'status-im.test.utils.async
'status-im.test.chat.events
'status-im.test.accounts.events
;;'status-im.test.contacts.events
@ -43,4 +46,5 @@
'status-im.test.utils.erc20
'status-im.test.utils.random
'status-im.test.utils.gfycat.core
'status-im.test.utils.signing-phrase.core)
'status-im.test.utils.signing-phrase.core
'status-im.test.utils.transducers)

View File

@ -0,0 +1,36 @@
(ns status-im.test.utils.async
(:require-macros [cljs.core.async.macros :as async])
(:require [cljs.test :refer-macros [deftest is testing async]]
[cljs.core.async :as async]
[status-im.utils.async :as async-util]))
(deftest chunking-test
(testing "Accumulating result works as expected for `chunked-pipe!`"
(let [input (async/chan)
output (async/chan)]
(async-util/chunked-pipe! input output 100)
(async done
(async/go
(async/put! input 1)
(async/put! input 2)
(async/put! input 3)
(<! (async/timeout 110))
(async/put! input 1)
(async/put! input 2)
(<! (async/timeout 300))
(async/put! input 1)
(is (= [1 2 3] (async/<! output)))
(is (= [1 2] (async/<! output)))
(is (= [1] (async/<! output)))
(done))))))
(deftest chunking-closing-test
(testing "Closing input channel closes output channel connected through `chunked-pipe!`"
(let [input (async/chan)
output (async/chan)]
(async-util/chunked-pipe! input output 100)
(async done
(async/go
(async/close! input)
(is (= nil (async/<! output)))
(done))))))

View File

@ -0,0 +1,58 @@
(ns status-im.test.utils.transducers
(:require [cljs.test :refer-macros [deftest is testing]]
[status-im.utils.transducers :as transducers]
[status-im.native-module.impl.module :as native-module]))
(def ^:private preview-call-1
{:jail-id 1
:path [:preview]
:params {:chat-id 1}
:callback (fn []
[[:msg-id 1]])})
(def ^:private preview-call-2
{:jail-id 1
:path [:preview]
:params {:chat-id 1}
:callback (fn []
[[:msg-id 2]])})
(def ^:private jail-calls
'({:jail-id 1
:path [:suggestions]
:params {:arg 0}}
{:jail-id 1
:path [:function]
:params {:sub :a}}
{:jail-id 1
:path [:function]
:params {:sub :b}}
{:jail-id 1
:path [:suggestions]
:params {:arg 1}}
{:jail-id 1
:path [:suggestions]
:params {:arg 2}}
preview-call-1
preview-call-2))
(deftest last-distinct-by-test
(testing "Elements are removed from input according to provided `compare-fn`,
when duplicate elements are removed, the last one stays"
(is (= (sequence (transducers/last-distinct-by native-module/compare-calls) jail-calls)
'({:jail-id 1
:path [:suggestions]
:params {:arg 2}}
{:jail-id 1
:path [:function]
:params {:sub :a}}
{:jail-id 1
:path [:function]
:params {:sub :b}}
preview-call-1
preview-call-2))))
(testing "Edge cases with input size `N=0` and `N=1` work as well"
(is (= (sequence (transducers/last-distinct-by identity) '())
'()))
(is (= (sequence (transducers/last-distinct-by identity) '(1))
'(1)))))