From 17e886da118de68f90ac4e2f058049de2db067d3 Mon Sep 17 00:00:00 2001 From: Dmitry Novotochinov Date: Tue, 16 Jan 2018 09:31:15 +0300 Subject: [PATCH] [experiment #3038] Add buffer to process incoming messages asynchronously --- src/status_im/chat/views/message/message.cljs | 11 ++++---- src/status_im/protocol/handlers.cljs | 14 +++++++++- src/status_im/protocol/listeners.cljs | 16 +++++++---- src/status_im/utils/events_buffer.cljs | 28 +++++++++++++++++++ 4 files changed, 57 insertions(+), 12 deletions(-) create mode 100644 src/status_im/utils/events_buffer.cljs diff --git a/src/status_im/chat/views/message/message.cljs b/src/status_im/chat/views/message/message.cljs index 60dba77cc3..8d460a1266 100644 --- a/src/status_im/chat/views/message/message.cljs +++ b/src/status_im/chat/views/message/message.cljs @@ -14,6 +14,7 @@ [status-im.chat.views.message.request-message :as request-message] [status-im.constants :as constants] [status-im.ui.components.chat-icon.screen :as chat-icon.screen] + [status-im.utils.events-buffer :as events-buffer] [status-im.utils.identicon :as identicon] [status-im.utils.gfycat.core :as gfycat] [status-im.utils.platform :as platform] @@ -266,7 +267,7 @@ (if (:new? message) (let [layout-height (reagent/atom 0) anim-value (animation/create-value 1) - anim-callback #(re-frame/dispatch [:set-message-shown message]) + anim-callback #(events-buffer/dispatch [:set-message-shown message]) context {:to-value layout-height :val anim-value :callback anim-callback} @@ -293,10 +294,10 @@ ;; send `:seen` signal when we have signed-in user, message not from us and we didn't sent it already #(when (and current-public-key message-id chat-id (not outgoing) (not (chat.utils/message-seen-by? message current-public-key))) - (re-frame/dispatch [:send-seen! {:chat-id chat-id - :from from - :me current-public-key - :message-id message-id}])) + (events-buffer/dispatch [:send-seen! {:chat-id chat-id + :from from + :me current-public-key + :message-id message-id}])) :reagent-render (fn [{:keys [outgoing group-chat content-type content] :as message}] [message-container message diff --git a/src/status_im/protocol/handlers.cljs b/src/status_im/protocol/handlers.cljs index f9bb450b86..736d6cb5cd 100644 --- a/src/status_im/protocol/handlers.cljs +++ b/src/status_im/protocol/handlers.cljs @@ -11,10 +11,12 @@ [status-im.i18n :as i18n] [status-im.utils.random :as random] [status-im.protocol.message-cache :as cache] + [status-im.protocol.listeners :as listeners] [status-im.chat.utils :as chat.utils] [status-im.protocol.web3.inbox :as inbox] [status-im.protocol.web3.keys :as web3.keys] [status-im.utils.datetime :as datetime] + [status-im.utils.events-buffer :as events-buffer] [taoensso.timbre :as log :refer-macros [debug]] [status-im.native-module.core :as status] [clojure.string :as string] @@ -83,7 +85,7 @@ {:web3 web3 :identity public-key :groups groups - :callback #(re-frame/dispatch [:incoming-message %1 %2]) + :callback #(events-buffer/dispatch [:incoming-message %1 %2]) :ack-not-received-s-interval 125 :default-ttl 120 :send-online-s-interval 180 @@ -249,6 +251,10 @@ #(re-frame/dispatch [::request-messages-success %]) #(re-frame/dispatch [::request-messages-error %])))) +(re-frame/reg-fx + ::handle-whisper-message + listeners/handle-whisper-message) + ;;;; Handlers ;; NOTE(dmitryn): events chain @@ -316,6 +322,12 @@ (fn [_ [_ error]] (log/error "offline inbox: request-messages error" error))) +(handlers/register-handler-fx + :handle-whisper-message + (fn [_ [_ error msg options]] + {::handle-whisper-message {:error error + :msg msg + :options options}})) ;;; INITIALIZE PROTOCOL (handlers/register-handler-fx diff --git a/src/status_im/protocol/listeners.cljs b/src/status_im/protocol/listeners.cljs index 3dc0be9fe0..750ada8c6d 100644 --- a/src/status_im/protocol/listeners.cljs +++ b/src/status_im/protocol/listeners.cljs @@ -4,7 +4,8 @@ [status-im.protocol.web3.utils :as u] [status-im.protocol.encryption :as e] [taoensso.timbre :as log] - [status-im.utils.hex :as i])) + [status-im.utils.hex :as i] + [status-im.utils.events-buffer :as events-buffer])) (defn empty-public-key? [public-key] (or (= "0x0" public-key) @@ -84,13 +85,16 @@ (callback (if ack? :ack (:type payload)) message) (ack/check-ack! web3 sig payload identity)))) +(defn- handle-whisper-message [{:keys [error msg options]}] + (-> (init-scope error msg options) + parse-payload + filter-messages-from-same-user + parse-content + handle-message)) + (defn message-listener "Valid options are: web3, identity, callback, keypair" [options] (fn [js-error js-message] - (-> (init-scope js-error js-message options) - parse-payload - filter-messages-from-same-user - parse-content - handle-message))) + (events-buffer/dispatch [:handle-whisper-message js-error js-message options]))) diff --git a/src/status_im/utils/events_buffer.cljs b/src/status_im/utils/events_buffer.cljs new file mode 100644 index 0000000000..473764d19a --- /dev/null +++ b/src/status_im/utils/events_buffer.cljs @@ -0,0 +1,28 @@ +(ns status-im.utils.events-buffer + (:require [cljs.core.async :as async] + [re-frame.core :as re-frame]) + (:require-macros [cljs.core.async.macros :refer [go-loop]])) + +;; NOTE:(dmitryn) Ideally we should not exceed current buffer size. +;; Buffer length is an experimental number, consider to change it. +(defonce ^:private buffer (async/chan 10000)) + +;; NOTE:(dmitryn) Reference to re-frame event loop mechanism +;; https://github.com/Day8/re-frame/blob/master/src/re_frame/router.cljc#L8 +;; Might need future improvements. +;; "Fast" events could be processed in batches to speed up things, +;; so multiple buffers/channels could be introduced. +(defn- start-loop! [c t] + "Dispatches events to re-frame processing queue, + but in a way that doesn't block events processing." + (go-loop [e (async/