Address review feedback

This commit is contained in:
kdeme 2019-04-05 10:13:22 +02:00 committed by zah
parent 6117fed595
commit 9e5cf2086c
5 changed files with 90 additions and 104 deletions

View File

@ -1,12 +1,20 @@
import
chronos/asyncfutures2, chronicles
chronos/[asyncfutures2, asyncloop], chronicles
proc catchOrQuit(error: Exception) =
if error of CatchableError:
trace "Async operation ended with a recoverable error", err = error.msg
else:
fatal "Fatal exception reached", err = error.msg
quit 1
proc traceAsyncErrors*(fut: FutureBase) =
fut.addCallback do (arg: pointer):
if not fut.error.isNil:
if fut.error[] of CatchableError:
trace "Async operation ended with a recoverable error", err = fut.error.msg
else:
fatal "Fatal exception reached", err = fut.error.msg
quit 1
catchOrQuit fut.error[]
template traceAwaitErrors*(fut: FutureBase) =
let f = fut
yield f
if not f.error.isNil:
catchOrQuit f.error[]

View File

@ -1,6 +1,6 @@
import
macros, tables, algorithm, deques, hashes, options, typetraits,
std_shims/macros_shim, chronicles, nimcrypto, chronos, eth/[rlp, common, keys],
std_shims/macros_shim, chronicles, nimcrypto, chronos, eth/[rlp, common, keys, async_utils],
private/p2p_types, kademlia, auth, rlpxcrypt, enode
when useSnappy:
@ -503,6 +503,12 @@ proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] =
newFuture result
peer.awaitedMessages[wantedId] = result
# Known fatal errors are handled inside dispatchMessages.
# Errors we are currently unaware of are caught in the dispatchMessages
# callback. There they will be logged if CatchableError and quit on Defect.
# Non fatal errors such as the current CatchableError could be moved and
# handled a layer lower for clarity (and consistency), as also the actual
# message handler code as the TODO mentions already.
proc dispatchMessages*(peer: Peer) {.async.} =
while true:
var msgId: int
@ -510,10 +516,10 @@ proc dispatchMessages*(peer: Peer) {.async.} =
try:
(msgId, msgData) = await peer.recvMsg()
except TransportIncompleteError:
trace "Connection dropped in dispatchMessages"
trace "Connection dropped, ending dispatchMessages loop", peer
# This can happen during the rlpx connection setup or at any point after.
# Because this code does not know, a disconnect needs to be done.
asyncDiscard peer.disconnect(ClientQuitting)
await peer.disconnect(ClientQuitting)
return
if msgId == 1: # p2p.disconnect
@ -525,8 +531,9 @@ proc dispatchMessages*(peer: Peer) {.async.} =
try:
await peer.invokeThunk(msgId, msgData)
except RlpError:
debug "ending dispatchMessages loop", peer, err = getCurrentExceptionMsg()
await peer.disconnect(BreachOfProtocol)
debug "RlpError, ending dispatchMessages loop", peer,
err = getCurrentExceptionMsg()
await peer.disconnect(BreachOfProtocol, true)
return
except CatchableError:
warn "Error while handling RLPx message", peer,
@ -545,8 +552,10 @@ proc dispatchMessages*(peer: Peer) {.async.} =
# TODO: Handling errors here must be investigated more carefully.
# They also are supposed to be handled at the call-site where
# `nextMsg` is used.
debug "nextMsg resolver failed", err = getCurrentExceptionMsg()
raise
debug "nextMsg resolver failed, ending dispatchMessages loop", peer,
err = getCurrentExceptionMsg()
await peer.disconnect(BreachOfProtocol, true)
return
peer.awaitedMessages[msgId] = nil
macro p2pProtocolImpl(name: static[string],
@ -1167,13 +1176,13 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = fals
yield fut
if fut.failed:
debug "Failed to delived disconnect message", peer
try:
if not peer.dispatcher.isNil:
await callDisconnectHandlers(peer, reason)
except:
error "Exception in callDisconnectHandlers()",
err = getCurrentExceptionMsg()
finally:
# In case of `CatchableError` in any of the handlers, this will be logged.
# Other handlers will still execute.
# In case of `Defect` in any of the handlers, program will quit.
traceAwaitErrors callDisconnectHandlers(peer, reason)
logDisconnectedPeer peer
peer.connectionState = Disconnected
removePeer(peer.network, peer)
@ -1237,7 +1246,7 @@ proc postHelloSteps(peer: Peer, h: devp2p.hello) {.async.} =
if messageProcessingLoop.failed:
debug "Ending dispatchMessages loop", peer,
err = messageProcessingLoop.error.msg
asyncDiscard peer.disconnect(ClientQuitting)
traceAsyncErrors peer.disconnect(ClientQuitting)
# The handshake may involve multiple async steps, so we wait
# here for all of them to finish.

View File

@ -0,0 +1,34 @@
import
unittest, chronos, eth/[keys, p2p], eth/p2p/[discovery, enode]
var nextPort = 30303
proc localAddress(port: int): Address =
let port = Port(port)
result = Address(udpPort: port, tcpPort: port, ip: parseIpAddress("127.0.0.1"))
proc startDiscoveryNode(privKey: PrivateKey, address: Address,
bootnodes: seq[ENode]): Future[DiscoveryProtocol] {.async.} =
result = newDiscoveryProtocol(privKey, address, bootnodes)
result.open()
await result.bootstrap()
proc setupBootNode*(): Future[ENode] {.async.} =
let
bootNodeKey = newPrivateKey()
bootNodeAddr = localAddress(30301)
bootNode = await startDiscoveryNode(bootNodeKey, bootNodeAddr, @[])
result = initENode(bootNodeKey.getPublicKey, bootNodeAddr)
proc setupTestNode*(capabilities: varargs[ProtocolInfo, `protocolInfo`]): EthereumNode =
let keys1 = newKeyPair()
result = newEthereumNode(keys1, localAddress(nextPort), 1, nil,
addAllCapabilities = false)
nextPort.inc
for capability in capabilities:
result.addCapability capability
template asyncTest*(name, body: untyped) =
test name:
proc scenario {.async.} = body
waitFor scenario()

View File

@ -8,31 +8,8 @@
# MIT license (LICENSE-MIT)
import
unittest, tables, chronos, eth/[keys, p2p], eth/p2p/[discovery, enode]
var nextPort = 30303
proc localAddress(port: int): Address =
let port = Port(port)
result = Address(udpPort: port, tcpPort: port, ip: parseIpAddress("127.0.0.1"))
proc startDiscoveryNode(privKey: PrivateKey, address: Address,
bootnodes: seq[ENode]): Future[DiscoveryProtocol] {.async.} =
result = newDiscoveryProtocol(privKey, address, bootnodes)
result.open()
await result.bootstrap()
proc setupBootNode(): Future[ENode] {.async.} =
let
bootNodeKey = newPrivateKey()
bootNodeAddr = localAddress(30301)
bootNode = await startDiscoveryNode(bootNodeKey, bootNodeAddr, @[])
result = initENode(bootNodeKey.getPublicKey, bootNodeAddr)
template asyncTest(name, body: untyped) =
test name:
proc scenario {.async.} = body
waitFor scenario()
unittest, tables, chronos, eth/p2p,
./p2p_test_helper
type
network = ref object
@ -48,7 +25,7 @@ p2pProtocol abc(version = 1,
onPeerDisconnected do (peer: Peer, reason: DisconnectionReason) {.gcsafe.}:
peer.networkState.count -= 1
if true:
raise newException(UnsupportedProtocol, "Fake abc exception")
raise newException(CatchableError, "Fake abc exception")
p2pProtocol xyz(version = 1,
shortName = "xyz",
@ -60,21 +37,13 @@ p2pProtocol xyz(version = 1,
onPeerDisconnected do (peer: Peer, reason: DisconnectionReason) {.gcsafe.}:
peer.networkState.count -= 1
if true:
raise newException(UnsupportedProtocol, "Fake xyz exception")
raise newException(CatchableError, "Fake xyz exception")
proc prepTestNode(): EthereumNode =
let keys1 = newKeyPair()
result = newEthereumNode(keys1, localAddress(nextPort), 1, nil,
addAllCapabilities = false)
nextPort.inc
result.addCapability abc
result.addCapability xyz
suite "Failing handlers":
suite "Testing protocol handlers":
asyncTest "Failing disconnect handler":
let bootENode = waitFor setupBootNode()
var node1 = prepTestNode()
var node2 = prepTestNode()
var node1 = setupTestNode(abc, xyz)
var node2 = setupTestNode(abc, xyz)
# node2 listening and node1 not, to avoid many incoming vs outgoing
var node1Connected = node1.connectToNetwork(@[bootENode], false, true)
var node2Connected = node2.connectToNetwork(@[bootENode], true, true)

View File

@ -8,52 +8,18 @@
# MIT license (LICENSE-MIT)
import
sequtils, options, unittest, tables, chronos, eth/[rlp, keys, p2p],
eth/p2p/rlpx_protocols/[whisper_protocol], eth/p2p/[discovery, enode]
const
useCompression = defined(useSnappy)
var nextPort = 30303
proc localAddress(port: int): Address =
let port = Port(port)
result = Address(udpPort: port, tcpPort: port, ip: parseIpAddress("127.0.0.1"))
proc startDiscoveryNode(privKey: PrivateKey, address: Address,
bootnodes: seq[ENode]): Future[DiscoveryProtocol] {.async.} =
result = newDiscoveryProtocol(privKey, address, bootnodes)
result.open()
await result.bootstrap()
proc setupBootNode(): Future[ENode] {.async.} =
let
bootNodeKey = newPrivateKey()
bootNodeAddr = localAddress(30301)
bootNode = await startDiscoveryNode(bootNodeKey, bootNodeAddr, @[])
result = initENode(bootNodeKey.getPublicKey, bootNodeAddr)
template asyncTest(name, body: untyped) =
test name:
proc scenario {.async.} = body
waitFor scenario()
sequtils, options, unittest, tables, chronos, eth/[keys, p2p],
eth/p2p/rlpx_protocols/[whisper_protocol],
./p2p_test_helper
proc resetMessageQueues(nodes: varargs[EthereumNode]) =
for node in nodes:
node.resetMessageQueue()
proc prepTestNode(): EthereumNode =
let keys1 = newKeyPair()
result = newEthereumNode(keys1, localAddress(nextPort), 1, nil,
addAllCapabilities = false,
useCompression = useCompression)
nextPort.inc
result.addCapability Whisper
let bootENode = waitFor setupBootNode()
var node1 = prepTestNode()
var node2 = prepTestNode()
var node1 = setupTestNode(Whisper)
var node2 = setupTestNode(Whisper)
# node2 listening and node1 not, to avoid many incoming vs outgoing
var node1Connected = node1.connectToNetwork(@[bootENode], false, true)
var node2Connected = node2.connectToNetwork(@[bootENode], true, true)
@ -352,7 +318,7 @@ suite "Whisper connections":
node1.unsubscribeFilter(filter) == true
test "Light node posting":
var ln1 = prepTestNode()
var ln1 = setupTestNode(Whisper)
ln1.setLightNode(true)
# not listening, so will only connect to others that are listening (node2)
@ -373,8 +339,8 @@ suite "Whisper connections":
ln1.protocolState(Whisper).queue.items.len == 0
test "Connect two light nodes":
var ln1 = prepTestNode()
var ln2 = prepTestNode()
var ln1 = setupTestNode(Whisper)
var ln2 = setupTestNode(Whisper)
ln1.setLightNode(true)
ln2.setLightNode(true)