diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 37bb59d..1ff4c59 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -80,7 +80,8 @@ proc init*(_: type[GossipSubParams]): GossipSubParams = enablePX: false, bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps iwantTimeout: 3 * GossipSubHeartbeatInterval, - overheadRateLimit: Opt.none(tuple[bytes: int, interval: Duration]) + overheadRateLimit: Opt.none(tuple[bytes: int, interval: Duration]), + disconnectPeerAboveRateLimit: false ) proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] = @@ -384,17 +385,17 @@ proc validateAndRelay(g: GossipSub, proc dataAndTopicsIdSize(msgs: seq[Message]): int = msgs.mapIt(it.data.len + it.topicIds.mapIt(it.len).foldl(a + b, 0)).foldl(a + b, 0) -proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize: int) {.raises:[PeerRateLimitError, CatchableError].} = +proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize: int) {.raises:[PeerRateLimitError, CatchableError], async.} = # In this way we count even ignored fields by protobuf var rmsg = rpcMsgOpt.valueOr: peer.overheadRateLimitOpt.withValue(overheadRateLimit): if not overheadRateLimit.tryConsume(msgSize): - libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. - debug "Peer sent a msg that couldn't be decoded and it's above rate limit", peer, uselessAppBytesNum = msgSize - # discard g.disconnectPeer(peer) - # debug "Peer disconnected", peer, uselessAppBytesNum = msgSize - # raise newException(PeerRateLimitError, "Peer sent a msg that couldn't be decoded and it's above rate limit") + libp2p_gossipsub_peers_rate_limit_hits.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. + debug "Peer sent a msg that couldn't be decoded and it's above rate limit.", peer, uselessAppBytesNum = msgSize + if g.parameters.disconnectPeerAboveRateLimit: + await g.disconnectPeer(peer) + raise newException(PeerRateLimitError, "Peer disconnected because it's above rate limit.") raise newException(CatchableError, "Peer msg couldn't be decoded") @@ -410,11 +411,11 @@ proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize: peer.overheadRateLimitOpt.withValue(overheadRateLimit): if not overheadRateLimit.tryConsume(uselessAppBytesNum): - libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. + libp2p_gossipsub_peers_rate_limit_hits.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. debug "Peer sent too much useless application data and it's above rate limit.", peer, msgSize, uselessAppBytesNum, rmsg - # discard g.disconnectPeer(peer) - # debug "Peer disconnected", peer, msgSize, uselessAppBytesNum - # raise newException(PeerRateLimitError, "Peer sent too much useless application data and it's above rate limit.") + if g.parameters.disconnectPeerAboveRateLimit: + await g.disconnectPeer(peer) + raise newException(PeerRateLimitError, "Peer disconnected because it's above rate limit.") method rpcHandler*(g: GossipSub, peer: PubSubPeer, @@ -423,11 +424,11 @@ method rpcHandler*(g: GossipSub, let msgSize = data.len var rpcMsg = decodeRpcMsg(data).valueOr: debug "failed to decode msg from peer", peer, err = error - rateLimit(g, peer, Opt.none(RPCMsg), msgSize) + await rateLimit(g, peer, Opt.none(RPCMsg), msgSize) return trace "decoded msg from peer", peer, msg = rpcMsg.shortLog - rateLimit(g, peer, Opt.some(rpcMsg), msgSize) + await rateLimit(g, peer, Opt.some(rpcMsg), msgSize) # trigger hooks peer.recvObservers(rpcMsg) diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index f1c1e92..4fe3060 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -30,7 +30,7 @@ declareGauge(libp2p_gossipsub_peers_score_invalidMessageDeliveries, "Detailed go declareGauge(libp2p_gossipsub_peers_score_appScore, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_behaviourPenalty, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub scoring metric", labels = ["agent"]) -declarePublicCounter(libp2p_gossipsub_peers_rate_limit_disconnections, "The number of peer disconnections by gossipsub because of rate limit", labels = ["agent"]) +declarePublicCounter(libp2p_gossipsub_peers_rate_limit_hits, "The number of times peers were above their rate limit", labels = ["agent"]) proc init*(_: type[TopicParams]): TopicParams = TopicParams( @@ -245,7 +245,7 @@ proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) = peer.overheadRateLimitOpt.withValue(overheadRateLimit): if not overheadRateLimit.tryConsume(uselessAppBytesNum): debug "Peer sent invalid message and it's above rate limit", peer, uselessAppBytesNum - libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. + libp2p_gossipsub_peers_rate_limit_hits.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. # discard g.disconnectPeer(peer) # debug "Peer disconnected", peer, uselessAppBytesNum # raise newException(PeerRateLimitError, "Peer sent invalid message and it's above rate limit") diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index 635fc91..3387e91 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -146,6 +146,7 @@ type iwantTimeout*: Duration overheadRateLimit*: Opt[tuple[bytes: int, interval: Duration]] + disconnectPeerAboveRateLimit*: bool BackoffTable* = Table[string, Table[PeerId, Moment]] ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]]