More sync fixes
* Fix IncompleteData issues brought by the new spec-compliant stream closing * Fix logic errors in the sync algorithms
This commit is contained in:
parent
886b92319f
commit
cdff79ec6d
|
@ -117,7 +117,6 @@ proc init*(T: type BlockPool, db: BeaconChainDB): BlockPool =
|
|||
doAssert justifiedHead.slot >= finalizedHead.slot,
|
||||
"justified head comes before finalized head - database corrupt?"
|
||||
|
||||
|
||||
BlockPool(
|
||||
pending: initTable[Eth2Digest, BeaconBlock](),
|
||||
missing: initTable[Eth2Digest, MissingBlock](),
|
||||
|
@ -299,6 +298,63 @@ proc getRef*(pool: BlockPool, root: Eth2Digest): BlockRef =
|
|||
## Retrieve a resolved block reference, if available
|
||||
pool.blocks.getOrDefault(root)
|
||||
|
||||
proc getBlockRange*(pool: BlockPool, headBlock: Eth2Digest,
|
||||
startSlot: Slot, skipStep: Natural,
|
||||
output: var openarray[BlockRef]): Natural =
|
||||
## This function populates an `output` buffer of blocks
|
||||
## with a range starting from `startSlot` and skipping
|
||||
## every `skipTest` number of blocks.
|
||||
##
|
||||
## Please note that the function may not necessarily
|
||||
## populate the entire buffer. The values will be written
|
||||
## in a way such that the last block is placed at the end
|
||||
## of the buffer while the first indices of the buffer
|
||||
## may remain unwritten.
|
||||
##
|
||||
## The result value of the function will be the index of
|
||||
## the first block in the resulting buffer. If no values
|
||||
## were written to the buffer, the result will be equal to
|
||||
## `buffer.len`. In other words, you can use the function
|
||||
## like this:
|
||||
##
|
||||
## var buffer: array[N, BlockRef]
|
||||
## let startPos = pool.getBlockRange(headBlock, startSlot, skipStep, buffer)
|
||||
## for i in startPos ..< buffer.len:
|
||||
## echo buffer[i].slot
|
||||
##
|
||||
result = output.len
|
||||
|
||||
var b = pool.getRef(headBlock)
|
||||
if b == nil or b.slot < startSlot:
|
||||
return
|
||||
|
||||
template skip(n: int) =
|
||||
for i in 0 ..< n:
|
||||
b = b.parent
|
||||
if b == nil: return
|
||||
|
||||
# We must compute the last block that is eligible for inclusion
|
||||
# in the results. This will be a block with a slot number that's
|
||||
# aligned to the stride of the requested block range, so we first
|
||||
# compute the steps needed to get to an aligned position:
|
||||
var blocksToSkip = b.slot.int mod skipStep
|
||||
let alignedHeadSlot = b.slot.int - blocksToSkip
|
||||
|
||||
# Then we see if this aligned position is within our wanted
|
||||
# range. If it's outside it, we must skip more blocks:
|
||||
let lastWantedSlot = startSlot.int + output.len * skipStep
|
||||
if alignedHeadSlot > lastWantedSlot:
|
||||
blocksToSkip += (alignedHeadSlot - lastWantedSlot)
|
||||
|
||||
# Finally, we skip the computed number of blocks
|
||||
skip blocksToSkip
|
||||
|
||||
# From here, we can just write out the requested block range:
|
||||
while b != nil and result > 0:
|
||||
dec result
|
||||
output[result] = b
|
||||
skip skipStep
|
||||
|
||||
proc get*(pool: BlockPool, blck: BlockRef): BlockData =
|
||||
## Retrieve the associated block body of a block reference
|
||||
doAssert (not blck.isNil), "Trying to get nil BlockRef"
|
||||
|
|
|
@ -209,42 +209,42 @@ proc readSizePrefix(transp: StreamTransport,
|
|||
proc readMsgBytes(stream: P2PStream,
|
||||
withResponseCode: bool,
|
||||
deadline: Future[void]): Future[Bytes] {.async.} =
|
||||
trace "reading msg bytes", withResponseCode
|
||||
if withResponseCode:
|
||||
var responseCode: byte
|
||||
var readResponseCode = stream.transp.readExactly(addr responseCode, 1)
|
||||
await readResponseCode or deadline
|
||||
if not readResponseCode.finished:
|
||||
try:
|
||||
if withResponseCode:
|
||||
var responseCode: byte
|
||||
var readResponseCode = stream.transp.readExactly(addr responseCode, 1)
|
||||
await readResponseCode or deadline
|
||||
if not readResponseCode.finished:
|
||||
return
|
||||
if responseCode > ResponseCode.high.byte: return
|
||||
|
||||
logScope: responseCode = ResponseCode(responseCode)
|
||||
case ResponseCode(responseCode)
|
||||
of InvalidRequest, ServerError:
|
||||
let responseErrMsg = await readChunk(stream, string, false, deadline)
|
||||
debug "P2P request resulted in error", responseErrMsg
|
||||
return
|
||||
of Success:
|
||||
# The response is OK, the execution continues below
|
||||
discard
|
||||
|
||||
var sizePrefix = await readSizePrefix(stream.transp, deadline)
|
||||
if sizePrefix < -1:
|
||||
debug "Failed to read an incoming message size prefix", peer = stream.peer
|
||||
return
|
||||
if responseCode > ResponseCode.high.byte: return
|
||||
|
||||
logScope: responseCode = ResponseCode(responseCode)
|
||||
case ResponseCode(responseCode)
|
||||
of InvalidRequest, ServerError:
|
||||
let responseErrMsg = await readChunk(stream, string, false, deadline)
|
||||
debug "P2P request resulted in error", responseErrMsg
|
||||
if sizePrefix == 0:
|
||||
debug "Received SSZ with zero size", peer = stream.peer
|
||||
return
|
||||
of Success:
|
||||
# The response is OK, the execution continues below
|
||||
discard
|
||||
|
||||
var sizePrefix = await readSizePrefix(stream.transp, deadline)
|
||||
if sizePrefix < -1:
|
||||
debug "Failed to read an incoming message size prefix", peer = stream.peer
|
||||
return
|
||||
var msgBytes = newSeq[byte](sizePrefix)
|
||||
var readBody = stream.transp.readExactly(addr msgBytes[0], sizePrefix)
|
||||
await readBody or deadline
|
||||
if not readBody.finished: return
|
||||
|
||||
trace "got size prefix", sizePrefix
|
||||
if sizePrefix == 0:
|
||||
debug "Received SSZ with zero size", peer = stream.peer
|
||||
return
|
||||
|
||||
var msgBytes = newSeq[byte](sizePrefix)
|
||||
var readBody = stream.transp.readExactly(addr msgBytes[0], sizePrefix)
|
||||
await readBody or deadline
|
||||
if not readBody.finished: return
|
||||
|
||||
trace "got msg bytes", msgBytes
|
||||
return msgBytes
|
||||
return msgBytes
|
||||
except TransportIncompleteError:
|
||||
return @[]
|
||||
|
||||
proc readChunk(stream: P2PStream,
|
||||
MsgType: type,
|
||||
|
@ -269,7 +269,6 @@ proc readResponse(
|
|||
var results: MsgType
|
||||
while true:
|
||||
let nextRes = await readChunk(stream, E, true, deadline)
|
||||
trace "got response chunk", nextRes
|
||||
if nextRes.isNone: break
|
||||
results.add nextRes.get
|
||||
if results.len > 0:
|
||||
|
|
|
@ -20,10 +20,14 @@ proc fetchAncestorBlocksFromPeer(
|
|||
# instead. In order to do this, we'll need the slot number of the known
|
||||
# block to be stored in the FetchRecord, so we can ask for a range of
|
||||
# blocks starting N positions before this slot number.
|
||||
let blocks = await peer.beaconBlocksByRoot([rec.root])
|
||||
if blocks.isSome:
|
||||
for b in blocks.get:
|
||||
responseHandler(b)
|
||||
try:
|
||||
let blocks = await peer.beaconBlocksByRoot([rec.root])
|
||||
if blocks.isSome:
|
||||
for b in blocks.get:
|
||||
responseHandler(b)
|
||||
except CatchableError as err:
|
||||
debug "Error while fetching ancestor blocks",
|
||||
err = err.msg, root = rec.root, peer
|
||||
|
||||
proc fetchAncestorBlocks*(requestManager: RequestManager,
|
||||
roots: seq[FetchRecord],
|
||||
|
|
|
@ -36,7 +36,7 @@ type
|
|||
slot: Slot
|
||||
|
||||
const
|
||||
maxBlocksToRequest = 16'u64
|
||||
maxBlocksToRequest = 64'u64
|
||||
MaxAncestorBlocksResponse = 256
|
||||
|
||||
func toHeader(b: BeaconBlock): BeaconBlockHeader =
|
||||
|
@ -125,45 +125,27 @@ p2pProtocol BeaconSync(version = 1,
|
|||
|
||||
proc helloResp(peer: Peer, msg: HelloMsg) {.libp2pProtocol("hello", 1).}
|
||||
|
||||
proc goodbye(
|
||||
peer: Peer,
|
||||
reason: DisconnectionReason) {.
|
||||
libp2pProtocol("goodbye", 1).}
|
||||
proc goodbye(peer: Peer, reason: DisconnectionReason) {.libp2pProtocol("goodbye", 1).}
|
||||
|
||||
requestResponse:
|
||||
proc beaconBlocksByRange(
|
||||
peer: Peer,
|
||||
headBlockRoot: Eth2Digest,
|
||||
start_slot: Slot,
|
||||
startSlot: Slot,
|
||||
count: uint64,
|
||||
step: uint64) {.
|
||||
libp2pProtocol("beacon_blocks_by_range", 1).} =
|
||||
# `step == 0` has no sense, so we will return empty array of blocks.
|
||||
# `count == 0` means that empty array of blocks requested.
|
||||
#
|
||||
# Current version of network specification do not cover case when
|
||||
# `start_slot` is empty, in such case we will return next available slot
|
||||
# which is follows `start_slot + step` sequence. For example for, if
|
||||
# `start_slot` is 2 and `step` is 2 and slots 2, 4, 6 are not available,
|
||||
# then [8, 10, ...] will be returned.
|
||||
var sentBlocksCount = 0
|
||||
if step > 0'u64 and count > 0'u64:
|
||||
|
||||
if count > 0'u64:
|
||||
let count = if step != 0: min(count, maxBlocksToRequest.uint64) else: 1
|
||||
let pool = peer.networkState.node.blockPool
|
||||
var blck = pool.getRef(headBlockRoot)
|
||||
var slot = start_slot
|
||||
while not(isNil(blck)):
|
||||
if blck.slot == slot:
|
||||
await response.write(pool.get(blck).data)
|
||||
inc sentBlocksCount
|
||||
slot = slot + step
|
||||
elif blck.slot > slot:
|
||||
if (blck.slot - slot) mod step == 0:
|
||||
await response.write(pool.get(blck).data)
|
||||
inc sentBlocksCount
|
||||
slot = slot + ((blck.slot - slot) div step + 1) * step
|
||||
if uint64(sentBlocksCount) == count:
|
||||
break
|
||||
blck = blck.parent
|
||||
var results: array[maxBlocksToRequest, BlockRef]
|
||||
let
|
||||
lastPos = min(count.int, results.len) - 1
|
||||
firstPos = pool.getBlockRange(headBlockRoot, startSlot, step,
|
||||
results.toOpenArray(0, lastPos))
|
||||
for i in firstPos.int .. lastPos.int:
|
||||
await response.write(pool.get(results[i]).data)
|
||||
|
||||
proc beaconBlocksByRoot(
|
||||
peer: Peer,
|
||||
|
@ -210,21 +192,24 @@ proc handleInitialHello(peer: Peer,
|
|||
else:
|
||||
# TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the
|
||||
# connection if it's too big.
|
||||
|
||||
var s = ourHello.headSlot + 1
|
||||
var theirHello = theirHello
|
||||
while s <= theirHello.headSlot:
|
||||
debug "Waiting for block headers", peer,
|
||||
remoteHeadSlot = theirHello.headSlot
|
||||
|
||||
let numBlocksToRequest = min(uint64(theirHello.headSlot - s),
|
||||
maxBlocksToRequest)
|
||||
let blocks = await peer.beaconBlocksByRange(ourHello.headRoot, s,
|
||||
|
||||
debug "Requesting blocks", peer, remoteHeadSlot = theirHello.headSlot,
|
||||
ourHeadSlot = s,
|
||||
numBlocksToRequest
|
||||
|
||||
let blocks = await peer.beaconBlocksByRange(theirHello.headRoot, s,
|
||||
numBlocksToRequest, 1'u64)
|
||||
if blocks.isSome:
|
||||
info "got blocks", total = blocks.get.len
|
||||
if blocks.get.len == 0:
|
||||
info "Got 0 blocks while syncing", peer
|
||||
break
|
||||
|
||||
node.importBlocks blocks.get
|
||||
let lastSlot = blocks.get[^1].slot
|
||||
if lastSlot <= s:
|
||||
|
@ -244,6 +229,7 @@ proc handleInitialHello(peer: Peer,
|
|||
# syncing will be interrupted.
|
||||
discard
|
||||
else:
|
||||
error "didn't got objectes in time"
|
||||
break
|
||||
|
||||
except CatchableError:
|
||||
|
|
Loading…
Reference in New Issue