Implement flood publish with score test

This commit is contained in:
Alejandro Cabeza Romero 2024-09-04 18:35:28 +02:00
parent a2d2e5d287
commit aac457ec5b
No known key found for this signature in database
GPG Key ID: DA3D14AE478030FD
2 changed files with 53 additions and 1 deletions

View File

@ -1422,3 +1422,54 @@ suite "Gossipsub Parameters":
(await handlerFuture2.waitForResult()).isOk (await handlerFuture2.waitForResult()).isOk
await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))
asyncTest "flood publish to all peers with score above threshold, regardless of subscription":
# Given 3 nodes
let
numberOfNodes = 3
topic = "foobar"
nodes = generateNodes(numberOfNodes, gossip = true, floodPublish = true)
nodesFut = nodes.mapIt(it.switch.start())
g0 = GossipSub(nodes[0])
# Nodes 1 and 2 are connected to node 0
await nodes[0].switch.connect(nodes[1].peerInfo.peerId, nodes[1].peerInfo.addrs)
await nodes[0].switch.connect(nodes[2].peerInfo.peerId, nodes[2].peerInfo.addrs)
# Given 2 handlers
var
handlerFut1 = newFuture[bool]()
handlerFut2 = newFuture[bool]()
proc handler1(topic: string, data: seq[byte]) {.async.} =
handlerFut1.complete(true)
proc handler2(topic: string, data: seq[byte]) {.async.} =
handlerFut2.complete(true)
# Nodes are subscribed to the same topic
nodes[1].subscribe(topic, handler1)
nodes[2].subscribe(topic, handler2)
await sleepAsync(1.seconds)
# Given node 2's score is below the threshold
for peer in g0.gossipsub.getOrDefault(topic):
if peer.peerId == nodes[2].peerInfo.peerId:
peer.score = (g0.parameters.publishThreshold - 1)
# When node 0 publishes a message to topic "foo"
let message = "Hello!".toBytes()
check (await nodes[0].publish(topic, message)) > 0
await sleepAsync(3.seconds)
# Then only node 1 should receive the message
let
result1 = await handlerFut1.waitForResult(DURATION_TIMEOUT)
result2 = await handlerFut2.waitForResult(DURATION_TIMEOUT)
check:
result1.isOk and result1.get == true
result2.isErr
# Cleanup
await allFuturesThrowing(nodes.mapIt(it.switch.stop()))
await allFuturesThrowing(nodesFut)

View File

@ -80,6 +80,7 @@ proc generateNodes*(
overheadRateLimit: Opt[tuple[bytes: int, interval: Duration]] = overheadRateLimit: Opt[tuple[bytes: int, interval: Duration]] =
Opt.none(tuple[bytes: int, interval: Duration]), Opt.none(tuple[bytes: int, interval: Duration]),
gossipSubVersion: string = "", gossipSubVersion: string = "",
floodPublish: bool = false,
dValues: Option[DValues] = DValues.none(), dValues: Option[DValues] = DValues.none(),
): seq[PubSub] = ): seq[PubSub] =
for i in 0 ..< num: for i in 0 ..< num:
@ -98,7 +99,7 @@ proc generateNodes*(
maxMessageSize = maxMessageSize, maxMessageSize = maxMessageSize,
parameters = ( parameters = (
var p = GossipSubParams.init() var p = GossipSubParams.init()
p.floodPublish = false p.floodPublish = floodPublish
p.historyLength = 20 p.historyLength = 20
p.historyGossip = 20 p.historyGossip = 20
p.unsubscribeBackoff = unsubscribeBackoff p.unsubscribeBackoff = unsubscribeBackoff