Proper sync part 1

This commit is contained in:
Yuriy Glukhov 2018-10-01 14:33:59 +03:00
parent badc616be8
commit 7f14c435d6
2 changed files with 181 additions and 110 deletions

View File

@ -63,6 +63,7 @@ type
connectingNodes: HashSet[Node]
running: bool
listenPort*: Port
observers: Table[int, PeerObserver]
MessageInfo* = object
id*: int
@ -105,6 +106,10 @@ type
protocolOffsets: seq[int]
messages: seq[ptr MessageInfo]
PeerObserver* = object
onPeerConnected*: proc(p: Peer)
onPeerDisconnected*: proc(p: Peer)
MessageHandler = proc(x: Peer, data: Rlp): Future[void]
MessageContentPrinter = proc(msg: pointer): string
RequestResolver = proc(msg: pointer, future: FutureBase)
@ -492,6 +497,11 @@ proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} =
decryptedBytes.setLen(decryptedBytesCount)
var rlp = rlpFromBytes(decryptedBytes.toRange)
let msgId = rlp.read(int)
# if not peer.dispatcher.isNil:
#
# echo "Read msg: ", peer.dispatcher.messages[msgId].name
# else:
# echo "Read msg: ", msgId
return (msgId, rlp)
proc perPeerMsgId(peer: Peer, proto: type, msgId: int): int {.inline.} =
@ -1261,6 +1271,7 @@ proc newPeerPool*(network: EthereumNode,
result.discovery = discovery
result.connectedNodes = initTable[Node, Peer]()
result.connectingNodes = initSet[Node]()
result.observers = initTable[int, PeerObserver]()
result.listenPort = listenPort
template ensureFuture(f: untyped) = asyncCheck f
@ -1268,14 +1279,21 @@ template ensureFuture(f: untyped) = asyncCheck f
proc nodesToConnect(p: PeerPool): seq[Node] {.inline.} =
p.discovery.randomNodes(p.minPeers)
# def subscribe(self, subscriber: PeerPoolSubscriber) -> None:
# self._subscribers.append(subscriber)
# for peer in self.connected_nodes.values():
# subscriber.register_peer(peer)
proc addObserver(p: PeerPool, observerId: int, observer: PeerObserver) =
assert(observerId notin p.observers)
p.observers[observerId] = observer
if not observer.onPeerConnected.isNil:
for peer in p.connectedNodes.values:
observer.onPeerConnected(peer)
# def unsubscribe(self, subscriber: PeerPoolSubscriber) -> None:
# if subscriber in self._subscribers:
# self._subscribers.remove(subscriber)
proc delObserver(p: PeerPool, observerId: int) =
p.observers.del(observerId)
proc addObserver*(p: PeerPool, observerId: ref, observer: PeerObserver) {.inline.} =
p.addObserver(cast[int](observerId), observer)
proc delObserver*(p: PeerPool, observerId: ref) {.inline.} =
p.delObserver(cast[int](observerId))
proc stopAllPeers(p: PeerPool) {.async.} =
info "Stopping all peers ..."
@ -1338,24 +1356,28 @@ proc peerFinished(p: PeerPool, peer: Peer) =
## This is passed as a callback to be called when a peer finishes.
p.connectedNodes.del(peer.remote)
proc run(p: Peer, peerPool: PeerPool) {.async.} =
for o in p.observers.values:
if not o.onPeerDisconnected.isNil:
o.onPeerDisconnected(peer)
proc run(peer: Peer, peerPool: PeerPool) {.async.} =
# TODO: This is a stub that should be implemented in rlpx.nim
try:
while true:
var (nextMsgId, nextMsgData) = await p.recvMsg()
var (nextMsgId, nextMsgData) = await peer.recvMsg()
if nextMsgId == 1:
debug "Run got disconnect msg", reason = nextMsgData.listElem(0).toInt(uint32).DisconnectionReason
debug "Run got disconnect msg", reason = nextMsgData.listElem(0).toInt(uint32).DisconnectionReason, peer
break
else:
# debug "Got msg: ", msg = nextMsgId
await p.dispatchMsg(nextMsgId, nextMsgData)
await peer.dispatchMsg(nextMsgId, nextMsgData)
except:
error "Failed to read from peer",
err = getCurrentExceptionMsg(),
stackTrace = getCurrentException().getStackTrace()
peerPool.peerFinished(p)
peerPool.peerFinished(peer)
proc connectToNode*(p: PeerPool, n: Node) {.async.} =
let peer = await p.connect(n)
@ -1364,8 +1386,9 @@ proc connectToNode*(p: PeerPool, n: Node) {.async.} =
ensureFuture peer.run(p)
p.connectedNodes[peer.remote] = peer
# for subscriber in self._subscribers:
# subscriber.register_peer(peer)
for o in p.observers.values:
if not o.onPeerConnected.isNil:
o.onPeerConnected(peer)
proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} =
for node in nodes:
@ -1431,6 +1454,7 @@ proc start*(p: PeerPool) =
if not p.running:
asyncCheck p.run()
proc len*(p: PeerPool): int = p.connectedNodes.len
# @property
# def peers(self) -> List[BasePeer]:
# peers = list(self.connected_nodes.values())
@ -1540,14 +1564,22 @@ proc connectToNetwork*(node: EthereumNode,
proc stopListening*(node: EthereumNode) =
node.listeningServer.stop()
iterator peers*(p: PeerPool): Peer =
for remote, peer in p.connectedNodes:
yield peer
iterator peers*(p: PeerPool, Protocol: type): Peer =
for peer in p.peers:
if peer.supports(Protocol):
yield peer
iterator peers*(node: EthereumNode): Peer =
for remote, peer in node.peerPool.connectedNodes:
for peer in node.peerPool.peers:
yield peer
iterator peers*(node: EthereumNode, Protocol: type): Peer =
for peer in node.peers:
if peer.supports(Protocol):
yield peer
for peer in node.peerPool.peers(Protocol):
yield peer
iterator randomPeers*(node: EthereumNode, maxPeers: int): Peer =
# TODO: this can be implemented more efficiently

View File

@ -12,7 +12,7 @@
## https://github.com/ethereum/wiki/wiki/Ethereum-Wire-Protocol
import
random,
random, algorithm, hashes,
asyncdispatch2, rlp, stint, eth_common, chronicles,
../../eth_p2p
@ -39,6 +39,7 @@ const
maxReceiptsFetch = 256
maxHeadersFetch = 192
protocolVersion = 63
minPeersToStartSync = 2 # Wait for consensus of at least this number of peers before syncing
rlpxProtocol eth, protocolVersion:
useRequestIds = false
@ -58,6 +59,10 @@ rlpxProtocol eth, protocolVersion:
chain.genesisHash)
let m = await peer.waitSingleMsg(eth.status)
if m.networkId == network.networkId and m.genesisHash == chain.genesisHash:
debug "Suitable peer", peer
else:
raise newException(UselessPeerError, "Eth handshake params mismatch")
peer.state.initialized = true
peer.state.bestDifficulty = m.totalDifficulty
peer.state.bestBlockHash = m.bestHash
@ -146,6 +151,8 @@ rlpxProtocol eth, protocolVersion:
proc receipts(peer: Peer, receipts: openarray[Receipt]) =
discard
proc hash*(p: Peer): Hash {.inline.} = hash(cast[pointer](p))
type
SyncStatus* = enum
syncSuccess
@ -169,6 +176,8 @@ type
endBlockNumber: BlockNumber
finalizedBlock: BlockNumber # Block which was downloaded and verified
chain: AbstractChainDB
peerPool: PeerPool
trustedPeers: HashSet[Peer]
proc endIndex(b: WantedBlocks): BlockNumber =
result = b.startIndex
@ -220,36 +229,42 @@ proc returnWorkItem(ctx: SyncContext, workItem: int) =
wi.headers.setLen(0)
wi.bodies.setLen(0)
proc newSyncContext(startBlock, endBlock: BlockNumber, chain: AbstractChainDB): SyncContext =
proc newSyncContext(chain: AbstractChainDB, peerPool: PeerPool): SyncContext =
new result
result.endBlockNumber = endBlock
result.finalizedBlock = startBlock
result.chain = chain
result.peerPool = peerPool
result.trustedPeers = initSet[Peer]()
result.finalizedBlock = chain.getBestBlockHeader().blockNumber
proc handleLostPeer(ctx: SyncContext) =
# TODO: ask the PeerPool for new connections and then call
# `obtainBlocksFromPeer`
discard
proc randomOtherPeer(node: EthereumNode, particularPeer: Peer): Peer =
# TODO: we can maintain a per-protocol list of peers in EtheruemNode
var ethPeersCount = 0
for peer in node.peers(eth):
if peer != particularPeer:
inc ethPeersCount
proc getBestBlockNumber(p: Peer): Future[BlockNumber] {.async.} =
let request = BlocksRequest(
startBlock: HashOrNum(isHash: true,
hash: p.state(eth).bestBlockHash),
maxResults: 1,
skip: 0,
reverse: true)
if ethPeersCount == 0: return nil
let peerIdx = random(ethPeersCount) + 1
for peer in node.peers(eth):
if peer != particularPeer:
if peerIdx == ethPeersCount: return peer
dec ethPeersCount
let latestBlock = await p.getBlockHeaders(request)
if latestBlock.isSome and latestBlock.get.headers.len > 0:
result = latestBlock.get.headers[0].blockNumber
proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} =
# Update our best block number
let bestBlockNumber = await peer.getBestBlockNumber()
if bestBlockNumber > syncCtx.endBlockNumber:
info "New sync end block number", number = bestBlockNumber
syncCtx.endBlockNumber = bestBlockNumber
proc obtainBlocksFromPeer(peer: Peer, syncCtx: SyncContext) {.async.} =
while (let workItemIdx = syncCtx.availableWorkItem(); workItemIdx != -1):
template workItem: auto = syncCtx.workQueue[workItemIdx]
workItem.state = Requested
debug "Requesting block headers", start = workItem.startIndex, count = workItem.numBlocks
debug "Requesting block headers", start = workItem.startIndex, count = workItem.numBlocks, peer
let request = BlocksRequest(
startBlock: HashOrNum(isHash: false,
number: workItem.startIndex),
@ -297,6 +312,100 @@ proc obtainBlocksFromPeer(peer: Peer, syncCtx: SyncContext) {.async.} =
debug "Nothing to sync"
proc peersAgreeOnChain(a, b: Peer): Future[bool] {.async.} =
# Returns true if one of the peers acknowledges existense of the best block
# of another peer.
var
a = a
b = b
if a.state(eth).bestDifficulty < b.state(eth).bestDifficulty:
swap(a, b)
let request = BlocksRequest(
startBlock: HashOrNum(isHash: true,
hash: b.state(eth).bestBlockHash),
maxResults: 1,
skip: 0,
reverse: true)
let latestBlock = await a.getBlockHeaders(request)
result = latestBlock.isSome and latestBlock.get.headers.len > 0
proc randomTrustedPeer(ctx: SyncContext): Peer =
var k = rand(ctx.trustedPeers.len - 1)
var i = 0
for p in ctx.trustedPeers:
result = p
if i == k: return
inc i
proc startSyncWithPeer(ctx: SyncContext, peer: Peer) {.async.} =
if ctx.trustedPeers.len >= minPeersToStartSync:
# We have enough trusted peers. Validate new peer against trusted
if await peersAgreeOnChain(peer, ctx.randomTrustedPeer()):
ctx.trustedPeers.incl(peer)
asyncCheck ctx.obtainBlocksFromPeer(peer)
elif ctx.trustedPeers.len == 0:
# Assume the peer is trusted, but don't start sync until we reevaluate
# it with more peers
debug "Assume trusted peer", peer
ctx.trustedPeers.incl(peer)
else:
# At this point we have some "trusted" candidates, but they are not
# "trusted" enough. We evaluate `peer` against all other candidates.
# If one of the candidates disagrees, we swap it for `peer`. If all
# candidates agree, we add `peer` to trusted set. The peers in the set
# will become "fully trusted" (and sync will start) when the set is big
# enough
var
agreeScore = 0
disagreedPeer: Peer
for tp in ctx.trustedPeers:
if await peersAgreeOnChain(peer, tp):
inc agreeScore
else:
disagreedPeer = tp
let disagreeScore = ctx.trustedPeers.len - agreeScore
if agreeScore == ctx.trustedPeers.len:
ctx.trustedPeers.incl(peer) # The best possible outsome
elif disagreeScore == 1:
info "Peer is no more trusted for sync", peer
ctx.trustedPeers.excl(disagreedPeer)
ctx.trustedPeers.incl(peer)
else:
info "Peer not trusted for sync", peer
if ctx.trustedPeers.len == minPeersToStartSync:
for p in ctx.trustedPeers:
asyncCheck ctx.obtainBlocksFromPeer(p)
proc onPeerConnected(ctx: SyncContext, peer: Peer) =
debug "New candidate for sync", peer
discard
let f = ctx.startSyncWithPeer(peer)
f.callback = proc(data: pointer) =
if f.failed:
error "startSyncWithPeer failed", msg = f.readError.msg, peer
proc onPeerDisconnected(ctx: SyncContext, p: Peer) =
echo "onPeerDisconnected"
ctx.trustedPeers.excl(p)
proc startSync(ctx: SyncContext) =
var po: PeerObserver
po.onPeerConnected = proc(p: Peer) =
ctx.onPeerConnected(p)
po.onPeerDisconnected = proc(p: Peer) =
ctx.onPeerDisconnected(p)
ctx.peerPool.addObserver(ctx, po)
proc findBestPeer(node: EthereumNode): (Peer, DifficultyInt) =
var
bestBlockDifficulty: DifficultyInt = 0.stuint(256)
@ -315,78 +424,8 @@ proc fastBlockchainSync*(node: EthereumNode): Future[SyncStatus] {.async.} =
## Code for the fast blockchain sync procedure:
## https://github.com/ethereum/wiki/wiki/Parallel-Block-Downloads
## https://github.com/ethereum/go-ethereum/pull/1889
var
bestBlockNumber: BlockNumber
debug "start sync"
var (bestPeer, bestBlockDifficulty) = node.findBestPeer()
if bestPeer == nil:
return syncNotEnoughPeers
while true:
let request = BlocksRequest(
startBlock: HashOrNum(isHash: true,
hash: bestPeer.state(eth).bestBlockHash),
maxResults: 1,
skip: 0,
reverse: true)
let latestBlock = await bestPeer.getBlockHeaders(request)
if latestBlock.isSome and latestBlock.get.headers.len > 0:
bestBlockNumber = latestBlock.get.headers[0].blockNumber
break
# TODO: maintain multiple "best peer" candidates and send requests
# to the second best option
bestPeer = node.randomOtherPeer(bestPeer)
if bestPeer == nil:
return syncNotEnoughPeers
# does the network agree with our best block?
var
localChain = node.chain
bestLocalHeader = localChain.getBestBlockHeader
for peer in node.randomPeers(5):
if peer.supports(eth):
let request = BlocksRequest(
startBlock: HashOrNum(isHash: false,
number: bestLocalHeader.blockNumber),
maxResults: 1,
skip: 0,
reverse: true)
# TODO: check if the majority of peers agree with the block
# positioned at our best block number.
# TODO: In case of disagreement, perform a binary search to locate a
# block where we agree.
if bestLocalHeader.blockNumber >= bestBlockNumber:
return syncSuccess
# 4. Start making requests in parallel for the block headers that we are
# missing (by requesting blocks from peers while honoring maxHeadersFetch).
# Make sure the blocks hashes add up. Don't count on everyone replying, ask
# a different peer in case of time-out. Handle invalid or incomplete replies
# properly. The peer may respond with fewer headers than requested (or with
# different ones if the peer is not behaving properly).
var syncCtx = newSyncContext(bestLocalHeader.blockNumber, bestBlockNumber, node.chain)
for peer in node.peers:
if peer.supports(eth):
# TODO: we should also monitor the PeerPool for new peers here and
# we should automatically add them to the loop.
asyncCheck obtainBlocksFromPeer(peer, syncCtx)
# 5. Store the obtained headers in the blockchain DB
# 6. Once the sync is complete, repeat from 1. until to further progress is
# possible
# 7. Start downloading the blockchain state in parallel
# (maybe this could start earlier).
# TODO: This needs a better interface. Consider removing this function and
# exposing SyncCtx
var syncCtx = newSyncContext(node.chain, node.peerPool)
syncCtx.startSync()