From 48a3ac06ff9c4d84f8debd3aefad25d4ab08af26 Mon Sep 17 00:00:00 2001 From: Etan Kissling Date: Tue, 12 Mar 2024 21:05:53 +0100 Subject: [PATCH] `{.async: (raises).}` for `MultistreamSelect` (#1066) --- libp2p/multistream.nim | 110 ++++++++++++------ libp2p/protocols/connectivity/relay/relay.nim | 18 ++- libp2p/protocols/protocol.nim | 17 ++- libp2p/protocols/pubsub/gossipsub.nim | 20 +++- libp2p/protocols/rendezvous.nim | 18 ++- 5 files changed, 131 insertions(+), 52 deletions(-) diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim index b342e00d6..155c9c1da 100644 --- a/libp2p/multistream.nim +++ b/libp2p/multistream.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -45,15 +45,18 @@ proc new*(T: typedesc[MultistreamSelect]): T = ) template validateSuffix(str: string): untyped = - if str.endsWith("\n"): - str.removeSuffix("\n") - else: - raise newException(MultiStreamError, "MultistreamSelect failed, malformed message") + if str.endsWith("\n"): + str.removeSuffix("\n") + else: + raise (ref MultiStreamError)(msg: + "MultistreamSelect failed, malformed message") -proc select*(_: MultistreamSelect | type MultistreamSelect, - conn: Connection, - proto: seq[string]): - Future[string] {.async.} = +proc select*( + _: MultistreamSelect | type MultistreamSelect, + conn: Connection, + proto: seq[string] +): Future[string] {.async: (raises: [ + CancelledError, LPStreamError, MultiStreamError]).} = trace "initiating handshake", conn, codec = Codec ## select a remote protocol await conn.writeLp(Codec & "\n") # write handshake @@ -66,7 +69,7 @@ proc select*(_: MultistreamSelect | type MultistreamSelect, if s != Codec: notice "handshake failed", conn, codec = s - raise newException(MultiStreamError, "MultistreamSelect handshake failed") + raise (ref MultiStreamError)(msg: "MultistreamSelect handshake failed") else: trace "multistream handshake success", conn @@ -98,19 +101,29 @@ proc select*(_: MultistreamSelect | type MultistreamSelect, # No alternatives, fail return "" -proc select*(_: MultistreamSelect | type MultistreamSelect, - conn: Connection, - proto: string): Future[bool] {.async.} = +proc select*( + _: MultistreamSelect | type MultistreamSelect, + conn: Connection, + proto: string +): Future[bool] {.async: (raises: [ + CancelledError, LPStreamError, MultiStreamError]).} = if proto.len > 0: - return (await MultistreamSelect.select(conn, @[proto])) == proto + (await MultistreamSelect.select(conn, @[proto])) == proto else: - return (await MultistreamSelect.select(conn, @[])) == Codec + (await MultistreamSelect.select(conn, @[])) == Codec -proc select*(m: MultistreamSelect, conn: Connection): Future[bool] = +proc select*( + m: MultistreamSelect, + conn: Connection +): Future[bool] {.async: (raises: [ + CancelledError, LPStreamError, MultiStreamError], raw: true).} = m.select(conn, "") -proc list*(m: MultistreamSelect, - conn: Connection): Future[seq[string]] {.async.} = +proc list*( + m: MultistreamSelect, + conn: Connection +): Future[seq[string]] {.async: (raises: [ + CancelledError, LPStreamError, MultiStreamError]).} = ## list remote protos requests on connection if not await m.select(conn): return @@ -126,12 +139,13 @@ proc list*(m: MultistreamSelect, result = list proc handle*( - _: type MultistreamSelect, - conn: Connection, - protos: seq[string], - matchers = newSeq[Matcher](), - active: bool = false, - ): Future[string] {.async.} = + _: type MultistreamSelect, + conn: Connection, + protos: seq[string], + matchers = newSeq[Matcher](), + active: bool = false +): Future[string] {.async: (raises: [ + CancelledError, LPStreamError, MultiStreamError]).} = trace "Starting multistream negotiation", conn, handshaked = active var handshaked = active while not conn.atEof: @@ -140,8 +154,8 @@ proc handle*( if not handshaked and ms != Codec: debug "expected handshake message", conn, instead=ms - raise newException(CatchableError, - "MultistreamSelect handling failed, invalid first message") + raise (ref MultiStreamError)(msg: + "MultistreamSelect handling failed, invalid first message") trace "handle: got request", conn, ms if ms.len() <= 0: @@ -172,13 +186,16 @@ proc handle*( trace "no handlers", conn, protocol = ms await conn.writeLp(Na) -proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.async.} = +proc handle*( + m: MultistreamSelect, + conn: Connection, + active: bool = false) {.async: (raises: [CancelledError]).} = trace "Starting multistream handler", conn, handshaked = active var protos: seq[string] matchers: seq[Matcher] for h in m.handlers: - if not isNil(h.match): + if h.match != nil: matchers.add(h.match) for proto in h.protos: protos.add(proto) @@ -186,12 +203,13 @@ proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.asy try: let ms = await MultistreamSelect.handle(conn, protos, matchers, active) for h in m.handlers: - if (not isNil(h.match) and h.match(ms)) or h.protos.contains(ms): + if (h.match != nil and h.match(ms)) or h.protos.contains(ms): trace "found handler", conn, protocol = ms var protocolHolder = h let maxIncomingStreams = protocolHolder.protocol.maxIncomingStreams - if protocolHolder.openedStreams.getOrDefault(conn.peerId) >= maxIncomingStreams: + if protocolHolder.openedStreams.getOrDefault(conn.peerId) >= + maxIncomingStreams: debug "Max streams for protocol reached, blocking new stream", conn, protocol = ms, maxIncomingStreams return @@ -242,8 +260,32 @@ proc addHandler*(m: MultistreamSelect, protocol: protocol, match: matcher)) -proc start*(m: MultistreamSelect) {.async.} = - await allFutures(m.handlers.mapIt(it.protocol.start())) +proc start*(m: MultistreamSelect) {.async: (raises: [CancelledError]).} = + let + handlers = m.handlers + futs = handlers.mapIt(it.protocol.start()) + try: + await allFutures(futs) + for fut in futs: + await fut + except CancelledError as exc: + var pending: seq[Future[void].Raising([])] + for i, fut in futs: + if not fut.finished: + pending.add noCancel fut.cancelAndWait() + elif fut.completed: + pending.add handlers[i].protocol.stop() + else: + static: doAssert typeof(fut).E is (CancelledError,) + await noCancel allFutures(pending) + raise exc -proc stop*(m: MultistreamSelect) {.async.} = - await allFutures(m.handlers.mapIt(it.protocol.stop())) + +proc stop*(m: MultistreamSelect) {.async: (raises: []).} = + # Nim 1.6.18: Using `mapIt` results in a seq of `.Raising([CancelledError])` + var futs = newSeqOfCap[Future[void].Raising([])](m.handlers.len) + for it in m.handlers: + futs.add it.protocol.stop() + await noCancel allFutures(futs) + for fut in futs: + await fut diff --git a/libp2p/protocols/connectivity/relay/relay.nim b/libp2p/protocols/connectivity/relay/relay.nim index b078cdb46..6fa9894f8 100644 --- a/libp2p/protocols/connectivity/relay/relay.nim +++ b/libp2p/protocols/connectivity/relay/relay.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -361,17 +361,25 @@ proc deletesReservation(r: Relay) {.async.} = if n > r.rsvp[k]: r.rsvp.del(k) -method start*(r: Relay) {.async.} = +method start*( + r: Relay +): Future[void] {.async: (raises: [CancelledError], raw: true).} = + let fut = newFuture[void]() + fut.complete() if not r.reservationLoop.isNil: warn "Starting relay twice" - return + return fut r.reservationLoop = r.deletesReservation() r.started = true + fut -method stop*(r: Relay) {.async.} = +method stop*(r: Relay): Future[void] {.async: (raises: [], raw: true).} = + let fut = newFuture[void]() + fut.complete() if r.reservationLoop.isNil: warn "Stopping relay without starting it" - return + return fut r.started = false r.reservationLoop.cancel() r.reservationLoop = nil + fut diff --git a/libp2p/protocols/protocol.nim b/libp2p/protocols/protocol.nim index cb328849d..1fd2eb9a1 100644 --- a/libp2p/protocols/protocol.nim +++ b/libp2p/protocols/protocol.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -31,8 +31,19 @@ type maxIncomingStreams: Opt[int] method init*(p: LPProtocol) {.base, gcsafe.} = discard -method start*(p: LPProtocol) {.async, base.} = p.started = true -method stop*(p: LPProtocol) {.async, base.} = p.started = false + +method start*( + p: LPProtocol) {.async: (raises: [CancelledError], raw: true), base.} = + let fut = newFuture[void]() + fut.complete() + p.started = true + fut + +method stop*(p: LPProtocol) {.async: (raises: [], raw: true), base.} = + let fut = newFuture[void]() + fut.complete() + p.started = false + fut proc maxIncomingStreams*(p: LPProtocol): int = p.maxIncomingStreams.get(DefaultMaxIncomingStreams) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 59ad4376d..079808129 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -701,30 +701,40 @@ proc maintainDirectPeers(g: GossipSub) {.async.} = for id, addrs in g.parameters.directPeers: await g.addDirectPeer(id, addrs) -method start*(g: GossipSub) {.async.} = +method start*( + g: GossipSub +): Future[void] {.async: (raises: [CancelledError], raw: true).} = + let fut = newFuture[void]() + fut.complete() + trace "gossipsub start" if not g.heartbeatFut.isNil: warn "Starting gossipsub twice" - return + return fut g.heartbeatFut = g.heartbeat() g.scoringHeartbeatFut = g.scoringHeartbeat() g.directPeersLoop = g.maintainDirectPeers() g.started = true + fut + +method stop*(g: GossipSub): Future[void] {.async: (raises: [], raw: true).} = + let fut = newFuture[void]() + fut.complete() -method stop*(g: GossipSub) {.async.} = trace "gossipsub stop" g.started = false if g.heartbeatFut.isNil: warn "Stopping gossipsub without starting it" - return + return fut # stop heartbeat interval g.directPeersLoop.cancel() g.scoringHeartbeatFut.cancel() g.heartbeatFut.cancel() g.heartbeatFut = nil + fut method initPubSub*(g: GossipSub) {.raises: [InitializationError].} = diff --git a/libp2p/protocols/rendezvous.nim b/libp2p/protocols/rendezvous.nim index 4bacb487d..f638a6b39 100644 --- a/libp2p/protocols/rendezvous.nim +++ b/libp2p/protocols/rendezvous.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -678,17 +678,25 @@ proc deletesRegister(rdv: RendezVous) {.async.} = libp2p_rendezvous_registered.set(int64(total)) libp2p_rendezvous_namespaces.set(int64(rdv.namespaces.len)) -method start*(rdv: RendezVous) {.async.} = +method start*( + rdv: RendezVous +): Future[void] {.async: (raises: [CancelledError], raw: true).} = + let fut = newFuture[void]() + fut.complete() if not rdv.registerDeletionLoop.isNil: warn "Starting rendezvous twice" - return + return fut rdv.registerDeletionLoop = rdv.deletesRegister() rdv.started = true + fut -method stop*(rdv: RendezVous) {.async.} = +method stop*(rdv: RendezVous): Future[void] {.async: (raises: [], raw: true).} = + let fut = newFuture[void]() + fut.complete() if rdv.registerDeletionLoop.isNil: warn "Stopping rendezvous without starting it" - return + return fut rdv.started = false rdv.registerDeletionLoop.cancel() rdv.registerDeletionLoop = nil + fut