From db6b80d529f3b7512d4831b88246b60a86072b1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Oskar=20Thor=C3=A9n?= Date: Tue, 1 Aug 2017 18:52:30 +0200 Subject: [PATCH] chat, protocol: Introduce Lamport clock semantics for message order This commit ensures messages are ordered correctly when participants join and leave a group chat. Specifically, the last received message will appear last. Previously the user and chat clock was queried and updated in an ad hoc manner. With this change there are only two clock changes to keep track of: Sending messages: time = time+1; time_stamp = time; send(message, time_stamp); Receiving messages: (message, time_stamp) = receive(); time = max(time_stamp, time)+1; (See https://en.wikipedia.org/wiki/Lamport_timestamps) Note that this means we can get rid of all the non-message clock queries and updates. --- src/status_im/chat/handlers.cljs | 35 ------- .../chat/handlers/receive_message.cljs | 17 ++-- src/status_im/chat/handlers/send_message.cljs | 7 +- src/status_im/protocol/chat.cljs | 20 ---- src/status_im/protocol/core.cljs | 2 - src/status_im/protocol/handlers.cljs | 38 -------- src/status_im/utils/clocks.cljs | 32 +++++++ test/cljs/status_im/test/runner.cljs | 6 +- test/cljs/status_im/test/utils/clocks.cljs | 93 +++++++++++++++++++ 9 files changed, 140 insertions(+), 110 deletions(-) create mode 100644 src/status_im/utils/clocks.cljs create mode 100644 test/cljs/status_im/test/utils/clocks.cljs diff --git a/src/status_im/chat/handlers.cljs b/src/status_im/chat/handlers.cljs index 50ffa75841..ee3cfb81d1 100644 --- a/src/status_im/chat/handlers.cljs +++ b/src/status_im/chat/handlers.cljs @@ -457,41 +457,6 @@ (dispatch [:remove-unviewed-messages chat-id])))] (u/side-effect! send-seen!)) -(defn send-clock-value-request! - [{:keys [web3 current-public-key] - :contacts/keys [contacts]} [_ {:keys [message-id from]}]] - (when-not (get-in contacts [from :dapp?]) - (protocol/send-clock-value-request! - {:web3 web3 - :message {:from current-public-key - :to from - :message-id message-id}}))) - -(register-handler :send-clock-value-request! (u/side-effect! send-clock-value-request!)) - -(defn send-clock-value! - [{:keys [web3 current-public-key]} to message-id clock-value] - (when current-public-key - (protocol/send-clock-value! {:web3 web3 - :message {:from current-public-key - :to to - :message-id message-id - :clock-value clock-value}}))) - -(register-handler :update-clock-value! - (after (fn [db [_ to i {:keys [message-id] :as message} last-clock-value]] - (let [clock-value (+ last-clock-value i 1)] - (messages/update (assoc message :clock-value clock-value)) - (send-clock-value! db to message-id clock-value)))) - (fn [db [_ _ i {:keys [message-id]} last-clock-value]] - (assoc-in db [:message-extras message-id :clock-value] (+ last-clock-value i 1)))) - -(register-handler :send-clock-value! - (u/side-effect! - (fn [db [_ to message-id]] - (let [{:keys [clock-value]} (messages/get-by-id message-id)] - (send-clock-value! db to message-id clock-value))))) - (register-handler :check-and-open-dapp! (u/side-effect! (fn [{:keys [current-chat-id global-commands] diff --git a/src/status_im/chat/handlers/receive_message.cljs b/src/status_im/chat/handlers/receive_message.cljs index b416fbf3bb..c7df299444 100644 --- a/src/status_im/chat/handlers/receive_message.cljs +++ b/src/status_im/chat/handlers/receive_message.cljs @@ -11,7 +11,8 @@ [cljs.reader :refer [read-string]] [status-im.data-store.chats :as chats] [status-im.utils.scheduler :as s] - [taoensso.timbre :as log])) + [taoensso.timbre :as log] + [status-im.utils.clocks :as clocks])) (defn store-message [{chat-id :chat-id :as message}] (messages/save chat-id (dissoc message :new?))) @@ -24,7 +25,7 @@ (defn add-message [db {:keys [from group-id chat-id - message-id timestamp clock-value show?] + message-id timestamp clock-value] :as message :or {clock-value 0}}] (let [same-message (messages/get-by-id message-id) @@ -32,10 +33,8 @@ chat-id' (or group-id chat-id from) exists? (chats/exists? chat-id') active? (chats/is-active? chat-id') - chat-clock-value (messages/get-last-clock-value chat-id') - clock-value (if (zero? clock-value) - (inc chat-clock-value) - clock-value)] + local-clock (messages/get-last-clock-value chat-id') + clock-new (clocks/receive clock-value local-clock)] (when (and (not same-message) (not= from current-identity) (or (not exists?) active?)) @@ -44,7 +43,7 @@ message' (assoc (cu/check-author-direction previous-message message) :chat-id chat-id' :timestamp (or timestamp (random/timestamp)) - :clock-value clock-value)] + :clock-value clock-new)] (store-message message') (dispatch [:upsert-chat! {:chat-id chat-id' :group-chat group-chat?}]) @@ -54,9 +53,7 @@ (dispatch [::set-last-message message']) (when (= (:content-type message') content-type-command-request) (dispatch [:add-request chat-id' message'])) - (dispatch [:add-unviewed-message chat-id' message-id]) - (when-not show? - (dispatch [:send-clock-value-request! message]))) + (dispatch [:add-unviewed-message chat-id' message-id])) (if (and (= (:content-type message) content-type-command) (not= chat-id' wallet-chat-id) diff --git a/src/status_im/chat/handlers/send_message.cljs b/src/status_im/chat/handlers/send_message.cljs index 6398ef9cb3..84ebc1c032 100644 --- a/src/status_im/chat/handlers/send_message.cljs +++ b/src/status_im/chat/handlers/send_message.cljs @@ -19,7 +19,8 @@ [status-im.protocol.core :as protocol] [taoensso.timbre :refer-macros [debug] :as log] [status-im.chat.handlers.console :as console] - [status-im.utils.types :as types])) + [status-im.utils.types :as types] + [status-im.utils.clocks :as clocks])) (defn prepare-command [identity chat-id clock-value @@ -52,7 +53,7 @@ :to-message to-message :type (:type command) :has-handler (:has-handler command) - :clock-value (inc clock-value) + :clock-value (clocks/send clock-value) :show? true})) (defn console-command? [chat-id command-name] @@ -177,7 +178,7 @@ :content-type text-content-type :outgoing true :timestamp (time/now-ms) - :clock-value (inc clock-value) + :clock-value (clocks/send clock-value) :show? true}) message'' (cond-> message' (and group-chat public?) diff --git a/src/status_im/protocol/chat.cljs b/src/status_im/protocol/chat.cljs index dc9c3208dc..0590c7d9f8 100644 --- a/src/status_im/protocol/chat.cljs +++ b/src/status_im/protocol/chat.cljs @@ -40,23 +40,3 @@ :requires-ack? false) (assoc-in [:payload :group-id] (:group-id message)) (dissoc :group-id))))) - -(defn send-clock-value-request! - [{:keys [web3 message]}] - (debug :send-clock-value-request message) - (d/add-pending-message! - web3 - (merge message-defaults - (assoc message - :type :clock-value-request - :requires-ack? false)))) - -(defn send-clock-value! - [{:keys [web3 message]}] - (debug :send-clock-value message) - (d/add-pending-message! - web3 - (merge message-defaults - (assoc message - :type :clock-value - :requires-ack? false)))) diff --git a/src/status_im/protocol/core.cljs b/src/status_im/protocol/core.cljs index caa1623e03..4a770fb6e5 100644 --- a/src/status_im/protocol/core.cljs +++ b/src/status_im/protocol/core.cljs @@ -18,8 +18,6 @@ ;; user (def send-message! chat/send!) (def send-seen! chat/send-seen!) -(def send-clock-value-request! chat/send-clock-value-request!) -(def send-clock-value! chat/send-clock-value!) (def reset-pending-messages! d/reset-pending-messages!) ;; group diff --git a/src/status_im/protocol/handlers.cljs b/src/status_im/protocol/handlers.cljs index fcf314d2af..a34c9c18ac 100644 --- a/src/status_im/protocol/handlers.cljs +++ b/src/status_im/protocol/handlers.cljs @@ -117,8 +117,6 @@ (dispatch [:message-delivered message]) (dispatch [:pending-message-remove message])) :seen (dispatch [:message-seen message]) - :clock-value-request (dispatch [:message-clock-value-request message]) - :clock-value (dispatch [:message-clock-value message]) :group-invitation (dispatch [:group-chat-invite-received message]) :update-group (dispatch [:update-group-message message]) :add-group-identity (dispatch [:participant-invited-to-group message]) @@ -299,15 +297,6 @@ (assoc message :message-status status))] (messages/update message))))))) -(defn save-message-clock-value! - [{:keys [message-extras]} - [_ {{:keys [message-id clock-value]} :payload}]] - (when-let [{old-clock-value :clock-value - :as message} (merge (messages/get-by-id message-id) - (get message-extras message-id))] - (if (>= clock-value old-clock-value) - (messages/update (assoc message :clock-value clock-value :show? true))))) - (defn update-message-status [status] (fn [db [_ {:keys [from] @@ -346,33 +335,6 @@ [(after (save-message-status! :seen))] (update-message-status :seen)) -(register-handler :message-clock-value-request - (u/side-effect! - (fn [_ [_ {:keys [from] {:keys [message-id]} :payload}]] - (let [{:keys [chat-id]} (messages/get-by-id message-id) - message-overhead (chats/get-message-overhead chat-id) - last-clock-value (messages/get-last-clock-value chat-id)] - (if (pos? message-overhead) - (let [last-outgoing (->> (messages/get-last-outgoing chat-id message-overhead) - (reverse) - (map-indexed vector))] - (chats/reset-message-overhead chat-id) - (doseq [[i message] last-outgoing] - (dispatch [:update-clock-value! from i message (+ last-clock-value 100)]))) - (dispatch [:send-clock-value! from message-id])))))) - -(register-handler :message-clock-value - (after save-message-clock-value!) - (fn [{:keys [message-extras] :as db} - [_ {{:keys [message-id clock-value]} :payload}]] - (if-let [{old-clock-value :clock-value} (merge (messages/get-by-id message-id) - (get message-extras message-id))] - (if (> clock-value old-clock-value) - (assoc-in db [:message-extras message-id] {:clock-value clock-value - :show? true}) - db) - db))) - (register-handler :pending-message-upsert (after (fn [_ [_ {:keys [type id] :as pending-message}]] diff --git a/src/status_im/utils/clocks.cljs b/src/status_im/utils/clocks.cljs new file mode 100644 index 0000000000..3795c7003b --- /dev/null +++ b/src/status_im/utils/clocks.cljs @@ -0,0 +1,32 @@ +(ns status-im.utils.clocks) + +;; We use Lamport clocks to ensure correct ordering of events in chats. This is +;; necessary because we operate in a distributed system and there is no central +;; coordinator for what happened before what. +;; +;; For example, the last received message in a group chat will appear last, +;; regardless if that person has seen all the previous group chat messages. The +;; principal invariant to maintain is that clock-values should be monotonically +;; increasing. +;; +;; All clock updates happens as part of sending or receiving a message. Here's +;; the basic algorithm: +;; +;; Sending messages: +;; time = time+1; +;; time_stamp = time; +;; send(message, time_stamp); +;; +;; Receiving messages: +;; (message, time_stamp) = receive(); +;; time = max(time_stamp, time)+1; +;; +;; Details: +;; https://en.wikipedia.org/wiki/Lamport_timestamps +;; http://amturing.acm.org/p558-lamport.pdf + +(defn send [local-clock] + (inc local-clock)) + +(defn receive [message-clock local-clock] + (inc (max message-clock local-clock))) diff --git a/test/cljs/status_im/test/runner.cljs b/test/cljs/status_im/test/runner.cljs index 954ec0a833..b013a0f964 100644 --- a/test/cljs/status_im/test/runner.cljs +++ b/test/cljs/status_im/test/runner.cljs @@ -4,7 +4,8 @@ [status-im.test.chat.models.input] [status-im.test.handlers] [status-im.test.utils.utils] - [status-im.test.utils.money])) + [status-im.test.utils.money] + [status-im.test.utils.clocks])) (enable-console-print!) @@ -18,4 +19,5 @@ 'status-im.test.chat.models.input 'status-im.test.handlers 'status-im.test.utils.utils - 'status-im.test.utils.money) + 'status-im.test.utils.money + 'status-im.test.utils.clocks) diff --git a/test/cljs/status_im/test/utils/clocks.cljs b/test/cljs/status_im/test/utils/clocks.cljs new file mode 100644 index 0000000000..332cced072 --- /dev/null +++ b/test/cljs/status_im/test/utils/clocks.cljs @@ -0,0 +1,93 @@ +(ns status-im.test.utils.clocks + (:require [cljs.test :refer-macros [deftest is testing]] + [status-im.utils.clocks :as clocks])) + +;; Messages are shown on a per-chat basis, ordered by the message clock-value. +;; See status-im-utils.clocks namespace for details. + +;; We are not a monolith. +(def a (atom {:identity "a"})) +(def b (atom {:identity "b"})) +(def c (atom {:identity "c"})) + +;; The network is unreliable. +(defn random-broadcast! [chat-id message] + (when (> (rand-int 10) 5) (recv! a chat-id message)) + (when (> (rand-int 10) 5) (recv! b chat-id message)) + (when (> (rand-int 10) 5) (recv! c chat-id message))) + +(defn get-last-clock-value + [db chat-id] + (if-let [messages (-> @db :chats chat-id :messages)] + (-> (sort-by :clock-value > messages) + first + :clock-value) + 0)) + +(defn save! [db chat-id message] + (swap! db + (fn [state] + (let [messages (-> state :chats chat-id :messages)] + (assoc-in state [:chats chat-id :messages] + (conj messages message)))))) + +(defn send! [db chat-id message] + (let [clock-value (get-last-clock-value db chat-id) + prepared-message (assoc message :clock-value (clocks/send clock-value))] + (save! db chat-id prepared-message) + (random-broadcast! chat-id prepared-message))) + +(defn recv! [db chat-id {:keys [clock-value] :as message}] + (let [local-clock (get-last-clock-value db chat-id) + new-clock (clocks/receive clock-value local-clock)] + (when-not (= (:from message) (:identity @db)) + (save! db chat-id (assoc message :clock-value new-clock))))) + +(defn thread [db chat-id] + (let [messages (-> @db :chats chat-id :messages)] + (sort-by :clock-value < messages))) + +(defn format-message [{:keys [from text]}] + (str from ": " text ", ")) + +(defn format-thread [thread] + (apply str (map format-message thread))) + +;; Invariant we want to maintain. +(defn ordered-increasing-text? [thread] + (let [xs (map :text thread)] + (or (empty? xs) (apply < xs)))) + +(defn simulate! [] + (send! a :foo {:from "a" :text "1"}) + (send! a :foo {:from "a" :text "2"}) + + (send! a :bar {:from "a" :text "1"}) + + (send! b :foo {:from "b" :text "3"}) + (send! c :foo {:from "c" :text "4"}) + (send! a :foo {:from "a" :text "5"}) + + (send! c :bar {:from "c" :text "7"})) + +(deftest clocks + (testing "Message order preserved" + (simulate!) + (is (ordered-increasing-text? (thread a :foo))) + (is (ordered-increasing-text? (thread b :foo))) + (is (ordered-increasing-text? (thread c :foo))) + (is (ordered-increasing-text? (thread a :bar)))) + + (testing "Bad thread recognized as such" + (let [bad-thread '({:from "a", :text "1", :clock-value 1} + {:from "c", :text "4", :clock-value 1} + {:from "a", :text "2", :clock-value 2} + {:from "a", :text "5", :clock-value 8})] + (is (not (ordered-increasing-text? bad-thread)))))) + + ;; Debugging +;;(println "******************************************") +;;(println "A's POV :foo" (format-thread (thread a :foo))) +;;(println "B's POV :foo" (format-thread (thread b :foo))) +;;(println "C's POV :foo" (format-thread (thread c :foo))) +;;(println "******************************************")