use LPError more consistently (#582)

* use LPError more consistently

* don't use Exceptino

* annotate with raises

* don't panic on concatenation

* further rework error handling
This commit is contained in:
Dmitriy Ryajov 2021-06-02 07:39:10 -06:00 committed by GitHub
parent 1b94c3feda
commit 3da656687b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 110 additions and 102 deletions

View File

@ -168,19 +168,16 @@ proc build*(b: SwitchBuilder): Switch
if isNil(b.rng):
b.rng = newRng()
let switch = try:
newSwitch(
peerInfo = peerInfo,
transports = transports,
identity = identify,
muxers = muxers,
secureManagers = secureManagerInstances,
maxConnections = b.maxConnections,
maxIn = b.maxIn,
maxOut = b.maxOut,
maxConnsPerPeer = b.maxConnsPerPeer)
except CatchableError as exc:
raise newException(Defect, exc.msg)
let switch = newSwitch(
peerInfo = peerInfo,
transports = transports,
identity = identify,
muxers = muxers,
secureManagers = secureManagerInstances,
maxConnections = b.maxConnections,
maxIn = b.maxIn,
maxOut = b.maxOut,
maxConnsPerPeer = b.maxConnsPerPeer)
return switch

View File

@ -375,18 +375,18 @@ proc selectMuxer*(c: ConnManager, conn: Connection): Muxer =
debug "no muxer for connection", conn
proc storeConn*(c: ConnManager, conn: Connection)
{.raises: [Defect, CatchableError].} =
{.raises: [Defect, LPError].} =
## store a connection
##
if isNil(conn):
raise newException(CatchableError, "Connection cannot be nil")
raise newException(LPError, "Connection cannot be nil")
if conn.closed or conn.atEof:
raise newException(CatchableError, "Connection closed or EOF")
raise newException(LPError, "Connection closed or EOF")
if isNil(conn.peerInfo):
raise newException(CatchableError, "Empty peer info")
raise newException(LPError, "Empty peer info")
let peerId = conn.peerInfo.peerId
if c.conns.getOrDefault(peerId).len > c.maxConnsPerPeer:

View File

@ -14,10 +14,10 @@ import std/[os, osproc, strutils, tables, strtabs]
import pkg/[chronos, chronicles]
import ../varint, ../multiaddress, ../multicodec, ../cid, ../peerid
import ../wire, ../multihash, ../protobuf/minprotobuf
import ../crypto/crypto
import ../crypto/crypto, ../errors
export
peerid, multiaddress, multicodec, multihash, cid, crypto, wire
peerid, multiaddress, multicodec, multihash, cid, crypto, wire, errors
when not defined(windows):
import posix
@ -501,7 +501,8 @@ proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} =
result = buffer
proc newConnection*(api: DaemonAPI): Future[StreamTransport] =
proc newConnection*(api: DaemonAPI): Future[StreamTransport]
{.raises: [Defect, LPError].} =
result = connect(api.address)
proc closeConnection*(api: DaemonAPI, transp: StreamTransport): Future[void] =
@ -936,10 +937,10 @@ proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} =
if len(stream.protocol) > 0:
var handler = api.handlers.getOrDefault(stream.protocol)
if not isNil(handler):
asyncCheck handler(api, stream)
asyncSpawn handler(api, stream)
proc addHandler*(api: DaemonAPI, protocols: seq[string],
handler: P2PStreamCallback) {.async.} =
handler: P2PStreamCallback) {.async, raises: [Defect, LPError].} =
## Add stream handler ``handler`` for set of protocols ``protocols``.
var transp = await api.newConnection()
let maddress = await getSocket(api.pattern, addr api.ucounter)
@ -1324,7 +1325,7 @@ proc pubsubSubscribe*(api: DaemonAPI, topic: string,
ticket.topic = topic
ticket.handler = handler
ticket.transp = transp
asyncCheck pubsubLoop(api, ticket)
asyncSpawn pubsubLoop(api, ticket)
result = ticket
except Exception as exc:
await api.closeConnection(transp)

View File

@ -8,14 +8,17 @@ import macros
type
# Base exception type for libp2p
LPError* = object of CatchableError
LPAllFuturesError* = object of LPError
errors*: seq[ref CatchableError]
# could not figure how to make it with a simple template
func toException*(e: cstring): ref LPError =
(ref LPError)(msg: $e)
func toException*(e: string): ref LPError =
(ref LPError)(msg: e)
# TODO: could not figure how to make it with a simple template
# sadly nim needs more love for hygienic templates
# so here goes the macro, its based on the proc/template version
# and uses quote do so it's quite readable
macro checkFutures*[T](futs: seq[Future[T]], exclude: untyped = []): untyped =
let nexclude = exclude.len
case nexclude

View File

@ -909,20 +909,23 @@ proc append*(m1: var MultiAddress, m2: MultiAddress): MaResult[void] =
ok()
proc `&`*(m1, m2: MultiAddress): MultiAddress {.
raises: [Defect, ResultError[string]].} =
raises: [Defect, LPError].} =
## Concatenates two addresses ``m1`` and ``m2``, and returns result.
##
## This procedure performs validation of concatenated result and can raise
## exception on error.
##
concat(m1, m2).tryGet()
proc `&=`*(m1: var MultiAddress, m2: MultiAddress) {.
raises: [Defect, ResultError[string]].} =
raises: [Defect, LPError].} =
## Concatenates two addresses ``m1`` and ``m2``.
##
## This procedure performs validation of concatenated result and can raise
## exception on error.
##
m1.append(m2).tryGet()
proc isWire*(ma: MultiAddress): bool =

View File

@ -29,6 +29,8 @@ const
type
Matcher* = proc (proto: string): bool {.gcsafe, raises: [Defect].}
MultiStreamError* = object of LPError
HandlerHolder* = object
protos*: seq[string]
protocol*: LPProtocol
@ -46,7 +48,7 @@ template validateSuffix(str: string): untyped =
if str.endsWith("\n"):
str.removeSuffix("\n")
else:
raise newException(CatchableError, "MultistreamSelect failed, malformed message")
raise newException(MultiStreamError, "MultistreamSelect failed, malformed message")
proc select*(m: MultistreamSelect,
conn: Connection,
@ -64,7 +66,7 @@ proc select*(m: MultistreamSelect,
if s != Codec:
notice "handshake failed", conn, codec = s
raise newException(CatchableError, "MultistreamSelect handshake failed")
raise newException(MultiStreamError, "MultistreamSelect handshake failed")
else:
trace "multistream handshake success", conn

View File

@ -53,12 +53,6 @@ func shortLog*(p: PeerInfo): auto =
)
chronicles.formatIt(PeerInfo): shortLog(it)
func toException*(e: string): ref PeerInfoError =
(ref PeerInfoError)(msg: e)
func toException*(e: cstring): ref PeerInfoError =
(ref PeerInfoError)(msg: $e)
template postInit(peerinfo: PeerInfo,
addrs: openarray[MultiAddress],
protocols: openarray[string]) =

View File

@ -163,7 +163,7 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
try:
let newConn = await p.getConn()
if newConn.isNil:
raise (ref CatchableError)(msg: "Cannot establish send connection")
raise (ref LPError)(msg: "Cannot establish send connection")
# When the send channel goes up, subscriptions need to be sent to the
# remote peer - if we had multiple channels up and one goes down, all

View File

@ -29,12 +29,12 @@ declareCounter(libp2p_pubsub_sig_verify_success, "pubsub successfully validated
declareCounter(libp2p_pubsub_sig_verify_failure, "pubsub failed validated messages")
func defaultMsgIdProvider*(m: Message): MessageID =
let mid =
let mid =
if m.seqno.len > 0 and m.fromPeer.data.len > 0:
byteutils.toHex(m.seqno) & $m.fromPeer
else:
# This part is irrelevant because it's not standard,
# We use it exclusively for testing basically and users should
# We use it exclusively for testing basically and users should
# implement their own logic in the case they use anonymization
$m.data.hash & $m.topicIDs.hash
mid.toBytes()
@ -65,7 +65,8 @@ proc init*(
data: seq[byte],
topic: string,
seqno: Option[uint64],
sign: bool = true): Message {.gcsafe, raises: [CatchableError, Defect].} =
sign: bool = true): Message
{.gcsafe, raises: [Defect, LPError].} =
var msg = Message(data: data, topicIDs: @[topic])
# order matters, we want to include seqno in the signature
@ -77,10 +78,15 @@ proc init*(
msg.fromPeer = peer.peerId
if sign:
if peer.keyType != KeyType.HasPrivate:
raise (ref CatchableError)(msg: "Cannot sign message without private key")
msg.signature = sign(msg, peer.privateKey).tryGet()
msg.key = peer.privateKey.getKey().tryGet().getBytes().tryGet()
raise (ref LPError)(msg: "Cannot sign message without private key")
msg.signature = sign(msg, peer.privateKey).expect("Couldn't sign message!")
msg.key = peer.privateKey
.getKey()
.expect("Expected a Private Key!")
.getBytes()
.expect("Couldn't get Private Key bytes!")
elif sign:
raise (ref CatchableError)(msg: "Cannot sign message without peer info")
raise (ref LPError)(msg: "Cannot sign message without peer info")
msg

View File

@ -84,16 +84,16 @@ proc getStreamTracker(name: string): StreamTracker {.gcsafe.} =
if isNil(result):
result = setupStreamTracker(name)
proc newLPStreamReadError*(p: ref CatchableError): ref CatchableError =
proc newLPStreamReadError*(p: ref CatchableError): ref LPStreamReadError =
var w = newException(LPStreamReadError, "Read stream failed")
w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg
w.par = p
result = w
proc newLPStreamReadError*(msg: string): ref CatchableError =
proc newLPStreamReadError*(msg: string): ref LPStreamReadError =
newException(LPStreamReadError, msg)
proc newLPStreamWriteError*(p: ref CatchableError): ref CatchableError =
proc newLPStreamWriteError*(p: ref CatchableError): ref LPStreamWriteError =
var w = newException(LPStreamWriteError, "Write stream failed")
w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg
w.par = p

View File

@ -279,13 +279,13 @@ proc dial*(s: Switch,
Future[Connection] = dial(s, peerId, addrs, @[proto])
proc mount*[T: LPProtocol](s: Switch, proto: T, matcher: Matcher = nil)
{.gcsafe, raises: [Defect, CatchableError].} =
{.gcsafe, raises: [Defect, LPError].} =
if isNil(proto.handler):
raise newException(CatchableError,
raise newException(LPError,
"Protocol has to define a handle method or proc")
if proto.codec.len == 0:
raise newException(CatchableError,
raise newException(LPError,
"Protocol has to define a codec string")
s.ms.addHandler(proto.codecs, proto, matcher)
@ -408,10 +408,10 @@ proc newSwitch*(peerInfo: PeerInfo,
maxIn = -1,
maxOut = -1,
maxConnsPerPeer = MaxConnectionsPerPeer): Switch
{.raises: [Defect, CatchableError].} =
{.raises: [Defect, LPError].} =
if secureManagers.len == 0:
raise (ref CatchableError)(msg: "Provide at least one secure manager")
raise (ref LPError)(msg: "Provide at least one secure manager")
let ms = newMultistream()
let connManager = ConnManager.init(maxConnsPerPeer, maxConnections, maxIn, maxOut)

View File

@ -20,14 +20,15 @@ logScope:
topics = "libp2p transport"
type
TransportClosedError* = object of CatchableError
TransportError* = object of LPError
TransportClosedError* = object of TransportError
Transport* = ref object of RootObj
ma*: Multiaddress
multicodec*: MultiCodec
running*: bool
proc newTransportClosedError*(parent: ref Exception = nil): ref CatchableError =
proc newTransportClosedError*(parent: ref Exception = nil): ref LPError =
newException(TransportClosedError,
"Transport closed, no more connections!", parent)

View File

@ -11,7 +11,7 @@
## This module implements wire network connection procedures.
import chronos, stew/endians2
import multiaddress, multicodec
import multiaddress, multicodec, errors
when defined(windows):
import winlean
@ -30,8 +30,7 @@ const
mapAnd(mapEq("unix"))
)
proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress]
{.raises: [Defect, ResultError[string]].} =
proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] =
## Initialize ``TransportAddress`` with MultiAddress ``ma``.
##
## MultiAddress must be wire address, e.g. ``{IP4, IP6, UNIX}/{TCP, UDP}``.
@ -39,30 +38,30 @@ proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress]
if TRANSPMA.match(ma):
var pbuf: array[2, byte]
let code = ma[0].tryGet().protoCode().tryGet()
let code = (?(?ma[0]).protoCode())
if code == multiCodec("unix"):
var res = TransportAddress(family: AddressFamily.Unix)
if ma[0].tryGet().protoArgument(res.address_un).tryGet() == 0:
if (?(?ma[0]).protoArgument(res.address_un)) == 0:
err("Incorrect Unix domain address")
else:
res.port = Port(1)
ok(res)
elif code == multiCodec("ip4"):
var res = TransportAddress(family: AddressFamily.IPv4)
if ma[0].tryGet().protoArgument(res.address_v4).tryGet() == 0:
if (?(?ma[0]).protoArgument(res.address_v4)) == 0:
err("Incorrect IPv4 address")
else:
if ma[1].tryGet().protoArgument(pbuf).tryGet() == 0:
if (?(?ma[1]).protoArgument(pbuf)) == 0:
err("Incorrect port number")
else:
res.port = Port(fromBytesBE(uint16, pbuf))
ok(res)
else:
var res = TransportAddress(family: AddressFamily.IPv6)
if ma[0].tryGet().protoArgument(res.address_v6).tryGet() == 0:
if (?(?ma[0]).protoArgument(res.address_v6)) == 0:
err("Incorrect IPv6 address")
else:
if ma[1].tryGet().protoArgument(pbuf).tryGet() == 0:
if (?(?ma[1]).protoArgument(pbuf)) == 0:
err("Incorrect port number")
else:
res.port = Port(fromBytesBE(uint16, pbuf))
@ -70,16 +69,20 @@ proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress]
else:
err("MultiAddress must be wire address (tcp, udp or unix)")
proc connect*(ma: MultiAddress, bufferSize = DefaultStreamBufferSize,
child: StreamTransport = nil): Future[StreamTransport] {.async.} =
proc connect*(
ma: MultiAddress,
bufferSize = DefaultStreamBufferSize,
child: StreamTransport = nil): Future[StreamTransport]
{.raises: [Defect, LPError, MaInvalidAddress].} =
## Open new connection to remote peer with address ``ma`` and create
## new transport object ``StreamTransport`` for established connection.
## ``bufferSize`` is size of internal buffer for transport.
##
if not(RTRANSPMA.match(ma)):
raise newException(MaInvalidAddress, "Incorrect or unsupported address!")
let address = initTAddress(ma).tryGet()
result = await connect(address, bufferSize, child)
return connect(initTAddress(ma).tryGet(), bufferSize, child)
proc createStreamServer*[T](ma: MultiAddress,
cbproc: StreamCallback,
@ -90,24 +93,24 @@ proc createStreamServer*[T](ma: MultiAddress,
bufferSize: int = DefaultStreamBufferSize,
child: StreamServer = nil,
init: TransportInitCallback = nil): StreamServer
{.raises: [Defect, MaInvalidAddress].} =
{.raises: [Defect, LPError, MaInvalidAddress].} =
## Create new TCP stream server which bounds to ``ma`` address.
if not(RTRANSPMA.match(ma)):
raise newException(MaInvalidAddress, "Incorrect or unsupported address!")
let address = try:
initTAddress(ma)
except ResultError[string] as exc:
raise newException(Defect, exc.msg)
if address.isErr:
raise newException(MaInvalidAddress, address.error)
try:
return createStreamServer(address.get(), cbproc, flags, udata, sock,
backlog, bufferSize, child, init)
return createStreamServer(
initTAddress(ma).tryGet(),
cbproc,
flags,
udata,
sock,
backlog,
bufferSize,
child,
init)
except CatchableError as exc:
raise newException(Defect, exc.msg)
raise newException(LPError, exc.msg)
proc createStreamServer*[T](ma: MultiAddress,
flags: set[ServerFlags] = {},
@ -117,41 +120,41 @@ proc createStreamServer*[T](ma: MultiAddress,
bufferSize: int = DefaultStreamBufferSize,
child: StreamServer = nil,
init: TransportInitCallback = nil): StreamServer
{.raises: [Defect, MaInvalidAddress].} =
{.raises: [Defect, LPError, MaInvalidAddress].} =
## Create new TCP stream server which bounds to ``ma`` address.
##
if not(RTRANSPMA.match(ma)):
raise newException(MaInvalidAddress, "Incorrect or unsupported address!")
let address = try:
initTAddress(ma)
except ResultError[string] as exc:
raise newException(Defect, exc.msg)
try:
return createStreamServer(address.get(), flags, udata, sock, backlog,
bufferSize, child, init)
return createStreamServer(
initTAddress(ma).tryGet(),
flags,
udata,
sock,
backlog,
bufferSize,
child,
init)
except CatchableError as exc:
raise newException(Defect, exc.msg)
raise newException(LPError, exc.msg)
proc createAsyncSocket*(ma: MultiAddress): AsyncFD
{.raises: [Defect, ResultError[string]].} =
{.raises: [Defect, LPError].} =
## Create new asynchronous socket using MultiAddress' ``ma`` socket type and
## protocol information.
##
## Returns ``asyncInvalidSocket`` on error.
##
## Note: This procedure only used in `go-libp2p-daemon` wrapper.
##
var
socktype: SockType = SockType.SOCK_STREAM
protocol: Protocol = Protocol.IPPROTO_TCP
let maddr = initTAddress(ma)
if maddr.isErr():
return asyncInvalidSocket
let address = maddr.tryGet()
let address = initTAddress(ma).tryGet()
if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}:
if ma[1].tryGet().protoCode().tryGet() == multiCodec("udp"):
socktype = SockType.SOCK_DGRAM
@ -168,22 +171,20 @@ proc createAsyncSocket*(ma: MultiAddress): AsyncFD
try:
createAsyncSocket(address.getDomain(), socktype, protocol)
except CatchableError as exc:
raise newException(Defect, exc.msg)
raise newException(LPError, exc.msg)
proc bindAsyncSocket*(sock: AsyncFD, ma: MultiAddress): bool
{.raises: [Defect, ResultError[string]].} =
{.raises: [Defect, LPError].} =
## Bind socket ``sock`` to MultiAddress ``ma``.
##
## Note: This procedure only used in `go-libp2p-daemon` wrapper.
##
var
saddr: Sockaddr_storage
slen: SockLen
let maddr = initTAddress(ma)
if maddr.isErr():
return false
let address = maddr.get()
let address = initTAddress(ma).tryGet()
toSAddr(address, saddr, slen)
if bindSocket(SocketHandle(sock), cast[ptr SockAddr](addr saddr),
slen) == 0: