* Race condition in pubsub #469 * use allFinished * improve cancellation handling
This commit is contained in:
parent
a1a5f9abac
commit
6c2e743ff3
|
@ -169,15 +169,22 @@ proc getOrCreatePeer*(
|
||||||
proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] {.async.} =
|
proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] {.async.} =
|
||||||
if topic notin p.topics: return # Not subscribed
|
if topic notin p.topics: return # Not subscribed
|
||||||
|
|
||||||
for h in p.topics[topic].handler:
|
# gather all futures without yielding to scheduler
|
||||||
trace "triggering handler", topicID = topic
|
var futs = p.topics[topic].handler.mapIt(it(topic, data))
|
||||||
try:
|
|
||||||
await h(topic, data)
|
try:
|
||||||
except CancelledError as exc:
|
futs = await allFinished(futs)
|
||||||
raise exc
|
except CancelledError:
|
||||||
except CatchableError as exc:
|
# propagate cancellation
|
||||||
# Handlers should never raise exceptions
|
for fut in futs:
|
||||||
warn "Error in topic handler", msg = exc.msg
|
if not(fut.finished):
|
||||||
|
fut.cancel()
|
||||||
|
|
||||||
|
# check for errors in futures
|
||||||
|
for fut in futs:
|
||||||
|
if fut.failed:
|
||||||
|
let err = fut.readError()
|
||||||
|
warn "Error in topic handler", msg = err.msg
|
||||||
|
|
||||||
method handleConn*(p: PubSub,
|
method handleConn*(p: PubSub,
|
||||||
conn: Connection,
|
conn: Connection,
|
||||||
|
|
Loading…
Reference in New Issue