More gossip coverage (#553)

* add floodPublish test

* test delivery via control Iwant/have mechanics

* fix issues in control, and add testing

* fix possible backoff issue with pruned routine overriding it
This commit is contained in:
Giovanni Petrantoni 2021-04-22 18:51:22 +09:00 committed by GitHub
parent e285d8bbf4
commit f81a085d0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 468 additions and 230 deletions

View File

@ -251,8 +251,11 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, rpcMsg: RPCMsg) =
respControl.prune.add(g.handleGraft(peer, control.graft)) respControl.prune.add(g.handleGraft(peer, control.graft))
let messages = g.handleIWant(peer, control.iwant) let messages = g.handleIWant(peer, control.iwant)
if respControl.graft.len > 0 or respControl.prune.len > 0 or if
respControl.ihave.len > 0 or messages.len > 0: respControl.graft.len > 0 or
respControl.prune.len > 0 or
respControl.iwant.len > 0 or
messages.len > 0:
# iwant and prunes from here, also messages # iwant and prunes from here, also messages
for smsg in messages: for smsg in messages:
@ -506,7 +509,7 @@ method publish*(g: GossipSub,
else: else:
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = ["generic"]) libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = ["generic"])
trace "Published message to peers" trace "Published message to peers", peers=peers.len
return peers.len return peers.len

View File

@ -39,7 +39,8 @@ proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) {.raises: [Defect].} =
trace "grafted", peer=p, topic trace "grafted", peer=p, topic
proc pruned*(g: GossipSub, p: PubSubPeer, topic: string) {.raises: [Defect].} = proc pruned*(g: GossipSub, p: PubSubPeer, topic: string, setBackoff: bool = true) {.raises: [Defect].} =
if setBackoff:
let backoff = Moment.fromNow(g.parameters.pruneBackoff) let backoff = Moment.fromNow(g.parameters.pruneBackoff)
g.backingOff g.backingOff
.mgetOrPut(topic, initTable[PeerID, Moment]())[p.peerId] = backoff .mgetOrPut(topic, initTable[PeerID, Moment]())[p.peerId] = backoff
@ -80,6 +81,7 @@ proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] {.raises:
proc handleGraft*(g: GossipSub, proc handleGraft*(g: GossipSub,
peer: PubSubPeer, peer: PubSubPeer,
grafts: seq[ControlGraft]): seq[ControlPrune] = # {.raises: [Defect].} TODO chronicles exception on windows grafts: seq[ControlGraft]): seq[ControlPrune] = # {.raises: [Defect].} TODO chronicles exception on windows
var prunes: seq[ControlPrune]
for graft in grafts: for graft in grafts:
let topic = graft.topicID let topic = graft.topicID
trace "peer grafted topic", peer, topic trace "peer grafted topic", peer, topic
@ -90,7 +92,7 @@ proc handleGraft*(g: GossipSub,
warn "attempt to graft an explicit peer, peering agreements should be reciprocal", warn "attempt to graft an explicit peer, peering agreements should be reciprocal",
peer, topic peer, topic
# and such an attempt should be logged and rejected with a PRUNE # and such an attempt should be logged and rejected with a PRUNE
result.add(ControlPrune( prunes.add(ControlPrune(
topicID: topic, topicID: topic,
peers: @[], # omitting heavy computation here as the remote did something illegal peers: @[], # omitting heavy computation here as the remote did something illegal
backoff: g.parameters.pruneBackoff.seconds.uint64)) backoff: g.parameters.pruneBackoff.seconds.uint64))
@ -108,7 +110,7 @@ proc handleGraft*(g: GossipSub,
.getOrDefault(peer.peerId) > Moment.now(): .getOrDefault(peer.peerId) > Moment.now():
debug "attempt to graft a backingOff peer", peer, topic debug "attempt to graft a backingOff peer", peer, topic
# and such an attempt should be logged and rejected with a PRUNE # and such an attempt should be logged and rejected with a PRUNE
result.add(ControlPrune( prunes.add(ControlPrune(
topicID: topic, topicID: topic,
peers: @[], # omitting heavy computation here as the remote did something illegal peers: @[], # omitting heavy computation here as the remote did something illegal
backoff: g.parameters.pruneBackoff.seconds.uint64)) backoff: g.parameters.pruneBackoff.seconds.uint64))
@ -141,7 +143,7 @@ proc handleGraft*(g: GossipSub,
else: else:
trace "pruning grafting peer, mesh full", trace "pruning grafting peer, mesh full",
peer, topic, score = peer.score, mesh = g.mesh.peers(topic) peer, topic, score = peer.score, mesh = g.mesh.peers(topic)
result.add(ControlPrune( prunes.add(ControlPrune(
topicID: topic, topicID: topic,
peers: g.peerExchangeList(topic), peers: g.peerExchangeList(topic),
backoff: g.parameters.pruneBackoff.seconds.uint64)) backoff: g.parameters.pruneBackoff.seconds.uint64))
@ -149,6 +151,8 @@ proc handleGraft*(g: GossipSub,
trace "peer grafting topic we're not interested in", peer, topic trace "peer grafting topic we're not interested in", peer, topic
# gossip 1.1, we do not send a control message prune anymore # gossip 1.1, we do not send a control message prune anymore
return prunes
proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.raises: [Defect].} = proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.raises: [Defect].} =
for prune in prunes: for prune in prunes:
let topic = prune.topicID let topic = prune.topicID
@ -173,7 +177,7 @@ proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.r
.mgetOrPut(topic, initTable[PeerID, Moment]())[peer.peerId] = backoff .mgetOrPut(topic, initTable[PeerID, Moment]())[peer.peerId] = backoff
trace "pruning rpc received peer", peer, score = peer.score trace "pruning rpc received peer", peer, score = peer.score
g.pruned(peer, topic) g.pruned(peer, topic, setBackoff = false)
g.mesh.removePeer(topic, peer) g.mesh.removePeer(topic, peer)
# TODO peer exchange, we miss ambient peer discovery in libp2p, so we are blocked by that # TODO peer exchange, we miss ambient peer discovery in libp2p, so we are blocked by that
@ -183,6 +187,7 @@ proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.r
proc handleIHave*(g: GossipSub, proc handleIHave*(g: GossipSub,
peer: PubSubPeer, peer: PubSubPeer,
ihaves: seq[ControlIHave]): ControlIWant {.raises: [Defect].} = ihaves: seq[ControlIHave]): ControlIWant {.raises: [Defect].} =
var res: ControlIWant
if peer.score < g.parameters.gossipThreshold: if peer.score < g.parameters.gossipThreshold:
trace "ihave: ignoring low score peer", peer, score = peer.score trace "ihave: ignoring low score peer", peer, score = peer.score
elif peer.iHaveBudget <= 0: elif peer.iHaveBudget <= 0:
@ -201,18 +206,20 @@ proc handleIHave*(g: GossipSub,
for msgId in deIhavesMsgs: for msgId in deIhavesMsgs:
if not g.hasSeen(msgId): if not g.hasSeen(msgId):
if peer.iHaveBudget > 0: if peer.iHaveBudget > 0:
result.messageIDs.add(msgId) res.messageIDs.add(msgId)
dec peer.iHaveBudget dec peer.iHaveBudget
trace "requested message via ihave", messageID=msgId
else: else:
return break
# shuffling res.messageIDs before sending it out to increase the likelihood
# shuffling result.messageIDs before sending it out to increase the likelihood
# of getting an answer if the peer truncates the list due to internal size restrictions. # of getting an answer if the peer truncates the list due to internal size restrictions.
shuffle(result.messageIDs) shuffle(res.messageIDs)
return res
proc handleIWant*(g: GossipSub, proc handleIWant*(g: GossipSub,
peer: PubSubPeer, peer: PubSubPeer,
iwants: seq[ControlIWant]): seq[Message] {.raises: [Defect].} = iwants: seq[ControlIWant]): seq[Message] {.raises: [Defect].} =
var messages: seq[Message]
if peer.score < g.parameters.gossipThreshold: if peer.score < g.parameters.gossipThreshold:
trace "iwant: ignoring low score peer", peer, score = peer.score trace "iwant: ignoring low score peer", peer, score = peer.score
elif peer.iWantBudget <= 0: elif peer.iWantBudget <= 0:
@ -227,10 +234,11 @@ proc handleIWant*(g: GossipSub,
if msg.isSome: if msg.isSome:
# avoid spam # avoid spam
if peer.iWantBudget > 0: if peer.iWantBudget > 0:
result.add(msg.get()) messages.add(msg.get())
dec peer.iWantBudget dec peer.iWantBudget
else: else:
return break
return messages
proc commitMetrics(metrics: var MeshMetrics) {.raises: [Defect].} = proc commitMetrics(metrics: var MeshMetrics) {.raises: [Defect].} =
libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics) libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics)
@ -485,9 +493,10 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises:
## ##
var cacheWindowSize = 0 var cacheWindowSize = 0
var control: Table[PubSubPeer, ControlMessage]
trace "getting gossip peers (iHave)"
let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys)) let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys))
trace "getting gossip peers (iHave)", ntopics=topics.len
for topic in topics: for topic in topics:
if topic notin g.gossipsub: if topic notin g.gossipsub:
trace "topic not in gossip array, skipping", topicID = topic trace "topic not in gossip array, skipping", topicID = topic
@ -495,12 +504,15 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises:
let mids = g.mcache.window(topic) let mids = g.mcache.window(topic)
if not(mids.len > 0): if not(mids.len > 0):
trace "no messages to emit"
continue continue
var midsSeq = toSeq(mids) var midsSeq = toSeq(mids)
cacheWindowSize += midsSeq.len cacheWindowSize += midsSeq.len
trace "got messages to emit", size=midsSeq.len
# not in spec # not in spec
# similar to rust: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/behaviour.rs#L2101 # similar to rust: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/behaviour.rs#L2101
# and go https://github.com/libp2p/go-libp2p-pubsub/blob/08c17398fb11b2ab06ca141dddc8ec97272eb772/gossipsub.go#L582 # and go https://github.com/libp2p/go-libp2p-pubsub/blob/08c17398fb11b2ab06ca141dddc8ec97272eb772/gossipsub.go#L582
@ -530,10 +542,12 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises:
allPeers.setLen(target) allPeers.setLen(target)
for peer in allPeers: for peer in allPeers:
result.mGetOrPut(peer, ControlMessage()).ihave.add(ihave) control.mGetOrPut(peer, ControlMessage()).ihave.add(ihave)
libp2p_gossipsub_cache_window_size.set(cacheWindowSize.int64) libp2p_gossipsub_cache_window_size.set(cacheWindowSize.int64)
return control
proc onHeartbeat(g: GossipSub) {.raises: [Defect].} = proc onHeartbeat(g: GossipSub) {.raises: [Defect].} =
# reset IWANT budget # reset IWANT budget
# reset IHAVE cap # reset IHAVE cap

View File

@ -563,6 +563,43 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close())) await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop() await gossipSub.switch.stop()
asyncTest "rebalanceMesh fail due to backoff - remote":
let gossipSub = TestGossipSub.init(newStandardSwitch())
let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.topicParams[topic] = TopicParams.init()
var conns = newSeq[Connection]()
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
for i in 0..<15:
let conn = newBufferStream(noop)
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
peer.sendConn = conn
gossipSub.gossipsub[topic].incl(peer)
gossipSub.mesh[topic].incl(peer)
check gossipSub.peers.len == 15
gossipSub.rebalanceMesh(topic)
check gossipSub.mesh[topic].len != 0
for i in 0..<15:
let peerInfo = conns[i].peerInfo
let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.handlePrune(peer, @[ControlPrune(
topicID: topic,
peers: @[],
backoff: gossipSub.parameters.pruneBackoff.seconds.uint64
)])
# expect topic cleaned up since they are all pruned
check topic notin gossipSub.mesh
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
asyncTest "rebalanceMesh Degree Hi - audit scenario": asyncTest "rebalanceMesh Degree Hi - audit scenario":
let gossipSub = TestGossipSub.init(newStandardSwitch()) let gossipSub = TestGossipSub.init(newStandardSwitch())
let topic = "foobar" let topic = "foobar"

View File

@ -556,6 +556,63 @@ suite "GossipSub":
await allFuturesThrowing(nodesFut.concat()) await allFuturesThrowing(nodesFut.concat())
asyncTest "e2e - GossipSub send over floodPublish A -> B":
var passed: Future[bool] = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
passed.complete(true)
let
nodes = generateNodes(
2,
gossip = true)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
var gossip1: GossipSub = GossipSub(nodes[0])
gossip1.parameters.floodPublish = true
var gossip2: GossipSub = GossipSub(nodes[1])
gossip2.parameters.floodPublish = true
await subscribeNodes(nodes)
# nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
check await passed
check:
"foobar" in gossip1.gossipsub
"foobar" notin gossip2.gossipsub
not gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.peerId)
not gossip1.fanout.hasPeerID("foobar", gossip2.peerInfo.peerId)
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
)
await allFuturesThrowing(nodesFut.concat())
asyncTest "e2e - GossipSub with multiple peers": asyncTest "e2e - GossipSub with multiple peers":
var runs = 10 var runs = 10
@ -660,212 +717,3 @@ suite "GossipSub":
it.switch.stop()))) it.switch.stop())))
await allFuturesThrowing(nodesFut) await allFuturesThrowing(nodesFut)
asyncTest "GossipSub invalid topic subscription":
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
handlerFut.complete(true)
let
nodes = generateNodes(2, gossip = true)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
var gossip = GossipSub(nodes[0])
let invalidDetected = newFuture[void]()
gossip.subscriptionValidator =
proc(topic: string): bool =
if topic == "foobar":
try:
invalidDetected.complete()
except:
raise newException(Defect, "Exception during subscriptionValidator")
false
else:
true
await subscribeNodes(nodes)
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
await invalidDetected.wait(10.seconds)
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
)
await allFuturesThrowing(nodesFut.concat())
asyncTest "GossipSub test directPeers":
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
handlerFut.complete(true)
let
nodes = generateNodes(2, gossip = true)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
var gossip = GossipSub(nodes[0])
gossip.parameters.directPeers[nodes[1].switch.peerInfo.peerId] = nodes[1].switch.peerInfo.addrs
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
let invalidDetected = newFuture[void]()
gossip.subscriptionValidator =
proc(topic: string): bool =
if topic == "foobar":
try:
invalidDetected.complete()
except:
raise newException(Defect, "Exception during subscriptionValidator")
false
else:
true
# DO NOT SUBSCRIBE, CONNECTION SHOULD HAPPEN
### await subscribeNodes(nodes)
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
await invalidDetected.wait(10.seconds)
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
)
await allFuturesThrowing(nodesFut.concat())
asyncTest "GossipsSub peers disconnections mechanics":
var runs = 10
let
nodes = generateNodes(runs, gossip = true, triggerSelf = true)
nodesFut = nodes.mapIt(it.switch.start())
await allFuturesThrowing(nodes.mapIt(it.start()))
await subscribeNodes(nodes)
var seen: Table[string, int]
var seenFut = newFuture[void]()
for i in 0..<nodes.len:
let dialer = nodes[i]
var handler: TopicHandler
closureScope:
var peerName = $dialer.peerInfo.peerId
handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} =
if peerName notin seen:
seen[peerName] = 0
seen[peerName].inc
check topic == "foobar"
if not seenFut.finished() and seen.len >= runs:
seenFut.complete()
dialer.subscribe("foobar", handler)
await waitSub(nodes[0], dialer, "foobar")
# ensure peer stats are stored properly and kept properly
check:
GossipSub(nodes[0]).peerStats.len == runs - 1 # minus self
tryPublish await wait(nodes[0].publish("foobar",
toBytes("from node " &
$nodes[0].peerInfo.peerId)),
1.minutes), 1, 5.seconds
await wait(seenFut, 5.minutes)
check: seen.len >= runs
for k, v in seen.pairs:
check: v >= 1
for node in nodes:
var gossip = GossipSub(node)
check:
"foobar" in gossip.gossipsub
gossip.fanout.len == 0
gossip.mesh["foobar"].len > 0
# Removing some subscriptions
for i in 0..<runs:
if i mod 3 != 0:
nodes[i].unsubscribeAll("foobar")
# Waiting 2 heartbeats
for _ in 0..1:
for i in 0..<runs:
if i mod 3 == 0:
let evnt = newAsyncEvent()
GossipSub(nodes[i]).heartbeatEvents &= evnt
await evnt.wait()
# ensure peer stats are stored properly and kept properly
check:
GossipSub(nodes[0]).peerStats.len == runs - 1 # minus self
# Adding again subscriptions
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
for i in 0..<runs:
if i mod 3 != 0:
nodes[i].subscribe("foobar", handler)
# Waiting 2 heartbeats
for _ in 0..1:
for i in 0..<runs:
if i mod 3 == 0:
let evnt = newAsyncEvent()
GossipSub(nodes[i]).heartbeatEvents &= evnt
await evnt.wait()
# ensure peer stats are stored properly and kept properly
check:
GossipSub(nodes[0]).peerStats.len == runs - 1 # minus self
await allFuturesThrowing(
nodes.mapIt(
allFutures(
it.stop(),
it.switch.stop())))
await allFuturesThrowing(nodesFut)

View File

@ -0,0 +1,335 @@
## Nim-Libp2p
## Copyright (c) 2018 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.used.}
import unittest, sequtils, options, tables, sets
import chronos, stew/byteutils
import chronicles
import utils, ../../libp2p/[errors,
peerid,
peerinfo,
stream/connection,
stream/bufferstream,
crypto/crypto,
protocols/pubsub/pubsub,
protocols/pubsub/gossipsub,
protocols/pubsub/pubsubpeer,
protocols/pubsub/peertable,
protocols/pubsub/rpc/messages]
import ../helpers
proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
if sender == receiver:
return
# turn things deterministic
# this is for testing purposes only
# peers can be inside `mesh` and `fanout`, not just `gossipsub`
var ceil = 15
let fsub = GossipSub(sender)
let ev = newAsyncEvent()
fsub.heartbeatEvents.add(ev)
# await first heartbeat
await ev.wait()
ev.clear()
while (not fsub.gossipsub.hasKey(key) or
not fsub.gossipsub.hasPeerID(key, receiver.peerInfo.peerId)) and
(not fsub.mesh.hasKey(key) or
not fsub.mesh.hasPeerID(key, receiver.peerInfo.peerId)) and
(not fsub.fanout.hasKey(key) or
not fsub.fanout.hasPeerID(key , receiver.peerInfo.peerId)):
trace "waitSub sleeping..."
# await more heartbeats
await ev.wait()
ev.clear()
dec ceil
doAssert(ceil > 0, "waitSub timeout!")
template tryPublish(call: untyped, require: int, wait: Duration = 1.seconds, times: int = 10): untyped =
var
limit = times
pubs = 0
while pubs < require and limit > 0:
pubs = pubs + call
await sleepAsync(wait)
limit.dec()
if limit == 0:
doAssert(false, "Failed to publish!")
suite "GossipSub":
teardown:
checkTrackers()
asyncTest "e2e - GossipSub with multiple peers - control deliver (sparse)":
var runs = 10
let
nodes = generateNodes(runs, gossip = true, triggerSelf = true)
nodesFut = nodes.mapIt(it.switch.start())
await allFuturesThrowing(nodes.mapIt(it.start()))
await subscribeSparseNodes(nodes)
var seen: Table[string, int]
var seenFut = newFuture[void]()
for i in 0..<nodes.len:
let dialer = nodes[i]
let dgossip = GossipSub(dialer)
dgossip.parameters.dHigh = 2
dgossip.parameters.dLow = 1
dgossip.parameters.d = 1
dgossip.parameters.dOut = 1
var handler: TopicHandler
closureScope:
var peerName = $dialer.peerInfo.peerId
handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} =
if peerName notin seen:
seen[peerName] = 0
seen[peerName].inc
info "seen up", count=seen.len
check topic == "foobar"
if not seenFut.finished() and seen.len >= runs:
seenFut.complete()
dialer.subscribe("foobar", handler)
await waitSub(nodes[0], dialer, "foobar")
# we want to test ping pong deliveries via control Iwant/Ihave, so we publish just in a tap
let publishedTo = nodes[0]
.publish("foobar", toBytes("from node " & $nodes[0].peerInfo.peerId))
.await
check:
publishedTo != 0
publishedTo != runs
await wait(seenFut, 5.minutes)
check: seen.len >= runs
for k, v in seen.pairs:
check: v >= 1
await allFuturesThrowing(
nodes.mapIt(
allFutures(
it.stop(),
it.switch.stop())))
await allFuturesThrowing(nodesFut)
asyncTest "GossipSub invalid topic subscription":
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
handlerFut.complete(true)
let
nodes = generateNodes(2, gossip = true)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
var gossip = GossipSub(nodes[0])
let invalidDetected = newFuture[void]()
gossip.subscriptionValidator =
proc(topic: string): bool =
if topic == "foobar":
try:
invalidDetected.complete()
except:
raise newException(Defect, "Exception during subscriptionValidator")
false
else:
true
await subscribeNodes(nodes)
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
await invalidDetected.wait(10.seconds)
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
)
await allFuturesThrowing(nodesFut.concat())
asyncTest "GossipSub test directPeers":
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
handlerFut.complete(true)
let
nodes = generateNodes(2, gossip = true)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
var gossip = GossipSub(nodes[0])
gossip.parameters.directPeers[nodes[1].switch.peerInfo.peerId] = nodes[1].switch.peerInfo.addrs
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
let invalidDetected = newFuture[void]()
gossip.subscriptionValidator =
proc(topic: string): bool =
if topic == "foobar":
try:
invalidDetected.complete()
except:
raise newException(Defect, "Exception during subscriptionValidator")
false
else:
true
# DO NOT SUBSCRIBE, CONNECTION SHOULD HAPPEN
### await subscribeNodes(nodes)
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
await invalidDetected.wait(10.seconds)
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
)
await allFuturesThrowing(nodesFut.concat())
asyncTest "GossipsSub peers disconnections mechanics":
var runs = 10
let
nodes = generateNodes(runs, gossip = true, triggerSelf = true)
nodesFut = nodes.mapIt(it.switch.start())
await allFuturesThrowing(nodes.mapIt(it.start()))
await subscribeNodes(nodes)
var seen: Table[string, int]
var seenFut = newFuture[void]()
for i in 0..<nodes.len:
let dialer = nodes[i]
var handler: TopicHandler
closureScope:
var peerName = $dialer.peerInfo.peerId
handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} =
if peerName notin seen:
seen[peerName] = 0
seen[peerName].inc
check topic == "foobar"
if not seenFut.finished() and seen.len >= runs:
seenFut.complete()
dialer.subscribe("foobar", handler)
await waitSub(nodes[0], dialer, "foobar")
# ensure peer stats are stored properly and kept properly
check:
GossipSub(nodes[0]).peerStats.len == runs - 1 # minus self
tryPublish await wait(nodes[0].publish("foobar",
toBytes("from node " &
$nodes[0].peerInfo.peerId)),
1.minutes), 1, 5.seconds
await wait(seenFut, 5.minutes)
check: seen.len >= runs
for k, v in seen.pairs:
check: v >= 1
for node in nodes:
var gossip = GossipSub(node)
check:
"foobar" in gossip.gossipsub
gossip.fanout.len == 0
gossip.mesh["foobar"].len > 0
# Removing some subscriptions
for i in 0..<runs:
if i mod 3 != 0:
nodes[i].unsubscribeAll("foobar")
# Waiting 2 heartbeats
for _ in 0..1:
for i in 0..<runs:
if i mod 3 == 0:
let evnt = newAsyncEvent()
GossipSub(nodes[i]).heartbeatEvents &= evnt
await evnt.wait()
# ensure peer stats are stored properly and kept properly
check:
GossipSub(nodes[0]).peerStats.len == runs - 1 # minus self
# Adding again subscriptions
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
for i in 0..<runs:
if i mod 3 != 0:
nodes[i].subscribe("foobar", handler)
# Waiting 2 heartbeats
for _ in 0..1:
for i in 0..<runs:
if i mod 3 == 0:
let evnt = newAsyncEvent()
GossipSub(nodes[i]).heartbeatEvents &= evnt
await evnt.wait()
# ensure peer stats are stored properly and kept properly
check:
GossipSub(nodes[0]).peerStats.len == runs - 1 # minus self
await allFuturesThrowing(
nodes.mapIt(
allFutures(
it.stop(),
it.switch.stop())))
await allFuturesThrowing(nodesFut)

View File

@ -3,6 +3,7 @@
import testgossipinternal, import testgossipinternal,
testfloodsub, testfloodsub,
testgossipsub, testgossipsub,
testgossipsub2,
testmcache, testmcache,
testtimedcache, testtimedcache,
testmessage testmessage

View File

@ -38,7 +38,7 @@ proc generateNodes*(
sign = sign, sign = sign,
msgIdProvider = msgIdProvider, msgIdProvider = msgIdProvider,
anonymize = anonymize, anonymize = anonymize,
parameters = (var p = GossipSubParams.init(); p.floodPublish = false; p)) parameters = (var p = GossipSubParams.init(); p.floodPublish = false; p.historyLength = 20; p.historyGossip = 20; p))
# set some testing params, to enable scores # set some testing params, to enable scores
g.topicParams.mgetOrPut("foobar", TopicParams.init()).topicWeight = 1.0 g.topicParams.mgetOrPut("foobar", TopicParams.init()).topicWeight = 1.0
g.topicParams.mgetOrPut("foo", TopicParams.init()).topicWeight = 1.0 g.topicParams.mgetOrPut("foo", TopicParams.init()).topicWeight = 1.0