From e64ab468ae7ab60426d40f159d15948bdd5e5075 Mon Sep 17 00:00:00 2001 From: Yuriy Glukhov Date: Tue, 2 Oct 2018 14:40:08 +0300 Subject: [PATCH] Blocks ordering --- eth_p2p/discovery.nim | 14 ++++++-- eth_p2p/kademlia.nim | 3 ++ eth_p2p/rlpx_protocols/eth.nim | 64 ++++++++++++++++++++++++++++------ 3 files changed, 67 insertions(+), 14 deletions(-) diff --git a/eth_p2p/discovery.nim b/eth_p2p/discovery.nim index 8a3503c..4d0d017 100644 --- a/eth_p2p/discovery.nim +++ b/eth_p2p/discovery.nim @@ -258,15 +258,23 @@ proc open*(d: DiscoveryProtocol) = let ta = initTAddress(d.address.ip, d.address.udpPort) d.transp = newDatagramTransport(processClient, udata = d, local = ta) +proc lookupRandom*(d: DiscoveryProtocol): Future[seq[Node]] {.inline.} = + d.kademlia.lookupRandom() + +proc run(d: DiscoveryProtocol) {.async.} = + while true: + discard await d.lookupRandom() + await sleepAsync(3000) + echo "Discovered nodes: ", d.kademlia.nodesDiscovered + proc bootstrap*(d: DiscoveryProtocol) {.async.} = await d.kademlia.bootstrap(d.bootstrapNodes) + discard d.run() + proc resolve*(d: DiscoveryProtocol, n: NodeId): Future[Node] = d.kademlia.resolve(n) -proc lookupRandom*(d: DiscoveryProtocol): Future[seq[Node]] {.inline.} = - d.kademlia.lookupRandom() - proc randomNodes*(d: DiscoveryProtocol, count: int): seq[Node] {.inline.} = d.kademlia.randomNodes(count) diff --git a/eth_p2p/kademlia.nim b/eth_p2p/kademlia.nim index 3d8685d..9aeb1d7 100644 --- a/eth_p2p/kademlia.nim +++ b/eth_p2p/kademlia.nim @@ -67,6 +67,7 @@ proc newNode*(enode: ENode): Node = proc distanceTo(n: Node, id: NodeId): UInt256 = n.id xor id proc `$`*(n: Node): string = + # "Node[" & $n.node & "]" "Node[" & $n.node.address.ip & ":" & $n.node.address.udpPort & "]" proc hash*(n: Node): hashes.Hash = hash(n.node.pubkey.data) @@ -479,6 +480,8 @@ proc randomNodes*(k: KademliaProtocol, count: int): seq[Node] = result.add(node) seen.incl(node) +proc nodesDiscovered*(k: KademliaProtocol): int {.inline.} = k.routing.len + when isMainModule: proc randomNode(): Node = newNode("enode://aa36fdf33dd030378a0168efe6ed7d5cc587fafa3cdd375854fe735a2e11ea3650ba29644e2db48368c46e1f60e716300ba49396cd63778bf8a818c09bded46f@13.93.211.84:30303") diff --git a/eth_p2p/rlpx_protocols/eth.nim b/eth_p2p/rlpx_protocols/eth.nim index 1f0dbf6..9843803 100644 --- a/eth_p2p/rlpx_protocols/eth.nim +++ b/eth_p2p/rlpx_protocols/eth.nim @@ -162,7 +162,8 @@ type WantedBlocksState = enum Initial, Requested, - Received + Received, + Persisted WantedBlocks = object startIndex: BlockNumber @@ -178,6 +179,7 @@ type chain: AbstractChainDB peerPool: PeerPool trustedPeers: HashSet[Peer] + hasOutOfOrderBlocks: bool proc endIndex(b: WantedBlocks): BlockNumber = result = b.startIndex @@ -185,12 +187,13 @@ proc endIndex(b: WantedBlocks): BlockNumber = proc availableWorkItem(ctx: SyncContext): int = var maxPendingBlock = ctx.finalizedBlock + echo "queue len: ", ctx.workQueue.len result = -1 for i in 0 .. ctx.workQueue.high: case ctx.workQueue[i].state of Initial: return i - of Received: + of Persisted: result = i else: discard @@ -211,23 +214,58 @@ proc availableWorkItem(ctx: SyncContext): int = numBlocks = maxHeadersFetch ctx.workQueue[result] = WantedBlocks(startIndex: nextRequestedBlock, numBlocks: numBlocks.uint, state: Initial) +proc persistWorkItem(ctx: SyncContext, wi: var WantedBlocks) = + ctx.chain.persistBlocks(wi.headers, wi.bodies) + wi.headers.setLen(0) + wi.bodies.setLen(0) + ctx.finalizedBlock = wi.endIndex + wi.state = Persisted + +proc persistPendingWorkItems(ctx: SyncContext) = + var nextStartIndex = ctx.finalizedBlock + 1 + var keepRunning = true + var hasOutOfOrderBlocks = false + debug "Looking for out of order blocks" + while keepRunning: + keepRunning = false + hasOutOfOrderBlocks = false + for i in 0 ..< ctx.workQueue.len: + let start = ctx.workQueue[i].startIndex + if ctx.workQueue[i].state == Received: + if start == nextStartIndex: + debug "Persisting pending work item", start + ctx.persistWorkItem(ctx.workQueue[i]) + nextStartIndex = ctx.finalizedBlock + 1 + keepRunning = true + break + else: + hasOutOfOrderBlocks = true + + ctx.hasOutOfOrderBlocks = hasOutOfOrderBlocks + proc returnWorkItem(ctx: SyncContext, workItem: int) = let wi = addr ctx.workQueue[workItem] let askedBlocks = wi.numBlocks.int let receivedBlocks = wi.headers.len + let start = wi.startIndex if askedBlocks == receivedBlocks: - debug "Work item complete", startBlock = wi.startIndex, + debug "Work item complete", start, askedBlocks, receivedBlocks else: - warn "Work item complete", startBlock = wi.startIndex, + warn "Work item complete", start, askedBlocks, receivedBlocks - ctx.chain.persistBlocks(wi.headers, wi.bodies) - wi.headers.setLen(0) - wi.bodies.setLen(0) + if wi.startIndex != ctx.finalizedBlock + 1: + info "Blocks out of order", start, final = ctx.finalizedBlock + ctx.hasOutOfOrderBlocks = true + else: + info "Persisting blocks", start + ctx.persistWorkItem(wi[]) + if ctx.hasOutOfOrderBlocks: + ctx.persistPendingWorkItems() proc newSyncContext(chain: AbstractChainDB, peerPool: PeerPool): SyncContext = new result @@ -276,7 +314,6 @@ proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} = try: let results = await peer.getBlockHeaders(request) if results.isSome: - workItem.state = Received shallowCopy(workItem.headers, results.get.headers) var bodies = newSeq[BlockBody]() @@ -292,8 +329,11 @@ proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} = let b = await peer.getBlockBodies(hashes) bodies.add(b.get.blocks) - shallowCopy(workItem.bodies, bodies) - dataReceived = true + if bodies.len == workItem.headers.len: + shallowCopy(workItem.bodies, bodies) + dataReceived = true + else: + warn "Bodies len != headers.len", bodies = bodies.len, headers = workItem.headers.len except: # the success case uses `continue`, so we can just fall back to the # failure path below. If we signal time-outs with exceptions such @@ -301,8 +341,10 @@ proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} = discard if dataReceived: + workItem.state = Received syncCtx.returnWorkItem workItemIdx else: + workItem.state = Initial try: await peer.disconnect(SubprotocolReason) except: @@ -310,7 +352,7 @@ proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} = syncCtx.handleLostPeer() break - debug "Nothing to sync" + debug "Fininshed otaining blocks", peer proc peersAgreeOnChain(a, b: Peer): Future[bool] {.async.} = # Returns true if one of the peers acknowledges existense of the best block