diff --git a/beacon_chain/sync_protocol.nim b/beacon_chain/sync_protocol.nim index 73866720c..8c7a6d1bf 100644 --- a/beacon_chain/sync_protocol.nim +++ b/beacon_chain/sync_protocol.nim @@ -23,6 +23,10 @@ type node*: BeaconNode db*: BeaconChainDB +const + MaxRootsToRequest = 512 + MaxHeadersToRequest = MaxRootsToRequest + func toHeader(b: BeaconBlock): BeaconBlockHeaderRLP = BeaconBlockHeaderRLP( slot: b.slot.uint64, @@ -77,12 +81,13 @@ p2pProtocol BeaconSync(version = 1, networkId = peer.networkState.networkId blockPool = node.blockPool latestState = blockPool.latestState() + headBlock = blockPool.head var latestFinalizedRoot: Eth2Digest # TODO latestFinalizedEpoch = latestState.finalized_epoch bestRoot: Eth2Digest # TODO - bestSlot = latestState.slot + bestSlot = headBlock.slot let m = await handshake(peer, timeout = 10.seconds, status(networkId, latestFinalizedRoot, @@ -97,42 +102,54 @@ p2pProtocol BeaconSync(version = 1, # where it needs to sync and it should execute the sync algorithm with a certain # number of randomly selected peers. The algorithm itself must be extracted in a proc. try: - debug "Peer connected. Initiating sync", peer + debug "Peer connected. Initiating sync", peer, bestSlot, remoteBestSlot = m.bestSlot let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (m.latestFinalizedEpoch, m.bestSlot)) - if bestDiff == 0: + if bestDiff >= 0: # Nothing to do? trace "Nothing to sync", peer = peer.remote else: # TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the # connection if it's too big. - if bestDiff > 0: - # Send roots - # TODO: Currently we send all block roots in one "packet". Maybe - # they should be split to multiple packets. - type Root = (Eth2Digest, Slot) - var roots = newSeqOfCap[Root](128) - for i in int(m.bestSlot) + 1 .. int(bestSlot): - for r in blockPool.blockRootsForSlot(i.Slot): - roots.add((r, i.Slot)) + var s = bestSlot + 1 + while s <= m.bestSlot: + debug "Waiting for block roots", fromPeer = peer, remoteBestSlot = m.bestSlot, peer + let r = await peer.getBeaconBlockRoots(s, MaxRootsToRequest) + if not r.isSome: + debug "Block roots not received", peer + break + let roots = r.get.roots + debug "Received block roots", len = roots.len, peer + if roots.len != 0: + if roots.len > MaxRootsToRequest: + # Attack? + await peer.disconnect(BreachOfProtocol, true) + break - debug "Sending block roots", peer, coveredSlots = roots.len - await peer.beaconBlockRoots(roots) - else: - # Receive roots - debug "Waiting for block roots", fromPeer = peer - let roots = await peer.nextMsg(BeaconSync.beaconBlockRoots) + let headers = await peer.getBeaconBlockHeaders(bestRoot, s, roots.len, 0) + var bodiesRequest = newSeqOfCap[Eth2Digest](roots.len) + for r in roots: + bodiesRequest.add(r[0]) - debug "Block roots received. Requesting block headers", bestRoot, bestSlot - let headers = await peer.getBeaconBlockHeaders(bestRoot, bestSlot, roots.roots.len, 0) - var bodiesRequest = newSeqOfCap[Eth2Digest](roots.roots.len) - for r in roots.roots: - bodiesRequest.add(r[0]) + debug "Block headers received. Requesting block bodies", peer + let bodies = await peer.getBeaconBlockBodies(bodiesRequest) + node.importBlocks(roots, headers.get.blockHeaders, bodies.get.blockBodies) - debug "Block headers received. Requesting block bodies", blocks = bodiesRequest - let bodies = await peer.getBeaconBlockBodies(bodiesRequest) - node.importBlocks(roots.roots, headers.get.blockHeaders, bodies.get.blockBodies) + let lastSlot = roots[^1][1] + if roots.len == MaxRootsToRequest: + # Next batch of roots starts with the last slot of the current one + # to make sure we did not miss any roots with this slot that did + # not fit into the response. + + if s == lastSlot: + info "Too many roots for a single slot while syncing" + break + s = lastSlot + else: + s = lastSlot + 1 + else: + break except CatchableError: warn "Failed to sync with peer", peer, err = getCurrentExceptionMsg() @@ -145,9 +162,21 @@ p2pProtocol BeaconSync(version = 1, bestRoot: Eth2Digest, bestSlot: Slot) {.libp2pProtocol("hello", "1.0.0").} - proc beaconBlockRoots( - peer: Peer, - roots: openarray[(Eth2Digest, Slot)]) {.libp2pProtocol("rpc/beacon_block_roots", "1.0.0").} + requestResponse: + proc getBeaconBlockRoots(peer: Peer, fromSlot: Slot, maxRoots: int) = + let maxRoots = min(MaxRootsToRequest, maxRoots) + var s = fromSlot + var roots = newSeqOfCap[(Eth2Digest, Slot)](maxRoots) + let blockPool = peer.networkState.node.blockPool + let maxSlot = blockPool.head.slot + while s <= maxSlot: + for r in blockPool.blockRootsForSlot(s): + roots.add((r, s)) + if roots.len == maxRoots: break + s += 1 + await response.send(roots) + + proc beaconBlockRoots(peer: Peer, roots: openarray[(Eth2Digest, Slot)]) requestResponse: proc getBeaconBlockHeaders( @@ -156,16 +185,18 @@ p2pProtocol BeaconSync(version = 1, slot: Slot, maxHeaders: int, skipSlots: int) {.libp2pProtocol("rpc/beacon_block_headers", "1.0.0").} = - # TODO: validate maxHeaders and implement slipSlots - var s = slot.int + # TODO: validate implement slipSlots + let maxHeaders = min(MaxHeadersToRequest, maxHeaders) + var s = slot var headers = newSeqOfCap[BeaconBlockHeaderRLP](maxHeaders) let db = peer.networkState.db let blockPool = peer.networkState.node.blockPool - while headers.len < maxHeaders: - for r in blockPool.blockRootsForSlot(s.Slot): + let maxSlot = blockPool.head.slot + while s <= maxSlot: + for r in blockPool.blockRootsForSlot(s): headers.add(db.getBlock(r).get().toHeader) if headers.len == maxHeaders: break - inc s + s += 1 await response.send(headers) proc beaconBlockHeaders(peer: Peer, blockHeaders: openarray[BeaconBlockHeaderRLP])