Implement gossip propagation test.
This commit is contained in:
parent
a7796bf768
commit
4bffaf1a06
|
@ -13,6 +13,7 @@ import sequtils, options, tables, sets, sugar
|
||||||
import chronos, stew/byteutils, chronos/ratelimit
|
import chronos, stew/byteutils, chronos/ratelimit
|
||||||
import chronicles
|
import chronicles
|
||||||
import metrics
|
import metrics
|
||||||
|
import std/options
|
||||||
import ../../libp2p/protocols/pubsub/gossipsub/behavior
|
import ../../libp2p/protocols/pubsub/gossipsub/behavior
|
||||||
import
|
import
|
||||||
utils,
|
utils,
|
||||||
|
@ -34,6 +35,8 @@ import
|
||||||
import ../../libp2p/protocols/pubsub/errors as pubsub_errors
|
import ../../libp2p/protocols/pubsub/errors as pubsub_errors
|
||||||
import ../helpers, ../utils/[async, futures, async, tests]
|
import ../helpers, ../utils/[async, futures, async, tests]
|
||||||
|
|
||||||
|
from ../../libp2p/protocols/pubsub/mcache import window
|
||||||
|
|
||||||
proc `$`(peer: PubSubPeer): string =
|
proc `$`(peer: PubSubPeer): string =
|
||||||
shortLog(peer)
|
shortLog(peer)
|
||||||
|
|
||||||
|
@ -1322,3 +1325,35 @@ suite "Gossipsub Parameters":
|
||||||
gossip.fanout.len == 0
|
gossip.fanout.len == 0
|
||||||
|
|
||||||
await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))
|
await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))
|
||||||
|
|
||||||
|
asyncTest "messages sent to peers not in the mesh are propagated via gossip":
|
||||||
|
var validatorFut = newFuture[bool]()
|
||||||
|
proc validator(
|
||||||
|
topic: string, message: Message
|
||||||
|
): Future[ValidationResult] {.async.} =
|
||||||
|
check topic == "foobar"
|
||||||
|
validatorFut.complete(true)
|
||||||
|
result = ValidationResult.Accept
|
||||||
|
|
||||||
|
let
|
||||||
|
numberOfNodes = 5
|
||||||
|
topic = "foobar"
|
||||||
|
dValues = DValues(dLow: some(2), dHigh: some(3), d: some(2), dOut: some(1))
|
||||||
|
nodes = generateNodes(numberOfNodes, gossip = true, dValues = some(dValues))
|
||||||
|
nodesFut = await allFinished(nodes.mapIt(it.switch.start()))
|
||||||
|
|
||||||
|
await subscribeNodes(nodes)
|
||||||
|
|
||||||
|
for node in nodes:
|
||||||
|
node.subscribe(topic, voidTopicHandler)
|
||||||
|
|
||||||
|
# await waitForMesh(nodes[0], nodes[1], topic)
|
||||||
|
await sleepAsync(1.seconds)
|
||||||
|
|
||||||
|
# try:
|
||||||
|
let mcache = GossipSub(nodes[0]).mcache
|
||||||
|
|
||||||
|
discard nodes[0].publish(topic, "Hello!".toBytes())
|
||||||
|
await sleepAsync(3.seconds)
|
||||||
|
|
||||||
|
await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))
|
||||||
|
|
|
@ -6,6 +6,7 @@ const
|
||||||
|
|
||||||
import hashes, random, tables, sets, sequtils
|
import hashes, random, tables, sets, sequtils
|
||||||
import chronos, stew/[byteutils, results], chronos/ratelimit
|
import chronos, stew/[byteutils, results], chronos/ratelimit
|
||||||
|
import std/options
|
||||||
import
|
import
|
||||||
../../libp2p/[
|
../../libp2p/[
|
||||||
builders,
|
builders,
|
||||||
|
@ -24,7 +25,15 @@ export builders
|
||||||
|
|
||||||
randomize()
|
randomize()
|
||||||
|
|
||||||
type TestGossipSub* = ref object of GossipSub
|
type
|
||||||
|
TestGossipSub* = ref object of GossipSub
|
||||||
|
DValues* = object
|
||||||
|
d*: Option[int]
|
||||||
|
dLow*: Option[int]
|
||||||
|
dHigh*: Option[int]
|
||||||
|
dScore*: Option[int]
|
||||||
|
dOut*: Option[int]
|
||||||
|
dLazy*: Option[int]
|
||||||
|
|
||||||
proc getPubSubPeer*(p: TestGossipSub, peerId: PeerId): PubSubPeer =
|
proc getPubSubPeer*(p: TestGossipSub, peerId: PeerId): PubSubPeer =
|
||||||
proc getConn(): Future[Connection] =
|
proc getConn(): Future[Connection] =
|
||||||
|
@ -71,6 +80,7 @@ proc generateNodes*(
|
||||||
overheadRateLimit: Opt[tuple[bytes: int, interval: Duration]] =
|
overheadRateLimit: Opt[tuple[bytes: int, interval: Duration]] =
|
||||||
Opt.none(tuple[bytes: int, interval: Duration]),
|
Opt.none(tuple[bytes: int, interval: Duration]),
|
||||||
gossipSubVersion: string = "",
|
gossipSubVersion: string = "",
|
||||||
|
dValues: Option[DValues] = DValues.none(),
|
||||||
): seq[PubSub] =
|
): seq[PubSub] =
|
||||||
for i in 0 ..< num:
|
for i in 0 ..< num:
|
||||||
let switch = newStandardSwitch(
|
let switch = newStandardSwitch(
|
||||||
|
@ -94,6 +104,28 @@ proc generateNodes*(
|
||||||
p.unsubscribeBackoff = unsubscribeBackoff
|
p.unsubscribeBackoff = unsubscribeBackoff
|
||||||
p.enablePX = enablePX
|
p.enablePX = enablePX
|
||||||
p.overheadRateLimit = overheadRateLimit
|
p.overheadRateLimit = overheadRateLimit
|
||||||
|
|
||||||
|
if dValues.isSome:
|
||||||
|
let dValuesSome = dValues.get
|
||||||
|
|
||||||
|
if dValuesSome.d.isSome:
|
||||||
|
p.d = dValuesSome.d.get
|
||||||
|
|
||||||
|
if dValuesSome.dLow.isSome:
|
||||||
|
p.dLow = dValuesSome.dLow.get
|
||||||
|
|
||||||
|
if dValuesSome.dHigh.isSome:
|
||||||
|
p.dHigh = dValuesSome.dHigh.get
|
||||||
|
|
||||||
|
if dValuesSome.dScore.isSome:
|
||||||
|
p.dScore = dValuesSome.dScore.get
|
||||||
|
|
||||||
|
if dValuesSome.dOut.isSome:
|
||||||
|
p.dOut = dValuesSome.dOut.get
|
||||||
|
|
||||||
|
if dValuesSome.dLazy.isSome:
|
||||||
|
p.dLazy = dValuesSome.dLazy.get
|
||||||
|
|
||||||
p
|
p
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue