diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index 9b02e3c83..8dc6a9e84 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -20,7 +20,7 @@ import stew/shims/[macros], faststreams/[inputs, outputs, buffers], snappy, snappy/faststreams, json_serialization, json_serialization/std/[net, sets, options], - chronos, chronicles, metrics, + chronos, chronos/ratelimit, chronicles, metrics, libp2p/[switch, peerinfo, multiaddress, multicodec, crypto/crypto, crypto/secp, builders], libp2p/protocols/pubsub/[ @@ -35,9 +35,9 @@ import "."/[eth2_discovery, libp2p_json_serialization, peer_pool, peer_scores] export - tables, chronos, version, multiaddress, peerinfo, p2pProtocol, connection, - libp2p_json_serialization, eth2_ssz_serialization, results, eth2_discovery, - peer_pool, peer_scores + tables, chronos, ratelimit, version, multiaddress, peerinfo, p2pProtocol, + connection, libp2p_json_serialization, eth2_ssz_serialization, results, + eth2_discovery, peer_pool, peer_scores logScope: topics = "networking" @@ -86,6 +86,8 @@ type cfg: RuntimeConfig getBeaconTime: GetBeaconTimeFn + quota: TokenBucket ## Global quota mainly for high-bandwidth stuff + EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers AverageThroughput* = object @@ -100,7 +102,7 @@ type protocolStates*: seq[RootRef] netThroughput: AverageThroughput score*: int - requestQuota*: float + quota*: TokenBucket lastReqTime*: Moment connections*: int enr*: Option[enr.Record] @@ -230,6 +232,9 @@ func toAltairMetadata(phase0: phase0.MetaData): altair.MetaData = const clientId* = "Nimbus beacon node " & fullVersionStr + requestPrefix = "/eth2/beacon_chain/req/" + requestSuffix = "/ssz_snappy" + ConcurrentConnections = 20 ## Maximum number of active concurrent connection requests. @@ -302,6 +307,18 @@ declareHistogram nbc_resolve_time, "Time(s) used while resolving peer information", buckets = [1.0, 5.0, 10.0, 20.0, 40.0, 60.0] +declareCounter nbc_reqresp_messages_sent, + "Number of Req/Resp messages sent", labels = ["protocol"] + +declareCounter nbc_reqresp_messages_received, + "Number of Req/Resp messages received", labels = ["protocol"] + +declareCounter nbc_reqresp_messages_failed, + "Number of Req/Resp messages that failed decoding", labels = ["protocol"] + +declareCounter nbc_reqresp_messages_throttled, + "Number of Req/Resp messages that were throttled", labels = ["protocol"] + const libp2p_pki_schemes {.strdefine.} = "" @@ -319,6 +336,15 @@ func shortLog*(peer: Peer): string = shortLog(peer.peerId) chronicles.formatIt(Peer): shortLog(it) chronicles.formatIt(PublicKey): byteutils.toHex(it.getBytes().tryGet()) +func shortProtocolId(protocolId: string): string = + let + start = if protocolId.startsWith(requestPrefix): requestPrefix.len else: 0 + ends = if protocolId.endsWith(requestSuffix): + protocolId.high - requestSuffix.len + else: + protocolId.high + protocolId[start..ends] + proc openStream(node: Eth2Node, peer: Peer, protocolId: string): Future[Connection] {.async.} = @@ -326,9 +352,7 @@ proc openStream(node: Eth2Node, # attempts are handled via `connect` which also takes into account # reconnection timeouts let - protocolId = protocolId & "ssz_snappy" - conn = await dial( - node.switch, peer.peerId, protocolId) + conn = await dial(node.switch, peer.peerId, protocolId) return conn @@ -409,27 +433,41 @@ func `<`(a, b: Peer): bool = false const - maxRequestQuota = 1000000.0 + maxRequestQuota = 1000000 + maxGlobalQuota = 2 * maxRequestQuota + ## Roughly, this means we allow 2 peers to sync from us at a time fullReplenishTime = 5.seconds - replenishRate = (maxRequestQuota / fullReplenishTime.nanoseconds.float) -proc updateRequestQuota*(peer: Peer, reqCost: float) = +template awaitQuota*(peerParam: Peer, costParam: float, protocolIdParam: string) = let - currentTime = now(chronos.Moment) - nanosSinceLastReq = nanoseconds(currentTime - peer.lastReqTime) - replenishedQuota = peer.requestQuota + nanosSinceLastReq.float * replenishRate + peer = peerParam + cost = int(costParam) - peer.lastReqTime = currentTime - peer.requestQuota = min(replenishedQuota, maxRequestQuota) - reqCost + if not peer.quota.tryConsume(cost.int): + let protocolId = protocolIdParam + debug "Awaiting peer quota", peer, cost, protocolId + nbc_reqresp_messages_throttled.inc(1, [protocolId]) + await peer.quota.consume(cost.int) -template awaitNonNegativeRequestQuota*(peer: Peer) = - let quota = peer.requestQuota - if quota < 0: - await sleepAsync(nanoseconds(int((-quota) / replenishRate))) +template awaitQuota*(networkParam: Eth2Node, costParam: float, protocolIdParam: string) = + let + network = networkParam + cost = int(costParam) + + if not network.quota.tryConsume(cost.int): + let protocolId = protocolIdParam + debug "Awaiting network quota", peer, cost, protocolId + nbc_reqresp_messages_throttled.inc(1, [protocolId]) + await network.quota.consume(cost.int) func allowedOpsPerSecondCost*(n: int): float = + const replenishRate = (maxRequestQuota / fullReplenishTime.nanoseconds.float) (replenishRate * 1000000000'f / n.float) +const + libp2pRequestCost = allowedOpsPerSecondCost(8) + ## Maximum number of libp2p requests per peer per second + proc isSeen(network: Eth2Node, peerId: PeerId): bool = ## Returns ``true`` if ``peerId`` present in SeenTable and time period is not ## yet expired. @@ -493,7 +531,7 @@ proc getRequestProtoName(fn: NimNode): NimNode = if pragma.len > 0 and $pragma[0] == "libp2pProtocol": let protoName = $(pragma[1]) let protoVer = $(pragma[2].intVal) - return newLit("/eth2/beacon_chain/req/" & protoName & "/" & protoVer & "/") + return newLit(requestPrefix & protoName & "/" & protoVer & requestSuffix) except Exception as exc: raiseAssert exc.msg # TODO https://github.com/nim-lang/Nim/issues/17454 return newLit("") @@ -903,8 +941,7 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes, ResponseMsg: type, timeout: Duration): Future[NetRes[ResponseMsg]] {.async.} = - var deadline = sleepAsync timeout - + let deadline = sleepAsync timeout let stream = awaitWithTimeout(peer.network.openStream(peer, protocolId), deadline): return neterr StreamOpenTimeout try: @@ -917,6 +954,8 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes, # done, the other peer might never send us the response. await stream.close() + nbc_reqresp_messages_sent.inc(1, [shortProtocolId(protocolId)]) + # Read the response return await readResponse(stream, peer, ResponseMsg, timeout) finally: @@ -999,6 +1038,7 @@ proc implementSendProcBody(sendProc: SendProc) = proc handleIncomingStream(network: Eth2Node, conn: Connection, + protocolId: string, MsgType: type) {.async.} = mixin callUserHandler, RecType @@ -1046,6 +1086,21 @@ proc handleIncomingStream(network: Eth2Node, template returnResourceUnavailable(msg: string) = returnResourceUnavailable(ErrorMsg msg.toBytes) + nbc_reqresp_messages_received.inc(1, [shortProtocolId(protocolId)]) + + # The request quota is shared between all requests - it represents the + # cost to perform a service on behalf of a client and is incurred + # regardless if the request succeeds or fails - we don't count waiting + # for this quota against timeouts so as not to prematurely disconnect + # clients that are on the edge - nonetheless, the client will count it. + # + # When a client exceeds their quota, they will be slowed down without + # notification - as long as they don't make parallel requests (which is + # limited by libp2p), this will naturally adapt them to the available + # quota. + + awaitQuota(peer, libp2pRequestCost, shortProtocolId(protocolId)) + # TODO(zah) The TTFB timeout is not implemented in LibP2P streams back-end let deadline = sleepAsync RESP_TIMEOUT @@ -1067,20 +1122,25 @@ proc handleIncomingStream(network: Eth2Node, readChunkPayload(conn, peer, MsgRec), deadline): # Timeout, e.g., cancellation due to fulfillment by different peer. # Treat this similarly to `UnexpectedEOF`, `PotentiallyExpectedEOF`. + nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)]) await sendErrorResponse( peer, conn, InvalidRequest, errorMsgLit "Request full data not sent in time") return except SerializationError as err: + nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)]) returnInvalidRequest err.formatMsg("msg") except SnappyError as err: + nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)]) returnInvalidRequest err.msg if msg.isErr: + nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)]) let (responseCode, errMsg) = case msg.error.kind of UnexpectedEOF, PotentiallyExpectedEOF: + nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)]) (InvalidRequest, errorMsgLit "Incomplete request") of InvalidContextBytes: @@ -1118,14 +1178,16 @@ proc handleIncomingStream(network: Eth2Node, logReceivedMsg(peer, MsgType(msg.get)) await callUserHandler(MsgType, peer, conn, msg.get) except InvalidInputsError as err: + nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)]) returnInvalidRequest err.msg except ResourceUnavailableError as err: returnResourceUnavailable err.msg except CatchableError as err: - await sendErrorResponse(peer, conn, ServerError, - ErrorMsg err.msg.toBytes) + nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)]) + await sendErrorResponse(peer, conn, ServerError, ErrorMsg err.msg.toBytes) except CatchableError as err: + nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)]) debug "Error processing an incoming request", err = err.msg, msgName finally: @@ -1734,7 +1796,8 @@ proc new(T: type Eth2Node, discoveryEnabled: discovery, rng: rng, connectTimeout: connectTimeout, - seenThreshold: seenThreshold + seenThreshold: seenThreshold, + quota: TokenBucket.new(maxGlobalQuota, fullReplenishTime) ) newSeq node.protocolStates, allProtocols.len @@ -1853,7 +1916,8 @@ proc init(T: type Peer, network: Eth2Node, peerId: PeerId): Peer = connectionState: ConnectionState.None, lastReqTime: now(chronos.Moment), lastMetadataTime: now(chronos.Moment), - protocolStates: newSeq[RootRef](len(allProtocols)) + protocolStates: newSeq[RootRef](len(allProtocols)), + quota: TokenBucket.new(maxRequestQuota.int, fullReplenishTime) ) for i in 0 ..< len(allProtocols): let proto = allProtocols[i] @@ -1965,12 +2029,11 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = proc `protocolMounterName`(`networkVar`: `Eth2Node`) = proc snappyThunk(`streamVar`: `Connection`, `protocolVar`: string): Future[void] {.gcsafe.} = - return handleIncomingStream(`networkVar`, `streamVar`, + return handleIncomingStream(`networkVar`, `streamVar`, `protocolVar`, `MsgStrongRecName`) mount `networkVar`.switch, - LPProtocol(codecs: @[`codecNameLit` & "ssz_snappy"], - handler: snappyThunk) + LPProtocol(codecs: @[`codecNameLit`], handler: snappyThunk) ## ## Implement Senders and Handshake diff --git a/beacon_chain/sync/sync_protocol.nim b/beacon_chain/sync/sync_protocol.nim index b2feb19d0..0d93acf81 100644 --- a/beacon_chain/sync/sync_protocol.nim +++ b/beacon_chain/sync/sync_protocol.nim @@ -26,17 +26,15 @@ logScope: const MAX_REQUEST_BLOCKS* = 1024 - blockByRootLookupCost = allowedOpsPerSecondCost(50) - blockResponseCost = allowedOpsPerSecondCost(100) - blockByRangeLookupCost = allowedOpsPerSecondCost(20) + + blockResponseCost = allowedOpsPerSecondCost(64) # Allow syncing ~64 blocks/sec (minus request costs) # https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/light-client/p2p-interface.md#configuration MAX_REQUEST_LIGHT_CLIENT_UPDATES* = 128 - lightClientEmptyResponseCost = allowedOpsPerSecondCost(50) - lightClientBootstrapLookupCost = allowedOpsPerSecondCost(5) - lightClientBootstrapResponseCost = allowedOpsPerSecondCost(100) - lightClientUpdateResponseCost = allowedOpsPerSecondCost(100) - lightClientUpdateByRangeLookupCost = allowedOpsPerSecondCost(20) + lightClientBootstrapResponseCost = allowedOpsPerSecondCost(1) + ## Only one bootstrap per peer should ever be needed - no need to allow more + lightClientUpdateResponseCost = allowedOpsPerSecondCost(1000) + ## Updates are tiny - we can allow lots of them lightClientFinalityUpdateResponseCost = allowedOpsPerSecondCost(100) lightClientOptimisticUpdateResponseCost = allowedOpsPerSecondCost(100) @@ -311,9 +309,6 @@ p2pProtocol BeaconSync(version = 1, startIndex = dag.getBlockRange(startSlot, reqStep, blocks.toOpenArray(0, endIndex)) - peer.updateRequestQuota(blockByRangeLookupCost) - peer.awaitNonNegativeRequestQuota() - var found = 0 bytes: seq[byte] @@ -333,8 +328,9 @@ p2pProtocol BeaconSync(version = 1, bytes = bytes.len(), blck = shortLog(blocks[i]) continue - peer.updateRequestQuota(blockResponseCost) - peer.awaitNonNegativeRequestQuota() + # TODO extract from libp2pProtocol + peer.awaitQuota(blockResponseCost, "beacon_blocks_by_range/1") + peer.network.awaitQuota(blockResponseCost, "beacon_blocks_by_range/1") await response.writeBytesSZ(uncompressedLen, bytes, []) # phase0 bytes @@ -375,9 +371,6 @@ p2pProtocol BeaconSync(version = 1, found = 0 bytes: seq[byte] - peer.updateRequestQuota(count.float * blockByRootLookupCost) - peer.awaitNonNegativeRequestQuota() - for i in 0..