2020-05-12 22:35:40 +00:00
|
|
|
# TODO: How can this be tested?
proc uncompressFramedStream*(conn: Connection,
                             expectedSize: int): Future[Result[seq[byte], cstring]]
                            {.async.} =
  ## Reads a snappy framed stream from `conn`, decompressing it chunk by
  ## chunk until exactly `expectedSize` bytes of payload have been produced.
  ## Returns the uncompressed bytes, or an error string on malformed input
  ## or premature EOF.
  var streamHeader: array[STREAM_HEADER.len, byte]
  try:
    await conn.readExactly(addr streamHeader[0], streamHeader.len)
  except LPStreamEOFError, LPStreamIncompleteError:
    return err "Unexpected EOF before snappy header"

  if streamHeader != STREAM_HEADER.toOpenArrayByte(0, STREAM_HEADER.high):
    return err "Incorrect snappy header"

  var
    uncompressedData = newSeq[byte](MAX_UNCOMPRESSED_DATA_LEN)
    frameData = newSeq[byte](MAX_COMPRESSED_DATA_LEN)
    output = newSeqOfCap[byte](expectedSize)

  while output.len < expectedSize:
    var frameHeader: array[4, byte]
    try:
      await conn.readExactly(addr frameHeader[0], frameHeader.len)
    except LPStreamEOFError, LPStreamIncompleteError:
      return err "no snappy frame"

    let
      frameWord = uint32.fromBytesLE frameHeader
      chunkId = frameWord and 0xFF        # low byte is the chunk type
      payloadLen = (frameWord shr 8).int  # upper 24 bits are the data length

    if payloadLen > MAX_COMPRESSED_DATA_LEN:
      return err "invalid snappy frame length"

    if payloadLen > 0:
      try:
        await conn.readExactly(addr frameData[0], payloadLen)
      except LPStreamEOFError, LPStreamIncompleteError:
        return err "Incomplete snappy frame"

    if chunkId == COMPRESSED_DATA_IDENTIFIER:
      if payloadLen < 4:
        return err "Snappy frame size too low to contain CRC checksum"

      let
        crc = uint32.fromBytesLE frameData.toOpenArray(0, 3)
        remaining = expectedSize - output.len
        chunkLen = min(remaining, uncompressedData.len)

      # Grab up to MAX_UNCOMPRESSED_DATA_LEN bytes, but no more than remains
      # according to the expected size. If it turns out that the uncompressed
      # data is longer than that, snappyUncompress will fail and we will not
      # decompress the chunk at all, instead reporting failure.
      let
        # The `int` conversion below is safe, because `uncompressedLen` is
        # bounded to `chunkLen` (which in turn is bounded by `MAX_CHUNK_SIZE`).
        # TODO: Use a range type for the parameter.
        uncompressedLen = int snappyUncompress(
          frameData.toOpenArray(4, payloadLen - 1),
          uncompressedData.toOpenArray(0, chunkLen - 1))

      if uncompressedLen == 0:
        return err "Failed to decompress snappy frame"
      doAssert output.len + uncompressedLen <= expectedSize,
        "enforced by `remains` limit above"

      if not checkCrc(uncompressedData.toOpenArray(0, uncompressedLen - 1), crc):
        return err "Snappy content CRC checksum failed"

      output.add uncompressedData.toOpenArray(0, uncompressedLen - 1)

    elif chunkId == UNCOMPRESSED_DATA_IDENTIFIER:
      if payloadLen < 4:
        return err "Snappy frame size too low to contain CRC checksum"

      if output.len + payloadLen - 4 > expectedSize:
        return err "Too much data"

      let crc = uint32.fromBytesLE frameData.toOpenArray(0, 3)
      if not checkCrc(frameData.toOpenArray(4, payloadLen - 1), crc):
        return err "Snappy content CRC checksum failed"

      output.add frameData.toOpenArray(4, payloadLen - 1)

    elif chunkId < 0x80:
      # Reserved unskippable chunks (chunk types 0x02-0x7f)
      # if we encounter this type of chunk, stop decoding
      # the spec says it is an error
      return err "Invalid snappy chunk type"

    else:
      # Reserved skippable chunks (chunk types 0x80-0xfe)
      # including STREAM_HEADER (0xff) should be skipped
      continue

  return ok output
|
2020-05-12 22:37:07 +00:00
|
|
|
|
2021-07-07 09:09:47 +00:00
|
|
|
proc readChunkPayload*(conn: Connection, peer: Peer,
                       MsgType: type): Future[NetRes[MsgType]] {.async.} =
  ## Reads a single length-prefixed, snappy-framed chunk from `conn`,
  ## SSZ-decodes it as `MsgType`, and updates `peer`'s network throughput
  ## statistics on success.
  let startMoment = now(chronos.Moment)
  let size =
    try: await conn.readVarint()
    except LPStreamEOFError: #, LPStreamIncompleteError, InvalidVarintError
      # TODO compiler error - haha, uncaught exception
      # Error: unhandled exception: closureiters.nim(322, 17) `c[i].kind == nkType` [AssertionError]
      return neterr UnexpectedEOF
    except LPStreamIncompleteError:
      return neterr UnexpectedEOF
    except InvalidVarintError:
      return neterr UnexpectedEOF

  if size > MAX_CHUNK_SIZE:
    return neterr SizePrefixOverflow
  if size == 0:
    return neterr ZeroSizePrefix

  # The `size.int` conversion is safe because `size` is bounded to `MAX_CHUNK_SIZE`
  let decompressed = await conn.uncompressFramedStream(size.int)
  if decompressed.isErr:
    debug "Snappy decompression/read failed", msg = $decompressed.error, conn
    return neterr InvalidSnappyBytes

  # `10` is the maximum size of variable integer on wire, so error could
  # not be significant.
  peer.updateNetThroughput(now(chronos.Moment) - startMoment,
                           uint64(10 + size))
  return ok SSZ.decode(decompressed.get(), MsgType)
|
2020-05-13 21:04:09 +00:00
|
|
|
|
2020-08-10 13:18:17 +00:00
|
|
|
proc readResponseChunk(conn: Connection, peer: Peer,
                       MsgType: typedesc): Future[NetRes[MsgType]] {.async.} =
  ## Reads one response chunk: a single response-code byte followed by a
  ## chunk payload. Error response codes carry an `ErrorMsg` payload, which
  ## is surfaced as a `ReceivedErrorResponse` networking error.
  mixin readChunkPayload

  try:
    var statusByte: byte
    try:
      await conn.readExactly(addr statusByte, 1)
    except LPStreamEOFError, LPStreamIncompleteError:
      # EOF here simply means the peer has no (more) chunks to send
      return neterr PotentiallyExpectedEOF

    # The range check below relies on response codes starting at zero
    static: assert ResponseCode.low.ord == 0
    if statusByte > ResponseCode.high.byte:
      return neterr InvalidResponseCode

    let responseCode = ResponseCode statusByte
    case responseCode:
    of InvalidRequest, ServerError:
      # The payload of an error response is the peer's error message
      let errorMsgChunk = await readChunkPayload(conn, peer, ErrorMsg)
      let errorMsg = if errorMsgChunk.isOk: errorMsgChunk.value
                     else: return err(errorMsgChunk.error)
      return err Eth2NetworkingError(kind: ReceivedErrorResponse,
                                     responseCode: responseCode,
                                     errorMsg: toPrettyString(errorMsg.asSeq))
    of Success:
      discard

    return await readChunkPayload(conn, peer, MsgType)

  except LPStreamEOFError, LPStreamIncompleteError:
    return neterr UnexpectedEOF
|
|
|
|
|
2020-08-10 13:18:17 +00:00
|
|
|
proc readResponse(conn: Connection, peer: Peer,
                  MsgType: type, timeout: Duration): Future[NetRes[MsgType]] {.async.} =
  ## Reads a full response from `conn`. For `seq` message types, chunks are
  ## accumulated until an (expected) EOF; otherwise a single chunk is read.
  ## Each chunk read is given its own `timeout`.
  when MsgType is seq:
    type E = ElemType(MsgType)
    var results: MsgType
    while true:
      # Because we interleave networking with response processing, it may
      # happen that reading all chunks takes longer than a strict deadline
      # timeout would allow, so we allow each chunk a new timeout instead.
      # The problem is exacerbated by the large number of round-trips to the
      # poll loop that each future along the way causes.
      trace "reading chunk", conn
      let chunkFut = conn.readResponseChunk(peer, E)
      if not await chunkFut.withTimeout(timeout):
        return neterr(ReadResponseTimeout)
      let chunkRes = chunkFut.read()
      if chunkRes.isErr:
        let chunkErr = chunkRes.error
        if chunkErr.kind == PotentiallyExpectedEOF:
          # EOF between chunks terminates the stream normally
          trace "EOF chunk", conn, err = chunkErr
          return ok results
        trace "Error chunk", conn, err = chunkErr
        return err chunkErr
      else:
        trace "Got chunk", conn
        results.add chunkRes.value
  else:
    let chunkFut = conn.readResponseChunk(peer, MsgType)
    if not await chunkFut.withTimeout(timeout):
      return neterr(ReadResponseTimeout)
    return chunkFut.read()
|