Add multiple simultaneous control messages handling test.
This commit is contained in:
parent
0dc653abe4
commit
94fdb8d25e
|
@ -1690,3 +1690,112 @@ suite "Gossipsub Parameters":
|
|||
(await iDontWantReceived2.waitForResult()).isOk
|
||||
|
||||
await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))
|
||||
|
||||
asyncTest "Ensure handling of multiple Control messages simultaneously":
|
||||
# Given 2 nodes
|
||||
let
|
||||
topic = "foo"
|
||||
messageID = @[0'u8, 1, 2, 3]
|
||||
ihaveMessage =
|
||||
ControlMessage(ihave: @[ControlIHave(topicID: topic, messageIDs: @[messageID])])
|
||||
iwantMessage = ControlMessage(iwant: @[ControlIWant(messageIDs: @[messageID])])
|
||||
graftMessage = ControlMessage(graft: @[ControlGraft(topicID: topic)])
|
||||
pruneMessage = ControlMessage(
|
||||
prune:
|
||||
@[
|
||||
ControlPrune(
|
||||
topicID: topic,
|
||||
peers: @[PeerInfoMsg(peerId: PeerId(data: newSeq[byte](10)))],
|
||||
backoff: 60'u64,
|
||||
)
|
||||
]
|
||||
)
|
||||
iDontWantMessage =
|
||||
ControlMessage(idontwant: @[ControlIWant(messageIDs: @[messageID])])
|
||||
|
||||
numberOfNodes = 2
|
||||
nodes = generateNodes(numberOfNodes, gossip = true, verifySignature = false)
|
||||
nodesFut = nodes.mapIt(it.switch.start())
|
||||
n0 = nodes[0]
|
||||
n1 = nodes[1]
|
||||
g0 = GossipSub(n0)
|
||||
g1 = GossipSub(n1)
|
||||
tg0 = cast[TestGossipSub](g0)
|
||||
tg1 = cast[TestGossipSub](g1)
|
||||
|
||||
# Setup an observer for node1
|
||||
# All of them are checking for iHave messages
|
||||
var
|
||||
receivedIHaves: int = 0
|
||||
receivedIWants: int = 0
|
||||
receivedGrafts: int = 0
|
||||
receivedPrunes: int = 0
|
||||
receivedIDontWants: int = 0
|
||||
|
||||
let observeControlMessages = proc(peer: PubSubPeer, msgs: var RPCMsg) =
|
||||
if msgs.control.isSome:
|
||||
let
|
||||
iHave = msgs.control.get.ihave
|
||||
iWant = msgs.control.get.iwant
|
||||
graft = msgs.control.get.graft
|
||||
prune = msgs.control.get.prune
|
||||
iDontWant = msgs.control.get.idontwant
|
||||
|
||||
for msg in iHave:
|
||||
if msg.topicID == topic:
|
||||
receivedIHaves += 1
|
||||
for msg in iWant:
|
||||
for msgId in msg.messageIDs:
|
||||
if msgId == messageID:
|
||||
receivedIWants += 1
|
||||
for msg in graft:
|
||||
if msg.topicID == topic:
|
||||
receivedGrafts += 1
|
||||
for msg in prune:
|
||||
if msg.topicID == topic:
|
||||
receivedPrunes += 1
|
||||
for msg in iDontWant:
|
||||
for msgId in msg.messageIDs:
|
||||
if msgId == messageID:
|
||||
receivedIDontWants += 1
|
||||
|
||||
n1.addObserver(PubSubObserver(onRecv: observeControlMessages))
|
||||
|
||||
# Connect them
|
||||
await n0.switch.connect(n1.peerInfo.peerId, n1.peerInfo.addrs)
|
||||
|
||||
# Subscribe them to the same topic
|
||||
for node in nodes:
|
||||
node.subscribe(topic, voidTopicHandler)
|
||||
await waitSubGraph(nodes, topic)
|
||||
|
||||
# When node 0 sends multiple Control messages
|
||||
g0.broadcast(
|
||||
g0.mesh[topic], RPCMsg(control: some(ihaveMessage)), isHighPriority = true
|
||||
)
|
||||
g0.broadcast(
|
||||
g0.mesh[topic], RPCMsg(control: some(iwantMessage)), isHighPriority = true
|
||||
)
|
||||
g0.broadcast(
|
||||
g0.mesh[topic], RPCMsg(control: some(graftMessage)), isHighPriority = true
|
||||
)
|
||||
g0.broadcast(
|
||||
g0.mesh[topic], RPCMsg(control: some(pruneMessage)), isHighPriority = true
|
||||
)
|
||||
g0.broadcast(
|
||||
g0.mesh[topic], RPCMsg(control: some(iDontWantMessage)), isHighPriority = true
|
||||
)
|
||||
await sleepAsync(DURATION_TIMEOUT)
|
||||
|
||||
# Then node 1 should have received all of them
|
||||
|
||||
check:
|
||||
receivedIHaves == 1
|
||||
receivedIWants == 1
|
||||
receivedGrafts == 2 # 1 from the initial subscription and 1 from the graft message
|
||||
receivedPrunes == 1
|
||||
receivedIDontWants == 1
|
||||
|
||||
# Cleanup
|
||||
await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))
|
||||
await allFuturesThrowing(nodesFut)
|
||||
|
|
Loading…
Reference in New Issue