Merge pull request #245 from status-im/chunked-sync2

Chunked sync second try
This commit is contained in:
Yuriy Glukhov 2019-04-10 21:16:38 +03:00 committed by GitHub
commit 02daef60c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -23,6 +23,10 @@ type
node*: BeaconNode node*: BeaconNode
db*: BeaconChainDB db*: BeaconChainDB
const
MaxRootsToRequest = 512
MaxHeadersToRequest = MaxRootsToRequest
func toHeader(b: BeaconBlock): BeaconBlockHeaderRLP = func toHeader(b: BeaconBlock): BeaconBlockHeaderRLP =
BeaconBlockHeaderRLP( BeaconBlockHeaderRLP(
slot: b.slot.uint64, slot: b.slot.uint64,
@ -77,12 +81,13 @@ p2pProtocol BeaconSync(version = 1,
networkId = peer.networkState.networkId networkId = peer.networkState.networkId
blockPool = node.blockPool blockPool = node.blockPool
latestState = blockPool.latestState() latestState = blockPool.latestState()
headBlock = blockPool.head
var var
latestFinalizedRoot: Eth2Digest # TODO latestFinalizedRoot: Eth2Digest # TODO
latestFinalizedEpoch = latestState.finalized_epoch latestFinalizedEpoch = latestState.finalized_epoch
bestRoot: Eth2Digest # TODO bestRoot: Eth2Digest # TODO
bestSlot = latestState.slot bestSlot = headBlock.slot
let m = await handshake(peer, timeout = 10.seconds, let m = await handshake(peer, timeout = 10.seconds,
status(networkId, latestFinalizedRoot, status(networkId, latestFinalizedRoot,
@ -97,42 +102,54 @@ p2pProtocol BeaconSync(version = 1,
# where it needs to sync and it should execute the sync algorithm with a certain # where it needs to sync and it should execute the sync algorithm with a certain
# number of randomly selected peers. The algorithm itself must be extracted in a proc. # number of randomly selected peers. The algorithm itself must be extracted in a proc.
try: try:
debug "Peer connected. Initiating sync", peer debug "Peer connected. Initiating sync", peer, bestSlot, remoteBestSlot = m.bestSlot
let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (m.latestFinalizedEpoch, m.bestSlot)) let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (m.latestFinalizedEpoch, m.bestSlot))
if bestDiff == 0: if bestDiff >= 0:
# Nothing to do? # Nothing to do?
trace "Nothing to sync", peer = peer.remote trace "Nothing to sync", peer = peer.remote
else: else:
# TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the # TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the
# connection if it's too big. # connection if it's too big.
if bestDiff > 0: var s = bestSlot + 1
# Send roots while s <= m.bestSlot:
# TODO: Currently we send all block roots in one "packet". Maybe debug "Waiting for block roots", fromPeer = peer, remoteBestSlot = m.bestSlot, peer
# they should be split to multiple packets. let r = await peer.getBeaconBlockRoots(s, MaxRootsToRequest)
type Root = (Eth2Digest, Slot) if not r.isSome:
var roots = newSeqOfCap[Root](128) debug "Block roots not received", peer
for i in int(m.bestSlot) + 1 .. int(bestSlot): break
for r in blockPool.blockRootsForSlot(i.Slot): let roots = r.get.roots
roots.add((r, i.Slot)) debug "Received block roots", len = roots.len, peer
if roots.len != 0:
if roots.len > MaxRootsToRequest:
# Attack?
await peer.disconnect(BreachOfProtocol, true)
break
debug "Sending block roots", peer, coveredSlots = roots.len let headers = await peer.getBeaconBlockHeaders(bestRoot, s, roots.len, 0)
await peer.beaconBlockRoots(roots) var bodiesRequest = newSeqOfCap[Eth2Digest](roots.len)
else: for r in roots:
# Receive roots bodiesRequest.add(r[0])
debug "Waiting for block roots", fromPeer = peer
let roots = await peer.nextMsg(BeaconSync.beaconBlockRoots)
debug "Block roots received. Requesting block headers", bestRoot, bestSlot debug "Block headers received. Requesting block bodies", peer
let headers = await peer.getBeaconBlockHeaders(bestRoot, bestSlot, roots.roots.len, 0) let bodies = await peer.getBeaconBlockBodies(bodiesRequest)
var bodiesRequest = newSeqOfCap[Eth2Digest](roots.roots.len) node.importBlocks(roots, headers.get.blockHeaders, bodies.get.blockBodies)
for r in roots.roots:
bodiesRequest.add(r[0])
debug "Block headers received. Requesting block bodies", blocks = bodiesRequest let lastSlot = roots[^1][1]
let bodies = await peer.getBeaconBlockBodies(bodiesRequest) if roots.len == MaxRootsToRequest:
node.importBlocks(roots.roots, headers.get.blockHeaders, bodies.get.blockBodies) # Next batch of roots starts with the last slot of the current one
# to make sure we did not miss any roots with this slot that did
# not fit into the response.
if s == lastSlot:
info "Too many roots for a single slot while syncing"
break
s = lastSlot
else:
s = lastSlot + 1
else:
break
except CatchableError: except CatchableError:
warn "Failed to sync with peer", peer, err = getCurrentExceptionMsg() warn "Failed to sync with peer", peer, err = getCurrentExceptionMsg()
@ -145,9 +162,21 @@ p2pProtocol BeaconSync(version = 1,
bestRoot: Eth2Digest, bestRoot: Eth2Digest,
bestSlot: Slot) {.libp2pProtocol("hello", "1.0.0").} bestSlot: Slot) {.libp2pProtocol("hello", "1.0.0").}
proc beaconBlockRoots( requestResponse:
peer: Peer, proc getBeaconBlockRoots(peer: Peer, fromSlot: Slot, maxRoots: int) =
roots: openarray[(Eth2Digest, Slot)]) {.libp2pProtocol("rpc/beacon_block_roots", "1.0.0").} let maxRoots = min(MaxRootsToRequest, maxRoots)
var s = fromSlot
var roots = newSeqOfCap[(Eth2Digest, Slot)](maxRoots)
let blockPool = peer.networkState.node.blockPool
let maxSlot = blockPool.head.slot
while s <= maxSlot:
for r in blockPool.blockRootsForSlot(s):
roots.add((r, s))
if roots.len == maxRoots: break
s += 1
await response.send(roots)
proc beaconBlockRoots(peer: Peer, roots: openarray[(Eth2Digest, Slot)])
requestResponse: requestResponse:
proc getBeaconBlockHeaders( proc getBeaconBlockHeaders(
@ -156,16 +185,18 @@ p2pProtocol BeaconSync(version = 1,
slot: Slot, slot: Slot,
maxHeaders: int, maxHeaders: int,
skipSlots: int) {.libp2pProtocol("rpc/beacon_block_headers", "1.0.0").} = skipSlots: int) {.libp2pProtocol("rpc/beacon_block_headers", "1.0.0").} =
# TODO: validate maxHeaders and implement slipSlots # TODO: validate implement slipSlots
var s = slot.int let maxHeaders = min(MaxHeadersToRequest, maxHeaders)
var s = slot
var headers = newSeqOfCap[BeaconBlockHeaderRLP](maxHeaders) var headers = newSeqOfCap[BeaconBlockHeaderRLP](maxHeaders)
let db = peer.networkState.db let db = peer.networkState.db
let blockPool = peer.networkState.node.blockPool let blockPool = peer.networkState.node.blockPool
while headers.len < maxHeaders: let maxSlot = blockPool.head.slot
for r in blockPool.blockRootsForSlot(s.Slot): while s <= maxSlot:
for r in blockPool.blockRootsForSlot(s):
headers.add(db.getBlock(r).get().toHeader) headers.add(db.getBlock(r).get().toHeader)
if headers.len == maxHeaders: break if headers.len == maxHeaders: break
inc s s += 1
await response.send(headers) await response.send(headers)
proc beaconBlockHeaders(peer: Peer, blockHeaders: openarray[BeaconBlockHeaderRLP]) proc beaconBlockHeaders(peer: Peer, blockHeaders: openarray[BeaconBlockHeaderRLP])