diff --git a/eth/p2p/blockchain_sync.nim b/eth/p2p/blockchain_sync.nim
index 9ae4dae..6e4f7a6 100644
--- a/eth/p2p/blockchain_sync.nim
+++ b/eth/p2p/blockchain_sync.nim
@@ -43,37 +43,52 @@ proc endIndex(b: WantedBlocks): BlockNumber =
   result += (b.numBlocks - 1).toBlockNumber
 
 proc availableWorkItem(ctx: SyncContext): int =
-  var maxPendingBlock = ctx.finalizedBlock
+  var maxPendingBlock = ctx.finalizedBlock # the last downloaded & processed
   trace "queue len", length = ctx.workQueue.len
   result = -1
   for i in 0 .. ctx.workQueue.high:
     case ctx.workQueue[i].state
     of Initial:
+      # When there is a work item in Initial state, immediately use this one.
+      # This usually means a previous work item failed somewhere in the
+      # process, and it can thus be reused to work on.
       return i
     of Persisted:
+      # In case of Persisted, we can reset this work item to a new one.
       result = i
+      # No break here, to give work items in Initial state priority and to
+      # calculate endBlock.
     else:
       discard
 
+    # Check the endBlock of all work queue items to decide on the next range
+    # of blocks to collect & process.
     let endBlock = ctx.workQueue[i].endIndex
     if endBlock > maxPendingBlock:
       maxPendingBlock = endBlock
 
   let nextRequestedBlock = maxPendingBlock + 1
+  # If this next block doesn't exist yet according to any of our peers, don't
+  # return a work item (and sync will be stopped).
   if nextRequestedBlock >= ctx.endBlockNumber:
     return -1
 
+  # Grow the queue when there are no free (Initial / Persisted) work items in
+  # it. At the start, the queue will be empty.
   if result == -1:
     result = ctx.workQueue.len
     ctx.workQueue.setLen(result + 1)
 
+  # Create a new work item when the queue was grown, or reset the selected
+  # work item when it is in Persisted state.
   var numBlocks = (ctx.endBlockNumber - nextRequestedBlock).toInt
   if numBlocks > maxHeadersFetch:
     numBlocks = maxHeadersFetch
   ctx.workQueue[result] = WantedBlocks(startIndex: nextRequestedBlock, numBlocks: numBlocks.uint, state: Initial)
 
-proc persistWorkItem(ctx: SyncContext, wi: var WantedBlocks) =
-  case ctx.chain.persistBlocks(wi.headers, wi.bodies)
+proc persistWorkItem(ctx: SyncContext, wi: var WantedBlocks): ValidationResult =
+  result = ctx.chain.persistBlocks(wi.headers, wi.bodies)
+  case result
   of ValidationResult.OK:
     ctx.finalizedBlock = wi.endIndex
     wi.state = Persisted
@@ -83,7 +98,7 @@ proc persistWorkItem(ctx: SyncContext, wi: var WantedBlocks) =
   wi.headers = @[]
   wi.bodies = @[]
 
-proc persistPendingWorkItems(ctx: SyncContext) =
+proc persistPendingWorkItems(ctx: SyncContext): (int, ValidationResult) =
   var nextStartIndex = ctx.finalizedBlock + 1
   var keepRunning = true
   var hasOutOfOrderBlocks = false
@@ -91,12 +106,18 @@
   while keepRunning:
     keepRunning = false
     hasOutOfOrderBlocks = false
+    # Go over the full work queue and check for every work item whether it is
+    # in Received state and has the next blocks in line to be processed.
     for i in 0 ..< ctx.workQueue.len:
       let start = ctx.workQueue[i].startIndex
+      # There should be at least one of these, namely the just received work
+      # item that initiated this call.
       if ctx.workQueue[i].state == Received:
         if start == nextStartIndex:
-          trace "Persisting pending work item", start
-          ctx.persistWorkItem(ctx.workQueue[i])
+          trace "Processing pending work item", number = start
+          result = (i, ctx.persistWorkItem(ctx.workQueue[i]))
+          # TODO: We can stop here on failure, but then hasOutOfOrderBlocks
+          # has to be set. Is this always valid?
           nextStartIndex = ctx.finalizedBlock + 1
           keepRunning = true
           break
@@ -122,9 +143,20 @@ proc returnWorkItem(ctx: SyncContext, workItem: int): ValidationResult =
       ctx.hasOutOfOrderBlocks = true
 
     if ctx.hasOutOfOrderBlocks:
-      ctx.persistPendingWorkItems()
+      let (index, validation) = ctx.persistPendingWorkItems()
+      # Only report an error if it was this peer's work item that failed.
+      if validation == ValidationResult.Error and index == workItem:
+        result = ValidationResult.Error
+      # TODO: What about failures on other peers' work items?
+      # In that case the peer will probably get disconnected on future
+      # erroneous work items, but before this occurs, several more blocks
+      # (that will fail) might get downloaded from this peer. This delays
+      # the sync and should be improved.
     else:
-      ctx.persistWorkItem(wi[])
+      trace "Processing work item", number = wi.startIndex
+      # The validation result needs to be returned so that it can be decided
+      # higher up whether to disconnect from this peer in case of an error.
+      result = ctx.persistWorkItem(wi[])
   else:
     trace "Work item complete but we got fewer blocks than requested, so we're ditching the whole thing.",
       start,
@@ -177,7 +209,8 @@ proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} =
          peer.connectionState notin {Disconnecting, Disconnected}):
     template workItem: auto = syncCtx.workQueue[workItemIdx]
     workItem.state = Requested
-    trace "Requesting block headers", start = workItem.startIndex, count = workItem.numBlocks, peer
+    trace "Requesting block headers", start = workItem.startIndex,
+      count = workItem.numBlocks, peer = peer.remote.node
     let request = BlocksRequest(
       startBlock: HashOrNum(isHash: false, number: workItem.startIndex),
       maxResults: workItem.numBlocks,