Download block bodies

This commit is contained in:
Yuriy Glukhov 2018-08-29 11:44:17 +03:00 committed by zah
parent 2c3a183445
commit 0eda6bc91a

View File

@ -53,8 +53,8 @@ rlpxProtocol eth, protocolVersion:
await peer.status(protocolVersion,
network.networkId,
deref(bestBlock).difficulty,
deref(bestBlock).blockHash,
bestBlock.difficulty,
bestBlock.blockHash,
chain.genesisHash)
let m = await peer.waitSingleMsg(eth.status)
@ -92,15 +92,15 @@ rlpxProtocol eth, protocolVersion:
var headers = newSeqOfCap[BlockHeader](request.maxResults)
let chain = peer.network.chain
var foundBlock = chain.getBlockHeader(request.startBlock)
var foundBlock: BlockHeader
if not foundBlock.isNil:
headers.add deref(foundBlock)
if chain.getBlockHeader(request.startBlock, foundBlock):
headers.add foundBlock
while uint64(headers.len) < request.maxResults:
foundBlock = chain.getSuccessorHeader deref(foundBlock)
if foundBlock.isNil: break
headers.add deref(foundBlock)
if not chain.getSuccessorHeader(foundBlock, foundBlock):
break
headers.add foundBlock
await peer.blockHeaders(headers)
@ -160,13 +160,15 @@ type
WantedBlocks = object
startIndex: BlockNumber
numBlocks: uint
results: seq[BlockHeader]
state: WantedBlocksState
headers: seq[BlockHeader]
bodies: seq[BlockBody]
SyncContext = ref object
workQueue: seq[WantedBlocks]
endBlockNumber: BlockNumber
finalizedBlock: BlockNumber # Block which was downloaded and verified
chain: AbstractChainDB
proc endIndex(b: WantedBlocks): BlockNumber =
result = b.startIndex
@ -201,21 +203,28 @@ proc availableWorkItem(ctx: SyncContext): int =
ctx.workQueue[result] = WantedBlocks(startIndex: nextRequestedBlock, numBlocks: numBlocks.uint, state: Initial)
proc returnWorkItem(ctx: SyncContext, workItem: int) =
let askedBlocks = ctx.workQueue[workItem].numBlocks.int
let receivedBlocks = ctx.workQueue[workItem].results.len
debug "Work item complete", startBlock = ctx.workQueue[workItem].startIndex,
askedBlocks,
receivedBlocks
let wi = addr ctx.workQueue[workItem]
let askedBlocks = wi.numBlocks.int
let receivedBlocks = wi.headers.len
if askedBlocks != receivedBlocks:
echo "Received blocks is wrong!"
if askedBlocks == receivedBlocks:
debug "Work item complete", startBlock = wi.startIndex,
askedBlocks,
receivedBlocks
else:
warn "Work item complete", startBlock = wi.startIndex,
askedBlocks,
receivedBlocks
ctx.workQueue[workItem].results = nil
ctx.chain.persistBlocks(wi.headers, wi.bodies)
wi.headers.setLen(0)
wi.bodies.setLen(0)
proc newSyncContext(startBlock, endBlock: BlockNumber): SyncContext =
proc newSyncContext(startBlock, endBlock: BlockNumber, chain: AbstractChainDB): SyncContext =
new result
result.endBlockNumber = endBlock
result.finalizedBlock = startBlock
result.chain = chain
proc handleLostPeer(ctx: SyncContext) =
# TODO: ask the PeerPool for new connections and then call
@ -252,8 +261,24 @@ proc obtainBlocksFromPeer(peer: Peer, syncCtx: SyncContext) {.async.} =
let results = await peer.getBlockHeaders(request)
if results.isSome:
workItem.state = Received
shallowCopy(workItem.results, results.get.headers)
shallowCopy(workItem.headers, results.get.headers)
var bodies = newSeq[BlockBody]()
var hashes = newSeq[KeccakHash]()
for i in workItem.headers:
hashes.add(blockHash(i))
if hashes.len == maxBodiesFetch:
let b = await peer.getBlockBodies(hashes)
hashes.setLen(0)
bodies.add(b.get.blocks)
if hashes.len != 0:
let b = await peer.getBlockBodies(hashes)
bodies.add(b.get.blocks)
shallowCopy(workItem.bodies, bodies)
syncCtx.returnWorkItem workItemIdx
continue
except:
# the success case uses `continue`, so we can just fall back to the
@ -344,7 +369,7 @@ proc fastBlockchainSync*(node: EthereumNode): Future[SyncStatus] {.async.} =
# a different peer in case of time-out. Handle invalid or incomplete replies
# properly. The peer may respond with fewer headers than requested (or with
# different ones if the peer is not behaving properly).
var syncCtx = newSyncContext(bestLocalHeader.blockNumber, bestBlockNumber)
var syncCtx = newSyncContext(bestLocalHeader.blockNumber, bestBlockNumber, node.chain)
for peer in node.peers:
if peer.supports(eth):