From 6893bd9dbb4f9a3a9cd1eafc2430dd2157dd4590 Mon Sep 17 00:00:00 2001 From: Tanguy Date: Thu, 2 Dec 2021 15:47:40 +0100 Subject: [PATCH] Customizable gossipsub backoff on unsubscribe (#665) * Customizable gossipsub backoff on unsubscribe * change default to 5s --- libp2p/protocols/pubsub/gossipsub.nim | 7 +- .../protocols/pubsub/gossipsub/behavior.nim | 15 ++++- libp2p/protocols/pubsub/gossipsub/types.nim | 1 + tests/pubsub/testgossipsub.nim | 64 +++++++++++++++++++ tests/pubsub/utils.nim | 2 +- 5 files changed, 83 insertions(+), 6 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 64181bff4..ef5300c44 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -43,6 +43,7 @@ proc init*(_: type[GossipSubParams]): GossipSubParams = GossipSubParams( explicit: true, pruneBackoff: 1.minutes, + unsubcribeBackoff: 5.seconds, floodPublish: true, gossipFactor: 0.25, d: GossipSubD, @@ -77,6 +78,8 @@ proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] = err("gossipsub: dOut parameter error, Number of outbound connections to keep in the mesh. Must be less than D_lo and at most D/2") elif parameters.gossipThreshold >= 0: err("gossipsub: gossipThreshold parameter error, Must be < 0") + elif parameters.unsubcribeBackoff.seconds <= 0: + err("gossipsub: unsubcribeBackoff parameter error, Must be > 0 seconds") elif parameters.publishThreshold >= parameters.gossipThreshold: err("gossipsub: publishThreshold parameter error, Must be < gossipThreshold") elif parameters.graylistThreshold >= parameters.publishThreshold: @@ -413,11 +416,11 @@ method onTopicSubscription*(g: GossipSub, topic: string, subscribed: bool) = prune: @[ControlPrune( topicID: topic, peers: g.peerExchangeList(topic), - backoff: g.parameters.pruneBackoff.seconds.uint64)]))) + backoff: g.parameters.unsubcribeBackoff.seconds.uint64)]))) g.broadcast(mpeers, msg) for peer in mpeers: - g.pruned(peer, topic) + g.pruned(peer, topic, backoff = some(g.parameters.unsubcribeBackoff)) g.mesh.del(topic) diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index 8f9c55e89..28dca3274 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -38,11 +38,20 @@ proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) {.raises: [Defect].} = trace "grafted", peer=p, topic -proc pruned*(g: GossipSub, p: PubSubPeer, topic: string, setBackoff: bool = true) {.raises: [Defect].} = +proc pruned*(g: GossipSub, + p: PubSubPeer, + topic: string, + setBackoff: bool = true, + backoff = none(Duration)) {.raises: [Defect].} = if setBackoff: - let backoff = Moment.fromNow(g.parameters.pruneBackoff) + let + backoffDuration = + if isSome(backoff): backoff.get() + else: g.parameters.pruneBackoff + backoffMoment = Moment.fromNow(backoffDuration) + g.backingOff - .mgetOrPut(topic, initTable[PeerID, Moment]())[p.peerId] = backoff + .mgetOrPut(topic, initTable[PeerID, Moment]())[p.peerId] = backoffMoment g.peerStats.withValue(p.peerId, stats): stats.topicInfos.withValue(topic, info): diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index a59e185b4..6c78fe08b 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -102,6 +102,7 @@ type GossipSubParams* = object explicit*: bool pruneBackoff*: Duration + unsubcribeBackoff*: Duration floodPublish*: bool gossipFactor*: float64 d*: int diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 8e8339280..f953e5d46 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -320,6 +320,70 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) + asyncTest "GossipSub unsub - resub faster than backoff": + var handlerFut = newFuture[bool]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + handlerFut.complete(true) + + let + nodes = generateNodes(2, gossip = true) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) + + # start pubsub + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) + + await subscribeNodes(nodes) + + nodes[0].subscribe("foobar", handler) + nodes[1].subscribe("foobar", handler) + + var subs: seq[Future[void]] + subs &= waitSub(nodes[1], nodes[0], "foobar") + subs &= waitSub(nodes[0], nodes[1], "foobar") + + await allFuturesThrowing(subs) + + nodes[0].unsubscribe("foobar", handler) + nodes[0].subscribe("foobar", handler) + + # regular backoff is 60 seconds, so we must not wait that long + await (waitSub(nodes[0], nodes[1], "foobar") and waitSub(nodes[1], nodes[0], "foobar")).wait(30.seconds) + + var validatorFut = newFuture[bool]() + proc validator(topic: string, + message: Message): + Future[ValidationResult] {.async.} = + check topic == "foobar" + validatorFut.complete(true) + result = ValidationResult.Accept + + nodes[1].addValidator("foobar", validator) + tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 + + check (await validatorFut) and (await handlerFut) + + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut.concat()) + asyncTest "e2e - GossipSub should add remote peer topic subscriptions": proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = discard diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index 452edd412..43c3b9739 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -40,7 +40,7 @@ proc generateNodes*( msgIdProvider = msgIdProvider, anonymize = anonymize, maxMessageSize = maxMessageSize, - parameters = (var p = GossipSubParams.init(); p.floodPublish = false; p.historyLength = 20; p.historyGossip = 20; p)) + parameters = (var p = GossipSubParams.init(); p.floodPublish = false; p.historyLength = 20; p.historyGossip = 20; p.unsubcribeBackoff = 1.seconds; p)) # set some testing params, to enable scores g.topicParams.mgetOrPut("foobar", TopicParams.init()).topicWeight = 1.0 g.topicParams.mgetOrPut("foo", TopicParams.init()).topicWeight = 1.0