Sync-related fixes:
* off-by-one error in the returned range of blocks
* larger request time-outs to deal with non-responsive servers
* fix an unhandled exception when we fail to deliver a response chunk
parent 6e8a518077
commit 5a93e50b5e
@@ -491,9 +491,13 @@ proc onBeaconBlock(node: BeaconNode, blck: BeaconBlock) =
   # The block we received contains attestations, and we might not yet know about
   # all of them. Let's add them to the attestation pool - in case they block
   # is not yet resolved, neither will the attestations be!
+  # But please note that we only care about recent attestations.
   # TODO shouldn't add attestations if the block turns out to be invalid..
-  for attestation in blck.body.attestations:
-    node.onAttestation(attestation)
+  let currentSlot = node.beaconClock.now.toSlot
+  if currentSlot.afterGenesis and
+     blck.slot.epoch + 1 >= currentSlot.slot.epoch:
+    for attestation in blck.body.attestations:
+      node.onAttestation(attestation)
 
 proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
   ## Perform all attestations that the validators attached to this node should
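The new guard only lets attestations from recent blocks into the pool. A minimal standalone sketch of that recency check, assuming the usual 32-slots-per-epoch constant and plain integer slots instead of the node's actual Slot/Epoch types:

  const SLOTS_PER_EPOCH = 32   # assumed constant, for illustration only

  proc isRecentEnough(blockSlot, currentSlot: int): bool =
    # keep attestations only when the block's epoch is the current or previous one
    blockSlot div SLOTS_PER_EPOCH + 1 >= currentSlot div SLOTS_PER_EPOCH

  echo isRecentEnough(95, 100)   # epoch 2 vs epoch 3 -> true
  echo isRecentEnough(10, 100)   # epoch 0 vs epoch 3 -> false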
@@ -354,7 +354,7 @@ proc getBlockRange*(pool: BlockPool, headBlock: Eth2Digest,
 
       # Then we see if this aligned position is within our wanted
       # range. If it's outside it, we must skip more blocks:
-      let lastWantedSlot = startSlot.int + output.len * skipStep
+      let lastWantedSlot = startSlot.int + (output.len - 1) * skipStep
       if alignedHeadSlot > lastWantedSlot:
         blocksToSkip += (alignedHeadSlot - lastWantedSlot)
 
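This one-line change is the off-by-one fix from the commit message: with `output.len` blocks wanted, the last wanted slot is reached after `output.len - 1` strides, not `output.len`. A tiny sketch of the corrected arithmetic with illustrative names, not the pool's actual API:

  proc lastWantedSlot(startSlot, count, skipStep: int): int =
    # the first block sits at startSlot itself, so only count - 1 strides follow
    startSlot + (count - 1) * skipStep

  assert lastWantedSlot(100, 4, 2) == 106   # wanted slots: 100, 102, 104, 106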
@@ -614,7 +614,10 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
             # the `response` API in the high-level protocols more complicated for now).
             chronicles.debug "response size limit reached", peer, reqName = `msgNameLit`
         except CatchableError as `errVar`:
-          `await` sendErrorResponse(`peerVar`, `streamVar`, ServerError, `errVar`.msg)
+          try:
+            `await` sendErrorResponse(`peerVar`, `streamVar`, ServerError, `errVar`.msg)
+          except CatchableError as err:
+            debug "Failed to deliver error response", peer = `peerVar`
 
   ##
   ## Implement Senders and Handshake
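The added try/except covers the third bullet of the commit message: if writing the error response itself fails (for example because the peer already closed the stream), the second exception used to escape the handler unhandled. A minimal standalone sketch of the pattern, using a hypothetical sendErrorResponse stand-in rather than the real networking call:

  proc sendErrorResponse(msg: string) =
    # simulate a delivery failure, e.g. the peer hung up mid-response
    raise newException(IOError, "stream already closed")

  try:
    raise newException(ValueError, "request handler failed")
  except CatchableError as err:
    try:
      sendErrorResponse(err.msg)
    except CatchableError:
      echo "Failed to deliver error response"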
@@ -104,7 +104,9 @@ p2pProtocol BeaconSync(version = 1,
       let
         node = peer.networkState.node
         ourStatus = node.getCurrentStatus
-        theirStatus = await peer.status(ourStatus)
+        # TODO: The timeout here is so high only because we fail to
+        # respond in time due to high CPU load in our single thread.
+        theirStatus = await peer.status(ourStatus, timeout = 60.seconds)
 
       if theirStatus.isSome:
         await peer.handleInitialStatus(node, ourStatus, theirStatus.get)
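The `timeout = 60.seconds` argument bounds how long we wait for the peer's status reply, per the second bullet of the commit message. As a rough sketch of the same idea, assuming plain chronos and its `withTimeout` helper rather than the timeout parameter generated by the protocol DSL:

  import chronos

  proc slowPeerReply(): Future[int] {.async.} =
    await sleepAsync(2.seconds)    # pretend the peer is busy
    return 42

  proc demo() {.async.} =
    let fut = slowPeerReply()
    if await fut.withTimeout(1.seconds):
      echo "got reply: ", fut.read
    else:
      echo "request timed out"

  waitFor demo()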
@@ -186,8 +188,9 @@ proc handleInitialStatus(peer: Peer,
   libp2p_peers.set peer.network.peers.len.int64
 
   debug "Peer connected. Initiating sync", peer,
-        headSlot = ourStatus.headSlot,
-        remoteHeadSlot = theirStatus.headSlot
+        localHeadSlot = ourStatus.headSlot,
+        remoteHeadSlot = theirStatus.headSlot,
+        remoteHeadRoot = theirStatus.headRoot
 
   let bestDiff = cmp((ourStatus.finalizedEpoch, ourStatus.headSlot),
                      (theirStatus.finalizedEpoch, theirStatus.headSlot))
@@ -207,8 +210,11 @@ proc handleInitialStatus(peer: Peer,
             ourHeadSlot = s,
             numBlocksToRequest
 
+      # TODO: The timeout here is so high only because we fail to
+      # respond in time due to high CPU load in our single thread.
       let blocks = await peer.beaconBlocksByRange(theirStatus.headRoot, s,
-                                                  numBlocksToRequest, 1'u64)
+                                                  numBlocksToRequest, 1'u64,
+                                                  timeout = 60.seconds)
       if blocks.isSome:
         info "got blocks", total = blocks.get.len
         if blocks.get.len == 0: