From 6dd16c138ef9f14cf89d208f59acccba3cd52c20 Mon Sep 17 00:00:00 2001 From: Etan Kissling Date: Wed, 6 Mar 2024 20:58:22 +0100 Subject: [PATCH] X --- .../protocols/connectivity/relay/client.nim | 4 +- libp2p/protocols/connectivity/relay/relay.nim | 4 +- libp2p/protocols/identify.nim | 12 ++- libp2p/protocols/ping.nim | 12 +-- libp2p/protocols/protocol.nim | 19 ++-- libp2p/protocols/rendezvous.nim | 4 +- libp2p/protocols/secure/noise.nim | 44 +++++++-- libp2p/protocols/secure/secio.nim | 99 ++++++++++++------- libp2p/protocols/secure/secure.nim | 49 ++++----- 9 files changed, 155 insertions(+), 92 deletions(-) diff --git a/libp2p/protocols/connectivity/relay/client.nim b/libp2p/protocols/connectivity/relay/client.nim index 3ddeb991d..85cf12eca 100644 --- a/libp2p/protocols/connectivity/relay/client.nim +++ b/libp2p/protocols/connectivity/relay/client.nim @@ -266,7 +266,9 @@ proc new*(T: typedesc[RelayClient], canHop: bool = false, maxCircuitPerPeer: maxCircuitPerPeer, msgSize: msgSize, isCircuitRelayV1: circuitRelayV1) - proc handleStream(conn: Connection, proto: string) {.async.} = + proc handleStream( + conn: Connection, + proto: string) {.async: (raises: [CancelledError]).} = try: case proto: of RelayV1Codec: await cl.handleStreamV1(conn) diff --git a/libp2p/protocols/connectivity/relay/relay.nim b/libp2p/protocols/connectivity/relay/relay.nim index b078cdb46..6c7c7bbd5 100644 --- a/libp2p/protocols/connectivity/relay/relay.nim +++ b/libp2p/protocols/connectivity/relay/relay.nim @@ -336,7 +336,9 @@ proc new*(T: typedesc[Relay], msgSize: msgSize, isCircuitRelayV1: circuitRelayV1) - proc handleStream(conn: Connection, proto: string) {.async.} = + proc handleStream( + conn: Connection, + proto: string) {.async: (raises: [CancelledError]).} = try: case proto: of RelayV2HopCodec: await r.handleHopStreamV2(conn) diff --git a/libp2p/protocols/identify.nim b/libp2p/protocols/identify.nim index 3a1f6f925..8533a586d 100644 --- a/libp2p/protocols/identify.nim +++ b/libp2p/protocols/identify.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -151,14 +151,16 @@ proc new*( identify method init*(p: Identify) = - proc handle(conn: Connection, proto: string) {.async.} = + proc handle( + conn: Connection, + proto: string) {.async: (raises: [CancelledError]).} = try: trace "handling identify request", conn var pb = encodeMsg(p.peerInfo, conn.observedAddr, p.sendSignedPeerRecord) await conn.writeLp(pb.buffer) except CancelledError as exc: raise exc - except CatchableError as exc: + except LPStreamError as exc: trace "exception in identify handler", exc = exc.msg, conn finally: trace "exiting identify handler", conn @@ -204,7 +206,9 @@ proc new*(T: typedesc[IdentifyPush], handler: IdentifyPushHandler = nil): T {.pu identifypush proc init*(p: IdentifyPush) = - proc handle(conn: Connection, proto: string) {.async.} = + proc handle( + conn: Connection, + proto: string) {.async: (raises: [CancelledError]).} = trace "handling identify push", conn try: var message = await conn.readLp(64*1024) diff --git a/libp2p/protocols/ping.nim b/libp2p/protocols/ping.nim index 0921022b9..f7c36960a 100644 --- a/libp2p/protocols/ping.nim +++ b/libp2p/protocols/ping.nim @@ -36,10 +36,8 @@ type PingError* = object of LPError WrongPingAckError* = object of PingError - PingHandler* {.public.} = proc ( - peer: PeerId): - Future[void] - {.gcsafe, raises: [].} + PingHandler* {.public.} = + proc (peer: PeerId): Future[void] {.async: (raises: []).} Ping* = ref object of LPProtocol pingHandler*: PingHandler @@ -51,7 +49,9 @@ proc new*(T: typedesc[Ping], handler: PingHandler = nil, rng: ref HmacDrbgContex ping method init*(p: Ping) = - proc handle(conn: Connection, proto: string) {.async.} = + proc handle( + conn: Connection, + proto: string) {.async: (raises: [CancelledError]).} = try: trace "handling ping", conn var buf: array[PingSize, byte] @@ -62,7 +62,7 @@ method init*(p: Ping) = await p.pingHandler(conn.peerId) except CancelledError as exc: raise exc - except CatchableError as exc: + except LPStreamError as exc: trace "exception in ping handler", exc = exc.msg, conn p.handler = handle diff --git a/libp2p/protocols/protocol.nim b/libp2p/protocols/protocol.nim index cb328849d..b9becb53f 100644 --- a/libp2p/protocols/protocol.nim +++ b/libp2p/protocols/protocol.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -19,14 +19,13 @@ const type LPProtoHandler* = proc ( - conn: Connection, - proto: string): - Future[void] - {.gcsafe, raises: [].} + conn: Connection, + proto: string + ): Future[void] {.async: (raises: [CancelledError]).} LPProtocol* = ref object of RootObj codecs*: seq[string] - handler*: LPProtoHandler ## this handler gets invoked by the protocol negotiator + handler*: LPProtoHandler ## gets invoked by the protocol negotiator started*: bool maxIncomingStreams: Opt[int] @@ -50,10 +49,10 @@ func `codec=`*(p: LPProtocol, codec: string) = p.codecs.insert(codec, 0) proc new*( - T: type LPProtocol, - codecs: seq[string], - handler: LPProtoHandler, - maxIncomingStreams: Opt[int] | int = Opt.none(int)): T = + T: type LPProtocol, + codecs: seq[string], + handler: LPProtoHandler, + maxIncomingStreams: Opt[int] | int = Opt.none(int)): T = T( codecs: codecs, handler: handler, diff --git a/libp2p/protocols/rendezvous.nim b/libp2p/protocols/rendezvous.nim index 4bacb487d..c5459fa8a 100644 --- a/libp2p/protocols/rendezvous.nim +++ b/libp2p/protocols/rendezvous.nim @@ -636,7 +636,9 @@ proc new*(T: typedesc[RendezVous], sema: newAsyncSemaphore(SemaphoreDefaultSize) ) logScope: topics = "libp2p discovery rendezvous" - proc handleStream(conn: Connection, proto: string) {.async.} = + proc handleStream( + conn: Connection, + proto: string) {.async: (raises: [CancelledError]).} = try: let buf = await conn.readLp(4096) diff --git a/libp2p/protocols/secure/noise.nim b/libp2p/protocols/secure/noise.nim index a5a2b8c16..f56e7242c 100644 --- a/libp2p/protocols/secure/noise.nim +++ b/libp2p/protocols/secure/noise.nim @@ -315,7 +315,11 @@ proc readFrame( await sconn.readExactly(addr buffer[0], buffer.len) return buffer -proc writeFrame(sconn: Connection, buf: openArray[byte]): Future[void] = +proc writeFrame( + sconn: Connection, + buf: openArray[byte] +): Future[void] {.async: (raises: [ + CancelledError, LPStreamError], raw: true).} = doAssert buf.len <= uint16.high.int var lesize = buf.len.uint16 @@ -326,13 +330,24 @@ proc writeFrame(sconn: Connection, buf: openArray[byte]): Future[void] = outbuf &= buf sconn.write(outbuf) -proc receiveHSMessage(sconn: Connection): Future[seq[byte]] = readFrame(sconn) -proc sendHSMessage(sconn: Connection, buf: openArray[byte]): Future[void] = +proc receiveHSMessage( + sconn: Connection +): Future[seq[byte]] {.async: (raises: [ + CancelledError, LPStreamError], raw: true).} = + readFrame(sconn) + +proc sendHSMessage( + sconn: Connection, + buf: openArray[byte] +): Future[void] {.async: (raises: [ + CancelledError, LPStreamError], raw: true).} = writeFrame(sconn, buf) proc handshakeXXOutbound( p: Noise, conn: Connection, - p2pSecret: seq[byte]): Future[HandshakeResult] {.async.} = + p2pSecret: seq[byte] +): Future[HandshakeResult] {.async: (raises: [ + CancelledError, LPStreamError]).} = const initiator = true var hs = HandshakeState.init() @@ -380,7 +395,9 @@ proc handshakeXXOutbound( proc handshakeXXInbound( p: Noise, conn: Connection, - p2pSecret: seq[byte]): Future[HandshakeResult] {.async.} = + p2pSecret: seq[byte] +): Future[HandshakeResult] {.async: (raises: [ + CancelledError, LPStreamError]).} = const initiator = false var @@ -509,21 +526,28 @@ method write*( # sequencing issues sconn.stream.write(cipherFrames) -method handshake*(p: Noise, conn: Connection, initiator: bool, peerId: Opt[PeerId]): Future[SecureConn] {.async.} = +method handshake*( + p: Noise, + conn: Connection, + initiator: bool, + peerId: Opt[PeerId] +): Future[SecureConn] {.async: (raises: [CancelledError, LPStreamError]).} = trace "Starting Noise handshake", conn, initiator let timeout = conn.timeout conn.timeout = HandshakeTimeout # https://github.com/libp2p/specs/tree/master/noise#libp2p-data-in-handshake-messages - let - signedPayload = p.localPrivateKey.sign( - PayloadString & p.noiseKeys.publicKey.getBytes).tryGet() + let signedPayload = p.localPrivateKey.sign( + PayloadString & p.noiseKeys.publicKey.getBytes) + if signedPayload.isErr(): + raise newException(NoiseHandshakeError, + "Failed to sign public key: " & $signedPayload.error()) var libp2pProof = initProtoBuffer() libp2pProof.write(1, p.localPublicKey) - libp2pProof.write(2, signedPayload.getBytes()) + libp2pProof.write(2, signedPayload.get().getBytes()) # data field also there but not used! libp2pProof.finish() diff --git a/libp2p/protocols/secure/secio.nim b/libp2p/protocols/secure/secio.nim index 032fd0df5..e6d166072 100644 --- a/libp2p/protocols/secure/secio.nim +++ b/libp2p/protocols/secure/secio.nim @@ -259,13 +259,13 @@ method write*( await sconn.stream.write(msg) sconn.activity = true -proc newSecioConn(conn: Connection, - hash: string, - cipher: string, - secrets: Secret, - order: int, - remotePubKey: PublicKey): SecioConn - {.raises: [LPError].} = +proc newSecioConn( + conn: Connection, + hash: string, + cipher: string, + secrets: Secret, + order: int, + remotePubKey: PublicKey): SecioConn = ## Create new secure stream/lpstream, using specified hash algorithm ``hash``, ## cipher algorithm ``cipher``, stretched keys ``secrets`` and order ## ``order``. @@ -288,13 +288,20 @@ proc newSecioConn(conn: Connection, result.readerCoder.init(cipher, secrets.keyOpenArray(i1), secrets.ivOpenArray(i1)) -proc transactMessage(conn: Connection, - msg: seq[byte]): Future[seq[byte]] {.async.} = +proc transactMessage( + conn: Connection, + msg: seq[byte] +): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]).} = trace "Sending message", message = msg.shortLog, length = len(msg) await conn.write(msg) - return await conn.readRawMessage() + await conn.readRawMessage() -method handshake*(s: Secio, conn: Connection, initiator: bool, peerId: Opt[PeerId]): Future[SecureConn] {.async.} = +method handshake*( + s: Secio, + conn: Connection, + initiator: bool, + peerId: Opt[PeerId] +): Future[SecureConn] {.async: (raises: [CancelledError, LPStreamError]).} = var localNonce: array[SecioNonceSize, byte] remoteNonce: seq[byte] @@ -307,25 +314,29 @@ method handshake*(s: Secio, conn: Connection, initiator: bool, peerId: Opt[PeerI remoteExchanges: string remoteCiphers: string remoteHashes: string - remotePeerId: PeerId - localPeerId: PeerId - localBytesPubkey = s.localPublicKey.getBytes().tryGet() + localBytesPubkey = s.localPublicKey.getBytes() + if localBytesPubkey.isErr(): + raise (ref SecioError)( + msg: "Failed to get local public key bytes: " & $localBytesPubkey.error()) hmacDrbgGenerate(s.rng[], localNonce) var request = createProposal(localNonce, - localBytesPubkey, + localBytesPubkey.get(), SecioExchanges, SecioCiphers, SecioHashes) - localPeerId = PeerId.init(s.localPublicKey).tryGet() + let localPeerId = PeerId.init(s.localPublicKey) + if localPeerId.isErr(): + raise (ref SecioError)( + msg: "Failed to initialize local peer ID: " & $localPeerId.error()) trace "Local proposal", schemes = SecioExchanges, ciphers = SecioCiphers, hashes = SecioHashes, - pubkey = localBytesPubkey.shortLog, - peer = localPeerId + pubkey = localBytesPubkey.get().shortLog, + peer = localPeerId.get() var answer = await transactMessage(conn, request) @@ -343,39 +354,54 @@ method handshake*(s: Secio, conn: Connection, initiator: bool, peerId: Opt[PeerI pubkey = remoteBytesPubkey.shortLog raise (ref SecioError)(msg: "Remote public key incorrect or corrupted") - remotePeerId = PeerId.init(remotePubkey).tryGet() + let remotePeerId = PeerId.init(remotePubkey) + if remotePeerId.isErr(): + raise (ref SecioError)( + msg: "Failed to initialize remote peer ID: " & $remotePeerId.error()) peerId.withValue(targetPid): if not targetPid.validate(): raise newException(SecioError, "Failed to validate expected peerId.") - if remotePeerId != targetPid: + if remotePeerId.get() != targetPid: raise newException(SecioError, "Peer ids don't match!") - conn.peerId = remotePeerId - let order = getOrder(remoteBytesPubkey, localNonce, localBytesPubkey, - remoteNonce).tryGet() + conn.peerId = remotePeerId.get() + let order = getOrder( + remoteBytesPubkey, localNonce, localBytesPubkey.get(), remoteNonce) + if order.isErr(): + raise (ref SecioError)(msg: "Failed to get order: " & $order.error()) trace "Remote proposal", schemes = remoteExchanges, ciphers = remoteCiphers, hashes = remoteHashes, - pubkey = remoteBytesPubkey.shortLog, order = order, - peer = remotePeerId + pubkey = remoteBytesPubkey.shortLog, + order = order.get(), + peer = remotePeerId.get() - let scheme = selectBest(order, SecioExchanges, remoteExchanges) - let cipher = selectBest(order, SecioCiphers, remoteCiphers) - let hash = selectBest(order, SecioHashes, remoteHashes) + let scheme = selectBest(order.get(), SecioExchanges, remoteExchanges) + let cipher = selectBest(order.get(), SecioCiphers, remoteCiphers) + let hash = selectBest(order.get(), SecioHashes, remoteHashes) if len(scheme) == 0 or len(cipher) == 0 or len(hash) == 0: - trace "No algorithms in common", peer = remotePeerId + trace "No algorithms in common", peer = remotePeerId.get() raise (ref SecioError)(msg: "No algorithms in common") trace "Encryption scheme selected", scheme = scheme, cipher = cipher, hash = hash - var ekeypair = ephemeral(scheme, s.rng[]).tryGet() + let ekeypair = ephemeral(scheme, s.rng[]) + if ekeypair.isErr(): + raise (ref SecioError)( + msg: "Failed to create ephemeral keypair: " & $ekeypair.error()) # We need EC public key in raw binary form - var epubkey = ekeypair.pubkey.getRawBytes().tryGet() - var localCorpus = request[4..^1] & answer & epubkey - var signature = s.localPrivateKey.sign(localCorpus).tryGet() + let epubkey = ekeypair.get().pubkey.getRawBytes() + if epubkey.isErr(): + raise (ref SecioError)( + msg: "Failed to get ephemeral key bytes: " & $epubkey.error()) + var localCorpus = request[4..^1] & answer & epubkey.get() + let signature = s.localPrivateKey.sign(localCorpus) + if signature.isErr(): + raise (ref SecioError)( + msg: "Failed to sign local corpus: " & $signature.error()) - var localExchange = createExchange(epubkey, signature.getBytes()) + var localExchange = createExchange(epubkey.get(), signature.get().getBytes()) var remoteExchange = await transactMessage(conn, localExchange) if len(remoteExchange) == 0: trace "Corpus exchange failed", conn @@ -404,7 +430,7 @@ method handshake*(s: Secio, conn: Connection, initiator: bool, peerId: Opt[PeerI pubkey = toHex(remoteEBytesPubkey) raise (ref SecioError)(msg: "Remote ephemeral public key incorrect or corrupted") - var secret = getSecret(remoteEPubkey, ekeypair.seckey) + var secret = getSecret(remoteEPubkey, ekeypair.get().seckey) if len(secret) == 0: trace "Shared secret could not be created" raise (ref SecioError)(msg: "Shared secret could not be created") @@ -421,7 +447,8 @@ method handshake*(s: Secio, conn: Connection, initiator: bool, peerId: Opt[PeerI # Perform Nonce exchange over encrypted channel. - var secioConn = newSecioConn(conn, hash, cipher, keys, order, remotePubkey) + var secioConn = newSecioConn( + conn, hash, cipher, keys, order.get(), remotePubkey) result = secioConn await secioConn.write(remoteNonce) var res = await secioConn.readMessage() diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index b1270216b..e16e2bf5c 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -79,16 +79,21 @@ method readMessage*( method getWrapped*(s: SecureConn): Connection = s.stream -method handshake*(s: Secure, - conn: Connection, - initiator: bool, - peerId: Opt[PeerId]): Future[SecureConn] {.async, base.} = +method handshake*( + s: Secure, + conn: Connection, + initiator: bool, + peerId: Opt[PeerId] +): Future[SecureConn] {.async: (raises: [ + CancelledError, LPStreamError], raw: true), base.} = raiseAssert("Not implemented!") -proc handleConn(s: Secure, - conn: Connection, - initiator: bool, - peerId: Opt[PeerId]): Future[Connection] {.async.} = +proc handleConn( + s: Secure, + conn: Connection, + initiator: bool, + peerId: Opt[PeerId] +): Future[Connection] {.async: (raises: [CancelledError, LPStreamError]).} = var sconn = await s.handshake(conn, initiator, peerId) # mark connection bottom level transport direction # this is the safest place to do this @@ -112,14 +117,8 @@ proc handleConn(s: Secure, fut1 = sconn.close() fut2 = conn.close() await allFutures(fut1, fut2) - if fut1.failed: - let err = fut1.error() - if not (err of CancelledError): - debug "error cleaning up secure connection", err = err.msg, sconn - if fut2.failed: - let err = fut2.error() - if not (err of CancelledError): - debug "error cleaning up secure connection", err = err.msg, sconn + static: doAssert typeof(fut1).E is void # Cannot fail + static: doAssert typeof(fut2).E is void # Cannot fail except CancelledError: # This is top-level procedure which will work as separate task, so it @@ -130,12 +129,14 @@ proc handleConn(s: Secure, # All the errors are handled inside `cleanup()` procedure. asyncSpawn cleanup() - return sconn + sconn method init*(s: Secure) = procCall LPProtocol(s).init() - proc handle(conn: Connection, proto: string) {.async.} = + proc handle( + conn: Connection, + proto: string) {.async: (raises: [CancelledError]).} = trace "handling connection upgrade", proto, conn try: # We don't need the result but we @@ -146,16 +147,18 @@ method init*(s: Secure) = warn "securing connection canceled", conn await conn.close() raise exc - except CatchableError as exc: + except LPStreamError as exc: warn "securing connection failed", err = exc.msg, conn await conn.close() s.handler = handle -method secure*(s: Secure, - conn: Connection, - peerId: Opt[PeerId]): - Future[Connection] {.base.} = +method secure*( + s: Secure, + conn: Connection, + peerId: Opt[PeerId] +): Future[Connection] {.async: (raises: [ + CancelledError, LPStreamError], raw: true), base.} = s.handleConn(conn, conn.dir == Direction.Out, peerId) method readOnce*(