handle a few exceptions (#170)
* handle a few exceptions

  Some of these are maybe too aggressive, but in return they'll log their
  exception. More refactoring is needed to sort this out; right now we get
  crashes on unhandled exceptions of unknown origin:

  * during connection setup
  * while closing channels
  * while processing pubsubs

* catch exceptions that are raised, and don't try to catch exceptions that are not raised
* propagate CancelledError
* make interop tests less fragile
* raise expiration time in gossipsub fanout test for slow CI

Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
Co-authored-by: Giovanni Petrantoni <giovanni@fragcolor.xyz>
Parent: 100d6ef595
Commit: 69abf5097d
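The pattern applied at every call site in the diff below is the same: wrap the await in a try, re-raise CancelledError so task cancellation keeps propagating through the async call chain, and log-and-swallow only CatchableError. A minimal self-contained sketch of that shape (safeClose and its log message are illustrative, not code from this commit):

```nim
import chronos, chronicles

proc safeClose(transp: StreamTransport) {.async.} =
  try:
    await transp.closeWait()
  except CancelledError as exc:
    # cancellation is control flow, not an error: swallowing it here
    # would leave the cancelling task waiting forever
    raise exc
  except CatchableError as exc:
    # everything else gets logged instead of crashing on an
    # unhandled exception of unknown origin
    debug "Error while closing transport", err = exc.msg
```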
```diff
@@ -127,7 +127,12 @@ method close*(s: Connection) {.async, gcsafe.} =
   if not isNil(s.stream) and not s.stream.closed:
     trace "closing child stream", closed = s.closed, conn = $s
-    await s.stream.close()
+    try:
+      await s.stream.close()
+    except CancelledError as exc:
+      raise exc
+    except CatchableError as exc:
+      debug "Error while closing child stream", err = exc.msg
 
   s.closeEvent.fire()
```
```diff
@@ -61,7 +61,7 @@ proc select*(m: MultistreamSelect,
   result = cast[string]((await conn.readLp(1024))) # read ms header
   result.removeSuffix("\n")
   if result != Codec:
-    error "handshake failed", codec = result.toHex()
+    notice "handshake failed", codec = result.toHex()
     raise newMultistreamHandshakeException()
 
   if proto.len() == 0: # no protocols, must be a handshake call
```
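The failed handshake is now logged at notice rather than error level: a peer speaking the wrong protocol is routine network behavior, not a local fault. A minimal sketch of the check being performed, assuming the standard multistream-select codec string (the checkHandshake helper is illustrative):

```nim
import strutils

const Codec = "/multistream/1.0.0"

proc checkHandshake(received: string): bool =
  # multistream-select messages are newline-terminated,
  # so strip the trailing "\n" before comparing
  var msg = received
  msg.removeSuffix("\n")
  msg == Codec

assert checkHandshake("/multistream/1.0.0\n")
assert not checkHandshake("/some/other/proto\n")
```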
```diff
@@ -50,26 +50,34 @@ proc writeMsg*(conn: Connection,
   trace "sending data over mplex", id,
                                    msgType,
                                    data = data.len
-  var
-    left = data.len
-    offset = 0
-  while left > 0 or data.len == 0:
-    let
-      chunkSize = if left > MaxMsgSize: MaxMsgSize - 64 else: left
-      chunk = if chunkSize > 0: data[offset..(offset + chunkSize - 1)] else: data
-    ## write length prefixed
-    var buf = initVBuffer()
-    buf.writePBVarint(id shl 3 or ord(msgType).uint64)
-    buf.writePBVarint(chunkSize.uint64) # size should be always sent
-    buf.finish()
-    left = left - chunkSize
-    offset = offset + chunkSize
-    try:
-      await conn.write(buf.buffer & chunk)
-    except LPStreamIncompleteError as exc:
-      trace "unable to send message", exc = exc.msg
-    if data.len == 0:
-      return
+  try:
+    var
+      left = data.len
+      offset = 0
+    while left > 0 or data.len == 0:
+      let
+        chunkSize = if left > MaxMsgSize: MaxMsgSize - 64 else: left
+        chunk = if chunkSize > 0: data[offset..(offset + chunkSize - 1)] else: data
+      ## write length prefixed
+      var buf = initVBuffer()
+      buf.writePBVarint(id shl 3 or ord(msgType).uint64)
+      buf.writePBVarint(chunkSize.uint64) # size should be always sent
+      buf.finish()
+      left = left - chunkSize
+      offset = offset + chunkSize
+      await conn.write(buf.buffer & chunk)
+
+      if data.len == 0:
+        return
+  except LPStreamEOFError:
+    trace "Ignoring EOF while writing"
+  except CancelledError as exc:
+    raise exc
+  except CatchableError as exc:
+    # TODO these exceptions are ignored since it's likely that if writes
+    # are failing, the underlying connection is already closed - this needs
+    # more cleanup though
+    debug "Could not write to connection", msg = exc.msg
 
 proc writeMsg*(conn: Connection,
                id: uint64,
```
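The header written above follows the mplex wire format: a single protobuf-style varint packing the stream id in the upper bits and the 3-bit message type in the lower bits, followed by a varint payload length and the payload itself. A small illustrative encoder for just the header, reusing the same vbuffer helpers the module already calls (the encodeHeader name is hypothetical; the enum mirrors mplex's message types):

```nim
import libp2p/vbuffer

type MessageType = enum
  New, MsgIn, MsgOut, CloseIn, CloseOut, ResetIn, ResetOut

proc encodeHeader(id: uint64, msgType: MessageType): seq[byte] =
  var buf = initVBuffer()
  # upper bits carry the stream id, low 3 bits the message type
  buf.writePBVarint(id shl 3 or ord(msgType).uint64)
  buf.finish()
  buf.buffer

# stream id 5 with MsgOut (ord 2): 5 shl 3 or 2 = 42, a single byte 0x2A
echo encodeHeader(5, MsgOut)
```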
```diff
@@ -92,14 +92,7 @@ proc open*(s: LPChannel): Future[void] =
 
 method close*(s: LPChannel) {.async, gcsafe.} =
   s.closedLocal = true
-  # If remote is closed
-  # EOF will happen here
-  # We can safely ignore in that case
-  # s.closed won't be true sadly
-  try:
-    await s.closeMessage()
-  except LPStreamEOFError:
-    discard
+  await s.closeMessage()
 
 proc resetMessage(s: LPChannel) {.async.} =
   await s.conn.writeMsg(s.id, s.resetCode)
```
```diff
@@ -163,13 +156,11 @@ method readOnce*(s: LPChannel,
   result = await procCall readOnce(BufferStream(s), pbytes, nbytes)
   await s.tryCleanup()
 
-template writePrefix: untyped =
+method write*(s: LPChannel, msg: seq[byte]) {.async.} =
   if s.closedLocal or s.isReset:
     raise newLPStreamEOFError()
 
   if s.isLazy and not s.isOpen:
     await s.open()
 
-method write*(s: LPChannel, msg: seq[byte]) {.async.} =
-  writePrefix()
   await procCall write(BufferStream(s), msg)
```
```diff
@@ -201,7 +201,17 @@ method publish*(p: PubSub,
   if p.triggerSelf and topic in p.topics:
     for h in p.topics[topic].handler:
       trace "triggering handler", topicID = topic
-      await h(topic, data)
+      try:
+        await h(topic, data)
+      except LPStreamEOFError:
+        trace "Ignoring EOF while writing"
+      except CancelledError as exc:
+        raise exc
+      except CatchableError as exc:
+        # TODO these exceptions are ignored since it's likely that if writes
+        # are failing, the underlying connection is already closed - this needs
+        # more cleanup though
+        debug "Could not write to pubsub connection", msg = exc.msg
 
 method initPubSub*(p: PubSub) {.base.} =
   ## perform pubsub initialization
```
```diff
@@ -455,8 +455,15 @@ method write*(sconn: NoiseConnection, message: seq[byte]): Future[void] {.async.} =
       outbuf &= besize
       outbuf &= cipher
       await sconn.stream.write(outbuf)
-  except AsyncStreamWriteError:
-    trace "Could not write to connection"
+  except LPStreamEOFError:
+    trace "Ignoring EOF while writing"
+  except CancelledError as exc:
+    raise exc
+  except CatchableError as exc:
+    # TODO these exceptions are ignored since it's likely that if writes
+    # are failing, the underlying connection is already closed - this needs
+    # more cleanup though
+    debug "Could not write to connection", msg = exc.msg
 
 method handshake*(p: Noise, conn: Connection, initiator: bool = false): Future[SecureConn] {.async.} =
   trace "Starting Noise handshake", initiator
```
```diff
@@ -236,8 +236,15 @@ method write*(sconn: SecioConn, message: seq[byte]) {.async.} =
 
       trace "Writing message", message = msg.shortLog, left, offset
       await sconn.stream.write(msg)
-  except AsyncStreamWriteError:
-    trace "Could not write to connection"
+  except LPStreamEOFError:
+    trace "Ignoring EOF while writing"
+  except CancelledError as exc:
+    raise exc
+  except CatchableError as exc:
+    # TODO these exceptions are ignored since it's likely that if writes
+    # are failing, the underlying connection is already closed - this needs
+    # more cleanup though
+    debug "Could not write to connection", msg = exc.msg
 
 proc newSecioConn(conn: Connection,
                   hash: string,
```
```diff
@@ -67,6 +67,12 @@ method close*(s: ChronosStream) {.async.} =
   if not s.closed:
     trace "shutting chronos stream", address = $s.client.remoteAddress()
     if not s.client.closed():
-      await s.client.closeWait()
+      try:
+        await s.client.closeWait()
+      except CancelledError as exc:
+        raise exc
+      except CatchableError as exc:
+        # Shouldn't happen, but we can't be sure
+        warn "error while closing connection", msg = exc.msg
 
     s.closeEvent.fire()
```
```diff
@@ -167,19 +167,25 @@ proc getMuxedStream(s: Switch, peerInfo: PeerInfo): Future[Connection] {.async, gcsafe.} =
   result = conn
 
 proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
-  trace "handling connection", conn = $conn
-  result = conn
+  try:
+    trace "handling connection", conn = $conn
+    result = conn
 
-  # don't mux/secure twice
-  if conn.peerInfo.id in s.muxed:
-    return
+    # don't mux/secure twice
+    if conn.peerInfo.id in s.muxed:
+      return
 
-  result = await s.secure(result) # secure the connection
-  if isNil(result):
-    return
+    result = await s.secure(result) # secure the connection
+    if isNil(result):
+      return
 
-  await s.mux(result) # mux it if possible
-  s.connections[conn.peerInfo.id] = result
+    await s.mux(result) # mux it if possible
+    s.connections[conn.peerInfo.id] = result
+  except CancelledError as exc:
+    raise exc
+  except CatchableError as exc:
+    debug "Couldn't upgrade outgoing connection", msg = exc.msg
+    return nil
 
 proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
   trace "upgrading incoming connection", conn = $conn
```
```diff
@@ -189,25 +195,37 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
   proc securedHandler (conn: Connection,
                        proto: string)
                        {.async, gcsafe, closure.} =
-    trace "Securing connection"
-    let secure = s.secureManagers[proto]
-    let sconn = await secure.secure(conn, false)
-    if not isNil(sconn):
+    try:
+      trace "Securing connection"
+      let secure = s.secureManagers[proto]
+      let sconn = await secure.secure(conn, false)
+      if sconn.isNil:
+        return
 
       # add the muxer
       for muxer in s.muxers.values:
         ms.addHandler(muxer.codec, muxer)
 
-    # handle subsequent requests
-    await ms.handle(sconn)
-    await sconn.close()
+      # handle subsequent requests
+      await ms.handle(sconn)
+      await sconn.close()
+    except CancelledError as exc:
+      raise exc
+    except CatchableError as exc:
+      debug "ending secured handler", err = exc.msg
 
   if (await ms.select(conn)): # just handshake
     # add the secure handlers
     for k in s.secureManagers.keys:
       ms.addHandler(k, securedHandler)
 
-    # handle secured connections
-    await ms.handle(conn)
+    try:
+      # handle secured connections
+      await ms.handle(conn)
+    except CancelledError as exc:
+      raise exc
+    except CatchableError as exc:
+      debug "ending multistream", err = exc.msg
 
 proc subscribeToPeer(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.}
```
```diff
@@ -223,6 +241,8 @@ proc internalConnect(s: Switch,
         trace "Dialing address", address = $a
         try:
           conn = await t.dial(a)
+        except CancelledError as exc:
+          raise exc
         except CatchableError as exc:
           trace "couldn't dial peer, transport failed", exc = exc.msg, address = a
           continue
```
```diff
@@ -288,6 +308,8 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
     proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} =
       try:
         await s.upgradeIncoming(conn) # perform upgrade on incoming connection
+      except CancelledError as exc:
+        raise exc
       except CatchableError as exc:
         trace "Exception occurred in Switch.start", exc = exc.msg
       finally:
```
```diff
@@ -333,6 +355,8 @@ proc subscribeToPeer(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
     s.dialedPubSubPeers.incl(peerInfo.id)
     let conn = await s.dial(peerInfo, s.pubSub.get().codec)
     await s.pubSub.get().subscribeToPeer(conn)
+  except CancelledError as exc:
+    raise exc
   except CatchableError as exc:
     warn "unable to initiate pubsub", exc = exc.msg
   finally:
```
```diff
@@ -64,7 +64,7 @@ proc cleanup(t: Transport, conn: Connection) {.async.} =
 proc connHandler*(t: TcpTransport,
                   client: StreamTransport,
                   initiator: bool): Connection =
-  trace "handling connection for", address = $client.remoteAddress
+  trace "handling connection", address = $client.remoteAddress
   let conn: Connection = newConnection(newChronosStream(client))
   conn.observedAddrs = MultiAddress.init(client.remoteAddress)
   if not initiator:
```
```diff
@@ -78,11 +78,24 @@ proc connHandler*(t: TcpTransport,
 
 proc connCb(server: StreamServer,
             client: StreamTransport) {.async, gcsafe.} =
-  trace "incomming connection for", address = $client.remoteAddress
-  let t = cast[TcpTransport](server.udata)
-  # we don't need result connection in this case
-  # as it's added inside connHandler
-  discard t.connHandler(client, false)
+  trace "incoming connection", address = $client.remoteAddress
+  try:
+    let t = cast[TcpTransport](server.udata)
+    # we don't need result connection in this case
+    # as it's added inside connHandler
+    discard t.connHandler(client, false)
+  except CancelledError as exc:
+    raise exc
+  except CatchableError as err:
+    debug "Connection setup failed", err = err.msg
+    if not client.closed:
+      try:
+        client.close()
+      except CancelledError as err:
+        raise err
+      except CatchableError as err:
+        # shouldn't happen but..
+        warn "Error closing connection", err = err.msg
 
 method init*(t: TcpTransport) =
   t.multicodec = multiCodec("tcp")
```
|
@ -105,14 +105,14 @@ proc testPubSubDaemonPublish(gossip: bool = false,
|
|||
let awaiters = nativeNode.start()
|
||||
let nativePeer = nativeNode.peerInfo
|
||||
|
||||
var handlerFuture = newFuture[bool]()
|
||||
var finished = false
|
||||
var times = 0
|
||||
proc nativeHandler(topic: string, data: seq[byte]) {.async.} =
|
||||
let smsg = cast[string](data)
|
||||
check smsg == pubsubData
|
||||
times.inc()
|
||||
if times >= count and not handlerFuture.finished:
|
||||
handlerFuture.complete(true)
|
||||
if times >= count and not finished:
|
||||
finished = true
|
||||
|
||||
await nativeNode.connect(NativePeerInfo.init(daemonPeer.peer,
|
||||
daemonPeer.addresses))
|
||||
|
```diff
@@ -126,12 +126,16 @@ proc testPubSubDaemonPublish(gossip: bool = false,
 
   asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
   await nativeNode.subscribe(testTopic, nativeHandler)
-  while times < count:
-    await sleepAsync(1.seconds)
-    await daemonNode.pubsubPublish(testTopic, msgData)
-    await sleepAsync(100.millis)
+  await sleepAsync(1.seconds)
 
-  result = await handlerFuture
+  proc publisher() {.async.} =
+    while not finished:
+      await daemonNode.pubsubPublish(testTopic, msgData)
+      await sleepAsync(100.millis)
+
+  await wait(publisher(), 1.minutes) # should be plenty of time
+
+  result = true
   await nativeNode.stop()
   await allFutures(awaiters)
   await daemonNode.close()
```
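The rewritten tests replace a fixed number of publish rounds with a loop that keeps publishing until the handler reports success, bounded only by an outer timeout; a slow CI machine then costs extra iterations instead of a spurious failure. The shape of the pattern, reduced to a self-contained sketch (doWork stands in for the real pubsubPublish call):

```nim
import chronos

proc main() {.async.} =
  var finished = false

  proc doWork() {.async.} =
    # stand-in for the real publish; imagine the subscriber
    # flipping `finished` once enough messages have arrived
    finished = true

  proc publisher() {.async.} =
    while not finished:
      await doWork()
      await sleepAsync(100.millis)

  # wait() raises AsyncTimeoutError if the loop doesn't finish in time,
  # so the test fails loudly instead of hanging forever
  await wait(publisher(), 1.minutes)

waitFor main()
```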
```diff
@@ -152,7 +156,6 @@ proc testPubSubNodePublish(gossip: bool = false,
   let awaiters = nativeNode.start()
   let nativePeer = nativeNode.peerInfo
 
-  var handlerFuture = newFuture[bool]()
   await nativeNode.connect(NativePeerInfo.init(daemonPeer.peer,
                                                daemonPeer.addresses))
 
```
```diff
@@ -160,26 +163,30 @@ proc testPubSubNodePublish(gossip: bool = false,
   await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
 
   var times = 0
+  var finished = false
   proc pubsubHandler(api: DaemonAPI,
                      ticket: PubsubTicket,
                      message: PubSubMessage): Future[bool] {.async.} =
     let smsg = cast[string](message.data)
     check smsg == pubsubData
     times.inc()
-    if times >= count:
-      handlerFuture.complete(true)
+    if times >= count and not finished:
+      finished = true
     result = true # don't cancel subscription
 
   discard await daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
   proc nativeHandler(topic: string, data: seq[byte]) {.async.} = discard
   await nativeNode.subscribe(testTopic, nativeHandler)
-  while times < count:
-    await sleepAsync(1.seconds)
-    await nativeNode.publish(testTopic, msgData)
-    await sleepAsync(100.millis)
+  await sleepAsync(1.seconds)
 
-  result = await handlerFuture
+  proc publisher() {.async.} =
+    while not finished:
+      await nativeNode.publish(testTopic, msgData)
+      await sleepAsync(100.millis)
+
+  await wait(publisher(), 1.minutes) # should be plenty of time
+
+  result = finished
   await nativeNode.stop()
   await allFutures(awaiters)
   await daemonNode.close()
```
```diff
@@ -385,7 +392,7 @@ suite "Interop":
       check test == cast[string](line)
       inc(count2)
 
-    result = 10 == (await wait(testFuture, 10.secs))
+    result = 10 == (await wait(testFuture, 1.minutes))
     await stream.close()
     await nativeNode.stop()
     await allFutures(awaiters)
```