Read at startup & write through async queues

This commit is contained in:
janherich 2018-01-28 12:23:51 +01:00
parent 8e7bf6eab4
commit 80fb8dde8b
No known key found for this signature in database
GPG Key ID: C23B473AFBE94D13
32 changed files with 424 additions and 455 deletions

View File

@ -1,9 +1,11 @@
(ns status-im.chat.events
(:require [cljs.core.async :as async]
(:require [clojure.set :as set]
[cljs.core.async :as async]
[re-frame.core :as re-frame]
[taoensso.timbre :as log]
[status-im.utils.handlers :as handlers]
[status-im.utils.gfycat.core :as gfycat]
[status-im.utils.async :as async-utils]
[status-im.chat.models :as model]
[status-im.chat.console :as console-chat]
[status-im.chat.constants :as chat-const]
@ -13,6 +15,7 @@
[status-im.data-store.contacts :as contacts-store]
[status-im.data-store.requests :as requests-store]
[status-im.data-store.messages :as messages-store]
[status-im.data-store.pending-messages :as pending-messages-store]
[status-im.ui.screens.navigation :as navigation]
[status-im.protocol.core :as protocol]
[status-im.constants :as const]
@ -46,6 +49,11 @@
(fn [cofx _]
(assoc cofx :get-stored-messages messages-store/get-by-chat-id)))
(re-frame/reg-cofx
:stored-message-ids
(fn [cofx _]
(assoc cofx :stored-message-ids (messages-store/get-stored-message-ids))))
(re-frame/reg-cofx
:all-stored-chats
(fn [cofx _]
@ -57,38 +65,50 @@
(assoc cofx :get-stored-chat chats-store/get-by-id)))
(re-frame/reg-cofx
:message-exists?
(fn [cofx]
(assoc cofx :message-exists? messages-store/exists?)))
(re-frame/reg-cofx
:get-last-clock-value
(fn [cofx]
(assoc cofx :get-last-clock-value messages-store/get-last-clock-value)))
:inactive-chat-ids
(fn [cofx _]
(assoc cofx :inactive-chat-ids (chats-store/get-inactive-ids))))
;;;; Effects
(def ^:private update-message-queue (async/chan 100))
(async/go-loop [message (async/<! update-message-queue)]
(when message
(messages-store/update-message message)
(recur (async/<! update-message-queue))))
(def ^:private realm-queue (async-utils/task-queue 200))
(re-frame/reg-fx
:update-message
(fn [message]
(async/put! update-message-queue message)))
(async/put! realm-queue #(messages-store/update-message message))))
(re-frame/reg-fx
:save-message
(fn [message]
(messages-store/save message)))
(async/put! realm-queue #(messages-store/save message))))
(re-frame/reg-fx
:delete-chat-messages
(fn [{:keys [chat-id group-chat debug?]}]
(when (or group-chat debug?)
(async/put! realm-queue #(messages-store/delete-by-chat-id chat-id)))
(async/put! realm-queue #(pending-messages-store/delete-all-by-chat-id chat-id))))
(re-frame/reg-fx
:update-message-overhead
(fn [[chat-id network-status]]
(let [update-fn (if (= network-status :offline)
chats-store/inc-message-overhead
chats-store/reset-message-overhead)]
(async/put! realm-queue #(update-fn chat-id)))))
(re-frame/reg-fx
:save-chat
(fn [chat]
(chats-store/save chat)))
(async/put! realm-queue #(chats-store/save chat))))
(re-frame/reg-fx
:delete-chat
(fn [{:keys [chat-id debug?]}]
(if debug?
(async/put! realm-queue #(chats-store/delete chat-id))
(async/put! realm-queue #(chats-store/set-inactive chat-id)))))
(re-frame/reg-fx
:save-all-contacts
@ -141,17 +161,18 @@
(fn [{{:keys [current-chat-id] :as db} :db get-stored-messages :get-stored-messages} _]
(when-not (get-in db [:chats current-chat-id :all-loaded?])
(let [loaded-count (count (get-in db [:chats current-chat-id :messages]))
new-messages (get-stored-messages current-chat-id loaded-count)]
new-messages (index-messages (get-stored-messages current-chat-id loaded-count))]
{:db (-> db
(update-in [:chats current-chat-id :messages] merge (index-messages new-messages))
(update-in [:chats current-chat-id :messages] merge new-messages)
(update-in [:chats current-chat-id :not-loaded-message-ids] #(apply disj % (keys new-messages)))
(assoc-in [:chats current-chat-id :all-loaded?]
(> const/default-number-of-messages (count new-messages))))}))))
(handlers/register-handler-db
:set-message-shown
:message-appeared
[re-frame/trim-v]
(fn [db [{:keys [chat-id message-id]}]]
(update-in db [:chats chat-id :messages message-id] assoc :new? false)))
(update-in db [:chats chat-id :messages message-id] assoc :appearing? false)))
(defn init-console-chat
[{:keys [chats] :accounts/keys [current-account-id] :as db}]
@ -167,7 +188,6 @@
(not current-account-id)
(update :dispatch-n concat [[:chat-received-message/add-when-commands-loaded console-chat/intro-message1]]))))
(handlers/register-handler-fx
:init-console-chat
(fn [{:keys [db]} _]
@ -176,14 +196,18 @@
(handlers/register-handler-fx
:initialize-chats
[(re-frame/inject-cofx :all-stored-chats)
(re-frame/inject-cofx :inactive-chat-ids)
(re-frame/inject-cofx :get-stored-messages)
(re-frame/inject-cofx :stored-unviewed-messages)
(re-frame/inject-cofx :stored-message-ids)
(re-frame/inject-cofx :get-stored-unanswered-requests)]
(fn [{:keys [db
all-stored-chats
inactive-chat-ids
stored-unanswered-requests
get-stored-messages
stored-unviewed-messages]} _]
stored-unviewed-messages
stored-message-ids]} _]
(let [{:accounts/keys [account-creation?]} db
load-default-contacts-event [:load-default-contacts!]]
(if account-creation?
@ -194,15 +218,19 @@
{}
stored-unanswered-requests)
chats (reduce (fn [acc {:keys [chat-id] :as chat}]
(assoc acc chat-id
(assoc chat
:unviewed-messages (get stored-unviewed-messages chat-id)
:requests (get chat->message-id->request chat-id)
:messages (index-messages (get-stored-messages chat-id)))))
(let [chat-messages (index-messages (get-stored-messages chat-id))]
(assoc acc chat-id
(assoc chat
:unviewed-messages (get stored-unviewed-messages chat-id)
:requests (get chat->message-id->request chat-id)
:messages chat-messages
:not-loaded-message-ids (set/difference (get stored-message-ids chat-id)
(-> chat-messages keys set))))))
{}
all-stored-chats)]
(-> db
(assoc :chats chats)
(assoc :chats chats
:deleted-chats inactive-chat-ids)
init-console-chat
(update :dispatch-n conj load-default-contacts-event)))))))
@ -272,7 +300,7 @@
(handlers/register-handler-fx
:add-chat-loaded-event
[re-frame/trim-v]
[(re-frame/inject-cofx :get-stored-chat) re-frame/trim-v]
(fn [{:keys [db] :as cofx} [chat-id event]]
(if (get (:chats db) chat-id)
{:db (assoc-in db [:chats chat-id :chat-loaded-event] event)}
@ -282,7 +310,7 @@
;; TODO(janherich): remove this unnecessary event in the future (only model function `add-chat` will stay)
(handlers/register-handler-fx
:add-chat
[re-frame/trim-v]
[(re-frame/inject-cofx :get-stored-chat) re-frame/trim-v]
(fn [cofx [chat-id chat-props]]
(model/add-chat cofx chat-id chat-props)))
@ -305,7 +333,7 @@
(handlers/register-handler-fx
:start-chat
[re-frame/trim-v]
[(re-frame/inject-cofx :get-stored-chat) re-frame/trim-v]
(fn [{:keys [db] :as cofx} [contact-id {:keys [navigation-replace?]}]]
(when (not= (:current-public-key db) contact-id) ; don't allow to open chat with yourself
(if (get (:chats db) contact-id)
@ -319,13 +347,17 @@
;; TODO(janherich): remove this unnecessary event in the future (only model function `update-chat` will stay)
(handlers/register-handler-fx
:update-chat!
[(re-frame/inject-cofx :get-stored-chat) re-frame/trim-v]
[re-frame/trim-v]
(fn [cofx [chat]]
(model/update-chat cofx chat)))
;; TODO(janherich): remove this unnecessary event in the future (only model function `upsert-chat` will stay)
(handlers/register-handler-fx
:upsert-chat!
[(re-frame/inject-cofx :get-stored-chat) re-frame/trim-v]
(fn [cofx [chat]]
(model/upsert-chat cofx chat)))
:remove-chat
[re-frame/trim-v]
(fn [{:keys [db]} [chat-id]]
(let [chat (get-in db [:chats chat-id])]
{:db (-> db
(update :chats dissoc chat-id)
(update :deleted-chats (fnil conj #{}) chat-id))
:delete-chat chat
:delete-chat-messages chat})))

View File

@ -31,23 +31,25 @@
:contacts/keys [contacts]} db
jail-id (or bot jail-id chat-id)
jail-command-name (or content-command-name command-name)]
(if (get-in contacts [jail-id :jail-loaded?])
(let [path [(if (= :response (keyword type)) :responses :commands)
[jail-command-name
(or content-command-scope-bitmask scope-bitmask)]
data-type]
to (get-in contacts [chat-id :address])
jail-params {:parameters params
:context (generate-context current-account-id chat-id to group-id)}]
{:db db
:call-jail {:jail-id jail-id
:path path
:params jail-params
:callback-events-creator (fn [jail-response]
[[::jail-command-data-response
jail-response message opts]])}})
{:db (update-in db [:contacts/contacts jail-id :jail-loaded-events]
conj [:request-command-message-data message opts])})))
(if-not (get contacts jail-id) ;; bot is not even in contacts, do nothing
{:db db}
(if (get-in contacts [jail-id :jail-loaded?])
(let [path [(if (= :response (keyword type)) :responses :commands)
[jail-command-name
(or content-command-scope-bitmask scope-bitmask)]
data-type]
to (get-in contacts [chat-id :address])
jail-params {:parameters params
:context (generate-context current-account-id chat-id to group-id)}]
{:db db
:call-jail {:jail-id jail-id
:path path
:params jail-params
:callback-events-creator (fn [jail-response]
[[::jail-command-data-response
jail-response message opts]])}})
{:db (update-in db [:contacts/contacts jail-id :jail-loaded-events]
conj [:request-command-message-data message opts])}))))
;;;; Handlers

View File

@ -3,7 +3,6 @@
[re-frame.core :as re-frame]
[taoensso.timbre :as log]
[status-im.chat.constants :as constants]
[status-im.chat.utils :as chat-utils]
[status-im.chat.models :as model]
[status-im.chat.models.input :as input-model]
[status-im.chat.models.commands :as commands-model]
@ -193,7 +192,7 @@
:validation-messages nil
:prev-command name})
(set-chat-input-metadata metadata)
(set-chat-input-text (str (chat-utils/command-name command)
(set-chat-input-text (str (commands-model/command-name command)
constants/spacing-char
(when-not sequential-params
(input-model/join-command-args prefill)))))
@ -476,9 +475,7 @@
(handlers/register-handler-fx
:send-current-message
[(re-frame/inject-cofx :random-id)
(re-frame/inject-cofx :get-last-clock-value)
(re-frame/inject-cofx :get-stored-chat)]
message-model/send-interceptors
(fn [{{:keys [current-chat-id current-public-key] :as db} :db message-id :random-id current-time :now
:as cofx} _]
(when-not (get-in db [:chat-ui-props current-chat-id :sending-in-progress?])

View File

@ -1,7 +1,6 @@
(ns status-im.chat.events.receive-message
(:require [re-frame.core :as re-frame]
[taoensso.timbre :as log]
[status-im.data-store.chats :as chat-store]
[status-im.data-store.messages :as messages-store]
[status-im.chat.events.commands :as commands-events]
[status-im.chat.models.message :as message-model]
@ -9,16 +8,7 @@
[status-im.utils.handlers :as handlers]
[status-im.utils.random :as random]))
;;;; Coeffects
(re-frame/reg-cofx
:pop-up-chat?
(fn [cofx]
(assoc cofx :pop-up-chat? (fn [chat-id]
(or (not (chat-store/exists? chat-id))
(chat-store/is-active? chat-id))))))
;;;; FX
;;;; Handlers
(handlers/register-handler-fx
::received-message
@ -30,25 +20,26 @@
:chat-received-message/add
message-model/receive-interceptors
(fn [{:keys [db] :as cofx} [{:keys [content] :as message}]]
(if (:command content)
;; we are dealing with received command message, we can't add it right away,
;; we first need to fetch short-preview + preview and add it only after we already have those.
;; note that `request-command-message-data` implicitly wait till jail is ready and
;; calls are made only after that
(commands-events/request-command-message-data
db message
{:data-type :short-preview
:proceed-event-creator (fn [short-preview]
[:request-command-message-data
message
{:data-type :preview
:proceed-event-creator (fn [preview]
[::received-message
(update message :content merge
{:short-preview short-preview
:preview preview})])}])})
;; regular non command message, we can add it right away
(message-model/receive cofx message))))
(when (message-model/add-to-chat? cofx message)
(if (:command content)
;; we are dealing with received command message, we can't add it right away,
;; we first need to fetch short-preview + preview and add it only after we already have those.
;; note that `request-command-message-data` implicitly wait till jail is ready and
;; calls are made only after that
(commands-events/request-command-message-data
db message
{:data-type :short-preview
:proceed-event-creator (fn [short-preview]
[:request-command-message-data
message
{:data-type :preview
:proceed-event-creator (fn [preview]
[::received-message
(update message :content merge
{:short-preview short-preview
:preview preview})])}])})
;; regular non command message, we can add it right away
(message-model/receive cofx message)))))
;; TODO janherich: get rid of this special case once they hacky app start-up sequence is refactored
(handlers/register-handler-fx
@ -72,9 +63,9 @@
(re-frame/dispatch [:update-bot-db {:bot bot-id
:db update-db}]))
(re-frame/dispatch [:suggestions-handler (assoc params
:bot-id bot-id
:result data
:default-db default-db)])
:bot-id bot-id
:result data
:default-db default-db)])
(doseq [message log-messages]
(let [{:keys [message type]} message]
(when (or (not= type "debug")

View File

@ -1,16 +1,11 @@
(ns status-im.chat.events.send-message
(:require [re-frame.core :as re-frame]
[status-im.chat.utils :as chat-utils]
[status-im.chat.models.message :as message-model]
[status-im.constants :as constants]
[status-im.data-store.chats :as chats-store]
[status-im.native-module.core :as status]
[status-im.protocol.core :as protocol]
[status-im.utils.config :as config]
[status-im.utils.handlers :as handlers]
[status-im.utils.random :as random]
[status-im.utils.types :as types]
[status-im.utils.datetime :as datetime]
[taoensso.timbre :as log]))
(re-frame/reg-fx
@ -34,13 +29,6 @@
(fn [value]
(protocol/send-message! value)))
(re-frame/reg-fx
:update-message-overhead!
(fn [[chat-id network-status]]
(if (= network-status :offline)
(chats-store/inc-message-overhead chat-id)
(chats-store/reset-message-overhead chat-id))))
;;;; Handlers
(handlers/register-handler-fx

View File

@ -1,12 +1,11 @@
(ns status-im.chat.handlers
(:require [re-frame.core :refer [enrich after debug dispatch reg-fx]]
(:require [re-frame.core :refer [after dispatch reg-fx]]
[clojure.string :as string]
[status-im.ui.components.styles :refer [default-chat-color]]
[status-im.chat.constants :as chat-consts]
[status-im.protocol.core :as protocol]
[status-im.data-store.chats :as chats]
[status-im.data-store.messages :as messages]
[status-im.data-store.pending-messages :as pending-messages]
[status-im.constants :refer [text-content-type
content-type-command
content-type-command-request
@ -15,33 +14,6 @@
[status-im.utils.handlers :refer [register-handler register-handler-fx] :as u]
status-im.chat.events))
(defn remove-chat
[db [_ chat-id]]
(update db :chats dissoc chat-id))
(reg-fx
::delete-messages
(fn [id]
(messages/delete-by-chat-id id)))
(defn delete-messages!
[{:keys [current-chat-id chats]} [_ chat-id]]
(let [id (or chat-id current-chat-id)
{:keys [group-chat]} (chats/get-by-id chat-id)]
(when group-chat
(messages/delete-by-chat-id id))))
(defn delete-chat!
[_ [_ chat-id]]
(let [{:keys [debug?]} (chats/get-by-id chat-id)]
(if debug?
(chats/delete chat-id)
(chats/set-inactive chat-id))))
(defn remove-pending-messages!
[_ [_ chat-id]]
(pending-messages/delete-all-by-chat-id chat-id))
(register-handler
:leave-group-chat
;; todo order of operations tbd
@ -62,14 +34,6 @@
:message-id (random/id)}})))
(dispatch [:remove-chat current-chat-id]))))
(register-handler
:remove-chat
(u/handlers->
remove-chat
delete-messages!
remove-pending-messages!
delete-chat!))
(register-handler :update-group-message
(u/side-effect!
(fn [{:keys [current-public-key web3 chats]}
@ -93,11 +57,6 @@
:keypair keypair
:callback #(dispatch [:incoming-message %1 %2])}))))))))
(reg-fx
::save-public-chat
(fn [chat]
(chats/save chat)))
(reg-fx
::start-watching-group
(fn [{:keys [group-id web3 current-public-key keypair]}]
@ -122,17 +81,12 @@
(merge
(when-not exists?
{:db (assoc-in db [:chats (:chat-id chat)] chat)
::save-public-chat chat
:save-chat chat
::start-watching-group (merge {:group-id topic}
(select-keys db [:web3 :current-public-key]))})
{:dispatch-n [[:navigate-to-clean :home]
[:navigate-to-chat topic]]}))))
(reg-fx
::save-chat
(fn [new-chat]
(chats/save new-chat)))
(reg-fx
::start-listen-group
(fn [{:keys [new-chat web3 current-public-key]}]
@ -193,7 +147,7 @@
{:db (-> db
(assoc-in [:chats (:chat-id new-chat)] new-chat)
(assoc :group/selected-contacts #{}))
::save-chat new-chat
:save-chat new-chat
::start-listen-group (merge {:new-chat new-chat}
(select-keys db [:web3 :current-public-key]))
:dispatch-n [[:navigate-to-clean :home]

View File

@ -13,40 +13,45 @@
(update-in db [:chat-ui-props current-chat-id ui-element] not))
(defn- create-new-chat
[{:keys [db now] :as cofx} chat-id chat-props]
[{:keys [db now] :as cofx} chat-id]
(let [name (get-in db [:contacts/contacts chat-id :name])]
(merge {:chat-id chat-id
:name (or name (gfycat/generate-gfy chat-id))
:color styles/default-chat-color
:group-chat false
:is-active true
:timestamp now
:contacts [{:identity chat-id}]}
chat-props)))
{:chat-id chat-id
:name (or name (gfycat/generate-gfy chat-id))
:color styles/default-chat-color
:group-chat false
:is-active true
:timestamp now
:contacts [{:identity chat-id}]}))
(defn add-chat
"Adds new chat to db & realm, if the chat with same id already exists, justs restores it"
([cofx chat-id]
(add-chat cofx chat-id {}))
([{:keys [db] :as cofx} chat-id chat-props]
(let [new-chat (create-new-chat cofx chat-id chat-props)
existing-chats (:chats db)]
{:db (cond-> db
(not (contains? existing-chats chat-id))
(update :chats assoc chat-id new-chat))
([{:keys [db get-stored-chat] :as cofx} chat-id chat-props]
(let [{:keys [chats deleted-chats]} db
new-chat (merge (if (get deleted-chats chat-id)
(assoc (get-stored-chat chat-id) :is-active true)
(create-new-chat cofx chat-id))
chat-props)]
{:db (-> db
(update :chats assoc chat-id new-chat)
(update :deleted-chats (fnil disj #{}) chat-id))
:save-chat new-chat})))
;; TODO (yenda): there should be an option to update the timestamp
;; this shouldn't need a specific function like `upsert-chat` which
;; is wrongfuly named
(defn update-chat
"Updates chat properties, if chat is not present in db, creates a default new one"
[{:keys [db get-stored-chat] :as cofx} {:keys [chat-id] :as chat}]
(let [chat (merge (or (get-stored-chat chat-id)
(create-new-chat cofx chat-id {}))
chat)]
{:db (cond-> db
(:is-active chat) (update-in [:chats chat-id] merge chat))
:save-chat chat}))
"Updates chat properties when not deleted, if chat is not present in app-db, creates a default new one"
[{:keys [db] :as cofx} {:keys [chat-id] :as chat-props}]
(let [{:keys [chats deleted-chats]} db]
(if (get deleted-chats chat-id) ;; when chat is deleted, don't change anything
{:db db}
(let [chat (merge (or (get chats chat-id)
(create-new-chat cofx chat-id))
chat-props)]
{:db (update-in db [:chats chat-id] merge chat)
:save-chat chat}))))
;; TODO (yenda): an upsert is suppose to add the entry if it doesn't
;; exist and update it if it does

View File

@ -13,6 +13,9 @@
(defn- is-dapp? [all-contacts {:keys [identity]}]
(get-in all-contacts [identity :dapp?]))
(defn command-name [{:keys [name]}]
(str chat-consts/command-char name))
(defn commands-responses
"Returns map of commands/responses eligible for current chat."
[type access-scope->commands-responses {:keys [address]} {:keys [contacts group-chat public?]} all-contacts]

View File

@ -6,7 +6,6 @@
[status-im.chat.constants :as const]
[status-im.chat.models.commands :as commands-model]
[status-im.chat.views.input.validation-messages :refer [validation-message]]
[status-im.chat.utils :as chat-utils]
[status-im.i18n :as i18n]
[status-im.utils.phone-number :as phone-number]
[status-im.js-dependencies :as dependencies]

View File

@ -5,8 +5,6 @@
[status-im.chat.events.requests :as requests-events]
[status-im.chat.models :as chat-model]
[status-im.chat.models.commands :as commands-model]
[status-im.chat.utils :as chat-utils]
[status-im.data-store.messages :as messages-store]
[status-im.utils.datetime :as datetime-utils]
[status-im.utils.clocks :as clocks-utils]
[status-im.utils.random :as random]))
@ -16,12 +14,8 @@
(get accounts current-account-id))
(def receive-interceptors
[(re-frame/inject-cofx :message-exists?)
(re-frame/inject-cofx :pop-up-chat?)
(re-frame/inject-cofx :get-last-clock-value)
(re-frame/inject-cofx :random-id)
(re-frame/inject-cofx :get-stored-chat)
re-frame/trim-v])
[(re-frame/inject-cofx :get-stored-message) (re-frame/inject-cofx :get-stored-chat)
(re-frame/inject-cofx :random-id) re-frame/trim-v])
(defn- lookup-response-ref
[access-scope->commands-responses account chat contacts response-name]
@ -32,58 +26,81 @@
contacts)]
(:ref (get available-commands-responses response-name))))
(defn- add-message-to-db
[db {:keys [message-id] :as message} chat-id current-chat?]
(cond-> (chat-utils/add-message-to-db db chat-id chat-id message (:new? message))
(not current-chat?)
(update-in [:chats chat-id :unviewed-messages] (fnil conj #{}) message-id)))
(defn add-message-to-db
[db chat-id {:keys [message-id clock-value] :as message} current-chat?]
(let [prepared-message (cond-> (assoc message
:chat-id chat-id
:appearing? true)
(not current-chat?)
(assoc :appearing? false))]
(cond-> (-> db
(update-in [:chats chat-id :messages] assoc message-id prepared-message)
(update-in [:chats chat-id :last-clock-value] (fnil max 0) clock-value))
(not current-chat?)
(update-in [:chats chat-id :unviewed-messages] (fnil conj #{}) message-id))))
(defn receive
[{:keys [db message-exists? pop-up-chat? get-last-clock-value now] :as cofx}
[{:keys [db now] :as cofx}
{:keys [from group-id chat-id content-type content message-id timestamp clock-value]
:as message
:or {clock-value 0}}]
:as message}]
(let [{:keys [current-chat-id view-id
access-scope->commands-responses] :contacts/keys [contacts]} db
{:keys [public-key] :as current-account} (get-current-account db)
chat-identifier (or group-id chat-id from)
direct-message? (nil? group-id)]
;; proceed with adding message if message is not already stored in realm,
;; it's not from current user (outgoing message) and it's for relevant chat
;; (either current active chat or new chat not existing yet or it's a direct message)
(when (and (not (message-exists? message-id))
(not= from public-key)
(or (pop-up-chat? chat-identifier)
direct-message?))
(let [current-chat? (and (= :chat view-id)
(= current-chat-id chat-identifier))
fx (if (get-in db [:chats chat-identifier])
(chat-model/upsert-chat cofx {:chat-id chat-identifier
:group-chat (boolean group-id)})
(chat-model/add-chat cofx chat-identifier))
command-request? (= content-type constants/content-type-command-request)
command (:command content)
enriched-message (cond-> (assoc message
:chat-id chat-identifier
:timestamp (or timestamp now)
:show? true
:clock-value (clocks-utils/receive
clock-value
(get-last-clock-value chat-identifier)))
public-key
(assoc :user-statuses {public-key (if current-chat? :seen :received)})
(and command command-request?)
(assoc-in [:content :content-command-ref]
(lookup-response-ref access-scope->commands-responses
current-account
(get-in fx [:db :chats chat-identifier])
contacts
command)))]
(cond-> (-> fx
(update :db add-message-to-db enriched-message chat-identifier current-chat?)
(assoc :save-message (dissoc enriched-message :new?)))
command-request?
(requests-events/add-request chat-identifier enriched-message))))))
current-chat? (and (= :chat view-id)
(= current-chat-id chat-identifier))
fx (if (get-in db [:chats chat-identifier])
(chat-model/upsert-chat cofx {:chat-id chat-identifier
:group-chat (boolean group-id)})
(chat-model/add-chat cofx chat-identifier))
chat (get-in fx [:db :chats chat-identifier])
command-request? (= content-type constants/content-type-command-request)
command (:command content)
enriched-message (cond-> (assoc message
:chat-id chat-identifier
:timestamp (or timestamp now)
:show? true
:clock-value (clocks-utils/receive
clock-value
(:last-clock-value chat)))
public-key
(assoc :user-statuses {public-key (if current-chat? :seen :received)})
(and command command-request?)
(assoc-in [:content :content-command-ref]
(lookup-response-ref access-scope->commands-responses
current-account
(get-in fx [:db :chats chat-identifier])
contacts
command)))]
(cond-> (-> fx
(update :db add-message-to-db chat-identifier enriched-message current-chat?)
(assoc :save-message (dissoc enriched-message :new?)))
command-request?
(requests-events/add-request chat-identifier enriched-message))))
(defn add-to-chat?
[{:keys [db get-stored-message]} {:keys [group-id chat-id from message-id]}]
(let [chat-identifier (or group-id chat-id from)
{:keys [chats deleted-chats current-public-key]} db
{:keys [messages not-loaded-message-ids]} (get chats chat-identifier)]
(when (not= from current-public-key)
(if group-id
(not (or (get deleted-chats chat-identifier)
(get messages message-id)
(get not-loaded-message-ids message-id)))
(not (or (get messages message-id)
(get not-loaded-message-ids message-id)
(and (get deleted-chats chat-identifier)
(get-stored-message message-id))))))))
(defn message-seen-by? [message user-pk]
(= :seen (get-in message [:user-statuses user-pk])))
;;;; Send message
(def send-interceptors
[(re-frame/inject-cofx :random-id) (re-frame/inject-cofx :random-id-seq)
(re-frame/inject-cofx :get-stored-chat) re-frame/trim-v])
(defn- handle-message-from-bot [cofx {:keys [message chat-id]}]
(when-let [message (cond
@ -159,31 +176,21 @@
(and group-chat (not public?))
(let [{:keys [public-key private-key]} (get chats chat-id)]
{:send-group-message (assoc options
:group-id chat-id
:keypair {:public public-key
:private private-key})})
:group-id chat-id
:keypair {:public public-key
:private private-key})})
(and group-chat public?)
{:send-public-group-message (assoc options :group-id chat-id
:username (get-in accounts [current-account-id :name]))}
:username (get-in accounts [current-account-id :name]))}
:else
(merge {:send-message (assoc-in options [:message :to] chat-id)}
(when-not command) {:send-notification fcm-token}))))))
;;;; Send message
(def send-interceptors
[(re-frame/inject-cofx :random-id)
(re-frame/inject-cofx :random-id-seq)
(re-frame/inject-cofx :get-stored-chat)
(re-frame/inject-cofx :now)
(re-frame/inject-cofx :get-last-clock-value)
re-frame/trim-v])
(defn- prepare-message [clock-value params chat]
(defn- prepare-message [params chat]
(let [{:keys [chat-id identity message-text]} params
{:keys [group-chat public?]} chat
{:keys [group-chat public? last-clock-value]} chat
message {:message-id (random/id)
:chat-id chat-id
:content message-text
@ -191,36 +198,34 @@
:content-type constants/text-content-type
:outgoing true
:timestamp (datetime-utils/now-ms)
:clock-value (clocks-utils/send clock-value)
:clock-value (clocks-utils/send last-clock-value)
:show? true}]
(cond-> message
(not group-chat)
(assoc :message-type :user-message
:to chat-id)
group-chat
(assoc :group-id chat-id)
(and group-chat public?)
(assoc :message-type :public-group-user-message)
(and group-chat (not public?))
(assoc :message-type :group-user-message)
(not group-chat)
(assoc :to chat-id :message-type :user-message))))
(not group-chat)
(assoc :message-type :user-message
:to chat-id)
group-chat
(assoc :group-id chat-id)
(and group-chat public?)
(assoc :message-type :public-group-user-message)
(and group-chat (not public?))
(assoc :message-type :group-user-message)
(not group-chat)
(assoc :to chat-id :message-type :user-message))))
(defn send-message [{{:keys [network-status] :as db} :db
:keys [now get-stored-chat get-last-clock-value]}
:keys [now]}
{:keys [chat-id] :as params}]
(let [chat (get-in db [:chats chat-id])
message (prepare-message (get-last-clock-value chat-id) params chat)
message (prepare-message params chat)
params' (assoc params :message message)
fx {:db (chat-utils/add-message-to-db db chat-id chat-id message)
:update-message-overhead! [chat-id network-status]
fx {:db (add-message-to-db db chat-id message true)
:update-message-overhead [chat-id network-status]
:save-message message}]
(-> (merge fx (chat-model/upsert-chat (assoc fx :get-stored-chat get-stored-chat :now now)
(-> (merge fx (chat-model/upsert-chat (assoc fx :now now)
{:chat-id chat-id}))
(as-> fx'
(merge fx' (send fx' params'))))))
(merge fx' (send fx' params'))))))
(defn- prepare-command
[identity chat-id clock-value
@ -238,14 +243,14 @@
:scope (:scope command)
:params params})
content' (assoc content :handler-data handler-data
:type (name (:type command))
:content-command (:name command)
:content-command-scope-bitmask (:scope-bitmask command)
:content-command-ref (:ref command)
:preview (:preview command)
:short-preview (:short-preview command)
:bot (or (:bot command)
(:owner-id command)))]
:type (name (:type command))
:content-command (:name command)
:content-command-scope-bitmask (:scope-bitmask command)
:content-command-ref (:ref command)
:preview (:preview command)
:short-preview (:short-preview command)
:bot (or (:bot command)
(:owner-id command)))]
{:message-id id
:from identity
:to chat-id
@ -263,36 +268,37 @@
:show? true}))
(defn send-command
[{{:keys [current-public-key network-status] :as db} :db
:keys [now get-stored-chat random-id-seq get-last-clock-value]} result add-to-chat-id params]
[{{:keys [current-public-key network-status chats] :as db} :db
:keys [now random-id-seq]} result add-to-chat-id params]
(let [{{:keys [handler-data
command]
:as content} :command
chat-id :chat-id} params
request (:request handler-data)
hidden-params (->> (:params command)
(filter :hidden)
(map :name))
command' (prepare-command current-public-key chat-id (get-last-clock-value chat-id) request content)
params' (assoc params :command command')
request (:request handler-data)
last-clock-value (get-in chats [chat-id :last-clock-value])
hidden-params (->> (:params command)
(filter :hidden)
(map :name))
command' (prepare-command current-public-key chat-id last-clock-value request content)
params' (assoc params :command command')
fx {:db (-> (merge db (:db result))
(chat-utils/add-message-to-db add-to-chat-id chat-id command'))
:update-message-overhead! [chat-id network-status]
:save-message (-> command'
(assoc :chat-id chat-id)
(update-in [:content :params]
#(apply dissoc % hidden-params))
(dissoc :to-message :has-handler :raw-input))}]
fx {:db (-> (merge db (:db result))
(add-message-to-db chat-id command' true))
:update-message-overhead [chat-id network-status]
:save-message (-> command'
(assoc :chat-id chat-id)
(update-in [:content :params]
#(apply dissoc % hidden-params))
(dissoc :to-message :has-handler :raw-input))}]
(cond-> (merge fx
(chat-model/upsert-chat (assoc fx :get-stored-chat get-stored-chat :now now)
(chat-model/upsert-chat (assoc fx :now now)
{:chat-id chat-id})
(dissoc result :db))
true
(as-> fx'
(merge fx' (send fx' params')))
(merge fx' (send fx' params')))
(:to-message command')
(assoc :chat-requests/mark-as-answered {:chat-id chat-id
@ -300,9 +306,9 @@
(= constants/console-chat-id chat-id)
(as-> fx'
(let [messages (console-events/console-respond-command-messages params' random-id-seq)
events (mapv #(vector :chat-received-message/add %) messages)]
(update fx' :dispatch-n into events))))))
(let [messages (console-events/console-respond-command-messages params' random-id-seq)
events (mapv #(vector :chat-received-message/add %) messages)]
(update fx' :dispatch-n into events))))))
(defn invoke-console-command-handler
[{:keys [db] :as cofx} {:keys [chat-id command] :as command-params}]
@ -329,8 +335,8 @@
:to to
:current-account (get accounts current-account-id)
:message-id id}
(:async-handler command)
(assoc :orig-params orig-params))}]
(:async-handler command)
(assoc :orig-params orig-params))}]
{:call-jail {:jail-id identity
:path [handler-type [name scope-bitmask] :handler]
:params jail-params
@ -344,13 +350,13 @@
(-> {:db (chat-model/set-chat-ui-props db {:sending-in-progress? false})}
(as-> fx'
(cond
(and (= constants/console-chat-id chat-id)
(console-events/commands-names (:name command)))
(invoke-console-command-handler (merge cofx fx') params)
(cond
(and (= constants/console-chat-id chat-id)
(console-events/commands-names (:name command)))
(invoke-console-command-handler (merge cofx fx') params)
(:has-handler command)
(merge fx' (invoke-command-handlers fx' params))
(:has-handler command)
(merge fx' (invoke-command-handlers fx' params))
:else
(merge fx' (send-command cofx fx' chat-id params)))))))
:else
(merge fx' (send-command cofx fx' chat-id params)))))))

View File

@ -2,6 +2,7 @@
(:require [cljs.spec.alpha :as s]))
(s/def :chat/chats (s/nilable map?)) ; {id (string) chat (map)} active chats on chat's tab
(s/def :chat/deleted-chats (s/nilable set?)) ; set of deleted chat ids
(s/def :chat/current-chat-id (s/nilable string?)) ; current or last opened chat-id
(s/def :chat/chat-id (s/nilable string?)) ; what is the difference ? ^
(s/def :chat/new-chat-name (s/nilable string?)) ; we have name in the new-chat why do we need this field
@ -15,5 +16,7 @@
(s/def :chat/public-group-topic (s/nilable string?))
(s/def :chat/confirmation-code-sms-listener (s/nilable any?)) ; .addListener result object
(s/def :chat/messages (s/nilable map?)) ; messages indexed by message-id
(s/def :chat/not-loaded-message-ids (s/nilable set?)) ; set of message-ids not yet fully loaded from persisted state
(s/def :chat/last-clock-value (s/nilable number?)) ; last logical clock value of messages in chat
(s/def :chat/loaded-chats (s/nilable seq?))
(s/def :chat/bot-db (s/nilable map?))

View File

@ -3,7 +3,6 @@
[status-im.constants :as constants]
[status-im.chat.models.input :as input-model]
[status-im.chat.models.commands :as commands-model]
[status-im.chat.utils :as chat-utils]
[status-im.chat.views.input.utils :as input-utils]
[status-im.commands.utils :as commands-utils]
[status-im.utils.datetime :as time]
@ -11,7 +10,7 @@
[status-im.i18n :as i18n]
[clojure.string :as string]))
(reg-sub :chats :chats)
(reg-sub :get-chats :chats)
(reg-sub :get-current-chat-id :current-chat-id)
@ -48,15 +47,21 @@
(fn [kb-height]
(if platform/ios? kb-height 0)))
(reg-sub
:get-active-chats
:<- [:get-chats]
(fn [chats]
(into {} (filter (comp :is-active second)) chats)))
(reg-sub
:get-chat
:<- [:chats]
:<- [:get-active-chats]
(fn [chats [_ chat-id]]
(get chats chat-id)))
(reg-sub
:get-current-chat
:<- [:chats]
:<- [:get-active-chats]
:<- [:get-current-chat-id]
(fn [[chats current-chat-id]]
(get chats current-chat-id)))
@ -69,7 +74,7 @@
(reg-sub
:chat
:<- [:chats]
:<- [:get-active-chats]
:<- [:get-current-chat-id]
(fn [[chats id] [_ k chat-id]]
(get-in chats [(or chat-id id) k])))
@ -161,7 +166,7 @@
(defn- available-commands-responses [[commands-responses {:keys [input-text]}]]
(->> commands-responses
map->sorted-seq
(filter #(string/includes? (chat-utils/command-name %) (or input-text "")))))
(filter #(string/includes? (commands-model/command-name %) (or input-text "")))))
(reg-sub
:get-available-commands

View File

@ -1,17 +0,0 @@
(ns status-im.chat.utils
(:require [status-im.chat.constants :as chat.constants]
[taoensso.timbre :as log]))
(defn add-message-to-db
([db add-to-chat-id chat-id message] (add-message-to-db db add-to-chat-id chat-id message true))
([db add-to-chat-id chat-id {:keys [message-id] :as message} new?]
(let [prepared-message (assoc message
:chat-id chat-id
:new? (if (nil? new?) true new?))]
(update-in db [:chats add-to-chat-id :messages] assoc message-id prepared-message))))
(defn message-seen-by? [message user-pk]
(= :seen (get-in message [:user-statuses user-pk])))
(defn command-name [{:keys [name]}]
(str chat.constants/command-char name))

View File

@ -6,8 +6,8 @@
[taoensso.timbre :as log]
[status-im.chat.constants :as const]
[status-im.chat.models.input :as input-model]
[status-im.chat.models.commands :as commands-model]
[status-im.chat.styles.input.input :as style]
[status-im.chat.utils :as chat-utils]
[status-im.chat.views.input.emoji :as emoji]
[status-im.chat.views.input.parameter-box :as parameter-box]
[status-im.chat.views.input.input-actions :as input-actions]
@ -25,7 +25,7 @@
[react/view
[react/text {:style (style/command first?)
:font :roboto-mono}
(chat-utils/command-name command)]]])
(commands-model/command-name command)]]])
(defview commands-view []
[all-commands-responses [:get-available-commands-responses]

View File

@ -4,7 +4,7 @@
[status-im.ui.components.react :as react]
[status-im.chat.styles.input.suggestions :as style]
[status-im.chat.views.input.animations.expandable :as expandable]
[status-im.chat.utils :as chat.utils]
[status-im.chat.models.commands :as commands-model]
[status-im.i18n :as i18n]))
(defn suggestion-item [{:keys [on-press name description last?]}]
@ -23,14 +23,14 @@
[suggestion-item
{:on-press #(let [metadata (assoc params :to-message-id message-id)]
(re-frame/dispatch [:select-chat-input-command command metadata]))
:name (chat.utils/command-name command)
:name (commands-model/command-name command)
:description description
:last? last?}])
(defn command-item [{:keys [name description bot] :as command} last?]
[suggestion-item
{:on-press #(re-frame/dispatch [:select-chat-input-command command nil])
:name (chat.utils/command-name command)
:name (commands-model/command-name command)
:description description
:last? last?}])

View File

@ -6,15 +6,14 @@
[status-im.ui.components.react :as react]
[status-im.ui.components.animation :as animation]
[status-im.ui.components.list-selection :as list-selection]
[status-im.chat.models.commands :as commands]
[status-im.commands.utils :as commands.utils]
[status-im.chat.utils :as chat.utils]
[status-im.chat.models.commands :as models.commands]
[status-im.chat.models.message :as models.message]
[status-im.chat.styles.message.message :as style]
[status-im.chat.styles.message.command-pill :as pill-style]
[status-im.chat.views.message.request-message :as request-message]
[status-im.constants :as constants]
[status-im.ui.components.chat-icon.screen :as chat-icon.screen]
[status-im.utils.events-buffer :as events-buffer]
[status-im.utils.identicon :as identicon]
[status-im.utils.gfycat.core :as gfycat]
[status-im.utils.platform :as platform]
@ -79,7 +78,7 @@
[react/view (pill-style/pill command)
[react/text {:style pill-style/pill-text
:font :default}
(chat.utils/command-name command)]]])
(models.commands/command-name command)]]])
(when icon-path
[react/view style/command-image-view
[react/icon icon-path style/command-image]])
@ -266,10 +265,10 @@
(callback))))))))
(defn message-container [message & children]
(if (:new? message)
(if (:appearing? message)
(let [layout-height (reagent/atom 0)
anim-value (animation/create-value 1)
anim-callback #(events-buffer/dispatch [:set-message-shown message])
anim-callback #(re-frame/dispatch [:message-appeared message])
context {:to-value layout-height
:val anim-value
:callback anim-callback}
@ -296,13 +295,12 @@
"chat-message"
:component-did-mount
;; send `:seen` signal when we have signed-in user, message not from us and we didn't sent it already
(fn []
(when (and current-public-key message-id chat-id (not outgoing)
(not (chat.utils/message-seen-by? message current-public-key)))
(events-buffer/dispatch [:send-seen! {:chat-id chat-id
:from from
:me current-public-key
:message-id message-id}])))
#(when (and current-public-key message-id chat-id (not outgoing)
(not (models.message/message-seen-by? message current-public-key)))
(re-frame/dispatch [:send-seen! {:chat-id chat-id
:from from
:me current-public-key
:message-id message-id}]))
:reagent-render
(fn [{:keys [outgoing group-chat content-type content] :as message}]
[message-container message

View File

@ -48,7 +48,7 @@
(get-in contacts [whisper-identity :debug?]))
(let [dapp (merge dapp-data {:dapp? true
:debug? true})]
(re-frame/dispatch [:upsert-chat! {:chat-id whisper-identity
(re-frame/dispatch [:update-chat! {:chat-id whisper-identity
:name name
:debug? true}])
(if (get contacts whisper-identity)

View File

@ -2,13 +2,13 @@
(:require [status-im.data-store.realm.chats :as data-store])
(:refer-clojure :exclude [exists?]))
(defn- normalize-contacts
[item]
(update item :contacts vals))
(defn get-all
[]
(map normalize-contacts (data-store/get-all-active)))
(data-store/get-all-active))
(defn get-inactive-ids
[]
(data-store/get-inactive-ids))
(defn get-by-id
[id]
@ -19,7 +19,7 @@
(data-store/exists? chat-id))
(defn save
[{:keys [last-message-id chat-id] :as chat}]
[{:keys [chat-id] :as chat}]
(data-store/save chat (data-store/exists? chat-id)))
(defn delete
@ -85,8 +85,8 @@
(defn new-update?
[timestamp chat-id]
(let
[{:keys [added-to-at removed-at removed-from-at added-at]}
(get-by-id chat-id)]
[{:keys [added-to-at removed-at removed-from-at added-at]}
(get-by-id chat-id)]
(and (> timestamp added-to-at)
(> timestamp removed-at)
(> timestamp removed-from-at)

View File

@ -16,9 +16,6 @@
{:outgoing false
:to nil})
(defn exists? [message-id]
(data-store/exists? message-id))
(defn get-by-id
[message-id]
(data-store/get-by-id message-id))
@ -33,18 +30,16 @@
(update message :content reader/read-string)
message))))))
(defn get-stored-message-ids
[]
(data-store/get-stored-message-ids))
(defn get-log-messages
[chat-id]
(->> (data-store/get-by-chat-id chat-id 0 100)
(filter #(= (:content-type %) constants/content-type-log-message))
(map #(select-keys % [:content :timestamp]))))
(defn get-last-clock-value
[chat-id]
(if-let [message (data-store/get-last-message chat-id)]
(:clock-value message)
0))
(defn get-unviewed
[current-public-key]
(into {}

View File

@ -1,31 +1,39 @@
(ns status-im.data-store.realm.chats
(:require [goog.object :as object]
[status-im.data-store.realm.core :as realm]
[status-im.data-store.realm.messages :as messages]
[status-im.utils.random :refer [timestamp]]
[taoensso.timbre :as log])
(:refer-clojure :exclude [exists?]))
(defn get-all
[]
(-> @realm/account-realm
(realm/get-all :chat)
(realm/sorted :timestamp :desc)))
(defn get-all-as-list
[]
(realm/js-object->clj (get-all)))
(defn- normalize-chat [{:keys [chat-id] :as chat}]
(let [last-message (messages/get-last-message chat-id)]
(-> chat
(realm/fix-map->vec :contacts)
(assoc :last-clock-value (or (:clock-value last-message) 0)))))
(defn get-all-active
[]
(-> (realm/get-by-field @realm/account-realm :chat :is-active true)
(realm/sorted :timestamp :desc)
realm/js-object->clj))
(map normalize-chat
(-> (realm/get-by-field @realm/account-realm :chat :is-active true)
(realm/sorted :timestamp :desc)
realm/js-object->clj)))
(defn get-inactive-ids
[]
(-> (realm/get-by-field @realm/account-realm :chat :is-active false)
(.map (fn [chat _ _]
(aget chat "chat-id")))
realm/js-object->clj
set))
(defn- groups
[active?]
(realm/filtered (get-all)
(str "group-chat = true && is-active = "
(if active? "true" "false"))))
(-> @realm/account-realm
(realm/get-all :chat)
(realm/sorted :timestamp :desc)
(realm/filtered (str "group-chat = true && is-active = "
(if active? "true" "false")))))
(defn get-active-group-chats
[]
@ -46,7 +54,7 @@
[chat-id]
(-> @realm/account-realm
(realm/get-one-by-field-clj :chat :chat-id chat-id)
(realm/fix-map->vec :contacts)))
normalize-chat))
(defn save
[chat update?]

View File

@ -32,6 +32,19 @@
realm/js-object->clj)]
(mapv transform-message messages))))
(defn get-stored-message-ids
[]
(let [chat-id->message-id (volatile! {})]
(-> @realm/account-realm
(.objects "message")
(.map (fn [msg _ _]
(vswap! chat-id->message-id
#(update %
(aget msg "chat-id")
(fnil conj #{})
(aget msg "message-id"))))))
@chat-id->message-id))
(defn get-by-fields
[fields from number-of-messages]
(-> (realm/get-by-fields @realm/account-realm :message :and fields)

View File

@ -1,5 +1,6 @@
(ns status-im.protocol.handlers
(:require [re-frame.core :as re-frame]
[cljs.core.async :as async]
[status-im.utils.handlers :as handlers]
[status-im.data-store.contacts :as contacts]
[status-im.data-store.messages :as messages]
@ -10,13 +11,13 @@
[status-im.constants :as constants]
[status-im.i18n :as i18n]
[status-im.utils.random :as random]
[status-im.utils.async :as async-utils]
[status-im.protocol.message-cache :as cache]
[status-im.protocol.listeners :as listeners]
[status-im.chat.utils :as chat.utils]
[status-im.chat.models.message :as models.message]
[status-im.protocol.web3.inbox :as inbox]
[status-im.protocol.web3.keys :as web3.keys]
[status-im.utils.datetime :as datetime]
[status-im.utils.events-buffer :as events-buffer]
[taoensso.timbre :as log]
[status-im.native-module.core :as status]
[clojure.string :as string]
@ -40,11 +41,6 @@
(fn [coeffects _]
(assoc coeffects :pending-messages (pending-messages/get-all))))
(re-frame/reg-cofx
::get-all-contacts
(fn [coeffects _]
(assoc coeffects :contacts (contacts/get-all))))
(re-frame/reg-cofx
::message-get-by-id
(fn [coeffects _]
@ -74,6 +70,8 @@
;;;; FX
(def ^:private protocol-realm-queue (async-utils/task-queue 200))
(re-frame/reg-fx
:stop-whisper
(fn [] (protocol/stop-whisper!)))
@ -85,7 +83,7 @@
{:web3 web3
:identity public-key
:groups groups
:callback #(events-buffer/dispatch [:incoming-message %1 %2])
:callback #(re-frame/dispatch [:incoming-message %1 %2])
:ack-not-received-s-interval 125
:default-ttl 120
:send-online-s-interval 180
@ -118,7 +116,7 @@
(re-frame/reg-fx
::save-processed-messages
(fn [processed-message]
(processed-messages/save processed-message)))
(async/put! protocol-realm-queue #(processed-messages/save processed-message))))
(defn system-message [message-id timestamp content]
{:from "system"
@ -198,12 +196,12 @@
(re-frame/reg-fx
::pending-messages-delete
(fn [message-id]
(pending-messages/delete message-id)))
(async/put! protocol-realm-queue #(pending-messages/delete message-id))))
(re-frame/reg-fx
::pending-messages-save
(fn [pending-message]
(pending-messages/save pending-message)))
(async/put! protocol-realm-queue #(pending-messages/save pending-message))))
(re-frame/reg-fx
::status-init-jail
@ -370,15 +368,15 @@
(re-frame/inject-cofx ::get-web3)
(re-frame/inject-cofx ::get-chat-groups)
(re-frame/inject-cofx ::get-pending-messages)
(re-frame/inject-cofx ::get-all-contacts)]
(fn [{:keys [db web3 groups contacts pending-messages]} [current-account-id ethereum-rpc-url]]
(re-frame/inject-cofx :get-all-contacts)]
(fn [{:keys [db web3 groups all-contacts pending-messages]} [current-account-id ethereum-rpc-url]]
(let [{:keys [public-key status updates-public-key
updates-private-key]}
(get-in db [:accounts/accounts current-account-id])]
(when public-key
{::init-whisper {:web3 web3 :public-key public-key :groups groups :pending-messages pending-messages
:updates-public-key updates-public-key :updates-private-key updates-private-key
:status status :contacts contacts}
:status status :contacts all-contacts}
:db (assoc db :web3 web3
:rpc-url (or ethereum-rpc-url constants/ethereum-rpc-url))}))))
@ -429,7 +427,7 @@
(defn- transform-protocol-message [{:keys [from to payload]}]
(merge payload {:from from
:to to
:chat-id from}))
:chat-id (or (:group-id payload) from)}))
(defn- message-from-self [{:keys [current-public-key]} {:keys [id to group-id]}]
{:from to
@ -491,11 +489,14 @@
chat-identifier (or (:group-id payload) from)
message-db-path [:chats chat-identifier :messages message-identifier]
from-id (or sent-from from)
message (get-stored-message message-identifier)]
message (or (get-in db message-db-path)
(and (get (:not-loaded-message-ids db) message-identifier)
(get-stored-message message-identifier)))]
;; proceed with updating status if chat is in db, status is not the same and message was not already seen
(when (and (get-in db [:chats chat-identifier])
(when (and message
(get-in db [:chats chat-identifier])
(not= status (get-in message [:user-statuses from-id]))
(not (chat.utils/message-seen-by? message from-id)))
(not (models.message/message-seen-by? message from-id)))
(let [statuses (assoc (:user-statuses message) from-id status)]
(cond-> {:update-message {:message-id message-identifier
:user-statuses statuses}}
@ -527,7 +528,6 @@
;; Root level "timestamp" is a unix ts in seconds.
timestamp' (or (:payload timestamp)
(* 1000 timestamp))]
(if-not existing-contact
(let [contact (assoc contact :pending? true)]
{:dispatch-n [[:add-contacts [contact]]
@ -535,7 +535,7 @@
(when-not (:pending? existing-contact)
(cond-> {:dispatch-n [[:update-chat! chat]
[:watch-contact contact]]}
(<= prev-last-updated timestamp') (update :dispatch-n concat [[:update-contact! contact]]))))))))
(<= prev-last-updated timestamp') (update :dispatch-n concat [[:update-contact! contact]]))))))))
;;GROUP

View File

@ -1,11 +1,11 @@
(ns status-im.protocol.listeners
(:require [cljs.reader :as r]
[re-frame.core :as re-frame]
[status-im.protocol.ack :as ack]
[status-im.protocol.web3.utils :as u]
[status-im.protocol.encryption :as e]
[taoensso.timbre :as log]
[status-im.utils.hex :as i]
[status-im.utils.events-buffer :as events-buffer]))
[status-im.utils.hex :as i]))
(defn empty-public-key? [public-key]
(or (= "0x0" public-key)
@ -96,5 +96,5 @@
"Valid options are: web3, identity, callback, keypair"
[options]
(fn [js-error js-message]
(events-buffer/dispatch [:handle-whisper-message js-error js-message options])))
(re-frame/dispatch [:handle-whisper-message js-error js-message options])))

View File

@ -27,7 +27,7 @@
;;;; COFX
(reg-cofx
::get-all-contacts
:get-all-contacts
(fn [coeffects _]
(assoc coeffects :all-contacts (contacts/get-all))))
@ -278,7 +278,7 @@
(register-handler-fx
:load-contacts
[(inject-cofx ::get-all-contacts)]
[(inject-cofx :get-all-contacts)]
(fn [{:keys [db all-contacts]} _]
(let [contacts-list (map #(vector (:whisper-identity %) %) all-contacts)
contacts (into {} contacts-list)]

View File

@ -161,6 +161,7 @@
:qr/qr-modal
:qr/current-qr-context
:chat/chats
:chat/deleted-chats
:chat/current-chat-id
:chat/chat-id
:chat/new-chat
@ -177,6 +178,8 @@
:chat/public-group-topic
:chat/confirmation-code-sms-listener
:chat/messages
:chat/not-loaded-message-ids
:chat/last-clock-value
:chat/loaded-chats
:chat/bot-db
:commands/access-scope->commands-responses

View File

@ -25,7 +25,7 @@
(reg-sub :discover/discoveries-with-priority
:<- [:discover/discoveries]
:<- [:chats]
:<- [:get-active-chats]
:<- [:get-contacts]
:<- [:get :current-public-key]
(fn [[discoveries chats contacts current-public-key]]
@ -88,7 +88,7 @@
(reg-sub :discover/search-results
:<- [:discover/discoveries-by-tags]
:<- [:discover/search-tags]
:<- [:chats]
:<- [:get-active-chats]
:<- [:get-contacts]
:<- [:get :current-public-key]
(fn [[discoveries search-tags chats contacts current-public-key] [_ limit]]

View File

@ -2,7 +2,7 @@
(:require [re-frame.core :as re-frame]))
(re-frame/reg-sub :home-items
:<- [:chats]
:<- [:get-active-chats]
:<- [:browsers]
(fn [[chats browsers]]
(sort-by #(-> % second :timestamp) > (merge chats browsers))))

View File

@ -22,3 +22,15 @@
(recur (conj acc v) (and (seq acc) flush?))
(async/close! output-ch))
(recur acc (seq acc)))))))
(defn task-queue
"Creates `core.async` channel which will process 0 arg functions put there in serial fashon.
Takes the same argument/s as `core.async/chan`, those arguments will be delegated to the
channel constructor.
Returns task-queue where tasks represented by 0 arg task functions can be put for processing."
[& args]
(let [task-queue (apply async/chan args)]
(async/go-loop [task-fn (async/<! task-queue)]
(task-fn)
(recur (async/<! task-queue)))
task-queue))

View File

@ -26,7 +26,7 @@
;; http://amturing.acm.org/p558-lamport.pdf
(defn send [local-clock]
(inc local-clock))
(inc (or local-clock 0)))
(defn receive [message-clock local-clock]
(inc (max message-clock local-clock)))
(inc (max (or message-clock 0) (or local-clock 0))))

View File

@ -1,28 +0,0 @@
(ns status-im.utils.events-buffer
(:require [cljs.core.async :as async]
[re-frame.core :as re-frame])
(:require-macros [cljs.core.async.macros :refer [go-loop]]))
;; NOTE:(dmitryn) Ideally we should not exceed current buffer size.
;; Buffer length is an experimental number, consider to change it.
(defonce ^:private buffer (async/chan 10000))
;; NOTE:(dmitryn) Reference to re-frame event loop mechanism
;; https://github.com/Day8/re-frame/blob/master/src/re_frame/router.cljc#L8
;; Might need future improvements.
;; "Fast" events could be processed in batches to speed up things,
;; so multiple buffers/channels could be introduced.
(defn- start-loop! [c t]
"Dispatches events to re-frame processing queue,
but in a way that doesn't block events processing."
(go-loop [e (async/<! c)]
(re-frame/dispatch e)
(async/<! (async/timeout t))
(recur (async/<! c))))
(defonce ^:private dispatch-loop (start-loop! buffer 0))
;; Accepts re-frame event vector [:event-id args]
;; NOTE(dmitryn) Puts all events into a single buffer (naive approach).
(defn dispatch [event]
(async/put! buffer event))

View File

@ -79,7 +79,7 @@
(rf/reg-fx :save-chat #())
(rf/reg-cofx
::contacts-events/get-all-contacts
:get-all-contacts
(fn [coeffects _]
(assoc coeffects :all-contacts [])))