Use blocking puts

This commit is contained in:
janherich 2018-02-06 15:43:56 +01:00
parent 0b0405d9a5
commit c7e34ed122
No known key found for this signature in database
GPG Key ID: C23B473AFBE94D13
3 changed files with 17 additions and 16 deletions

View File

@ -70,24 +70,24 @@
;;;; Effects
(def ^:private realm-queue (async-utils/task-queue 200))
(def ^:private realm-queue (async-utils/task-queue 2000))
(re-frame/reg-fx
:update-message
(fn [message]
(async/put! realm-queue #(messages-store/update-message message))))
(async/go (async/>! realm-queue #(messages-store/update-message message)))))
(re-frame/reg-fx
:save-message
(fn [message]
(async/put! realm-queue #(messages-store/save message))))
(async/go (async/>! realm-queue #(messages-store/save message)))))
(re-frame/reg-fx
:delete-chat-messages
(fn [{:keys [chat-id group-chat debug?]}]
(when (or group-chat debug?)
(async/put! realm-queue #(messages-store/delete-by-chat-id chat-id)))
(async/put! realm-queue #(pending-messages-store/delete-all-by-chat-id chat-id))))
(async/go (async/>! realm-queue #(messages-store/delete-by-chat-id chat-id))))
(async/go (async/>! realm-queue #(pending-messages-store/delete-all-by-chat-id chat-id)))))
(re-frame/reg-fx
:update-message-overhead
@ -95,19 +95,19 @@
(let [update-fn (if (= network-status :offline)
chats-store/inc-message-overhead
chats-store/reset-message-overhead)]
(async/put! realm-queue #(update-fn chat-id)))))
(async/go (async/>! realm-queue #(update-fn chat-id))))))
(re-frame/reg-fx
:save-chat
(fn [chat]
(async/put! realm-queue #(chats-store/save chat))))
(async/go (async/>! realm-queue #(chats-store/save chat)))))
(re-frame/reg-fx
:delete-chat
(fn [{:keys [chat-id debug?]}]
(if debug?
(async/put! realm-queue #(chats-store/delete chat-id))
(async/put! realm-queue #(chats-store/set-inactive chat-id)))))
(async/go (async/>! realm-queue #(chats-store/delete chat-id)))
(async/go (async/>! realm-queue #(chats-store/set-inactive chat-id))))))
(re-frame/reg-fx
:save-all-contacts

View File

@ -70,7 +70,7 @@
;;;; FX
(def ^:private protocol-realm-queue (async-utils/task-queue 200))
(def ^:private protocol-realm-queue (async-utils/task-queue 2000))
(re-frame/reg-fx
:stop-whisper
@ -116,7 +116,7 @@
(re-frame/reg-fx
::save-processed-messages
(fn [processed-message]
(async/put! protocol-realm-queue #(processed-messages/save processed-message))))
(async/go (async/>! protocol-realm-queue #(processed-messages/save processed-message)))))
(defn system-message [message-id timestamp content]
{:from "system"
@ -196,12 +196,12 @@
(re-frame/reg-fx
::pending-messages-delete
(fn [message-id]
(async/put! protocol-realm-queue #(pending-messages/delete message-id))))
(async/go (async/>! protocol-realm-queue #(pending-messages/delete message-id)))))
(re-frame/reg-fx
::pending-messages-save
(fn [pending-message]
(async/put! protocol-realm-queue #(pending-messages/save pending-message))))
(async/go (async/>! protocol-realm-queue #(pending-messages/save pending-message)))))
(re-frame/reg-fx
::status-init-jail

View File

@ -53,7 +53,8 @@
(defonce pending-message-callback (atom nil))
(defonce recipient->pending-message (atom {}))
(def ^:private pending-message-queue (async/chan 100))
;; Buffer needs to be big enough to not block even with many outbound messages
(def ^:private pending-message-queue (async/chan 2000))
(async/go-loop [[web3 {:keys [type message-id requires-ack? to ack?] :as message}]
(async/<! pending-message-queue)]
@ -89,7 +90,7 @@
{:pre [(valid? :protocol/message message)]}
(debug :add-pending-message! message)
;; encryption can take some time, better to run asynchronously
(async/put! pending-message-queue [web3 message]))
(async/go (async/>! pending-message-queue [web3 message])))
(s/def :delivery/pending-message
(s/keys :req-un [:message/sig :message/to :shh/payload :payload/ack? ::id