mirror of https://github.com/status-im/nim-eth.git
Fix unhandled exceptions
This commit is contained in:
parent
55e1eff4a9
commit
b977996b0a
|
@ -194,6 +194,14 @@ proc requestResolver[MsgType](msg: pointer, future: FutureBase) {.gcsafe.} =
|
||||||
try:
|
try:
|
||||||
if not f.read.isSome:
|
if not f.read.isSome:
|
||||||
doAssert false, "a request timed out twice"
|
doAssert false, "a request timed out twice"
|
||||||
|
# This can except when the future still completes with an error.
|
||||||
|
# E.g. the `sendMsg` fails because of an already closed transport or a
|
||||||
|
# broken pipe
|
||||||
|
except TransportOsError:
|
||||||
|
# E.g. broken pipe
|
||||||
|
trace "TransportOsError during request", err = getCurrentExceptionMsg()
|
||||||
|
except TransportError:
|
||||||
|
trace "Transport got closed during request"
|
||||||
except:
|
except:
|
||||||
debug "Exception in requestResolver()",
|
debug "Exception in requestResolver()",
|
||||||
exc = getCurrentException().name,
|
exc = getCurrentException().name,
|
||||||
|
@ -516,9 +524,11 @@ proc dispatchMessages*(peer: Peer) {.async.} =
|
||||||
var msgData: Rlp
|
var msgData: Rlp
|
||||||
try:
|
try:
|
||||||
(msgId, msgData) = await peer.recvMsg()
|
(msgId, msgData) = await peer.recvMsg()
|
||||||
except TransportIncompleteError:
|
except TransportError:
|
||||||
# Note: Could also "Transport is already closed!" error occur? Might have
|
# Note: This will also catch TransportIncompleteError. TransportError will
|
||||||
# to change here to the general TransportError.
|
# here usually occur when a read is attempted when the transport is
|
||||||
|
# already closed. TransportIncompleteError when the transport is closed
|
||||||
|
# during read.
|
||||||
case peer.connectionState
|
case peer.connectionState
|
||||||
of Connected:
|
of Connected:
|
||||||
# Dropped connection, still need to cleanup the peer.
|
# Dropped connection, still need to cleanup the peer.
|
||||||
|
@ -544,8 +554,7 @@ proc dispatchMessages*(peer: Peer) {.async.} =
|
||||||
try:
|
try:
|
||||||
await peer.invokeThunk(msgId, msgData)
|
await peer.invokeThunk(msgId, msgData)
|
||||||
except RlpError:
|
except RlpError:
|
||||||
debug "RlpError, ending dispatchMessages loop", peer,
|
debug "RlpError, ending dispatchMessages loop", peer
|
||||||
err = getCurrentExceptionMsg()
|
|
||||||
await peer.disconnect(BreachOfProtocol, true)
|
await peer.disconnect(BreachOfProtocol, true)
|
||||||
return
|
return
|
||||||
except CatchableError:
|
except CatchableError:
|
||||||
|
|
|
@ -8,7 +8,7 @@
|
||||||
|
|
||||||
import
|
import
|
||||||
algorithm, bitops, endians, math, options, sequtils, strutils, tables, times,
|
algorithm, bitops, endians, math, options, sequtils, strutils, tables, times,
|
||||||
secp256k1, chronicles, chronos, eth/common/eth_types, eth/[keys, rlp],
|
secp256k1, chronicles, chronos, eth/common/eth_types, eth/[keys, rlp, async_utils],
|
||||||
hashes, byteutils, nimcrypto/[bcmode, hash, keccak, rijndael, sysrand],
|
hashes, byteutils, nimcrypto/[bcmode, hash, keccak, rijndael, sysrand],
|
||||||
eth/p2p, ../ecies
|
eth/p2p, ../ecies
|
||||||
|
|
||||||
|
@ -554,7 +554,7 @@ proc initQueue*(capacity: int): Queue =
|
||||||
result.capacity = capacity
|
result.capacity = capacity
|
||||||
result.itemHashes.init()
|
result.itemHashes.init()
|
||||||
|
|
||||||
proc prune(self: var Queue) =
|
proc prune(self: var Queue) {.raises: [].} =
|
||||||
## Remove items that are past their expiry time
|
## Remove items that are past their expiry time
|
||||||
let now = epochTime().uint32
|
let now = epochTime().uint32
|
||||||
|
|
||||||
|
@ -766,7 +766,7 @@ p2pProtocol Whisper(version = whisperVersion,
|
||||||
whisperPeer.initialized = true
|
whisperPeer.initialized = true
|
||||||
|
|
||||||
if not whisperNet.config.isLightNode:
|
if not whisperNet.config.isLightNode:
|
||||||
asyncCheck peer.run()
|
traceAsyncErrors peer.run()
|
||||||
|
|
||||||
debug "Whisper peer initialized"
|
debug "Whisper peer initialized"
|
||||||
|
|
||||||
|
@ -860,6 +860,7 @@ p2pProtocol Whisper(version = whisperVersion,
|
||||||
# 'Runner' calls ---------------------------------------------------------------
|
# 'Runner' calls ---------------------------------------------------------------
|
||||||
|
|
||||||
proc processQueue(peer: Peer) =
|
proc processQueue(peer: Peer) =
|
||||||
|
# Send to peer all valid and previously not send envelopes in the queue.
|
||||||
var
|
var
|
||||||
envelopes: seq[Envelope] = @[]
|
envelopes: seq[Envelope] = @[]
|
||||||
whisperPeer = peer.state(Whisper)
|
whisperPeer = peer.state(Whisper)
|
||||||
|
@ -884,8 +885,9 @@ proc processQueue(peer: Peer) =
|
||||||
whisperPeer.received.incl(message)
|
whisperPeer.received.incl(message)
|
||||||
|
|
||||||
trace "Sending envelopes", amount=envelopes.len
|
trace "Sending envelopes", amount=envelopes.len
|
||||||
# await peer.messages(envelopes)
|
# Ignore failure of sending messages, this could occur when the connection
|
||||||
asyncCheck peer.messages(envelopes)
|
# gets dropped
|
||||||
|
traceAsyncErrors peer.messages(envelopes)
|
||||||
|
|
||||||
proc run(peer: Peer) {.async.} =
|
proc run(peer: Peer) {.async.} =
|
||||||
var
|
var
|
||||||
|
@ -897,7 +899,7 @@ proc run(peer: Peer) {.async.} =
|
||||||
peer.processQueue()
|
peer.processQueue()
|
||||||
await sleepAsync(messageInterval)
|
await sleepAsync(messageInterval)
|
||||||
|
|
||||||
proc pruneReceived(node: EthereumNode) =
|
proc pruneReceived(node: EthereumNode) {.raises: [].} =
|
||||||
if node.peerPool != nil: # XXX: a bit dirty to need to check for this here ...
|
if node.peerPool != nil: # XXX: a bit dirty to need to check for this here ...
|
||||||
var whisperNet = node.protocolState(Whisper)
|
var whisperNet = node.protocolState(Whisper)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue