mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-01-11 14:54:12 +00:00
simplify libp2p snappy
* handle a few more exceptions gracefully (in libp2p also) * unify libp2p varint parsing * decompress directly into seq * avoid seq slice * stop oversized snappy processing earlier (lowers risk)
This commit is contained in:
parent
cdc866007d
commit
a605c7244e
@ -408,7 +408,7 @@ proc putState(pool: BlockPool, state: HashedBeaconState, blck: BlockRef) =
|
||||
while pool.cachedStates.len > MAX_CACHE_SIZE:
|
||||
discard pool.cachedStates.pop()
|
||||
let cacheLen = pool.cachedStates.len
|
||||
debug "BlockPool.putState(): state cache updated", cacheLen
|
||||
trace "BlockPool.putState(): state cache updated", cacheLen
|
||||
doAssert cacheLen > 0 and cacheLen <= MAX_CACHE_SIZE
|
||||
|
||||
proc add*(
|
||||
|
@ -1,25 +1,26 @@
|
||||
# TODO: How can this be tested?
|
||||
proc uncompressFramedStream*(conn: Connection,
|
||||
output: OutputStream,
|
||||
expectedSize: int): Future[Result[void, cstring]]
|
||||
expectedSize: int): Future[Result[seq[byte], cstring]]
|
||||
{.async.} =
|
||||
var header: array[STREAM_HEADER.len, byte]
|
||||
try:
|
||||
await conn.readExactly(addr header[0], header.len)
|
||||
except LPStreamEOFError:
|
||||
except LPStreamEOFError, LPStreamIncompleteError:
|
||||
return err "Unexpected EOF before snappy header"
|
||||
|
||||
if header != STREAM_HEADER.toOpenArrayByte(0, STREAM_HEADER.high):
|
||||
return err "Incorrect snappy header"
|
||||
|
||||
var totalBytesDecompressed = 0
|
||||
var uncompressedData = newSeq[byte](MAX_UNCOMPRESSED_DATA_LEN)
|
||||
var
|
||||
uncompressedData = newSeq[byte](MAX_UNCOMPRESSED_DATA_LEN)
|
||||
frameData = newSeq[byte](MAX_COMPRESSED_DATA_LEN)
|
||||
output = newSeqOfCap[byte](expectedSize)
|
||||
|
||||
while totalBytesDecompressed < expectedSize:
|
||||
while output.len < expectedSize:
|
||||
var frameHeader: array[4, byte]
|
||||
try:
|
||||
await conn.readExactly(addr frameHeader[0], frameHeader.len)
|
||||
except LPStreamEOFError:
|
||||
except LPStreamEOFError, LPStreamIncompleteError:
|
||||
break
|
||||
|
||||
let x = uint32.fromBytesLE frameHeader
|
||||
@ -29,10 +30,10 @@ proc uncompressFramedStream*(conn: Connection,
|
||||
if dataLen > MAX_COMPRESSED_DATA_LEN:
|
||||
return err "invalid snappy frame length"
|
||||
|
||||
var frameData = newSeq[byte](dataLen)
|
||||
if dataLen > 0:
|
||||
try:
|
||||
await conn.readExactly(addr frameData[0], dataLen)
|
||||
except LPStreamEOFError:
|
||||
except LPStreamEOFError, LPStreamIncompleteError:
|
||||
return err "Incomplete snappy frame"
|
||||
|
||||
if id == COMPRESSED_DATA_IDENTIFIER:
|
||||
@ -40,26 +41,34 @@ proc uncompressFramedStream*(conn: Connection,
|
||||
return err "Snappy frame size too low to contain CRC checksum"
|
||||
|
||||
let
|
||||
crc = uint32.fromBytesLE frameData[0..3]
|
||||
uncompressedLen = snappyUncompress(frameData.toOpenArray(4, frameData.high), uncompressedData)
|
||||
crc = uint32.fromBytesLE frameData.toOpenArray(0, 3)
|
||||
todo = expectedSize - output.len
|
||||
uncompressedLen = snappyUncompress(
|
||||
frameData.toOpenArray(4, dataLen - 1),
|
||||
uncompressedData.toOpenArray(0, min(todo, uncompressedData.len) - 1))
|
||||
|
||||
if uncompressedLen <= 0:
|
||||
return err "Failed to decompress snappy frame"
|
||||
doAssert output.len + uncompressedLen <= expectedSize,
|
||||
"enforced by `min` above"
|
||||
|
||||
if not checkCrcAndAppend(output, uncompressedData.toOpenArray(0, uncompressedLen-1), crc):
|
||||
if not checkCrc(uncompressedData.toOpenArray(0, uncompressedLen-1), crc):
|
||||
return err "Snappy content CRC checksum failed"
|
||||
|
||||
totalBytesDecompressed += uncompressedLen
|
||||
output.add uncompressedData.toOpenArray(0, uncompressedLen-1)
|
||||
|
||||
elif id == UNCOMPRESSED_DATA_IDENTIFIER:
|
||||
if dataLen < 4:
|
||||
return err "Snappy frame size too low to contain CRC checksum"
|
||||
|
||||
let crc = uint32.fromBytesLE frameData[0..3]
|
||||
if not checkCrcAndAppend(output, frameData.toOpenArray(4, frameData.high), crc):
|
||||
if output.len + dataLen - 4 > expectedSize:
|
||||
return err "Too much data"
|
||||
|
||||
let crc = uint32.fromBytesLE frameData.toOpenArray(0, 3)
|
||||
if not checkCrc(frameData.toOpenArray(4, dataLen - 1), crc):
|
||||
return err "Snappy content CRC checksum failed"
|
||||
|
||||
totalBytesDecompressed += frameData.len - 4
|
||||
output.add frameData.toOpenArray(4, dataLen-1)
|
||||
|
||||
elif id < 0x80:
|
||||
# Reserved unskippable chunks (chunk types 0x02-0x7f)
|
||||
@ -72,55 +81,37 @@ proc uncompressFramedStream*(conn: Connection,
|
||||
# including STREAM_HEADER (0xff) should be skipped
|
||||
continue
|
||||
|
||||
return ok()
|
||||
|
||||
proc readSizePrefix(conn: Connection,
|
||||
maxSize: uint32): Future[NetRes[uint32]] {.async.} =
|
||||
trace "about to read msg size prefix"
|
||||
var parser: VarintParser[uint32, ProtoBuf]
|
||||
try:
|
||||
while true:
|
||||
var nextByte: byte
|
||||
await conn.readExactly(addr nextByte, 1)
|
||||
case parser.feedByte(nextByte)
|
||||
of Done:
|
||||
let res = parser.getResult
|
||||
if res > maxSize:
|
||||
return neterr SizePrefixOverflow
|
||||
else:
|
||||
return ok res
|
||||
of Overflow:
|
||||
return neterr SizePrefixOverflow
|
||||
of Incomplete:
|
||||
continue
|
||||
except LPStreamEOFError:
|
||||
return neterr UnexpectedEOF
|
||||
return ok output
|
||||
|
||||
proc readChunkPayload(conn: Connection,
|
||||
noSnappy: bool,
|
||||
MsgType: type): Future[NetRes[MsgType]] {.async.} =
|
||||
let prefix = await readSizePrefix(conn, MAX_CHUNK_SIZE)
|
||||
let size = if prefix.isOk: prefix.value.int
|
||||
else: return err(prefix.error)
|
||||
let size =
|
||||
try: await conn.readVarint()
|
||||
except LPStreamEOFError: #, LPStreamIncompleteError, InvalidVarintError
|
||||
# TODO compiler error - haha, uncaught exception
|
||||
# Error: unhandled exception: closureiters.nim(322, 17) `c[i].kind == nkType` [AssertionError]
|
||||
return neterr UnexpectedEOF
|
||||
except LPStreamIncompleteError:
|
||||
return neterr UnexpectedEOF
|
||||
except InvalidVarintError:
|
||||
return neterr UnexpectedEOF
|
||||
|
||||
if size > MAX_CHUNK_SIZE:
|
||||
return neterr SizePrefixOverflow
|
||||
if size == 0:
|
||||
return neterr ZeroSizePrefix
|
||||
|
||||
if size > 0:
|
||||
if noSnappy:
|
||||
var bytes = newSeq[byte](size)
|
||||
var bytes = newSeq[byte](size.int)
|
||||
await conn.readExactly(addr bytes[0], bytes.len)
|
||||
return ok SSZ.decode(bytes, MsgType)
|
||||
else:
|
||||
var snappyOutput = memoryOutput()
|
||||
let status = await conn.uncompressFramedStream(snappyOutput, size)
|
||||
if status.isOk:
|
||||
var decompressedBytes = snappyOutput.getOutput
|
||||
if decompressedBytes.len != size:
|
||||
return neterr InvalidSnappyBytes
|
||||
else:
|
||||
return ok SSZ.decode(decompressedBytes, MsgType)
|
||||
let data = await conn.uncompressFramedStream(size.int)
|
||||
if data.isOk:
|
||||
return ok SSZ.decode(data.get(), MsgType)
|
||||
else:
|
||||
return neterr InvalidSnappyBytes
|
||||
else:
|
||||
return neterr ZeroSizePrefix
|
||||
|
||||
proc readResponseChunk(conn: Connection,
|
||||
noSnappy: bool,
|
||||
@ -129,7 +120,7 @@ proc readResponseChunk(conn: Connection,
|
||||
var responseCodeByte: byte
|
||||
try:
|
||||
await conn.readExactly(addr responseCodeByte, 1)
|
||||
except LPStreamEOFError:
|
||||
except LPStreamEOFError, LPStreamIncompleteError:
|
||||
return neterr PotentiallyExpectedEOF
|
||||
|
||||
static: assert ResponseCode.low.ord == 0
|
||||
@ -150,7 +141,7 @@ proc readResponseChunk(conn: Connection,
|
||||
|
||||
return await readChunkPayload(conn, noSnappy, MsgType)
|
||||
|
||||
except LPStreamEOFError:
|
||||
except LPStreamEOFError, LPStreamIncompleteError:
|
||||
return neterr UnexpectedEOF
|
||||
|
||||
proc readResponse(conn: Connection,
|
||||
@ -169,4 +160,3 @@ proc readResponse(conn: Connection,
|
||||
results.add nextRes.value
|
||||
else:
|
||||
return await conn.readResponseChunk(noSnappy, MsgType)
|
||||
|
||||
|
2
vendor/nim-libp2p
vendored
2
vendor/nim-libp2p
vendored
@ -1 +1 @@
|
||||
Subproject commit 618e01eba39ffac0c4425ac193d8a4410201d856
|
||||
Subproject commit 0c3f26c9a5be645367ca433a1e343a634b567524
|
Loading…
x
Reference in New Issue
Block a user