Co-authored-by: Ludovic Chenut <ludovic@status.im>
This commit is contained in:
parent
a69301f392
commit
dc13ff81d3
|
@ -360,7 +360,7 @@ method rpcHandler*(g: GossipSub,
|
||||||
template sub: untyped = rpcMsg.subscriptions[i]
|
template sub: untyped = rpcMsg.subscriptions[i]
|
||||||
g.handleSubscribe(peer, sub.topic, sub.subscribe)
|
g.handleSubscribe(peer, sub.topic, sub.subscribe)
|
||||||
|
|
||||||
# the above call applied limtis to subs number
|
# the above call applied limits to subs number
|
||||||
# in gossipsub we want to apply scoring as well
|
# in gossipsub we want to apply scoring as well
|
||||||
if rpcMsg.subscriptions.len > g.topicsHigh:
|
if rpcMsg.subscriptions.len > g.topicsHigh:
|
||||||
debug "received an rpc message with an oversized amount of subscriptions", peer,
|
debug "received an rpc message with an oversized amount of subscriptions", peer,
|
||||||
|
|
|
@ -9,7 +9,7 @@
|
||||||
|
|
||||||
{.used.}
|
{.used.}
|
||||||
|
|
||||||
import sequtils, options, tables, sets
|
import sequtils, options, tables, sets, sugar
|
||||||
import chronos, stew/byteutils
|
import chronos, stew/byteutils
|
||||||
import chronicles
|
import chronicles
|
||||||
import utils, ../../libp2p/[errors,
|
import utils, ../../libp2p/[errors,
|
||||||
|
@ -29,26 +29,6 @@ import ../helpers
|
||||||
|
|
||||||
proc `$`(peer: PubSubPeer): string = shortLog(peer)
|
proc `$`(peer: PubSubPeer): string = shortLog(peer)
|
||||||
|
|
||||||
proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
|
|
||||||
if sender == receiver:
|
|
||||||
return
|
|
||||||
let timeout = Moment.now() + 5.seconds
|
|
||||||
let fsub = GossipSub(sender)
|
|
||||||
|
|
||||||
# this is for testing purposes only
|
|
||||||
# peers can be inside `mesh` and `fanout`, not just `gossipsub`
|
|
||||||
while (not fsub.gossipsub.hasKey(key) or
|
|
||||||
not fsub.gossipsub.hasPeerId(key, receiver.peerInfo.peerId)) and
|
|
||||||
(not fsub.mesh.hasKey(key) or
|
|
||||||
not fsub.mesh.hasPeerId(key, receiver.peerInfo.peerId)) and
|
|
||||||
(not fsub.fanout.hasKey(key) or
|
|
||||||
not fsub.fanout.hasPeerId(key , receiver.peerInfo.peerId)):
|
|
||||||
trace "waitSub sleeping..."
|
|
||||||
|
|
||||||
# await
|
|
||||||
await sleepAsync(5.milliseconds)
|
|
||||||
doAssert Moment.now() < timeout, "waitSub timeout!"
|
|
||||||
|
|
||||||
template tryPublish(call: untyped, require: int, wait = 10.milliseconds, timeout = 5.seconds): untyped =
|
template tryPublish(call: untyped, require: int, wait = 10.milliseconds, timeout = 5.seconds): untyped =
|
||||||
var
|
var
|
||||||
expiration = Moment.now() + timeout
|
expiration = Moment.now() + timeout
|
||||||
|
@ -690,14 +670,14 @@ suite "GossipSub":
|
||||||
seenFut.complete()
|
seenFut.complete()
|
||||||
|
|
||||||
dialer.subscribe("foobar", handler)
|
dialer.subscribe("foobar", handler)
|
||||||
await waitSub(nodes[0], dialer, "foobar")
|
await waitSubGraph(nodes, "foobar")
|
||||||
|
|
||||||
tryPublish await wait(nodes[0].publish("foobar",
|
tryPublish await wait(nodes[0].publish("foobar",
|
||||||
toBytes("from node " &
|
toBytes("from node " &
|
||||||
$nodes[0].peerInfo.peerId)),
|
$nodes[0].peerInfo.peerId)),
|
||||||
1.minutes), 1
|
1.minutes), 1
|
||||||
|
|
||||||
await wait(seenFut, 2.minutes)
|
await wait(seenFut, 1.minutes)
|
||||||
check: seen.len >= runs
|
check: seen.len >= runs
|
||||||
for k, v in seen.pairs:
|
for k, v in seen.pairs:
|
||||||
check: v >= 1
|
check: v >= 1
|
||||||
|
@ -726,10 +706,11 @@ suite "GossipSub":
|
||||||
|
|
||||||
var seen: Table[string, int]
|
var seen: Table[string, int]
|
||||||
var seenFut = newFuture[void]()
|
var seenFut = newFuture[void]()
|
||||||
|
|
||||||
for i in 0..<nodes.len:
|
for i in 0..<nodes.len:
|
||||||
let dialer = nodes[i]
|
let dialer = nodes[i]
|
||||||
var handler: TopicHandler
|
var handler: TopicHandler
|
||||||
closureScope:
|
capture dialer, i:
|
||||||
var peerName = $dialer.peerInfo.peerId
|
var peerName = $dialer.peerInfo.peerId
|
||||||
handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} =
|
handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} =
|
||||||
if peerName notin seen:
|
if peerName notin seen:
|
||||||
|
@ -740,14 +721,14 @@ suite "GossipSub":
|
||||||
seenFut.complete()
|
seenFut.complete()
|
||||||
|
|
||||||
dialer.subscribe("foobar", handler)
|
dialer.subscribe("foobar", handler)
|
||||||
await waitSub(nodes[0], dialer, "foobar")
|
|
||||||
|
|
||||||
|
await waitSubGraph(nodes, "foobar")
|
||||||
tryPublish await wait(nodes[0].publish("foobar",
|
tryPublish await wait(nodes[0].publish("foobar",
|
||||||
toBytes("from node " &
|
toBytes("from node " &
|
||||||
$nodes[0].peerInfo.peerId)),
|
$nodes[0].peerInfo.peerId)),
|
||||||
1.minutes), 1
|
1.minutes), 1
|
||||||
|
|
||||||
await wait(seenFut, 5.minutes)
|
await wait(seenFut, 60.seconds)
|
||||||
check: seen.len >= runs
|
check: seen.len >= runs
|
||||||
for k, v in seen.pairs:
|
for k, v in seen.pairs:
|
||||||
check: v >= 1
|
check: v >= 1
|
||||||
|
|
|
@ -10,8 +10,7 @@
|
||||||
{.used.}
|
{.used.}
|
||||||
|
|
||||||
import sequtils, options, tables, sets
|
import sequtils, options, tables, sets
|
||||||
import chronos, stew/byteutils
|
import chronos, stew/byteutils, chronicles
|
||||||
import chronicles
|
|
||||||
import utils, ../../libp2p/[errors,
|
import utils, ../../libp2p/[errors,
|
||||||
peerid,
|
peerid,
|
||||||
peerinfo,
|
peerinfo,
|
||||||
|
@ -25,26 +24,6 @@ import utils, ../../libp2p/[errors,
|
||||||
protocols/pubsub/rpc/messages]
|
protocols/pubsub/rpc/messages]
|
||||||
import ../helpers
|
import ../helpers
|
||||||
|
|
||||||
proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
|
|
||||||
if sender == receiver:
|
|
||||||
return
|
|
||||||
let timeout = Moment.now() + 5.seconds
|
|
||||||
let fsub = GossipSub(sender)
|
|
||||||
|
|
||||||
# this is for testing purposes only
|
|
||||||
# peers can be inside `mesh` and `fanout`, not just `gossipsub`
|
|
||||||
while (not fsub.gossipsub.hasKey(key) or
|
|
||||||
not fsub.gossipsub.hasPeerId(key, receiver.peerInfo.peerId)) and
|
|
||||||
(not fsub.mesh.hasKey(key) or
|
|
||||||
not fsub.mesh.hasPeerId(key, receiver.peerInfo.peerId)) and
|
|
||||||
(not fsub.fanout.hasKey(key) or
|
|
||||||
not fsub.fanout.hasPeerId(key , receiver.peerInfo.peerId)):
|
|
||||||
trace "waitSub sleeping..."
|
|
||||||
|
|
||||||
# await
|
|
||||||
await sleepAsync(5.milliseconds)
|
|
||||||
doAssert Moment.now() < timeout, "waitSub timeout!"
|
|
||||||
|
|
||||||
template tryPublish(call: untyped, require: int, wait = 10.milliseconds, timeout = 10.seconds): untyped =
|
template tryPublish(call: untyped, require: int, wait = 10.milliseconds, timeout = 10.seconds): untyped =
|
||||||
var
|
var
|
||||||
expiration = Moment.now() + timeout
|
expiration = Moment.now() + timeout
|
||||||
|
@ -269,7 +248,7 @@ suite "GossipSub":
|
||||||
|
|
||||||
await allFuturesThrowing(nodesFut.concat())
|
await allFuturesThrowing(nodesFut.concat())
|
||||||
|
|
||||||
asyncTest "GossipsSub peers disconnections mechanics":
|
asyncTest "GossipSub peers disconnections mechanics":
|
||||||
var runs = 10
|
var runs = 10
|
||||||
|
|
||||||
let
|
let
|
||||||
|
@ -294,7 +273,8 @@ suite "GossipSub":
|
||||||
seenFut.complete()
|
seenFut.complete()
|
||||||
|
|
||||||
dialer.subscribe("foobar", handler)
|
dialer.subscribe("foobar", handler)
|
||||||
await waitSub(nodes[0], dialer, "foobar")
|
|
||||||
|
await waitSubGraph(nodes, "foobar")
|
||||||
|
|
||||||
# ensure peer stats are stored properly and kept properly
|
# ensure peer stats are stored properly and kept properly
|
||||||
check:
|
check:
|
||||||
|
|
|
@ -2,11 +2,9 @@
|
||||||
|
|
||||||
import ../stublogger
|
import ../stublogger
|
||||||
|
|
||||||
import testfloodsub
|
import testfloodsub,
|
||||||
when not defined(linux):
|
testgossipsub,
|
||||||
import testgossipsub,
|
testgossipsub2,
|
||||||
testgossipsub2
|
|
||||||
import
|
|
||||||
testmcache,
|
testmcache,
|
||||||
testtimedcache,
|
testtimedcache,
|
||||||
testmessage
|
testmessage
|
||||||
|
|
|
@ -4,7 +4,7 @@ const
|
||||||
libp2p_pubsub_verify {.booldefine.} = true
|
libp2p_pubsub_verify {.booldefine.} = true
|
||||||
libp2p_pubsub_anonymize {.booldefine.} = false
|
libp2p_pubsub_anonymize {.booldefine.} = false
|
||||||
|
|
||||||
import hashes, random, tables
|
import hashes, random, tables, sets, sequtils
|
||||||
import chronos, stew/[byteutils, results]
|
import chronos, stew/[byteutils, results]
|
||||||
import ../../libp2p/[builders,
|
import ../../libp2p/[builders,
|
||||||
protocols/pubsub/errors,
|
protocols/pubsub/errors,
|
||||||
|
@ -13,6 +13,7 @@ import ../../libp2p/[builders,
|
||||||
protocols/pubsub/floodsub,
|
protocols/pubsub/floodsub,
|
||||||
protocols/pubsub/rpc/messages,
|
protocols/pubsub/rpc/messages,
|
||||||
protocols/secure/secure]
|
protocols/secure/secure]
|
||||||
|
import chronicles
|
||||||
|
|
||||||
export builders
|
export builders
|
||||||
|
|
||||||
|
@ -102,3 +103,43 @@ proc subscribeRandom*(nodes: seq[PubSub]) {.async.} =
|
||||||
if dialer.peerInfo.peerId != node.peerInfo.peerId:
|
if dialer.peerInfo.peerId != node.peerInfo.peerId:
|
||||||
await dialer.switch.connect(node.peerInfo.peerId, node.peerInfo.addrs)
|
await dialer.switch.connect(node.peerInfo.peerId, node.peerInfo.addrs)
|
||||||
dialed.add(node.peerInfo.peerId)
|
dialed.add(node.peerInfo.peerId)
|
||||||
|
|
||||||
|
proc waitSub*(sender, receiver: auto; key: string) {.async, gcsafe.} =
|
||||||
|
if sender == receiver:
|
||||||
|
return
|
||||||
|
let timeout = Moment.now() + 5.seconds
|
||||||
|
let fsub = GossipSub(sender)
|
||||||
|
|
||||||
|
# this is for testing purposes only
|
||||||
|
# peers can be inside `mesh` and `fanout`, not just `gossipsub`
|
||||||
|
while (not fsub.gossipsub.hasKey(key) or
|
||||||
|
not fsub.gossipsub.hasPeerId(key, receiver.peerInfo.peerId)) and
|
||||||
|
(not fsub.mesh.hasKey(key) or
|
||||||
|
not fsub.mesh.hasPeerId(key, receiver.peerInfo.peerId)) and
|
||||||
|
(not fsub.fanout.hasKey(key) or
|
||||||
|
not fsub.fanout.hasPeerId(key , receiver.peerInfo.peerId)):
|
||||||
|
trace "waitSub sleeping..."
|
||||||
|
|
||||||
|
# await
|
||||||
|
await sleepAsync(5.milliseconds)
|
||||||
|
doAssert Moment.now() < timeout, "waitSub timeout!"
|
||||||
|
|
||||||
|
proc waitSubGraph*(nodes: seq[PubSub], key: string) {.async, gcsafe.} =
|
||||||
|
let timeout = Moment.now() + 5.seconds
|
||||||
|
while true:
|
||||||
|
var
|
||||||
|
nodesMesh: Table[PeerId, seq[PeerId]]
|
||||||
|
seen: HashSet[PeerId]
|
||||||
|
for n in nodes:
|
||||||
|
nodesMesh[n.peerInfo.peerId] = toSeq(GossipSub(n).mesh.getOrDefault(key).items()).mapIt(it.peerId)
|
||||||
|
proc explore(p: PeerId) =
|
||||||
|
if p in seen: return
|
||||||
|
seen.incl(p)
|
||||||
|
for peer in nodesMesh.getOrDefault(p):
|
||||||
|
explore(peer)
|
||||||
|
explore(nodes[0].peerInfo.peerId)
|
||||||
|
if seen.len == nodes.len: return
|
||||||
|
trace "waitSubGraph sleeping..."
|
||||||
|
|
||||||
|
await sleepAsync(5.milliseconds)
|
||||||
|
doAssert Moment.now() < timeout, "waitSubGraph timeout!"
|
||||||
|
|
Loading…
Reference in New Issue