diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 3b408e3b8..54e07f052 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -251,8 +251,11 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, rpcMsg: RPCMsg) = respControl.prune.add(g.handleGraft(peer, control.graft)) let messages = g.handleIWant(peer, control.iwant) - if respControl.graft.len > 0 or respControl.prune.len > 0 or - respControl.ihave.len > 0 or messages.len > 0: + if + respControl.graft.len > 0 or + respControl.prune.len > 0 or + respControl.iwant.len > 0 or + messages.len > 0: # iwant and prunes from here, also messages for smsg in messages: @@ -506,7 +509,7 @@ method publish*(g: GossipSub, else: libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = ["generic"]) - trace "Published message to peers" + trace "Published message to peers", peers=peers.len return peers.len diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index 0a2ac6175..f3a4e0a51 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -39,10 +39,11 @@ proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) {.raises: [Defect].} = trace "grafted", peer=p, topic -proc pruned*(g: GossipSub, p: PubSubPeer, topic: string) {.raises: [Defect].} = - let backoff = Moment.fromNow(g.parameters.pruneBackoff) - g.backingOff - .mgetOrPut(topic, initTable[PeerID, Moment]())[p.peerId] = backoff +proc pruned*(g: GossipSub, p: PubSubPeer, topic: string, setBackoff: bool = true) {.raises: [Defect].} = + if setBackoff: + let backoff = Moment.fromNow(g.parameters.pruneBackoff) + g.backingOff + .mgetOrPut(topic, initTable[PeerID, Moment]())[p.peerId] = backoff g.peerStats.withValue(p.peerId, stats): stats.topicInfos.withValue(topic, info): @@ -80,6 +81,7 @@ proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] {.raises: proc handleGraft*(g: GossipSub, peer: PubSubPeer, grafts: seq[ControlGraft]): seq[ControlPrune] = # {.raises: [Defect].} TODO chronicles exception on windows + var prunes: seq[ControlPrune] for graft in grafts: let topic = graft.topicID trace "peer grafted topic", peer, topic @@ -90,7 +92,7 @@ proc handleGraft*(g: GossipSub, warn "attempt to graft an explicit peer, peering agreements should be reciprocal", peer, topic # and such an attempt should be logged and rejected with a PRUNE - result.add(ControlPrune( + prunes.add(ControlPrune( topicID: topic, peers: @[], # omitting heavy computation here as the remote did something illegal backoff: g.parameters.pruneBackoff.seconds.uint64)) @@ -108,7 +110,7 @@ proc handleGraft*(g: GossipSub, .getOrDefault(peer.peerId) > Moment.now(): debug "attempt to graft a backingOff peer", peer, topic # and such an attempt should be logged and rejected with a PRUNE - result.add(ControlPrune( + prunes.add(ControlPrune( topicID: topic, peers: @[], # omitting heavy computation here as the remote did something illegal backoff: g.parameters.pruneBackoff.seconds.uint64)) @@ -141,7 +143,7 @@ proc handleGraft*(g: GossipSub, else: trace "pruning grafting peer, mesh full", peer, topic, score = peer.score, mesh = g.mesh.peers(topic) - result.add(ControlPrune( + prunes.add(ControlPrune( topicID: topic, peers: g.peerExchangeList(topic), backoff: g.parameters.pruneBackoff.seconds.uint64)) @@ -149,6 +151,8 @@ proc handleGraft*(g: GossipSub, trace "peer grafting topic we're not interested in", peer, topic # gossip 1.1, we do not send a control message prune anymore + return prunes + proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.raises: [Defect].} = for prune in prunes: let topic = prune.topicID @@ -173,7 +177,7 @@ proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.r .mgetOrPut(topic, initTable[PeerID, Moment]())[peer.peerId] = backoff trace "pruning rpc received peer", peer, score = peer.score - g.pruned(peer, topic) + g.pruned(peer, topic, setBackoff = false) g.mesh.removePeer(topic, peer) # TODO peer exchange, we miss ambient peer discovery in libp2p, so we are blocked by that @@ -183,6 +187,7 @@ proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.r proc handleIHave*(g: GossipSub, peer: PubSubPeer, ihaves: seq[ControlIHave]): ControlIWant {.raises: [Defect].} = + var res: ControlIWant if peer.score < g.parameters.gossipThreshold: trace "ihave: ignoring low score peer", peer, score = peer.score elif peer.iHaveBudget <= 0: @@ -201,18 +206,20 @@ proc handleIHave*(g: GossipSub, for msgId in deIhavesMsgs: if not g.hasSeen(msgId): if peer.iHaveBudget > 0: - result.messageIDs.add(msgId) + res.messageIDs.add(msgId) dec peer.iHaveBudget + trace "requested message via ihave", messageID=msgId else: - return - - # shuffling result.messageIDs before sending it out to increase the likelihood + break + # shuffling res.messageIDs before sending it out to increase the likelihood # of getting an answer if the peer truncates the list due to internal size restrictions. - shuffle(result.messageIDs) + shuffle(res.messageIDs) + return res proc handleIWant*(g: GossipSub, peer: PubSubPeer, iwants: seq[ControlIWant]): seq[Message] {.raises: [Defect].} = + var messages: seq[Message] if peer.score < g.parameters.gossipThreshold: trace "iwant: ignoring low score peer", peer, score = peer.score elif peer.iWantBudget <= 0: @@ -227,10 +234,11 @@ proc handleIWant*(g: GossipSub, if msg.isSome: # avoid spam if peer.iWantBudget > 0: - result.add(msg.get()) + messages.add(msg.get()) dec peer.iWantBudget else: - return + break + return messages proc commitMetrics(metrics: var MeshMetrics) {.raises: [Defect].} = libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics) @@ -485,9 +493,10 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises: ## var cacheWindowSize = 0 + var control: Table[PubSubPeer, ControlMessage] - trace "getting gossip peers (iHave)" let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys)) + trace "getting gossip peers (iHave)", ntopics=topics.len for topic in topics: if topic notin g.gossipsub: trace "topic not in gossip array, skipping", topicID = topic @@ -495,12 +504,15 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises: let mids = g.mcache.window(topic) if not(mids.len > 0): + trace "no messages to emit" continue var midsSeq = toSeq(mids) cacheWindowSize += midsSeq.len + trace "got messages to emit", size=midsSeq.len + # not in spec # similar to rust: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/behaviour.rs#L2101 # and go https://github.com/libp2p/go-libp2p-pubsub/blob/08c17398fb11b2ab06ca141dddc8ec97272eb772/gossipsub.go#L582 @@ -530,10 +542,12 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises: allPeers.setLen(target) for peer in allPeers: - result.mGetOrPut(peer, ControlMessage()).ihave.add(ihave) + control.mGetOrPut(peer, ControlMessage()).ihave.add(ihave) libp2p_gossipsub_cache_window_size.set(cacheWindowSize.int64) + return control + proc onHeartbeat(g: GossipSub) {.raises: [Defect].} = # reset IWANT budget # reset IHAVE cap diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index f944602b1..eda515604 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -563,6 +563,43 @@ suite "GossipSub internal": await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() + asyncTest "rebalanceMesh fail due to backoff - remote": + let gossipSub = TestGossipSub.init(newStandardSwitch()) + let topic = "foobar" + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.topicParams[topic] = TopicParams.init() + + var conns = newSeq[Connection]() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() + for i in 0..<15: + let conn = newBufferStream(noop) + conns &= conn + let peerInfo = randomPeerInfo() + conn.peerInfo = peerInfo + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + peer.sendConn = conn + gossipSub.gossipsub[topic].incl(peer) + gossipSub.mesh[topic].incl(peer) + + check gossipSub.peers.len == 15 + gossipSub.rebalanceMesh(topic) + check gossipSub.mesh[topic].len != 0 + + for i in 0..<15: + let peerInfo = conns[i].peerInfo + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + gossipSub.handlePrune(peer, @[ControlPrune( + topicID: topic, + peers: @[], + backoff: gossipSub.parameters.pruneBackoff.seconds.uint64 + )]) + + # expect topic cleaned up since they are all pruned + check topic notin gossipSub.mesh + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + asyncTest "rebalanceMesh Degree Hi - audit scenario": let gossipSub = TestGossipSub.init(newStandardSwitch()) let topic = "foobar" diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 9b12b2174..7cb0594b3 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -556,6 +556,63 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) + asyncTest "e2e - GossipSub send over floodPublish A -> B": + var passed: Future[bool] = newFuture[bool]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + passed.complete(true) + + let + nodes = generateNodes( + 2, + gossip = true) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) + + # start pubsub + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) + + var gossip1: GossipSub = GossipSub(nodes[0]) + gossip1.parameters.floodPublish = true + var gossip2: GossipSub = GossipSub(nodes[1]) + gossip2.parameters.floodPublish = true + + await subscribeNodes(nodes) + + # nodes[0].subscribe("foobar", handler) + nodes[1].subscribe("foobar", handler) + await waitSub(nodes[0], nodes[1], "foobar") + + tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 + + check await passed + + check: + "foobar" in gossip1.gossipsub + "foobar" notin gossip2.gossipsub + not gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.peerId) + not gossip1.fanout.hasPeerID("foobar", gossip2.peerInfo.peerId) + + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut.concat()) + asyncTest "e2e - GossipSub with multiple peers": var runs = 10 @@ -660,212 +717,3 @@ suite "GossipSub": it.switch.stop()))) await allFuturesThrowing(nodesFut) - - asyncTest "GossipSub invalid topic subscription": - var handlerFut = newFuture[bool]() - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - check topic == "foobar" - handlerFut.complete(true) - - let - nodes = generateNodes(2, gossip = true) - - # start switches - nodesFut = await allFinished( - nodes[0].switch.start(), - nodes[1].switch.start(), - ) - - # start pubsub - await allFuturesThrowing( - allFinished( - nodes[0].start(), - nodes[1].start(), - )) - - var gossip = GossipSub(nodes[0]) - let invalidDetected = newFuture[void]() - gossip.subscriptionValidator = - proc(topic: string): bool = - if topic == "foobar": - try: - invalidDetected.complete() - except: - raise newException(Defect, "Exception during subscriptionValidator") - false - else: - true - - await subscribeNodes(nodes) - - nodes[0].subscribe("foobar", handler) - nodes[1].subscribe("foobar", handler) - - await invalidDetected.wait(10.seconds) - - await allFuturesThrowing( - nodes[0].switch.stop(), - nodes[1].switch.stop() - ) - - await allFuturesThrowing( - nodes[0].stop(), - nodes[1].stop() - ) - - await allFuturesThrowing(nodesFut.concat()) - - asyncTest "GossipSub test directPeers": - var handlerFut = newFuture[bool]() - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - check topic == "foobar" - handlerFut.complete(true) - - let - nodes = generateNodes(2, gossip = true) - - # start switches - nodesFut = await allFinished( - nodes[0].switch.start(), - nodes[1].switch.start(), - ) - - var gossip = GossipSub(nodes[0]) - gossip.parameters.directPeers[nodes[1].switch.peerInfo.peerId] = nodes[1].switch.peerInfo.addrs - - # start pubsub - await allFuturesThrowing( - allFinished( - nodes[0].start(), - nodes[1].start(), - )) - - let invalidDetected = newFuture[void]() - gossip.subscriptionValidator = - proc(topic: string): bool = - if topic == "foobar": - try: - invalidDetected.complete() - except: - raise newException(Defect, "Exception during subscriptionValidator") - false - else: - true - - # DO NOT SUBSCRIBE, CONNECTION SHOULD HAPPEN - ### await subscribeNodes(nodes) - - nodes[0].subscribe("foobar", handler) - nodes[1].subscribe("foobar", handler) - - await invalidDetected.wait(10.seconds) - - await allFuturesThrowing( - nodes[0].switch.stop(), - nodes[1].switch.stop() - ) - - await allFuturesThrowing( - nodes[0].stop(), - nodes[1].stop() - ) - - await allFuturesThrowing(nodesFut.concat()) - - asyncTest "GossipsSub peers disconnections mechanics": - var runs = 10 - - let - nodes = generateNodes(runs, gossip = true, triggerSelf = true) - nodesFut = nodes.mapIt(it.switch.start()) - - await allFuturesThrowing(nodes.mapIt(it.start())) - await subscribeNodes(nodes) - - var seen: Table[string, int] - var seenFut = newFuture[void]() - for i in 0..= runs: - seenFut.complete() - - dialer.subscribe("foobar", handler) - await waitSub(nodes[0], dialer, "foobar") - - # ensure peer stats are stored properly and kept properly - check: - GossipSub(nodes[0]).peerStats.len == runs - 1 # minus self - - tryPublish await wait(nodes[0].publish("foobar", - toBytes("from node " & - $nodes[0].peerInfo.peerId)), - 1.minutes), 1, 5.seconds - - await wait(seenFut, 5.minutes) - check: seen.len >= runs - for k, v in seen.pairs: - check: v >= 1 - - for node in nodes: - var gossip = GossipSub(node) - check: - "foobar" in gossip.gossipsub - gossip.fanout.len == 0 - gossip.mesh["foobar"].len > 0 - - # Removing some subscriptions - - for i in 0.. 0, "waitSub timeout!") + +template tryPublish(call: untyped, require: int, wait: Duration = 1.seconds, times: int = 10): untyped = + var + limit = times + pubs = 0 + while pubs < require and limit > 0: + pubs = pubs + call + await sleepAsync(wait) + limit.dec() + if limit == 0: + doAssert(false, "Failed to publish!") + +suite "GossipSub": + teardown: + checkTrackers() + + asyncTest "e2e - GossipSub with multiple peers - control deliver (sparse)": + var runs = 10 + + let + nodes = generateNodes(runs, gossip = true, triggerSelf = true) + nodesFut = nodes.mapIt(it.switch.start()) + + await allFuturesThrowing(nodes.mapIt(it.start())) + await subscribeSparseNodes(nodes) + + var seen: Table[string, int] + var seenFut = newFuture[void]() + for i in 0..= runs: + seenFut.complete() + + dialer.subscribe("foobar", handler) + await waitSub(nodes[0], dialer, "foobar") + + # we want to test ping pong deliveries via control Iwant/Ihave, so we publish just in a tap + let publishedTo = nodes[0] + .publish("foobar", toBytes("from node " & $nodes[0].peerInfo.peerId)) + .await + check: + publishedTo != 0 + publishedTo != runs + + await wait(seenFut, 5.minutes) + check: seen.len >= runs + for k, v in seen.pairs: + check: v >= 1 + + await allFuturesThrowing( + nodes.mapIt( + allFutures( + it.stop(), + it.switch.stop()))) + + await allFuturesThrowing(nodesFut) + + asyncTest "GossipSub invalid topic subscription": + var handlerFut = newFuture[bool]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + handlerFut.complete(true) + + let + nodes = generateNodes(2, gossip = true) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) + + # start pubsub + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) + + var gossip = GossipSub(nodes[0]) + let invalidDetected = newFuture[void]() + gossip.subscriptionValidator = + proc(topic: string): bool = + if topic == "foobar": + try: + invalidDetected.complete() + except: + raise newException(Defect, "Exception during subscriptionValidator") + false + else: + true + + await subscribeNodes(nodes) + + nodes[0].subscribe("foobar", handler) + nodes[1].subscribe("foobar", handler) + + await invalidDetected.wait(10.seconds) + + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut.concat()) + + asyncTest "GossipSub test directPeers": + var handlerFut = newFuture[bool]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + handlerFut.complete(true) + + let + nodes = generateNodes(2, gossip = true) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) + + var gossip = GossipSub(nodes[0]) + gossip.parameters.directPeers[nodes[1].switch.peerInfo.peerId] = nodes[1].switch.peerInfo.addrs + + # start pubsub + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) + + let invalidDetected = newFuture[void]() + gossip.subscriptionValidator = + proc(topic: string): bool = + if topic == "foobar": + try: + invalidDetected.complete() + except: + raise newException(Defect, "Exception during subscriptionValidator") + false + else: + true + + # DO NOT SUBSCRIBE, CONNECTION SHOULD HAPPEN + ### await subscribeNodes(nodes) + + nodes[0].subscribe("foobar", handler) + nodes[1].subscribe("foobar", handler) + + await invalidDetected.wait(10.seconds) + + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut.concat()) + + asyncTest "GossipsSub peers disconnections mechanics": + var runs = 10 + + let + nodes = generateNodes(runs, gossip = true, triggerSelf = true) + nodesFut = nodes.mapIt(it.switch.start()) + + await allFuturesThrowing(nodes.mapIt(it.start())) + await subscribeNodes(nodes) + + var seen: Table[string, int] + var seenFut = newFuture[void]() + for i in 0..= runs: + seenFut.complete() + + dialer.subscribe("foobar", handler) + await waitSub(nodes[0], dialer, "foobar") + + # ensure peer stats are stored properly and kept properly + check: + GossipSub(nodes[0]).peerStats.len == runs - 1 # minus self + + tryPublish await wait(nodes[0].publish("foobar", + toBytes("from node " & + $nodes[0].peerInfo.peerId)), + 1.minutes), 1, 5.seconds + + await wait(seenFut, 5.minutes) + check: seen.len >= runs + for k, v in seen.pairs: + check: v >= 1 + + for node in nodes: + var gossip = GossipSub(node) + check: + "foobar" in gossip.gossipsub + gossip.fanout.len == 0 + gossip.mesh["foobar"].len > 0 + + # Removing some subscriptions + + for i in 0..