use MAX_CHUNK_SIZE_BELLATRIX for signed Bellatrix blocks (#3613)
* use MAX_CHUNK_SIZE_BELLATRIX for signed Bellatrix blocks * Update beacon_chain/networking/eth2_network.nim Co-authored-by: Etan Kissling <etan@status.im> * localPassC to localPassc * check against maxChunkSize rather than constant Co-authored-by: Etan Kissling <etan@status.im>
This commit is contained in:
parent
f016e1abbd
commit
4a372410a4
|
@ -614,6 +614,12 @@ when useNativeSnappy:
|
||||||
else:
|
else:
|
||||||
include libp2p_streams_backend
|
include libp2p_streams_backend
|
||||||
|
|
||||||
|
func maxChunkSize(t: typedesc[bellatrix.SignedBeaconBlock]): uint32 =
|
||||||
|
MAX_CHUNK_SIZE_BELLATRIX
|
||||||
|
|
||||||
|
func maxChunkSize(t: typedesc): uint32 =
|
||||||
|
MAX_CHUNK_SIZE
|
||||||
|
|
||||||
proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
|
proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
|
||||||
ResponseMsg: type,
|
ResponseMsg: type,
|
||||||
timeout: Duration): Future[NetRes[ResponseMsg]]
|
timeout: Duration): Future[NetRes[ResponseMsg]]
|
||||||
|
@ -635,7 +641,7 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
|
||||||
# Read the response
|
# Read the response
|
||||||
return
|
return
|
||||||
await readResponse(when useNativeSnappy: libp2pInput(stream) else: stream,
|
await readResponse(when useNativeSnappy: libp2pInput(stream) else: stream,
|
||||||
peer, ResponseMsg, timeout)
|
peer, maxChunkSize(ResponseMsg), ResponseMsg, timeout)
|
||||||
finally:
|
finally:
|
||||||
await stream.closeWithEOF()
|
await stream.closeWithEOF()
|
||||||
|
|
||||||
|
@ -787,8 +793,10 @@ proc handleIncomingStream(network: Eth2Node,
|
||||||
NetRes[MsgRec].ok default(MsgRec)
|
NetRes[MsgRec].ok default(MsgRec)
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
awaitWithTimeout(readChunkPayload(s, peer, MsgRec), deadline):
|
awaitWithTimeout(
|
||||||
returnInvalidRequest(errorMsgLit "Request full data not sent in time")
|
readChunkPayload(s, peer, maxChunkSize(MsgRec), MsgRec), deadline):
|
||||||
|
returnInvalidRequest(
|
||||||
|
errorMsgLit "Request full data not sent in time")
|
||||||
|
|
||||||
except SerializationError as err:
|
except SerializationError as err:
|
||||||
returnInvalidRequest err.formatMsg("msg")
|
returnInvalidRequest err.formatMsg("msg")
|
||||||
|
@ -1963,7 +1971,7 @@ template gossipMaxSize(T: untyped): uint32 =
|
||||||
T is altair.SignedBeaconBlock:
|
T is altair.SignedBeaconBlock:
|
||||||
GOSSIP_MAX_SIZE
|
GOSSIP_MAX_SIZE
|
||||||
else:
|
else:
|
||||||
{.fatal: "unknown type".}
|
{.fatal: "unknown type " & name(T).}
|
||||||
static: doAssert maxSize <= maxGossipMaxSize()
|
static: doAssert maxSize <= maxGossipMaxSize()
|
||||||
maxSize.uint32
|
maxSize.uint32
|
||||||
|
|
||||||
|
|
|
@ -86,6 +86,8 @@ proc readSszValue(s: AsyncInputStream,
|
||||||
proc readChunkPayload(s: AsyncInputStream,
|
proc readChunkPayload(s: AsyncInputStream,
|
||||||
noSnappy: bool,
|
noSnappy: bool,
|
||||||
MsgType: type): Future[NetRes[MsgType]] {.async.} =
|
MsgType: type): Future[NetRes[MsgType]] {.async.} =
|
||||||
|
# TODO this needs to sometimes be MAX_CHUNK_SIZE_BELLATRIX, for at least
|
||||||
|
# bellatrix.SignedBeaconBlock
|
||||||
let prefix = await readSizePrefix(s, MAX_CHUNK_SIZE)
|
let prefix = await readSizePrefix(s, MAX_CHUNK_SIZE)
|
||||||
let size = if prefix.isOk: prefix.value.int
|
let size = if prefix.isOk: prefix.value.int
|
||||||
else: return err(prefix.error)
|
else: return err(prefix.error)
|
||||||
|
|
|
@ -99,6 +99,7 @@ proc uncompressFramedStream*(conn: Connection,
|
||||||
return ok output
|
return ok output
|
||||||
|
|
||||||
proc readChunkPayload*(conn: Connection, peer: Peer,
|
proc readChunkPayload*(conn: Connection, peer: Peer,
|
||||||
|
maxChunkSize: uint32,
|
||||||
MsgType: type): Future[NetRes[MsgType]] {.async.} =
|
MsgType: type): Future[NetRes[MsgType]] {.async.} =
|
||||||
let sm = now(chronos.Moment)
|
let sm = now(chronos.Moment)
|
||||||
let size =
|
let size =
|
||||||
|
@ -112,7 +113,7 @@ proc readChunkPayload*(conn: Connection, peer: Peer,
|
||||||
except InvalidVarintError:
|
except InvalidVarintError:
|
||||||
return neterr UnexpectedEOF
|
return neterr UnexpectedEOF
|
||||||
|
|
||||||
if size > MAX_CHUNK_SIZE:
|
if size > maxChunkSize:
|
||||||
return neterr SizePrefixOverflow
|
return neterr SizePrefixOverflow
|
||||||
if size == 0:
|
if size == 0:
|
||||||
return neterr ZeroSizePrefix
|
return neterr ZeroSizePrefix
|
||||||
|
@ -129,7 +130,7 @@ proc readChunkPayload*(conn: Connection, peer: Peer,
|
||||||
debug "Snappy decompression/read failed", msg = $data.error, conn
|
debug "Snappy decompression/read failed", msg = $data.error, conn
|
||||||
return neterr InvalidSnappyBytes
|
return neterr InvalidSnappyBytes
|
||||||
|
|
||||||
proc readResponseChunk(conn: Connection, peer: Peer,
|
proc readResponseChunk(conn: Connection, peer: Peer, maxChunkSize: uint32,
|
||||||
MsgType: typedesc): Future[NetRes[MsgType]] {.async.} =
|
MsgType: typedesc): Future[NetRes[MsgType]] {.async.} =
|
||||||
mixin readChunkPayload
|
mixin readChunkPayload
|
||||||
|
|
||||||
|
@ -148,7 +149,8 @@ proc readResponseChunk(conn: Connection, peer: Peer,
|
||||||
case responseCode:
|
case responseCode:
|
||||||
of InvalidRequest, ServerError, ResourceUnavailable:
|
of InvalidRequest, ServerError, ResourceUnavailable:
|
||||||
let
|
let
|
||||||
errorMsgChunk = await readChunkPayload(conn, peer, ErrorMsg)
|
errorMsgChunk = await readChunkPayload(
|
||||||
|
conn, peer, maxChunkSize, ErrorMsg)
|
||||||
errorMsg = if errorMsgChunk.isOk: errorMsgChunk.value
|
errorMsg = if errorMsgChunk.isOk: errorMsgChunk.value
|
||||||
else: return err(errorMsgChunk.error)
|
else: return err(errorMsgChunk.error)
|
||||||
errorMsgStr = toPrettyString(errorMsg.asSeq)
|
errorMsgStr = toPrettyString(errorMsg.asSeq)
|
||||||
|
@ -159,12 +161,12 @@ proc readResponseChunk(conn: Connection, peer: Peer,
|
||||||
of Success:
|
of Success:
|
||||||
discard
|
discard
|
||||||
|
|
||||||
return await readChunkPayload(conn, peer, MsgType)
|
return await readChunkPayload(conn, peer, maxChunkSize, MsgType)
|
||||||
|
|
||||||
except LPStreamEOFError, LPStreamIncompleteError:
|
except LPStreamEOFError, LPStreamIncompleteError:
|
||||||
return neterr UnexpectedEOF
|
return neterr UnexpectedEOF
|
||||||
|
|
||||||
proc readResponse(conn: Connection, peer: Peer,
|
proc readResponse(conn: Connection, peer: Peer, maxChunkSize: uint32,
|
||||||
MsgType: type, timeout: Duration): Future[NetRes[MsgType]] {.async.} =
|
MsgType: type, timeout: Duration): Future[NetRes[MsgType]] {.async.} =
|
||||||
when MsgType is seq:
|
when MsgType is seq:
|
||||||
type E = ElemType(MsgType)
|
type E = ElemType(MsgType)
|
||||||
|
@ -176,7 +178,7 @@ proc readResponse(conn: Connection, peer: Peer,
|
||||||
# The problem is exacerbated by the large number of round-trips to the
|
# The problem is exacerbated by the large number of round-trips to the
|
||||||
# poll loop that each future along the way causes.
|
# poll loop that each future along the way causes.
|
||||||
trace "reading chunk", conn
|
trace "reading chunk", conn
|
||||||
let nextFut = conn.readResponseChunk(peer, E)
|
let nextFut = conn.readResponseChunk(peer, maxChunkSize, E)
|
||||||
if not await nextFut.withTimeout(timeout):
|
if not await nextFut.withTimeout(timeout):
|
||||||
return neterr(ReadResponseTimeout)
|
return neterr(ReadResponseTimeout)
|
||||||
let nextRes = nextFut.read()
|
let nextRes = nextFut.read()
|
||||||
|
@ -192,7 +194,7 @@ proc readResponse(conn: Connection, peer: Peer,
|
||||||
trace "Got chunk", conn
|
trace "Got chunk", conn
|
||||||
results.add nextRes.value
|
results.add nextRes.value
|
||||||
else:
|
else:
|
||||||
let nextFut = conn.readResponseChunk(peer, MsgType)
|
let nextFut = conn.readResponseChunk(peer, maxChunkSize, MsgType)
|
||||||
if not await nextFut.withTimeout(timeout):
|
if not await nextFut.withTimeout(timeout):
|
||||||
return neterr(ReadResponseTimeout)
|
return neterr(ReadResponseTimeout)
|
||||||
return nextFut.read()
|
return nextFut.read()
|
||||||
|
|
|
@ -29,7 +29,7 @@ import nimcrypto/utils as ncrutils
|
||||||
export
|
export
|
||||||
results, burnMem, writeValue, readValue
|
results, burnMem, writeValue, readValue
|
||||||
|
|
||||||
{.localPassC: "-fno-lto".} # no LTO for crypto
|
{.localPassc: "-fno-lto".} # no LTO for crypto
|
||||||
|
|
||||||
type
|
type
|
||||||
KeystoreMode* = enum
|
KeystoreMode* = enum
|
||||||
|
|
|
@ -70,7 +70,8 @@ type
|
||||||
BlockRootsList* = List[Eth2Digest, Limit MAX_REQUEST_BLOCKS]
|
BlockRootsList* = List[Eth2Digest, Limit MAX_REQUEST_BLOCKS]
|
||||||
|
|
||||||
proc readChunkPayload*(
|
proc readChunkPayload*(
|
||||||
conn: Connection, peer: Peer, MsgType: type (ref ForkedSignedBeaconBlock)):
|
conn: Connection, peer: Peer, maxChunkSize: uint32,
|
||||||
|
MsgType: type (ref ForkedSignedBeaconBlock)):
|
||||||
Future[NetRes[MsgType]] {.async.} =
|
Future[NetRes[MsgType]] {.async.} =
|
||||||
var contextBytes: ForkDigest
|
var contextBytes: ForkDigest
|
||||||
try:
|
try:
|
||||||
|
@ -78,20 +79,25 @@ proc readChunkPayload*(
|
||||||
except CatchableError:
|
except CatchableError:
|
||||||
return neterr UnexpectedEOF
|
return neterr UnexpectedEOF
|
||||||
|
|
||||||
|
# Ignores maxChunkSize; needs to be consistent formal parameters,
|
||||||
|
# but this function is where that's determined.
|
||||||
if contextBytes == peer.network.forkDigests.phase0:
|
if contextBytes == peer.network.forkDigests.phase0:
|
||||||
let res = await readChunkPayload(conn, peer, phase0.SignedBeaconBlock)
|
let res = await readChunkPayload(
|
||||||
|
conn, peer, MAX_CHUNK_SIZE, phase0.SignedBeaconBlock)
|
||||||
if res.isOk:
|
if res.isOk:
|
||||||
return ok newClone(ForkedSignedBeaconBlock.init(res.get))
|
return ok newClone(ForkedSignedBeaconBlock.init(res.get))
|
||||||
else:
|
else:
|
||||||
return err(res.error)
|
return err(res.error)
|
||||||
elif contextBytes == peer.network.forkDigests.altair:
|
elif contextBytes == peer.network.forkDigests.altair:
|
||||||
let res = await readChunkPayload(conn, peer, altair.SignedBeaconBlock)
|
let res = await readChunkPayload(
|
||||||
|
conn, peer, MAX_CHUNK_SIZE, altair.SignedBeaconBlock)
|
||||||
if res.isOk:
|
if res.isOk:
|
||||||
return ok newClone(ForkedSignedBeaconBlock.init(res.get))
|
return ok newClone(ForkedSignedBeaconBlock.init(res.get))
|
||||||
else:
|
else:
|
||||||
return err(res.error)
|
return err(res.error)
|
||||||
elif contextBytes == peer.network.forkDigests.bellatrix:
|
elif contextBytes == peer.network.forkDigests.bellatrix:
|
||||||
let res = await readChunkPayload(conn, peer, bellatrix.SignedBeaconBlock)
|
let res = await readChunkPayload(
|
||||||
|
conn, peer, MAX_CHUNK_SIZE_BELLATRIX, bellatrix.SignedBeaconBlock)
|
||||||
if res.isOk:
|
if res.isOk:
|
||||||
return ok newClone(ForkedSignedBeaconBlock.init(res.get))
|
return ok newClone(ForkedSignedBeaconBlock.init(res.get))
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -26,7 +26,7 @@ export
|
||||||
when defined(windows):
|
when defined(windows):
|
||||||
import stew/[windows/acl]
|
import stew/[windows/acl]
|
||||||
|
|
||||||
{.localPassC: "-fno-lto".} # no LTO for crypto
|
{.localPassc: "-fno-lto".} # no LTO for crypto
|
||||||
|
|
||||||
const
|
const
|
||||||
KeystoreFileName* = "keystore.json"
|
KeystoreFileName* = "keystore.json"
|
||||||
|
|
|
@ -181,7 +181,7 @@ switch("warning", "LockLevel:off")
|
||||||
# ############################################################
|
# ############################################################
|
||||||
|
|
||||||
# This applies per-file compiler flags to C files
|
# This applies per-file compiler flags to C files
|
||||||
# which do not support {.localPassC: "-fno-lto".}
|
# which do not support {.localPassc: "-fno-lto".}
|
||||||
# Unfortunately this is filename based instead of path-based
|
# Unfortunately this is filename based instead of path-based
|
||||||
# Assumes GCC
|
# Assumes GCC
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue