From 6c2e743ff37e4ab75867f7d5f8ffdbaa4f9e2bf1 Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni <7008900+sinkingsugar@users.noreply.github.com> Date: Sat, 19 Dec 2020 00:56:46 +0900 Subject: [PATCH] Race condition in pubsub #469 (#471) * Race condition in pubsub #469 * use allFinished * improve cancellation handling --- libp2p/protocols/pubsub/pubsub.nim | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 9901fbd7e..e7c0844a4 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -169,15 +169,22 @@ proc getOrCreatePeer*( proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] {.async.} = if topic notin p.topics: return # Not subscribed - for h in p.topics[topic].handler: - trace "triggering handler", topicID = topic - try: - await h(topic, data) - except CancelledError as exc: - raise exc - except CatchableError as exc: - # Handlers should never raise exceptions - warn "Error in topic handler", msg = exc.msg + # gather all futures without yielding to scheduler + var futs = p.topics[topic].handler.mapIt(it(topic, data)) + + try: + futs = await allFinished(futs) + except CancelledError: + # propagate cancellation + for fut in futs: + if not(fut.finished): + fut.cancel() + + # check for errors in futures + for fut in futs: + if fut.failed: + let err = fut.readError() + warn "Error in topic handler", msg = err.msg method handleConn*(p: PubSub, conn: Connection,