Fix concurrent retry-request
Sometimes it happens that the expired signal is received while the there's a new request in flight. This happens in cases such as: 1) We send a request (A) 2) We get disconnected from the mailserver 3) We connect to a new mailserver 4) We send a request (B) 5) We receive an expired signal for A In such cases the request should not be retried or counted as a failure. Signed-off-by: Igor Mandrigin <i@mandrigin.ru>
This commit is contained in:
parent
75294302ff
commit
edf810444d
|
@ -434,6 +434,11 @@
|
||||||
(fn [cofx [_ error]]
|
(fn [cofx [_ error]]
|
||||||
(mailserver/handle-request-error cofx error)))
|
(mailserver/handle-request-error cofx error)))
|
||||||
|
|
||||||
|
(handlers/register-handler-fx
|
||||||
|
:mailserver.callback/request-success
|
||||||
|
(fn [cofx [_ request-id]]
|
||||||
|
(mailserver/handle-request-success cofx request-id)))
|
||||||
|
|
||||||
;; network module
|
;; network module
|
||||||
|
|
||||||
(handlers/register-handler-fx
|
(handlers/register-handler-fx
|
||||||
|
|
|
@ -262,6 +262,10 @@
|
||||||
(log/debug "Adjusting mailserver request" "from:" from "adjusted-from:" adjusted-from)
|
(log/debug "Adjusting mailserver request" "from:" from "adjusted-from:" adjusted-from)
|
||||||
adjusted-from))
|
adjusted-from))
|
||||||
|
|
||||||
|
(fx/defn handle-request-success [{:keys [db]} request-id]
|
||||||
|
(when (:mailserver/current-request db)
|
||||||
|
{:db (assoc-in db [:mailserver/current-request :request-id] request-id)}))
|
||||||
|
|
||||||
(defn request-messages! [web3 {:keys [sym-key-id address]} {:keys [topics cursor to from] :as request}]
|
(defn request-messages! [web3 {:keys [sym-key-id address]} {:keys [topics cursor to from] :as request}]
|
||||||
;; Add some room to from, unless we break day boundaries so that messages that have
|
;; Add some room to from, unless we break day boundaries so that messages that have
|
||||||
;; been received after the last request are also fetched
|
;; been received after the last request are also fetched
|
||||||
|
@ -288,10 +292,12 @@
|
||||||
:to to})
|
:to to})
|
||||||
(fn [error request-id]
|
(fn [error request-id]
|
||||||
(if-not error
|
(if-not error
|
||||||
|
(do
|
||||||
(log/info "mailserver: messages request success for topic " topics "from" from "to" to)
|
(log/info "mailserver: messages request success for topic " topics "from" from "to" to)
|
||||||
|
(re-frame/dispatch [:mailserver.callback/request-success {:request-id request-id}]))
|
||||||
(do
|
(do
|
||||||
(log/error "mailserver: messages request error for topic " topics ": " error)
|
(log/error "mailserver: messages request error for topic " topics ": " error)
|
||||||
(utils/set-timeout #(re-frame/dispatch [:mailserver.callback/resend-request {:request-id "failed-request"}])
|
(utils/set-timeout #(re-frame/dispatch [:mailserver.callback/resend-request {:request-id nil}])
|
||||||
backoff-interval-ms)))))))
|
backoff-interval-ms)))))))
|
||||||
|
|
||||||
(re-frame/reg-fx
|
(re-frame/reg-fx
|
||||||
|
@ -575,14 +581,25 @@
|
||||||
|
|
||||||
(fx/defn resend-request
|
(fx/defn resend-request
|
||||||
[{:keys [db] :as cofx} {:keys [request-id]}]
|
[{:keys [db] :as cofx} {:keys [request-id]}]
|
||||||
(if (and (:mailserver/current-request db)
|
(let [current-request (:mailserver/current-request db)]
|
||||||
(<= maximum-number-of-attempts
|
;; no inflight request, do nothing
|
||||||
(get-in db [:mailserver/current-request :attempts])))
|
(when (and current-request
|
||||||
|
;; the request was never successful
|
||||||
|
(or (nil? request-id)
|
||||||
|
;; we haven't received the request-id yet, but has expired,
|
||||||
|
;; so we retry even though we are not sure it's the current
|
||||||
|
;; request that failed
|
||||||
|
(nil? (:request-id current-request))
|
||||||
|
;; this is the same request that we are currently processing
|
||||||
|
(= request-id (:request-id current-request))))
|
||||||
|
|
||||||
|
(if (<= maximum-number-of-attempts
|
||||||
|
(:attempts current-request))
|
||||||
(fx/merge cofx
|
(fx/merge cofx
|
||||||
{:db (update db :mailserver/current-request dissoc :attempts)}
|
{:db (update db :mailserver/current-request dissoc :attempts)}
|
||||||
(change-mailserver))
|
(change-mailserver))
|
||||||
(if-let [mailserver (get-mailserver-when-ready cofx)]
|
(if-let [mailserver (get-mailserver-when-ready cofx)]
|
||||||
(let [{:keys [topics from to cursor limit] :as request} (get db :mailserver/current-request)
|
(let [{:keys [topics from to cursor limit] :as request} current-request
|
||||||
web3 (:web3 db)]
|
web3 (:web3 db)]
|
||||||
(log/info "mailserver: message request " request-id "expired for mailserver topic" topics "from" from "to" to "cursor" cursor "limit" (decrease-limit))
|
(log/info "mailserver: message request " request-id "expired for mailserver topic" topics "from" from "to" to "cursor" cursor "limit" (decrease-limit))
|
||||||
{:db (update-in db [:mailserver/current-request :attempts] inc)
|
{:db (update-in db [:mailserver/current-request :attempts] inc)
|
||||||
|
@ -590,7 +607,7 @@
|
||||||
:mailserver/request-messages {:web3 web3
|
:mailserver/request-messages {:web3 web3
|
||||||
:mailserver mailserver
|
:mailserver mailserver
|
||||||
:request (assoc request :limit (decrease-limit))}})
|
:request (assoc request :limit (decrease-limit))}})
|
||||||
{:mailserver/set-limit (decrease-limit)})))
|
{:mailserver/set-limit (decrease-limit)})))))
|
||||||
|
|
||||||
(fx/defn initialize-mailserver
|
(fx/defn initialize-mailserver
|
||||||
[cofx custom-mailservers]
|
[cofx custom-mailservers]
|
||||||
|
|
|
@ -339,6 +339,42 @@
|
||||||
[{:id "mailserver-id" :enode "enode://mailserver-id@ip"}]
|
[{:id "mailserver-id" :enode "enode://mailserver-id@ip"}]
|
||||||
[])))
|
[])))
|
||||||
|
|
||||||
|
(deftest test-resend-request
|
||||||
|
(testing "there's no current request"
|
||||||
|
(is (not (mailserver/resend-request {:db {}} {}))))
|
||||||
|
(testing "there's a current request"
|
||||||
|
(testing "it reached the maximum number of attempts"
|
||||||
|
(testing "it changes mailserver"
|
||||||
|
(is (= :connecting
|
||||||
|
(get-in (mailserver/resend-request
|
||||||
|
{:db {:mailserver/current-request
|
||||||
|
{:attempts mailserver/maximum-number-of-attempts}}}
|
||||||
|
{})
|
||||||
|
[:db :mailserver/state])))))
|
||||||
|
(testing "it did not reach the maximum number of attempts"
|
||||||
|
(testing "it reached the maximum number of attempts"
|
||||||
|
(testing "it decrease the limit")
|
||||||
|
(is (= {:mailserver/set-limit 1000} (mailserver/resend-request {:db {:mailserver/current-request
|
||||||
|
{}}}
|
||||||
|
{})))))))
|
||||||
|
|
||||||
|
(deftest test-resend-request-request-id
|
||||||
|
(testing "request-id passed is nil"
|
||||||
|
(testing "it resends the request"
|
||||||
|
(is (mailserver/resend-request {:db {:mailserver/current-request {}}} {}))))
|
||||||
|
(testing "request-id is nil in db"
|
||||||
|
(testing "it resends the request"
|
||||||
|
(is (mailserver/resend-request {:db {:mailserver/current-request {}}}
|
||||||
|
{:request-id "a"}))))
|
||||||
|
(testing "request id matches"
|
||||||
|
(testing "it resends the request"
|
||||||
|
(is (mailserver/resend-request {:db {:mailserver/current-request {:request-id "a"}}}
|
||||||
|
{:request-id "a"}))))
|
||||||
|
(testing "request id does not match"
|
||||||
|
(testing "it does not resend the request"
|
||||||
|
(is (not (mailserver/resend-request {:db {:mailserver/current-request {:request-id "a"}}}
|
||||||
|
{:request-id "b"}))))))
|
||||||
|
|
||||||
(deftest peers-summary-change
|
(deftest peers-summary-change
|
||||||
(testing "Mailserver added, sym-key doesn't exist"
|
(testing "Mailserver added, sym-key doesn't exist"
|
||||||
(let [result (peers-summary-change-result false true false)]
|
(let [result (peers-summary-change-result false true false)]
|
||||||
|
|
Loading…
Reference in New Issue