defaultMsgIdProvider alternative/test anonymize (#379)

* defaultMsgIdProvider alternative/test anonymize

* avoid freeze during flood tests

* avoid `empty message, skipping` situation

* test observers

* avoid double initPubSub

* fix gossip testing (specially when anonymize is on)

* make azure tests shorter
This commit is contained in:
Giovanni Petrantoni 2020-09-28 16:11:18 +09:00 committed by GitHub
parent 8ecef46738
commit 98d0cc3a16
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 43 additions and 23 deletions

View File

@ -119,7 +119,7 @@ steps:
nimble install -y --depsOnly nimble install -y --depsOnly
# run tests # run tests
nimble test nimble test_slim
nimble examples_build nimble examples_build
displayName: 'build and test' displayName: 'build and test'

View File

@ -50,10 +50,18 @@ task testpubsub, "Runs pubsub tests":
runTest("pubsub/testgossipinternal", sign = false, verify = false, moreoptions = "-d:pubsub_internal_testing") runTest("pubsub/testgossipinternal", sign = false, verify = false, moreoptions = "-d:pubsub_internal_testing")
runTest("pubsub/testpubsub") runTest("pubsub/testpubsub")
runTest("pubsub/testpubsub", sign = false, verify = false) runTest("pubsub/testpubsub", sign = false, verify = false)
runTest("pubsub/testpubsub", sign = false, verify = false, moreoptions = "-d:libp2p_pubsub_anonymize=true")
runTest("pubsub/testgossipinternal10", sign = false, verify = false, moreoptions = "-d:pubsub_internal_testing") runTest("pubsub/testgossipinternal10", sign = false, verify = false, moreoptions = "-d:pubsub_internal_testing")
runTest("pubsub/testpubsub", moreoptions = "-d:fallback_gossipsub_10") runTest("pubsub/testpubsub", moreoptions = "-d:fallback_gossipsub_10")
runTest("pubsub/testpubsub", sign = false, verify = false, moreoptions = "-d:fallback_gossipsub_10") runTest("pubsub/testpubsub", sign = false, verify = false, moreoptions = "-d:fallback_gossipsub_10")
task testpubsub_slim, "Runs pubsub tests":
runTest("pubsub/testgossipinternal", sign = false, verify = false, moreoptions = "-d:pubsub_internal_testing")
runTest("pubsub/testpubsub")
runTest("pubsub/testpubsub", sign = false, verify = false, moreoptions = "-d:libp2p_pubsub_anonymize=true")
runTest("pubsub/testgossipinternal10", sign = false, verify = false, moreoptions = "-d:pubsub_internal_testing")
runTest("pubsub/testpubsub", moreoptions = "-d:fallback_gossipsub_10")
task testfilter, "Run PKI filter test": task testfilter, "Run PKI filter test":
runTest("testpkifilter", runTest("testpkifilter",
moreoptions = "-d:libp2p_pki_schemes=\"secp256k1\"") moreoptions = "-d:libp2p_pki_schemes=\"secp256k1\"")
@ -71,5 +79,12 @@ task test, "Runs the test suite":
exec "nimble testinterop" exec "nimble testinterop"
exec "nimble testfilter" exec "nimble testfilter"
task test_slim, "Runs the test suite":
exec "nimble testnative"
exec "nimble testpubsub_slim"
exec "nimble testdaemon"
exec "nimble testinterop"
exec "nimble testfilter"
task examples_build, "Build the samples": task examples_build, "Build the samples":
buildSample("directchat") buildSample("directchat")

View File

@ -125,6 +125,7 @@ method onPubSubPeerEvent*(p: PubSub, peer: PubsubPeer, event: PubsubPeerEvent) {
# Peer event is raised for the send connection in particular # Peer event is raised for the send connection in particular
case event.kind case event.kind
of PubSubPeerEventKind.Connected: of PubSubPeerEventKind.Connected:
if p.topics.len > 0:
p.sendSubs(peer, toSeq(p.topics.keys), true) p.sendSubs(peer, toSeq(p.topics.keys), true)
of PubSubPeerEventKind.Disconnected: of PubSubPeerEventKind.Disconnected:
discard discard
@ -357,7 +358,6 @@ proc init*[PubParams: object | bool](
topics: initTable[string, Topic](), topics: initTable[string, Topic](),
msgIdProvider: msgIdProvider, msgIdProvider: msgIdProvider,
parameters: parameters) parameters: parameters)
pubsub.initPubSub()
proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} = proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} =
if event == PeerEvent.Joined: if event == PeerEvent.Joined:
@ -369,6 +369,7 @@ proc init*[PubParams: object | bool](
switch.addPeerEventHandler(peerEventHandler, PeerEvent.Left) switch.addPeerEventHandler(peerEventHandler, PeerEvent.Left)
pubsub.initPubSub() pubsub.initPubSub()
return pubsub return pubsub

View File

@ -9,6 +9,7 @@
{.push raises: [Defect].} {.push raises: [Defect].}
import hashes
import chronicles, metrics, stew/[byteutils, endians2] import chronicles, metrics, stew/[byteutils, endians2]
import ./messages, import ./messages,
./protobuf, ./protobuf,
@ -28,7 +29,10 @@ declareCounter(libp2p_pubsub_sig_verify_success, "pubsub successfully validated
declareCounter(libp2p_pubsub_sig_verify_failure, "pubsub failed validated messages") declareCounter(libp2p_pubsub_sig_verify_failure, "pubsub failed validated messages")
func defaultMsgIdProvider*(m: Message): string = func defaultMsgIdProvider*(m: Message): string =
if m.seqno.len > 0 and m.fromPeer.data.len > 0:
byteutils.toHex(m.seqno) & $m.fromPeer byteutils.toHex(m.seqno) & $m.fromPeer
else:
$m.data.hash & $m.topicIDs.hash
proc sign*(msg: Message, privateKey: PrivateKey): CryptoResult[seq[byte]] = proc sign*(msg: Message, privateKey: PrivateKey): CryptoResult[seq[byte]] =
ok((? privateKey.sign(PubSubPrefix & encodeMessage(msg, false))).getBytes()) ok((? privateKey.sign(PubSubPrefix & encodeMessage(msg, false))).getBytes())

View File

@ -326,7 +326,7 @@ suite "FloodSub":
var pubs: seq[Future[int]] var pubs: seq[Future[int]]
for i in 0..<runs: for i in 0..<runs:
pubs &= nodes[i].publish("foobar", "Hello!".toBytes()) pubs &= nodes[i].publish("foobar", ("Hello!" & $i).toBytes())
await allFuturesThrowing(pubs) await allFuturesThrowing(pubs)
await allFuturesThrowing(futs.mapIt(it[0])) await allFuturesThrowing(futs.mapIt(it[0]))
@ -379,7 +379,7 @@ suite "FloodSub":
var pubs: seq[Future[int]] var pubs: seq[Future[int]]
for i in 0..<runs: for i in 0..<runs:
pubs &= nodes[i].publish("foobar", "Hello!".toBytes()) pubs &= nodes[i].publish("foobar", ("Hello!" & $i).toBytes())
await allFuturesThrowing(pubs) await allFuturesThrowing(pubs)
await allFuturesThrowing(futs.mapIt(it[0])) await allFuturesThrowing(futs.mapIt(it[0]))

View File

@ -426,8 +426,8 @@ suite "GossipSub":
inc observed inc observed
) )
# nodes[1].addObserver(obs1) nodes[1].addObserver(obs1)
# nodes[0].addObserver(obs2) nodes[0].addObserver(obs2)
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
@ -457,7 +457,7 @@ suite "GossipSub":
) )
await allFuturesThrowing(nodesFut.concat()) await allFuturesThrowing(nodesFut.concat())
# check observed == 2 check observed == 2
waitFor(runTests()) waitFor(runTests())
@ -535,7 +535,6 @@ suite "GossipSub":
await subscribeNodes(nodes) await subscribeNodes(nodes)
var seen: Table[string, int] var seen: Table[string, int]
var subs: seq[Future[void]]
var seenFut = newFuture[void]() var seenFut = newFuture[void]()
for dialer in nodes: for dialer in nodes:
var handler: TopicHandler var handler: TopicHandler
@ -549,14 +548,13 @@ suite "GossipSub":
if not seenFut.finished() and seen.len >= runs: if not seenFut.finished() and seen.len >= runs:
seenFut.complete() seenFut.complete()
subs &= dialer.subscribe("foobar", handler) await dialer.subscribe("foobar", handler)
await waitSub(nodes[0], dialer, "foobar")
await allFuturesThrowing(subs).wait(30.seconds)
tryPublish await wait(nodes[0].publish("foobar", tryPublish await wait(nodes[0].publish("foobar",
toBytes("from node " & toBytes("from node " &
$nodes[1].peerInfo.peerId)), $nodes[0].peerInfo.peerId)),
1.minutes), runs, 5.seconds 1.minutes), 1, 5.seconds
await wait(seenFut, 2.minutes) await wait(seenFut, 2.minutes)
check: seen.len >= runs check: seen.len >= runs
@ -591,7 +589,6 @@ suite "GossipSub":
await subscribeNodes(nodes) await subscribeNodes(nodes)
var seen: Table[PeerID, int] var seen: Table[PeerID, int]
var subs: seq[Future[void]]
var seenFut = newFuture[void]() var seenFut = newFuture[void]()
for dialer in nodes: for dialer in nodes:
var handler: TopicHandler var handler: TopicHandler
@ -606,14 +603,13 @@ suite "GossipSub":
if not seenFut.finished() and seen.len >= runs: if not seenFut.finished() and seen.len >= runs:
seenFut.complete() seenFut.complete()
subs &= dialer.subscribe("foobar", handler) await dialer.subscribe("foobar", handler)
subs &= waitSub(nodes[0], dialer, "foobar") await waitSub(nodes[0], dialer, "foobar")
await allFuturesThrowing(subs)
tryPublish await wait(nodes[0].publish("foobar", tryPublish await wait(nodes[0].publish("foobar",
toBytes("from node " & toBytes("from node " &
$nodes[1].peerInfo.peerId)), $nodes[0].peerInfo.peerId)),
1.minutes), 2, 5.seconds 1.minutes), 1, 5.seconds
await wait(seenFut, 5.minutes) await wait(seenFut, 5.minutes)
check: seen.len >= runs check: seen.len >= runs

View File

@ -2,6 +2,7 @@
const const
libp2p_pubsub_sign {.booldefine.} = true libp2p_pubsub_sign {.booldefine.} = true
libp2p_pubsub_verify {.booldefine.} = true libp2p_pubsub_verify {.booldefine.} = true
libp2p_pubsub_anonymize {.booldefine.} = false
import random import random
import chronos import chronos
@ -28,6 +29,7 @@ proc generateNodes*(
gossip: bool = false, gossip: bool = false,
triggerSelf: bool = false, triggerSelf: bool = false,
verifySignature: bool = libp2p_pubsub_verify, verifySignature: bool = libp2p_pubsub_verify,
anonymize: bool = libp2p_pubsub_anonymize,
sign: bool = libp2p_pubsub_sign): seq[PubSub] = sign: bool = libp2p_pubsub_sign): seq[PubSub] =
for i in 0..<num: for i in 0..<num:
@ -39,6 +41,7 @@ proc generateNodes*(
verifySignature = verifySignature, verifySignature = verifySignature,
sign = sign, sign = sign,
msgIdProvider = msgIdProvider, msgIdProvider = msgIdProvider,
anonymize = anonymize,
parameters = (var p = GossipSubParams.init(); p.floodPublish = false; p)).PubSub parameters = (var p = GossipSubParams.init(); p.floodPublish = false; p)).PubSub
else: else:
FloodSub.init( FloodSub.init(
@ -46,7 +49,8 @@ proc generateNodes*(
triggerSelf = triggerSelf, triggerSelf = triggerSelf,
verifySignature = verifySignature, verifySignature = verifySignature,
sign = sign, sign = sign,
msgIdProvider = msgIdProvider).PubSub msgIdProvider = msgIdProvider,
anonymize = anonymize).PubSub
switch.mount(pubsub) switch.mount(pubsub)
result.add(pubsub) result.add(pubsub)