From 9dc25c77db8919d5d594a2dcb91fcf67bf3aa1b5 Mon Sep 17 00:00:00 2001 From: Etan Kissling Date: Tue, 12 Mar 2024 21:27:56 +0100 Subject: [PATCH] X --- libp2p/dial.nim | 72 ++++++++++++----------- libp2p/dialer.nim | 104 +++++++++++++++++----------------- libp2p/peerstore.nim | 46 +++++++-------- libp2p/protocols/identify.nim | 94 +++++++++++++++++------------- libp2p/switch.nim | 87 ++++++++++++++-------------- tests/stubs/switchstub.nim | 38 +++++++------ 6 files changed, 232 insertions(+), 209 deletions(-) diff --git a/libp2p/dial.nim b/libp2p/dial.nim index 5d46cf5fc..fad1af5fb 100644 --- a/libp2p/dial.nim +++ b/libp2p/dial.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -21,56 +21,60 @@ type Dial* = ref object of RootObj method connect*( - self: Dial, - peerId: PeerId, - addrs: seq[MultiAddress], - forceDial = false, - reuseConnection = true, - dir = Direction.Out) {.async, base.} = + self: Dial, + peerId: PeerId, + addrs: seq[MultiAddress], + forceDial = false, + reuseConnection = true, + dir = Direction.Out +) {.async: (raises: [CancelledError, LPError], raw: true), base.} = ## connect remote peer without negotiating ## a protocol ## - - doAssert(false, "Not implemented!") + raiseAssert("Not implemented!") method connect*( - self: Dial, - address: MultiAddress, - allowUnknownPeerId = false): Future[PeerId] {.async, base.} = + self: Dial, + address: MultiAddress, + allowUnknownPeerId = false +): Future[PeerId] {.async: (raises: [ + CancelledError, LPError], raw: true), base.} = ## Connects to a peer and retrieve its PeerId - - doAssert(false, "Not implemented!") + raiseAssert("Not implemented!") method dial*( - self: Dial, - peerId: PeerId, - protos: seq[string], - ): Future[Connection] {.async, base.} = + self: Dial, + peerId: PeerId, + protos: seq[string], +): Future[Connection] {.async: (raises: [ + CancelledError, LPError], raw: true), base.} = ## create a protocol stream over an ## existing connection ## - - doAssert(false, "Not implemented!") + raiseAssert("Not implemented!") method dial*( - self: Dial, - peerId: PeerId, - addrs: seq[MultiAddress], - protos: seq[string], - forceDial = false): Future[Connection] {.async, base.} = + self: Dial, + peerId: PeerId, + addrs: seq[MultiAddress], + protos: seq[string], + forceDial = false +): Future[Connection] {.async: (raises: [ + CancelledError, LPError], raw: true), base.} = ## create a protocol stream and establish ## a connection if one doesn't exist already ## - - doAssert(false, "Not implemented!") + raiseAssert("Not implemented!") method addTransport*( - self: Dial, - transport: Transport) {.base.} = - doAssert(false, "Not implemented!") + self: Dial, + transport: Transport) {.base.} = + raiseAssert("Not implemented!") method tryDial*( - self: Dial, - peerId: PeerId, - addrs: seq[MultiAddress]): Future[Opt[MultiAddress]] {.async, base.} = - doAssert(false, "Not implemented!") + self: Dial, + peerId: PeerId, + addrs: seq[MultiAddress] +): Future[Opt[MultiAddress]] {.async: (raises: [ + CancelledError, LPError], raw: true), base.} = + raiseAssert("Not implemented!") diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim index 986f4e370..0e7496f7e 100644 --- a/libp2p/dialer.nim +++ b/libp2p/dialer.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -49,13 +49,11 @@ type nameResolver: NameResolver proc dialAndUpgrade( - self: Dialer, - peerId: Opt[PeerId], - hostname: string, - address: MultiAddress, - dir = Direction.Out): - Future[Muxer] {.async.} = - + self: Dialer, + peerId: Opt[PeerId], + hostname: string, + address: MultiAddress, + dir = Direction.Out): Future[Muxer] {.async.} = for transport in self.transports: # for each transport if transport.handles(address): # check if it can dial it trace "Dialing address", address, peerId = peerId.get(default(PeerId)), hostname @@ -163,18 +161,19 @@ proc tryReusingConnection(self: Dialer, peerId: PeerId): Opt[Muxer] = return Opt.some(muxer) proc internalConnect( - self: Dialer, - peerId: Opt[PeerId], - addrs: seq[MultiAddress], - forceDial: bool, - reuseConnection = true, - dir = Direction.Out): - Future[Muxer] {.async.} = + self: Dialer, + peerId: Opt[PeerId], + addrs: seq[MultiAddress], + forceDial: bool, + reuseConnection = true, + dir = Direction.Out +): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} = if Opt.some(self.localPeerId) == peerId: - raise newException(CatchableError, "can't dial self!") + raise newException(DialFailedError, "can't dial self!") # Ensure there's only one in-flight attempt per peer - let lock = self.dialLock.mgetOrPut(peerId.get(default(PeerId)), newAsyncLock()) + let lock = self.dialLock.mgetOrPut( + peerId.get(default(PeerId)), newAsyncLock()) try: await lock.acquire() @@ -197,7 +196,11 @@ proc internalConnect( try: self.connManager.storeMuxer(muxed) await self.peerStore.identify(muxed) - except CatchableError as exc: + except CancelledError as exc: + trace "Failed to finish outgoung upgrade", err=exc.msg + await muxed.close() + raise exc + except LPError as exc: trace "Failed to finish outgoung upgrade", err=exc.msg await muxed.close() raise exc @@ -208,27 +211,27 @@ proc internalConnect( lock.release() method connect*( - self: Dialer, - peerId: PeerId, - addrs: seq[MultiAddress], - forceDial = false, - reuseConnection = true, - dir = Direction.Out) {.async.} = + self: Dialer, + peerId: PeerId, + addrs: seq[MultiAddress], + forceDial = false, + reuseConnection = true, + dir = Direction.Out) {.async: (raises: [CancelledError, LPError]).} = ## connect remote peer without negotiating ## a protocol ## - if self.connManager.connCount(peerId) > 0 and reuseConnection: return - discard await self.internalConnect(Opt.some(peerId), addrs, forceDial, reuseConnection, dir) + discard await self.internalConnect( + Opt.some(peerId), addrs, forceDial, reuseConnection, dir) method connect*( - self: Dialer, - address: MultiAddress, - allowUnknownPeerId = false): Future[PeerId] {.async.} = + self: Dialer, + address: MultiAddress, + allowUnknownPeerId = false +): Future[PeerId] {.async: (raises: [CancelledError, LPError]).} = ## Connects to a peer and retrieve its PeerId - parseFullAddress(address).toOpt().withValue(fullAddress): return (await self.internalConnect( Opt.some(fullAddress[0]), @@ -256,14 +259,14 @@ proc negotiateStream( return conn method tryDial*( - self: Dialer, - peerId: PeerId, - addrs: seq[MultiAddress]): Future[Opt[MultiAddress]] {.async.} = + self: Dialer, + peerId: PeerId, + addrs: seq[MultiAddress] +): Future[Opt[MultiAddress]] {.async: (raises: [CancelledError, LPError]).} = ## Create a protocol stream in order to check ## if a connection is possible. ## Doesn't use the Connection Manager to save it. ## - trace "Check if it can dial", peerId, addrs try: let mux = await self.dialAndUpgrade(Opt.some(peerId), addrs) @@ -277,13 +280,13 @@ method tryDial*( raise newException(DialFailedError, exc.msg) method dial*( - self: Dialer, - peerId: PeerId, - protos: seq[string]): Future[Connection] {.async.} = + self: Dialer, + peerId: PeerId, + protos: seq[string] +): Future[Connection] {.async: (raises: [CancelledError, LPError]).} = ## create a protocol stream over an ## existing connection ## - trace "Dialing (existing)", peerId, protos let stream = await self.connManager.getStream(peerId) if stream.isNil: @@ -292,15 +295,15 @@ method dial*( return await self.negotiateStream(stream, protos) method dial*( - self: Dialer, - peerId: PeerId, - addrs: seq[MultiAddress], - protos: seq[string], - forceDial = false): Future[Connection] {.async.} = + self: Dialer, + peerId: PeerId, + addrs: seq[MultiAddress], + protos: seq[string], + forceDial = false +): Future[Connection] {.async: (raises: [CancelledError, LPError]).} = ## create a protocol stream and establish ## a connection if one doesn't exist already ## - var conn: Muxer stream: Connection @@ -336,13 +339,12 @@ method addTransport*(self: Dialer, t: Transport) = self.transports &= t proc new*( - T: type Dialer, - localPeerId: PeerId, - connManager: ConnManager, - peerStore: PeerStore, - transports: seq[Transport], - nameResolver: NameResolver = nil): Dialer = - + T: type Dialer, + localPeerId: PeerId, + connManager: ConnManager, + peerStore: PeerStore, + transports: seq[Transport], + nameResolver: NameResolver = nil): Dialer = T(localPeerId: localPeerId, connManager: connManager, transports: transports, diff --git a/libp2p/peerstore.nim b/libp2p/peerstore.nim index 41698b75d..8f8b62afc 100644 --- a/libp2p/peerstore.nim +++ b/libp2p/peerstore.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -76,7 +76,10 @@ type capacity*: int toClean*: seq[PeerId] -proc new*(T: type PeerStore, identify: Identify, capacity = 1000): PeerStore {.public.} = +proc new*( + T: type PeerStore, + identify: Identify, + capacity = 1000): PeerStore {.public.} = T( identify: identify, capacity: capacity @@ -86,26 +89,21 @@ proc new*(T: type PeerStore, identify: Identify, capacity = 1000): PeerStore {.p # Generic Peer Book API # ######################### -proc `[]`*[T](peerBook: PeerBook[T], - peerId: PeerId): T {.public.} = +proc `[]`*[T](peerBook: PeerBook[T], peerId: PeerId): T {.public.} = ## Get all known metadata of a provided peer, or default(T) if missing peerBook.book.getOrDefault(peerId) -proc `[]=`*[T](peerBook: PeerBook[T], - peerId: PeerId, - entry: T) {.public.} = +proc `[]=`*[T](peerBook: PeerBook[T], peerId: PeerId, entry: T) {.public.} = ## Set metadata for a given peerId. - peerBook.book[peerId] = entry # Notify clients for handler in peerBook.changeHandlers: handler(peerId) -proc del*[T](peerBook: PeerBook[T], - peerId: PeerId): bool {.public.} = - ## Delete the provided peer from the book. Returns whether the peer was in the book - +proc del*[T](peerBook: PeerBook[T], peerId: PeerId): bool {.public.} = + ## Delete the provided peer from the book. + ## Returns whether the peer was in the book if peerId notin peerBook.book: return false else: @@ -118,7 +116,8 @@ proc del*[T](peerBook: PeerBook[T], proc contains*[T](peerBook: PeerBook[T], peerId: PeerId): bool {.public.} = peerId in peerBook.book -proc addHandler*[T](peerBook: PeerBook[T], handler: PeerBookChangeHandler) {.public.} = +proc addHandler*[T]( + peerBook: PeerBook[T], handler: PeerBookChangeHandler) {.public.} = ## Adds a callback that will be called everytime the book changes peerBook.changeHandlers.add(handler) @@ -145,16 +144,12 @@ proc `[]`*[T](p: PeerStore, typ: type[T]): T {.public.} = p.books[name] = result return result -proc del*(peerStore: PeerStore, - peerId: PeerId) {.public.} = +proc del*(peerStore: PeerStore, peerId: PeerId) {.public.} = ## Delete the provided peer from every book. for _, book in peerStore.books: book.deletor(peerId) -proc updatePeerInfo*( - peerStore: PeerStore, - info: IdentifyInfo) = - +proc updatePeerInfo*(peerStore: PeerStore, info: IdentifyInfo) = if info.addrs.len > 0: peerStore[AddressBook][info.peerId] = info.addrs @@ -177,10 +172,7 @@ proc updatePeerInfo*( if cleanupPos >= 0: peerStore.toClean.delete(cleanupPos) -proc cleanup*( - peerStore: PeerStore, - peerId: PeerId) = - +proc cleanup*(peerStore: PeerStore, peerId: PeerId) = if peerStore.capacity == 0: peerStore.del(peerId) return @@ -194,9 +186,8 @@ proc cleanup*( peerStore.toClean.delete(0) proc identify*( - peerStore: PeerStore, - muxer: Muxer) {.async.} = - + peerStore: PeerStore, + muxer: Muxer) {.async: (raises: [CancelledError, LPError]).} = # new stream for identify var stream = await muxer.newStream() if stream == nil: @@ -209,7 +200,8 @@ proc identify*( when defined(libp2p_agents_metrics): var knownAgent = "unknown" - shortAgent = info.agentVersion.get("").split("/")[0].safeToLowerAscii().get("") + shortAgent = info.agentVersion.get("").split("/")[0] + .safeToLowerAscii().get("") if KnownLibP2PAgentsSeq.contains(shortAgent): knownAgent = shortAgent muxer.connection.setShortAgent(knownAgent) diff --git a/libp2p/protocols/identify.nim b/libp2p/protocols/identify.nim index 3a1f6f925..0af3e6a35 100644 --- a/libp2p/protocols/identify.nim +++ b/libp2p/protocols/identify.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -45,14 +45,14 @@ type IdentifyNoPubKeyError* = object of IdentifyError IdentifyInfo* {.public.} = object - pubkey*: Option[PublicKey] + pubkey*: Opt[PublicKey] peerId*: PeerId addrs*: seq[MultiAddress] - observedAddr*: Option[MultiAddress] - protoVersion*: Option[string] - agentVersion*: Option[string] + observedAddr*: Opt[MultiAddress] + protoVersion*: Opt[string] + agentVersion*: Opt[string] protos*: seq[string] - signedPeerRecord*: Option[Envelope] + signedPeerRecord*: Opt[Envelope] Identify* = ref object of LPProtocol peerInfo*: PeerInfo @@ -60,10 +60,9 @@ type observedAddrManager*: ObservedAddrManager IdentifyPushHandler* = proc ( - peer: PeerId, - newInfo: IdentifyInfo): - Future[void] - {.gcsafe, raises: [], public.} + peer: PeerId, + newInfo: IdentifyInfo + ): Future[void] {.async: (raises: [CancelledError]), public.} IdentifyPush* = ref object of LPProtocol identifyHandler: IdentifyPushHandler @@ -81,8 +80,10 @@ chronicles.expandIt(IdentifyInfo): if it.signedPeerRecord.isSome(): "Some" else: "None" -proc encodeMsg(peerInfo: PeerInfo, observedAddr: Opt[MultiAddress], sendSpr: bool): ProtoBuffer - {.raises: [].} = +proc encodeMsg( + peerInfo: PeerInfo, + observedAddr: Opt[MultiAddress], + sendSpr: bool): ProtoBuffer = result = initProtoBuffer() let pkey = peerInfo.publicKey @@ -121,27 +122,26 @@ proc decodeMsg*(buf: seq[byte]): Opt[IdentifyInfo] = var pb = initProtoBuffer(buf) if ? pb.getField(1, pubkey).toOpt(): - iinfo.pubkey = some(pubkey) + iinfo.pubkey = Opt.some(pubkey) if ? pb.getField(8, signedPeerRecord).toOpt() and - pubkey == signedPeerRecord.envelope.publicKey: - iinfo.signedPeerRecord = some(signedPeerRecord.envelope) + pubkey == signedPeerRecord.envelope.publicKey: + iinfo.signedPeerRecord = Opt.some(signedPeerRecord.envelope) discard ? pb.getRepeatedField(2, iinfo.addrs).toOpt() discard ? pb.getRepeatedField(3, iinfo.protos).toOpt() if ? pb.getField(4, oaddr).toOpt(): - iinfo.observedAddr = some(oaddr) + iinfo.observedAddr = Opt.some(oaddr) if ? pb.getField(5, protoVersion).toOpt(): - iinfo.protoVersion = some(protoVersion) + iinfo.protoVersion = Opt.some(protoVersion) if ? pb.getField(6, agentVersion).toOpt(): - iinfo.agentVersion = some(agentVersion) + iinfo.agentVersion = Opt.some(agentVersion) Opt.some(iinfo) proc new*( - T: typedesc[Identify], - peerInfo: PeerInfo, - sendSignedPeerRecord = false, - observedAddrManager = ObservedAddrManager.new(), - ): T = + T: typedesc[Identify], + peerInfo: PeerInfo, + sendSignedPeerRecord = false, + observedAddrManager = ObservedAddrManager.new()): T = let identify = T( peerInfo: peerInfo, sendSignedPeerRecord: sendSignedPeerRecord, @@ -158,7 +158,7 @@ method init*(p: Identify) = await conn.writeLp(pb.buffer) except CancelledError as exc: raise exc - except CatchableError as exc: + except LPStreamError as exc: trace "exception in identify handler", exc = exc.msg, conn finally: trace "exiting identify handler", conn @@ -167,36 +167,46 @@ method init*(p: Identify) = p.handler = handle p.codec = IdentifyCodec -proc identify*(self: Identify, - conn: Connection, - remotePeerId: PeerId): Future[IdentifyInfo] {.async.} = +proc identify*( + self: Identify, + conn: Connection, + remotePeerId: PeerId +): Future[IdentifyInfo] {.async: (raises: [ + CancelledError, IdentifyError, LPStreamError]).} = trace "initiating identify", conn var message = await conn.readLp(64*1024) if len(message) == 0: trace "identify: Empty message received!", conn - raise newException(IdentityInvalidMsgError, "Empty message received!") + raise (ref IdentityInvalidMsgError)(msg: "Empty message received!") - var info = decodeMsg(message).valueOr: raise newException(IdentityInvalidMsgError, "Incorrect message received!") + var info = decodeMsg(message).valueOr: + raise (ref IdentityInvalidMsgError)(msg: "Incorrect message received!") debug "identify: decoded message", conn, info let - pubkey = info.pubkey.valueOr: raise newException(IdentityInvalidMsgError, "No pubkey in identify") - peer = PeerId.init(pubkey).valueOr: raise newException(IdentityInvalidMsgError, $error) + pubkey = info.pubkey.valueOr: + raise (ref IdentityInvalidMsgError)(msg: "No pubkey in identify") + peer = PeerId.init(pubkey).valueOr: + raise (ref IdentityInvalidMsgError)(msg: $error) if peer != remotePeerId: trace "Peer ids don't match", remote = peer, local = remotePeerId - raise newException(IdentityNoMatchError, "Peer ids don't match") + raise (ref IdentityNoMatchError)(msg: "Peer ids don't match") info.peerId = peer info.observedAddr.withValue(observed): - # Currently, we use the ObservedAddrManager only to find our dialable external NAT address. Therefore, addresses + # Currently, we use the ObservedAddrManager only to find our + # dialable external NAT address. Therefore, addresses # like "...\p2p-circuit\p2p\..." and "\p2p\..." are not useful to us. - if observed.contains(multiCodec("p2p-circuit")).get(false) or P2PPattern.matchPartial(observed): + if observed.contains(multiCodec("p2p-circuit")).get(false) or + P2PPattern.matchPartial(observed): trace "Not adding address to ObservedAddrManager.", observed elif not self.observedAddrManager.addObservation(observed): trace "Observed address is not valid.", observedAddr = observed return info -proc new*(T: typedesc[IdentifyPush], handler: IdentifyPushHandler = nil): T {.public.} = +proc new*( + T: typedesc[IdentifyPush], + handler: IdentifyPushHandler = nil): T {.public.} = ## Create a IdentifyPush protocol. `handler` will be called every time ## a peer sends us new `PeerInfo` let identifypush = T(identifyHandler: handler) @@ -210,21 +220,21 @@ proc init*(p: IdentifyPush) = var message = await conn.readLp(64*1024) var identInfo = decodeMsg(message).valueOr: - raise newException(IdentityInvalidMsgError, "Incorrect message received!") + raise (ref IdentityInvalidMsgError)(msg: "Incorrect message received!") debug "identify push: decoded message", conn, identInfo identInfo.pubkey.withValue(pubkey): let receivedPeerId = PeerId.init(pubkey).tryGet() if receivedPeerId != conn.peerId: - raise newException(IdentityNoMatchError, "Peer ids don't match") + raise (ref IdentityNoMatchError)(msg: "Peer ids don't match") identInfo.peerId = receivedPeerId trace "triggering peer event", peerInfo = conn.peerId - if not isNil(p.identifyHandler): + if p.identifyHandler != nil: await p.identifyHandler(conn.peerId, identInfo) except CancelledError as exc: raise exc - except CatchableError as exc: + except LPError as exc: info "exception in identify push handler", exc = exc.msg, conn finally: trace "exiting identify push handler", conn @@ -233,7 +243,11 @@ proc init*(p: IdentifyPush) = p.handler = handle p.codec = IdentifyPushCodec -proc push*(p: IdentifyPush, peerInfo: PeerInfo, conn: Connection) {.async, public.} = +proc push*( + p: IdentifyPush, + peerInfo: PeerInfo, + conn: Connection +) {.async: (raises: [CancelledError, LPStreamError]), public.} = ## Send new `peerInfo`s to a connection var pb = encodeMsg(peerInfo, conn.observedAddr, true) await conn.writeLp(pb.buffer) diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 7fc1bada8..cf4cf9738 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -136,68 +136,75 @@ proc disconnect*(s: Switch, peerId: PeerId): Future[void] {.gcsafe, public.} = s.connManager.dropPeer(peerId) method connect*( - s: Switch, - peerId: PeerId, - addrs: seq[MultiAddress], - forceDial = false, - reuseConnection = true, - dir = Direction.Out): Future[void] {.public.} = + s: Switch, + peerId: PeerId, + addrs: seq[MultiAddress], + forceDial = false, + reuseConnection = true, + dir = Direction.Out +): Future[void] {.async: (raises: [ + CancelledError, LPError], raw: true), public.} = ## Connects to a peer without opening a stream to it - s.dialer.connect(peerId, addrs, forceDial, reuseConnection, dir) method connect*( s: Switch, address: MultiAddress, - allowUnknownPeerId = false): Future[PeerId] = + allowUnknownPeerId = false +): Future[PeerId] {.async: (raises: [CancelledError, LPError], raw: true).} = ## Connects to a peer and retrieve its PeerId ## ## If the P2P part is missing from the MA and `allowUnknownPeerId` is set ## to true, this will discover the PeerId while connecting. This exposes ## you to MiTM attacks, so it shouldn't be used without care! - s.dialer.connect(address, allowUnknownPeerId) method dial*( s: Switch, peerId: PeerId, - protos: seq[string]): Future[Connection] {.public.} = + protos: seq[string] +): Future[Connection] {.async: (raises: [ + CancelledError, LPError], raw: true), public.} = ## Open a stream to a connected peer with the specified `protos` - s.dialer.dial(peerId, protos) -proc dial*(s: Switch, - peerId: PeerId, - proto: string): Future[Connection] {.public.} = +proc dial*( + s: Switch, + peerId: PeerId, + proto: string +): Future[Connection] {.async: (raises: [ + CancelledError, LPError], raw: true), public.} = ## Open a stream to a connected peer with the specified `proto` - dial(s, peerId, @[proto]) method dial*( - s: Switch, - peerId: PeerId, - addrs: seq[MultiAddress], - protos: seq[string], - forceDial = false): Future[Connection] {.public.} = + s: Switch, + peerId: PeerId, + addrs: seq[MultiAddress], + protos: seq[string], + forceDial = false +): Future[Connection] {.async: (raises: [ + CancelledError, LPError], raw: true), public.} = ## Connected to a peer and open a stream ## with the specified `protos` - s.dialer.dial(peerId, addrs, protos, forceDial) proc dial*( - s: Switch, - peerId: PeerId, - addrs: seq[MultiAddress], - proto: string): Future[Connection] {.public.} = + s: Switch, + peerId: PeerId, + addrs: seq[MultiAddress], + proto: string +): Future[Connection] {.async: (raises: [ + CancelledError, LPError], raw: true), public.} = ## Connected to a peer and open a stream ## with the specified `proto` - dial(s, peerId, addrs, @[proto]) -proc mount*[T: LPProtocol](s: Switch, proto: T, matcher: Matcher = nil) - {.gcsafe, raises: [LPError], public.} = +proc mount*[T: LPProtocol]( + s: Switch, + proto: T, + matcher: Matcher = nil) {.gcsafe, raises: [LPError], public.} = ## mount a protocol to the switch - if isNil(proto.handler): raise newException(LPError, "Protocol has to define a handle method or proc") @@ -237,7 +244,6 @@ proc upgradeMonitor( proc accept(s: Switch, transport: Transport) {.async.} = # noraises ## switch accept loop, ran for every transport ## - let upgrades = newAsyncSemaphore(ConcurrentUpgrades) while transport.running: var conn: Connection @@ -323,7 +329,6 @@ proc stop*(s: Switch) {.async, public.} = proc start*(s: Switch) {.async, public.} = ## Start listening on every transport - if s.started: warn "Switch has already been started" return @@ -365,15 +370,15 @@ proc start*(s: Switch) {.async, public.} = debug "Started libp2p node", peer = s.peerInfo -proc newSwitch*(peerInfo: PeerInfo, - transports: seq[Transport], - secureManagers: openArray[Secure] = [], - connManager: ConnManager, - ms: MultistreamSelect, - peerStore: PeerStore, - nameResolver: NameResolver = nil, - services = newSeq[Service]()): Switch - {.raises: [LPError].} = +proc newSwitch*( + peerInfo: PeerInfo, + transports: seq[Transport], + secureManagers: openArray[Secure] = [], + connManager: ConnManager, + ms: MultistreamSelect, + peerStore: PeerStore, + nameResolver: NameResolver = nil, + services = newSeq[Service]()): Switch {.raises: [LPError].} = if secureManagers.len == 0: raise newException(LPError, "Provide at least one secure manager") diff --git a/tests/stubs/switchstub.nim b/tests/stubs/switchstub.nim index b71cf07b3..dc9d852fe 100644 --- a/tests/stubs/switchstub.nim +++ b/tests/stubs/switchstub.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -19,26 +19,32 @@ type switch*: Switch connectStub*: connectStubType - connectStubType* = proc (self: SwitchStub, - peerId: PeerId, - addrs: seq[MultiAddress], - forceDial = false, - reuseConnection = true, - dir = Direction.Out): Future[void] {.async.} + connectStubType* = proc ( + self: SwitchStub, + peerId: PeerId, + addrs: seq[MultiAddress], + forceDial = false, + reuseConnection = true, + dir = Direction.Out + ): Future[void] {.async: (raises: [CancelledError, LPError]).} method connect*( - self: SwitchStub, - peerId: PeerId, - addrs: seq[MultiAddress], - forceDial = false, - reuseConnection = true, - dir = Direction.Out) {.async.} = + self: SwitchStub, + peerId: PeerId, + addrs: seq[MultiAddress], + forceDial = false, + reuseConnection = true, + dir = Direction.Out +) {.async: (raises: [CancelledError, LPError], raw: true).} = if (self.connectStub != nil): - await self.connectStub(self, peerId, addrs, forceDial, reuseConnection, dir) + self.connectStub(self, peerId, addrs, forceDial, reuseConnection, dir) else: - await self.switch.connect(peerId, addrs, forceDial, reuseConnection, dir) + self.switch.connect(peerId, addrs, forceDial, reuseConnection, dir) -proc new*(T: typedesc[SwitchStub], switch: Switch, connectStub: connectStubType = nil): T = +proc new*( + T: typedesc[SwitchStub], + switch: Switch, + connectStub: connectStubType = nil): T = return SwitchStub( switch: switch, peerInfo: switch.peerInfo,