Paginate using clock-value & message-id instead of skip/limit

Paginating using the count of loaded messages might result in some
messages being skipped and not being loaded in the database, in case of
out-of-order messages received.

This commit changes the behavior to sort by `clock-value` and
`message-id`, which gives a consistent sorting.

The initial idea was to use a cursor `clock-value-message-id` and
iterate on that, but realm does not support filtering on string (</>),
so instead we keep track of messages with identical clock-value and
exclude those in the next page query.

The change might result in pages that have duplicates (so messages needs
to be deduped), but won't result in skipped messages.
This commit is contained in:
Andrea Maria Piana 2018-12-17 08:44:19 +01:00
parent fb9c278bd0
commit dfdbe1ccbc
No known key found for this signature in database
GPG Key ID: AA6CCA6DE0E06424
3 changed files with 36 additions and 8 deletions

View File

@ -111,9 +111,10 @@
get-stored-user-statuses :get-stored-user-statuses get-stored-user-statuses :get-stored-user-statuses
get-referenced-messages :get-referenced-messages :as cofx}] get-referenced-messages :get-referenced-messages :as cofx}]
(when-not (get-in db [:chats current-chat-id :all-loaded?]) (when-not (get-in db [:chats current-chat-id :all-loaded?])
(let [loaded-count (count (get-in db [:chats current-chat-id :messages])) (let [previous-pagination-info (get-in db [:chats current-chat-id :pagination-info])
{:keys [messages {:keys [messages
all-loaded?]} (get-stored-messages current-chat-id loaded-count) pagination-info
all-loaded?]} (get-stored-messages current-chat-id previous-pagination-info)
already-loaded-messages (get-in db [:chats current-chat-id :messages]) already-loaded-messages (get-in db [:chats current-chat-id :messages])
;; We remove those messages that are already loaded, as we might get some duplicates ;; We remove those messages that are already loaded, as we might get some duplicates
new-messages (remove (comp already-loaded-messages :message-id) new-messages (remove (comp already-loaded-messages :message-id)
@ -132,6 +133,7 @@
(update-in [:chats current-chat-id :message-statuses] merge new-statuses) (update-in [:chats current-chat-id :message-statuses] merge new-statuses)
(update-in [:chats current-chat-id :referenced-messages] (update-in [:chats current-chat-id :referenced-messages]
#(into (apply dissoc % new-message-ids) referenced-messages)) #(into (apply dissoc % new-message-ids) referenced-messages))
(assoc-in [:chats current-chat-id :pagination-info] pagination-info)
(assoc-in [:chats current-chat-id :all-loaded?] (assoc-in [:chats current-chat-id :all-loaded?]
all-loaded?))} all-loaded?))}
(chat-model/update-chats-unviewed-messages-count (chat-model/update-chats-unviewed-messages-count

View File

@ -1,5 +1,7 @@
(ns status-im.data-store.messages (ns status-im.data-store.messages
(:require [re-frame.core :as re-frame] (:require [re-frame.core :as re-frame]
[clojure.set :as clojure.set]
[clojure.string :as string]
[status-im.constants :as constants] [status-im.constants :as constants]
[status-im.data-store.realm.core :as core] [status-im.data-store.realm.core :as core]
[status-im.utils.core :as utils] [status-im.utils.core :as utils]
@ -11,15 +13,36 @@
(update :message-type keyword) (update :message-type keyword)
(assoc :content parsed-content)))) (assoc :content parsed-content))))
(defn- exclude-messages [query message-ids]
(let [string-queries (map #(str "message-id != \"" % "\"") message-ids)]
(core/filtered query (string/join " AND " string-queries))))
(defn- get-by-chat-id (defn- get-by-chat-id
([chat-id] ([chat-id]
(get-by-chat-id chat-id 0)) (get-by-chat-id chat-id nil))
([chat-id from] ([chat-id {:keys [last-clock-value message-ids]}]
(let [messages (-> (core/get-by-field @core/account-realm :message :chat-id chat-id) (let [messages (cond-> (core/get-by-field @core/account-realm :message :chat-id chat-id)
(core/sorted :timestamp :desc) :always (core/multi-field-sorted [["clock-value" true] ["message-id" true]])
(core/page from (+ from constants/default-number-of-messages)) last-clock-value (core/filtered (str "clock-value <= \"" last-clock-value "\""))
(core/all-clj :message))] (seq message-ids) (exclude-messages message-ids)
:always (core/page 0 constants/default-number-of-messages)
:always (core/all-clj :message))
clock-value (-> messages last :clock-value)
new-message-ids (->> messages
(filter #(= clock-value (:clock-value %)))
(map :message-id)
(into #{}))]
{:all-loaded? (> constants/default-number-of-messages (count messages)) {:all-loaded? (> constants/default-number-of-messages (count messages))
;; We paginate using clock-value + message-id to break ties, we need
;; to exclude previously loaded messages with identical clock value
;; otherwise we might fetch exactly the same page if all the messages
;; in a page have the same clock-value. The initial idea was to use a
;; cursor clock-value-message-id but realm does not support </> operators
;; on strings
:pagination-info {:last-clock-value clock-value
:message-ids (if (= clock-value last-clock-value)
(clojure.set/union message-ids new-message-ids)
new-message-ids)}
:messages (keep transform-message messages)}))) :messages (keep transform-message messages)})))
(defn get-message-id-by-old [old-message-id] (defn get-message-id-by-old [old-message-id]

View File

@ -307,6 +307,9 @@
false false
true))) true)))
(defn multi-field-sorted [results fields]
(.sorted results (clj->js fields)))
(defn page [results from to] (defn page [results from to]
(js/Array.prototype.slice.call results from to)) (js/Array.prototype.slice.call results from to))