signals processing performance improvements [#10287]

Signed-off-by: Andrey Shovkoplyas <motor4ik@gmail.com>
This commit is contained in:
Andrey Shovkoplyas 2020-04-10 11:24:15 +02:00
parent c9308af4d8
commit de4e73a256
No known key found for this signature in database
GPG Key ID: EAAB7C8622D860A4
7 changed files with 166 additions and 101 deletions

View File

@ -63,6 +63,11 @@
[{:keys [current-chat-id] :as db} ui-element]
(update-in db [:chat-ui-props current-chat-id ui-element] not))
(defn dissoc-join-time-fields [db chat-id]
(update-in db [:chats chat-id] dissoc
:join-time-mail-request-id
:might-have-join-time-messages?))
(fx/defn join-time-messages-checked
"The key :might-have-join-time-messages? in public chats signals that
the public chat is freshly (re)created and requests for messages to the
@ -74,11 +79,15 @@
public chat is not fresh anymore."
[{:keys [db] :as cofx} chat-id]
(when (:might-have-join-time-messages? (get-chat cofx chat-id))
{:db (update-in db
[:chats chat-id]
dissoc
:join-time-mail-request-id
:might-have-join-time-messages?)}))
{:db (dissoc-join-time-fields db chat-id)}))
(fx/defn join-time-messages-checked-for-chats
[{:keys [db]} chat-ids]
{:db (reduce #(if (:might-have-join-time-messages? (get-chat {:db %1} %2))
(dissoc-join-time-fields %1 %2)
%1)
db
chat-ids)})
(defn- create-new-chat
[chat-id {:keys [db now]}]
@ -107,6 +116,30 @@
(when (and public? new?)
(transport.filters/load-chat chat-id)))))
(defn map-chats [{:keys [db] :as cofx}]
(fn [val]
(merge
(or (get (:chats db) (:chat-id val))
(create-new-chat (:chat-id val) cofx))
val)))
(defn filter-chats [db]
(fn [val]
(and (not (get-in db [:chats (:chat-id val)])) (:public? val))))
(fx/defn ensure-chats
"Add chats to db and update"
[{:keys [db] :as cofx} chats]
(let [chats (map (map-chats cofx) chats)
filtered-chats (filter (filter-chats db) chats)]
(fx/merge cofx
{:db (update db :chats #(reduce
(fn [acc {:keys [chat-id] :as chat}]
(update acc chat-id merge chat))
%
chats))}
(transport.filters/load-chats filtered-chats))))
(fx/defn upsert-chat
"Upsert chat when not deleted"
[{:keys [db] :as cofx} {:keys [chat-id] :as chat-props}]

View File

@ -2,7 +2,6 @@
(:require [re-frame.core :as re-frame]
[status-im.multiaccounts.model :as multiaccounts.model]
[status-im.ethereum.json-rpc :as json-rpc]
[status-im.utils.config :as config]
[status-im.chat.db :as chat.db]
[status-im.waku.core :as waku]
[status-im.chat.models :as chat-model]
@ -12,18 +11,10 @@
[status-im.constants :as constants]
[status-im.contact.db :as contact.db]
[status-im.data-store.messages :as data-store.messages]
[status-im.ethereum.core :as ethereum]
[status-im.mailserver.core :as mailserver]
[status-im.native-module.core :as status]
[status-im.ui.screens.chat.state :as view.state]
[status-im.transport.message.protocol :as protocol]
[status-im.transport.utils :as transport.utils]
[status-im.ui.components.react :as react]
[status-im.utils.clocks :as utils.clocks]
[status-im.utils.datetime :as time]
[status-im.utils.fx :as fx]
[status-im.utils.platform :as platform]
[status-im.utils.types :as types]
[taoensso.timbre :as log]))
(defn- prepare-message
@ -91,47 +82,39 @@
(when message-to-be-removed
(hide-message chat-id message-to-be-removed))
(fn [{:keys [db]}]
{:db (cond->
(-> db
;; We should not be always adding to the list, as it does not make sense
;; if the chat has not been initialized, but run into
;; some troubles disabling it, so next time
(update-in [:chats chat-id :messages] assoc message-id prepared-message)
(update-in [:chats chat-id :message-list] message-list/add prepared-message))
(and (not seen-by-user?)
(not= from current-public-key))
(update-in [:chats chat-id :loaded-unviewed-messages-ids]
(fnil conj #{}) message-id))}))))
{:db (cond-> (-> db
;; We should not be always adding to the list, as it does not make sense
;; if the chat has not been initialized, but run into
;; some troubles disabling it, so next time
(update-in [:chats chat-id :messages] assoc message-id prepared-message)
(update-in [:chats chat-id :message-list] message-list/add prepared-message))
(and (not seen-by-user?)
(not= from current-public-key))
(update-in [:chats chat-id :loaded-unviewed-messages-ids]
(fnil conj #{}) message-id))}))))
(fx/defn add-received-message
[{:keys [db] :as cofx}
{:keys [chat-id
clock-value] :as message}]
(let [{:keys [loaded-chat-id
view-id
current-chat-id]} db
cursor-clock-value (get-in db [:chats current-chat-id :cursor-clock-value])
current-chat? (= chat-id loaded-chat-id)]
{:keys [chat-id clock-value] :as message}]
(let [{:keys [loaded-chat-id view-id current-chat-id]} db
cursor-clock-value (get-in db [:chats current-chat-id :cursor-clock-value])
current-chat? (= chat-id loaded-chat-id)]
(when current-chat?
;; If we don't have any hidden message or the hidden message is before
;; this one, we add the message to the UI
(if (or (not @view.state/first-not-visible-item)
(<= (:clock-value @view.state/first-not-visible-item)
clock-value))
(add-message cofx {:message message
(add-message cofx {:message message
:seen-by-user? (and current-chat?
(= view-id :chat))})
;; Not in the current view, set all-loaded to false
;; and offload to db and update cursor if necessary
{:db (cond-> db
(>= clock-value
cursor-clock-value)
{:db (cond-> (assoc-in db [:chats chat-id :all-loaded?] false)
(>= clock-value cursor-clock-value)
(update-in [:chats chat-id] assoc
:cursor (chat-loading/clock-value->cursor clock-value)
:cursor-clock-value clock-value)
:always
(assoc-in [:chats chat-id :all-loaded?] false))}))))
:cursor-clock-value clock-value))}))))
(defn- message-loaded?
[{:keys [db]} {:keys [chat-id message-id]}]
@ -143,14 +126,6 @@
(get-in db [:chats chat-id])]
(>= deleted-at-clock-value clock-value)))
(fx/defn offload-message-from [{:keys [db] :as cofx} chat-id message-id]
(let [old-messages (get-in db [:chats chat-id :messages])]
(when-let [last-clock-value (get-in old-messages [message-id :clock-value])]
(let [new-messages (select-keys old-messages (for [[k v] old-messages :when (<= last-clock-value (:clock-value v))] k))]
(fx/merge cofx
{:db (assoc-in db [:chats chat-id :messages] new-messages)}
(rebuild-message-list chat-id))))))
(defn extract-chat-id [cofx {:keys [chat-id from message-type]}]
"Validate and return a valid chat-id"
(cond
@ -165,10 +140,7 @@
(= constants/message-type-one-to-one message-type) from))
(fx/defn update-unviewed-count
[{:keys [now db] :as cofx} {:keys [chat-id
from
message-type
message-id] :as message}]
[{:keys [db] :as cofx} {:keys [chat-id from message-type message-id]}]
(when-not (= message-type constants/message-type-private-group-system-message)
(let [{:keys [current-chat-id view-id]} db
chat-view? (= :chat view-id)
@ -200,9 +172,53 @@
(fx/merge cofx
(add-received-message message-with-chat-id)
(update-unviewed-count message-with-chat-id)
(chat-model/join-time-messages-checked chat-id)
(when platform/desktop?
(chat-model/update-dock-badge-label))))))))
(chat-model/join-time-messages-checked chat-id)))))))
;;TODO currently we process every message, we need to precess them by batches
;;or better move processing to status-go
#_((fx/defn add-received-messages
[{:keys [db] :as cofx} grouped-messages]
(when-let [messages (get grouped-messages (:loaded-chat-id db))]
(apply fx/merge cofx (map add-received-message messages))))
(defn reduce-count-messages [me]
(fn [acc chat-id messages]
(assoc acc chat-id
(remove #(or
(= (:message-type %)
constants/message-type-private-group-system-message)
(= (:from %) me))
messages))))
(defn reduce-chat-messages [chat-view? current-chat-id]
(fn [acc chat-id messages]
(if (and chat-view? (= current-chat-id chat-id))
(data-store.messages/mark-messages-seen acc current-chat-id (map :message-id messages) nil)
(update-in acc [:db :chats chat-id :unviewed-messages-count] + (count messages)))))
(fx/defn update-unviewed-counts
[{:keys [db] :as cofx} grouped-messages]
(let [{:keys [current-chat-id view-id]} db
me (multiaccounts.model/current-public-key cofx)
messages (reduce-kv (reduce-count-messages me)
{}
grouped-messages)]
(when (seq messages)
(reduce-kv (reduce-chat-messages (= :chat view-id) current-chat-id) {:db db} messages))))
(fx/defn receive [cofx messages]
(when-let [grouped-messages
(->> (into []
(comp
(map #(assoc % :chat-id (extract-chat-id cofx %)))
(remove #(earlier-than-deleted-at? cofx %)))
messages)
(group-by :chat-id))]
(when (seq grouped-messages)
(fx/merge cofx
(add-received-messages grouped-messages)
(update-unviewed-counts grouped-messages)
(chat-model/join-time-messages-checked-for-chats (keys grouped-messages)))))))
;;;; Send message

View File

@ -60,10 +60,13 @@
(multiaccounts.update/multiaccount-update :last-updated last-updated {:dont-sync? true})
(multiaccounts.update/multiaccount-update :photo-path photo-path {:dont-sync? true}))))
(fx/defn ensure-contact
[{:keys [db]}
{:keys [public-key] :as contact}]
{:db (update-in db [:contacts/contacts public-key] merge contact)})
(fx/defn ensure-contacts
[{:keys [db]} contacts]
{:db (update db :contacts/contacts
#(reduce (fn [acc {:keys [public-key] :as contact}]
(update acc public-key merge contact))
%
contacts))})
(fx/defn upsert-contact
[{:keys [db] :as cofx}

View File

@ -167,18 +167,28 @@
([db public-key]
(active? (get-in db [:contacts/contacts public-key]))))
;;TODO TTT
#_(defn enrich-ttt-contact
[{:keys [system-tags tribute-to-talk] :as contact}]
(let [tribute (:snt-amount tribute-to-talk)
tribute-status (tribute-to-talk.db/tribute-status contact)
tribute-label (tribute-to-talk.db/status-label tribute-status tribute)]
(-> contact
(assoc-in [:tribute-to-talk :tribute-status] tribute-status)
(assoc-in [:tribute-to-talk :tribute-label] tribute-label)
(assoc :pending? (pending? contact)
:blocked? (blocked? contact)
:active? (active? contact)
:added? (contains? system-tags :contact/added)))))
(defn enrich-contact
[{:keys [system-tags tribute-to-talk] :as contact}]
(let [tribute (:snt-amount tribute-to-talk)
tribute-status (tribute-to-talk.db/tribute-status contact)
tribute-label (tribute-to-talk.db/status-label tribute-status tribute)]
(-> contact
(assoc-in [:tribute-to-talk :tribute-status] tribute-status)
(assoc-in [:tribute-to-talk :tribute-label] tribute-label)
(assoc :pending? (pending? contact)
:blocked? (blocked? contact)
:active? (active? contact)
:added? (contains? system-tags :contact/added)))))
[{:keys [system-tags] :as contact}]
(-> contact
(dissoc :ens-verified-at :ens-verification-retries)
(assoc :pending? (pending? contact)
:blocked? (blocked? contact)
:active? (active? contact)
:added? (contains? system-tags :contact/added))))
(defn enrich-contacts
[contacts]

View File

@ -216,8 +216,12 @@
:name (:name metadata)
:device-type (:deviceType metadata))})
(fx/defn handle-installation [{:keys [db]} {:keys [id] :as i}]
{:db (assoc-in db [:pairing/installations id] (installation<-rpc i))})
(fx/defn handle-installations [{:keys [db]} installations]
{:db (update db :pairing/installations #(reduce
(fn [acc {:keys [id] :as i}]
(update acc id merge (installation<-rpc i)))
%
installations))})
(fx/defn load-installations [{:keys [db]} installations]
{:db (assoc db :pairing/installations (reduce

View File

@ -273,6 +273,13 @@
(let [chat (get-in db [:chats chat-id])]
(load-filter-fx (waku/enabled? cofx) (->filter-request chat)))))
(fx/defn load-chats
"Check if a filter already exists for that chat, otherw load the filter"
[{:keys [db] :as cofx} chats]
(let [chats (filter #(chat-loaded? db (:chat-id %)) chats)]
(when (and (filters-initialized? db) (seq chats))
(load-filter-fx (waku/enabled? cofx) (chats->filter-requests chats)))))
(fx/defn load-contact
"Check if we already have a filter for that contact, otherwise load the filter
if the contact has been added"

View File

@ -1,23 +1,14 @@
(ns ^{:doc "Definition of the StatusMessage protocol"}
status-im.transport.message.core
(:require [goog.object :as o]
[re-frame.core :as re-frame]
[status-im.chat.models.message :as models.message]
(:require [status-im.chat.models.message :as models.message]
[status-im.chat.models :as models.chat]
[status-im.contact.core :as models.contact]
[status-im.pairing.core :as models.pairing]
[status-im.data-store.messages :as data-store.messages]
[status-im.data-store.contacts :as data-store.contacts]
[status-im.data-store.chats :as data-store.chats]
[status-im.constants :as constants]
[status-im.utils.handlers :as handlers]
[status-im.ethereum.json-rpc :as json-rpc]
[status-im.ethereum.core :as ethereum]
[status-im.native-module.core :as status]
[status-im.ens.core :as ens]
[status-im.utils.fx :as fx]
[taoensso.timbre :as log]
[status-im.ethereum.json-rpc :as json-rpc]
[status-im.utils.types :as types]))
(defn- js-obj->seq [obj]
@ -27,13 +18,11 @@
(aget obj i))
[obj]))
(fx/defn handle-chat [cofx chat]
;; :unviewed-messages-count is managed by status-react, so we don't copy
;; over it
(models.chat/ensure-chat cofx (dissoc chat :unviewed-messages-count)))
(fx/defn handle-chats [cofx chats]
(models.chat/ensure-chats cofx chats))
(fx/defn handle-contact [cofx contact]
(models.contact/ensure-contact cofx contact))
(fx/defn handle-contacts [cofx contacts]
(models.contact/ensure-contacts cofx contacts))
(fx/defn handle-message [cofx message]
(models.message/receive-one cofx message))
@ -45,25 +34,28 @@
messages (.-messages response-js)]
(cond
(seq installations)
(let [installation (.pop installations)]
(let [installations-clj (types/js->clj installations)]
(js-delete response-js "installations")
(fx/merge cofx
{:utils/dispatch-later [{:ms 20 :dispatch [::process response-js]}]}
(models.pairing/handle-installation (types/js->clj installation))))
(models.pairing/handle-installations installations-clj)))
(seq contacts)
(let [contact (.pop contacts)]
(fx/merge cofx
;;TODO temporary fix for release, we have and issue with contacts updates , UI is really slow
;;we need to inspect all subsctiptions and views, but for now to temporary make it better
;; we use dispatch instead dispatch-later
{:dispatch [::process response-js]}
(handle-contact (-> contact (types/js->clj) (data-store.contacts/<-rpc)))))
(seq chats)
(let [chat (.pop chats)]
(let [contacts-clj (types/js->clj contacts)]
(js-delete response-js "contacts")
(fx/merge cofx
{:utils/dispatch-later [{:ms 20 :dispatch [::process response-js]}]}
(handle-chat (-> chat (types/js->clj) (data-store.chats/<-rpc)))))
(handle-contacts (map data-store.contacts/<-rpc contacts-clj))))
(seq chats)
(let [chats-clj (types/js->clj chats)]
(js-delete response-js "chats")
(fx/merge cofx
{:utils/dispatch-later [{:ms 20 :dispatch [::process response-js]}]}
(handle-chats (map #(-> %
(data-store.chats/<-rpc)
(dissoc :unviewed-messages-count))
chats-clj))))
(seq messages)
(let [message (.pop messages)]