better error handling

- changed the logging level of some messages that should not appear by
  default in a debug build
- most errors in persistWorkItem() are gracefully recovered from
- fixed the handling of out-of-order blocks
- dropped work items with fewer blocks than requested
- getBestBlockNumber(): log and allow exceptions from getBlockHeaders()
- obtainBlocksFromPeer(): moved the check that block numbers are in
  sequence here (from nimbus/p2p/chain.nim)
- sendMsg(): log exceptions and catch them in the callers (the shared
  log-and-continue pattern is sketched just below)
Ștefan Talpalaru 2018-12-14 13:54:43 +01:00
parent 8cad437112
commit be476feeec
6 changed files with 116 additions and 75 deletions
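
Most of the changes below share one pattern: wrap a risky async call,
log the exception through chronicles, and keep the task alive instead
of letting it crash. A minimal sketch of that pattern, assuming the
chronicles and asyncdispatch2 packages this tree already uses;
doSomethingRisky() is a hypothetical stand-in for calls like
getBestBlockNumber():

import asyncdispatch2, chronicles

proc doSomethingRisky() {.async.} =
  # hypothetical stand-in for a call that may raise, e.g. a network request
  raise newException(CatchableError, "simulated failure")

proc caller() {.async.} =
  try:
    await doSomethingRisky()
  except:
    # log and continue; the surrounding loop may still have useful work to do
    debug "Exception in doSomethingRisky()",
      exc = getCurrentException().name,
      err = getCurrentExceptionMsg()

waitFor caller()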

View File

@@ -74,7 +74,7 @@ proc processIncoming(server: StreamServer,
remote.close()
proc startListening*(node: EthereumNode) =
info "RLPx listener up", self = initENode(node.keys.pubKey, node.address)
trace "RLPx listener up", self = initENode(node.keys.pubKey, node.address)
let ta = initTAddress(node.address.ip, node.address.tcpPort)
if node.listeningServer == nil:
node.listeningServer = createStreamServer(ta, processIncoming,
@@ -107,12 +107,12 @@ proc connectToNetwork*(node: EthereumNode,
node.discovery.open()
await node.discovery.bootstrap()
else:
info "Disovery disabled"
info "Discovery disabled"
node.peerPool.start()
while node.peerPool.connectedNodes.len == 0:
debug "Waiting for more peers", peers = node.peerPool.connectedNodes.len
trace "Waiting for more peers", peers = node.peerPool.connectedNodes.len
await sleepAsync(500)
proc stopListening*(node: EthereumNode) =
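
The info/debug -> trace demotions in this file rely on chronicles
filtering at compile time: statements below the active level are
compiled out entirely. A small sketch; the -d:chronicles_log_level
switch is chronicles' compile-time setting, and the per-build defaults
are stated from memory, so treat them as an assumption:

import chronicles

# Build with e.g. `nim c -d:chronicles_log_level=TRACE app.nim` to see
# the demoted messages again; below the active level they cost nothing.
trace "Waiting for more peers", peers = 0   # hidden unless TRACE is enabled
debug "Connecting to node", remote = "enode://..."
info "Discovery disabled"                   # still visible by default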

View File

@@ -44,7 +44,7 @@ proc endIndex(b: WantedBlocks): BlockNumber =
proc availableWorkItem(ctx: SyncContext): int =
var maxPendingBlock = ctx.finalizedBlock
echo "queue len: ", ctx.workQueue.len
trace "queue len", length = ctx.workQueue.len
result = -1
for i in 0 .. ctx.workQueue.high:
case ctx.workQueue[i].state
@@ -72,17 +72,21 @@ proc availableWorkItem(ctx: SyncContext): int =
ctx.workQueue[result] = WantedBlocks(startIndex: nextRequestedBlock, numBlocks: numBlocks.uint, state: Initial)
proc persistWorkItem(ctx: SyncContext, wi: var WantedBlocks) =
ctx.chain.persistBlocks(wi.headers, wi.bodies)
wi.headers.setLen(0)
wi.bodies.setLen(0)
case ctx.chain.persistBlocks(wi.headers, wi.bodies)
of ValidationResult.OK:
ctx.finalizedBlock = wi.endIndex
wi.state = Persisted
of ValidationResult.Error:
wi.state = Initial
# successful or not, we're done with these blocks
wi.headers.setLen(0)
wi.bodies.setLen(0)
proc persistPendingWorkItems(ctx: SyncContext) =
var nextStartIndex = ctx.finalizedBlock + 1
var keepRunning = true
var hasOutOfOrderBlocks = false
debug "Looking for out of order blocks"
trace "Looking for out of order blocks"
while keepRunning:
keepRunning = false
hasOutOfOrderBlocks = false
@@ -90,7 +94,7 @@ proc persistPendingWorkItems(ctx: SyncContext) =
let start = ctx.workQueue[i].startIndex
if ctx.workQueue[i].state == Received:
if start == nextStartIndex:
debug "Persisting pending work item", start
trace "Persisting pending work item", start
ctx.persistWorkItem(ctx.workQueue[i])
nextStartIndex = ctx.finalizedBlock + 1
keepRunning = true
@@ -100,29 +104,32 @@ proc persistPendingWorkItems(ctx: SyncContext) =
ctx.hasOutOfOrderBlocks = hasOutOfOrderBlocks
proc returnWorkItem(ctx: SyncContext, workItem: int) =
proc returnWorkItem(ctx: SyncContext, workItem: int): ValidationResult =
let wi = addr ctx.workQueue[workItem]
let askedBlocks = wi.numBlocks.int
let receivedBlocks = wi.headers.len
let start = wi.startIndex
if askedBlocks == receivedBlocks:
debug "Work item complete", start,
askedBlocks,
receivedBlocks
else:
warn "Work item complete", start,
trace "Work item complete",
start,
askedBlocks,
receivedBlocks
if wi.startIndex != ctx.finalizedBlock + 1:
info "Blocks out of order", start, final = ctx.finalizedBlock
trace "Blocks out of order", start, final = ctx.finalizedBlock
ctx.hasOutOfOrderBlocks = true
else:
info "Persisting blocks", start
ctx.persistWorkItem(wi[])
if ctx.hasOutOfOrderBlocks:
ctx.persistPendingWorkItems()
else:
ctx.persistWorkItem(wi[])
else:
trace "Work item complete but we got fewer blocks than requested, so we're ditching the whole thing.",
start,
askedBlocks,
receivedBlocks
return ValidationResult.Error
proc newSyncContext(chain: AbstractChainDB, peerPool: PeerPool): SyncContext =
new result
@@ -151,18 +158,24 @@ proc getBestBlockNumber(p: Peer): Future[BlockNumber] {.async.} =
proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} =
# Update our best block number
try:
let bestBlockNumber = await peer.getBestBlockNumber()
if bestBlockNumber > syncCtx.endBlockNumber:
info "New sync end block number", number = bestBlockNumber
trace "New sync end block number", number = bestBlockNumber
syncCtx.endBlockNumber = bestBlockNumber
except:
debug "Exception in getBestBlockNumber()",
exc = getCurrentException().name,
err = getCurrentExceptionMsg()
# no need to exit here, because the context might still have blocks to fetch
# from this peer
while (let workItemIdx = syncCtx.availableWorkItem(); workItemIdx != -1):
template workItem: auto = syncCtx.workQueue[workItemIdx]
workItem.state = Requested
debug "Requesting block headers", start = workItem.startIndex, count = workItem.numBlocks, peer
trace "Requesting block headers", start = workItem.startIndex, count = workItem.numBlocks, peer
let request = BlocksRequest(
startBlock: HashOrNum(isHash: false,
number: workItem.startIndex),
startBlock: HashOrNum(isHash: false, number: workItem.startIndex),
maxResults: workItem.numBlocks,
skip: 0,
reverse: false)
@@ -175,7 +188,12 @@ proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} =
var bodies = newSeq[BlockBody]()
var hashes = newSeq[KeccakHash]()
var nextIndex = workItem.startIndex
for i in workItem.headers:
if i.blockNumber != nextIndex:
raise newException(Exception, "The block numbers are not in sequence. Not processing this workItem.")
else:
nextIndex = nextIndex + 1
hashes.add(blockHash(i))
if hashes.len == maxBodiesFetch:
let b = await peer.getBlockBodies(hashes)
@@ -192,15 +210,23 @@ proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} =
else:
warn "Bodies len != headers.len", bodies = bodies.len, headers = workItem.headers.len
except:
# the success case uses `continue`, so we can just fall back to the
# the success case sets `dataReceived`, so we can just fall back to the
# failure path below. If we signal time-outs with exceptions such
# failures will be easier to handle.
discard
debug "Exception in obtainBlocksFromPeer()",
exc = getCurrentException().name,
err = getCurrentExceptionMsg()
var giveUpOnPeer = false
if dataReceived:
workItem.state = Received
syncCtx.returnWorkItem workItemIdx
if syncCtx.returnWorkItem(workItemIdx) != ValidationResult.OK:
giveUpOnPeer = true
else:
giveUpOnPeer = true
if giveUpOnPeer:
workItem.state = Initial
try:
await peer.disconnect(SubprotocolReason)
@@ -209,10 +235,10 @@ proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} =
syncCtx.handleLostPeer()
break
debug "Fininshed otaining blocks", peer
trace "Finished obtaining blocks", peer
proc peersAgreeOnChain(a, b: Peer): Future[bool] {.async.} =
# Returns true if one of the peers acknowledges existense of the best block
# Returns true if one of the peers acknowledges existence of the best block
# of another peer.
var
a = a
@@ -240,7 +266,7 @@ proc randomTrustedPeer(ctx: SyncContext): Peer =
inc i
proc startSyncWithPeer(ctx: SyncContext, peer: Peer) {.async.} =
debug "start sync ", peer, trustedPeers = ctx.trustedPeers.len
trace "start sync", peer, trustedPeers = ctx.trustedPeers.len
if ctx.trustedPeers.len >= minPeersToStartSync:
# We have enough trusted peers. Validate new peer against trusted
if await peersAgreeOnChain(peer, ctx.randomTrustedPeer()):
@@ -249,7 +275,7 @@ proc startSyncWithPeer(ctx: SyncContext, peer: Peer) {.async.} =
elif ctx.trustedPeers.len == 0:
# Assume the peer is trusted, but don't start sync until we reevaluate
# it with more peers
debug "Assume trusted peer", peer
trace "Assume trusted peer", peer
ctx.trustedPeers.incl(peer)
else:
# At this point we have some "trusted" candidates, but they are not
@@ -271,13 +297,13 @@ proc startSyncWithPeer(ctx: SyncContext, peer: Peer) {.async.} =
let disagreeScore = ctx.trustedPeers.len - agreeScore
if agreeScore == ctx.trustedPeers.len:
ctx.trustedPeers.incl(peer) # The best possible outsome
ctx.trustedPeers.incl(peer) # The best possible outcome
elif disagreeScore == 1:
info "Peer is no more trusted for sync", peer
trace "Peer is no longer trusted for sync", peer
ctx.trustedPeers.excl(disagreedPeer)
ctx.trustedPeers.incl(peer)
else:
info "Peer not trusted for sync", peer
trace "Peer not trusted for sync", peer
if ctx.trustedPeers.len == minPeersToStartSync:
for p in ctx.trustedPeers:
@@ -285,15 +311,20 @@ proc startSyncWithPeer(ctx: SyncContext, peer: Peer) {.async.} =
proc onPeerConnected(ctx: SyncContext, peer: Peer) =
debug "New candidate for sync", peer
discard
trace "New candidate for sync", peer
try:
let f = ctx.startSyncWithPeer(peer)
f.callback = proc(data: pointer) {.gcsafe.} =
if f.failed:
error "startSyncWithPeer failed", msg = f.readError.msg, peer
except:
debug "Exception in startSyncWithPeer()",
exc = getCurrentException().name,
err = getCurrentExceptionMsg()
proc onPeerDisconnected(ctx: SyncContext, p: Peer) =
debug "peer disconnected ", peer = p
trace "peer disconnected ", peer = p
ctx.trustedPeers.excl(p)
proc startSync(ctx: SyncContext) =
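
The sync changes above reduce to one rule: persist a received range
only when it starts right after the finalized tip, park everything
else, and retry the parked ranges whenever the tip advances (that is
what persistPendingWorkItems() does). A self-contained toy version of
that loop; ToyRange and the integer "persistence" are illustrative,
not the real nimbus types:

type
  ToyRange = object
    start, len: int
    received: bool

proc persistPending(finalized: var int, queue: var seq[ToyRange]) =
  var progress = true
  while progress:
    progress = false
    for r in queue.mitems:
      if r.received and r.start == finalized + 1:
        finalized += r.len     # stand-in for persistBlocks()
        r.received = false     # stand-in for state = Persisted
        progress = true        # an out-of-order range may now be next

var tip = 0
var q = @[ToyRange(start: 4, len: 3, received: true),  # out of order: parked
          ToyRange(start: 1, len: 3, received: true)]  # in sequence: persisted
persistPending(tip, q)
assert tip == 7   # both ranges persisted once the gap closed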

View File

@@ -10,7 +10,7 @@
from strutils import nil
import times, algorithm, logging
import asyncdispatch2, eth_keys, ranges, stint, nimcrypto, rlp
import asyncdispatch2, eth_keys, ranges, stint, nimcrypto, rlp, chronicles
import kademlia, enode
export Node
@@ -100,22 +100,22 @@ proc expiration(): uint32 =
proc send(d: DiscoveryProtocol, n: Node, data: seq[byte]) =
let ta = initTAddress(n.node.address.ip, n.node.address.udpPort)
let f = d.transp.sendTo(ta, data)
f.callback = proc(data: pointer) =
f.callback = proc(data: pointer) {.gcsafe.} =
if f.failed:
error "Discovery send failed: ", f.readError.msg
debug "Discovery send failed", msg = f.readError.msg
proc sendPing*(d: DiscoveryProtocol, n: Node): seq[byte] =
let payload = rlp.encode((PROTO_VERSION, d.address, n.node.address,
expiration())).toRange
let msg = pack(cmdPing, payload, d.privKey)
result = msg[0 ..< MAC_SIZE]
debug ">>> ping ", n
trace ">>> ping ", n
d.send(n, msg)
proc sendPong*(d: DiscoveryProtocol, n: Node, token: MDigest[256]) =
let payload = rlp.encode((n.node.address, token, expiration())).toRange
let msg = pack(cmdPong, payload, d.privKey)
debug ">>> pong ", n
trace ">>> pong ", n
d.send(n, msg)
proc sendFindNode*(d: DiscoveryProtocol, n: Node, targetNodeId: NodeId) =
@@ -123,7 +123,7 @@ proc sendFindNode*(d: DiscoveryProtocol, n: Node, targetNodeId: NodeId) =
data[32 .. ^1] = targetNodeId.toByteArrayBE()
let payload = rlp.encode((data, expiration())).toRange
let msg = pack(cmdFindNode, payload, d.privKey)
debug ">>> find_node to ", n#, ": ", msg.toHex()
trace ">>> find_node to ", n#, ": ", msg.toHex()
d.send(n, msg)
proc sendNeighbours*(d: DiscoveryProtocol, node: Node, neighbours: seq[Node]) =
@@ -136,7 +136,7 @@ proc sendNeighbours*(d: DiscoveryProtocol, node: Node, neighbours: seq[Node]) =
block:
let payload = rlp.encode((nodes, expiration())).toRange
let msg = pack(cmdNeighbours, payload, d.privkey)
debug ">>> neighbours to ", node, ": ", nodes
trace "Neighbours to", node, nodes
d.send(node, msg)
nodes.setLen(0)
@@ -202,7 +202,7 @@ proc recvNeighbours(d: DiscoveryProtocol, node: Node,
proc recvFindNode(d: DiscoveryProtocol, node: Node, payload: Bytes) {.inline.} =
let rlp = rlpFromBytes(payload.toRange)
debug "<<< find_node from ", node
trace "<<< find_node from ", node
let rng = rlp.listElem(0).toBytes
let nodeId = readUIntBE[256](rng[32 .. ^1].toOpenArray())
d.kademlia.recvFindNode(node, nodeId)
@@ -232,9 +232,9 @@ proc receive(d: DiscoveryProtocol, a: Address, msg: Bytes) =
of cmdFindNode:
d.recvFindNode(node, payload)
else:
echo "Unknown command: ", cmdId
debug "Unknown command", cmdId
else:
debug "Received msg ", cmdId, " from ", a, " already expired"
trace "Received msg already expired", cmdId, a
else:
error "Wrong public key from ", a
else:
@@ -252,7 +252,7 @@ proc processClient(transp: DatagramTransport,
let a = Address(ip: raddr.address, udpPort: raddr.port, tcpPort: raddr.port)
proto.receive(a, buf)
except:
error "receive failed: ", getCurrentExceptionMsg()
debug "receive failed", exception = getCurrentExceptionMsg()
proc open*(d: DiscoveryProtocol) =
let ta = initTAddress(d.address.ip, d.address.udpPort)
@@ -265,7 +265,7 @@ proc run(d: DiscoveryProtocol) {.async.} =
while true:
discard await d.lookupRandom()
await sleepAsync(3000)
echo "Discovered nodes: ", d.kademlia.nodesDiscovered
trace "Discovered nodes", nodes = d.kademlia.nodesDiscovered
proc bootstrap*(d: DiscoveryProtocol) {.async.} =
await d.kademlia.bootstrap(d.bootstrapNodes)
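
The send() change above (a {.gcsafe.} callback that logs instead of
raising) is a pattern this commit applies to fire-and-forget futures:
a lost discovery datagram is not fatal, so the failure is noted at
debug level and the protocol moves on. A sketch, assuming
asyncdispatch2's DatagramTransport and TransportAddress types:

import asyncdispatch2, chronicles

proc sendWithLoggedFailure(transp: DatagramTransport,
                           ta: TransportAddress, data: seq[byte]) =
  let f = transp.sendTo(ta, data)
  f.callback = proc(arg: pointer) {.gcsafe.} =
    if f.failed:
      # not fatal; note the failure and move on
      debug "Discovery send failed", msg = f.readError.msg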

View File

@@ -47,7 +47,7 @@ proc delObserver*(p: PeerPool, observerId: ref) {.inline.} =
p.delObserver(cast[int](observerId))
proc stopAllPeers(p: PeerPool) {.async.} =
info "Stopping all peers ..."
debug "Stopping all peers ..."
# TODO: ...
# await asyncio.gather(
# *[peer.stop() for peer in self.connected_nodes.values()])
@@ -60,14 +60,14 @@ proc connect(p: PeerPool, remote: Node): Future[Peer] {.async.} =
## Connect to the given remote and return a Peer instance when successful.
## Returns nil if the remote is unreachable, times out or is useless.
if remote in p.connectedNodes:
debug "skipping_connection_to_already_connected_peer", remote
trace "skipping_connection_to_already_connected_peer", remote
return nil
if remote in p.connectingNodes:
# debug "skipping connection"
return nil
debug "Connecting to node", remote
trace "Connecting to node", remote
p.connectingNodes.incl(remote)
result = await p.network.rlpxConnect(remote)
p.connectingNodes.excl(remote)
@@ -114,10 +114,10 @@ proc addPeer*(pool: PeerPool, peer: Peer): bool =
proc connectToNode*(p: PeerPool, n: Node) {.async.} =
let peer = await p.connect(n)
if not peer.isNil:
info "Connection established", peer
trace "Connection established", peer
if not p.addPeer(peer):
# In case an incoming connection was added in the meanwhile
debug "Disconnecting peer (outgoing)", reason = AlreadyConnected
trace "Disconnecting peer (outgoing)", reason = AlreadyConnected
await peer.disconnect(AlreadyConnected)
proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} =
@@ -161,7 +161,7 @@ proc maybeConnectToMorePeers(p: PeerPool) {.async.} =
await p.connectToNode(p.getRandomBootnode())
proc run(p: PeerPool) {.async.} =
info "Running PeerPool..."
trace "Running PeerPool..."
p.running = true
while p.running:
var dropConnections = false

View File

@@ -203,8 +203,13 @@ proc requestResolver[MsgType](msg: pointer, future: FutureBase) =
else:
doAssert false, "trying to resolve a timed out request with a value"
else:
try:
if not f.read.isSome:
doAssert false, "a request timed out twice"
except:
debug "Exception in requestResolver()",
exc = getCurrentException().name,
err = getCurrentExceptionMsg()
proc registerMsg(protocol: var ProtocolInfo,
id: int, name: string,
@@ -287,6 +292,9 @@ proc sendMsg*(peer: Peer, data: Bytes) {.async.} =
discard await peer.transport.write(cipherText)
except:
await peer.disconnect(TcpError)
# this is usually a "(32) Broken pipe":
# FIXME: this exception should be caught somewhere in addMsgHandler() and
# sending should be retried a few times
raise
proc send*[Msg](peer: Peer, msg: Msg): Future[void] =
@@ -352,7 +360,7 @@ proc resolveResponseFuture(peer: Peer, msgId: int, msg: pointer, reqId: int) =
let oldestReq = outstandingReqs.popFirst
resolve oldestReq.future
else:
debug "late or duplicate reply for a RLPx request"
trace "late or duplicate reply for a RLPx request"
else:
# TODO: This is not completely sound because we are still using a global
# `reqId` sequence (the problem is that we might get a response ID that
@@ -458,7 +466,7 @@ proc checkedRlpRead(peer: Peer, r: var Rlp, MsgType: type): auto {.inline.} =
return r.read(MsgType)
except:
# echo "Failed rlp.read:", tmp.inspect
error "Failed rlp.read",
debug "Failed rlp.read",
peer = peer,
msg = MsgType.name,
exception = getCurrentExceptionMsg()
@@ -514,7 +522,7 @@ proc dispatchMessages*(peer: Peer) {.async.} =
try:
await peer.invokeThunk(msgId, msgData)
except RlpError:
error "endind dispatchMessages loop", peer, err = getCurrentExceptionMsg()
debug "ending dispatchMessages loop", peer, err = getCurrentExceptionMsg()
await peer.disconnect(BreachOfProtocol)
return
@@ -1334,15 +1342,17 @@ proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} =
if e.reason != TooManyPeers:
debug "Unexpected disconnect during rlpxConnect", reason = e.reason
except TransportIncompleteError:
debug "Connection dropped in rlpxConnect", remote
trace "Connection dropped in rlpxConnect", remote
except UselessPeerError:
debug "Useless peer ", peer = remote
trace "Useless peer ", peer = remote
except RlpTypeMismatch:
# Some peers report capabilities with names longer than 3 chars. We ignore
# those for now. Maybe we should allow this though.
debug "Rlp error in rlpxConnect"
except TransportOsError:
trace "TransportOsError", err = getCurrentExceptionMsg()
except:
info "Exception in rlpxConnect", remote,
debug "Exception in rlpxConnect", remote,
exc = getCurrentException().name,
err = getCurrentExceptionMsg()
@@ -1413,7 +1423,7 @@ proc rlpxAccept*(node: EthereumNode,
raise e
except:
let e = getCurrentException()
error "Exception in rlpxAccept",
debug "Exception in rlpxAccept",
err = getCurrentExceptionMsg(),
stackTrace = getCurrentException().getStackTrace()
transport.close()
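
The FIXME in sendMsg() above suggests catching the re-raised exception
higher up and retrying the send a few times. This commit does not
implement that; the following is only a hypothetical shape for such a
wrapper (sendMsgWithRetry, maxSendRetries and the linear backoff are
invented; Peer, Bytes and sendMsg are the rlpx module's own, assumed
to be in scope):

import asyncdispatch2, chronicles

const maxSendRetries = 3  # invented for illustration

proc sendMsgWithRetry(peer: Peer, data: Bytes) {.async.} =
  for attempt in 1 .. maxSendRetries:
    try:
      await peer.sendMsg(data)
      return
    except:
      debug "sendMsg failed, will retry",
        attempt, err = getCurrentExceptionMsg()
      await sleepAsync(100 * attempt)   # simple linear backoff
  raise newException(CatchableError, "sendMsg: all retries failed")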

View File

@@ -54,7 +54,7 @@ p2pProtocol eth(version = protocolVersion,
let m = await peer.nextMsg(eth.status)
if m.networkId == network.networkId and m.genesisHash == chain.genesisHash:
debug "suitable peer", peer
trace "suitable peer", peer
else:
raise newException(UselessPeerError, "Eth handshake params mismatch")
peer.state.initialized = true