mirror of https://github.com/waku-org/nwaku.git
chore: discv5 re-org setup (#1815)
Key, netconfig, enr are created at App initialization and config files has been renamed.
This commit is contained in:
parent
a44d4bfbcd
commit
44f9d8dc0e
|
@ -41,7 +41,8 @@ import
|
||||||
../../waku/v2/waku_lightpush,
|
../../waku/v2/waku_lightpush,
|
||||||
../../waku/v2/waku_filter,
|
../../waku/v2/waku_filter,
|
||||||
./wakunode2_validator_signed,
|
./wakunode2_validator_signed,
|
||||||
./config
|
./internal_config,
|
||||||
|
./external_config
|
||||||
import
|
import
|
||||||
../../waku/v2/node/message_cache,
|
../../waku/v2/node/message_cache,
|
||||||
../../waku/v2/node/rest/server,
|
../../waku/v2/node/rest/server,
|
||||||
|
@ -69,8 +70,11 @@ type
|
||||||
App* = object
|
App* = object
|
||||||
version: string
|
version: string
|
||||||
conf: WakuNodeConf
|
conf: WakuNodeConf
|
||||||
|
netConf: NetConfig
|
||||||
rng: ref HmacDrbgContext
|
rng: ref HmacDrbgContext
|
||||||
|
key: crypto.PrivateKey
|
||||||
|
record: Record
|
||||||
|
|
||||||
peerStore: Option[WakuPeerStorage]
|
peerStore: Option[WakuPeerStorage]
|
||||||
archiveDriver: Option[ArchiveDriver]
|
archiveDriver: Option[ArchiveDriver]
|
||||||
archiveRetentionPolicy: Option[RetentionPolicy]
|
archiveRetentionPolicy: Option[RetentionPolicy]
|
||||||
|
@ -95,7 +99,41 @@ func version*(app: App): string =
|
||||||
## Initialisation
|
## Initialisation
|
||||||
|
|
||||||
proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T =
|
proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T =
|
||||||
App(version: git_version, conf: conf, rng: rng, node: nil)
|
let key =
|
||||||
|
if conf.nodeKey.isSome():
|
||||||
|
conf.nodeKey.get()
|
||||||
|
else:
|
||||||
|
let keyRes = crypto.PrivateKey.random(Secp256k1, rng[])
|
||||||
|
|
||||||
|
if keyRes.isErr():
|
||||||
|
error "failed to generate key", error=keyRes.error
|
||||||
|
quit(QuitFailure)
|
||||||
|
|
||||||
|
keyRes.get()
|
||||||
|
|
||||||
|
let netConfigRes = networkConfiguration(conf)
|
||||||
|
let netConfig =
|
||||||
|
if netConfigRes.isErr():
|
||||||
|
error "failed to create internal config", error=netConfigRes.error
|
||||||
|
quit(QuitFailure)
|
||||||
|
else: netConfigRes.get()
|
||||||
|
|
||||||
|
let recordRes = createRecord(conf, netConfig, key)
|
||||||
|
let record =
|
||||||
|
if recordRes.isErr():
|
||||||
|
error "failed to create record", error=recordRes.error
|
||||||
|
quit(QuitFailure)
|
||||||
|
else: recordRes.get()
|
||||||
|
|
||||||
|
App(
|
||||||
|
version: git_version,
|
||||||
|
conf: conf,
|
||||||
|
netConf: netConfig,
|
||||||
|
rng: rng,
|
||||||
|
key: key,
|
||||||
|
record: record,
|
||||||
|
node: nil
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
## SQLite database
|
## SQLite database
|
||||||
|
@ -318,7 +356,10 @@ proc setupDyamicBootstrapNodes*(app: var App): AppResult[void] =
|
||||||
## Init waku node instance
|
## Init waku node instance
|
||||||
|
|
||||||
proc initNode(conf: WakuNodeConf,
|
proc initNode(conf: WakuNodeConf,
|
||||||
|
netConfig: NetConfig,
|
||||||
rng: ref HmacDrbgContext,
|
rng: ref HmacDrbgContext,
|
||||||
|
nodeKey: crypto.PrivateKey,
|
||||||
|
record: enr.Record,
|
||||||
peerStore: Option[WakuPeerStorage],
|
peerStore: Option[WakuPeerStorage],
|
||||||
dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[]): AppResult[WakuNode] =
|
dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[]): AppResult[WakuNode] =
|
||||||
|
|
||||||
|
@ -335,83 +376,11 @@ proc initNode(conf: WakuNodeConf,
|
||||||
|
|
||||||
dnsResolver = DnsResolver.new(nameServers)
|
dnsResolver = DnsResolver.new(nameServers)
|
||||||
|
|
||||||
let
|
|
||||||
nodekey = if conf.nodekey.isSome():
|
|
||||||
conf.nodekey.get()
|
|
||||||
else:
|
|
||||||
let nodekeyRes = crypto.PrivateKey.random(Secp256k1, rng[])
|
|
||||||
if nodekeyRes.isErr():
|
|
||||||
return err("failed to generate nodekey: " & $nodekeyRes.error)
|
|
||||||
nodekeyRes.get()
|
|
||||||
|
|
||||||
|
|
||||||
## `udpPort` is only supplied to satisfy underlying APIs but is not
|
|
||||||
## actually a supported transport for libp2p traffic.
|
|
||||||
let udpPort = conf.tcpPort
|
|
||||||
let natRes = setupNat(conf.nat, clientId,
|
|
||||||
Port(uint16(conf.tcpPort) + conf.portsShift),
|
|
||||||
Port(uint16(udpPort) + conf.portsShift))
|
|
||||||
if natRes.isErr():
|
|
||||||
return err("failed to setup NAT: " & $natRes.error)
|
|
||||||
|
|
||||||
let (extIp, extTcpPort, _) = natRes.get()
|
|
||||||
|
|
||||||
let
|
|
||||||
dns4DomainName = if conf.dns4DomainName != "": some(conf.dns4DomainName)
|
|
||||||
else: none(string)
|
|
||||||
|
|
||||||
discv5UdpPort = if conf.discv5Discovery: some(Port(uint16(conf.discv5UdpPort) + conf.portsShift))
|
|
||||||
else: none(Port)
|
|
||||||
|
|
||||||
## TODO: the NAT setup assumes a manual port mapping configuration if extIp config is set. This probably
|
|
||||||
## implies adding manual config item for extPort as well. The following heuristic assumes that, in absence of manual
|
|
||||||
## config, the external port is the same as the bind port.
|
|
||||||
extPort = if (extIp.isSome() or dns4DomainName.isSome()) and extTcpPort.isNone():
|
|
||||||
some(Port(uint16(conf.tcpPort) + conf.portsShift))
|
|
||||||
else:
|
|
||||||
extTcpPort
|
|
||||||
extMultiAddrs = if (conf.extMultiAddrs.len > 0):
|
|
||||||
let extMultiAddrsValidationRes = validateExtMultiAddrs(conf.extMultiAddrs)
|
|
||||||
if extMultiAddrsValidationRes.isErr():
|
|
||||||
return err("invalid external multiaddress: " & extMultiAddrsValidationRes.error)
|
|
||||||
else:
|
|
||||||
extMultiAddrsValidationRes.get()
|
|
||||||
else:
|
|
||||||
@[]
|
|
||||||
|
|
||||||
wakuFlags = CapabilitiesBitfield.init(
|
|
||||||
lightpush = conf.lightpush,
|
|
||||||
filter = conf.filter,
|
|
||||||
store = conf.store,
|
|
||||||
relay = conf.relay
|
|
||||||
)
|
|
||||||
|
|
||||||
var node: WakuNode
|
var node: WakuNode
|
||||||
|
|
||||||
let pStorage = if peerStore.isNone(): nil
|
let pStorage = if peerStore.isNone(): nil
|
||||||
else: peerStore.get()
|
else: peerStore.get()
|
||||||
|
|
||||||
let rng = crypto.newRng()
|
|
||||||
# Wrap in none because NetConfig does not have a default constructor
|
|
||||||
# TODO: We could change bindIp in NetConfig to be something less restrictive than ValidIpAddress,
|
|
||||||
# which doesn't allow default construction
|
|
||||||
let netConfigRes = NetConfig.init(
|
|
||||||
bindIp = conf.listenAddress,
|
|
||||||
bindPort = Port(uint16(conf.tcpPort) + conf.portsShift),
|
|
||||||
extIp = extIp,
|
|
||||||
extPort = extPort,
|
|
||||||
extMultiAddrs = extMultiAddrs,
|
|
||||||
wsBindPort = Port(uint16(conf.websocketPort) + conf.portsShift),
|
|
||||||
wsEnabled = conf.websocketSupport,
|
|
||||||
wssEnabled = conf.websocketSecureSupport,
|
|
||||||
dns4DomainName = dns4DomainName,
|
|
||||||
discv5UdpPort = discv5UdpPort,
|
|
||||||
wakuFlags = some(wakuFlags),
|
|
||||||
)
|
|
||||||
if netConfigRes.isErr():
|
|
||||||
return err("failed to create net config instance: " & netConfigRes.error)
|
|
||||||
|
|
||||||
let netConfig = netConfigRes.get()
|
|
||||||
var wakuDiscv5 = none(WakuDiscoveryV5)
|
var wakuDiscv5 = none(WakuDiscoveryV5)
|
||||||
|
|
||||||
if conf.discv5Discovery:
|
if conf.discv5Discovery:
|
||||||
|
@ -449,6 +418,7 @@ proc initNode(conf: WakuNodeConf,
|
||||||
var builder = WakuNodeBuilder.init()
|
var builder = WakuNodeBuilder.init()
|
||||||
builder.withRng(rng)
|
builder.withRng(rng)
|
||||||
builder.withNodeKey(nodekey)
|
builder.withNodeKey(nodekey)
|
||||||
|
builder.withRecord(record)
|
||||||
builder.withNetworkConfiguration(netConfig)
|
builder.withNetworkConfiguration(netConfig)
|
||||||
builder.withPeerStorage(pStorage, capacity = conf.peerStoreCapacity)
|
builder.withPeerStorage(pStorage, capacity = conf.peerStoreCapacity)
|
||||||
builder.withSwitchConfiguration(
|
builder.withSwitchConfiguration(
|
||||||
|
@ -467,7 +437,7 @@ proc initNode(conf: WakuNodeConf,
|
||||||
|
|
||||||
proc setupWakuNode*(app: var App): AppResult[void] =
|
proc setupWakuNode*(app: var App): AppResult[void] =
|
||||||
## Waku node
|
## Waku node
|
||||||
let initNodeRes = initNode(app.conf, app.rng, app.peerStore, app.dynamicBootstrapNodes)
|
let initNodeRes = initNode(app.conf, app.netConf, app.rng, app.key, app.record, app.peerStore, app.dynamicBootstrapNodes)
|
||||||
if initNodeRes.isErr():
|
if initNodeRes.isErr():
|
||||||
return err("failed to init node: " & initNodeRes.error)
|
return err("failed to init node: " & initNodeRes.error)
|
||||||
|
|
||||||
|
@ -477,7 +447,9 @@ proc setupWakuNode*(app: var App): AppResult[void] =
|
||||||
|
|
||||||
## Mount protocols
|
## Mount protocols
|
||||||
|
|
||||||
proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
|
proc setupProtocols(node: WakuNode,
|
||||||
|
conf: WakuNodeConf,
|
||||||
|
nodeKey: crypto.PrivateKey,
|
||||||
archiveDriver: Option[ArchiveDriver],
|
archiveDriver: Option[ArchiveDriver],
|
||||||
archiveRetentionPolicy: Option[RetentionPolicy]): Future[AppResult[void]] {.async.} =
|
archiveRetentionPolicy: Option[RetentionPolicy]): Future[AppResult[void]] {.async.} =
|
||||||
## Setup configured protocols on an existing Waku v2 node.
|
## Setup configured protocols on an existing Waku v2 node.
|
||||||
|
@ -626,6 +598,7 @@ proc setupAndMountProtocols*(app: App): Future[AppResult[void]] {.async.} =
|
||||||
return await setupProtocols(
|
return await setupProtocols(
|
||||||
app.node,
|
app.node,
|
||||||
app.conf,
|
app.conf,
|
||||||
|
app.key,
|
||||||
app.archiveDriver,
|
app.archiveDriver,
|
||||||
app.archiveRetentionPolicy
|
app.archiveRetentionPolicy
|
||||||
)
|
)
|
||||||
|
|
|
@ -16,7 +16,8 @@ import
|
||||||
import
|
import
|
||||||
../../waku/common/confutils/envvar/defs as confEnvvarDefs,
|
../../waku/common/confutils/envvar/defs as confEnvvarDefs,
|
||||||
../../waku/common/confutils/envvar/std/net as confEnvvarNet,
|
../../waku/common/confutils/envvar/std/net as confEnvvarNet,
|
||||||
../../waku/common/logging
|
../../waku/common/logging,
|
||||||
|
../../waku/v2/waku_enr
|
||||||
|
|
||||||
export
|
export
|
||||||
confTomlDefs,
|
confTomlDefs,
|
|
@ -0,0 +1,107 @@
|
||||||
|
import
|
||||||
|
stew/results,
|
||||||
|
libp2p/crypto/crypto
|
||||||
|
import
|
||||||
|
../../waku/common/utils/nat,
|
||||||
|
../../waku/v2/node/config,
|
||||||
|
../../waku/v2/node/waku_node,
|
||||||
|
../../waku/v2/waku_enr,
|
||||||
|
./external_config
|
||||||
|
|
||||||
|
proc networkConfiguration*(conf: WakuNodeConf): NetConfigResult =
|
||||||
|
|
||||||
|
## `udpPort` is only supplied to satisfy underlying APIs but is not
|
||||||
|
## actually a supported transport for libp2p traffic.
|
||||||
|
let udpPort = conf.tcpPort
|
||||||
|
let natRes = setupNat(conf.nat, clientId,
|
||||||
|
Port(uint16(conf.tcpPort) + conf.portsShift),
|
||||||
|
Port(uint16(udpPort) + conf.portsShift))
|
||||||
|
if natRes.isErr():
|
||||||
|
return err("failed to setup NAT: " & $natRes.error)
|
||||||
|
|
||||||
|
|
||||||
|
let (extIp, extTcpPort, _) = natRes.get()
|
||||||
|
|
||||||
|
let
|
||||||
|
dns4DomainName = if conf.dns4DomainName != "": some(conf.dns4DomainName)
|
||||||
|
else: none(string)
|
||||||
|
|
||||||
|
discv5UdpPort = if conf.discv5Discovery: some(Port(uint16(conf.discv5UdpPort) + conf.portsShift))
|
||||||
|
else: none(Port)
|
||||||
|
|
||||||
|
## TODO: the NAT setup assumes a manual port mapping configuration if extIp config is set. This probably
|
||||||
|
## implies adding manual config item for extPort as well. The following heuristic assumes that, in absence of manual
|
||||||
|
## config, the external port is the same as the bind port.
|
||||||
|
extPort = if (extIp.isSome() or dns4DomainName.isSome()) and extTcpPort.isNone():
|
||||||
|
some(Port(uint16(conf.tcpPort) + conf.portsShift))
|
||||||
|
else:
|
||||||
|
extTcpPort
|
||||||
|
extMultiAddrs = if (conf.extMultiAddrs.len > 0):
|
||||||
|
let extMultiAddrsValidationRes = validateExtMultiAddrs(conf.extMultiAddrs)
|
||||||
|
if extMultiAddrsValidationRes.isErr():
|
||||||
|
return err("invalid external multiaddress: " & $extMultiAddrsValidationRes.error)
|
||||||
|
|
||||||
|
else:
|
||||||
|
extMultiAddrsValidationRes.get()
|
||||||
|
else:
|
||||||
|
@[]
|
||||||
|
|
||||||
|
wakuFlags = CapabilitiesBitfield.init(
|
||||||
|
lightpush = conf.lightpush,
|
||||||
|
filter = conf.filter,
|
||||||
|
store = conf.store,
|
||||||
|
relay = conf.relay
|
||||||
|
)
|
||||||
|
|
||||||
|
# Wrap in none because NetConfig does not have a default constructor
|
||||||
|
# TODO: We could change bindIp in NetConfig to be something less restrictive than ValidIpAddress,
|
||||||
|
# which doesn't allow default construction
|
||||||
|
let netConfigRes = NetConfig.init(
|
||||||
|
bindIp = conf.listenAddress,
|
||||||
|
bindPort = Port(uint16(conf.tcpPort) + conf.portsShift),
|
||||||
|
extIp = extIp,
|
||||||
|
extPort = extPort,
|
||||||
|
extMultiAddrs = extMultiAddrs,
|
||||||
|
wsBindPort = Port(uint16(conf.websocketPort) + conf.portsShift),
|
||||||
|
wsEnabled = conf.websocketSupport,
|
||||||
|
wssEnabled = conf.websocketSecureSupport,
|
||||||
|
dns4DomainName = dns4DomainName,
|
||||||
|
discv5UdpPort = discv5UdpPort,
|
||||||
|
wakuFlags = some(wakuFlags),
|
||||||
|
)
|
||||||
|
|
||||||
|
netConfigRes
|
||||||
|
|
||||||
|
proc createRecord*(conf: WakuNodeConf, netConf: NetConfig, key: crypto.PrivateKey): Result[enr.Record, string] =
|
||||||
|
let relayShardsRes = topicsToRelayShards(conf.topics)
|
||||||
|
|
||||||
|
let relayShardOp =
|
||||||
|
if relayShardsRes.isErr():
|
||||||
|
return err("building ENR with relay sharding failed: " & $relayShardsRes.error)
|
||||||
|
else: relayShardsRes.get()
|
||||||
|
|
||||||
|
var builder = EnrBuilder.init(key)
|
||||||
|
|
||||||
|
builder.withIpAddressAndPorts(
|
||||||
|
ipAddr = netConf.extIp,
|
||||||
|
tcpPort = netConf.extPort,
|
||||||
|
udpPort = netConf.discv5UdpPort,
|
||||||
|
)
|
||||||
|
|
||||||
|
if netConf.wakuFlags.isSome():
|
||||||
|
builder.withWakuCapabilities(netConf.wakuFlags.get())
|
||||||
|
|
||||||
|
builder.withMultiaddrs(netConf.enrMultiaddrs)
|
||||||
|
|
||||||
|
if relayShardOp.isSome():
|
||||||
|
let res = builder.withWakuRelaySharding(relayShardOp.get())
|
||||||
|
|
||||||
|
if res.isErr():
|
||||||
|
return err("building ENR with relay sharding failed: " & $res.error)
|
||||||
|
|
||||||
|
let res = builder.build()
|
||||||
|
|
||||||
|
if res.isErr():
|
||||||
|
return err("building ENR failed: " & $res.error)
|
||||||
|
|
||||||
|
ok(res.get())
|
|
@ -14,7 +14,7 @@ import
|
||||||
libp2p/crypto/crypto
|
libp2p/crypto/crypto
|
||||||
import
|
import
|
||||||
../../waku/common/logging,
|
../../waku/common/logging,
|
||||||
./config,
|
./external_config,
|
||||||
./app
|
./app
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
|
|
|
@ -25,6 +25,7 @@ type
|
||||||
nodeRng: Option[ref crypto.HmacDrbgContext]
|
nodeRng: Option[ref crypto.HmacDrbgContext]
|
||||||
nodeKey: Option[crypto.PrivateKey]
|
nodeKey: Option[crypto.PrivateKey]
|
||||||
netConfig: Option[NetConfig]
|
netConfig: Option[NetConfig]
|
||||||
|
record: Option[enr.Record]
|
||||||
|
|
||||||
# Peer storage and peer manager
|
# Peer storage and peer manager
|
||||||
peerStorage: Option[PeerStorage]
|
peerStorage: Option[PeerStorage]
|
||||||
|
@ -59,6 +60,9 @@ proc withRng*(builder: var WakuNodeBuilder, rng: ref crypto.HmacDrbgContext) =
|
||||||
proc withNodeKey*(builder: var WakuNodeBuilder, nodeKey: crypto.PrivateKey) =
|
proc withNodeKey*(builder: var WakuNodeBuilder, nodeKey: crypto.PrivateKey) =
|
||||||
builder.nodeKey = some(nodeKey)
|
builder.nodeKey = some(nodeKey)
|
||||||
|
|
||||||
|
proc withRecord*(builder: var WakuNodeBuilder, record: enr.Record) =
|
||||||
|
builder.record = some(record)
|
||||||
|
|
||||||
proc withNetworkConfiguration*(builder: var WakuNodeBuilder, config: NetConfig) =
|
proc withNetworkConfiguration*(builder: var WakuNodeBuilder, config: NetConfig) =
|
||||||
builder.netConfig = some(config)
|
builder.netConfig = some(config)
|
||||||
|
|
||||||
|
@ -151,6 +155,7 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
|
||||||
rng = rng,
|
rng = rng,
|
||||||
nodeKey = builder.nodeKey.get(),
|
nodeKey = builder.nodeKey.get(),
|
||||||
netConfig = builder.netConfig.get(),
|
netConfig = builder.netConfig.get(),
|
||||||
|
enr = builder.record,
|
||||||
peerStorage = builder.peerStorage.get(nil),
|
peerStorage = builder.peerStorage.get(nil),
|
||||||
peerStoreCapacity = builder.peerStorageCapacity,
|
peerStoreCapacity = builder.peerStorageCapacity,
|
||||||
maxConnections = builder.switchMaxConnections.get(builders.MaxConnections),
|
maxConnections = builder.switchMaxConnections.get(builders.MaxConnections),
|
||||||
|
|
|
@ -8,6 +8,8 @@ import
|
||||||
stew/results,
|
stew/results,
|
||||||
stew/shims/net,
|
stew/shims/net,
|
||||||
libp2p/multiaddress
|
libp2p/multiaddress
|
||||||
|
import
|
||||||
|
../../waku/v2/waku_core/peers
|
||||||
import
|
import
|
||||||
../waku_enr
|
../waku_enr
|
||||||
|
|
||||||
|
|
|
@ -158,6 +158,7 @@ proc getAutonatService*(rng: ref HmacDrbgContext): AutonatService =
|
||||||
proc new*(T: type WakuNode,
|
proc new*(T: type WakuNode,
|
||||||
nodeKey: crypto.PrivateKey,
|
nodeKey: crypto.PrivateKey,
|
||||||
netConfig: NetConfig,
|
netConfig: NetConfig,
|
||||||
|
enr: Option[enr.Record],
|
||||||
peerStorage: PeerStorage = nil,
|
peerStorage: PeerStorage = nil,
|
||||||
maxConnections = builders.MaxConnections,
|
maxConnections = builders.MaxConnections,
|
||||||
secureKey: string = "",
|
secureKey: string = "",
|
||||||
|
@ -191,15 +192,21 @@ proc new*(T: type WakuNode,
|
||||||
services = @[Service(getAutonatService(rng))],
|
services = @[Service(getAutonatService(rng))],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
let enr =
|
||||||
|
if enr.isNone():
|
||||||
let nodeEnrRes = getEnr(netConfig, wakuDiscv5, nodekey)
|
let nodeEnrRes = getEnr(netConfig, wakuDiscv5, nodekey)
|
||||||
|
|
||||||
if nodeEnrRes.isErr():
|
if nodeEnrRes.isErr():
|
||||||
raise newException(Defect, "failed to generate the node ENR record: " & $nodeEnrRes.error)
|
raise newException(Defect, "failed to generate the node ENR record: " & $nodeEnrRes.error)
|
||||||
|
|
||||||
|
nodeEnrRes.get()
|
||||||
|
else: enr.get()
|
||||||
|
|
||||||
return WakuNode(
|
return WakuNode(
|
||||||
peerManager: PeerManager.new(switch, peerStorage),
|
peerManager: PeerManager.new(switch, peerStorage),
|
||||||
switch: switch,
|
switch: switch,
|
||||||
rng: rng,
|
rng: rng,
|
||||||
enr: nodeEnrRes.get(),
|
enr: enr,
|
||||||
announcedAddresses: netConfig.announcedAddresses,
|
announcedAddresses: netConfig.announcedAddresses,
|
||||||
wakuDiscv5: if wakuDiscV5.isSome(): wakuDiscV5.get() else: nil,
|
wakuDiscv5: if wakuDiscV5.isSome(): wakuDiscV5.get() else: nil,
|
||||||
)
|
)
|
||||||
|
|
|
@ -48,27 +48,6 @@ type WakuDiscoveryV5* = ref object
|
||||||
protocol*: protocol.Protocol
|
protocol*: protocol.Protocol
|
||||||
listening*: bool
|
listening*: bool
|
||||||
|
|
||||||
func topicsToRelayShards*(topics: seq[string]): Result[Option[RelayShards], string] =
|
|
||||||
if topics.len < 1:
|
|
||||||
return ok(none(RelayShards))
|
|
||||||
|
|
||||||
let parsedTopicsRes = topics.mapIt(NsPubsubTopic.parse(it))
|
|
||||||
|
|
||||||
for res in parsedTopicsRes:
|
|
||||||
if res.isErr():
|
|
||||||
return err("failed to parse topic: " & $res.error)
|
|
||||||
|
|
||||||
if parsedTopicsRes.allIt(it.get().kind == NsPubsubTopicKind.NamedSharding):
|
|
||||||
return ok(none(RelayShards))
|
|
||||||
|
|
||||||
if parsedTopicsRes.anyIt(it.get().kind == NsPubsubTopicKind.NamedSharding):
|
|
||||||
return err("use named topics OR sharded ones not both.")
|
|
||||||
|
|
||||||
if parsedTopicsRes.anyIt(it.get().cluster != parsedTopicsRes[0].get().cluster):
|
|
||||||
return err("use sharded topics within the same cluster.")
|
|
||||||
|
|
||||||
return ok(some(RelayShards.init(parsedTopicsRes[0].get().cluster, parsedTopicsRes.mapIt(it.get().shard))))
|
|
||||||
|
|
||||||
proc new*(T: type WakuDiscoveryV5, rng: ref HmacDrbgContext, conf: WakuDiscoveryV5Config, record: Option[waku_enr.Record]): T =
|
proc new*(T: type WakuDiscoveryV5, rng: ref HmacDrbgContext, conf: WakuDiscoveryV5Config, record: Option[waku_enr.Record]): T =
|
||||||
let protocol = newProtocol(
|
let protocol = newProtocol(
|
||||||
rng = rng,
|
rng = rng,
|
||||||
|
|
|
@ -68,6 +68,26 @@ func init*(T: type RelayShards, cluster: uint16, indices: seq[uint16]): T =
|
||||||
|
|
||||||
RelayShards(cluster: cluster, indices: indicesSeq)
|
RelayShards(cluster: cluster, indices: indicesSeq)
|
||||||
|
|
||||||
|
func topicsToRelayShards*(topics: seq[string]): Result[Option[RelayShards], string] =
|
||||||
|
if topics.len < 1:
|
||||||
|
return ok(none(RelayShards))
|
||||||
|
|
||||||
|
let parsedTopicsRes = topics.mapIt(NsPubsubTopic.parse(it))
|
||||||
|
|
||||||
|
for res in parsedTopicsRes:
|
||||||
|
if res.isErr():
|
||||||
|
return err("failed to parse topic: " & $res.error)
|
||||||
|
|
||||||
|
if parsedTopicsRes.allIt(it.get().kind == NsPubsubTopicKind.NamedSharding):
|
||||||
|
return ok(none(RelayShards))
|
||||||
|
|
||||||
|
if parsedTopicsRes.anyIt(it.get().kind == NsPubsubTopicKind.NamedSharding):
|
||||||
|
return err("use named topics OR sharded ones not both.")
|
||||||
|
|
||||||
|
if parsedTopicsRes.anyIt(it.get().cluster != parsedTopicsRes[0].get().cluster):
|
||||||
|
return err("use sharded topics within the same cluster.")
|
||||||
|
|
||||||
|
return ok(some(RelayShards.init(parsedTopicsRes[0].get().cluster, parsedTopicsRes.mapIt(it.get().shard))))
|
||||||
|
|
||||||
func contains*(rs: RelayShards, cluster, index: uint16): bool =
|
func contains*(rs: RelayShards, cluster, index: uint16): bool =
|
||||||
rs.cluster == cluster and rs.indices.contains(index)
|
rs.cluster == cluster and rs.indices.contains(index)
|
||||||
|
|
Loading…
Reference in New Issue