From fb0d10b6fdd93515508de095c7f63f2547a5116b Mon Sep 17 00:00:00 2001 From: Tanguy Date: Mon, 27 Dec 2021 11:17:00 +0100 Subject: [PATCH 01/16] Gossipsub: process messages concurrently (#680) * Gossip sub: process messages concurrently * Retries for flaky test --- libp2p/protocols/pubsub/gossipsub.nim | 107 ++++++++++++++------------ tests/asyncunit.nim | 22 ++++++ tests/testdaemon.nim | 12 ++- 3 files changed, 86 insertions(+), 55 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index f46260e..2691180 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -282,6 +282,64 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = peer, RPCMsg(control: some(respControl), messages: messages)) +proc validateAndRelay(g: GossipSub, + msg: Message, + msgId, msgIdSalted: MessageId, + peer: PubSubPeer) {.async.} = + try: + let validation = await g.validate(msg) + + var seenPeers: HashSet[PubSubPeer] + discard g.validationSeen.pop(msgIdSalted, seenPeers) + libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64) + + case validation + of ValidationResult.Reject: + debug "Dropping message after validation, reason: reject", + msgId = shortLog(msgId), peer + g.punishInvalidMessage(peer, msg.topicIDs) + return + of ValidationResult.Ignore: + debug "Dropping message after validation, reason: ignore", + msgId = shortLog(msgId), peer + return + of ValidationResult.Accept: + discard + + # store in cache only after validation + g.mcache.put(msgId, msg) + + g.rewardDelivered(peer, msg.topicIDs, true) + + var toSendPeers = HashSet[PubSubPeer]() + for t in msg.topicIDs: # for every topic in the message + if t notin g.topics: + continue + + g.floodsub.withValue(t, peers): toSendPeers.incl(peers[]) + g.mesh.withValue(t, peers): toSendPeers.incl(peers[]) + + # Don't send it to source peer, or peers that + # sent it during validation + toSendPeers.excl(peer) + toSendPeers.excl(seenPeers) + + # In theory, if topics are the same in all messages, we could batch - we'd + # also have to be careful to only include validated messages + g.broadcast(toSendPeers, RPCMsg(messages: @[msg])) + trace "forwared message to peers", peers = toSendPeers.len, msgId, peer + for topic in msg.topicIDs: + if topic notin g.topics: continue + + if g.knownTopics.contains(topic): + libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = [topic]) + else: + libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = ["generic"]) + + await handleData(g, topic, msg.data) + except CatchableError as exc: + info "validateAndRelay failed", msg=exc.msg + method rpcHandler*(g: GossipSub, peer: PubSubPeer, rpcMsg: RPCMsg) {.async.} = @@ -343,54 +401,7 @@ method rpcHandler*(g: GossipSub, # (eg, pop everything you put in it) g.validationSeen[msgIdSalted] = initHashSet[PubSubPeer]() - let validation = await g.validate(msg) - - var seenPeers: HashSet[PubSubPeer] - discard g.validationSeen.pop(msgIdSalted, seenPeers) - libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64) - - case validation - of ValidationResult.Reject: - debug "Dropping message after validation, reason: reject", - msgId = shortLog(msgId), peer - g.punishInvalidMessage(peer, msg.topicIDs) - continue - of ValidationResult.Ignore: - debug "Dropping message after validation, reason: ignore", - msgId = shortLog(msgId), peer - continue - of ValidationResult.Accept: - discard - - # store in cache only after validation - g.mcache.put(msgId, msg) - - g.rewardDelivered(peer, msg.topicIDs, true) - - var toSendPeers = HashSet[PubSubPeer]() - for t in msg.topicIDs: # for every topic in the message - if t notin g.topics: - continue - - g.floodsub.withValue(t, peers): toSendPeers.incl(peers[]) - g.mesh.withValue(t, peers): toSendPeers.incl(peers[]) - - await handleData(g, t, msg.data) - - # Don't send it to source peer, or peers that - # sent it during validation - toSendPeers.excl(peer) - toSendPeers.excl(seenPeers) - - # In theory, if topics are the same in all messages, we could batch - we'd - # also have to be careful to only include validated messages - g.broadcast(toSendPeers, RPCMsg(messages: @[msg])) - trace "forwared message to peers", peers = toSendPeers.len, msgId, peer - for topic in msg.topicIDs: - if g.knownTopics.contains(topic): - libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = [topic]) - else: - libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = ["generic"]) + asyncSpawn g.validateAndRelay(msg, msgId, msgIdSalted, peer) if rpcMsg.control.isSome(): g.handleControl(peer, rpcMsg.control.unsafeGet()) diff --git a/tests/asyncunit.nim b/tests/asyncunit.nim index 9da6838..fa10c9e 100644 --- a/tests/asyncunit.nim +++ b/tests/asyncunit.nim @@ -22,3 +22,25 @@ template asyncTest*(name: string, body: untyped): untyped = proc() {.async, gcsafe.} = body )()) + +template flakyAsyncTest*(name: string, attempts: int, body: untyped): untyped = + test name: + var attemptNumber = 0 + while attemptNumber < attempts: + let isLastAttempt = attemptNumber == attempts - 1 + inc attemptNumber + try: + waitFor(( + proc() {.async, gcsafe.} = + body + )()) + except Exception as e: + if isLastAttempt: raise e + else: testStatusIMPL = TestStatus.FAILED + finally: + if not isLastAttempt: + if testStatusIMPL == TestStatus.FAILED: + # Retry + testStatusIMPL = TestStatus.OK + else: + break diff --git a/tests/testdaemon.nim b/tests/testdaemon.nim index 885cb1c..5c1da3d 100644 --- a/tests/testdaemon.nim +++ b/tests/testdaemon.nim @@ -1,4 +1,4 @@ -import chronos, unittest2 +import chronos, unittest2, helpers import ../libp2p/daemon/daemonapi, ../libp2p/multiaddress, ../libp2p/multicodec, ../libp2p/cid, ../libp2p/multihash, ../libp2p/peerid @@ -140,9 +140,7 @@ when isMainModule: # test "Provide CID test": # check: # waitFor(provideCidTest()) == true - test "GossipSub test": - check: - waitFor(pubsubTest({PSGossipSub})) == true - test "FloodSub test": - check: - waitFor(pubsubTest({PSFloodSub})) == true + flakyAsyncTest "GossipSub test", attempts=4: + check (await pubsubTest({PSGossipSub})) == true + flakyAsyncTest "FloodSub test", attempts=4: + check (await pubsubTest({PSFloodSub})) == true From dffe4bed4541dee2645629729a245e7607d716ad Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 4 Jan 2022 05:21:24 -0600 Subject: [PATCH 02/16] mitigate high traffic - drop faulty peers (#460) Only allow 1 connection per peer Co-authored-by: Tanguy --- libp2p/connmanager.nim | 2 +- tests/testswitch.nim | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index 9613874..3f670cd 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -25,7 +25,7 @@ declareGauge(libp2p_peers, "total connected peers") const MaxConnections* = 50 - MaxConnectionsPerPeer* = 5 + MaxConnectionsPerPeer* = 1 type TooManyConnectionsError* = object of LPError diff --git a/tests/testswitch.nim b/tests/testswitch.nim index 603812c..8d21a0f 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -562,6 +562,7 @@ suite "Switch": conns.dec switches.add(newStandardSwitch( + maxConnsPerPeer = 10, rng = rng)) switches[0].addConnEventHandler(hook, ConnEventKind.Connected) From f3dee6865cae08032e2976ff31e977a3371f3e6d Mon Sep 17 00:00:00 2001 From: Tanguy Date: Wed, 5 Jan 2022 16:27:33 +0100 Subject: [PATCH 03/16] Chronos strict exception tracking (#652) * Enable chronos strict exception tracking ( -d:chronosStrictException ) --- .pinned | 4 ++-- examples/directchat.nim | 5 ++++- libp2p.nimble | 20 ++++++++++++++------ libp2p/connmanager.nim | 2 +- libp2p/daemon/daemonapi.nim | 18 ++++++++++++------ libp2p/errors.nim | 2 +- libp2p/nameresolving/dnsresolver.nim | 5 +++++ libp2p/protocols/ping.nim | 2 ++ libp2p/transports/tcptransport.nim | 2 +- tests/commontransport.nim | 2 +- tests/helpers.nim | 6 +++++- tests/testmultistream.nim | 18 ++++++++++-------- tests/testnoise.nim | 2 +- tests/testwstransport.nim | 21 ++++++++++++--------- 14 files changed, 71 insertions(+), 38 deletions(-) diff --git a/.pinned b/.pinned index 43951a3..a2aea23 100644 --- a/.pinned +++ b/.pinned @@ -2,7 +2,7 @@ asynctest;https://github.com/markspanbroek/asynctest@#3882ed64ed3159578f796bc5ae bearssl;https://github.com/status-im/nim-bearssl@#ba80e2a0d7ae8aab666cee013e38ff8d33a3e5e7 chronicles;https://github.com/status-im/nim-chronicles@#2a2681b60289aaf7895b7056f22616081eb1a882 chronos;https://github.com/status-im/nim-chronos@#7dc58d42b6905a7fd7531875fa76060f8f744e4e -dnsclient;https://github.com/ba0f3/dnsclient.nim@#536cc6b7933e5f86590bb27083c0ffeab31255f9 +dnsclient;https://github.com/ba0f3/dnsclient.nim@#fbb76f8af8a33ab818184a7d4406d9fee20993be faststreams;https://github.com/status-im/nim-faststreams@#c653d05f277dca0f374732c5b9b80f2368faea33 httputils;https://github.com/status-im/nim-http-utils@#507bfb7dcb6244d76ce2567df7bf3756cbe88775 json_serialization;https://github.com/status-im/nim-json-serialization@#010aa238cf6afddf1fbe4cbcd27ab3be3f443841 @@ -14,4 +14,4 @@ stew;https://github.com/status-im/nim-stew@#2f9c61f485e1de6d7e163294008276c455d3 testutils;https://github.com/status-im/nim-testutils@#aa6e5216f4b4ab5aa971cdcdd70e1ec1203cedf2 unittest2;https://github.com/status-im/nim-unittest2@#4e2893eacb916c7678fdc4935ff7420f13bf3a9c websock;https://github.com/status-im/nim-websock@#c2aae352f7fad7a8d333327c37e966969d3ee542 -zlib;https://github.com/status-im/nim-zlib@#d4e716d071eba1b5e0ffdf7949d983959e2b95dd \ No newline at end of file +zlib;https://github.com/status-im/nim-zlib@#d4e716d071eba1b5e0ffdf7949d983959e2b95dd diff --git a/examples/directchat.nim b/examples/directchat.nim index d473a9f..121f031 100644 --- a/examples/directchat.nim +++ b/examples/directchat.nim @@ -162,7 +162,10 @@ proc main() {.async.} = stdinReader = fromPipe(rfd) var thread: Thread[AsyncFD] - thread.createThread(readInput, wfd) + try: + thread.createThread(readInput, wfd) + except Exception as exc: + quit("Failed to create thread: " & exc.msg) var localAddress = MultiAddress.init(DefaultAddr).tryGet() diff --git a/libp2p.nimble b/libp2p.nimble index 33be693..22cfce8 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -9,7 +9,7 @@ skipDirs = @["tests", "examples", "Nim", "tools", "scripts", "docs"] requires "nim >= 1.2.0", "nimcrypto >= 0.4.1", - "https://github.com/ba0f3/dnsclient.nim == 0.1.0", + "dnsclient >= 0.1.2", "bearssl >= 0.1.4", "chronicles >= 0.10.2", "chronos >= 3.0.6", @@ -18,11 +18,19 @@ requires "nim >= 1.2.0", "stew#head", "websock" +const nimflags = + "--verbosity:0 --hints:off " & + "--warning[CaseTransition]:off --warning[ObservableStores]:off " & + "--warning[LockLevel]:off " & + "-d:chronosStrictException " & + "--styleCheck:usages --styleCheck:hint " + proc runTest(filename: string, verify: bool = true, sign: bool = true, moreoptions: string = "") = let env_nimflags = getEnv("NIMFLAGS") - var excstr = "nim c --opt:speed -d:debug -d:libp2p_agents_metrics -d:libp2p_protobuf_metrics -d:libp2p_network_protocols_metrics --verbosity:0 --hints:off --styleCheck:usages --styleCheck:hint " & env_nimflags - excstr.add(" --warning[CaseTransition]:off --warning[ObservableStores]:off --warning[LockLevel]:off") + var excstr = "nim c --opt:speed -d:debug -d:libp2p_agents_metrics -d:libp2p_protobuf_metrics -d:libp2p_network_protocols_metrics " + excstr.add(" " & env_nimflags & " ") + excstr.add(" " & nimflags & " ") excstr.add(" -d:libp2p_pubsub_sign=" & $sign) excstr.add(" -d:libp2p_pubsub_verify=" & $verify) excstr.add(" " & moreoptions & " ") @@ -34,8 +42,8 @@ proc runTest(filename: string, verify: bool = true, sign: bool = true, rmFile "tests/" & filename.toExe proc buildSample(filename: string, run = false) = - var excstr = "nim c --opt:speed --threads:on -d:debug --verbosity:0 --hints:off " - excstr.add(" --warning[CaseTransition]:off --warning[ObservableStores]:off --warning[LockLevel]:off") + var excstr = "nim c --opt:speed --threads:on -d:debug " + excstr.add(" " & nimflags & " ") excstr.add(" examples/" & filename) exec excstr if run: @@ -44,7 +52,7 @@ proc buildSample(filename: string, run = false) = proc buildTutorial(filename: string) = discard gorge "cat " & filename & " | nim c -r --hints:off tools/markdown_runner.nim | " & - " nim --warning[CaseTransition]:off --warning[ObservableStores]:off --warning[LockLevel]:off c -" + " nim " & nimflags & " c -" task testnative, "Runs libp2p native tests": runTest("testnative") diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index 3f670cd..3d769e5 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -65,7 +65,7 @@ type discard PeerEventHandler* = - proc(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe.} + proc(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe, raises: [Defect].} MuxerHolder = object muxer: Muxer diff --git a/libp2p/daemon/daemonapi.nim b/libp2p/daemon/daemonapi.nim index ff9a502..c082a60 100644 --- a/libp2p/daemon/daemonapi.nim +++ b/libp2p/daemon/daemonapi.nim @@ -150,10 +150,10 @@ type key*: PublicKey P2PStreamCallback* = proc(api: DaemonAPI, - stream: P2PStream): Future[void] {.gcsafe.} + stream: P2PStream): Future[void] {.gcsafe, raises: [Defect, CatchableError].} P2PPubSubCallback* = proc(api: DaemonAPI, ticket: PubsubTicket, - message: PubSubMessage): Future[bool] {.gcsafe.} + message: PubSubMessage): Future[bool] {.gcsafe, raises: [Defect, CatchableError].} DaemonError* = object of LPError DaemonRemoteError* = object of DaemonError @@ -755,7 +755,13 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, # Starting daemon process # echo "Starting ", cmd, " ", args.join(" ") - api.process = startProcess(cmd, "", args, env, {poParentStreams}) + api.process = + try: + startProcess(cmd, "", args, env, {poParentStreams}) + except CatchableError as exc: + raise exc + except Exception as exc: + raiseAssert exc.msg # Waiting until daemon will not be bound to control socket. while true: if not api.process.running(): @@ -900,7 +906,7 @@ proc openStream*(api: DaemonAPI, peer: PeerId, stream.flags.incl(Outbound) stream.transp = transp result = stream - except Exception as exc: + except CatchableError as exc: await api.closeConnection(transp) raise exc @@ -936,7 +942,7 @@ proc addHandler*(api: DaemonAPI, protocols: seq[string], protocols)) pb.withMessage() do: api.servers.add(P2PServer(server: server, address: maddress)) - except Exception as exc: + except CatchableError as exc: for item in protocols: api.handlers.del(item) server.stop() @@ -1301,7 +1307,7 @@ proc pubsubSubscribe*(api: DaemonAPI, topic: string, ticket.transp = transp asyncSpawn pubsubLoop(api, ticket) result = ticket - except Exception as exc: + except CatchableError as exc: await api.closeConnection(transp) raise exc diff --git a/libp2p/errors.nim b/libp2p/errors.nim index 83d4116..3b6f3ed 100644 --- a/libp2p/errors.nim +++ b/libp2p/errors.nim @@ -49,7 +49,7 @@ proc allFuturesThrowing*[T](args: varargs[Future[T]]): Future[void] = for fut in args: futs &= fut proc call() {.async.} = - var first: ref Exception = nil + var first: ref CatchableError = nil futs = await allFinished(futs) for fut in futs: if fut.failed: diff --git a/libp2p/nameresolving/dnsresolver.nim b/libp2p/nameresolving/dnsresolver.nim index 1bb6660..d224afc 100644 --- a/libp2p/nameresolving/dnsresolver.nim +++ b/libp2p/nameresolving/dnsresolver.nim @@ -78,7 +78,12 @@ proc getDnsResponse( dataStream = newStringStream() dataStream.writeData(addr rawResponse[0], rawResponse.len) dataStream.setPosition(0) + # parseResponse can has a raises: [Exception, ..] because of + # https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008 + # it can't actually raise though return parseResponse(dataStream) + except CatchableError as exc: raise exc + except Exception as exc: raiseAssert exc.msg finally: await sock.closeWait() diff --git a/libp2p/protocols/ping.nim b/libp2p/protocols/ping.nim index 6e5e6bc..c07b2a0 100644 --- a/libp2p/protocols/ping.nim +++ b/libp2p/protocols/ping.nim @@ -7,6 +7,8 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +{.push raises: [Defect].} + import chronos, chronicles, bearssl import ../protobuf/minprotobuf, ../peerinfo, diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index 1c2fd3d..4238c08 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -225,7 +225,7 @@ method accept*(self: TcpTransport): Future[Connection] {.async, gcsafe.} = debug "Server was closed", exc = exc.msg raise newTransportClosedError(exc) except CancelledError as exc: - raise + raise exc except CatchableError as exc: debug "Unexpected error accepting connection", exc = exc.msg raise exc diff --git a/tests/commontransport.nim b/tests/commontransport.nim index 92f817a..76aa2bb 100644 --- a/tests/commontransport.nim +++ b/tests/commontransport.nim @@ -11,7 +11,7 @@ import ../libp2p/[stream/connection, import ./helpers -type TransportProvider* = proc(): Transport {.gcsafe.} +type TransportProvider* = proc(): Transport {.gcsafe, raises: [Defect].} proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) = suite name & " common tests": diff --git a/tests/helpers.nim b/tests/helpers.nim index c336291..a607df6 100644 --- a/tests/helpers.nim +++ b/tests/helpers.nim @@ -13,6 +13,8 @@ import ../libp2p/protocols/secure/secure import ./asyncunit export asyncunit +{.push raises: [Defect].} + const StreamTransportTrackerName = "stream.transport" StreamServerTrackerName = "stream.server" @@ -51,7 +53,9 @@ template checkTrackers*() = checkpoint tracker.dump() fail() # Also test the GC is not fooling with us - GC_fullCollect() + try: + GC_fullCollect() + except: discard type RngWrap = object rng: ref BrHmacDrbgContext diff --git a/tests/testmultistream.nim b/tests/testmultistream.nim index 986b837..5690ec0 100644 --- a/tests/testmultistream.nim +++ b/tests/testmultistream.nim @@ -10,6 +10,9 @@ import ../libp2p/errors, ../libp2p/protocols/protocol, ../libp2p/upgrademngrs/upgrade + +{.push raises: [Defect].} + import ./helpers when defined(nimHasUsed): {.used.} @@ -53,7 +56,7 @@ method readOnce*(s: TestSelectStream, method write*(s: TestSelectStream, msg: seq[byte]) {.async, gcsafe.} = discard -method close(s: TestSelectStream) {.async, gcsafe.} = +method close(s: TestSelectStream) {.async, gcsafe, raises: [Defect].} = s.isClosed = true s.isEof = true @@ -63,7 +66,7 @@ proc newTestSelectStream(): TestSelectStream = ## Mock stream for handles `ls` test type - LsHandler = proc(procs: seq[byte]): Future[void] {.gcsafe.} + LsHandler = proc(procs: seq[byte]): Future[void] {.gcsafe, raises: [Defect].} TestLsStream = ref object of Connection step*: int @@ -115,7 +118,7 @@ proc newTestLsStream(ls: LsHandler): TestLsStream {.gcsafe.} = ## Mock stream for handles `na` test type - NaHandler = proc(procs: string): Future[void] {.gcsafe.} + NaHandler = proc(procs: string): Future[void] {.gcsafe, raises: [Defect].} TestNaStream = ref object of Connection step*: int @@ -195,14 +198,14 @@ suite "Multistream select": asyncTest "test handle `ls`": let ms = MultistreamSelect.new() - proc testLsHandler(proto: seq[byte]) {.async, gcsafe.} # forward declaration - let conn = Connection(newTestLsStream(testLsHandler)) + var conn: Connection = nil let done = newFuture[void]() proc testLsHandler(proto: seq[byte]) {.async, gcsafe.} = var strProto: string = string.fromBytes(proto) check strProto == "\x26/test/proto1/1.0.0\n/test/proto2/1.0.0\n" await conn.close() done.complete() + conn = Connection(newTestLsStream(testLsHandler)) proc testHandler(conn: Connection, proto: string): Future[void] {.async, gcsafe.} = discard @@ -216,13 +219,12 @@ suite "Multistream select": asyncTest "test handle `na`": let ms = MultistreamSelect.new() - proc testNaHandler(msg: string): Future[void] {.async, gcsafe.} - let conn = newTestNaStream(testNaHandler) - + var conn: Connection = nil proc testNaHandler(msg: string): Future[void] {.async, gcsafe.} = echo msg check msg == Na await conn.close() + conn = newTestNaStream(testNaHandler) var protocol: LPProtocol = new LPProtocol proc testHandler(conn: Connection, diff --git a/tests/testnoise.nim b/tests/testnoise.nim index 293dec9..8415d6f 100644 --- a/tests/testnoise.nim +++ b/tests/testnoise.nim @@ -40,7 +40,7 @@ const type TestProto = ref object of LPProtocol -method init(p: TestProto) {.gcsafe.} = +method init(p: TestProto) {.gcsafe, raises: [Defect].} = proc handle(conn: Connection, proto: string) {.async, gcsafe.} = let msg = string.fromBytes(await conn.readLp(1024)) check "Hello!" == msg diff --git a/tests/testwstransport.nim b/tests/testwstransport.nim index bf8b9ce..0e46b00 100644 --- a/tests/testwstransport.nim +++ b/tests/testwstransport.nim @@ -13,7 +13,7 @@ import ../libp2p/[stream/connection, import ./helpers, ./commontransport const - SecureKey* = """ + SecureKey = """ -----BEGIN PRIVATE KEY----- MIICdwIBADANBgkqhkiG9w0BAQEFAASCAmEwggJdAgEAAoGBAP0yH7F7FtGunC91 IPkU+u8B4gdxiwYW0J3PrixtB1Xz3e4dfjwQqhIJlG6BxQ4myCxmSPjxP/eOOYp+ @@ -32,7 +32,7 @@ NABr5ec1FxuJa/8= -----END PRIVATE KEY----- """ - SecureCert* = """ + SecureCert = """ -----BEGIN CERTIFICATE----- MIICjDCCAfWgAwIBAgIURjeiJmkNbBVktqXvnXh44DKx364wDQYJKoZIhvcNAQEL BQAwVzELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM @@ -62,13 +62,16 @@ suite "WebSocket transport": commonTransportTest( "WebSocket Secure", - proc (): Transport = - WsTransport.new( - Upgrade(), - TLSPrivateKey.init(SecureKey), - TLSCertificate.init(SecureCert), - {TLSFlags.NoVerifyHost, TLSFlags.NoVerifyServerName}), - "/ip4/0.0.0.0/tcp/0/wss") + (proc (): Transport {.gcsafe.} = + try: + return WsTransport.new( + Upgrade(), + TLSPrivateKey.init(SecureKey), + TLSCertificate.init(SecureCert), + {TLSFlags.NoVerifyHost, TLSFlags.NoVerifyServerName}) + except Exception: check(false) + ), + "/ip4/0.0.0.0/tcp/0/wss") asyncTest "Hostname verification": let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0/wss").tryGet()] From 388b92d58fe5eaa1f0d411b0c543ab745bc3246d Mon Sep 17 00:00:00 2001 From: Tanguy Date: Fri, 7 Jan 2022 08:19:22 +0100 Subject: [PATCH 04/16] Bump dependencies (#683) --- .pinned | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.pinned b/.pinned index a2aea23..9c1dfb9 100644 --- a/.pinned +++ b/.pinned @@ -1,17 +1,17 @@ asynctest;https://github.com/markspanbroek/asynctest@#3882ed64ed3159578f796bc5ae0c6b13837fe798 bearssl;https://github.com/status-im/nim-bearssl@#ba80e2a0d7ae8aab666cee013e38ff8d33a3e5e7 chronicles;https://github.com/status-im/nim-chronicles@#2a2681b60289aaf7895b7056f22616081eb1a882 -chronos;https://github.com/status-im/nim-chronos@#7dc58d42b6905a7fd7531875fa76060f8f744e4e +chronos;https://github.com/status-im/nim-chronos@#17fed89c99beac5a92d3668d0d3e9b0e4ac13936 dnsclient;https://github.com/ba0f3/dnsclient.nim@#fbb76f8af8a33ab818184a7d4406d9fee20993be faststreams;https://github.com/status-im/nim-faststreams@#c653d05f277dca0f374732c5b9b80f2368faea33 -httputils;https://github.com/status-im/nim-http-utils@#507bfb7dcb6244d76ce2567df7bf3756cbe88775 +httputils;https://github.com/status-im/nim-http-utils@#ad6f82e4b7d504ea9b8222c9c01aa1d24ebd4dc8 json_serialization;https://github.com/status-im/nim-json-serialization@#010aa238cf6afddf1fbe4cbcd27ab3be3f443841 metrics;https://github.com/status-im/nim-metrics@#2c0c486c65f980e8387f86bed0b43d53161c8286 nimcrypto;https://github.com/cheatfate/nimcrypto@#a5742a9a214ac33f91615f3862c7b099aec43b00 -secp256k1;https://github.com/status-im/nim-secp256k1@#d790c42206fab4b8008eaa91181ca8c8c68a0105 +secp256k1;https://github.com/status-im/nim-secp256k1@#153ec71a3416c1043846c9462587f74b73b1e5a8 serialization;https://github.com/status-im/nim-serialization@#11a8aa64d27d4fa92e266b9488500461da193c24 -stew;https://github.com/status-im/nim-stew@#2f9c61f485e1de6d7e163294008276c455d39da2 +stew;https://github.com/status-im/nim-stew@#4e223b95a70d612d71569a308b35dbb82d961c5a testutils;https://github.com/status-im/nim-testutils@#aa6e5216f4b4ab5aa971cdcdd70e1ec1203cedf2 unittest2;https://github.com/status-im/nim-unittest2@#4e2893eacb916c7678fdc4935ff7420f13bf3a9c -websock;https://github.com/status-im/nim-websock@#c2aae352f7fad7a8d333327c37e966969d3ee542 -zlib;https://github.com/status-im/nim-zlib@#d4e716d071eba1b5e0ffdf7949d983959e2b95dd +websock;https://github.com/status-im/nim-websock@#73edde4417f7b45003113b7a34212c3ccd95b9fd +zlib;https://github.com/status-im/nim-zlib@#d4e716d071eba1b5e0ffdf7949d983959e2b95dd \ No newline at end of file From e72d03bc78d3bc896ae5912ab45e2ecd53849aa5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C8=98tefan=20Talpalaru?= Date: Mon, 10 Jan 2022 12:29:52 +0100 Subject: [PATCH 05/16] CI: test with multiple Nim versions by default (#684) * CI: test with 1.2 & devel by default * Skip buggy windows websocket test --- .github/workflows/ci.yml | 150 +++++++++++++++----------------- .github/workflows/multi_nim.yml | 145 ++++++++++++++++-------------- libp2p.nimble | 5 +- tests/commontransport.nim | 4 + tests/testswitch.nim | 4 + 5 files changed, 161 insertions(+), 147 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c6d0bcc..a02312e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,78 +9,49 @@ on: jobs: build: + timeout-minutes: 90 strategy: fail-fast: false - max-parallel: 20 matrix: target: - # Unit tests - os: linux cpu: amd64 - os: linux cpu: i386 - os: macos cpu: amd64 - - os: windows - cpu: i386 - os: windows cpu: amd64 + #- os: windows + #cpu: i386 + branch: [version-1-2, devel] include: - target: os: linux builder: ubuntu-20.04 + shell: bash - target: os: macos builder: macos-10.15 + shell: bash - target: os: windows builder: windows-2019 + shell: msys2 {0} defaults: run: - shell: bash + shell: ${{ matrix.shell }} - name: '${{ matrix.target.os }}-${{ matrix.target.cpu }}' + name: '${{ matrix.target.os }}-${{ matrix.target.cpu }} (Nim ${{ matrix.branch }})' runs-on: ${{ matrix.builder }} + continue-on-error: ${{ matrix.branch == 'version-1-6' || matrix.branch == 'devel' }} steps: - - name: Checkout nim-libp2p + - name: Checkout uses: actions/checkout@v2 with: submodules: true - - name: Derive environment variables - run: | - if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then - ARCH=64 - PLATFORM=x64 - else - ARCH=32 - PLATFORM=x86 - fi - echo "ARCH=$ARCH" >> $GITHUB_ENV - echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV - - ncpu= - ext= - MAKE_CMD="make" - case '${{ runner.os }}' in - 'Linux') - ncpu=$(nproc) - ;; - 'macOS') - ncpu=$(sysctl -n hw.ncpu) - ;; - 'Windows') - ncpu=$NUMBER_OF_PROCESSORS - ext=.exe - MAKE_CMD="mingw32-make" - ;; - esac - [[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1 - echo "ncpu=$ncpu" >> $GITHUB_ENV - echo "ext=$ext" >> $GITHUB_ENV - echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV - - name: Install build dependencies (Linux i386) if: runner.os == 'Linux' && matrix.target.cpu == 'i386' run: | @@ -101,68 +72,83 @@ jobs: chmod 755 external/bin/gcc external/bin/g++ echo '${{ github.workspace }}/external/bin' >> $GITHUB_PATH - - name: Restore MinGW-W64 (Windows) from cache - if: runner.os == 'Windows' - id: windows-mingw-cache - uses: actions/cache@v2 + - name: MSYS2 (Windows i386) + if: runner.os == 'Windows' && matrix.target.cpu == 'i386' + uses: msys2/setup-msys2@v2 with: - path: external/mingw-${{ matrix.target.cpu }} - key: 'mingw-${{ matrix.target.cpu }}' + path-type: inherit + msystem: MINGW32 + install: >- + base-devel + git + mingw-w64-i686-toolchain + + - name: MSYS2 (Windows amd64) + if: runner.os == 'Windows' && matrix.target.cpu == 'amd64' + uses: msys2/setup-msys2@v2 + with: + path-type: inherit + install: >- + base-devel + git + mingw-w64-x86_64-toolchain - name: Restore Nim DLLs dependencies (Windows) from cache if: runner.os == 'Windows' id: windows-dlls-cache uses: actions/cache@v2 with: - path: external/dlls-${{ matrix.target.cpu }} - key: 'dlls-${{ matrix.target.cpu }}' + path: external/dlls + key: 'dlls' - - name: Install MinGW64 dependency (Windows) - if: > - steps.windows-mingw-cache.outputs.cache-hit != 'true' && - runner.os == 'Windows' - run: | - mkdir -p external - curl -L "https://nim-lang.org/download/mingw$ARCH.7z" -o "external/mingw-${{ matrix.target.cpu }}.7z" - 7z x -y "external/mingw-${{ matrix.target.cpu }}.7z" -oexternal/ - mv external/mingw$ARCH external/mingw-${{ matrix.target.cpu }} - - - name: Install DLLs dependencies (Windows) + - name: Install DLL dependencies (Windows) if: > steps.windows-dlls-cache.outputs.cache-hit != 'true' && runner.os == 'Windows' run: | - mkdir -p external + mkdir external curl -L "https://nim-lang.org/download/windeps.zip" -o external/windeps.zip - 7z x -y external/windeps.zip -oexternal/dlls-${{ matrix.target.cpu }} + 7z x external/windeps.zip -oexternal/dlls - name: Path to cached dependencies (Windows) if: > runner.os == 'Windows' run: | - echo "${{ github.workspace }}/external/mingw-${{ matrix.target.cpu }}/bin" >> $GITHUB_PATH - echo "${{ github.workspace }}/external/dlls-${{ matrix.target.cpu }}" >> $GITHUB_PATH + echo '${{ github.workspace }}'"/external/dlls" >> $GITHUB_PATH - - name: Get latest Nim commit hash - id: versions + - name: Derive environment variables run: | - getHash() { - git ls-remote "https://github.com/$1" "${2:-HEAD}" | cut -f 1 - } - nbsHash=$(getHash status-im/nimbus-build-system) - echo "::set-output name=nimbus_build_system::$nbsHash" + if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then + PLATFORM=x64 + else + PLATFORM=x86 + fi + echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV - - name: Restore prebuilt Nim from cache - id: nim-cache - uses: actions/cache@v2 - with: - path: NimBinaries - key: 'NimBinaries-${{ matrix.target.os }}-${{ matrix.target.cpu }}-${{ steps.versions.outputs.nimbus_build_system }}' + ncpu= + MAKE_CMD="make" + case '${{ runner.os }}' in + 'Linux') + ncpu=$(nproc) + ;; + 'macOS') + ncpu=$(sysctl -n hw.ncpu) + ;; + 'Windows') + ncpu=$NUMBER_OF_PROCESSORS + MAKE_CMD="mingw32-make" + ;; + esac + [[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1 + echo "ncpu=$ncpu" >> $GITHUB_ENV + echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV - - name: Build Nim and associated tools + - name: Build Nim and Nimble run: | curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_nim.sh - env MAKE="${MAKE_CMD} -j${ncpu}" ARCH_OVERRIDE=${PLATFORM} CC=gcc bash build_nim.sh nim csources dist/nimble NimBinaries + env MAKE="${MAKE_CMD} -j${ncpu}" ARCH_OVERRIDE=${PLATFORM} NIM_COMMIT=${{ matrix.branch }} \ + QUICK_AND_DIRTY_COMPILER=1 QUICK_AND_DIRTY_NIMBLE=1 CC=gcc \ + bash build_nim.sh nim csources dist/nimble NimBinaries echo '${{ github.workspace }}/nim/bin' >> $GITHUB_PATH - name: Setup Go @@ -174,8 +160,14 @@ jobs: run: | V=1 bash scripts/build_p2pd.sh p2pdCache 124530a3 - - name: Run nim-libp2p tests + - name: Run tests run: | + if [[ "${{ matrix.target.os }}" == "windows" ]]; then + # https://github.com/status-im/nimbus-eth2/issues/3121 + export NIMFLAGS="-d:nimRawSetjmp" + fi + nim --version + nimble --version nimble install_pinned nimble test diff --git a/.github/workflows/multi_nim.yml b/.github/workflows/multi_nim.yml index 655ca7e..5d55afb 100644 --- a/.github/workflows/multi_nim.yml +++ b/.github/workflows/multi_nim.yml @@ -6,6 +6,7 @@ on: jobs: build: + timeout-minutes: 120 strategy: fail-fast: false matrix: @@ -16,69 +17,39 @@ jobs: cpu: i386 - os: macos cpu: amd64 - #- os: windows - #cpu: i386 - os: windows cpu: amd64 + #- os: windows + #cpu: i386 branch: [version-1-2, version-1-4, version-1-6, devel] include: - target: os: linux builder: ubuntu-20.04 + shell: bash - target: os: macos builder: macos-10.15 + shell: bash - target: os: windows builder: windows-2019 + shell: msys2 {0} defaults: run: - shell: bash + shell: ${{ matrix.shell }} name: '${{ matrix.target.os }}-${{ matrix.target.cpu }} (Nim ${{ matrix.branch }})' runs-on: ${{ matrix.builder }} continue-on-error: ${{ matrix.branch == 'version-1-6' || matrix.branch == 'devel' }} steps: - - name: Checkout nim-libp2p + - name: Checkout uses: actions/checkout@v2 with: - ref: master + ref: unstable submodules: true - - name: Derive environment variables - run: | - if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then - ARCH=64 - PLATFORM=x64 - else - ARCH=32 - PLATFORM=x86 - fi - echo "ARCH=$ARCH" >> $GITHUB_ENV - echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV - - ncpu= - ext= - MAKE_CMD="make" - case '${{ runner.os }}' in - 'Linux') - ncpu=$(nproc) - ;; - 'macOS') - ncpu=$(sysctl -n hw.ncpu) - ;; - 'Windows') - ncpu=$NUMBER_OF_PROCESSORS - ext=.exe - MAKE_CMD="mingw32-make" - ;; - esac - [[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1 - echo "ncpu=$ncpu" >> $GITHUB_ENV - echo "ext=$ext" >> $GITHUB_ENV - echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV - - name: Install build dependencies (Linux i386) if: runner.os == 'Linux' && matrix.target.cpu == 'i386' run: | @@ -99,47 +70,76 @@ jobs: chmod 755 external/bin/gcc external/bin/g++ echo '${{ github.workspace }}/external/bin' >> $GITHUB_PATH - - name: Restore MinGW-W64 (Windows) from cache - if: runner.os == 'Windows' - id: windows-mingw-cache - uses: actions/cache@v2 + - name: MSYS2 (Windows i386) + if: runner.os == 'Windows' && matrix.target.cpu == 'i386' + uses: msys2/setup-msys2@v2 with: - path: external/mingw-${{ matrix.target.cpu }} - key: 'mingw-${{ matrix.target.cpu }}' + path-type: inherit + msystem: MINGW32 + install: >- + base-devel + git + mingw-w64-i686-toolchain + + - name: MSYS2 (Windows amd64) + if: runner.os == 'Windows' && matrix.target.cpu == 'amd64' + uses: msys2/setup-msys2@v2 + with: + path-type: inherit + install: >- + base-devel + git + mingw-w64-x86_64-toolchain - name: Restore Nim DLLs dependencies (Windows) from cache if: runner.os == 'Windows' id: windows-dlls-cache uses: actions/cache@v2 with: - path: external/dlls-${{ matrix.target.cpu }} - key: 'dlls-${{ matrix.target.cpu }}' + path: external/dlls + key: 'dlls' - - name: Install MinGW64 dependency (Windows) - if: > - steps.windows-mingw-cache.outputs.cache-hit != 'true' && - runner.os == 'Windows' - run: | - mkdir -p external - curl -L "https://nim-lang.org/download/mingw$ARCH.7z" -o "external/mingw-${{ matrix.target.cpu }}.7z" - 7z x -y "external/mingw-${{ matrix.target.cpu }}.7z" -oexternal/ - mv external/mingw$ARCH external/mingw-${{ matrix.target.cpu }} - - - name: Install DLLs dependencies (Windows) + - name: Install DLL dependencies (Windows) if: > steps.windows-dlls-cache.outputs.cache-hit != 'true' && runner.os == 'Windows' run: | - mkdir -p external + mkdir external curl -L "https://nim-lang.org/download/windeps.zip" -o external/windeps.zip - 7z x -y external/windeps.zip -oexternal/dlls-${{ matrix.target.cpu }} + 7z x external/windeps.zip -oexternal/dlls - name: Path to cached dependencies (Windows) if: > runner.os == 'Windows' run: | - echo "${{ github.workspace }}/external/mingw-${{ matrix.target.cpu }}/bin" >> $GITHUB_PATH - echo "${{ github.workspace }}/external/dlls-${{ matrix.target.cpu }}" >> $GITHUB_PATH + echo '${{ github.workspace }}'"/external/dlls" >> $GITHUB_PATH + + - name: Derive environment variables + run: | + if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then + PLATFORM=x64 + else + PLATFORM=x86 + fi + echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV + + ncpu= + MAKE_CMD="make" + case '${{ runner.os }}' in + 'Linux') + ncpu=$(nproc) + ;; + 'macOS') + ncpu=$(sysctl -n hw.ncpu) + ;; + 'Windows') + ncpu=$NUMBER_OF_PROCESSORS + MAKE_CMD="mingw32-make" + ;; + esac + [[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1 + echo "ncpu=$ncpu" >> $GITHUB_ENV + echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV - name: Build Nim and Nimble run: | @@ -149,12 +149,27 @@ jobs: bash build_nim.sh nim csources dist/nimble NimBinaries echo '${{ github.workspace }}/nim/bin' >> $GITHUB_PATH - - name: Run nim-libp2p tests + - name: Setup Go + uses: actions/setup-go@v2 + with: + go-version: '^1.15.5' + + - name: Install p2pd run: | + V=1 bash scripts/build_p2pd.sh p2pdCache 124530a3 + + - name: Run tests + run: | + if [[ "${{ matrix.target.os }}" == "windows" ]]; then + # https://github.com/status-im/nimbus-eth2/issues/3121 + export NIMFLAGS="-d:nimRawSetjmp" + fi + nim --version + nimble --version nimble install -y --depsOnly - nimble test_slim + nimble test if [[ "${{ matrix.branch }}" == "version-1-6" || "${{ matrix.branch }}" == "devel" ]]; then echo -e "\nTesting with '--gc:orc':\n" - export NIMFLAGS="--gc:orc" - nimble test_slim + export NIMFLAGS="${NIMFLAGS} --gc:orc" + nimble test fi diff --git a/libp2p.nimble b/libp2p.nimble index 22cfce8..a5483ac 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -27,9 +27,8 @@ const nimflags = proc runTest(filename: string, verify: bool = true, sign: bool = true, moreoptions: string = "") = - let env_nimflags = getEnv("NIMFLAGS") var excstr = "nim c --opt:speed -d:debug -d:libp2p_agents_metrics -d:libp2p_protobuf_metrics -d:libp2p_network_protocols_metrics " - excstr.add(" " & env_nimflags & " ") + excstr.add(" " & getEnv("NIMFLAGS") & " ") excstr.add(" " & nimflags & " ") excstr.add(" -d:libp2p_pubsub_sign=" & $sign) excstr.add(" -d:libp2p_pubsub_verify=" & $verify) @@ -91,7 +90,7 @@ task test, "Runs the test suite": exec "nimble testfilter" exec "nimble examples_build" -task test_slim, "Runs the test suite": +task test_slim, "Runs the (slimmed down) test suite": exec "nimble testnative" exec "nimble testpubsub_slim" exec "nimble testfilter" diff --git a/tests/commontransport.nim b/tests/commontransport.nim index 76aa2bb..532689e 100644 --- a/tests/commontransport.nim +++ b/tests/commontransport.nim @@ -137,6 +137,10 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) = await transport1.stop() asyncTest "e2e should allow multiple local addresses": + when defined(windows): + # this randomly locks the Windows CI job + skip() + return let addrs = @[MultiAddress.init(ma).tryGet(), MultiAddress.init(ma).tryGet()] diff --git a/tests/testswitch.nim b/tests/testswitch.nim index 8d21a0f..c24c9b8 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -841,6 +841,10 @@ suite "Switch": switch2.peerStore.protoBook.get(switch1.peerInfo.peerId) == switch1.peerInfo.protocols.toHashSet() asyncTest "e2e should allow multiple local addresses": + when defined(windows): + # this randomly locks the Windows CI job + skip() + return proc handle(conn: Connection, proto: string) {.async, gcsafe.} = try: let msg = string.fromBytes(await conn.readLp(1024)) From 1a97d0a2f5f9b51478204346dbd2400e2ae71f51 Mon Sep 17 00:00:00 2001 From: Tanguy Date: Fri, 14 Jan 2022 19:40:30 +0100 Subject: [PATCH 06/16] Validate pubsub subscriptions (#627) * Check topic before subscribing * Block subscribe to invalid topics --- libp2p/protocols/pubsub/pubsub.nim | 5 +++++ tests/pubsub/testgossipsub2.nim | 4 +++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 4c119ca..30a30ed 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -452,6 +452,11 @@ proc subscribe*(p: PubSub, ## on every received message ## + # Check that this is an allowed topic + if p.subscriptionValidator != nil and p.subscriptionValidator(topic) == false: + warn "Trying to subscribe to a topic not passing validation!", topic + return + p.topics.withValue(topic, handlers) do: # Already subscribed, just adding another handler handlers[].add(handler) diff --git a/tests/pubsub/testgossipsub2.nim b/tests/pubsub/testgossipsub2.nim index 62562c3..6e9c891 100644 --- a/tests/pubsub/testgossipsub2.nim +++ b/tests/pubsub/testgossipsub2.nim @@ -147,6 +147,9 @@ suite "GossipSub": nodes[1].start(), )) + # We must subscribe before setting the validator + nodes[0].subscribe("foobar", handler) + var gossip = GossipSub(nodes[0]) let invalidDetected = newFuture[void]() gossip.subscriptionValidator = @@ -162,7 +165,6 @@ suite "GossipSub": await subscribeNodes(nodes) - nodes[0].subscribe("foobar", handler) nodes[1].subscribe("foobar", handler) await invalidDetected.wait(10.seconds) From c18830ad3370824192f62247ea433e583335701a Mon Sep 17 00:00:00 2001 From: Tanguy Date: Sat, 15 Jan 2022 12:47:41 +0100 Subject: [PATCH 07/16] Score correctly on mesh peer unsub (#644) * Score correctly on mesh peer unsub * remove from mesh before removing from gossipsub --- libp2p/protocols/pubsub/gossipsub.nim | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 2691180..d8753d4 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -186,16 +186,16 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) = if s[].len == 0: g.peersInIP.del(pubSubPeer.address.get()) - for t in toSeq(g.gossipsub.keys): - g.gossipsub.removePeer(t, pubSubPeer) - # also try to remove from explicit table here - g.explicit.removePeer(t, pubSubPeer) - for t in toSeq(g.mesh.keys): trace "pruning unsubscribing peer", pubSubPeer, score = pubSubPeer.score g.pruned(pubSubPeer, t) g.mesh.removePeer(t, pubSubPeer) + for t in toSeq(g.gossipsub.keys): + g.gossipsub.removePeer(t, pubSubPeer) + # also try to remove from explicit table here + g.explicit.removePeer(t, pubSubPeer) + for t in toSeq(g.fanout.keys): g.fanout.removePeer(t, pubSubPeer) @@ -237,9 +237,14 @@ proc handleSubscribe*(g: GossipSub, else: trace "peer unsubscribed from topic" + if g.mesh.hasPeer(topic, peer): + #against spec + g.mesh.removePeer(topic, peer) + g.pruned(peer, topic) + # unsubscribe remote peer from the topic g.gossipsub.removePeer(topic, peer) - g.mesh.removePeer(topic, peer) + g.fanout.removePeer(topic, peer) if peer.peerId in g.parameters.directPeers: g.explicit.removePeer(topic, peer) From 07da14a7a7de467b9d5cfe46eeaba18c8de7a7e0 Mon Sep 17 00:00:00 2001 From: Tanguy Date: Fri, 28 Jan 2022 19:05:07 +0100 Subject: [PATCH 08/16] Fix websocket EOF reading exception (#689) --- libp2p/transports/wstransport.nim | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/libp2p/transports/wstransport.nim b/libp2p/transports/wstransport.nim index c0b1427..4f64442 100644 --- a/libp2p/transports/wstransport.nim +++ b/libp2p/transports/wstransport.nim @@ -49,11 +49,24 @@ proc new*(T: type WsStream, stream.initStream() return stream +template mapExceptions(body: untyped) = + try: + body + except AsyncStreamIncompleteError: + raise newLPStreamEOFError() + except AsyncStreamUseClosedError: + raise newLPStreamEOFError() + except WSClosedError: + raise newLPStreamEOFError() + except AsyncStreamLimitError: + raise newLPStreamLimitError() + method readOnce*( s: WsStream, pbytes: pointer, nbytes: int): Future[int] {.async.} = - let res = await s.session.recv(pbytes, nbytes) + let res = mapExceptions(await s.session.recv(pbytes, nbytes)) + if res == 0 and s.session.readyState == ReadyState.Closed: raise newLPStreamEOFError() return res @@ -61,10 +74,7 @@ method readOnce*( method write*( s: WsStream, msg: seq[byte]): Future[void] {.async.} = - try: - await s.session.send(msg, Opcode.Binary) - except WSClosedError: - raise newLPStreamEOFError() + mapExceptions(await s.session.send(msg, Opcode.Binary)) method closeImpl*(s: WsStream): Future[void] {.async.} = await s.session.close() From 00e1f9342feef6374ef6b0010a54ca3796cdc230 Mon Sep 17 00:00:00 2001 From: Tanguy Date: Tue, 1 Feb 2022 18:35:48 +0100 Subject: [PATCH 09/16] Fix identify log for json_sink (#690) --- libp2p/protocols/identify.nim | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/libp2p/protocols/identify.nim b/libp2p/protocols/identify.nim index 28ae801..dfa0813 100644 --- a/libp2p/protocols/identify.nim +++ b/libp2p/protocols/identify.nim @@ -9,7 +9,7 @@ {.push raises: [Defect].} -import options +import std/[sequtils, options, strutils] import chronos, chronicles import ../protobuf/minprotobuf, ../peerinfo, @@ -107,11 +107,14 @@ proc decodeMsg*(buf: seq[byte]): Option[IdentifyInfo] = iinfo.protoVersion = some(protoVersion) if r6.get(): iinfo.agentVersion = some(agentVersion) - debug "decodeMsg: decoded message", pubkey = ($pubkey).shortLog, - addresses = $iinfo.addrs, protocols = $iinfo.protos, - observable_address = $iinfo.observedAddr, - proto_version = $iinfo.protoVersion, - agent_version = $iinfo.agentVersion + debug "decodeMsg: decoded identify", pubkey = ($pubkey).shortLog, + addresses = iinfo.addrs.mapIt($it).join(","), + protocols = iinfo.protos.mapIt($it).join(","), + observable_address = + if iinfo.observedAddr.isSome(): $iinfo.observedAddr.get() + else: "None", + proto_version = iinfo.protoVersion.get("None"), + agent_version = iinfo.agentVersion.get("None") some(iinfo) else: trace "decodeMsg: failed to decode received message" From 9a7e3bda3c2dd6bcdc3468696192fb01b251be2b Mon Sep 17 00:00:00 2001 From: Tanguy Date: Thu, 10 Feb 2022 14:21:12 +0100 Subject: [PATCH 10/16] Bump dependencies (#694) --- .pinned | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/.pinned b/.pinned index 9c1dfb9..de01465 100644 --- a/.pinned +++ b/.pinned @@ -1,17 +1,17 @@ asynctest;https://github.com/markspanbroek/asynctest@#3882ed64ed3159578f796bc5ae0c6b13837fe798 bearssl;https://github.com/status-im/nim-bearssl@#ba80e2a0d7ae8aab666cee013e38ff8d33a3e5e7 chronicles;https://github.com/status-im/nim-chronicles@#2a2681b60289aaf7895b7056f22616081eb1a882 -chronos;https://github.com/status-im/nim-chronos@#17fed89c99beac5a92d3668d0d3e9b0e4ac13936 +chronos;https://github.com/status-im/nim-chronos@#87197230779002a2bfa8642f0e2ae07e2349e304 dnsclient;https://github.com/ba0f3/dnsclient.nim@#fbb76f8af8a33ab818184a7d4406d9fee20993be -faststreams;https://github.com/status-im/nim-faststreams@#c653d05f277dca0f374732c5b9b80f2368faea33 -httputils;https://github.com/status-im/nim-http-utils@#ad6f82e4b7d504ea9b8222c9c01aa1d24ebd4dc8 -json_serialization;https://github.com/status-im/nim-json-serialization@#010aa238cf6afddf1fbe4cbcd27ab3be3f443841 -metrics;https://github.com/status-im/nim-metrics@#2c0c486c65f980e8387f86bed0b43d53161c8286 +faststreams;https://github.com/status-im/nim-faststreams@#37a183153c071539ab870f427c09a1376ba311b9 +httputils;https://github.com/status-im/nim-http-utils@#40048e8b3e69284bdb5d4daa0a16ad93402c55db +json_serialization;https://github.com/status-im/nim-json-serialization@#4b8f487d2dfdd941df7408ceaa70b174cce02180 +metrics;https://github.com/status-im/nim-metrics@#71e0f0e354e1f4c59e3dc92153989c8b723c3440 nimcrypto;https://github.com/cheatfate/nimcrypto@#a5742a9a214ac33f91615f3862c7b099aec43b00 -secp256k1;https://github.com/status-im/nim-secp256k1@#153ec71a3416c1043846c9462587f74b73b1e5a8 -serialization;https://github.com/status-im/nim-serialization@#11a8aa64d27d4fa92e266b9488500461da193c24 -stew;https://github.com/status-im/nim-stew@#4e223b95a70d612d71569a308b35dbb82d961c5a +secp256k1;https://github.com/status-im/nim-secp256k1@#e092373a5cbe1fa25abfc62e0f2a5f138dc3fb13 +serialization;https://github.com/status-im/nim-serialization@#37bc0db558d85711967acb16e9bb822b06911d46 +stew;https://github.com/status-im/nim-stew@#bb705bf17b46d2c8f9bfb106d9cc7437009a2501 testutils;https://github.com/status-im/nim-testutils@#aa6e5216f4b4ab5aa971cdcdd70e1ec1203cedf2 unittest2;https://github.com/status-im/nim-unittest2@#4e2893eacb916c7678fdc4935ff7420f13bf3a9c websock;https://github.com/status-im/nim-websock@#73edde4417f7b45003113b7a34212c3ccd95b9fd -zlib;https://github.com/status-im/nim-zlib@#d4e716d071eba1b5e0ffdf7949d983959e2b95dd \ No newline at end of file +zlib;https://github.com/status-im/nim-zlib@#74cdeb54b21bededb5a515d36f608bc1850555a2 \ No newline at end of file From 3b718baa97932975fdf860b1010c95bbd137ef73 Mon Sep 17 00:00:00 2001 From: Eric Mastro Date: Tue, 22 Feb 2022 02:04:17 +1100 Subject: [PATCH 11/16] feat: allow msgIdProvider to fail (#688) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: allow msgIdProvider to fail Closes: #642. Changes the return type of the msgIdProvider to `Result[MessageID, string]` so that message id generation can fail. String error type was chosen as this `msgIdProvider` mainly because the failed message id generation drops the message and logs the error provided. Because `msgIdProvider` can be externally provided by library consumers, an enum didn’t make sense and a object seemed to be overkill. Exceptions could have been used as well, however, in this case, Result ergonomics were warranted and prevented wrapping quite a large block of code in try/except. The `defaultMsgIdProvider` function previously allowed message id generation to fail silently for use in the tests: when seqno or source peerid were not valid, the message id generated was based on a hash of the message data and topic ids. The silent failing was moved to the `defaultMsgIdProvider` used only in the tests so that it could not fail silently in applications. Unit tests were added for the `defaultMsgIdProvider`. * Change MsgIdProvider error type to ValidationResult --- libp2p/protocols/pubsub/errors.nim | 6 +++ libp2p/protocols/pubsub/floodsub.nim | 18 +++++++- libp2p/protocols/pubsub/gossipsub.nim | 20 ++++++++- libp2p/protocols/pubsub/pubsub.nim | 8 ++-- libp2p/protocols/pubsub/rpc/message.nim | 21 ++++------ tests/pubsub/testfloodsub.nim | 1 + tests/pubsub/testgossipinternal.nim | 10 +++-- tests/pubsub/testgossipsub.nim | 1 + tests/pubsub/testmcache.nim | 22 +++++----- tests/pubsub/testmessage.nim | 55 +++++++++++++++++++++++++ tests/pubsub/utils.nim | 19 +++++++-- 11 files changed, 143 insertions(+), 38 deletions(-) create mode 100644 libp2p/protocols/pubsub/errors.nim diff --git a/libp2p/protocols/pubsub/errors.nim b/libp2p/protocols/pubsub/errors.nim new file mode 100644 index 0000000..cfb2ccc --- /dev/null +++ b/libp2p/protocols/pubsub/errors.nim @@ -0,0 +1,6 @@ +# this module will be further extended in PR +# https://github.com/status-im/nim-libp2p/pull/107/ + +type + ValidationResult* {.pure.} = enum + Accept, Reject, Ignore diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index eda35ee..54d6f28 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -96,7 +96,14 @@ method rpcHandler*(f: FloodSub, f.handleSubscribe(peer, sub.topic, sub.subscribe) for msg in rpcMsg.messages: # for every message - let msgId = f.msgIdProvider(msg) + let msgIdResult = f.msgIdProvider(msg) + if msgIdResult.isErr: + debug "Dropping message due to failed message id generation", + error = msgIdResult.error + # TODO: descore peers due to error during message validation (malicious?) + continue + + let msgId = msgIdResult.get if f.addSeen(msgId): trace "Dropping already-seen message", msgId, peer @@ -184,7 +191,14 @@ method publish*(f: FloodSub, Message.init(none(PeerInfo), data, topic, none(uint64), false) else: Message.init(some(f.peerInfo), data, topic, some(f.msgSeqno), f.sign) - msgId = f.msgIdProvider(msg) + msgIdResult = f.msgIdProvider(msg) + + if msgIdResult.isErr: + trace "Error generating message id, skipping publish", + error = msgIdResult.error + return 0 + + let msgId = msgIdResult.get trace "Created new message", msg = shortLog(msg), peers = peers.len, topic, msgId diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index d8753d4..9a2a574 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -362,8 +362,16 @@ method rpcHandler*(g: GossipSub, for i in 0.. 0 and m.fromPeer.data.len > 0: - byteutils.toHex(m.seqno) & $m.fromPeer - else: - # This part is irrelevant because it's not standard, - # We use it exclusively for testing basically and users should - # implement their own logic in the case they use anonymization - $m.data.hash & $m.topicIDs.hash - mid.toBytes() +func defaultMsgIdProvider*(m: Message): Result[MessageID, ValidationResult] = + if m.seqno.len > 0 and m.fromPeer.data.len > 0: + let mid = byteutils.toHex(m.seqno) & $m.fromPeer + ok mid.toBytes() + else: + err ValidationResult.Reject proc sign*(msg: Message, privateKey: PrivateKey): CryptoResult[seq[byte]] = ok((? privateKey.sign(PubSubPrefix & encodeMessage(msg, false))).getBytes()) diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index 0ef5080..38d00d9 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -20,6 +20,7 @@ import utils, protocols/pubsub/floodsub, protocols/pubsub/rpc/messages, protocols/pubsub/peertable] +import ../../libp2p/protocols/pubsub/errors as pubsub_errors import ../helpers diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index d1ff8e9..ce4f793 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -39,6 +39,8 @@ proc randomPeerId(): PeerId = except CatchableError as exc: raise newException(Defect, exc.msg) +const MsgIdFail = "msg id gen failure" + suite "GossipSub internal": teardown: checkTrackers() @@ -308,7 +310,7 @@ suite "GossipSub internal": conn.peerId = peerId inc seqno let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno)) - gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) + gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdFail), msg) check gossipSub.fanout[topic].len == 15 check gossipSub.mesh[topic].len == 15 @@ -355,7 +357,7 @@ suite "GossipSub internal": conn.peerId = peerId inc seqno let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno)) - gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) + gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdFail), msg) let peers = gossipSub.getGossipPeers() check peers.len == gossipSub.parameters.d @@ -396,7 +398,7 @@ suite "GossipSub internal": conn.peerId = peerId inc seqno let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno)) - gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) + gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdFail), msg) let peers = gossipSub.getGossipPeers() check peers.len == gossipSub.parameters.d @@ -437,7 +439,7 @@ suite "GossipSub internal": conn.peerId = peerId inc seqno let msg = Message.init(peerId, ("bar" & $i).toBytes(), topic, some(seqno)) - gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) + gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdFail), msg) let peers = gossipSub.getGossipPeers() check peers.len == 0 diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index a1332ff..11eeda6 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -24,6 +24,7 @@ import utils, ../../libp2p/[errors, protocols/pubsub/peertable, protocols/pubsub/timedcache, protocols/pubsub/rpc/messages] +import ../../libp2p/protocols/pubsub/errors as pubsub_errors import ../helpers proc `$`(peer: PubSubPeer): string = shortLog(peer) diff --git a/tests/pubsub/testmcache.nim b/tests/pubsub/testmcache.nim index 4b7b7ce..7ceab81 100644 --- a/tests/pubsub/testmcache.nim +++ b/tests/pubsub/testmcache.nim @@ -5,19 +5,21 @@ import stew/byteutils import ../../libp2p/[peerid, crypto/crypto, protocols/pubsub/mcache, - protocols/pubsub/rpc/message, protocols/pubsub/rpc/messages] +import ./utils var rng = newRng() proc randomPeerId(): PeerId = PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).get() +const MsgIdGenFail = "msg id gen failure" + suite "MCache": test "put/get": var mCache = MCache.init(3, 5) var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes()) - let msgId = defaultMsgIdProvider(msg) + let msgId = defaultMsgIdProvider(msg).expect(MsgIdGenFail) mCache.put(msgId, msg) check mCache.get(msgId).isSome and mCache.get(msgId).get() == msg @@ -28,13 +30,13 @@ suite "MCache": var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["foo"]) - mCache.put(defaultMsgIdProvider(msg), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg) for i in 0..<5: var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["bar"]) - mCache.put(defaultMsgIdProvider(msg), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg) var mids = mCache.window("foo") check mids.len == 3 @@ -49,7 +51,7 @@ suite "MCache": var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["foo"]) - mCache.put(defaultMsgIdProvider(msg), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg) mCache.shift() check mCache.window("foo").len == 0 @@ -58,7 +60,7 @@ suite "MCache": var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["bar"]) - mCache.put(defaultMsgIdProvider(msg), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg) mCache.shift() check mCache.window("bar").len == 0 @@ -67,7 +69,7 @@ suite "MCache": var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["baz"]) - mCache.put(defaultMsgIdProvider(msg), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg) mCache.shift() check mCache.window("baz").len == 0 @@ -79,19 +81,19 @@ suite "MCache": var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["foo"]) - mCache.put(defaultMsgIdProvider(msg), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg) for i in 0..<3: var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["bar"]) - mCache.put(defaultMsgIdProvider(msg), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg) for i in 0..<3: var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["baz"]) - mCache.put(defaultMsgIdProvider(msg), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg) mCache.shift() check mCache.window("foo").len == 0 diff --git a/tests/pubsub/testmessage.nim b/tests/pubsub/testmessage.nim index d555d89..19e992c 100644 --- a/tests/pubsub/testmessage.nim +++ b/tests/pubsub/testmessage.nim @@ -3,8 +3,10 @@ import unittest2 {.used.} import options +import stew/byteutils import ../../libp2p/[peerid, peerinfo, crypto/crypto, + protocols/pubsub/errors, protocols/pubsub/rpc/message, protocols/pubsub/rpc/messages] @@ -18,3 +20,56 @@ suite "Message": msg = Message.init(some(peer), @[], "topic", some(seqno), sign = true) check verify(msg) + + test "defaultMsgIdProvider success": + let + seqno = 11'u64 + pkHex = + """08011240B9EA7F0357B5C1247E4FCB5AD09C46818ECB07318CA84711875F4C6C + E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F + E731065A""" + seckey = PrivateKey.init(fromHex(stripSpaces(pkHex))) + .expect("invalid private key bytes") + peer = PeerInfo.new(seckey) + msg = Message.init(some(peer), @[], "topic", some(seqno), sign = true) + msgIdResult = msg.defaultMsgIdProvider() + + check: + msgIdResult.isOk + string.fromBytes(msgIdResult.get) == + "000000000000000b12D3KooWGyLzSt9g4U9TdHYDvVWAs5Ht4WrocgoyqPxxvnqAL8qw" + + test "defaultMsgIdProvider error - no source peer id": + let + seqno = 11'u64 + pkHex = + """08011240B9EA7F0357B5C1247E4FCB5AD09C46818ECB07318CA84711875F4C6C + E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F + E731065A""" + seckey = PrivateKey.init(fromHex(stripSpaces(pkHex))) + .expect("invalid private key bytes") + peer = PeerInfo.new(seckey) + + var msg = Message.init(peer.some, @[], "topic", some(seqno), sign = true) + msg.fromPeer = PeerId() + let msgIdResult = msg.defaultMsgIdProvider() + + check: + msgIdResult.isErr + msgIdResult.error == ValidationResult.Reject + + test "defaultMsgIdProvider error - no source seqno": + let + pkHex = + """08011240B9EA7F0357B5C1247E4FCB5AD09C46818ECB07318CA84711875F4C6C + E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F + E731065A""" + seckey = PrivateKey.init(fromHex(stripSpaces(pkHex))) + .expect("invalid private key bytes") + peer = PeerInfo.new(seckey) + msg = Message.init(some(peer), @[], "topic", uint64.none, sign = true) + msgIdResult = msg.defaultMsgIdProvider() + + check: + msgIdResult.isErr + msgIdResult.error == ValidationResult.Reject diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index 50644de..f203f74 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -4,24 +4,37 @@ const libp2p_pubsub_verify {.booldefine.} = true libp2p_pubsub_anonymize {.booldefine.} = false -import random, tables -import chronos +import hashes, random, tables +import chronos, stew/[byteutils, results] import ../../libp2p/[builders, + protocols/pubsub/errors, protocols/pubsub/pubsub, protocols/pubsub/gossipsub, protocols/pubsub/floodsub, + protocols/pubsub/rpc/messages, protocols/secure/secure] export builders randomize() +func defaultMsgIdProvider*(m: Message): Result[MessageID, ValidationResult] = + let mid = + if m.seqno.len > 0 and m.fromPeer.data.len > 0: + byteutils.toHex(m.seqno) & $m.fromPeer + else: + # This part is irrelevant because it's not standard, + # We use it exclusively for testing basically and users should + # implement their own logic in the case they use anonymization + $m.data.hash & $m.topicIDs.hash + ok mid.toBytes() + proc generateNodes*( num: Natural, secureManagers: openArray[SecureProtocol] = [ SecureProtocol.Noise ], - msgIdProvider: MsgIdProvider = nil, + msgIdProvider: MsgIdProvider = defaultMsgIdProvider, gossip: bool = false, triggerSelf: bool = false, verifySignature: bool = libp2p_pubsub_verify, From bc318084f4e5c3362fcfefdc211f69e61d08eb3d Mon Sep 17 00:00:00 2001 From: Tanguy Date: Mon, 21 Feb 2022 16:22:08 +0100 Subject: [PATCH 12/16] GS: Publish to fanout when mesh unhealthy (#638) * Send to fanout when mesh unhealthy * don't use fanout when floodPublish --- libp2p/protocols/pubsub/gossipsub.nim | 22 +++++-- .../protocols/pubsub/gossipsub/behavior.nim | 2 + tests/pubsub/testgossipsub.nim | 65 +++++++++++++++++++ 3 files changed, 83 insertions(+), 6 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 9a2a574..010be4f 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -482,13 +482,23 @@ method publish*(g: GossipSub, if topic in g.topics: # if we're subscribed use the mesh peers.incl(g.mesh.getOrDefault(topic)) - else: # not subscribed, send to fanout peers - # try optimistically - peers.incl(g.fanout.getOrDefault(topic)) - if peers.len == 0: - # ok we had nothing.. let's try replenish inline + + if peers.len < g.parameters.dLow and g.parameters.floodPublish == false: + # not subscribed or bad mesh, send to fanout peers + # disable for floodPublish, since we already sent to every good peer + # + var fanoutPeers = g.fanout.getOrDefault(topic).toSeq() + if fanoutPeers.len == 0: g.replenishFanout(topic) - peers.incl(g.fanout.getOrDefault(topic)) + fanoutPeers = g.fanout.getOrDefault(topic).toSeq() + + fanoutPeers.shuffle() + if fanoutPeers.len + peers.len > g.parameters.d: + fanoutPeers.setLen(g.parameters.d - peers.len) + + for fanPeer in fanoutPeers: + peers.incl(fanPeer) + if peers.len > g.parameters.d: break # even if we couldn't publish, # we still attempted to publish diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index 9bb2a94..e545846 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -489,9 +489,11 @@ proc replenishFanout*(g: GossipSub, topic: string) {.raises: [Defect].} = logScope: topic trace "about to replenish fanout" + let currentMesh = g.mesh.getOrDefault(topic) if g.fanout.peers(topic) < g.parameters.dLow: trace "replenishing fanout", peers = g.fanout.peers(topic) for peer in g.gossipsub.getOrDefault(topic): + if peer in currentMesh: continue if g.fanout.addPeer(topic, peer): if g.fanout.peers(topic) == g.parameters.d: break diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 11eeda6..0a67114 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -565,6 +565,71 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) check observed == 2 + asyncTest "e2e - GossipSub send over fanout A -> B for subscribed topic": + var passed = newFuture[void]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + passed.complete() + + let + nodes = generateNodes( + 2, + gossip = true) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) + + # start pubsub + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) + + await subscribeNodes(nodes) + + nodes[1].subscribe("foobar", handler) + nodes[0].subscribe("foobar", handler) + await waitSub(nodes[0], nodes[1], "foobar") + await waitSub(nodes[1], nodes[0], "foobar") + + nodes[0].unsubscribe("foobar", handler) + + let gsNode = GossipSub(nodes[1]) + check await checkExpiring(gsNode.mesh.getOrDefault("foobar").len == 0) + + nodes[0].subscribe("foobar", handler) + + check GossipSub(nodes[0]).mesh.getOrDefault("foobar").len == 0 + + tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 + + check: + GossipSub(nodes[0]).fanout.getOrDefault("foobar").len > 0 + GossipSub(nodes[0]).mesh.getOrDefault("foobar").len == 0 + + await passed.wait(2.seconds) + + trace "test done, stopping..." + + await nodes[0].stop() + await nodes[1].stop() + + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut.concat()) + asyncTest "e2e - GossipSub send over mesh A -> B": var passed: Future[bool] = newFuture[bool]() proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = From fd59cbc7a9b75ed84adc82705808edb12cb31912 Mon Sep 17 00:00:00 2001 From: Tanguy Date: Mon, 21 Feb 2022 17:00:18 +0100 Subject: [PATCH 13/16] Fix shuffle of #638 --- libp2p/protocols/pubsub/gossipsub.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 010be4f..8261ac5 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -492,7 +492,7 @@ method publish*(g: GossipSub, g.replenishFanout(topic) fanoutPeers = g.fanout.getOrDefault(topic).toSeq() - fanoutPeers.shuffle() + g.rng.shuffle(fanoutPeers) if fanoutPeers.len + peers.len > g.parameters.d: fanoutPeers.setLen(g.parameters.d - peers.len) From f98bf612bdb574942311e9977711368d7666dc3a Mon Sep 17 00:00:00 2001 From: Tanguy Date: Mon, 21 Feb 2022 18:14:43 +0100 Subject: [PATCH 14/16] Fix tests of #638 --- tests/pubsub/testgossipsub.nim | 3 ++- tests/pubsub/utils.nim | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 0a67114..5986d47 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -574,7 +574,8 @@ suite "GossipSub": let nodes = generateNodes( 2, - gossip = true) + gossip = true, + unsubscribeBackoff = 10.minutes) # start switches nodesFut = await allFinished( diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index f203f74..846c8bb 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -40,6 +40,7 @@ proc generateNodes*( verifySignature: bool = libp2p_pubsub_verify, anonymize: bool = libp2p_pubsub_anonymize, sign: bool = libp2p_pubsub_sign, + unsubscribeBackoff = 1.seconds, maxMessageSize: int = 1024 * 1024): seq[PubSub] = for i in 0.. Date: Thu, 24 Feb 2022 17:31:47 +0100 Subject: [PATCH 15/16] Allow force dial (#696) --- libp2p/connmanager.nim | 7 +++-- libp2p/dial.nim | 9 ++++-- libp2p/dialer.nim | 21 ++++++++------ libp2p/switch.nim | 10 ++++--- libp2p/utils/semaphore.nim | 15 +++++++--- tests/testconnmngr.nim | 30 ++++++++++++++++++++ tests/testsemaphore.nim | 56 +++++++++++++++++++++++++++++++++++--- 7 files changed, 123 insertions(+), 25 deletions(-) diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index 3d769e5..f56c7b7 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -452,7 +452,8 @@ proc trackIncomingConn*(c: ConnManager, raise exc proc trackOutgoingConn*(c: ConnManager, - provider: ConnProvider): + provider: ConnProvider, + forceDial = false): Future[Connection] {.async.} = ## try acquiring a connection if all slots ## are already taken, raise TooManyConnectionsError @@ -462,7 +463,9 @@ proc trackOutgoingConn*(c: ConnManager, trace "Tracking outgoing connection", count = c.outSema.count, max = c.outSema.size - if not c.outSema.tryAcquire(): + if forceDial: + c.outSema.forceAcquire() + elif not c.outSema.tryAcquire(): trace "Too many outgoing connections!", count = c.outSema.count, max = c.outSema.size raise newTooManyConnectionsError() diff --git a/libp2p/dial.nim b/libp2p/dial.nim index 7eb3a46..ea51270 100644 --- a/libp2p/dial.nim +++ b/libp2p/dial.nim @@ -19,7 +19,8 @@ type method connect*( self: Dial, peerId: PeerId, - addrs: seq[MultiAddress]) {.async, base.} = + addrs: seq[MultiAddress], + forceDial = false) {.async, base.} = ## connect remote peer without negotiating ## a protocol ## @@ -29,7 +30,8 @@ method connect*( method dial*( self: Dial, peerId: PeerId, - protos: seq[string]): Future[Connection] {.async, base.} = + protos: seq[string], + ): Future[Connection] {.async, base.} = ## create a protocol stream over an ## existing connection ## @@ -40,7 +42,8 @@ method dial*( self: Dial, peerId: PeerId, addrs: seq[MultiAddress], - protos: seq[string]): Future[Connection] {.async, base.} = + protos: seq[string], + forceDial = false): Future[Connection] {.async, base.} = ## create a protocol stream and establish ## a connection if one doesn't exist already ## diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim index 3d02379..65cc1d6 100644 --- a/libp2p/dialer.nim +++ b/libp2p/dialer.nim @@ -47,7 +47,8 @@ type proc dialAndUpgrade( self: Dialer, peerId: PeerId, - addrs: seq[MultiAddress]): + addrs: seq[MultiAddress], + forceDial: bool): Future[Connection] {.async.} = debug "Dialing peer", peerId @@ -72,7 +73,8 @@ proc dialAndUpgrade( transportCopy = transport addressCopy = a await self.connManager.trackOutgoingConn( - () => transportCopy.dial(hostname, addressCopy) + () => transportCopy.dial(hostname, addressCopy), + forceDial ) except TooManyConnectionsError as exc: trace "Connection limit reached!" @@ -112,7 +114,8 @@ proc dialAndUpgrade( proc internalConnect( self: Dialer, peerId: PeerId, - addrs: seq[MultiAddress]): + addrs: seq[MultiAddress], + forceDial: bool): Future[Connection] {.async.} = if self.localPeerId == peerId: raise newException(CatchableError, "can't dial self!") @@ -136,7 +139,7 @@ proc internalConnect( trace "Reusing existing connection", conn, direction = $conn.dir return conn - conn = await self.dialAndUpgrade(peerId, addrs) + conn = await self.dialAndUpgrade(peerId, addrs, forceDial) if isNil(conn): # None of the addresses connected raise newException(DialFailedError, "Unable to establish outgoing link") @@ -159,7 +162,8 @@ proc internalConnect( method connect*( self: Dialer, peerId: PeerId, - addrs: seq[MultiAddress]) {.async.} = + addrs: seq[MultiAddress], + forceDial = false) {.async.} = ## connect remote peer without negotiating ## a protocol ## @@ -167,7 +171,7 @@ method connect*( if self.connManager.connCount(peerId) > 0: return - discard await self.internalConnect(peerId, addrs) + discard await self.internalConnect(peerId, addrs, forceDial) proc negotiateStream( self: Dialer, @@ -200,7 +204,8 @@ method dial*( self: Dialer, peerId: PeerId, addrs: seq[MultiAddress], - protos: seq[string]): Future[Connection] {.async.} = + protos: seq[string], + forceDial = false): Future[Connection] {.async.} = ## create a protocol stream and establish ## a connection if one doesn't exist already ## @@ -218,7 +223,7 @@ method dial*( try: trace "Dialing (new)", peerId, protos - conn = await self.internalConnect(peerId, addrs) + conn = await self.internalConnect(peerId, addrs, forceDial) trace "Opening stream", conn stream = await self.connManager.getStream(conn) diff --git a/libp2p/switch.nim b/libp2p/switch.nim index c7a0a5b..da68e86 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -99,8 +99,9 @@ proc disconnect*(s: Switch, peerId: PeerId): Future[void] {.gcsafe.} = method connect*( s: Switch, peerId: PeerId, - addrs: seq[MultiAddress]): Future[void] = - s.dialer.connect(peerId, addrs) + addrs: seq[MultiAddress], + forceDial = false): Future[void] = + s.dialer.connect(peerId, addrs, forceDial) method dial*( s: Switch, @@ -117,8 +118,9 @@ method dial*( s: Switch, peerId: PeerId, addrs: seq[MultiAddress], - protos: seq[string]): Future[Connection] = - s.dialer.dial(peerId, addrs, protos) + protos: seq[string], + forceDial = false): Future[Connection] = + s.dialer.dial(peerId, addrs, protos, forceDial) proc dial*( s: Switch, diff --git a/libp2p/utils/semaphore.nim b/libp2p/utils/semaphore.nim index 8ded05e..e396f27 100644 --- a/libp2p/utils/semaphore.nim +++ b/libp2p/utils/semaphore.nim @@ -54,16 +54,21 @@ proc acquire*(s: AsyncSemaphore): Future[void] = fut.cancelCallback = nil if not fut.finished: s.queue.keepItIf( it != fut ) - s.count.inc fut.cancelCallback = cancellation s.queue.add(fut) - s.count.dec trace "Queued slot", available = s.count, queue = s.queue.len return fut +proc forceAcquire*(s: AsyncSemaphore) = + ## ForceAcquire will always succeed, + ## creating a temporary slot if required. + ## This temporary slot will stay usable until + ## there is less `acquire`s than `release`s + s.count.dec + proc release*(s: AsyncSemaphore) = ## Release a resource from the semaphore, ## by picking the first future from the queue @@ -77,13 +82,15 @@ proc release*(s: AsyncSemaphore) = trace "Releasing slot", available = s.count, queue = s.queue.len - if s.queue.len > 0: + s.count.inc + while s.queue.len > 0: var fut = s.queue[0] s.queue.delete(0) if not fut.finished(): + s.count.dec fut.complete() + break - s.count.inc # increment the resource count trace "Released slot", available = s.count, queue = s.queue.len return diff --git a/tests/testconnmngr.nim b/tests/testconnmngr.nim index 8119265..6aaa4ed 100644 --- a/tests/testconnmngr.nim +++ b/tests/testconnmngr.nim @@ -463,3 +463,33 @@ suite "Connection Manager": await connMngr.close() await allFuturesThrowing( allFutures(conns.mapIt( it.close() ))) + + asyncTest "allow force dial": + let connMngr = ConnManager.new(maxConnections = 2) + + var conns: seq[Connection] + for i in 0..<3: + let conn = connMngr.trackOutgoingConn( + (proc(): Future[Connection] {.async.} = + return Connection.new( + PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(), + Direction.In) + ), true + ) + + check await conn.withTimeout(10.millis) + conns.add(await conn) + + # should throw adding a connection over the limit + expect TooManyConnectionsError: + discard await connMngr.trackOutgoingConn( + (proc(): Future[Connection] {.async.} = + return Connection.new( + PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(), + Direction.In) + ), false + ) + + await connMngr.close() + await allFuturesThrowing( + allFutures(conns.mapIt( it.close() ))) diff --git a/tests/testsemaphore.nim b/tests/testsemaphore.nim index c7a26d8..09a7d29 100644 --- a/tests/testsemaphore.nim +++ b/tests/testsemaphore.nim @@ -36,7 +36,7 @@ suite "AsyncSemaphore": await sema.acquire() let fut = sema.acquire() - check sema.count == -1 + check sema.count == 0 sema.release() sema.release() check sema.count == 1 @@ -66,7 +66,7 @@ suite "AsyncSemaphore": let fut = sema.acquire() check fut.finished == false - check sema.count == -1 + check sema.count == 0 sema.release() sema.release() @@ -104,12 +104,20 @@ suite "AsyncSemaphore": await sema.acquire() - let tmp = sema.acquire() - check not tmp.finished() + let + tmp = sema.acquire() + tmp2 = sema.acquire() + check: + not tmp.finished() + not tmp2.finished() tmp.cancel() sema.release() + check tmp2.finished() + + sema.release() + check await sema.acquire().withTimeout(10.millis) asyncTest "should handle out of order cancellations": @@ -145,3 +153,43 @@ suite "AsyncSemaphore": sema.release() check await sema.acquire().withTimeout(10.millis) + + asyncTest "should handle forceAcquire properly": + let sema = newAsyncSemaphore(1) + + await sema.acquire() + check not(await sema.acquire().withTimeout(1.millis)) # should not acquire but cancel + + let + fut1 = sema.acquire() + fut2 = sema.acquire() + + sema.forceAcquire() + sema.release() + + await fut1 or fut2 or sleepAsync(1.millis) + check: + fut1.finished() + not fut2.finished() + + sema.release() + await fut1 or fut2 or sleepAsync(1.millis) + check: + fut1.finished() + fut2.finished() + + + sema.forceAcquire() + sema.forceAcquire() + + let + fut3 = sema.acquire() + fut4 = sema.acquire() + fut5 = sema.acquire() + sema.release() + sema.release() + await sleepAsync(1.millis) + check: + fut3.finished() + fut4.finished() + not fut5.finished() From 44a7260f0754341a6ae9864d4ece5a177cb8fc2b Mon Sep 17 00:00:00 2001 From: Eric Mastro Date: Fri, 25 Feb 2022 03:32:20 +1100 Subject: [PATCH 16/16] fixes from #688 (#697) * tests: invert message logic on expect from #688 * fix: export pubsub_errors for backward compatibility --- libp2p/protocols/pubsub/pubsub.nim | 1 + tests/pubsub/testgossipinternal.nim | 10 +++++----- tests/pubsub/testmcache.nim | 20 ++++++++++---------- tests/pubsub/testmessage.nim | 6 +++--- 4 files changed, 19 insertions(+), 18 deletions(-) diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index eecab91..a79d15f 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -30,6 +30,7 @@ export results export PubSubPeer export PubSubObserver export protocol +export pubsub_errors logScope: topics = "libp2p pubsub" diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index ce4f793..6915793 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -39,7 +39,7 @@ proc randomPeerId(): PeerId = except CatchableError as exc: raise newException(Defect, exc.msg) -const MsgIdFail = "msg id gen failure" +const MsgIdSuccess = "msg id gen success" suite "GossipSub internal": teardown: @@ -310,7 +310,7 @@ suite "GossipSub internal": conn.peerId = peerId inc seqno let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno)) - gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdFail), msg) + gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg) check gossipSub.fanout[topic].len == 15 check gossipSub.mesh[topic].len == 15 @@ -357,7 +357,7 @@ suite "GossipSub internal": conn.peerId = peerId inc seqno let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno)) - gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdFail), msg) + gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg) let peers = gossipSub.getGossipPeers() check peers.len == gossipSub.parameters.d @@ -398,7 +398,7 @@ suite "GossipSub internal": conn.peerId = peerId inc seqno let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno)) - gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdFail), msg) + gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg) let peers = gossipSub.getGossipPeers() check peers.len == gossipSub.parameters.d @@ -439,7 +439,7 @@ suite "GossipSub internal": conn.peerId = peerId inc seqno let msg = Message.init(peerId, ("bar" & $i).toBytes(), topic, some(seqno)) - gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdFail), msg) + gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg) let peers = gossipSub.getGossipPeers() check peers.len == 0 diff --git a/tests/pubsub/testmcache.nim b/tests/pubsub/testmcache.nim index 7ceab81..6aecb8a 100644 --- a/tests/pubsub/testmcache.nim +++ b/tests/pubsub/testmcache.nim @@ -13,13 +13,13 @@ var rng = newRng() proc randomPeerId(): PeerId = PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).get() -const MsgIdGenFail = "msg id gen failure" +const MsgIdGenSuccess = "msg id generation success" suite "MCache": test "put/get": var mCache = MCache.init(3, 5) var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes()) - let msgId = defaultMsgIdProvider(msg).expect(MsgIdGenFail) + let msgId = defaultMsgIdProvider(msg).expect(MsgIdGenSuccess) mCache.put(msgId, msg) check mCache.get(msgId).isSome and mCache.get(msgId).get() == msg @@ -30,13 +30,13 @@ suite "MCache": var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["foo"]) - mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg) for i in 0..<5: var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["bar"]) - mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg) var mids = mCache.window("foo") check mids.len == 3 @@ -51,7 +51,7 @@ suite "MCache": var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["foo"]) - mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg) mCache.shift() check mCache.window("foo").len == 0 @@ -60,7 +60,7 @@ suite "MCache": var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["bar"]) - mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg) mCache.shift() check mCache.window("bar").len == 0 @@ -69,7 +69,7 @@ suite "MCache": var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["baz"]) - mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg) mCache.shift() check mCache.window("baz").len == 0 @@ -81,19 +81,19 @@ suite "MCache": var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["foo"]) - mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg) for i in 0..<3: var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["bar"]) - mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg) for i in 0..<3: var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["baz"]) - mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg) mCache.shift() check mCache.window("foo").len == 0 diff --git a/tests/pubsub/testmessage.nim b/tests/pubsub/testmessage.nim index 19e992c..7bc4b26 100644 --- a/tests/pubsub/testmessage.nim +++ b/tests/pubsub/testmessage.nim @@ -29,7 +29,7 @@ suite "Message": E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F E731065A""" seckey = PrivateKey.init(fromHex(stripSpaces(pkHex))) - .expect("invalid private key bytes") + .expect("valid private key bytes") peer = PeerInfo.new(seckey) msg = Message.init(some(peer), @[], "topic", some(seqno), sign = true) msgIdResult = msg.defaultMsgIdProvider() @@ -47,7 +47,7 @@ suite "Message": E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F E731065A""" seckey = PrivateKey.init(fromHex(stripSpaces(pkHex))) - .expect("invalid private key bytes") + .expect("valid private key bytes") peer = PeerInfo.new(seckey) var msg = Message.init(peer.some, @[], "topic", some(seqno), sign = true) @@ -65,7 +65,7 @@ suite "Message": E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F E731065A""" seckey = PrivateKey.init(fromHex(stripSpaces(pkHex))) - .expect("invalid private key bytes") + .expect("valid private key bytes") peer = PeerInfo.new(seckey) msg = Message.init(some(peer), @[], "topic", uint64.none, sign = true) msgIdResult = msg.defaultMsgIdProvider()