add sparse message propagation tests to gossipsub (#202)

* add sparce tests to gossipsub

* add send hooks

* remove `all`
This commit is contained in:
Dmitriy Ryajov 2020-06-02 17:53:38 -06:00 committed by GitHub
parent 285884c20c
commit bb8bff2195
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 171 additions and 79 deletions

View File

@ -79,8 +79,8 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
var msg = decodeRpcMsg(data) var msg = decodeRpcMsg(data)
trace "decoded msg from peer", peer = p.id, msg = msg.shortLog trace "decoded msg from peer", peer = p.id, msg = msg.shortLog
# trigger hooks # trigger hooks
for obs in p.observers[]: p.recvObservers(msg)
obs.onRecv(p, msg)
await p.handler(p, @[msg]) await p.handler(p, @[msg])
p.recvdRpcCache.put(digest) p.recvdRpcCache.put(digest)
finally: finally:

View File

@ -224,7 +224,7 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
except CancelledError as exc: except CancelledError as exc:
raise exc raise exc
except CatchableError as exc: except CatchableError as exc:
debug "ending multistream", err = exc.msg trace "error in multistream", err = exc.msg
proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.}

View File

@ -50,7 +50,7 @@ suite "FloodSub":
let let
nodes = generateNodes(2) nodes = generateNodes(2)
nodesFut = await all( nodesFut = await allFinished(
nodes[0].start(), nodes[0].start(),
nodes[1].start() nodes[1].start()
) )
@ -64,12 +64,12 @@ suite "FloodSub":
result = await completionFut.wait(5.seconds) result = await completionFut.wait(5.seconds)
await all( await allFuturesThrowing(
nodes[0].stop(), nodes[0].stop(),
nodes[1].stop() nodes[1].stop()
) )
await all(nodesFut.concat()) await allFuturesThrowing(nodesFut.concat())
check: check:
waitFor(runTests()) == true waitFor(runTests()) == true
@ -95,8 +95,8 @@ suite "FloodSub":
result = await completionFut.wait(5.seconds) result = await completionFut.wait(5.seconds)
await all(nodes[0].stop(), nodes[1].stop()) await allFuturesThrowing(nodes[0].stop(), nodes[1].stop())
await all(awaiters) await allFuturesThrowing(awaiters)
check: check:
waitFor(runTests()) == true waitFor(runTests()) == true
@ -129,8 +129,10 @@ suite "FloodSub":
await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) await nodes[0].publish("foobar", cast[seq[byte]]("Hello!"))
check (await handlerFut) == true check (await handlerFut) == true
await all(nodes[0].stop(), nodes[1].stop()) await allFuturesThrowing(
await all(awaiters) nodes[0].stop(),
nodes[1].stop())
await allFuturesThrowing(awaiters)
result = true result = true
check: check:
@ -160,8 +162,10 @@ suite "FloodSub":
await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) await nodes[0].publish("foobar", cast[seq[byte]]("Hello!"))
await all(nodes[0].stop(), nodes[1].stop()) await allFuturesThrowing(
await all(awaiters) nodes[0].stop(),
nodes[1].stop())
await allFuturesThrowing(awaiters)
result = true result = true
check: check:
@ -197,8 +201,10 @@ suite "FloodSub":
await nodes[0].publish("foo", cast[seq[byte]]("Hello!")) await nodes[0].publish("foo", cast[seq[byte]]("Hello!"))
await nodes[0].publish("bar", cast[seq[byte]]("Hello!")) await nodes[0].publish("bar", cast[seq[byte]]("Hello!"))
await all(nodes[0].stop(), nodes[1].stop()) await allFuturesThrowing(
await all(awaiters) nodes[0].stop(),
nodes[1].stop())
await allFuturesThrowing(awaiters)
result = true result = true
check: check:
@ -242,16 +248,16 @@ suite "FloodSub":
for y in 0..<runs: for y in 0..<runs:
if y != i: if y != i:
subs &= waitSub(nodes[i], nodes[y], "foobar") subs &= waitSub(nodes[i], nodes[y], "foobar")
await all(subs) await allFuturesThrowing(subs)
var pubs: seq[Future[void]] var pubs: seq[Future[void]]
for i in 0..<runs: for i in 0..<runs:
pubs &= nodes[i].publish("foobar", cast[seq[byte]]("Hello!")) pubs &= nodes[i].publish("foobar", cast[seq[byte]]("Hello!"))
await all(pubs) await allFuturesThrowing(pubs)
await all(futs.mapIt(it[0])) await allFuturesThrowing(futs.mapIt(it[0]))
await all(nodes.mapIt(it.stop())) await allFuturesThrowing(nodes.mapIt(it.stop()))
await all(awaitters) await allFuturesThrowing(awaitters)
result = true result = true
check: check:
@ -296,16 +302,16 @@ suite "FloodSub":
for y in 0..<runs: for y in 0..<runs:
if y != i: if y != i:
subs &= waitSub(nodes[i], nodes[y], "foobar") subs &= waitSub(nodes[i], nodes[y], "foobar")
await all(subs) await allFuturesThrowing(subs)
var pubs: seq[Future[void]] var pubs: seq[Future[void]]
for i in 0..<runs: for i in 0..<runs:
pubs &= nodes[i].publish("foobar", cast[seq[byte]]("Hello!")) pubs &= nodes[i].publish("foobar", cast[seq[byte]]("Hello!"))
await all(pubs) await allFuturesThrowing(pubs)
await all(futs.mapIt(it[0])) await allFuturesThrowing(futs.mapIt(it[0]))
await all(nodes.mapIt(it.stop())) await allFuturesThrowing(nodes.mapIt(it.stop()))
await all(awaitters) await allFuturesThrowing(awaitters)
result = true result = true

View File

@ -42,7 +42,7 @@ suite "GossipSub internal":
await gossipSub.rebalanceMesh(topic) await gossipSub.rebalanceMesh(topic)
check gossipSub.mesh[topic].len == GossipSubD check gossipSub.mesh[topic].len == GossipSubD
await all(conns.mapIt(it.close())) await allFuturesThrowing(conns.mapIt(it.close()))
result = true result = true
@ -71,7 +71,7 @@ suite "GossipSub internal":
await gossipSub.rebalanceMesh(topic) await gossipSub.rebalanceMesh(topic)
check gossipSub.mesh[topic].len == GossipSubD check gossipSub.mesh[topic].len == GossipSubD
await all(conns.mapIt(it.close())) await allFuturesThrowing(conns.mapIt(it.close()))
result = true result = true
@ -103,7 +103,7 @@ suite "GossipSub internal":
await gossipSub.replenishFanout(topic) await gossipSub.replenishFanout(topic)
check gossipSub.fanout[topic].len == GossipSubD check gossipSub.fanout[topic].len == GossipSubD
await all(conns.mapIt(it.close())) await allFuturesThrowing(conns.mapIt(it.close()))
result = true result = true
@ -138,7 +138,7 @@ suite "GossipSub internal":
await gossipSub.dropFanoutPeers() await gossipSub.dropFanoutPeers()
check topic notin gossipSub.fanout check topic notin gossipSub.fanout
await all(conns.mapIt(it.close())) await allFuturesThrowing(conns.mapIt(it.close()))
result = true result = true
@ -179,7 +179,7 @@ suite "GossipSub internal":
check topic1 notin gossipSub.fanout check topic1 notin gossipSub.fanout
check topic2 in gossipSub.fanout check topic2 in gossipSub.fanout
await all(conns.mapIt(it.close())) await allFuturesThrowing(conns.mapIt(it.close()))
result = true result = true
@ -200,7 +200,7 @@ suite "GossipSub internal":
gossipSub.gossipsub[topic] = initHashSet[string]() gossipSub.gossipsub[topic] = initHashSet[string]()
var conns = newSeq[Connection]() var conns = newSeq[Connection]()
# geerate mesh and fanout peers # generate mesh and fanout peers
for i in 0..<30: for i in 0..<30:
let conn = newConnection(newBufferStream(noop)) let conn = newConnection(newBufferStream(noop))
conns &= conn conns &= conn
@ -242,7 +242,7 @@ suite "GossipSub internal":
check p notin gossipSub.fanout[topic] check p notin gossipSub.fanout[topic]
check p notin gossipSub.mesh[topic] check p notin gossipSub.mesh[topic]
await all(conns.mapIt(it.close())) await allFuturesThrowing(conns.mapIt(it.close()))
result = true result = true
@ -285,7 +285,7 @@ suite "GossipSub internal":
let peers = gossipSub.getGossipPeers() let peers = gossipSub.getGossipPeers()
check peers.len == GossipSubD check peers.len == GossipSubD
await all(conns.mapIt(it.close())) await allFuturesThrowing(conns.mapIt(it.close()))
result = true result = true
@ -328,7 +328,7 @@ suite "GossipSub internal":
let peers = gossipSub.getGossipPeers() let peers = gossipSub.getGossipPeers()
check peers.len == GossipSubD check peers.len == GossipSubD
await all(conns.mapIt(it.close())) await allFuturesThrowing(conns.mapIt(it.close()))
result = true result = true
@ -371,7 +371,7 @@ suite "GossipSub internal":
let peers = gossipSub.getGossipPeers() let peers = gossipSub.getGossipPeers()
check peers.len == 0 check peers.len == 0
await all(conns.mapIt(it.close())) await allFuturesThrowing(conns.mapIt(it.close()))
result = true result = true

View File

@ -79,8 +79,10 @@ suite "GossipSub":
await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) await nodes[0].publish("foobar", cast[seq[byte]]("Hello!"))
result = (await validatorFut) and (await handlerFut) result = (await validatorFut) and (await handlerFut)
await all(nodes[0].stop(), nodes[1].stop()) await allFuturesThrowing(
await all(awaiters) nodes[0].stop(),
nodes[1].stop())
await allFuturesThrowing(awaiters)
check: check:
waitFor(runTests()) == true waitFor(runTests()) == true
@ -111,8 +113,10 @@ suite "GossipSub":
await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) await nodes[0].publish("foobar", cast[seq[byte]]("Hello!"))
result = await validatorFut result = await validatorFut
await all(nodes[0].stop(), nodes[1].stop()) await allFuturesThrowing(
await all(awaiters) nodes[0].stop(),
nodes[1].stop())
await allFuturesThrowing(awaiters)
check: check:
waitFor(runTests()) == true waitFor(runTests()) == true
@ -151,8 +155,10 @@ suite "GossipSub":
await nodes[0].publish("bar", cast[seq[byte]]("Hello!")) await nodes[0].publish("bar", cast[seq[byte]]("Hello!"))
result = ((await passed) and (await failed) and (await handlerFut)) result = ((await passed) and (await failed) and (await handlerFut))
await all(nodes[0].stop(), nodes[1].stop()) await allFuturesThrowing(
await all(awaiters) nodes[0].stop(),
nodes[1].stop())
await allFuturesThrowing(awaiters)
result = true result = true
check: check:
waitFor(runTests()) == true waitFor(runTests()) == true
@ -182,8 +188,8 @@ suite "GossipSub":
"foobar" in gossip1.gossipsub "foobar" in gossip1.gossipsub
gossip2.peerInfo.id in gossip1.gossipsub["foobar"] gossip2.peerInfo.id in gossip1.gossipsub["foobar"]
await all(nodes.mapIt(it.stop())) await allFuturesThrowing(nodes.mapIt(it.stop()))
await all(awaitters) await allFuturesThrowing(awaitters)
result = true result = true
@ -211,7 +217,7 @@ suite "GossipSub":
var subs: seq[Future[void]] var subs: seq[Future[void]]
subs &= waitSub(nodes[1], nodes[0], "foobar") subs &= waitSub(nodes[1], nodes[0], "foobar")
subs &= waitSub(nodes[0], nodes[1], "foobar") subs &= waitSub(nodes[0], nodes[1], "foobar")
await all(subs) await allFuturesThrowing(subs)
let let
gossip1 = GossipSub(nodes[0].pubSub.get()) gossip1 = GossipSub(nodes[0].pubSub.get())
@ -230,8 +236,8 @@ suite "GossipSub":
gossip1.peerInfo.id in gossip2.gossipsub["foobar"] or gossip1.peerInfo.id in gossip2.gossipsub["foobar"] or
gossip1.peerInfo.id in gossip2.mesh["foobar"] gossip1.peerInfo.id in gossip2.mesh["foobar"]
await all(nodes.mapIt(it.stop())) await allFuturesThrowing(nodes.mapIt(it.stop()))
await all(awaitters) await allFuturesThrowing(awaitters)
result = true result = true
@ -279,7 +285,7 @@ suite "GossipSub":
await nodes[0].stop() await nodes[0].stop()
await nodes[1].stop() await nodes[1].stop()
await all(wait) await allFuturesThrowing(wait)
result = observed == 2 result = observed == 2
@ -309,7 +315,7 @@ suite "GossipSub":
await nodes[0].stop() await nodes[0].stop()
await nodes[1].stop() await nodes[1].stop()
await all(wait) await allFuturesThrowing(wait)
check: check:
waitFor(runTests()) == true waitFor(runTests()) == true
@ -344,7 +350,7 @@ suite "GossipSub":
subs.add(allFutures(dialer.subscribe("foobar", handler), subs.add(allFutures(dialer.subscribe("foobar", handler),
waitSub(nodes[0], dialer, "foobar"))) waitSub(nodes[0], dialer, "foobar")))
await all(subs) await allFuturesThrowing(subs)
await wait(nodes[0].publish("foobar", await wait(nodes[0].publish("foobar",
cast[seq[byte]]("from node " & cast[seq[byte]]("from node " &
@ -356,8 +362,56 @@ suite "GossipSub":
for k, v in seen.pairs: for k, v in seen.pairs:
check: v == 1 check: v == 1
await all(nodes.mapIt(it.stop())) await allFuturesThrowing(nodes.mapIt(it.stop()))
await all(awaitters) await allFuturesThrowing(awaitters)
result = true
check:
waitFor(runTests()) == true
test "e2e - GossipSub with multiple peers (sparse)":
proc runTests(): Future[bool] {.async.} =
var nodes: seq[Switch] = newSeq[Switch]()
var awaitters: seq[Future[void]]
var runs = 10
for i in 0..<runs:
nodes.add newStandardSwitch(triggerSelf = true, gossip = true)
awaitters.add((await nodes[i].start()))
await subscribeSparseNodes(nodes, 4)
var seen: Table[string, int]
var subs: seq[Future[void]]
var seenFut = newFuture[void]()
for dialer in nodes:
var handler: TopicHandler
closureScope:
var dialerNode = dialer
handler = proc(topic: string, data: seq[byte])
{.async, gcsafe, closure.} =
if dialerNode.peerInfo.id notin seen:
seen[dialerNode.peerInfo.id] = 0
seen[dialerNode.peerInfo.id].inc
check topic == "foobar"
if not seenFut.finished() and seen.len >= runs:
seenFut.complete()
subs &= dialer.subscribe("foobar", handler)
await allFuturesThrowing(subs)
await wait(nodes[0].publish("foobar",
cast[seq[byte]]("from node " &
nodes[1].peerInfo.id)),
1.minutes)
await wait(seenFut, 5.minutes)
check: seen.len >= runs
for k, v in seen.pairs:
check: v == 1
await allFuturesThrowing(nodes.mapIt(it.stop()))
await allFuturesThrowing(awaitters)
result = true result = true
check: check:

View File

@ -17,6 +17,20 @@ proc subscribeNodes*(nodes: seq[Switch]) {.async.} =
dials.add(dialer.connect(node.peerInfo)) dials.add(dialer.connect(node.peerInfo))
await allFutures(dials) await allFutures(dials)
proc subscribeSparseNodes*(nodes: seq[Switch], degree: int = 2) {.async.} =
if nodes.len < degree:
raise (ref CatchableError)(msg: "nodes count needs to be greater or equal to degree!")
var dials: seq[Future[void]]
for i, dialer in nodes:
if (i mod degree) != 0:
continue
for node in nodes:
if dialer.peerInfo.peerId != node.peerInfo.peerId:
dials.add(dialer.connect(node.peerInfo))
await allFutures(dials)
proc subscribeRandom*(nodes: seq[Switch]) {.async.} = proc subscribeRandom*(nodes: seq[Switch]) {.async.} =
var dials: seq[Future[void]] var dials: seq[Future[void]]
for dialer in nodes: for dialer in nodes:

View File

@ -250,7 +250,7 @@ suite "BufferStream":
await buf1.pushTo(cast[seq[byte]]("Hello2!")) await buf1.pushTo(cast[seq[byte]]("Hello2!"))
await buf2.pushTo(cast[seq[byte]]("Hello1!")) await buf2.pushTo(cast[seq[byte]]("Hello1!"))
await all(readFut1, readFut2) await allFuturesThrowing(readFut1, readFut2)
check: check:
res1 == cast[seq[byte]]("Hello2!") res1 == cast[seq[byte]]("Hello2!")
@ -300,7 +300,7 @@ suite "BufferStream":
await buf1.write(cast[seq[byte]]("Hello1!")) await buf1.write(cast[seq[byte]]("Hello1!"))
await buf2.write(cast[seq[byte]]("Hello2!")) await buf2.write(cast[seq[byte]]("Hello2!"))
await all(readFut1, readFut2) await allFuturesThrowing(readFut1, readFut2)
check: check:
res1 == cast[seq[byte]]("Hello2!") res1 == cast[seq[byte]]("Hello2!")
@ -376,7 +376,7 @@ suite "BufferStream":
await buf1.write(cast[seq[byte]]("Hello1!")) await buf1.write(cast[seq[byte]]("Hello1!"))
await buf2.write(cast[seq[byte]]("Hello2!")) await buf2.write(cast[seq[byte]]("Hello2!"))
await all(readFut1, readFut2) await allFuturesThrowing(readFut1, readFut2)
check: check:
res1 == cast[seq[byte]]("Hello2!") res1 == cast[seq[byte]]("Hello2!")
@ -437,7 +437,7 @@ suite "BufferStream":
var writerFut = writer() var writerFut = writer()
var readerFut = reader() var readerFut = reader()
await all(readerFut, writerFut) await allFuturesThrowing(readerFut, writerFut)
result = true result = true
await buf1.close() await buf1.close()

View File

@ -245,7 +245,9 @@ suite "Mplex":
await done.wait(1.seconds) await done.wait(1.seconds)
await conn.close() await conn.close()
await mplexDialFut await mplexDialFut
await all(transport1.close(), transport2.close()) await allFuturesThrowing(
transport1.close(),
transport2.close())
await listenFut await listenFut
waitFor(testNewStream()) waitFor(testNewStream())
@ -284,7 +286,9 @@ suite "Mplex":
await done.wait(1.seconds) await done.wait(1.seconds)
await conn.close() await conn.close()
await mplexDialFut await mplexDialFut
await all(transport1.close(), transport2.close()) await allFuturesThrowing(
transport1.close(),
transport2.close())
await listenFut await listenFut
waitFor(testNewStream()) waitFor(testNewStream())
@ -331,7 +335,9 @@ suite "Mplex":
await stream.close() await stream.close()
await conn.close() await conn.close()
await mplexDialFut await mplexDialFut
await all(transport1.close(), transport2.close()) await allFuturesThrowing(
transport1.close(),
transport2.close())
await listenFut await listenFut
waitFor(testNewStream()) waitFor(testNewStream())
@ -368,7 +374,9 @@ suite "Mplex":
await done.wait(1.seconds) await done.wait(1.seconds)
await conn.close() await conn.close()
await mplexDialFut await mplexDialFut
await all(transport1.close(), transport2.close()) await allFuturesThrowing(
transport1.close(),
transport2.close())
await listenFut await listenFut
waitFor(testNewStream()) waitFor(testNewStream())
@ -410,7 +418,9 @@ suite "Mplex":
await done.wait(10.seconds) await done.wait(10.seconds)
await conn.close() await conn.close()
await mplexDialFut await mplexDialFut
await all(transport1.close(), transport2.close()) await allFuturesThrowing(
transport1.close(),
transport2.close())
await listenFut await listenFut
waitFor(testNewStream()) waitFor(testNewStream())
@ -454,7 +464,9 @@ suite "Mplex":
await done.wait(5.seconds) await done.wait(5.seconds)
await conn.close() await conn.close()
await mplexDialFut await mplexDialFut
await all(transport1.close(), transport2.close()) await allFuturesThrowing(
transport1.close(),
transport2.close())
await listenFut await listenFut
waitFor(testNewStream()) waitFor(testNewStream())
@ -522,7 +534,9 @@ suite "Mplex":
await complete.wait(1.seconds) await complete.wait(1.seconds)
await mplexDialFut await mplexDialFut
await all(transport1.close(), transport2.close()) await allFuturesThrowing(
transport1.close(),
transport2.close())
await listenFut await listenFut
waitFor(test()) waitFor(test())
@ -579,7 +593,9 @@ suite "Mplex":
await stream.close() await stream.close()
await conn.close() await conn.close()
await mplexDialFut await mplexDialFut
await all(transport1.close(), transport2.close()) await allFuturesThrowing(
transport1.close(),
transport2.close())
await listenFut await listenFut
waitFor(test()) waitFor(test())

View File

@ -277,7 +277,9 @@ suite "Multistream select":
await transport2.close() await transport2.close()
await transport1.close() await transport1.close()
await all(handlerWait1.wait(5000.millis) #[if OK won't happen!!]#, handlerWait2.wait(5000.millis) #[if OK won't happen!!]#) await allFuturesThrowing(
handlerWait1.wait(5000.millis),
handlerWait2.wait(5000.millis))
check: check:
waitFor(endToEnd()) == true waitFor(endToEnd()) == true

View File

@ -222,8 +222,10 @@ suite "Noise":
check "Hello!" == msg check "Hello!" == msg
await conn.close() await conn.close()
await all(switch1.stop(), switch2.stop()) await allFuturesThrowing(
await all(awaiters) switch1.stop(),
switch2.stop())
await allFuturesThrowing(awaiters)
result = true result = true
check: check:

View File

@ -67,14 +67,13 @@ suite "Switch":
check "Hello!" == msg check "Hello!" == msg
await conn.close() await conn.close()
await all( await allFuturesThrowing(
done.wait(5.seconds), #[if OK won't happen!!]# done.wait(5.seconds),
switch1.stop(), switch1.stop(),
switch2.stop(), switch2.stop())
)
# this needs to go at end # this needs to go at end
await all(awaiters) await allFuturesThrowing(awaiters)
waitFor(testSwitch()) waitFor(testSwitch())
@ -125,14 +124,14 @@ suite "Switch":
check (ConnectionTracker(connTracker).opened == check (ConnectionTracker(connTracker).opened ==
(ConnectionTracker(connTracker).closed + 8.uint64)) (ConnectionTracker(connTracker).closed + 8.uint64))
await all( await allFuturesThrowing(
done.wait(5.seconds), #[if OK won't happen!!]# done.wait(5.seconds),
switch1.stop(), switch1.stop(),
switch2.stop(), switch2.stop(),
) )
# this needs to go at end # this needs to go at end
await all(awaiters) await allFuturesThrowing(awaiters)
waitFor(testSwitch()) waitFor(testSwitch())
@ -172,12 +171,12 @@ suite "Switch":
except LPStreamError: except LPStreamError:
result = false result = false
await all( await allFuturesThrowing(
conn.close(), conn.close(),
switch1.stop(), switch1.stop(),
switch2.stop() switch2.stop()
) )
await all(awaiters) await allFuturesThrowing(awaiters)
check: check:
waitFor(testSwitch()) == true waitFor(testSwitch()) == true
@ -210,11 +209,10 @@ suite "Switch":
check switch1.connections.len == 0 check switch1.connections.len == 0
check switch2.connections.len == 0 check switch2.connections.len == 0
await all( await allFuturesThrowing(
switch1.stop(), switch1.stop(),
switch2.stop() switch2.stop())
) await allFuturesThrowing(awaiters)
await all(awaiters)
waitFor(testSwitch()) waitFor(testSwitch())