From de0d98da20baf9e6ece6fc3f562e6db181c50375 Mon Sep 17 00:00:00 2001 From: Andrea Maria Piana Date: Mon, 1 Jul 2019 19:11:48 +0200 Subject: [PATCH] Use a signal for messages This commits changes the behavior to read from a signal instead of polling each filter. We receive a signal from status-go every 0.3 seconds, only if new messages are received. We receive a single signal for all the chats, and we don't dispatch anymore on every message. Signed-off-by: Andrea Maria Piana --- STATUS_GO_SHA256 | 2 +- STATUS_GO_VERSION | 2 +- src/status_im/accounts/logout/core.cljs | 4 +- src/status_im/signals/core.cljs | 3 +- src/status_im/transport/core.cljs | 15 +--- src/status_im/transport/filters/core.cljs | 80 ++++--------------- src/status_im/transport/message/core.cljs | 45 ++++++----- test/cljs/status_im/test/transport/core.cljs | 17 ++-- .../test/transport/filters/core.cljs | 18 ++--- 9 files changed, 69 insertions(+), 117 deletions(-) diff --git a/STATUS_GO_SHA256 b/STATUS_GO_SHA256 index 19413080d1..ba47287ee3 100644 --- a/STATUS_GO_SHA256 +++ b/STATUS_GO_SHA256 @@ -1,3 +1,3 @@ ## DO NOT EDIT THIS FILE BY HAND. USE `scripts/update-status-go.sh ` instead -0hrjppsayih655a7cz3cdl3yp5ff98cxxcrmzwlzsfaqj82jk7jp +1nm0b1nbz4ly2fx6n2pybrfa1m9x0zzj7kkkqf7fwjxg6hjnp6y5 diff --git a/STATUS_GO_VERSION b/STATUS_GO_VERSION index a4d7834a86..628318871c 100644 --- a/STATUS_GO_VERSION +++ b/STATUS_GO_VERSION @@ -1,3 +1,3 @@ ## DO NOT EDIT THIS FILE BY HAND. USE `scripts/update-status-go.sh ` instead -v0.28.0-beta.0 +v0.30.0-beta.0 diff --git a/src/status_im/accounts/logout/core.cljs b/src/status_im/accounts/logout/core.cljs index c4e8a815e7..c1e203c48b 100644 --- a/src/status_im/accounts/logout/core.cljs +++ b/src/status_im/accounts/logout/core.cljs @@ -11,9 +11,9 @@ [{:keys [db] :as cofx}] (fx/merge cofx {:keychain/clear-user-password (get-in db [:account/account :address]) + :dispatch [:accounts.logout/filters-removed] :dev-server/stop nil} - (transport/stop-whisper - #(re-frame/dispatch [:accounts.logout/filters-removed])) + (transport/stop-whisper) (chaos-mode/stop-checking))) (fx/defn leave-account diff --git a/src/status_im/signals/core.cljs b/src/status_im/signals/core.cljs index a5fd597c72..10de55341f 100644 --- a/src/status_im/signals/core.cljs +++ b/src/status_im/signals/core.cljs @@ -79,4 +79,5 @@ "subscriptions.error" (ethereum.subscriptions/handle-error cofx event) "status.chats.did-change" (chat.loading/load-chats-from-rpc cofx) "whisper.filter.added" (transport.filters/handle-negotiated-filter cofx event) - (log/debug "Event " type " not handled")))) + "messages.new" (transport.message/receive-messages cofx event) + (log/debug "Event " type " not handled" event)))) diff --git a/src/status_im/transport/core.cljs b/src/status_im/transport/core.cljs index ce2534b307..81304e630c 100644 --- a/src/status_im/transport/core.cljs +++ b/src/status_im/transport/core.cljs @@ -36,8 +36,6 @@ (fx/defn init-whisper "Initialises whisper protocol by: - - adding fixed shh discovery filter - - restoring existing symetric keys along with their unique filters - (optionally) initializing mailserver" [{:keys [db web3 all-installations] :as cofx}] (fx/merge cofx @@ -49,13 +47,6 @@ (message/resend-contact-messages []))) (fx/defn stop-whisper - "Stops whisper protocol by removing all existing shh filters - It is necessary to remove the filters because status-go there isn't currently a logout feature in status-go - to clean-up after logout. When logging out of account A and logging in account B, account B would receive - account A messages without this." - [{:keys [db] :as cofx} callback] - (let [{:transport/keys [filters]} db] - (fx/merge - cofx - (transport.filters/stop-filters callback) - (publisher/stop-fx)))) + "Stops whisper protocol" + [cofx] + (publisher/stop-fx cofx)) diff --git a/src/status_im/transport/filters/core.cljs b/src/status_im/transport/filters/core.cljs index 9add7c7020..b81bdfa617 100644 --- a/src/status_im/transport/filters/core.cljs +++ b/src/status_im/transport/filters/core.cljs @@ -27,16 +27,13 @@ ;; fx functions -(defn load-filter-fx [web3 filters] - {:filters/load-filters [[web3 filters]]}) +(defn load-filter-fx [filters] + {:filters/load-filters filters}) (defn remove-filter-fx [filters] (when (seq filters) {:filters/remove-filters filters})) -(defn stop-filter-fx [filters callback] - {:filters/stop-filters [filters callback]}) - ;; dispatches (defn filters-added! [filters] @@ -45,9 +42,6 @@ (defn filters-removed! [filters] (re-frame/dispatch [:filters.callback/filters-removed filters])) -(defn- receive-message [chat-id js-error js-message] - (re-frame/dispatch [:transport/messages-received js-error js-message chat-id])) - ;; Mailserver topics (fx/defn upsert-mailserver-topic @@ -192,28 +186,6 @@ ;; shh filters -(defn build-filter - "Create a raw filter from a filter response" - [web3 {:keys [filter-id - discovery? - negotiated? - chat-id] :as f}] - (let [shh-filter (.newRawMessageFilter - (utils/shh web3) - (clj->js {:allowP2P true - :filterId filter-id}) - ;; We set chat-id to nil on discovery or negotiated - ;; as we have multiple people sending on discovery and pairing - ;; messages on negotiated - (partial receive-message (if (or discovery? negotiated?) nil chat-id)))] - (assoc f :filter shh-filter))) - -(defn- add-filters! - [web3 filters] - (log/debug "PERF" :add-raw-filters filters) - (filters-added! - (map (partial build-filter web3) filters))) - (defn- responses->filters [{:keys [negotiated discovery filterId @@ -266,8 +238,7 @@ processed-filters)] (when (seq new-filters) {:filters/add-raw-filters - {:web3 (get-in cofx [:db :web3]) - :filters new-filters}}))) + {:filters new-filters}}))) (fx/defn load-filters "Load all contacts and chats as filters" @@ -278,18 +249,7 @@ filters (concat (chats->filter-requests chats) (contacts->filter-requests contacts))] - (load-filter-fx (:web3 db) filters))) - -(defn stop-watching! [filters] - (doseq [f filters] - (.stopWatching f (constantly nil)))) - -(fx/defn stop-filters - "Stop all filters" - [{:keys [db]} callback] - (let [filters (map :filter (vals - (:filter/filters db)))] - (stop-filter-fx filters callback))) + (load-filter-fx filters))) ;; Load functions: utility function to load filters @@ -299,20 +259,20 @@ (when (and (filters-initialized? db) (not (chat-loaded? db chat-id))) (let [chat (get-in db [:chats chat-id])] - (load-filter-fx (:web3 db) (->filter-request chat))))) + (load-filter-fx (->filter-request chat))))) (fx/defn load-contact "Check if we already have a filter for that contact, otherwise load the filter if the contact has been added" [{:keys [db]} contact] (when-not (chat-loaded? db (:public-key contact)) - (load-filter-fx (:web3 db) (contacts->filter-requests [contact])))) + (load-filter-fx (contacts->filter-requests [contact])))) (fx/defn load-member "Check if we already have a filter for that member, otherwise load the filter, regardless of whether is in our contacts" [{:keys [db]} public-key] (when-not (chat-loaded? db public-key) - (load-filter-fx (:web3 db) (->filter-request {:chat-id public-key})))) + (load-filter-fx (->filter-request {:chat-id public-key})))) (fx/defn load-members "Load multiple members" @@ -360,9 +320,9 @@ (re-frame/reg-fx :filters/add-raw-filters - (fn [{:keys [web3 filters]}] + (fn [{:keys [filters]}] (log/debug "PERF" :filters/add-raw-filters) - (add-filters! web3 filters))) + (filters-added! filters))) ;; Here we stop first polling and then we hit status-go, otherwise it would throw ;; an error trying to poll from a delete filter. If we fail to remove the filter though @@ -371,26 +331,16 @@ :filters/remove-filters (fn [filters] (log/debug "removing filters" filters) - (stop-watching! (map :filter filters)) (remove-filters-rpc (map ->remove-filter-request filters) #(filters-removed! filters) #(log/error "remove-filters: failed error" %)))) -(re-frame/reg-fx - :filters/stop-filters - (fn [[filters callback]] - (stop-watching! filters) - (callback))) - (re-frame/reg-fx :filters/load-filters - (fn [ops] - (when (seq ops) - (let [web3 (first (peek ops)) - filters (mapcat peek ops)] - (load-filters-rpc - filters - #(add-filters! web3 - (map responses->filters %)) - #(log/error "load-filters: failed error" %)))))) + (fn [filters] + (load-filters-rpc + filters + #(filters-added! (map responses->filters %)) + #(log/error "load-filters: failed error" %)))) + diff --git a/src/status_im/transport/message/core.cljs b/src/status_im/transport/message/core.cljs index ba260bb5b2..a11f298c3f 100644 --- a/src/status_im/transport/message/core.cljs +++ b/src/status_im/transport/message/core.cljs @@ -14,28 +14,21 @@ [status-im.utils.fx :as fx] [taoensso.timbre :as log])) -(defn unwrap-message - "Extract message from new payload {:id some-id :message some-message} - or old (just plain message)" - [js-message] - (let [clj-message (js->clj js-message :keywordize-keys true) - {:keys [message id]} clj-message] - {:message (or message clj-message) - :raw-payload (if message - (o/get js-message "message") - js-message) - :id id})) +(defn add-raw-payload + "Add raw payload for id calculation" + [{:keys [message] :as m}] + (assoc m :raw-payload (clj->js message))) (fx/defn receive-message "Receive message handles a new status-message. dedup-id is passed by status-go and is used to deduplicate messages at that layer. Once a message has been successfuly processed, that id needs to be sent back in order to stop receiving that message" - [cofx now-in-s filter-chat-id js-message] + [cofx now-in-s filter-chat-id message] (let [blocked-contacts (get-in cofx [:db :contacts/blocked] #{}) {{:keys [payload sig timestamp ttl]} :message dedup-id :id - raw-payload :raw-payload} (unwrap-message js-message) + raw-payload :raw-payload} (add-raw-payload message) status-message (-> payload ethereum/hex-to-utf8 transit/deserialize)] @@ -63,15 +56,31 @@ [obj])) (fx/defn receive-whisper-messages - [{:keys [now] :as cofx} js-error js-messages chat-id] - (if (and (not js-error) - js-messages) + [{:keys [now] :as cofx} error messages chat-id] + (if (and (not error) + messages) (let [now-in-s (quot now 1000) receive-message-fxs (map (fn [message] (receive-message now-in-s chat-id message)) - (js-obj->seq js-messages))] + messages)] (apply fx/merge cofx receive-message-fxs)) - (log/error "Something went wrong" js-error js-messages))) + (log/error "Something went wrong" error messages))) + +(fx/defn receive-messages [cofx event] + (let [fxs (map + (fn [{:keys [chat messages error]}] + (receive-whisper-messages + error + messages + ;; For discovery and negotiated filters we don't + ;; set a chatID, and we use the signature of the message + ;; to indicate which chat it is for + (if (or (:discovery chat) + (:negotiated chat)) + nil + (:chatId chat)))) + (:messages event))] + (apply fx/merge cofx fxs))) (fx/defn remove-hash [{:keys [db] :as cofx} envelope-hash] diff --git a/test/cljs/status_im/test/transport/core.cljs b/test/cljs/status_im/test/transport/core.cljs index b9c3cddcef..ffae94edcd 100644 --- a/test/cljs/status_im/test/transport/core.cljs +++ b/test/cljs/status_im/test/transport/core.cljs @@ -56,14 +56,15 @@ (def sig "0x04325367620ae20dd878dbb39f69f02c567d789dd21af8a88623dc5b529827c2812571c380a2cd8236a2851b8843d6486481166c39debf60a5d30b9099c66213e4") -(def messages #js [{:sig sig - :ttl 10 - :timestamp 1527692015 - :topic "0x9c22ff5f" - :payload "0x5b227e236334222c5b2246222c22746578742f706c61696e222c227e3a7075626c69632d67726f75702d757365722d6d657373616765222c3135323736393230313433383130312c313532373639323031343337375d5d" - :padding "0xbf06347cc7f9aa18b4a846032264a88f559d9b14079975d14b10648847c0543a77a80624e101c082d19b502ae3b4f97958d18abf59eb0a82afc1301aa22470495fac739a30c2f563599fa8d8e09363a43d39311596b7f119dee7b046989c08224f1ef5cdc385" - :pow 0.002631578947368421 - :hash "0x220ef9994a4fae64c112b27ed07ef910918159cbe6fcf8ac515ee2bf9a6711a0"}]) +(def messages [{:id "someid" + :message {:sig sig + :ttl 10 + :timestamp 1527692015 + :topic "0x9c22ff5f" + :payload "0x5b227e236334222c5b2246222c22746578742f706c61696e222c227e3a7075626c69632d67726f75702d757365722d6d657373616765222c3135323736393230313433383130312c313532373639323031343337375d5d" + :padding "0xbf06347cc7f9aa18b4a846032264a88f559d9b14079975d14b10648847c0543a77a80624e101c082d19b502ae3b4f97958d18abf59eb0a82afc1301aa22470495fac739a30c2f563599fa8d8e09363a43d39311596b7f119dee7b046989c08224f1ef5cdc385" + :pow 0.002631578947368421 + :hash "0x220ef9994a4fae64c112b27ed07ef910918159cbe6fcf8ac515ee2bf9a6711a0"}}]) (deftest receive-whisper-messages-test (testing "an error is reported" diff --git a/test/cljs/status_im/test/transport/filters/core.cljs b/test/cljs/status_im/test/transport/filters/core.cljs index ced2ff3224..6ba32c93c3 100644 --- a/test/cljs/status_im/test/transport/filters/core.cljs +++ b/test/cljs/status_im/test/transport/filters/core.cljs @@ -93,19 +93,19 @@ (deftest load-member (testing "it returns fx for a member" - (is (= {:filters/load-filters [[nil [{:ChatID "0xchat-id-2" - :OneToOne true - :Identity "chat-id-2"}]]]} + (is (= {:filters/load-filters [{:ChatID "0xchat-id-2" + :OneToOne true + :Identity "chat-id-2"}]} (transport.filters/load-member {:db {}} "0xchat-id-2")))) (testing "merging fx" (is (= {:db {} - :filters/load-filters [[nil [{:ChatID "0xchat-id-1" - :OneToOne true - :Identity "chat-id-1"}]] - [nil [{:ChatID "0xchat-id-2" - :OneToOne true - :Identity "chat-id-2"}]]]} + :filters/load-filters [{:ChatID "0xchat-id-1" + :OneToOne true + :Identity "chat-id-1"} + {:ChatID "0xchat-id-2" + :OneToOne true + :Identity "chat-id-2"}]} (apply fx/merge {:db {}} (map transport.filters/load-member ["0xchat-id-1" "0xchat-id-2"]))))))