[experiment #3038] Add buffer to process incoming messages asynchronously

This commit is contained in:
Dmitry Novotochinov 2018-01-16 09:31:15 +03:00 committed by Eric Dvorsak
parent 8396442847
commit 17e886da11
No known key found for this signature in database
GPG Key ID: 932AC1CE5F05DE0C
4 changed files with 57 additions and 12 deletions

View File

@ -14,6 +14,7 @@
[status-im.chat.views.message.request-message :as request-message]
[status-im.constants :as constants]
[status-im.ui.components.chat-icon.screen :as chat-icon.screen]
[status-im.utils.events-buffer :as events-buffer]
[status-im.utils.identicon :as identicon]
[status-im.utils.gfycat.core :as gfycat]
[status-im.utils.platform :as platform]
@ -266,7 +267,7 @@
(if (:new? message)
(let [layout-height (reagent/atom 0)
anim-value (animation/create-value 1)
anim-callback #(re-frame/dispatch [:set-message-shown message])
anim-callback #(events-buffer/dispatch [:set-message-shown message])
context {:to-value layout-height
:val anim-value
:callback anim-callback}
@ -293,10 +294,10 @@
;; send `:seen` signal when we have signed-in user, message not from us and we didn't sent it already
#(when (and current-public-key message-id chat-id (not outgoing)
(not (chat.utils/message-seen-by? message current-public-key)))
(re-frame/dispatch [:send-seen! {:chat-id chat-id
:from from
:me current-public-key
:message-id message-id}]))
(events-buffer/dispatch [:send-seen! {:chat-id chat-id
:from from
:me current-public-key
:message-id message-id}]))
:reagent-render
(fn [{:keys [outgoing group-chat content-type content] :as message}]
[message-container message

View File

@ -11,10 +11,12 @@
[status-im.i18n :as i18n]
[status-im.utils.random :as random]
[status-im.protocol.message-cache :as cache]
[status-im.protocol.listeners :as listeners]
[status-im.chat.utils :as chat.utils]
[status-im.protocol.web3.inbox :as inbox]
[status-im.protocol.web3.keys :as web3.keys]
[status-im.utils.datetime :as datetime]
[status-im.utils.events-buffer :as events-buffer]
[taoensso.timbre :as log :refer-macros [debug]]
[status-im.native-module.core :as status]
[clojure.string :as string]
@ -83,7 +85,7 @@
{:web3 web3
:identity public-key
:groups groups
:callback #(re-frame/dispatch [:incoming-message %1 %2])
:callback #(events-buffer/dispatch [:incoming-message %1 %2])
:ack-not-received-s-interval 125
:default-ttl 120
:send-online-s-interval 180
@ -249,6 +251,10 @@
#(re-frame/dispatch [::request-messages-success %])
#(re-frame/dispatch [::request-messages-error %]))))
(re-frame/reg-fx
::handle-whisper-message
listeners/handle-whisper-message)
;;;; Handlers
;; NOTE(dmitryn): events chain
@ -316,6 +322,12 @@
(fn [_ [_ error]]
(log/error "offline inbox: request-messages error" error)))
(handlers/register-handler-fx
:handle-whisper-message
(fn [_ [_ error msg options]]
{::handle-whisper-message {:error error
:msg msg
:options options}}))
;;; INITIALIZE PROTOCOL
(handlers/register-handler-fx

View File

@ -4,7 +4,8 @@
[status-im.protocol.web3.utils :as u]
[status-im.protocol.encryption :as e]
[taoensso.timbre :as log]
[status-im.utils.hex :as i]))
[status-im.utils.hex :as i]
[status-im.utils.events-buffer :as events-buffer]))
(defn empty-public-key? [public-key]
(or (= "0x0" public-key)
@ -84,13 +85,16 @@
(callback (if ack? :ack (:type payload)) message)
(ack/check-ack! web3 sig payload identity))))
(defn- handle-whisper-message [{:keys [error msg options]}]
(-> (init-scope error msg options)
parse-payload
filter-messages-from-same-user
parse-content
handle-message))
(defn message-listener
"Valid options are: web3, identity, callback, keypair"
[options]
(fn [js-error js-message]
(-> (init-scope js-error js-message options)
parse-payload
filter-messages-from-same-user
parse-content
handle-message)))
(events-buffer/dispatch [:handle-whisper-message js-error js-message options])))

View File

@ -0,0 +1,28 @@
(ns status-im.utils.events-buffer
(:require [cljs.core.async :as async]
[re-frame.core :as re-frame])
(:require-macros [cljs.core.async.macros :refer [go-loop]]))
;; NOTE:(dmitryn) Ideally we should not exceed current buffer size.
;; Buffer length is an experimental number, consider to change it.
(defonce ^:private buffer (async/chan 10000))
;; NOTE:(dmitryn) Reference to re-frame event loop mechanism
;; https://github.com/Day8/re-frame/blob/master/src/re_frame/router.cljc#L8
;; Might need future improvements.
;; "Fast" events could be processed in batches to speed up things,
;; so multiple buffers/channels could be introduced.
(defn- start-loop! [c t]
"Dispatches events to re-frame processing queue,
but in a way that doesn't block events processing."
(go-loop [e (async/<! c)]
(re-frame/dispatch e)
(async/<! (async/timeout t))
(recur (async/<! c))))
(defonce ^:private dispatch-loop (start-loop! buffer 0))
;; Accepts re-frame event vector [:event-id args]
;; NOTE(dmitryn) Puts all events into a single buffer (naive approach).
(defn dispatch [event]
(async/put! buffer event))