mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-02-18 21:03:36 +00:00
* Initial for liblogosdelivery library (static & dynamic) based on current state of API. * nix build support added. * logosdelivery_example * Added support for missing logLevel/logFormat in new API create_node * Added full JSON to NodeConfig support * Added ctx and ctx.myLib check to avoid uninitialized calls and crash. Adjusted logosdelivery_example with proper error handling and JSON config format * target aware install phase * Fix base64 decode of payload
528 lines
16 KiB
Nim
528 lines
16 KiB
Nim
import std/[net, options]
|
|
|
|
import results
|
|
import json_serialization, json_serialization/std/options as json_options
|
|
|
|
import
|
|
waku/common/utils/parse_size_units,
|
|
waku/common/logging,
|
|
waku/factory/waku_conf,
|
|
waku/factory/conf_builder/conf_builder,
|
|
waku/factory/networks_config,
|
|
./entry_nodes
|
|
|
|
export json_serialization, json_options
|
|
|
|
type
  AutoShardingConfig* {.requiresInit.} = object
    ## Auto-sharding parameters: the number of shards in the cluster.
    numShardsInCluster*: uint16

  RlnConfig* {.requiresInit.} = object
    ## RLN parameters: contract address, chain id and epoch size in seconds.
    contractAddress*: string
    chainId*: uint
    epochSizeSec*: uint64

  NetworkingConfig* {.requiresInit.} = object
    ## Listen address (kept as a string) plus the TCP and discv5 UDP ports.
    listenIpv4*: string
    p2pTcpPort*: uint16
    discv5UdpPort*: uint16

  MessageValidation* {.requiresInit.} = object
    ## Message size limit and optional RLN settings.
    maxMessageSize*: string # Accepts formats like "150 KiB", "1500 B"
    rlnConfig*: Option[RlnConfig]

  ProtocolsConfig* {.requiresInit.} = object
    ## Protocol-level settings. The fields are private to this module;
    ## they are exposed through the exported getter procs.
    entryNodes: seq[string]
    staticStoreNodes: seq[string]
    clusterId: uint16
    autoShardingConfig: AutoShardingConfig
    messageValidation: MessageValidation
|
|
|
|
# Default values used by the `init` procs and the JSON readers when the
# corresponding section is not supplied.
const
  DefaultNetworkingConfig* = NetworkingConfig(
    listenIpv4: "0.0.0.0", p2pTcpPort: 60000, discv5UdpPort: 9000
  )

  DefaultAutoShardingConfig* = AutoShardingConfig(numShardsInCluster: 1)

  DefaultMessageValidation* =
    MessageValidation(maxMessageSize: "150 KiB", rlnConfig: none(RlnConfig))
|
|
|
|
proc init*(
    T: typedesc[ProtocolsConfig],
    entryNodes: seq[string],
    staticStoreNodes: seq[string] = @[],
    clusterId: uint16,
    autoShardingConfig: AutoShardingConfig = DefaultAutoShardingConfig,
    messageValidation: MessageValidation = DefaultMessageValidation,
): T =
  ## Builds a `ProtocolsConfig` from its parts.
  ##
  ## `entryNodes` and `clusterId` have no default and must be supplied;
  ## the remaining parameters fall back to an empty store-node list,
  ## `DefaultAutoShardingConfig` and `DefaultMessageValidation`.
  T(
    entryNodes: entryNodes,
    staticStoreNodes: staticStoreNodes,
    clusterId: clusterId,
    autoShardingConfig: autoShardingConfig,
    messageValidation: messageValidation,
  )
|
|
|
|
# Preset protocol configuration; used as the default `protocolsConfig`
# by `NodeConfig.init` and by the `NodeConfig` JSON reader.
const TheWakuNetworkPreset* = ProtocolsConfig(
  entryNodes: @[
    "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im"
  ],
  staticStoreNodes: @[],
  clusterId: 1,
  autoShardingConfig: AutoShardingConfig(numShardsInCluster: 8),
  messageValidation: MessageValidation(
    maxMessageSize: "150 KiB",
    rlnConfig: some(
      RlnConfig(
        contractAddress: "0xB9cd878C90E49F797B4431fBF4fb333108CB90e6",
        chainId: 59141,
        epochSizeSec: 600, # 10 minutes
      )
    ),
  ),
)
|
|
|
|
type WakuMode* {.pure.} = enum
  ## Operating mode of a node; `toWakuConf` branches on it.
  Edge ## `toWakuConf` turns off relay and the service-side protocols.
  Core ## `toWakuConf` turns on relay, filter service, lightpush, discv5,
       ## peer exchange and rendezvous.
|
|
|
|
type NodeConfig* {.requiresInit.} = object
  ## Top-level node configuration. All fields are private to this module
  ## and are read through the exported getter procs.
  mode: WakuMode
  protocolsConfig: ProtocolsConfig
  networkingConfig: NetworkingConfig
  ethRpcEndpoints: seq[string] # passed to the RLN config as eth client URLs
  p2pReliability: bool
  logLevel: LogLevel
  logFormat: LogFormat
|
|
|
|
proc init*(
    T: typedesc[NodeConfig],
    mode: WakuMode = WakuMode.Core,
    protocolsConfig: ProtocolsConfig = TheWakuNetworkPreset,
    networkingConfig: NetworkingConfig = DefaultNetworkingConfig,
    ethRpcEndpoints: seq[string] = @[],
    p2pReliability: bool = false,
    logLevel: LogLevel = LogLevel.INFO,
    logFormat: LogFormat = LogFormat.TEXT,
): T =
  ## Builds a `NodeConfig`. Every parameter is optional: with no arguments
  ## the result is a `Core` node on `TheWakuNetworkPreset` with
  ## `DefaultNetworkingConfig`, no Ethereum RPC endpoints, p2p reliability
  ## off, and `INFO` / `TEXT` logging.
  T(
    mode: mode,
    protocolsConfig: protocolsConfig,
    networkingConfig: networkingConfig,
    ethRpcEndpoints: ethRpcEndpoints,
    p2pReliability: p2pReliability,
    logLevel: logLevel,
    logFormat: logFormat,
  )
|
|
|
|
# -- Getters for ProtocolsConfig (private fields) - used for testing --
|
|
|
|
proc entryNodes*(c: ProtocolsConfig): seq[string] =
  ## Returns the private `entryNodes` field.
  return c.entryNodes

proc staticStoreNodes*(c: ProtocolsConfig): seq[string] =
  ## Returns the private `staticStoreNodes` field.
  return c.staticStoreNodes

proc clusterId*(c: ProtocolsConfig): uint16 =
  ## Returns the private `clusterId` field.
  return c.clusterId

proc autoShardingConfig*(c: ProtocolsConfig): AutoShardingConfig =
  ## Returns the private `autoShardingConfig` field.
  return c.autoShardingConfig

proc messageValidation*(c: ProtocolsConfig): MessageValidation =
  ## Returns the private `messageValidation` field.
  return c.messageValidation
|
|
|
|
# -- Getters for NodeConfig (private fields) - used for testing --
|
|
|
|
proc mode*(c: NodeConfig): WakuMode =
  ## Returns the private `mode` field.
  return c.mode

proc protocolsConfig*(c: NodeConfig): ProtocolsConfig =
  ## Returns the private `protocolsConfig` field.
  return c.protocolsConfig

proc networkingConfig*(c: NodeConfig): NetworkingConfig =
  ## Returns the private `networkingConfig` field.
  return c.networkingConfig

proc ethRpcEndpoints*(c: NodeConfig): seq[string] =
  ## Returns the private `ethRpcEndpoints` field.
  return c.ethRpcEndpoints

proc p2pReliability*(c: NodeConfig): bool =
  ## Returns the private `p2pReliability` field.
  return c.p2pReliability

proc logLevel*(c: NodeConfig): LogLevel =
  ## Returns the private `logLevel` field.
  return c.logLevel

proc logFormat*(c: NodeConfig): LogFormat =
  ## Returns the private `logFormat` field.
  return c.logFormat
|
|
|
|
proc toWakuConf*(nodeConfig: NodeConfig): Result[WakuConf, string] =
  ## Translates a `NodeConfig` into a validated `WakuConf` by driving a
  ## `WakuConfBuilder`.
  ##
  ## Returns `err` with a descriptive message when:
  ## - `networkingConfig.listenIpv4` is not a parseable IP address,
  ## - the entry nodes cannot be processed,
  ## - `maxMessageSize` cannot be parsed,
  ## - the builder fails to build, or the built configuration fails
  ##   validation.
  var b = WakuConfBuilder.init()

  # Apply log configuration
  b.withLogLevel(nodeConfig.logLevel)
  b.withLogFormat(nodeConfig.logFormat)

  # Apply networking configuration
  let networkingConfig = nodeConfig.networkingConfig
  # `parseIpAddress` raises `ValueError` on malformed input. The address
  # comes from user configuration, so report it through the `Result`
  # instead of letting the exception escape.
  let ip =
    try:
      parseIpAddress(networkingConfig.listenIpv4)
    except ValueError as e:
      return err(
        "Invalid listen address '" & networkingConfig.listenIpv4 & "': " & e.msg
      )

  b.withP2pListenAddress(ip)
  b.withP2pTcpPort(networkingConfig.p2pTcpPort)
  b.discv5Conf.withUdpPort(networkingConfig.discv5UdpPort)

  case nodeConfig.mode
  of Core:
    b.withRelay(true)

    # Metadata is always mounted

    b.filterServiceConf.withEnabled(true)
    b.filterServiceConf.withMaxPeersToServe(20)

    b.withLightPush(true)

    b.discv5Conf.withEnabled(true)
    b.withPeerExchange(true)
    b.withRendezvous(true)

    # TODO: fix store as client usage

    b.rateLimitConf.withRateLimits(@["filter:100/1s", "lightpush:5/1s", "px:5/1s"])
  of Edge:
    # All client side protocols are mounted by default
    # Peer exchange client is always enabled and start_node will start the px loop
    # Metadata is always mounted
    b.withPeerExchange(true)
    # switch off all service side protocols and relay
    b.withRelay(false)
    b.filterServiceConf.withEnabled(false)
    b.withLightPush(false)
    b.storeServiceConf.withEnabled(false)
    # Leave discv5 and rendezvous for user choice

  ## Network Conf
  let protocolsConfig = nodeConfig.protocolsConfig

  # Set cluster ID
  b.withClusterId(protocolsConfig.clusterId)

  # Set sharding configuration
  b.withShardingConf(ShardingConfKind.AutoSharding)
  let autoShardingConfig = protocolsConfig.autoShardingConfig
  b.withNumShardsInCluster(autoShardingConfig.numShardsInCluster)

  # Process entry nodes - supports enrtree:, enr:, and multiaddress formats
  if protocolsConfig.entryNodes.len > 0:
    let (enrTreeUrls, bootstrapEnrs, staticNodesFromEntry) = processEntryNodes(
      protocolsConfig.entryNodes
    ).valueOr:
      return err("Failed to process entry nodes: " & error)

    # Set ENRTree URLs for DNS discovery
    if enrTreeUrls.len > 0:
      for url in enrTreeUrls:
        b.dnsDiscoveryConf.withEnrTreeUrl(url)
      # NOTE(review): name servers assumed to be set once, after the loop.
      b.dnsDiscoveryConf.withNameServers(
        @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")]
      )

    # Set ENR records as bootstrap nodes for discv5
    if bootstrapEnrs.len > 0:
      b.discv5Conf.withBootstrapNodes(bootstrapEnrs)

    # Add static nodes (multiaddrs and those extracted from ENR entries)
    if staticNodesFromEntry.len > 0:
      b.withStaticNodes(staticNodesFromEntry)

  # TODO: verify behaviour
  # NOTE(review): whether `withStaticNodes` appends to or replaces the
  # static nodes set from the entry nodes above is not visible here.
  # Set static store nodes
  if protocolsConfig.staticStoreNodes.len > 0:
    b.withStaticNodes(protocolsConfig.staticStoreNodes)

  # Set message validation
  let msgValidation = protocolsConfig.messageValidation
  let maxSizeBytes = parseMsgSize(msgValidation.maxMessageSize).valueOr:
    return err("Failed to parse max message size: " & error)
  b.withMaxMessageSize(maxSizeBytes)

  # Set RLN config if provided
  if msgValidation.rlnConfig.isSome():
    let rlnConfig = msgValidation.rlnConfig.get()
    b.rlnRelayConf.withEnabled(true)
    b.rlnRelayConf.withEthContractAddress(rlnConfig.contractAddress)
    b.rlnRelayConf.withChainId(rlnConfig.chainId)
    b.rlnRelayConf.withEpochSizeSec(rlnConfig.epochSizeSec)
    b.rlnRelayConf.withDynamic(true)
    b.rlnRelayConf.withEthClientUrls(nodeConfig.ethRpcEndpoints)

    # TODO: we should get rid of those two
    # NOTE(review): assumed to belong to the RLN branch - confirm.
    b.rlnRelayConf.withUserMessageLimit(100)

  ## Various configurations
  b.withNatStrategy("any")
  b.withP2PReliability(nodeConfig.p2pReliability)

  let wakuConf = b.build().valueOr:
    return err("Failed to build configuration: " & error)

  wakuConf.validate().isOkOr:
    return err("Failed to validate configuration: " & error)

  return ok(wakuConf)
|
|
|
|
# ---- JSON serialization (writeValue / readValue) ----
|
|
# ---------- AutoShardingConfig ----------
|
|
|
|
proc writeValue*(
    w: var JsonWriter, val: AutoShardingConfig
) {.raises: [IOError].} =
  ## Writes `val` as a JSON record with the single field
  ## `numShardsInCluster`.
  w.beginRecord()
  w.writeField("numShardsInCluster", val.numShardsInCluster)
  w.endRecord()
|
|
|
|
proc readValue*(
    r: var JsonReader, val: var AutoShardingConfig
) {.raises: [SerializationError, IOError].} =
  ## Reads a JSON object into `val`.
  ##
  ## `numShardsInCluster` is the only accepted key and it is mandatory:
  ## any other key is reported through `raiseUnexpectedField`, and its
  ## absence through `raiseUnexpectedValue`.
  var shardCount: Option[uint16]

  for key in readObjectFields(r):
    if key == "numShardsInCluster":
      shardCount = some(r.readValue(uint16))
    else:
      r.raiseUnexpectedField(key, "AutoShardingConfig")

  if shardCount.isNone():
    r.raiseUnexpectedValue("Missing required field 'numShardsInCluster'")

  val = AutoShardingConfig(numShardsInCluster: shardCount.get())
|
|
|
|
# ---------- RlnConfig ----------
|
|
|
|
proc writeValue*(
    w: var JsonWriter, val: RlnConfig
) {.raises: [IOError].} =
  ## Writes `val` as a JSON record with the fields `contractAddress`,
  ## `chainId` and `epochSizeSec`, in that order.
  w.beginRecord()
  w.writeField("contractAddress", val.contractAddress)
  w.writeField("chainId", val.chainId)
  w.writeField("epochSizeSec", val.epochSizeSec)
  w.endRecord()
|
|
|
|
proc readValue*(
    r: var JsonReader, val: var RlnConfig
) {.raises: [SerializationError, IOError].} =
  ## Reads a JSON object into `val`.
  ##
  ## All three keys (`contractAddress`, `chainId`, `epochSizeSec`) are
  ## mandatory; unknown keys are reported through `raiseUnexpectedField`.
  var
    address: Option[string]
    chain: Option[uint]
    epochSecs: Option[uint64]

  for key in readObjectFields(r):
    case key
    of "contractAddress":
      address = some(r.readValue(string))
    of "chainId":
      chain = some(r.readValue(uint))
    of "epochSizeSec":
      epochSecs = some(r.readValue(uint64))
    else:
      r.raiseUnexpectedField(key, "RlnConfig")

  # Missing fields are reported in declaration order.
  if address.isNone():
    r.raiseUnexpectedValue("Missing required field 'contractAddress'")
  if chain.isNone():
    r.raiseUnexpectedValue("Missing required field 'chainId'")
  if epochSecs.isNone():
    r.raiseUnexpectedValue("Missing required field 'epochSizeSec'")

  val = RlnConfig(
    contractAddress: address.get(),
    chainId: chain.get(),
    epochSizeSec: epochSecs.get(),
  )
|
|
|
|
# ---------- NetworkingConfig ----------
|
|
|
|
proc writeValue*(
    w: var JsonWriter, val: NetworkingConfig
) {.raises: [IOError].} =
  ## Writes `val` as a JSON record with the fields `listenIpv4`,
  ## `p2pTcpPort` and `discv5UdpPort`, in that order.
  w.beginRecord()
  w.writeField("listenIpv4", val.listenIpv4)
  w.writeField("p2pTcpPort", val.p2pTcpPort)
  w.writeField("discv5UdpPort", val.discv5UdpPort)
  w.endRecord()
|
|
|
|
proc readValue*(
    r: var JsonReader, val: var NetworkingConfig
) {.raises: [SerializationError, IOError].} =
  ## Reads a JSON object into `val`.
  ##
  ## All three keys (`listenIpv4`, `p2pTcpPort`, `discv5UdpPort`) are
  ## mandatory; unknown keys are reported through `raiseUnexpectedField`.
  ## The listen address is stored as read, without parsing.
  var
    listenAddr: Option[string]
    tcpPort: Option[uint16]
    udpPort: Option[uint16]

  for key in readObjectFields(r):
    case key
    of "listenIpv4":
      listenAddr = some(r.readValue(string))
    of "p2pTcpPort":
      tcpPort = some(r.readValue(uint16))
    of "discv5UdpPort":
      udpPort = some(r.readValue(uint16))
    else:
      r.raiseUnexpectedField(key, "NetworkingConfig")

  # Missing fields are reported in declaration order.
  if listenAddr.isNone():
    r.raiseUnexpectedValue("Missing required field 'listenIpv4'")
  if tcpPort.isNone():
    r.raiseUnexpectedValue("Missing required field 'p2pTcpPort'")
  if udpPort.isNone():
    r.raiseUnexpectedValue("Missing required field 'discv5UdpPort'")

  val = NetworkingConfig(
    listenIpv4: listenAddr.get(),
    p2pTcpPort: tcpPort.get(),
    discv5UdpPort: udpPort.get(),
  )
|
|
|
|
# ---------- MessageValidation ----------
|
|
|
|
proc writeValue*(
    w: var JsonWriter, val: MessageValidation
) {.raises: [IOError].} =
  ## Writes `val` as a JSON record with the fields `maxMessageSize` and
  ## `rlnConfig`. How an empty `rlnConfig` is encoded is left to the
  ## `Option` serializer in scope.
  w.beginRecord()
  w.writeField("maxMessageSize", val.maxMessageSize)
  w.writeField("rlnConfig", val.rlnConfig)
  w.endRecord()
|
|
|
|
proc readValue*(
    r: var JsonReader, val: var MessageValidation
) {.raises: [SerializationError, IOError].} =
  ## Reads a JSON object into `val`.
  ##
  ## `maxMessageSize` is mandatory. `rlnConfig` is optional and falls back
  ## to `none(RlnConfig)` when the key is absent. Unknown keys are
  ## reported through `raiseUnexpectedField`.
  var
    sizeLimit: Option[string]
    # Outer Option: was the key present; inner Option: the decoded value.
    rln: Option[Option[RlnConfig]]

  for key in readObjectFields(r):
    case key
    of "maxMessageSize":
      sizeLimit = some(r.readValue(string))
    of "rlnConfig":
      rln = some(r.readValue(Option[RlnConfig]))
    else:
      r.raiseUnexpectedField(key, "MessageValidation")

  if sizeLimit.isNone():
    r.raiseUnexpectedValue("Missing required field 'maxMessageSize'")

  val = MessageValidation(
    maxMessageSize: sizeLimit.get(), rlnConfig: rln.get(none(RlnConfig))
  )
|
|
|
|
# ---------- ProtocolsConfig ----------
|
|
|
|
proc writeValue*(
    w: var JsonWriter, val: ProtocolsConfig
) {.raises: [IOError].} =
  ## Writes `val` as a JSON record with the fields `entryNodes`,
  ## `staticStoreNodes`, `clusterId`, `autoShardingConfig` and
  ## `messageValidation`, in that order.
  w.beginRecord()
  w.writeField("entryNodes", val.entryNodes)
  w.writeField("staticStoreNodes", val.staticStoreNodes)
  w.writeField("clusterId", val.clusterId)
  w.writeField("autoShardingConfig", val.autoShardingConfig)
  w.writeField("messageValidation", val.messageValidation)
  w.endRecord()
|
|
|
|
proc readValue*(
    r: var JsonReader, val: var ProtocolsConfig
) {.raises: [SerializationError, IOError].} =
  ## Reads a JSON object into `val`.
  ##
  ## `entryNodes` and `clusterId` are mandatory. `staticStoreNodes`
  ## defaults to an empty list, `autoShardingConfig` to
  ## `DefaultAutoShardingConfig` and `messageValidation` to
  ## `DefaultMessageValidation`. Unknown keys are reported through
  ## `raiseUnexpectedField`.
  var
    entries: Option[seq[string]]
    storeNodes: Option[seq[string]]
    cluster: Option[uint16]
    sharding: Option[AutoShardingConfig]
    validation: Option[MessageValidation]

  for key in readObjectFields(r):
    case key
    of "entryNodes":
      entries = some(r.readValue(seq[string]))
    of "staticStoreNodes":
      storeNodes = some(r.readValue(seq[string]))
    of "clusterId":
      cluster = some(r.readValue(uint16))
    of "autoShardingConfig":
      sharding = some(r.readValue(AutoShardingConfig))
    of "messageValidation":
      validation = some(r.readValue(MessageValidation))
    else:
      r.raiseUnexpectedField(key, "ProtocolsConfig")

  if entries.isNone():
    r.raiseUnexpectedValue("Missing required field 'entryNodes'")
  if cluster.isNone():
    r.raiseUnexpectedValue("Missing required field 'clusterId'")

  val = ProtocolsConfig.init(
    entryNodes = entries.get(),
    staticStoreNodes = storeNodes.get(@[]),
    clusterId = cluster.get(),
    autoShardingConfig = sharding.get(DefaultAutoShardingConfig),
    messageValidation = validation.get(DefaultMessageValidation),
  )
|
|
|
|
# ---------- NodeConfig ----------
|
|
|
|
proc writeValue*(
    w: var JsonWriter, val: NodeConfig
) {.raises: [IOError].} =
  ## Writes `val` as a JSON record with the fields `mode`,
  ## `protocolsConfig`, `networkingConfig`, `ethRpcEndpoints`,
  ## `p2pReliability`, `logLevel` and `logFormat`, in that order.
  w.beginRecord()
  w.writeField("mode", val.mode)
  w.writeField("protocolsConfig", val.protocolsConfig)
  w.writeField("networkingConfig", val.networkingConfig)
  w.writeField("ethRpcEndpoints", val.ethRpcEndpoints)
  w.writeField("p2pReliability", val.p2pReliability)
  w.writeField("logLevel", val.logLevel)
  w.writeField("logFormat", val.logFormat)
  w.endRecord()
|
|
|
|
proc readValue*(
    r: var JsonReader, val: var NodeConfig
) {.raises: [SerializationError, IOError].} =
  ## Reads a JSON object into `val`.
  ##
  ## Every key is optional. Missing keys fall back to: `Core` mode,
  ## `TheWakuNetworkPreset`, `DefaultNetworkingConfig`, no Ethereum RPC
  ## endpoints, p2p reliability off, and `INFO` / `TEXT` logging.
  ## Unknown keys are reported through `raiseUnexpectedField`.
  var
    wakuMode: Option[WakuMode]
    protocols: Option[ProtocolsConfig]
    networking: Option[NetworkingConfig]
    rpcEndpoints: Option[seq[string]]
    reliability: Option[bool]
    level: Option[LogLevel]
    format: Option[LogFormat]

  for key in readObjectFields(r):
    case key
    of "mode":
      wakuMode = some(r.readValue(WakuMode))
    of "protocolsConfig":
      protocols = some(r.readValue(ProtocolsConfig))
    of "networkingConfig":
      networking = some(r.readValue(NetworkingConfig))
    of "ethRpcEndpoints":
      rpcEndpoints = some(r.readValue(seq[string]))
    of "p2pReliability":
      reliability = some(r.readValue(bool))
    of "logLevel":
      level = some(r.readValue(LogLevel))
    of "logFormat":
      format = some(r.readValue(LogFormat))
    else:
      r.raiseUnexpectedField(key, "NodeConfig")

  val = NodeConfig.init(
    mode = wakuMode.get(WakuMode.Core),
    protocolsConfig = protocols.get(TheWakuNetworkPreset),
    networkingConfig = networking.get(DefaultNetworkingConfig),
    ethRpcEndpoints = rpcEndpoints.get(@[]),
    p2pReliability = reliability.get(false),
    logLevel = level.get(LogLevel.INFO),
    logFormat = format.get(LogFormat.TEXT),
  )
|
|
|
|
# ---------- Decode helper ----------
|
|
# Json.decode returns T via `result`, which conflicts with {.requiresInit.}
|
|
# on Nim 2.x. This helper avoids the issue by using readValue into a var.
|
|
|
|
proc decodeNodeConfigFromJson*(
    jsonStr: string
): NodeConfig {.raises: [SerializationError].} =
  ## Decodes `jsonStr` into a `NodeConfig`.
  ##
  ## A default `NodeConfig` is created first and then overwritten by
  ## `readValue`. An `IOError` raised while reading is re-raised as a
  ## `SerializationError` carrying the same message, so callers only need
  ## to handle `SerializationError`.
  var decoded = NodeConfig.init() # default-initialized
  try:
    var input = unsafeMemoryInput(jsonStr)
    var jsonReader = JsonReader[DefaultFlavor].init(input)
    jsonReader.readValue(decoded)
  except IOError as ioErr:
    raise (ref SerializationError)(msg: ioErr.msg)
  decoded
|