diff --git a/libp2p.nimble b/libp2p.nimble index a43f9f82c..6b0bba906 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -16,19 +16,43 @@ requires "nim >= 1.2.0", "secp256k1", "stew" -proc runTest(filename: string, secure: string = "secio") = - exec "nim c -r --opt:speed -d:debug --verbosity:0 --hints:off tests/" & filename +proc runTest(filename: string, secure: string = "secio", verify: bool = true, sign: bool = true) = + var excstr: string = "nim c -r --opt:speed -d:debug --verbosity:0 --hints:off" + excstr.add(" ") + excstr.add("-d:libp2p_secure=" & $secure) + excstr.add(" ") + excstr.add("-d:libp2p_pubsub_sign=" & $sign) + excstr.add(" ") + excstr.add("-d:libp2p_pubsub_verify=" & $verify) + excstr.add(" ") + excstr.add("tests/" & filename) + exec excstr rmFile "tests/" & filename.toExe proc buildSample(filename: string) = exec "nim c --opt:speed --threads:on -d:debug --verbosity:0 --hints:off examples/" & filename rmFile "examples" & filename.toExe -task test, "Runs the test suite": +task testnative, "Runs libp2p native tests": runTest("testnative") - runTest("testnative", "noise") + +task testdaemon, "Runs daemon tests": runTest("testdaemon") + +task testinterop, "Runs interop tests": runTest("testinterop") +task testpubsub, "Runs pubsub tests": + runTest("pubsub/testpubsub") + runTest("pubsub/testpubsub", sign = false, verify = false) + # runTest("pubsub/testpubsub", "noise") + +task test, "Runs the test suite": + exec "nimble testnative" + # runTest("testnative", "noise") + exec "nimble testpubsub" + exec "nimble testdaemon" + exec "nimble testinterop" + task examples_build, "Build the samples": buildSample("directchat") diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 5a95a6314..a02f09da8 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -65,7 +65,7 @@ method rpcHandler*(f: FloodSub, if msg.msgId notin f.seen: f.seen.put(msg.msgId) # add the message to the seen cache - if not msg.verify(peer.peerInfo): + if f.verifySignature and not msg.verify(peer.peerInfo): trace "dropping message due to failed signature verification" continue @@ -120,7 +120,7 @@ method publish*(f: FloodSub, return trace "publishing on topic", name = topic - let msg = newMessage(f.peerInfo, data, topic) + let msg = newMessage(f.peerInfo, data, topic, f.sign) var sent: seq[Future[void]] # start the future but do not wait yet for p in f.floodsub[topic]: diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index f7c9e5cdf..4f7e41234 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -90,7 +90,7 @@ method handleDisconnect(g: GossipSub, peer: PubSubPeer) {.async.} = trace "peer disconnected", peer=peer.id await procCall FloodSub(g).handleDisconnect(peer) - + for t in g.gossipsub.keys: g.gossipsub[t].excl(peer.id) @@ -179,7 +179,7 @@ method rpcHandler(g: GossipSub, g.seen.put(msg.msgId) # add the message to the seen cache - if not msg.verify(peer.peerInfo): + if g.verifySignature and not msg.verify(peer.peerInfo): trace "dropping message due to failed signature verification" continue @@ -404,7 +404,7 @@ method publish*(g: GossipSub, # set the fanout expiery time g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL) - let msg = newMessage(g.peerInfo, data, topic) + let msg = newMessage(g.peerInfo, data, topic, g.sign) var sent: seq[Future[void]] for p in peers: if p == g.peerInfo.id: @@ -449,378 +449,3 @@ method initPubSub(g: GossipSub) = g.gossip = initTable[string, seq[ControlIHave]]() # pending gossip g.control = initTable[string, ControlMessage]() # pending control messages g.heartbeatLock = newAsyncLock() - -## Unit tests -when isMainModule: - ## Test internal (private) methods for gossip, - ## mesh and fanout maintenance. - ## Usually I wouldn't test private behaviour, - ## but the maintenance methods are quite involved, - ## hence these tests are here. - ## - - import unittest - import ../../errors - import ../../stream/bufferstream - - type - TestGossipSub = ref object of GossipSub - - const - StreamTransportTrackerName = "stream.transport" - StreamServerTrackerName = "stream.server" - - suite "GossipSub": - teardown: - let - trackers = [ - getTracker(BufferStreamTrackerName), - getTracker(AsyncStreamWriterTrackerName), - getTracker(AsyncStreamReaderTrackerName), - getTracker(StreamTransportTrackerName), - getTracker(StreamServerTrackerName) - ] - for tracker in trackers: - if not isNil(tracker): - # echo tracker.dump() - check tracker.isLeaked() == false - - test "`rebalanceMesh` Degree Lo": - proc testRun(): Future[bool] {.async.} = - let gossipSub = newPubSub(TestGossipSub, - PeerInfo.init(PrivateKey.random(RSA))) - - let topic = "foobar" - gossipSub.mesh[topic] = initHashSet[string]() - proc writeHandler(data: seq[byte]) {.async.} = - discard - - var conns = newSeq[Connection]() - for i in 0..<15: - let conn = newConnection(newBufferStream(writeHandler)) - conns &= conn - let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) - conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) - gossipSub.peers[peerInfo.id].conn = conn - gossipSub.mesh[topic].incl(peerInfo.id) - - check gossipSub.peers.len == 15 - await gossipSub.rebalanceMesh(topic) - check gossipSub.mesh[topic].len == GossipSubD - - await allFuturesThrowing(conns.mapIt(it.close())) - - result = true - - check: - waitFor(testRun()) == true - - test "`rebalanceMesh` Degree Hi": - proc testRun(): Future[bool] {.async.} = - let gossipSub = newPubSub(TestGossipSub, - PeerInfo.init(PrivateKey.random(RSA))) - - let topic = "foobar" - gossipSub.gossipsub[topic] = initHashSet[string]() - proc writeHandler(data: seq[byte]) {.async.} = - discard - - var conns = newSeq[Connection]() - for i in 0..<15: - let conn = newConnection(newBufferStream(writeHandler)) - conns &= conn - let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) - conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) - gossipSub.peers[peerInfo.id].conn = conn - gossipSub.gossipsub[topic].incl(peerInfo.id) - - check gossipSub.gossipsub[topic].len == 15 - await gossipSub.rebalanceMesh(topic) - check gossipSub.mesh[topic].len == GossipSubD - - await allFuturesThrowing(conns.mapIt(it.close())) - - result = true - - check: - waitFor(testRun()) == true - - test "`replenishFanout` Degree Lo": - proc testRun(): Future[bool] {.async.} = - let gossipSub = newPubSub(TestGossipSub, - PeerInfo.init(PrivateKey.random(RSA))) - - proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = - discard - - let topic = "foobar" - gossipSub.gossipsub[topic] = initHashSet[string]() - proc writeHandler(data: seq[byte]) {.async.} = - discard - - var conns = newSeq[Connection]() - for i in 0..<15: - let conn = newConnection(newBufferStream(writeHandler)) - conns &= conn - var peerInfo = PeerInfo.init(PrivateKey.random(RSA)) - conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) - gossipSub.peers[peerInfo.id].handler = handler - gossipSub.gossipsub[topic].incl(peerInfo.id) - - check gossipSub.gossipsub[topic].len == 15 - await gossipSub.replenishFanout(topic) - check gossipSub.fanout[topic].len == GossipSubD - - await allFuturesThrowing(conns.mapIt(it.close())) - - result = true - - check: - waitFor(testRun()) == true - - test "`dropFanoutPeers` drop expired fanout topics": - proc testRun(): Future[bool] {.async.} = - let gossipSub = newPubSub(TestGossipSub, - PeerInfo.init(PrivateKey.random(RSA))) - - proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = - discard - - let topic = "foobar" - gossipSub.fanout[topic] = initHashSet[string]() - gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(100.millis) - proc writeHandler(data: seq[byte]) {.async.} = - discard - - var conns = newSeq[Connection]() - for i in 0..<6: - let conn = newConnection(newBufferStream(writeHandler)) - conns &= conn - let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) - conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) - gossipSub.peers[peerInfo.id].handler = handler - gossipSub.fanout[topic].incl(peerInfo.id) - - check gossipSub.fanout[topic].len == GossipSubD - - await gossipSub.dropFanoutPeers() - check topic notin gossipSub.fanout - - await allFuturesThrowing(conns.mapIt(it.close())) - - result = true - - check: - waitFor(testRun()) == true - - test "`dropFanoutPeers` leave unexpired fanout topics": - proc testRun(): Future[bool] {.async.} = - let gossipSub = newPubSub(TestGossipSub, - PeerInfo.init(PrivateKey.random(RSA))) - - proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = - discard - - let topic1 = "foobar1" - let topic2 = "foobar2" - gossipSub.fanout[topic1] = initHashSet[string]() - gossipSub.fanout[topic2] = initHashSet[string]() - gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(100.millis) - gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(500.millis) - - proc writeHandler(data: seq[byte]) {.async.} = - discard - - var conns = newSeq[Connection]() - for i in 0..<6: - let conn = newConnection(newBufferStream(writeHandler)) - conns &= conn - let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) - conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) - gossipSub.peers[peerInfo.id].handler = handler - gossipSub.fanout[topic1].incl(peerInfo.id) - gossipSub.fanout[topic2].incl(peerInfo.id) - - check gossipSub.fanout[topic1].len == GossipSubD - check gossipSub.fanout[topic2].len == GossipSubD - - await gossipSub.dropFanoutPeers() - check topic1 notin gossipSub.fanout - check topic2 in gossipSub.fanout - - await allFuturesThrowing(conns.mapIt(it.close())) - - result = true - - check: - waitFor(testRun()) == true - - test "`getGossipPeers` - should gather up to degree D non intersecting peers": - proc testRun(): Future[bool] {.async.} = - let gossipSub = newPubSub(TestGossipSub, - PeerInfo.init(PrivateKey.random(RSA))) - - proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = - discard - - proc writeHandler(data: seq[byte]) {.async.} = - discard - - let topic = "foobar" - gossipSub.mesh[topic] = initHashSet[string]() - gossipSub.fanout[topic] = initHashSet[string]() - gossipSub.gossipsub[topic] = initHashSet[string]() - var conns = newSeq[Connection]() - for i in 0..<30: - let conn = newConnection(newBufferStream(writeHandler)) - conns &= conn - let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) - conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) - gossipSub.peers[peerInfo.id].handler = handler - if i mod 2 == 0: - gossipSub.fanout[topic].incl(peerInfo.id) - else: - gossipSub.mesh[topic].incl(peerInfo.id) - - for i in 0..<15: - let conn = newConnection(newBufferStream(writeHandler)) - conns &= conn - let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) - conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) - gossipSub.peers[peerInfo.id].handler = handler - gossipSub.gossipsub[topic].incl(peerInfo.id) - - check gossipSub.fanout[topic].len == 15 - check gossipSub.fanout[topic].len == 15 - check gossipSub.gossipsub[topic].len == 15 - - let peers = gossipSub.getGossipPeers() - check peers.len == GossipSubD - for p in peers.keys: - check p notin gossipSub.fanout[topic] - check p notin gossipSub.mesh[topic] - - await allFuturesThrowing(conns.mapIt(it.close())) - - result = true - - check: - waitFor(testRun()) == true - - test "`getGossipPeers` - should not crash on missing topics in mesh": - proc testRun(): Future[bool] {.async.} = - let gossipSub = newPubSub(TestGossipSub, - PeerInfo.init(PrivateKey.random(RSA))) - - proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = - discard - - proc writeHandler(data: seq[byte]) {.async.} = - discard - - let topic = "foobar" - gossipSub.fanout[topic] = initHashSet[string]() - gossipSub.gossipsub[topic] = initHashSet[string]() - var conns = newSeq[Connection]() - for i in 0..<30: - let conn = newConnection(newBufferStream(writeHandler)) - conns &= conn - let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) - conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) - gossipSub.peers[peerInfo.id].handler = handler - if i mod 2 == 0: - gossipSub.fanout[topic].incl(peerInfo.id) - else: - gossipSub.gossipsub[topic].incl(peerInfo.id) - - let peers = gossipSub.getGossipPeers() - check peers.len == GossipSubD - - await allFuturesThrowing(conns.mapIt(it.close())) - - result = true - - check: - waitFor(testRun()) == true - - test "`getGossipPeers` - should not crash on missing topics in gossip": - proc testRun(): Future[bool] {.async.} = - let gossipSub = newPubSub(TestGossipSub, - PeerInfo.init(PrivateKey.random(RSA))) - - proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = - discard - - proc writeHandler(data: seq[byte]) {.async.} = - discard - - let topic = "foobar" - gossipSub.mesh[topic] = initHashSet[string]() - gossipSub.gossipsub[topic] = initHashSet[string]() - var conns = newSeq[Connection]() - for i in 0..<30: - let conn = newConnection(newBufferStream(writeHandler)) - conns &= conn - let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) - conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) - gossipSub.peers[peerInfo.id].handler = handler - if i mod 2 == 0: - gossipSub.mesh[topic].incl(peerInfo.id) - else: - gossipSub.gossipsub[topic].incl(peerInfo.id) - - let peers = gossipSub.getGossipPeers() - check peers.len == GossipSubD - - await allFuturesThrowing(conns.mapIt(it.close())) - - result = true - - check: - waitFor(testRun()) == true - - test "`getGossipPeers` - should not crash on missing topics in gossip": - proc testRun(): Future[bool] {.async.} = - let gossipSub = newPubSub(TestGossipSub, - PeerInfo.init(PrivateKey.random(RSA))) - - proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = - discard - - proc writeHandler(data: seq[byte]) {.async.} = - discard - - let topic = "foobar" - gossipSub.mesh[topic] = initHashSet[string]() - gossipSub.fanout[topic] = initHashSet[string]() - var conns = newSeq[Connection]() - for i in 0..<30: - let conn = newConnection(newBufferStream(writeHandler)) - conns &= conn - let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) - conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) - gossipSub.peers[peerInfo.id].handler = handler - if i mod 2 == 0: - gossipSub.mesh[topic].incl(peerInfo.id) - else: - gossipSub.fanout[topic].incl(peerInfo.id) - - let peers = gossipSub.getGossipPeers() - check peers.len == 0 - - await allFuturesThrowing(conns.mapIt(it.close())) - - result = true - - check: - waitFor(testRun()) == true diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index a6b9e9931..637b01c25 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -39,6 +39,8 @@ type topics*: Table[string, Topic] # local topics peers*: Table[string, PubSubPeer] # peerid to peer map triggerSelf*: bool # trigger own local handler on publish + verifySignature*: bool # enable signature verification + sign*: bool # enable message signing cleanupLock: AsyncLock validators*: Table[string, HashSet[ValidatorHandler]] @@ -241,11 +243,14 @@ method validate*(p: PubSub, message: Message): Future[bool] {.async, base.} = let futs = await allFinished(pending) result = futs.allIt(not it.failed and it.read()) -proc newPubSub*(p: typedesc[PubSub], +proc newPubSub*(P: typedesc[PubSub], peerInfo: PeerInfo, - triggerSelf: bool = false): p = - new result - result.peerInfo = peerInfo - result.triggerSelf = triggerSelf - result.cleanupLock = newAsyncLock() + triggerSelf: bool = false, + verifySignature: bool = true, + sign: bool = true): P = + result = P(peerInfo: peerInfo, + triggerSelf: triggerSelf, + verifySignature: verifySignature, + sign: sign, + cleanupLock: newAsyncLock()) result.initPubSub() diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 438a07080..981b42afc 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -106,8 +106,9 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = proc sendMsg*(p: PubSubPeer, peerId: PeerID, topic: string, - data: seq[byte]): Future[void] {.gcsafe.} = - p.send(@[RPCMsg(messages: @[newMessage(p.peerInfo, data, topic)])]) + data: seq[byte], + sign: bool): Future[void] {.gcsafe.} = + p.send(@[RPCMsg(messages: @[newMessage(p.peerInfo, data, topic, sign)])]) proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async.} = for topic in topics: diff --git a/libp2p/protocols/pubsub/rpc/message.nim b/libp2p/protocols/pubsub/rpc/message.nim index 66339504c..11109eab0 100644 --- a/libp2p/protocols/pubsub/rpc/message.nim +++ b/libp2p/protocols/pubsub/rpc/message.nim @@ -53,7 +53,7 @@ proc verify*(m: Message, p: PeerInfo): bool = proc newMessage*(p: PeerInfo, data: seq[byte], - name: string, + topic: string, sign: bool = true): Message {.gcsafe.} = var seqno: seq[byte] = newSeq[byte](20) if p.publicKey.isSome and randomBytes(addr seqno[0], 20) > 0: @@ -62,7 +62,7 @@ proc newMessage*(p: PeerInfo, result = Message(fromPeer: p.peerId.getBytes(), data: data, seqno: seqno, - topicIDs: @[name]) + topicIDs: @[topic]) if sign: result = result.sign(p) diff --git a/libp2p/standard_setup.nim b/libp2p/standard_setup.nim index bb96cbee9..c05d34056 100644 --- a/libp2p/standard_setup.nim +++ b/libp2p/standard_setup.nim @@ -1,6 +1,8 @@ # compile time options here const libp2p_secure {.strdefine.} = "" + libp2p_pubsub_sign {.booldefine.} = true + libp2p_pubsub_verify {.booldefine.} = true import options, tables, @@ -21,7 +23,9 @@ export proc newStandardSwitch*(privKey = none(PrivateKey), address = MultiAddress.init("/ip4/127.0.0.1/tcp/0"), triggerSelf = false, - gossip = false): Switch = + gossip = false, + verifySignature = libp2p_pubsub_verify, + sign = libp2p_pubsub_sign): Switch = proc createMplex(conn: Connection): Muxer = result = newMplex(conn) @@ -36,10 +40,19 @@ proc newStandardSwitch*(privKey = none(PrivateKey), let secureManagers = {NoiseCodec: newNoise(seckey).Secure}.toTable else: let secureManagers = {SecioCodec: newSecio(seckey).Secure}.toTable + let pubSub = if gossip: - PubSub newPubSub(GossipSub, peerInfo, triggerSelf) + PubSub newPubSub(GossipSub, + peerInfo = peerInfo, + triggerSelf = triggerSelf, + verifySignature = verifySignature, + sign = sign) else: - PubSub newPubSub(FloodSub, peerInfo, triggerSelf) + PubSub newPubSub(FloodSub, + peerInfo = peerInfo, + triggerSelf = triggerSelf, + verifySignature = verifySignature, + sign = sign) result = newSwitch(peerInfo, transports, @@ -47,4 +60,3 @@ proc newStandardSwitch*(privKey = none(PrivateKey), muxers, secureManagers = secureManagers, pubSub = some(pubSub)) - diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim new file mode 100644 index 000000000..ec8b6b6b1 --- /dev/null +++ b/tests/pubsub/testgossipinternal.nim @@ -0,0 +1,344 @@ +include ../../libp2p/protocols/pubsub/gossipsub + +import unittest +import ../../libp2p/errors +import ../../libp2p/stream/bufferstream + +type + TestGossipSub = ref object of GossipSub + +const + StreamTransportTrackerName = "stream.transport" + StreamServerTrackerName = "stream.server" + +suite "GossipSub internal": + teardown: + let + trackers = [ + getTracker(BufferStreamTrackerName), + getTracker(AsyncStreamWriterTrackerName), + getTracker(AsyncStreamReaderTrackerName), + getTracker(StreamTransportTrackerName), + getTracker(StreamServerTrackerName) + ] + for tracker in trackers: + if not isNil(tracker): + # echo tracker.dump() + check tracker.isLeaked() == false + + test "`rebalanceMesh` Degree Lo": + proc testRun(): Future[bool] {.async.} = + let gossipSub = newPubSub(TestGossipSub, + PeerInfo.init(PrivateKey.random(RSA))) + + let topic = "foobar" + gossipSub.mesh[topic] = initHashSet[string]() + + var conns = newSeq[Connection]() + for i in 0..<15: + let conn = newConnection(newBufferStream()) + conns &= conn + let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) + conn.peerInfo = peerInfo + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id].conn = conn + gossipSub.mesh[topic].incl(peerInfo.id) + + check gossipSub.peers.len == 15 + await gossipSub.rebalanceMesh(topic) + check gossipSub.mesh[topic].len == GossipSubD + + await allFuturesThrowing(conns.mapIt(it.close())) + + result = true + + check: + waitFor(testRun()) == true + + test "`rebalanceMesh` Degree Hi": + proc testRun(): Future[bool] {.async.} = + let gossipSub = newPubSub(TestGossipSub, + PeerInfo.init(PrivateKey.random(RSA))) + + let topic = "foobar" + gossipSub.gossipsub[topic] = initHashSet[string]() + + var conns = newSeq[Connection]() + for i in 0..<15: + let conn = newConnection(newBufferStream()) + conns &= conn + let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) + conn.peerInfo = peerInfo + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id].conn = conn + gossipSub.gossipsub[topic].incl(peerInfo.id) + + check gossipSub.gossipsub[topic].len == 15 + await gossipSub.rebalanceMesh(topic) + check gossipSub.mesh[topic].len == GossipSubD + + await allFuturesThrowing(conns.mapIt(it.close())) + + result = true + + check: + waitFor(testRun()) == true + + test "`replenishFanout` Degree Lo": + proc testRun(): Future[bool] {.async.} = + let gossipSub = newPubSub(TestGossipSub, + PeerInfo.init(PrivateKey.random(RSA))) + + proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = + discard + + let topic = "foobar" + gossipSub.gossipsub[topic] = initHashSet[string]() + + var conns = newSeq[Connection]() + for i in 0..<15: + let conn = newConnection(newBufferStream()) + conns &= conn + var peerInfo = PeerInfo.init(PrivateKey.random(RSA)) + conn.peerInfo = peerInfo + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id].handler = handler + gossipSub.gossipsub[topic].incl(peerInfo.id) + + check gossipSub.gossipsub[topic].len == 15 + await gossipSub.replenishFanout(topic) + check gossipSub.fanout[topic].len == GossipSubD + + await allFuturesThrowing(conns.mapIt(it.close())) + + result = true + + check: + waitFor(testRun()) == true + + test "`dropFanoutPeers` drop expired fanout topics": + proc testRun(): Future[bool] {.async.} = + let gossipSub = newPubSub(TestGossipSub, + PeerInfo.init(PrivateKey.random(RSA))) + + proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = + discard + + let topic = "foobar" + gossipSub.fanout[topic] = initHashSet[string]() + gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(100.millis) + + var conns = newSeq[Connection]() + for i in 0..<6: + let conn = newConnection(newBufferStream()) + conns &= conn + let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) + conn.peerInfo = peerInfo + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id].handler = handler + gossipSub.fanout[topic].incl(peerInfo.id) + + check gossipSub.fanout[topic].len == GossipSubD + + await gossipSub.dropFanoutPeers() + check topic notin gossipSub.fanout + + await allFuturesThrowing(conns.mapIt(it.close())) + + result = true + + check: + waitFor(testRun()) == true + + test "`dropFanoutPeers` leave unexpired fanout topics": + proc testRun(): Future[bool] {.async.} = + let gossipSub = newPubSub(TestGossipSub, + PeerInfo.init(PrivateKey.random(RSA))) + + proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = + discard + + let topic1 = "foobar1" + let topic2 = "foobar2" + gossipSub.fanout[topic1] = initHashSet[string]() + gossipSub.fanout[topic2] = initHashSet[string]() + gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(100.millis) + gossipSub.lastFanoutPubSub[topic2] = Moment.fromNow(5.seconds) + + var conns = newSeq[Connection]() + for i in 0..<6: + let conn = newConnection(newBufferStream()) + conns &= conn + let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) + conn.peerInfo = peerInfo + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id].handler = handler + gossipSub.fanout[topic1].incl(peerInfo.id) + gossipSub.fanout[topic2].incl(peerInfo.id) + + check gossipSub.fanout[topic1].len == GossipSubD + check gossipSub.fanout[topic2].len == GossipSubD + + await gossipSub.dropFanoutPeers() + check topic1 notin gossipSub.fanout + check topic2 in gossipSub.fanout + + await allFuturesThrowing(conns.mapIt(it.close())) + + result = true + + check: + waitFor(testRun()) == true + + test "`getGossipPeers` - should gather up to degree D non intersecting peers": + proc testRun(): Future[bool] {.async.} = + let gossipSub = newPubSub(TestGossipSub, + PeerInfo.init(PrivateKey.random(RSA))) + + proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = + discard + + let topic = "foobar" + gossipSub.mesh[topic] = initHashSet[string]() + gossipSub.fanout[topic] = initHashSet[string]() + gossipSub.gossipsub[topic] = initHashSet[string]() + var conns = newSeq[Connection]() + for i in 0..<30: + let conn = newConnection(newBufferStream()) + conns &= conn + let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) + conn.peerInfo = peerInfo + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id].handler = handler + if i mod 2 == 0: + gossipSub.fanout[topic].incl(peerInfo.id) + else: + gossipSub.mesh[topic].incl(peerInfo.id) + + for i in 0..<15: + let conn = newConnection(newBufferStream()) + conns &= conn + let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) + conn.peerInfo = peerInfo + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id].handler = handler + gossipSub.gossipsub[topic].incl(peerInfo.id) + + check gossipSub.fanout[topic].len == 15 + check gossipSub.fanout[topic].len == 15 + check gossipSub.gossipsub[topic].len == 15 + + let peers = gossipSub.getGossipPeers() + check peers.len == GossipSubD + for p in peers.keys: + check p notin gossipSub.fanout[topic] + check p notin gossipSub.mesh[topic] + + await allFuturesThrowing(conns.mapIt(it.close())) + + result = true + + check: + waitFor(testRun()) == true + + test "`getGossipPeers` - should not crash on missing topics in mesh": + proc testRun(): Future[bool] {.async.} = + let gossipSub = newPubSub(TestGossipSub, + PeerInfo.init(PrivateKey.random(RSA))) + + proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = + discard + + let topic = "foobar" + gossipSub.fanout[topic] = initHashSet[string]() + gossipSub.gossipsub[topic] = initHashSet[string]() + var conns = newSeq[Connection]() + for i in 0..<30: + let conn = newConnection(newBufferStream()) + conns &= conn + let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) + conn.peerInfo = peerInfo + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id].handler = handler + if i mod 2 == 0: + gossipSub.fanout[topic].incl(peerInfo.id) + else: + gossipSub.gossipsub[topic].incl(peerInfo.id) + + let peers = gossipSub.getGossipPeers() + check peers.len == GossipSubD + + await allFuturesThrowing(conns.mapIt(it.close())) + + result = true + + check: + waitFor(testRun()) == true + + test "`getGossipPeers` - should not crash on missing topics in gossip": + proc testRun(): Future[bool] {.async.} = + let gossipSub = newPubSub(TestGossipSub, + PeerInfo.init(PrivateKey.random(RSA))) + + proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = + discard + + let topic = "foobar" + gossipSub.mesh[topic] = initHashSet[string]() + gossipSub.gossipsub[topic] = initHashSet[string]() + var conns = newSeq[Connection]() + for i in 0..<30: + let conn = newConnection(newBufferStream()) + conns &= conn + let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) + conn.peerInfo = peerInfo + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id].handler = handler + if i mod 2 == 0: + gossipSub.mesh[topic].incl(peerInfo.id) + else: + gossipSub.gossipsub[topic].incl(peerInfo.id) + + let peers = gossipSub.getGossipPeers() + check peers.len == GossipSubD + + await allFuturesThrowing(conns.mapIt(it.close())) + + result = true + + check: + waitFor(testRun()) == true + + test "`getGossipPeers` - should not crash on missing topics in gossip": + proc testRun(): Future[bool] {.async.} = + let gossipSub = newPubSub(TestGossipSub, + PeerInfo.init(PrivateKey.random(RSA))) + + proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = + discard + + let topic = "foobar" + gossipSub.mesh[topic] = initHashSet[string]() + gossipSub.fanout[topic] = initHashSet[string]() + var conns = newSeq[Connection]() + for i in 0..<30: + let conn = newConnection(newBufferStream()) + conns &= conn + let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) + conn.peerInfo = peerInfo + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id].handler = handler + if i mod 2 == 0: + gossipSub.mesh[topic].incl(peerInfo.id) + else: + gossipSub.fanout[topic].incl(peerInfo.id) + + let peers = gossipSub.getGossipPeers() + check peers.len == 0 + + await allFuturesThrowing(conns.mapIt(it.close())) + + result = true + + check: + waitFor(testRun()) == true diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 1cffc2dd6..8e34cf9b6 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -171,44 +171,6 @@ suite "GossipSub": check: waitFor(runTests()) == true - test "GossipSub should add remote peer topic subscriptions": - proc runTests(): Future[bool] {.async.} = - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - discard - - let gossip1 = createGossipSub() - let gossip2 = createGossipSub() - - var buf1 = newBufferStream() - var conn1 = newConnection(buf1) - conn1.peerInfo = gossip1.peerInfo - - var buf2 = newBufferStream() - var conn2 = newConnection(buf2) - conn2.peerInfo = gossip2.peerInfo - - buf1 = buf1 | buf2 | buf1 - - await gossip1.subscribeToPeer(conn2) - asyncCheck gossip2.handleConn(conn1, GossipSubCodec) - - await gossip1.subscribe("foobar", handler) - await sleepAsync(1.seconds) - - check: - "foobar" in gossip2.gossipsub - gossip1.peerInfo.id in gossip2.gossipsub["foobar"] - - await allFuturesThrowing( - buf1.close(), - buf2.close() - ) - - result = true - - check: - waitFor(runTests()) == true - test "e2e - GossipSub should add remote peer topic subscriptions": proc testBasicGossipSub(): Future[bool] {.async.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = @@ -242,57 +204,6 @@ suite "GossipSub": check: waitFor(testBasicGossipSub()) == true - test "GossipSub should add remote peer topic subscriptions if both peers are subscribed": - proc runTests(): Future[bool] {.async.} = - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - discard - - let gossip1 = createGossipSub() - let gossip2 = createGossipSub() - - var buf1 = newBufferStream() - var conn1 = newConnection(buf1) - conn1.peerInfo = gossip1.peerInfo - - var buf2 = newBufferStream() - var conn2 = newConnection(buf2) - conn2.peerInfo = gossip2.peerInfo - - buf1 = buf1 | buf2 | buf1 - - await gossip1.subscribeToPeer(conn2) - asyncCheck gossip1.handleConn(conn1, GossipSubCodec) - - await gossip2.subscribeToPeer(conn1) - asyncCheck gossip2.handleConn(conn2, GossipSubCodec) - - await gossip1.subscribe("foobar", handler) - await gossip2.subscribe("foobar", handler) - await sleepAsync(1.seconds) - - check: - "foobar" in gossip1.topics - "foobar" in gossip2.topics - - "foobar" in gossip1.gossipsub - "foobar" in gossip2.gossipsub - - # TODO: in a real setting, we would be checking for the peerId from - # gossip1 in gossip2 and vice versa, but since we're doing some mockery - # with connection piping and such, this is fine - do not change! - gossip1.peerInfo.id in gossip1.gossipsub["foobar"] - gossip2.peerInfo.id in gossip2.gossipsub["foobar"] - - await allFuturesThrowing( - buf1.close(), - buf2.close() - ) - - result = true - - check: - waitFor(runTests()) == true - test "e2e - GossipSub should add remote peer topic subscriptions if both peers are subscribed": proc testBasicGossipSub(): Future[bool] {.async.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = @@ -341,46 +252,6 @@ suite "GossipSub": check: waitFor(testBasicGossipSub()) == true - # test "send over fanout A -> B": - # proc runTests(): Future[bool] {.async.} = - # var handlerFut = newFuture[bool]() - # proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - # check: - # topic == "foobar" - # cast[string](data) == "Hello!" - - # handlerFut.complete(true) - - # let gossip1 = createGossipSub() - # let gossip2 = createGossipSub() - - # var buf1 = newBufferStream() - # var conn1 = newConnection(buf1) - - # var buf2 = newBufferStream() - # var conn2 = newConnection(buf2) - - # conn1.peerInfo = gossip2.peerInfo - # conn2.peerInfo = gossip1.peerInfo - - # buf1 = buf1 | buf2 | buf1 - - # await gossip1.subscribeToPeer(conn2) - # asyncCheck gossip1.handleConn(conn1, GossipSubCodec) - - # await gossip2.subscribeToPeer(conn1) - # asyncCheck gossip2.handleConn(conn2, GossipSubCodec) - - # await gossip1.subscribe("foobar", handler) - # await sleepAsync(1.seconds) - # await gossip2.publish("foobar", cast[seq[byte]]("Hello!")) - # await sleepAsync(1.seconds) - - # result = await handlerFut - - # check: - # waitFor(runTests()) == true - test "e2e - GossipSub send over fanout A -> B": proc runTests(): Future[bool] {.async.} = var passed = newFuture[void]() @@ -417,49 +288,6 @@ suite "GossipSub": check: waitFor(runTests()) == true - # test "send over mesh A -> B": - # proc runTests(): Future[bool] {.async.} = - # var passed = newFuture[void]() - # proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - # check: - # topic == "foobar" - # cast[string](data) == "Hello!" - - # passed.complete() - - # let gossip1 = createGossipSub() - # let gossip2 = createGossipSub() - - # var buf1 = newBufferStream() - # var conn1 = newConnection(buf1) - # conn1.peerInfo = gossip1.peerInfo - - # var buf2 = newBufferStream() - # var conn2 = newConnection(buf2) - # conn2.peerInfo = gossip2.peerInfo - - # buf1 = buf1 | buf2 | buf1 - - # await gossip1.subscribeToPeer(conn2) - # await gossip2.subscribeToPeer(conn1) - - # await gossip1.subscribe("foobar", handler) - # await sleepAsync(1.seconds) - - # await gossip2.subscribe("foobar", handler) - # await sleepAsync(1.seconds) - - # await gossip2.publish("foobar", cast[seq[byte]]("Hello!")) - # await sleepAsync(1.seconds) - - # await passed.wait(5.seconds) - # result = true - - # await allFuturesThrowing(buf1.close(), buf2.close()) - - # check: - # waitFor(runTests()) == true - test "e2e - GossipSub send over mesh A -> B": proc runTests(): Future[bool] {.async.} = var passed: Future[bool] = newFuture[bool]() @@ -488,64 +316,6 @@ suite "GossipSub": check: waitFor(runTests()) == true - # test "with multiple peers": - # proc runTests(): Future[bool] {.async.} = - # var nodes: seq[GossipSub] - # for i in 0..<10: - # nodes.add(createGossipSub()) - - # var pending: seq[Future[void]] - # var awaitters: seq[Future[void]] - # var seen: Table[string, int] - # for dialer in nodes: - # var handler: TopicHandler - # closureScope: - # var dialerNode = dialer - # handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} = - # if dialerNode.peerInfo.peerId.get().pretty notin seen: - # seen[dialerNode.peerInfo.peerId.get().pretty] = 0 - # seen[dialerNode.peerInfo.peerId.get().pretty].inc - # check topic == "foobar" - - # await dialer.subscribe("foobar", handler) - # await sleepAsync(20.millis) - - # for i, node in nodes: - # if dialer.peerInfo.peerId != node.peerInfo.peerId: - # var buf1 = newBufferStream() - # var conn1 = newConnection(buf1) - # conn1.peerInfo = dialer.peerInfo - - # var buf2 = newBufferStream() - # var conn2 = newConnection(buf2) - # conn2.peerInfo = node.peerInfo - - # buf1 = buf2 | buf1 - # buf2 = buf1 | buf2 - - # pending.add(dialer.subscribeToPeer(conn2)) - # pending.add(node.subscribeToPeer(conn1)) - # await sleepAsync(10.millis) - - # awaitters.add(dialer.start()) - - # await nodes[0].publish("foobar", - # cast[seq[byte]]("from node " & - # nodes[1].peerInfo.peerId.get().pretty)) - - # await sleepAsync(1000.millis) - # await allFuturesThrowing(nodes.mapIt(it.stop())) - # await allFuturesThrowing(awaitters) - - # check: seen.len == 9 - # for k, v in seen.pairs: - # check: v == 1 - - # result = true - - # check: - # waitFor(runTests()) == true - test "e2e - GossipSub with multiple peers": proc runTests(): Future[bool] {.async.} = var nodes: seq[Switch] = newSeq[Switch]() diff --git a/tests/pubsub/testpubsub.nim b/tests/pubsub/testpubsub.nim index 1401b3168..96897e5bd 100644 --- a/tests/pubsub/testpubsub.nim +++ b/tests/pubsub/testpubsub.nim @@ -1,4 +1,4 @@ -include ../../libp2p/protocols/pubsub/gossipsub -import testfloodsub, +import testgossipinternal, + testfloodsub, testgossipsub, testmcache diff --git a/tests/testnative.nim b/tests/testnative.nim index be262fc56..bb4147f48 100644 --- a/tests/testnative.nim +++ b/tests/testnative.nim @@ -19,5 +19,4 @@ import testtransport, testswitch, testnoise, testpeerinfo, - testmplex, - pubsub/testpubsub + testmplex