diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim index 5989eb666..1cf376495 100644 --- a/tests/all_tests_waku.nim +++ b/tests/all_tests_waku.nim @@ -59,7 +59,6 @@ import ./test_waku_keepalive, ./test_waku_enr, ./test_waku_dnsdisc, - ./test_waku_discv5, ./test_relay_peer_exchange, ./test_waku_noise, ./test_waku_noise_sessions, diff --git a/tests/test_waku_discv5.nim b/tests/test_waku_discv5.nim deleted file mode 100644 index ebac6b923..000000000 --- a/tests/test_waku_discv5.nim +++ /dev/null @@ -1,462 +0,0 @@ -{.used.} - -import - std/[sequtils], - stew/results, - stew/shims/net, - chronos, - chronicles, - testutils/unittests, - libp2p/crypto/crypto as libp2p_keys, - eth/keys as eth_keys -import - ../../waku/waku_core/topics, - ../../waku/waku_enr, - ../../waku/waku_discv5, - ./testlib/common, - ./testlib/wakucore, - ./testlib/wakunode - - -proc newTestEnrRecord(privKey: libp2p_keys.PrivateKey, - extIp: string, tcpPort: uint16, udpPort: uint16, - flags = none(CapabilitiesBitfield)): waku_enr.Record = - var builder = EnrBuilder.init(privKey) - builder.withIpAddressAndPorts( - ipAddr = some(parseIpAddress(extIp)), - tcpPort = some(Port(tcpPort)), - udpPort = some(Port(udpPort)), - ) - - if flags.isSome(): - builder.withWakuCapabilities(flags.get()) - - builder.build().tryGet() - - -proc newTestDiscv5( - privKey: libp2p_keys.PrivateKey, - bindIp: string, tcpPort: uint16, udpPort: uint16, - record: waku_enr.Record, - bootstrapRecords = newSeq[waku_enr.Record](), - queue = newAsyncEventQueue[SubscriptionEvent](30), - ): WakuDiscoveryV5 = - let config = WakuDiscoveryV5Config( - privateKey: eth_keys.PrivateKey(privKey.skkey), - address: parseIpAddress(bindIp), - port: Port(udpPort), - bootstrapRecords: bootstrapRecords, - ) - - let discv5 = WakuDiscoveryV5.new( - rng = rng(), - conf = config, - record = some(record), - queue = queue, - ) - - return discv5 - - -procSuite "Waku Discovery v5": - asyncTest "find random peers": - ## Given - # Node 1 - let - privKey1 = generateSecp256k1Key() - bindIp1 = "0.0.0.0" - extIp1 = "127.0.0.1" - tcpPort1 = 61500u16 - udpPort1 = 9000u16 - - let record1 = newTestEnrRecord( - privKey = privKey1, - extIp = extIp1, - tcpPort = tcpPort1, - udpPort = udpPort1, - ) - let node1 = newTestDiscv5( - privKey = privKey1, - bindIp = bindIp1, - tcpPort = tcpPort1, - udpPort = udpPort1, - record = record1 - ) - - # Node 2 - let - privKey2 = generateSecp256k1Key() - bindIp2 = "0.0.0.0" - extIp2 = "127.0.0.1" - tcpPort2 = 61502u16 - udpPort2 = 9002u16 - - let record2 = newTestEnrRecord( - privKey = privKey2, - extIp = extIp2, - tcpPort = tcpPort2, - udpPort = udpPort2, - ) - - let node2 = newTestDiscv5( - privKey = privKey2, - bindIp = bindIp2, - tcpPort = tcpPort2, - udpPort = udpPort2, - record = record2, - ) - - # Node 3 - let - privKey3 = generateSecp256k1Key() - bindIp3 = "0.0.0.0" - extIp3 = "127.0.0.1" - tcpPort3 = 61504u16 - udpPort3 = 9004u16 - - let record3 = newTestEnrRecord( - privKey = privKey3, - extIp = extIp3, - tcpPort = tcpPort3, - udpPort = udpPort3, - ) - - let node3 = newTestDiscv5( - privKey = privKey3, - bindIp = bindIp3, - tcpPort = tcpPort3, - udpPort = udpPort3, - record = record3, - bootstrapRecords = @[record1, record2] - ) - - let res1 = await node1.start() - assert res1.isOk(), res1.error - - let res2 = await node2.start() - assert res2.isOk(), res2.error - - let res3 = await node3.start() - assert res3.isOk(), res3.error - - ## When - let res = await node3.findRandomPeers() - - ## Then - check: - res.len >= 1 - - ## Cleanup - await allFutures(node1.stop(), node2.stop(), node3.stop()) - - asyncTest "find random peers with predicate": - ## Setup - # Records - let - privKey1 = generateSecp256k1Key() - bindIp1 = "0.0.0.0" - extIp1 = "127.0.0.1" - tcpPort1 = 61500u16 - udpPort1 = 9000u16 - - let record1 = newTestEnrRecord( - privKey = privKey1, - extIp = extIp1, - tcpPort = tcpPort1, - udpPort = udpPort1, - flags = some(CapabilitiesBitfield.init(Capabilities.Relay)) - ) - - let - privKey2 = generateSecp256k1Key() - bindIp2 = "0.0.0.0" - extIp2 = "127.0.0.1" - tcpPort2 = 61502u16 - udpPort2 = 9002u16 - - let record2 = newTestEnrRecord( - privKey = privKey2, - extIp = extIp2, - tcpPort = tcpPort2, - udpPort = udpPort2, - flags = some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Store)) - ) - - let - privKey3 = generateSecp256k1Key() - bindIp3 = "0.0.0.0" - extIp3 = "127.0.0.1" - tcpPort3 = 61504u16 - udpPort3 = 9004u16 - - let record3 = newTestEnrRecord( - privKey = privKey3, - extIp = extIp3, - tcpPort = tcpPort3, - udpPort = udpPort3, - flags = some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Filter)) - ) - - let - privKey4 = generateSecp256k1Key() - bindIp4 = "0.0.0.0" - extIp4 = "127.0.0.1" - tcpPort4 = 61506u16 - udpPort4 = 9006u16 - - let record4 = newTestEnrRecord( - privKey = privKey4, - extIp = extIp4, - tcpPort = tcpPort4, - udpPort = udpPort4, - flags = some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Store)) - ) - - - # Nodes - let node1 = newTestDiscv5( - privKey = privKey1, - bindIp = bindIp1, - tcpPort = tcpPort1, - udpPort = udpPort1, - record = record1, - bootstrapRecords = @[record2] - ) - let node2 = newTestDiscv5( - privKey = privKey2, - bindIp = bindIp2, - tcpPort = tcpPort2, - udpPort = udpPort2, - record = record2, - bootstrapRecords = @[record3, record4] - ) - - let node3 = newTestDiscv5( - privKey = privKey3, - bindIp = bindIp3, - tcpPort = tcpPort3, - udpPort = udpPort3, - record = record3 - ) - - let node4 = newTestDiscv5( - privKey = privKey4, - bindIp = bindIp4, - tcpPort = tcpPort4, - udpPort = udpPort4, - record = record4 - ) - - # Start nodes' discoveryV5 protocols - let res1 = await node1.start() - assert res1.isOk(), res1.error - - let res2 = await node2.start() - assert res2.isOk(), res2.error - - let res3 = await node3.start() - assert res3.isOk(), res3.error - - let res4 = await node4.start() - assert res4.isOk(), res4.error - - ## Given - let recordPredicate: WakuDiscv5Predicate = proc(record: waku_enr.Record): bool = - let typedRecord = record.toTyped() - if typedRecord.isErr(): - return false - - let capabilities = typedRecord.value.waku2 - if capabilities.isNone(): - return false - - return capabilities.get().supportsCapability(Capabilities.Store) - - - ## When - let peers = await node1.findRandomPeers(some(recordPredicate)) - - ## Then - check: - peers.len >= 1 - peers.allIt(it.supportsCapability(Capabilities.Store)) - - # Cleanup - await allFutures(node1.stop(), node2.stop(), node3.stop(), node4.stop()) - - asyncTest "get shards from topics": - ## Given - let mixedTopics = @["/waku/2/thisisatest", "/waku/2/rs/0/2", "/waku/2/rs/0/8"] - let shardedTopics = @["/waku/2/rs/0/2", "/waku/2/rs/0/4", "/waku/2/rs/0/8"] - let namedTopics = @["/waku/2/thisisatest", "/waku/2/atestthisis", "/waku/2/isthisatest"] - let gibberish = @["aedyttydcb/uioasduyio", "jhdfsjhlsdfjhk/sadjhk", "khfsd/hjfdsgjh/dfs"] - let empty: seq[string] = @[] - - let shardsTopics = RelayShards.init(0, @[uint16(2), uint16(4), uint16(8)]).expect("Valid shardIds") - - ## When - - let mixedRes = topicsToRelayShards(mixedTopics) - let shardedRes = topicsToRelayShards(shardedTopics) - let namedRes = topicsToRelayShards(namedTopics) - let gibberishRes = topicsToRelayShards(gibberish) - let emptyRes = topicsToRelayShards(empty) - - ## Then - assert mixedRes.isErr(), $mixedRes.value - assert shardedRes.isOk(), shardedRes.error - assert shardedRes.value.isSome() - assert shardedRes.value.get() == shardsTopics, $shardedRes.value.get() - assert namedRes.isOk(), namedRes.error - assert namedRes.value.isNone(), $namedRes.value - assert gibberishRes.isErr(), $gibberishRes.value - assert emptyRes.isOk(), emptyRes.error - assert emptyRes.value.isNone(), $emptyRes.value - - asyncTest "filter peer per static shard": - ## Given - let recordCluster21 = block: - let - enrSeqNum = 1u64 - enrPrivKey = generatesecp256k1key() - - let - clusterId: uint16 = 21 - shardIds: seq[uint16] = @[1u16, 2u16, 5u16, 7u16, 9u16, 11u16] - - let shardsTopics = RelayShards.init(clusterId, shardIds).expect("Valid shardIds") - - var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum) - require builder.withWakuRelaySharding(shardsTopics).isOk() - builder.withWakuCapabilities(Relay) - - let recordRes = builder.build() - require recordRes.isOk() - recordRes.tryGet() - - let recordCluster22Indices1 = block: - let - enrSeqNum = 1u64 - enrPrivKey = generatesecp256k1key() - - let - clusterId: uint16 = 22 - shardIds: seq[uint16] = @[2u16, 4u16, 5u16, 8u16, 10u16, 12u16] - - let shardsTopics = RelayShards.init(clusterId, shardIds).expect("Valid shardIds") - - var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum) - require builder.withWakuRelaySharding(shardsTopics).isOk() - builder.withWakuCapabilities(Relay) - - let recordRes = builder.build() - require recordRes.isOk() - recordRes.tryGet() - - let recordCluster22Indices2 = block: - let - enrSeqNum = 1u64 - enrPrivKey = generatesecp256k1key() - - let - clusterId: uint16 = 22 - shardIds: seq[uint16] = @[1u16, 3u16, 6u16, 7u16, 9u16, 11u16] - - let shardsTopics = RelayShards.init(clusterId, shardIds).expect("Valid shardIds") - - var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum) - require builder.withWakuRelaySharding(shardsTopics).isOk() - builder.withWakuCapabilities(Relay) - - let recordRes = builder.build() - require recordRes.isOk() - recordRes.tryGet() - - ## When - let predicateCluster21Op = shardingPredicate(recordCluster21) - require predicateCluster21Op.isSome() - let predicateCluster21 = predicateCluster21Op.get() - - let predicateCluster22Op = shardingPredicate(recordCluster22Indices1) - require predicateCluster22Op.isSome() - let predicateCluster22 = predicateCluster22Op.get() - - ## Then - check: - predicateCluster21(recordCluster21) == true - predicateCluster21(recordCluster22Indices1) == false - predicateCluster21(recordCluster22Indices2) == false - predicateCluster22(recordCluster21) == false - predicateCluster22(recordCluster22Indices1) == true - predicateCluster22(recordCluster22Indices2) == false - - asyncTest "update ENR from subscriptions": - ## Given - let - shard1 = "/waku/2/rs/0/1" - shard2 = "/waku/2/rs/0/2" - shard3 = "/waku/2/rs/0/3" - privKey = generateSecp256k1Key() - bindIp = "0.0.0.0" - extIp = "127.0.0.1" - tcpPort = 61500u16 - udpPort = 9000u16 - - let record = newTestEnrRecord( - privKey = privKey, - extIp = extIp, - tcpPort = tcpPort, - udpPort = udpPort, - ) - - let queue = newAsyncEventQueue[SubscriptionEvent](30) - - let node = newTestDiscv5( - privKey = privKey, - bindIp = bindIp, - tcpPort = tcpPort, - udpPort = udpPort, - record = record, - queue = queue, - ) - - let res = await node.start() - assert res.isOk(), res.error - - ## Then - queue.emit((kind: PubsubSub, topic: shard1)) - queue.emit((kind: PubsubSub, topic: shard2)) - queue.emit((kind: PubsubSub, topic: shard3)) - - await sleepAsync(1.seconds) - - check: - node.protocol.localNode.record.containsShard(shard1) == true - node.protocol.localNode.record.containsShard(shard2) == true - node.protocol.localNode.record.containsShard(shard3) == true - - queue.emit((kind: PubsubSub, topic: shard1)) - queue.emit((kind: PubsubSub, topic: shard2)) - queue.emit((kind: PubsubSub, topic: shard3)) - - await sleepAsync(1.seconds) - - check: - node.protocol.localNode.record.containsShard(shard1) == true - node.protocol.localNode.record.containsShard(shard2) == true - node.protocol.localNode.record.containsShard(shard3) == true - - queue.emit((kind: PubsubUnsub, topic: shard1)) - queue.emit((kind: PubsubUnsub, topic: shard2)) - - await sleepAsync(1.seconds) - - check: - node.protocol.localNode.record.containsShard(shard1) == false - node.protocol.localNode.record.containsShard(shard2) == false - node.protocol.localNode.record.containsShard(shard3) == true - - ## Cleanup - await node.stop() - - diff --git a/tests/waku_discv5/test_all.nim b/tests/waku_discv5/test_all.nim new file mode 100644 index 000000000..a6d2c22c4 --- /dev/null +++ b/tests/waku_discv5/test_all.nim @@ -0,0 +1 @@ +import ./test_waku_discv5 diff --git a/tests/waku_discv5/test_waku_discv5.nim b/tests/waku_discv5/test_waku_discv5.nim new file mode 100644 index 000000000..cdcd9461d --- /dev/null +++ b/tests/waku_discv5/test_waku_discv5.nim @@ -0,0 +1,426 @@ +{.used.} + +import + std/[sequtils, algorithm], + stew/results, + stew/shims/net, + chronos, + chronicles, + testutils/unittests, + libp2p/crypto/crypto as libp2p_keys, + eth/keys as eth_keys + +import + ../../../waku/[waku_core/topics, waku_enr, waku_discv5, common/enr], + ../testlib/[wakucore, testasync, assertions, futures], + ../waku_enr/utils, + ./utils + +import eth/p2p/discoveryv5/enr as ethEnr + +procSuite "Waku Discovery v5": + let + rng = eth_keys.newRng() + pk1 = eth_keys.PrivateKey.random(rng[]) + pk2 = eth_keys.PrivateKey.random(rng[]) + + enrNoCapabilities = + initRecord(1, pk1, {"rs": @[0.byte, 0.byte, 1.byte, 0.byte, 0.byte]}).value() + enrRelay = + initRecord( + 1, pk2, {"waku2": @[1.byte], "rs": @[0.byte, 1.byte, 1.byte, 0.byte, 1.byte]} + ).value() + enrNoShardingInfo = initRecord(1, pk1, {"waku2": @[1.byte]}).value() + + suite "shardingPredicate": + var + recordCluster21 {.threadvar.}: Record + recordCluster22Indices1 {.threadvar.}: Record + recordCluster22Indices2 {.threadvar.}: Record + + asyncSetup: + recordCluster21 = + block: + let + enrSeqNum = 1u64 + enrPrivKey = generatesecp256k1key() + + let + clusterId: uint16 = 21 + shardIds: seq[uint16] = @[1u16, 2u16, 5u16, 7u16, 9u16, 11u16] + + let + shardsTopics = + RelayShards.init(clusterId, shardIds).expect("Valid shardIds") + + var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum) + require builder.withWakuRelaySharding(shardsTopics).isOk() + builder.withWakuCapabilities(Relay) + + let recordRes = builder.build() + require recordRes.isOk() + recordRes.tryGet() + + recordCluster22Indices1 = + block: + let + enrSeqNum = 1u64 + enrPrivKey = generatesecp256k1key() + + let + clusterId: uint16 = 22 + shardIds: seq[uint16] = @[2u16, 4u16, 5u16, 8u16, 10u16, 12u16] + + let + shardsTopics = + RelayShards.init(clusterId, shardIds).expect("Valid shardIds") + + var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum) + require builder.withWakuRelaySharding(shardsTopics).isOk() + builder.withWakuCapabilities(Relay) + + let recordRes = builder.build() + require recordRes.isOk() + recordRes.tryGet() + + recordCluster22Indices2 = + block: + let + enrSeqNum = 1u64 + enrPrivKey = generatesecp256k1key() + + let + clusterId: uint16 = 22 + shardIds: seq[uint16] = @[1u16, 3u16, 6u16, 7u16, 9u16, 11u16] + + let + shardsTopics = + RelayShards.init(clusterId, shardIds).expect("Valid shardIds") + + var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum) + require builder.withWakuRelaySharding(shardsTopics).isOk() + builder.withWakuCapabilities(Relay) + + let recordRes = builder.build() + require recordRes.isOk() + recordRes.tryGet() + + asyncTest "filter peer per contained shard": + # When + let predicateCluster21Op = shardingPredicate(recordCluster21) + require predicateCluster21Op.isSome() + let predicateCluster21 = predicateCluster21Op.get() + + let predicateCluster22Op = shardingPredicate(recordCluster22Indices1) + require predicateCluster22Op.isSome() + let predicateCluster22 = predicateCluster22Op.get() + + # Then + check: + predicateCluster21(recordCluster21) == true + predicateCluster21(recordCluster22Indices1) == false + predicateCluster21(recordCluster22Indices2) == false + predicateCluster22(recordCluster21) == false + predicateCluster22(recordCluster22Indices1) == true + predicateCluster22(recordCluster22Indices2) == false + + asyncTest "filter peer per bootnode": + let + predicateNoCapabilities = + shardingPredicate(enrNoCapabilities, @[enrNoCapabilities]).get() + predicateNoCapabilitiesWithBoth = + shardingPredicate(enrNoCapabilities, @[enrNoCapabilities, enrRelay]).get() + + check: + predicateNoCapabilities(enrNoCapabilities) == true + predicateNoCapabilities(enrRelay) == false + predicateNoCapabilitiesWithBoth(enrNoCapabilities) == true + predicateNoCapabilitiesWithBoth(enrRelay) == true + + let + predicateRelay = shardingPredicate(enrRelay, @[enrRelay]).get() + predicateRelayWithBoth = + shardingPredicate(enrRelay, @[enrRelay, enrNoCapabilities]).get() + + check: + predicateRelay(enrNoCapabilities) == false + predicateRelay(enrRelay) == true + predicateRelayWithBoth(enrNoCapabilities) == true + predicateRelayWithBoth(enrRelay) == true + + asyncTest "does not conform to typed record": + let + record = ethEnr.Record(raw: @[]) + predicateRecord = shardingPredicate(record, @[]) + + check: + predicateRecord.isNone() + + asyncTest "no relay sharding info": + let + predicateNoShardingInfo = + shardingPredicate(enrNoShardingInfo, @[enrNoShardingInfo]) + + check: + predicateNoShardingInfo.isNone() + + suite "findRandomPeers": + proc buildNode( + tcpPort: uint16, + udpPort: uint16, + bindIp: string = "0.0.0.0", + extIp: string = "127.0.0.1", + indices: seq[uint64] = @[], + recordFlags: Option[CapabilitiesBitfield] = none(CapabilitiesBitfield), + bootstrapRecords: seq[waku_enr.Record] = @[], + ): (WakuDiscoveryV5, Record) = + let + privKey = generateSecp256k1Key() + record = + newTestEnrRecord( + privKey = privKey, + extIp = extIp, + tcpPort = tcpPort, + udpPort = udpPort, + indices = indices, + flags = recordFlags, + ) + node = + newTestDiscv5( + privKey = privKey, + bindIp = bindIp, + tcpPort = tcpPort, + udpPort = udpPort, + record = record, + bootstrapRecords = bootstrapRecords, + ) + + (node, record) + + let + filterForStore: WakuDiscv5Predicate = + proc(record: waku_enr.Record): bool = + let typedRecord = record.toTyped() + if typedRecord.isErr(): + return false + + let capabilities = typedRecord.value.waku2 + if capabilities.isNone(): + return false + + return capabilities.get().supportsCapability(Capabilities.Store) + + asyncTest "find random peers without predicate": + # Given 3 nodes + let + (node1, record1) = buildNode(tcpPort = 61500u16, udpPort = 9000u16) + (node2, record2) = buildNode(tcpPort = 61502u16, udpPort = 9002u16) + (node3, record3) = + buildNode( + tcpPort = 61504u16, + udpPort = 9004u16, + bootstrapRecords = @[record1, record2], + ) + + let res1 = await node1.start() + assertResultOk res1 + + let res2 = await node2.start() + assertResultOk res2 + + let res3 = await node3.start() + assertResultOk res3 + + await sleepAsync(FUTURE_TIMEOUT) + + ## When we find random peers + let res = await node3.findRandomPeers() + + var tcpPortList = res.mapIt(it.toTypedRecord().value().tcp.get()) + tcpPortList.sort() + + ## Then + check: + res.len == 2 + tcpPortList == @[61500, 61502] + + ## Cleanup + await allFutures(node1.stop(), node2.stop(), node3.stop()) + + asyncTest "find random peers with parameter predicate": + # Given 4 nodes + let + (node3, record3) = + buildNode( + tcpPort = 61504u16, + udpPort = 9004u16, + recordFlags = + some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Filter)), + ) + (node4, record4) = + buildNode( + tcpPort = 61506u16, + udpPort = 9006u16, + recordFlags = + some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Store)), + ) + (node2, record2) = + buildNode( + tcpPort = 61502u16, + udpPort = 9002u16, + recordFlags = + some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Store)), + bootstrapRecords = @[record3, record4], + ) + (node1, record1) = + buildNode( + tcpPort = 61500u16, + udpPort = 9000u16, + recordFlags = some(CapabilitiesBitfield.init(Capabilities.Relay)), + bootstrapRecords = @[record2], + ) + + # Start nodes' discoveryV5 protocols + let res1 = await node1.start() + assertResultOk res1 + + let res2 = await node2.start() + assertResultOk res2 + + let res3 = await node3.start() + assertResultOk res3 + + let res4 = await node4.start() + assertResultOk res4 + + await sleepAsync(FUTURE_TIMEOUT) + + ## When + let peers = await node1.findRandomPeers(some(filterForStore)) + + ## Then + check: + peers.len >= 1 + peers.allIt(it.supportsCapability(Capabilities.Store)) + + # Cleanup + await allFutures(node1.stop(), node2.stop(), node3.stop(), node4.stop()) + + asyncTest "find random peers with instance predicate": + ## Setup + # Records + let + (node3, record3) = + buildNode( + tcpPort = 61504u16, + udpPort = 9004u16, + recordFlags = + some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Filter)), + ) + (node4, record4) = + buildNode( + tcpPort = 61506u16, + udpPort = 9006u16, + recordFlags = + some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Store)), + ) + (node2, record2) = + buildNode( + tcpPort = 61502u16, + udpPort = 9002u16, + recordFlags = + some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Store)), + bootstrapRecords = @[record3, record4], + ) + let + (node1, record1) = + buildNode( + tcpPort = 61500u16, + udpPort = 9000u16, + recordFlags = some(CapabilitiesBitfield.init(Capabilities.Relay)), + indices = @[0u64, 0u64, 1u64, 0u64, 0u64], + bootstrapRecords = @[record2], + ) + + # Start nodes' discoveryV5 protocols + let res1 = await node1.start() + assertResultOk res1 + + let res2 = await node2.start() + assertResultOk res2 + + let res3 = await node3.start() + assertResultOk res3 + + let res4 = await node4.start() + assertResultOk res4 + + await sleepAsync(FUTURE_TIMEOUT) + + ## When + let peers = await node1.findRandomPeers() + + ## Then + check: + peers.len >= 1 + peers.allIt(it.supportsCapability(Capabilities.Store)) + + # Cleanup + await allFutures(node1.stop(), node2.stop(), node3.stop(), node4.stop()) + + suite "addBoostrapNode": + let + validEnr = + "enr:-I-4QG3mX250ArniAs2DLpW-QHOLKSD5x_Ibp8AYcQZbz1HhHFJtl2dNDGcha" & + "U5ugLbDKRgtTDZH8NsxXlTXDpYAgzgBgmlkgnY0gnJzjwAVBgABAAIABQAHAAkAC4" & + "lzZWNwMjU2azGhA4_KwN0NRRmmfQ-B9B2h2PZjoJvBnaIOi6sR_b2UTQBBhXdha3U" & "yAQ" + + asyncTest "address is valid": + # Given an empty list of enrs + var enrs: seq[Record] = @[] + + # When adding a valid enr + addBootstrapNode(validEnr, enrs) + var r: Record + echo r.fromURI(validEnr) + echo r + + # Then the enr is added to the list + check: + enrs.len == 1 + enrs[0].toBase64() == validEnr[4..^1] + + asyncTest "address is empty": + # Given an empty list of enrs + var enrs: seq[Record] = @[] + + # When adding an empty enr + addBootstrapNode("", enrs) + + # Then the enr is not added to the list + check: + enrs.len == 0 + + asyncTest "address is valid but starts with #": + # Given an empty list of enrs + var enrs: seq[Record] = @[] + + # When adding any enr that starts with # + let enr = "#" & validEnr + addBootstrapNode(enr, enrs) + + # Then the enr is not added to the list + check: + enrs.len == 0 + + asyncTest "address is not valid": + # Given an empty list of enrs + var enrs: seq[Record] = @[] + + # When adding an invalid enr + let enr = "enr:invalid" + addBootstrapNode(enr, enrs) + + # Then the enr is not added to the list + check: + enrs.len == 0 diff --git a/tests/waku_discv5/utils.nim b/tests/waku_discv5/utils.nim new file mode 100644 index 000000000..9a8464a4e --- /dev/null +++ b/tests/waku_discv5/utils.nim @@ -0,0 +1,36 @@ +import + stew/results, + stew/shims/net, + chronos, + libp2p/crypto/crypto as libp2p_keys, + eth/keys as eth_keys + +import + ../../../waku/[waku_core/topics, waku_enr, waku_discv5], + ../testlib/[common, wakucore] + +proc newTestDiscv5*( + privKey: libp2p_keys.PrivateKey, + bindIp: string, + tcpPort: uint16, + udpPort: uint16, + record: waku_enr.Record, + bootstrapRecords = newSeq[waku_enr.Record](), + queue = newAsyncEventQueue[SubscriptionEvent](30), +): WakuDiscoveryV5 = + let + config = + WakuDiscoveryV5Config( + privateKey: eth_keys.PrivateKey(privKey.skkey), + address: parseIpAddress(bindIp), + port: Port(udpPort), + bootstrapRecords: bootstrapRecords, + ) + + let + discv5 = + WakuDiscoveryV5.new( + rng = rng(), conf = config, record = some(record), queue = queue + ) + + return discv5 diff --git a/tests/waku_enr/test_all.nim b/tests/waku_enr/test_all.nim new file mode 100644 index 000000000..13ae1c48a --- /dev/null +++ b/tests/waku_enr/test_all.nim @@ -0,0 +1 @@ +import ./test_sharding diff --git a/tests/waku_enr/test_sharding.nim b/tests/waku_enr/test_sharding.nim new file mode 100644 index 000000000..547d2c62e --- /dev/null +++ b/tests/waku_enr/test_sharding.nim @@ -0,0 +1,125 @@ +{.used.} + +import + stew/results, + stew/shims/net, + chronos, + testutils/unittests, + libp2p/crypto/crypto as libp2p_keys, + eth/keys as eth_keys + +import + ../../../waku/[waku_enr, waku_discv5, waku_core], + ../testlib/wakucore, + ../waku_discv5/utils, + ./utils + +suite "Sharding": + suite "topicsToRelayShards": + asyncTest "get shards from topics": + ## Given + let mixedTopics = @["/waku/2/thisisatest", "/waku/2/rs/0/2", "/waku/2/rs/0/8"] + let shardedTopics = @["/waku/2/rs/0/2", "/waku/2/rs/0/4", "/waku/2/rs/0/8"] + let + namedTopics = + @["/waku/2/thisisatest", "/waku/2/atestthisis", "/waku/2/isthisatest"] + let + gibberish = + @["aedyttydcb/uioasduyio", "jhdfsjhlsdfjhk/sadjhk", "khfsd/hjfdsgjh/dfs"] + let empty: seq[string] = @[] + + let + shardsTopics = + RelayShards.init(0, @[uint16(2), uint16(4), uint16(8)]).expect( + "Valid shardIds" + ) + + ## When + + let mixedRes = topicsToRelayShards(mixedTopics) + let shardedRes = topicsToRelayShards(shardedTopics) + let namedRes = topicsToRelayShards(namedTopics) + let gibberishRes = topicsToRelayShards(gibberish) + let emptyRes = topicsToRelayShards(empty) + + ## Then + assert mixedRes.isErr(), $mixedRes.value + assert shardedRes.isOk(), shardedRes.error + assert shardedRes.value.isSome() + assert shardedRes.value.get() == shardsTopics, $shardedRes.value.get() + assert namedRes.isOk(), namedRes.error + assert namedRes.value.isNone(), $namedRes.value + assert gibberishRes.isErr(), $gibberishRes.value + assert emptyRes.isOk(), emptyRes.error + assert emptyRes.value.isNone(), $emptyRes.value + + suite "containsShard": + asyncTest "update ENR from subscriptions": + ## Given + let + shard1 = "/waku/2/rs/0/1" + shard2 = "/waku/2/rs/0/2" + shard3 = "/waku/2/rs/0/3" + privKey = generateSecp256k1Key() + bindIp = "0.0.0.0" + extIp = "127.0.0.1" + tcpPort = 61500u16 + udpPort = 9000u16 + + let + record = + newTestEnrRecord( + privKey = privKey, extIp = extIp, tcpPort = tcpPort, udpPort = udpPort + ) + + let queue = newAsyncEventQueue[SubscriptionEvent](30) + + let + node = + newTestDiscv5( + privKey = privKey, + bindIp = bindIp, + tcpPort = tcpPort, + udpPort = udpPort, + record = record, + queue = queue, + ) + + let res = await node.start() + assert res.isOk(), res.error + + ## Then + queue.emit((kind: PubsubSub, topic: shard1)) + queue.emit((kind: PubsubSub, topic: shard2)) + queue.emit((kind: PubsubSub, topic: shard3)) + + await sleepAsync(1.seconds) + + check: + node.protocol.localNode.record.containsShard(shard1) == true + node.protocol.localNode.record.containsShard(shard2) == true + node.protocol.localNode.record.containsShard(shard3) == true + + queue.emit((kind: PubsubSub, topic: shard1)) + queue.emit((kind: PubsubSub, topic: shard2)) + queue.emit((kind: PubsubSub, topic: shard3)) + + await sleepAsync(1.seconds) + + check: + node.protocol.localNode.record.containsShard(shard1) == true + node.protocol.localNode.record.containsShard(shard2) == true + node.protocol.localNode.record.containsShard(shard3) == true + + queue.emit((kind: PubsubUnsub, topic: shard1)) + queue.emit((kind: PubsubUnsub, topic: shard2)) + + await sleepAsync(1.seconds) + + check: + node.protocol.localNode.record.containsShard(shard1) == false + node.protocol.localNode.record.containsShard(shard2) == false + node.protocol.localNode.record.containsShard(shard3) == true + + ## Cleanup + await node.stop() diff --git a/tests/waku_enr/utils.nim b/tests/waku_enr/utils.nim new file mode 100644 index 000000000..469e0d95c --- /dev/null +++ b/tests/waku_enr/utils.nim @@ -0,0 +1,37 @@ +import + sequtils, + stew/results, + stew/shims/net, + chronos, + libp2p/crypto/crypto as libp2p_keys, + eth/keys as eth_keys + +import + ../../../waku/[waku_core/topics, waku_enr, waku_discv5, waku_enr/sharding], + ../testlib/[common, wakucore] + +proc newTestEnrRecord*( + privKey: libp2p_keys.PrivateKey, + extIp: string, + tcpPort: uint16, + udpPort: uint16, + indices: seq[uint64] = @[], + flags = none(CapabilitiesBitfield), +): waku_enr.Record = + var builder = EnrBuilder.init(privKey) + builder.withIpAddressAndPorts( + ipAddr = some(parseIpAddress(extIp)), + tcpPort = some(Port(tcpPort)), + udpPort = some(Port(udpPort)), + ) + + if indices.len > 0: + let + byteSeq: seq[byte] = indices.mapIt(cast[byte](it)) + relayShards = fromIndicesList(byteSeq).get() + discard builder.withWakuRelayShardingIndicesList(relayShards) + + if flags.isSome(): + builder.withWakuCapabilities(flags.get()) + + builder.build().tryGet() diff --git a/waku/waku_enr/sharding.nim b/waku/waku_enr/sharding.nim index f5045ed6d..d4ee0ea02 100644 --- a/waku/waku_enr/sharding.nim +++ b/waku/waku_enr/sharding.nim @@ -113,7 +113,7 @@ func toIndicesList*(rs: RelayShards): EnrResult[seq[byte]] = ok(res) -func fromIndicesList(buf: seq[byte]): Result[RelayShards, string] = +func fromIndicesList*(buf: seq[byte]): Result[RelayShards, string] = if buf.len < 3: return err("insufficient data: expected at least 3 bytes, got " & $buf.len & " bytes")