mirror of
https://github.com/logos-storage/nim-libp2p.git
synced 2026-01-08 16:43:09 +00:00
GS: Relay messages to direct peers (#949)
This commit is contained in:
parent
20b0e40f7d
commit
b2eac7ecbd
@ -205,8 +205,8 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) =
|
|||||||
|
|
||||||
for t in toSeq(g.gossipsub.keys):
|
for t in toSeq(g.gossipsub.keys):
|
||||||
g.gossipsub.removePeer(t, pubSubPeer)
|
g.gossipsub.removePeer(t, pubSubPeer)
|
||||||
# also try to remove from explicit table here
|
# also try to remove from direct peers table here
|
||||||
g.explicit.removePeer(t, pubSubPeer)
|
g.subscribedDirectPeers.removePeer(t, pubSubPeer)
|
||||||
|
|
||||||
for t in toSeq(g.fanout.keys):
|
for t in toSeq(g.fanout.keys):
|
||||||
g.fanout.removePeer(t, pubSubPeer)
|
g.fanout.removePeer(t, pubSubPeer)
|
||||||
@ -245,7 +245,7 @@ proc handleSubscribe*(g: GossipSub,
|
|||||||
# subscribe remote peer to the topic
|
# subscribe remote peer to the topic
|
||||||
discard g.gossipsub.addPeer(topic, peer)
|
discard g.gossipsub.addPeer(topic, peer)
|
||||||
if peer.peerId in g.parameters.directPeers:
|
if peer.peerId in g.parameters.directPeers:
|
||||||
discard g.explicit.addPeer(topic, peer)
|
discard g.subscribedDirectPeers.addPeer(topic, peer)
|
||||||
else:
|
else:
|
||||||
trace "peer unsubscribed from topic"
|
trace "peer unsubscribed from topic"
|
||||||
|
|
||||||
@ -259,7 +259,7 @@ proc handleSubscribe*(g: GossipSub,
|
|||||||
|
|
||||||
g.fanout.removePeer(topic, peer)
|
g.fanout.removePeer(topic, peer)
|
||||||
if peer.peerId in g.parameters.directPeers:
|
if peer.peerId in g.parameters.directPeers:
|
||||||
g.explicit.removePeer(topic, peer)
|
g.subscribedDirectPeers.removePeer(topic, peer)
|
||||||
|
|
||||||
trace "gossip peers", peers = g.gossipsub.peers(topic), topic
|
trace "gossip peers", peers = g.gossipsub.peers(topic), topic
|
||||||
|
|
||||||
@ -338,6 +338,9 @@ proc validateAndRelay(g: GossipSub,
|
|||||||
g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
|
g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
|
||||||
g.mesh.withValue(t, peers): toSendPeers.incl(peers[])
|
g.mesh.withValue(t, peers): toSendPeers.incl(peers[])
|
||||||
|
|
||||||
|
# add direct peers
|
||||||
|
toSendPeers.incl(g.subscribedDirectPeers.getOrDefault(t))
|
||||||
|
|
||||||
# Don't send it to source peer, or peers that
|
# Don't send it to source peer, or peers that
|
||||||
# sent it during validation
|
# sent it during validation
|
||||||
toSendPeers.excl(peer)
|
toSendPeers.excl(peer)
|
||||||
@ -522,7 +525,7 @@ method publish*(g: GossipSub,
|
|||||||
var peers: HashSet[PubSubPeer]
|
var peers: HashSet[PubSubPeer]
|
||||||
|
|
||||||
# add always direct peers
|
# add always direct peers
|
||||||
peers.incl(g.explicit.getOrDefault(topic))
|
peers.incl(g.subscribedDirectPeers.getOrDefault(topic))
|
||||||
|
|
||||||
if topic in g.topics: # if we're subscribed use the mesh
|
if topic in g.topics: # if we're subscribed use the mesh
|
||||||
peers.incl(g.mesh.getOrDefault(topic))
|
peers.incl(g.mesh.getOrDefault(topic))
|
||||||
@ -608,11 +611,13 @@ method publish*(g: GossipSub,
|
|||||||
return peers.len
|
return peers.len
|
||||||
|
|
||||||
proc maintainDirectPeer(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.async.} =
|
proc maintainDirectPeer(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.async.} =
|
||||||
let peer = g.peers.getOrDefault(id)
|
if id notin g.peers:
|
||||||
if isNil(peer):
|
|
||||||
trace "Attempting to dial a direct peer", peer = id
|
trace "Attempting to dial a direct peer", peer = id
|
||||||
|
if g.switch.isConnected(id):
|
||||||
|
warn "We are connected to a direct peer, but it isn't a GossipSub peer!", id
|
||||||
|
return
|
||||||
try:
|
try:
|
||||||
await g.switch.connect(id, addrs)
|
await g.switch.connect(id, addrs, forceDial = true)
|
||||||
# populate the peer after it's connected
|
# populate the peer after it's connected
|
||||||
discard g.getOrCreatePeer(id, g.codecs)
|
discard g.getOrCreatePeer(id, g.codecs)
|
||||||
except CancelledError as exc:
|
except CancelledError as exc:
|
||||||
|
|||||||
@ -106,10 +106,11 @@ proc handleGraft*(g: GossipSub,
|
|||||||
let topic = graft.topicId
|
let topic = graft.topicId
|
||||||
trace "peer grafted topic", peer, topic
|
trace "peer grafted topic", peer, topic
|
||||||
|
|
||||||
# It is an error to GRAFT on a explicit peer
|
# It is an error to GRAFT on a direct peer
|
||||||
if peer.peerId in g.parameters.directPeers:
|
if peer.peerId in g.parameters.directPeers:
|
||||||
# receiving a graft from a direct peer should yield a more prominent warning (protocol violation)
|
# receiving a graft from a direct peer should yield a more prominent warning (protocol violation)
|
||||||
warn "an explicit peer attempted to graft us, peering agreements should be reciprocal",
|
# we are trusting direct peer not to abuse this
|
||||||
|
warn "a direct peer attempted to graft us, peering agreements should be reciprocal",
|
||||||
peer, topic
|
peer, topic
|
||||||
# and such an attempt should be logged and rejected with a PRUNE
|
# and such an attempt should be logged and rejected with a PRUNE
|
||||||
prunes.add(ControlPrune(
|
prunes.add(ControlPrune(
|
||||||
@ -352,7 +353,7 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
|
|||||||
# avoid negative score peers
|
# avoid negative score peers
|
||||||
it.score >= 0.0 and
|
it.score >= 0.0 and
|
||||||
it notin currentMesh[] and
|
it notin currentMesh[] and
|
||||||
# don't pick explicit peers
|
# don't pick direct peers
|
||||||
it.peerId notin g.parameters.directPeers and
|
it.peerId notin g.parameters.directPeers and
|
||||||
# and avoid peers we are backing off
|
# and avoid peers we are backing off
|
||||||
it.peerId notin backingOff:
|
it.peerId notin backingOff:
|
||||||
@ -392,7 +393,7 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
|
|||||||
it notin currentMesh[] and
|
it notin currentMesh[] and
|
||||||
# avoid negative score peers
|
# avoid negative score peers
|
||||||
it.score >= 0.0 and
|
it.score >= 0.0 and
|
||||||
# don't pick explicit peers
|
# don't pick direct peers
|
||||||
it.peerId notin g.parameters.directPeers and
|
it.peerId notin g.parameters.directPeers and
|
||||||
# and avoid peers we are backing off
|
# and avoid peers we are backing off
|
||||||
it.peerId notin backingOff:
|
it.peerId notin backingOff:
|
||||||
@ -494,7 +495,7 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
|
|||||||
# avoid negative score peers
|
# avoid negative score peers
|
||||||
it.score >= median.score and
|
it.score >= median.score and
|
||||||
it notin currentMesh[] and
|
it notin currentMesh[] and
|
||||||
# don't pick explicit peers
|
# don't pick direct peers
|
||||||
it.peerId notin g.parameters.directPeers and
|
it.peerId notin g.parameters.directPeers and
|
||||||
# and avoid peers we are backing off
|
# and avoid peers we are backing off
|
||||||
it.peerId notin backingOff:
|
it.peerId notin backingOff:
|
||||||
|
|||||||
@ -159,7 +159,7 @@ type
|
|||||||
mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic
|
mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic
|
||||||
fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic
|
fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic
|
||||||
gossipsub*: PeerTable # peers that are subscribed to a topic
|
gossipsub*: PeerTable # peers that are subscribed to a topic
|
||||||
explicit*: PeerTable # directpeers that we keep alive explicitly
|
subscribedDirectPeers*: PeerTable # directpeers that we keep alive
|
||||||
backingOff*: BackoffTable # peers to backoff from when replenishing the mesh
|
backingOff*: BackoffTable # peers to backoff from when replenishing the mesh
|
||||||
lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics
|
lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics
|
||||||
gossip*: Table[string, seq[ControlIHave]] # pending gossip
|
gossip*: Table[string, seq[ControlIHave]] # pending gossip
|
||||||
|
|||||||
@ -167,36 +167,44 @@ suite "GossipSub":
|
|||||||
|
|
||||||
asyncTest "GossipSub directPeers: always forward messages":
|
asyncTest "GossipSub directPeers: always forward messages":
|
||||||
let
|
let
|
||||||
nodes = generateNodes(2, gossip = true)
|
nodes = generateNodes(3, gossip = true)
|
||||||
|
|
||||||
# start switches
|
# start switches
|
||||||
nodesFut = await allFinished(
|
nodesFut = await allFinished(
|
||||||
nodes[0].switch.start(),
|
nodes[0].switch.start(),
|
||||||
nodes[1].switch.start(),
|
nodes[1].switch.start(),
|
||||||
|
nodes[2].switch.start(),
|
||||||
)
|
)
|
||||||
|
|
||||||
await GossipSub(nodes[0]).addDirectPeer(nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs)
|
await GossipSub(nodes[0]).addDirectPeer(nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs)
|
||||||
await GossipSub(nodes[1]).addDirectPeer(nodes[0].switch.peerInfo.peerId, nodes[0].switch.peerInfo.addrs)
|
await GossipSub(nodes[1]).addDirectPeer(nodes[0].switch.peerInfo.peerId, nodes[0].switch.peerInfo.addrs)
|
||||||
|
await GossipSub(nodes[1]).addDirectPeer(nodes[2].switch.peerInfo.peerId, nodes[2].switch.peerInfo.addrs)
|
||||||
|
await GossipSub(nodes[2]).addDirectPeer(nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs)
|
||||||
|
|
||||||
var handlerFut = newFuture[void]()
|
var handlerFut = newFuture[void]()
|
||||||
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
handlerFut.complete()
|
handlerFut.complete()
|
||||||
|
proc noop(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||||
|
check topic == "foobar"
|
||||||
|
|
||||||
nodes[0].subscribe("foobar", handler)
|
nodes[0].subscribe("foobar", noop)
|
||||||
nodes[1].subscribe("foobar", handler)
|
nodes[1].subscribe("foobar", noop)
|
||||||
|
nodes[2].subscribe("foobar", handler)
|
||||||
|
|
||||||
tryPublish await nodes[0].publish("foobar", toBytes("hellow")), 1
|
tryPublish await nodes[0].publish("foobar", toBytes("hellow")), 1
|
||||||
|
|
||||||
await handlerFut
|
await handlerFut.wait(2.seconds)
|
||||||
|
|
||||||
# peer shouldn't be in our mesh
|
# peer shouldn't be in our mesh
|
||||||
check "foobar" notin GossipSub(nodes[0]).mesh
|
check "foobar" notin GossipSub(nodes[0]).mesh
|
||||||
check "foobar" notin GossipSub(nodes[1]).mesh
|
check "foobar" notin GossipSub(nodes[1]).mesh
|
||||||
|
check "foobar" notin GossipSub(nodes[2]).mesh
|
||||||
|
|
||||||
await allFuturesThrowing(
|
await allFuturesThrowing(
|
||||||
nodes[0].switch.stop(),
|
nodes[0].switch.stop(),
|
||||||
nodes[1].switch.stop()
|
nodes[1].switch.stop(),
|
||||||
|
nodes[2].switch.stop()
|
||||||
)
|
)
|
||||||
|
|
||||||
await allFuturesThrowing(nodesFut.concat())
|
await allFuturesThrowing(nodesFut.concat())
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user