Tighten chunk decoding limits (#4264)

* cap maximum number of chunks to download from peer (fixes #1620; the capping pattern is sketched below the change summary)
* drop support for requesting blocks via v1 / phase0 protocol
* tighten bounds checking of fixed-size messages
Jacek Sieka, 2022-10-27 18:51:43 +02:00, committed by GitHub
parent 28fc70de6d
commit 63a3f2b1ad
5 changed files with 88 additions and 94 deletions
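
In miniature, the capping pattern behind the first bullet. This is a toy sketch with assumed names, not the nim-ssz-serialization List API the diff actually uses:

# Toy stand-in for an SSZ List: add reports failure at the bound instead of
# growing, which is what lets the chunk reader cap a response.
type
  BoundedList[T; maxLen: static int] = object
    items: seq[T]

func add[T; maxLen: static int](list: var BoundedList[T, maxLen], v: T): bool =
  if list.items.len >= maxLen:
    return false          # refuse to grow past the protocol limit
  list.items.add v
  true

when isMainModule:
  var chunks: BoundedList[int, 3]
  for i in 0 ..< 5:
    if not chunks.add(i):
      echo "chunk ", i, " rejected"
      break

A failed add is what the reader below turns into the new ResponseChunkOverflow error, so a peer streaming more chunks than the request allows fails the whole response instead of consuming unbounded memory.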

beacon_chain/networking/eth2_network.nim

@@ -131,9 +131,9 @@ type
     ## Protocol requests using this type will produce request-making
     ## client-side procs that return `NetRes[MsgType]`
 
-  MultipleChunksResponse*[MsgType] = distinct UntypedResponse
+  MultipleChunksResponse*[MsgType; maxLen: static Limit] = distinct UntypedResponse
     ## Protocol requests using this type will produce request-making
-    ## client-side procs that return `NetRes[seq[MsgType]]`.
+    ## client-side procs that return `NetRes[List[MsgType, maxLen]]`.
     ## In the future, such procs will return an `InputStream[NetRes[MsgType]]`.
 
   MessageInfo* = object
@@ -200,6 +200,7 @@ type
     ZeroSizePrefix
     SizePrefixOverflow
     InvalidContextBytes
+    ResponseChunkOverflow
 
   Eth2NetworkingError = object
     case kind*: Eth2NetworkingErrorKind
@@ -760,8 +761,43 @@ proc uncompressFramedStream(conn: Connection,
   return ok output
 
+func chunkMaxSize[T](): uint32 =
+  # compiler error on (T: type) syntax...
+  when T is ForkySignedBeaconBlock:
+    when T is phase0.SignedBeaconBlock or T is altair.SignedBeaconBlock:
+      MAX_CHUNK_SIZE
+    elif T is bellatrix.SignedBeaconBlock:
+      MAX_CHUNK_SIZE_BELLATRIX
+    else:
+      {.fatal: "what's the chunk size here?".}
+  elif isFixedSize(T):
+    uint32 fixedPortionSize(T)
+  else:
+    MAX_CHUNK_SIZE
+
+func maxGossipMaxSize(): auto {.compileTime.} =
+  max(GOSSIP_MAX_SIZE, GOSSIP_MAX_SIZE_BELLATRIX)
+
+template gossipMaxSize(T: untyped): uint32 =
+  const maxSize = static:
+    when isFixedSize(T):
+      fixedPortionSize(T)
+    elif T is bellatrix.SignedBeaconBlock:
+      GOSSIP_MAX_SIZE_BELLATRIX
+    # TODO https://github.com/status-im/nim-ssz-serialization/issues/20 for
+    # Attestation, AttesterSlashing, and SignedAggregateAndProof, which all
+    # have lists bounded at MAX_VALIDATORS_PER_COMMITTEE (2048) items, thus
+    # having max sizes significantly smaller than GOSSIP_MAX_SIZE.
+    elif T is Attestation or T is AttesterSlashing or
+         T is SignedAggregateAndProof or T is phase0.SignedBeaconBlock or
+         T is altair.SignedBeaconBlock:
+      GOSSIP_MAX_SIZE
+    else:
+      {.fatal: "unknown type " & name(T).}
+  static: doAssert maxSize <= maxGossipMaxSize()
+  maxSize.uint32
+
 proc readChunkPayload*(conn: Connection, peer: Peer,
-                       maxChunkSize: uint32,
                        MsgType: type): Future[NetRes[MsgType]] {.async.} =
   let sm = now(chronos.Moment)
   let size =
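
chunkMaxSize, added in this hunk, resolves the limit per message type entirely at compile time. The same when-dispatch pattern as a standalone sketch; Phase0Block and BellatrixBlock are placeholder types, and the constants carry the consensus p2p spec values:

# when-branches are resolved per generic instantiation, so each message type
# gets its own constant limit with no runtime cost.
const
  MAX_CHUNK_SIZE = 1_048_576'u32            # 1 MiB, pre-Bellatrix spec value
  MAX_CHUNK_SIZE_BELLATRIX = 10_485_760'u32 # 10 MiB from Bellatrix on

type
  Phase0Block = object
  BellatrixBlock = object

func chunkMaxSize[T](): uint32 =
  when T is BellatrixBlock:
    MAX_CHUNK_SIZE_BELLATRIX
  else:
    MAX_CHUNK_SIZE

echo chunkMaxSize[Phase0Block]()    # 1048576
echo chunkMaxSize[BellatrixBlock]() # 10485760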
@@ -775,7 +811,8 @@ proc readChunkPayload*(conn: Connection, peer: Peer,
     except InvalidVarintError:
       return neterr UnexpectedEOF
 
-  if size > maxChunkSize:
+  const maxSize = chunkMaxSize[MsgType]()
+  if size > maxSize:
     return neterr SizePrefixOverflow
   if size == 0:
     return neterr ZeroSizePrefix
@@ -792,8 +829,9 @@ proc readChunkPayload*(conn: Connection, peer: Peer,
     debug "Snappy decompression/read failed", msg = $data.error, conn
     return neterr InvalidSnappyBytes
 
-proc readResponseChunk(conn: Connection, peer: Peer, maxChunkSize: uint32,
-                       MsgType: typedesc): Future[NetRes[MsgType]] {.async.} =
+proc readResponseChunk(
+    conn: Connection, peer: Peer, MsgType: typedesc):
+    Future[NetRes[MsgType]] {.async.} =
   mixin readChunkPayload
 
   try:
@@ -811,8 +849,7 @@ proc readResponseChunk(conn: Connection, peer: Peer, maxChunkSize: uint32,
     case responseCode:
     of InvalidRequest, ServerError, ResourceUnavailable:
       let
-        errorMsgChunk = await readChunkPayload(
-          conn, peer, maxChunkSize, ErrorMsg)
+        errorMsgChunk = await readChunkPayload(conn, peer, ErrorMsg)
         errorMsg = if errorMsgChunk.isOk: errorMsgChunk.value
                    else: return err(errorMsgChunk.error)
         errorMsgStr = toPrettyString(errorMsg.asSeq)
@@ -823,15 +860,15 @@
     of Success:
       discard
 
-    return await readChunkPayload(conn, peer, maxChunkSize, MsgType)
+    return await readChunkPayload(conn, peer, MsgType)
 
   except LPStreamEOFError, LPStreamIncompleteError:
     return neterr UnexpectedEOF
 
-proc readResponse(conn: Connection, peer: Peer, maxChunkSize: uint32,
+proc readResponse(conn: Connection, peer: Peer,
                   MsgType: type, timeout: Duration): Future[NetRes[MsgType]] {.async.} =
-  when MsgType is seq:
-    type E = ElemType(MsgType)
+  when MsgType is List:
+    type E = MsgType.T
 
     var results: MsgType
     while true:
       # Because we interleave networking with response processing, it may
@@ -840,7 +877,7 @@
       # The problem is exacerbated by the large number of round-trips to the
       # poll loop that each future along the way causes.
       trace "reading chunk", conn
-      let nextFut = conn.readResponseChunk(peer, maxChunkSize, E)
+      let nextFut = conn.readResponseChunk(peer, E)
       if not await nextFut.withTimeout(timeout):
         return neterr(ReadResponseTimeout)
       let nextRes = nextFut.read()
@@ -854,19 +891,14 @@
           return err nextRes.error
       else:
         trace "Got chunk", conn
-        results.add nextRes.value
+        if not results.add nextRes.value:
+          return neterr(ResponseChunkOverflow)
   else:
-    let nextFut = conn.readResponseChunk(peer, maxChunkSize, MsgType)
+    let nextFut = conn.readResponseChunk(peer, MsgType)
     if not await nextFut.withTimeout(timeout):
       return neterr(ReadResponseTimeout)
     return nextFut.read()
 
-func maxChunkSize*(t: typedesc[bellatrix.SignedBeaconBlock]): uint32 =
-  MAX_CHUNK_SIZE_BELLATRIX
-
-func maxChunkSize*(t: typedesc): uint32 =
-  MAX_CHUNK_SIZE
-
 proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
                      ResponseMsg: type,
                      timeout: Duration): Future[NetRes[ResponseMsg]]
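
readResponse above gives each chunk its own deadline and caps how many chunks it accepts. A self-contained chronos sketch of that loop shape; fetchChunk, readAll and the limits are illustrative stand-ins, not the real API:

import chronos

proc fetchChunk(i: int): Future[int] {.async.} =
  await sleepAsync(10.milliseconds)  # stands in for one network chunk
  return i

proc readAll(maxChunks: int, timeout: Duration) {.async.} =
  var results: seq[int]
  for i in 0 ..< 5:
    let fut = fetchChunk(i)
    if not await fut.withTimeout(timeout):
      echo "timeout"            # maps to neterr(ReadResponseTimeout)
      return
    if results.len >= maxChunks:
      echo "too many chunks"    # maps to neterr(ResponseChunkOverflow)
      return
    results.add fut.read()
  echo "got ", results.len, " chunks"

waitFor readAll(3, 100.milliseconds)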
@@ -886,27 +918,25 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
     await stream.close()
 
     # Read the response
-    return await readResponse(
-      stream, peer, maxChunkSize(ResponseMsg), ResponseMsg, timeout)
+    return await readResponse(stream, peer, ResponseMsg, timeout)
   finally:
     await stream.closeWithEOF()
 
-proc init*[MsgType](T: type MultipleChunksResponse[MsgType],
-                    peer: Peer, conn: Connection): T =
+proc init*(T: type MultipleChunksResponse, peer: Peer, conn: Connection): T =
   T(UntypedResponse(peer: peer, stream: conn))
 
 proc init*[MsgType](T: type SingleChunkResponse[MsgType],
                     peer: Peer, conn: Connection): T =
   T(UntypedResponse(peer: peer, stream: conn))
 
-template write*[M](
-    r: MultipleChunksResponse[M], val: M,
+template write*[M; maxLen: static Limit](
+    r: MultipleChunksResponse[M, maxLen], val: M,
     contextBytes: openArray[byte] = []): untyped =
   mixin sendResponseChunk
   sendResponseChunk(UntypedResponse(r), val, contextBytes)
 
-template writeBytesSZ*[M](
-    r: MultipleChunksResponse[M], uncompressedLen: uint64,
+template writeBytesSZ*(
+    r: MultipleChunksResponse, uncompressedLen: uint64,
     bytes: openArray[byte], contextBytes: openArray[byte]): untyped =
   sendResponseChunkBytesSZ(UntypedResponse(r), uncompressedLen, bytes, contextBytes)
@@ -1034,7 +1064,7 @@ proc handleIncomingStream(network: Eth2Node,
       else:
         try:
           awaitWithTimeout(
-            readChunkPayload(conn, peer, maxChunkSize(MsgRec), MsgRec), deadline):
+            readChunkPayload(conn, peer, MsgRec), deadline):
             # Timeout, e.g., cancellation due to fulfillment by different peer.
             # Treat this similarly to `UnexpectedEOF`, `PotentiallyExpectedEOF`.
             await sendErrorResponse(
@@ -1078,6 +1108,9 @@ proc handleIncomingStream(network: Eth2Node,
         of BrokenConnection:
           return
+        of ResponseChunkOverflow:
+          (InvalidRequest, errorMsgLit "Too many chunks in response")
 
       await sendErrorResponse(peer, conn, responseCode, errMsg)
       return
@@ -1911,7 +1944,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
     let isChunkStream = eqIdent(OutputParamType[0], "MultipleChunksResponse")
     msg.response.recName = if isChunkStream:
-      newTree(nnkBracketExpr, ident"seq", OutputParamType[1])
+      newTree(nnkBracketExpr, ident"List", OutputParamType[1], OutputParamType[2])
     else:
       OutputParamType[1]
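
The macro change above swaps the generated response type from seq[T] to List[T, maxLen] by emitting a bracket expression with one extra argument. The same nnkBracketExpr construction in a standalone sketch, with array as a toy stand-in for List:

import std/macros

macro boundedType(T: typedesc, maxLen: static int): untyped =
  # same shape as newTree(nnkBracketExpr, ident"List", msgType, maxLen)
  newTree(nnkBracketExpr, ident"array", newLit(maxLen), T)

type Buf = boundedType(int, 4)  # expands to array[4, int]
var buf: Buf
echo buf.len  # 4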
@@ -2166,27 +2199,6 @@ proc newBeaconSwitch(config: BeaconNodeConf | LightClientConf,
     .withTcpTransport({ServerFlags.ReuseAddr})
     .build()
 
-func maxGossipMaxSize(): auto {.compileTime.} =
-  max(GOSSIP_MAX_SIZE, GOSSIP_MAX_SIZE_BELLATRIX)
-
-template gossipMaxSize(T: untyped): uint32 =
-  const maxSize = static:
-    when isFixedSize(T):
-      fixedPortionSize(T)
-    elif T is bellatrix.SignedBeaconBlock:
-      GOSSIP_MAX_SIZE_BELLATRIX
-    # TODO https://github.com/status-im/nim-ssz-serialization/issues/20 for
-    # Attestation, AttesterSlashing, and SignedAggregateAndProof, which all
-    # have lists bounded at MAX_VALIDATORS_PER_COMMITTEE (2048) items, thus
-    # having max sizes significantly smaller than GOSSIP_MAX_SIZE.
-    elif T is Attestation or T is AttesterSlashing or
-         T is SignedAggregateAndProof or T is phase0.SignedBeaconBlock or
-         T is altair.SignedBeaconBlock:
-      GOSSIP_MAX_SIZE
-    else:
-      {.fatal: "unknown type " & name(T).}
-  static: doAssert maxSize <= maxGossipMaxSize()
-  maxSize.uint32
-
 proc createEth2Node*(rng: ref HmacDrbgContext,
                      config: BeaconNodeConf | LightClientConf,

beacon_chain/sync/light_client_manager.nim

@@ -126,7 +126,8 @@ proc doRequest(
     peer.lightClientBootstrap(blockRoot)
 
 # https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/light-client/p2p-interface.md#lightclientupdatesbyrange
-type LightClientUpdatesByRangeResponse = NetRes[seq[altair.LightClientUpdate]]
+type LightClientUpdatesByRangeResponse =
+  NetRes[List[altair.LightClientUpdate, MAX_REQUEST_LIGHT_CLIENT_UPDATES]]
 proc doRequest(
     e: typedesc[UpdatesByRange],
     peer: Peer,
@@ -198,7 +199,7 @@ template valueVerifier[E](
 iterator values(v: auto): auto =
   ## Local helper for `workerTask` to share the same implementation for both
   ## scalar and aggregate values, by treating scalars as 1-length aggregates.
-  when v is seq:
+  when v is List:
     for i in v:
       yield i
   else:
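
The values iterator above shares one body between scalar and aggregate responses by branching on the argument's type at compile time. A runnable miniature, with seq standing in for the SSZ List:

iterator values(v: auto): auto =
  when v is seq:
    for item in v:
      yield item
  else:
    yield v

for x in values(@["a", "b"]): echo x  # aggregate: yields each element
for x in values("lone"): echo x       # scalar: yields the value once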
@@ -221,7 +222,7 @@ proc workerTask[E](
         await E.doRequest(peer, key)
       if value.isOk:
         var applyReward = false
-        for val in value.get.values:
+        for val in value.get().values:
           let res = await self.valueVerifier(E)(val)
           if res.isErr:
             case res.error

beacon_chain/sync/request_manager.nim

@@ -77,16 +77,11 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
   debug "Requesting blocks by root", peer = peer, blocks = shortLog(items),
                                      peer_score = peer.getScore()
 
-  let blocks = if peer.useSyncV2():
-    await beaconBlocksByRoot_v2(peer, BlockRootsList items)
-  else:
-    (await beaconBlocksByRoot(peer, BlockRootsList items)).map(
-      proc(blcks: seq[phase0.SignedBeaconBlock]): auto =
-        blcks.mapIt(newClone(ForkedSignedBeaconBlock.init(it))))
+  let blocks = (await beaconBlocksByRoot_v2(peer, BlockRootsList items))
 
   if blocks.isOk:
     let ublocks = blocks.get()
-    if checkResponse(items, ublocks):
+    if checkResponse(items, ublocks.asSeq()):
       var
         gotGoodBlock = false
         gotUnviableBlock = false

beacon_chain/sync/sync_manager.nim

@@ -78,7 +78,7 @@ type
     slots*: uint64
 
   SyncManagerError* = object of CatchableError
-  BeaconBlocksRes* = NetRes[seq[ref ForkedSignedBeaconBlock]]
+  BeaconBlocksRes* = NetRes[List[ref ForkedSignedBeaconBlock, MAX_REQUEST_BLOCKS]]
 
 proc now*(sm: typedesc[SyncMoment], slots: uint64): SyncMoment {.inline.} =
   SyncMoment(stamp: now(chronos.Moment), slots: slots)
@@ -168,13 +168,7 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
   doAssert(not(req.isEmpty()), "Request must not be empty!")
   debug "Requesting blocks from peer", request = req
   try:
-    let res =
-      if peer.useSyncV2():
-        await beaconBlocksByRange_v2(peer, req.slot, req.count, 1'u64)
-      else:
-        (await beaconBlocksByRange(peer, req.slot, req.count, 1'u64)).map(
-          proc(blcks: seq[phase0.SignedBeaconBlock]): auto =
-            blcks.mapIt(newClone(ForkedSignedBeaconBlock.init(it))))
+    let res = await beaconBlocksByRange_v2(peer, req.slot, req.count, 1'u64)
 
     if res.isErr():
       debug "Error, while reading getBlocks response", request = req,
@@ -336,7 +330,7 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
   try:
     let blocks = await man.getBlocks(peer, req)
     if blocks.isOk():
-      let data = blocks.get()
+      let data = blocks.get().asSeq()
       let smap = getShortMap(req, data)
       debug "Received blocks on request", blocks_count = len(data),
             blocks_map = smap, request = req
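
The .get().asSeq() bridge lets the bounded List cap decoding while downstream code keeps consuming a plain seq. In miniature, with a toy type and assumed names:

type BoundedList[T; maxLen: static int] = object
  items: seq[T]

func asSeq[T; maxLen: static int](list: BoundedList[T, maxLen]): seq[T] =
  list.items

let blocks = BoundedList[string, 4](items: @["a", "b"])
echo blocks.asSeq().len  # existing seq-based call sites stay unchanged: 2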

beacon_chain/sync/sync_protocol.nim

@@ -25,7 +25,7 @@ logScope:
   topics = "sync"
 
 const
-  MAX_REQUEST_BLOCKS = 1024
+  MAX_REQUEST_BLOCKS* = 1024
   blockByRootLookupCost = allowedOpsPerSecondCost(50)
   blockResponseCost = allowedOpsPerSecondCost(100)
   blockByRangeLookupCost = allowedOpsPerSecondCost(20)
@@ -79,11 +79,10 @@ type
 template readChunkPayload*(
     conn: Connection, peer: Peer, MsgType: type ForkySignedBeaconBlock):
     Future[NetRes[MsgType]] =
-  readChunkPayload(conn, peer, maxChunkSize(MsgType), MsgType)
+  readChunkPayload(conn, peer, MsgType)
 
 proc readChunkPayload*(
-    conn: Connection, peer: Peer, maxChunkSize: uint32,
-    MsgType: type (ref ForkedSignedBeaconBlock)):
+    conn: Connection, peer: Peer, MsgType: type (ref ForkedSignedBeaconBlock)):
     Future[NetRes[MsgType]] {.async.} =
   var contextBytes: ForkDigest
   try:
@@ -91,8 +90,6 @@ proc readChunkPayload*(
   except CatchableError:
     return neterr UnexpectedEOF
 
-  # Ignores maxChunkSize; needs to be consistent formal parameters,
-  # but this function is where that's determined.
   if contextBytes == peer.network.forkDigests.phase0:
     let res = await readChunkPayload(conn, peer, phase0.SignedBeaconBlock)
     if res.isOk:
@@ -115,8 +112,7 @@ proc readChunkPayload*(
   return neterr InvalidContextBytes
 
 proc readChunkPayload*(
-    conn: Connection, peer: Peer, maxChunkSize: uint32,
-    MsgType: type SomeLightClientObject):
+    conn: Connection, peer: Peer, MsgType: type SomeLightClientObject):
     Future[NetRes[MsgType]] {.async.} =
   var contextBytes: ForkDigest
   try:
@@ -129,7 +125,7 @@ proc readChunkPayload*(
   let res =
     if stateFork >= BeaconStateFork.Altair:
-      await eth2_network.readChunkPayload(conn, peer, maxChunkSize, MsgType)
+      await eth2_network.readChunkPayload(conn, peer, MsgType)
     else:
       doAssert stateFork == BeaconStateFork.Phase0
       return neterr InvalidContextBytes
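
The forked readChunkPayload overloads above share one shape: read the 4-byte fork digest before the payload, then dispatch to a per-fork decoder. A standalone sketch with placeholder digest values:

type ForkDigest = array[4, byte]

const
  phase0Digest: ForkDigest = [byte 0x01, 0x00, 0x00, 0x00]
  altairDigest: ForkDigest = [byte 0x02, 0x00, 0x00, 0x00]

func decoderFor(digest: ForkDigest): string =
  if digest == phase0Digest: "phase0.SignedBeaconBlock"
  elif digest == altairDigest: "altair.SignedBeaconBlock"
  else: "unknown"  # the real code returns neterr InvalidContextBytes

echo decoderFor(altairDigest)  # altair.SignedBeaconBlock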
@@ -272,7 +268,8 @@ p2pProtocol BeaconSync(version = 1,
       startSlot: Slot,
       reqCount: uint64,
       reqStep: uint64,
-      response: MultipleChunksResponse[phase0.SignedBeaconBlock])
+      response: MultipleChunksResponse[
+        phase0.SignedBeaconBlock, MAX_REQUEST_BLOCKS])
       {.async, libp2pProtocol("beacon_blocks_by_range", 1).} =
     # TODO Semantically, this request should return a non-ref, but doing so
     # runs into extreme inefficiency due to the compiler introducing
@@ -351,7 +348,8 @@ p2pProtocol BeaconSync(version = 1,
       # Please note that the SSZ list here ensures that the
       # spec constant MAX_REQUEST_BLOCKS is enforced:
       blockRoots: BlockRootsList,
-      response: MultipleChunksResponse[phase0.SignedBeaconBlock])
+      response: MultipleChunksResponse[
+        phase0.SignedBeaconBlock, MAX_REQUEST_BLOCKS])
       {.async, libp2pProtocol("beacon_blocks_by_root", 1).} =
     # TODO Semantically, this request should return a non-ref, but doing so
     # runs into extreme inefficiency due to the compiler introducing
@@ -416,7 +414,8 @@ p2pProtocol BeaconSync(version = 1,
       startSlot: Slot,
       reqCount: uint64,
       reqStep: uint64,
-      response: MultipleChunksResponse[ref ForkedSignedBeaconBlock])
+      response: MultipleChunksResponse[
+        ref ForkedSignedBeaconBlock, MAX_REQUEST_BLOCKS])
       {.async, libp2pProtocol("beacon_blocks_by_range", 2).} =
     # TODO Semantically, this request should return a non-ref, but doing so
     # runs into extreme inefficiency due to the compiler introducing
@@ -486,7 +485,8 @@ p2pProtocol BeaconSync(version = 1,
       # Please note that the SSZ list here ensures that the
      # spec constant MAX_REQUEST_BLOCKS is enforced:
       blockRoots: BlockRootsList,
-      response: MultipleChunksResponse[ref ForkedSignedBeaconBlock])
+      response: MultipleChunksResponse[
+        ref ForkedSignedBeaconBlock, MAX_REQUEST_BLOCKS])
       {.async, libp2pProtocol("beacon_blocks_by_root", 2).} =
     # TODO Semantically, this request should return a non-ref, but doing so
     # runs into extreme inefficiency due to the compiler introducing
@@ -577,7 +577,8 @@ p2pProtocol BeaconSync(version = 1,
       peer: Peer,
       startPeriod: SyncCommitteePeriod,
       reqCount: uint64,
-      response: MultipleChunksResponse[altair.LightClientUpdate])
+      response: MultipleChunksResponse[
+        altair.LightClientUpdate, MAX_REQUEST_LIGHT_CLIENT_UPDATES])
       {.async, libp2pProtocol("light_client_updates_by_range", 1,
                               isLightClientRequest = true).} =
     trace "Received LC updates by range request", peer, startPeriod, reqCount
@@ -671,15 +672,6 @@ p2pProtocol BeaconSync(version = 1,
       {.async, libp2pProtocol("goodbye", 1, isRequired = true).} =
     debug "Received Goodbye message", reason = disconnectReasonName(reason), peer
 
-proc useSyncV2*(state: BeaconSyncNetworkState): bool =
-  let
-    wallTimeSlot = state.getBeaconTime().slotOrZero
-  wallTimeSlot.epoch >= state.cfg.ALTAIR_FORK_EPOCH
-
-proc useSyncV2*(peer: Peer): bool =
-  peer.networkState(BeaconSync).useSyncV2()
-
 proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) =
   debug "Peer status", peer, statusMsg
   peer.state(BeaconSync).statusMsg = statusMsg