mirror of https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-07 16:33:08 +00:00

chore: misc. improvements to cluster id and shards setup (#2187)

This commit is contained in:
parent 54ec62506e
commit 0be13a356f
@@ -127,6 +127,21 @@ proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T =
       quit(QuitFailure)
     else: recordRes.get()

+  # Check the ENR sharding info for matching config cluster id
+  if conf.clusterId != 0:
+    let res = record.toTyped()
+    if res.isErr():
+      error "ENR setup failed", error = $res.get()
+      quit(QuitFailure)
+
+    let relayShard = res.get().relaySharding().valueOr:
+      error "no sharding info"
+      quit(QuitFailure)
+
+    if conf.clusterId != relayShard.clusterId:
+      error "cluster id mismatch"
+      quit(QuitFailure)
+
   App(
     version: git_version,
     conf: conf,
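
Note: the added block leans on the valueOr template from stew/results, which runs its body only on the error branch and injects error. A minimal standalone sketch of the pattern; the relaySharding stand-in below is hypothetical, not the repo's API:

import stew/results

# Hypothetical stand-in for record.toTyped().relaySharding()
proc relaySharding(clusterId: uint32): Result[uint32, string] =
  if clusterId == 0:
    return err("no sharding info")
  ok(clusterId)

let shardCluster = relaySharding(3).valueOr:
  # `error` is injected by valueOr on the error branch
  echo "ENR setup failed: ", error
  quit(QuitFailure)

assert shardCluster == 3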
@@ -234,7 +249,13 @@ proc setupDiscoveryV5*(app: App): WakuDiscoveryV5 =
     autoupdateRecord: app.conf.discv5EnrAutoUpdate,
   )

-  WakuDiscoveryV5.new(app.rng, discv5Conf, some(app.record))
+  WakuDiscoveryV5.new(
+    app.rng,
+    discv5Conf,
+    some(app.record),
+    some(app.node.peerManager),
+    app.node.topicSubscriptionQueue,
+  )

 ## Init waku node instance
@@ -286,11 +307,6 @@ proc initNode(conf: WakuNodeConf,
   ok(node)

 proc setupWakuApp*(app: var App): AppResult[void] =
-
-  ## Discv5
-  if app.conf.discv5Discovery:
-    app.wakuDiscV5 = some(app.setupDiscoveryV5())
-
   ## Waku node
   let initNodeRes = initNode(app.conf, app.netConf, app.rng, app.key, app.record, app.peerStore, app.dynamicBootstrapNodes)
   if initNodeRes.isErr():
@@ -298,6 +314,10 @@ proc setupWakuApp*(app: var App): AppResult[void] =

   app.node = initNodeRes.get()

+  ## Discv5
+  if app.conf.discv5Discovery:
+    app.wakuDiscV5 = some(app.setupDiscoveryV5())
+
   ok()

 proc getPorts(listenAddrs: seq[MultiAddress]):
@@ -341,7 +361,17 @@ proc updateNetConfig(app: var App): AppResult[void] =
 proc updateEnr(app: var App): AppResult[void] =

   let record = enrConfiguration(app.conf, app.netConf, app.key).valueOr:
-    return err(error)
+    return err("ENR setup failed: " & error)

+  if app.conf.clusterId != 0:
+    let tRecord = record.toTyped().valueOr:
+      return err("ENR setup failed: " & $error)
+
+    let relayShard = tRecord.relaySharding().valueOr:
+      return err("ENR setup failed: no sharding info")
+
+    if app.conf.clusterId != relayShard.clusterId:
+      return err("ENR setup failed: cluster id mismatch")
+
   app.record = record
   app.node.enr = record
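
Note: unlike App.init above, updateEnr surfaces failures to its caller as Result values with an "ENR setup failed: " prefix instead of quitting. A minimal sketch of the same shape, with simplified types that are not the repo's API:

import stew/results

proc validateCluster(confClusterId, enrClusterId: uint32): Result[void, string] =
  # mirrors the added check: a non-zero configured cluster id must
  # match the cluster id recovered from the node's ENR
  if confClusterId != 0 and confClusterId != enrClusterId:
    return err("ENR setup failed: cluster id mismatch")
  ok()

assert validateCluster(0, 5).isOk()
assert validateCluster(2, 3).error == "ENR setup failed: cluster id mismatch"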
@@ -377,6 +407,9 @@ proc setupProtocols(node: WakuNode,
   ## Optionally include persistent message storage.
   ## No protocols are started yet.

+  node.mountMetadata(conf.clusterId).isOkOr:
+    return err("failed to mount waku metadata protocol: " & error)
+
   # Mount relay on all nodes
   var peerExchangeHandler = none(RoutingRecordsHandler)
   if conf.relayPeerExchange:
@@ -587,11 +620,12 @@ proc startNode(node: WakuNode, conf: WakuNodeConf,

 proc startApp*(app: var App): AppResult[void] =

-  try:
-    (waitFor startNode(app.node,app.conf,app.dynamicBootstrapNodes)).isOkOr:
-      return err(error)
-  except CatchableError:
-    return err("exception starting node: " & getCurrentExceptionMsg())
+  let nodeRes = catch: (waitFor startNode(app.node,app.conf,app.dynamicBootstrapNodes))
+  if nodeRes.isErr():
+    return err("exception starting node: " & nodeRes.error.msg)
+
+  nodeRes.get().isOkOr:
+    return err("exception starting node: " & error)

   # Update app data that is set dynamically on node start
   app.updateApp().isOkOr:
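
Note: the catch: template (from stew/results, as used in the hunk above) converts a raising expression into Result[T, ref CatchableError], so the exception path and the inner Result path can be handled in the same style. A rough standalone sketch; startNodeSim is an illustrative stand-in, not the repo's startNode:

import stew/results

proc startNodeSim(fail: bool): Result[void, string] =
  # stand-in for startNode: returns a Result but could also raise
  if fail:
    raise newException(ValueError, "boom")
  ok()

let nodeRes = catch: startNodeSim(false)
if nodeRes.isErr():
  echo "exception starting node: ", nodeRes.error.msg
else:
  nodeRes.get().isOkOr:
    echo "exception starting node: ", error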
@@ -599,13 +633,12 @@ proc startApp*(app: var App): AppResult[void] =

   if app.wakuDiscv5.isSome():
     let wakuDiscv5 = app.wakuDiscv5.get()
-
-    let res = wakuDiscv5.start()
-    if res.isErr():
-      return err("failed to start waku discovery v5: " & $res.error)
-
-    asyncSpawn wakuDiscv5.searchLoop(app.node.peerManager)
-    asyncSpawn wakuDiscv5.subscriptionsListener(app.node.topicSubscriptionQueue)
+    let catchRes = catch: (waitFor wakuDiscv5.start())
+    let startRes = catchRes.valueOr:
+      return err("failed to start waku discovery v5: " & catchRes.error.msg)
+
+    startRes.isOkOr:
+      return err("failed to start waku discovery v5: " & error)

   return ok()
@@ -51,31 +51,32 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
   var bootstrapNodeEnr: enr.Record
   discard bootstrapNodeEnr.fromURI(bootstrapNode)

+  let discv5Conf = WakuDiscoveryV5Config(
+    discv5Config: none(DiscoveryConfig),
+    address: ip,
+    port: Port(discv5Port),
+    privateKey: keys.PrivateKey(nodeKey.skkey),
+    bootstrapRecords: @[bootstrapNodeEnr],
+    autoupdateRecord: true,
+  )
+
   # assumes behind a firewall, so not care about being discoverable
   let wakuDiscv5 = WakuDiscoveryV5.new(
-    extIp= none(ValidIpAddress),
-    extTcpPort = none(Port),
-    extUdpPort = none(Port),
-    bindIP = ip,
-    discv5UdpPort = Port(discv5Port),
-    bootstrapEnrs = @[bootstrapNodeEnr],
-    privateKey = keys.PrivateKey(nodeKey.skkey),
-    flags = flags,
-    rng = node.rng,
-    topics = @[],
+    node.rng,
+    discv5Conf,
+    some(node.enr),
+    some(node.peerManager),
+    node.topicSubscriptionQueue,
   )

   await node.start()
   await node.mountRelay()
   node.peerManager.start()

-  let discv5Res = wakuDiscv5.start()
-  if discv5Res.isErr():
-    error "failed to start discv5", error= discv5Res.error
+  (await wakuDiscv5.start()).isOkOr:
+    error "failed to start discv5", error = error
     quit(1)

-  asyncSpawn wakuDiscv5.searchLoop(node.peerManager)
-
   # wait for a minimum of peers to be connected, otherwise messages wont be gossiped
   while true:
     let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected)
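
Note: with start() now async (see the waku_discv5 hunks further down), the examples check the awaited Result inline via isOkOr. Roughly, under the same stew/results and chronos assumptions, with an illustrative startSvc stand-in:

import chronos, stew/results

proc startSvc(): Future[Result[void, string]] {.async.} =
  return ok()

proc main() {.async.} =
  # isOkOr runs its block only on error, with `error` injected
  (await startSvc()).isOkOr:
    echo "failed to start discv5: ", error
    quit(1)

waitFor main()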
@@ -46,31 +46,32 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
   var bootstrapNodeEnr: enr.Record
   discard bootstrapNodeEnr.fromURI(bootstrapNode)

+  let discv5Conf = WakuDiscoveryV5Config(
+    discv5Config: none(DiscoveryConfig),
+    address: ip,
+    port: Port(discv5Port),
+    privateKey: keys.PrivateKey(nodeKey.skkey),
+    bootstrapRecords: @[bootstrapNodeEnr],
+    autoupdateRecord: true,
+  )
+
   # assumes behind a firewall, so not care about being discoverable
   let wakuDiscv5 = WakuDiscoveryV5.new(
-    extIp= none(ValidIpAddress),
-    extTcpPort = none(Port),
-    extUdpPort = none(Port),
-    bindIP = ip,
-    discv5UdpPort = Port(discv5Port),
-    bootstrapEnrs = @[bootstrapNodeEnr],
-    privateKey = keys.PrivateKey(nodeKey.skkey),
-    flags = flags,
-    rng = node.rng,
-    topics = @[],
+    node.rng,
+    discv5Conf,
+    some(node.enr),
+    some(node.peerManager),
+    node.topicSubscriptionQueue,
   )

   await node.start()
   await node.mountRelay()
   node.peerManager.start()

-  let discv5Res = wakuDiscv5.start()
-  if discv5Res.isErr():
-    error "failed to start discv5", error = discv5Res.error
+  (await wakuDiscv5.start()).isOkOr:
+    error "failed to start discv5", error = error
     quit(1)

-  asyncSpawn wakuDiscv5.searchLoop(node.peerManager)
-
   # wait for a minimum of peers to be connected, otherwise messages wont be gossiped
   while true:
     let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected)
@@ -268,16 +268,38 @@ procSuite "Peer Manager":
     await allFutures([node1.stop(), node2.stop(), node3.stop()])

   asyncTest "Peer manager drops conections to peers on different networks":
-    let clusterId1 = 1.uint32
-    let clusterId2 = 2.uint32
+    let clusterId3 = 3.uint32
+    let clusterId4 = 4.uint32

     let
       # different network
-      node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId1)
+      node1 = newTestWakuNode(
+        generateSecp256k1Key(),
+        ValidIpAddress.init("0.0.0.0"),
+        Port(0),
+        clusterId = clusterId3,
+        topics = @["/waku/2/rs/3/0"],
+      )

       # same network
-      node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId2)
-      node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId2)
+      node2 = newTestWakuNode(
+        generateSecp256k1Key(),
+        ValidIpAddress.init("0.0.0.0"),
+        Port(0),
+        clusterId = clusterId4,
+        topics = @["/waku/2/rs/4/0"],
+      )
+      node3 = newTestWakuNode(
+        generateSecp256k1Key(),
+        ValidIpAddress.init("0.0.0.0"),
+        Port(0),
+        clusterId = clusterId4,
+        topics = @["/waku/2/rs/4/0"],
+      )

+    discard node1.mountMetadata(clusterId3)
+    discard node2.mountMetadata(clusterId4)
+    discard node3.mountMetadata(clusterId4)
+
     # Start nodes
     await allFutures([node1.start(), node2.start(), node3.start()])
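
Note: the test topics follow the static-sharding pubsub topic shape /waku/2/rs/&lt;clusterId&gt;/&lt;shardId&gt;, so node1 (cluster 3) and nodes 2-3 (cluster 4) land on disjoint shards. A tiny illustrative helper, not part of the codebase:

import std/strformat

proc shardTopic(clusterId, shardId: uint32): string =
  # static-sharding pubsub topic: /waku/2/rs/<clusterId>/<shardId>
  &"/waku/2/rs/{clusterId}/{shardId}"

assert shardTopic(3, 0) == "/waku/2/rs/3/0"
assert shardTopic(4, 0) == "/waku/2/rs/4/0"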
@@ -34,10 +34,13 @@ proc newTestEnrRecord(privKey: libp2p_keys.PrivateKey,
   builder.build().tryGet()


-proc newTestDiscv5(privKey: libp2p_keys.PrivateKey,
+proc newTestDiscv5(
+    privKey: libp2p_keys.PrivateKey,
     bindIp: string, tcpPort: uint16, udpPort: uint16,
     record: waku_enr.Record,
-    bootstrapRecords = newSeq[waku_enr.Record]()): WakuDiscoveryV5 =
+    bootstrapRecords = newSeq[waku_enr.Record](),
+    queue = newAsyncEventQueue[SubscriptionEvent](30),
+  ): WakuDiscoveryV5 =
   let config = WakuDiscoveryV5Config(
     privateKey: eth_keys.PrivateKey(privKey.skkey),
     address: ValidIpAddress.init(bindIp),
@@ -45,7 +48,12 @@ proc newTestDiscv5(privKey: libp2p_keys.PrivateKey,
     bootstrapRecords: bootstrapRecords,
   )

-  let discv5 = WakuDiscoveryV5.new(rng(), config, some(record))
+  let discv5 = WakuDiscoveryV5.new(
+    rng = rng(),
+    conf = config,
+    record = some(record),
+    queue = queue,
+  )

   return discv5

@@ -122,13 +130,13 @@ procSuite "Waku Discovery v5":
       bootstrapRecords = @[record1, record2]
     )

-    let res1 = node1.start()
+    let res1 = await node1.start()
     assert res1.isOk(), res1.error

-    let res2 = node2.start()
+    let res2 = await node2.start()
     assert res2.isOk(), res2.error

-    let res3 = node3.start()
+    let res3 = await node3.start()
     assert res3.isOk(), res3.error

     ## When
@@ -240,16 +248,16 @@ procSuite "Waku Discovery v5":
     )

     # Start nodes' discoveryV5 protocols
-    let res1 = node1.start()
+    let res1 = await node1.start()
     assert res1.isOk(), res1.error

-    let res2 = node2.start()
+    let res2 = await node2.start()
     assert res2.isOk(), res2.error

-    let res3 = node3.start()
+    let res3 = await node3.start()
     assert res3.isOk(), res3.error

-    let res4 = node4.start()
+    let res4 = await node4.start()
     assert res4.isOk(), res4.error

     ## Given
@@ -401,22 +409,20 @@ procSuite "Waku Discovery v5":
       udpPort = udpPort,
     )

+    let queue = newAsyncEventQueue[SubscriptionEvent](30)
+
     let node = newTestDiscv5(
       privKey = privKey,
       bindIp = bindIp,
       tcpPort = tcpPort,
       udpPort = udpPort,
-      record = record
+      record = record,
+      queue = queue,
     )

-    let res = node.start()
+    let res = await node.start()
     assert res.isOk(), res.error

-    let queue = newAsyncEventQueue[SubscriptionEvent](0)
-
-    ## When
-    asyncSpawn node.subscriptionsListener(queue)
-
     ## Then
     queue.emit((kind: PubsubSub, topic: shard1))
     queue.emit((kind: PubsubSub, topic: shard2))
@@ -442,14 +448,13 @@ procSuite "Waku Discovery v5":

     queue.emit((kind: PubsubUnsub, topic: shard1))
     queue.emit((kind: PubsubUnsub, topic: shard2))
-    queue.emit((kind: PubsubUnsub, topic: shard3))

     await sleepAsync(1.seconds)

     check:
       node.protocol.localNode.record.containsShard(shard1) == false
       node.protocol.localNode.record.containsShard(shard2) == false
-      node.protocol.localNode.record.containsShard(shard3) == false
+      node.protocol.localNode.record.containsShard(shard3) == true

     ## Cleanup
     await node.stop()
@@ -131,7 +131,8 @@ procSuite "Waku Peer Exchange":
     let disc1 = WakuDiscoveryV5.new(
       node1.rng,
       conf1,
-      some(node1.enr)
+      some(node1.enr),
+      some(node1.peerManager),
     )

     let conf2 = WakuDiscoveryV5Config(
@@ -146,17 +147,15 @@ procSuite "Waku Peer Exchange":
     let disc2 = WakuDiscoveryV5.new(
       node2.rng,
       conf2,
-      some(node2.enr)
+      some(node2.enr),
+      some(node2.peerManager),
     )

-
     await allFutures(node1.start(), node2.start(), node3.start())
-    let resultDisc1StartRes = disc1.start()
+    let resultDisc1StartRes = await disc1.start()
     assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error
-    let resultDisc2StartRes = disc2.start()
+    let resultDisc2StartRes = await disc2.start()
     assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error
-    asyncSpawn disc1.searchLoop(node1.peerManager)
-    asyncSpawn disc2.searchLoop(node2.peerManager)

     ## When
     var attempts = 10
@@ -32,7 +32,7 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf =
     dnsAddrsNameServers: @[ValidIpAddress.init("1.1.1.1"), ValidIpAddress.init("1.0.0.1")],
     nat: "any",
     maxConnections: 50,
-    topics: @["/waku/2/default-waku/proto"],
+    topics: @[],
     relay: true
   )

@@ -55,28 +55,27 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
                       dns4DomainName = none(string),
                       discv5UdpPort = none(Port),
                       agentString = none(string),
-                      clusterId: uint32 = 0.uint32,
+                      clusterId: uint32 = 2.uint32,
+                      topics: seq[string] = @["/waku/2/rs/2/0"],
                       peerStoreCapacity = none(int)): WakuNode =

   var resolvedExtIp = extIp

   # Update extPort to default value if it's missing and there's an extIp or a DNS domain
-  let extPort = if (extIp.isSome() or dns4DomainName.isSome()) and
-                extPort.isNone():
-                  some(Port(60000))
-                else:
-                  extPort
+  let extPort =
+    if (extIp.isSome() or dns4DomainName.isSome()) and extPort.isNone(): some(Port(60000))
+    else: extPort
+
+  let conf = defaultTestWakuNodeConf()

   if dns4DomainName.isSome() and extIp.isNone():
-    let conf = defaultTestWakuNodeConf()
     # If there's an error resolving the IP, an exception is thrown and test fails
-    let dnsRes = waitFor dnsResolve(dns4DomainName.get(), conf)
-    if dnsRes.isErr():
-      raise newException(Defect, $dnsRes.error)
-    else:
-      resolvedExtIp = some(ValidIpAddress.init(dnsRes.get()))
+    let dns = (waitFor dnsResolve(dns4DomainName.get(), conf)).valueOr:
+      raise newException(Defect, error)
+
+    resolvedExtIp = some(ValidIpAddress.init(dns))

-  let netConfigRes = NetConfig.init(
+  let netConf = NetConfig.init(
     bindIp = bindIp,
     clusterId = clusterId,
     bindPort = bindPort,
@@ -89,36 +88,33 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
     wakuFlags = wakuFlags,
     dns4DomainName = dns4DomainName,
     discv5UdpPort = discv5UdpPort,
-  )
-  let netConf =
-    if netConfigRes.isErr():
-      raise newException(Defect, "Invalid network configuration: " & $netConfigRes.error)
-    else:
-      netConfigRes.get()
+  ).valueOr:
+    raise newException(Defect, "Invalid network configuration: " & error)

   var enrBuilder = EnrBuilder.init(nodeKey)

+  enrBuilder.withShardedTopics(topics).isOkOr:
+    raise newException(Defect, "Invalid record: " & error)
+
   enrBuilder.withIpAddressAndPorts(
     ipAddr = netConf.enrIp,
     tcpPort = netConf.enrPort,
     udpPort = netConf.discv5UdpPort,
   )
-  if netConf.wakuFlags.isSome():
-    enrBuilder.withWakuCapabilities(netConf.wakuFlags.get())
   enrBuilder.withMultiaddrs(netConf.enrMultiaddrs)

-  let recordRes = enrBuilder.build()
-  let record =
-    if recordRes.isErr():
-      raise newException(Defect, "Invalid record: " & $recordRes.error)
-    else:
-      recordRes.get()
+  if netConf.wakuFlags.isSome():
+    enrBuilder.withWakuCapabilities(netConf.wakuFlags.get())
+
+  let record = enrBuilder.build().valueOr:
+    raise newException(Defect, "Invalid record: " & $error)

   var builder = WakuNodeBuilder.init()
   builder.withRng(rng())
   builder.withNodeKey(nodeKey)
   builder.withRecord(record)
-  builder.withNetworkConfiguration(netConfigRes.get())
+  builder.withNetworkConfiguration(netConf)
   builder.withPeerStorage(peerStorage, capacity = peerStoreCapacity)
   builder.withSwitchConfiguration(
     maxConnections = some(maxConnections),
@@ -51,12 +51,24 @@ suite "Wakunode2 - App initialization":

     ## When
     var wakunode2 = App.init(rng(), conf)
-    require wakunode2.setupPeerPersistence().isOk()
-    require wakunode2.setupDyamicBootstrapNodes().isOk()
-    require wakunode2.setupWakuApp().isOk()
-    require isOk(waitFor wakunode2.setupAndMountProtocols())
-    require isOk(wakunode2.startApp())
-    require wakunode2.setupMonitoringAndExternalInterfaces().isOk()
+
+    let persRes = wakunode2.setupPeerPersistence()
+    assert persRes.isOk(), persRes.error
+
+    let bootRes = wakunode2.setupDyamicBootstrapNodes()
+    assert bootRes.isOk(), bootRes.error
+
+    let setupRes = wakunode2.setupWakuApp()
+    assert setupRes.isOk(), setupRes.error
+
+    let mountRes = waitFor wakunode2.setupAndMountProtocols()
+    assert mountRes.isOk(), mountRes.error
+
+    let startRes = wakunode2.startApp()
+    assert startRes.isOk(), startRes.error
+
+    let monitorRes = wakunode2.setupMonitoringAndExternalInterfaces()
+    assert monitorRes.isOk(), monitorRes.error

     ## Then
     let node = wakunode2.node
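
Note: replacing require X.isOk() with let/assert pairs means a failing setup step now prints the underlying error string instead of only the failed condition. The pattern, standalone (setupStep is an illustrative stand-in):

import stew/results

proc setupStep(): Result[void, string] =
  ok()

let res = setupStep()
# assert's message argument is evaluated only if the condition fails,
# so res.error is never touched on the ok path
assert res.isOk(), res.error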
@@ -155,12 +155,6 @@ proc new*(T: type WakuNode,
     topicSubscriptionQueue: queue
   )

-  # mount metadata protocol
-  let metadata = WakuMetadata.new(netConfig.clusterId, queue)
-  node.switch.mount(metadata, protocolMatcher(WakuMetadataCodec))
-  node.wakuMetadata = metadata
-  peerManager.wakuMetadata = metadata
-
   return node

 proc peerInfo*(node: WakuNode): PeerInfo =
@@ -189,6 +183,22 @@ proc connectToNodes*(node: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], s
   # NOTE Connects to the node without a give protocol, which automatically creates streams for relay
   await peer_manager.connectToNodes(node.peerManager, nodes, source=source)

+## Waku Metadata
+
+proc mountMetadata*(node: WakuNode, clusterId: uint32): Result[void, string] =
+  if not node.wakuMetadata.isNil():
+    return err("Waku metadata already mounted, skipping")
+
+  let metadata = WakuMetadata.new(clusterId, node.enr, node.topicSubscriptionQueue)
+
+  node.wakuMetadata = metadata
+  node.peerManager.wakuMetadata = metadata
+
+  let catchRes = catch: node.switch.mount(node.wakuMetadata, protocolMatcher(WakuMetadataCodec))
+  if catchRes.isErr():
+    return err(catchRes.error.msg)
+
+  return ok()
+
 ## Waku relay
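
Note: mountMetadata guards against double-mounting by failing fast instead of mounting the protocol twice. A reduced sketch of that guard, with hypothetical Node/WakuMetadata types rather than the repo's:

import stew/results

type
  WakuMetadata = ref object
    clusterId: uint32
  Node = ref object
    wakuMetadata: WakuMetadata

proc mountMetadata(node: Node, clusterId: uint32): Result[void, string] =
  # fail fast if already mounted, as in the hunk above
  if not node.wakuMetadata.isNil():
    return err("Waku metadata already mounted, skipping")
  node.wakuMetadata = WakuMetadata(clusterId: clusterId)
  ok()

let n = Node()
assert n.mountMetadata(1).isOk()
assert n.mountMetadata(1).isErr()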
@@ -1124,6 +1134,9 @@ proc start*(node: WakuNode) {.async.} =
   if not node.wakuRelay.isNil():
     await node.startRelay()

+  if not node.wakuMetadata.isNil():
+    node.wakuMetadata.start()
+
   ## The switch uses this mapper to update peer info addrs
   ## with announced addrs after start
   let addressMapper =
@@ -1136,8 +1149,6 @@ proc start*(node: WakuNode) {.async.} =

   node.started = true

-  node.wakuMetadata.start()
-
   if not zeroPortPresent:
     printNodeNetworkInfo(node)
   else:
@@ -1149,6 +1160,9 @@ proc stop*(node: WakuNode) {.async.} =
   if not node.wakuRelay.isNil():
     await node.wakuRelay.stop()

+  if not node.wakuMetadata.isNil():
+    node.wakuMetadata.stop()
+
   await node.switch.stop()
   node.peerManager.stop()

@@ -49,6 +49,8 @@ type WakuDiscoveryV5* = ref object
   protocol*: protocol.Protocol
   listening*: bool
   predicate: Option[WakuDiscv5Predicate]
+  peerManager: Option[PeerManager]
+  topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent]

 proc shardingPredicate*(record: Record): Option[WakuDiscv5Predicate] =
   ## Filter peers based on relay sharding information
@@ -72,7 +74,9 @@ proc new*(
   T: type WakuDiscoveryV5,
   rng: ref HmacDrbgContext,
   conf: WakuDiscoveryV5Config,
-  record: Option[waku_enr.Record]
+  record: Option[waku_enr.Record],
+  peerManager: Option[PeerManager] = none(PeerManager),
+  queue: AsyncEventQueue[SubscriptionEvent] = newAsyncEventQueue[SubscriptionEvent](30),
   ): T =
   let shardPredOp =
     if record.isSome(): shardingPredicate(record.get())
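
Note: peerManager and queue are added with defaults, so existing three-argument call sites (e.g. the peer-exchange tests above) keep compiling while new call sites can opt in. A minimal illustration of that design choice, with placeholder types:

import std/options

type PeerManager = ref object

proc newDisc(rng: int,
             conf: string,
             record: Option[string],
             peerManager: Option[PeerManager] = none(PeerManager)): string =
  # defaulted parameter keeps the old arity source-compatible
  if peerManager.isSome(): "discv5 with peer manager"
  else: "discv5 without peer manager"

assert newDisc(1, "conf", some("enr")) == "discv5 without peer manager"
assert newDisc(1, "conf", some("enr"), some(PeerManager())) == "discv5 with peer manager"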
@@ -101,82 +105,26 @@ proc new*(
     enrUdpPort = none(Port),
   )

-  WakuDiscoveryV5(conf: conf, protocol: protocol, listening: false, predicate: shardPredOp)
-
-proc new*(T: type WakuDiscoveryV5,
-          extIp: Option[ValidIpAddress],
-          extTcpPort: Option[Port],
-          extUdpPort: Option[Port],
-          bindIP: ValidIpAddress,
-          discv5UdpPort: Port,
-          bootstrapEnrs = newSeq[enr.Record](),
-          enrAutoUpdate = false,
-          privateKey: eth_keys.PrivateKey,
-          flags: CapabilitiesBitfield,
-          multiaddrs = newSeq[MultiAddress](),
-          rng: ref HmacDrbgContext,
-          topics: seq[string],
-          discv5Config: protocol.DiscoveryConfig = protocol.defaultDiscoveryConfig
-          ): T {.
-  deprecated: "use the config and record proc variant instead".}=
-
-  let relayShardsRes = topicsToRelayShards(topics)
-
-  let relayShard =
-    if relayShardsRes.isErr():
-      debug "pubsub topic parsing error", reason = relayShardsRes.error
-      none(RelayShards)
-    else: relayShardsRes.get()
-
-  let record = block:
-    var builder = EnrBuilder.init(privateKey)
-    builder.withIpAddressAndPorts(
-      ipAddr = extIp,
-      tcpPort = extTcpPort,
-      udpPort = extUdpPort,
-    )
-    builder.withWakuCapabilities(flags)
-    builder.withMultiaddrs(multiaddrs)
-
-    if relayShard.isSome():
-      let res = builder.withWakuRelaySharding(relayShard.get())
-
-      if res.isErr():
-        debug "building ENR with relay sharding failed", reason = res.error
-      else:
-        debug "building ENR with relay sharding information", clusterId = $relayShard.get().clusterId(), shards = $relayShard.get().shardIds()
-
-    builder.build().expect("Record within size limits")
-
-  let conf = WakuDiscoveryV5Config(
-    discv5Config: some(discv5Config),
-    address: bindIP,
-    port: discv5UdpPort,
-    privateKey: privateKey,
-    bootstrapRecords: bootstrapEnrs,
-    autoupdateRecord: enrAutoUpdate,
-  )
-
-  WakuDiscoveryV5.new(rng, conf, some(record))
+  WakuDiscoveryV5(
+    conf: conf,
+    protocol: protocol,
+    listening: false,
+    predicate: shardPredOp,
+    peerManager: peerManager,
+    topicSubscriptionQueue: queue,
+  )

 proc updateENRShards(wd: WakuDiscoveryV5,
     newTopics: seq[PubsubTopic], add: bool): Result[void, string] =
   ## Add or remove shards from the Discv5 ENR

-  let newShardOp = ?topicsToRelayShards(newTopics)
-
-  let newShard =
-    if newShardOp.isSome():
-      newShardOp.get()
-    else:
-      return ok()
+  let newShardOp = topicsToRelayShards(newTopics).valueOr:
+    return err("ENR update failed: " & error)
+
+  let newShard = newShardOp.valueOr:
+    return ok()

-  let typedRecordRes = wd.protocol.localNode.record.toTyped()
-  let typedRecord =
-    if typedRecordRes.isErr():
-      return err($typedRecordRes.error)
-    else:
-      typedRecordRes.get()
+  let typedRecord = wd.protocol.localNode.record.toTyped().valueOr:
+    return err("ENR update failed: " & $error)

   let currentShardsOp = typedRecord.relaySharding()

@@ -185,14 +133,16 @@ proc updateENRShards(wd: WakuDiscoveryV5,
       let currentShard = currentShardsOp.get()

       if currentShard.clusterId != newShard.clusterId:
-        return err("ENR are limited to one clusterId id")
+        return err("ENR update failed: clusterId id mismatch")

-      ?RelayShards.init(currentShard.clusterId, currentShard.shardIds & newShard.shardIds)
+      RelayShards.init(currentShard.clusterId, currentShard.shardIds & newShard.shardIds).valueOr:
+        return err("ENR update failed: " & error)
+
     elif not add and currentShardsOp.isSome():
       let currentShard = currentShardsOp.get()

       if currentShard.clusterId != newShard.clusterId:
-        return err("ENR are limited to one clusterId id")
+        return err("ENR update failed: clusterId id mismatch")

       let currentSet = toHashSet(currentShard.shardIds)
       let newSet = toHashSet(newShard.shardIds)
@@ -200,16 +150,11 @@ proc updateENRShards(wd: WakuDiscoveryV5,
       let indices = toSeq(currentSet - newSet)

       if indices.len == 0:
-        # Can't create RelayShard with no indices so update then return
-        let (field, value) = (ShardingIndicesListEnrField, newSeq[byte](3))
-
-        let res = wd.protocol.updateRecord([(field, value)])
-        if res.isErr():
-          return err($res.error)
-
-        return ok()
-
-      ?RelayShards.init(currentShard.clusterId, indices)
+        return err("ENR update failed: cannot remove all shards")
+
+      RelayShards.init(currentShard.clusterId, indices).valueOr:
+        return err("ENR update failed: " & error)
+
     elif add and currentShardsOp.isNone(): newShard
     else: return ok()

@@ -217,18 +162,13 @@ proc updateENRShards(wd: WakuDiscoveryV5,
     if resultShard.shardIds.len >= ShardingIndicesListMaxLength:
       (ShardingBitVectorEnrField, resultShard.toBitVector())
     else:
-      let listRes = resultShard.toIndicesList()
-      let list =
-        if listRes.isErr():
-          return err($listRes.error)
-        else:
-          listRes.get()
+      let list = resultShard.toIndicesList().valueOr:
+        return err("ENR update failed: " & $error)

       (ShardingIndicesListEnrField, list)

-  let res = wd.protocol.updateRecord([(field, value)])
-  if res.isErr():
-    return err($res.error)
+  wd.protocol.updateRecord([(field, value)]).isOkOr:
+    return err("ENR update failed: " & $error)

   return ok()

@@ -246,10 +186,12 @@ proc findRandomPeers*(wd: WakuDiscoveryV5, overridePred = none(WakuDiscv5Predica

   return discoveredRecords

-#TODO abstract away PeerManager
-proc searchLoop*(wd: WakuDiscoveryV5, peerManager: PeerManager) {.async.} =
+proc searchLoop(wd: WakuDiscoveryV5) {.async.} =
   ## Continuously add newly discovered nodes

+  let peerManager = wd.peerManager.valueOr:
+    return
+
   info "Starting discovery v5 search"

   while wd.listening:
@@ -267,50 +209,13 @@ proc searchLoop*(wd: WakuDiscoveryV5, peerManager: PeerManager) {.async.} =
     # Also, give some time to dial the discovered nodes and update stats, etc.
     await sleepAsync(5.seconds)

-proc start*(wd: WakuDiscoveryV5): Result[void, string] =
-  if wd.listening:
-    return err("already listening")
-
-  info "Starting discovery v5 service"
-
-  debug "start listening on udp port", address = $wd.conf.address, port = $wd.conf.port
-  try:
-    wd.protocol.open()
-  except CatchableError:
-    return err("failed to open udp port: " & getCurrentExceptionMsg())
-
-  wd.listening = true
-
-  trace "start discv5 service"
-  wd.protocol.start()
-
-  debug "Successfully started discovery v5 service"
-  info "Discv5: discoverable ENR ", enr = wd.protocol.localNode.record.toUri()
-
-  ok()
-
-proc stop*(wd: WakuDiscoveryV5): Future[void] {.async.} =
-  if not wd.listening:
-    return
-
-  info "Stopping discovery v5 service"
-
-  wd.listening = false
-  trace "Stop listening on discv5 port"
-  await wd.protocol.closeWait()
-
-  debug "Successfully stopped discovery v5 service"
-
-proc subscriptionsListener*(
-  wd: WakuDiscoveryV5,
-  topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent]
-  ) {.async.} =
+proc subscriptionsListener(wd: WakuDiscoveryV5) {.async.} =
   ## Listen for pubsub topics subscriptions changes

-  let key = topicSubscriptionQueue.register()
+  let key = wd.topicSubscriptionQueue.register()

   while wd.listening:
-    let events = await topicSubscriptionQueue.waitEvents(key)
+    let events = await wd.topicSubscriptionQueue.waitEvents(key)

     # Since we don't know the events we will receive we have to anticipate.

@@ -336,7 +241,44 @@ proc subscriptionsListener*(

     wd.predicate = shardingPredicate(wd.protocol.localNode.record)

-  topicSubscriptionQueue.unregister(key)
+  wd.topicSubscriptionQueue.unregister(key)
+
+proc start*(wd: WakuDiscoveryV5): Future[Result[void, string]] {.async.} =
+  if wd.listening:
+    return err("already listening")
+
+  info "Starting discovery v5 service"
+
+  debug "start listening on udp port", address = $wd.conf.address, port = $wd.conf.port
+  try:
+    wd.protocol.open()
+  except CatchableError:
+    return err("failed to open udp port: " & getCurrentExceptionMsg())
+
+  wd.listening = true
+
+  trace "start discv5 service"
+  wd.protocol.start()
+
+  asyncSpawn wd.searchLoop()
+  asyncSpawn wd.subscriptionsListener()
+
+  debug "Successfully started discovery v5 service"
+  info "Discv5: discoverable ENR ", enr = wd.protocol.localNode.record.toUri()
+
+  ok()
+
+proc stop*(wd: WakuDiscoveryV5): Future[void] {.async.} =
+  if not wd.listening:
+    return
+
+  info "Stopping discovery v5 service"
+
+  wd.listening = false
+  trace "Stop listening on discv5 port"
+  await wd.protocol.closeWait()
+
+  debug "Successfully stopped discovery v5 service"

 ## Helper functions

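
Note: start() is now async, returns Future[Result[void, string]], and spawns searchLoop/subscriptionsListener itself, so callers no longer asyncSpawn the loops. A compact lifecycle sketch under those assumptions, with a placeholder Svc type:

import chronos, stew/results

type Svc = ref object
  listening: bool

proc searchLoop(s: Svc) {.async.} =
  while s.listening:
    await sleepAsync(100.milliseconds)

proc start(s: Svc): Future[Result[void, string]] {.async.} =
  if s.listening:
    return err("already listening")
  s.listening = true
  # the service owns its background loop, as in the commit
  asyncSpawn s.searchLoop()
  return ok()

proc stop(s: Svc) {.async.} =
  if not s.listening:
    return
  s.listening = false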
@@ -4,7 +4,7 @@ else:
 {.push raises: [].}

 import
-  std/[options, sequtils, random, sets],
+  std/[options, sequtils, sets],
   stew/results,
   chronicles,
   chronos,
@@ -15,7 +15,9 @@ import
   eth/p2p/discoveryv5/enr
 import
   ../common/nimchronos,
+  ../common/enr,
   ../waku_core,
+  ../waku_enr,
   ./rpc

 logScope:
@@ -93,13 +95,24 @@ proc initProtocolHandler*(m: WakuMetadata) =

 proc new*(T: type WakuMetadata,
           clusterId: uint32,
+          enr: Record,
           queue: AsyncEventQueue[SubscriptionEvent],
           ): T =
-  let wm = WakuMetadata(clusterId: clusterId, topicSubscriptionQueue: queue)
+  var (cluster, shards) = (clusterId, initHashSet[uint32]())
+
+  let enrRes = enr.toTyped()
+  if enrRes.isOk():
+    let shardingRes = enrRes.get().relaySharding()
+    if shardingRes.isSome():
+      let relayShard = shardingRes.get()
+      cluster = uint32(relayShard.clusterId)
+      shards = toHashSet(relayShard.shardIds.mapIt(uint32(it)))
+
+  let wm = WakuMetadata(clusterId: cluster, shards: shards, topicSubscriptionQueue: queue)

   wm.initProtocolHandler()

-  info "Created WakuMetadata protocol", clusterId=clusterId
+  info "Created WakuMetadata protocol", clusterId=cluster

   return wm

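
Note: WakuMetadata.new now prefers sharding info recovered from the node's ENR over the raw clusterId argument. The derivation, reduced to plain types (the RelayShards fields here are assumed for illustration; the real toTyped/relaySharding accessors live in waku_enr):

import std/[options, sets, sequtils]

type RelayShards = object
  clusterId: uint16
  shardIds: seq[uint16]

proc clusterAndShards(clusterId: uint32,
                      sharding: Option[RelayShards]): (uint32, HashSet[uint32]) =
  # fall back to the configured cluster id when the ENR carries no sharding
  var (cluster, shards) = (clusterId, initHashSet[uint32]())
  if sharding.isSome():
    let rs = sharding.get()
    cluster = uint32(rs.clusterId)
    shards = toHashSet(rs.shardIds.mapIt(uint32(it)))
  (cluster, shards)

let (c, s) = clusterAndShards(0, some(RelayShards(clusterId: 2, shardIds: @[0'u16, 1])))
assert c == 2 and 1'u32 in s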