From be476feeec9440b514cab1ef4cbfdd20204d4c47 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C8=98tefan=20Talpalaru?=
Date: Fri, 14 Dec 2018 13:54:43 +0100
Subject: [PATCH] better error handling

- changed the logging level of some messages that should not appear by
  default in a debug build
- most errors in persistWorkItem() are gracefully recovered from (see the
  illustrative sketch after the diff)
- fixed the handling of out-of-order blocks
- dropped work items with fewer blocks than requested
- getBestBlockNumber(): log and allow exceptions from getBlockHeaders()
- obtainBlocksFromPeer(): moved the check that block numbers are in
  sequence here (from nimbus/p2p/chain.nim)
- sendMsg(): log and catch exceptions in callers
---
 eth_p2p.nim                             |   6 +-
 eth_p2p/blockchain_sync.nim             | 119 +++++++++++++++---------
 eth_p2p/discovery.nim                   |  24 ++---
 eth_p2p/peer_pool.nim                   |  12 +--
 eth_p2p/rlpx.nim                        |  28 ++++--
 eth_p2p/rlpx_protocols/eth_protocol.nim |   2 +-
 6 files changed, 116 insertions(+), 75 deletions(-)

diff --git a/eth_p2p.nim b/eth_p2p.nim
index 919bb0d..621e7f3 100644
--- a/eth_p2p.nim
+++ b/eth_p2p.nim
@@ -74,7 +74,7 @@ proc processIncoming(server: StreamServer,
     remote.close()
 
 proc startListening*(node: EthereumNode) =
-  info "RLPx listener up", self = initENode(node.keys.pubKey, node.address)
+  trace "RLPx listener up", self = initENode(node.keys.pubKey, node.address)
   let ta = initTAddress(node.address.ip, node.address.tcpPort)
   if node.listeningServer == nil:
     node.listeningServer = createStreamServer(ta, processIncoming,
@@ -107,12 +107,12 @@ proc connectToNetwork*(node: EthereumNode,
     node.discovery.open()
     await node.discovery.bootstrap()
   else:
-    info "Disovery disabled"
+    info "Discovery disabled"
 
   node.peerPool.start()
 
   while node.peerPool.connectedNodes.len == 0:
-    debug "Waiting for more peers", peers = node.peerPool.connectedNodes.len
+    trace "Waiting for more peers", peers = node.peerPool.connectedNodes.len
     await sleepAsync(500)
 
 proc stopListening*(node: EthereumNode) =
diff --git a/eth_p2p/blockchain_sync.nim b/eth_p2p/blockchain_sync.nim
index 4d0f30a..3688b15 100644
--- a/eth_p2p/blockchain_sync.nim
+++ b/eth_p2p/blockchain_sync.nim
@@ -44,7 +44,7 @@ proc endIndex(b: WantedBlocks): BlockNumber =
 
 proc availableWorkItem(ctx: SyncContext): int =
   var maxPendingBlock = ctx.finalizedBlock
-  echo "queue len: ", ctx.workQueue.len
+  trace "queue len", length = ctx.workQueue.len
   result = -1
   for i in 0 .. ctx.workQueue.high:
     case ctx.workQueue[i].state
@@ -72,17 +72,21 @@ proc availableWorkItem(ctx: SyncContext): int =
   ctx.workQueue[result] = WantedBlocks(startIndex: nextRequestedBlock, numBlocks: numBlocks.uint, state: Initial)
 
 proc persistWorkItem(ctx: SyncContext, wi: var WantedBlocks) =
-  ctx.chain.persistBlocks(wi.headers, wi.bodies)
+  case ctx.chain.persistBlocks(wi.headers, wi.bodies)
+  of ValidationResult.OK:
+    ctx.finalizedBlock = wi.endIndex
+    wi.state = Persisted
+  of ValidationResult.Error:
+    wi.state = Initial
+  # successful or not, we're done with these blocks
   wi.headers.setLen(0)
   wi.bodies.setLen(0)
-  ctx.finalizedBlock = wi.endIndex
-  wi.state = Persisted
 
 proc persistPendingWorkItems(ctx: SyncContext) =
   var nextStartIndex = ctx.finalizedBlock + 1
   var keepRunning = true
   var hasOutOfOrderBlocks = false
-  debug "Looking for out of order blocks"
+  trace "Looking for out of order blocks"
   while keepRunning:
     keepRunning = false
     hasOutOfOrderBlocks = false
@@ -90,7 +94,7 @@ proc persistPendingWorkItems(ctx: SyncContext) =
       let start = ctx.workQueue[i].startIndex
       if ctx.workQueue[i].state == Received:
        if start == nextStartIndex:
-          debug "Persisting pending work item", start
+          trace "Persisting pending work item", start
           ctx.persistWorkItem(ctx.workQueue[i])
           nextStartIndex = ctx.finalizedBlock + 1
           keepRunning = true
@@ -100,29 +104,32 @@ proc persistPendingWorkItems(ctx: SyncContext) =
   ctx.hasOutOfOrderBlocks = hasOutOfOrderBlocks
 
-proc returnWorkItem(ctx: SyncContext, workItem: int) =
+proc returnWorkItem(ctx: SyncContext, workItem: int): ValidationResult =
   let wi = addr ctx.workQueue[workItem]
   let askedBlocks = wi.numBlocks.int
   let receivedBlocks = wi.headers.len
   let start = wi.startIndex
 
   if askedBlocks == receivedBlocks:
-    debug "Work item complete", start,
-          askedBlocks,
-          receivedBlocks
-  else:
-    warn "Work item complete", start,
-         askedBlocks,
-         receivedBlocks
+    trace "Work item complete",
+      start,
+      askedBlocks,
+      receivedBlocks
+
+    if wi.startIndex != ctx.finalizedBlock + 1:
+      trace "Blocks out of order", start, final = ctx.finalizedBlock
+      ctx.hasOutOfOrderBlocks = true
 
-  if wi.startIndex != ctx.finalizedBlock + 1:
-    info "Blocks out of order", start, final = ctx.finalizedBlock
-    ctx.hasOutOfOrderBlocks = true
-  else:
-    info "Persisting blocks", start
-    ctx.persistWorkItem(wi[])
-
-  if ctx.hasOutOfOrderBlocks:
-    ctx.persistPendingWorkItems()
+    if ctx.hasOutOfOrderBlocks:
+      ctx.persistPendingWorkItems()
+    else:
+      ctx.persistWorkItem(wi[])
+  else:
+    trace "Fewer blocks received than requested; discarding the work item",
+      start,
+      askedBlocks,
+      receivedBlocks
+    return ValidationResult.Error
 
 proc newSyncContext(chain: AbstractChainDB, peerPool: PeerPool): SyncContext =
   new result
@@ -151,18 +158,24 @@ proc getBestBlockNumber(p: Peer): Future[BlockNumber] {.async.} =
 
 proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} =
   # Update our best block number
-  let bestBlockNumber = await peer.getBestBlockNumber()
-  if bestBlockNumber > syncCtx.endBlockNumber:
-    info "New sync end block number", number = bestBlockNumber
-    syncCtx.endBlockNumber = bestBlockNumber
+  try:
+    let bestBlockNumber = await peer.getBestBlockNumber()
+    if bestBlockNumber > syncCtx.endBlockNumber:
+      trace "New sync end block number", number = bestBlockNumber
+      syncCtx.endBlockNumber = bestBlockNumber
+  except:
+    debug "Exception in getBestBlockNumber()",
+      exc = getCurrentException().name,
+      err = getCurrentExceptionMsg()
+    # no need to exit here, because the context might still have blocks to
+    # fetch from this peer
 
   while (let workItemIdx = syncCtx.availableWorkItem(); workItemIdx != -1):
     template workItem: auto = syncCtx.workQueue[workItemIdx]
     workItem.state = Requested
-    debug "Requesting block headers", start = workItem.startIndex, count = workItem.numBlocks, peer
+    trace "Requesting block headers", start = workItem.startIndex, count = workItem.numBlocks, peer
 
     let request = BlocksRequest(
-      startBlock: HashOrNum(isHash: false,
-                            number: workItem.startIndex),
+      startBlock: HashOrNum(isHash: false, number: workItem.startIndex),
       maxResults: workItem.numBlocks,
       skip: 0, reverse: false)
@@ -175,7 +188,12 @@ proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} =
 
       var bodies = newSeq[BlockBody]()
       var hashes = newSeq[KeccakHash]()
+      var nextIndex = workItem.startIndex
       for i in workItem.headers:
+        if i.blockNumber != nextIndex:
+          raise newException(Exception, "The block numbers are not in sequence. Not processing this workItem.")
+        else:
+          nextIndex = nextIndex + 1
         hashes.add(blockHash(i))
         if hashes.len == maxBodiesFetch:
           let b = await peer.getBlockBodies(hashes)
@@ -192,15 +210,23 @@ proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} =
       else:
         warn "Bodies len != headers.len", bodies = bodies.len, headers = workItem.headers.len
     except:
-      # the success case uses `continue`, so we can just fall back to the
+      # the success case sets `dataReceived`, so we can just fall back to the
       # failure path below. If we signal time-outs with exceptions such
       # failures will be easier to handle.
-      discard
+      debug "Exception in obtainBlocksFromPeer()",
+        exc = getCurrentException().name,
+        err = getCurrentExceptionMsg()
+
+    var giveUpOnPeer = false
 
     if dataReceived:
       workItem.state = Received
-      syncCtx.returnWorkItem workItemIdx
+      if syncCtx.returnWorkItem(workItemIdx) != ValidationResult.OK:
+        giveUpOnPeer = true
     else:
+      giveUpOnPeer = true
+
+    if giveUpOnPeer:
       workItem.state = Initial
       try:
         await peer.disconnect(SubprotocolReason)
@@ -209,10 +235,10 @@ proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} =
       syncCtx.handleLostPeer()
       break
 
-  debug "Fininshed otaining blocks", peer
+  trace "Finished obtaining blocks", peer
 
 proc peersAgreeOnChain(a, b: Peer): Future[bool] {.async.} =
-  # Returns true if one of the peers acknowledges existense of the best block
+  # Returns true if one of the peers acknowledges existence of the best block
   # of another peer.
   var a = a
@@ -240,7 +266,7 @@ proc randomTrustedPeer(ctx: SyncContext): Peer =
     inc i
 
 proc startSyncWithPeer(ctx: SyncContext, peer: Peer) {.async.} =
-  debug "start sync ", peer, trustedPeers = ctx.trustedPeers.len
+  trace "start sync", peer, trustedPeers = ctx.trustedPeers.len
   if ctx.trustedPeers.len >= minPeersToStartSync:
     # We have enough trusted peers. Validate new peer against trusted
     if await peersAgreeOnChain(peer, ctx.randomTrustedPeer()):
@@ -249,7 +275,7 @@ proc startSyncWithPeer(ctx: SyncContext, peer: Peer) {.async.} =
   elif ctx.trustedPeers.len == 0:
     # Assume the peer is trusted, but don't start sync until we reevaluate
     # it with more peers
-    debug "Assume trusted peer", peer
+    trace "Assume trusted peer", peer
     ctx.trustedPeers.incl(peer)
   else:
     # At this point we have some "trusted" candidates, but they are not
@@ -271,13 +297,13 @@ proc startSyncWithPeer(ctx: SyncContext, peer: Peer) {.async.} =
     let disagreeScore = ctx.trustedPeers.len - agreeScore
 
     if agreeScore == ctx.trustedPeers.len:
-      ctx.trustedPeers.incl(peer) # The best possible outsome
+      ctx.trustedPeers.incl(peer) # The best possible outcome
     elif disagreeScore == 1:
-      info "Peer is no more trusted for sync", peer
+      trace "Peer is no longer trusted for sync", peer
       ctx.trustedPeers.excl(disagreedPeer)
       ctx.trustedPeers.incl(peer)
     else:
-      info "Peer not trusted for sync", peer
+      trace "Peer not trusted for sync", peer
 
     if ctx.trustedPeers.len == minPeersToStartSync:
       for p in ctx.trustedPeers:
@@ -285,15 +311,20 @@ proc startSyncWithPeer(ctx: SyncContext, peer: Peer) {.async.} =
 
 proc onPeerConnected(ctx: SyncContext, peer: Peer) =
-  debug "New candidate for sync", peer
-  discard
-  let f = ctx.startSyncWithPeer(peer)
-  f.callback = proc(data: pointer) {.gcsafe.} =
-    if f.failed:
-      error "startSyncWithPeer failed", msg = f.readError.msg, peer
+  trace "New candidate for sync", peer
+  try:
+    let f = ctx.startSyncWithPeer(peer)
+    f.callback = proc(data: pointer) {.gcsafe.} =
+      if f.failed:
+        error "startSyncWithPeer failed", msg = f.readError.msg, peer
+  except:
+    debug "Exception in startSyncWithPeer()",
+      exc = getCurrentException().name,
+      err = getCurrentExceptionMsg()
+
 
 proc onPeerDisconnected(ctx: SyncContext, p: Peer) =
-  debug "peer disconnected ", peer = p
+  trace "peer disconnected", peer = p
   ctx.trustedPeers.excl(p)
 
 proc startSync(ctx: SyncContext) =
diff --git a/eth_p2p/discovery.nim b/eth_p2p/discovery.nim
index 4d0d017..210461f 100644
--- a/eth_p2p/discovery.nim
+++ b/eth_p2p/discovery.nim
@@ -10,7 +10,7 @@
 
 from strutils import nil
 import times, algorithm, logging
-import asyncdispatch2, eth_keys, ranges, stint, nimcrypto, rlp
+import asyncdispatch2, eth_keys, ranges, stint, nimcrypto, rlp, chronicles
 import kademlia, enode
 
 export Node
@@ -100,22 +100,22 @@ proc expiration(): uint32 =
 proc send(d: DiscoveryProtocol, n: Node, data: seq[byte]) =
   let ta = initTAddress(n.node.address.ip, n.node.address.udpPort)
   let f = d.transp.sendTo(ta, data)
-  f.callback = proc(data: pointer) =
+  f.callback = proc(data: pointer) {.gcsafe.} =
     if f.failed:
-      error "Discovery send failed: ", f.readError.msg
+      debug "Discovery send failed", msg = f.readError.msg
 
 proc sendPing*(d: DiscoveryProtocol, n: Node): seq[byte] =
   let payload = rlp.encode((PROTO_VERSION, d.address, n.node.address,
                             expiration())).toRange
   let msg = pack(cmdPing, payload, d.privKey)
   result = msg[0 ..< MAC_SIZE]
-  debug ">>> ping ", n
+  trace ">>> ping ", n
   d.send(n, msg)
 
 proc sendPong*(d: DiscoveryProtocol, n: Node, token: MDigest[256]) =
   let payload = rlp.encode((n.node.address, token, expiration())).toRange
   let msg = pack(cmdPong, payload, d.privKey)
-  debug ">>> pong ", n
+  trace ">>> pong ", n
   d.send(n, msg)
 
 proc sendFindNode*(d: DiscoveryProtocol, n: Node, targetNodeId: NodeId) =
@@ -123,7 +123,7 @@ proc sendFindNode*(d: DiscoveryProtocol, n: Node, targetNodeId: NodeId) =
   data[32 .. ^1] = targetNodeId.toByteArrayBE()
   let payload = rlp.encode((data, expiration())).toRange
   let msg = pack(cmdFindNode, payload, d.privKey)
-  debug ">>> find_node to ", n#, ": ", msg.toHex()
+  trace ">>> find_node to ", n#, ": ", msg.toHex()
   d.send(n, msg)
 
 proc sendNeighbours*(d: DiscoveryProtocol, node: Node, neighbours: seq[Node]) =
@@ -136,7 +136,7 @@ proc sendNeighbours*(d: DiscoveryProtocol, node: Node, neighbours: seq[Node]) =
 
   block:
     let payload = rlp.encode((nodes, expiration())).toRange
     let msg = pack(cmdNeighbours, payload, d.privkey)
-    debug ">>> neighbours to ", node, ": ", nodes
+    trace "Neighbours to", node, nodes
     d.send(node, msg)
     nodes.setLen(0)
@@ -202,7 +202,7 @@ proc recvNeighbours(d: DiscoveryProtocol, node: Node,
 
 proc recvFindNode(d: DiscoveryProtocol, node: Node, payload: Bytes) {.inline.} =
   let rlp = rlpFromBytes(payload.toRange)
-  debug "<<< find_node from ", node
+  trace "<<< find_node from ", node
   let rng = rlp.listElem(0).toBytes
   let nodeId = readUIntBE[256](rng[32 .. ^1].toOpenArray())
   d.kademlia.recvFindNode(node, nodeId)
@@ -232,9 +232,9 @@ proc receive(d: DiscoveryProtocol, a: Address, msg: Bytes) =
         of cmdFindNode:
           d.recvFindNode(node, payload)
         else:
-          echo "Unknown command: ", cmdId
+          debug "Unknown command", cmdId
       else:
-        debug "Received msg ", cmdId, " from ", a, " already expired"
+        trace "Received msg already expired", cmdId, a
     else:
       error "Wrong public key from ", a
   else:
@@ -252,7 +252,7 @@ proc processClient(transp: DatagramTransport,
     let a = Address(ip: raddr.address, udpPort: raddr.port, tcpPort: raddr.port)
     proto.receive(a, buf)
   except:
-    error "receive failed: ", getCurrentExceptionMsg()
+    debug "receive failed", exception = getCurrentExceptionMsg()
 
 proc open*(d: DiscoveryProtocol) =
   let ta = initTAddress(d.address.ip, d.address.udpPort)
@@ -265,7 +265,7 @@ proc run(d: DiscoveryProtocol) {.async.} =
   while true:
     discard await d.lookupRandom()
     await sleepAsync(3000)
-    echo "Discovered nodes: ", d.kademlia.nodesDiscovered
+    trace "Discovered nodes", nodes = d.kademlia.nodesDiscovered
 
 proc bootstrap*(d: DiscoveryProtocol) {.async.} =
   await d.kademlia.bootstrap(d.bootstrapNodes)
diff --git a/eth_p2p/peer_pool.nim b/eth_p2p/peer_pool.nim
index 3fec987..d597b19 100644
--- a/eth_p2p/peer_pool.nim
+++ b/eth_p2p/peer_pool.nim
@@ -47,7 +47,7 @@ proc delObserver*(p: PeerPool, observerId: ref) {.inline.} =
   p.delObserver(cast[int](observerId))
 
 proc stopAllPeers(p: PeerPool) {.async.} =
-  info "Stopping all peers ..."
+  debug "Stopping all peers ..."
   # TODO: ...
   # await asyncio.gather(
   #   *[peer.stop() for peer in self.connected_nodes.values()])
@@ -60,14 +60,14 @@ proc connect(p: PeerPool, remote: Node): Future[Peer] {.async.} =
   ## Connect to the given remote and return a Peer instance when successful.
   ## Returns nil if the remote is unreachable, times out or is useless.
   if remote in p.connectedNodes:
-    debug "skipping_connection_to_already_connected_peer", remote
+    trace "skipping_connection_to_already_connected_peer", remote
     return nil
 
   if remote in p.connectingNodes:
     # debug "skipping connection"
     return nil
 
-  debug "Connecting to node", remote
+  trace "Connecting to node", remote
   p.connectingNodes.incl(remote)
   result = await p.network.rlpxConnect(remote)
   p.connectingNodes.excl(remote)
@@ -114,10 +114,10 @@ proc addPeer*(pool: PeerPool, peer: Peer): bool =
 proc connectToNode*(p: PeerPool, n: Node) {.async.} =
   let peer = await p.connect(n)
   if not peer.isNil:
-    info "Connection established", peer
+    trace "Connection established", peer
     if not p.addPeer(peer):
       # In case an incoming connection was added in the meanwhile
-      debug "Disconnecting peer (outgoing)", reason = AlreadyConnected
+      trace "Disconnecting peer (outgoing)", reason = AlreadyConnected
       await peer.disconnect(AlreadyConnected)
 
 proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} =
@@ -161,7 +161,7 @@ proc maybeConnectToMorePeers(p: PeerPool) {.async.} =
     await p.connectToNode(p.getRandomBootnode())
 
 proc run(p: PeerPool) {.async.} =
-  info "Running PeerPool..."
+  trace "Running PeerPool..."
   p.running = true
   while p.running:
     var dropConnections = false
diff --git a/eth_p2p/rlpx.nim b/eth_p2p/rlpx.nim
index 539c380..6d38c2f 100644
--- a/eth_p2p/rlpx.nim
+++ b/eth_p2p/rlpx.nim
@@ -203,8 +203,13 @@ proc requestResolver[MsgType](msg: pointer, future: FutureBase) =
       else:
         doAssert false, "trying to resolve a timed out request with a value"
     else:
-      if not f.read.isSome:
-        doAssert false, "a request timed out twice"
+      try:
+        if not f.read.isSome:
+          doAssert false, "a request timed out twice"
+      except:
+        debug "Exception in requestResolver()",
+          exc = getCurrentException().name,
+          err = getCurrentExceptionMsg()
 
 proc registerMsg(protocol: var ProtocolInfo,
                  id: int, name: string,
@@ -287,6 +292,9 @@ proc sendMsg*(peer: Peer, data: Bytes) {.async.} =
     discard await peer.transport.write(cipherText)
   except:
     await peer.disconnect(TcpError)
+    # this is usually a "(32) Broken pipe"
+    # FIXME: this exception should be caught somewhere in addMsgHandler() and
+    # sending should be retried a few times
     raise
 
 proc send*[Msg](peer: Peer, msg: Msg): Future[void] =
@@ -352,7 +360,7 @@ proc resolveResponseFuture(peer: Peer, msgId: int, msg: pointer, reqId: int) =
       let oldestReq = outstandingReqs.popFirst
       resolve oldestReq.future
     else:
-      debug "late or duplicate reply for a RLPx request"
+      trace "late or duplicate reply for a RLPx request"
   else:
     # TODO: This is not completely sound because we are still using a global
     # `reqId` sequence (the problem is that we might get a response ID that
@@ -458,7 +466,7 @@ proc checkedRlpRead(peer: Peer, r: var Rlp, MsgType: type): auto {.inline.} =
     return r.read(MsgType)
   except:
     # echo "Failed rlp.read:", tmp.inspect
-    error "Failed rlp.read",
+    debug "Failed rlp.read",
           peer = peer,
           msg = MsgType.name,
           exception = getCurrentExceptionMsg()
@@ -514,7 +522,7 @@ proc dispatchMessages*(peer: Peer) {.async.} =
     try:
       await peer.invokeThunk(msgId, msgData)
     except RlpError:
-      error "endind dispatchMessages loop", peer, err = getCurrentExceptionMsg()
+      debug "ending dispatchMessages loop", peer, err = getCurrentExceptionMsg()
       await peer.disconnect(BreachOfProtocol)
       return
@@ -1334,15 +1342,17 @@ proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} =
     if e.reason != TooManyPeers:
       debug "Unexpected disconnect during rlpxConnect", reason = e.reason
   except TransportIncompleteError:
-    debug "Connection dropped in rlpxConnect", remote
+    trace "Connection dropped in rlpxConnect", remote
   except UselessPeerError:
-    debug "Useless peer ", peer = remote
+    trace "Useless peer", peer = remote
   except RlpTypeMismatch:
     # Some peers report capabilities with names longer than 3 chars. We ignore
     # those for now. Maybe we should allow this though.
     debug "Rlp error in rlpxConnect"
+  except TransportOsError:
+    trace "TransportOsError", err = getCurrentExceptionMsg()
   except:
-    info "Exception in rlpxConnect", remote,
+    debug "Exception in rlpxConnect", remote,
       exc = getCurrentException().name,
       err = getCurrentExceptionMsg()
@@ -1413,7 +1423,7 @@ proc rlpxAccept*(node: EthereumNode,
     raise e
   except:
     let e = getCurrentException()
-    error "Exception in rlpxAccept",
+    debug "Exception in rlpxAccept",
       err = getCurrentExceptionMsg(),
       stackTrace = getCurrentException().getStackTrace()
     transport.close()
diff --git a/eth_p2p/rlpx_protocols/eth_protocol.nim b/eth_p2p/rlpx_protocols/eth_protocol.nim
index c19bae7..96c4afa 100644
--- a/eth_p2p/rlpx_protocols/eth_protocol.nim
+++ b/eth_p2p/rlpx_protocols/eth_protocol.nim
@@ -54,7 +54,7 @@ p2pProtocol eth(version = protocolVersion,
     let m = await peer.nextMsg(eth.status)
     if m.networkId == network.networkId and m.genesisHash == chain.genesisHash:
-      debug "suitable peer", peer
+      trace "suitable peer", peer
     else:
       raise newException(UselessPeerError, "Eth handshake params mismatch")
     peer.state.initialized = true
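
--
Illustrative sketch (not part of the patch): the snippet below mirrors the
recovery flow that the patched persistWorkItem() implements in
blockchain_sync.nim, using minimal stand-in types so it compiles on its own.
The persistBlocks stub, the int block numbers and the endIndex arithmetic are
assumptions made for illustration, not the real eth_p2p API. What it
demonstrates: a failed validation resets the work item to Initial and leaves
finalizedBlock untouched, so the normal work-queue loop will simply re-request
that range.

type
  ValidationResult = enum
    OK, Error

  WantedBlocksState = enum
    Initial, Requested, Received, Persisted

  WantedBlocks = object
    startIndex: int
    numBlocks: int
    state: WantedBlocksState

proc endIndex(wi: WantedBlocks): int =
  # assumed meaning: the last block number covered by this work item
  wi.startIndex + wi.numBlocks - 1

proc persistBlocks(wi: WantedBlocks): ValidationResult =
  # stand-in for AbstractChainDB.persistBlocks(headers, bodies); pretend
  # that ranges starting at an odd block fail validation, so both branches
  # below get exercised
  if wi.startIndex mod 2 == 0: OK else: Error

proc persistWorkItem(finalizedBlock: var int, wi: var WantedBlocks) =
  # mirrors the patched proc: advance finalizedBlock only on success,
  # otherwise reset the item so availableWorkItem() can hand it out again
  case persistBlocks(wi)
  of OK:
    finalizedBlock = wi.endIndex
    wi.state = Persisted
  of Error:
    wi.state = Initial

when isMainModule:
  var finalized = 0
  var good = WantedBlocks(startIndex: 2, numBlocks: 10, state: Received)
  var bad = WantedBlocks(startIndex: 13, numBlocks: 10, state: Received)

  persistWorkItem(finalized, good)
  doAssert good.state == Persisted and finalized == 11

  persistWorkItem(finalized, bad)
  doAssert bad.state == Initial and finalized == 11 # not advanced on failure

Because a work item reset to Initial is indistinguishable from one that was
never fetched, retrying a failed range needs no extra bookkeeping; this is
also why returnWorkItem() can report ValidationResult.Error and leave it to
obtainBlocksFromPeer() to decide whether to give up on the peer.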