Fast message grouping/sorting

Signed-off-by: Julien Eluard <julien.eluard@gmail.com>
This commit is contained in:
janherich 2018-05-10 17:19:51 +02:00 committed by Julien Eluard
parent 32d3b16f83
commit e1da12d8a2
No known key found for this signature in database
GPG Key ID: 6FD7DB5437FCBEF6
13 changed files with 302 additions and 187 deletions

View File

@ -70,15 +70,20 @@
(handlers/register-handler-fx
:load-more-messages
[(re-frame/inject-cofx :data-store/get-messages)]
(fn [{{:keys [current-chat-id] :as db} :db get-stored-messages :get-stored-messages} _]
(fn [{{:keys [current-chat-id] :as db} :db get-stored-messages :get-stored-messages :as cofx} _]
(when-not (get-in db [:chats current-chat-id :all-loaded?])
(let [loaded-count (count (get-in db [:chats current-chat-id :messages]))
new-messages (index-messages (get-stored-messages current-chat-id loaded-count))]
new-messages (get-stored-messages current-chat-id loaded-count)
indexed-messages (index-messages new-messages)]
(handlers-macro/merge-fx
cofx
{:db (-> db
(update-in [:chats current-chat-id :messages] merge new-messages)
(update-in [:chats current-chat-id :not-loaded-message-ids] #(apply disj % (keys new-messages)))
(update-in [:chats current-chat-id :messages] merge indexed-messages)
(update-in [:chats current-chat-id :not-loaded-message-ids]
#(apply disj % (keys indexed-messages)))
(assoc-in [:chats current-chat-id :all-loaded?]
(> constants/default-number-of-messages (count new-messages))))}))))
(> constants/default-number-of-messages (count new-messages))))}
(models.message/group-messages current-chat-id new-messages))))))
(handlers/register-handler-db
:message-appeared
@ -173,6 +178,13 @@
(vals contacts-to-add))]}
(events.loading/load-commands))))
(defn- group-chat-messages
[{:keys [db]}]
(reduce-kv (fn [fx chat-id {:keys [messages]}]
(models.message/group-messages chat-id (vals messages) fx))
{:db db}
(:chats db)))
(handlers/register-handler-fx
:initialize-chats
[(re-frame/inject-cofx :get-default-contacts)
@ -206,6 +218,7 @@
(handlers-macro/merge-fx cofx
{:db (assoc db :chats chats)}
(init-console-chat)
(group-chat-messages)
(add-default-contacts)))))
(handlers/register-handler-fx

View File

@ -44,13 +44,19 @@
;; regular non command message, we can add it right away
(message-model/receive message cofx))))
(defn add-messages [[messages] {:keys [db] :as cofx}]
(handlers-macro/merge-effects cofx add-message messages))
(defn add-messages [messages {:keys [db] :as cofx}]
(let [messages-to-add (filter (partial message-model/add-to-chat? cofx) messages)
plain-messages (remove (comp :command :content) messages-to-add)
command-messages (filter (comp :command :content) messages-to-add)]
(handlers-macro/merge-effects (message-model/receive-many plain-messages cofx)
cofx
add-message
command-messages)))
(handlers/register-handler-fx
:chat-received-message/add
message-model/receive-interceptors
(fn [cofx messages]
(fn [cofx [messages]]
(add-messages messages cofx)))
;; TODO(alwx): refactor this when status-im.commands.handlers.jail is refactored

View File

@ -3,6 +3,7 @@
[status-im.constants :as constants]
[status-im.utils.core :as utils]
[status-im.utils.ethereum.core :as ethereum]
[status-im.utils.datetime :as time]
[status-im.chat.events.console :as console-events]
[status-im.chat.events.requests :as requests-events]
[status-im.chat.models :as chat-model]
@ -33,20 +34,77 @@
(defn- prepare-message
[{:keys [content] :as message} chat-id current-chat?]
;; TODO janherich: enable the animations again once we can do them more efficiently
(cond-> (assoc message :appearing? true)
(not current-chat?) (assoc :appearing? false)
(emoji-only-content? content) (assoc :content-type constants/content-type-emoji)))
(defn- re-index-message-groups
"Relative datemarks of message groups can get obsolete with passing time,
this function re-indexes them for given chat"
[chat-id {:keys [db]}]
(let [chat-messages (get-in db [:chats chat-id :messages])]
{:db (update-in db
[:chats chat-id :message-groups]
(partial reduce-kv (fn [groups datemark message-refs]
(let [new-datemark (->> message-refs
first
:message-id
(get chat-messages)
:timestamp
time/day-relative)]
(if (= datemark new-datemark)
;; nothing to re-index
(assoc groups datemark message-refs)
;; relative datemark shifted, reindex
(assoc groups new-datemark message-refs))))
{}))}))
(defn- sort-references
"Sorts message-references sequence primary by clock value,
breaking ties by `:message-id`"
[messages message-references]
(sort-by (juxt (comp :clock-value (partial get messages) :message-id)
:message-id)
message-references))
(defn- group-messages
"Takes chat-id, new messages + cofx and properly groups them
into the `:message-groups`index in db"
[chat-id messages {:keys [db]}]
{:db (reduce (fn [db [datemark grouped-messages]]
(update-in db [:chats chat-id :message-groups datemark]
(fn [message-references]
(->> grouped-messages
(map (fn [{:keys [message-id timestamp]}]
{:message-id message-id
:timestamp-str (time/timestamp->time timestamp)}))
(into (or message-references '()))
(sort-references (get-in db [:chats chat-id :messages]))))))
db
(group-by (comp time/day-relative :timestamp)
(filter :show? messages)))})
(defn- add-message
[chat-id {:keys [message-id clock-value content] :as message} current-chat? {:keys [db]}]
[batch? {:keys [chat-id message-id clock-value content] :as message} current-chat? {:keys [db] :as cofx}]
(let [prepared-message (prepare-message message chat-id current-chat?)]
{:db (cond->
(let [fx {:db (cond->
(-> db
(update-in [:chats chat-id :messages] assoc message-id prepared-message)
(update-in [:chats chat-id :last-clock-value] (partial utils.clocks/receive clock-value))) ; this will increase last-clock-value twice when sending our own messages
;; this will increase last-clock-value twice when sending our own messages
(update-in [:chats chat-id :last-clock-value] (partial utils.clocks/receive clock-value)))
(not current-chat?)
(update-in [:chats chat-id :unviewed-messages] (fnil conj #{}) message-id))
:data-store/tx [(messages-store/save-message-tx prepared-message)]}))
:data-store/tx [(messages-store/save-message-tx prepared-message)]}]
(if batch?
fx
(handlers-macro/merge-fx cofx
fx
(re-index-message-groups chat-id)
(group-messages chat-id [message]))))))
(def ^:private- add-single-message (partial add-message false))
(def ^:private- add-batch-message (partial add-message true))
(defn- prepare-chat [chat-id {:keys [db now] :as cofx}]
(chat-model/upsert-chat {:chat-id chat-id
@ -57,9 +115,11 @@
(transport/send (protocol/map->MessagesSeen {:message-ids #{message-id}}) chat-id cofx)))
(defn- add-received-message
[{:keys [from message-id chat-id content content-type timestamp clock-value to-clock-value] :as message}
[batch?
{:keys [from message-id chat-id content content-type timestamp clock-value to-clock-value js-obj] :as message}
{:keys [db now] :as cofx}]
(let [{:keys [current-chat-id
(let [{:keys [web3
current-chat-id
view-id
access-scope->commands-responses]
:contacts/keys [contacts]} db
@ -70,15 +130,14 @@
request-command (:request-command content)
command-request? (and (= content-type constants/content-type-command-request)
request-command)
new-timestamp (or timestamp now)]
new-timestamp (or timestamp now)
add-message-fn (if batch? add-batch-message add-single-message)]
(handlers-macro/merge-fx cofx
(add-message chat-id
(cond-> (assoc message
:timestamp new-timestamp
:show? true)
{:confirm-message-processed [{:web3 web3
:js-obj js-obj}]}
(add-message-fn (cond-> (assoc message :timestamp new-timestamp)
public-key
(assoc :user-statuses {public-key (if current-chat? :seen :received)})
(not clock-value)
(assoc :clock-value (utils.clocks/send last-clock-value)) ; TODO (cammeelos): for backward compatibility, we use received time to be removed when not an issue anymore
command-request?
@ -86,26 +145,43 @@
(lookup-response-ref access-scope->commands-responses
current-account chat contacts request-command)))
current-chat?)
(requests-events/add-request chat-id message-id)
(send-message-seen chat-id message-id (and public-key
(not public?)
current-chat?
(not (chat-model/bot-only-chat? db chat-id))
(not (= constants/system from)))))))
(defn confirm-messages-processed [js-obj {{:keys [web3]} :db}]
{:confirm-message-processed [{:web3 web3
:js-obj js-obj}]})
(def ^:private add-single-received-message (partial add-received-message false))
(def ^:private add-batch-received-message (partial add-received-message true))
(defn receive
[{:keys [chat-id message-id js-obj] :as message} {:keys [now] :as cofx}]
[{:keys [chat-id message-id] :as message} {:keys [now] :as cofx}]
(handlers-macro/merge-fx cofx
(chat-model/upsert-chat {:chat-id chat-id
;; We activate a chat again on new messages
:is-active true
:timestamp now})
(add-received-message message)
(requests-events/add-request chat-id message-id)
(confirm-messages-processed js-obj)))
(add-single-received-message message)))
(defn receive-many
[messages {:keys [now] :as cofx}]
(let [chat-ids (into #{} (map :chat-id) messages)
chat-effects (handlers-macro/merge-effects cofx
(fn [chat-id cofx]
(chat-model/upsert-chat {:chat-id chat-id
:is-active true
:timestamp now}
cofx))
chat-ids)
message-effects (handlers-macro/merge-effects chat-effects cofx add-batch-received-message messages)]
(handlers-macro/merge-effects message-effects
cofx
(fn [chat-id cofx]
(handlers-macro/merge-fx cofx
(re-index-message-groups chat-id)
(group-messages chat-id messages)))
chat-ids)))
(defn system-message [chat-id message-id timestamp content]
{:message-id message-id
@ -214,7 +290,7 @@
(handlers-macro/merge-fx cofx
(chat-model/upsert-chat {:chat-id chat-id
:timestamp now})
(add-message chat-id message-with-id true)
(add-single-message message-with-id true)
(send chat-id message-id send-record))))
(defn send-push-notification [fcm-token status cofx]
@ -238,9 +314,27 @@
(send chat-id message-id send-record)
(update-message-status message :sending))))
(defn delete-message [chat-id message-id {:keys [db]}]
(defn- remove-message-from-group [chat-id {:keys [timestamp message-id]} {:keys [db]}]
(let [datemark (time/day-relative timestamp)]
{:db (update-in db [:chats chat-id :message-groups]
(fn [groups]
(let [message-references (get groups datemark)]
(if (= 1 (count message-references))
;; message removed is the only one in group, remove whole group
(dissoc groups datemark)
;; remove message from `message-references` list
(assoc groups datemark
(remove (comp (partial = message-id) :message-id)
message-references))))))}))
(defn delete-message
"Deletes chat message, along its occurence in all references, like `:message-groups`"
[chat-id message-id {:keys [db] :as cofx}]
(handlers-macro/merge-fx
cofx
{:db (update-in db [:chats chat-id :messages] dissoc message-id)
:data-store/tx [(messages-store/delete-message-tx message-id)]})
:data-store/tx [(messages-store/delete-message-tx message-id)]}
(remove-message-from-group chat-id (get-in db [:chats chat-id :messages message-id]))))
(defn send-message [{:keys [db now random-id] :as cofx} {:keys [chat-id] :as params}]
(upsert-and-send (prepare-plain-message chat-id params (get-in db [:chats chat-id]) now) cofx))

View File

@ -100,7 +100,7 @@
message-view]]]))
(defview messages-view [group-chat]
(letsubs [messages [:get-current-chat-messages]
(letsubs [messages [:get-current-chat-messages-stream]
chat-id [:get-current-chat-id]
current-public-key [:get-current-public-key]]
{:component-did-mount #(re-frame/dispatch [:set-chat-ui-props {:messages-focused? true

View File

@ -15,6 +15,7 @@
(s/def :chat/public-group-topic (s/nilable string?))
(s/def :chat/public-group-topic-error (s/nilable string?))
(s/def :chat/messages (s/nilable map?)) ; messages indexed by message-id
(s/def :chat/message-groups (s/nilable map?)) ; grouped/sorted messages
(s/def :chat/not-loaded-message-ids (s/nilable set?)) ; set of message-ids not yet fully loaded from persisted state
(s/def :chat/last-clock-value (s/nilable number?)) ; last logical clock value of messages in chat
(s/def :chat/loaded-chats (s/nilable seq?))

View File

@ -84,67 +84,37 @@
(fn [{:keys [messages]} [_ message-id]]
(get messages message-id)))
(defn- intersperse-datemark
"Reduce step which expects the input list of messages to be sorted by clock value.
It makes best effort to group them by day.
We cannot sort them by :timestamp, as that represents the clock of the sender
and we have no guarantees on the order.
(reg-sub
:get-current-chat-messages
:<- [:get-current-chat]
(fn [{:keys [messages]}]
(or messages {})))
We naively and arbitrarly group them assuming that out-of-order timestamps
fall in the previous bucket.
(reg-sub
:get-current-chat-message-groups
:<- [:get-current-chat]
(fn [{:keys [message-groups]}]
(or message-groups {})))
A sends M1 to B with timestamp 2000-01-01T00:00:00
B replies M2 with timestamp 1999-12-31-23:59:59
(defn sort-message-groups
"Sorts message groups according to timestamp of first message in group "
[message-groups messages]
(sort-by
(comp unchecked-negate :timestamp (partial get messages) :message-id first second)
message-groups))
M1 needs to be displayed before M2
so we bucket both in 1999-12-31"
[{:keys [acc last-timestamp last-datemark]} {:keys [timestamp datemark] :as msg}]
(cond (empty? acc) ; initial element
{:last-timestamp timestamp
:last-datemark datemark
:acc (conj acc msg)}
(and (not= last-datemark datemark) ; not the same day
(< timestamp last-timestamp)) ; not out-of-order
{:last-timestamp timestamp
:last-datemark datemark
:acc (conj acc {:value last-datemark ; intersperse datemark message
:type :datemark}
msg)}
:else
{:last-timestamp (max timestamp last-timestamp) ; use last datemark
:last-datemark last-datemark
:acc (conj acc (assoc msg :datemark last-datemark))}))
(defn sort-messages
"Remove hidden messages and sort by clock-value desc, breaking ties by message id"
[id->messages]
(->> id->messages
vals
(filter :show?)
(sort-by (juxt (comp unchecked-negate :clock-value) :message-id))))
(defn- add-datemark [{:keys [timestamp] :as msg}]
(assoc msg :datemark (time/day-relative timestamp)))
(defn- add-timestamp [{:keys [timestamp] :as msg}]
(assoc msg :timestamp-str (time/timestamp->time timestamp)))
(defn intersperse-datemarks
"Add a datemark in between an ordered seq of messages when two datemarks are not
the same. Ignore messages with out-of-order timestamps"
[messages]
(when (seq messages)
(let [messages-with-datemarks (transduce (comp
(map add-datemark)
(map add-timestamp))
(completing intersperse-datemark :acc)
{:acc []}
messages)]
; Append last datemark
(conj messages-with-datemarks {:value (:datemark (peek messages-with-datemarks))
:type :datemark}))))
(defn messages-with-datemarks
"Converts message groups into sequence of messages interspersed with datemarks"
[message-groups messages]
(mapcat (fn [[datemark message-references]]
(into (list {:value datemark
:type :datemark})
(map (fn [{:keys [message-id timestamp-str]}]
(assoc (get messages message-id)
:datemark datemark
:timestamp-str timestamp-str)))
message-references))
message-groups))
(defn- set-previous-message-info [stream]
(let [{:keys [display-photo?] :as previous-message} (peek stream)]
@ -212,17 +182,13 @@
:stream))))
(reg-sub
:get-ordered-chat-messages
(fn [[_ chat-id]]
(subscribe [:get-chat chat-id]))
(fn [{:keys [messages]}]
(sort-messages messages)))
(reg-sub
:get-current-chat-messages
:<- [:get-current-chat]
(fn [{:keys [messages]}]
(-> messages sort-messages intersperse-datemarks messages-stream)))
:get-current-chat-messages-stream
:<- [:get-current-chat-messages]
:<- [:get-current-chat-message-groups]
(fn [[messages message-groups]]
(-> (sort-message-groups message-groups messages)
(messages-with-datemarks messages)
messages-stream)))
(reg-sub
:get-commands-for-chat
@ -384,8 +350,14 @@
(reg-sub
:get-last-message
(fn [[_ chat-id]]
(subscribe [:get-ordered-chat-messages chat-id]))
first)
(subscribe [:get-chat chat-id]))
(fn [{:keys [messages message-groups]}]
(->> (sort-message-groups message-groups messages)
first
second
last
:message-id
(get messages))))
(reg-sub
:chat-animations

View File

@ -99,6 +99,7 @@
{:chat-received-message/add-fx
[(assoc (into {} this)
:message-id (transport.utils/message-id this)
:show? true
:chat-id chat-id
:from signature)]}))

View File

@ -232,6 +232,7 @@
:chat/public-group-topic
:chat/public-group-topic-error
:chat/messages
:chat/message-groups
:chat/not-loaded-message-ids
:chat/last-clock-value
:chat/loaded-chats

View File

@ -111,7 +111,7 @@
(let [_ (when (or (not @chat-id*) (not= @chat-id* chat-id))
(reset! chat-id* chat-id)
(js/setTimeout #(when scroll-ref (.scrollToEnd @scroll-ref)) 400))
messages (re-frame/subscribe [:get-current-chat-messages])
messages (re-frame/subscribe [:get-current-chat-messages-stream])
current-public-key (re-frame/subscribe [:get-current-public-key])]
[react/view {:style {:flex 1 :background-color :white :margin-horizontal 16}}
[react/scroll-view {:scrollEventThrottle 16

View File

@ -61,7 +61,9 @@
(handlers/register-handler-fx
:clear-history
(fn [{{:keys [current-chat-id] :as db} :db} _]
{:db (assoc-in db [:chats current-chat-id :messages] {})
{:db (-> db
(assoc-in [:chats current-chat-id :messages] {})
(assoc-in [:chats current-chat-id :message-groups] {}))
:data-store/tx [(messages-store/hide-messages-tx current-chat-id)]}))
(handlers/register-handler-fx

View File

@ -25,11 +25,15 @@
(select-keys new-fx mergable-keys)))
{:merging-fx-with-common-keys common-keys}))))
(defn merge-effects [{:keys [db] :as initial-cofx} handler args]
(defn merge-effects
([{:keys [db] :as cofx} handler args]
(merge-effects {:db db} cofx handler args))
([initial-fx {:keys [db] :as cofx} handler args]
(reduce (fn [fx arg]
(let [temp-cofx (update-db initial-cofx fx)]
(let [temp-cofx (update-db cofx fx)]
(safe-merge
fx
(handler arg temp-cofx))))
{:db db}
args))
(or initial-fx
{:db db})
args)))

View File

@ -1,7 +1,8 @@
(ns status-im.test.chat.models.message
(:require [cljs.test :refer-macros [deftest is testing]]
[status-im.transport.message.v1.protocol :as protocol]
[status-im.chat.models.message :as message]))
[status-im.chat.models.message :as message]
[status-im.utils.datetime :as time]))
(deftest add-to-chat?
(testing "it returns true when it's not in loaded message"
@ -33,7 +34,9 @@
:view-id :chat}}
message {:chat-id "chat-id"
:from "a"
:message-id "1"}
:message-id "1"
:clock-value 0
:timestamp 0}
extract-seen (comp :payload :message first :shh/post)]
(testing "it send a seen message when the chat is 1-to-1 and is open"
(is (instance? protocol/MessagesSeen
@ -59,3 +62,72 @@
(message/receive
message
(assoc-in db [:db :account/account :public-key] nil))))))))
(deftest group-messages
(let [cofx {:db {:chats {"chat-id" {:messages {0 {:message-id 0
:content "a"
:clock-value 0
:timestamp 0}
1 {:message-id 1
:content "b"
:clock-value 1
:timestamp 1}
2 {:message-id 2
:content "c"
:clock-value 2
:timestamp 2}
3 {:message-id 3
:content "d"
:clock-value 3
:timestamp 3}}}}}}
new-messages '({:message-id 1
:content "b"
:clock-value 1
:timestamp 1
:show? false}
{:message-id 2
:content "c"
:clock-value 2
:timestamp 2
:show? true}
{:message-id 3
:content "d"
:clock-value 3
:timestamp 3
:show? true})]
(testing "New messages are grouped/sorted correctly, hidden messages are not grouped"
(is (= '(2 3)
(map :message-id
(-> (get-in (message/group-messages "chat-id" new-messages cofx)
[:db :chats "chat-id" :message-groups])
first
second)))))))
(deftest delete-message
(let [timestamp (time/now)
cofx1 {:db {:chats {"chat-id" {:messages {0 {:message-id 0
:content "a"
:clock-value 0
:timestamp (- timestamp 1)}
1 {:message-id 1
:content "b"
:clock-value 1
:timestamp timestamp}}
:message-groups {"datetime-today" '({:message-id 1}
{:message-id 0})}}}}}
cofx2 {:db {:chats {"chat-id" {:messages {0 {:message-id 0
:content "a"
:clock-value 0
:timestamp timestamp}}
:message-groups {"datetime-today" '({:message-id 0})}}}}}
fx1 (message/delete-message "chat-id" 1 cofx1)
fx2 (message/delete-message "chat-id" 0 cofx2)]
(testing "Deleting message deletes it along with all references"
(is (= '(0)
(keys (get-in fx1 [:db :chats "chat-id" :messages]))))
(is (= {"datetime-today" '({:message-id 0})}
(get-in fx1 [:db :chats "chat-id" :message-groups])))
(is (= {}
(get-in fx2 [:db :chats "chat-id" :messages])))
(is (= {}
(get-in fx2 [:db :chats "chat-id" :message-groups]))))))

View File

@ -3,57 +3,6 @@
[status-im.constants :as const]
[status-im.chat.subs :as s]))
(deftest test-message-datemark-groups
(testing "it orders a map of messages by clock-values desc, breaking ties by message-id asc and removing hidden messages"
(let [message-1 {:show? true
:message-id "doesn't matter 1"
:clock-value 1}
message-2 {:show? true
:message-id "doesn't matter 2"
:clock-value 2}
message-3 {:show? true
:message-id "does matter 2"
:clock-value 3}
message-4 {:show? true
:message-id "does matter 1"
:clock-value 3}
hidden-message {:show? false
:clock-value 1}
unordered-messages (->> [message-1
message-2
message-3
message-4
hidden-message]
(map (juxt :message-id identity))
shuffle ; clojure maps are sorted for n <= 32
(into {}))]
(is (= [message-4
message-3
message-2
message-1] (s/sort-messages unordered-messages))))))
(deftest intersperse-datemarks
(testing "it mantains the order even when timestamps are across days"
(let [message-1 {:timestamp 946641600000} ; 1999}
message-2 {:timestamp 946728000000} ; 2000 this will displayed in 1999
message-3 {:timestamp 946641600000} ; 1999
message-4 {:timestamp 946728000000} ; 2000
ordered-messages [message-4
message-3
message-2
message-1]
[m1 d1 m2 m3 m4 d2] (s/intersperse-datemarks ordered-messages)]
(is (= "Jan 1, 2000"
(:datemark m1)))
(is (= {:type :datemark
:value "Jan 1, 2000"} d1))
(is (= "Dec 31, 1999"
(:datemark m2)
(:datemark m3)
(:datemark m4)))
(is (= {:type :datemark
:value "Dec 31, 1999"} d2)))))
(deftest message-stream-tests
(testing "messages with no interspersed datemarks"
(let [m1 {:from "1"