diff --git a/src/status_im/chat/handlers.cljs b/src/status_im/chat/handlers.cljs index 50ffa75841..ee3cfb81d1 100644 --- a/src/status_im/chat/handlers.cljs +++ b/src/status_im/chat/handlers.cljs @@ -457,41 +457,6 @@ (dispatch [:remove-unviewed-messages chat-id])))] (u/side-effect! send-seen!)) -(defn send-clock-value-request! - [{:keys [web3 current-public-key] - :contacts/keys [contacts]} [_ {:keys [message-id from]}]] - (when-not (get-in contacts [from :dapp?]) - (protocol/send-clock-value-request! - {:web3 web3 - :message {:from current-public-key - :to from - :message-id message-id}}))) - -(register-handler :send-clock-value-request! (u/side-effect! send-clock-value-request!)) - -(defn send-clock-value! - [{:keys [web3 current-public-key]} to message-id clock-value] - (when current-public-key - (protocol/send-clock-value! {:web3 web3 - :message {:from current-public-key - :to to - :message-id message-id - :clock-value clock-value}}))) - -(register-handler :update-clock-value! - (after (fn [db [_ to i {:keys [message-id] :as message} last-clock-value]] - (let [clock-value (+ last-clock-value i 1)] - (messages/update (assoc message :clock-value clock-value)) - (send-clock-value! db to message-id clock-value)))) - (fn [db [_ _ i {:keys [message-id]} last-clock-value]] - (assoc-in db [:message-extras message-id :clock-value] (+ last-clock-value i 1)))) - -(register-handler :send-clock-value! - (u/side-effect! - (fn [db [_ to message-id]] - (let [{:keys [clock-value]} (messages/get-by-id message-id)] - (send-clock-value! db to message-id clock-value))))) - (register-handler :check-and-open-dapp! (u/side-effect! (fn [{:keys [current-chat-id global-commands] diff --git a/src/status_im/chat/handlers/receive_message.cljs b/src/status_im/chat/handlers/receive_message.cljs index b416fbf3bb..c7df299444 100644 --- a/src/status_im/chat/handlers/receive_message.cljs +++ b/src/status_im/chat/handlers/receive_message.cljs @@ -11,7 +11,8 @@ [cljs.reader :refer [read-string]] [status-im.data-store.chats :as chats] [status-im.utils.scheduler :as s] - [taoensso.timbre :as log])) + [taoensso.timbre :as log] + [status-im.utils.clocks :as clocks])) (defn store-message [{chat-id :chat-id :as message}] (messages/save chat-id (dissoc message :new?))) @@ -24,7 +25,7 @@ (defn add-message [db {:keys [from group-id chat-id - message-id timestamp clock-value show?] + message-id timestamp clock-value] :as message :or {clock-value 0}}] (let [same-message (messages/get-by-id message-id) @@ -32,10 +33,8 @@ chat-id' (or group-id chat-id from) exists? (chats/exists? chat-id') active? (chats/is-active? chat-id') - chat-clock-value (messages/get-last-clock-value chat-id') - clock-value (if (zero? clock-value) - (inc chat-clock-value) - clock-value)] + local-clock (messages/get-last-clock-value chat-id') + clock-new (clocks/receive clock-value local-clock)] (when (and (not same-message) (not= from current-identity) (or (not exists?) active?)) @@ -44,7 +43,7 @@ message' (assoc (cu/check-author-direction previous-message message) :chat-id chat-id' :timestamp (or timestamp (random/timestamp)) - :clock-value clock-value)] + :clock-value clock-new)] (store-message message') (dispatch [:upsert-chat! {:chat-id chat-id' :group-chat group-chat?}]) @@ -54,9 +53,7 @@ (dispatch [::set-last-message message']) (when (= (:content-type message') content-type-command-request) (dispatch [:add-request chat-id' message'])) - (dispatch [:add-unviewed-message chat-id' message-id]) - (when-not show? - (dispatch [:send-clock-value-request! message]))) + (dispatch [:add-unviewed-message chat-id' message-id])) (if (and (= (:content-type message) content-type-command) (not= chat-id' wallet-chat-id) diff --git a/src/status_im/chat/handlers/send_message.cljs b/src/status_im/chat/handlers/send_message.cljs index 6398ef9cb3..84ebc1c032 100644 --- a/src/status_im/chat/handlers/send_message.cljs +++ b/src/status_im/chat/handlers/send_message.cljs @@ -19,7 +19,8 @@ [status-im.protocol.core :as protocol] [taoensso.timbre :refer-macros [debug] :as log] [status-im.chat.handlers.console :as console] - [status-im.utils.types :as types])) + [status-im.utils.types :as types] + [status-im.utils.clocks :as clocks])) (defn prepare-command [identity chat-id clock-value @@ -52,7 +53,7 @@ :to-message to-message :type (:type command) :has-handler (:has-handler command) - :clock-value (inc clock-value) + :clock-value (clocks/send clock-value) :show? true})) (defn console-command? [chat-id command-name] @@ -177,7 +178,7 @@ :content-type text-content-type :outgoing true :timestamp (time/now-ms) - :clock-value (inc clock-value) + :clock-value (clocks/send clock-value) :show? true}) message'' (cond-> message' (and group-chat public?) diff --git a/src/status_im/protocol/chat.cljs b/src/status_im/protocol/chat.cljs index dc9c3208dc..0590c7d9f8 100644 --- a/src/status_im/protocol/chat.cljs +++ b/src/status_im/protocol/chat.cljs @@ -40,23 +40,3 @@ :requires-ack? false) (assoc-in [:payload :group-id] (:group-id message)) (dissoc :group-id))))) - -(defn send-clock-value-request! - [{:keys [web3 message]}] - (debug :send-clock-value-request message) - (d/add-pending-message! - web3 - (merge message-defaults - (assoc message - :type :clock-value-request - :requires-ack? false)))) - -(defn send-clock-value! - [{:keys [web3 message]}] - (debug :send-clock-value message) - (d/add-pending-message! - web3 - (merge message-defaults - (assoc message - :type :clock-value - :requires-ack? false)))) diff --git a/src/status_im/protocol/core.cljs b/src/status_im/protocol/core.cljs index caa1623e03..4a770fb6e5 100644 --- a/src/status_im/protocol/core.cljs +++ b/src/status_im/protocol/core.cljs @@ -18,8 +18,6 @@ ;; user (def send-message! chat/send!) (def send-seen! chat/send-seen!) -(def send-clock-value-request! chat/send-clock-value-request!) -(def send-clock-value! chat/send-clock-value!) (def reset-pending-messages! d/reset-pending-messages!) ;; group diff --git a/src/status_im/protocol/handlers.cljs b/src/status_im/protocol/handlers.cljs index fcf314d2af..a34c9c18ac 100644 --- a/src/status_im/protocol/handlers.cljs +++ b/src/status_im/protocol/handlers.cljs @@ -117,8 +117,6 @@ (dispatch [:message-delivered message]) (dispatch [:pending-message-remove message])) :seen (dispatch [:message-seen message]) - :clock-value-request (dispatch [:message-clock-value-request message]) - :clock-value (dispatch [:message-clock-value message]) :group-invitation (dispatch [:group-chat-invite-received message]) :update-group (dispatch [:update-group-message message]) :add-group-identity (dispatch [:participant-invited-to-group message]) @@ -299,15 +297,6 @@ (assoc message :message-status status))] (messages/update message))))))) -(defn save-message-clock-value! - [{:keys [message-extras]} - [_ {{:keys [message-id clock-value]} :payload}]] - (when-let [{old-clock-value :clock-value - :as message} (merge (messages/get-by-id message-id) - (get message-extras message-id))] - (if (>= clock-value old-clock-value) - (messages/update (assoc message :clock-value clock-value :show? true))))) - (defn update-message-status [status] (fn [db [_ {:keys [from] @@ -346,33 +335,6 @@ [(after (save-message-status! :seen))] (update-message-status :seen)) -(register-handler :message-clock-value-request - (u/side-effect! - (fn [_ [_ {:keys [from] {:keys [message-id]} :payload}]] - (let [{:keys [chat-id]} (messages/get-by-id message-id) - message-overhead (chats/get-message-overhead chat-id) - last-clock-value (messages/get-last-clock-value chat-id)] - (if (pos? message-overhead) - (let [last-outgoing (->> (messages/get-last-outgoing chat-id message-overhead) - (reverse) - (map-indexed vector))] - (chats/reset-message-overhead chat-id) - (doseq [[i message] last-outgoing] - (dispatch [:update-clock-value! from i message (+ last-clock-value 100)]))) - (dispatch [:send-clock-value! from message-id])))))) - -(register-handler :message-clock-value - (after save-message-clock-value!) - (fn [{:keys [message-extras] :as db} - [_ {{:keys [message-id clock-value]} :payload}]] - (if-let [{old-clock-value :clock-value} (merge (messages/get-by-id message-id) - (get message-extras message-id))] - (if (> clock-value old-clock-value) - (assoc-in db [:message-extras message-id] {:clock-value clock-value - :show? true}) - db) - db))) - (register-handler :pending-message-upsert (after (fn [_ [_ {:keys [type id] :as pending-message}]] diff --git a/src/status_im/utils/clocks.cljs b/src/status_im/utils/clocks.cljs new file mode 100644 index 0000000000..3795c7003b --- /dev/null +++ b/src/status_im/utils/clocks.cljs @@ -0,0 +1,32 @@ +(ns status-im.utils.clocks) + +;; We use Lamport clocks to ensure correct ordering of events in chats. This is +;; necessary because we operate in a distributed system and there is no central +;; coordinator for what happened before what. +;; +;; For example, the last received message in a group chat will appear last, +;; regardless if that person has seen all the previous group chat messages. The +;; principal invariant to maintain is that clock-values should be monotonically +;; increasing. +;; +;; All clock updates happens as part of sending or receiving a message. Here's +;; the basic algorithm: +;; +;; Sending messages: +;; time = time+1; +;; time_stamp = time; +;; send(message, time_stamp); +;; +;; Receiving messages: +;; (message, time_stamp) = receive(); +;; time = max(time_stamp, time)+1; +;; +;; Details: +;; https://en.wikipedia.org/wiki/Lamport_timestamps +;; http://amturing.acm.org/p558-lamport.pdf + +(defn send [local-clock] + (inc local-clock)) + +(defn receive [message-clock local-clock] + (inc (max message-clock local-clock))) diff --git a/test/cljs/status_im/test/runner.cljs b/test/cljs/status_im/test/runner.cljs index 954ec0a833..b013a0f964 100644 --- a/test/cljs/status_im/test/runner.cljs +++ b/test/cljs/status_im/test/runner.cljs @@ -4,7 +4,8 @@ [status-im.test.chat.models.input] [status-im.test.handlers] [status-im.test.utils.utils] - [status-im.test.utils.money])) + [status-im.test.utils.money] + [status-im.test.utils.clocks])) (enable-console-print!) @@ -18,4 +19,5 @@ 'status-im.test.chat.models.input 'status-im.test.handlers 'status-im.test.utils.utils - 'status-im.test.utils.money) + 'status-im.test.utils.money + 'status-im.test.utils.clocks) diff --git a/test/cljs/status_im/test/utils/clocks.cljs b/test/cljs/status_im/test/utils/clocks.cljs new file mode 100644 index 0000000000..332cced072 --- /dev/null +++ b/test/cljs/status_im/test/utils/clocks.cljs @@ -0,0 +1,93 @@ +(ns status-im.test.utils.clocks + (:require [cljs.test :refer-macros [deftest is testing]] + [status-im.utils.clocks :as clocks])) + +;; Messages are shown on a per-chat basis, ordered by the message clock-value. +;; See status-im-utils.clocks namespace for details. + +;; We are not a monolith. +(def a (atom {:identity "a"})) +(def b (atom {:identity "b"})) +(def c (atom {:identity "c"})) + +;; The network is unreliable. +(defn random-broadcast! [chat-id message] + (when (> (rand-int 10) 5) (recv! a chat-id message)) + (when (> (rand-int 10) 5) (recv! b chat-id message)) + (when (> (rand-int 10) 5) (recv! c chat-id message))) + +(defn get-last-clock-value + [db chat-id] + (if-let [messages (-> @db :chats chat-id :messages)] + (-> (sort-by :clock-value > messages) + first + :clock-value) + 0)) + +(defn save! [db chat-id message] + (swap! db + (fn [state] + (let [messages (-> state :chats chat-id :messages)] + (assoc-in state [:chats chat-id :messages] + (conj messages message)))))) + +(defn send! [db chat-id message] + (let [clock-value (get-last-clock-value db chat-id) + prepared-message (assoc message :clock-value (clocks/send clock-value))] + (save! db chat-id prepared-message) + (random-broadcast! chat-id prepared-message))) + +(defn recv! [db chat-id {:keys [clock-value] :as message}] + (let [local-clock (get-last-clock-value db chat-id) + new-clock (clocks/receive clock-value local-clock)] + (when-not (= (:from message) (:identity @db)) + (save! db chat-id (assoc message :clock-value new-clock))))) + +(defn thread [db chat-id] + (let [messages (-> @db :chats chat-id :messages)] + (sort-by :clock-value < messages))) + +(defn format-message [{:keys [from text]}] + (str from ": " text ", ")) + +(defn format-thread [thread] + (apply str (map format-message thread))) + +;; Invariant we want to maintain. +(defn ordered-increasing-text? [thread] + (let [xs (map :text thread)] + (or (empty? xs) (apply < xs)))) + +(defn simulate! [] + (send! a :foo {:from "a" :text "1"}) + (send! a :foo {:from "a" :text "2"}) + + (send! a :bar {:from "a" :text "1"}) + + (send! b :foo {:from "b" :text "3"}) + (send! c :foo {:from "c" :text "4"}) + (send! a :foo {:from "a" :text "5"}) + + (send! c :bar {:from "c" :text "7"})) + +(deftest clocks + (testing "Message order preserved" + (simulate!) + (is (ordered-increasing-text? (thread a :foo))) + (is (ordered-increasing-text? (thread b :foo))) + (is (ordered-increasing-text? (thread c :foo))) + (is (ordered-increasing-text? (thread a :bar)))) + + (testing "Bad thread recognized as such" + (let [bad-thread '({:from "a", :text "1", :clock-value 1} + {:from "c", :text "4", :clock-value 1} + {:from "a", :text "2", :clock-value 2} + {:from "a", :text "5", :clock-value 8})] + (is (not (ordered-increasing-text? bad-thread)))))) + + ;; Debugging +;;(println "******************************************") +;;(println "A's POV :foo" (format-thread (thread a :foo))) +;;(println "B's POV :foo" (format-thread (thread b :foo))) +;;(println "C's POV :foo" (format-thread (thread c :foo))) +;;(println "******************************************")