mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-09 09:23:14 +00:00
parent
8e13bfbb65
commit
297838b145
@ -59,7 +59,6 @@ import
|
||||
./test_waku_keepalive,
|
||||
./test_waku_enr,
|
||||
./test_waku_dnsdisc,
|
||||
./test_waku_discv5,
|
||||
./test_relay_peer_exchange,
|
||||
./test_waku_noise,
|
||||
./test_waku_noise_sessions,
|
||||
|
||||
@ -1,462 +0,0 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[sequtils],
|
||||
stew/results,
|
||||
stew/shims/net,
|
||||
chronos,
|
||||
chronicles,
|
||||
testutils/unittests,
|
||||
libp2p/crypto/crypto as libp2p_keys,
|
||||
eth/keys as eth_keys
|
||||
import
|
||||
../../waku/waku_core/topics,
|
||||
../../waku/waku_enr,
|
||||
../../waku/waku_discv5,
|
||||
./testlib/common,
|
||||
./testlib/wakucore,
|
||||
./testlib/wakunode
|
||||
|
||||
|
||||
proc newTestEnrRecord(privKey: libp2p_keys.PrivateKey,
|
||||
extIp: string, tcpPort: uint16, udpPort: uint16,
|
||||
flags = none(CapabilitiesBitfield)): waku_enr.Record =
|
||||
var builder = EnrBuilder.init(privKey)
|
||||
builder.withIpAddressAndPorts(
|
||||
ipAddr = some(parseIpAddress(extIp)),
|
||||
tcpPort = some(Port(tcpPort)),
|
||||
udpPort = some(Port(udpPort)),
|
||||
)
|
||||
|
||||
if flags.isSome():
|
||||
builder.withWakuCapabilities(flags.get())
|
||||
|
||||
builder.build().tryGet()
|
||||
|
||||
|
||||
proc newTestDiscv5(
|
||||
privKey: libp2p_keys.PrivateKey,
|
||||
bindIp: string, tcpPort: uint16, udpPort: uint16,
|
||||
record: waku_enr.Record,
|
||||
bootstrapRecords = newSeq[waku_enr.Record](),
|
||||
queue = newAsyncEventQueue[SubscriptionEvent](30),
|
||||
): WakuDiscoveryV5 =
|
||||
let config = WakuDiscoveryV5Config(
|
||||
privateKey: eth_keys.PrivateKey(privKey.skkey),
|
||||
address: parseIpAddress(bindIp),
|
||||
port: Port(udpPort),
|
||||
bootstrapRecords: bootstrapRecords,
|
||||
)
|
||||
|
||||
let discv5 = WakuDiscoveryV5.new(
|
||||
rng = rng(),
|
||||
conf = config,
|
||||
record = some(record),
|
||||
queue = queue,
|
||||
)
|
||||
|
||||
return discv5
|
||||
|
||||
|
||||
procSuite "Waku Discovery v5":
|
||||
asyncTest "find random peers":
|
||||
## Given
|
||||
# Node 1
|
||||
let
|
||||
privKey1 = generateSecp256k1Key()
|
||||
bindIp1 = "0.0.0.0"
|
||||
extIp1 = "127.0.0.1"
|
||||
tcpPort1 = 61500u16
|
||||
udpPort1 = 9000u16
|
||||
|
||||
let record1 = newTestEnrRecord(
|
||||
privKey = privKey1,
|
||||
extIp = extIp1,
|
||||
tcpPort = tcpPort1,
|
||||
udpPort = udpPort1,
|
||||
)
|
||||
let node1 = newTestDiscv5(
|
||||
privKey = privKey1,
|
||||
bindIp = bindIp1,
|
||||
tcpPort = tcpPort1,
|
||||
udpPort = udpPort1,
|
||||
record = record1
|
||||
)
|
||||
|
||||
# Node 2
|
||||
let
|
||||
privKey2 = generateSecp256k1Key()
|
||||
bindIp2 = "0.0.0.0"
|
||||
extIp2 = "127.0.0.1"
|
||||
tcpPort2 = 61502u16
|
||||
udpPort2 = 9002u16
|
||||
|
||||
let record2 = newTestEnrRecord(
|
||||
privKey = privKey2,
|
||||
extIp = extIp2,
|
||||
tcpPort = tcpPort2,
|
||||
udpPort = udpPort2,
|
||||
)
|
||||
|
||||
let node2 = newTestDiscv5(
|
||||
privKey = privKey2,
|
||||
bindIp = bindIp2,
|
||||
tcpPort = tcpPort2,
|
||||
udpPort = udpPort2,
|
||||
record = record2,
|
||||
)
|
||||
|
||||
# Node 3
|
||||
let
|
||||
privKey3 = generateSecp256k1Key()
|
||||
bindIp3 = "0.0.0.0"
|
||||
extIp3 = "127.0.0.1"
|
||||
tcpPort3 = 61504u16
|
||||
udpPort3 = 9004u16
|
||||
|
||||
let record3 = newTestEnrRecord(
|
||||
privKey = privKey3,
|
||||
extIp = extIp3,
|
||||
tcpPort = tcpPort3,
|
||||
udpPort = udpPort3,
|
||||
)
|
||||
|
||||
let node3 = newTestDiscv5(
|
||||
privKey = privKey3,
|
||||
bindIp = bindIp3,
|
||||
tcpPort = tcpPort3,
|
||||
udpPort = udpPort3,
|
||||
record = record3,
|
||||
bootstrapRecords = @[record1, record2]
|
||||
)
|
||||
|
||||
let res1 = await node1.start()
|
||||
assert res1.isOk(), res1.error
|
||||
|
||||
let res2 = await node2.start()
|
||||
assert res2.isOk(), res2.error
|
||||
|
||||
let res3 = await node3.start()
|
||||
assert res3.isOk(), res3.error
|
||||
|
||||
## When
|
||||
let res = await node3.findRandomPeers()
|
||||
|
||||
## Then
|
||||
check:
|
||||
res.len >= 1
|
||||
|
||||
## Cleanup
|
||||
await allFutures(node1.stop(), node2.stop(), node3.stop())
|
||||
|
||||
asyncTest "find random peers with predicate":
|
||||
## Setup
|
||||
# Records
|
||||
let
|
||||
privKey1 = generateSecp256k1Key()
|
||||
bindIp1 = "0.0.0.0"
|
||||
extIp1 = "127.0.0.1"
|
||||
tcpPort1 = 61500u16
|
||||
udpPort1 = 9000u16
|
||||
|
||||
let record1 = newTestEnrRecord(
|
||||
privKey = privKey1,
|
||||
extIp = extIp1,
|
||||
tcpPort = tcpPort1,
|
||||
udpPort = udpPort1,
|
||||
flags = some(CapabilitiesBitfield.init(Capabilities.Relay))
|
||||
)
|
||||
|
||||
let
|
||||
privKey2 = generateSecp256k1Key()
|
||||
bindIp2 = "0.0.0.0"
|
||||
extIp2 = "127.0.0.1"
|
||||
tcpPort2 = 61502u16
|
||||
udpPort2 = 9002u16
|
||||
|
||||
let record2 = newTestEnrRecord(
|
||||
privKey = privKey2,
|
||||
extIp = extIp2,
|
||||
tcpPort = tcpPort2,
|
||||
udpPort = udpPort2,
|
||||
flags = some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Store))
|
||||
)
|
||||
|
||||
let
|
||||
privKey3 = generateSecp256k1Key()
|
||||
bindIp3 = "0.0.0.0"
|
||||
extIp3 = "127.0.0.1"
|
||||
tcpPort3 = 61504u16
|
||||
udpPort3 = 9004u16
|
||||
|
||||
let record3 = newTestEnrRecord(
|
||||
privKey = privKey3,
|
||||
extIp = extIp3,
|
||||
tcpPort = tcpPort3,
|
||||
udpPort = udpPort3,
|
||||
flags = some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Filter))
|
||||
)
|
||||
|
||||
let
|
||||
privKey4 = generateSecp256k1Key()
|
||||
bindIp4 = "0.0.0.0"
|
||||
extIp4 = "127.0.0.1"
|
||||
tcpPort4 = 61506u16
|
||||
udpPort4 = 9006u16
|
||||
|
||||
let record4 = newTestEnrRecord(
|
||||
privKey = privKey4,
|
||||
extIp = extIp4,
|
||||
tcpPort = tcpPort4,
|
||||
udpPort = udpPort4,
|
||||
flags = some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Store))
|
||||
)
|
||||
|
||||
|
||||
# Nodes
|
||||
let node1 = newTestDiscv5(
|
||||
privKey = privKey1,
|
||||
bindIp = bindIp1,
|
||||
tcpPort = tcpPort1,
|
||||
udpPort = udpPort1,
|
||||
record = record1,
|
||||
bootstrapRecords = @[record2]
|
||||
)
|
||||
let node2 = newTestDiscv5(
|
||||
privKey = privKey2,
|
||||
bindIp = bindIp2,
|
||||
tcpPort = tcpPort2,
|
||||
udpPort = udpPort2,
|
||||
record = record2,
|
||||
bootstrapRecords = @[record3, record4]
|
||||
)
|
||||
|
||||
let node3 = newTestDiscv5(
|
||||
privKey = privKey3,
|
||||
bindIp = bindIp3,
|
||||
tcpPort = tcpPort3,
|
||||
udpPort = udpPort3,
|
||||
record = record3
|
||||
)
|
||||
|
||||
let node4 = newTestDiscv5(
|
||||
privKey = privKey4,
|
||||
bindIp = bindIp4,
|
||||
tcpPort = tcpPort4,
|
||||
udpPort = udpPort4,
|
||||
record = record4
|
||||
)
|
||||
|
||||
# Start nodes' discoveryV5 protocols
|
||||
let res1 = await node1.start()
|
||||
assert res1.isOk(), res1.error
|
||||
|
||||
let res2 = await node2.start()
|
||||
assert res2.isOk(), res2.error
|
||||
|
||||
let res3 = await node3.start()
|
||||
assert res3.isOk(), res3.error
|
||||
|
||||
let res4 = await node4.start()
|
||||
assert res4.isOk(), res4.error
|
||||
|
||||
## Given
|
||||
let recordPredicate: WakuDiscv5Predicate = proc(record: waku_enr.Record): bool =
|
||||
let typedRecord = record.toTyped()
|
||||
if typedRecord.isErr():
|
||||
return false
|
||||
|
||||
let capabilities = typedRecord.value.waku2
|
||||
if capabilities.isNone():
|
||||
return false
|
||||
|
||||
return capabilities.get().supportsCapability(Capabilities.Store)
|
||||
|
||||
|
||||
## When
|
||||
let peers = await node1.findRandomPeers(some(recordPredicate))
|
||||
|
||||
## Then
|
||||
check:
|
||||
peers.len >= 1
|
||||
peers.allIt(it.supportsCapability(Capabilities.Store))
|
||||
|
||||
# Cleanup
|
||||
await allFutures(node1.stop(), node2.stop(), node3.stop(), node4.stop())
|
||||
|
||||
asyncTest "get shards from topics":
|
||||
## Given
|
||||
let mixedTopics = @["/waku/2/thisisatest", "/waku/2/rs/0/2", "/waku/2/rs/0/8"]
|
||||
let shardedTopics = @["/waku/2/rs/0/2", "/waku/2/rs/0/4", "/waku/2/rs/0/8"]
|
||||
let namedTopics = @["/waku/2/thisisatest", "/waku/2/atestthisis", "/waku/2/isthisatest"]
|
||||
let gibberish = @["aedyttydcb/uioasduyio", "jhdfsjhlsdfjhk/sadjhk", "khfsd/hjfdsgjh/dfs"]
|
||||
let empty: seq[string] = @[]
|
||||
|
||||
let shardsTopics = RelayShards.init(0, @[uint16(2), uint16(4), uint16(8)]).expect("Valid shardIds")
|
||||
|
||||
## When
|
||||
|
||||
let mixedRes = topicsToRelayShards(mixedTopics)
|
||||
let shardedRes = topicsToRelayShards(shardedTopics)
|
||||
let namedRes = topicsToRelayShards(namedTopics)
|
||||
let gibberishRes = topicsToRelayShards(gibberish)
|
||||
let emptyRes = topicsToRelayShards(empty)
|
||||
|
||||
## Then
|
||||
assert mixedRes.isErr(), $mixedRes.value
|
||||
assert shardedRes.isOk(), shardedRes.error
|
||||
assert shardedRes.value.isSome()
|
||||
assert shardedRes.value.get() == shardsTopics, $shardedRes.value.get()
|
||||
assert namedRes.isOk(), namedRes.error
|
||||
assert namedRes.value.isNone(), $namedRes.value
|
||||
assert gibberishRes.isErr(), $gibberishRes.value
|
||||
assert emptyRes.isOk(), emptyRes.error
|
||||
assert emptyRes.value.isNone(), $emptyRes.value
|
||||
|
||||
asyncTest "filter peer per static shard":
|
||||
## Given
|
||||
let recordCluster21 = block:
|
||||
let
|
||||
enrSeqNum = 1u64
|
||||
enrPrivKey = generatesecp256k1key()
|
||||
|
||||
let
|
||||
clusterId: uint16 = 21
|
||||
shardIds: seq[uint16] = @[1u16, 2u16, 5u16, 7u16, 9u16, 11u16]
|
||||
|
||||
let shardsTopics = RelayShards.init(clusterId, shardIds).expect("Valid shardIds")
|
||||
|
||||
var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
|
||||
require builder.withWakuRelaySharding(shardsTopics).isOk()
|
||||
builder.withWakuCapabilities(Relay)
|
||||
|
||||
let recordRes = builder.build()
|
||||
require recordRes.isOk()
|
||||
recordRes.tryGet()
|
||||
|
||||
let recordCluster22Indices1 = block:
|
||||
let
|
||||
enrSeqNum = 1u64
|
||||
enrPrivKey = generatesecp256k1key()
|
||||
|
||||
let
|
||||
clusterId: uint16 = 22
|
||||
shardIds: seq[uint16] = @[2u16, 4u16, 5u16, 8u16, 10u16, 12u16]
|
||||
|
||||
let shardsTopics = RelayShards.init(clusterId, shardIds).expect("Valid shardIds")
|
||||
|
||||
var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
|
||||
require builder.withWakuRelaySharding(shardsTopics).isOk()
|
||||
builder.withWakuCapabilities(Relay)
|
||||
|
||||
let recordRes = builder.build()
|
||||
require recordRes.isOk()
|
||||
recordRes.tryGet()
|
||||
|
||||
let recordCluster22Indices2 = block:
|
||||
let
|
||||
enrSeqNum = 1u64
|
||||
enrPrivKey = generatesecp256k1key()
|
||||
|
||||
let
|
||||
clusterId: uint16 = 22
|
||||
shardIds: seq[uint16] = @[1u16, 3u16, 6u16, 7u16, 9u16, 11u16]
|
||||
|
||||
let shardsTopics = RelayShards.init(clusterId, shardIds).expect("Valid shardIds")
|
||||
|
||||
var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
|
||||
require builder.withWakuRelaySharding(shardsTopics).isOk()
|
||||
builder.withWakuCapabilities(Relay)
|
||||
|
||||
let recordRes = builder.build()
|
||||
require recordRes.isOk()
|
||||
recordRes.tryGet()
|
||||
|
||||
## When
|
||||
let predicateCluster21Op = shardingPredicate(recordCluster21)
|
||||
require predicateCluster21Op.isSome()
|
||||
let predicateCluster21 = predicateCluster21Op.get()
|
||||
|
||||
let predicateCluster22Op = shardingPredicate(recordCluster22Indices1)
|
||||
require predicateCluster22Op.isSome()
|
||||
let predicateCluster22 = predicateCluster22Op.get()
|
||||
|
||||
## Then
|
||||
check:
|
||||
predicateCluster21(recordCluster21) == true
|
||||
predicateCluster21(recordCluster22Indices1) == false
|
||||
predicateCluster21(recordCluster22Indices2) == false
|
||||
predicateCluster22(recordCluster21) == false
|
||||
predicateCluster22(recordCluster22Indices1) == true
|
||||
predicateCluster22(recordCluster22Indices2) == false
|
||||
|
||||
asyncTest "update ENR from subscriptions":
|
||||
## Given
|
||||
let
|
||||
shard1 = "/waku/2/rs/0/1"
|
||||
shard2 = "/waku/2/rs/0/2"
|
||||
shard3 = "/waku/2/rs/0/3"
|
||||
privKey = generateSecp256k1Key()
|
||||
bindIp = "0.0.0.0"
|
||||
extIp = "127.0.0.1"
|
||||
tcpPort = 61500u16
|
||||
udpPort = 9000u16
|
||||
|
||||
let record = newTestEnrRecord(
|
||||
privKey = privKey,
|
||||
extIp = extIp,
|
||||
tcpPort = tcpPort,
|
||||
udpPort = udpPort,
|
||||
)
|
||||
|
||||
let queue = newAsyncEventQueue[SubscriptionEvent](30)
|
||||
|
||||
let node = newTestDiscv5(
|
||||
privKey = privKey,
|
||||
bindIp = bindIp,
|
||||
tcpPort = tcpPort,
|
||||
udpPort = udpPort,
|
||||
record = record,
|
||||
queue = queue,
|
||||
)
|
||||
|
||||
let res = await node.start()
|
||||
assert res.isOk(), res.error
|
||||
|
||||
## Then
|
||||
queue.emit((kind: PubsubSub, topic: shard1))
|
||||
queue.emit((kind: PubsubSub, topic: shard2))
|
||||
queue.emit((kind: PubsubSub, topic: shard3))
|
||||
|
||||
await sleepAsync(1.seconds)
|
||||
|
||||
check:
|
||||
node.protocol.localNode.record.containsShard(shard1) == true
|
||||
node.protocol.localNode.record.containsShard(shard2) == true
|
||||
node.protocol.localNode.record.containsShard(shard3) == true
|
||||
|
||||
queue.emit((kind: PubsubSub, topic: shard1))
|
||||
queue.emit((kind: PubsubSub, topic: shard2))
|
||||
queue.emit((kind: PubsubSub, topic: shard3))
|
||||
|
||||
await sleepAsync(1.seconds)
|
||||
|
||||
check:
|
||||
node.protocol.localNode.record.containsShard(shard1) == true
|
||||
node.protocol.localNode.record.containsShard(shard2) == true
|
||||
node.protocol.localNode.record.containsShard(shard3) == true
|
||||
|
||||
queue.emit((kind: PubsubUnsub, topic: shard1))
|
||||
queue.emit((kind: PubsubUnsub, topic: shard2))
|
||||
|
||||
await sleepAsync(1.seconds)
|
||||
|
||||
check:
|
||||
node.protocol.localNode.record.containsShard(shard1) == false
|
||||
node.protocol.localNode.record.containsShard(shard2) == false
|
||||
node.protocol.localNode.record.containsShard(shard3) == true
|
||||
|
||||
## Cleanup
|
||||
await node.stop()
|
||||
|
||||
|
||||
1
tests/waku_discv5/test_all.nim
Normal file
1
tests/waku_discv5/test_all.nim
Normal file
@ -0,0 +1 @@
|
||||
import ./test_waku_discv5
|
||||
426
tests/waku_discv5/test_waku_discv5.nim
Normal file
426
tests/waku_discv5/test_waku_discv5.nim
Normal file
@ -0,0 +1,426 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[sequtils, algorithm],
|
||||
stew/results,
|
||||
stew/shims/net,
|
||||
chronos,
|
||||
chronicles,
|
||||
testutils/unittests,
|
||||
libp2p/crypto/crypto as libp2p_keys,
|
||||
eth/keys as eth_keys
|
||||
|
||||
import
|
||||
../../../waku/[waku_core/topics, waku_enr, waku_discv5, common/enr],
|
||||
../testlib/[wakucore, testasync, assertions, futures],
|
||||
../waku_enr/utils,
|
||||
./utils
|
||||
|
||||
import eth/p2p/discoveryv5/enr as ethEnr
|
||||
|
||||
procSuite "Waku Discovery v5":
|
||||
let
|
||||
rng = eth_keys.newRng()
|
||||
pk1 = eth_keys.PrivateKey.random(rng[])
|
||||
pk2 = eth_keys.PrivateKey.random(rng[])
|
||||
|
||||
enrNoCapabilities =
|
||||
initRecord(1, pk1, {"rs": @[0.byte, 0.byte, 1.byte, 0.byte, 0.byte]}).value()
|
||||
enrRelay =
|
||||
initRecord(
|
||||
1, pk2, {"waku2": @[1.byte], "rs": @[0.byte, 1.byte, 1.byte, 0.byte, 1.byte]}
|
||||
).value()
|
||||
enrNoShardingInfo = initRecord(1, pk1, {"waku2": @[1.byte]}).value()
|
||||
|
||||
suite "shardingPredicate":
|
||||
var
|
||||
recordCluster21 {.threadvar.}: Record
|
||||
recordCluster22Indices1 {.threadvar.}: Record
|
||||
recordCluster22Indices2 {.threadvar.}: Record
|
||||
|
||||
asyncSetup:
|
||||
recordCluster21 =
|
||||
block:
|
||||
let
|
||||
enrSeqNum = 1u64
|
||||
enrPrivKey = generatesecp256k1key()
|
||||
|
||||
let
|
||||
clusterId: uint16 = 21
|
||||
shardIds: seq[uint16] = @[1u16, 2u16, 5u16, 7u16, 9u16, 11u16]
|
||||
|
||||
let
|
||||
shardsTopics =
|
||||
RelayShards.init(clusterId, shardIds).expect("Valid shardIds")
|
||||
|
||||
var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
|
||||
require builder.withWakuRelaySharding(shardsTopics).isOk()
|
||||
builder.withWakuCapabilities(Relay)
|
||||
|
||||
let recordRes = builder.build()
|
||||
require recordRes.isOk()
|
||||
recordRes.tryGet()
|
||||
|
||||
recordCluster22Indices1 =
|
||||
block:
|
||||
let
|
||||
enrSeqNum = 1u64
|
||||
enrPrivKey = generatesecp256k1key()
|
||||
|
||||
let
|
||||
clusterId: uint16 = 22
|
||||
shardIds: seq[uint16] = @[2u16, 4u16, 5u16, 8u16, 10u16, 12u16]
|
||||
|
||||
let
|
||||
shardsTopics =
|
||||
RelayShards.init(clusterId, shardIds).expect("Valid shardIds")
|
||||
|
||||
var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
|
||||
require builder.withWakuRelaySharding(shardsTopics).isOk()
|
||||
builder.withWakuCapabilities(Relay)
|
||||
|
||||
let recordRes = builder.build()
|
||||
require recordRes.isOk()
|
||||
recordRes.tryGet()
|
||||
|
||||
recordCluster22Indices2 =
|
||||
block:
|
||||
let
|
||||
enrSeqNum = 1u64
|
||||
enrPrivKey = generatesecp256k1key()
|
||||
|
||||
let
|
||||
clusterId: uint16 = 22
|
||||
shardIds: seq[uint16] = @[1u16, 3u16, 6u16, 7u16, 9u16, 11u16]
|
||||
|
||||
let
|
||||
shardsTopics =
|
||||
RelayShards.init(clusterId, shardIds).expect("Valid shardIds")
|
||||
|
||||
var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
|
||||
require builder.withWakuRelaySharding(shardsTopics).isOk()
|
||||
builder.withWakuCapabilities(Relay)
|
||||
|
||||
let recordRes = builder.build()
|
||||
require recordRes.isOk()
|
||||
recordRes.tryGet()
|
||||
|
||||
asyncTest "filter peer per contained shard":
|
||||
# When
|
||||
let predicateCluster21Op = shardingPredicate(recordCluster21)
|
||||
require predicateCluster21Op.isSome()
|
||||
let predicateCluster21 = predicateCluster21Op.get()
|
||||
|
||||
let predicateCluster22Op = shardingPredicate(recordCluster22Indices1)
|
||||
require predicateCluster22Op.isSome()
|
||||
let predicateCluster22 = predicateCluster22Op.get()
|
||||
|
||||
# Then
|
||||
check:
|
||||
predicateCluster21(recordCluster21) == true
|
||||
predicateCluster21(recordCluster22Indices1) == false
|
||||
predicateCluster21(recordCluster22Indices2) == false
|
||||
predicateCluster22(recordCluster21) == false
|
||||
predicateCluster22(recordCluster22Indices1) == true
|
||||
predicateCluster22(recordCluster22Indices2) == false
|
||||
|
||||
asyncTest "filter peer per bootnode":
|
||||
let
|
||||
predicateNoCapabilities =
|
||||
shardingPredicate(enrNoCapabilities, @[enrNoCapabilities]).get()
|
||||
predicateNoCapabilitiesWithBoth =
|
||||
shardingPredicate(enrNoCapabilities, @[enrNoCapabilities, enrRelay]).get()
|
||||
|
||||
check:
|
||||
predicateNoCapabilities(enrNoCapabilities) == true
|
||||
predicateNoCapabilities(enrRelay) == false
|
||||
predicateNoCapabilitiesWithBoth(enrNoCapabilities) == true
|
||||
predicateNoCapabilitiesWithBoth(enrRelay) == true
|
||||
|
||||
let
|
||||
predicateRelay = shardingPredicate(enrRelay, @[enrRelay]).get()
|
||||
predicateRelayWithBoth =
|
||||
shardingPredicate(enrRelay, @[enrRelay, enrNoCapabilities]).get()
|
||||
|
||||
check:
|
||||
predicateRelay(enrNoCapabilities) == false
|
||||
predicateRelay(enrRelay) == true
|
||||
predicateRelayWithBoth(enrNoCapabilities) == true
|
||||
predicateRelayWithBoth(enrRelay) == true
|
||||
|
||||
asyncTest "does not conform to typed record":
|
||||
let
|
||||
record = ethEnr.Record(raw: @[])
|
||||
predicateRecord = shardingPredicate(record, @[])
|
||||
|
||||
check:
|
||||
predicateRecord.isNone()
|
||||
|
||||
asyncTest "no relay sharding info":
|
||||
let
|
||||
predicateNoShardingInfo =
|
||||
shardingPredicate(enrNoShardingInfo, @[enrNoShardingInfo])
|
||||
|
||||
check:
|
||||
predicateNoShardingInfo.isNone()
|
||||
|
||||
suite "findRandomPeers":
|
||||
proc buildNode(
|
||||
tcpPort: uint16,
|
||||
udpPort: uint16,
|
||||
bindIp: string = "0.0.0.0",
|
||||
extIp: string = "127.0.0.1",
|
||||
indices: seq[uint64] = @[],
|
||||
recordFlags: Option[CapabilitiesBitfield] = none(CapabilitiesBitfield),
|
||||
bootstrapRecords: seq[waku_enr.Record] = @[],
|
||||
): (WakuDiscoveryV5, Record) =
|
||||
let
|
||||
privKey = generateSecp256k1Key()
|
||||
record =
|
||||
newTestEnrRecord(
|
||||
privKey = privKey,
|
||||
extIp = extIp,
|
||||
tcpPort = tcpPort,
|
||||
udpPort = udpPort,
|
||||
indices = indices,
|
||||
flags = recordFlags,
|
||||
)
|
||||
node =
|
||||
newTestDiscv5(
|
||||
privKey = privKey,
|
||||
bindIp = bindIp,
|
||||
tcpPort = tcpPort,
|
||||
udpPort = udpPort,
|
||||
record = record,
|
||||
bootstrapRecords = bootstrapRecords,
|
||||
)
|
||||
|
||||
(node, record)
|
||||
|
||||
let
|
||||
filterForStore: WakuDiscv5Predicate =
|
||||
proc(record: waku_enr.Record): bool =
|
||||
let typedRecord = record.toTyped()
|
||||
if typedRecord.isErr():
|
||||
return false
|
||||
|
||||
let capabilities = typedRecord.value.waku2
|
||||
if capabilities.isNone():
|
||||
return false
|
||||
|
||||
return capabilities.get().supportsCapability(Capabilities.Store)
|
||||
|
||||
asyncTest "find random peers without predicate":
|
||||
# Given 3 nodes
|
||||
let
|
||||
(node1, record1) = buildNode(tcpPort = 61500u16, udpPort = 9000u16)
|
||||
(node2, record2) = buildNode(tcpPort = 61502u16, udpPort = 9002u16)
|
||||
(node3, record3) =
|
||||
buildNode(
|
||||
tcpPort = 61504u16,
|
||||
udpPort = 9004u16,
|
||||
bootstrapRecords = @[record1, record2],
|
||||
)
|
||||
|
||||
let res1 = await node1.start()
|
||||
assertResultOk res1
|
||||
|
||||
let res2 = await node2.start()
|
||||
assertResultOk res2
|
||||
|
||||
let res3 = await node3.start()
|
||||
assertResultOk res3
|
||||
|
||||
await sleepAsync(FUTURE_TIMEOUT)
|
||||
|
||||
## When we find random peers
|
||||
let res = await node3.findRandomPeers()
|
||||
|
||||
var tcpPortList = res.mapIt(it.toTypedRecord().value().tcp.get())
|
||||
tcpPortList.sort()
|
||||
|
||||
## Then
|
||||
check:
|
||||
res.len == 2
|
||||
tcpPortList == @[61500, 61502]
|
||||
|
||||
## Cleanup
|
||||
await allFutures(node1.stop(), node2.stop(), node3.stop())
|
||||
|
||||
asyncTest "find random peers with parameter predicate":
|
||||
# Given 4 nodes
|
||||
let
|
||||
(node3, record3) =
|
||||
buildNode(
|
||||
tcpPort = 61504u16,
|
||||
udpPort = 9004u16,
|
||||
recordFlags =
|
||||
some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Filter)),
|
||||
)
|
||||
(node4, record4) =
|
||||
buildNode(
|
||||
tcpPort = 61506u16,
|
||||
udpPort = 9006u16,
|
||||
recordFlags =
|
||||
some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Store)),
|
||||
)
|
||||
(node2, record2) =
|
||||
buildNode(
|
||||
tcpPort = 61502u16,
|
||||
udpPort = 9002u16,
|
||||
recordFlags =
|
||||
some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Store)),
|
||||
bootstrapRecords = @[record3, record4],
|
||||
)
|
||||
(node1, record1) =
|
||||
buildNode(
|
||||
tcpPort = 61500u16,
|
||||
udpPort = 9000u16,
|
||||
recordFlags = some(CapabilitiesBitfield.init(Capabilities.Relay)),
|
||||
bootstrapRecords = @[record2],
|
||||
)
|
||||
|
||||
# Start nodes' discoveryV5 protocols
|
||||
let res1 = await node1.start()
|
||||
assertResultOk res1
|
||||
|
||||
let res2 = await node2.start()
|
||||
assertResultOk res2
|
||||
|
||||
let res3 = await node3.start()
|
||||
assertResultOk res3
|
||||
|
||||
let res4 = await node4.start()
|
||||
assertResultOk res4
|
||||
|
||||
await sleepAsync(FUTURE_TIMEOUT)
|
||||
|
||||
## When
|
||||
let peers = await node1.findRandomPeers(some(filterForStore))
|
||||
|
||||
## Then
|
||||
check:
|
||||
peers.len >= 1
|
||||
peers.allIt(it.supportsCapability(Capabilities.Store))
|
||||
|
||||
# Cleanup
|
||||
await allFutures(node1.stop(), node2.stop(), node3.stop(), node4.stop())
|
||||
|
||||
asyncTest "find random peers with instance predicate":
|
||||
## Setup
|
||||
# Records
|
||||
let
|
||||
(node3, record3) =
|
||||
buildNode(
|
||||
tcpPort = 61504u16,
|
||||
udpPort = 9004u16,
|
||||
recordFlags =
|
||||
some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Filter)),
|
||||
)
|
||||
(node4, record4) =
|
||||
buildNode(
|
||||
tcpPort = 61506u16,
|
||||
udpPort = 9006u16,
|
||||
recordFlags =
|
||||
some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Store)),
|
||||
)
|
||||
(node2, record2) =
|
||||
buildNode(
|
||||
tcpPort = 61502u16,
|
||||
udpPort = 9002u16,
|
||||
recordFlags =
|
||||
some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Store)),
|
||||
bootstrapRecords = @[record3, record4],
|
||||
)
|
||||
let
|
||||
(node1, record1) =
|
||||
buildNode(
|
||||
tcpPort = 61500u16,
|
||||
udpPort = 9000u16,
|
||||
recordFlags = some(CapabilitiesBitfield.init(Capabilities.Relay)),
|
||||
indices = @[0u64, 0u64, 1u64, 0u64, 0u64],
|
||||
bootstrapRecords = @[record2],
|
||||
)
|
||||
|
||||
# Start nodes' discoveryV5 protocols
|
||||
let res1 = await node1.start()
|
||||
assertResultOk res1
|
||||
|
||||
let res2 = await node2.start()
|
||||
assertResultOk res2
|
||||
|
||||
let res3 = await node3.start()
|
||||
assertResultOk res3
|
||||
|
||||
let res4 = await node4.start()
|
||||
assertResultOk res4
|
||||
|
||||
await sleepAsync(FUTURE_TIMEOUT)
|
||||
|
||||
## When
|
||||
let peers = await node1.findRandomPeers()
|
||||
|
||||
## Then
|
||||
check:
|
||||
peers.len >= 1
|
||||
peers.allIt(it.supportsCapability(Capabilities.Store))
|
||||
|
||||
# Cleanup
|
||||
await allFutures(node1.stop(), node2.stop(), node3.stop(), node4.stop())
|
||||
|
||||
suite "addBoostrapNode":
|
||||
let
|
||||
validEnr =
|
||||
"enr:-I-4QG3mX250ArniAs2DLpW-QHOLKSD5x_Ibp8AYcQZbz1HhHFJtl2dNDGcha" &
|
||||
"U5ugLbDKRgtTDZH8NsxXlTXDpYAgzgBgmlkgnY0gnJzjwAVBgABAAIABQAHAAkAC4" &
|
||||
"lzZWNwMjU2azGhA4_KwN0NRRmmfQ-B9B2h2PZjoJvBnaIOi6sR_b2UTQBBhXdha3U" & "yAQ"
|
||||
|
||||
asyncTest "address is valid":
|
||||
# Given an empty list of enrs
|
||||
var enrs: seq[Record] = @[]
|
||||
|
||||
# When adding a valid enr
|
||||
addBootstrapNode(validEnr, enrs)
|
||||
var r: Record
|
||||
echo r.fromURI(validEnr)
|
||||
echo r
|
||||
|
||||
# Then the enr is added to the list
|
||||
check:
|
||||
enrs.len == 1
|
||||
enrs[0].toBase64() == validEnr[4..^1]
|
||||
|
||||
asyncTest "address is empty":
|
||||
# Given an empty list of enrs
|
||||
var enrs: seq[Record] = @[]
|
||||
|
||||
# When adding an empty enr
|
||||
addBootstrapNode("", enrs)
|
||||
|
||||
# Then the enr is not added to the list
|
||||
check:
|
||||
enrs.len == 0
|
||||
|
||||
asyncTest "address is valid but starts with #":
|
||||
# Given an empty list of enrs
|
||||
var enrs: seq[Record] = @[]
|
||||
|
||||
# When adding any enr that starts with #
|
||||
let enr = "#" & validEnr
|
||||
addBootstrapNode(enr, enrs)
|
||||
|
||||
# Then the enr is not added to the list
|
||||
check:
|
||||
enrs.len == 0
|
||||
|
||||
asyncTest "address is not valid":
|
||||
# Given an empty list of enrs
|
||||
var enrs: seq[Record] = @[]
|
||||
|
||||
# When adding an invalid enr
|
||||
let enr = "enr:invalid"
|
||||
addBootstrapNode(enr, enrs)
|
||||
|
||||
# Then the enr is not added to the list
|
||||
check:
|
||||
enrs.len == 0
|
||||
36
tests/waku_discv5/utils.nim
Normal file
36
tests/waku_discv5/utils.nim
Normal file
@ -0,0 +1,36 @@
|
||||
import
|
||||
stew/results,
|
||||
stew/shims/net,
|
||||
chronos,
|
||||
libp2p/crypto/crypto as libp2p_keys,
|
||||
eth/keys as eth_keys
|
||||
|
||||
import
|
||||
../../../waku/[waku_core/topics, waku_enr, waku_discv5],
|
||||
../testlib/[common, wakucore]
|
||||
|
||||
proc newTestDiscv5*(
|
||||
privKey: libp2p_keys.PrivateKey,
|
||||
bindIp: string,
|
||||
tcpPort: uint16,
|
||||
udpPort: uint16,
|
||||
record: waku_enr.Record,
|
||||
bootstrapRecords = newSeq[waku_enr.Record](),
|
||||
queue = newAsyncEventQueue[SubscriptionEvent](30),
|
||||
): WakuDiscoveryV5 =
|
||||
let
|
||||
config =
|
||||
WakuDiscoveryV5Config(
|
||||
privateKey: eth_keys.PrivateKey(privKey.skkey),
|
||||
address: parseIpAddress(bindIp),
|
||||
port: Port(udpPort),
|
||||
bootstrapRecords: bootstrapRecords,
|
||||
)
|
||||
|
||||
let
|
||||
discv5 =
|
||||
WakuDiscoveryV5.new(
|
||||
rng = rng(), conf = config, record = some(record), queue = queue
|
||||
)
|
||||
|
||||
return discv5
|
||||
1
tests/waku_enr/test_all.nim
Normal file
1
tests/waku_enr/test_all.nim
Normal file
@ -0,0 +1 @@
|
||||
import ./test_sharding
|
||||
125
tests/waku_enr/test_sharding.nim
Normal file
125
tests/waku_enr/test_sharding.nim
Normal file
@ -0,0 +1,125 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
stew/results,
|
||||
stew/shims/net,
|
||||
chronos,
|
||||
testutils/unittests,
|
||||
libp2p/crypto/crypto as libp2p_keys,
|
||||
eth/keys as eth_keys
|
||||
|
||||
import
|
||||
../../../waku/[waku_enr, waku_discv5, waku_core],
|
||||
../testlib/wakucore,
|
||||
../waku_discv5/utils,
|
||||
./utils
|
||||
|
||||
suite "Sharding":
|
||||
suite "topicsToRelayShards":
|
||||
asyncTest "get shards from topics":
|
||||
## Given
|
||||
let mixedTopics = @["/waku/2/thisisatest", "/waku/2/rs/0/2", "/waku/2/rs/0/8"]
|
||||
let shardedTopics = @["/waku/2/rs/0/2", "/waku/2/rs/0/4", "/waku/2/rs/0/8"]
|
||||
let
|
||||
namedTopics =
|
||||
@["/waku/2/thisisatest", "/waku/2/atestthisis", "/waku/2/isthisatest"]
|
||||
let
|
||||
gibberish =
|
||||
@["aedyttydcb/uioasduyio", "jhdfsjhlsdfjhk/sadjhk", "khfsd/hjfdsgjh/dfs"]
|
||||
let empty: seq[string] = @[]
|
||||
|
||||
let
|
||||
shardsTopics =
|
||||
RelayShards.init(0, @[uint16(2), uint16(4), uint16(8)]).expect(
|
||||
"Valid shardIds"
|
||||
)
|
||||
|
||||
## When
|
||||
|
||||
let mixedRes = topicsToRelayShards(mixedTopics)
|
||||
let shardedRes = topicsToRelayShards(shardedTopics)
|
||||
let namedRes = topicsToRelayShards(namedTopics)
|
||||
let gibberishRes = topicsToRelayShards(gibberish)
|
||||
let emptyRes = topicsToRelayShards(empty)
|
||||
|
||||
## Then
|
||||
assert mixedRes.isErr(), $mixedRes.value
|
||||
assert shardedRes.isOk(), shardedRes.error
|
||||
assert shardedRes.value.isSome()
|
||||
assert shardedRes.value.get() == shardsTopics, $shardedRes.value.get()
|
||||
assert namedRes.isOk(), namedRes.error
|
||||
assert namedRes.value.isNone(), $namedRes.value
|
||||
assert gibberishRes.isErr(), $gibberishRes.value
|
||||
assert emptyRes.isOk(), emptyRes.error
|
||||
assert emptyRes.value.isNone(), $emptyRes.value
|
||||
|
||||
suite "containsShard":
|
||||
asyncTest "update ENR from subscriptions":
|
||||
## Given
|
||||
let
|
||||
shard1 = "/waku/2/rs/0/1"
|
||||
shard2 = "/waku/2/rs/0/2"
|
||||
shard3 = "/waku/2/rs/0/3"
|
||||
privKey = generateSecp256k1Key()
|
||||
bindIp = "0.0.0.0"
|
||||
extIp = "127.0.0.1"
|
||||
tcpPort = 61500u16
|
||||
udpPort = 9000u16
|
||||
|
||||
let
|
||||
record =
|
||||
newTestEnrRecord(
|
||||
privKey = privKey, extIp = extIp, tcpPort = tcpPort, udpPort = udpPort
|
||||
)
|
||||
|
||||
let queue = newAsyncEventQueue[SubscriptionEvent](30)
|
||||
|
||||
let
|
||||
node =
|
||||
newTestDiscv5(
|
||||
privKey = privKey,
|
||||
bindIp = bindIp,
|
||||
tcpPort = tcpPort,
|
||||
udpPort = udpPort,
|
||||
record = record,
|
||||
queue = queue,
|
||||
)
|
||||
|
||||
let res = await node.start()
|
||||
assert res.isOk(), res.error
|
||||
|
||||
## Then
|
||||
queue.emit((kind: PubsubSub, topic: shard1))
|
||||
queue.emit((kind: PubsubSub, topic: shard2))
|
||||
queue.emit((kind: PubsubSub, topic: shard3))
|
||||
|
||||
await sleepAsync(1.seconds)
|
||||
|
||||
check:
|
||||
node.protocol.localNode.record.containsShard(shard1) == true
|
||||
node.protocol.localNode.record.containsShard(shard2) == true
|
||||
node.protocol.localNode.record.containsShard(shard3) == true
|
||||
|
||||
queue.emit((kind: PubsubSub, topic: shard1))
|
||||
queue.emit((kind: PubsubSub, topic: shard2))
|
||||
queue.emit((kind: PubsubSub, topic: shard3))
|
||||
|
||||
await sleepAsync(1.seconds)
|
||||
|
||||
check:
|
||||
node.protocol.localNode.record.containsShard(shard1) == true
|
||||
node.protocol.localNode.record.containsShard(shard2) == true
|
||||
node.protocol.localNode.record.containsShard(shard3) == true
|
||||
|
||||
queue.emit((kind: PubsubUnsub, topic: shard1))
|
||||
queue.emit((kind: PubsubUnsub, topic: shard2))
|
||||
|
||||
await sleepAsync(1.seconds)
|
||||
|
||||
check:
|
||||
node.protocol.localNode.record.containsShard(shard1) == false
|
||||
node.protocol.localNode.record.containsShard(shard2) == false
|
||||
node.protocol.localNode.record.containsShard(shard3) == true
|
||||
|
||||
## Cleanup
|
||||
await node.stop()
|
||||
37
tests/waku_enr/utils.nim
Normal file
37
tests/waku_enr/utils.nim
Normal file
@ -0,0 +1,37 @@
|
||||
import
|
||||
sequtils,
|
||||
stew/results,
|
||||
stew/shims/net,
|
||||
chronos,
|
||||
libp2p/crypto/crypto as libp2p_keys,
|
||||
eth/keys as eth_keys
|
||||
|
||||
import
|
||||
../../../waku/[waku_core/topics, waku_enr, waku_discv5, waku_enr/sharding],
|
||||
../testlib/[common, wakucore]
|
||||
|
||||
proc newTestEnrRecord*(
|
||||
privKey: libp2p_keys.PrivateKey,
|
||||
extIp: string,
|
||||
tcpPort: uint16,
|
||||
udpPort: uint16,
|
||||
indices: seq[uint64] = @[],
|
||||
flags = none(CapabilitiesBitfield),
|
||||
): waku_enr.Record =
|
||||
var builder = EnrBuilder.init(privKey)
|
||||
builder.withIpAddressAndPorts(
|
||||
ipAddr = some(parseIpAddress(extIp)),
|
||||
tcpPort = some(Port(tcpPort)),
|
||||
udpPort = some(Port(udpPort)),
|
||||
)
|
||||
|
||||
if indices.len > 0:
|
||||
let
|
||||
byteSeq: seq[byte] = indices.mapIt(cast[byte](it))
|
||||
relayShards = fromIndicesList(byteSeq).get()
|
||||
discard builder.withWakuRelayShardingIndicesList(relayShards)
|
||||
|
||||
if flags.isSome():
|
||||
builder.withWakuCapabilities(flags.get())
|
||||
|
||||
builder.build().tryGet()
|
||||
@ -113,7 +113,7 @@ func toIndicesList*(rs: RelayShards): EnrResult[seq[byte]] =
|
||||
|
||||
ok(res)
|
||||
|
||||
func fromIndicesList(buf: seq[byte]): Result[RelayShards, string] =
|
||||
func fromIndicesList*(buf: seq[byte]): Result[RelayShards, string] =
|
||||
if buf.len < 3:
|
||||
return err("insufficient data: expected at least 3 bytes, got " & $buf.len & " bytes")
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user