First stage of realm transactions

This commit is contained in:
janherich 2018-05-02 18:44:58 +02:00
parent cdb95616ac
commit 62a9f26e30
No known key found for this signature in database
GPG Key ID: C23B473AFBE94D13
23 changed files with 328 additions and 418 deletions

View File

@ -20,6 +20,8 @@
[status-im.transport.message.v1.protocol :as protocol]
[status-im.transport.message.v1.public-chat :as public-chat]
[status-im.transport.message.v1.group-chat :as group-chat]
[status-im.data-store.chats :as chats-store]
[status-im.data-store.messages :as messages-store]
status-im.chat.events.commands
status-im.chat.events.requests
status-im.chat.events.send-message
@ -90,7 +92,9 @@
(let [msg-path [:chats chat-id :messages message-id]
new-db (update-in db (conj msg-path :user-statuses) assoc user-id status)]
{:db new-db
:data-store/update-message (-> (get-in new-db msg-path) (select-keys [:message-id :user-statuses]))})))
:data-store/tx [(messages-store/update-message-tx
(-> (get-in new-db msg-path)
(select-keys [:message-id :user-statuses])))]})))
(handlers/register-handler-fx
:transport/set-message-envelope-hash
@ -125,9 +129,10 @@
updated-messages (map (fn [{:keys [from] :as message}]
(assoc-in message [:user-statuses from] :not-sent))
pending-messages)]
{:data-store/update-messages updated-messages
{:data-store/tx [(messages-store/update-messages-tx updated-messages)]
:db (reduce (fn [m {:keys [chat-id message-id from]}]
(assoc-in m [:chats chat-id :messages message-id :user-statuses from] :not-sent))
(assoc-in m [:chats chat-id :messages message-id
:user-statuses from] :not-sent))
db
pending-messages)})))
@ -137,7 +142,7 @@
{:db (-> db
(assoc :current-chat-id constants/console-chat-id)
(update :chats assoc constants/console-chat-id console/chat))
:data-store/save-chat console/chat}))
:data-store/tx [(chats-store/save-chat-tx console/chat)]}))
(defn- add-default-contacts
[{:keys [db default-contacts] :as cofx}]
@ -209,10 +214,11 @@
(defn- persist-seen-messages
[chat-id unseen-messages-ids {:keys [db]}]
{:data-store/update-messages (map (fn [message-id]
{:data-store/tx [(messages-store/update-messages-tx
(map (fn [message-id]
(-> (get-in db [:chats chat-id :messages message-id])
(select-keys [:message-id :user-statuses])))
unseen-messages-ids)})
unseen-messages-ids))]})
(defn- send-messages-seen [chat-id message-ids {:keys [db] :as cofx}]
(when (and (not (get-in db [:chats chat-id :public?]))
@ -294,7 +300,7 @@
(defn start-chat
"Start a chat, making sure it exists"
[chat-id opts {:keys [db] :as cofx}]
; don't allow to open chat with yourself
;; don't allow to open chat with yourself
(when (not= (:current-public-key db) chat-id)
(handlers-macro/merge-fx cofx
(models/upsert-chat {:chat-id chat-id

View File

@ -1,6 +1,8 @@
(ns status-im.chat.models
(:require [status-im.ui.components.styles :as styles]
[status-im.utils.gfycat.core :as gfycat]))
[status-im.utils.gfycat.core :as gfycat]
[status-im.data-store.chats :as chats-store]
[status-im.data-store.messages :as messages-store]))
(defn set-chat-ui-props
"Updates ui-props in active chat by merging provided kvs into them"
@ -34,7 +36,7 @@
(if (:is-active chat)
{:db (update-in db [:chats chat-id] merge chat)
:data-store/save-chat chat}
:data-store/tx [(chats-store/save-chat-tx chat)]}
;; when chat is deleted, don't change anything
{:db db})))
@ -63,15 +65,16 @@
(> timestamp removed-at)
(> timestamp removed-from-at)))
(defn remove-chat [chat-id {:keys [db] :as cofx}]
(defn remove-chat [chat-id {:keys [db now] :as cofx}]
(let [{:keys [chat-id group-chat debug?]} (get-in db [:chats chat-id])]
(if debug?
(-> {:db db}
(update-in [:db :chats] dissoc chat-id)
(assoc :data-store/delete-chat chat-id))
(assoc :data-store/tx [(chats-store/delete-chat-tx chat-id)
(messages-store/delete-messages-tx chat-id)]))
(-> {:db db}
(assoc-in [:db :chats chat-id :is-active] false)
(assoc :data-store/deactivate-chat chat-id)))))
(assoc :data-store/tx [(chats-store/deactivate-chat-tx chat-id now)])))))
(defn bot-only-chat? [db chat-id]
(let [{:keys [group-chat contacts]} (get-in db [:chats chat-id])]

View File

@ -10,11 +10,11 @@
[status-im.utils.handlers-macro :as handlers-macro]
[status-im.transport.utils :as transport.utils]
[status-im.transport.message.core :as transport]
[status-im.transport.message.v1.protocol :as protocol]))
[status-im.transport.message.v1.protocol :as protocol]
[status-im.data-store.messages :as messages-store]))
(def receive-interceptors
[(re-frame/inject-cofx :data-store/get-message)
(re-frame/inject-cofx :random-id)
[(re-frame/inject-cofx :random-id)
re-frame/trim-v])
(defn- lookup-response-ref
@ -45,7 +45,7 @@
(update-in [:chats chat-id :last-clock-value] (partial utils.clocks/receive clock-value))) ; this will increase last-clock-value twice when sending our own messages
(not current-chat?)
(update-in [:chats chat-id :unviewed-messages] (fnil conj #{}) message-id))
:data-store/save-message prepared-message}))
:data-store/tx [(messages-store/save-message-tx prepared-message)]}))
(defn- prepare-chat [chat-id {:keys [db now] :as cofx}]
(chat-model/upsert-chat {:chat-id chat-id
@ -95,7 +95,7 @@
[{:keys [chat-id message-id] :as message} {:keys [now] :as cofx}]
(handlers-macro/merge-fx cofx
(chat-model/upsert-chat {:chat-id chat-id
; We activate a chat again on new messages
;; We activate a chat again on new messages
:is-active true
:timestamp now})
(add-received-message message)
@ -220,7 +220,7 @@
(defn update-message-status [{:keys [chat-id message-id from] :as message} status {:keys [db]}]
(let [updated-message (assoc-in message [:user-statuses from] status)]
{:db (assoc-in db [:chats chat-id :messages message-id] updated-message)
:data-store/update-message updated-message}))
:data-store/tx [(messages-store/update-message-tx updated-message)]}))
(defn resend-message [chat-id message-id cofx]
(let [message (get-in cofx [:db :chats chat-id :messages message-id])
@ -234,7 +234,7 @@
(defn delete-message [chat-id message-id {:keys [db]}]
{:db (update-in db [:chats chat-id :messages] dissoc message-id)
:data-store/delete-message message-id})
:data-store/tx [(messages-store/delete-message-tx message-id)]})
(defn send-message [{:keys [db now random-id] :as cofx} {:keys [chat-id] :as params}]
(upsert-and-send (prepare-plain-message chat-id params (get-in db [:chats chat-id]) now) cofx))
@ -286,7 +286,7 @@
(defn send-command
[{{:keys [current-public-key chats] :as db} :db :keys [now] :as cofx} params]
(let [{{:keys [handler-data to-message command] :as content} :command chat-id :chat-id} params
; We send commands to deleted chats as well, i.e. signed later transactions
;; We send commands to deleted chats as well, i.e. signed later transactions
chat (or (get chats chat-id) {:chat-id chat-id})
request (:request handler-data)]
(handlers-macro/merge-fx cofx

View File

@ -1,42 +1,67 @@
(ns status-im.data-store.chats
(:require [cljs.core.async :as async]
(:require [goog.object :as object]
[cljs.core.async :as async]
[re-frame.core :as re-frame]
[status-im.data-store.realm.core :as core]
[status-im.data-store.realm.chats :as data-store])
(:refer-clojure :exclude [exists?]))
[status-im.data-store.realm.core :as core]))
(defn- normalize-chat [{:keys [chat-id] :as chat}]
(let [last-clock-value (-> (core/get-by-field @core/account-realm
:message :chat-id chat-id)
(core/sorted :clock-value :desc)
(core/single-clj :message)
:clock-value)]
(assoc chat :last-clock-value (or last-clock-value 0))))
(re-frame/reg-cofx
:data-store/all-chats
(fn [cofx _]
(assoc cofx :all-stored-chats (data-store/get-all))))
(assoc cofx :all-stored-chats (map normalize-chat
(-> @core/account-realm
(core/get-all :chat)
(core/sorted :timestamp :desc)
(core/all-clj :chat))))))
(re-frame/reg-fx
:data-store/save-chat
(fn [{:keys [chat-id] :as chat}]
(async/go (async/>! core/realm-queue #(data-store/save chat (data-store/exists? chat-id))))))
(defn save-chat-tx
"Returns tx function for saving chat"
[{:keys [chat-id] :as chat}]
(fn [realm]
(core/create realm :chat chat (core/exists? realm :chat :chat-id chat-id))))
; Only used in debug mode
(re-frame/reg-fx
:data-store/delete-chat
(fn [chat-id]
(async/go (async/>! core/realm-queue #(data-store/delete chat-id)))))
;; Only used in debug mode
(defn delete-chat-tx
"Returns tx function for hard deleting the chat"
[chat-id]
(fn [realm]
(core/delete realm (core/get-by-field realm :chat :chat chat-id))))
(re-frame/reg-fx
:data-store/deactivate-chat
(fn [chat-id]
(async/go (async/>! core/realm-queue #(data-store/set-inactive chat-id)))))
(defn- get-chat-by-id [chat-id realm]
(core/single (core/get-by-field realm :chat :chat-id chat-id)))
(re-frame/reg-fx
:data-store/add-chat-contacts
(fn [[chat-id contacts]]
(async/go (async/>! core/realm-queue #(data-store/add-contacts chat-id contacts)))))
(defn deactivate-chat-tx
"Returns tx function for deactivating chat"
[chat-id now]
(fn [realm]
(let [chat (get-chat-by-id chat-id realm)]
(doto chat
(aset "is-active" false)
(aset "removed-at" now)))))
(re-frame/reg-fx
:data-store/remove-chat-contacts
(fn [[chat-id contacts]]
(async/go (async/>! core/realm-queue #(data-store/remove-contacts chat-id contacts)))))
(defn add-chat-contacts-tx
"Returns tx function for adding chat contacts"
[chat-id contacts]
(fn [realm]
(let [chat (get-chat-by-id chat-id realm)
existing-contacts (object/get chat "contacts")]
(aset chat "contacts"
(clj->js (into #{} (concat contacts
(core/list->clj existing-contacts))))))))
(re-frame/reg-fx
:data-store/save-chat-property
(fn [[chat-id prop value]]
(async/go (async/>! core/realm-queue #(data-store/save-property chat-id prop value)))))
(defn remove-chat-contacts-tx
"Returns tx function for removing chat contacts"
[chat-id contacts]
(fn [realm]
(let [chat (get-chat-by-id chat-id realm)
existing-contacts (object/get chat "contacts")]
(aset chat "contacts"
(clj->js (remove (into #{} contacts)
(core/list->clj existing-contacts)))))))

View File

@ -1,5 +1,8 @@
(ns status-im.data-store.core
(:require status-im.data-store.chats
(:require [cljs.core.async :as async]
[re-frame.core :as re-frame]
[status-im.data-store.realm.core :as data-source]
status-im.data-store.chats
status-im.data-store.messages
status-im.data-store.contacts
status-im.data-store.transport
@ -7,9 +10,7 @@
status-im.data-store.accounts
status-im.data-store.local-storage
status-im.data-store.contact-groups
status-im.data-store.requests
[status-im.data-store.realm.core :as data-source]
[status-im.utils.handlers :as handlers]))
status-im.data-store.requests))
(defn init [encryption-key]
(when-not @data-source/base-realm
@ -18,3 +19,23 @@
(defn change-account [address new-account? encryption-key handler]
(data-source/change-account address new-account? encryption-key handler))
(defn- perform-transactions [transactions realm]
(data-source/write realm #(doseq [transaction transactions]
(transaction realm))))
(re-frame/reg-fx
:data-store/base-tx
(fn [transactions]
(async/go (async/>! data-source/realm-queue
(partial perform-transactions
transactions
@data-source/base-realm)))))
(re-frame/reg-fx
:data-store/tx
(fn [transactions]
(async/go (async/>! data-source/realm-queue
(partial perform-transactions
transactions
@data-source/account-realm)))))

View File

@ -1,20 +1,9 @@
(ns status-im.data-store.messages
(:require [cljs.reader :as reader]
[cljs.core.async :as async]
[re-frame.core :as re-frame]
[status-im.constants :as constants]
[status-im.data-store.realm.core :as core]
[status-im.data-store.realm.messages :as data-store]
[status-im.utils.random :as random]
[status-im.utils.core :as utils]
[status-im.utils.datetime :as datetime]))
;; TODO janherich: define as cofx once debug handlers are refactored
(defn get-log-messages
[chat-id]
(->> (data-store/get-by-chat-id chat-id 0 100)
(filter #(= (:content-type %) constants/content-type-log-message))
(map #(select-keys % [:content :timestamp]))))
[status-im.utils.core :as utils]))
(defn- command-type?
[type]
@ -22,24 +11,35 @@
#{constants/content-type-command constants/content-type-command-request}
type))
(def default-values
{:outgoing false
:to nil})
(defn- transform-message [{:keys [content-type] :as message}]
(cond-> (-> message
(update :message-type keyword)
(update :user-statuses (partial into {}
(map (fn [[_ {:keys [whisper-identity status]}]]
[whisper-identity (keyword status)])))))
(command-type? content-type)
(update :content reader/read-string)))
(re-frame/reg-cofx
:data-store/get-message
(fn [cofx _]
(assoc cofx :get-stored-message data-store/get-by-id)))
(defn get-by-chat-id
(defn- get-by-chat-id
([chat-id]
(get-by-chat-id chat-id 0))
([chat-id from]
(->> (data-store/get-by-chat-id chat-id from constants/default-number-of-messages)
(keep (fn [{:keys [content-type preview] :as message}]
(if (command-type? content-type)
(update message :content reader/read-string)
message))))))
(let [messages (-> (core/get-by-field @core/account-realm :message :chat-id chat-id)
(core/sorted :timestamp :desc)
(core/page from (+ from constants/default-number-of-messages))
(core/all-clj :message))]
(map transform-message messages))))
;; TODO janherich: define as cofx once debug handlers are refactored
(defn get-log-messages
[chat-id]
(->> (get-by-chat-id chat-id 0)
(filter #(= (:content-type %) constants/content-type-log-message))
(map #(select-keys % [:content :timestamp]))))
(def default-values
{:outgoing false
:to nil})
(re-frame/reg-cofx
:data-store/get-messages
@ -49,7 +49,16 @@
(re-frame/reg-cofx
:data-store/message-ids
(fn [cofx _]
(assoc cofx :stored-message-ids (data-store/get-stored-message-ids))))
(assoc cofx :stored-message-ids (let [chat-id->message-id (volatile! {})]
(-> @core/account-realm
(.objects "message")
(.map (fn [msg _ _]
(vswap! chat-id->message-id
#(update %
(aget msg "chat-id")
(fnil conj #{})
(aget msg "message-id"))))))
@chat-id->message-id))))
(re-frame/reg-cofx
:data-store/unviewed-messages
@ -59,7 +68,13 @@
(into {}
(map (fn [[chat-id user-statuses]]
[chat-id (into #{} (map :message-id) user-statuses)]))
(group-by :chat-id (data-store/get-unviewed (:current-public-key db)))))))
(group-by :chat-id
(-> @core/account-realm
(core/get-by-fields
:user-status
:and {:whisper-identity (:current-public-key db)
:status :received})
(core/all-clj :user-status)))))))
(defn- prepare-content [content]
(if (string? content)
@ -84,52 +99,53 @@
prepare-statuses
(utils/update-if-present :content prepare-content)))
(defn save
[{:keys [message-id content from] :as message}]
(when-not (data-store/exists? message-id)
(data-store/save (prepare-message (merge default-values
(defn save-message-tx
"Returns tx function for saving message"
[{:keys [message-id from] :as message}]
(fn [realm]
(when-not (core/exists? realm :message :message-id message-id)
(core/create realm
:message
(prepare-message (merge default-values
message
{:from (or from "anonymous")
:received-timestamp (datetime/timestamp)})))))
(defn delete
{:from (or from "anonymous")}))))))
(defn delete-message-tx
"Returns tx function for deleting message"
[message-id]
(when (data-store/exists? message-id)
(data-store/delete message-id)))
(fn [realm]
(when-let [message (core/single
(core/get-by-field realm :message :message-id message-id))]
(core/delete realm message))))
(re-frame/reg-fx
:data-store/save-message
(fn [message]
(async/go (async/>! core/realm-queue #(save message)))))
(re-frame/reg-fx
:data-store/delete-message
(fn [message-id]
(async/go (async/>! core/realm-queue #(delete message-id)))))
(defn update-message
(defn update-message-tx
"Returns tx function for updating message"
[{:keys [message-id] :as message}]
(when-let [{:keys [chat-id]} (data-store/get-by-id message-id)]
(data-store/save (prepare-message (assoc message :chat-id chat-id)))))
(fn [realm]
(when-let [{:keys [chat-id] :as loaded}
(some-> (core/get-by-field realm :message :message-id message-id)
(core/single-clj :message))]
(core/create realm :message
(prepare-message (assoc message :chat-id chat-id)) true))))
(re-frame/reg-fx
:data-store/update-message
(fn [message]
(async/go (async/>! core/realm-queue #(update-message message)))))
(re-frame/reg-fx
:data-store/update-messages
(fn [messages]
(defn update-messages-tx
"Returns tx function for updating messages"
[messages]
(fn [realm]
(doseq [message messages]
(async/go (async/>! core/realm-queue #(update-message message))))))
((update-message-tx message) realm))))
(re-frame/reg-fx
:data-store/delete-messages
(fn [chat-id]
(async/go (async/>! core/realm-queue #(data-store/delete-by-chat-id chat-id)))))
(defn delete-messages-tx
"Returns tx function for deleting messages with user statuses for given chat-id"
[chat-id]
(fn [realm]
(core/delete realm (core/get-by-field realm :message :chat-id chat-id))
(core/delete realm (core/get-by-field realm :user-status :chat-id chat-id))))
(re-frame/reg-fx
:data-store/hide-messages
(fn [chat-id]
(async/go (async/>! core/realm-queue #(doseq [message-id (data-store/get-message-ids-by-chat-id chat-id)]
(data-store/save {:message-id message-id
:show? false}))))))
(defn hide-messages-tx
"Returns tx function for hiding messages for given chat-id"
[chat-id]
(fn [realm]
(.map (core/get-by-field realm :message :chat-id chat-id)
(fn [msg _ _]
(aset msg "show?" false)))))

View File

@ -16,11 +16,11 @@
(defn delete
[browser-id]
(when-let [browser (realm/single (realm/get-by-field @realm/account-realm :browser :browser-id browser-id))]
(realm/delete @realm/account-realm browser)))
(realm/write @realm/account-realm #(realm/delete @realm/account-realm browser))))
(defn exists?
[browser-id]
(realm/exists? @realm/account-realm :browser {:browser-id browser-id}))
(realm/exists? @realm/account-realm :browser :browser-id browser-id))
(defn get-by-id
[browser-id]

View File

@ -1,84 +0,0 @@
(ns status-im.data-store.realm.chats
(:require [goog.object :as object]
[status-im.data-store.realm.core :as realm]
[status-im.data-store.realm.messages :as messages]
[status-im.utils.datetime :as datetime]
[taoensso.timbre :as log])
(:refer-clojure :exclude [exists?]))
(defn- normalize-chat [{:keys [chat-id] :as chat}]
(let [last-clock-value (messages/get-last-clock-value chat-id)]
(assoc chat :last-clock-value (or last-clock-value 0))))
(defn get-all
[]
(map normalize-chat
(-> @realm/account-realm
(realm/get-all :chat)
(realm/sorted :timestamp :desc)
(realm/all-clj :chat))))
(defn- get-by-id-obj
[chat-id]
(realm/single (realm/get-by-field @realm/account-realm :chat :chat-id chat-id)))
(defn get-by-id
[chat-id]
(-> @realm/account-realm
(realm/get-by-field :chat :chat-id chat-id)
(realm/single-clj :chat)
normalize-chat))
(defn save
[chat update?]
(realm/save @realm/account-realm :chat chat update?))
(defn exists?
[chat-id]
(realm/exists? @realm/account-realm :chat {:chat-id chat-id}))
(defn delete
[chat-id]
(when-let [chat (realm/get-by-field @realm/account-realm :chat :chat-id chat-id)]
(realm/delete @realm/account-realm chat)))
(defn set-inactive
[chat-id]
(when-let [chat (get-by-id-obj chat-id)]
(realm/write @realm/account-realm
(fn []
(doto chat
(aset "is-active" false)
(aset "removed-at" (datetime/timestamp)))))))
(defn add-contacts
[chat-id identities]
(let [chat (get-by-id-obj chat-id)
contacts (object/get chat "contacts")]
(realm/write @realm/account-realm
#(aset chat "contacts"
(clj->js (into #{} (concat identities
(realm/list->clj contacts))))))))
(defn remove-contacts
[chat-id identities]
(let [chat (get-by-id-obj chat-id)
contacts (object/get chat "contacts")]
(realm/write @realm/account-realm
#(aset chat "contacts"
(clj->js (remove (into #{} identities)
(realm/list->clj contacts)))))))
(defn save-property
[chat-id property-name value]
(realm/write @realm/account-realm
(fn []
(-> @realm/account-realm
(realm/get-by-field :chat :chat-id chat-id)
realm/single
(aset (name property-name) value)))))
(defn get-property
[chat-id property]
(when-let [chat (realm/single (realm/get-by-field @realm/account-realm :chat :chat-id chat-id))]
(object/get chat (name property))))

View File

@ -22,14 +22,14 @@
(defn exists?
[group-id]
(realm/exists? @realm/account-realm :contact-group {:group-id group-id}))
(realm/exists? @realm/account-realm :contact-group :group-id group-id))
(defn delete
[group-id]
(when-let [group (-> @realm/account-realm
(realm/get-by-field :contact-group :group-id group-id)
realm/single)]
(realm/delete @realm/account-realm group)))
(realm/write @realm/account-realm #(realm/delete @realm/account-realm group))))
(defn- get-by-id-obj
[group-id]

View File

@ -22,8 +22,9 @@
(defn delete
[{:keys [whisper-identity]}]
(realm/delete @realm/account-realm (get-by-id whisper-identity)))
(realm/write @realm/account-realm
#(realm/delete @realm/account-realm (get-by-id whisper-identity))))
(defn exists?
[whisper-identity]
(realm/exists? @realm/account-realm :contact {:whisper-identity whisper-identity}))
(realm/exists? @realm/account-realm :contact :whisper-identity whisper-identity))

View File

@ -149,7 +149,7 @@
(mapv #(save realm schema-name % update?) objs)))))
(defn delete [realm obj]
(write realm #(.delete realm obj)))
(.delete realm obj))
(defn get-all [realm schema-name]
(.objects realm (name schema-name)))
@ -251,7 +251,7 @@
:or (or-query queries)))))
(defn exists?
"Returns true if object/s identified by schema-name and field values (`:and`)
"Returns true if object/s identified by schema-name and field and value
exists in realm"
[realm schema-name fields]
(pos? (.-length (get-by-fields realm schema-name :and fields))))
[realm schema-name field value]
(pos? (.-length (get-by-field realm schema-name field value))))

View File

@ -1,85 +0,0 @@
(ns status-im.data-store.realm.messages
(:require [status-im.data-store.realm.core :as realm])
(:refer-clojure :exclude [exists?]))
(defn get-all-as-list
[]
(realm/all-clj (realm/get-all @realm/account-realm :message) :message))
(defn- transform-message [message]
(update message :user-statuses
(partial into {}
(map (fn [[_ {:keys [whisper-identity status]}]]
[whisper-identity (keyword status)])))))
(defn get-by-id
[message-id]
(some-> @realm/account-realm
(realm/get-by-field :message :message-id message-id)
(realm/single-clj :message)
transform-message))
(defn get-by-chat-id
([chat-id number-of-messages]
(get-by-chat-id chat-id 0 number-of-messages))
([chat-id from number-of-messages]
(let [messages (-> (realm/get-by-field @realm/account-realm :message :chat-id chat-id)
(realm/sorted :timestamp :desc)
(realm/page from (+ from number-of-messages))
(realm/all-clj :message))]
(mapv transform-message messages))))
(defn get-message-ids-by-chat-id
[chat-id]
(.map (realm/get-by-field @realm/account-realm :message :chat-id chat-id)
(fn [msg _ _]
(aget msg "message-id"))))
(defn get-stored-message-ids
[]
(let [chat-id->message-id (volatile! {})]
(-> @realm/account-realm
(.objects "message")
(.map (fn [msg _ _]
(vswap! chat-id->message-id
#(update %
(aget msg "chat-id")
(fnil conj #{})
(aget msg "message-id"))))))
@chat-id->message-id))
(defn get-last-clock-value
[chat-id]
(-> (realm/get-by-field @realm/account-realm :message :chat-id chat-id)
(realm/sorted :clock-value :desc)
(realm/single-clj :message)
:clock-value))
(defn get-unviewed
[current-public-key]
(-> @realm/account-realm
(realm/get-by-fields :user-status :and {:whisper-identity current-public-key
:status :received})
(realm/all-clj :user-status)))
(defn exists?
[message-id]
(realm/exists? @realm/account-realm :message {:message-id message-id}))
(defn save
[message]
(realm/save @realm/account-realm :message message true))
(defn delete
[message-id]
(let [current-realm @realm/account-realm]
(when-let [message (realm/get-by-field current-realm :message :message-id message-id)]
(realm/delete current-realm message))))
(defn delete-by-chat-id
[chat-id]
(let [current-realm @realm/account-realm]
(realm/delete current-realm
(realm/get-by-field current-realm :message :chat-id chat-id))
(realm/delete current-realm
(realm/get-by-field current-realm :user-status :chat-id chat-id))))

View File

@ -1,20 +0,0 @@
(ns status-im.data-store.realm.transport
(:require [status-im.data-store.realm.core :as realm])
(:refer-clojure :exclude [exists?]))
(defn get-all
[]
(realm/all-clj (realm/get-all @realm/account-realm :transport) :transport))
(defn exists?
[chat-id]
(realm/exists? @realm/account-realm :transport {:chat-id chat-id}))
(defn save
[{:keys [chat-id] :as chat}]
(realm/save @realm/account-realm :transport chat (exists? chat-id)))
(defn delete
[chat-id]
(when-let [chat (realm/single (realm/get-by-field @realm/account-realm :transport :chat-id chat-id))]
(realm/delete @realm/account-realm chat)))

View File

@ -1,8 +1,6 @@
(ns status-im.data-store.transport
(:require [cljs.tools.reader.edn :as edn]
[cljs.core.async :as async]
[re-frame.core :as re-frame]
[status-im.data-store.realm.transport :as data-store]
[status-im.data-store.realm.core :as core]))
(defn deserialize-chat [serialized-chat]
@ -21,23 +19,28 @@
(reduce (fn [acc {:keys [chat-id] :as chat}]
(assoc acc chat-id (deserialize-chat chat)))
{}
(data-store/get-all)))))
(-> @core/account-realm
(core/get-all :transport)
(core/all-clj :transport))))))
(defn save [chat-id chat]
(let [serialized-chat (-> chat
(defn save-transport-tx
"Returns tx function for saving transport"
[{:keys [chat-id chat]}]
(fn [realm]
(core/create realm
:transport
(-> chat
(assoc :chat-id chat-id)
(update :ack pr-str)
(update :seen pr-str)
(update :pending-ack pr-str)
(update :pending-send pr-str))]
(data-store/save serialized-chat)))
(update :pending-send pr-str))
(core/exists? realm :transport :chat-id chat-id))))
(re-frame/reg-fx
:data-store.transport/save
(fn [{:keys [chat-id chat]}]
(async/go (async/>! core/realm-queue #(save chat-id chat)))))
(re-frame/reg-fx
:data-store.transport/delete
(fn [chat-id]
(async/go (async/>! core/realm-queue #(data-store/delete chat-id)))))
(defn delete-transport-tx
"Returns tx function for deleting transport"
[chat-id]
(fn [realm]
(let [transport (core/single
(core/get-by-field realm :transport :chat-id chat-id))]
(core/delete realm transport))))

View File

@ -11,7 +11,8 @@
[status-im.utils.handlers :as handlers]
[status-im.utils.handlers-macro :as handlers-macro]
[status-im.transport.db :as transport.db]
[status-im.transport.utils :as transport.utils]))
[status-im.transport.utils :as transport.utils]
[status-im.data-store.transport :as transport-store]))
(defn init-whisper
"Initialises whisper protocol by:
@ -46,8 +47,8 @@
(let [web3 (:web3 db)
{:keys [topic] :as chat} (get-in db [:transport/chats chat-id])]
{:db (assoc-in db [:transport/chats chat-id :sym-key-id] sym-key-id)
:data-store.transport/save {:chat-id chat-id
:chat (assoc chat :sym-key-id sym-key-id)}
:data-store/tx [(transport-store/save-transport-tx {:chat-id chat-id
:chat (assoc chat :sym-key-id sym-key-id)})]
:shh/add-filter {:web3 web3
:sym-key-id sym-key-id
:topic topic

View File

@ -15,7 +15,8 @@
[status-im.transport.message.core :as message]
[status-im.utils.handlers-macro :as handlers-macro]
[status-im.transport.message.v1.contact :as v1.contact]
[status-im.transport.message.v1.group-chat :as v1.group-chat]))
[status-im.transport.message.v1.group-chat :as v1.group-chat]
[status-im.data-store.transport :as transport-store]))
(handlers/register-handler-fx
:protocol/receive-whisper-message
@ -54,8 +55,8 @@
:sym-key-id sym-key-id
:topic topic
:chat-id chat-id}
:data-store.transport/save {:chat-id chat-id
:chat chat-transport-info}}
:data-store/tx [(transport-store/save-transport-tx {:chat-id chat-id
:chat chat-transport-info})]}
(message/send (v1.contact/NewContactKey. sym-key topic message)
chat-id)))))
@ -73,8 +74,8 @@
:sym-key-id sym-key-id
:topic topic
:chat-id chat-id}
:data-store.transport/save {:chat-id chat-id
:chat chat-transport-info}}
:data-store/tx [(transport-store/save-transport-tx {:chat-id chat-id
:chat chat-transport-info})]}
(message/receive message chat-id chat-id)))))
#_(handlers/register-handler-fx
@ -110,11 +111,12 @@
:sym-key-id sym-key-id
:topic (transport.utils/get-topic chat-id)
:chat-id chat-id}
:data-store.transport/save {:chat-id chat-id
:data-store/tx [(transport-store/save-transport-tx
{:chat-id chat-id
:chat (-> (get-in db [:transport/chats chat-id])
(assoc :sym-key-id sym-key-id)
;;TODO (yenda) remove once go implements persistence
(assoc :sym-key sym-key))}}
(assoc :sym-key sym-key))})]}
(message/send (v1.group-chat/NewGroupKey. chat-id sym-key message) chat-id)))))
(handlers/register-handler-fx
@ -127,11 +129,12 @@
:sym-key-id sym-key-id
:topic (transport.utils/get-topic chat-id)
:chat-id chat-id}
:data-store.transport/save {:chat-id chat-id
:data-store/tx [(transport-store/save-transport-tx
{:chat-id chat-id
:chat (-> (get-in db [:transport/chats chat-id])
(assoc :sym-key-id sym-key-id)
;;TODO (yenda) remove once go implements persistence
(assoc :sym-key sym-key))}}]
(assoc :sym-key sym-key))})]}]
;; if new sym-key is wrapping some message, call receive on it as well, if not just update the transport layer
(if message
(handlers-macro/merge-fx cofx fx (message/receive message chat-id signature))

View File

@ -5,7 +5,8 @@
[status-im.utils.handlers-macro :as handlers-macro]
[status-im.transport.message.core :as message]
[status-im.transport.message.v1.protocol :as protocol]
[status-im.transport.utils :as transport.utils]))
[status-im.transport.utils :as transport.utils]
[status-im.data-store.transport :as transport-store]))
(defn- has-already-joined? [chat-id {:keys [db]}]
(get-in db [:transport/chats chat-id]))
@ -34,10 +35,11 @@
:sym-key-id sym-key-id
:topic topic
:chat-id chat-id}
:data-store.transport/save {:chat-id chat-id
:data-store/tx [(transport-store/save-transport-tx
{:chat-id chat-id
:chat (-> (get-in db [:transport/chats chat-id])
(assoc :sym-key-id sym-key-id)
;;TODO (yenda) remove once go implements persistence
(assoc :sym-key sym-key))}
(assoc :sym-key sym-key))})]
:dispatch [:inbox/request-messages {:topics [topic]
:from 0}]})))

View File

@ -3,14 +3,15 @@
(:require [cljs-time.coerce :refer [to-long]]
[cljs-time.core :refer [now]]
[clojure.string :as string]
[status-im.js-dependencies :as dependencies]))
[status-im.js-dependencies :as dependencies]
[status-im.data-store.transport :as transport-store]))
(defn unsubscribe-from-chat
"Unsubscribe from chat on transport layer"
[chat-id {:keys [db]}]
(let [filter (get-in db [:transport/chats chat-id :filter])]
{:db (update db :transport/chats dissoc chat-id)
:data-store.transport/delete chat-id
:data-store/tx [(transport-store/delete-transport-tx chat-id)]
:shh/remove-filter filter}))
(defn from-utf8 [s]

View File

@ -6,7 +6,9 @@
[status-im.transport.message.v1.group-chat :as group-chat]
[status-im.transport.message.core :as transport]
[status-im.utils.handlers :as handlers]
[status-im.utils.handlers-macro :as handlers-macro]))
[status-im.utils.handlers-macro :as handlers-macro]
[status-im.data-store.chats :as chats-store]
[status-im.data-store.messages :as messages-store]))
;;;; Handlers
@ -30,7 +32,7 @@
{:db (-> db
(assoc-in [:chats current-chat-id :contacts] participants)
(assoc :selected-participants #{}))
:data-store/add-chat-contacts (select-keys db [:current-chat-id :selected-participants])}
:data-store/tx [(chats-store/add-chat-contacts-tx current-chat-id selected-participants)]}
(models.message/receive
(models.message/system-message current-chat-id message-id now
(str "You've added " (apply str (interpose ", " added-participants-names)))))
@ -45,7 +47,7 @@
removed-participants-names (map #(get-in contacts [% :name]) removed-participants)]
(handlers-macro/merge-fx cofx
{:db (assoc-in db [:chats current-chat-id :contacts] participants)
:data-store/remove-chat-contacts [current-chat-id removed-participants]}
:data-store/tx [(chats-store/remove-chat-contacts-tx current-chat-id removed-participants)]}
(models.message/receive
(models.message/system-message current-chat-id message-id now
(str "You've removed " (apply str (interpose ", " removed-participants-names)))))
@ -55,13 +57,14 @@
:set-group-chat-name
(fn [{{:keys [current-chat-id] :as db} :db} [_ new-chat-name]]
{:db (assoc-in db [:chats current-chat-id :name] new-chat-name)
:data-store/save-chat-property [current-chat-id :name new-chat-name]}))
:data-store/tx [(chats-store/save-chat-tx {:chat-id current-chat-id
:name new-chat-name})]}))
(handlers/register-handler-fx
:clear-history
(fn [{{:keys [current-chat-id] :as db} :db} _]
{:db (assoc-in db [:chats current-chat-id :messages] {})
:data-store/hide-messages current-chat-id}))
:data-store/tx [(messages-store/hide-messages-tx current-chat-id)]}))
(handlers/register-handler-fx
:clear-history?

View File

@ -1,14 +1,19 @@
(ns status-im.ui.screens.group.core)
(ns status-im.ui.screens.group.core
(:require [status-im.data-store.chats :as chats-store]))
(defn participants-added [chat-id added-participants-set {:keys [db] :as cofx}]
(when (seq added-participants-set)
{:db (update-in db [:chats chat-id :contacts] concat added-participants-set)
:data-store/add-chat-contacts [chat-id added-participants-set]}))
{:db (update-in db [:chats chat-id :contacts]
concat added-participants-set)
:data-store/tx [(chats-store/add-chat-contacts-tx
chat-id added-participants-set)]}))
(defn participants-removed [chat-id removed-participants-set {:keys [now db] :as cofx}]
(when (seq removed-participants-set)
(let [{:keys [is-active timestamp]} (get-in db [:chats chat-id])]
;;TODO: not sure what this condition is for
(when (and is-active (>= now timestamp))
{:db (update-in db [:chats chat-id :contacts] (partial remove removed-participants-set))
:data-store/remove-chat-contacts [chat-id removed-participants-set]}))))
{:db (update-in db [:chats chat-id :contacts]
(partial remove removed-participants-set))
:data-store/tx [(chats-store/remove-chat-contacts-tx
chat-id removed-participants-set)]}))))

View File

@ -24,7 +24,9 @@
The next forms are functions applying effects and returning a map of effects.
The macro ensures that updates to db are passed from function to function within the cofx :db key and
that only a :merging-fx-with-common-keys effect is returned if some functions are trying
to produce the same effects (excepted :db effect)"
to produce the same effects (excepted :db, :data-source/tx and :data-source/base-tx effects).
:data-source/tx and :data-source/base-tx effects are handled specially and their results
(list of transactions) are compacted to one transactions list (for each effect). "
{:added "1.0"}
[cofx & forms]
(let [form (first forms)]

View File

@ -1,16 +1,23 @@
(ns status-im.utils.handlers-macro
(:require-macros status-im.utils.handlers-macro))
(:require-macros status-im.utils.handlers-macro)
(:require [clojure.set :as set]))
(defn update-db [cofx fx]
(if-let [db (:db fx)]
(assoc cofx :db db)
cofx))
(def ^:private tx-keys #{:data-store/tx :data-store/base-tx})
(defn safe-merge [fx new-fx]
(if (:merging-fx-with-common-keys fx)
fx
(let [common-keys (clojure.set/intersection (into #{} (keys fx))
(let [common-keys (set/intersection (into #{} (keys fx))
(into #{} (keys new-fx)))]
(if (empty? (disj common-keys :db))
(merge fx new-fx)
(if (empty? (set/difference common-keys (conj tx-keys :db)))
(merge (apply dissoc fx tx-keys)
(apply dissoc new-fx tx-keys)
(merge-with into
(select-keys fx tx-keys)
(select-keys new-fx tx-keys)))
{:merging-fx-with-common-keys common-keys}))))

View File

@ -13,7 +13,7 @@
{:name contact-name}}}}
response (chat/upsert-chat chat-props cofx)
actual-chat (get-in response [:db :chats chat-id])
store-chat-fx (:data-store/save-chat response)]
store-chat-fx (:data-store/tx response)]
(testing "it adds the chat to the chats collection"
(is actual-chat))
(testing "it adds the extra props"
@ -37,7 +37,7 @@
:name "old-name"}}}}
response (chat/upsert-chat chat-props cofx)
actual-chat (get-in response [:db :chats chat-id])
store-chat-fx (:data-store/save-chat response)]
store-chat-fx (:data-store/tx response)]
(testing "it adds the chat to the chats collection"
(is actual-chat))
(testing "it adds the extra props"
@ -66,7 +66,7 @@
admin "admin"
participants ["a"]
fx (chat/add-group-chat chat-id chat-name admin participants {})
store-fx (:data-store/save-chat fx)
store-fx (:data-store/tx fx)
group-chat (get-in fx [:db :chats chat-id])]
(testing "it saves the chat in the database"
(is store-fx))
@ -86,7 +86,7 @@
(deftest add-public-chat
(let [topic "topic"
fx (chat/add-public-chat topic {})
store-fx (:data-store/save-chat fx)
store-fx (:data-store/tx fx)
chat (get-in fx [:db :chats topic])]
(testing "it saves the chat in the database"
(is store-fx))