From cdf9baf5e6a82d3f69762462206f7f8e6d5fea01 Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Tue, 9 Apr 2019 02:19:33 +0300 Subject: [PATCH] Revert "Chunked sync" This reverts commit c84d930304822a0f4bc9398ca61c8c48ae18745e. --- beacon_chain/sync_protocol.nim | 79 +++++++++++++++------------------- 1 file changed, 34 insertions(+), 45 deletions(-) diff --git a/beacon_chain/sync_protocol.nim b/beacon_chain/sync_protocol.nim index 29a868adc..73866720c 100644 --- a/beacon_chain/sync_protocol.nim +++ b/beacon_chain/sync_protocol.nim @@ -77,13 +77,12 @@ p2pProtocol BeaconSync(version = 1, networkId = peer.networkState.networkId blockPool = node.blockPool latestState = blockPool.latestState() - headBlock = blockPool.head var latestFinalizedRoot: Eth2Digest # TODO latestFinalizedEpoch = latestState.finalized_epoch bestRoot: Eth2Digest # TODO - bestSlot = headBlock.slot + bestSlot = latestState.slot let m = await handshake(peer, timeout = 10.seconds, status(networkId, latestFinalizedRoot, @@ -98,37 +97,42 @@ p2pProtocol BeaconSync(version = 1, # where it needs to sync and it should execute the sync algorithm with a certain # number of randomly selected peers. The algorithm itself must be extracted in a proc. try: - debug "Peer connected. Initiating sync", peer, bestSlot, remoteBestSlot = m.bestSlot + debug "Peer connected. Initiating sync", peer let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (m.latestFinalizedEpoch, m.bestSlot)) - if bestDiff >= 0: + if bestDiff == 0: # Nothing to do? trace "Nothing to sync", peer = peer.remote else: # TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the # connection if it's too big. - var s = bestSlot + 1 - while s <= m.bestSlot: - debug "Waiting for block roots", fromPeer = peer, remoteBestSlot = m.bestSlot, peer - let r = await peer.getBeaconBlockRoots(s, 512) - if not r.isSome: - debug "Block roots not received", peer - break - let roots = r.get.roots - debug "Received block roots", len = roots.len, peer - if roots.len != 0: - let headers = await peer.getBeaconBlockHeaders(bestRoot, s, roots.len, 0) - var bodiesRequest = newSeqOfCap[Eth2Digest](roots.len) - for r in roots: - bodiesRequest.add(r[0]) + if bestDiff > 0: + # Send roots + # TODO: Currently we send all block roots in one "packet". Maybe + # they should be split to multiple packets. + type Root = (Eth2Digest, Slot) + var roots = newSeqOfCap[Root](128) + for i in int(m.bestSlot) + 1 .. int(bestSlot): + for r in blockPool.blockRootsForSlot(i.Slot): + roots.add((r, i.Slot)) - debug "Block headers received. Requesting block bodies", peer - let bodies = await peer.getBeaconBlockBodies(bodiesRequest) - node.importBlocks(roots, headers.get.blockHeaders, bodies.get.blockBodies) - s = blockPool.head.slot + 1 - else: - break + debug "Sending block roots", peer, coveredSlots = roots.len + await peer.beaconBlockRoots(roots) + else: + # Receive roots + debug "Waiting for block roots", fromPeer = peer + let roots = await peer.nextMsg(BeaconSync.beaconBlockRoots) + + debug "Block roots received. Requesting block headers", bestRoot, bestSlot + let headers = await peer.getBeaconBlockHeaders(bestRoot, bestSlot, roots.roots.len, 0) + var bodiesRequest = newSeqOfCap[Eth2Digest](roots.roots.len) + for r in roots.roots: + bodiesRequest.add(r[0]) + + debug "Block headers received. Requesting block bodies", blocks = bodiesRequest + let bodies = await peer.getBeaconBlockBodies(bodiesRequest) + node.importBlocks(roots.roots, headers.get.blockHeaders, bodies.get.blockBodies) except CatchableError: warn "Failed to sync with peer", peer, err = getCurrentExceptionMsg() @@ -141,24 +145,9 @@ p2pProtocol BeaconSync(version = 1, bestRoot: Eth2Digest, bestSlot: Slot) {.libp2pProtocol("hello", "1.0.0").} - requestResponse: - proc getBeaconBlockRoots(peer: Peer, fromSlot: Slot, maxRoots: int) = - debug "GetBeaconBlockRoots" - doAssert(maxRoots <= 512) # TODO: Validate maxRoots properly - var s = fromSlot - var roots = newSeqOfCap[(Eth2Digest, Slot)](maxRoots) - let db = peer.networkState.db - let blockPool = peer.networkState.node.blockPool - let maxSlot = blockPool.head.slot - while s <= maxSlot: - for r in blockPool.blockRootsForSlot(s): - roots.add((r, s)) - if roots.len == maxRoots: break - s += 1 - debug "Sending roots", len = roots.len - await response.send(roots) - - proc beaconBlockRoots(peer: Peer, roots: openarray[(Eth2Digest, Slot)]) + proc beaconBlockRoots( + peer: Peer, + roots: openarray[(Eth2Digest, Slot)]) {.libp2pProtocol("rpc/beacon_block_roots", "1.0.0").} requestResponse: proc getBeaconBlockHeaders( @@ -168,15 +157,15 @@ p2pProtocol BeaconSync(version = 1, maxHeaders: int, skipSlots: int) {.libp2pProtocol("rpc/beacon_block_headers", "1.0.0").} = # TODO: validate maxHeaders and implement slipSlots - var s = slot + var s = slot.int var headers = newSeqOfCap[BeaconBlockHeaderRLP](maxHeaders) let db = peer.networkState.db let blockPool = peer.networkState.node.blockPool while headers.len < maxHeaders: - for r in blockPool.blockRootsForSlot(s): + for r in blockPool.blockRootsForSlot(s.Slot): headers.add(db.getBlock(r).get().toHeader) if headers.len == maxHeaders: break - s += 1 + inc s await response.send(headers) proc beaconBlockHeaders(peer: Peer, blockHeaders: openarray[BeaconBlockHeaderRLP])