Add clock values for public group/chats

I have extended and modified the current algorithm for message ordering
so that it applies for group and public chats alike.

We use Lamport timestamps but we prefix the unix timestamp, which should
maximize the chances of the message being seen on the top of the chat.

Signed-off-by: Andrea Maria Piana <andrea.maria.piana@gmail.com>
This commit is contained in:
Andrea Maria Piana 2018-03-20 20:23:08 +02:00
parent b2f5146a8a
commit 3256d67c2e
No known key found for this signature in database
GPG Key ID: AA6CCA6DE0E06424
17 changed files with 223 additions and 114 deletions

View File

@ -2,6 +2,7 @@
(:require [status-im.ui.components.styles :refer [default-chat-color]]
[status-im.utils.random :as random]
[status-im.constants :as constants]
[status-im.utils.clocks :as utils.clocks]
[status-im.i18n :as i18n]
[clojure.string :as string]))
@ -12,6 +13,7 @@
:chat-id constants/console-chat-id
:from constants/console-chat-id
:to "me"
:clock-value (utils.clocks/send 0)
:content content
:content-type content-type})
@ -27,8 +29,7 @@
:contacts [{:identity constants/console-chat-id
:text-color "#FFFFFF"
:background-color "#AB7967"}]
:last-to-clock-value 0
:last-from-clock-value 0})
:last-clock-value 0})
(def contact
{:whisper-identity constants/console-chat-id

View File

@ -4,6 +4,7 @@
[status-im.chat.events.commands :as commands-events]
[status-im.chat.models.message :as message-model]
[status-im.constants :as constants]
[status-im.utils.clocks :as utils.clocks]
[status-im.utils.handlers :as handlers]
[status-im.utils.random :as random]))
@ -65,6 +66,7 @@
:content (str type ": " message)
:content-type constants/content-type-log-message
:outgoing false
:clock-value (utils.clocks/send 0)
:chat-id chat-id
:from chat-id
:to "me"}]))))
@ -74,6 +76,7 @@
:content (str content)
:content-type constants/text-content-type
:outgoing false
:clock-value (utils.clocks/send 0)
:chat-id chat-id
:from chat-id
:to "me"}])))))

View File

@ -24,8 +24,7 @@
:is-active true
:timestamp now
:contacts [{:identity chat-id}]
:last-from-clock-value 0
:last-to-clock-value 0}))
:last-clock-value 0}))
(defn add-chat
"Adds new chat to db & realm, if the chat with same id already exists, justs restores it"
@ -52,8 +51,7 @@
:public? true
:is-active true
:timestamp now
:last-to-clock-value 0
:last-from-clock-value 0}]
:last-clock-value 0}]
{:db (assoc-in db [:chats topic] chat)
:data-store/save-chat chat}))
@ -68,8 +66,7 @@
:is-active true
:timestamp now
:contacts (mapv (partial hash-map :identity) participants)
:last-to-clock-value 0
:last-from-clock-value 0}]
:last-clock-value 0}]
{:db (assoc-in db [:chats chat-id] chat)
:data-store/save-chat chat}))

View File

@ -6,7 +6,7 @@
[status-im.chat.events.requests :as requests-events]
[status-im.chat.models :as chat-model]
[status-im.chat.models.commands :as commands-model]
[status-im.utils.clocks :as clocks-utils]
[status-im.utils.clocks :as utils.clocks]
[status-im.utils.handlers :as handlers]
[status-im.transport.utils :as transport.utils]
[status-im.transport.message.core :as transport]
@ -26,31 +26,18 @@
(:ref (get available-commands-responses response-name))))
(defn- add-message
[chat-id {:keys [message-id from-clock-value to-clock-value] :as message} current-chat? {:keys [db]}]
[chat-id {:keys [message-id clock-value] :as message} current-chat? {:keys [db]}]
(let [prepared-message (cond-> (assoc message :appearing? true)
(not current-chat?)
(assoc :appearing? false))]
{:db (cond-> (-> db
(update-in [:chats chat-id :messages] dissoc from-clock-value)
(update-in [:chats chat-id :messages] assoc message-id prepared-message)
(update-in [:chats chat-id :last-from-clock-value] max from-clock-value)
(update-in [:chats chat-id :last-to-clock-value] max to-clock-value))
{:db (cond->
(-> db
(update-in [:chats chat-id :messages] assoc message-id prepared-message)
(update-in [:chats chat-id :last-clock-value] (partial utils.clocks/receive clock-value))) ; this will increase last-clock-value twice when sending our own messages
(not current-chat?)
(update-in [:chats chat-id :unviewed-messages] (fnil conj #{}) message-id))
:data-store/save-message prepared-message}))
;; We start with [0 0] ([from-clock-value to-clock-value]) for each participant of 1-1 chat (local perspective on each device).
;; Now for sending, we always increment the to-clock-value and include it in message payload being sent (so only to-clock-value is present in network message).
;; Locally, the sent message always replicates the latest-from-clock-value of the chat.
;; Upon receiving message, receiver reads the to-clock-value of the received message and sets that to be the from-clock-value locally
;; (this will be also the new latest-from-clock-value of the chat), to-clock-value for the message is the latest-to-clock-value of the 1-1 chat`.
;; All this ensures, that there will be no [from-clock-value to-clock-value] duplicate in chat message on each device + the local order will appear consistent,
;; even if its possible it wont be the same on both devices (for example user A sends 5 messages, during the sending,
;; he receives the message from user B, so his local order will be A1, A2, B, A3, A4, A5, but his messages will take a long time to reach user B,
;; for some reason, so user B will see it as B, A1, A2, A3, A4, A5).
;; I dont think thats very problematic and I dont think we can do much about it without single source of truth where order received messages are serialised
;; and definite order is established (server), it is the case even in the current implementation.
(defn- prepare-chat [chat-id {:keys [db] :as cofx}]
(if (get-in db [:chats chat-id])
(chat-model/upsert-chat {:chat-id chat-id} cofx)
@ -63,28 +50,9 @@
(when send-seen?
(transport/send (protocol/map->MessagesSeen {:message-ids #{message-id}}) chat-id cofx)))
(defn- placeholder-message [chat-id from timestamp temp-id to-clock]
{:message-id temp-id
:outgoing false
:chat-id chat-id
:from from
:to "me"
:content "Waiting for message to arrive..."
:content-type constants/content-type-placeholder
:show? true
:from-clock-value temp-id
:to-clock-value to-clock
:timestamp timestamp})
(defn- add-placeholder-messages [chat-id from timestamp old-from-clock to-clock new-from-clock {:keys [db]}]
(when (> (- new-from-clock old-from-clock) 1)
{:db (reduce (fn [db temp-id]
(assoc-in db [:chats chat-id :messages temp-id] (placeholder-message chat-id from timestamp temp-id to-clock)))
db
(range (inc old-from-clock) new-from-clock))}))
(defn- add-received-message
[{:keys [from message-id chat-id content content-type timestamp to-clock-value] :as message}
[{:keys [from message-id chat-id content content-type timestamp clock-value to-clock-value] :as message}
{:keys [db now] :as cofx}]
(let [{:keys [current-chat-id
view-id
@ -92,22 +60,21 @@
:contacts/keys [contacts]} db
{:keys [public-key] :as current-account} (get-current-account db)
current-chat? (and (= :chat view-id) (= current-chat-id chat-id))
{:keys [last-from-clock-value
last-to-clock-value] :as chat} (get-in db [:chats chat-id])
{:keys [last-clock-value] :as chat} (get-in db [:chats chat-id])
request-command (:request-command content)
command-request? (and (= content-type constants/content-type-command-request)
request-command)
new-from-clock-value (or to-clock-value (inc last-from-clock-value))
new-timestamp (or timestamp now)]
(handlers/merge-fx cofx
(add-message chat-id
(cond-> (assoc message
:timestamp new-timestamp
:show? true
:from-clock-value new-from-clock-value
:to-clock-value last-to-clock-value)
:show? true)
public-key
(assoc :user-statuses {public-key (if current-chat? :seen :received)})
(not clock-value)
(assoc :clock-value (utils.clocks/send last-clock-value)) ; TODO (cammeelos): for backward compatibility, we use received time to be removed when not an issue anymore
command-request?
(assoc-in [:content :request-command-ref]
(lookup-response-ref access-scope->commands-responses
@ -116,8 +83,7 @@
(send-message-seen chat-id message-id (and public-key
current-chat?
(not (chat-model/bot-only-chat? db chat-id))
(not (= constants/system from))))
(add-placeholder-messages chat-id from new-timestamp last-from-clock-value last-to-clock-value new-from-clock-value))))
(not (= constants/system from)))))))
(defn receive
[{:keys [chat-id message-id] :as message} cofx]
@ -213,26 +179,25 @@
(defn add-message-type [message {:keys [chat-id group-chat public?]}]
(cond-> message
(not group-chat)
(assoc :message-type :user-message)
(assoc :message-type :user-message)
(and group-chat public?)
(assoc :message-type :public-group-user-message)
(and group-chat (not public?))
(assoc :message-type :group-user-message)))
(defn- prepare-plain-message [chat-id {:keys [identity message-text]}
{:keys [last-to-clock-value last-from-clock-value] :as chat} now]
{:keys [last-clock-value] :as chat} now]
(add-message-type {:chat-id chat-id
:content message-text
:from identity
:content-type constants/text-content-type
:outgoing true
:timestamp now
:to-clock-value (inc last-to-clock-value)
:from-clock-value last-from-clock-value
:clock-value (utils.clocks/send last-clock-value)
:show? true}
chat))
(def ^:private transport-keys [:content :content-type :message-type :to-clock-value :timestamp])
(def ^:private transport-keys [:content :content-type :message-type :clock-value :timestamp])
(defn- upsert-and-send [{:keys [chat-id] :as message} cofx]
(let [send-record (protocol/map->Message (select-keys message transport-keys))
@ -247,7 +212,7 @@
(defn- prepare-command-message
[identity
{:keys [last-to-clock-value last-from-clock-value chat-id] :as chat}
{:keys [last-clock-value chat-id] :as chat}
now
{request-params :params
request-command :command
@ -280,8 +245,7 @@
constants/content-type-command-request
constants/content-type-command))
:outgoing true
:to-clock-value (inc last-to-clock-value)
:from-clock-value last-from-clock-value
:clock-value (utils.clocks/send last-clock-value)
:show? true}
chat)))

View File

@ -91,25 +91,48 @@
(fn [[chats id] [_ k chat-id]]
(get-in chats [(or chat-id id) k])))
(defn- partition-by-datemark
  "Reducing step that folds messages into `[datemark [msg ...]]` groups.
  The incoming messages are expected to be already sorted by clock value;
  grouping by day is best effort only.

  The accumulator is a map holding `:acc` (the vector of groups built so
  far), `:last-timestamp` and `:last-datemark`.

  `:timestamp` comes from the sender's clock, so it gives no ordering
  guarantee and cannot be used for sorting. Out-of-order timestamps are
  naively and arbitrarily assumed to belong to the previous bucket:

    A sends M1 to B with timestamp 2000-01-01T00:00:00
    B replies M2 with timestamp 1999-12-31T23:59:59
    M1 needs to be displayed before M2,
    so both are bucketed in 1999-12-31."
  [{groups         :acc
    prev-timestamp :last-timestamp
    prev-datemark  :last-datemark}
   {:keys [timestamp datemark] :as message}]
  (let [opens-new-group? (or (empty? groups)                        ; very first message
                             (and (not= prev-datemark datemark)     ; falls on another day...
                                  (< timestamp prev-timestamp)))]   ; ...and is not out-of-order
    (if opens-new-group?
      ;; start a fresh datemark group holding only this message
      {:acc            (conj groups [datemark [message]])
       :last-datemark  datemark
       :last-timestamp timestamp}
      ;; otherwise append the message to the most recent group
      (let [current-group (peek groups)]
        {:acc            (conj (pop groups) (update current-group 1 conj message))
         :last-datemark  prev-datemark
         :last-timestamp (max timestamp prev-timestamp)}))))
(defn message-datemark-groups
"Transforms map of messages into sequence of `[datemark messages]` tuples, where
messages with particular datemark are sorted according to their `:clock-values` and
tuples themeselves are sorted according to the highest `:clock-values` in the messages."
messages with particular datemark are sorted according to their clock-values."
[id->messages]
(let [clock-sorter (juxt :from-clock-value :to-clock-value)
datemark->messages (transduce (comp (map second)
(filter :show?)
(map (fn [{:keys [timestamp] :as msg}]
(assoc msg :datemark (time/day-relative timestamp)))))
(completing (fn [acc {:keys [datemark] :as msg}]
(update acc datemark conj msg)))
{}
id->messages)]
(->> datemark->messages
(map (fn [[datemark messages]]
[datemark (->> messages (sort-by clock-sorter) reverse)]))
(sort-by (comp clock-sorter first second))
reverse)))
(let [sorted-messages (->> id->messages
vals
(sort-by (juxt (comp unchecked-negate :clock-value) :message-id))) ; sort-by clock in reverse order, break ties by :message-id field
remove-hidden-xf (filter :show?)
add-datemark-xf (map (fn [{:keys [timestamp] :as msg}]
(assoc msg :datemark (time/day-relative timestamp))))]
(-> (transduce (comp remove-hidden-xf
add-datemark-xf)
(completing partition-by-datemark)
{:acc []}
sorted-messages)
:acc)))
(reg-sub
:get-chat-message-datemark-groups

View File

@ -90,7 +90,7 @@
(data-store/save (prepare-message (merge default-values
message
{:from (or from "anonymous")
:timestamp (datetime/timestamp)})))))
:received-timestamp (datetime/timestamp)})))))
(re-frame/reg-fx
:data-store/save-message

View File

@ -7,12 +7,10 @@
(:refer-clojure :exclude [exists?]))
(defn- normalize-chat [{:keys [chat-id] :as chat}]
(let [last-to-clock-value (messages/get-last-clock-value chat-id :to-clock-value)
last-from-clock-value (messages/get-last-clock-value chat-id :from-clock-value)]
(let [last-clock-value (messages/get-last-clock-value chat-id)]
(-> chat
(realm/fix-map->vec :contacts)
(merge {:last-to-clock-value (or last-to-clock-value 0)
:last-from-clock-value (or last-from-clock-value 0)}))))
(assoc :last-clock-value (or last-clock-value 0)))))
(defn get-all-active
[]

View File

@ -59,11 +59,11 @@
realm/js-object->clj))
(defn get-last-clock-value
[chat-id clock-prop]
[chat-id]
(-> (realm/get-by-field @realm/account-realm :message :chat-id chat-id)
(realm/sorted clock-prop :desc)
(realm/sorted :clock-value :desc)
(realm/single-clj)
(get clock-prop)))
:clock-value))
(defn get-unviewed
[current-public-key]

View File

@ -7,3 +7,4 @@
(def schemas [{:schema v1/schema
:schemaVersion 1
:migration v1/migration}])

View File

@ -22,9 +22,7 @@
:optional true}
:user-statuses {:type :list
:objectType :user-status}
:from-clock-value {:type :int
:default 0}
:to-clock-value {:type :int
:clock-value {:type :int
:default 0}
:show? {:type :bool
:default true}}})

View File

@ -46,8 +46,8 @@
(deftype MessageHandler []
Object
(tag [this v] "c4")
(rep [this {:keys [content content-type message-type to-clock-value timestamp]}]
#js [content content-type message-type to-clock-value timestamp]))
(rep [this {:keys [content content-type message-type clock-value timestamp]}]
#js [content content-type message-type clock-value timestamp]))
(deftype MessagesSeenHandler []
Object
@ -98,8 +98,8 @@
(v1.contact/ContactRequest. name profile-image address fcm-token))
"c3" (fn [[name profile-image address fcm-token]]
(v1.contact/ContactRequestConfirmed. name profile-image address fcm-token))
"c4" (fn [[content content-type message-type to-clock-value timestamp]]
(v1.protocol/Message. content content-type message-type to-clock-value timestamp))
"c4" (fn [[content content-type message-type clock-value timestamp]]
(v1.protocol/Message. content content-type message-type clock-value timestamp))
"c5" (fn [message-ids]
(v1.protocol/MessagesSeen. message-ids))
"c6" (fn [[name profile-image]]

View File

@ -89,7 +89,7 @@
(send [this cofx chat-id])
(receive [this cofx chat-id sig]))
(defrecord Message [content content-type message-type to-clock-value timestamp]
(defrecord Message [content content-type message-type clock-value timestamp]
message/StatusMessage
(send [this chat-id cofx]
(send {:chat-id chat-id

View File

@ -1,32 +1,94 @@
(ns status-im.utils.clocks)
(ns status-im.utils.clocks
(:require [status-im.utils.datetime :as utils.datetime]))
;; We use Lamport clocks to ensure correct ordering of events in chats. This is
;; necessary because we operate in a distributed system and there is no central
;; coordinator for what happened before what.
;;
;; For example, the last received message in a group chat will appear last,
;; regardless if that person has seen all the previous group chat messages. The
;; principal invariant to maintain is that clock-values should be monotonically
;; increasing.
;; We can't rely solely on timestamps as clocks might be different on each device.
;;
;; All clock updates happens as part of sending or receiving a message. Here's
;; the basic algorithm:
;; Received time cannot be used as it does not work with out-of-order messages.
;; If we used received time, each client could also end up with a different
;; ordering of messages, which would lead to misunderstandings among
;; participants that are difficult to untangle.
;;
;; Sending messages:
;; time = time+1;
;; time_stamp = time;
;; send(message, time_stamp);
;; Lamport timestamps offer a consistent view across clients, at the expense of
;; knowing exactly at what time something has happened.
;; They satisfy the property: if a caused b then T(a) < T(b)
;;
;; Receiving messages:
;; (message, time_stamp) = receive();
;; time = max(time_stamp, time)+1;
;; In chat terms:
;;
;; Any message I send will always be displayed after any message I have seen,
;; including the messages I have sent.
;; This is a necessary condition to have a meaningful conversation with someone
;; and ought to be always true.
;;
;; We need to address another issue here:
;;
;; Even if I don't see all the messages, if I post a message I want that message
;; to be displayed last in a chat.
;;
;; That's where the basic algorithm of Lamport timestamps would fall short, as
;; it's only meant to order causally related events.
;;
;; If I join a public chat and I have not received any messages or I have missed
;; many messages because I was offline, when I post a new message it would be
;; displayed back in the history (I would have to wait to receive a message
;; to bring my timestamp up-to-date).
;;
;; We cannot completely solve this as there's no way to know what the chat's
;; current timestamp is without having to contact other peers (which might all be offline).
;;
;; But what we can do, is to use our time to make a "bid", hoping that it will
;; beat the current chat-timestamp. So our Lamport timestamp format is:
;; {unix-timestamp-ms}{2-digits-post-id}
;;
;; We always need to make sure we take the max value between the last-clock-value
;; for the chat and the bid-timestamp.
;;
;; This will still satisfy Lamport requirement, namely: a -> b then T(a) < T(b)
;;
;; One way to think of this is as Lamport timestamps where at every ms
;; an internal event is generated.
;;
;; In whisper v6 any message with a timestamp older than 20 seconds will be discarded.
;;
;; So worst case scenario is:
;; Your clock is 20 seconds behind, you join a public chat where everyone's clock
;; is 20 seconds ahead, you have not received 40s of inflight messages, you
;; publish. drama.
;; Your post will be displayed before any non-received inflight message.
;;
;; Once you have received those posts you will be able to communicate effectively, much rejoicing.
;; If there are no inflight messages then your post will be last.
;;
;; Posts sent when offline are more troublesome, as they would carry an old
;; timestamp, so the timestamp should be refreshed before retrying.
;;
;; Details:
;; https://en.wikipedia.org/wiki/Lamport_timestamps
;; http://amturing.acm.org/p558-lamport.pdf
;; Number of milliseconds in a 31-day month.
(def one-month-in-ms (* 60 60 24 31 1000))
;; Multiplier applied to a timestamp when building a clock value. Despite the
;; name it is a factor, not a digit count: scaling by 100 leaves two trailing
;; decimal digits below the timestamp part of the clock value.
(def post-id-digits 100)
(defn- ->timestamp-bid
  "Returns the current value of `utils.datetime/timestamp` scaled by
  `post-id-digits` (presumably a unix timestamp in ms - confirm against
  `status-im.utils.datetime`)."
  []
  (-> (utils.datetime/timestamp)
      (* post-id-digits)))
; Clock values have an upper limit of Number.MAX_SAFE_INTEGER.
; A malicious client could send a crafted message with
; timestamp = Number.MAX_SAFE_INTEGER, which would effectively DoS the chat:
; every new message would get a timestamp of Number.MAX_SAFE_INTEGER
; (inc becomes a noop).
; We should never receive messages from untrusted peers with a timestamp greater
; than now + 20s.
; The value is capped to now + 1 month to leave some room for trusted peers.
(defn- safe-timestamp [t]
  (let [upper-bound (* post-id-digits
                       (+ (utils.datetime/timestamp) one-month-in-ms))]
    (min t upper-bound)))
(defn send [local-clock]
(inc (or local-clock 0)))
(inc (max local-clock (->timestamp-bid))))
(defn receive [message-clock local-clock]
(inc (max (or message-clock 0) (or local-clock 0))))
(-> (+ 1000 (max (or message-clock 0) (or local-clock 0)))
safe-timestamp
inc))

View File

@ -62,7 +62,7 @@
(defn day-relative [ms]
(to-str ms
#(.format date-time-fmt %)
#(.format date-fmt %)
#(label :t/datetime-yesterday)
#(label :t/datetime-today)))

View File

@ -0,0 +1,57 @@
(ns status-im.test.chat.subs
(:require [cljs.test :refer-macros [deftest is testing]]
[status-im.chat.subs :as s]))
(defn messages-ordered?
  "True when the `:clock-value`s of `messages`, taken in the given order,
  equal those same values sorted from highest to lowest."
  [messages]
  (let [actual   (map :clock-value messages)
        expected (reverse (sort actual))]
    (= expected actual)))
;; Checks that `s/message-datemark-groups` returns `[datemark messages]` tuples
;; whose messages go from the highest to the lowest `:clock-value`, both when
;; all sender timestamps share one day and when they straddle two days.
;; NOTE(review): the expected datemark strings presumably depend on the
;; locale/time zone of the test environment - confirm.
(deftest test-message-datemark-groups
(testing "it orders a map of messages by clock-values when all on the same day (by sender timestamp)"
(let [datemark "Jan 1, 1970"
message-1 {:show? true
:timestamp 0
:clock-value 1}
message-2 {:show? true
:timestamp 0
:clock-value 2}
message-3 {:show? true
:timestamp 0
:clock-value 3}
;; the map keys are deliberately not in clock order
unordered-messages {2 message-2
1 message-1
3 message-3}
;; a single group is expected, as every timestamp is the same
[[actual-datemark actual-messages]] (s/message-datemark-groups unordered-messages)]
(is (= datemark actual-datemark))
(is (= 3 (count actual-messages)))
(is (messages-ordered? actual-messages))))
(testing "it mantains the order even when timestamps are across days"
(let [datemark-day-1 "Jan 1, 2000"
datemark-day-2 "Dec 31, 1999"
message-1 {:show? true
:timestamp 946641600000 ; 1999-12-31T12:00:00Z
:clock-value 1}
message-2 {:show? true
:timestamp 946728000000 ; 2000-01-01T12:00:00Z, out-of-order: expected in the Dec 31, 1999 group
:clock-value 2}
message-3 {:show? true
:timestamp 946641600000 ; 1999-12-31T12:00:00Z
:clock-value 3}
message-4 {:show? true
:timestamp 946728000000 ; 2000-01-01T12:00:00Z
:clock-value 4}
unordered-messages {2 message-2
1 message-1
4 message-4
3 message-3}
;; groups come back newest first: the Jan 1, 2000 group, then Dec 31, 1999
[[actual-dm-1 actual-msg-1]
[actual-dm-2 actual-msg-2]] (s/message-datemark-groups unordered-messages)]
(is (= datemark-day-1 actual-dm-1))
(is (= datemark-day-2 actual-dm-2))
;; only message-4 lands in the first group; messages 3, 2 and 1 share the second
(is (= 1 (count actual-msg-1)))
(is (= 3 (count actual-msg-2)))
;; clock order must hold across the group boundary as well
(is (messages-ordered? (concat actual-msg-1 actual-msg-2))))))

View File

@ -87,6 +87,11 @@
{:from "a", :text "5", :clock-value 8})]
(is (not (ordered-increasing-text? bad-thread))))))
;; Feeding Number.MAX_SAFE_INTEGER as the message clock to `clocks/receive`
;; must yield a value strictly below Number.MAX_SAFE_INTEGER.
(deftest safe-timestamp
(testing "it caps the timestamp when a value too large is provided"
(is (< (clocks/receive js/Number.MAX_SAFE_INTEGER 0)
js/Number.MAX_SAFE_INTEGER))))
;; Debugging
;;(println "******************************************")
;;(println "A's POV :foo" (format-thread (thread a :foo)))

View File

@ -41,11 +41,11 @@
(deftest day-relative-before-yesterday-us-test
(with-redefs [t/*ms-fn* (constantly epoch-plus-3d)
d/time-zone-offset (t/period :hours 0)
d/date-time-fmt (d/mk-fmt "us" d/short-date-time-format)]
d/date-fmt (d/mk-fmt "us" d/short-date-time-format)]
(is (= (d/day-relative epoch) "Jan 1, 1970, 12:00:00 AM"))))
(deftest day-relative-before-yesterday-nb-test
(with-redefs [t/*ms-fn* (constantly epoch-plus-3d)
d/time-zone-offset (t/period :hours 0)
d/date-time-fmt (d/mk-fmt "nb-NO" d/short-date-time-format)]
d/date-fmt (d/mk-fmt "nb-NO" d/short-date-time-format)]
(is (= (d/day-relative epoch) "1. jan. 1970, 00:00:00"))))