Fix message ordering for 1-1 chats

This commit is contained in:
janherich 2018-03-02 00:35:59 +01:00
parent 54546204ae
commit 7efcdcb150
No known key found for this signature in database
GPG Key ID: C23B473AFBE94D13
11 changed files with 254 additions and 122 deletions

View File

@ -1,7 +1,7 @@
(ns status-im.chat.console (ns status-im.chat.console
(:require [status-im.ui.components.styles :refer [default-chat-color]] (:require [status-im.ui.components.styles :refer [default-chat-color]]
[status-im.utils.random :as random] [status-im.utils.random :as random]
[status-im.constants :as const] [status-im.constants :as constants]
[status-im.i18n :as i18n] [status-im.i18n :as i18n]
[clojure.string :as string])) [clojure.string :as string]))
@ -9,29 +9,31 @@
:or {message-id (random/id)}}] :or {message-id (random/id)}}]
{:message-id message-id {:message-id message-id
:outgoing false :outgoing false
:chat-id const/console-chat-id :chat-id constants/console-chat-id
:from const/console-chat-id :from constants/console-chat-id
:to "me" :to "me"
:content content :content content
:content-type content-type}) :content-type content-type})
(def chat (def chat
{:chat-id const/console-chat-id {:chat-id constants/console-chat-id
:name (string/capitalize const/console-chat-id) :name (string/capitalize constants/console-chat-id)
:color default-chat-color :color default-chat-color
:group-chat false :group-chat false
:is-active true :is-active true
:unremovable? true :unremovable? true
:timestamp (.getTime (js/Date.)) :timestamp (.getTime (js/Date.))
:photo-path const/console-chat-id :photo-path constants/console-chat-id
:contacts [{:identity const/console-chat-id :contacts [{:identity constants/console-chat-id
:text-color "#FFFFFF" :text-color "#FFFFFF"
:background-color "#AB7967"}]}) :background-color "#AB7967"}]
:last-to-clock-value 0
:last-from-clock-value 0})
(def contact (def contact
{:whisper-identity const/console-chat-id {:whisper-identity constants/console-chat-id
:name (string/capitalize const/console-chat-id) :name (string/capitalize constants/console-chat-id)
:photo-path const/console-chat-id :photo-path constants/console-chat-id
:dapp? true :dapp? true
:unremovable? true :unremovable? true
:bot-url "local://console-bot" :bot-url "local://console-bot"

View File

@ -73,13 +73,15 @@
:create-new-public-chat :create-new-public-chat
(fn [{:keys [db]} [_ topic]] (fn [{:keys [db]} [_ topic]]
(let [exists? (boolean (get-in db [:chats topic])) (let [exists? (boolean (get-in db [:chats topic]))
chat {:chat-id topic chat {:chat-id topic
:name topic :name topic
:color components.styles/default-chat-color :color components.styles/default-chat-color
:group-chat true :group-chat true
:public? true :public? true
:is-active true :is-active true
:timestamp (random/timestamp)}] :timestamp (random/timestamp)
:last-to-clock-value 0
:last-from-clock-value 0}]
(merge (merge
(when-not exists? (when-not exists?
{:db (assoc-in db [:chats (:chat-id chat)] chat) {:db (assoc-in db [:chats (:chat-id chat)] chat)
@ -129,16 +131,18 @@
group-name group-name
(group-name-from-contacts contacts selected-contacts username)) (group-name-from-contacts contacts selected-contacts username))
{:keys [public private]} (protocol/new-keypair!)] {:keys [public private]} (protocol/new-keypair!)]
{:chat-id (random/id) {:chat-id (random/id)
:public-key public :public-key public
:private-key private :private-key private
:name chat-name :name chat-name
:color components.styles/default-chat-color :color components.styles/default-chat-color
:group-chat true :group-chat true
:group-admin current-public-key :group-admin current-public-key
:is-active true :is-active true
:timestamp (random/timestamp) :timestamp (random/timestamp)
:contacts selected-contacts'})) :contacts selected-contacts'
:last-to-clock-value 0
:last-from-clock-value 0}))
(handlers/register-handler-fx (handlers/register-handler-fx
:create-new-group-chat-and-open :create-new-group-chat-and-open

View File

@ -15,13 +15,15 @@
(defn- create-new-chat (defn- create-new-chat
[{:keys [db now]} chat-id] [{:keys [db now]} chat-id]
(let [name (get-in db [:contacts/contacts chat-id :name])] (let [name (get-in db [:contacts/contacts chat-id :name])]
{:chat-id chat-id {:chat-id chat-id
:name (or name (gfycat/generate-gfy chat-id)) :name (or name (gfycat/generate-gfy chat-id))
:color styles/default-chat-color :color styles/default-chat-color
:group-chat false :group-chat false
:is-active true :is-active true
:timestamp now :timestamp now
:contacts [{:identity chat-id}]})) :contacts [{:identity chat-id}]
:last-to-clock-value 0
:last-from-clock-value 0}))
(defn add-chat (defn add-chat
"Adds new chat to db & realm, if the chat with same id already exists, justs restores it" "Adds new chat to db & realm, if the chat with same id already exists, justs restores it"

View File

@ -25,7 +25,7 @@
(:ref (get available-commands-responses response-name)))) (:ref (get available-commands-responses response-name))))
(defn add-message-to-db (defn add-message-to-db
[db chat-id {:keys [message-id clock-value] :as message} current-chat?] [db chat-id {:keys [message-id from-clock-value to-clock-value] :as message} current-chat?]
(let [prepared-message (cond-> (assoc message (let [prepared-message (cond-> (assoc message
:chat-id chat-id :chat-id chat-id
:appearing? true) :appearing? true)
@ -33,43 +33,57 @@
(assoc :appearing? false))] (assoc :appearing? false))]
(cond-> (-> db (cond-> (-> db
(update-in [:chats chat-id :messages] assoc message-id prepared-message) (update-in [:chats chat-id :messages] assoc message-id prepared-message)
(update-in [:chats chat-id :last-clock-value] (fnil max 0) clock-value)) (update-in [:chats chat-id :last-from-clock-value] max from-clock-value)
(update-in [:chats chat-id :last-to-clock-value] max to-clock-value))
(not current-chat?) (not current-chat?)
(update-in [:chats chat-id :unviewed-messages] (fnil conj #{}) message-id)))) (update-in [:chats chat-id :unviewed-messages] (fnil conj #{}) message-id))))
;; We start with [0 0] ([from-clock-value to-clock-value]) for each participant of 1-1 chat (local perspective on each device).
;; Now for sending, we always increment the to-clock-value and include it in message payload being sent (so only to-clock-value is present in network message).
;; Locally, the sent message always replicates the latest-from-clock-value of the chat.
;; Upon receiving message, receiver reads the to-clock-value of the received message and sets that to be the from-clock-value locally
;; (this will be also the new latest-from-clock-value of the chat), to-clock-value for the message is the latest-to-clock-value of the 1-1 chat`.
;; All this ensures, that there will be no [from-clock-value to-clock-value] duplicate in chat message on each device + the local order will appear consistent,
;; even if its possible it wont be the same on both devices (for example user A sends 5 messages, during the sending,
;; he receives the message from user B, so his local order will be A1, A2, B, A3, A4, A5, but his messages will take a long time to reach user B,
;; for some reason, so user B will see it as B, A1, A2, A3, A4, A5).
;; I dont think thats very problematic and I dont think we can do much about it without single source of truth where order received messages are serialised
;; and definite order is established (server), it is the case even in the current implementation.
(defn receive (defn receive
[{:keys [db now] :as cofx} [{:keys [db now] :as cofx}
{:keys [from group-id chat-id content-type content message-id timestamp clock-value] {:keys [from group-id chat-id content-type content message-id timestamp from-clock-value to-clock-value]
:as message}] :as message}]
(let [{:keys [current-chat-id view-id (let [{:keys [current-chat-id view-id
access-scope->commands-responses] :contacts/keys [contacts]} db access-scope->commands-responses] :contacts/keys [contacts]} db
{:keys [public-key] :as current-account} (get-current-account db) {:keys [public-key] :as current-account} (get-current-account db)
chat-identifier (or group-id chat-id from) chat-identifier (or group-id chat-id from)
current-chat? (and (= :chat view-id) current-chat? (and (= :chat view-id)
(= current-chat-id chat-identifier)) (= current-chat-id chat-identifier))
fx (if (get-in db [:chats chat-identifier]) fx (if (get-in db [:chats chat-identifier])
(chat-model/upsert-chat cofx {:chat-id chat-identifier (chat-model/upsert-chat cofx {:chat-id chat-identifier
:group-chat (boolean group-id)}) :group-chat (boolean group-id)})
(chat-model/add-chat cofx chat-identifier)) (chat-model/add-chat cofx chat-identifier))
chat (get-in fx [:db :chats chat-identifier]) {:keys [last-from-clock-value
command-request? (= content-type constants/content-type-command-request) last-to-clock-value]} (get-in fx [:db :chats chat-identifier])
command (:command content) command-request? (= content-type constants/content-type-command-request)
enriched-message (cond-> (assoc message command (:command content)
:chat-id chat-identifier enriched-message (cond-> (assoc message
:timestamp (or timestamp now) :chat-id chat-identifier
:show? true :timestamp (or timestamp now)
:clock-value (clocks-utils/receive :show? true
clock-value :to-clock-value last-to-clock-value
(:last-clock-value chat))) :from-clock-value (or to-clock-value (inc last-from-clock-value)))
public-key public-key
(assoc :user-statuses {public-key (if current-chat? :seen :received)}) (assoc :user-statuses {public-key (if current-chat? :seen :received)})
(and command command-request?) (and command command-request?)
(assoc-in [:content :content-command-ref] (assoc-in [:content :content-command-ref]
(lookup-response-ref access-scope->commands-responses (lookup-response-ref access-scope->commands-responses
current-account current-account
(get-in fx [:db :chats chat-identifier]) (get-in fx [:db :chats chat-identifier])
contacts contacts
command)))] command)))]
(cond-> (-> fx (cond-> (-> fx
(update :db add-message-to-db chat-identifier enriched-message current-chat?) (update :db add-message-to-db chat-identifier enriched-message current-chat?)
(assoc :save-message (dissoc enriched-message :new?))) (assoc :save-message (dissoc enriched-message :new?)))
@ -140,7 +154,7 @@
(defn- generate-message (defn- generate-message
[{:keys [network-status]} chat-id message] [{:keys [network-status]} chat-id message]
(assoc (select-keys message [:from :message-id]) (assoc (select-keys message [:from :message-id])
:payload (cond-> (select-keys message [:content :content-type :clock-value :timestamp :show?]) :payload (cond-> (select-keys message [:content :content-type :to-clock-value :timestamp :show?])
(= :offline network-status) (= :offline network-status)
(assoc :show? false)))) (assoc :show? false))))
@ -158,14 +172,17 @@
(cond (cond
(and group-chat (not public?)) (and group-chat (not public?))
(let [{:keys [public-key private-key]} (get chats chat-id)] (let [{:keys [public-key private-key]} (get chats chat-id)]
{:send-group-message (assoc options {:send-group-message (-> options
:group-id chat-id (update-in [:message :payload] dissoc :to-clock-value)
:keypair {:public public-key (assoc :group-id chat-id
:private private-key})}) :keypair {:public public-key
:private private-key}))})
(and group-chat public?) (and group-chat public?)
{:send-public-group-message (assoc options :group-id chat-id {:send-public-group-message (-> options
:username (get-in accounts [current-account-id :name]))} (update-in [:message :payload] dissoc :to-clock-value)
(assoc :group-id chat-id
:username (get-in accounts [current-account-id :name])))}
:else :else
(merge {:send-message (assoc-in options [:message :to] chat-id)} (merge {:send-message (assoc-in options [:message :to] chat-id)}
@ -175,16 +192,17 @@
(defn- prepare-message [params chat now random-id] (defn- prepare-message [params chat now random-id]
(let [{:keys [chat-id identity message-text]} params (let [{:keys [chat-id identity message-text]} params
{:keys [group-chat public? last-clock-value]} chat {:keys [group-chat public? last-from-clock-value last-to-clock-value]} chat
message {:message-id random-id message {:message-id random-id
:chat-id chat-id :chat-id chat-id
:content message-text :content message-text
:from identity :from identity
:content-type constants/text-content-type :content-type constants/text-content-type
:outgoing true :outgoing true
:timestamp now :timestamp now
:clock-value (clocks-utils/send last-clock-value) :from-clock-value last-from-clock-value
:show? true}] :to-clock-value (inc last-to-clock-value)
:show? true}]
(cond-> message (cond-> message
(not group-chat) (not group-chat)
(assoc :message-type :user-message (assoc :message-type :user-message
@ -208,7 +226,7 @@
(merge fx (send cofx params')))) (merge fx (send cofx params'))))
(defn- prepare-command (defn- prepare-command
[identity chat-id now clock-value [identity chat-id now last-from-clock-value last-to-clock-value
{request-params :params {request-params :params
request-command :command request-command :command
:keys [prefill prefillBotDb] :keys [prefill prefillBotDb]
@ -231,21 +249,22 @@
:short-preview (:short-preview command) :short-preview (:short-preview command)
:bot (or (:bot command) :bot (or (:bot command)
(:owner-id command)))] (:owner-id command)))]
{:message-id id {:message-id id
:from identity :from identity
:to chat-id :to chat-id
:timestamp now :timestamp now
:content content' :content content'
:content-type (or content-type :content-type (or content-type
(if request (if request
constants/content-type-command-request constants/content-type-command-request
constants/content-type-command)) constants/content-type-command))
:outgoing true :outgoing true
:to-message to-message :to-message to-message
:type (:type command) :type (:type command)
:has-handler (:has-handler command) :has-handler (:has-handler command)
:clock-value (clocks-utils/send clock-value) :to-clock-value (inc last-to-clock-value)
:show? true})) :from-clock-value last-from-clock-value
:show? true}))
(defn send-command (defn send-command
[{{:keys [current-public-key chats] :as db} :db :keys [now random-id-seq] :as cofx} add-to-chat-id params] [{{:keys [current-public-key chats] :as db} :db :keys [now random-id-seq] :as cofx} add-to-chat-id params]
@ -254,11 +273,12 @@
:as content} :command :as content} :command
chat-id :chat-id} params chat-id :chat-id} params
request (:request handler-data) request (:request handler-data)
last-clock-value (get-in chats [chat-id :last-clock-value]) {:keys [last-to-clock-value
last-from-clock-value]} (get chats chat-id)
hidden-params (->> (:params command) hidden-params (->> (:params command)
(filter :hidden) (filter :hidden)
(map :name)) (map :name))
command' (prepare-command current-public-key chat-id now last-clock-value request content) command' (prepare-command current-public-key chat-id now last-to-clock-value last-from-clock-value request content)
params' (assoc params :command command') params' (assoc params :command command')
fx (-> (chat-model/upsert-chat cofx {:chat-id chat-id}) fx (-> (chat-model/upsert-chat cofx {:chat-id chat-id})
(update :db add-message-to-db chat-id command' true) (update :db add-message-to-db chat-id command' true)

View File

@ -90,10 +90,11 @@
(defn message-datemark-groups (defn message-datemark-groups
"Transforms map of messages into sequence of `[datemark messages]` tuples, where "Transforms map of messages into sequence of `[datemark messages]` tuples, where
messages with particular datemark are sorted according to their `:clock-value` and messages with particular datemark are sorted according to their clock-values and
tuples themeselves are sorted according to the highest `:clock-value` in the messages." tuples themeselves are sorted according to the highest clock-values in the messages."
[id->messages] [id->messages]
(let [datemark->messages (transduce (comp (map second) (let [clock-sorter (juxt :from-clock-value :to-clock-value)
datemark->messages (transduce (comp (map second)
(filter :show?) (filter :show?)
(map (fn [{:keys [timestamp] :as msg}] (map (fn [{:keys [timestamp] :as msg}]
(assoc msg :datemark (time/day-relative timestamp))))) (assoc msg :datemark (time/day-relative timestamp)))))
@ -103,8 +104,9 @@
id->messages)] id->messages)]
(->> datemark->messages (->> datemark->messages
(map (fn [[datemark messages]] (map (fn [[datemark messages]]
[datemark (sort-by :clock-value > messages)])) [datemark (->> messages (sort-by clock-sorter) reverse)]))
(sort-by (comp :clock-value first second) >)))) (sort-by (comp clock-sorter first second))
reverse)))
(reg-sub (reg-sub
:get-chat-message-datemark-groups :get-chat-message-datemark-groups

View File

@ -7,10 +7,12 @@
(:refer-clojure :exclude [exists?])) (:refer-clojure :exclude [exists?]))
(defn- normalize-chat [{:keys [chat-id] :as chat}] (defn- normalize-chat [{:keys [chat-id] :as chat}]
(let [last-message (messages/get-last-message chat-id)] (let [last-to-clock-value (messages/get-last-clock-value chat-id :to-clock-value)
last-from-clock-value (messages/get-last-clock-value chat-id :from-clock-value)]
(-> chat (-> chat
(realm/fix-map->vec :contacts) (realm/fix-map->vec :contacts)
(assoc :last-clock-value (or (:clock-value last-message) 0))))) (assoc :last-to-clock-value (or last-to-clock-value 0))
(assoc :last-from-clock-value (or last-from-clock-value 0)))))
(defn get-all-active (defn get-all-active
[] []

View File

@ -52,11 +52,12 @@
(realm/page from (+ from number-of-messages)) (realm/page from (+ from number-of-messages))
realm/js-object->clj)) realm/js-object->clj))
(defn get-last-message (defn get-last-clock-value
[chat-id] [chat-id clock-prop]
(-> (realm/get-by-field @realm/account-realm :message :chat-id chat-id) (-> (realm/get-by-field @realm/account-realm :message :chat-id chat-id)
(realm/sorted :clock-value :desc) (realm/sorted clock-prop :desc)
(realm/single-clj))) (realm/single-clj)
(get clock-prop)))
(defn get-unviewed (defn get-unviewed
[current-public-key] [current-public-key]

View File

@ -20,7 +20,8 @@
[status-im.data-store.realm.schemas.account.v19.core :as v19] [status-im.data-store.realm.schemas.account.v19.core :as v19]
[status-im.data-store.realm.schemas.account.v20.core :as v20] [status-im.data-store.realm.schemas.account.v20.core :as v20]
[status-im.data-store.realm.schemas.account.v21.core :as v21] [status-im.data-store.realm.schemas.account.v21.core :as v21]
[status-im.data-store.realm.schemas.account.v22.core :as v22])) [status-im.data-store.realm.schemas.account.v22.core :as v22]
[status-im.data-store.realm.schemas.account.v23.core :as v23]))
;; TODO(oskarth): Add failing test if directory vXX exists but isn't in schemas. ;; TODO(oskarth): Add failing test if directory vXX exists but isn't in schemas.
@ -91,5 +92,7 @@
:migration v21/migration} :migration v21/migration}
{:schema v22/schema {:schema v22/schema
:schemaVersion 22 :schemaVersion 22
:migration v22/migration}]) :migration v22/migration}
{:schema v23/schema
:schemaVersion 23
:migration v23/migration}])

View File

@ -0,0 +1,64 @@
(ns status-im.data-store.realm.schemas.account.v23.core
(:require [status-im.data-store.realm.schemas.account.v22.chat :as chat]
[status-im.data-store.realm.schemas.account.v1.chat-contact :as chat-contact]
[status-im.data-store.realm.schemas.account.v19.contact :as contact]
[status-im.data-store.realm.schemas.account.v20.discover :as discover]
[status-im.data-store.realm.schemas.account.v23.message :as message]
[status-im.data-store.realm.schemas.account.v12.pending-message :as pending-message]
[status-im.data-store.realm.schemas.account.v1.processed-message :as processed-message]
[status-im.data-store.realm.schemas.account.v19.request :as request]
[status-im.data-store.realm.schemas.account.v19.user-status :as user-status]
[status-im.data-store.realm.schemas.account.v5.contact-group :as contact-group]
[status-im.data-store.realm.schemas.account.v5.group-contact :as group-contact]
[status-im.data-store.realm.schemas.account.v8.local-storage :as local-storage]
[status-im.data-store.realm.schemas.account.v21.browser :as browser]
[goog.object :as object]
[taoensso.timbre :as log]
[cljs.reader :as reader]
[clojure.string :as string]))
(def schema [chat/schema
chat-contact/schema
contact/schema
discover/schema
message/schema
pending-message/schema
processed-message/schema
request/schema
user-status/schema
contact-group/schema
group-contact/schema
local-storage/schema
browser/schema])
(defn update-new-message [new-realm message-id to-clock-value from-clock-value]
(when-let [message (some-> new-realm
(.objects "message")
(.filtered (str "message-id = \"" message-id "\""))
(aget 0))]
(aset message "to-clock-value" to-clock-value)
(aset message "from-clock-value" from-clock-value)))
(defn update-chat-messages [old-realm new-realm chat-id]
(let [from-clock-value (atom 0)
to-clock-value (atom 0)]
(some-> old-realm
(.objects "message")
(.filtered (str "chat-id = \"" chat-id "\""))
(.sorted "clock-value" false)
(.map (fn [message _ _]
(let [message-id (object/get message "message-id")
outgoing? (boolean (object/get message "outgoing"))]
(if outgoing?
(update-new-message new-realm message-id (swap! to-clock-value inc) @from-clock-value)
(update-new-message new-realm message-id @to-clock-value (swap! from-clock-value inc)))))))))
(defn update-chats [old-realm new-realm]
(some-> new-realm
(.objects "chat")
(.map (fn [chat _ _]
(update-chat-messages old-realm new-realm (object/get chat "chat-id"))))))
(defn migration [old-realm new-realm]
(log/debug "migrating v23 account database: " old-realm new-realm)
(update-chats old-realm new-realm))

View File

@ -0,0 +1,32 @@
(ns status-im.data-store.realm.schemas.account.v23.message)
(def schema {:name :message
:primaryKey :message-id
:properties {:message-id :string
:from :string
:to {:type :string
:optional true}
:group-id {:type :string
:optional true}
:content :string ; TODO make it ArrayBuffer
:content-type :string
:username {:type :string
:optional true}
:timestamp :int
:chat-id {:type :string
:indexed true}
:outgoing :bool
:retry-count {:type :int
:default 0}
:message-type {:type :string
:optional true}
:message-status {:type :string
:optional true}
:user-statuses {:type :list
:objectType :user-status}
:from-clock-value {:type :int
:default 0}
:to-clock-value {:type :int
:default 0}
:show? {:type :bool
:default true}}})

View File

@ -470,7 +470,7 @@
route-fx (case type route-fx (case type
(:message (:message
:group-message :group-message
:public-group-message) {:dispatch [:pre-received-message (transform-protocol-message message)]} :public-group-message) {:dispatch [:chat-received-message/add (transform-protocol-message message)]}
:pending (cond-> {::pending-messages-save message} :pending (cond-> {::pending-messages-save message}
chat-message chat-message
(assoc :dispatch (assoc :dispatch