chat, protocol: Introduce Lamport clock semantics for message order

This commit ensures messages are ordered correctly when participants join and
leave a group chat. Specifically, the last received message will appear last.
Previously the user and chat clock was queried and updated in an ad hoc manner.
With this change there are only two clock changes to keep track of:

Sending messages:
time = time+1;
time_stamp = time;
send(message, time_stamp);

Receiving messages:
(message, time_stamp) = receive();
time = max(time_stamp, time)+1;

(See https://en.wikipedia.org/wiki/Lamport_timestamps)

Note that this means we can get rid of all the non-message clock queries and
updates.
This commit is contained in:
Oskar Thorén 2017-08-01 18:52:30 +02:00 committed by Roman Volosovskyi
parent 646f61a406
commit db6b80d529
9 changed files with 140 additions and 110 deletions

View File

@ -457,41 +457,6 @@
(dispatch [:remove-unviewed-messages chat-id])))]
(u/side-effect! send-seen!))
(defn send-clock-value-request!
[{:keys [web3 current-public-key]
:contacts/keys [contacts]} [_ {:keys [message-id from]}]]
(when-not (get-in contacts [from :dapp?])
(protocol/send-clock-value-request!
{:web3 web3
:message {:from current-public-key
:to from
:message-id message-id}})))
(register-handler :send-clock-value-request! (u/side-effect! send-clock-value-request!))
(defn send-clock-value!
[{:keys [web3 current-public-key]} to message-id clock-value]
(when current-public-key
(protocol/send-clock-value! {:web3 web3
:message {:from current-public-key
:to to
:message-id message-id
:clock-value clock-value}})))
(register-handler :update-clock-value!
(after (fn [db [_ to i {:keys [message-id] :as message} last-clock-value]]
(let [clock-value (+ last-clock-value i 1)]
(messages/update (assoc message :clock-value clock-value))
(send-clock-value! db to message-id clock-value))))
(fn [db [_ _ i {:keys [message-id]} last-clock-value]]
(assoc-in db [:message-extras message-id :clock-value] (+ last-clock-value i 1))))
(register-handler :send-clock-value!
(u/side-effect!
(fn [db [_ to message-id]]
(let [{:keys [clock-value]} (messages/get-by-id message-id)]
(send-clock-value! db to message-id clock-value)))))
(register-handler :check-and-open-dapp!
(u/side-effect!
(fn [{:keys [current-chat-id global-commands]

View File

@ -11,7 +11,8 @@
[cljs.reader :refer [read-string]]
[status-im.data-store.chats :as chats]
[status-im.utils.scheduler :as s]
[taoensso.timbre :as log]))
[taoensso.timbre :as log]
[status-im.utils.clocks :as clocks]))
(defn store-message [{chat-id :chat-id :as message}]
(messages/save chat-id (dissoc message :new?)))
@ -24,7 +25,7 @@
(defn add-message
[db {:keys [from group-id chat-id
message-id timestamp clock-value show?]
message-id timestamp clock-value]
:as message
:or {clock-value 0}}]
(let [same-message (messages/get-by-id message-id)
@ -32,10 +33,8 @@
chat-id' (or group-id chat-id from)
exists? (chats/exists? chat-id')
active? (chats/is-active? chat-id')
chat-clock-value (messages/get-last-clock-value chat-id')
clock-value (if (zero? clock-value)
(inc chat-clock-value)
clock-value)]
local-clock (messages/get-last-clock-value chat-id')
clock-new (clocks/receive clock-value local-clock)]
(when (and (not same-message)
(not= from current-identity)
(or (not exists?) active?))
@ -44,7 +43,7 @@
message' (assoc (cu/check-author-direction previous-message message)
:chat-id chat-id'
:timestamp (or timestamp (random/timestamp))
:clock-value clock-value)]
:clock-value clock-new)]
(store-message message')
(dispatch [:upsert-chat! {:chat-id chat-id'
:group-chat group-chat?}])
@ -54,9 +53,7 @@
(dispatch [::set-last-message message'])
(when (= (:content-type message') content-type-command-request)
(dispatch [:add-request chat-id' message']))
(dispatch [:add-unviewed-message chat-id' message-id])
(when-not show?
(dispatch [:send-clock-value-request! message])))
(dispatch [:add-unviewed-message chat-id' message-id]))
(if (and
(= (:content-type message) content-type-command)
(not= chat-id' wallet-chat-id)

View File

@ -19,7 +19,8 @@
[status-im.protocol.core :as protocol]
[taoensso.timbre :refer-macros [debug] :as log]
[status-im.chat.handlers.console :as console]
[status-im.utils.types :as types]))
[status-im.utils.types :as types]
[status-im.utils.clocks :as clocks]))
(defn prepare-command
[identity chat-id clock-value
@ -52,7 +53,7 @@
:to-message to-message
:type (:type command)
:has-handler (:has-handler command)
:clock-value (inc clock-value)
:clock-value (clocks/send clock-value)
:show? true}))
(defn console-command? [chat-id command-name]
@ -177,7 +178,7 @@
:content-type text-content-type
:outgoing true
:timestamp (time/now-ms)
:clock-value (inc clock-value)
:clock-value (clocks/send clock-value)
:show? true})
message'' (cond-> message'
(and group-chat public?)

View File

@ -40,23 +40,3 @@
:requires-ack? false)
(assoc-in [:payload :group-id] (:group-id message))
(dissoc :group-id)))))
(defn send-clock-value-request!
[{:keys [web3 message]}]
(debug :send-clock-value-request message)
(d/add-pending-message!
web3
(merge message-defaults
(assoc message
:type :clock-value-request
:requires-ack? false))))
(defn send-clock-value!
[{:keys [web3 message]}]
(debug :send-clock-value message)
(d/add-pending-message!
web3
(merge message-defaults
(assoc message
:type :clock-value
:requires-ack? false))))

View File

@ -18,8 +18,6 @@
;; user
(def send-message! chat/send!)
(def send-seen! chat/send-seen!)
(def send-clock-value-request! chat/send-clock-value-request!)
(def send-clock-value! chat/send-clock-value!)
(def reset-pending-messages! d/reset-pending-messages!)
;; group

View File

@ -117,8 +117,6 @@
(dispatch [:message-delivered message])
(dispatch [:pending-message-remove message]))
:seen (dispatch [:message-seen message])
:clock-value-request (dispatch [:message-clock-value-request message])
:clock-value (dispatch [:message-clock-value message])
:group-invitation (dispatch [:group-chat-invite-received message])
:update-group (dispatch [:update-group-message message])
:add-group-identity (dispatch [:participant-invited-to-group message])
@ -299,15 +297,6 @@
(assoc message :message-status status))]
(messages/update message)))))))
(defn save-message-clock-value!
[{:keys [message-extras]}
[_ {{:keys [message-id clock-value]} :payload}]]
(when-let [{old-clock-value :clock-value
:as message} (merge (messages/get-by-id message-id)
(get message-extras message-id))]
(if (>= clock-value old-clock-value)
(messages/update (assoc message :clock-value clock-value :show? true)))))
(defn update-message-status [status]
(fn [db
[_ {:keys [from]
@ -346,33 +335,6 @@
[(after (save-message-status! :seen))]
(update-message-status :seen))
(register-handler :message-clock-value-request
(u/side-effect!
(fn [_ [_ {:keys [from] {:keys [message-id]} :payload}]]
(let [{:keys [chat-id]} (messages/get-by-id message-id)
message-overhead (chats/get-message-overhead chat-id)
last-clock-value (messages/get-last-clock-value chat-id)]
(if (pos? message-overhead)
(let [last-outgoing (->> (messages/get-last-outgoing chat-id message-overhead)
(reverse)
(map-indexed vector))]
(chats/reset-message-overhead chat-id)
(doseq [[i message] last-outgoing]
(dispatch [:update-clock-value! from i message (+ last-clock-value 100)])))
(dispatch [:send-clock-value! from message-id]))))))
(register-handler :message-clock-value
(after save-message-clock-value!)
(fn [{:keys [message-extras] :as db}
[_ {{:keys [message-id clock-value]} :payload}]]
(if-let [{old-clock-value :clock-value} (merge (messages/get-by-id message-id)
(get message-extras message-id))]
(if (> clock-value old-clock-value)
(assoc-in db [:message-extras message-id] {:clock-value clock-value
:show? true})
db)
db)))
(register-handler :pending-message-upsert
(after
(fn [_ [_ {:keys [type id] :as pending-message}]]

View File

@ -0,0 +1,32 @@
(ns status-im.utils.clocks)
;; We use Lamport clocks to ensure correct ordering of events in chats. This is
;; necessary because we operate in a distributed system and there is no central
;; coordinator for what happened before what.
;;
;; For example, the last received message in a group chat will appear last,
;; regardless if that person has seen all the previous group chat messages. The
;; principal invariant to maintain is that clock-values should be monotonically
;; increasing.
;;
;; All clock updates happens as part of sending or receiving a message. Here's
;; the basic algorithm:
;;
;; Sending messages:
;; time = time+1;
;; time_stamp = time;
;; send(message, time_stamp);
;;
;; Receiving messages:
;; (message, time_stamp) = receive();
;; time = max(time_stamp, time)+1;
;;
;; Details:
;; https://en.wikipedia.org/wiki/Lamport_timestamps
;; http://amturing.acm.org/p558-lamport.pdf
(defn send [local-clock]
(inc local-clock))
(defn receive [message-clock local-clock]
(inc (max message-clock local-clock)))

View File

@ -4,7 +4,8 @@
[status-im.test.chat.models.input]
[status-im.test.handlers]
[status-im.test.utils.utils]
[status-im.test.utils.money]))
[status-im.test.utils.money]
[status-im.test.utils.clocks]))
(enable-console-print!)
@ -18,4 +19,5 @@
'status-im.test.chat.models.input
'status-im.test.handlers
'status-im.test.utils.utils
'status-im.test.utils.money)
'status-im.test.utils.money
'status-im.test.utils.clocks)

View File

@ -0,0 +1,93 @@
(ns status-im.test.utils.clocks
(:require [cljs.test :refer-macros [deftest is testing]]
[status-im.utils.clocks :as clocks]))
;; Messages are shown on a per-chat basis, ordered by the message clock-value.
;; See status-im-utils.clocks namespace for details.
;; We are not a monolith.
(def a (atom {:identity "a"}))
(def b (atom {:identity "b"}))
(def c (atom {:identity "c"}))
;; The network is unreliable.
(defn random-broadcast! [chat-id message]
(when (> (rand-int 10) 5) (recv! a chat-id message))
(when (> (rand-int 10) 5) (recv! b chat-id message))
(when (> (rand-int 10) 5) (recv! c chat-id message)))
(defn get-last-clock-value
[db chat-id]
(if-let [messages (-> @db :chats chat-id :messages)]
(-> (sort-by :clock-value > messages)
first
:clock-value)
0))
(defn save! [db chat-id message]
(swap! db
(fn [state]
(let [messages (-> state :chats chat-id :messages)]
(assoc-in state [:chats chat-id :messages]
(conj messages message))))))
(defn send! [db chat-id message]
(let [clock-value (get-last-clock-value db chat-id)
prepared-message (assoc message :clock-value (clocks/send clock-value))]
(save! db chat-id prepared-message)
(random-broadcast! chat-id prepared-message)))
(defn recv! [db chat-id {:keys [clock-value] :as message}]
(let [local-clock (get-last-clock-value db chat-id)
new-clock (clocks/receive clock-value local-clock)]
(when-not (= (:from message) (:identity @db))
(save! db chat-id (assoc message :clock-value new-clock)))))
(defn thread [db chat-id]
(let [messages (-> @db :chats chat-id :messages)]
(sort-by :clock-value < messages)))
(defn format-message [{:keys [from text]}]
(str from ": " text ", "))
(defn format-thread [thread]
(apply str (map format-message thread)))
;; Invariant we want to maintain.
(defn ordered-increasing-text? [thread]
(let [xs (map :text thread)]
(or (empty? xs) (apply < xs))))
(defn simulate! []
(send! a :foo {:from "a" :text "1"})
(send! a :foo {:from "a" :text "2"})
(send! a :bar {:from "a" :text "1"})
(send! b :foo {:from "b" :text "3"})
(send! c :foo {:from "c" :text "4"})
(send! a :foo {:from "a" :text "5"})
(send! c :bar {:from "c" :text "7"}))
(deftest clocks
(testing "Message order preserved"
(simulate!)
(is (ordered-increasing-text? (thread a :foo)))
(is (ordered-increasing-text? (thread b :foo)))
(is (ordered-increasing-text? (thread c :foo)))
(is (ordered-increasing-text? (thread a :bar))))
(testing "Bad thread recognized as such"
(let [bad-thread '({:from "a", :text "1", :clock-value 1}
{:from "c", :text "4", :clock-value 1}
{:from "a", :text "2", :clock-value 2}
{:from "a", :text "5", :clock-value 8})]
(is (not (ordered-increasing-text? bad-thread))))))
;; Debugging
;;(println "******************************************")
;;(println "A's POV :foo" (format-thread (thread a :foo)))
;;(println "B's POV :foo" (format-thread (thread b :foo)))
;;(println "C's POV :foo" (format-thread (thread c :foo)))
;;(println "******************************************")