Merge pull request #42 from status-im/sync

Proper sync part 1
This commit is contained in:
Yuriy Glukhov 2018-10-04 11:03:11 +03:00 committed by GitHub
commit e5ff8aea2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 248 additions and 124 deletions

View File

@ -63,6 +63,7 @@ type
connectingNodes: HashSet[Node]
running: bool
listenPort*: Port
observers: Table[int, PeerObserver]
MessageInfo* = object
id*: int
@ -105,6 +106,10 @@ type
protocolOffsets: seq[int]
messages: seq[ptr MessageInfo]
PeerObserver* = object
onPeerConnected*: proc(p: Peer)
onPeerDisconnected*: proc(p: Peer)
MessageHandler = proc(x: Peer, data: Rlp): Future[void]
MessageContentPrinter = proc(msg: pointer): string
RequestResolver = proc(msg: pointer, future: FutureBase)
@ -492,6 +497,11 @@ proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} =
decryptedBytes.setLen(decryptedBytesCount)
var rlp = rlpFromBytes(decryptedBytes.toRange)
let msgId = rlp.read(int)
# if not peer.dispatcher.isNil:
#
# echo "Read msg: ", peer.dispatcher.messages[msgId].name
# else:
# echo "Read msg: ", msgId
return (msgId, rlp)
proc perPeerMsgId(peer: Peer, proto: type, msgId: int): int {.inline.} =
@ -1261,6 +1271,7 @@ proc newPeerPool*(network: EthereumNode,
result.discovery = discovery
result.connectedNodes = initTable[Node, Peer]()
result.connectingNodes = initSet[Node]()
result.observers = initTable[int, PeerObserver]()
result.listenPort = listenPort
template ensureFuture(f: untyped) = asyncCheck f
@ -1268,14 +1279,21 @@ template ensureFuture(f: untyped) = asyncCheck f
proc nodesToConnect(p: PeerPool): seq[Node] {.inline.} =
  ## Returns candidate nodes obtained from the discovery layer, passing
  ## `p.minPeers` as the requested count.
  p.discovery.randomNodes(p.minPeers)
# def subscribe(self, subscriber: PeerPoolSubscriber) -> None:
# self._subscribers.append(subscriber)
# for peer in self.connected_nodes.values():
# subscriber.register_peer(peer)
proc addObserver(p: PeerPool, observerId: int, observer: PeerObserver) =
  ## Stores `observer` under `observerId` (the id must not be registered yet)
  ## and, when the observer has an `onPeerConnected` handler, replays it for
  ## every peer currently held in `p.connectedNodes`.
  assert(observerId notin p.observers)
  p.observers[observerId] = observer

  # Nothing to replay when the observer does not care about connections.
  if observer.onPeerConnected.isNil:
    return

  for alreadyConnected in p.connectedNodes.values:
    observer.onPeerConnected(alreadyConnected)
# def unsubscribe(self, subscriber: PeerPoolSubscriber) -> None:
# if subscriber in self._subscribers:
# self._subscribers.remove(subscriber)
proc delObserver(p: PeerPool, observerId: int) =
  ## Removes the observer stored under `observerId` from `p.observers`.
  p.observers.del(observerId)
proc addObserver*(p: PeerPool, observerId: ref, observer: PeerObserver) {.inline.} =
  ## Public overload: the `ref` passed as `observerId` is cast to `int` and
  ## that integer is used as the key for the observer.
  p.addObserver(cast[int](observerId), observer)
proc delObserver*(p: PeerPool, observerId: ref) {.inline.} =
  ## Public overload: removes the observer keyed by the `ref` cast to `int`,
  ## mirroring the key derivation used by the `ref` overload of `addObserver`.
  p.delObserver(cast[int](observerId))
proc stopAllPeers(p: PeerPool) {.async.} =
info "Stopping all peers ..."
@ -1338,24 +1356,28 @@ proc peerFinished(p: PeerPool, peer: Peer) =
## This is passed as a callback to be called when a peer finishes.
p.connectedNodes.del(peer.remote)
proc run(p: Peer, peerPool: PeerPool) {.async.} =
for o in p.observers.values:
if not o.onPeerDisconnected.isNil:
o.onPeerDisconnected(peer)
proc run(peer: Peer, peerPool: PeerPool) {.async.} =
# TODO: This is a stub that should be implemented in rlpx.nim
try:
while true:
var (nextMsgId, nextMsgData) = await p.recvMsg()
var (nextMsgId, nextMsgData) = await peer.recvMsg()
if nextMsgId == 1:
debug "Run got disconnect msg", reason = nextMsgData.listElem(0).toInt(uint32).DisconnectionReason
debug "Run got disconnect msg", reason = nextMsgData.listElem(0).toInt(uint32).DisconnectionReason, peer
break
else:
# debug "Got msg: ", msg = nextMsgId
await p.dispatchMsg(nextMsgId, nextMsgData)
await peer.dispatchMsg(nextMsgId, nextMsgData)
except:
error "Failed to read from peer",
err = getCurrentExceptionMsg(),
stackTrace = getCurrentException().getStackTrace()
peerPool.peerFinished(p)
peerPool.peerFinished(peer)
proc connectToNode*(p: PeerPool, n: Node) {.async.} =
let peer = await p.connect(n)
@ -1364,8 +1386,9 @@ proc connectToNode*(p: PeerPool, n: Node) {.async.} =
ensureFuture peer.run(p)
p.connectedNodes[peer.remote] = peer
# for subscriber in self._subscribers:
# subscriber.register_peer(peer)
for o in p.observers.values:
if not o.onPeerConnected.isNil:
o.onPeerConnected(peer)
proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} =
for node in nodes:
@ -1431,6 +1454,7 @@ proc start*(p: PeerPool) =
if not p.running:
asyncCheck p.run()
proc len*(p: PeerPool): int =
  ## Number of entries in `p.connectedNodes`.
  p.connectedNodes.len
# @property
# def peers(self) -> List[BasePeer]:
# peers = list(self.connected_nodes.values())
@ -1540,14 +1564,22 @@ proc connectToNetwork*(node: EthereumNode,
proc stopListening*(node: EthereumNode) =
  ## Calls `stop()` on the node's listening server.
  node.listeningServer.stop()
iterator peers*(p: PeerPool): Peer =
  ## Yields every peer stored in the pool's `connectedNodes` table.
  for connected in p.connectedNodes.values:
    yield connected
iterator peers*(p: PeerPool, Protocol: type): Peer =
  ## Yields the pool's peers for which `supports(Protocol)` is true.
  for candidate in p.peers:
    if not candidate.supports(Protocol):
      continue
    yield candidate
iterator peers*(node: EthereumNode): Peer =
for remote, peer in node.peerPool.connectedNodes:
for peer in node.peerPool.peers:
yield peer
iterator peers*(node: EthereumNode, Protocol: type): Peer =
for peer in node.peers:
if peer.supports(Protocol):
yield peer
for peer in node.peerPool.peers(Protocol):
yield peer
iterator randomPeers*(node: EthereumNode, maxPeers: int): Peer =
# TODO: this can be implemented more efficiently

View File

@ -258,15 +258,23 @@ proc open*(d: DiscoveryProtocol) =
let ta = initTAddress(d.address.ip, d.address.udpPort)
d.transp = newDatagramTransport(processClient, udata = d, local = ta)
proc lookupRandom*(d: DiscoveryProtocol): Future[seq[Node]] {.inline.} =
  ## Forwards to `lookupRandom` on the underlying Kademlia protocol object.
  d.kademlia.lookupRandom()
proc run(d: DiscoveryProtocol) {.async.} =
  ## Endless loop: performs a random lookup, sleeps, then prints the value of
  ## `d.kademlia.nodesDiscovered`. The loop has no exit condition.
  while true:
    # Only the side effects of the lookup matter here; its result is dropped.
    discard await d.lookupRandom()
    await sleepAsync(3000) # presumably milliseconds — confirm
    # NOTE(review): `echo` writes to stdout from library code; consider the
    # project's logging facility instead.
    echo "Discovered nodes: ", d.kademlia.nodesDiscovered
proc bootstrap*(d: DiscoveryProtocol) {.async.} =
  ## Bootstraps the Kademlia layer with `d.bootstrapNodes`, then launches the
  ## `run` loop without awaiting it.
  await d.kademlia.bootstrap(d.bootstrapNodes)
  # NOTE(review): the future returned by `run` is discarded, so a failure
  # inside the loop would go unobserved — confirm this is intended.
  discard d.run()
proc resolve*(d: DiscoveryProtocol, n: NodeId): Future[Node] =
d.kademlia.resolve(n)
proc lookupRandom*(d: DiscoveryProtocol): Future[seq[Node]] {.inline.} =
d.kademlia.lookupRandom()
proc randomNodes*(d: DiscoveryProtocol, count: int): seq[Node] {.inline.} =
d.kademlia.randomNodes(count)

View File

@ -67,6 +67,7 @@ proc newNode*(enode: ENode): Node =
proc distanceTo(n: Node, id: NodeId): UInt256 =
  ## XOR of the node's own id and `id`.
  n.id xor id
proc `$`*(n: Node): string =
  ## Renders the node as `Node[<ip>:<udpPort>]` using its address fields.
  # "Node[" & $n.node & "]"
  "Node[" & $n.node.address.ip & ":" & $n.node.address.udpPort & "]"
proc hash*(n: Node): hashes.Hash =
  ## Hash derived from the node's public key data.
  hash(n.node.pubkey.data)
@ -479,6 +480,8 @@ proc randomNodes*(k: KademliaProtocol, count: int): seq[Node] =
result.add(node)
seen.incl(node)
proc nodesDiscovered*(k: KademliaProtocol): int {.inline.} =
  ## Returns `k.routing.len`.
  k.routing.len
when isMainModule:
proc randomNode(): Node =
newNode("enode://aa36fdf33dd030378a0168efe6ed7d5cc587fafa3cdd375854fe735a2e11ea3650ba29644e2db48368c46e1f60e716300ba49396cd63778bf8a818c09bded46f@13.93.211.84:30303")

View File

@ -12,7 +12,7 @@
## https://github.com/ethereum/wiki/wiki/Ethereum-Wire-Protocol
import
random,
random, algorithm, hashes,
asyncdispatch2, rlp, stint, eth_common, chronicles,
../../eth_p2p
@ -39,6 +39,7 @@ const
maxReceiptsFetch = 256
maxHeadersFetch = 192
protocolVersion = 63
minPeersToStartSync = 2 # Wait for consensus of at least this number of peers before syncing
rlpxProtocol eth, protocolVersion:
useRequestIds = false
@ -58,6 +59,10 @@ rlpxProtocol eth, protocolVersion:
chain.genesisHash)
let m = await peer.waitSingleMsg(eth.status)
if m.networkId == network.networkId and m.genesisHash == chain.genesisHash:
debug "Suitable peer", peer
else:
raise newException(UselessPeerError, "Eth handshake params mismatch")
peer.state.initialized = true
peer.state.bestDifficulty = m.totalDifficulty
peer.state.bestBlockHash = m.bestHash
@ -146,6 +151,8 @@ rlpxProtocol eth, protocolVersion:
proc receipts(peer: Peer, receipts: openarray[Receipt]) =
discard
proc hash*(p: Peer): Hash {.inline.} =
  ## Hashes the peer by the pointer value of `p`, not by its contents.
  hash(cast[pointer](p))
type
SyncStatus* = enum
syncSuccess
@ -155,7 +162,8 @@ type
WantedBlocksState = enum
Initial,
Requested,
Received
Received,
Persisted
WantedBlocks = object
startIndex: BlockNumber
@ -169,6 +177,9 @@ type
endBlockNumber: BlockNumber
finalizedBlock: BlockNumber # Block which was downloaded and verified
chain: AbstractChainDB
peerPool: PeerPool
trustedPeers: HashSet[Peer]
hasOutOfOrderBlocks: bool
proc endIndex(b: WantedBlocks): BlockNumber =
result = b.startIndex
@ -176,12 +187,13 @@ proc endIndex(b: WantedBlocks): BlockNumber =
proc availableWorkItem(ctx: SyncContext): int =
var maxPendingBlock = ctx.finalizedBlock
echo "queue len: ", ctx.workQueue.len
result = -1
for i in 0 .. ctx.workQueue.high:
case ctx.workQueue[i].state
of Initial:
return i
of Received:
of Persisted:
result = i
else:
discard
@ -202,54 +214,95 @@ proc availableWorkItem(ctx: SyncContext): int =
numBlocks = maxHeadersFetch
ctx.workQueue[result] = WantedBlocks(startIndex: nextRequestedBlock, numBlocks: numBlocks.uint, state: Initial)
proc persistWorkItem(ctx: SyncContext, wi: var WantedBlocks) =
  ## Passes the work item's headers and bodies to the chain DB, empties both
  ## buffers, moves `ctx.finalizedBlock` to the item's `endIndex` and marks
  ## the item as `Persisted`.
  ctx.chain.persistBlocks(wi.headers, wi.bodies)
  # The data has been handed over; the item no longer needs to hold it.
  wi.headers.setLen(0)
  wi.bodies.setLen(0)
  ctx.finalizedBlock = wi.endIndex
  wi.state = Persisted
proc persistPendingWorkItems(ctx: SyncContext) =
  ## Repeatedly scans the work queue for a `Received` item that starts right
  ## after `ctx.finalizedBlock` and persists it. Scanning restarts after each
  ## persisted item and stops once a full pass persists nothing. Afterwards
  ## `ctx.hasOutOfOrderBlocks` tells whether that last pass still saw
  ## `Received` items that do not start at the expected block.
  var expectedStart = ctx.finalizedBlock + 1
  var progressed = true
  var leftovers = false
  debug "Looking for out of order blocks"

  while progressed:
    progressed = false
    leftovers = false
    for idx in 0 .. ctx.workQueue.high:
      if ctx.workQueue[idx].state != Received:
        continue
      let start = ctx.workQueue[idx].startIndex
      if start != expectedStart:
        # Received, but there is still a gap before it.
        leftovers = true
        continue
      debug "Persisting pending work item", start
      ctx.persistWorkItem(ctx.workQueue[idx])
      expectedStart = ctx.finalizedBlock + 1
      progressed = true
      # Restart the scan: earlier items may have become contiguous.
      break

  ctx.hasOutOfOrderBlocks = leftovers
proc returnWorkItem(ctx: SyncContext, workItem: int) =
let wi = addr ctx.workQueue[workItem]
let askedBlocks = wi.numBlocks.int
let receivedBlocks = wi.headers.len
let start = wi.startIndex
if askedBlocks == receivedBlocks:
debug "Work item complete", startBlock = wi.startIndex,
debug "Work item complete", start,
askedBlocks,
receivedBlocks
else:
warn "Work item complete", startBlock = wi.startIndex,
warn "Work item complete", start,
askedBlocks,
receivedBlocks
ctx.chain.persistBlocks(wi.headers, wi.bodies)
wi.headers.setLen(0)
wi.bodies.setLen(0)
if wi.startIndex != ctx.finalizedBlock + 1:
info "Blocks out of order", start, final = ctx.finalizedBlock
ctx.hasOutOfOrderBlocks = true
else:
info "Persisting blocks", start
ctx.persistWorkItem(wi[])
if ctx.hasOutOfOrderBlocks:
ctx.persistPendingWorkItems()
proc newSyncContext(startBlock, endBlock: BlockNumber, chain: AbstractChainDB): SyncContext =
proc newSyncContext(chain: AbstractChainDB, peerPool: PeerPool): SyncContext =
new result
result.endBlockNumber = endBlock
result.finalizedBlock = startBlock
result.chain = chain
result.peerPool = peerPool
result.trustedPeers = initSet[Peer]()
result.finalizedBlock = chain.getBestBlockHeader().blockNumber
proc handleLostPeer(ctx: SyncContext) =
  ## Placeholder invoked when a peer is lost; currently does nothing.
  # TODO: ask the PeerPool for new connections and then call
  # `obtainBlocksFromPeer`
  discard
proc randomOtherPeer(node: EthereumNode, particularPeer: Peer): Peer =
# TODO: we can maintain a per-protocol list of peers in EthereumNode
var ethPeersCount = 0
for peer in node.peers(eth):
if peer != particularPeer:
inc ethPeersCount
proc getBestBlockNumber(p: Peer): Future[BlockNumber] {.async.} =
let request = BlocksRequest(
startBlock: HashOrNum(isHash: true,
hash: p.state(eth).bestBlockHash),
maxResults: 1,
skip: 0,
reverse: true)
if ethPeersCount == 0: return nil
let peerIdx = random(ethPeersCount) + 1
for peer in node.peers(eth):
if peer != particularPeer:
if peerIdx == ethPeersCount: return peer
dec ethPeersCount
let latestBlock = await p.getBlockHeaders(request)
if latestBlock.isSome and latestBlock.get.headers.len > 0:
result = latestBlock.get.headers[0].blockNumber
proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} =
# Update our best block number
let bestBlockNumber = await peer.getBestBlockNumber()
if bestBlockNumber > syncCtx.endBlockNumber:
info "New sync end block number", number = bestBlockNumber
syncCtx.endBlockNumber = bestBlockNumber
proc obtainBlocksFromPeer(peer: Peer, syncCtx: SyncContext) {.async.} =
while (let workItemIdx = syncCtx.availableWorkItem(); workItemIdx != -1):
template workItem: auto = syncCtx.workQueue[workItemIdx]
workItem.state = Requested
debug "Requesting block headers", start = workItem.startIndex, count = workItem.numBlocks
debug "Requesting block headers", start = workItem.startIndex, count = workItem.numBlocks, peer
let request = BlocksRequest(
startBlock: HashOrNum(isHash: false,
number: workItem.startIndex),
@ -261,7 +314,6 @@ proc obtainBlocksFromPeer(peer: Peer, syncCtx: SyncContext) {.async.} =
try:
let results = await peer.getBlockHeaders(request)
if results.isSome:
workItem.state = Received
shallowCopy(workItem.headers, results.get.headers)
var bodies = newSeq[BlockBody]()
@ -277,8 +329,11 @@ proc obtainBlocksFromPeer(peer: Peer, syncCtx: SyncContext) {.async.} =
let b = await peer.getBlockBodies(hashes)
bodies.add(b.get.blocks)
shallowCopy(workItem.bodies, bodies)
dataReceived = true
if bodies.len == workItem.headers.len:
shallowCopy(workItem.bodies, bodies)
dataReceived = true
else:
warn "Bodies len != headers.len", bodies = bodies.len, headers = workItem.headers.len
except:
# the success case uses `continue`, so we can just fall back to the
# failure path below. If we signal time-outs with exceptions such
@ -286,8 +341,10 @@ proc obtainBlocksFromPeer(peer: Peer, syncCtx: SyncContext) {.async.} =
discard
if dataReceived:
workItem.state = Received
syncCtx.returnWorkItem workItemIdx
else:
workItem.state = Initial
try:
await peer.disconnect(SubprotocolReason)
except:
@ -295,7 +352,101 @@ proc obtainBlocksFromPeer(peer: Peer, syncCtx: SyncContext) {.async.} =
syncCtx.handleLostPeer()
break
debug "Nothing to sync"
debug "Fininshed otaining blocks", peer
proc peersAgreeOnChain(a, b: Peer): Future[bool] {.async.} =
  ## Returns true if one of the peers acknowledges the existence of the best
  ## block of the other peer. The peer with the higher `bestDifficulty` is
  ## the one queried, and it is asked for the other peer's best block hash.
  let aIsBehind = a.state(eth).bestDifficulty < b.state(eth).bestDifficulty
  let responder = if aIsBehind: b else: a
  let subject = if aIsBehind: a else: b

  let request = BlocksRequest(
    startBlock: HashOrNum(isHash: true,
                          hash: subject.state(eth).bestBlockHash),
    maxResults: 1,
    skip: 0,
    reverse: true)

  let reply = await responder.getBlockHeaders(request)
  result = reply.isSome and reply.get.headers.len > 0
proc randomTrustedPeer(ctx: SyncContext): Peer =
  ## Picks one member of `ctx.trustedPeers` by drawing a random position and
  ## walking the set until that position is reached.
  let pick = rand(ctx.trustedPeers.len - 1)
  var position = 0
  for candidate in ctx.trustedPeers:
    result = candidate
    if position == pick:
      break
    inc position
proc startSyncWithPeer(ctx: SyncContext, peer: Peer) {.async.} =
  ## Decides whether `peer` may take part in the sync.
  ##
  ## * Enough trusted peers already (`>= minPeersToStartSync`): `peer` is
  ##   checked against one randomly chosen trusted peer; on agreement it is
  ##   trusted and starts obtaining blocks immediately.
  ## * No trusted peers yet: `peer` is provisionally trusted, nothing starts.
  ## * Otherwise: `peer` is checked against every current candidate, and the
  ##   sync is started for all trusted peers once the set reaches
  ##   `minPeersToStartSync`.
  if ctx.trustedPeers.len >= minPeersToStartSync:
    # We have enough trusted peers. Validate new peer against trusted
    if await peersAgreeOnChain(peer, ctx.randomTrustedPeer()):
      ctx.trustedPeers.incl(peer)
      asyncCheck ctx.obtainBlocksFromPeer(peer)
  elif ctx.trustedPeers.len == 0:
    # Assume the peer is trusted, but don't start sync until we reevaluate
    # it with more peers
    debug "Assume trusted peer", peer
    ctx.trustedPeers.incl(peer)
  else:
    # At this point we have some "trusted" candidates, but they are not
    # "trusted" enough. We evaluate `peer` against all other candidates.
    # If one of the candidates disagrees, we swap it for `peer`. If all
    # candidates agree, we add `peer` to trusted set. The peers in the set
    # will become "fully trusted" (and sync will start) when the set is big
    # enough

    # Work on a snapshot: `onPeerDisconnected` may remove entries from
    # `ctx.trustedPeers` while we are suspended in `await` below, and the
    # set must not change underneath an active iteration.
    var candidates = newSeqOfCap[Peer](ctx.trustedPeers.len)
    for tp in ctx.trustedPeers:
      candidates.add(tp)

    var
      agreeScore = 0
      disagreedPeer: Peer

    for tp in candidates:
      if await peersAgreeOnChain(peer, tp):
        inc agreeScore
      else:
        disagreedPeer = tp

    let disagreeScore = candidates.len - agreeScore

    if agreeScore == candidates.len:
      ctx.trustedPeers.incl(peer) # The best possible outcome
    elif disagreeScore == 1:
      # Report the peer that is being evicted, not the one being accepted.
      info "Peer is no more trusted for sync", peer = disagreedPeer
      ctx.trustedPeers.excl(disagreedPeer)
      ctx.trustedPeers.incl(peer)
    else:
      info "Peer not trusted for sync", peer

    if ctx.trustedPeers.len == minPeersToStartSync:
      for p in ctx.trustedPeers:
        asyncCheck ctx.obtainBlocksFromPeer(p)
proc onPeerConnected(ctx: SyncContext, peer: Peer) =
  ## Peer-pool callback: kicks off `startSyncWithPeer` for the new peer and
  ## logs an error if the resulting future completes with a failure.
  debug "New candidate for sync", peer
  let f = ctx.startSyncWithPeer(peer)

  proc reportFailure(data: pointer) =
    if f.failed:
      error "startSyncWithPeer failed", msg = f.readError.msg, peer

  f.callback = reportFailure
proc onPeerDisconnected(ctx: SyncContext, p: Peer) =
  ## Peer-pool callback: removes the disconnected peer from the trusted set.
  # Logged through chronicles (like the rest of this module) rather than
  # printed to stdout.
  debug "Peer disconnected", peer = p
  ctx.trustedPeers.excl(p)
proc startSync(ctx: SyncContext) =
  ## Registers the sync context with its peer pool as an observer whose
  ## handlers forward connect / disconnect events to this context.
  proc connected(p: Peer) =
    ctx.onPeerConnected(p)

  proc disconnected(p: Peer) =
    ctx.onPeerDisconnected(p)

  var po: PeerObserver
  po.onPeerConnected = connected
  po.onPeerDisconnected = disconnected
  ctx.peerPool.addObserver(ctx, po)
proc findBestPeer(node: EthereumNode): (Peer, DifficultyInt) =
var
@ -315,78 +466,8 @@ proc fastBlockchainSync*(node: EthereumNode): Future[SyncStatus] {.async.} =
## Code for the fast blockchain sync procedure:
## https://github.com/ethereum/wiki/wiki/Parallel-Block-Downloads
## https://github.com/ethereum/go-ethereum/pull/1889
var
bestBlockNumber: BlockNumber
debug "start sync"
var (bestPeer, bestBlockDifficulty) = node.findBestPeer()
if bestPeer == nil:
return syncNotEnoughPeers
while true:
let request = BlocksRequest(
startBlock: HashOrNum(isHash: true,
hash: bestPeer.state(eth).bestBlockHash),
maxResults: 1,
skip: 0,
reverse: true)
let latestBlock = await bestPeer.getBlockHeaders(request)
if latestBlock.isSome and latestBlock.get.headers.len > 0:
bestBlockNumber = latestBlock.get.headers[0].blockNumber
break
# TODO: maintain multiple "best peer" candidates and send requests
# to the second best option
bestPeer = node.randomOtherPeer(bestPeer)
if bestPeer == nil:
return syncNotEnoughPeers
# does the network agree with our best block?
var
localChain = node.chain
bestLocalHeader = localChain.getBestBlockHeader
for peer in node.randomPeers(5):
if peer.supports(eth):
let request = BlocksRequest(
startBlock: HashOrNum(isHash: false,
number: bestLocalHeader.blockNumber),
maxResults: 1,
skip: 0,
reverse: true)
# TODO: check if the majority of peers agree with the block
# positioned at our best block number.
# TODO: In case of disagreement, perform a binary search to locate a
# block where we agree.
if bestLocalHeader.blockNumber >= bestBlockNumber:
return syncSuccess
# 4. Start making requests in parallel for the block headers that we are
# missing (by requesting blocks from peers while honoring maxHeadersFetch).
# Make sure the blocks hashes add up. Don't count on everyone replying, ask
# a different peer in case of time-out. Handle invalid or incomplete replies
# properly. The peer may respond with fewer headers than requested (or with
# different ones if the peer is not behaving properly).
var syncCtx = newSyncContext(bestLocalHeader.blockNumber, bestBlockNumber, node.chain)
for peer in node.peers:
if peer.supports(eth):
# TODO: we should also monitor the PeerPool for new peers here and
# we should automatically add them to the loop.
asyncCheck obtainBlocksFromPeer(peer, syncCtx)
# 5. Store the obtained headers in the blockchain DB
# 6. Once the sync is complete, repeat from 1. until no further progress is
# possible
# 7. Start downloading the blockchain state in parallel
# (maybe this could start earlier).
# TODO: This needs a better interface. Consider removing this function and
# exposing SyncCtx
var syncCtx = newSyncContext(node.chain, node.peerPool)
syncCtx.startSync()