Revert "Async audit"

This reverts commit e379168452328bdc10e98a65b53cf0cce78037fa.
This commit is contained in:
Oskar Thoren 2018-01-13 00:09:41 +09:00
parent 00d665b2d5
commit 87dc4ead85
No known key found for this signature in database
GPG Key ID: 5128AB0637CD85AF
6 changed files with 57 additions and 38 deletions

View File

@ -1,5 +1,7 @@
(ns status-im.chat.handlers (ns status-im.chat.handlers
(:require [re-frame.core :refer [enrich after debug dispatch reg-fx]] (:require-macros [cljs.core.async.macros :as am])
(:require [re-frame.core :refer [enrich after debug dispatch reg-fx]]
[cljs.core.async :as a]
[clojure.string :as string] [clojure.string :as string]
[status-im.ui.components.styles :refer [default-chat-color]] [status-im.ui.components.styles :refer [default-chat-color]]
[status-im.chat.constants :as chat-consts] [status-im.chat.constants :as chat-consts]

View File

@ -6,7 +6,8 @@
[taoensso.timbre :as log] [taoensso.timbre :as log]
[cljs.core.async :as async :refer [<! timeout]] [cljs.core.async :as async :refer [<! timeout]]
[status-im.utils.js-resources :as js-res] [status-im.utils.js-resources :as js-res]
[status-im.utils.platform :as p] [status-im.utils.platform :as p]
[status-im.utils.scheduler :as scheduler]
[status-im.utils.types :as types] [status-im.utils.types :as types]
[status-im.utils.transducers :as transducers] [status-im.utils.transducers :as transducers]
[status-im.utils.async :as async-util] [status-im.utils.async :as async-util]

View File

@ -1,6 +1,6 @@
(ns status-im.protocol.web3.delivery (ns status-im.protocol.web3.delivery
(:require-macros [cljs.core.async.macros :as async]) (:require-macros [cljs.core.async.macros :refer [go-loop go]])
(:require [cljs.core.async :as async] (:require [cljs.core.async :refer [<! timeout]]
[status-im.protocol.web3.transport :as t] [status-im.protocol.web3.transport :as t]
[status-im.protocol.web3.utils :as u] [status-im.protocol.web3.utils :as u]
[status-im.protocol.encryption :as e] [status-im.protocol.encryption :as e]
@ -53,11 +53,16 @@
(defonce pending-message-callback (atom nil)) (defonce pending-message-callback (atom nil))
(defonce recipient->pending-message (atom {})) (defonce recipient->pending-message (atom {}))
(def ^:private pending-message-queue (async/chan 100)) (defn set-pending-mesage-callback!
[callback]
(reset! pending-message-callback callback))
(async/go-loop [[web3 {:keys [type message-id requires-ack? to ack?] :as message}] (defn add-pending-message!
(async/<! pending-message-queue)] [web3 {:keys [type message-id requires-ack? to ack?] :as message}]
(when message {:pre [(valid? :protocol/message message)]}
(go
(debug :add-pending-message! message)
;; encryption can take some time, better to run asynchronously
(prepare-message (prepare-message
web3 message web3 message
(fn [message'] (fn [message']
@ -77,19 +82,7 @@
(swap! messages assoc-in [web3 message-id to] pending-message) (swap! messages assoc-in [web3 message-id to] pending-message)
(when to (when to
(swap! recipient->pending-message (swap! recipient->pending-message
update to set/union #{[web3 message-id to]})))))) update to set/union #{[web3 message-id to]}))))))))
(recur (async/<! pending-message-queue))))
(defn set-pending-mesage-callback!
[callback]
(reset! pending-message-callback callback))
(defn add-pending-message!
[web3 message]
{:pre [(valid? :protocol/message message)]}
(debug :add-pending-message! message)
;; encryption can take some time, better to run asynchronously
(async/put! pending-message-queue [web3 message]))
(s/def :delivery/pending-message (s/def :delivery/pending-message
(s/keys :req-un [:message/sig :message/to :shh/payload :payload/ack? ::id (s/keys :req-un [:message/sig :message/to :shh/payload :payload/ack? ::id
@ -235,25 +228,26 @@
(reset! loop-state stop?) (reset! loop-state stop?)
;; go go!!! ;; go go!!!
(debug :init-loop) (debug :init-loop)
(async/go-loop [_ nil] (go-loop [_ nil]
(doseq [[_ messages] (@messages web3)] (doseq [[_ messages] (@messages web3)]
(doseq [[_ {:keys [id message to type] :as data}] messages] (doseq [[_ {:keys [id message to type] :as data}] messages]
;; check each message asynchronously ;; check each message asynchronously
(when (should-be-retransmitted? options data) (go
(try (when (should-be-retransmitted? options data)
(let [message' (check-ttl message type ttl-config default-ttl) (try
callback (delivery-callback web3 post-error-callback data message')] (let [message' (check-ttl message type ttl-config default-ttl)
(t/post-message! web3 message' callback)) callback (delivery-callback web3 post-error-callback data message')]
(catch :default err (t/post-message! web3 message' callback))
(log/error :post-message-error err)) (catch :default err
(finally (log/error :post-message-error err))
(attempt-was-made! web3 id to)))))) (finally
(attempt-was-made! web3 id to)))))))
(when-not @stop? (when-not @stop?
(recur (async/<! (async/timeout delivery-loop-ms-interval))))) (recur (<! (timeout delivery-loop-ms-interval)))))
(async/go-loop [_ nil] (go-loop [_ nil]
(when-not @stop? (when-not @stop?
(online-message) (online-message)
(recur (async/<! (async/timeout (* 1000 send-online-s-interval)))))))) (recur (<! (timeout (* 1000 send-online-s-interval))))))))
(defn reset-pending-messages! [to] (defn reset-pending-messages! [to]
(doseq [key (@recipient->pending-message to)] (doseq [key (@recipient->pending-message to)]

View File

@ -0,0 +1,9 @@
(ns status-im.utils.event
(:require [cljs.core.async :refer [<!]])
(:require-macros [cljs.core.async.macros :refer [go]]))
(defn handle-channel-events [chan handler]
(go (loop [[message args] (<! chan)]
(when message
(handler message args)
(recur (<! chan))))))

View File

@ -8,6 +8,10 @@
(defn- add-message-mock [{:keys [id clock-value] :as msg}] (defn- add-message-mock [{:keys [id clock-value] :as msg}]
(log/debug "add-message-mock:" id clock-value)) (log/debug "add-message-mock:" id clock-value))
(defn- delay-message [msg out ms]
(async/go (async/<! (async/timeout ms))
(async/put! out msg)))
(defn- earliest-clock-value-seen? [seen id clock-value] (defn- earliest-clock-value-seen? [seen id clock-value]
(->> seen (->> seen
(filter (fn [[_ x]] (= x id))) (filter (fn [[_ x]] (= x id)))
@ -26,8 +30,7 @@
(async/go-loop [] (async/go-loop []
(let [{:keys [id clock-value] :as msg} (async/<! in-ch)] (let [{:keys [id clock-value] :as msg} (async/<! in-ch)]
(swap! seen conj [clock-value id]) (swap! seen conj [clock-value id])
(async/<! (async/timeout ms)) (delay-message msg mature-ch delay-ms))
(async/put! mature-ch msg))
(recur)) (recur))
(async/go-loop [] (async/go-loop []
(let [{:keys [id clock-value] :as msg} (async/<! mature-ch)] (let [{:keys [id clock-value] :as msg} (async/<! mature-ch)]

View File

@ -0,0 +1,10 @@
(ns status-im.utils.scheduler
(:require-macros [cljs.core.async.macros :refer [go]])
(:require [cljs.core.async :refer [<! timeout]]))
(defn s->ms [s] (* 1000 s))
(defn execute-later
[function timeout-ms]
(go (<! (timeout timeout-ms))
(function)))