diff --git a/eth/async_utils.nim b/eth/async_utils.nim index f456038..964bb6b 100644 --- a/eth/async_utils.nim +++ b/eth/async_utils.nim @@ -1,12 +1,20 @@ import - chronos/asyncfutures2, chronicles + chronos/[asyncfutures2, asyncloop], chronicles + +proc catchOrQuit(error: Exception) = + if error of CatchableError: + trace "Async operation ended with a recoverable error", err = error.msg + else: + fatal "Fatal exception reached", err = error.msg + quit 1 proc traceAsyncErrors*(fut: FutureBase) = fut.addCallback do (arg: pointer): if not fut.error.isNil: - if fut.error[] of CatchableError: - trace "Async operation ended with a recoverable error", err = fut.error.msg - else: - fatal "Fatal exception reached", err = fut.error.msg - quit 1 + catchOrQuit fut.error[] +template traceAwaitErrors*(fut: FutureBase) = + let f = fut + yield f + if not f.error.isNil: + catchOrQuit f.error[] diff --git a/eth/p2p/rlpx.nim b/eth/p2p/rlpx.nim index 778a3f6..da80edd 100644 --- a/eth/p2p/rlpx.nim +++ b/eth/p2p/rlpx.nim @@ -1,6 +1,6 @@ import macros, tables, algorithm, deques, hashes, options, typetraits, - std_shims/macros_shim, chronicles, nimcrypto, chronos, eth/[rlp, common, keys], + std_shims/macros_shim, chronicles, nimcrypto, chronos, eth/[rlp, common, keys, async_utils], private/p2p_types, kademlia, auth, rlpxcrypt, enode when useSnappy: @@ -503,6 +503,12 @@ proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] = newFuture result peer.awaitedMessages[wantedId] = result +# Known fatal errors are handled inside dispatchMessages. +# Errors we are currently unaware of are caught in the dispatchMessages +# callback. There they will be logged if CatchableError and quit on Defect. +# Non fatal errors such as the current CatchableError could be moved and +# handled a layer lower for clarity (and consistency), as also the actual +# message handler code as the TODO mentions already. proc dispatchMessages*(peer: Peer) {.async.} = while true: var msgId: int @@ -510,10 +516,10 @@ proc dispatchMessages*(peer: Peer) {.async.} = try: (msgId, msgData) = await peer.recvMsg() except TransportIncompleteError: - trace "Connection dropped in dispatchMessages" + trace "Connection dropped, ending dispatchMessages loop", peer # This can happen during the rlpx connection setup or at any point after. # Because this code does not know, a disconnect needs to be done. - asyncDiscard peer.disconnect(ClientQuitting) + await peer.disconnect(ClientQuitting) return if msgId == 1: # p2p.disconnect @@ -525,8 +531,9 @@ proc dispatchMessages*(peer: Peer) {.async.} = try: await peer.invokeThunk(msgId, msgData) except RlpError: - debug "ending dispatchMessages loop", peer, err = getCurrentExceptionMsg() - await peer.disconnect(BreachOfProtocol) + debug "RlpError, ending dispatchMessages loop", peer, + err = getCurrentExceptionMsg() + await peer.disconnect(BreachOfProtocol, true) return except CatchableError: warn "Error while handling RLPx message", peer, @@ -545,8 +552,10 @@ proc dispatchMessages*(peer: Peer) {.async.} = # TODO: Handling errors here must be investigated more carefully. # They also are supposed to be handled at the call-site where # `nextMsg` is used. - debug "nextMsg resolver failed", err = getCurrentExceptionMsg() - raise + debug "nextMsg resolver failed, ending dispatchMessages loop", peer, + err = getCurrentExceptionMsg() + await peer.disconnect(BreachOfProtocol, true) + return peer.awaitedMessages[msgId] = nil macro p2pProtocolImpl(name: static[string], @@ -1167,16 +1176,16 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = fals yield fut if fut.failed: debug "Failed to delived disconnect message", peer - try: - if not peer.dispatcher.isNil: - await callDisconnectHandlers(peer, reason) - except: - error "Exception in callDisconnectHandlers()", - err = getCurrentExceptionMsg() - finally: - logDisconnectedPeer peer - peer.connectionState = Disconnected - removePeer(peer.network, peer) + + if not peer.dispatcher.isNil: + # In case of `CatchableError` in any of the handlers, this will be logged. + # Other handlers will still execute. + # In case of `Defect` in any of the handlers, program will quit. + traceAwaitErrors callDisconnectHandlers(peer, reason) + + logDisconnectedPeer peer + peer.connectionState = Disconnected + removePeer(peer.network, peer) proc validatePubKeyInHello(msg: devp2p.hello, pubKey: PublicKey): bool = var pk: PublicKey @@ -1237,7 +1246,7 @@ proc postHelloSteps(peer: Peer, h: devp2p.hello) {.async.} = if messageProcessingLoop.failed: debug "Ending dispatchMessages loop", peer, err = messageProcessingLoop.error.msg - asyncDiscard peer.disconnect(ClientQuitting) + traceAsyncErrors peer.disconnect(ClientQuitting) # The handshake may involve multiple async steps, so we wait # here for all of them to finish. diff --git a/tests/p2p/p2p_test_helper.nim b/tests/p2p/p2p_test_helper.nim new file mode 100644 index 0000000..5381a0f --- /dev/null +++ b/tests/p2p/p2p_test_helper.nim @@ -0,0 +1,34 @@ +import + unittest, chronos, eth/[keys, p2p], eth/p2p/[discovery, enode] + +var nextPort = 30303 + +proc localAddress(port: int): Address = + let port = Port(port) + result = Address(udpPort: port, tcpPort: port, ip: parseIpAddress("127.0.0.1")) + +proc startDiscoveryNode(privKey: PrivateKey, address: Address, + bootnodes: seq[ENode]): Future[DiscoveryProtocol] {.async.} = + result = newDiscoveryProtocol(privKey, address, bootnodes) + result.open() + await result.bootstrap() + +proc setupBootNode*(): Future[ENode] {.async.} = + let + bootNodeKey = newPrivateKey() + bootNodeAddr = localAddress(30301) + bootNode = await startDiscoveryNode(bootNodeKey, bootNodeAddr, @[]) + result = initENode(bootNodeKey.getPublicKey, bootNodeAddr) + +proc setupTestNode*(capabilities: varargs[ProtocolInfo, `protocolInfo`]): EthereumNode = + let keys1 = newKeyPair() + result = newEthereumNode(keys1, localAddress(nextPort), 1, nil, + addAllCapabilities = false) + nextPort.inc + for capability in capabilities: + result.addCapability capability + +template asyncTest*(name, body: untyped) = + test name: + proc scenario {.async.} = body + waitFor scenario() diff --git a/tests/p2p/test_failing_handler.nim b/tests/p2p/test_failing_handler.nim index a5d7cf1..8694bbd 100644 --- a/tests/p2p/test_failing_handler.nim +++ b/tests/p2p/test_failing_handler.nim @@ -8,31 +8,8 @@ # MIT license (LICENSE-MIT) import - unittest, tables, chronos, eth/[keys, p2p], eth/p2p/[discovery, enode] - -var nextPort = 30303 - -proc localAddress(port: int): Address = - let port = Port(port) - result = Address(udpPort: port, tcpPort: port, ip: parseIpAddress("127.0.0.1")) - -proc startDiscoveryNode(privKey: PrivateKey, address: Address, - bootnodes: seq[ENode]): Future[DiscoveryProtocol] {.async.} = - result = newDiscoveryProtocol(privKey, address, bootnodes) - result.open() - await result.bootstrap() - -proc setupBootNode(): Future[ENode] {.async.} = - let - bootNodeKey = newPrivateKey() - bootNodeAddr = localAddress(30301) - bootNode = await startDiscoveryNode(bootNodeKey, bootNodeAddr, @[]) - result = initENode(bootNodeKey.getPublicKey, bootNodeAddr) - -template asyncTest(name, body: untyped) = - test name: - proc scenario {.async.} = body - waitFor scenario() + unittest, tables, chronos, eth/p2p, + ./p2p_test_helper type network = ref object @@ -48,7 +25,7 @@ p2pProtocol abc(version = 1, onPeerDisconnected do (peer: Peer, reason: DisconnectionReason) {.gcsafe.}: peer.networkState.count -= 1 if true: - raise newException(UnsupportedProtocol, "Fake abc exception") + raise newException(CatchableError, "Fake abc exception") p2pProtocol xyz(version = 1, shortName = "xyz", @@ -60,21 +37,13 @@ p2pProtocol xyz(version = 1, onPeerDisconnected do (peer: Peer, reason: DisconnectionReason) {.gcsafe.}: peer.networkState.count -= 1 if true: - raise newException(UnsupportedProtocol, "Fake xyz exception") + raise newException(CatchableError, "Fake xyz exception") -proc prepTestNode(): EthereumNode = - let keys1 = newKeyPair() - result = newEthereumNode(keys1, localAddress(nextPort), 1, nil, - addAllCapabilities = false) - nextPort.inc - result.addCapability abc - result.addCapability xyz - -suite "Failing handlers": +suite "Testing protocol handlers": asyncTest "Failing disconnect handler": let bootENode = waitFor setupBootNode() - var node1 = prepTestNode() - var node2 = prepTestNode() + var node1 = setupTestNode(abc, xyz) + var node2 = setupTestNode(abc, xyz) # node2 listening and node1 not, to avoid many incoming vs outgoing var node1Connected = node1.connectToNetwork(@[bootENode], false, true) var node2Connected = node2.connectToNetwork(@[bootENode], true, true) diff --git a/tests/p2p/test_shh_connect.nim b/tests/p2p/test_shh_connect.nim index 831e664..bad506e 100644 --- a/tests/p2p/test_shh_connect.nim +++ b/tests/p2p/test_shh_connect.nim @@ -8,52 +8,18 @@ # MIT license (LICENSE-MIT) import - sequtils, options, unittest, tables, chronos, eth/[rlp, keys, p2p], - eth/p2p/rlpx_protocols/[whisper_protocol], eth/p2p/[discovery, enode] - -const - useCompression = defined(useSnappy) - -var nextPort = 30303 - -proc localAddress(port: int): Address = - let port = Port(port) - result = Address(udpPort: port, tcpPort: port, ip: parseIpAddress("127.0.0.1")) - -proc startDiscoveryNode(privKey: PrivateKey, address: Address, - bootnodes: seq[ENode]): Future[DiscoveryProtocol] {.async.} = - result = newDiscoveryProtocol(privKey, address, bootnodes) - result.open() - await result.bootstrap() - -proc setupBootNode(): Future[ENode] {.async.} = - let - bootNodeKey = newPrivateKey() - bootNodeAddr = localAddress(30301) - bootNode = await startDiscoveryNode(bootNodeKey, bootNodeAddr, @[]) - result = initENode(bootNodeKey.getPublicKey, bootNodeAddr) - -template asyncTest(name, body: untyped) = - test name: - proc scenario {.async.} = body - waitFor scenario() + sequtils, options, unittest, tables, chronos, eth/[keys, p2p], + eth/p2p/rlpx_protocols/[whisper_protocol], + ./p2p_test_helper proc resetMessageQueues(nodes: varargs[EthereumNode]) = for node in nodes: node.resetMessageQueue() -proc prepTestNode(): EthereumNode = - let keys1 = newKeyPair() - result = newEthereumNode(keys1, localAddress(nextPort), 1, nil, - addAllCapabilities = false, - useCompression = useCompression) - nextPort.inc - result.addCapability Whisper - let bootENode = waitFor setupBootNode() -var node1 = prepTestNode() -var node2 = prepTestNode() +var node1 = setupTestNode(Whisper) +var node2 = setupTestNode(Whisper) # node2 listening and node1 not, to avoid many incoming vs outgoing var node1Connected = node1.connectToNetwork(@[bootENode], false, true) var node2Connected = node2.connectToNetwork(@[bootENode], true, true) @@ -352,7 +318,7 @@ suite "Whisper connections": node1.unsubscribeFilter(filter) == true test "Light node posting": - var ln1 = prepTestNode() + var ln1 = setupTestNode(Whisper) ln1.setLightNode(true) # not listening, so will only connect to others that are listening (node2) @@ -373,8 +339,8 @@ suite "Whisper connections": ln1.protocolState(Whisper).queue.items.len == 0 test "Connect two light nodes": - var ln1 = prepTestNode() - var ln2 = prepTestNode() + var ln1 = setupTestNode(Whisper) + var ln2 = setupTestNode(Whisper) ln1.setLightNode(true) ln2.setLightNode(true)