n subscription limits (#528)
* subscription high water, cleanups * subscription limits test * newline
This commit is contained in:
parent
12adefb4de
commit
e124e342b0
|
@ -54,13 +54,13 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
|
||||||
historyGossip: GossipSubHistoryGossip,
|
historyGossip: GossipSubHistoryGossip,
|
||||||
fanoutTTL: GossipSubFanoutTTL,
|
fanoutTTL: GossipSubFanoutTTL,
|
||||||
seenTTL: 2.minutes,
|
seenTTL: 2.minutes,
|
||||||
gossipThreshold: -10,
|
gossipThreshold: -100,
|
||||||
publishThreshold: -100,
|
publishThreshold: -1000,
|
||||||
graylistThreshold: -10000,
|
graylistThreshold: -10000,
|
||||||
opportunisticGraftThreshold: 0,
|
opportunisticGraftThreshold: 0,
|
||||||
decayInterval: 1.seconds,
|
decayInterval: 1.seconds,
|
||||||
decayToZero: 0.01,
|
decayToZero: 0.01,
|
||||||
retainScore: 10.seconds,
|
retainScore: 2.minutes,
|
||||||
appSpecificWeight: 0.0,
|
appSpecificWeight: 0.0,
|
||||||
ipColocationFactorWeight: 0.0,
|
ipColocationFactorWeight: 0.0,
|
||||||
ipColocationFactorThreshold: 1.0,
|
ipColocationFactorThreshold: 1.0,
|
||||||
|
@ -266,8 +266,18 @@ method subscribeTopic*(g: GossipSub,
|
||||||
method rpcHandler*(g: GossipSub,
|
method rpcHandler*(g: GossipSub,
|
||||||
peer: PubSubPeer,
|
peer: PubSubPeer,
|
||||||
rpcMsg: RPCMsg) {.async.} =
|
rpcMsg: RPCMsg) {.async.} =
|
||||||
|
# base will check the amount of subscriptions and process subscriptions
|
||||||
|
# also will update some metrics
|
||||||
await procCall PubSub(g).rpcHandler(peer, rpcMsg)
|
await procCall PubSub(g).rpcHandler(peer, rpcMsg)
|
||||||
|
|
||||||
|
# the above call applied limtis to subs number
|
||||||
|
# in gossipsub we want to apply scoring as well
|
||||||
|
if rpcMsg.subscriptions.len > g.topicsHigh:
|
||||||
|
debug "received an rpc message with an oversized amount of subscriptions", peer,
|
||||||
|
size = rpcMsg.subscriptions.len,
|
||||||
|
limit = g.topicsHigh
|
||||||
|
peer.behaviourPenalty += 0.1
|
||||||
|
|
||||||
for msg in rpcMsg.messages: # for every message
|
for msg in rpcMsg.messages: # for every message
|
||||||
let msgId = g.msgIdProvider(msg)
|
let msgId = g.msgIdProvider(msg)
|
||||||
|
|
||||||
|
|
|
@ -97,8 +97,7 @@ proc handleGraft*(g: GossipSub,
|
||||||
# It is an error to GRAFT on a explicit peer
|
# It is an error to GRAFT on a explicit peer
|
||||||
if peer.peerId in g.parameters.directPeers:
|
if peer.peerId in g.parameters.directPeers:
|
||||||
# receiving a graft from a direct peer should yield a more prominent warning (protocol violation)
|
# receiving a graft from a direct peer should yield a more prominent warning (protocol violation)
|
||||||
debug "attempt to graft an explicit peer", peer=peer.peerId,
|
warn "attempt to graft an explicit peer, peering agreements should be reciprocal", peer=peer.peerId, topic
|
||||||
topic
|
|
||||||
# and such an attempt should be logged and rejected with a PRUNE
|
# and such an attempt should be logged and rejected with a PRUNE
|
||||||
result.add(ControlPrune(
|
result.add(ControlPrune(
|
||||||
topicID: topic,
|
topicID: topic,
|
||||||
|
@ -117,8 +116,8 @@ proc handleGraft*(g: GossipSub,
|
||||||
if g.backingOff
|
if g.backingOff
|
||||||
.getOrDefault(topic)
|
.getOrDefault(topic)
|
||||||
.getOrDefault(peer.peerId) > Moment.now():
|
.getOrDefault(peer.peerId) > Moment.now():
|
||||||
debug "attempt to graft a backingOff peer", peer=peer.peerId,
|
debug "attempt to graft a backingOff peer", peer=peer.peerId,
|
||||||
topic
|
topic
|
||||||
# and such an attempt should be logged and rejected with a PRUNE
|
# and such an attempt should be logged and rejected with a PRUNE
|
||||||
result.add(ControlPrune(
|
result.add(ControlPrune(
|
||||||
topicID: topic,
|
topicID: topic,
|
||||||
|
@ -134,6 +133,7 @@ proc handleGraft*(g: GossipSub,
|
||||||
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Notice this might not be necessary anymore
|
||||||
if peer.peerId notin g.peerStats:
|
if peer.peerId notin g.peerStats:
|
||||||
g.initPeerStats(peer)
|
g.initPeerStats(peer)
|
||||||
|
|
||||||
|
@ -196,6 +196,9 @@ proc handleIHave*(g: GossipSub,
|
||||||
elif peer.iHaveBudget <= 0:
|
elif peer.iHaveBudget <= 0:
|
||||||
trace "ihave: ignoring out of budget peer", peer, score = peer.score
|
trace "ihave: ignoring out of budget peer", peer, score = peer.score
|
||||||
else:
|
else:
|
||||||
|
# TODO review deduplicate algorithm
|
||||||
|
# * https://github.com/nim-lang/Nim/blob/5f46474555ee93306cce55342e81130c1da79a42/lib/pure/collections/sequtils.nim#L184
|
||||||
|
# * it's probably not efficient and might give preference to the first dupe
|
||||||
var deIhaves = ihaves.deduplicate()
|
var deIhaves = ihaves.deduplicate()
|
||||||
for ihave in deIhaves.mitems:
|
for ihave in deIhaves.mitems:
|
||||||
trace "peer sent ihave",
|
trace "peer sent ihave",
|
||||||
|
|
|
@ -106,6 +106,7 @@ type
|
||||||
msgSeqno*: uint64
|
msgSeqno*: uint64
|
||||||
anonymize*: bool # if we omit fromPeer and seqno from RPC messages we send
|
anonymize*: bool # if we omit fromPeer and seqno from RPC messages we send
|
||||||
subscriptionValidator*: SubscriptionValidator # callback used to validate subscriptions
|
subscriptionValidator*: SubscriptionValidator # callback used to validate subscriptions
|
||||||
|
topicsHigh*: int # the maximum number of topics we allow in a subscription message (application specific, defaults to int max)
|
||||||
|
|
||||||
knownTopics*: HashSet[string]
|
knownTopics*: HashSet[string]
|
||||||
|
|
||||||
|
@ -207,11 +208,13 @@ method rpcHandler*(p: PubSub,
|
||||||
rpcMsg: RPCMsg) {.async, base.} =
|
rpcMsg: RPCMsg) {.async, base.} =
|
||||||
## handle rpc messages
|
## handle rpc messages
|
||||||
trace "processing RPC message", msg = rpcMsg.shortLog, peer
|
trace "processing RPC message", msg = rpcMsg.shortLog, peer
|
||||||
for s in rpcMsg.subscriptions: # subscribe/unsubscribe the peer for each topic
|
for i in 0..<min(rpcMsg.subscriptions.len, p.topicsHigh):
|
||||||
trace "about to subscribe to topic", topicId = s.topic, peer
|
let s = rpcMsg.subscriptions[i]
|
||||||
|
trace "about to subscribe to topic", topicId = s.topic, peer, subscribe = s.subscribe
|
||||||
p.subscribeTopic(s.topic, s.subscribe, peer)
|
p.subscribeTopic(s.topic, s.subscribe, peer)
|
||||||
|
|
||||||
for sub in rpcMsg.subscriptions:
|
for i in 0..<min(rpcMsg.subscriptions.len, p.topicsHigh):
|
||||||
|
let sub = rpcMsg.subscriptions[i]
|
||||||
if sub.subscribe:
|
if sub.subscribe:
|
||||||
if p.knownTopics.contains(sub.topic):
|
if p.knownTopics.contains(sub.topic):
|
||||||
libp2p_pubsub_received_subscriptions.inc(labelValues = [sub.topic])
|
libp2p_pubsub_received_subscriptions.inc(labelValues = [sub.topic])
|
||||||
|
@ -520,7 +523,8 @@ proc init*[PubParams: object | bool](
|
||||||
peers: initTable[PeerID, PubSubPeer](),
|
peers: initTable[PeerID, PubSubPeer](),
|
||||||
topics: initTable[string, Topic](),
|
topics: initTable[string, Topic](),
|
||||||
msgIdProvider: msgIdProvider,
|
msgIdProvider: msgIdProvider,
|
||||||
subscriptionValidator: subscriptionValidator)
|
subscriptionValidator: subscriptionValidator,
|
||||||
|
topicsHigh: int.high)
|
||||||
else:
|
else:
|
||||||
P(switch: switch,
|
P(switch: switch,
|
||||||
peerInfo: switch.peerInfo,
|
peerInfo: switch.peerInfo,
|
||||||
|
@ -532,7 +536,8 @@ proc init*[PubParams: object | bool](
|
||||||
topics: initTable[string, Topic](),
|
topics: initTable[string, Topic](),
|
||||||
msgIdProvider: msgIdProvider,
|
msgIdProvider: msgIdProvider,
|
||||||
subscriptionValidator: subscriptionValidator,
|
subscriptionValidator: subscriptionValidator,
|
||||||
parameters: parameters)
|
parameters: parameters,
|
||||||
|
topicsHigh: int.high)
|
||||||
|
|
||||||
proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} =
|
proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} =
|
||||||
if event.kind == PeerEventKind.Joined:
|
if event.kind == PeerEventKind.Joined:
|
||||||
|
|
|
@ -485,3 +485,27 @@ suite "GossipSub internal":
|
||||||
|
|
||||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||||
await gossipSub.switch.stop()
|
await gossipSub.switch.stop()
|
||||||
|
|
||||||
|
asyncTest "subscription limits":
|
||||||
|
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||||
|
gossipSub.topicsHigh = 10
|
||||||
|
|
||||||
|
var tooManyTopics: seq[string]
|
||||||
|
for i in 0..gossipSub.topicsHigh + 10:
|
||||||
|
tooManyTopics &= "topic" & $i
|
||||||
|
let lotOfSubs = RPCMsg.withSubs(tooManyTopics, true)
|
||||||
|
|
||||||
|
let conn = newBufferStream(noop)
|
||||||
|
let peerInfo = randomPeerInfo()
|
||||||
|
conn.peerInfo = peerInfo
|
||||||
|
let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
|
||||||
|
gossipSub.peers[peerInfo.peerId] = peer
|
||||||
|
|
||||||
|
await gossipSub.rpcHandler(peer, lotOfSubs)
|
||||||
|
|
||||||
|
check:
|
||||||
|
gossipSub.gossipSub.len == gossipSub.topicsHigh
|
||||||
|
peer.behaviourPenalty > 0.0
|
||||||
|
|
||||||
|
await conn.close()
|
||||||
|
await gossipSub.switch.stop()
|
||||||
|
|
Loading…
Reference in New Issue