From d0dbc4a8f981ed64c5ce02de7855d8c92819a602 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Fri, 15 Apr 2022 09:44:06 +0200 Subject: [PATCH] Snappy revamp (#3564) This PR makes the necessary adjustments to deal with the revamped snappy API. In practical terms for nimbus-eth2, there are performance increases to gossip processing, database reading and writing as well as era file processing. Exporting `.era` files for example, a snappy-heavy operation, almost halves in total processing time: Pre: ``` Average, StdDev, Min, Max, Samples, Test 39.088, 8.735, 23.619, 53.301, 50, tState 237.079, 46.692, 165.620, 355.481, 49, tBlocks ``` Post: ``` All time are ms Average, StdDev, Min, Max, Samples, Test 25.350, 5.303, 15.351, 41.856, 50, tState 141.238, 24.164, 99.990, 199.329, 49, tBlocks ``` --- beacon_chain/beacon_chain_db.nim | 12 +-- beacon_chain/era_db.nim | 7 +- beacon_chain/networking/eth2_network.nim | 4 +- .../networking/libp2p_streams_backend.nim | 84 +++++++++---------- ncli/e2store.nim | 7 +- ncli/ncli_db.nim | 6 +- tests/test_beacon_chain_db.nim | 10 +-- vendor/nim-snappy | 2 +- 8 files changed, 63 insertions(+), 69 deletions(-) diff --git a/beacon_chain/beacon_chain_db.nim b/beacon_chain/beacon_chain_db.nim index 68ef6c878..fdd88f9c1 100644 --- a/beacon_chain/beacon_chain_db.nim +++ b/beacon_chain/beacon_chain_db.nim @@ -10,7 +10,7 @@ import std/[typetraits, tables], stew/[arrayops, assign2, byteutils, endians2, io2, objects, results], - serialization, chronicles, snappy, snappy/framing, + serialization, chronicles, snappy, eth/db/[kvstore, kvstore_sqlite3], ./networking/network_metadata, ./beacon_chain_db_immutable, ./spec/[eth2_ssz_serialization, eth2_merkleization, forks, state_transition], @@ -526,7 +526,7 @@ proc decodeSnappySSZ[T](data: openArray[byte], output: var T): bool = proc decodeSZSSZ[T](data: openArray[byte], output: var T): bool = try: - let decompressed = framingFormatUncompress(data) + let decompressed = decodeFramed(data) readSszBytes(decompressed, output, updateRoot = false) true except CatchableError as e: @@ -552,7 +552,7 @@ proc encodeSnappySSZ(v: auto): seq[byte] = proc encodeSZSSZ(v: auto): seq[byte] = # https://github.com/google/snappy/blob/main/framing_format.txt try: - framingFormatCompress(SSZ.encode(v)) + encodeFramed(SSZ.encode(v)) except CatchableError as err: # In-memory encode shouldn't fail! raiseAssert err.msg @@ -852,7 +852,7 @@ proc getBlockSSZ*( let dataPtr = addr data # Short-lived var success = true proc decode(data: openArray[byte]) = - try: dataPtr[] = framingFormatUncompress(data) + try: dataPtr[] = decodeFramed(data) except CatchableError: success = false db.blocks[T.toFork].get(key.data, decode).expectDb() and success @@ -873,7 +873,7 @@ proc getBlockSZ*( let dataPtr = addr data # Short-lived var success = true proc decode(data: openArray[byte]) = - try: dataPtr[] = framingFormatCompress( + try: dataPtr[] = snappy.encodeFramed( snappy.decode(data, maxDecompressedDbRecordSize)) except CatchableError: success = false db.blocks[BeaconBlockFork.Phase0].get(key.data, decode).expectDb() and success or @@ -885,7 +885,7 @@ proc getBlockSZ*( let dataPtr = addr data # Short-lived var success = true proc decode(data: openArray[byte]) = - try: dataPtr[] = framingFormatCompress( + try: dataPtr[] = snappy.encodeFramed( snappy.decode(data, maxDecompressedDbRecordSize)) except CatchableError: success = false db.blocks[T.toFork].get(key.data, decode).expectDb() and success diff --git a/beacon_chain/era_db.nim b/beacon_chain/era_db.nim index 83881a6e4..cb70900bb 100644 --- a/beacon_chain/era_db.nim +++ b/beacon_chain/era_db.nim @@ -6,8 +6,7 @@ import std/os, - stew/results, - snappy/framing, + stew/results, snappy, ../ncli/e2store, ./spec/datatypes/[altair, bellatrix, phase0], ./spec/forks, @@ -118,7 +117,7 @@ proc getBlockSSZ*( ? db.getBlockSZ(historical_roots, slot, tmp) try: - bytes = framingFormatUncompress(tmp) + bytes = decodeFramed(tmp) ok() except CatchableError as exc: err(exc.msg) @@ -172,7 +171,7 @@ proc getStateSSZ*( ? db.getStateSZ(historical_roots, slot, tmp) try: - bytes = framingFormatUncompress(tmp) + bytes = decodeFramed(tmp) ok() except CatchableError as exc: err(exc.msg) diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index 0a84243ad..941f32eec 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -15,7 +15,7 @@ import stew/[leb128, endians2, results, byteutils, io2, bitops2], bearssl, stew/shims/net as stewNet, stew/shims/[macros], - faststreams/[inputs, outputs, buffers], snappy, snappy/framing, + faststreams/[inputs, outputs, buffers], snappy, snappy/faststreams, json_serialization, json_serialization/std/[net, sets, options], chronos, chronicles, metrics, libp2p/[switch, peerinfo, multiaddress, multicodec, crypto/crypto, @@ -548,7 +548,7 @@ proc writeChunk*(conn: Connection, output.write toBytes(payload.lenu64, Leb128).toOpenArray() - framingFormatCompress(output, payload) + compressFramed(payload, output) except IOError as exc: raiseAssert exc.msg # memoryOutput shouldn't raise conn.write(output.getOutput) diff --git a/beacon_chain/networking/libp2p_streams_backend.nim b/beacon_chain/networking/libp2p_streams_backend.nim index 0d6073c9f..f0f318bbf 100644 --- a/beacon_chain/networking/libp2p_streams_backend.nim +++ b/beacon_chain/networking/libp2p_streams_backend.nim @@ -9,33 +9,37 @@ proc uncompressFramedStream*(conn: Connection, expectedSize: int): Future[Result[seq[byte], cstring]] {.async.} = - var header: array[STREAM_HEADER.len, byte] + var header: array[framingHeader.len, byte] try: await conn.readExactly(addr header[0], header.len) except LPStreamEOFError, LPStreamIncompleteError: return err "Unexpected EOF before snappy header" - if header != STREAM_HEADER.toOpenArrayByte(0, STREAM_HEADER.high): + if header != framingHeader: return err "Incorrect snappy header" - var - uncompressedData = newSeq[byte](MAX_UNCOMPRESSED_DATA_LEN) - frameData = newSeq[byte](MAX_COMPRESSED_DATA_LEN) - output = newSeqOfCap[byte](expectedSize) + static: + doAssert maxCompressedFrameDataLen >= maxUncompressedFrameDataLen.uint64 - while output.len < expectedSize: + var + frameData = newSeq[byte](maxCompressedFrameDataLen + 4) + output = newSeqUninitialized[byte](expectedSize) + written = 0 + + while written < expectedSize: var frameHeader: array[4, byte] try: await conn.readExactly(addr frameHeader[0], frameHeader.len) except LPStreamEOFError, LPStreamIncompleteError: - return err "no snappy frame" + return err "Snappy frame header missing" - let x = uint32.fromBytesLE frameHeader - let id = x and 0xFF - let dataLen = (x shr 8).int + let (id, dataLen) = decodeFrameHeader(frameHeader) - if dataLen > MAX_COMPRESSED_DATA_LEN: - return err "invalid snappy frame length" + if dataLen > frameData.len: + # In theory, compressed frames could be bigger and still result in a + # valid, small snappy frame, but this would mean they are not getting + # compressed correctly + return err "Snappy frame too big" if dataLen > 0: try: @@ -43,49 +47,43 @@ proc uncompressFramedStream*(conn: Connection, except LPStreamEOFError, LPStreamIncompleteError: return err "Incomplete snappy frame" - if id == COMPRESSED_DATA_IDENTIFIER: - if dataLen < 4: - return err "Snappy frame size too low to contain CRC checksum" + if id == chunkCompressed: + if dataLen < 6: # At least CRC + 2 bytes of frame data + return err "Compressed snappy frame too small" let crc = uint32.fromBytesLE frameData.toOpenArray(0, 3) - remaining = expectedSize - output.len - chunkLen = min(remaining, uncompressedData.len) + uncompressed = + snappy.uncompress( + frameData.toOpenArray(4, dataLen - 1), + output.toOpenArray(written, output.high)).valueOr: + return err "Failed to decompress content" - # Grab up to MAX_UNCOMPRESSED_DATA_LEN bytes, but no more than remains - # according to the expected size. If it turns out that the uncompressed - # data is longer than that, snappyUncompress will fail and we will not - # decompress the chunk at all, instead reporting failure. - let - # The `int` conversion below is safe, because `uncompressedLen` is - # bounded to `chunkLen` (which in turn is bounded by `MAX_CHUNK_SIZE`). - # TODO: Use a range type for the parameter. - uncompressedLen = int snappyUncompress( - frameData.toOpenArray(4, dataLen - 1), - uncompressedData.toOpenArray(0, chunkLen - 1)) - - if uncompressedLen == 0: - return err "Failed to decompress snappy frame" - doAssert output.len + uncompressedLen <= expectedSize, - "enforced by `remains` limit above" - - if not checkCrc(uncompressedData.toOpenArray(0, uncompressedLen-1), crc): + if maskedCrc( + output.toOpenArray(written, written + uncompressed-1)) != crc: return err "Snappy content CRC checksum failed" - output.add uncompressedData.toOpenArray(0, uncompressedLen-1) + written += uncompressed - elif id == UNCOMPRESSED_DATA_IDENTIFIER: - if dataLen < 4: - return err "Snappy frame size too low to contain CRC checksum" + elif id == chunkUncompressed: + if dataLen < 5: # At least one byte of data + return err "Uncompressed snappy frame too small" - if output.len + dataLen - 4 > expectedSize: + let uncompressed = dataLen - 4 + + if uncompressed > maxUncompressedFrameDataLen.int: + return err "Snappy frame size too large" + + if uncompressed > output.len - written: return err "Too much data" let crc = uint32.fromBytesLE frameData.toOpenArray(0, 3) - if not checkCrc(frameData.toOpenArray(4, dataLen - 1), crc): + if maskedCrc(frameData.toOpenArray(4, dataLen - 1)) != crc: return err "Snappy content CRC checksum failed" - output.add frameData.toOpenArray(4, dataLen-1) + output[written..