From 3da656687be63ccbf5d659af55d159130d325038 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 2 Jun 2021 07:39:10 -0600 Subject: [PATCH] use LPError more consistently (#582) * use LPError more consistently * don't use Exceptino * annotate with raises * don't panic on concatenation * further rework error handling --- libp2p/builders.nim | 23 +++--- libp2p/connmanager.nim | 8 +-- libp2p/daemon/daemonapi.nim | 13 ++-- libp2p/errors.nim | 11 +-- libp2p/multiaddress.nim | 7 +- libp2p/multistream.nim | 6 +- libp2p/peerinfo.nim | 6 -- libp2p/protocols/pubsub/pubsubpeer.nim | 2 +- libp2p/protocols/pubsub/rpc/message.nim | 20 ++++-- libp2p/stream/lpstream.nim | 6 +- libp2p/switch.nim | 10 +-- libp2p/transports/transport.nim | 5 +- libp2p/wire.nim | 95 +++++++++++++------------ 13 files changed, 110 insertions(+), 102 deletions(-) diff --git a/libp2p/builders.nim b/libp2p/builders.nim index 3e14189a7..8e5f7227c 100644 --- a/libp2p/builders.nim +++ b/libp2p/builders.nim @@ -168,19 +168,16 @@ proc build*(b: SwitchBuilder): Switch if isNil(b.rng): b.rng = newRng() - let switch = try: - newSwitch( - peerInfo = peerInfo, - transports = transports, - identity = identify, - muxers = muxers, - secureManagers = secureManagerInstances, - maxConnections = b.maxConnections, - maxIn = b.maxIn, - maxOut = b.maxOut, - maxConnsPerPeer = b.maxConnsPerPeer) - except CatchableError as exc: - raise newException(Defect, exc.msg) + let switch = newSwitch( + peerInfo = peerInfo, + transports = transports, + identity = identify, + muxers = muxers, + secureManagers = secureManagerInstances, + maxConnections = b.maxConnections, + maxIn = b.maxIn, + maxOut = b.maxOut, + maxConnsPerPeer = b.maxConnsPerPeer) return switch diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index cfed78396..1b0a20922 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -375,18 +375,18 @@ proc selectMuxer*(c: ConnManager, conn: Connection): Muxer = debug "no muxer for connection", conn proc storeConn*(c: ConnManager, conn: Connection) - {.raises: [Defect, CatchableError].} = + {.raises: [Defect, LPError].} = ## store a connection ## if isNil(conn): - raise newException(CatchableError, "Connection cannot be nil") + raise newException(LPError, "Connection cannot be nil") if conn.closed or conn.atEof: - raise newException(CatchableError, "Connection closed or EOF") + raise newException(LPError, "Connection closed or EOF") if isNil(conn.peerInfo): - raise newException(CatchableError, "Empty peer info") + raise newException(LPError, "Empty peer info") let peerId = conn.peerInfo.peerId if c.conns.getOrDefault(peerId).len > c.maxConnsPerPeer: diff --git a/libp2p/daemon/daemonapi.nim b/libp2p/daemon/daemonapi.nim index f27c446c8..f6734bf70 100644 --- a/libp2p/daemon/daemonapi.nim +++ b/libp2p/daemon/daemonapi.nim @@ -14,10 +14,10 @@ import std/[os, osproc, strutils, tables, strtabs] import pkg/[chronos, chronicles] import ../varint, ../multiaddress, ../multicodec, ../cid, ../peerid import ../wire, ../multihash, ../protobuf/minprotobuf -import ../crypto/crypto +import ../crypto/crypto, ../errors export - peerid, multiaddress, multicodec, multihash, cid, crypto, wire + peerid, multiaddress, multicodec, multihash, cid, crypto, wire, errors when not defined(windows): import posix @@ -501,7 +501,8 @@ proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} = result = buffer -proc newConnection*(api: DaemonAPI): Future[StreamTransport] = +proc newConnection*(api: DaemonAPI): Future[StreamTransport] + {.raises: [Defect, LPError].} = result = connect(api.address) proc closeConnection*(api: DaemonAPI, transp: StreamTransport): Future[void] = @@ -936,10 +937,10 @@ proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} = if len(stream.protocol) > 0: var handler = api.handlers.getOrDefault(stream.protocol) if not isNil(handler): - asyncCheck handler(api, stream) + asyncSpawn handler(api, stream) proc addHandler*(api: DaemonAPI, protocols: seq[string], - handler: P2PStreamCallback) {.async.} = + handler: P2PStreamCallback) {.async, raises: [Defect, LPError].} = ## Add stream handler ``handler`` for set of protocols ``protocols``. var transp = await api.newConnection() let maddress = await getSocket(api.pattern, addr api.ucounter) @@ -1324,7 +1325,7 @@ proc pubsubSubscribe*(api: DaemonAPI, topic: string, ticket.topic = topic ticket.handler = handler ticket.transp = transp - asyncCheck pubsubLoop(api, ticket) + asyncSpawn pubsubLoop(api, ticket) result = ticket except Exception as exc: await api.closeConnection(transp) diff --git a/libp2p/errors.nim b/libp2p/errors.nim index 7fa50d988..83d41162a 100644 --- a/libp2p/errors.nim +++ b/libp2p/errors.nim @@ -8,14 +8,17 @@ import macros type # Base exception type for libp2p LPError* = object of CatchableError - LPAllFuturesError* = object of LPError - errors*: seq[ref CatchableError] -# could not figure how to make it with a simple template +func toException*(e: cstring): ref LPError = + (ref LPError)(msg: $e) + +func toException*(e: string): ref LPError = + (ref LPError)(msg: e) + +# TODO: could not figure how to make it with a simple template # sadly nim needs more love for hygienic templates # so here goes the macro, its based on the proc/template version # and uses quote do so it's quite readable - macro checkFutures*[T](futs: seq[Future[T]], exclude: untyped = []): untyped = let nexclude = exclude.len case nexclude diff --git a/libp2p/multiaddress.nim b/libp2p/multiaddress.nim index b28e6bcfa..cb6d42af4 100644 --- a/libp2p/multiaddress.nim +++ b/libp2p/multiaddress.nim @@ -909,20 +909,23 @@ proc append*(m1: var MultiAddress, m2: MultiAddress): MaResult[void] = ok() proc `&`*(m1, m2: MultiAddress): MultiAddress {. - raises: [Defect, ResultError[string]].} = + raises: [Defect, LPError].} = ## Concatenates two addresses ``m1`` and ``m2``, and returns result. ## ## This procedure performs validation of concatenated result and can raise ## exception on error. + ## + concat(m1, m2).tryGet() proc `&=`*(m1: var MultiAddress, m2: MultiAddress) {. - raises: [Defect, ResultError[string]].} = + raises: [Defect, LPError].} = ## Concatenates two addresses ``m1`` and ``m2``. ## ## This procedure performs validation of concatenated result and can raise ## exception on error. ## + m1.append(m2).tryGet() proc isWire*(ma: MultiAddress): bool = diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim index 3c3306991..708a8a1e2 100644 --- a/libp2p/multistream.nim +++ b/libp2p/multistream.nim @@ -29,6 +29,8 @@ const type Matcher* = proc (proto: string): bool {.gcsafe, raises: [Defect].} + MultiStreamError* = object of LPError + HandlerHolder* = object protos*: seq[string] protocol*: LPProtocol @@ -46,7 +48,7 @@ template validateSuffix(str: string): untyped = if str.endsWith("\n"): str.removeSuffix("\n") else: - raise newException(CatchableError, "MultistreamSelect failed, malformed message") + raise newException(MultiStreamError, "MultistreamSelect failed, malformed message") proc select*(m: MultistreamSelect, conn: Connection, @@ -64,7 +66,7 @@ proc select*(m: MultistreamSelect, if s != Codec: notice "handshake failed", conn, codec = s - raise newException(CatchableError, "MultistreamSelect handshake failed") + raise newException(MultiStreamError, "MultistreamSelect handshake failed") else: trace "multistream handshake success", conn diff --git a/libp2p/peerinfo.nim b/libp2p/peerinfo.nim index c7f946b3b..fcf0dcf0b 100644 --- a/libp2p/peerinfo.nim +++ b/libp2p/peerinfo.nim @@ -53,12 +53,6 @@ func shortLog*(p: PeerInfo): auto = ) chronicles.formatIt(PeerInfo): shortLog(it) -func toException*(e: string): ref PeerInfoError = - (ref PeerInfoError)(msg: e) - -func toException*(e: cstring): ref PeerInfoError = - (ref PeerInfoError)(msg: $e) - template postInit(peerinfo: PeerInfo, addrs: openarray[MultiAddress], protocols: openarray[string]) = diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 3136d2193..79dd73c07 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -163,7 +163,7 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} = try: let newConn = await p.getConn() if newConn.isNil: - raise (ref CatchableError)(msg: "Cannot establish send connection") + raise (ref LPError)(msg: "Cannot establish send connection") # When the send channel goes up, subscriptions need to be sent to the # remote peer - if we had multiple channels up and one goes down, all diff --git a/libp2p/protocols/pubsub/rpc/message.nim b/libp2p/protocols/pubsub/rpc/message.nim index 77f6057c8..d99653584 100644 --- a/libp2p/protocols/pubsub/rpc/message.nim +++ b/libp2p/protocols/pubsub/rpc/message.nim @@ -29,12 +29,12 @@ declareCounter(libp2p_pubsub_sig_verify_success, "pubsub successfully validated declareCounter(libp2p_pubsub_sig_verify_failure, "pubsub failed validated messages") func defaultMsgIdProvider*(m: Message): MessageID = - let mid = + let mid = if m.seqno.len > 0 and m.fromPeer.data.len > 0: byteutils.toHex(m.seqno) & $m.fromPeer else: # This part is irrelevant because it's not standard, - # We use it exclusively for testing basically and users should + # We use it exclusively for testing basically and users should # implement their own logic in the case they use anonymization $m.data.hash & $m.topicIDs.hash mid.toBytes() @@ -65,7 +65,8 @@ proc init*( data: seq[byte], topic: string, seqno: Option[uint64], - sign: bool = true): Message {.gcsafe, raises: [CatchableError, Defect].} = + sign: bool = true): Message + {.gcsafe, raises: [Defect, LPError].} = var msg = Message(data: data, topicIDs: @[topic]) # order matters, we want to include seqno in the signature @@ -77,10 +78,15 @@ proc init*( msg.fromPeer = peer.peerId if sign: if peer.keyType != KeyType.HasPrivate: - raise (ref CatchableError)(msg: "Cannot sign message without private key") - msg.signature = sign(msg, peer.privateKey).tryGet() - msg.key = peer.privateKey.getKey().tryGet().getBytes().tryGet() + raise (ref LPError)(msg: "Cannot sign message without private key") + + msg.signature = sign(msg, peer.privateKey).expect("Couldn't sign message!") + msg.key = peer.privateKey + .getKey() + .expect("Expected a Private Key!") + .getBytes() + .expect("Couldn't get Private Key bytes!") elif sign: - raise (ref CatchableError)(msg: "Cannot sign message without peer info") + raise (ref LPError)(msg: "Cannot sign message without peer info") msg diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim index bff5aee34..e56254868 100644 --- a/libp2p/stream/lpstream.nim +++ b/libp2p/stream/lpstream.nim @@ -84,16 +84,16 @@ proc getStreamTracker(name: string): StreamTracker {.gcsafe.} = if isNil(result): result = setupStreamTracker(name) -proc newLPStreamReadError*(p: ref CatchableError): ref CatchableError = +proc newLPStreamReadError*(p: ref CatchableError): ref LPStreamReadError = var w = newException(LPStreamReadError, "Read stream failed") w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg w.par = p result = w -proc newLPStreamReadError*(msg: string): ref CatchableError = +proc newLPStreamReadError*(msg: string): ref LPStreamReadError = newException(LPStreamReadError, msg) -proc newLPStreamWriteError*(p: ref CatchableError): ref CatchableError = +proc newLPStreamWriteError*(p: ref CatchableError): ref LPStreamWriteError = var w = newException(LPStreamWriteError, "Write stream failed") w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg w.par = p diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 37a7f1324..d8cd7f99e 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -279,13 +279,13 @@ proc dial*(s: Switch, Future[Connection] = dial(s, peerId, addrs, @[proto]) proc mount*[T: LPProtocol](s: Switch, proto: T, matcher: Matcher = nil) - {.gcsafe, raises: [Defect, CatchableError].} = + {.gcsafe, raises: [Defect, LPError].} = if isNil(proto.handler): - raise newException(CatchableError, + raise newException(LPError, "Protocol has to define a handle method or proc") if proto.codec.len == 0: - raise newException(CatchableError, + raise newException(LPError, "Protocol has to define a codec string") s.ms.addHandler(proto.codecs, proto, matcher) @@ -408,10 +408,10 @@ proc newSwitch*(peerInfo: PeerInfo, maxIn = -1, maxOut = -1, maxConnsPerPeer = MaxConnectionsPerPeer): Switch - {.raises: [Defect, CatchableError].} = + {.raises: [Defect, LPError].} = if secureManagers.len == 0: - raise (ref CatchableError)(msg: "Provide at least one secure manager") + raise (ref LPError)(msg: "Provide at least one secure manager") let ms = newMultistream() let connManager = ConnManager.init(maxConnsPerPeer, maxConnections, maxIn, maxOut) diff --git a/libp2p/transports/transport.nim b/libp2p/transports/transport.nim index 9f9e79223..5eed89e69 100644 --- a/libp2p/transports/transport.nim +++ b/libp2p/transports/transport.nim @@ -20,14 +20,15 @@ logScope: topics = "libp2p transport" type - TransportClosedError* = object of CatchableError + TransportError* = object of LPError + TransportClosedError* = object of TransportError Transport* = ref object of RootObj ma*: Multiaddress multicodec*: MultiCodec running*: bool -proc newTransportClosedError*(parent: ref Exception = nil): ref CatchableError = +proc newTransportClosedError*(parent: ref Exception = nil): ref LPError = newException(TransportClosedError, "Transport closed, no more connections!", parent) diff --git a/libp2p/wire.nim b/libp2p/wire.nim index f421a4d75..845e9d70f 100644 --- a/libp2p/wire.nim +++ b/libp2p/wire.nim @@ -11,7 +11,7 @@ ## This module implements wire network connection procedures. import chronos, stew/endians2 -import multiaddress, multicodec +import multiaddress, multicodec, errors when defined(windows): import winlean @@ -30,8 +30,7 @@ const mapAnd(mapEq("unix")) ) -proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] - {.raises: [Defect, ResultError[string]].} = +proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] = ## Initialize ``TransportAddress`` with MultiAddress ``ma``. ## ## MultiAddress must be wire address, e.g. ``{IP4, IP6, UNIX}/{TCP, UDP}``. @@ -39,30 +38,30 @@ proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] if TRANSPMA.match(ma): var pbuf: array[2, byte] - let code = ma[0].tryGet().protoCode().tryGet() + let code = (?(?ma[0]).protoCode()) if code == multiCodec("unix"): var res = TransportAddress(family: AddressFamily.Unix) - if ma[0].tryGet().protoArgument(res.address_un).tryGet() == 0: + if (?(?ma[0]).protoArgument(res.address_un)) == 0: err("Incorrect Unix domain address") else: res.port = Port(1) ok(res) elif code == multiCodec("ip4"): var res = TransportAddress(family: AddressFamily.IPv4) - if ma[0].tryGet().protoArgument(res.address_v4).tryGet() == 0: + if (?(?ma[0]).protoArgument(res.address_v4)) == 0: err("Incorrect IPv4 address") else: - if ma[1].tryGet().protoArgument(pbuf).tryGet() == 0: + if (?(?ma[1]).protoArgument(pbuf)) == 0: err("Incorrect port number") else: res.port = Port(fromBytesBE(uint16, pbuf)) ok(res) else: var res = TransportAddress(family: AddressFamily.IPv6) - if ma[0].tryGet().protoArgument(res.address_v6).tryGet() == 0: + if (?(?ma[0]).protoArgument(res.address_v6)) == 0: err("Incorrect IPv6 address") else: - if ma[1].tryGet().protoArgument(pbuf).tryGet() == 0: + if (?(?ma[1]).protoArgument(pbuf)) == 0: err("Incorrect port number") else: res.port = Port(fromBytesBE(uint16, pbuf)) @@ -70,16 +69,20 @@ proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] else: err("MultiAddress must be wire address (tcp, udp or unix)") -proc connect*(ma: MultiAddress, bufferSize = DefaultStreamBufferSize, - child: StreamTransport = nil): Future[StreamTransport] {.async.} = +proc connect*( + ma: MultiAddress, + bufferSize = DefaultStreamBufferSize, + child: StreamTransport = nil): Future[StreamTransport] + {.raises: [Defect, LPError, MaInvalidAddress].} = ## Open new connection to remote peer with address ``ma`` and create ## new transport object ``StreamTransport`` for established connection. ## ``bufferSize`` is size of internal buffer for transport. + ## + if not(RTRANSPMA.match(ma)): raise newException(MaInvalidAddress, "Incorrect or unsupported address!") - let address = initTAddress(ma).tryGet() - result = await connect(address, bufferSize, child) + return connect(initTAddress(ma).tryGet(), bufferSize, child) proc createStreamServer*[T](ma: MultiAddress, cbproc: StreamCallback, @@ -90,24 +93,24 @@ proc createStreamServer*[T](ma: MultiAddress, bufferSize: int = DefaultStreamBufferSize, child: StreamServer = nil, init: TransportInitCallback = nil): StreamServer - {.raises: [Defect, MaInvalidAddress].} = + {.raises: [Defect, LPError, MaInvalidAddress].} = ## Create new TCP stream server which bounds to ``ma`` address. if not(RTRANSPMA.match(ma)): raise newException(MaInvalidAddress, "Incorrect or unsupported address!") - let address = try: - initTAddress(ma) - except ResultError[string] as exc: - raise newException(Defect, exc.msg) - - if address.isErr: - raise newException(MaInvalidAddress, address.error) - try: - return createStreamServer(address.get(), cbproc, flags, udata, sock, - backlog, bufferSize, child, init) + return createStreamServer( + initTAddress(ma).tryGet(), + cbproc, + flags, + udata, + sock, + backlog, + bufferSize, + child, + init) except CatchableError as exc: - raise newException(Defect, exc.msg) + raise newException(LPError, exc.msg) proc createStreamServer*[T](ma: MultiAddress, flags: set[ServerFlags] = {}, @@ -117,41 +120,41 @@ proc createStreamServer*[T](ma: MultiAddress, bufferSize: int = DefaultStreamBufferSize, child: StreamServer = nil, init: TransportInitCallback = nil): StreamServer - {.raises: [Defect, MaInvalidAddress].} = + {.raises: [Defect, LPError, MaInvalidAddress].} = ## Create new TCP stream server which bounds to ``ma`` address. ## if not(RTRANSPMA.match(ma)): raise newException(MaInvalidAddress, "Incorrect or unsupported address!") - let address = try: - initTAddress(ma) - except ResultError[string] as exc: - raise newException(Defect, exc.msg) - try: - return createStreamServer(address.get(), flags, udata, sock, backlog, - bufferSize, child, init) + return createStreamServer( + initTAddress(ma).tryGet(), + flags, + udata, + sock, + backlog, + bufferSize, + child, + init) except CatchableError as exc: - raise newException(Defect, exc.msg) + raise newException(LPError, exc.msg) proc createAsyncSocket*(ma: MultiAddress): AsyncFD - {.raises: [Defect, ResultError[string]].} = + {.raises: [Defect, LPError].} = ## Create new asynchronous socket using MultiAddress' ``ma`` socket type and ## protocol information. ## ## Returns ``asyncInvalidSocket`` on error. ## ## Note: This procedure only used in `go-libp2p-daemon` wrapper. + ## + var socktype: SockType = SockType.SOCK_STREAM protocol: Protocol = Protocol.IPPROTO_TCP - let maddr = initTAddress(ma) - if maddr.isErr(): - return asyncInvalidSocket - - let address = maddr.tryGet() + let address = initTAddress(ma).tryGet() if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}: if ma[1].tryGet().protoCode().tryGet() == multiCodec("udp"): socktype = SockType.SOCK_DGRAM @@ -168,22 +171,20 @@ proc createAsyncSocket*(ma: MultiAddress): AsyncFD try: createAsyncSocket(address.getDomain(), socktype, protocol) except CatchableError as exc: - raise newException(Defect, exc.msg) + raise newException(LPError, exc.msg) proc bindAsyncSocket*(sock: AsyncFD, ma: MultiAddress): bool - {.raises: [Defect, ResultError[string]].} = + {.raises: [Defect, LPError].} = ## Bind socket ``sock`` to MultiAddress ``ma``. ## ## Note: This procedure only used in `go-libp2p-daemon` wrapper. + ## + var saddr: Sockaddr_storage slen: SockLen - let maddr = initTAddress(ma) - if maddr.isErr(): - return false - - let address = maddr.get() + let address = initTAddress(ma).tryGet() toSAddr(address, saddr, slen) if bindSocket(SocketHandle(sock), cast[ptr SockAddr](addr saddr), slen) == 0: