From e124e342b04b0c79c32c71abe0b13041ebd9d2a8 Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni <7008900+sinkingsugar@users.noreply.github.com> Date: Fri, 12 Feb 2021 12:27:26 +0900 Subject: [PATCH] n subscription limits (#528) * subscription high water, cleanups * subscription limits test * newline --- libp2p/protocols/pubsub/gossipsub.nim | 16 ++++++++++--- .../protocols/pubsub/gossipsub/behavior.nim | 11 +++++---- libp2p/protocols/pubsub/pubsub.nim | 15 ++++++++---- tests/pubsub/testgossipinternal.nim | 24 +++++++++++++++++++ 4 files changed, 54 insertions(+), 12 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index b609c9a85..beee0be77 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -54,13 +54,13 @@ proc init*(_: type[GossipSubParams]): GossipSubParams = historyGossip: GossipSubHistoryGossip, fanoutTTL: GossipSubFanoutTTL, seenTTL: 2.minutes, - gossipThreshold: -10, - publishThreshold: -100, + gossipThreshold: -100, + publishThreshold: -1000, graylistThreshold: -10000, opportunisticGraftThreshold: 0, decayInterval: 1.seconds, decayToZero: 0.01, - retainScore: 10.seconds, + retainScore: 2.minutes, appSpecificWeight: 0.0, ipColocationFactorWeight: 0.0, ipColocationFactorThreshold: 1.0, @@ -266,8 +266,18 @@ method subscribeTopic*(g: GossipSub, method rpcHandler*(g: GossipSub, peer: PubSubPeer, rpcMsg: RPCMsg) {.async.} = + # base will check the amount of subscriptions and process subscriptions + # also will update some metrics await procCall PubSub(g).rpcHandler(peer, rpcMsg) + # the above call applied limtis to subs number + # in gossipsub we want to apply scoring as well + if rpcMsg.subscriptions.len > g.topicsHigh: + debug "received an rpc message with an oversized amount of subscriptions", peer, + size = rpcMsg.subscriptions.len, + limit = g.topicsHigh + peer.behaviourPenalty += 0.1 + for msg in rpcMsg.messages: # for every message let msgId = g.msgIdProvider(msg) diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index 37ca392ce..aedacf0b1 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -97,8 +97,7 @@ proc handleGraft*(g: GossipSub, # It is an error to GRAFT on a explicit peer if peer.peerId in g.parameters.directPeers: # receiving a graft from a direct peer should yield a more prominent warning (protocol violation) - debug "attempt to graft an explicit peer", peer=peer.peerId, - topic + warn "attempt to graft an explicit peer, peering agreements should be reciprocal", peer=peer.peerId, topic # and such an attempt should be logged and rejected with a PRUNE result.add(ControlPrune( topicID: topic, @@ -117,8 +116,8 @@ proc handleGraft*(g: GossipSub, if g.backingOff .getOrDefault(topic) .getOrDefault(peer.peerId) > Moment.now(): - debug "attempt to graft a backingOff peer", peer=peer.peerId, - topic + debug "attempt to graft a backingOff peer", peer=peer.peerId, + topic # and such an attempt should be logged and rejected with a PRUNE result.add(ControlPrune( topicID: topic, @@ -134,6 +133,7 @@ proc handleGraft*(g: GossipSub, continue + # Notice this might not be necessary anymore if peer.peerId notin g.peerStats: g.initPeerStats(peer) @@ -196,6 +196,9 @@ proc handleIHave*(g: GossipSub, elif peer.iHaveBudget <= 0: trace "ihave: ignoring out of budget peer", peer, score = peer.score else: + # TODO review deduplicate algorithm + # * https://github.com/nim-lang/Nim/blob/5f46474555ee93306cce55342e81130c1da79a42/lib/pure/collections/sequtils.nim#L184 + # * it's probably not efficient and might give preference to the first dupe var deIhaves = ihaves.deduplicate() for ihave in deIhaves.mitems: trace "peer sent ihave", diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 5f6bf9dee..ac13675b3 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -106,6 +106,7 @@ type msgSeqno*: uint64 anonymize*: bool # if we omit fromPeer and seqno from RPC messages we send subscriptionValidator*: SubscriptionValidator # callback used to validate subscriptions + topicsHigh*: int # the maximum number of topics we allow in a subscription message (application specific, defaults to int max) knownTopics*: HashSet[string] @@ -207,11 +208,13 @@ method rpcHandler*(p: PubSub, rpcMsg: RPCMsg) {.async, base.} = ## handle rpc messages trace "processing RPC message", msg = rpcMsg.shortLog, peer - for s in rpcMsg.subscriptions: # subscribe/unsubscribe the peer for each topic - trace "about to subscribe to topic", topicId = s.topic, peer + for i in 0.. 0.0 + + await conn.close() + await gossipSub.switch.stop()