diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 9901fbd7e..e7c0844a4 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -169,15 +169,22 @@ proc getOrCreatePeer*( proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] {.async.} = if topic notin p.topics: return # Not subscribed - for h in p.topics[topic].handler: - trace "triggering handler", topicID = topic - try: - await h(topic, data) - except CancelledError as exc: - raise exc - except CatchableError as exc: - # Handlers should never raise exceptions - warn "Error in topic handler", msg = exc.msg + # gather all futures without yielding to scheduler + var futs = p.topics[topic].handler.mapIt(it(topic, data)) + + try: + futs = await allFinished(futs) + except CancelledError: + # propagate cancellation + for fut in futs: + if not(fut.finished): + fut.cancel() + + # check for errors in futures + for fut in futs: + if fut.failed: + let err = fut.readError() + warn "Error in topic handler", msg = err.msg method handleConn*(p: PubSub, conn: Connection,