diff --git a/beacon_chain/beacon_chain_db.nim b/beacon_chain/beacon_chain_db.nim index 68ef6c878..fdd88f9c1 100644 --- a/beacon_chain/beacon_chain_db.nim +++ b/beacon_chain/beacon_chain_db.nim @@ -10,7 +10,7 @@ import std/[typetraits, tables], stew/[arrayops, assign2, byteutils, endians2, io2, objects, results], - serialization, chronicles, snappy, snappy/framing, + serialization, chronicles, snappy, eth/db/[kvstore, kvstore_sqlite3], ./networking/network_metadata, ./beacon_chain_db_immutable, ./spec/[eth2_ssz_serialization, eth2_merkleization, forks, state_transition], @@ -526,7 +526,7 @@ proc decodeSnappySSZ[T](data: openArray[byte], output: var T): bool = proc decodeSZSSZ[T](data: openArray[byte], output: var T): bool = try: - let decompressed = framingFormatUncompress(data) + let decompressed = decodeFramed(data) readSszBytes(decompressed, output, updateRoot = false) true except CatchableError as e: @@ -552,7 +552,7 @@ proc encodeSnappySSZ(v: auto): seq[byte] = proc encodeSZSSZ(v: auto): seq[byte] = # https://github.com/google/snappy/blob/main/framing_format.txt try: - framingFormatCompress(SSZ.encode(v)) + encodeFramed(SSZ.encode(v)) except CatchableError as err: # In-memory encode shouldn't fail! raiseAssert err.msg @@ -852,7 +852,7 @@ proc getBlockSSZ*( let dataPtr = addr data # Short-lived var success = true proc decode(data: openArray[byte]) = - try: dataPtr[] = framingFormatUncompress(data) + try: dataPtr[] = decodeFramed(data) except CatchableError: success = false db.blocks[T.toFork].get(key.data, decode).expectDb() and success @@ -873,7 +873,7 @@ proc getBlockSZ*( let dataPtr = addr data # Short-lived var success = true proc decode(data: openArray[byte]) = - try: dataPtr[] = framingFormatCompress( + try: dataPtr[] = snappy.encodeFramed( snappy.decode(data, maxDecompressedDbRecordSize)) except CatchableError: success = false db.blocks[BeaconBlockFork.Phase0].get(key.data, decode).expectDb() and success or @@ -885,7 +885,7 @@ proc getBlockSZ*( let dataPtr = addr data # Short-lived var success = true proc decode(data: openArray[byte]) = - try: dataPtr[] = framingFormatCompress( + try: dataPtr[] = snappy.encodeFramed( snappy.decode(data, maxDecompressedDbRecordSize)) except CatchableError: success = false db.blocks[T.toFork].get(key.data, decode).expectDb() and success diff --git a/beacon_chain/era_db.nim b/beacon_chain/era_db.nim index 83881a6e4..cb70900bb 100644 --- a/beacon_chain/era_db.nim +++ b/beacon_chain/era_db.nim @@ -6,8 +6,7 @@ import std/os, - stew/results, - snappy/framing, + stew/results, snappy, ../ncli/e2store, ./spec/datatypes/[altair, bellatrix, phase0], ./spec/forks, @@ -118,7 +117,7 @@ proc getBlockSSZ*( ? db.getBlockSZ(historical_roots, slot, tmp) try: - bytes = framingFormatUncompress(tmp) + bytes = decodeFramed(tmp) ok() except CatchableError as exc: err(exc.msg) @@ -172,7 +171,7 @@ proc getStateSSZ*( ? db.getStateSZ(historical_roots, slot, tmp) try: - bytes = framingFormatUncompress(tmp) + bytes = decodeFramed(tmp) ok() except CatchableError as exc: err(exc.msg) diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index 0a84243ad..941f32eec 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -15,7 +15,7 @@ import stew/[leb128, endians2, results, byteutils, io2, bitops2], bearssl, stew/shims/net as stewNet, stew/shims/[macros], - faststreams/[inputs, outputs, buffers], snappy, snappy/framing, + faststreams/[inputs, outputs, buffers], snappy, snappy/faststreams, json_serialization, json_serialization/std/[net, sets, options], chronos, chronicles, metrics, libp2p/[switch, peerinfo, multiaddress, multicodec, crypto/crypto, @@ -548,7 +548,7 @@ proc writeChunk*(conn: Connection, output.write toBytes(payload.lenu64, Leb128).toOpenArray() - framingFormatCompress(output, payload) + compressFramed(payload, output) except IOError as exc: raiseAssert exc.msg # memoryOutput shouldn't raise conn.write(output.getOutput) diff --git a/beacon_chain/networking/libp2p_streams_backend.nim b/beacon_chain/networking/libp2p_streams_backend.nim index 0d6073c9f..f0f318bbf 100644 --- a/beacon_chain/networking/libp2p_streams_backend.nim +++ b/beacon_chain/networking/libp2p_streams_backend.nim @@ -9,33 +9,37 @@ proc uncompressFramedStream*(conn: Connection, expectedSize: int): Future[Result[seq[byte], cstring]] {.async.} = - var header: array[STREAM_HEADER.len, byte] + var header: array[framingHeader.len, byte] try: await conn.readExactly(addr header[0], header.len) except LPStreamEOFError, LPStreamIncompleteError: return err "Unexpected EOF before snappy header" - if header != STREAM_HEADER.toOpenArrayByte(0, STREAM_HEADER.high): + if header != framingHeader: return err "Incorrect snappy header" - var - uncompressedData = newSeq[byte](MAX_UNCOMPRESSED_DATA_LEN) - frameData = newSeq[byte](MAX_COMPRESSED_DATA_LEN) - output = newSeqOfCap[byte](expectedSize) + static: + doAssert maxCompressedFrameDataLen >= maxUncompressedFrameDataLen.uint64 - while output.len < expectedSize: + var + frameData = newSeq[byte](maxCompressedFrameDataLen + 4) + output = newSeqUninitialized[byte](expectedSize) + written = 0 + + while written < expectedSize: var frameHeader: array[4, byte] try: await conn.readExactly(addr frameHeader[0], frameHeader.len) except LPStreamEOFError, LPStreamIncompleteError: - return err "no snappy frame" + return err "Snappy frame header missing" - let x = uint32.fromBytesLE frameHeader - let id = x and 0xFF - let dataLen = (x shr 8).int + let (id, dataLen) = decodeFrameHeader(frameHeader) - if dataLen > MAX_COMPRESSED_DATA_LEN: - return err "invalid snappy frame length" + if dataLen > frameData.len: + # In theory, compressed frames could be bigger and still result in a + # valid, small snappy frame, but this would mean they are not getting + # compressed correctly + return err "Snappy frame too big" if dataLen > 0: try: @@ -43,49 +47,43 @@ proc uncompressFramedStream*(conn: Connection, except LPStreamEOFError, LPStreamIncompleteError: return err "Incomplete snappy frame" - if id == COMPRESSED_DATA_IDENTIFIER: - if dataLen < 4: - return err "Snappy frame size too low to contain CRC checksum" + if id == chunkCompressed: + if dataLen < 6: # At least CRC + 2 bytes of frame data + return err "Compressed snappy frame too small" let crc = uint32.fromBytesLE frameData.toOpenArray(0, 3) - remaining = expectedSize - output.len - chunkLen = min(remaining, uncompressedData.len) + uncompressed = + snappy.uncompress( + frameData.toOpenArray(4, dataLen - 1), + output.toOpenArray(written, output.high)).valueOr: + return err "Failed to decompress content" - # Grab up to MAX_UNCOMPRESSED_DATA_LEN bytes, but no more than remains - # according to the expected size. If it turns out that the uncompressed - # data is longer than that, snappyUncompress will fail and we will not - # decompress the chunk at all, instead reporting failure. - let - # The `int` conversion below is safe, because `uncompressedLen` is - # bounded to `chunkLen` (which in turn is bounded by `MAX_CHUNK_SIZE`). - # TODO: Use a range type for the parameter. - uncompressedLen = int snappyUncompress( - frameData.toOpenArray(4, dataLen - 1), - uncompressedData.toOpenArray(0, chunkLen - 1)) - - if uncompressedLen == 0: - return err "Failed to decompress snappy frame" - doAssert output.len + uncompressedLen <= expectedSize, - "enforced by `remains` limit above" - - if not checkCrc(uncompressedData.toOpenArray(0, uncompressedLen-1), crc): + if maskedCrc( + output.toOpenArray(written, written + uncompressed-1)) != crc: return err "Snappy content CRC checksum failed" - output.add uncompressedData.toOpenArray(0, uncompressedLen-1) + written += uncompressed - elif id == UNCOMPRESSED_DATA_IDENTIFIER: - if dataLen < 4: - return err "Snappy frame size too low to contain CRC checksum" + elif id == chunkUncompressed: + if dataLen < 5: # At least one byte of data + return err "Uncompressed snappy frame too small" - if output.len + dataLen - 4 > expectedSize: + let uncompressed = dataLen - 4 + + if uncompressed > maxUncompressedFrameDataLen.int: + return err "Snappy frame size too large" + + if uncompressed > output.len - written: return err "Too much data" let crc = uint32.fromBytesLE frameData.toOpenArray(0, 3) - if not checkCrc(frameData.toOpenArray(4, dataLen - 1), crc): + if maskedCrc(frameData.toOpenArray(4, dataLen - 1)) != crc: return err "Snappy content CRC checksum failed" - output.add frameData.toOpenArray(4, dataLen-1) + output[written..