Compare commits
4 Commits
8e4ba2730a
...
2eac0033b0
Author | SHA1 | Date |
---|---|---|
|
2eac0033b0 | |
|
f676cf0c86 | |
|
69d8ffaefb | |
|
2195313dba |
|
@ -411,6 +411,9 @@ proc validateAndRelay(
|
|||
# small).
|
||||
var peersToSendIDontWant = HashSet[PubSubPeer]()
|
||||
addToSendPeers(peersToSendIDontWant)
|
||||
peersToSendIDontWant.exclIfIt(
|
||||
it.codec == GossipSubCodec_10 or it.codec == GossipSubCodec_11
|
||||
)
|
||||
g.broadcast(
|
||||
peersToSendIDontWant,
|
||||
RPCMsg(
|
||||
|
|
|
@ -871,6 +871,70 @@ suite "GossipSub":
|
|||
|
||||
await allFuturesThrowing(nodesFut.concat())
|
||||
|
||||
asyncTest "e2e - iDontWant is sent only for 1.2":
|
||||
# 3 nodes: A <=> B <=> C
|
||||
# (A & C are NOT connected). We pre-emptively send a dontwant from C to B,
|
||||
# and check that B doesn't relay the message to C.
|
||||
# We also check that B sends IDONTWANT to C, but not A
|
||||
func dumbMsgIdProvider(m: Message): Result[MessageId, ValidationResult] =
|
||||
ok(newSeq[byte](10))
|
||||
let
|
||||
nodeA = generateNodes(1, gossip = true, msgIdProvider = dumbMsgIdProvider)[0]
|
||||
nodeB = generateNodes(1, gossip = true, msgIdProvider = dumbMsgIdProvider)[0]
|
||||
nodeC = generateNodes(
|
||||
1,
|
||||
gossip = true,
|
||||
msgIdProvider = dumbMsgIdProvider,
|
||||
gossipSubVersion = GossipSubCodec_11,
|
||||
)[0]
|
||||
|
||||
let nodesFut = await allFinished(
|
||||
nodeA.switch.start(), nodeB.switch.start(), nodeC.switch.start()
|
||||
)
|
||||
|
||||
await nodeA.switch.connect(
|
||||
nodeB.switch.peerInfo.peerId, nodeB.switch.peerInfo.addrs
|
||||
)
|
||||
await nodeB.switch.connect(
|
||||
nodeC.switch.peerInfo.peerId, nodeC.switch.peerInfo.addrs
|
||||
)
|
||||
|
||||
let bFinished = newFuture[void]()
|
||||
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||
discard
|
||||
|
||||
proc handlerB(topic: string, data: seq[byte]) {.async.} =
|
||||
bFinished.complete()
|
||||
|
||||
nodeA.subscribe("foobar", handler)
|
||||
nodeB.subscribe("foobar", handlerB)
|
||||
nodeC.subscribe("foobar", handler)
|
||||
await waitSubGraph(@[nodeA, nodeB, nodeC], "foobar")
|
||||
|
||||
var gossipA: GossipSub = GossipSub(nodeA)
|
||||
var gossipB: GossipSub = GossipSub(nodeB)
|
||||
var gossipC: GossipSub = GossipSub(nodeC)
|
||||
|
||||
check:
|
||||
gossipC.mesh.peers("foobar") == 1
|
||||
|
||||
tryPublish await nodeA.publish("foobar", newSeq[byte](10000)), 1
|
||||
|
||||
await bFinished
|
||||
|
||||
# "check" alone isn't suitable for testing that a condition is true after some time has passed. Below we verify that
|
||||
# peers A and C haven't received an IDONTWANT message from B, but we need wait some time for potential in flight messages to arrive.
|
||||
await sleepAsync(500.millis)
|
||||
check:
|
||||
toSeq(gossipC.mesh.getOrDefault("foobar")).anyIt(it.iDontWants[^1].len == 0)
|
||||
toSeq(gossipA.mesh.getOrDefault("foobar")).anyIt(it.iDontWants[^1].len == 0)
|
||||
|
||||
await allFuturesThrowing(
|
||||
nodeA.switch.stop(), nodeB.switch.stop(), nodeC.switch.stop()
|
||||
)
|
||||
|
||||
await allFuturesThrowing(nodesFut.concat())
|
||||
|
||||
proc initializeGossipTest(): Future[(seq[PubSub], GossipSub, GossipSub)] {.async.} =
|
||||
let nodes =
|
||||
generateNodes(2, gossip = true, overheadRateLimit = Opt.some((20, 1.millis)))
|
||||
|
|
|
@ -258,9 +258,9 @@ suite "Circuit Relay V2":
|
|||
await allFutures(src.stop(), dst.stop(), rel.stop())
|
||||
|
||||
asyncTest "Connection data exceeded":
|
||||
ldata = 1000
|
||||
ldata = 1500
|
||||
proto.handler = proc(conn: Connection, proto: string) {.async.} =
|
||||
check "count me the better story you know" ==
|
||||
check "count me the best story you know" ==
|
||||
string.fromBytes(await conn.readLp(1024))
|
||||
await conn.writeLp("do you expect a lorem ipsum or...?")
|
||||
check "surprise me!" == string.fromBytes(await conn.readLp(1024))
|
||||
|
@ -281,6 +281,7 @@ suite "Circuit Relay V2":
|
|||
philosophical flourish Cato throws himself upon his sword; I quietly
|
||||
take to the ship."""
|
||||
)
|
||||
await conn.close()
|
||||
rv2 = Relay.new(
|
||||
reservationTTL = initDuration(seconds = ttl),
|
||||
limitDuration = ldur,
|
||||
|
@ -305,7 +306,7 @@ suite "Circuit Relay V2":
|
|||
|
||||
rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
|
||||
conn = await src.dial(dst.peerInfo.peerId, @[addrs], customProtoCodec)
|
||||
await conn.writeLp("count me the better story you know")
|
||||
await conn.writeLp("count me the best story you know")
|
||||
check:
|
||||
"do you expect a lorem ipsum or...?" ==
|
||||
string.fromBytes(await conn.readLp(1024))
|
||||
|
|
|
@ -39,6 +39,7 @@ suite "Yamux":
|
|||
conna.close(), connb.close(), yamuxa.close(), yamuxb.close(), handlera, handlerb
|
||||
)
|
||||
|
||||
|
||||
suite "Simple Reading/Writing yamux messages":
|
||||
asyncTest "Roundtrip of small messages":
|
||||
mSetup()
|
||||
|
@ -197,13 +198,14 @@ suite "Yamux":
|
|||
await writerBlocker
|
||||
try:
|
||||
var buffer: array[256, byte]
|
||||
check:
|
||||
(await conn.readOnce(addr buffer[0], 256)) == 0
|
||||
while true:
|
||||
# will crash when reset is received
|
||||
discard await conn.readOnce(addr buffer[0], 256)
|
||||
except CancelledError, LPStreamError:
|
||||
return
|
||||
finally:
|
||||
readerBlocker.complete()
|
||||
await conn.close()
|
||||
readerBlocker.complete()
|
||||
|
||||
let streamA = await yamuxa.newStream()
|
||||
check streamA == yamuxa.getStreams()[0]
|
||||
|
@ -215,7 +217,6 @@ suite "Yamux":
|
|||
for i in 0 .. 3:
|
||||
expect(LPStreamEOFError):
|
||||
await wrFut[i]
|
||||
await sleepAsync(50.millis) # waiting for reset to be send
|
||||
writerBlocker.complete()
|
||||
await readerBlocker
|
||||
await streamA.close()
|
||||
|
|
Loading…
Reference in New Issue