diff --git a/beacon_chain/beacon_chain_db.nim b/beacon_chain/beacon_chain_db.nim index fdd88f9c1..4c520eb2b 100644 --- a/beacon_chain/beacon_chain_db.nim +++ b/beacon_chain/beacon_chain_db.nim @@ -824,6 +824,17 @@ proc getPhase0BlockSSZ( db.backend.get(subkey(phase0.SignedBeaconBlock, key), decode).expectDb() and success +proc getPhase0BlockSZ( + db: BeaconChainDBV0, key: Eth2Digest, data: var seq[byte]): bool = + let dataPtr = addr data # Short-lived + var success = true + proc decode(data: openArray[byte]) = + try: dataPtr[] = snappy.encodeFramed( + snappy.decode(data, maxDecompressedDbRecordSize)) + except CatchableError: success = false + db.backend.get(subkey(phase0.SignedBeaconBlock, key), decode).expectDb() and + success + # SSZ implementations are separate so as to avoid unnecessary data copies proc getBlockSSZ*( db: BeaconChainDB, key: Eth2Digest, data: var seq[byte], @@ -877,7 +888,7 @@ proc getBlockSZ*( snappy.decode(data, maxDecompressedDbRecordSize)) except CatchableError: success = false db.blocks[BeaconBlockFork.Phase0].get(key.data, decode).expectDb() and success or - db.v0.getPhase0BlockSSZ(key, data) + db.v0.getPhase0BlockSZ(key, data) proc getBlockSZ*( db: BeaconChainDB, key: Eth2Digest, data: var seq[byte], diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index 86eccac8d..5f4f92d53 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -533,6 +533,27 @@ proc isLightClientRequestProto(fn: NimNode): NimNode = return newLit(false) +proc writeChunkSZ*( + conn: Connection, responseCode: Option[ResponseCode], + uncompressedLen: uint64, payloadSZ: openArray[byte], + contextBytes: openArray[byte] = []): Future[void] = + # max 10 bytes varint length + 1 byte response code + data + const numOverheadBytes = sizeof(byte) + Leb128.maxLen(typeof(uncompressedLen)) + var output = memoryOutput(payloadSZ.len + contextBytes.len + numOverheadBytes) + try: + if responseCode.isSome: + output.write byte(responseCode.get) + + if contextBytes.len > 0: + output.write contextBytes + + output.write toBytes(uncompressedLen, Leb128).toOpenArray() + output.write payloadSZ + except IOError as exc: + raiseAssert exc.msg # memoryOutput shouldn't raise + + conn.write(output.getOutput) + proc writeChunk*(conn: Connection, responseCode: Option[ResponseCode], payload: openArray[byte], @@ -591,6 +612,14 @@ proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {. finally: await stream.close() +proc sendResponseChunkBytesSZ( + response: UntypedResponse, uncompressedLen: uint64, + payloadSZ: openArray[byte], + contextBytes: openArray[byte] = []): Future[void] = + inc response.writtenChunks + response.stream.writeChunkSZ( + some Success, uncompressedLen, payloadSZ, contextBytes) + proc sendResponseChunkBytes( response: UntypedResponse, payload: openArray[byte], contextBytes: openArray[byte] = []): Future[void] = @@ -657,10 +686,10 @@ template write*[M](r: MultipleChunksResponse[M], val: M): untyped = mixin sendResponseChunk sendResponseChunk(UntypedResponse(r), val) -template writeRawBytes*[M]( - r: MultipleChunksResponse[M], bytes: openArray[byte], - contextBytes: openArray[byte]): untyped = - sendResponseChunkBytes(UntypedResponse(r), bytes, contextBytes) +template writeBytesSZ*[M]( + r: MultipleChunksResponse[M], uncompressedLen: uint64, + bytes: openArray[byte], contextBytes: openArray[byte]): untyped = + sendResponseChunkBytesSZ(UntypedResponse(r), uncompressedLen, bytes, contextBytes) template send*[M](r: SingleChunkResponse[M], val: M): untyped = mixin sendResponseChunk diff --git a/beacon_chain/sync/sync_protocol.nim b/beacon_chain/sync/sync_protocol.nim index 92b6188b0..a083652f5 100644 --- a/beacon_chain/sync/sync_protocol.nim +++ b/beacon_chain/sync/sync_protocol.nim @@ -12,8 +12,9 @@ # https://github.com/ethereum/consensus-specs/pull/2802 import - options, tables, sets, macros, - chronicles, chronos, stew/ranges/bitranges, libp2p/switch, + std/[options, tables, sets, macros], + chronicles, chronos, snappy/codec, + stew/ranges/bitranges, libp2p/switch, ../spec/datatypes/[phase0, altair, bellatrix], ../spec/[helpers, forks, network], ".."/[beacon_clock], @@ -279,15 +280,16 @@ p2pProtocol BeaconSync(version = 1, # Also, our response would be indistinguishable from a node # that have been synced exactly to the altair transition slot. break - - if dag.getBlockSSZ(blocks[i], bytes): - trace "writing response block", - slot = blocks[i].slot, roor = shortLog(blocks[i].root) + if dag.getBlockSZ(blocks[i], bytes): + let uncompressedLen = uncompressedLenFramed(bytes).valueOr: + warn "Cannot read block size, database corrupt?", + bytes = bytes.len(), blck = shortLog(blocks[i]) + continue peer.updateRequestQuota(blockResponseCost) peer.awaitNonNegativeRequestQuota() - await response.writeRawBytes(bytes, []) # phase0 bytes + await response.writeBytesSZ(uncompressedLen, bytes, []) # phase0 bytes inc found @@ -342,11 +344,16 @@ p2pProtocol BeaconSync(version = 1, # that have been synced exactly to the altair transition slot. continue - if dag.getBlockSSZ(blockRef.bid, bytes): + if dag.getBlockSZ(blockRef.bid, bytes): + let uncompressedLen = uncompressedLenFramed(bytes).valueOr: + warn "Cannot read block size, database corrupt?", + bytes = bytes.len(), blck = shortLog(blockRef) + continue + peer.updateRequestQuota(blockResponseCost) peer.awaitNonNegativeRequestQuota() - await response.writeRawBytes(bytes, []) # phase0 bytes + await response.writeBytesSZ(uncompressedLen, bytes, []) # phase0 inc found debug "Block root request done", @@ -395,12 +402,18 @@ p2pProtocol BeaconSync(version = 1, bytes: seq[byte] for i in startIndex..endIndex: - if dag.getBlockSSZ(blocks[i], bytes): + if dag.getBlockSZ(blocks[i], bytes): + let uncompressedLen = uncompressedLenFramed(bytes).valueOr: + warn "Cannot read block size, database corrupt?", + bytes = bytes.len(), blck = shortLog(blocks[i]) + continue + peer.updateRequestQuota(blockResponseCost) peer.awaitNonNegativeRequestQuota() - await response.writeRawBytes( - bytes, dag.forkDigestAtEpoch(blocks[i].slot.epoch).data) + await response.writeBytesSZ( + uncompressedLen, bytes, + dag.forkDigestAtEpoch(blocks[i].slot.epoch).data) inc found @@ -445,12 +458,18 @@ p2pProtocol BeaconSync(version = 1, blockRef = dag.getBlockRef(blockRoots[i]).valueOr: continue - if dag.getBlockSSZ(blockRef.bid, bytes): + if dag.getBlockSZ(blockRef.bid, bytes): + let uncompressedLen = uncompressedLenFramed(bytes).valueOr: + warn "Cannot read block size, database corrupt?", + bytes = bytes.len(), blck = shortLog(blockRef) + continue + peer.updateRequestQuota(blockResponseCost) peer.awaitNonNegativeRequestQuota() - await response.writeRawBytes( - bytes, dag.forkDigestAtEpoch(blockRef.slot.epoch).data) + await response.writeBytesSZ( + uncompressedLen, bytes, + dag.forkDigestAtEpoch(blockRef.slot.epoch).data) inc found diff --git a/tests/test_beacon_chain_db.nim b/tests/test_beacon_chain_db.nim index cb836c735..8e541aef8 100644 --- a/tests/test_beacon_chain_db.nim +++ b/tests/test_beacon_chain_db.nim @@ -111,6 +111,7 @@ suite "Beacon chain DB" & preset(): db.getBlockSZ(root, tmp2, phase0.TrustedSignedBeaconBlock) tmp == SSZ.encode(signedBlock) tmp2 == encodeFramed(tmp) + uncompressedLenFramed(tmp2).isSome db.delBlock(root) check: @@ -153,6 +154,7 @@ suite "Beacon chain DB" & preset(): db.getBlockSZ(root, tmp2, altair.TrustedSignedBeaconBlock) tmp == SSZ.encode(signedBlock) tmp2 == encodeFramed(tmp) + uncompressedLenFramed(tmp2).isSome db.delBlock(root) check: @@ -195,6 +197,7 @@ suite "Beacon chain DB" & preset(): db.getBlockSZ(root, tmp2, bellatrix.TrustedSignedBeaconBlock) tmp == SSZ.encode(signedBlock) tmp2 == encodeFramed(tmp) + uncompressedLenFramed(tmp2).isSome db.delBlock(root) check: