more fleshed out implementation of the blockchain sync procedure

This commit is contained in:
Zahary Karadjov 2018-07-24 00:39:41 +03:00
parent 6351fc38b9
commit be7ca479b3
2 changed files with 228 additions and 29 deletions

View File

@ -9,7 +9,8 @@
#
import
tables, deques, macros, sets, algorithm, hashes, times, random, options,
tables, deques, macros, sets, algorithm, hashes, times,
random, options, sequtils,
asyncdispatch2, asyncdispatch2/timer,
rlp, ranges/[stackarrays, ptr_arith], nimcrypto, chronicles,
eth_keys, eth_common,
@ -488,7 +489,7 @@ proc waitSingleMsg(peer: Peer, MsgType: type): Future[MsgType] {.async.} =
if nextMsgId == wantedId:
return nextMsgData.read(MsgType)
proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] {.async.} =
proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] =
## This procs awaits a specific RLPx message.
## Any messages received while waiting will be dispatched to their
## respective handlers. The designated message handler will also run
@ -542,10 +543,11 @@ template supports*(peer: Peer, Protocol: type): bool =
## Checks whether a Peer supports a particular protocol
peer.dispatcher.protocolOffsets[Protocol.protocolInfo.index] != -1
template state*(connection: Peer, Protocol: type): untyped =
template state*(peer: Peer, Protocol: type): untyped =
## Returns the state object of a particular protocol for a
## particular connection.
cast[ref Protocol.State](connection.getState(Protocol.protocolInfo))
bind getState
cast[ref Protocol.State](getState(peer, Protocol.protocolInfo))
proc getNetworkState(peer: Peer, proto: ProtocolInfo): RootRef =
peer.network.protocolStates[proto.index]
@ -985,8 +987,11 @@ rlpxProtocol p2p, 0:
discard
proc disconnect*(peer: Peer, reason: DisconnectionReason) {.async.} =
await peer.sendDisconnectMsg(reason)
# TODO: Any other clean up required?
if peer.connectionState notin {Disconnecting, Disconnected}:
peer.connectionState = Disconnecting
await peer.sendDisconnectMsg(reason)
peer.connectionState = Disconnected
# TODO: Any other clean up required?
template `^`(arr): auto =
# passes a stack array with a matching `arrLen`
@ -1397,6 +1402,30 @@ iterator peers*(node: EthereumNode): Peer =
for remote, peer in node.peerPool.connectedNodes:
yield peer
iterator peers*(node: EthereumNode, Protocol: type): Peer =
for peer in node.peers:
if peer.supports(Protocol):
yield peer
iterator randomPeers*(node: EthereumNode, maxPeers: int): Peer =
# TODO: this can be implemented more efficiently
# XXX: this doesn't compile, why?
# var peer = toSeq node.peers
var peers = newSeqOfCap[Peer](node.peerPool.connectedNodes.len)
for peer in node.peers: peers.add(peer)
shuffle(peers)
for i in 0 ..< min(maxPeers, peers.len):
yield peers[i]
proc randomPeer*(node: EthereumNode): Peer =
let peerIdx = random(node.peerPool.connectedNodes.len)
var i = 0
for peer in node.peers:
if i == peerIdx: return peer
inc i
when isMainModule:
import rlp, strformat

View File

@ -9,6 +9,7 @@
#
import
random,
asyncdispatch2, rlp, stint, eth_common,
../../eth_p2p
@ -25,24 +26,43 @@ type
syncing: bool
PeerState = object
reportedTotalDifficulty: DifficultyInt
latestBlockHash: KeccakHash
initialized: bool
bestBlockHash: KeccakHash
bestDifficulty: DifficultyInt
const
maxStateFetch = 384
maxBodiesFetch = 128
maxReceiptsFetch = 256
maxHeadersFetch = 192
protocolVersion = 63
rlpxProtocol eth, 63:
rlpxProtocol eth, protocolVersion:
useRequestIds = false
type State = PeerState
onPeerConnected do (peer: Peer):
let
network = peer.network
chain = network.chain
bestBlock = chain.getBestBlockHeader
await peer.status(protocolVersion,
network.networkId,
deref(bestBlock).difficulty,
deref(bestBlock).blockHash,
chain.genesisHash)
discard await peer.nextMsg(eth.status)
peer.state.initialized = true
proc status(peer: Peer,
protocolVersion, networkId: uint,
protocolVersion: uint,
networkId: uint,
totalDifficulty: DifficultyInt,
bestHash, genesisHash: KeccakHash) =
bestHash: KeccakHash,
genesisHash: KeccakHash) =
# verify that the peer is on the same chain:
if peer.network.networkId != networkId or
peer.network.chain.genesisHash != genesisHash:
@ -50,7 +70,8 @@ rlpxProtocol eth, 63:
await peer.disconnect(SubprotocolReason)
return
peer.state.reportedTotalDifficulty = totalDifficulty
peer.state.bestBlockHash = bestHash
peer.state.bestDifficulty = totalDifficulty
proc newBlockHashes(peer: Peer, hashes: openarray[NewBlockHashesAnnounce]) =
discard
@ -119,27 +140,168 @@ rlpxProtocol eth, 63:
proc receipts(peer: Peer, receipts: openarray[Receipt]) =
discard
proc fastBlockchainSync*(node: EthereumNode) {.async.} =
# 1. obtain last N block headers from all peers
var latestBlocksRequest: BlocksRequest
var requests = newSeqOfCap[Future[Option[eth.blockHeaders]]](32)
for peer in node.peers:
type
SyncStatus* = enum
syncSuccess
syncNotEnoughPeers
syncTimeOut
WantedBlocksState = enum
Initial,
Requested,
Received
WantedBlocks = object
startIndex, endIndex: int
results: seq[BlockHeader]
state: WantedBlocksState
nextWorkItem: int
SyncContext = ref object
workQueue: seq[WantedBlocks]
nextWorkItem: int
proc popWorkItem(ctx: SyncContext): int =
result = ctx.nextWorkItem
ctx.nextWorkItem = ctx.workQueue[result].nextWorkItem
proc returnWorkItem(ctx: SyncContext, workItem: int) =
ctx.workQueue[workItem].state = Initial
ctx.workQueue[workItem].nextWorkItem = ctx.nextWorkItem
ctx.nextWorkItem = workItem
proc newSyncContext(startBlock, endBlock: int): SyncContext =
new result
let totalBlocksNeeded = endBlock - startBlock
let workQueueSize = totalBlocksNeeded div maxHeadersFetch
result.workQueue = newSeq[WantedBlocks](workQueueSize)
for i in 0 ..< workQueueSize:
let startIndex = startBlock + i * maxHeadersFetch
result.workQueue[i].startIndex = startIndex
result.workQueue[i].endIndex = startIndex + maxHeadersFetch
result.nextWorkItem = i + 1
if totalBlocksNeeded mod maxHeadersFetch == 0:
result.workQueue[^1].nextWorkItem = -1
else:
# TODO: this still has a tiny risk of reallocation
result.workQueue.add WantedBlocks(
startIndex: result.workQueue[^1].endIndex + 1,
endIndex: endBlock,
nextWorkItem: -1)
proc handleLostPeer(ctx: SyncContext) =
# TODO: ask the PeerPool for new connections and then call
# `obtainsBlocksFromPeer`
discard
proc randomOtherPeer(node: EthereumNode, particularPeer: Peer): Peer =
# TODO: we can maintain a per-protocol list of peers in EtheruemNode
var ethPeersCount = 0
for peer in node.peers(eth):
if peer != particularPeer:
inc ethPeersCount
if ethPeersCount == 0: return nil
let peerIdx = random(ethPeersCount) + 1
for peer in node.peers(eth):
if peer != particularPeer:
if peerIdx == ethPeersCount: return peer
dec ethPeersCount
proc obtainsBlocksFromPeer(peer: Peer, syncCtx: SyncContext) {.async.} =
# TODO: add support for request pipelining here
# (asking for multiple blocks even before the results are in)
while (let workItemIdx = syncCtx.popWorkItem; workItemIdx != -1):
template workItem: auto = syncCtx.workQueue[workItemIdx]
workItem.state = Requested
let request = BlocksRequest(
startBlock: HashOrNum(isHash: false,
number: workItem.startIndex.toBlockNumber),
maxResults: maxHeadersFetch,
skip: 0,
reverse: false)
try:
let results = await peer.getBlockHeaders(request)
if results.isSome:
workItem.state = Received
shallowCopy(workItem.results, results.get.headers)
continue
except:
# the success case uses `continue`, so we can just fall back to the
# failure path below. If we signal time-outs with exceptions such
# failures will be easier to handle.
discard
# This peer proved to be unreliable. TODO: Decrease its reputation.
await peer.disconnect(SubprotocolReason)
syncCtx.returnWorkItem workItemIdx
syncCtx.handleLostPeer()
proc fastBlockchainSync*(node: EthereumNode): Future[SyncStatus] {.async.} =
var
bestBlockDifficulty: DifficultyInt = 0.stuint(256)
bestPeer: Peer = nil
bestBlockNumber: BlockNumber
for peer in node.peers(eth):
let peerEthState = peer.state(eth)
if peerEthState.initialized:
if peerEthState.bestDifficulty > bestBlockDifficulty:
bestBlockDifficulty = peerEthState.bestDifficulty
bestPeer = peer
if bestPeer == nil:
return syncNotEnoughPeers
while true:
let request = BlocksRequest(
startBlock: HashOrNum(isHash: true,
hash: bestPeer.state(eth).bestBlockHash),
maxResults: 1,
skip: 0,
reverse: true)
let latestBlock = await bestPeer.getBlockHeaders(request)
if latestBlock.isSome and latestBlock.get.headers.len > 0:
bestBlockNumber = latestBlock.get.headers[0].blockNumber
break
# TODO: maintain multiple "best peer" candidates and send requests
# to the second best option
bestPeer = node.randomOtherPeer(bestPeer)
if bestPeer == nil:
return syncNotEnoughPeers
# does the network agree with our best block?
var
localChain = node.chain
bestLocalHeader = localChain.getBestBlockHeader
for peer in node.randomPeers(5):
if peer.supports(eth):
requests.add peer.getBlockHeaders(latestBlocksRequest)
let request = BlocksRequest(
startBlock: HashOrNum(isHash: false,
number: bestLocalHeader.blockNumber),
maxResults: 1,
skip: 0,
reverse: true)
discard await all(requests)
# TODO: check if the majority of peers agree with the block
# positioned at our best block number.
# 2. find out what is the block with best total difficulty
var bestBlockDifficulty: DifficultyInt = 0.stuint(256)
for req in requests:
if req.read.isNone: continue
for header in req.read.get.headers:
if header.difficulty > bestBlockDifficulty:
discard
# TODO: In case of disagreement, perform a binary search to locate a
# block where we agree.
# 3. establish the highest valid block for each peer
# keep in mind that some of the peers may report an alternative history, so
# we must find the last block where each peer agreed with the best peer
if bestLocalHeader.blockNumber >= bestBlockNumber:
return syncSuccess
# 4. Start making requests in parallel for the block headers that we are
# missing (by requesting blocks from peers while honoring maxHeadersFetch).
@ -147,6 +309,14 @@ proc fastBlockchainSync*(node: EthereumNode) {.async.} =
# a different peer in case of time-out. Handle invalid or incomplete replies
# properly. The peer may respond with fewer headers than requested (or with
# different ones if the peer is not behaving properly).
var syncCtx = newSyncContext(bestLocalHeader.blockNumber.toInt,
bestBlockNumber.toInt)
for peer in node.peers:
if peer.supports(eth):
# TODO: we should also monitor the PeerPool for new peers here and
# we should automatically add them to the loop.
asyncCheck obtainsBlocksFromPeer(peer, syncCtx)
# 5. Store the obtained headers in the blockchain DB