Read messages before applying quota to avoid mplex backpressure issues (#4697)
* Apply global quota after reading messages
* Also wait quota for failed requests
* Better integration
* comments
parent 8db87a0cfc
commit 46a12639b8
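This change reorders quota accounting in `handleIncomingStream`: previously the handler blocked in `awaitQuota` before reading the request, so an over-quota peer's unread bytes lingered in the mplex stream buffer and, because mplex lacks per-stream flow control, could back-pressure the whole multiplexed connection. Now the payload is read first and the quota is charged in a `finally` clause, so failed requests pay for it too. For context, here is a minimal token-bucket sketch of the kind of shared budget `awaitQuota` waits on, assuming chronos; `TokenBucket` and `awaitQuotaSketch` are illustrative names, not the actual nimbus-eth2 implementation:

import chronos

type
  TokenBucket = ref object
    budget: float      # tokens currently available
    capacity: float    # upper bound on accumulated tokens
    refillRate: float  # tokens regained per second
    lastRefill: Moment # when the bucket was last topped up

proc refill(bucket: TokenBucket) =
  let
    now = Moment.now()
    elapsed = float((now - bucket.lastRefill).nanoseconds) / 1e9
  bucket.budget = min(bucket.capacity, bucket.budget + elapsed * bucket.refillRate)
  bucket.lastRefill = now

proc awaitQuotaSketch(bucket: TokenBucket, cost: float) {.async.} =
  ## Sleep until `cost` tokens are available, then consume them: an
  ## over-quota peer is throttled rather than disconnected.
  while true:
    bucket.refill()
    if bucket.budget >= cost:
      bucket.budget -= cost
      return
    # Wait roughly long enough for the missing tokens to accumulate.
    let missing = cost - bucket.budget
    await sleepAsync(chronos.milliseconds(max(1, int(missing / bucket.refillRate * 1000))))

The property the commit relies on is the sleep in the wait loop: an over-quota client is slowed down rather than cut off, which is also why (per the comments in the diff below) the wait must not count against response timeouts.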
@@ -1105,19 +1105,6 @@ proc handleIncomingStream(network: Eth2Node,

     nbc_reqresp_messages_received.inc(1, [shortProtocolId(protocolId)])

-    # The request quota is shared between all requests - it represents the
-    # cost to perform a service on behalf of a client and is incurred
-    # regardless if the request succeeds or fails - we don't count waiting
-    # for this quota against timeouts so as not to prematurely disconnect
-    # clients that are on the edge - nonetheless, the client will count it.
-    #
-    # When a client exceeds their quota, they will be slowed down without
-    # notification - as long as they don't make parallel requests (which is
-    # limited by libp2p), this will naturally adapt them to the available
-    # quota.
-
-    awaitQuota(peer, libp2pRequestCost, shortProtocolId(protocolId))
-
     # TODO(zah) The TTFB timeout is not implemented in LibP2P streams back-end
     let deadline = sleepAsync RESP_TIMEOUT

@@ -1131,19 +1118,20 @@ proc handleIncomingStream(network: Eth2Node,
     else:
       false

-    let msg = when isEmptyMsg:
-      NetRes[MsgRec].ok default(MsgRec)
-    else:
+    let msg =
       try:
-        awaitWithTimeout(
-          readChunkPayload(conn, peer, MsgRec), deadline):
-            # Timeout, e.g., cancellation due to fulfillment by different peer.
-            # Treat this similarly to `UnexpectedEOF`, `PotentiallyExpectedEOF`.
-            nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
-            await sendErrorResponse(
-              peer, conn, InvalidRequest,
-              errorMsgLit "Request full data not sent in time")
-            return
+        when isEmptyMsg:
+          NetRes[MsgRec].ok default(MsgRec)
+        else:
+          awaitWithTimeout(
+            readChunkPayload(conn, peer, MsgRec), deadline):
+              # Timeout, e.g., cancellation due to fulfillment by different peer.
+              # Treat this similarly to `UnexpectedEOF`, `PotentiallyExpectedEOF`.
+              nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
+              await sendErrorResponse(
+                peer, conn, InvalidRequest,
+                errorMsgLit "Request full data not sent in time")
+              return

       except SerializationError as err:
         nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
@@ -1152,6 +1140,26 @@ proc handleIncomingStream(network: Eth2Node,
       except SnappyError as err:
         nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
         returnInvalidRequest err.msg
+      finally:
+        # The request quota is shared between all requests - it represents the
+        # cost to perform a service on behalf of a client and is incurred
+        # regardless if the request succeeds or fails - we don't count waiting
+        # for this quota against timeouts so as not to prematurely disconnect
+        # clients that are on the edge - nonetheless, the client will count it.
+        #
+        # When a client exceeds their quota, they will be slowed down without
+        # notification - as long as they don't make parallel requests (which is
+        # limited by libp2p), this will naturally adapt them to the available
+        # quota.
+        #
+        # Note that the `msg` will be stored in memory while we wait for the
+        # quota to be available. The amount of such messages in memory is
+        # bounded by the libp2p limit of parallel streams
+        #
+        # This quota also applies to invalid requests thanks to the use of
+        # `finally`.
+
+        awaitQuota(peer, libp2pRequestCost, shortProtocolId(protocolId))

     if msg.isErr:
       if msg.error.kind in ProtocolViolations:
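Condensed, the new read-then-charge ordering has the shape sketched below, reusing the `TokenBucket`/`awaitQuotaSketch` definitions from the sketch above; `readPayload` is a hypothetical stand-in for `readChunkPayload` and all error handling is omitted:

proc handleStreamSketch(bucket: TokenBucket,
                        readPayload: proc (): Future[string]) {.async.} =
  let msg =
    try:
      # 1. Drain the request from the wire first: the bytes leave the mplex
      #    stream buffer even if the quota wait below turns out to be long.
      await readPayload()
    finally:
      # 2. Charge the shared quota afterwards - `finally` makes invalid and
      #    failed requests pay too. `msg` is held in memory while we wait,
      #    bounded by libp2p's limit on parallel streams.
      await awaitQuotaSketch(bucket, 1.0)
  # 3. Service the request only once the quota has been granted.
  echo "handling ", msg.len, " bytes"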