Dont override last-clock-value on messages
The denormalized-clock-value was erroneously set to the one of the last message received. This meant that on chats were the clock-value raced ahead of the timestamp (#status), a message from the mailserver or a message from someone with an old clock-value would basically make those messages be sorted in the past. The correct behavior is that last-clock-value for a given chat should be the maximum last clock value ever seen for that chat. Signed-off-by: Andrea Maria Piana <andrea.maria.piana@gmail.com>
This commit is contained in:
parent
b8d4f2911b
commit
c9994b5d0f
|
@ -111,7 +111,7 @@
|
|||
(fx/defn add-message
|
||||
[{:keys [db] :as cofx}
|
||||
{{:keys [chat-id message-id clock-value timestamp from] :as message} :message
|
||||
:keys [current-chat? batch? last-clock-value dedup-id raw-message]}]
|
||||
:keys [current-chat? batch? dedup-id raw-message]}]
|
||||
(let [current-public-key (accounts.db/current-public-key cofx)
|
||||
prepared-message (-> message
|
||||
(prepare-message chat-id current-chat?)
|
||||
|
@ -126,11 +126,7 @@
|
|||
{:db (cond->
|
||||
(-> db
|
||||
(update-in [:chats chat-id :messages] assoc message-id prepared-message)
|
||||
(update-in [:chats chat-id :last-clock-value]
|
||||
(fn [old-clock-value]
|
||||
(or last-clock-value
|
||||
(utils.clocks/receive clock-value
|
||||
old-clock-value)))))
|
||||
(update-in [:chats chat-id :last-clock-value] (partial utils.clocks/receive clock-value)))
|
||||
|
||||
(and (not current-chat?)
|
||||
(not= from current-public-key))
|
||||
|
@ -280,8 +276,7 @@
|
|||
(chat-model/upsert-chat
|
||||
{:chat-id chat-id
|
||||
:last-message-content content
|
||||
:last-message-content-type content-type
|
||||
:last-clock-value clock-value})))
|
||||
:last-message-content-type content-type})))
|
||||
|
||||
(fx/defn update-last-messages
|
||||
[{:keys [db] :as cofx} chat-ids]
|
||||
|
@ -378,8 +373,7 @@
|
|||
:last-clock-value (:clock-value message)})
|
||||
(add-message {:batch? false
|
||||
:message message-with-id
|
||||
:current-chat? true
|
||||
:last-clock-value (:clock-value message)})
|
||||
:current-chat? true})
|
||||
(add-own-status chat-id message-id :sending)
|
||||
(send chat-id message-id wrapped-record))))
|
||||
|
||||
|
|
|
@ -429,6 +429,8 @@
|
|||
contact-device-info/v1
|
||||
contact-recovery/v1])
|
||||
|
||||
(def v38 v37)
|
||||
|
||||
;; put schemas ordered by version
|
||||
(def schemas [{:schema v1
|
||||
:schemaVersion 1
|
||||
|
@ -540,4 +542,7 @@
|
|||
:migration (constantly nil)}
|
||||
{:schema v37
|
||||
:schemaVersion 37
|
||||
:migration (constantly nil)}])
|
||||
:migration (constantly nil)}
|
||||
{:schema v38
|
||||
:schemaVersion 38
|
||||
:migration migrations/v38}])
|
||||
|
|
|
@ -360,3 +360,12 @@
|
|||
chat-id (aget old-chat "chat-id")]
|
||||
(when (one-to-one? chat-id)
|
||||
(aset new-chat "one-to-one" true))))))
|
||||
|
||||
(defn v38 [old-realm new-realm]
|
||||
(log/debug "migrating chats v38")
|
||||
(let [chats (.objects new-realm "chat")]
|
||||
(dotimes [i (.-length chats)]
|
||||
(let [chat (aget chats i)
|
||||
chat-id (aget chat "chat-id")]
|
||||
(when-let [last-clock-value (get-last-clock-value new-realm chat-id)]
|
||||
(aset chat "last-clock-value" last-clock-value))))))
|
||||
|
|
|
@ -94,6 +94,5 @@
|
|||
(inc (max local-clock (->timestamp-bid))))
|
||||
|
||||
(defn receive [message-clock local-clock]
|
||||
(-> (+ 1000 (max (or message-clock 0) (or local-clock 0)))
|
||||
safe-timestamp
|
||||
inc))
|
||||
(-> (max (or message-clock 0) (or local-clock 0))
|
||||
safe-timestamp))
|
||||
|
|
|
@ -63,6 +63,37 @@
|
|||
(testing "it marks it as sent"
|
||||
(is (= :sent status)))))))
|
||||
|
||||
(deftest receive-many-clock-value
|
||||
(let [db {:account/account {:public-key "me"}
|
||||
:view-id :chat
|
||||
:current-chat-id "chat-id"
|
||||
:chats {"chat-id" {:last-clock-value 10
|
||||
:messages {}}}}]
|
||||
(testing "a message with a higher clock value"
|
||||
(let [actual (message/receive-many {:db db}
|
||||
[{:from "chat-id"
|
||||
:message-type :user-message
|
||||
:timestamp 0
|
||||
:message-id "id"
|
||||
:chat-id "chat-id"
|
||||
:content "b"
|
||||
:clock-value 12}])
|
||||
chat-clock-value (get-in actual [:db :chats "chat-id" :last-clock-value])]
|
||||
(testing "it sets last-clock-value"
|
||||
(is (= 12 chat-clock-value)))))
|
||||
(testing "a message with a lower clock value"
|
||||
(let [actual (message/receive-many {:db db}
|
||||
[{:from "chat-id"
|
||||
:message-type :user-message
|
||||
:timestamp 0
|
||||
:message-id "id"
|
||||
:chat-id "chat-id"
|
||||
:content "b"
|
||||
:clock-value 2}])
|
||||
chat-clock-value (get-in actual [:db :chats "chat-id" :last-clock-value])]
|
||||
(testing "it sets last-clock-value"
|
||||
(is (= 10 chat-clock-value)))))))
|
||||
|
||||
(deftest receive-group-chats
|
||||
(let [cofx {:db {:chats {"chat-id" {:contacts #{"present"}
|
||||
:members-joined #{"a"}}}
|
||||
|
|
|
@ -2,91 +2,6 @@
|
|||
(:require [cljs.test :refer-macros [deftest is testing]]
|
||||
[status-im.utils.clocks :as clocks]))
|
||||
|
||||
;; Messages are shown on a per-chat basis, ordered by the message clock-value.
|
||||
;; See status-im-utils.clocks namespace for details.
|
||||
|
||||
;; We are not a monolith.
|
||||
(def a (atom {:identity "a"}))
|
||||
(def b (atom {:identity "b"}))
|
||||
(def c (atom {:identity "c"}))
|
||||
|
||||
(declare recv!)
|
||||
|
||||
;; The network is unreliable.
|
||||
(defn random-broadcast! [chat-id message]
|
||||
(when (> (rand-int 10) 5) (recv! a chat-id message))
|
||||
(when (> (rand-int 10) 5) (recv! b chat-id message))
|
||||
(when (> (rand-int 10) 5) (recv! c chat-id message)))
|
||||
|
||||
(defn get-last-clock-value
|
||||
[db chat-id]
|
||||
(if-let [messages (-> @db :chats chat-id :messages)]
|
||||
(-> (sort-by :clock-value > messages)
|
||||
first
|
||||
:clock-value)
|
||||
0))
|
||||
|
||||
(defn save! [db chat-id message]
|
||||
(swap! db
|
||||
(fn [state]
|
||||
(let [messages (-> state :chats chat-id :messages)]
|
||||
(assoc-in state [:chats chat-id :messages]
|
||||
(conj messages message))))))
|
||||
|
||||
(defn send! [db chat-id message]
|
||||
(let [clock-value (get-last-clock-value db chat-id)
|
||||
prepared-message (assoc message :clock-value (clocks/send clock-value))]
|
||||
(save! db chat-id prepared-message)
|
||||
(random-broadcast! chat-id prepared-message)))
|
||||
|
||||
(defn recv! [db chat-id {:keys [clock-value] :as message}]
|
||||
(let [local-clock (get-last-clock-value db chat-id)
|
||||
new-clock (clocks/receive clock-value local-clock)]
|
||||
(when-not (= (:from message) (:identity @db))
|
||||
(save! db chat-id (assoc message :clock-value new-clock)))))
|
||||
|
||||
(defn thread [db chat-id]
|
||||
(let [messages (-> @db :chats chat-id :messages)]
|
||||
(sort-by :clock-value < messages)))
|
||||
|
||||
(defn format-message [{:keys [from text]}]
|
||||
(str from ": " text ", "))
|
||||
|
||||
(defn format-thread [thread]
|
||||
(apply str (map format-message thread)))
|
||||
|
||||
;; Invariant we want to maintain.
|
||||
(defn ordered-increasing-text? [thread]
|
||||
(let [xs (map :text thread)]
|
||||
(or (empty? xs) (apply < xs))))
|
||||
|
||||
(defn simulate! []
|
||||
(send! a :foo {:from "a" :text "1"})
|
||||
(send! a :foo {:from "a" :text "2"})
|
||||
|
||||
(send! a :bar {:from "a" :text "1"})
|
||||
|
||||
(send! b :foo {:from "b" :text "3"})
|
||||
(send! c :foo {:from "c" :text "4"})
|
||||
(send! a :foo {:from "a" :text "5"})
|
||||
|
||||
(send! c :bar {:from "c" :text "7"}))
|
||||
|
||||
(deftest clocks
|
||||
(testing "Message order preserved"
|
||||
(simulate!)
|
||||
(is (ordered-increasing-text? (thread a :foo)))
|
||||
(is (ordered-increasing-text? (thread b :foo)))
|
||||
(is (ordered-increasing-text? (thread c :foo)))
|
||||
(is (ordered-increasing-text? (thread a :bar))))
|
||||
|
||||
(testing "Bad thread recognized as such"
|
||||
(let [bad-thread '({:from "a", :text "1", :clock-value 1}
|
||||
{:from "c", :text "4", :clock-value 1}
|
||||
{:from "a", :text "2", :clock-value 2}
|
||||
{:from "a", :text "5", :clock-value 8})]
|
||||
(is (not (ordered-increasing-text? bad-thread))))))
|
||||
|
||||
(deftest safe-timestamp
|
||||
(testing "it caps the timestamp when a value too large is provided"
|
||||
(is (< (clocks/receive js/Number.MAX_SAFE_INTEGER 0)
|
||||
|
@ -97,10 +12,3 @@
|
|||
(is (not (clocks/safe-timestamp? js/Number.MAX_SAFE_INTEGER))))
|
||||
(testing "it returns true for a normal timestamp number"
|
||||
(is (clocks/safe-timestamp? (clocks/send 0)))))
|
||||
|
||||
;; Debugging
|
||||
;;(println "******************************************")
|
||||
;;(println "A's POV :foo" (format-thread (thread a :foo)))
|
||||
;;(println "B's POV :foo" (format-thread (thread b :foo)))
|
||||
;;(println "C's POV :foo" (format-thread (thread c :foo)))
|
||||
;;(println "******************************************")
|
||||
|
|
Loading…
Reference in New Issue