Order of messages fix (#550)

This commit is contained in:
alwx 2016-12-13 13:56:00 +03:00
parent 080f2f508a
commit 158d0ba033
13 changed files with 214 additions and 71 deletions

View File

@ -469,11 +469,10 @@
((after update-chat!)))) ((after update-chat!))))
(register-handler :upsert-chat! (register-handler :upsert-chat!
(fn [db [_ {:keys [chat-id clock-value] :as opts}]] (fn [db [_ {:keys [chat-id] :as opts}]]
(let [chat (if (chats/exists? chat-id) (let [chat (if (chats/exists? chat-id)
(let [{old-clock-value :clock-value :as chat} (chats/get-by-id chat-id)] (let [chat (chats/get-by-id chat-id)]
(assoc chat :clock-value (max old-clock-value clock-value) (assoc chat :timestamp (random/timestamp)))
:timestamp (random/timestamp)))
(prepare-chat db chat-id opts))] (prepare-chat db chat-id opts))]
(chats/save chat) (chats/save chat)
(update-in db [:chats chat-id] merge chat)))) (update-in db [:chats chat-id] merge chat))))
@ -582,6 +581,38 @@
(dispatch [:remove-unviewed-messages chat-id])))] (dispatch [:remove-unviewed-messages chat-id])))]
(u/side-effect! send-seen!)) (u/side-effect! send-seen!))
(defn send-clock-value-request!
[{:keys [web3 current-public-key]} [_ {:keys [message-id from] :as message}]]
(protocol/send-clock-value-request! {:web3 web3
:message {:from current-public-key
:to from
:message-id message-id}}))
(register-handler :send-clock-value-request! (u/side-effect! send-clock-value-request!))
(defn send-clock-value!
[{:keys [web3 current-public-key]} to message-id clock-value]
(when current-public-key
(protocol/send-clock-value! {:web3 web3
:message {:from current-public-key
:to to
:message-id message-id
:clock-value clock-value}})))
(register-handler :update-clock-value!
(after (fn [db [_ to i {:keys [message-id] :as message} last-clock-value]]
(let [clock-value (+ last-clock-value i 1)]
(messages/update (assoc message :clock-value clock-value))
(send-clock-value! db to message-id clock-value))))
(fn [db [_ _ i {:keys [message-id] :as message} last-clock-value]]
(assoc-in db [:message-extras message-id :clock-value] (+ last-clock-value i 1))))
(register-handler :send-clock-value!
(u/side-effect!
(fn [db [_ to message-id]]
(let [{:keys [clock-value]} (messages/get-by-id message-id)]
(send-clock-value! db to message-id clock-value)))))
(register-handler :set-web-view-url (register-handler :set-web-view-url
(fn [{:keys [current-chat-id] :as db} [_ url]] (fn [{:keys [current-chat-id] :as db} [_ url]]
(assoc-in db [:web-view-url current-chat-id] url))) (assoc-in db [:web-view-url current-chat-id] url)))
@ -611,14 +642,6 @@
(dispatch [:set-chat-command (keyword autorun)]) (dispatch [:set-chat-command (keyword autorun)])
(dispatch [:animate-command-suggestions]))))))) (dispatch [:animate-command-suggestions])))))))
(register-handler :inc-clock
(u/side-effect!
(fn [_ [_ chat-id]]
(let [chat (-> (chats/get-by-id chat-id)
(update :clock-value inc)
(assoc :timestamp (random/timestamp)))]
(dispatch [:update-chat! chat])))))
(register-handler :update-group-message (register-handler :update-group-message
(u/side-effect! (u/side-effect!
(fn [{:keys [current-public-key web3 chats]} (fn [{:keys [current-public-key web3 chats]}
@ -641,3 +664,10 @@
:identity current-public-key :identity current-public-key
:keypair keypair :keypair keypair
:callback #(dispatch [:incoming-message %1 %2])})))))))) :callback #(dispatch [:incoming-message %1 %2])}))))))))
(register-handler :update-message-overhead!
(u/side-effect!
(fn [_ [_ chat-id network-status]]
(if (= network-status :offline)
(chats/inc-message-overhead chat-id)
(chats/reset-message-overhead chat-id)))))

View File

@ -32,16 +32,18 @@
(declare add-message-to-wallet) (declare add-message-to-wallet)
(defn add-message (defn add-message
[db {:keys [from group-id chat-id message-id timestamp clock-value] :as message :or {clock-value 0}}] [db {:keys [from group-id chat-id
message-id timestamp clock-value show?]
:as message
:or {clock-value 0}}]
(let [same-message (messages/get-by-id message-id) (let [same-message (messages/get-by-id message-id)
current-identity (get-current-identity db) current-identity (get-current-identity db)
chat-id' (or group-id chat-id from) chat-id' (or group-id chat-id from)
exists? (chats/exists? chat-id') exists? (chats/exists? chat-id')
active? (chats/is-active? chat-id') active? (chats/is-active? chat-id')
chat-clock-value (messages/get-last-clock-value chat-id')
clock-value (if (= clock-value 0) clock-value (if (= clock-value 0)
(-> (chats/get-by-id chat-id') (inc chat-clock-value)
(get :clock-value)
(inc))
clock-value)] clock-value)]
(when (and (not same-message) (when (and (not same-message)
(not= from current-identity) (not= from current-identity)
@ -55,13 +57,14 @@
:timestamp (or timestamp (random/timestamp)) :timestamp (or timestamp (random/timestamp))
:clock-value clock-value)] :clock-value clock-value)]
(store-message message') (store-message message')
(dispatch [:upsert-chat! {:chat-id chat-id' (dispatch [:upsert-chat! {:chat-id chat-id'
:group-chat group-chat? :group-chat group-chat?}])
:clock-value clock-value}])
(dispatch [::add-message chat-id' message']) (dispatch [::add-message chat-id' message'])
(when (= (:content-type message') content-type-command-request) (when (= (:content-type message') content-type-command-request)
(dispatch [:add-request chat-id' message'])) (dispatch [:add-request chat-id' message']))
(dispatch [:add-unviewed-message chat-id' message-id])) (dispatch [:add-unviewed-message chat-id' message-id])
(when-not show?
(dispatch [:send-clock-value-request! message])))
(if (and (if (and
(= (:content-type message) content-type-command) (= (:content-type message) content-type-command)
(not= chat-id' wallet-chat-id) (not= chat-id' wallet-chat-id)
@ -108,3 +111,4 @@
(s/execute-later (s/execute-later
#(dispatch [:received-message-when-commands-loaded chat-id message]) #(dispatch [:received-message-when-commands-loaded chat-id message])
timeout))))) timeout)))))

View File

@ -1,6 +1,7 @@
(ns status-im.chat.handlers.send-message (ns status-im.chat.handlers.send-message
(:require [status-im.utils.handlers :refer [register-handler] :as u] (:require [status-im.utils.handlers :refer [register-handler] :as u]
[clojure.string :as s] [clojure.string :as s]
[status-im.data-store.chats :as chats]
[status-im.data-store.messages :as messages] [status-im.data-store.messages :as messages]
[status-im.components.status :as status] [status-im.components.status :as status]
[status-im.utils.random :as random] [status-im.utils.random :as random]
@ -43,7 +44,8 @@
:to-message to-message :to-message to-message
:type (:type command) :type (:type command)
:has-handler (:has-handler command) :has-handler (:has-handler command)
:clock-value (inc clock-value)})) :clock-value (inc clock-value)
:show? true}))
(register-handler :send-chat-message (register-handler :send-chat-message
(u/side-effect! (u/side-effect!
@ -95,14 +97,15 @@
(register-handler :prepare-command! (register-handler :prepare-command!
(u/side-effect! (u/side-effect!
(fn [{:keys [current-public-key] :as db} (fn [{:keys [current-public-key network-status] :as db}
[_ add-to-chat-id {:keys [chat-id staged-command command handler-data] :as params}]] [_ add-to-chat-id {:keys [chat-id staged-command command handler-data] :as params}]]
(let [{:keys [clock-value]} (get-in db [:chats add-to-chat-id]) (let [clock-value (messages/get-last-clock-value chat-id)
request (:request (:handler-data command)) request (:request (:handler-data command))
command' (->> (assoc staged-command :handler-data handler-data) command' (->> (assoc staged-command :handler-data handler-data)
(prepare-command current-public-key chat-id clock-value request) (prepare-command current-public-key chat-id clock-value request)
(cu/check-author-direction db chat-id))] (cu/check-author-direction db chat-id))]
(log/debug "Handler data: " request handler-data (dissoc params :commands :staged-command)) (log/debug "Handler data: " request handler-data (dissoc params :commands :staged-command))
(dispatch [:update-message-overhead! chat-id network-status])
(dispatch [:clear-command chat-id (:id staged-command)]) (dispatch [:clear-command chat-id (:id staged-command)])
(dispatch [::send-command! add-to-chat-id (assoc params :command command')]) (dispatch [::send-command! add-to-chat-id (assoc params :command command')])
(when (cu/console? chat-id) (when (cu/console? chat-id)
@ -179,22 +182,25 @@
(register-handler ::prepare-message (register-handler ::prepare-message
(u/side-effect! (u/side-effect!
(fn [db [_ {:keys [chat-id identity message] :as params}]] (fn [{:keys [network-status]} [_ {:keys [chat-id identity message] :as params}]]
(let [{:keys [group-chat clock-value]} (get-in db [:chats chat-id]) (let [{:keys [group-chat]} (get-in db [:chats chat-id])
message' (cu/check-author-direction clock-value (messages/get-last-clock-value chat-id)
db chat-id message' (cu/check-author-direction
{:message-id (random/id) db chat-id
:chat-id chat-id {:message-id (random/id)
:content message :chat-id chat-id
:from identity :content message
:content-type text-content-type :from identity
:outgoing true :content-type text-content-type
:timestamp (time/now-ms) :outgoing true
:clock-value (inc clock-value)}) :timestamp (time/now-ms)
message'' (if group-chat :clock-value (inc clock-value)
(assoc message' :group-id chat-id :message-type :group-user-message) :show? true})
(assoc message' :to chat-id :message-type :user-message)) message'' (if group-chat
params' (assoc params :message message'')] (assoc message' :group-id chat-id :message-type :group-user-message)
(assoc message' :to chat-id :message-type :user-message))
params' (assoc params :message message'')]
(dispatch [:update-message-overhead! chat-id network-status])
(dispatch [::add-message params']) (dispatch [::add-message params'])
(dispatch [::save-message! params']))))) (dispatch [::save-message! params'])))))
@ -261,15 +267,20 @@
(register-handler ::send-message! (register-handler ::send-message!
(u/side-effect! (u/side-effect!
(fn [{:keys [web3 chats] :as db} [_ {{:keys [message-type] (fn [{:keys [web3 chats network-status]
:as message} :message :as db} [_ {{:keys [message-type]
chat-id :chat-id}]] :as message} :message
chat-id :chat-id}]]
(let [{:keys [dapp?] :as contact} (get-in db [:contacts chat-id])] (let [{:keys [dapp?] :as contact} (get-in db [:contacts chat-id])]
(if dapp? (if dapp?
(dispatch [::send-dapp-message chat-id message]) (dispatch [::send-dapp-message chat-id message])
(when message (when message
(let [message' (select-keys message [:from :message-id]) (let [message' (select-keys message [:from :message-id])
payload (select-keys message [:timestamp :content :content-type :clock-value]) payload (select-keys message [:timestamp :content :content-type
:clock-value :show?])
payload (if (= network-status :offline)
(assoc payload :show? false)
payload)
options {:web3 web3 options {:web3 web3
:message (assoc message' :payload payload)}] :message (assoc message' :payload payload)}]
(if (= message-type :group-user-message) (if (= message-type :group-user-message)
@ -279,12 +290,11 @@
:keypair {:public public-key :keypair {:public public-key
:private private-key}))) :private private-key})))
(protocol/send-message! (assoc-in options (protocol/send-message! (assoc-in options
[:message :to] (:to message))))))) [:message :to] (:to message)))))))))))
(dispatch [:inc-clock chat-id])))))
(register-handler ::send-command-protocol! (register-handler ::send-command-protocol!
(u/side-effect! (u/side-effect!
(fn [{:keys [web3 current-public-key chats] :as db} (fn [{:keys [web3 current-public-key chats network-status] :as db}
[_ {:keys [chat-id command]}]] [_ {:keys [chat-id command]}]]
(log/debug "sending command: " command) (log/debug "sending command: " command)
(when (cu/not-console? chat-id) (when (cu/not-console? chat-id)
@ -292,8 +302,12 @@
{:keys [group-chat]} (get-in db [:chats chat-id]) {:keys [group-chat]} (get-in db [:chats chat-id])
payload (-> command payload (-> command
(select-keys [:content :content-type :clock-value]) (select-keys [:content :content-type
:clock-value :show?])
(assoc :timestamp (datetime/now-ms))) (assoc :timestamp (datetime/now-ms)))
payload (if (= network-status :offline)
(assoc payload :show? false)
payload)
options {:web3 web3 options {:web3 web3
:message {:from current-public-key :message {:from current-public-key
:message-id (:message-id command) :message-id (:message-id command)
@ -304,5 +318,4 @@
:keypair {:public public-key :keypair {:public public-key
:private private-key})) :private private-key}))
(protocol/send-message! (assoc-in options (protocol/send-message! (assoc-in options
[:message :to] chat-id))) [:message :to] chat-id))))))))
(dispatch [:inc-clock chat-id]))))))

View File

@ -118,6 +118,7 @@
:custom-action [toolbar-action] :custom-action [toolbar-action]
:style (get-in platform-specific [:component-styles :toolbar])}] :style (get-in platform-specific [:component-styles :toolbar])}]
[add-contact-bar]]) [add-contact-bar]])
(defn get-intro-status-message [all-messages] (defn get-intro-status-message [all-messages]
(let [{:keys [timestamp content-type] :as last-message} (last all-messages)] (let [{:keys [timestamp content-type] :as last-message} (last all-messages)]
(when (not= content-type content-type-status) (when (not= content-type content-type-status)
@ -125,13 +126,14 @@
:content-type content-type-status :content-type content-type-status
:timestamp (or timestamp (time/now-ms))}))) :timestamp (or timestamp (time/now-ms))})))
(defn messages-with-timemarks [all-messages extras]
(defn messages-with-timemarks [all-messages]
(let [status-message (get-intro-status-message all-messages) (let [status-message (get-intro-status-message all-messages)
all-messages (if status-message all-messages (if status-message
(concat all-messages [status-message]) (concat all-messages [status-message])
all-messages) all-messages)
messages (->> all-messages messages (->> all-messages
(map #(merge % (get extras (:message-id %))))
(remove #(false? (:show? %)))
(sort-by :clock-value >) (sort-by :clock-value >)
(map #(assoc % :datemark (time/day-relative (:timestamp %)))) (map #(assoc % :datemark (time/day-relative (:timestamp %))))
(group-by :datemark) (group-by :datemark)
@ -147,9 +149,10 @@
(defview messages-view [group-chat] (defview messages-view [group-chat]
[messages [:chat :messages] [messages [:chat :messages]
contacts [:chat :contacts] contacts [:chat :contacts]
message-extras [:get :message-extras]
loaded? [:all-messages-loaded?]] loaded? [:all-messages-loaded?]]
(let [contacts' (contacts-by-identity contacts) (let [contacts' (contacts-by-identity contacts)
messages (messages-with-timemarks messages)] messages (messages-with-timemarks messages message-extras)]
[list-view {:renderRow (fn [row _ index] [list-view {:renderRow (fn [row _ index]
(message-row {:contact-by-identity contacts' (message-row {:contact-by-identity contacts'
:group-chat group-chat :group-chat group-chat

View File

@ -31,11 +31,6 @@
(get-in [:chats (:current-chat-id @db) k]) (get-in [:chats (:current-chat-id @db) k])
(reaction)))) (reaction))))
(register-sub :get-chat-messages
(fn [db _]
(let [chat-id (:current-chat-id @db)]
(reaction (get-in @db [:chats chat-id :messages])))))
(register-sub :get-current-chat-id (register-sub :get-current-chat-id
(fn [db _] (fn [db _]
(reaction (:current-chat-id @db)))) (reaction (:current-chat-id @db))))

View File

@ -64,6 +64,10 @@
[chat-id] [chat-id]
(get-property chat-id :removed-at)) (get-property chat-id :removed-at))
(defn get-message-overhead
[chat-id]
(get-property chat-id :message-overhead))
(defn get-active-group-chats (defn get-active-group-chats
[] []
(data-store/get-active-group-chats)) (data-store/get-active-group-chats))
@ -72,6 +76,14 @@
[chat-id active?] [chat-id active?]
(save-property chat-id :is-active active?)) (save-property chat-id :is-active active?))
(defn inc-message-overhead
[chat-id]
(save-property chat-id :message-overhead (inc (get-message-overhead chat-id))))
(defn reset-message-overhead
[chat-id]
(save-property chat-id :message-overhead 0))
(defn new-update? (defn new-update?
[timestamp chat-id] [timestamp chat-id]
(let (let

View File

@ -63,6 +63,10 @@
(generate-hiccup (read-string preview))))) (generate-hiccup (read-string preview)))))
message))))) message)))))
(defn get-count-by-chat-id
[chat-id]
(data-store/get-count-by-chat-id chat-id))
(defn get-by-chat-id (defn get-by-chat-id
([chat-id] ([chat-id]
(get-by-chat-id chat-id 0)) (get-by-chat-id chat-id 0))
@ -81,13 +85,24 @@
message)))))) message))))))
(defn get-last-message (defn get-last-message
[{:keys [chats] :as db} chat-id] [db chat-id]
(if-let [message (first (get-in db [:chats chat-id :messages]))] (if-let [{:keys [content-type] :as message} (data-store/get-last-message chat-id)]
message (if (command-type? content-type)
(if-let [{:keys [content-type] :as message} (data-store/get-last-message chat-id)] (clojure.core/update message :content str-to-map)
(if (command-type? content-type) message)))
(clojure.core/update message :content str-to-map)
message)))) (defn get-last-outgoing
[chat-id number-of-messages]
(data-store/get-by-fields {:chat-id chat-id
:outgoing true}
0
number-of-messages))
(defn get-last-clock-value
[chat-id]
(if-let [message (data-store/get-last-message chat-id)]
(:clock-value message)
0))
(defn get-unviewed (defn get-unviewed
[] []

View File

@ -27,6 +27,11 @@
(realm/page from (+ from number-of-messages)) (realm/page from (+ from number-of-messages))
(realm/realm-collection->list)))) (realm/realm-collection->list))))
(defn get-count-by-chat-id
[chat-id]
(-> (get-by-chat-id chat-id)
(realm/get-count)))
(defn get-by-fields (defn get-by-fields
[fields from number-of-messages] [fields from number-of-messages]
(-> (realm/get-by-fields @realm/account-realm :message :and fields) (-> (realm/get-by-fields @realm/account-realm :message :and fields)

View File

@ -29,12 +29,12 @@
:updated-at {:type :int :updated-at {:type :int
:optional true} :optional true}
:last-message-id :string :last-message-id :string
:message-overhead {:type :int
:default 0}
:public-key {:type :string :public-key {:type :string
:optional true} :optional true}
:private-key {:type :string :private-key {:type :string
:optional true} :optional true}
:clock-value {:type :int
:default 0}
:pending-contact? {:type :bool :pending-contact? {:type :bool
:default false} :default false}
:contact-info {:type :string :contact-info {:type :string

View File

@ -28,7 +28,9 @@
:user-statuses {:type :list :user-statuses {:type :list
:objectType "user-status"} :objectType "user-status"}
:clock-value {:type :int :clock-value {:type :int
:default 0}}}) :default 0}
:show? {:type :bool
:default true}}})
(defn migration [old-realm new-realm] (defn migration [old-realm new-realm]
(log/debug "migrating message schema")) (log/debug "migrating message schema"))

View File

@ -41,3 +41,25 @@
:requires-ack? false) :requires-ack? false)
(assoc-in [:payload :group-id] (:group-id message)) (assoc-in [:payload :group-id] (:group-id message))
(dissoc :group-id))))) (dissoc :group-id)))))
(defn send-clock-value-request!
[{:keys [web3 message]}]
(debug :send-clock-value-request message)
(d/add-pending-message!
web3
(merge message-defaults
(-> message
(assoc
:type :clock-value-request
:requires-ack? false)))))
(defn send-clock-value!
[{:keys [web3 message]}]
(debug :send-clock-value message)
(d/add-pending-message!
web3
(merge message-defaults
(-> message
(assoc
:type :clock-value
:requires-ack? false)))))

View File

@ -17,6 +17,8 @@
;; user ;; user
(def send-message! chat/send!) (def send-message! chat/send!)
(def send-seen! chat/send-seen!) (def send-seen! chat/send-seen!)
(def send-clock-value-request! chat/send-clock-value-request!)
(def send-clock-value! chat/send-clock-value!)
(def reset-pending-messages! d/reset-pending-messages!) (def reset-pending-messages! d/reset-pending-messages!)
;; group ;; group

View File

@ -97,6 +97,8 @@
(dispatch [:message-delivered message]) (dispatch [:message-delivered message])
(dispatch [:pending-message-remove message])) (dispatch [:pending-message-remove message]))
:seen (dispatch [:message-seen message]) :seen (dispatch [:message-seen message])
:clock-value-request (dispatch [:message-clock-value-request message])
:clock-value (dispatch [:message-clock-value message])
:group-invitation (dispatch [:group-chat-invite-received message]) :group-invitation (dispatch [:group-chat-invite-received message])
:update-group (dispatch [:update-group-message message]) :update-group (dispatch [:update-group-message message])
:add-group-identity (dispatch [:participant-invited-to-group message]) :add-group-identity (dispatch [:participant-invited-to-group message])
@ -275,6 +277,15 @@
(assoc message :message-status status))] (assoc message :message-status status))]
(messages/update message))))))) (messages/update message)))))))
(defn save-message-clock-value!
[{:keys [message-extras] :as db}
[_ {:keys [from]
{:keys [message-id clock-value]} :payload}]]
(when-let [{old-clock-value :clock-value
:as message} (merge (messages/get-by-id message-id)
(get message-extras message-id))]
(if (>= clock-value old-clock-value)
(messages/update (assoc message :clock-value clock-value :show? true)))))
(defn update-message-status [status] (defn update-message-status [status]
(fn [db (fn [db
@ -314,6 +325,35 @@
[(after (save-message-status! :seen))] [(after (save-message-status! :seen))]
(update-message-status :seen)) (update-message-status :seen))
(register-handler :message-clock-value-request
(u/side-effect!
(fn [db [_ {:keys [from] {:keys [message-id]} :payload}]]
(let [{:keys [chat-id]} (messages/get-by-id message-id)
message-overhead (chats/get-message-overhead chat-id)
last-clock-value (messages/get-last-clock-value chat-id)]
(if (> message-overhead 0)
(let [last-outgoing (->> (messages/get-last-outgoing chat-id message-overhead)
(reverse)
(map-indexed vector))]
(chats/reset-message-overhead chat-id)
(doseq [[i message] last-outgoing]
(dispatch [:update-clock-value! from i message (+ last-clock-value 100)])))
(dispatch [:send-clock-value! from message-id]))))))
(register-handler :message-clock-value
(after save-message-clock-value!)
(fn [{:keys [message-extras] :as db}
[_ {:keys [from]
{:keys [message-id clock-value]} :payload}]]
(if-let [{old-clock-value :clock-value
:as message} (merge (messages/get-by-id message-id)
(get message-extras message-id))]
(if (> clock-value old-clock-value)
(assoc-in db [:message-extras message-id] {:clock-value clock-value
:show? true})
db)
db)))
(register-handler :pending-message-upsert (register-handler :pending-message-upsert
(after (after
(fn [_ [_ {:keys [type id] :as pending-message}]] (fn [_ [_ {:keys [type id] :as pending-message}]]