From b2eac7ecbdb695b0b7033f2069b03a63d28aee2b Mon Sep 17 00:00:00 2001 From: Tanguy Date: Fri, 15 Sep 2023 17:22:02 +0200 Subject: [PATCH] GS: Relay messages to direct peers (#949) --- libp2p/protocols/pubsub/gossipsub.nim | 21 ++++++++++++------- .../protocols/pubsub/gossipsub/behavior.nim | 11 +++++----- libp2p/protocols/pubsub/gossipsub/types.nim | 2 +- tests/pubsub/testgossipsub2.nim | 18 +++++++++++----- 4 files changed, 33 insertions(+), 19 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index ffca59471..794ea3e7e 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -205,8 +205,8 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) = for t in toSeq(g.gossipsub.keys): g.gossipsub.removePeer(t, pubSubPeer) - # also try to remove from explicit table here - g.explicit.removePeer(t, pubSubPeer) + # also try to remove from direct peers table here + g.subscribedDirectPeers.removePeer(t, pubSubPeer) for t in toSeq(g.fanout.keys): g.fanout.removePeer(t, pubSubPeer) @@ -245,7 +245,7 @@ proc handleSubscribe*(g: GossipSub, # subscribe remote peer to the topic discard g.gossipsub.addPeer(topic, peer) if peer.peerId in g.parameters.directPeers: - discard g.explicit.addPeer(topic, peer) + discard g.subscribedDirectPeers.addPeer(topic, peer) else: trace "peer unsubscribed from topic" @@ -259,7 +259,7 @@ proc handleSubscribe*(g: GossipSub, g.fanout.removePeer(topic, peer) if peer.peerId in g.parameters.directPeers: - g.explicit.removePeer(topic, peer) + g.subscribedDirectPeers.removePeer(topic, peer) trace "gossip peers", peers = g.gossipsub.peers(topic), topic @@ -338,6 +338,9 @@ proc validateAndRelay(g: GossipSub, g.floodsub.withValue(t, peers): toSendPeers.incl(peers[]) g.mesh.withValue(t, peers): toSendPeers.incl(peers[]) + # add direct peers + toSendPeers.incl(g.subscribedDirectPeers.getOrDefault(t)) + # Don't send it to source peer, or peers that # sent it during validation toSendPeers.excl(peer) @@ -522,7 +525,7 @@ method publish*(g: GossipSub, var peers: HashSet[PubSubPeer] # add always direct peers - peers.incl(g.explicit.getOrDefault(topic)) + peers.incl(g.subscribedDirectPeers.getOrDefault(topic)) if topic in g.topics: # if we're subscribed use the mesh peers.incl(g.mesh.getOrDefault(topic)) @@ -608,11 +611,13 @@ method publish*(g: GossipSub, return peers.len proc maintainDirectPeer(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.async.} = - let peer = g.peers.getOrDefault(id) - if isNil(peer): + if id notin g.peers: trace "Attempting to dial a direct peer", peer = id + if g.switch.isConnected(id): + warn "We are connected to a direct peer, but it isn't a GossipSub peer!", id + return try: - await g.switch.connect(id, addrs) + await g.switch.connect(id, addrs, forceDial = true) # populate the peer after it's connected discard g.getOrCreatePeer(id, g.codecs) except CancelledError as exc: diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index 232e1aa7e..008e19744 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -106,10 +106,11 @@ proc handleGraft*(g: GossipSub, let topic = graft.topicId trace "peer grafted topic", peer, topic - # It is an error to GRAFT on a explicit peer + # It is an error to GRAFT on a direct peer if peer.peerId in g.parameters.directPeers: # receiving a graft from a direct peer should yield a more prominent warning (protocol violation) - warn "an explicit peer attempted to graft us, peering agreements should be reciprocal", + # we are trusting direct peer not to abuse this + warn "a direct peer attempted to graft us, peering agreements should be reciprocal", peer, topic # and such an attempt should be logged and rejected with a PRUNE prunes.add(ControlPrune( @@ -352,7 +353,7 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) # avoid negative score peers it.score >= 0.0 and it notin currentMesh[] and - # don't pick explicit peers + # don't pick direct peers it.peerId notin g.parameters.directPeers and # and avoid peers we are backing off it.peerId notin backingOff: @@ -392,7 +393,7 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) it notin currentMesh[] and # avoid negative score peers it.score >= 0.0 and - # don't pick explicit peers + # don't pick direct peers it.peerId notin g.parameters.directPeers and # and avoid peers we are backing off it.peerId notin backingOff: @@ -494,7 +495,7 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) # avoid negative score peers it.score >= median.score and it notin currentMesh[] and - # don't pick explicit peers + # don't pick direct peers it.peerId notin g.parameters.directPeers and # and avoid peers we are backing off it.peerId notin backingOff: diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index 25a9e636a..ca79b290e 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -159,7 +159,7 @@ type mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic gossipsub*: PeerTable # peers that are subscribed to a topic - explicit*: PeerTable # directpeers that we keep alive explicitly + subscribedDirectPeers*: PeerTable # directpeers that we keep alive backingOff*: BackoffTable # peers to backoff from when replenishing the mesh lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics gossip*: Table[string, seq[ControlIHave]] # pending gossip diff --git a/tests/pubsub/testgossipsub2.nim b/tests/pubsub/testgossipsub2.nim index 8132a098b..a076fe4aa 100644 --- a/tests/pubsub/testgossipsub2.nim +++ b/tests/pubsub/testgossipsub2.nim @@ -167,36 +167,44 @@ suite "GossipSub": asyncTest "GossipSub directPeers: always forward messages": let - nodes = generateNodes(2, gossip = true) + nodes = generateNodes(3, gossip = true) # start switches nodesFut = await allFinished( nodes[0].switch.start(), nodes[1].switch.start(), + nodes[2].switch.start(), ) await GossipSub(nodes[0]).addDirectPeer(nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs) await GossipSub(nodes[1]).addDirectPeer(nodes[0].switch.peerInfo.peerId, nodes[0].switch.peerInfo.addrs) + await GossipSub(nodes[1]).addDirectPeer(nodes[2].switch.peerInfo.peerId, nodes[2].switch.peerInfo.addrs) + await GossipSub(nodes[2]).addDirectPeer(nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs) var handlerFut = newFuture[void]() proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = check topic == "foobar" handlerFut.complete() + proc noop(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" - nodes[0].subscribe("foobar", handler) - nodes[1].subscribe("foobar", handler) + nodes[0].subscribe("foobar", noop) + nodes[1].subscribe("foobar", noop) + nodes[2].subscribe("foobar", handler) tryPublish await nodes[0].publish("foobar", toBytes("hellow")), 1 - await handlerFut + await handlerFut.wait(2.seconds) # peer shouldn't be in our mesh check "foobar" notin GossipSub(nodes[0]).mesh check "foobar" notin GossipSub(nodes[1]).mesh + check "foobar" notin GossipSub(nodes[2]).mesh await allFuturesThrowing( nodes[0].switch.stop(), - nodes[1].switch.stop() + nodes[1].switch.stop(), + nodes[2].switch.stop() ) await allFuturesThrowing(nodesFut.concat())