mirror of https://github.com/waku-org/nwaku.git
chore(refactor): Move record creation & fix libwaku compilation (#1833)
* Move record creation & fix libwaku * app,external_config,internal_config,libwaku,sharding: refactorgin to make it compile --------- Co-authored-by: SionoiS <simvivier@status.im>
This commit is contained in:
parent
96aa59ba58
commit
97d3b9f79f
|
@ -106,14 +106,33 @@ proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T =
|
||||||
|
|
||||||
keyRes.get()
|
keyRes.get()
|
||||||
|
|
||||||
let netConfigRes = networkConfiguration(conf)
|
let netConfigRes = networkConfiguration(conf, clientId)
|
||||||
|
|
||||||
let netConfig =
|
let netConfig =
|
||||||
if netConfigRes.isErr():
|
if netConfigRes.isErr():
|
||||||
error "failed to create internal config", error=netConfigRes.error
|
error "failed to create internal config", error=netConfigRes.error
|
||||||
quit(QuitFailure)
|
quit(QuitFailure)
|
||||||
else: netConfigRes.get()
|
else: netConfigRes.get()
|
||||||
|
|
||||||
let recordRes = createRecord(conf.topics, netConfig, key)
|
var enrBuilder = EnrBuilder.init(key)
|
||||||
|
|
||||||
|
enrBuilder.withIpAddressAndPorts(
|
||||||
|
netConfig.enrIp,
|
||||||
|
netConfig.enrPort,
|
||||||
|
netConfig.discv5UdpPort
|
||||||
|
)
|
||||||
|
|
||||||
|
if netConfig.wakuFlags.isSome():
|
||||||
|
enrBuilder.withWakuCapabilities(netConfig.wakuFlags.get())
|
||||||
|
|
||||||
|
enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs)
|
||||||
|
|
||||||
|
let addShardedTopics = enrBuilder.withShardedTopics(conf.topics)
|
||||||
|
if addShardedTopics.isErr():
|
||||||
|
error "failed to add sharded topics", error=addShardedTopics.error
|
||||||
|
quit(QuitFailure)
|
||||||
|
|
||||||
|
let recordRes = enrBuilder.build()
|
||||||
let record =
|
let record =
|
||||||
if recordRes.isErr():
|
if recordRes.isErr():
|
||||||
error "failed to create record", error=recordRes.error
|
error "failed to create record", error=recordRes.error
|
||||||
|
|
|
@ -537,13 +537,6 @@ proc validateDbUrl*(val: string): ConfResult[string] =
|
||||||
else:
|
else:
|
||||||
err("invalid 'db url' option format: " & val)
|
err("invalid 'db url' option format: " & val)
|
||||||
|
|
||||||
proc validateExtMultiAddrs*(vals: seq[string]): ConfResult[seq[MultiAddress]] =
|
|
||||||
var multiaddrs: seq[MultiAddress]
|
|
||||||
for val in vals:
|
|
||||||
let multiaddr = ? MultiAddress.init(val)
|
|
||||||
multiaddrs.add(multiaddr)
|
|
||||||
ok(multiaddrs)
|
|
||||||
|
|
||||||
## Load
|
## Load
|
||||||
|
|
||||||
proc readValue*(r: var TomlReader, value: var crypto.PrivateKey) {.raises: [SerializationError].} =
|
proc readValue*(r: var TomlReader, value: var crypto.PrivateKey) {.raises: [SerializationError].} =
|
||||||
|
|
|
@ -1,46 +1,62 @@
|
||||||
import
|
import
|
||||||
|
std/options,
|
||||||
|
stew/shims/net,
|
||||||
stew/results,
|
stew/results,
|
||||||
libp2p/crypto/crypto
|
libp2p/crypto/crypto,
|
||||||
|
libp2p/multiaddress
|
||||||
import
|
import
|
||||||
../../waku/common/utils/nat,
|
../../waku/common/utils/nat,
|
||||||
../../waku/v2/node/config,
|
../../waku/v2/node/config,
|
||||||
../../waku/v2/node/waku_node,
|
../../waku/v2/waku_enr/capabilities,
|
||||||
../../waku/v2/waku_enr,
|
|
||||||
./external_config
|
./external_config
|
||||||
|
|
||||||
proc networkConfiguration*(conf: WakuNodeConf): NetConfigResult =
|
proc validateExtMultiAddrs*(vals: seq[string]):
|
||||||
|
Result[seq[MultiAddress], string] =
|
||||||
|
var multiaddrs: seq[MultiAddress]
|
||||||
|
for val in vals:
|
||||||
|
let multiaddr = ? MultiAddress.init(val)
|
||||||
|
multiaddrs.add(multiaddr)
|
||||||
|
return ok(multiaddrs)
|
||||||
|
|
||||||
|
proc networkConfiguration*(conf: WakuNodeConf,
|
||||||
|
clientId: string,
|
||||||
|
): NetConfigResult =
|
||||||
|
|
||||||
## `udpPort` is only supplied to satisfy underlying APIs but is not
|
## `udpPort` is only supplied to satisfy underlying APIs but is not
|
||||||
## actually a supported transport for libp2p traffic.
|
## actually a supported transport for libp2p traffic.
|
||||||
let udpPort = conf.tcpPort
|
|
||||||
let natRes = setupNat(conf.nat, clientId,
|
let natRes = setupNat(conf.nat, clientId,
|
||||||
Port(uint16(conf.tcpPort) + conf.portsShift),
|
Port(uint16(conf.tcpPort) + conf.portsShift),
|
||||||
Port(uint16(udpPort) + conf.portsShift))
|
Port(uint16(conf.tcpPort) + conf.portsShift))
|
||||||
if natRes.isErr():
|
if natRes.isErr():
|
||||||
return err("failed to setup NAT: " & $natRes.error)
|
return err("failed to setup NAT: " & $natRes.error)
|
||||||
|
|
||||||
|
|
||||||
let (extIp, extTcpPort, _) = natRes.get()
|
let (extIp, extTcpPort, _) = natRes.get()
|
||||||
|
|
||||||
let
|
let
|
||||||
dns4DomainName = if conf.dns4DomainName != "": some(conf.dns4DomainName)
|
dns4DomainName = if conf.dns4DomainName != "": some(conf.dns4DomainName)
|
||||||
else: none(string)
|
else: none(string)
|
||||||
|
|
||||||
discv5UdpPort = if conf.discv5Discovery: some(Port(uint16(conf.discv5UdpPort) + conf.portsShift))
|
discv5UdpPort = if conf.discv5Discovery:
|
||||||
|
some(Port(uint16(conf.discv5UdpPort) + conf.portsShift))
|
||||||
else: none(Port)
|
else: none(Port)
|
||||||
|
|
||||||
## TODO: the NAT setup assumes a manual port mapping configuration if extIp config is set. This probably
|
## TODO: the NAT setup assumes a manual port mapping configuration if extIp
|
||||||
## implies adding manual config item for extPort as well. The following heuristic assumes that, in absence of manual
|
## config is set. This probably implies adding manual config item for
|
||||||
## config, the external port is the same as the bind port.
|
## extPort as well. The following heuristic assumes that, in absence of
|
||||||
extPort = if (extIp.isSome() or dns4DomainName.isSome()) and extTcpPort.isNone():
|
## manual config, the external port is the same as the bind port.
|
||||||
|
|
||||||
|
extPort = if (extIp.isSome() or dns4DomainName.isSome()) and
|
||||||
|
extTcpPort.isNone():
|
||||||
some(Port(uint16(conf.tcpPort) + conf.portsShift))
|
some(Port(uint16(conf.tcpPort) + conf.portsShift))
|
||||||
else:
|
else:
|
||||||
extTcpPort
|
extTcpPort
|
||||||
extMultiAddrs = if (conf.extMultiAddrs.len > 0):
|
|
||||||
let extMultiAddrsValidationRes = validateExtMultiAddrs(conf.extMultiAddrs)
|
|
||||||
if extMultiAddrsValidationRes.isErr():
|
|
||||||
return err("invalid external multiaddress: " & $extMultiAddrsValidationRes.error)
|
|
||||||
|
|
||||||
|
extMultiAddrs = if (conf.extMultiAddrs.len > 0):
|
||||||
|
let extMultiAddrsValidationRes =
|
||||||
|
validateExtMultiAddrs(conf.extMultiAddrs)
|
||||||
|
if extMultiAddrsValidationRes.isErr():
|
||||||
|
return err("invalid external multiaddress: " &
|
||||||
|
$extMultiAddrsValidationRes.error)
|
||||||
else:
|
else:
|
||||||
extMultiAddrsValidationRes.get()
|
extMultiAddrsValidationRes.get()
|
||||||
else:
|
else:
|
||||||
|
@ -54,8 +70,8 @@ proc networkConfiguration*(conf: WakuNodeConf): NetConfigResult =
|
||||||
)
|
)
|
||||||
|
|
||||||
# Wrap in none because NetConfig does not have a default constructor
|
# Wrap in none because NetConfig does not have a default constructor
|
||||||
# TODO: We could change bindIp in NetConfig to be something less restrictive than ValidIpAddress,
|
# TODO: We could change bindIp in NetConfig to be something less restrictive
|
||||||
# which doesn't allow default construction
|
# than ValidIpAddress, which doesn't allow default construction
|
||||||
let netConfigRes = NetConfig.init(
|
let netConfigRes = NetConfig.init(
|
||||||
bindIp = conf.listenAddress,
|
bindIp = conf.listenAddress,
|
||||||
bindPort = Port(uint16(conf.tcpPort) + conf.portsShift),
|
bindPort = Port(uint16(conf.tcpPort) + conf.portsShift),
|
||||||
|
@ -70,41 +86,4 @@ proc networkConfiguration*(conf: WakuNodeConf): NetConfigResult =
|
||||||
wakuFlags = some(wakuFlags),
|
wakuFlags = some(wakuFlags),
|
||||||
)
|
)
|
||||||
|
|
||||||
netConfigRes
|
return netConfigRes
|
||||||
|
|
||||||
proc createRecord*(topics: seq[string],
|
|
||||||
netConf: NetConfig,
|
|
||||||
key: crypto.PrivateKey):
|
|
||||||
Result[enr.Record, string] =
|
|
||||||
let relayShardsRes = topicsToRelayShards(topics)
|
|
||||||
|
|
||||||
let relayShardOp =
|
|
||||||
if relayShardsRes.isErr():
|
|
||||||
return err("building ENR with relay sharding failed: " & $relayShardsRes.error)
|
|
||||||
else: relayShardsRes.get()
|
|
||||||
|
|
||||||
var builder = EnrBuilder.init(key)
|
|
||||||
|
|
||||||
builder.withIpAddressAndPorts(
|
|
||||||
ipAddr = netConf.enrIp,
|
|
||||||
tcpPort = netConf.enrPort,
|
|
||||||
udpPort = netConf.discv5UdpPort,
|
|
||||||
)
|
|
||||||
|
|
||||||
if netConf.wakuFlags.isSome():
|
|
||||||
builder.withWakuCapabilities(netConf.wakuFlags.get())
|
|
||||||
|
|
||||||
builder.withMultiaddrs(netConf.enrMultiaddrs)
|
|
||||||
|
|
||||||
if relayShardOp.isSome():
|
|
||||||
let res = builder.withWakuRelaySharding(relayShardOp.get())
|
|
||||||
|
|
||||||
if res.isErr():
|
|
||||||
return err("building ENR with relay sharding failed: " & $res.error)
|
|
||||||
|
|
||||||
let res = builder.build()
|
|
||||||
|
|
||||||
if res.isErr():
|
|
||||||
return err("building ENR failed: " & $res.error)
|
|
||||||
|
|
||||||
ok(res.get())
|
|
||||||
|
|
|
@ -10,8 +10,11 @@ import
|
||||||
stew/shims/net
|
stew/shims/net
|
||||||
import
|
import
|
||||||
../vendor/nim-libp2p/libp2p/crypto/crypto,
|
../vendor/nim-libp2p/libp2p/crypto/crypto,
|
||||||
|
../../waku/common/enr/builder,
|
||||||
../../waku/common/utils/nat,
|
../../waku/common/utils/nat,
|
||||||
../../waku/v2/waku_enr/capabilities,
|
../../waku/v2/waku_enr/capabilities,
|
||||||
|
../../waku/v2/waku_enr/multiaddr,
|
||||||
|
../../waku/v2/waku_enr/sharding,
|
||||||
../../waku/v2/waku_core/message/codec,
|
../../waku/v2/waku_core/message/codec,
|
||||||
../../waku/v2/waku_core/message/message,
|
../../waku/v2/waku_core/message/message,
|
||||||
../../waku/v2/waku_core/topics/pubsub_topic,
|
../../waku/v2/waku_core/topics/pubsub_topic,
|
||||||
|
@ -47,17 +50,12 @@ type
|
||||||
|
|
||||||
var eventCallback:EventCallback = nil
|
var eventCallback:EventCallback = nil
|
||||||
|
|
||||||
proc relayEventCallback(pubsubTopic: string, data: seq[byte]): Future[void] {.gcsafe, raises: [Defect].} =
|
proc relayEventCallback(pubsubTopic: string,
|
||||||
|
msg: WakuMessage):
|
||||||
|
Future[void] {.gcsafe, raises: [Defect].} =
|
||||||
# Callback that hadles the Waku Relay events. i.e. messages or errors.
|
# Callback that hadles the Waku Relay events. i.e. messages or errors.
|
||||||
if not isNil(eventCallback):
|
if not isNil(eventCallback):
|
||||||
let msg = WakuMessage.decode(data)
|
let event = JsonMessageEvent.new(pubsubTopic, msg)
|
||||||
var event: JsonSignal
|
|
||||||
if msg.isOk():
|
|
||||||
event = JsonMessageEvent.new(pubsubTopic, msg.value)
|
|
||||||
else:
|
|
||||||
let errorMsg = string("Error decoding message.") & $msg.error
|
|
||||||
event = JsonErrorEvent.new(errorMsg)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
eventCallback(cstring($event))
|
eventCallback(cstring($event))
|
||||||
except Exception:
|
except Exception:
|
||||||
|
@ -175,9 +173,34 @@ proc waku_new(config: ConfigNode,
|
||||||
jsonResp):
|
jsonResp):
|
||||||
return false
|
return false
|
||||||
|
|
||||||
|
var enrBuilder = EnrBuilder.init(privateKey)
|
||||||
|
|
||||||
|
enrBuilder.withIpAddressAndPorts(
|
||||||
|
netConfig.enrIp,
|
||||||
|
netConfig.enrPort,
|
||||||
|
netConfig.discv5UdpPort
|
||||||
|
)
|
||||||
|
|
||||||
|
if netConfig.wakuFlags.isSome():
|
||||||
|
enrBuilder.withWakuCapabilities(netConfig.wakuFlags.get())
|
||||||
|
|
||||||
|
enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs)
|
||||||
|
## TODO: pass the topics from conf parameter. We will pass it from
|
||||||
|
## json array in upcoming PRs.
|
||||||
|
let addShardedTopics = enrBuilder.withShardedTopics(@["/waku/2/default-waku/proto"])
|
||||||
|
if addShardedTopics.isErr():
|
||||||
|
return false
|
||||||
|
|
||||||
|
let recordRes = enrBuilder.build()
|
||||||
|
let record =
|
||||||
|
if recordRes.isErr():
|
||||||
|
return false
|
||||||
|
else: recordRes.get()
|
||||||
|
|
||||||
var builder = WakuNodeBuilder.init()
|
var builder = WakuNodeBuilder.init()
|
||||||
builder.withRng(crypto.newRng())
|
builder.withRng(crypto.newRng())
|
||||||
builder.withNodeKey(privateKey)
|
builder.withNodeKey(privateKey)
|
||||||
|
builder.withRecord(record)
|
||||||
builder.withNetworkConfiguration(netConfig)
|
builder.withNetworkConfiguration(netConfig)
|
||||||
builder.withSwitchConfiguration(
|
builder.withSwitchConfiguration(
|
||||||
maxConnections = some(50.int)
|
maxConnections = some(50.int)
|
||||||
|
@ -303,7 +326,8 @@ Kindly set it with the 'waku_set_event_callback' function""")
|
||||||
jsonResp = errResp("Cannot subscribe without Waku Relay enabled.")
|
jsonResp = errResp("Cannot subscribe without Waku Relay enabled.")
|
||||||
return false
|
return false
|
||||||
|
|
||||||
node.wakuRelay.subscribe(PubsubTopic($pubSubTopic), PubsubRawHandler(relayEventCallback))
|
node.wakuRelay.subscribe(PubsubTopic($pubSubTopic),
|
||||||
|
WakuRelayHandler(relayEventCallback))
|
||||||
|
|
||||||
jsonResp = okResp("true")
|
jsonResp = okResp("true")
|
||||||
return true
|
return true
|
||||||
|
@ -323,7 +347,7 @@ Kindly set it with the 'waku_set_event_callback' function""")
|
||||||
jsonResp = errResp("Cannot unsubscribe without Waku Relay enabled.")
|
jsonResp = errResp("Cannot unsubscribe without Waku Relay enabled.")
|
||||||
return false
|
return false
|
||||||
|
|
||||||
node.wakuRelay.unsubscribeAll(PubsubTopic($pubSubTopic))
|
node.wakuRelay.unsubscribe(PubsubTopic($pubSubTopic))
|
||||||
|
|
||||||
jsonResp = okResp("true")
|
jsonResp = okResp("true")
|
||||||
return true
|
return true
|
||||||
|
|
|
@ -187,6 +187,25 @@ func withWakuRelaySharding*(builder: var EnrBuilder, rs: RelayShards): EnrResult
|
||||||
else:
|
else:
|
||||||
builder.withWakuRelayShardingIndicesList(rs)
|
builder.withWakuRelayShardingIndicesList(rs)
|
||||||
|
|
||||||
|
func withShardedTopics*(builder: var EnrBuilder,
|
||||||
|
topics: seq[string]):
|
||||||
|
Result[void, string] =
|
||||||
|
let relayShardsRes = topicsToRelayShards(topics)
|
||||||
|
let relayShardOp =
|
||||||
|
if relayShardsRes.isErr():
|
||||||
|
return err("building ENR with relay sharding failed: " &
|
||||||
|
$relayShardsRes.error)
|
||||||
|
else: relayShardsRes.get()
|
||||||
|
|
||||||
|
if relayShardOp.isNone():
|
||||||
|
return ok()
|
||||||
|
|
||||||
|
let res = builder.withWakuRelaySharding(relayShardOp.get())
|
||||||
|
|
||||||
|
if res.isErr():
|
||||||
|
return err($res.error)
|
||||||
|
|
||||||
|
return ok()
|
||||||
|
|
||||||
# ENR record accessors (e.g., Record, TypedRecord, etc.)
|
# ENR record accessors (e.g., Record, TypedRecord, etc.)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue