Async audit
This commit is contained in:
parent
2b405d4d6c
commit
e379168452
|
@ -1,7 +1,5 @@
|
||||||
(ns status-im.chat.handlers
|
(ns status-im.chat.handlers
|
||||||
(:require-macros [cljs.core.async.macros :as am])
|
(:require [re-frame.core :refer [enrich after debug dispatch reg-fx]]
|
||||||
(:require [re-frame.core :refer [enrich after debug dispatch reg-fx]]
|
|
||||||
[cljs.core.async :as a]
|
|
||||||
[clojure.string :as string]
|
[clojure.string :as string]
|
||||||
[status-im.ui.components.styles :refer [default-chat-color]]
|
[status-im.ui.components.styles :refer [default-chat-color]]
|
||||||
[status-im.chat.constants :as chat-consts]
|
[status-im.chat.constants :as chat-consts]
|
||||||
|
|
|
@ -6,8 +6,7 @@
|
||||||
[taoensso.timbre :as log]
|
[taoensso.timbre :as log]
|
||||||
[cljs.core.async :as async :refer [<! timeout]]
|
[cljs.core.async :as async :refer [<! timeout]]
|
||||||
[status-im.utils.js-resources :as js-res]
|
[status-im.utils.js-resources :as js-res]
|
||||||
[status-im.utils.platform :as p]
|
[status-im.utils.platform :as p]
|
||||||
[status-im.utils.scheduler :as scheduler]
|
|
||||||
[status-im.utils.types :as types]
|
[status-im.utils.types :as types]
|
||||||
[status-im.utils.transducers :as transducers]
|
[status-im.utils.transducers :as transducers]
|
||||||
[status-im.utils.async :as async-util]
|
[status-im.utils.async :as async-util]
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
(ns status-im.protocol.web3.delivery
|
(ns status-im.protocol.web3.delivery
|
||||||
(:require-macros [cljs.core.async.macros :refer [go-loop go]])
|
(:require-macros [cljs.core.async.macros :as async])
|
||||||
(:require [cljs.core.async :refer [<! timeout]]
|
(:require [cljs.core.async :as async]
|
||||||
[status-im.protocol.web3.transport :as t]
|
[status-im.protocol.web3.transport :as t]
|
||||||
[status-im.protocol.web3.utils :as u]
|
[status-im.protocol.web3.utils :as u]
|
||||||
[status-im.protocol.encryption :as e]
|
[status-im.protocol.encryption :as e]
|
||||||
|
@ -53,16 +53,11 @@
|
||||||
(defonce pending-message-callback (atom nil))
|
(defonce pending-message-callback (atom nil))
|
||||||
(defonce recipient->pending-message (atom {}))
|
(defonce recipient->pending-message (atom {}))
|
||||||
|
|
||||||
(defn set-pending-mesage-callback!
|
(def ^:private pending-message-queue (async/chan 100))
|
||||||
[callback]
|
|
||||||
(reset! pending-message-callback callback))
|
|
||||||
|
|
||||||
(defn add-pending-message!
|
(async/go-loop [[web3 {:keys [type message-id requires-ack? to ack?] :as message}]
|
||||||
[web3 {:keys [type message-id requires-ack? to ack?] :as message}]
|
(async/<! pending-message-queue)]
|
||||||
{:pre [(valid? :protocol/message message)]}
|
(when message
|
||||||
(go
|
|
||||||
(debug :add-pending-message! message)
|
|
||||||
;; encryption can take some time, better to run asynchronously
|
|
||||||
(prepare-message
|
(prepare-message
|
||||||
web3 message
|
web3 message
|
||||||
(fn [message']
|
(fn [message']
|
||||||
|
@ -82,7 +77,19 @@
|
||||||
(swap! messages assoc-in [web3 message-id to] pending-message)
|
(swap! messages assoc-in [web3 message-id to] pending-message)
|
||||||
(when to
|
(when to
|
||||||
(swap! recipient->pending-message
|
(swap! recipient->pending-message
|
||||||
update to set/union #{[web3 message-id to]}))))))))
|
update to set/union #{[web3 message-id to]}))))))
|
||||||
|
(recur (async/<! pending-message-queue))))
|
||||||
|
|
||||||
|
(defn set-pending-mesage-callback!
|
||||||
|
[callback]
|
||||||
|
(reset! pending-message-callback callback))
|
||||||
|
|
||||||
|
(defn add-pending-message!
|
||||||
|
[web3 message]
|
||||||
|
{:pre [(valid? :protocol/message message)]}
|
||||||
|
(debug :add-pending-message! message)
|
||||||
|
;; encryption can take some time, better to run asynchronously
|
||||||
|
(async/put! pending-message-queue [web3 message]))
|
||||||
|
|
||||||
(s/def :delivery/pending-message
|
(s/def :delivery/pending-message
|
||||||
(s/keys :req-un [:message/sig :message/to :shh/payload :payload/ack? ::id
|
(s/keys :req-un [:message/sig :message/to :shh/payload :payload/ack? ::id
|
||||||
|
@ -228,26 +235,25 @@
|
||||||
(reset! loop-state stop?)
|
(reset! loop-state stop?)
|
||||||
;; go go!!!
|
;; go go!!!
|
||||||
(debug :init-loop)
|
(debug :init-loop)
|
||||||
(go-loop [_ nil]
|
(async/go-loop [_ nil]
|
||||||
(doseq [[_ messages] (@messages web3)]
|
(doseq [[_ messages] (@messages web3)]
|
||||||
(doseq [[_ {:keys [id message to type] :as data}] messages]
|
(doseq [[_ {:keys [id message to type] :as data}] messages]
|
||||||
;; check each message asynchronously
|
;; check each message asynchronously
|
||||||
(go
|
(when (should-be-retransmitted? options data)
|
||||||
(when (should-be-retransmitted? options data)
|
(try
|
||||||
(try
|
(let [message' (check-ttl message type ttl-config default-ttl)
|
||||||
(let [message' (check-ttl message type ttl-config default-ttl)
|
callback (delivery-callback web3 post-error-callback data message')]
|
||||||
callback (delivery-callback web3 post-error-callback data message')]
|
(t/post-message! web3 message' callback))
|
||||||
(t/post-message! web3 message' callback))
|
(catch :default err
|
||||||
(catch :default err
|
(log/error :post-message-error err))
|
||||||
(log/error :post-message-error err))
|
(finally
|
||||||
(finally
|
(attempt-was-made! web3 id to))))))
|
||||||
(attempt-was-made! web3 id to)))))))
|
|
||||||
(when-not @stop?
|
(when-not @stop?
|
||||||
(recur (<! (timeout delivery-loop-ms-interval)))))
|
(recur (async/<! (async/timeout delivery-loop-ms-interval)))))
|
||||||
(go-loop [_ nil]
|
(async/go-loop [_ nil]
|
||||||
(when-not @stop?
|
(when-not @stop?
|
||||||
(online-message)
|
(online-message)
|
||||||
(recur (<! (timeout (* 1000 send-online-s-interval))))))))
|
(recur (async/<! (async/timeout (* 1000 send-online-s-interval))))))))
|
||||||
|
|
||||||
(defn reset-pending-messages! [to]
|
(defn reset-pending-messages! [to]
|
||||||
(doseq [key (@recipient->pending-message to)]
|
(doseq [key (@recipient->pending-message to)]
|
||||||
|
|
|
@ -1,9 +0,0 @@
|
||||||
(ns status-im.utils.event
|
|
||||||
(:require [cljs.core.async :refer [<!]])
|
|
||||||
(:require-macros [cljs.core.async.macros :refer [go]]))
|
|
||||||
|
|
||||||
(defn handle-channel-events [chan handler]
|
|
||||||
(go (loop [[message args] (<! chan)]
|
|
||||||
(when message
|
|
||||||
(handler message args)
|
|
||||||
(recur (<! chan))))))
|
|
|
@ -8,10 +8,6 @@
|
||||||
(defn- add-message-mock [{:keys [id clock-value] :as msg}]
|
(defn- add-message-mock [{:keys [id clock-value] :as msg}]
|
||||||
(log/debug "add-message-mock:" id clock-value))
|
(log/debug "add-message-mock:" id clock-value))
|
||||||
|
|
||||||
(defn- delay-message [msg out ms]
|
|
||||||
(async/go (async/<! (async/timeout ms))
|
|
||||||
(async/put! out msg)))
|
|
||||||
|
|
||||||
(defn- earliest-clock-value-seen? [seen id clock-value]
|
(defn- earliest-clock-value-seen? [seen id clock-value]
|
||||||
(->> seen
|
(->> seen
|
||||||
(filter (fn [[_ x]] (= x id)))
|
(filter (fn [[_ x]] (= x id)))
|
||||||
|
@ -30,7 +26,8 @@
|
||||||
(async/go-loop []
|
(async/go-loop []
|
||||||
(let [{:keys [id clock-value] :as msg} (async/<! in-ch)]
|
(let [{:keys [id clock-value] :as msg} (async/<! in-ch)]
|
||||||
(swap! seen conj [clock-value id])
|
(swap! seen conj [clock-value id])
|
||||||
(delay-message msg mature-ch delay-ms))
|
(async/<! (async/timeout ms))
|
||||||
|
(async/put! mature-ch msg))
|
||||||
(recur))
|
(recur))
|
||||||
(async/go-loop []
|
(async/go-loop []
|
||||||
(let [{:keys [id clock-value] :as msg} (async/<! mature-ch)]
|
(let [{:keys [id clock-value] :as msg} (async/<! mature-ch)]
|
||||||
|
|
|
@ -1,10 +0,0 @@
|
||||||
(ns status-im.utils.scheduler
|
|
||||||
(:require-macros [cljs.core.async.macros :refer [go]])
|
|
||||||
(:require [cljs.core.async :refer [<! timeout]]))
|
|
||||||
|
|
||||||
(defn s->ms [s] (* 1000 s))
|
|
||||||
|
|
||||||
(defn execute-later
|
|
||||||
[function timeout-ms]
|
|
||||||
(go (<! (timeout timeout-ms))
|
|
||||||
(function)))
|
|
Loading…
Reference in New Issue