Tighten libp2p request quotas (#4254)

To further harden Nimbus against request spam, this PR introduces a global
quota for block requests (shared between all peers) as well as a general
per-peer request limit that applies to all libp2p requests.

* apply the request quota before decoding the message
* for high-bandwidth requests (blocks), apply a shared global quota, which
helps manage bandwidth in high-peer-count setups (see the sketch below)
* add metrics
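
Concretely, both limits are plain chronos `TokenBucket`s: every peer carries its own bucket, and block-serving paths additionally draw from a node-wide bucket shared between peers. The snippet below is a minimal sketch of that pattern, assuming only the `TokenBucket` calls (`new`, `tryConsume`, `consume`) and the constants that appear in the diff; the `servePeer` proc and the literal cost value are illustrative, not part of the commit.

```nim
# Sketch only: the two-level TokenBucket pattern used in this change,
# based on the constants and `awaitQuota` templates in the diff below.
import chronos, chronos/ratelimit

const
  maxRequestQuota   = 1_000_000            # per-peer budget (tokens)
  maxGlobalQuota    = 2 * maxRequestQuota  # shared budget: roughly 2 peers syncing at once
  fullReplenishTime = 5.seconds            # an empty bucket refills over 5 seconds

let
  globalQuota = TokenBucket.new(maxGlobalQuota, fullReplenishTime)
  peerQuota   = TokenBucket.new(maxRequestQuota, fullReplenishTime)

proc servePeer(cost: int) {.async.} =
  # Per-peer bucket first: a peer that overspends waits here and only
  # slows itself down ...
  if not peerQuota.tryConsume(cost):
    await peerQuota.consume(cost)
  # ... then the shared bucket, which caps total block-serving bandwidth
  # no matter how many peers are asking at once.
  if not globalQuota.tryConsume(cost):
    await globalQuota.consume(cost)

# With maxRequestQuota = 1_000_000 replenished over 5 s, a cost of
# allowedOpsPerSecondCost(64) = 1_000_000 / (5 * 64) = 3125 tokens per block
# allows roughly 64 blocks per second per peer.
waitFor servePeer(3125)
```

Waiting on the bucket instead of returning an error means an over-quota peer is silently slowed down rather than disconnected, matching the behaviour described in the request-handler comment further down.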
Jacek Sieka, 2022-11-02 11:46:53 +01:00 (committed by GitHub)
parent d839b9d07e
commit fc724b21e8
3 changed files with 128 additions and 81 deletions


@@ -20,7 +20,7 @@ import
stew/shims/[macros],
faststreams/[inputs, outputs, buffers], snappy, snappy/faststreams,
json_serialization, json_serialization/std/[net, sets, options],
chronos, chronicles, metrics,
chronos, chronos/ratelimit, chronicles, metrics,
libp2p/[switch, peerinfo, multiaddress, multicodec, crypto/crypto,
crypto/secp, builders],
libp2p/protocols/pubsub/[
@@ -35,9 +35,9 @@ import
"."/[eth2_discovery, libp2p_json_serialization, peer_pool, peer_scores]
export
tables, chronos, version, multiaddress, peerinfo, p2pProtocol, connection,
libp2p_json_serialization, eth2_ssz_serialization, results, eth2_discovery,
peer_pool, peer_scores
tables, chronos, ratelimit, version, multiaddress, peerinfo, p2pProtocol,
connection, libp2p_json_serialization, eth2_ssz_serialization, results,
eth2_discovery, peer_pool, peer_scores
logScope:
topics = "networking"
@@ -86,6 +86,8 @@ type
cfg: RuntimeConfig
getBeaconTime: GetBeaconTimeFn
quota: TokenBucket ## Global quota mainly for high-bandwidth stuff
EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers
AverageThroughput* = object
@@ -100,7 +102,7 @@ type
protocolStates*: seq[RootRef]
netThroughput: AverageThroughput
score*: int
requestQuota*: float
quota*: TokenBucket
lastReqTime*: Moment
connections*: int
enr*: Option[enr.Record]
@@ -230,6 +232,9 @@ func toAltairMetadata(phase0: phase0.MetaData): altair.MetaData =
const
clientId* = "Nimbus beacon node " & fullVersionStr
requestPrefix = "/eth2/beacon_chain/req/"
requestSuffix = "/ssz_snappy"
ConcurrentConnections = 20
## Maximum number of active concurrent connection requests.
@@ -302,6 +307,18 @@ declareHistogram nbc_resolve_time,
"Time(s) used while resolving peer information",
buckets = [1.0, 5.0, 10.0, 20.0, 40.0, 60.0]
declareCounter nbc_reqresp_messages_sent,
"Number of Req/Resp messages sent", labels = ["protocol"]
declareCounter nbc_reqresp_messages_received,
"Number of Req/Resp messages received", labels = ["protocol"]
declareCounter nbc_reqresp_messages_failed,
"Number of Req/Resp messages that failed decoding", labels = ["protocol"]
declareCounter nbc_reqresp_messages_throttled,
"Number of Req/Resp messages that were throttled", labels = ["protocol"]
const
libp2p_pki_schemes {.strdefine.} = ""
@@ -319,6 +336,15 @@ func shortLog*(peer: Peer): string = shortLog(peer.peerId)
chronicles.formatIt(Peer): shortLog(it)
chronicles.formatIt(PublicKey): byteutils.toHex(it.getBytes().tryGet())
func shortProtocolId(protocolId: string): string =
let
start = if protocolId.startsWith(requestPrefix): requestPrefix.len else: 0
ends = if protocolId.endsWith(requestSuffix):
protocolId.high - requestSuffix.len
else:
protocolId.high
protocolId[start..ends]
proc openStream(node: Eth2Node,
peer: Peer,
protocolId: string): Future[Connection] {.async.} =
@@ -326,9 +352,7 @@ proc openStream(node: Eth2Node,
# attempts are handled via `connect` which also takes into account
# reconnection timeouts
let
protocolId = protocolId & "ssz_snappy"
conn = await dial(
node.switch, peer.peerId, protocolId)
conn = await dial(node.switch, peer.peerId, protocolId)
return conn
@@ -409,27 +433,41 @@ func `<`(a, b: Peer): bool =
false
const
maxRequestQuota = 1000000.0
maxRequestQuota = 1000000
maxGlobalQuota = 2 * maxRequestQuota
## Roughly, this means we allow 2 peers to sync from us at a time
fullReplenishTime = 5.seconds
replenishRate = (maxRequestQuota / fullReplenishTime.nanoseconds.float)
proc updateRequestQuota*(peer: Peer, reqCost: float) =
template awaitQuota*(peerParam: Peer, costParam: float, protocolIdParam: string) =
let
currentTime = now(chronos.Moment)
nanosSinceLastReq = nanoseconds(currentTime - peer.lastReqTime)
replenishedQuota = peer.requestQuota + nanosSinceLastReq.float * replenishRate
peer = peerParam
cost = int(costParam)
peer.lastReqTime = currentTime
peer.requestQuota = min(replenishedQuota, maxRequestQuota) - reqCost
if not peer.quota.tryConsume(cost.int):
let protocolId = protocolIdParam
debug "Awaiting peer quota", peer, cost, protocolId
nbc_reqresp_messages_throttled.inc(1, [protocolId])
await peer.quota.consume(cost.int)
template awaitNonNegativeRequestQuota*(peer: Peer) =
let quota = peer.requestQuota
if quota < 0:
await sleepAsync(nanoseconds(int((-quota) / replenishRate)))
template awaitQuota*(networkParam: Eth2Node, costParam: float, protocolIdParam: string) =
let
network = networkParam
cost = int(costParam)
if not network.quota.tryConsume(cost.int):
let protocolId = protocolIdParam
debug "Awaiting network quota", peer, cost, protocolId
nbc_reqresp_messages_throttled.inc(1, [protocolId])
await network.quota.consume(cost.int)
func allowedOpsPerSecondCost*(n: int): float =
const replenishRate = (maxRequestQuota / fullReplenishTime.nanoseconds.float)
(replenishRate * 1000000000'f / n.float)
const
libp2pRequestCost = allowedOpsPerSecondCost(8)
## Maximum number of libp2p requests per peer per second
proc isSeen(network: Eth2Node, peerId: PeerId): bool =
## Returns ``true`` if ``peerId`` present in SeenTable and time period is not
## yet expired.
@@ -493,7 +531,7 @@ proc getRequestProtoName(fn: NimNode): NimNode =
if pragma.len > 0 and $pragma[0] == "libp2pProtocol":
let protoName = $(pragma[1])
let protoVer = $(pragma[2].intVal)
return newLit("/eth2/beacon_chain/req/" & protoName & "/" & protoVer & "/")
return newLit(requestPrefix & protoName & "/" & protoVer & requestSuffix)
except Exception as exc: raiseAssert exc.msg # TODO https://github.com/nim-lang/Nim/issues/17454
return newLit("")
@@ -903,8 +941,7 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
ResponseMsg: type,
timeout: Duration): Future[NetRes[ResponseMsg]]
{.async.} =
var deadline = sleepAsync timeout
let deadline = sleepAsync timeout
let stream = awaitWithTimeout(peer.network.openStream(peer, protocolId),
deadline): return neterr StreamOpenTimeout
try:
@@ -917,6 +954,8 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
# done, the other peer might never send us the response.
await stream.close()
nbc_reqresp_messages_sent.inc(1, [shortProtocolId(protocolId)])
# Read the response
return await readResponse(stream, peer, ResponseMsg, timeout)
finally:
@@ -999,6 +1038,7 @@ proc implementSendProcBody(sendProc: SendProc) =
proc handleIncomingStream(network: Eth2Node,
conn: Connection,
protocolId: string,
MsgType: type) {.async.} =
mixin callUserHandler, RecType
@@ -1046,6 +1086,21 @@ proc handleIncomingStream(network: Eth2Node,
template returnResourceUnavailable(msg: string) =
returnResourceUnavailable(ErrorMsg msg.toBytes)
nbc_reqresp_messages_received.inc(1, [shortProtocolId(protocolId)])
# The request quota is shared between all requests - it represents the
# cost to perform a service on behalf of a client and is incurred
# regardless if the request succeeds or fails - we don't count waiting
# for this quota against timeouts so as not to prematurely disconnect
# clients that are on the edge - nonetheless, the client will count it.
#
# When a client exceeds their quota, they will be slowed down without
# notification - as long as they don't make parallel requests (which is
# limited by libp2p), this will naturally adapt them to the available
# quota.
awaitQuota(peer, libp2pRequestCost, shortProtocolId(protocolId))
# TODO(zah) The TTFB timeout is not implemented in LibP2P streams back-end
let deadline = sleepAsync RESP_TIMEOUT
@@ -1067,20 +1122,25 @@ proc handleIncomingStream(network: Eth2Node,
readChunkPayload(conn, peer, MsgRec), deadline):
# Timeout, e.g., cancellation due to fulfillment by different peer.
# Treat this similarly to `UnexpectedEOF`, `PotentiallyExpectedEOF`.
nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
await sendErrorResponse(
peer, conn, InvalidRequest,
errorMsgLit "Request full data not sent in time")
return
except SerializationError as err:
nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
returnInvalidRequest err.formatMsg("msg")
except SnappyError as err:
nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
returnInvalidRequest err.msg
if msg.isErr:
nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
let (responseCode, errMsg) = case msg.error.kind
of UnexpectedEOF, PotentiallyExpectedEOF:
nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
(InvalidRequest, errorMsgLit "Incomplete request")
of InvalidContextBytes:
@@ -1118,14 +1178,16 @@ proc handleIncomingStream(network: Eth2Node,
logReceivedMsg(peer, MsgType(msg.get))
await callUserHandler(MsgType, peer, conn, msg.get)
except InvalidInputsError as err:
nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
returnInvalidRequest err.msg
except ResourceUnavailableError as err:
returnResourceUnavailable err.msg
except CatchableError as err:
await sendErrorResponse(peer, conn, ServerError,
ErrorMsg err.msg.toBytes)
nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
await sendErrorResponse(peer, conn, ServerError, ErrorMsg err.msg.toBytes)
except CatchableError as err:
nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
debug "Error processing an incoming request", err = err.msg, msgName
finally:
@@ -1734,7 +1796,8 @@ proc new(T: type Eth2Node,
discoveryEnabled: discovery,
rng: rng,
connectTimeout: connectTimeout,
seenThreshold: seenThreshold
seenThreshold: seenThreshold,
quota: TokenBucket.new(maxGlobalQuota, fullReplenishTime)
)
newSeq node.protocolStates, allProtocols.len
@@ -1853,7 +1916,8 @@ proc init(T: type Peer, network: Eth2Node, peerId: PeerId): Peer =
connectionState: ConnectionState.None,
lastReqTime: now(chronos.Moment),
lastMetadataTime: now(chronos.Moment),
protocolStates: newSeq[RootRef](len(allProtocols))
protocolStates: newSeq[RootRef](len(allProtocols)),
quota: TokenBucket.new(maxRequestQuota.int, fullReplenishTime)
)
for i in 0 ..< len(allProtocols):
let proto = allProtocols[i]
@@ -1965,12 +2029,11 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
proc `protocolMounterName`(`networkVar`: `Eth2Node`) =
proc snappyThunk(`streamVar`: `Connection`,
`protocolVar`: string): Future[void] {.gcsafe.} =
return handleIncomingStream(`networkVar`, `streamVar`,
return handleIncomingStream(`networkVar`, `streamVar`, `protocolVar`,
`MsgStrongRecName`)
mount `networkVar`.switch,
LPProtocol(codecs: @[`codecNameLit` & "ssz_snappy"],
handler: snappyThunk)
LPProtocol(codecs: @[`codecNameLit`], handler: snappyThunk)
##
## Implement Senders and Handshake


@@ -26,17 +26,15 @@ logScope:
const
MAX_REQUEST_BLOCKS* = 1024
blockByRootLookupCost = allowedOpsPerSecondCost(50)
blockResponseCost = allowedOpsPerSecondCost(100)
blockByRangeLookupCost = allowedOpsPerSecondCost(20)
blockResponseCost = allowedOpsPerSecondCost(64) # Allow syncing ~64 blocks/sec (minus request costs)
# https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/light-client/p2p-interface.md#configuration
MAX_REQUEST_LIGHT_CLIENT_UPDATES* = 128
lightClientEmptyResponseCost = allowedOpsPerSecondCost(50)
lightClientBootstrapLookupCost = allowedOpsPerSecondCost(5)
lightClientBootstrapResponseCost = allowedOpsPerSecondCost(100)
lightClientUpdateResponseCost = allowedOpsPerSecondCost(100)
lightClientUpdateByRangeLookupCost = allowedOpsPerSecondCost(20)
lightClientBootstrapResponseCost = allowedOpsPerSecondCost(1)
## Only one bootstrap per peer should ever be needed - no need to allow more
lightClientUpdateResponseCost = allowedOpsPerSecondCost(1000)
## Updates are tiny - we can allow lots of them
lightClientFinalityUpdateResponseCost = allowedOpsPerSecondCost(100)
lightClientOptimisticUpdateResponseCost = allowedOpsPerSecondCost(100)
@@ -311,9 +309,6 @@ p2pProtocol BeaconSync(version = 1,
startIndex =
dag.getBlockRange(startSlot, reqStep, blocks.toOpenArray(0, endIndex))
peer.updateRequestQuota(blockByRangeLookupCost)
peer.awaitNonNegativeRequestQuota()
var
found = 0
bytes: seq[byte]
@@ -333,8 +328,9 @@ p2pProtocol BeaconSync(version = 1,
bytes = bytes.len(), blck = shortLog(blocks[i])
continue
peer.updateRequestQuota(blockResponseCost)
peer.awaitNonNegativeRequestQuota()
# TODO extract from libp2pProtocol
peer.awaitQuota(blockResponseCost, "beacon_blocks_by_range/1")
peer.network.awaitQuota(blockResponseCost, "beacon_blocks_by_range/1")
await response.writeBytesSZ(uncompressedLen, bytes, []) # phase0 bytes
@@ -375,9 +371,6 @@ p2pProtocol BeaconSync(version = 1,
found = 0
bytes: seq[byte]
peer.updateRequestQuota(count.float * blockByRootLookupCost)
peer.awaitNonNegativeRequestQuota()
for i in 0..<count:
let
blockRef = dag.getBlockRef(blockRoots[i]).valueOr:
@@ -400,8 +393,9 @@ p2pProtocol BeaconSync(version = 1,
bytes = bytes.len(), blck = shortLog(blockRef)
continue
peer.updateRequestQuota(blockResponseCost)
peer.awaitNonNegativeRequestQuota()
# TODO extract from libp2pProtocol
peer.awaitQuota(blockResponseCost, "beacon_blocks_by_root/1")
peer.network.awaitQuota(blockResponseCost, "beacon_blocks_by_root/1")
await response.writeBytesSZ(uncompressedLen, bytes, []) # phase0
inc found
@@ -447,9 +441,6 @@ p2pProtocol BeaconSync(version = 1,
dag.getBlockRange(startSlot, reqStep,
blocks.toOpenArray(0, endIndex))
peer.updateRequestQuota(blockByRangeLookupCost)
peer.awaitNonNegativeRequestQuota()
var
found = 0
bytes: seq[byte]
@@ -468,8 +459,9 @@ p2pProtocol BeaconSync(version = 1,
bytes = bytes.len(), blck = shortLog(blocks[i])
continue
peer.updateRequestQuota(blockResponseCost)
peer.awaitNonNegativeRequestQuota()
# TODO extract from libp2pProtocol
peer.awaitQuota(blockResponseCost, "beacon_blocks_by_range/2")
peer.network.awaitQuota(blockResponseCost, "beacon_blocks_by_range/2")
await response.writeBytesSZ(
uncompressedLen, bytes,
@@ -507,9 +499,6 @@ p2pProtocol BeaconSync(version = 1,
dag = peer.networkState.dag
count = blockRoots.len
peer.updateRequestQuota(count.float * blockByRootLookupCost)
peer.awaitNonNegativeRequestQuota()
var
found = 0
bytes: seq[byte]
@@ -532,8 +521,9 @@ p2pProtocol BeaconSync(version = 1,
bytes = bytes.len(), blck = shortLog(blockRef)
continue
peer.updateRequestQuota(blockResponseCost)
peer.awaitNonNegativeRequestQuota()
# TODO extract from libp2pProtocol
peer.awaitQuota(blockResponseCost, "beacon_blocks_by_root/2")
peer.network.awaitQuota(blockResponseCost, "beacon_blocks_by_root/2")
await response.writeBytesSZ(
uncompressedLen, bytes,
@@ -555,21 +545,19 @@ p2pProtocol BeaconSync(version = 1,
let dag = peer.networkState.dag
doAssert dag.lcDataStore.serve
peer.updateRequestQuota(lightClientBootstrapLookupCost)
peer.awaitNonNegativeRequestQuota()
let bootstrap = dag.getLightClientBootstrap(blockRoot)
if bootstrap.isOk:
let
contextEpoch = bootstrap.get.contextEpoch
contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data
# TODO extract from libp2pProtocol
peer.awaitQuota(
lightClientBootstrapResponseCost, "light_client_bootstrap/1")
await response.send(bootstrap.get, contextBytes)
else:
peer.updateRequestQuota(lightClientEmptyResponseCost)
raise newException(ResourceUnavailableError, LCBootstrapUnavailable)
peer.updateRequestQuota(lightClientBootstrapResponseCost)
debug "LC bootstrap request done", peer, blockRoot
# https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/light-client/p2p-interface.md#lightclientupdatesbyrange
@@ -595,11 +583,6 @@ p2pProtocol BeaconSync(version = 1,
min(headPeriod + 1 - startPeriod, MAX_REQUEST_LIGHT_CLIENT_UPDATES)
count = min(reqCount, maxSupportedCount)
onePastPeriod = startPeriod + count
if count == 0:
peer.updateRequestQuota(lightClientEmptyResponseCost)
peer.updateRequestQuota(count.float * lightClientUpdateByRangeLookupCost)
peer.awaitNonNegativeRequestQuota()
var found = 0
for period in startPeriod..<onePastPeriod:
@@ -608,11 +591,13 @@ p2pProtocol BeaconSync(version = 1,
let
contextEpoch = update.get.contextEpoch
contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data
# TODO extract from libp2pProtocol
peer.awaitQuota(
lightClientUpdateResponseCost, "light_client_updates_by_range/1")
await response.write(update.get, contextBytes)
inc found
peer.updateRequestQuota(found.float * lightClientUpdateResponseCost)
debug "LC updates by range request done", peer, startPeriod, count, found
# https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/light-client/p2p-interface.md#getlightclientfinalityupdate
@@ -625,19 +610,19 @@ p2pProtocol BeaconSync(version = 1,
let dag = peer.networkState.dag
doAssert dag.lcDataStore.serve
peer.awaitNonNegativeRequestQuota()
let finality_update = dag.getLightClientFinalityUpdate()
if finality_update.isSome:
let
contextEpoch = finality_update.get.contextEpoch
contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data
# TODO extract from libp2pProtocol
peer.awaitQuota(
lightClientFinalityUpdateResponseCost, "light_client_finality_update/1")
await response.send(finality_update.get, contextBytes)
else:
peer.updateRequestQuota(lightClientEmptyResponseCost)
raise newException(ResourceUnavailableError, LCFinUpdateUnavailable)
peer.updateRequestQuota(lightClientFinalityUpdateResponseCost)
debug "LC finality update request done", peer
@@ -651,20 +636,19 @@ p2pProtocol BeaconSync(version = 1,
let dag = peer.networkState.dag
doAssert dag.lcDataStore.serve
peer.awaitNonNegativeRequestQuota()
let optimistic_update = dag.getLightClientOptimisticUpdate()
if optimistic_update.isSome:
let
contextEpoch = optimistic_update.get.contextEpoch
contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data
# TODO extract from libp2pProtocol
peer.awaitQuota(
lightClientOptimisticUpdateResponseCost, "light_client_optimistic_update/1")
await response.send(optimistic_update.get, contextBytes)
else:
peer.updateRequestQuota(lightClientEmptyResponseCost)
raise newException(ResourceUnavailableError, LCOptUpdateUnavailable)
peer.updateRequestQuota(lightClientOptimisticUpdateResponseCost)
debug "LC optimistic update request done", peer
proc goodbye(peer: Peer,

vendor/nim-chronos (vendored)

@@ -1 +1 @@
Subproject commit 266e2c0ed26b455872bccb3ddbd316815a283659
Subproject commit 6525f4ce1d1a7eba146e5f1a53f6f105077ae686