harden and speed up block sync (#3358)

* harden and speed up block sync

The `GetBlockBy*` server implementation currently reads SSZ bytes from
the database, deserializes them into a Nim object, then serializes them
right back to SSZ - here, we eliminate the deser/ser steps and send the
bytes straight to the network. Unfortunately, the snappy recoding must
still be done, because the framing differs between storage and wire.
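
In sketch form, the server path goes from decode-and-reencode to a
plain byte copy - `getBlockSSZ` and `writeRawBytes` are the helpers
introduced in this change, everything else below is stubbed out purely
for illustration:

type
  FakeDag = object      # stand-in for the real dag handle
  FakeResponse = object # stand-in for the chunked response writer

proc getBlockSSZ(dag: FakeDag, key: int, bytes: var seq[byte]): bool =
  # stand-in for the database read: hands back the stored SSZ verbatim
  bytes = @[byte 0xde, 0xad]
  true

proc writeRawBytes(r: FakeResponse, bytes: openArray[byte],
                   contextBytes: openArray[byte]) =
  # the real writer re-frames `bytes` with snappy for the wire; the SSZ
  # payload itself is copied through without a decode/encode round-trip
  discard

var
  dag: FakeDag
  response: FakeResponse
  bytes: seq[byte]
if dag.getBlockSSZ(0, bytes):
  response.writeRawBytes(bytes, [])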

Also, the quota system makes one giant request for quota right before
sending all blocks - a 1024-block request is thus "paused" for a long
time, then all blocks are sent at once, causing a spike in database
reads that may see the requesting client time out before the first
block arrives.
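
The fix spreads the cost out instead. A toy model of the two behaviors,
assuming chronos and a made-up token bucket (`Quota`, `charge` and
`awaitNonNegative` are illustrative names, not the node's actual quota
API):

import chronos

type Quota = ref object
  tokens: float # may go negative after a charge
  perMs: float  # replenishment rate per millisecond

proc charge(q: Quota, cost: float) =
  q.tokens -= cost

proc awaitNonNegative(q: Quota) {.async.} =
  # wait until the bucket has replenished to a non-negative level
  while q.tokens < 0:
    await sleepAsync(10.milliseconds)
    q.tokens += q.perMs * 10.0

proc sendBlocks(q: Quota, blocks: seq[seq[byte]]) {.async.} =
  # before: one giant charge up front - a 1024-block request stalls
  # here, then every database read and send happens in a single burst:
  #   q.charge(blocks.len.float)
  #   await q.awaitNonNegative()
  # after: pay per block, so the first chunk goes out quickly and the
  # database reads are spread over the lifetime of the request:
  for blk in blocks:
    q.charge(1.0) # hypothetical per-block cost
    await q.awaitNonNegative()
    discard blk   # read from the database and write the chunk here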

Finally, on the reading side we make several copies of blocks as they
travel through various queues - this was not noticeable before, but it
becomes a problem in two cases: bellatrix blocks are up to 10mb (instead
of .. 30-40kb), and when backfilling, we process a lot more of them, a
lot faster.
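
Hence the switch to `ref` blocks in the sync queues. A self-contained
sketch of the difference (toy types only; `newClone` here is a local
stand-in for the helper used throughout the diff):

type BigBlock = object
  payload: array[4096, byte] # stand-in for a block of up to ~10mb

proc newClone[T](x: T): ref T =
  result = new T
  result[] = x # one heap copy up front

var
  valueQueue: seq[BigBlock]   # every add/move copies the payload
  refQueue: seq[ref BigBlock] # every add/move copies one pointer

let blck = BigBlock()
valueQueue.add blck           # full payload copy
refQueue.add newClone(blck)   # single copy onto the heap...
refQueue.add refQueue[0]      # ...after which only the ref travels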

* fix status comparisons for nodes syncing from genesis (#3327 was a bit
too hard)
* don't hit the database at all for post-altair slots in GetBlock v1
  requests
Jacek Sieka 2022-02-07 18:20:10 +01:00 committed by GitHub
parent bf3ef987e4
commit c7abc97545
6 changed files with 248 additions and 194 deletions


@@ -502,8 +502,8 @@ proc getRequestProtoName(fn: NimNode): NimNode =
 proc writeChunk*(conn: Connection,
                  responseCode: Option[ResponseCode],
-                 payload: Bytes,
-                 contextBytes: openarray[byte] = []): Future[void] =
+                 payload: openArray[byte],
+                 contextBytes: openArray[byte] = []): Future[void] =
   var output = memoryOutput()
   try:
@@ -558,13 +558,14 @@ proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.
   finally:
     await stream.close()
 
-proc sendResponseChunkBytes(response: UntypedResponse, payload: Bytes): Future[void] =
+proc sendResponseChunkBytes(
+    response: UntypedResponse, payload: openArray[byte],
+    contextBytes: openArray[byte] = []): Future[void] =
   inc response.writtenChunks
-  response.stream.writeChunk(some Success, payload)
+  response.stream.writeChunk(some Success, payload, contextBytes)
 
 proc sendResponseChunk*(response: UntypedResponse, val: auto): Future[void] =
-  inc response.writtenChunks
-  response.stream.writeChunk(some Success, SSZ.encode(val))
+  sendResponseChunkBytes(response, SSZ.encode(val))
 
 template sendUserHandlerResultAsChunkImpl*(stream: Connection,
                                            handlerResultFut: Future): untyped =
@@ -617,6 +618,11 @@ template write*[M](r: MultipleChunksResponse[M], val: M): untyped =
   mixin sendResponseChunk
   sendResponseChunk(UntypedResponse(r), val)
 
+template writeRawBytes*[M](
+    r: MultipleChunksResponse[M], bytes: openArray[byte],
+    contextBytes: openArray[byte]): untyped =
+  sendResponseChunkBytes(UntypedResponse(r), bytes, contextBytes)
+
 template send*[M](r: SingleChunkResponse[M], val: M): untyped =
   mixin sendResponseChunk
   doAssert UntypedResponse(r).writtenChunks == 0


@@ -53,13 +53,13 @@ proc init*(T: type RequestManager, network: Eth2Node,
   )
 
 proc checkResponse(roots: openArray[Eth2Digest],
-                   blocks: openArray[ForkedSignedBeaconBlock]): bool =
+                   blocks: openArray[ref ForkedSignedBeaconBlock]): bool =
   ## This procedure checks peer's response.
   var checks = @roots
   if len(blocks) > len(roots):
     return false
   for blk in blocks:
-    let res = checks.find(blk.root)
+    let res = checks.find(blk[].root)
     if res == -1:
       return false
     else:
@@ -75,10 +75,11 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
     peer_score = peer.getScore()
 
   let blocks = if peer.useSyncV2():
-    await peer.beaconBlocksByRoot_v2(BlockRootsList items)
+    await beaconBlocksByRoot_v2(peer, BlockRootsList items)
   else:
-    (await peer.beaconBlocksByRoot(BlockRootsList items)).map() do (blcks: seq[phase0.SignedBeaconBlock]) -> auto:
-      blcks.mapIt(ForkedSignedBeaconBlock.init(it))
+    (await beaconBlocksByRoot(peer, BlockRootsList items)).map(
+      proc(blcks: seq[phase0.SignedBeaconBlock]): auto =
+        blcks.mapIt(newClone(ForkedSignedBeaconBlock.init(it))))
 
   if blocks.isOk:
     let ublocks = blocks.get()
@@ -88,7 +89,7 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
       gotUnviableBlock = false
 
     for b in ublocks:
-      let ver = await rman.blockVerifier(b)
+      let ver = await rman.blockVerifier(b[])
       if ver.isErr():
         case ver.error()
         of BlockError.MissingParent:


@@ -73,7 +73,7 @@ type
     slots*: uint64
 
   SyncManagerError* = object of CatchableError
-  BeaconBlocksRes* = NetRes[seq[ForkedSignedBeaconBlock]]
+  BeaconBlocksRes* = NetRes[seq[ref ForkedSignedBeaconBlock]]
 
 proc now*(sm: typedesc[SyncMoment], slots: uint64): SyncMoment {.inline.} =
   SyncMoment(stamp: now(chronos.Moment), slots: slots)
@@ -157,22 +157,15 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
         slot = req.slot, slot_count = req.count, step = req.step,
         peer_score = peer.getScore(), peer_speed = peer.netKbps(),
         direction = man.direction, topics = "syncman"
-  if peer.useSyncV2():
-    let res =
-      try:
-        await beaconBlocksByRange_v2(peer, req.slot, req.count, req.step)
-      except CancelledError:
-        debug "Interrupt, while waiting getBlocks response", peer = peer,
-              slot = req.slot, slot_count = req.count, step = req.step,
-              peer_speed = peer.netKbps(), direction = man.direction,
-              topics = "syncman"
-        return
-      except CatchableError as exc:
-        debug "Error, while waiting getBlocks response", peer = peer,
-              slot = req.slot, slot_count = req.count, step = req.step,
-              errName = exc.name, errMsg = exc.msg, peer_speed = peer.netKbps(),
-              direction = man.direction, topics = "syncman"
-        return
+  try:
+    let res =
+      if peer.useSyncV2():
+        await beaconBlocksByRange_v2(peer, req.slot, req.count, req.step)
+      else:
+        (await beaconBlocksByRange(peer, req.slot, req.count, req.step)).map(
+          proc(blcks: seq[phase0.SignedBeaconBlock]): auto =
+            blcks.mapIt(newClone(ForkedSignedBeaconBlock.init(it))))
+
     if res.isErr():
       debug "Error, while reading getBlocks response",
             peer = peer, slot = req.slot, count = req.count,
@@ -181,10 +174,6 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
             error = $res.error()
       return
     return res
-  else:
-    let res =
-      try:
-        await beaconBlocksByRange(peer, req.slot, req.count, req.step)
-      except CancelledError:
-        debug "Interrupt, while waiting getBlocks response", peer = peer,
-              slot = req.slot, slot_count = req.count, step = req.step,
+  except CancelledError:
+    debug "Interrupt, while waiting getBlocks response", peer = peer,
+          slot = req.slot, slot_count = req.count, step = req.step,
@@ -197,17 +186,6 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
-              errName = exc.name, errMsg = exc.msg, peer_speed = peer.netKbps(),
-              direction = man.direction, topics = "syncman"
-        return
-    if res.isErr():
-      debug "Error, while reading getBlocks response",
-            peer = peer, slot = req.slot, count = req.count,
-            step = req.step, peer_speed = peer.netKbps(),
-            direction = man.direction, error = $res.error(),
-            topics = "syncman"
-      return
-    let forked =
-      res.map() do (blcks: seq[phase0.SignedBeaconBlock]) -> auto:
-        blcks.mapIt(ForkedSignedBeaconBlock.init(it))
-    return forked
+          errName = exc.name, errMsg = exc.msg, peer_speed = peer.netKbps(),
+          direction = man.direction, topics = "syncman"
+    return
 
 proc remainingSlots(man: SyncManager): uint64 =
   if man.direction == SyncQueueKind.Forward:


@@ -59,8 +59,9 @@ type
   BlockRootsList* = List[Eth2Digest, Limit MAX_REQUEST_BLOCKS]
 
-proc readChunkPayload*(conn: Connection, peer: Peer,
-                       MsgType: type ForkedSignedBeaconBlock): Future[NetRes[ForkedSignedBeaconBlock]] {.async.} =
+proc readChunkPayload*(
+    conn: Connection, peer: Peer, MsgType: type (ref ForkedSignedBeaconBlock)):
+    Future[NetRes[MsgType]] {.async.} =
   var contextBytes: ForkDigest
   try:
     await conn.readExactly(addr contextBytes, sizeof contextBytes)
@@ -70,42 +71,24 @@ proc readChunkPayload*(
   if contextBytes == peer.network.forkDigests.phase0:
     let res = await readChunkPayload(conn, peer, phase0.SignedBeaconBlock)
     if res.isOk:
-      return ok ForkedSignedBeaconBlock.init(res.get)
+      return ok newClone(ForkedSignedBeaconBlock.init(res.get))
     else:
       return err(res.error)
   elif contextBytes == peer.network.forkDigests.altair:
     let res = await readChunkPayload(conn, peer, altair.SignedBeaconBlock)
     if res.isOk:
-      return ok ForkedSignedBeaconBlock.init(res.get)
+      return ok newClone(ForkedSignedBeaconBlock.init(res.get))
     else:
       return err(res.error)
   elif contextBytes == peer.network.forkDigests.bellatrix:
     let res = await readChunkPayload(conn, peer, bellatrix.SignedBeaconBlock)
     if res.isOk:
-      return ok ForkedSignedBeaconBlock.init(res.get)
+      return ok newClone(ForkedSignedBeaconBlock.init(res.get))
     else:
       return err(res.error)
   else:
     return neterr InvalidContextBytes
 
-proc sendResponseChunk*(response: UntypedResponse,
-                        val: ForkedSignedBeaconBlock): Future[void] =
-  inc response.writtenChunks
-
-  case val.kind
-  of BeaconBlockFork.Phase0:
-    response.stream.writeChunk(some ResponseCode.Success,
-                               SSZ.encode(val.phase0Data),
-                               response.peer.network.forkDigests.phase0.data)
-  of BeaconBlockFork.Altair:
-    response.stream.writeChunk(some ResponseCode.Success,
-                               SSZ.encode(val.altairData),
-                               response.peer.network.forkDigests.altair.data)
-  of BeaconBlockFork.Bellatrix:
-    response.stream.writeChunk(some ResponseCode.Success,
-                               SSZ.encode(val.bellatrixData),
-                               response.peer.network.forkDigests.bellatrix.data)
-
 func shortLog*(s: StatusMsg): auto =
   (
     forkDigest: s.forkDigest,
@@ -153,7 +136,9 @@ proc checkStatusMsg(state: BeaconSyncNetworkState, status: StatusMsg):
     if status.finalizedEpoch <= dag.finalizedHead.slot.epoch:
       let blockId = dag.getBlockIdAtSlot(status.finalizedEpoch.start_slot())
-      if status.finalizedRoot != blockId.bid.root and blockId.bid.root != Eth2Digest():
+      if status.finalizedRoot != blockId.bid.root and
+          blockId.bid.root != Eth2Digest() and
+          status.finalizedRoot != Eth2Digest():
         return err("peer following different finality")
 
   ok()
@@ -224,34 +209,51 @@ p2pProtocol BeaconSync(version = 1,
       reqStep: uint64,
       response: MultipleChunksResponse[phase0.SignedBeaconBlock])
       {.async, libp2pProtocol("beacon_blocks_by_range", 1).} =
+    # TODO Semantically, this request should return a non-ref, but doing so
+    #      runs into extreme inefficiency due to the compiler introducing
+    #      hidden copies - in future nim versions with move support, this should
+    #      be revisited
+    # TODO This code is more complicated than it needs to be, since the type
+    #      of the multiple chunks response is not actually used in this server
+    #      implementation (it's used to derive the signature of the client
+    #      function, not in the code below!)
+    # TODO although you can't tell from this function definition, a magic
+    #      client call that returns `seq[ref SignedBeaconBlock]` will
+    #      be generated by the libp2p macro - we guarantee that seq items
+    #      are `not-nil` in the implementation
     trace "got range request", peer, startSlot,
       count = reqCount, step = reqStep
-    if reqCount > 0'u64 and reqStep > 0'u64:
-      var blocks: array[MAX_REQUEST_BLOCKS, BlockId]
-      let
-        dag = peer.networkState.dag
-        # Limit number of blocks in response
-        count = int min(reqCount, blocks.lenu64)
-      let
-        endIndex = count - 1
-        startIndex =
-          dag.getBlockRange(startSlot, reqStep, blocks.toOpenArray(0, endIndex))
-      peer.updateRequestQuota(
-        blockByRangeLookupCost +
-        max(0, endIndex - startIndex + 1).float * blockResponseCost)
-      peer.awaitNonNegativeRequestQuota()
+    if reqCount == 0'u64 or reqStep == 0'u64:
+      raise newException(InvalidInputsError, "Empty range requested")
+
+    let
+      dag = peer.networkState.dag
+
+    if startSlot.epoch >= dag.cfg.ALTAIR_FORK_EPOCH:
+      # "Clients MAY limit the number of blocks in the response."
+      # https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#beaconblocksbyrange
+      debug "Block range v1 request for post-altair range",
+        peer, startSlot, reqCount, reqStep
+      return
+
+    var blocks: array[MAX_REQUEST_BLOCKS, BlockId]
+    let
+      # Limit number of blocks in response
+      count = int min(reqCount, blocks.lenu64)
+      endIndex = count - 1
+      startIndex =
+        dag.getBlockRange(startSlot, reqStep, blocks.toOpenArray(0, endIndex))
+
+    peer.updateRequestQuota(blockByRangeLookupCost)
+    peer.awaitNonNegativeRequestQuota()
+
+    var
+      found = 0
+      bytes: seq[byte]
 
     for i in startIndex..endIndex:
-      trace "wrote response block",
-        slot = blocks[i].slot, roor = shortLog(blocks[i].root)
-      let blk = dag.getForkedBlock(blocks[i])
-      if blk.isSome():
-        let blck = blk.get()
-        case blck.kind
-        of BeaconBlockFork.Phase0:
-          await response.write(blck.phase0Data.asSigned)
-        else:
+      if blocks[i].slot.epoch >= dag.cfg.ALTAIR_FORK_EPOCH:
         # Skipping all subsequent blocks should be OK because the spec says:
         # "Clients MAY limit the number of blocks in the response."
         # https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#beaconblocksbyrange
@@ -260,10 +262,19 @@ p2pProtocol BeaconSync(version = 1,
         # that have been synced exactly to the altair transition slot.
         break
 
+      if dag.getBlockSSZ(blocks[i], bytes):
+        trace "writing response block",
+          slot = blocks[i].slot, roor = shortLog(blocks[i].root)
+        peer.updateRequestQuota(blockResponseCost)
+        peer.awaitNonNegativeRequestQuota()
+
+        await response.writeRawBytes(bytes, []) # phase0 bytes
+        inc found
+
     debug "Block range request done",
-      peer, startSlot, count, reqStep, found = count - startIndex
-    else:
-      raise newException(InvalidInputsError, "Empty range requested")
+      peer, startSlot, count, reqStep, found
 
   proc beaconBlocksByRoot(
       peer: Peer,
@@ -272,6 +283,19 @@ p2pProtocol BeaconSync(version = 1,
       blockRoots: BlockRootsList,
       response: MultipleChunksResponse[phase0.SignedBeaconBlock])
       {.async, libp2pProtocol("beacon_blocks_by_root", 1).} =
+    # TODO Semantically, this request should return a non-ref, but doing so
+    #      runs into extreme inefficiency due to the compiler introducing
+    #      hidden copies - in future nim versions with move support, this should
+    #      be revisited
+    # TODO This code is more complicated than it needs to be, since the type
+    #      of the multiple chunks response is not actually used in this server
+    #      implementation (it's used to derive the signature of the client
+    #      function, not in the code below!)
+    # TODO although you can't tell from this function definition, a magic
+    #      client call that returns `seq[ref SignedBeaconBlock]` will
+    #      be generated by the libp2p macro - we guarantee that seq items
+    #      are `not-nil` in the implementation
     if blockRoots.len == 0:
       raise newException(InvalidInputsError, "No blocks requested")
@@ -279,19 +303,19 @@ p2pProtocol BeaconSync(version = 1,
       dag = peer.networkState.dag
       count = blockRoots.len
 
+    var
+      found = 0
+      bytes: seq[byte]
+
     peer.updateRequestQuota(count.float * blockByRootLookupCost)
     peer.awaitNonNegativeRequestQuota()
 
-    var found = 0
     for i in 0..<count:
-      let blockRef = dag.getBlockRef(blockRoots[i])
-      if blockRef.isSome():
-        let blk = dag.getForkedBlock(blockRef[])
-        case blk.kind
-        of BeaconBlockFork.Phase0:
-          await response.write(blk.phase0Data.asSigned)
-          inc found
-        of BeaconBlockFork.Altair, BeaconBlockFork.Bellatrix:
+      let
+        blockRef = dag.getBlockRef(blockRoots[i]).valueOr:
+          continue
+
+      if blockRef.slot.epoch >= dag.cfg.ALTAIR_FORK_EPOCH:
         # Skipping this block should be fine because the spec says:
         # "Clients MAY limit the number of blocks in the response."
         # https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#beaconblocksbyroot
@@ -300,7 +324,12 @@ p2pProtocol BeaconSync(version = 1,
         # that have been synced exactly to the altair transition slot.
         continue
 
-    peer.updateRequestQuota(found.float * blockResponseCost)
+      if dag.getBlockSSZ(blockRef.bid, bytes):
+        peer.updateRequestQuota(blockResponseCost)
+        peer.awaitNonNegativeRequestQuota()
+
+        await response.writeRawBytes(bytes, []) # phase0 bytes
+        inc found
 
     debug "Block root request done",
       peer, roots = blockRoots.len, count, found
@@ -310,47 +339,75 @@ p2pProtocol BeaconSync(version = 1,
       startSlot: Slot,
       reqCount: uint64,
       reqStep: uint64,
-      response: MultipleChunksResponse[ForkedSignedBeaconBlock])
+      response: MultipleChunksResponse[ref ForkedSignedBeaconBlock])
       {.async, libp2pProtocol("beacon_blocks_by_range", 2).} =
+    # TODO Semantically, this request should return a non-ref, but doing so
+    #      runs into extreme inefficiency due to the compiler introducing
+    #      hidden copies - in future nim versions with move support, this should
+    #      be revisited
+    # TODO This code is more complicated than it needs to be, since the type
+    #      of the multiple chunks response is not actually used in this server
+    #      implementation (it's used to derive the signature of the client
+    #      function, not in the code below!)
+    # TODO although you can't tell from this function definition, a magic
+    #      client call that returns `seq[ref ForkedSignedBeaconBlock]` will
+    #      be generated by the libp2p macro - we guarantee that seq items
+    #      are `not-nil` in the implementation
     trace "got range request", peer, startSlot,
       count = reqCount, step = reqStep
-    if reqCount > 0'u64 and reqStep > 0'u64:
-      var blocks: array[MAX_REQUEST_BLOCKS, BlockId]
-      let
-        dag = peer.networkState.dag
-        # Limit number of blocks in response
-        count = int min(reqCount, blocks.lenu64)
-      let
-        endIndex = count - 1
-        startIndex =
-          dag.getBlockRange(startSlot, reqStep,
-                            blocks.toOpenArray(0, endIndex))
-      peer.updateRequestQuota(
-        blockByRangeLookupCost +
-        max(0, endIndex - startIndex + 1).float * blockResponseCost)
-      peer.awaitNonNegativeRequestQuota()
+    if reqCount == 0 or reqStep == 0:
+      raise newException(InvalidInputsError, "Empty range requested")
 
-      for i in startIndex..endIndex:
-        let
-          blck = dag.getForkedBlock(blocks[i]).valueOr:
-            continue
-
-        await response.write(blck.asSigned)
+    var blocks: array[MAX_REQUEST_BLOCKS, BlockId]
+    let
+      dag = peer.networkState.dag
+      # Limit number of blocks in response
+      count = int min(reqCount, blocks.lenu64)
+      endIndex = count - 1
+      startIndex =
+        dag.getBlockRange(startSlot, reqStep,
+                          blocks.toOpenArray(0, endIndex))
+
+    peer.updateRequestQuota(blockByRangeLookupCost)
+    peer.awaitNonNegativeRequestQuota()
+
+    var
+      found = 0
+      bytes: seq[byte]
+
+    for i in startIndex..endIndex:
+      if dag.getBlockSSZ(blocks[i], bytes):
+        peer.updateRequestQuota(blockResponseCost)
+        peer.awaitNonNegativeRequestQuota()
+
+        await response.writeRawBytes(
+          bytes, dag.forkDigestAtEpoch(blocks[i].slot.epoch).data)
+        inc found
 
     debug "Block range request done",
-      peer, startSlot, count, reqStep, found = count - startIndex
-    else:
-      raise newException(InvalidInputsError, "Empty range requested")
+      peer, startSlot, count, reqStep
 
   proc beaconBlocksByRoot_v2(
       peer: Peer,
       # Please note that the SSZ list here ensures that the
       # spec constant MAX_REQUEST_BLOCKS is enforced:
       blockRoots: BlockRootsList,
-      response: MultipleChunksResponse[ForkedSignedBeaconBlock])
+      response: MultipleChunksResponse[ref ForkedSignedBeaconBlock])
       {.async, libp2pProtocol("beacon_blocks_by_root", 2).} =
+    # TODO Semantically, this request should return a non-ref, but doing so
+    #      runs into extreme inefficiency due to the compiler introducing
+    #      hidden copies - in future nim versions with move support, this should
+    #      be revisited
+    # TODO This code is more complicated than it needs to be, since the type
+    #      of the multiple chunks response is not actually used in this server
+    #      implementation (it's used to derive the signature of the client
+    #      function, not in the code below!)
+    # TODO although you can't tell from this function definition, a magic
+    #      client call that returns `seq[ref ForkedSignedBeaconBlock]` will
+    #      be generated by the libp2p macro - we guarantee that seq items
+    #      are `not-nil` in the implementation
     if blockRoots.len == 0:
       raise newException(InvalidInputsError, "No blocks requested")
@@ -361,15 +418,23 @@ p2pProtocol BeaconSync(version = 1,
     peer.updateRequestQuota(count.float * blockByRootLookupCost)
     peer.awaitNonNegativeRequestQuota()
 
-    var found = 0
-    for i in 0..<count:
-      let blockRef = dag.getBlockRef(blockRoots[i])
-      if blockRef.isSome():
-        let blk = dag.getForkedBlock(blockRef[])
-        await response.write(blk.asSigned)
-        inc found
+    var
+      found = 0
+      bytes: seq[byte]
 
-    peer.updateRequestQuota(found.float * blockResponseCost)
+    for i in 0..<count:
+      let
+        blockRef = dag.getBlockRef(blockRoots[i]).valueOr:
+          continue
+
+      if dag.getBlockSSZ(blockRef.bid, bytes):
+        peer.updateRequestQuota(blockResponseCost)
+        peer.awaitNonNegativeRequestQuota()
+
+        await response.writeRawBytes(
+          bytes, dag.forkDigestAtEpoch(blockRef.slot.epoch).data)
+        inc found
 
     debug "Block root request done",
       peer, roots = blockRoots.len, count, found


@@ -43,7 +43,7 @@ type
   SyncResult*[T] = object
     request*: SyncRequest[T]
-    data*: seq[ForkedSignedBeaconBlock]
+    data*: seq[ref ForkedSignedBeaconBlock]
 
   SyncWaiter* = ref object
     future: Future[void]
@@ -72,12 +72,12 @@ type
     blockVerifier: BlockVerifier
 
   SyncManagerError* = object of CatchableError
-  BeaconBlocksRes* = NetRes[seq[ForkedSignedBeaconBlock]]
+  BeaconBlocksRes* = NetRes[seq[ref ForkedSignedBeaconBlock]]
 
 chronicles.formatIt SyncQueueKind: $it
 
 proc getShortMap*[T](req: SyncRequest[T],
-                     data: openArray[ForkedSignedBeaconBlock]): string =
+                     data: openArray[ref ForkedSignedBeaconBlock]): string =
   ## Returns all slot numbers in ``data`` as placement map.
   var res = newStringOfCap(req.count)
   var slider = req.slot
@@ -85,11 +85,11 @@ proc getShortMap*[T](req: SyncRequest[T],
   for i in 0 ..< req.count:
     if last < len(data):
       for k in last ..< len(data):
-        if slider == data[k].slot:
+        if slider == data[k][].slot:
           res.add('x')
           last = k + 1
           break
-        elif slider < data[k].slot:
+        elif slider < data[k][].slot:
           res.add('.')
           break
     else:
@@ -105,7 +105,7 @@ proc cmp*[T](a, b: SyncRequest[T]): int =
   cmp(uint64(a.slot), uint64(b.slot))
 
 proc checkResponse*[T](req: SyncRequest[T],
-                       data: openArray[ForkedSignedBeaconBlock]): bool =
+                       data: openArray[ref ForkedSignedBeaconBlock]): bool =
   if len(data) == 0:
     # Impossible to verify empty response.
     return true
@@ -120,9 +120,9 @@ proc checkResponse*[T](req: SyncRequest[T],
   var dindex = 0
 
   while (rindex < req.count) and (dindex < len(data)):
-    if slot < data[dindex].slot:
+    if slot < data[dindex][].slot:
       discard
-    elif slot == data[dindex].slot:
+    elif slot == data[dindex][].slot:
       inc(dindex)
     else:
       return false
@@ -360,7 +360,7 @@ proc hasEndGap*[T](sr: SyncResult[T]): bool {.inline.} =
   let lastslot = sr.request.slot + sr.request.count - 1'u64
   if len(sr.data) == 0:
     return true
-  if sr.data[^1].slot != lastslot:
+  if sr.data[^1][].slot != lastslot:
     return true
   return false
@@ -371,7 +371,7 @@ proc getLastNonEmptySlot*[T](sr: SyncResult[T]): Slot {.inline.} =
     # If response has only empty slots we going to use original request slot
     sr.request.slot
   else:
-    sr.data[^1].slot
+    sr.data[^1][].slot
 
 proc toDebtsQueue[T](sq: SyncQueue[T], sr: SyncRequest[T]) =
   sq.debtsQueue.push(sr)
@@ -472,7 +472,7 @@ proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot,
   safeSlot
 
 iterator blocks*[T](sq: SyncQueue[T],
-                    sr: SyncResult[T]): ForkedSignedBeaconBlock =
+                    sr: SyncResult[T]): ref ForkedSignedBeaconBlock =
   case sq.kind
   of SyncQueueKind.Forward:
     for i in countup(0, len(sr.data) - 1):
@@ -503,7 +503,7 @@ proc notInRange[T](sq: SyncQueue[T], sr: SyncRequest[T]): bool =
   (sq.queueSize > 0) and (sr.slot + sr.count - 1'u64 != sq.outSlot)
 
 proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
-              data: seq[ForkedSignedBeaconBlock],
+              data: seq[ref ForkedSignedBeaconBlock],
               processingCb: ProcessingCallback = nil) {.async.} =
   ## Push successful result to queue ``sq``.
   mixin updateScore
@@ -579,13 +579,13 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
       res: Result[void, BlockError]
 
     for blk in sq.blocks(item):
-      res = await sq.blockVerifier(blk)
+      res = await sq.blockVerifier(blk[])
       if res.isOk():
         hasOkBlock = true
       else:
         case res.error()
         of BlockError.MissingParent:
-          missingParentSlot = some(blk.slot)
+          missingParentSlot = some(blk[].slot)
           break
         of BlockError.Duplicate:
           # Keep going, happens naturally
@@ -595,7 +595,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
           # quarantine
           if unviableBlock.isNone:
             # Remember the first unviable block, so we can log it
-            unviableBlock = some((blk.root, blk.slot))
+            unviableBlock = some((blk[].root, blk[].slot))
         of BlockError.Invalid:
           hasInvalidBlock = true


@@ -41,22 +41,26 @@ proc collector(queue: AsyncQueue[BlockEntry]): BlockVerifier =
   return verify
 
 suite "SyncManager test suite":
-  proc createChain(start, finish: Slot): seq[ForkedSignedBeaconBlock] =
+  proc createChain(start, finish: Slot): seq[ref ForkedSignedBeaconBlock] =
     doAssert(start <= finish)
     let count = int(finish - start + 1'u64)
-    var res = newSeq[ForkedSignedBeaconBlock](count)
+    var res = newSeq[ref ForkedSignedBeaconBlock](count)
     var curslot = start
     for item in res.mitems():
-      item.phase0Data.message.slot = curslot
+      item = new ForkedSignedBeaconBlock
+      item[].phase0Data.message.slot = curslot
       curslot = curslot + 1'u64
     res
 
-  proc getSlice(chain: openarray[ForkedSignedBeaconBlock], startSlot: Slot,
-                request: SyncRequest[SomeTPeer]): seq[ForkedSignedBeaconBlock] =
+  proc getSlice(chain: openArray[ref ForkedSignedBeaconBlock], startSlot: Slot,
+                request: SyncRequest[SomeTPeer]): seq[ref ForkedSignedBeaconBlock] =
     let
       startIndex = int(request.slot - startSlot)
      finishIndex = int(request.slot - startSlot) + int(request.count) - 1
-    @chain[startIndex..finishIndex]
+    var res = newSeq[ref ForkedSignedBeaconBlock](1 + finishIndex - startIndex)
+    for i in 0..<res.len:
+      res[i] = newClone(chain[i + startIndex][])
+    res
 
   template startAndFinishSlotsEqual(kind: SyncQueueKind) =
     let p1 = SomeTPeer()
@@ -508,7 +512,7 @@ suite "SyncManager test suite":
         f14.finished == false
 
       var missingSlice = chain.getSlice(startSlot, r13)
-      withBlck(missingSlice[0]):
+      withBlck(missingSlice[0][]):
         blck.message.proposer_index = 0xDEADBEAF'u64
       var f13 = queue.push(r13, missingSlice)
       await allFutures(f13, f14)
@@ -672,7 +676,7 @@ suite "SyncManager test suite":
         f14.finished == false
 
      var missingSlice = chain.getSlice(startSlot, r13)
-      withBlck(missingSlice[0]):
+      withBlck(missingSlice[0][]):
         blck.message.proposer_index = 0xDEADBEAF'u64
       var f13 = queue.push(r13, missingSlice)
       await allFutures(f13, f14)
@@ -709,7 +713,7 @@ suite "SyncManager test suite":
   test "[SyncQueue] hasEndGap() test":
     let chain1 = createChain(Slot(1), Slot(1))
-    let chain2 = newSeq[ForkedSignedBeaconBlock]()
+    let chain2 = newSeq[ref ForkedSignedBeaconBlock]()
 
     for counter in countdown(32'u64, 2'u64):
       let req = SyncRequest[SomeTPeer](slot: Slot(1), count: counter,
@@ -726,7 +730,7 @@ suite "SyncManager test suite":
   test "[SyncQueue] getLastNonEmptySlot() test":
     let chain1 = createChain(Slot(10), Slot(10))
-    let chain2 = newSeq[ForkedSignedBeaconBlock]()
+    let chain2 = newSeq[ref ForkedSignedBeaconBlock]()
 
     for counter in countdown(32'u64, 2'u64):
       let req = SyncRequest[SomeTPeer](slot: Slot(10), count: counter,