GS: Publish to fanout when mesh unhealthy (#638)

* Send to fanout when mesh unhealthy

* don't use fanout when floodPublish
This commit is contained in:
Tanguy 2022-02-21 16:22:08 +01:00 committed by GitHub
parent 3b718baa97
commit bc318084f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 83 additions and 6 deletions

View File

@ -482,13 +482,23 @@ method publish*(g: GossipSub,
if topic in g.topics: # if we're subscribed use the mesh if topic in g.topics: # if we're subscribed use the mesh
peers.incl(g.mesh.getOrDefault(topic)) peers.incl(g.mesh.getOrDefault(topic))
else: # not subscribed, send to fanout peers
# try optimistically if peers.len < g.parameters.dLow and g.parameters.floodPublish == false:
peers.incl(g.fanout.getOrDefault(topic)) # not subscribed or bad mesh, send to fanout peers
if peers.len == 0: # disable for floodPublish, since we already sent to every good peer
# ok we had nothing.. let's try replenish inline #
var fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
if fanoutPeers.len == 0:
g.replenishFanout(topic) g.replenishFanout(topic)
peers.incl(g.fanout.getOrDefault(topic)) fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
fanoutPeers.shuffle()
if fanoutPeers.len + peers.len > g.parameters.d:
fanoutPeers.setLen(g.parameters.d - peers.len)
for fanPeer in fanoutPeers:
peers.incl(fanPeer)
if peers.len > g.parameters.d: break
# even if we couldn't publish, # even if we couldn't publish,
# we still attempted to publish # we still attempted to publish

View File

@ -489,9 +489,11 @@ proc replenishFanout*(g: GossipSub, topic: string) {.raises: [Defect].} =
logScope: topic logScope: topic
trace "about to replenish fanout" trace "about to replenish fanout"
let currentMesh = g.mesh.getOrDefault(topic)
if g.fanout.peers(topic) < g.parameters.dLow: if g.fanout.peers(topic) < g.parameters.dLow:
trace "replenishing fanout", peers = g.fanout.peers(topic) trace "replenishing fanout", peers = g.fanout.peers(topic)
for peer in g.gossipsub.getOrDefault(topic): for peer in g.gossipsub.getOrDefault(topic):
if peer in currentMesh: continue
if g.fanout.addPeer(topic, peer): if g.fanout.addPeer(topic, peer):
if g.fanout.peers(topic) == g.parameters.d: if g.fanout.peers(topic) == g.parameters.d:
break break

View File

@ -565,6 +565,71 @@ suite "GossipSub":
await allFuturesThrowing(nodesFut.concat()) await allFuturesThrowing(nodesFut.concat())
check observed == 2 check observed == 2
asyncTest "e2e - GossipSub send over fanout A -> B for subscribed topic":
var passed = newFuture[void]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
passed.complete()
let
nodes = generateNodes(
2,
gossip = true)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
nodes[1].subscribe("foobar", handler)
nodes[0].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
await waitSub(nodes[1], nodes[0], "foobar")
nodes[0].unsubscribe("foobar", handler)
let gsNode = GossipSub(nodes[1])
check await checkExpiring(gsNode.mesh.getOrDefault("foobar").len == 0)
nodes[0].subscribe("foobar", handler)
check GossipSub(nodes[0]).mesh.getOrDefault("foobar").len == 0
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
check:
GossipSub(nodes[0]).fanout.getOrDefault("foobar").len > 0
GossipSub(nodes[0]).mesh.getOrDefault("foobar").len == 0
await passed.wait(2.seconds)
trace "test done, stopping..."
await nodes[0].stop()
await nodes[1].stop()
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
)
await allFuturesThrowing(nodesFut.concat())
asyncTest "e2e - GossipSub send over mesh A -> B": asyncTest "e2e - GossipSub send over mesh A -> B":
var passed: Future[bool] = newFuture[bool]() var passed: Future[bool] = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =