From 2482d8a0d8cd1705cb91bc6174bbab23a8b5ae15 Mon Sep 17 00:00:00 2001 From: Etan Kissling Date: Wed, 6 Mar 2024 13:12:05 +0100 Subject: [PATCH] X --- libp2p/builders.nim | 31 ++- libp2p/connmanager.nim | 49 ++--- libp2p/daemon/daemonapi.nim | 186 +++++++++++------- libp2p/dial.nim | 48 ++--- libp2p/dialer.nim | 123 +++++++----- libp2p/multistream.nim | 29 ++- libp2p/nameresolving/dnsresolver.nim | 23 ++- libp2p/nameresolving/mockresolver.nim | 33 +++- libp2p/nameresolving/nameresolver.nim | 58 +++--- libp2p/peerinfo.nim | 24 +-- libp2p/peerstore.nim | 4 +- .../protocols/connectivity/relay/client.nim | 82 +++++--- libp2p/protocols/connectivity/relay/utils.nim | 58 +++--- libp2p/protocols/identify.nim | 9 +- libp2p/protocols/rendezvous.nim | 81 ++++---- libp2p/protocols/secure/noise.nim | 44 ++++- libp2p/protocols/secure/secio.nim | 99 ++++++---- libp2p/protocols/secure/secure.nim | 11 +- libp2p/switch.nim | 67 ++++--- libp2p/wire.nim | 85 ++++---- 20 files changed, 676 insertions(+), 468 deletions(-) diff --git a/libp2p/builders.nim b/libp2p/builders.nim index 20256301c..12d2d8342 100644 --- a/libp2p/builders.nim +++ b/libp2p/builders.nim @@ -291,24 +291,23 @@ proc build*(b: SwitchBuilder): Switch return switch proc newStandardSwitch*( - privKey = none(PrivateKey), - addrs: MultiAddress | seq[MultiAddress] = - MultiAddress.init("/ip4/127.0.0.1/tcp/0").expect("valid address"), - secureManagers: openArray[SecureProtocol] = [ + privKey = none(PrivateKey), + addrs: MultiAddress | seq[MultiAddress] = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(), + secureManagers: openArray[SecureProtocol] = [ SecureProtocol.Noise, ], - transportFlags: set[ServerFlags] = {}, - rng = newRng(), - inTimeout: Duration = 5.minutes, - outTimeout: Duration = 5.minutes, - maxConnections = MaxConnections, - maxIn = -1, - maxOut = -1, - maxConnsPerPeer = MaxConnectionsPerPeer, - nameResolver: NameResolver = nil, - sendSignedPeerRecord = false, - peerStoreCapacity = 1000 -): Switch {.raises: [LPError], public.} = + transportFlags: set[ServerFlags] = {}, + rng = newRng(), + inTimeout: Duration = 5.minutes, + outTimeout: Duration = 5.minutes, + maxConnections = MaxConnections, + maxIn = -1, + maxOut = -1, + maxConnsPerPeer = MaxConnectionsPerPeer, + nameResolver: NameResolver = nil, + sendSignedPeerRecord = false, + peerStoreCapacity = 1000): Switch + {.raises: [LPError], public.} = ## Helper for common switch configurations. {.push warning[Deprecated]:off.} if SecureProtocol.Secio in secureManagers: diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index 210dc660b..5f35e58d9 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -280,9 +280,7 @@ proc selectMuxer*(c: ConnManager, peerId: PeerId): Muxer = trace "connection not found", peerId return mux -proc storeMuxer*(c: ConnManager, - muxer: Muxer) - {.raises: [CatchableError].} = +proc storeMuxer*(c: ConnManager, muxer: Muxer) {.raises: [LPError].} = ## store the connection and muxer ## @@ -311,14 +309,12 @@ proc storeMuxer*(c: ConnManager, raise newTooManyConnectionsError() - var newPeer = false - c.muxed.withValue(peerId, muxers): - doAssert muxers[].len > 0 - doAssert muxer notin muxers[] - muxers[].add(muxer) - do: - c.muxed[peerId] = @[muxer] - newPeer = true + assert muxer notin c.muxed.getOrDefault(peerId) + + let + newPeer = peerId notin c.muxed + assert newPeer or c.muxed[peerId].len > 0 + c.muxed.mgetOrPut(peerId, newSeq[Muxer]()).add(muxer) libp2p_peers.set(c.muxed.len.int64) asyncSpawn c.triggerConnEvent( @@ -380,28 +376,34 @@ proc trackMuxer*(cs: ConnectionSlot, mux: Muxer) = return cs.trackConnection(mux.connection) -proc getStream*(c: ConnManager, - muxer: Muxer): Future[Connection] {.async.} = +proc getStream*( + c: ConnManager, + muxer: Muxer +): Future[Connection] {.async: (raises: [ + CancelledError, LPStreamError, MuxerError]).} = ## get a muxed stream for the passed muxer ## - if not(isNil(muxer)): return await muxer.newStream() -proc getStream*(c: ConnManager, - peerId: PeerId): Future[Connection] {.async.} = +proc getStream*( + c: ConnManager, + peerId: PeerId +): Future[Connection] {.async: (raises: [ + CancelledError, LPStreamError, MuxerError], raw: true).} = ## get a muxed stream for the passed peer from any connection ## + c.getStream(c.selectMuxer(peerId)) - return await c.getStream(c.selectMuxer(peerId)) - -proc getStream*(c: ConnManager, - peerId: PeerId, - dir: Direction): Future[Connection] {.async.} = +proc getStream*( + c: ConnManager, + peerId: PeerId, + dir: Direction +): Future[Connection] {.async: (raises: [ + CancelledError, LPStreamError, MuxerError], raw: true).} = ## get a muxed stream for the passed peer from a connection with `dir` ## - - return await c.getStream(c.selectMuxer(peerId, dir)) + c.getStream(c.selectMuxer(peerId, dir)) proc dropPeer*(c: ConnManager, peerId: PeerId) {.async.} = @@ -435,4 +437,3 @@ proc close*(c: ConnManager) {.async.} = await closeMuxer(mux) trace "Closed ConnManager" - diff --git a/libp2p/daemon/daemonapi.nim b/libp2p/daemon/daemonapi.nim index 645a61081..9b766a349 100644 --- a/libp2p/daemon/daemonapi.nim +++ b/libp2p/daemon/daemonapi.nim @@ -149,11 +149,13 @@ type signature*: Signature key*: PublicKey - P2PStreamCallback* = proc(api: DaemonAPI, - stream: P2PStream): Future[void] {.gcsafe, raises: [CatchableError].} - P2PPubSubCallback* = proc(api: DaemonAPI, - ticket: PubsubTicket, - message: PubSubMessage): Future[bool] {.gcsafe, raises: [CatchableError].} + P2PStreamCallback* = proc( + api: DaemonAPI, + stream: P2PStream): Future[void] {.async: (raises: []).} + P2PPubSubCallback* = proc( + api: DaemonAPI, + ticket: PubsubTicket, + message: PubSubMessage): Future[bool] {.async: (raises: []).} DaemonError* = object of LPError DaemonRemoteError* = object of DaemonError @@ -477,7 +479,9 @@ proc getErrorMessage(pb: ProtoBuffer): string {.inline, raises: [DaemonLocalErro if initProtoBuffer(error).getRequiredField(1, result).isErr(): raise newException(DaemonLocalError, "Error message is missing!") -proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} = +proc recvMessage( + conn: StreamTransport +): Future[seq[byte]] {.async: (raises: [CancelledError, TransportError]).} = var size: uint length: int @@ -500,14 +504,18 @@ proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} = result = buffer -proc newConnection*(api: DaemonAPI): Future[StreamTransport] - {.raises: [LPError].} = - result = connect(api.address) +proc newConnection*( + api: DaemonAPI +): Future[StreamTransport] {.async: (raises: [ + CancelledError, LPError], raw: true).} = + connect(api.address) -proc closeConnection*(api: DaemonAPI, transp: StreamTransport): Future[void] = - result = transp.closeWait() +proc closeConnection*( + api: DaemonAPI, + transp: StreamTransport): Future[void] {.async: (raises: []).} = + transp.closeWait() -proc socketExists(address: MultiAddress): Future[bool] {.async.} = +proc socketExists(address: MultiAddress): Future[bool] {.async: (raises: []).} = try: var transp = await connect(address) await transp.closeWait() @@ -524,31 +532,44 @@ else: proc getProcessId(): int = result = int(posix.getpid()) -proc getSocket(pattern: string, - count: ptr int): Future[MultiAddress] {.async.} = +proc getSocket( + pattern: string, + count: ptr int): Future[MultiAddress] {.async: (raises: [LPError]).} = var sockname = "" var pid = $getProcessId() - sockname = pattern % [pid, $(count[])] + try: + sockname = pattern % [pid, $(count[])] + except ValueError as exc: + raiseAssert("Pattern `" & pattern & "` is invalid: " & $exc.msg) let tmpma = MultiAddress.init(sockname).tryGet() if UNIX.match(tmpma): while true: count[] = count[] + 1 - sockname = pattern % [pid, $(count[])] + try: + sockname = pattern % [pid, $(count[])] + except ValueError as exc: + raiseAssert("Pattern `" & pattern & "` is invalid: " & $exc.msg) var ma = MultiAddress.init(sockname).tryGet() let res = await socketExists(ma) if not res: result = ma break elif TCP.match(tmpma): - sockname = pattern % [pid, "0"] + try: + sockname = pattern % [pid, "0"] + except ValueError as exc: + raiseAssert("Pattern `" & pattern & "` is invalid: " & $exc.msg) var ma = MultiAddress.init(sockname).tryGet() var sock = createAsyncSocket(ma) if sock.bindAsyncSocket(ma): # Socket was successfully bound, then its free to use count[] = count[] + 1 var ta = sock.getLocalAddress() - sockname = pattern % [pid, $ta.port] + try: + sockname = pattern % [pid, $ta.port] + except ValueError as exc: + raiseAssert("Pattern `" & pattern & "` is invalid: " & $exc.msg) result = MultiAddress.init(sockname).tryGet() closeSocket(sock) @@ -822,13 +843,25 @@ template withMessage(m, body: untyped): untyped = else: body -proc transactMessage(transp: StreamTransport, - pb: ProtoBuffer): Future[ProtoBuffer] {.async.} = +proc transactMessage( + transp: StreamTransport, + pb: ProtoBuffer +): Future[ProtoBuffer] {.async: (raises: [CancelledError, LPError]).} = let length = pb.getLen() - let res = await transp.write(pb.getPtr(), length) + let res = + try: + await transp.write(pb.getPtr(), length) + except TransportError as exc: + raise newException(DaemonLocalError, + "Could not send message to daemon!", exc) if res != length: - raise newException(DaemonLocalError, "Could not send message to daemon!") - var message = await transp.recvMessage() + raise newException(DaemonLocalError, "Sent incomplete message to daemon!") + var message = + try: + await transp.recvMessage() + except TransportError as exc: + raise newException(DaemonLocalError, + "Could not receive message from daemon!", exc) if len(message) == 0: raise newException(DaemonLocalError, "Incorrect or empty message received!") result = initProtoBuffer(message) @@ -878,16 +911,18 @@ proc disconnect*(api: DaemonAPI, peer: PeerId) {.async.} = finally: await api.closeConnection(transp) -proc openStream*(api: DaemonAPI, peer: PeerId, - protocols: seq[string], - timeout = 0): Future[P2PStream] {.async.} = +proc openStream*( + api: DaemonAPI, peer: PeerId, + protocols: seq[string], + timeout = 0 +): Future[P2PStream] {.async: (raises: [CancelledError, LPError]).} = ## Open new stream to peer ``peer`` using one of the protocols in ## ``protocols``. Returns ``StreamTransport`` for the stream. var transp = await api.newConnection() var stream = new P2PStream try: - var pb = await transp.transactMessage(requestStreamOpen(peer, protocols, - timeout)) + var pb = await transp.transactMessage( + requestStreamOpen(peer, protocols, timeout)) pb.withMessage() do: var res: seq[byte] if pb.getRequiredField(ResponseType.STREAMINFO.int, res).isOk(): @@ -902,52 +937,67 @@ proc openStream*(api: DaemonAPI, peer: PeerId, stream.flags.incl(Outbound) stream.transp = transp result = stream - except CatchableError as exc: + except ResultError[ProtoError] as exc: await api.closeConnection(transp) - raise exc + raise newException(LPError, "Failed to parse message", exc) -proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} = +proc streamHandler( + server: StreamServer, + transp: StreamTransport) {.async: (raises: []).} = var api = getUserData[DaemonAPI](server) - var message = await transp.recvMessage() - var pb = initProtoBuffer(message) - var stream = new P2PStream - var raddress = newSeq[byte]() - stream.protocol = "" - pb.getRequiredField(1, stream.peer).tryGet() - pb.getRequiredField(2, raddress).tryGet() - stream.raddress = MultiAddress.init(raddress).tryGet() - pb.getRequiredField(3, stream.protocol).tryGet() - stream.flags.incl(Inbound) - stream.transp = transp - if len(stream.protocol) > 0: - var handler = api.handlers.getOrDefault(stream.protocol) - if not isNil(handler): - asyncSpawn handler(api, stream) - -proc addHandler*(api: DaemonAPI, protocols: seq[string], - handler: P2PStreamCallback) {.async, raises: [LPError].} = - ## Add stream handler ``handler`` for set of protocols ``protocols``. - var transp = await api.newConnection() - let maddress = await getSocket(api.pattern, addr api.ucounter) - var server = createStreamServer(maddress, streamHandler, udata = api) try: - for item in protocols: - api.handlers[item] = handler - server.start() - var pb = await transp.transactMessage(requestStreamHandler(maddress, - protocols)) - pb.withMessage() do: - api.servers.add(P2PServer(server: server, address: maddress)) - except CatchableError as exc: - for item in protocols: - api.handlers.del(item) - server.stop() - server.close() - await server.join() - raise exc - finally: + var message = await transp.recvMessage() + var pb = initProtoBuffer(message) + var stream = new P2PStream + var raddress = newSeq[byte]() + stream.protocol = "" + pb.getRequiredField(1, stream.peer).tryGet() + pb.getRequiredField(2, raddress).tryGet() + stream.raddress = MultiAddress.init(raddress).tryGet() + pb.getRequiredField(3, stream.protocol).tryGet() + stream.flags.incl(Inbound) + stream.transp = transp + if len(stream.protocol) > 0: + var handler = api.handlers.getOrDefault(stream.protocol) + if not isNil(handler): + asyncSpawn handler(api, stream) + except CancelledError, LPError, ResultError[ProtoError], TransportError: await api.closeConnection(transp) +proc addHandler*( + api: DaemonAPI, + protocols: seq[string], + handler: P2PStreamCallback) {.async: (raises: [CancelledError, LPError]).} = + ## Add stream handler ``handler`` for set of protocols ``protocols``. + let transp = await api.newConnection() + defer: await api.closeConnection(transp) + + var added = false + for item in protocols: + api.handlers[item] = handler + defer: + if not added: + for item in protocols: + api.handlers.del(item) + + let + maddress = await getSocket(api.pattern, addr api.ucounter) + server = createStreamServer(maddress, streamHandler, udata = api) + defer: + if not added: + try: + server.stop() + except TransportOsError: + discard + server.close() + await noCancel server.join() + + var pb = await transp.transactMessage( + requestStreamHandler(maddress, protocols)) + pb.withMessage() do: + api.servers.add(P2PServer(server: server, address: maddress)) + added = true + proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} = ## Get list of remote peers to which we are currently connected. var transp = await api.newConnection() diff --git a/libp2p/dial.nim b/libp2p/dial.nim index 5d46cf5fc..5baf0f438 100644 --- a/libp2p/dial.nim +++ b/libp2p/dial.nim @@ -21,47 +21,49 @@ type Dial* = ref object of RootObj method connect*( - self: Dial, - peerId: PeerId, - addrs: seq[MultiAddress], - forceDial = false, - reuseConnection = true, - dir = Direction.Out) {.async, base.} = + self: Dial, + peerId: PeerId, + addrs: seq[MultiAddress], + forceDial = false, + reuseConnection = true, + dir = Direction.Out +) {.async: (raises: [CancelledError, LPError], raw: true), base.} = ## connect remote peer without negotiating ## a protocol ## - - doAssert(false, "Not implemented!") + raiseAssert("Not implemented!") method connect*( self: Dial, address: MultiAddress, - allowUnknownPeerId = false): Future[PeerId] {.async, base.} = + allowUnknownPeerId = false +): Future[PeerId] {.async: (raises: [ + CancelledError, LPError], raw: true), base.} = ## Connects to a peer and retrieve its PeerId - - doAssert(false, "Not implemented!") + raiseAssert("Not implemented!") method dial*( - self: Dial, - peerId: PeerId, - protos: seq[string], - ): Future[Connection] {.async, base.} = + self: Dial, + peerId: PeerId, + protos: seq[string] +): Future[Connection] {.async: (raises: [ + CancelledError, LPError], raw: true), base.} = ## create a protocol stream over an ## existing connection ## - - doAssert(false, "Not implemented!") + raiseAssert("Not implemented!") method dial*( - self: Dial, - peerId: PeerId, - addrs: seq[MultiAddress], - protos: seq[string], - forceDial = false): Future[Connection] {.async, base.} = + self: Dial, + peerId: PeerId, + addrs: seq[MultiAddress], + protos: seq[string], + forceDial = false +): Future[Connection] {.async: (raises: [ + CancelledError, LPError], raw: true), base.} = ## create a protocol stream and establish ## a connection if one doesn't exist already ## - doAssert(false, "Not implemented!") method addTransport*( diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim index 986f4e370..19e508515 100644 --- a/libp2p/dialer.nim +++ b/libp2p/dialer.nim @@ -49,12 +49,12 @@ type nameResolver: NameResolver proc dialAndUpgrade( - self: Dialer, - peerId: Opt[PeerId], - hostname: string, - address: MultiAddress, - dir = Direction.Out): - Future[Muxer] {.async.} = + self: Dialer, + peerId: Opt[PeerId], + hostname: string, + address: MultiAddress, + dir = Direction.Out +): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} = for transport in self.transports: # for each transport if transport.handles(address): # check if it can dial it @@ -101,10 +101,11 @@ proc dialAndUpgrade( return nil proc expandDnsAddr( - self: Dialer, - peerId: Opt[PeerId], - address: MultiAddress): Future[seq[(MultiAddress, Opt[PeerId])]] {.async.} = - + self: Dialer, + peerId: Opt[PeerId], + address: MultiAddress +): Future[seq[(MultiAddress, Opt[PeerId])]] {.async: (raises: [ + CancelledError, LPError]).} = if not DNSADDR.matchPartial(address): return @[(address, peerId)] if isNil(self.nameResolver): info "Can't resolve DNSADDR without NameResolver", ma=address @@ -113,7 +114,8 @@ proc expandDnsAddr( let toResolve = if peerId.isSome: - address & MultiAddress.init(multiCodec("p2p"), peerId.tryGet()).tryGet() + address & MultiAddress.init(multiCodec("p2p"), peerId.get()) + .tryGet() else: address resolved = await self.nameResolver.resolveDnsAddr(toResolve) @@ -124,17 +126,17 @@ proc expandDnsAddr( let peerIdBytes = lastPart.protoArgument().tryGet() addrPeerId = PeerId.init(peerIdBytes).tryGet() - result.add((resolvedAddress[0..^2].tryGet(), Opt.some(addrPeerId))) + result.add(( + resolvedAddress[0..^2].tryGet(), Opt.some(addrPeerId))) else: result.add((resolvedAddress, peerId)) proc dialAndUpgrade( - self: Dialer, - peerId: Opt[PeerId], - addrs: seq[MultiAddress], - dir = Direction.Out): - Future[Muxer] {.async.} = - + self: Dialer, + peerId: Opt[PeerId], + addrs: seq[MultiAddress], + dir = Direction.Out +): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} = debug "Dialing peer", peerId = peerId.get(default(PeerId)) for rawAddress in addrs: @@ -163,15 +165,15 @@ proc tryReusingConnection(self: Dialer, peerId: PeerId): Opt[Muxer] = return Opt.some(muxer) proc internalConnect( - self: Dialer, - peerId: Opt[PeerId], - addrs: seq[MultiAddress], - forceDial: bool, - reuseConnection = true, - dir = Direction.Out): - Future[Muxer] {.async.} = + self: Dialer, + peerId: Opt[PeerId], + addrs: seq[MultiAddress], + forceDial: bool, + reuseConnection = true, + dir = Direction.Out +): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} = if Opt.some(self.localPeerId) == peerId: - raise newException(CatchableError, "can't dial self!") + raise newException(DialFailedError, "can't dial self!") # Ensure there's only one in-flight attempt per peer let lock = self.dialLock.mgetOrPut(peerId.get(default(PeerId)), newAsyncLock()) @@ -187,7 +189,10 @@ proc internalConnect( let muxed = try: await self.dialAndUpgrade(peerId, addrs, dir) - except CatchableError as exc: + except CancelledError as exc: + slot.release() + raise exc + except LPError as exc: slot.release() raise exc slot.trackMuxer(muxed) @@ -197,7 +202,11 @@ proc internalConnect( try: self.connManager.storeMuxer(muxed) await self.peerStore.identify(muxed) - except CatchableError as exc: + except CancelledError as exc: + trace "Failed to finish outgoung upgrade", err=exc.msg + await muxed.close() + raise exc + except LPError as exc: trace "Failed to finish outgoung upgrade", err=exc.msg await muxed.close() raise exc @@ -205,28 +214,32 @@ proc internalConnect( return muxed finally: if lock.locked(): - lock.release() + try: + lock.release() + except AsyncLockError as exc: + raiseAssert("Releasing an acquired lock should work: " & $exc.msg) method connect*( - self: Dialer, - peerId: PeerId, - addrs: seq[MultiAddress], - forceDial = false, - reuseConnection = true, - dir = Direction.Out) {.async.} = + self: Dialer, + peerId: PeerId, + addrs: seq[MultiAddress], + forceDial = false, + reuseConnection = true, + dir = Direction.Out) {.async: (raises: [CancelledError, LPError]).} = ## connect remote peer without negotiating ## a protocol ## - if self.connManager.connCount(peerId) > 0 and reuseConnection: return - discard await self.internalConnect(Opt.some(peerId), addrs, forceDial, reuseConnection, dir) + discard await self.internalConnect( + Opt.some(peerId), addrs, forceDial, reuseConnection, dir) method connect*( self: Dialer, address: MultiAddress, - allowUnknownPeerId = false): Future[PeerId] {.async.} = + allowUnknownPeerId = false +): Future[PeerId] {.async: (raises: [CancelledError, LPError]).} = ## Connects to a peer and retrieve its PeerId parseFullAddress(address).toOpt().withValue(fullAddress): @@ -236,17 +249,19 @@ method connect*( false)).connection.peerId if allowUnknownPeerId == false: - raise newException(DialFailedError, "Address without PeerID and unknown peer id disabled!") + raise newException(DialFailedError, + "Address without PeerID and unknown peer id disabled!") - return (await self.internalConnect( + (await self.internalConnect( Opt.none(PeerId), @[address], false)).connection.peerId proc negotiateStream( - self: Dialer, - conn: Connection, - protos: seq[string]): Future[Connection] {.async.} = + self: Dialer, + conn: Connection, + protos: seq[string] +): Future[Connection] {.async: (raises: [CancelledError, LPError]).} = trace "Negotiating stream", conn, protos let selected = await MultistreamSelect.select(conn, protos) if not protos.contains(selected): @@ -277,9 +292,10 @@ method tryDial*( raise newException(DialFailedError, exc.msg) method dial*( - self: Dialer, - peerId: PeerId, - protos: seq[string]): Future[Connection] {.async.} = + self: Dialer, + peerId: PeerId, + protos: seq[string] +): Future[Connection] {.async: (raises: [CancelledError, LPError]).} = ## create a protocol stream over an ## existing connection ## @@ -292,11 +308,12 @@ method dial*( return await self.negotiateStream(stream, protos) method dial*( - self: Dialer, - peerId: PeerId, - addrs: seq[MultiAddress], - protos: seq[string], - forceDial = false): Future[Connection] {.async.} = + self: Dialer, + peerId: PeerId, + addrs: seq[MultiAddress], + protos: seq[string], + forceDial = false +): Future[Connection] {.async: (raises: [CancelledError, LPError]).} = ## create a protocol stream and establish ## a connection if one doesn't exist already ## @@ -305,7 +322,7 @@ method dial*( conn: Muxer stream: Connection - proc cleanup() {.async.} = + proc cleanup() {.async: (raises: []).} = if not(isNil(stream)): await stream.closeWithEOF() @@ -327,7 +344,7 @@ method dial*( trace "Dial canceled", conn await cleanup() raise exc - except CatchableError as exc: + except LPError as exc: debug "Error dialing", conn, err = exc.msg await cleanup() raise exc diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim index b342e00d6..345ef2d7e 100644 --- a/libp2p/multistream.nim +++ b/libp2p/multistream.nim @@ -50,10 +50,12 @@ template validateSuffix(str: string): untyped = else: raise newException(MultiStreamError, "MultistreamSelect failed, malformed message") -proc select*(_: MultistreamSelect | type MultistreamSelect, - conn: Connection, - proto: seq[string]): - Future[string] {.async.} = +proc select*( + _: MultistreamSelect | type MultistreamSelect, + conn: Connection, + proto: seq[string] +): Future[string] {.async: (raises: [ + CancelledError, LPStreamError, MultiStreamError]).} = trace "initiating handshake", conn, codec = Codec ## select a remote protocol await conn.writeLp(Codec & "\n") # write handshake @@ -98,15 +100,22 @@ proc select*(_: MultistreamSelect | type MultistreamSelect, # No alternatives, fail return "" -proc select*(_: MultistreamSelect | type MultistreamSelect, - conn: Connection, - proto: string): Future[bool] {.async.} = +proc select*( + _: MultistreamSelect | type MultistreamSelect, + conn: Connection, + proto: string +): Future[bool] {.async: (raises: [ + CancelledError, LPStreamError, MultiStreamError]).} = if proto.len > 0: - return (await MultistreamSelect.select(conn, @[proto])) == proto + (await MultistreamSelect.select(conn, @[proto])) == proto else: - return (await MultistreamSelect.select(conn, @[])) == Codec + (await MultistreamSelect.select(conn, @[])) == Codec -proc select*(m: MultistreamSelect, conn: Connection): Future[bool] = +proc select*( + m: MultistreamSelect, + conn: Connection +): Future[bool] {.async: (raises: [ + CancelledError, LPStreamError, MultiStreamError], raw: true).} = m.select(conn, "") proc list*(m: MultistreamSelect, diff --git a/libp2p/nameresolving/dnsresolver.nim b/libp2p/nameresolving/dnsresolver.nim index 58bfdf208..0751d5827 100644 --- a/libp2p/nameresolving/dnsresolver.nim +++ b/libp2p/nameresolving/dnsresolver.nim @@ -83,11 +83,11 @@ proc getDnsResponse( await sock.closeWait() method resolveIp*( - self: DnsResolver, - address: string, - port: Port, - domain: Domain = Domain.AF_UNSPEC): Future[seq[TransportAddress]] {.async.} = - + self: DnsResolver, + address: string, + port: Port, + domain: Domain = Domain.AF_UNSPEC +): Future[seq[TransportAddress]] {.async: (raises: [CancelledError]).} = trace "Resolving IP using DNS", address, servers = self.nameServers.mapIt($it), domain for _ in 0 ..< self.nameServers.len: let server = self.nameServers[0] @@ -132,14 +132,21 @@ method resolveIp*( continue trace "Got IPs from DNS server", resolvedAddresses, server = $server - return resolvedAddresses.toSeq().mapIt(initTAddress(it, port)) + var res = newSeqOfCap[TransportAddress](resolvedAddresses.len) + for address in resolvedAddresses: + try: + res.add(initTAddress(address, port)) + except TransportAddressError as e: + debug "Failed to parse IP from DNS server", error=e.msg + return res debug "Failed to resolve address, returning empty set" return @[] method resolveTxt*( - self: DnsResolver, - address: string): Future[seq[string]] {.async.} = + self: DnsResolver, + address: string +): Future[seq[string]] {.async: (raises: [CancelledError]).} = trace "Resolving TXT using DNS", address, servers = self.nameServers.mapIt($it) for _ in 0 ..< self.nameServers.len: diff --git a/libp2p/nameresolving/mockresolver.nim b/libp2p/nameresolving/mockresolver.nim index 492dc1c42..fc3ccd696 100644 --- a/libp2p/nameresolving/mockresolver.nim +++ b/libp2p/nameresolving/mockresolver.nim @@ -26,21 +26,36 @@ type MockResolver* = ref object of NameResolver ipResponses*: Table[(string, bool), seq[string]] method resolveIp*( - self: MockResolver, - address: string, - port: Port, - domain: Domain = Domain.AF_UNSPEC): Future[seq[TransportAddress]] {.async.} = + self: MockResolver, + address: string, + port: Port, + domain: Domain = Domain.AF_UNSPEC +): Future[seq[TransportAddress]] {.async: (raises: [ + CancelledError], raw: true).} = + var res: seq[TransportAddress] if domain == Domain.AF_INET or domain == Domain.AF_UNSPEC: for resp in self.ipResponses.getOrDefault((address, false)): - result.add(initTAddress(resp, port)) + try: + res.add(initTAddress(resp, port)) + except TransportAddressError: + raiseAssert("ipResponses should only contain valid IP addresses") if domain == Domain.AF_INET6 or domain == Domain.AF_UNSPEC: for resp in self.ipResponses.getOrDefault((address, true)): - result.add(initTAddress(resp, port)) + try: + res.add(initTAddress(resp, port)) + except TransportAddressError: + raiseAssert("ipResponses should only contain valid IP addresses") + let fut = newFuture[seq[TransportAddress]]() + fut.complete(res) + fut method resolveTxt*( - self: MockResolver, - address: string): Future[seq[string]] {.async.} = - return self.txtResponses.getOrDefault(address) + self: MockResolver, + address: string +): Future[seq[string]] {.async: (raises: [CancelledError], raw: true).} = + let fut = newFuture[seq[string]]() + fut.complete(self.txtResponses.getOrDefault(address)) + fut proc new*(T: typedesc[MockResolver]): T = T() diff --git a/libp2p/nameresolving/nameresolver.nim b/libp2p/nameresolving/nameresolver.nim index be30b46cd..090fd9f7a 100644 --- a/libp2p/nameresolving/nameresolver.nim +++ b/libp2p/nameresolving/nameresolver.nim @@ -14,7 +14,7 @@ import chronos, chronicles, stew/endians2 -import ".."/[multiaddress, multicodec] +import ".."/[errors, multiaddress, multicodec] logScope: topics = "libp2p nameresolver" @@ -23,22 +23,23 @@ type NameResolver* = ref object of RootObj method resolveTxt*( - self: NameResolver, - address: string): Future[seq[string]] {.async, base.} = + self: NameResolver, + address: string +): Future[seq[string]] {.async: (raises: [CancelledError], raw: true), base.} = ## Get TXT record ## - - doAssert(false, "Not implemented!") + raiseAssert("Not implemented!") method resolveIp*( - self: NameResolver, - address: string, - port: Port, - domain: Domain = Domain.AF_UNSPEC): Future[seq[TransportAddress]] {.async, base.} = + self: NameResolver, + address: string, + port: Port, + domain: Domain = Domain.AF_UNSPEC +): Future[seq[TransportAddress]] {.async: (raises: [ + CancelledError], raw: true), base.} = ## Resolve the specified address ## - - doAssert(false, "Not implemented!") + raiseAssert("Not implemented!") proc getHostname*(ma: MultiAddress): string = let @@ -48,17 +49,18 @@ proc getHostname*(ma: MultiAddress): string = else: "" proc resolveOneAddress( - self: NameResolver, - ma: MultiAddress, - domain: Domain = Domain.AF_UNSPEC, - prefix = ""): Future[seq[MultiAddress]] - {.async.} = + self: NameResolver, + ma: MultiAddress, + domain: Domain = Domain.AF_UNSPEC, + prefix = "" +): Future[seq[MultiAddress]] {.async: (raises: [CancelledError, LPError]).} = #Resolve a single address var pbuf: array[2, byte] var dnsval = getHostname(ma) - if ma[1].tryGet().protoArgument(pbuf).tryGet() == 0: + if ma[1].tryGet() + .protoArgument(pbuf).tryGet() == 0: raise newException(MaError, "Incorrect port number") let port = Port(fromBytesBE(uint16, pbuf)) @@ -66,17 +68,18 @@ proc resolveOneAddress( return collect(newSeqOfCap(4)): for address in resolvedAddresses: - var createdAddress = MultiAddress.init(address).tryGet()[0].tryGet() + var createdAddress = MultiAddress.init(address) + .tryGet()[0].tryGet() for part in ma: if DNS.match(part.tryGet()): continue createdAddress &= part.tryGet() createdAddress proc resolveDnsAddr*( - self: NameResolver, - ma: MultiAddress, - depth: int = 0): Future[seq[MultiAddress]] {.async.} = - + self: NameResolver, + ma: MultiAddress, + depth: int = 0 +): Future[seq[MultiAddress]] {.async: (raises: [CancelledError, LPError]).} = if not DNSADDR.matchPartial(ma): return @[ma] @@ -96,7 +99,8 @@ proc resolveDnsAddr*( if not entry.startsWith("dnsaddr="): continue let entryValue = MultiAddress.init(entry[8..^1]).tryGet() - if entryValue.contains(multiCodec("p2p")).tryGet() and ma.contains(multiCodec("p2p")).tryGet(): + if entryValue.contains(multiCodec("p2p")).tryGet() and + ma.contains(multiCodec("p2p")).tryGet(): if entryValue[multiCodec("p2p")] != ma[multiCodec("p2p")]: continue @@ -111,14 +115,16 @@ proc resolveDnsAddr*( proc resolveMAddress*( - self: NameResolver, - address: MultiAddress): Future[seq[MultiAddress]] {.async.} = + self: NameResolver, + address: MultiAddress +): Future[seq[MultiAddress]] {.async: (raises: [CancelledError, LPError]).} = var res = initOrderedSet[MultiAddress]() if not DNS.matchPartial(address): res.incl(address) else: - let code = address[0].tryGet().protoCode().tryGet() + let code = address[0].tryGet() + .protoCode().tryGet() let seq = case code: of multiCodec("dns"): await self.resolveOneAddress(address) diff --git a/libp2p/peerinfo.nim b/libp2p/peerinfo.nim index a31f42eb5..9dc231fa3 100644 --- a/libp2p/peerinfo.nim +++ b/libp2p/peerinfo.nim @@ -85,20 +85,16 @@ proc parseFullAddress*(ma: string | seq[byte]): MaResult[(PeerId, MultiAddress)] parseFullAddress(? MultiAddress.init(ma)) proc new*( - p: typedesc[PeerInfo], - key: PrivateKey, - listenAddrs: openArray[MultiAddress] = [], - protocols: openArray[string] = [], - protoVersion: string = "", - agentVersion: string = "", - addressMappers = newSeq[AddressMapper](), - ): PeerInfo - {.raises: [LPError].} = - - let pubkey = try: - key.getPublicKey().tryGet() - except CatchableError: - raise newException(PeerInfoError, "invalid private key") + p: typedesc[PeerInfo], + key: PrivateKey, + listenAddrs: openArray[MultiAddress] = [], + protocols: openArray[string] = [], + protoVersion: string = "", + agentVersion: string = "", + addressMappers = newSeq[AddressMapper]() +): PeerInfo {.raises: [LPError].} = + let pubkey = key.getPublicKey().valueOr: + raise newException(PeerInfoError, "invalid private key") let peerId = PeerId.init(key).tryGet() diff --git a/libp2p/peerstore.nim b/libp2p/peerstore.nim index 41698b75d..be571dd86 100644 --- a/libp2p/peerstore.nim +++ b/libp2p/peerstore.nim @@ -194,8 +194,8 @@ proc cleanup*( peerStore.toClean.delete(0) proc identify*( - peerStore: PeerStore, - muxer: Muxer) {.async.} = + peerStore: PeerStore, + muxer: Muxer) {.async: (raises: [CancelledError, LPError]).} = # new stream for identify var stream = await muxer.newStream() diff --git a/libp2p/protocols/connectivity/relay/client.nim b/libp2p/protocols/connectivity/relay/client.nim index 3ddeb991d..8eeeeb0ee 100644 --- a/libp2p/protocols/connectivity/relay/client.nim +++ b/libp2p/protocols/connectivity/relay/client.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -30,9 +30,13 @@ type ReservationError* = object of RelayClientError RelayV1DialError* = object of RelayClientError RelayV2DialError* = object of RelayClientError - RelayClientAddConn* = proc(conn: Connection, - duration: uint32, - data: uint64): Future[void] {.gcsafe, raises: [].} + + RelayClientAddConn* = proc( + conn: Connection, + duration: uint32, + data: uint64 + ): Future[void] {.async: (raises: []).} + RelayClient* = ref object of Relay onNewConnection*: RelayClientAddConn canHop: bool @@ -44,12 +48,19 @@ type limitDuration*: uint32 # seconds limitData*: uint64 # bytes -proc sendStopError(conn: Connection, code: StatusV2) {.async.} = +proc sendStopError( + conn: Connection, + code: StatusV2 +): Future[void] {.async: (raises: [ + CancelledError, LPStreamError], raw: true).} = trace "send stop status", status = $code & " (" & $ord(code) & ")" let msg = StopMessage(msgType: StopMessageType.Status, status: Opt.some(code)) - await conn.writeLp(encode(msg).buffer) + conn.writeLp(encode(msg).buffer) -proc handleRelayedConnect(cl: RelayClient, conn: Connection, msg: StopMessage) {.async.} = +proc handleRelayedConnect( + cl: RelayClient, + conn: Connection, + msg: StopMessage) {.async: (raises: [CancelledError, LPStreamError]).} = let # TODO: check the go version to see in which way this could fail # it's unclear in the spec @@ -72,12 +83,16 @@ proc handleRelayedConnect(cl: RelayClient, conn: Connection, msg: StopMessage) { await conn.writeLp(pb.buffer) # This sound redundant but the callback could, in theory, be set to nil during # conn.writeLp so it's safer to double check - if cl.onNewConnection != nil: await cl.onNewConnection(conn, limitDuration, limitData) - else: await conn.close() + if cl.onNewConnection != nil: + await cl.onNewConnection(conn, limitDuration, limitData) + else: + await conn.close() -proc reserve*(cl: RelayClient, - peerId: PeerId, - addrs: seq[MultiAddress] = @[]): Future[Rsvp] {.async.} = +proc reserve*( + cl: RelayClient, + peerId: PeerId, + addrs: seq[MultiAddress] = @[] +): Future[Rsvp] {.async: (raises: [CancelledError, LPError]).} = let conn = await cl.switch.dial(peerId, addrs, RelayV2HopCodec) defer: await conn.close() let @@ -87,9 +102,10 @@ proc reserve*(cl: RelayClient, HopMessage.decode(await conn.readLp(RelayClientMsgSize)).tryGet() except CancelledError as exc: raise exc - except CatchableError as exc: - trace "error writing or reading reservation message", exc=exc.msg - raise newException(ReservationError, exc.msg) + except LPError, ResultError[void]: + let msg = getCurrentExceptionMsg() + trace "error writing or reading reservation message", exc = msg + raise newException(ReservationError, msg) if msg.msgType != HopMessageType.Status: raise newException(ReservationError, "Unexpected relay response type") @@ -118,11 +134,14 @@ proc dialPeerV1*( cl: RelayClient, conn: Connection, dstPeerId: PeerId, - dstAddrs: seq[MultiAddress]): Future[Connection] {.async.} = + dstAddrs: seq[MultiAddress] +): Future[Connection] {.async: (raises: [ + CancelledError, LPStreamError, RelayV1DialError]).} = var msg = RelayMessage( msgType: Opt.some(RelayType.Hop), - srcPeer: Opt.some(RelayPeer(peerId: cl.switch.peerInfo.peerId, addrs: cl.switch.peerInfo.addrs)), + srcPeer: Opt.some(RelayPeer( + peerId: cl.switch.peerInfo.peerId, addrs: cl.switch.peerInfo.addrs)), dstPeer: Opt.some(RelayPeer(peerId: dstPeerId, addrs: dstAddrs))) pb = encode(msg) @@ -132,7 +151,7 @@ proc dialPeerV1*( await conn.writeLp(pb.buffer) except CancelledError as exc: raise exc - except CatchableError as exc: + except LPStreamError as exc: trace "error writing hop request", exc=exc.msg raise exc @@ -140,31 +159,35 @@ proc dialPeerV1*( RelayMessage.decode(await conn.readLp(RelayClientMsgSize)) except CancelledError as exc: raise exc - except CatchableError as exc: + except LPStreamError as exc: trace "error reading stop response", exc=exc.msg await sendStatus(conn, StatusV1.HopCantOpenDstStream) raise exc try: let msgRcvFromRelay = msgRcvFromRelayOpt.valueOr: - raise newException(RelayV1DialError, "Hop can't open destination stream") + raise newException(RelayV1DialError, + "Hop can't open destination stream") if msgRcvFromRelay.msgType.tryGet() != RelayType.Status: - raise newException(RelayV1DialError, "Hop can't open destination stream: wrong message type") + raise newException(RelayV1DialError, + "Hop can't open destination stream: wrong message type") if msgRcvFromRelay.status.tryGet() != StatusV1.Success: - raise newException(RelayV1DialError, "Hop can't open destination stream: status failed") + raise newException(RelayV1DialError, + "Hop can't open destination stream: status failed") except RelayV1DialError as exc: await sendStatus(conn, StatusV1.HopCantOpenDstStream) raise exc - except ValueError as exc: + except ResultError[void] as exc: await sendStatus(conn, StatusV1.HopCantOpenDstStream) raise newException(RelayV1DialError, exc.msg) - result = conn + conn proc dialPeerV2*( cl: RelayClient, conn: RelayConnection, dstPeerId: PeerId, - dstAddrs: seq[MultiAddress]): Future[Connection] {.async.} = + dstAddrs: seq[MultiAddress] +): Future[Connection] {.async: (raises: [CancelledError, RelayV2DialError]).} = let p = Peer(peerId: dstPeerId, addrs: dstAddrs) pb = encode(HopMessage(msgType: HopMessageType.Connect, peer: Opt.some(p))) @@ -176,8 +199,11 @@ proc dialPeerV2*( HopMessage.decode(await conn.readLp(RelayClientMsgSize)).tryGet() except CancelledError as exc: raise exc - except CatchableError as exc: - trace "error reading stop response", exc=exc.msg + except LPStreamError as exc: + trace "error reading stop response", exc = exc.msg + raise newException(RelayV2DialError, exc.msg) + except ResultError[void] as exc: + trace "error reading stop response", exc = exc.msg raise newException(RelayV2DialError, exc.msg) if msgRcvFromRelay.msgType != HopMessageType.Status: @@ -187,7 +213,7 @@ proc dialPeerV2*( raise newException(RelayV2DialError, "Relay stop failure") conn.limitDuration = msgRcvFromRelay.limit.duration conn.limitData = msgRcvFromRelay.limit.data - return conn + conn proc handleStopStreamV2(cl: RelayClient, conn: Connection) {.async.} = let msg = StopMessage.decode(await conn.readLp(RelayClientMsgSize)).valueOr: diff --git a/libp2p/protocols/connectivity/relay/utils.nim b/libp2p/protocols/connectivity/relay/utils.nim index ac1965921..46ba9cf79 100644 --- a/libp2p/protocols/connectivity/relay/utils.nim +++ b/libp2p/protocols/connectivity/relay/utils.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -21,36 +21,48 @@ const RelayV2HopCodec* = "/libp2p/circuit/relay/0.2.0/hop" RelayV2StopCodec* = "/libp2p/circuit/relay/0.2.0/stop" -proc sendStatus*(conn: Connection, code: StatusV1) {.async.} = +proc sendStatus*( + conn: Connection, + code: StatusV1 +) {.async: (raises: [CancelledError, LPStreamError], raw: true).} = trace "send relay/v1 status", status = $code & "(" & $ord(code) & ")" let - msg = RelayMessage(msgType: Opt.some(RelayType.Status), status: Opt.some(code)) + msg = RelayMessage( + msgType: Opt.some(RelayType.Status), status: Opt.some(code)) pb = encode(msg) - await conn.writeLp(pb.buffer) + conn.writeLp(pb.buffer) -proc sendHopStatus*(conn: Connection, code: StatusV2) {.async.} = +proc sendHopStatus*( + conn: Connection, + code: StatusV2 +) {.async: (raises: [CancelledError, LPStreamError], raw: true).} = trace "send hop relay/v2 status", status = $code & "(" & $ord(code) & ")" let msg = HopMessage(msgType: HopMessageType.Status, status: Opt.some(code)) pb = encode(msg) - await conn.writeLp(pb.buffer) + conn.writeLp(pb.buffer) -proc sendStopStatus*(conn: Connection, code: StatusV2) {.async.} = +proc sendStopStatus*( + conn: Connection, + code: StatusV2 +) {.async: (raises: [CancelledError, LPStreamError], raw: true).} = trace "send stop relay/v2 status", status = $code & " (" & $ord(code) & ")" let msg = StopMessage(msgType: StopMessageType.Status, status: Opt.some(code)) pb = encode(msg) - await conn.writeLp(pb.buffer) + conn.writeLp(pb.buffer) -proc bridge*(connSrc: Connection, connDst: Connection) {.async.} = +proc bridge*( + connSrc: Connection, + connDst: Connection) {.async: (raises: [CancelledError]).} = const bufferSize = 4096 var bufSrcToDst: array[bufferSize, byte] bufDstToSrc: array[bufferSize, byte] - futSrc = connSrc.readOnce(addr bufSrcToDst[0], bufSrcToDst.high + 1) - futDst = connDst.readOnce(addr bufDstToSrc[0], bufDstToSrc.high + 1) - bytesSendFromSrcToDst = 0 - bytesSendFromDstToSrc = 0 + futSrc = connSrc.readOnce(addr bufSrcToDst[0], bufSrcToDst.len) + futDst = connDst.readOnce(addr bufDstToSrc[0], bufDstToSrc.len) + bytesSentFromSrcToDst = 0 + bytesSentFromDstToSrc = 0 bufRead: int try: @@ -61,25 +73,25 @@ proc bridge*(connSrc: Connection, connDst: Connection) {.async.} = if futSrc.finished(): bufRead = await futSrc if bufRead > 0: - bytesSendFromSrcToDst.inc(bufRead) - await connDst.write(@bufSrcToDst[0.. 0: - bytesSendFromDstToSrc += bufRead - await connSrc.write(bufDstToSrc[0..