avoid unnecessary recompression in block protocol (#3598)

Blocks can be sent straight from compressed data sources

Co-authored-by: Etan Kissling <etan@status.im>
Jacek Sieka 2022-05-05 13:00:02 +02:00 committed by GitHub
parent 7bb40d28ae
commit 138c40161d
4 changed files with 82 additions and 20 deletions
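
What the change buys, in one sketch: blocks are stored in the database as framed snappy, and the req/resp wire format also carries framed snappy, so stored bytes can be forwarded verbatim; only the uncompressed length, needed for the chunk header, is read from the frame headers. A minimal illustration against the nim-snappy API (tiny payload chosen for brevity; decodeFramed assumed per that library):

  import snappy, snappy/codec

  let stored = encodeFramed(@[byte 1, 2, 3])   # a block as kept in the database
  # old path: every request decompressed and recompressed the stored record
  doAssert encodeFramed(decodeFramed(stored)) == stored
  # new path: only the length is read from the frame headers; bytes go out as-is
  doAssert uncompressedLenFramed(stored).get() == 3

Skipping the decode/encode round-trip matters most under sync load, where every blocksByRange/blocksByRoot request previously recompressed each served block.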

beacon_chain/beacon_chain_db.nim

@@ -824,6 +824,17 @@ proc getPhase0BlockSSZ(
   db.backend.get(subkey(phase0.SignedBeaconBlock, key), decode).expectDb() and
     success
 
+proc getPhase0BlockSZ(
+    db: BeaconChainDBV0, key: Eth2Digest, data: var seq[byte]): bool =
+  let dataPtr = addr data # Short-lived
+  var success = true
+  proc decode(data: openArray[byte]) =
+    try: dataPtr[] = snappy.encodeFramed(
+      snappy.decode(data, maxDecompressedDbRecordSize))
+    except CatchableError: success = false
+
+  db.backend.get(subkey(phase0.SignedBeaconBlock, key), decode).expectDb() and
+    success
+
 # SSZ implementations are separate so as to avoid unnecessary data copies
 proc getBlockSSZ*(
     db: BeaconChainDB, key: Eth2Digest, data: var seq[byte],
@@ -877,7 +888,7 @@ proc getBlockSZ*(
       snappy.decode(data, maxDecompressedDbRecordSize))
     except CatchableError: success = false
 
   db.blocks[BeaconBlockFork.Phase0].get(key.data, decode).expectDb() and success or
-    db.v0.getPhase0BlockSSZ(key, data)
+    db.v0.getPhase0BlockSZ(key, data)
 
 proc getBlockSZ*(
     db: BeaconChainDB, key: Eth2Digest, data: var seq[byte],
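
The V0 fallback above exists because legacy records were written as raw (unframed) snappy, while the wire wants framed snappy: getPhase0BlockSZ decodes the old record once and re-frames it. A hedged sketch of that conversion, with snappy.encode standing in for how V0 records were stored:

  import snappy

  let
    ssz = @[byte 1, 2, 3]              # stand-in for an SSZ-encoded block
    legacyRecord = snappy.encode(ssz)  # V0 store: raw snappy, no framing
    served = snappy.encodeFramed(snappy.decode(legacyRecord, 1024))

  doAssert snappy.decodeFramed(served) == ssz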

beacon_chain/networking/eth2_network.nim

@@ -533,6 +533,27 @@ proc isLightClientRequestProto(fn: NimNode): NimNode =
 
   return newLit(false)
 
+proc writeChunkSZ*(
+    conn: Connection, responseCode: Option[ResponseCode],
+    uncompressedLen: uint64, payloadSZ: openArray[byte],
+    contextBytes: openArray[byte] = []): Future[void] =
+  # max 10 bytes varint length + 1 byte response code + data
+  const numOverheadBytes = sizeof(byte) + Leb128.maxLen(typeof(uncompressedLen))
+  var output = memoryOutput(payloadSZ.len + contextBytes.len + numOverheadBytes)
+  try:
+    if responseCode.isSome:
+      output.write byte(responseCode.get)
+
+    if contextBytes.len > 0:
+      output.write contextBytes
+
+    output.write toBytes(uncompressedLen, Leb128).toOpenArray()
+
+    output.write payloadSZ
+  except IOError as exc:
+    raiseAssert exc.msg # memoryOutput shouldn't raise
+  conn.write(output.getOutput)
+
 proc writeChunk*(conn: Connection,
                  responseCode: Option[ResponseCode],
                  payload: openArray[byte],
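
The numOverheadBytes constant matches the chunk header: one response-code byte plus a LEB128 varint of the uncompressed length, at most 10 bytes for a uint64 (the "max 10 bytes" in the comment). A quick check of the stew varint helper, with an illustrative value:

  import stew/leb128

  doAssert Leb128.maxLen(uint64) == 10          # worst case for a 64-bit value
  let encoded = toBytes(300'u64, Leb128)        # 300 encodes as 0xAC 0x02
  doAssert @(encoded.toOpenArray()) == @[byte 0xAC, 0x02]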
@@ -591,6 +612,14 @@ proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.
   finally:
     await stream.close()
 
+proc sendResponseChunkBytesSZ(
+    response: UntypedResponse, uncompressedLen: uint64,
+    payloadSZ: openArray[byte],
+    contextBytes: openArray[byte] = []): Future[void] =
+  inc response.writtenChunks
+  response.stream.writeChunkSZ(
+    some Success, uncompressedLen, payloadSZ, contextBytes)
+
 proc sendResponseChunkBytes(
     response: UntypedResponse, payload: openArray[byte],
     contextBytes: openArray[byte] = []): Future[void] =
@@ -657,10 +686,10 @@ template write*[M](r: MultipleChunksResponse[M], val: M): untyped =
   mixin sendResponseChunk
   sendResponseChunk(UntypedResponse(r), val)
 
-template writeRawBytes*[M](
-    r: MultipleChunksResponse[M], bytes: openArray[byte],
-    contextBytes: openArray[byte]): untyped =
-  sendResponseChunkBytes(UntypedResponse(r), bytes, contextBytes)
+template writeBytesSZ*[M](
+    r: MultipleChunksResponse[M], uncompressedLen: uint64,
+    bytes: openArray[byte], contextBytes: openArray[byte]): untyped =
+  sendResponseChunkBytesSZ(UntypedResponse(r), uncompressedLen, bytes, contextBytes)
 
 template send*[M](r: SingleChunkResponse[M], val: M): untyped =
   mixin sendResponseChunk
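
Taken together, a successful chunk written by writeChunkSZ is laid out as: response code, context bytes (e.g. a fork digest), varint uncompressed length, then the framed payload verbatim. A self-contained mock of that layout (the real writer streams through memoryOutput instead):

  import stew/leb128

  proc mockChunkSZ(uncompressedLen: uint64,
                   payloadSZ, contextBytes: openArray[byte]): seq[byte] =
    result.add 0x00'u8                # ResponseCode Success
    result.add contextBytes           # written before the length varint
    result.add toBytes(uncompressedLen, Leb128).toOpenArray()
    result.add payloadSZ              # compressed bytes, untouched

  doAssert mockChunkSZ(3, [byte 0xAA, 0xBB], [byte 1, 2, 3, 4]) ==
    @[byte 0x00, 1, 2, 3, 4, 3, 0xAA, 0xBB]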

beacon_chain/sync/sync_protocol.nim

@@ -12,8 +12,9 @@
 # https://github.com/ethereum/consensus-specs/pull/2802
 
 import
-  options, tables, sets, macros,
-  chronicles, chronos, stew/ranges/bitranges, libp2p/switch,
+  std/[options, tables, sets, macros],
+  chronicles, chronos, snappy/codec,
+  stew/ranges/bitranges, libp2p/switch,
   ../spec/datatypes/[phase0, altair, bellatrix],
   ../spec/[helpers, forks, network],
   ".."/[beacon_clock],
@@ -279,15 +280,16 @@ p2pProtocol BeaconSync(version = 1,
             # Also, our response would be indistinguishable from a node
             # that have been synced exactly to the altair transition slot.
             break
 
-          if dag.getBlockSSZ(blocks[i], bytes):
-            trace "writing response block",
-              slot = blocks[i].slot, roor = shortLog(blocks[i].root)
+          if dag.getBlockSZ(blocks[i], bytes):
+            let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
+              warn "Cannot read block size, database corrupt?",
+                bytes = bytes.len(), blck = shortLog(blocks[i])
+              continue
+
             peer.updateRequestQuota(blockResponseCost)
             peer.awaitNonNegativeRequestQuota()
 
-            await response.writeRawBytes(bytes, []) # phase0 bytes
+            await response.writeBytesSZ(uncompressedLen, bytes, []) # phase0 bytes
             inc found
@@ -342,11 +344,16 @@ p2pProtocol BeaconSync(version = 1,
           # that have been synced exactly to the altair transition slot.
           continue
 
-        if dag.getBlockSSZ(blockRef.bid, bytes):
+        if dag.getBlockSZ(blockRef.bid, bytes):
+          let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
+            warn "Cannot read block size, database corrupt?",
+              bytes = bytes.len(), blck = shortLog(blockRef)
+            continue
+
           peer.updateRequestQuota(blockResponseCost)
           peer.awaitNonNegativeRequestQuota()
 
-          await response.writeRawBytes(bytes, []) # phase0 bytes
+          await response.writeBytesSZ(uncompressedLen, bytes, []) # phase0
           inc found
 
     debug "Block root request done",
@@ -395,12 +402,18 @@ p2pProtocol BeaconSync(version = 1,
       bytes: seq[byte]
 
     for i in startIndex..endIndex:
-      if dag.getBlockSSZ(blocks[i], bytes):
+      if dag.getBlockSZ(blocks[i], bytes):
+        let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
+          warn "Cannot read block size, database corrupt?",
+            bytes = bytes.len(), blck = shortLog(blocks[i])
+          continue
+
         peer.updateRequestQuota(blockResponseCost)
         peer.awaitNonNegativeRequestQuota()
 
-        await response.writeRawBytes(
-          bytes, dag.forkDigestAtEpoch(blocks[i].slot.epoch).data)
+        await response.writeBytesSZ(
+          uncompressedLen, bytes,
+          dag.forkDigestAtEpoch(blocks[i].slot.epoch).data)
         inc found
@@ -445,12 +458,18 @@ p2pProtocol BeaconSync(version = 1,
         blockRef = dag.getBlockRef(blockRoots[i]).valueOr:
           continue
 
-      if dag.getBlockSSZ(blockRef.bid, bytes):
+      if dag.getBlockSZ(blockRef.bid, bytes):
+        let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
+          warn "Cannot read block size, database corrupt?",
+            bytes = bytes.len(), blck = shortLog(blockRef)
+          continue
+
        peer.updateRequestQuota(blockResponseCost)
        peer.awaitNonNegativeRequestQuota()
 
-        await response.writeRawBytes(
-          bytes, dag.forkDigestAtEpoch(blockRef.slot.epoch).data)
+        await response.writeBytesSZ(
+          uncompressedLen, bytes,
+          dag.forkDigestAtEpoch(blockRef.slot.epoch).data)
         inc found
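
All four handlers share the same guard: uncompressedLenFramed walks the snappy frame headers without decompressing and returns none when the record is not a valid frame stream, so a corrupt row becomes a warn-and-continue rather than a broken response. Roughly (garbage bytes chosen arbitrarily; behaviour assumed per nim-snappy):

  import snappy, snappy/codec

  # a valid record yields its uncompressed length...
  doAssert uncompressedLenFramed(encodeFramed(@[byte 1, 2, 3])).get() == 3
  # ...while corrupt bytes yield none, taking the warn-and-continue path
  doAssert uncompressedLenFramed([byte 0xDE, 0xAD, 0xBE, 0xEF]).isNone()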

tests/test_beacon_chain_db.nim

@@ -111,6 +111,7 @@ suite "Beacon chain DB" & preset():
       db.getBlockSZ(root, tmp2, phase0.TrustedSignedBeaconBlock)
       tmp == SSZ.encode(signedBlock)
       tmp2 == encodeFramed(tmp)
+      uncompressedLenFramed(tmp2).isSome
 
     db.delBlock(root)
     check:
@@ -153,6 +154,7 @@ suite "Beacon chain DB" & preset():
       db.getBlockSZ(root, tmp2, altair.TrustedSignedBeaconBlock)
       tmp == SSZ.encode(signedBlock)
       tmp2 == encodeFramed(tmp)
+      uncompressedLenFramed(tmp2).isSome
 
     db.delBlock(root)
     check:
@@ -195,6 +197,7 @@ suite "Beacon chain DB" & preset():
      db.getBlockSZ(root, tmp2, bellatrix.TrustedSignedBeaconBlock)
       tmp == SSZ.encode(signedBlock)
       tmp2 == encodeFramed(tmp)
+      uncompressedLenFramed(tmp2).isSome
 
     db.delBlock(root)
     check:
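
The added uncompressedLenFramed(tmp2).isSome checks pin down the invariant the sync handlers now rely on: whatever getBlockSZ returns is framed snappy with a readable length header, including payloads spanning multiple 64 KiB snappy frames. For instance (size chosen for illustration):

  import snappy, snappy/codec

  let big = newSeq[byte](200 * 1024)  # spans several 64 KiB frames
  doAssert uncompressedLenFramed(encodeFramed(big)).get() == big.len.uint64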