Implement gossip adaptive dissemination tests.

This commit is contained in:
Alejandro Cabeza Romero 2024-09-04 22:05:16 +02:00
parent aac457ec5b
commit e6a06dcf32
No known key found for this signature in database
GPG Key ID: DA3D14AE478030FD
2 changed files with 162 additions and 4 deletions

View File

@ -1337,7 +1337,7 @@ suite "Gossipsub Parameters":
nodesFut = await allFinished(nodes.mapIt(it.switch.start())) nodesFut = await allFinished(nodes.mapIt(it.switch.start()))
# All of them are checking for iHave messages # All of them are checking for iHave messages
var receivedIHaves: seq[bool] = repeat(false, numberOfNodes) var receivedIHaves: seq[int] = repeat(0, numberOfNodes)
for i in 0 ..< numberOfNodes: for i in 0 ..< numberOfNodes:
var pubsubObserver: PubSubObserver var pubsubObserver: PubSubObserver
capture i: capture i:
@ -1347,8 +1347,7 @@ suite "Gossipsub Parameters":
if iHave.len > 0: if iHave.len > 0:
for msg in iHave: for msg in iHave:
if msg.topicID == topic: if msg.topicID == topic:
receivedIHaves[i] = true receivedIHaves[i] += 1
break
pubsubObserver = PubSubObserver(onRecv: checkForIhaves) pubsubObserver = PubSubObserver(onRecv: checkForIhaves)
@ -1369,7 +1368,7 @@ suite "Gossipsub Parameters":
# At least one of the nodes should have received an iHave message # At least one of the nodes should have received an iHave message
# The check is made this way because the mesh structure changes from run to run # The check is made this way because the mesh structure changes from run to run
check: check:
anyIt(receivedIHaves, it == true) anyIt(receivedIHaves, it > 0)
await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))
@ -1473,3 +1472,158 @@ suite "Gossipsub Parameters":
# Cleanup # Cleanup
await allFuturesThrowing(nodes.mapIt(it.switch.stop())) await allFuturesThrowing(nodes.mapIt(it.switch.stop()))
await allFuturesThrowing(nodesFut) await allFuturesThrowing(nodesFut)
asyncTest "adaptive gossip dissemination, dLazy and gossipFactor to 0":
# Given 20 nodes
let
numberOfNodes = 20
topic = "foobar"
dValues = DValues(
dLow: some(2), dHigh: some(3), d: some(2), dOut: some(1), dLazy: some(0)
)
nodes = generateNodes(
numberOfNodes,
gossip = true,
dValues = some(dValues),
gossipFactor = some(0.float),
)
nodesFut = await allFinished(nodes.mapIt(it.switch.start()))
# All of them are checking for iHave messages
var receivedIHaves: seq[int] = repeat(0, numberOfNodes)
for i in 0 ..< numberOfNodes:
var pubsubObserver: PubSubObserver
capture i:
let checkForIhaves = proc(peer: PubSubPeer, msgs: var RPCMsg) =
if msgs.control.isSome:
let iHave = msgs.control.get.ihave
if iHave.len > 0:
for msg in iHave:
if msg.topicID == topic:
receivedIHaves[i] += 1
pubsubObserver = PubSubObserver(onRecv: checkForIhaves)
nodes[i].addObserver(pubsubObserver)
# All of them are connected to node 0
for i in 1 ..< numberOfNodes:
await nodes[0].switch.connect(nodes[i].peerInfo.peerId, nodes[i].peerInfo.addrs)
# And subscribed to the same topic
for node in nodes:
node.subscribe(topic, voidTopicHandler)
await sleepAsync(DURATION_TIMEOUT)
# When node 0 sends a message
discard nodes[0].publish(topic, "Hello!".toBytes())
await sleepAsync(DURATION_TIMEOUT)
# None of the nodes should have received an iHave message
check:
filterIt(receivedIHaves, it > 0).len == 0
await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))
asyncTest "adaptive gossip dissemination, with gossipFactor priority":
# Given 20 nodes
let
numberOfNodes = 20
topic = "foobar"
dValues = DValues(
dLow: some(2), dHigh: some(3), d: some(2), dOut: some(1), dLazy: some(4)
)
nodes = generateNodes(
numberOfNodes, gossip = true, dValues = some(dValues), gossipFactor = some(0.5)
)
nodesFut = await allFinished(nodes.mapIt(it.switch.start()))
# All of them are checking for iHave messages
var receivedIHaves: seq[int] = repeat(0, numberOfNodes)
for i in 0 ..< numberOfNodes:
var pubsubObserver: PubSubObserver
capture i:
let checkForIhaves = proc(peer: PubSubPeer, msgs: var RPCMsg) =
if msgs.control.isSome:
let iHave = msgs.control.get.ihave
if iHave.len > 0:
for msg in iHave:
if msg.topicID == topic:
receivedIHaves[i] += 1
pubsubObserver = PubSubObserver(onRecv: checkForIhaves)
nodes[i].addObserver(pubsubObserver)
# All of them are connected to node 0
for i in 1 ..< numberOfNodes:
await nodes[0].switch.connect(nodes[i].peerInfo.peerId, nodes[i].peerInfo.addrs)
# And subscribed to the same topic
for node in nodes:
node.subscribe(topic, voidTopicHandler)
await sleepAsync(DURATION_TIMEOUT)
# When node 0 sends a message
discard nodes[0].publish(topic, "Hello!".toBytes())
await sleepAsync(DURATION_TIMEOUT)
# At least 8 of the nodes should have received an iHave message
# That's because the gossip factor is 0.5 over 16 available nodes
check:
filterIt(receivedIHaves, it > 0).len >= 8
await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))
asyncTest "adaptive gossip dissemination, with dLazy priority":
# Given 20 nodes
let
numberOfNodes = 20
topic = "foobar"
dValues = DValues(
dLow: some(2), dHigh: some(3), d: some(2), dOut: some(1), dLazy: some(6)
)
nodes = generateNodes(
numberOfNodes,
gossip = true,
dValues = some(dValues),
gossipFactor = some(0.float),
)
nodesFut = await allFinished(nodes.mapIt(it.switch.start()))
# All of them are checking for iHave messages
var receivedIHaves: seq[int] = repeat(0, numberOfNodes)
for i in 0 ..< numberOfNodes:
var pubsubObserver: PubSubObserver
capture i:
let checkForIhaves = proc(peer: PubSubPeer, msgs: var RPCMsg) =
if msgs.control.isSome:
let iHave = msgs.control.get.ihave
if iHave.len > 0:
for msg in iHave:
if msg.topicID == topic:
receivedIHaves[i] += 1
pubsubObserver = PubSubObserver(onRecv: checkForIhaves)
nodes[i].addObserver(pubsubObserver)
# All of them are connected to node 0
for i in 1 ..< numberOfNodes:
await nodes[0].switch.connect(nodes[i].peerInfo.peerId, nodes[i].peerInfo.addrs)
# And subscribed to the same topic
for node in nodes:
node.subscribe(topic, voidTopicHandler)
await sleepAsync(DURATION_TIMEOUT)
# When node 0 sends a message
discard nodes[0].publish(topic, "Hello!".toBytes())
await sleepAsync(DURATION_TIMEOUT)
# At least 6 of the nodes should have received an iHave message
# That's because the dLazy is 6
check:
filterIt(receivedIHaves, it > 0).len == 6
await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))

View File

@ -82,6 +82,7 @@ proc generateNodes*(
gossipSubVersion: string = "", gossipSubVersion: string = "",
floodPublish: bool = false, floodPublish: bool = false,
dValues: Option[DValues] = DValues.none(), dValues: Option[DValues] = DValues.none(),
gossipFactor: Option[float] = float.none(),
): seq[PubSub] = ): seq[PubSub] =
for i in 0 ..< num: for i in 0 ..< num:
let switch = newStandardSwitch( let switch = newStandardSwitch(
@ -106,6 +107,9 @@ proc generateNodes*(
p.enablePX = enablePX p.enablePX = enablePX
p.overheadRateLimit = overheadRateLimit p.overheadRateLimit = overheadRateLimit
if gossipFactor.isSome:
p.gossipFactor = gossipFactor.get
if dValues.isSome: if dValues.isSome:
let dValuesSome = dValues.get let dValuesSome = dValues.get