Snappy revamp (#3564)

This PR makes the necessary adjustments to deal with the revamped snappy
API.

In practical terms for nimbus-eth2, there are performance increases to
gossip processing, database reading and writing as well as era file
processing. Exporting `.era` files, for example — a snappy-heavy
operation — almost halves in total processing time:

Pre:

```
     Average,       StdDev,          Min,          Max,      Samples,         Test
      39.088,        8.735,       23.619,       53.301,           50, tState
     237.079,       46.692,      165.620,      355.481,           49, tBlocks
```

Post:

```
All time are ms
     Average,       StdDev,          Min,          Max,      Samples,         Test
      25.350,        5.303,       15.351,       41.856,           50, tState
     141.238,       24.164,       99.990,      199.329,           49, tBlocks
```
This commit is contained in:
Jacek Sieka 2022-04-15 09:44:06 +02:00 committed by GitHub
parent b237afeb16
commit d0dbc4a8f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 63 additions and 69 deletions

View File

@ -10,7 +10,7 @@
import
std/[typetraits, tables],
stew/[arrayops, assign2, byteutils, endians2, io2, objects, results],
serialization, chronicles, snappy, snappy/framing,
serialization, chronicles, snappy,
eth/db/[kvstore, kvstore_sqlite3],
./networking/network_metadata, ./beacon_chain_db_immutable,
./spec/[eth2_ssz_serialization, eth2_merkleization, forks, state_transition],
@ -526,7 +526,7 @@ proc decodeSnappySSZ[T](data: openArray[byte], output: var T): bool =
proc decodeSZSSZ[T](data: openArray[byte], output: var T): bool =
try:
let decompressed = framingFormatUncompress(data)
let decompressed = decodeFramed(data)
readSszBytes(decompressed, output, updateRoot = false)
true
except CatchableError as e:
@ -552,7 +552,7 @@ proc encodeSnappySSZ(v: auto): seq[byte] =
proc encodeSZSSZ(v: auto): seq[byte] =
# https://github.com/google/snappy/blob/main/framing_format.txt
try:
framingFormatCompress(SSZ.encode(v))
encodeFramed(SSZ.encode(v))
except CatchableError as err:
# In-memory encode shouldn't fail!
raiseAssert err.msg
@ -852,7 +852,7 @@ proc getBlockSSZ*(
let dataPtr = addr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
try: dataPtr[] = framingFormatUncompress(data)
try: dataPtr[] = decodeFramed(data)
except CatchableError: success = false
db.blocks[T.toFork].get(key.data, decode).expectDb() and success
@ -873,7 +873,7 @@ proc getBlockSZ*(
let dataPtr = addr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
try: dataPtr[] = framingFormatCompress(
try: dataPtr[] = snappy.encodeFramed(
snappy.decode(data, maxDecompressedDbRecordSize))
except CatchableError: success = false
db.blocks[BeaconBlockFork.Phase0].get(key.data, decode).expectDb() and success or
@ -885,7 +885,7 @@ proc getBlockSZ*(
let dataPtr = addr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
try: dataPtr[] = framingFormatCompress(
try: dataPtr[] = snappy.encodeFramed(
snappy.decode(data, maxDecompressedDbRecordSize))
except CatchableError: success = false
db.blocks[T.toFork].get(key.data, decode).expectDb() and success

View File

@ -6,8 +6,7 @@
import
std/os,
stew/results,
snappy/framing,
stew/results, snappy,
../ncli/e2store,
./spec/datatypes/[altair, bellatrix, phase0],
./spec/forks,
@ -118,7 +117,7 @@ proc getBlockSSZ*(
? db.getBlockSZ(historical_roots, slot, tmp)
try:
bytes = framingFormatUncompress(tmp)
bytes = decodeFramed(tmp)
ok()
except CatchableError as exc:
err(exc.msg)
@ -172,7 +171,7 @@ proc getStateSSZ*(
? db.getStateSZ(historical_roots, slot, tmp)
try:
bytes = framingFormatUncompress(tmp)
bytes = decodeFramed(tmp)
ok()
except CatchableError as exc:
err(exc.msg)

View File

@ -15,7 +15,7 @@ import
stew/[leb128, endians2, results, byteutils, io2, bitops2], bearssl,
stew/shims/net as stewNet,
stew/shims/[macros],
faststreams/[inputs, outputs, buffers], snappy, snappy/framing,
faststreams/[inputs, outputs, buffers], snappy, snappy/faststreams,
json_serialization, json_serialization/std/[net, sets, options],
chronos, chronicles, metrics,
libp2p/[switch, peerinfo, multiaddress, multicodec, crypto/crypto,
@ -548,7 +548,7 @@ proc writeChunk*(conn: Connection,
output.write toBytes(payload.lenu64, Leb128).toOpenArray()
framingFormatCompress(output, payload)
compressFramed(payload, output)
except IOError as exc:
raiseAssert exc.msg # memoryOutput shouldn't raise
conn.write(output.getOutput)

View File

@ -9,33 +9,37 @@
proc uncompressFramedStream*(conn: Connection,
expectedSize: int): Future[Result[seq[byte], cstring]]
{.async.} =
var header: array[STREAM_HEADER.len, byte]
var header: array[framingHeader.len, byte]
try:
await conn.readExactly(addr header[0], header.len)
except LPStreamEOFError, LPStreamIncompleteError:
return err "Unexpected EOF before snappy header"
if header != STREAM_HEADER.toOpenArrayByte(0, STREAM_HEADER.high):
if header != framingHeader:
return err "Incorrect snappy header"
var
uncompressedData = newSeq[byte](MAX_UNCOMPRESSED_DATA_LEN)
frameData = newSeq[byte](MAX_COMPRESSED_DATA_LEN)
output = newSeqOfCap[byte](expectedSize)
static:
doAssert maxCompressedFrameDataLen >= maxUncompressedFrameDataLen.uint64
while output.len < expectedSize:
var
frameData = newSeq[byte](maxCompressedFrameDataLen + 4)
output = newSeqUninitialized[byte](expectedSize)
written = 0
while written < expectedSize:
var frameHeader: array[4, byte]
try:
await conn.readExactly(addr frameHeader[0], frameHeader.len)
except LPStreamEOFError, LPStreamIncompleteError:
return err "no snappy frame"
return err "Snappy frame header missing"
let x = uint32.fromBytesLE frameHeader
let id = x and 0xFF
let dataLen = (x shr 8).int
let (id, dataLen) = decodeFrameHeader(frameHeader)
if dataLen > MAX_COMPRESSED_DATA_LEN:
return err "invalid snappy frame length"
if dataLen > frameData.len:
# In theory, compressed frames could be bigger and still result in a
# valid, small snappy frame, but this would mean they are not getting
# compressed correctly
return err "Snappy frame too big"
if dataLen > 0:
try:
@ -43,49 +47,43 @@ proc uncompressFramedStream*(conn: Connection,
except LPStreamEOFError, LPStreamIncompleteError:
return err "Incomplete snappy frame"
if id == COMPRESSED_DATA_IDENTIFIER:
if dataLen < 4:
return err "Snappy frame size too low to contain CRC checksum"
if id == chunkCompressed:
if dataLen < 6: # At least CRC + 2 bytes of frame data
return err "Compressed snappy frame too small"
let
crc = uint32.fromBytesLE frameData.toOpenArray(0, 3)
remaining = expectedSize - output.len
chunkLen = min(remaining, uncompressedData.len)
uncompressed =
snappy.uncompress(
frameData.toOpenArray(4, dataLen - 1),
output.toOpenArray(written, output.high)).valueOr:
return err "Failed to decompress content"
# Grab up to MAX_UNCOMPRESSED_DATA_LEN bytes, but no more than remains
# according to the expected size. If it turns out that the uncompressed
# data is longer than that, snappyUncompress will fail and we will not
# decompress the chunk at all, instead reporting failure.
let
# The `int` conversion below is safe, because `uncompressedLen` is
# bounded to `chunkLen` (which in turn is bounded by `MAX_CHUNK_SIZE`).
# TODO: Use a range type for the parameter.
uncompressedLen = int snappyUncompress(
frameData.toOpenArray(4, dataLen - 1),
uncompressedData.toOpenArray(0, chunkLen - 1))
if uncompressedLen == 0:
return err "Failed to decompress snappy frame"
doAssert output.len + uncompressedLen <= expectedSize,
"enforced by `remains` limit above"
if not checkCrc(uncompressedData.toOpenArray(0, uncompressedLen-1), crc):
if maskedCrc(
output.toOpenArray(written, written + uncompressed-1)) != crc:
return err "Snappy content CRC checksum failed"
output.add uncompressedData.toOpenArray(0, uncompressedLen-1)
written += uncompressed
elif id == UNCOMPRESSED_DATA_IDENTIFIER:
if dataLen < 4:
return err "Snappy frame size too low to contain CRC checksum"
elif id == chunkUncompressed:
if dataLen < 5: # At least one byte of data
return err "Uncompressed snappy frame too small"
if output.len + dataLen - 4 > expectedSize:
let uncompressed = dataLen - 4
if uncompressed > maxUncompressedFrameDataLen.int:
return err "Snappy frame size too large"
if uncompressed > output.len - written:
return err "Too much data"
let crc = uint32.fromBytesLE frameData.toOpenArray(0, 3)
if not checkCrc(frameData.toOpenArray(4, dataLen - 1), crc):
if maskedCrc(frameData.toOpenArray(4, dataLen - 1)) != crc:
return err "Snappy content CRC checksum failed"
output.add frameData.toOpenArray(4, dataLen-1)
output[written..<written + uncompressed] =
frameData.toOpenArray(4, dataLen-1)
written += uncompressed
elif id < 0x80:
# Reserved unskippable chunks (chunk types 0x02-0x7f)

View File

@ -3,7 +3,7 @@
import
std/strformat,
stew/[arrayops, endians2, io2, results],
snappy, snappy/framing,
snappy,
../beacon_chain/spec/[beacon_time, forks],
../beacon_chain/spec/eth2_ssz_serialization
@ -89,10 +89,7 @@ proc appendRecord*(f: IoHandle, typ: Type, data: openArray[byte]): Result[int64,
ok(start)
proc toCompressedBytes(item: auto): seq[byte] =
try:
framingFormatCompress(SSZ.encode(item))
except CatchableError as exc:
raiseAssert exc.msg # shouldn't happen
snappy.encodeFramed(SSZ.encode(item))
proc appendRecord*(f: IoHandle, v: ForkyTrustedSignedBeaconBlock): Result[int64, string] =
f.appendRecord(SnappyBeaconBlock, toCompressedBytes(v))

View File

@ -7,7 +7,7 @@
import
std/[os, stats, strformat, tables],
snappy, snappy/framing,
snappy,
chronicles, confutils, stew/[byteutils, io2], eth/db/kvstore_sqlite3,
../beacon_chain/networking/network_metadata,
../beacon_chain/[beacon_chain_db],
@ -536,7 +536,7 @@ proc cmdImportEra(conf: DbConf, cfg: RuntimeConfig) =
if header.typ == SnappyBeaconBlock:
withTimer(timers[tBlock]):
let uncompressed = framingFormatUncompress(data)
let uncompressed = decodeFramed(data)
let blck = try: readSszForkedSignedBeaconBlock(cfg, uncompressed)
except CatchableError as exc:
error "Invalid snappy block", msg = exc.msg, file
@ -547,7 +547,7 @@ proc cmdImportEra(conf: DbConf, cfg: RuntimeConfig) =
blocks += 1
elif header.typ == SnappyBeaconState:
withTimer(timers[tState]):
let uncompressed = framingFormatUncompress(data)
let uncompressed = decodeFramed(data)
let state = try: newClone(
readSszForkedHashedBeaconState(cfg, uncompressed))
except CatchableError as exc:

View File

@ -9,12 +9,12 @@
import
std/[algorithm, options, sequtils],
unittest2,
unittest2, snappy,
../beacon_chain/[beacon_chain_db, interop],
../beacon_chain/spec/[beaconstate, forks, state_transition],
../beacon_chain/spec/datatypes/[phase0, altair, bellatrix],
../beacon_chain/consensus_object_pools/blockchain_dag,
eth/db/kvstore, snappy/framing,
eth/db/kvstore,
# test utilies
./testutil, ./testdbutil, ./testblockutil, ./teststateutil
@ -110,7 +110,7 @@ suite "Beacon chain DB" & preset():
db.getBlockSSZ(root, tmp, phase0.TrustedSignedBeaconBlock)
db.getBlockSZ(root, tmp2, phase0.TrustedSignedBeaconBlock)
tmp == SSZ.encode(signedBlock)
tmp2 == framingFormatCompress(tmp)
tmp2 == encodeFramed(tmp)
db.delBlock(root)
check:
@ -152,7 +152,7 @@ suite "Beacon chain DB" & preset():
db.getBlockSSZ(root, tmp, altair.TrustedSignedBeaconBlock)
db.getBlockSZ(root, tmp2, altair.TrustedSignedBeaconBlock)
tmp == SSZ.encode(signedBlock)
tmp2 == framingFormatCompress(tmp)
tmp2 == encodeFramed(tmp)
db.delBlock(root)
check:
@ -194,7 +194,7 @@ suite "Beacon chain DB" & preset():
db.getBlockSSZ(root, tmp, bellatrix.TrustedSignedBeaconBlock)
db.getBlockSZ(root, tmp2, bellatrix.TrustedSignedBeaconBlock)
tmp == SSZ.encode(signedBlock)
tmp2 == framingFormatCompress(tmp)
tmp2 == encodeFramed(tmp)
db.delBlock(root)
check:

2
vendor/nim-snappy vendored

@ -1 +1 @@
Subproject commit 6537b10600686f3a698aa5f4f71cc12a40154eb9
Subproject commit 7cb2e57a58619a6ca2be94db94591467a41d8476