Disconnect on failed persistWorkItem + add comments

This commit is contained in:
kdeme 2019-07-29 15:13:52 +02:00 committed by zah
parent 0d282dd9d6
commit aef7522788
1 changed files with 42 additions and 9 deletions

View File

@ -43,37 +43,52 @@ proc endIndex(b: WantedBlocks): BlockNumber =
result += (b.numBlocks - 1).toBlockNumber result += (b.numBlocks - 1).toBlockNumber
proc availableWorkItem(ctx: SyncContext): int = proc availableWorkItem(ctx: SyncContext): int =
var maxPendingBlock = ctx.finalizedBlock var maxPendingBlock = ctx.finalizedBlock # the last downloaded & processed
trace "queue len", length = ctx.workQueue.len trace "queue len", length = ctx.workQueue.len
result = -1 result = -1
for i in 0 .. ctx.workQueue.high: for i in 0 .. ctx.workQueue.high:
case ctx.workQueue[i].state case ctx.workQueue[i].state
of Initial: of Initial:
# When there is a work item at Initial state, immediatly use this one.
# This usually means a previous work item that failed somewhere in the
# process, and thus can be reused to work on.
return i return i
of Persisted: of Persisted:
# In case of Persisted, we can reset this work item to a new one.
result = i result = i
# No break here to give work items in Initial state priority and to
# calculate endBlock.
else: else:
discard discard
# Check all endBlocks of all workqueue items to decide on next range of
# blocks to collect & process.
let endBlock = ctx.workQueue[i].endIndex let endBlock = ctx.workQueue[i].endIndex
if endBlock > maxPendingBlock: if endBlock > maxPendingBlock:
maxPendingBlock = endBlock maxPendingBlock = endBlock
let nextRequestedBlock = maxPendingBlock + 1 let nextRequestedBlock = maxPendingBlock + 1
# If this next block doesn't exist yet according to any of our peers, don't
# return a work item (and sync will be stopped).
if nextRequestedBlock >= ctx.endBlockNumber: if nextRequestedBlock >= ctx.endBlockNumber:
return -1 return -1
# Increase queue when there are no free (Initial / Persisted) work items in
# the queue. At start, queue will be empty.
if result == -1: if result == -1:
result = ctx.workQueue.len result = ctx.workQueue.len
ctx.workQueue.setLen(result + 1) ctx.workQueue.setLen(result + 1)
# Create new work item when queue was increased, reset when selected work item
# is at Persisted state.
var numBlocks = (ctx.endBlockNumber - nextRequestedBlock).toInt var numBlocks = (ctx.endBlockNumber - nextRequestedBlock).toInt
if numBlocks > maxHeadersFetch: if numBlocks > maxHeadersFetch:
numBlocks = maxHeadersFetch numBlocks = maxHeadersFetch
ctx.workQueue[result] = WantedBlocks(startIndex: nextRequestedBlock, numBlocks: numBlocks.uint, state: Initial) ctx.workQueue[result] = WantedBlocks(startIndex: nextRequestedBlock, numBlocks: numBlocks.uint, state: Initial)
proc persistWorkItem(ctx: SyncContext, wi: var WantedBlocks) = proc persistWorkItem(ctx: SyncContext, wi: var WantedBlocks): ValidationResult =
case ctx.chain.persistBlocks(wi.headers, wi.bodies) result = ctx.chain.persistBlocks(wi.headers, wi.bodies)
case result
of ValidationResult.OK: of ValidationResult.OK:
ctx.finalizedBlock = wi.endIndex ctx.finalizedBlock = wi.endIndex
wi.state = Persisted wi.state = Persisted
@ -83,7 +98,7 @@ proc persistWorkItem(ctx: SyncContext, wi: var WantedBlocks) =
wi.headers = @[] wi.headers = @[]
wi.bodies = @[] wi.bodies = @[]
proc persistPendingWorkItems(ctx: SyncContext) = proc persistPendingWorkItems(ctx: SyncContext): (int, ValidationResult) =
var nextStartIndex = ctx.finalizedBlock + 1 var nextStartIndex = ctx.finalizedBlock + 1
var keepRunning = true var keepRunning = true
var hasOutOfOrderBlocks = false var hasOutOfOrderBlocks = false
@ -91,12 +106,18 @@ proc persistPendingWorkItems(ctx: SyncContext) =
while keepRunning: while keepRunning:
keepRunning = false keepRunning = false
hasOutOfOrderBlocks = false hasOutOfOrderBlocks = false
# Go over the full work queue and check for every work item if it is in
# Received state and has the next blocks in line to be processed.
for i in 0 ..< ctx.workQueue.len: for i in 0 ..< ctx.workQueue.len:
let start = ctx.workQueue[i].startIndex let start = ctx.workQueue[i].startIndex
# There should be at least 1 like this, namely the just received work item
# that initiated this call.
if ctx.workQueue[i].state == Received: if ctx.workQueue[i].state == Received:
if start == nextStartIndex: if start == nextStartIndex:
trace "Persisting pending work item", start trace "Processing pending work item", number = start
ctx.persistWorkItem(ctx.workQueue[i]) result = (i, ctx.persistWorkItem(ctx.workQueue[i]))
# TODO: We can stop here on failure, but have to set
# hasOutofORderBlocks. Is this always valid?
nextStartIndex = ctx.finalizedBlock + 1 nextStartIndex = ctx.finalizedBlock + 1
keepRunning = true keepRunning = true
break break
@ -122,9 +143,20 @@ proc returnWorkItem(ctx: SyncContext, workItem: int): ValidationResult =
ctx.hasOutOfOrderBlocks = true ctx.hasOutOfOrderBlocks = true
if ctx.hasOutOfOrderBlocks: if ctx.hasOutOfOrderBlocks:
ctx.persistPendingWorkItems() let (index, validation) = ctx.persistPendingWorkItems()
# Only report an error if it was this peer's work item that failed
if validation == ValidationResult.Error and index == workitem:
result = ValidationResult.Error
# TODO: What about failures on other peers' work items?
# In that case the peer will probably get disconnected on future erroneous
# work items, but before this occurs, several more blocks (that will fail)
# might get downloaded from this peer. This will delay the sync and this
# should be improved.
else: else:
ctx.persistWorkItem(wi[]) trace "Processing work item", number = wi.startIndex
# Validation result needs to be returned so that higher up can be decided
# to disconnect from this peer in case of error.
result = ctx.persistWorkItem(wi[])
else: else:
trace "Work item complete but we got fewer blocks than requested, so we're ditching the whole thing.", trace "Work item complete but we got fewer blocks than requested, so we're ditching the whole thing.",
start, start,
@ -177,7 +209,8 @@ proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} =
peer.connectionState notin {Disconnecting, Disconnected}): peer.connectionState notin {Disconnecting, Disconnected}):
template workItem: auto = syncCtx.workQueue[workItemIdx] template workItem: auto = syncCtx.workQueue[workItemIdx]
workItem.state = Requested workItem.state = Requested
trace "Requesting block headers", start = workItem.startIndex, count = workItem.numBlocks, peer trace "Requesting block headers", start = workItem.startIndex,
count = workItem.numBlocks, peer = peer.remote.node
let request = BlocksRequest( let request = BlocksRequest(
startBlock: HashOrNum(isHash: false, number: workItem.startIndex), startBlock: HashOrNum(isHash: false, number: workItem.startIndex),
maxResults: workItem.numBlocks, maxResults: workItem.numBlocks,