chore: mics. improvements to cluster id and shards setup (#2187)

This commit is contained in:
Simon-Pierre Vivier 2023-11-21 15:15:39 -05:00 committed by GitHub
parent 51f36099d5
commit 897f487978
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 302 additions and 264 deletions

View File

@ -127,6 +127,21 @@ proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T =
quit(QuitFailure)
else: recordRes.get()
# Check the ENR sharding info for matching config cluster id
if conf.clusterId != 0:
let res = record.toTyped()
if res.isErr():
error "ENR setup failed", error = $res.get()
quit(QuitFailure)
let relayShard = res.get().relaySharding().valueOr:
error "no sharding info"
quit(QuitFailure)
if conf.clusterId != relayShard.clusterId:
error "cluster id mismatch"
quit(QuitFailure)
App(
version: git_version,
conf: conf,
@ -234,7 +249,13 @@ proc setupDiscoveryV5*(app: App): WakuDiscoveryV5 =
autoupdateRecord: app.conf.discv5EnrAutoUpdate,
)
WakuDiscoveryV5.new(app.rng, discv5Conf, some(app.record))
WakuDiscoveryV5.new(
app.rng,
discv5Conf,
some(app.record),
some(app.node.peerManager),
app.node.topicSubscriptionQueue,
)
## Init waku node instance
@ -286,11 +307,6 @@ proc initNode(conf: WakuNodeConf,
ok(node)
proc setupWakuApp*(app: var App): AppResult[void] =
## Discv5
if app.conf.discv5Discovery:
app.wakuDiscV5 = some(app.setupDiscoveryV5())
## Waku node
let initNodeRes = initNode(app.conf, app.netConf, app.rng, app.key, app.record, app.peerStore, app.dynamicBootstrapNodes)
if initNodeRes.isErr():
@ -298,6 +314,10 @@ proc setupWakuApp*(app: var App): AppResult[void] =
app.node = initNodeRes.get()
## Discv5
if app.conf.discv5Discovery:
app.wakuDiscV5 = some(app.setupDiscoveryV5())
ok()
proc getPorts(listenAddrs: seq[MultiAddress]):
@ -341,7 +361,17 @@ proc updateNetConfig(app: var App): AppResult[void] =
proc updateEnr(app: var App): AppResult[void] =
let record = enrConfiguration(app.conf, app.netConf, app.key).valueOr:
return err(error)
return err("ENR setup failed: " & error)
if app.conf.clusterId != 0:
let tRecord = record.toTyped().valueOr:
return err("ENR setup failed: " & $error)
let relayShard = tRecord.relaySharding().valueOr:
return err("ENR setup failed: no sharding info")
if app.conf.clusterId != relayShard.clusterId:
return err("ENR setup failed: cluster id mismatch")
app.record = record
app.node.enr = record
@ -377,6 +407,9 @@ proc setupProtocols(node: WakuNode,
## Optionally include persistent message storage.
## No protocols are started yet.
node.mountMetadata(conf.clusterId).isOkOr:
return err("failed to mount waku metadata protocol: " & error)
# Mount relay on all nodes
var peerExchangeHandler = none(RoutingRecordsHandler)
if conf.relayPeerExchange:
@ -587,11 +620,12 @@ proc startNode(node: WakuNode, conf: WakuNodeConf,
proc startApp*(app: var App): AppResult[void] =
try:
(waitFor startNode(app.node,app.conf,app.dynamicBootstrapNodes)).isOkOr:
return err(error)
except CatchableError:
return err("exception starting node: " & getCurrentExceptionMsg())
let nodeRes = catch: (waitFor startNode(app.node,app.conf,app.dynamicBootstrapNodes))
if nodeRes.isErr():
return err("exception starting node: " & nodeRes.error.msg)
nodeRes.get().isOkOr:
return err("exception starting node: " & error)
# Update app data that is set dynamically on node start
app.updateApp().isOkOr:
@ -599,13 +633,12 @@ proc startApp*(app: var App): AppResult[void] =
if app.wakuDiscv5.isSome():
let wakuDiscv5 = app.wakuDiscv5.get()
let catchRes = catch: (waitFor wakuDiscv5.start())
let startRes = catchRes.valueOr:
return err("failed to start waku discovery v5: " & catchRes.error.msg)
let res = wakuDiscv5.start()
if res.isErr():
return err("failed to start waku discovery v5: " & $res.error)
asyncSpawn wakuDiscv5.searchLoop(app.node.peerManager)
asyncSpawn wakuDiscv5.subscriptionsListener(app.node.topicSubscriptionQueue)
startRes.isOkOr:
return err("failed to start waku discovery v5: " & error)
return ok()

View File

@ -51,31 +51,32 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
var bootstrapNodeEnr: enr.Record
discard bootstrapNodeEnr.fromURI(bootstrapNode)
let discv5Conf = WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: ip,
port: Port(discv5Port),
privateKey: keys.PrivateKey(nodeKey.skkey),
bootstrapRecords: @[bootstrapNodeEnr],
autoupdateRecord: true,
)
# assumes behind a firewall, so not care about being discoverable
let wakuDiscv5 = WakuDiscoveryV5.new(
extIp= none(ValidIpAddress),
extTcpPort = none(Port),
extUdpPort = none(Port),
bindIP = ip,
discv5UdpPort = Port(discv5Port),
bootstrapEnrs = @[bootstrapNodeEnr],
privateKey = keys.PrivateKey(nodeKey.skkey),
flags = flags,
rng = node.rng,
topics = @[],
)
node.rng,
discv5Conf,
some(node.enr),
some(node.peerManager),
node.topicSubscriptionQueue,
)
await node.start()
await node.mountRelay()
node.peerManager.start()
let discv5Res = wakuDiscv5.start()
if discv5Res.isErr():
error "failed to start discv5", error= discv5Res.error
(await wakuDiscv5.start()).isOkOr:
error "failed to start discv5", error = error
quit(1)
asyncSpawn wakuDiscv5.searchLoop(node.peerManager)
# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
while true:
let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected)

View File

@ -46,31 +46,32 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
var bootstrapNodeEnr: enr.Record
discard bootstrapNodeEnr.fromURI(bootstrapNode)
let discv5Conf = WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: ip,
port: Port(discv5Port),
privateKey: keys.PrivateKey(nodeKey.skkey),
bootstrapRecords: @[bootstrapNodeEnr],
autoupdateRecord: true,
)
# assumes behind a firewall, so not care about being discoverable
let wakuDiscv5 = WakuDiscoveryV5.new(
extIp= none(ValidIpAddress),
extTcpPort = none(Port),
extUdpPort = none(Port),
bindIP = ip,
discv5UdpPort = Port(discv5Port),
bootstrapEnrs = @[bootstrapNodeEnr],
privateKey = keys.PrivateKey(nodeKey.skkey),
flags = flags,
rng = node.rng,
topics = @[],
)
node.rng,
discv5Conf,
some(node.enr),
some(node.peerManager),
node.topicSubscriptionQueue,
)
await node.start()
await node.mountRelay()
node.peerManager.start()
let discv5Res = wakuDiscv5.start()
if discv5Res.isErr():
error "failed to start discv5", error = discv5Res.error
(await wakuDiscv5.start()).isOkOr:
error "failed to start discv5", error = error
quit(1)
asyncSpawn wakuDiscv5.searchLoop(node.peerManager)
# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
while true:
let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected)

View File

@ -268,16 +268,38 @@ procSuite "Peer Manager":
await allFutures([node1.stop(), node2.stop(), node3.stop()])
asyncTest "Peer manager drops conections to peers on different networks":
let clusterId1 = 1.uint32
let clusterId2 = 2.uint32
let clusterId3 = 3.uint32
let clusterId4 = 4.uint32
let
# different network
node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId1)
node1 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("0.0.0.0"),
Port(0),
clusterId = clusterId3,
topics = @["/waku/2/rs/3/0"],
)
# same network
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId2)
node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId2)
node2 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("0.0.0.0"),
Port(0),
clusterId = clusterId4,
topics = @["/waku/2/rs/4/0"],
)
node3 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("0.0.0.0"),
Port(0),
clusterId = clusterId4,
topics = @["/waku/2/rs/4/0"],
)
discard node1.mountMetadata(clusterId3)
discard node2.mountMetadata(clusterId4)
discard node3.mountMetadata(clusterId4)
# Start nodes
await allFutures([node1.start(), node2.start(), node3.start()])

View File

@ -34,10 +34,13 @@ proc newTestEnrRecord(privKey: libp2p_keys.PrivateKey,
builder.build().tryGet()
proc newTestDiscv5(privKey: libp2p_keys.PrivateKey,
bindIp: string, tcpPort: uint16, udpPort: uint16,
record: waku_enr.Record,
bootstrapRecords = newSeq[waku_enr.Record]()): WakuDiscoveryV5 =
proc newTestDiscv5(
privKey: libp2p_keys.PrivateKey,
bindIp: string, tcpPort: uint16, udpPort: uint16,
record: waku_enr.Record,
bootstrapRecords = newSeq[waku_enr.Record](),
queue = newAsyncEventQueue[SubscriptionEvent](30),
): WakuDiscoveryV5 =
let config = WakuDiscoveryV5Config(
privateKey: eth_keys.PrivateKey(privKey.skkey),
address: ValidIpAddress.init(bindIp),
@ -45,7 +48,12 @@ proc newTestDiscv5(privKey: libp2p_keys.PrivateKey,
bootstrapRecords: bootstrapRecords,
)
let discv5 = WakuDiscoveryV5.new(rng(), config, some(record))
let discv5 = WakuDiscoveryV5.new(
rng = rng(),
conf = config,
record = some(record),
queue = queue,
)
return discv5
@ -122,13 +130,13 @@ procSuite "Waku Discovery v5":
bootstrapRecords = @[record1, record2]
)
let res1 = node1.start()
let res1 = await node1.start()
assert res1.isOk(), res1.error
let res2 = node2.start()
let res2 = await node2.start()
assert res2.isOk(), res2.error
let res3 = node3.start()
let res3 = await node3.start()
assert res3.isOk(), res3.error
## When
@ -240,16 +248,16 @@ procSuite "Waku Discovery v5":
)
# Start nodes' discoveryV5 protocols
let res1 = node1.start()
let res1 = await node1.start()
assert res1.isOk(), res1.error
let res2 = node2.start()
let res2 = await node2.start()
assert res2.isOk(), res2.error
let res3 = node3.start()
let res3 = await node3.start()
assert res3.isOk(), res3.error
let res4 = node4.start()
let res4 = await node4.start()
assert res4.isOk(), res4.error
## Given
@ -401,22 +409,20 @@ procSuite "Waku Discovery v5":
udpPort = udpPort,
)
let queue = newAsyncEventQueue[SubscriptionEvent](30)
let node = newTestDiscv5(
privKey = privKey,
bindIp = bindIp,
tcpPort = tcpPort,
udpPort = udpPort,
record = record
record = record,
queue = queue,
)
let res = node.start()
let res = await node.start()
assert res.isOk(), res.error
let queue = newAsyncEventQueue[SubscriptionEvent](0)
## When
asyncSpawn node.subscriptionsListener(queue)
## Then
queue.emit((kind: PubsubSub, topic: shard1))
queue.emit((kind: PubsubSub, topic: shard2))
@ -442,14 +448,13 @@ procSuite "Waku Discovery v5":
queue.emit((kind: PubsubUnsub, topic: shard1))
queue.emit((kind: PubsubUnsub, topic: shard2))
queue.emit((kind: PubsubUnsub, topic: shard3))
await sleepAsync(1.seconds)
check:
node.protocol.localNode.record.containsShard(shard1) == false
node.protocol.localNode.record.containsShard(shard2) == false
node.protocol.localNode.record.containsShard(shard3) == false
node.protocol.localNode.record.containsShard(shard3) == true
## Cleanup
await node.stop()

View File

@ -131,7 +131,8 @@ procSuite "Waku Peer Exchange":
let disc1 = WakuDiscoveryV5.new(
node1.rng,
conf1,
some(node1.enr)
some(node1.enr),
some(node1.peerManager),
)
let conf2 = WakuDiscoveryV5Config(
@ -146,17 +147,15 @@ procSuite "Waku Peer Exchange":
let disc2 = WakuDiscoveryV5.new(
node2.rng,
conf2,
some(node2.enr)
some(node2.enr),
some(node2.peerManager),
)
await allFutures(node1.start(), node2.start(), node3.start())
let resultDisc1StartRes = disc1.start()
let resultDisc1StartRes = await disc1.start()
assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error
let resultDisc2StartRes = disc2.start()
let resultDisc2StartRes = await disc2.start()
assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error
asyncSpawn disc1.searchLoop(node1.peerManager)
asyncSpawn disc2.searchLoop(node2.peerManager)
## When
var attempts = 10

View File

@ -32,7 +32,7 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf =
dnsAddrsNameServers: @[ValidIpAddress.init("1.1.1.1"), ValidIpAddress.init("1.0.0.1")],
nat: "any",
maxConnections: 50,
topics: @["/waku/2/default-waku/proto"],
topics: @[],
relay: true
)
@ -55,28 +55,27 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
dns4DomainName = none(string),
discv5UdpPort = none(Port),
agentString = none(string),
clusterId: uint32 = 0.uint32,
clusterId: uint32 = 2.uint32,
topics: seq[string] = @["/waku/2/rs/2/0"],
peerStoreCapacity = none(int)): WakuNode =
var resolvedExtIp = extIp
# Update extPort to default value if it's missing and there's an extIp or a DNS domain
let extPort = if (extIp.isSome() or dns4DomainName.isSome()) and
extPort.isNone():
some(Port(60000))
else:
extPort
let extPort =
if (extIp.isSome() or dns4DomainName.isSome()) and extPort.isNone(): some(Port(60000))
else: extPort
let conf = defaultTestWakuNodeConf()
if dns4DomainName.isSome() and extIp.isNone():
let conf = defaultTestWakuNodeConf()
# If there's an error resolving the IP, an exception is thrown and test fails
let dnsRes = waitFor dnsResolve(dns4DomainName.get(), conf)
if dnsRes.isErr():
raise newException(Defect, $dnsRes.error)
else:
resolvedExtIp = some(ValidIpAddress.init(dnsRes.get()))
let dns = (waitFor dnsResolve(dns4DomainName.get(), conf)).valueOr:
raise newException(Defect, error)
resolvedExtIp = some(ValidIpAddress.init(dns))
let netConfigRes = NetConfig.init(
let netConf = NetConfig.init(
bindIp = bindIp,
clusterId = clusterId,
bindPort = bindPort,
@ -89,36 +88,33 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
wakuFlags = wakuFlags,
dns4DomainName = dns4DomainName,
discv5UdpPort = discv5UdpPort,
)
let netConf =
if netConfigRes.isErr():
raise newException(Defect, "Invalid network configuration: " & $netConfigRes.error)
else:
netConfigRes.get()
).valueOr:
raise newException(Defect, "Invalid network configuration: " & error)
var enrBuilder = EnrBuilder.init(nodeKey)
enrBuilder.withShardedTopics(topics).isOkOr:
raise newException(Defect, "Invalid record: " & error)
enrBuilder.withIpAddressAndPorts(
ipAddr = netConf.enrIp,
tcpPort = netConf.enrPort,
udpPort = netConf.discv5UdpPort,
)
if netConf.wakuFlags.isSome():
enrBuilder.withWakuCapabilities(netConf.wakuFlags.get())
enrBuilder.withMultiaddrs(netConf.enrMultiaddrs)
let recordRes = enrBuilder.build()
let record =
if recordRes.isErr():
raise newException(Defect, "Invalid record: " & $recordRes.error)
else:
recordRes.get()
if netConf.wakuFlags.isSome():
enrBuilder.withWakuCapabilities(netConf.wakuFlags.get())
let record = enrBuilder.build().valueOr:
raise newException(Defect, "Invalid record: " & $error)
var builder = WakuNodeBuilder.init()
builder.withRng(rng())
builder.withNodeKey(nodeKey)
builder.withRecord(record)
builder.withNetworkConfiguration(netConfigRes.get())
builder.withNetworkConfiguration(netConf)
builder.withPeerStorage(peerStorage, capacity = peerStoreCapacity)
builder.withSwitchConfiguration(
maxConnections = some(maxConnections),

View File

@ -51,12 +51,24 @@ suite "Wakunode2 - App initialization":
## When
var wakunode2 = App.init(rng(), conf)
require wakunode2.setupPeerPersistence().isOk()
require wakunode2.setupDyamicBootstrapNodes().isOk()
require wakunode2.setupWakuApp().isOk()
require isOk(waitFor wakunode2.setupAndMountProtocols())
require isOk(wakunode2.startApp())
require wakunode2.setupMonitoringAndExternalInterfaces().isOk()
let persRes = wakunode2.setupPeerPersistence()
assert persRes.isOk(), persRes.error
let bootRes = wakunode2.setupDyamicBootstrapNodes()
assert bootRes.isOk(), bootRes.error
let setupRes = wakunode2.setupWakuApp()
assert setupRes.isOk(), setupRes.error
let mountRes = waitFor wakunode2.setupAndMountProtocols()
assert mountRes.isOk(), mountRes.error
let startRes = wakunode2.startApp()
assert startRes.isOk(), startRes.error
let monitorRes = wakunode2.setupMonitoringAndExternalInterfaces()
assert monitorRes.isOk(), monitorRes.error
## Then
let node = wakunode2.node

View File

@ -155,12 +155,6 @@ proc new*(T: type WakuNode,
topicSubscriptionQueue: queue
)
# mount metadata protocol
let metadata = WakuMetadata.new(netConfig.clusterId, queue)
node.switch.mount(metadata, protocolMatcher(WakuMetadataCodec))
node.wakuMetadata = metadata
peerManager.wakuMetadata = metadata
return node
proc peerInfo*(node: WakuNode): PeerInfo =
@ -189,6 +183,22 @@ proc connectToNodes*(node: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], s
# NOTE Connects to the node without a give protocol, which automatically creates streams for relay
await peer_manager.connectToNodes(node.peerManager, nodes, source=source)
## Waku Metadata
proc mountMetadata*(node: WakuNode, clusterId: uint32): Result[void, string] =
if not node.wakuMetadata.isNil():
return err("Waku metadata already mounted, skipping")
let metadata = WakuMetadata.new(clusterId, node.enr, node.topicSubscriptionQueue)
node.wakuMetadata = metadata
node.peerManager.wakuMetadata = metadata
let catchRes = catch: node.switch.mount(node.wakuMetadata, protocolMatcher(WakuMetadataCodec))
if catchRes.isErr():
return err(catchRes.error.msg)
return ok()
## Waku relay
@ -1124,6 +1134,9 @@ proc start*(node: WakuNode) {.async.} =
if not node.wakuRelay.isNil():
await node.startRelay()
if not node.wakuMetadata.isNil():
node.wakuMetadata.start()
## The switch uses this mapper to update peer info addrs
## with announced addrs after start
let addressMapper =
@ -1136,8 +1149,6 @@ proc start*(node: WakuNode) {.async.} =
node.started = true
node.wakuMetadata.start()
if not zeroPortPresent:
printNodeNetworkInfo(node)
else:
@ -1149,6 +1160,9 @@ proc stop*(node: WakuNode) {.async.} =
if not node.wakuRelay.isNil():
await node.wakuRelay.stop()
if not node.wakuMetadata.isNil():
node.wakuMetadata.stop()
await node.switch.stop()
node.peerManager.stop()

View File

@ -49,6 +49,8 @@ type WakuDiscoveryV5* = ref object
protocol*: protocol.Protocol
listening*: bool
predicate: Option[WakuDiscv5Predicate]
peerManager: Option[PeerManager]
topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent]
proc shardingPredicate*(record: Record): Option[WakuDiscv5Predicate] =
## Filter peers based on relay sharding information
@ -72,7 +74,9 @@ proc new*(
T: type WakuDiscoveryV5,
rng: ref HmacDrbgContext,
conf: WakuDiscoveryV5Config,
record: Option[waku_enr.Record]
record: Option[waku_enr.Record],
peerManager: Option[PeerManager] = none(PeerManager),
queue: AsyncEventQueue[SubscriptionEvent] = newAsyncEventQueue[SubscriptionEvent](30),
): T =
let shardPredOp =
if record.isSome(): shardingPredicate(record.get())
@ -101,82 +105,26 @@ proc new*(
enrUdpPort = none(Port),
)
WakuDiscoveryV5(conf: conf, protocol: protocol, listening: false, predicate: shardPredOp)
proc new*(T: type WakuDiscoveryV5,
extIp: Option[ValidIpAddress],
extTcpPort: Option[Port],
extUdpPort: Option[Port],
bindIP: ValidIpAddress,
discv5UdpPort: Port,
bootstrapEnrs = newSeq[enr.Record](),
enrAutoUpdate = false,
privateKey: eth_keys.PrivateKey,
flags: CapabilitiesBitfield,
multiaddrs = newSeq[MultiAddress](),
rng: ref HmacDrbgContext,
topics: seq[string],
discv5Config: protocol.DiscoveryConfig = protocol.defaultDiscoveryConfig
): T {.
deprecated: "use the config and record proc variant instead".}=
let relayShardsRes = topicsToRelayShards(topics)
let relayShard =
if relayShardsRes.isErr():
debug "pubsub topic parsing error", reason = relayShardsRes.error
none(RelayShards)
else: relayShardsRes.get()
let record = block:
var builder = EnrBuilder.init(privateKey)
builder.withIpAddressAndPorts(
ipAddr = extIp,
tcpPort = extTcpPort,
udpPort = extUdpPort,
)
builder.withWakuCapabilities(flags)
builder.withMultiaddrs(multiaddrs)
if relayShard.isSome():
let res = builder.withWakuRelaySharding(relayShard.get())
if res.isErr():
debug "building ENR with relay sharding failed", reason = res.error
else:
debug "building ENR with relay sharding information", clusterId = $relayShard.get().clusterId(), shards = $relayShard.get().shardIds()
builder.build().expect("Record within size limits")
let conf = WakuDiscoveryV5Config(
discv5Config: some(discv5Config),
address: bindIP,
port: discv5UdpPort,
privateKey: privateKey,
bootstrapRecords: bootstrapEnrs,
autoupdateRecord: enrAutoUpdate,
)
WakuDiscoveryV5.new(rng, conf, some(record))
WakuDiscoveryV5(
conf: conf,
protocol: protocol,
listening: false,
predicate: shardPredOp,
peerManager: peerManager,
topicSubscriptionQueue: queue,
)
proc updateENRShards(wd: WakuDiscoveryV5,
newTopics: seq[PubsubTopic], add: bool): Result[void, string] =
## Add or remove shards from the Discv5 ENR
let newShardOp = topicsToRelayShards(newTopics).valueOr:
return err("ENR update failed: " & error)
let newShard = newShardOp.valueOr:
return ok()
let newShardOp = ?topicsToRelayShards(newTopics)
let newShard =
if newShardOp.isSome():
newShardOp.get()
else:
return ok()
let typedRecordRes = wd.protocol.localNode.record.toTyped()
let typedRecord =
if typedRecordRes.isErr():
return err($typedRecordRes.error)
else:
typedRecordRes.get()
let typedRecord = wd.protocol.localNode.record.toTyped().valueOr:
return err("ENR update failed: " & $error)
let currentShardsOp = typedRecord.relaySharding()
@ -185,14 +133,16 @@ proc updateENRShards(wd: WakuDiscoveryV5,
let currentShard = currentShardsOp.get()
if currentShard.clusterId != newShard.clusterId:
return err("ENR are limited to one clusterId id")
return err("ENR update failed: clusterId id mismatch")
RelayShards.init(currentShard.clusterId, currentShard.shardIds & newShard.shardIds).valueOr:
return err("ENR update failed: " & error)
?RelayShards.init(currentShard.clusterId, currentShard.shardIds & newShard.shardIds)
elif not add and currentShardsOp.isSome():
let currentShard = currentShardsOp.get()
if currentShard.clusterId != newShard.clusterId:
return err("ENR are limited to one clusterId id")
return err("ENR update failed: clusterId id mismatch")
let currentSet = toHashSet(currentShard.shardIds)
let newSet = toHashSet(newShard.shardIds)
@ -200,16 +150,11 @@ proc updateENRShards(wd: WakuDiscoveryV5,
let indices = toSeq(currentSet - newSet)
if indices.len == 0:
# Can't create RelayShard with no indices so update then return
let (field, value) = (ShardingIndicesListEnrField, newSeq[byte](3))
return err("ENR update failed: cannot remove all shards")
let res = wd.protocol.updateRecord([(field, value)])
if res.isErr():
return err($res.error)
RelayShards.init(currentShard.clusterId, indices).valueOr:
return err("ENR update failed: " & error)
return ok()
?RelayShards.init(currentShard.clusterId, indices)
elif add and currentShardsOp.isNone(): newShard
else: return ok()
@ -217,18 +162,13 @@ proc updateENRShards(wd: WakuDiscoveryV5,
if resultShard.shardIds.len >= ShardingIndicesListMaxLength:
(ShardingBitVectorEnrField, resultShard.toBitVector())
else:
let listRes = resultShard.toIndicesList()
let list =
if listRes.isErr():
return err($listRes.error)
else:
listRes.get()
let list = resultShard.toIndicesList().valueOr:
return err("ENR update failed: " & $error)
(ShardingIndicesListEnrField, list)
let res = wd.protocol.updateRecord([(field, value)])
if res.isErr():
return err($res.error)
wd.protocol.updateRecord([(field, value)]).isOkOr:
return err("ENR update failed: " & $error)
return ok()
@ -246,10 +186,12 @@ proc findRandomPeers*(wd: WakuDiscoveryV5, overridePred = none(WakuDiscv5Predica
return discoveredRecords
#TODO abstract away PeerManager
proc searchLoop*(wd: WakuDiscoveryV5, peerManager: PeerManager) {.async.} =
proc searchLoop(wd: WakuDiscoveryV5) {.async.} =
## Continuously add newly discovered nodes
let peerManager = wd.peerManager.valueOr:
return
info "Starting discovery v5 search"
while wd.listening:
@ -267,50 +209,13 @@ proc searchLoop*(wd: WakuDiscoveryV5, peerManager: PeerManager) {.async.} =
# Also, give some time to dial the discovered nodes and update stats, etc.
await sleepAsync(5.seconds)
proc start*(wd: WakuDiscoveryV5): Result[void, string] =
if wd.listening:
return err("already listening")
info "Starting discovery v5 service"
debug "start listening on udp port", address = $wd.conf.address, port = $wd.conf.port
try:
wd.protocol.open()
except CatchableError:
return err("failed to open udp port: " & getCurrentExceptionMsg())
wd.listening = true
trace "start discv5 service"
wd.protocol.start()
debug "Successfully started discovery v5 service"
info "Discv5: discoverable ENR ", enr = wd.protocol.localNode.record.toUri()
ok()
proc stop*(wd: WakuDiscoveryV5): Future[void] {.async.} =
if not wd.listening:
return
info "Stopping discovery v5 service"
wd.listening = false
trace "Stop listening on discv5 port"
await wd.protocol.closeWait()
debug "Successfully stopped discovery v5 service"
proc subscriptionsListener*(
wd: WakuDiscoveryV5,
topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent]
) {.async.} =
proc subscriptionsListener(wd: WakuDiscoveryV5) {.async.} =
## Listen for pubsub topics subscriptions changes
let key = topicSubscriptionQueue.register()
let key = wd.topicSubscriptionQueue.register()
while wd.listening:
let events = await topicSubscriptionQueue.waitEvents(key)
let events = await wd.topicSubscriptionQueue.waitEvents(key)
# Since we don't know the events we will receive we have to anticipate.
@ -336,7 +241,44 @@ proc subscriptionsListener*(
wd.predicate = shardingPredicate(wd.protocol.localNode.record)
topicSubscriptionQueue.unregister(key)
wd.topicSubscriptionQueue.unregister(key)
proc start*(wd: WakuDiscoveryV5): Future[Result[void, string]] {.async.} =
if wd.listening:
return err("already listening")
info "Starting discovery v5 service"
debug "start listening on udp port", address = $wd.conf.address, port = $wd.conf.port
try:
wd.protocol.open()
except CatchableError:
return err("failed to open udp port: " & getCurrentExceptionMsg())
wd.listening = true
trace "start discv5 service"
wd.protocol.start()
asyncSpawn wd.searchLoop()
asyncSpawn wd.subscriptionsListener()
debug "Successfully started discovery v5 service"
info "Discv5: discoverable ENR ", enr = wd.protocol.localNode.record.toUri()
ok()
proc stop*(wd: WakuDiscoveryV5): Future[void] {.async.} =
if not wd.listening:
return
info "Stopping discovery v5 service"
wd.listening = false
trace "Stop listening on discv5 port"
await wd.protocol.closeWait()
debug "Successfully stopped discovery v5 service"
## Helper functions

View File

@ -4,7 +4,7 @@ else:
{.push raises: [].}
import
std/[options, sequtils, random, sets],
std/[options, sequtils, sets],
stew/results,
chronicles,
chronos,
@ -15,7 +15,9 @@ import
eth/p2p/discoveryv5/enr
import
../common/nimchronos,
../common/enr,
../waku_core,
../waku_enr,
./rpc
logScope:
@ -93,13 +95,24 @@ proc initProtocolHandler*(m: WakuMetadata) =
proc new*(T: type WakuMetadata,
clusterId: uint32,
enr: Record,
queue: AsyncEventQueue[SubscriptionEvent],
): T =
let wm = WakuMetadata(clusterId: clusterId, topicSubscriptionQueue: queue)
var (cluster, shards) = (clusterId, initHashSet[uint32]())
let enrRes = enr.toTyped()
if enrRes.isOk():
let shardingRes = enrRes.get().relaySharding()
if shardingRes.isSome():
let relayShard = shardingRes.get()
cluster = uint32(relayShard.clusterId)
shards = toHashSet(relayShard.shardIds.mapIt(uint32(it)))
let wm = WakuMetadata(clusterId: cluster, shards: shards, topicSubscriptionQueue: queue)
wm.initProtocolHandler()
info "Created WakuMetadata protocol", clusterId=clusterId
info "Created WakuMetadata protocol", clusterId=cluster
return wm