mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-09 09:23:14 +00:00
More config, add comments on macro
This commit is contained in:
parent
b1c1c148b5
commit
1a9e99f4f7
@ -20,7 +20,8 @@ import
|
||||
../common/logging,
|
||||
../waku_enr,
|
||||
../node/peer_manager,
|
||||
../waku_core/topics/pubsub_topic
|
||||
../waku_core/topics/pubsub_topic,
|
||||
./waku_conf
|
||||
|
||||
include ../waku_core/message/default_values
|
||||
|
||||
@ -33,10 +34,6 @@ type ConfResult*[T] = Result[T, string]
|
||||
|
||||
type EthRpcUrl* = distinct string
|
||||
|
||||
type ProtectedShard* = object
|
||||
shard*: uint16
|
||||
key*: secp256k1.SkPublicKey
|
||||
|
||||
type StartUpCommand* = enum
|
||||
noCommand # default, runs waku
|
||||
generateRlnKeystore # generates a new RLN keystore
|
||||
|
||||
@ -96,7 +96,7 @@ proc networkConfiguration*(conf: WakuConf, clientId: string): NetConfigResult =
|
||||
wakuFlags = CapabilitiesBitfield.init(
|
||||
lightpush = conf.lightpush,
|
||||
filter = conf.filter,
|
||||
store = conf.store,
|
||||
store = conf.storeServiceConf.isSome,
|
||||
relay = conf.relay,
|
||||
sync = conf.storeSync,
|
||||
)
|
||||
@ -170,7 +170,7 @@ proc networkConfiguration*(conf: WakuConf, clientId: string): NetConfigResult =
|
||||
# discard
|
||||
|
||||
# TODO: numShardsInNetwork should be mandatory with autosharding, and unneeded otherwise
|
||||
proc getNumShardsInNetwork*(conf: WakuNodeConf): uint32 =
|
||||
proc getNumShardsInNetwork*(conf: WakuConf): uint32 =
|
||||
if conf.numShardsInNetwork != 0:
|
||||
return conf.numShardsInNetwork
|
||||
# If conf.numShardsInNetwork is not set, use 1024 - the maximum possible as per the static sharding spec
|
||||
|
||||
@ -174,11 +174,12 @@ proc setupProtocols(
|
||||
error "Unrecoverable error occurred", error = msg
|
||||
quit(QuitFailure)
|
||||
|
||||
if conf.store:
|
||||
if conf.legacyStore:
|
||||
if conf.storeServiceConf.isSome:
|
||||
let storeServiceConf = conf.storeServiceConf.get()
|
||||
if storeServiceConf.legacy:
|
||||
let archiveDriverRes = waitFor legacy_driver.ArchiveDriver.new(
|
||||
conf.storeMessageDbUrl, conf.storeMessageDbVacuum, conf.storeMessageDbMigration,
|
||||
conf.storeMaxNumDbConnections, onFatalErrorAction,
|
||||
storeServiceConf.dbUrl, storeServiceConf.dbVacuum, storeServiceConf.dbMigration,
|
||||
storeServiceConf.maxNumDbConnections, onFatalErrorAction,
|
||||
)
|
||||
if archiveDriverRes.isErr():
|
||||
return err("failed to setup legacy archive driver: " & archiveDriverRes.error)
|
||||
@ -198,26 +199,26 @@ proc setupProtocols(
|
||||
## So for now, we need to make sure that when legacy store is enabled and we use sqlite
|
||||
## that we migrate our db according to legacy store's schema to have the extra field
|
||||
|
||||
let engineRes = dburl.getDbEngine(conf.storeMessageDbUrl)
|
||||
let engineRes = dburl.getDbEngine(storeServiceConf.dbUrl)
|
||||
if engineRes.isErr():
|
||||
return err("error getting db engine in setupProtocols: " & engineRes.error)
|
||||
|
||||
let engine = engineRes.get()
|
||||
|
||||
let migrate =
|
||||
if engine == "sqlite" and conf.legacyStore:
|
||||
if engine == "sqlite" and storeServiceConf.legacy:
|
||||
false
|
||||
else:
|
||||
conf.storeMessageDbMigration
|
||||
storeServiceConf.dbMigration
|
||||
|
||||
let archiveDriverRes = waitFor driver.ArchiveDriver.new(
|
||||
conf.storeMessageDbUrl, conf.storeMessageDbVacuum, migrate,
|
||||
conf.storeMaxNumDbConnections, onFatalErrorAction,
|
||||
storeServiceConf.dbUrl, storeServiceConf.dbVacuum, migrate,
|
||||
storeServiceConf.maxNumDbConnections, onFatalErrorAction,
|
||||
)
|
||||
if archiveDriverRes.isErr():
|
||||
return err("failed to setup archive driver: " & archiveDriverRes.error)
|
||||
|
||||
let retPolicyRes = policy.RetentionPolicy.new(conf.storeMessageRetentionPolicy)
|
||||
let retPolicyRes = policy.RetentionPolicy.new(storeServiceConf.retentionPolicy)
|
||||
if retPolicyRes.isErr():
|
||||
return err("failed to create retention policy: " & retPolicyRes.error)
|
||||
|
||||
@ -225,7 +226,7 @@ proc setupProtocols(
|
||||
if mountArcRes.isErr():
|
||||
return err("failed to mount waku archive protocol: " & mountArcRes.error)
|
||||
|
||||
if conf.legacyStore:
|
||||
if storeServiceConf.legacy:
|
||||
# Store legacy setup
|
||||
try:
|
||||
await mountLegacyStore(node, node.rateLimitSettings.getSetting(STOREV2))
|
||||
@ -237,19 +238,21 @@ proc setupProtocols(
|
||||
try:
|
||||
await mountStore(node, node.rateLimitSettings.getSetting(STOREV3))
|
||||
except CatchableError:
|
||||
return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())
|
||||
return err(
|
||||
"failed to mount waku store protocremoteStoreNodeol: " & getCurrentExceptionMsg()
|
||||
)
|
||||
|
||||
mountStoreClient(node)
|
||||
if conf.storenode != "":
|
||||
let storeNode = parsePeerInfo(conf.storenode)
|
||||
if conf.remoteStoreNode.isSome:
|
||||
let storeNode = parsePeerInfo(conf.remoteStoreNode.get())
|
||||
if storeNode.isOk():
|
||||
node.peerManager.addServicePeer(storeNode.value, store_common.WakuStoreCodec)
|
||||
else:
|
||||
return err("failed to set node waku store peer: " & storeNode.error)
|
||||
|
||||
mountLegacyStoreClient(node)
|
||||
if conf.storenode != "":
|
||||
let storeNode = parsePeerInfo(conf.storenode)
|
||||
if conf.remoteStoreNode.isSome:
|
||||
let storeNode = parsePeerInfo(conf.remoteStoreNode.get())
|
||||
if storeNode.isOk():
|
||||
node.peerManager.addServicePeer(
|
||||
storeNode.value, legacy_common.WakuLegacyStoreCodec
|
||||
@ -257,7 +260,7 @@ proc setupProtocols(
|
||||
else:
|
||||
return err("failed to set node waku legacy store peer: " & storeNode.error)
|
||||
|
||||
if conf.store and conf.storeResume:
|
||||
if conf.storeServiceConf.isSome and conf.storeServiceConf.get().resume:
|
||||
node.setupStoreResume()
|
||||
|
||||
# If conf.numShardsInNetwork is not set, use the number of shards configured as numShardsInNetwork
|
||||
@ -303,14 +306,14 @@ proc setupProtocols(
|
||||
let shards = confShards & autoShards
|
||||
|
||||
if conf.relay:
|
||||
let parsedMaxMsgSize = parseMsgSize(conf.maxMessageSize).valueOr:
|
||||
return err("failed to parse 'max-num-bytes-msg-size' param: " & $error)
|
||||
|
||||
debug "Setting max message size", num_bytes = parsedMaxMsgSize
|
||||
debug "Setting max message size", num_bytes = conf.maxMessageSizeBytes
|
||||
|
||||
try:
|
||||
await mountRelay(
|
||||
node, shards, peerExchangeHandler = peerExchangeHandler, int(parsedMaxMsgSize)
|
||||
node,
|
||||
shards,
|
||||
peerExchangeHandler = peerExchangeHandler,
|
||||
int(conf.maxMessageSizeBytes),
|
||||
)
|
||||
except CatchableError:
|
||||
return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg())
|
||||
@ -337,18 +340,19 @@ proc setupProtocols(
|
||||
except CatchableError:
|
||||
return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg())
|
||||
|
||||
if conf.rlnRelay:
|
||||
if conf.rlnRelayConf.isSome:
|
||||
let rlnRelayConf = conf.rlnRelayConf.get()
|
||||
let rlnConf = WakuRlnConfig(
|
||||
rlnRelayDynamic: conf.rlnRelayDynamic,
|
||||
rlnRelayCredIndex: conf.rlnRelayCredIndex,
|
||||
rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress,
|
||||
rlnRelayChainId: conf.rlnRelayChainId,
|
||||
rlnRelayEthClientAddress: string(conf.rlnRelayethClientAddress),
|
||||
rlnRelayCredPath: conf.rlnRelayCredPath,
|
||||
rlnRelayCredPassword: conf.rlnRelayCredPassword,
|
||||
rlnRelayTreePath: conf.rlnRelayTreePath,
|
||||
rlnRelayUserMessageLimit: conf.rlnRelayUserMessageLimit,
|
||||
rlnEpochSizeSec: conf.rlnEpochSizeSec,
|
||||
rlnRelayConfDynamic: rlnRelayConf.dynamic,
|
||||
rlnRelayCredIndex: rlnRelayConf.credIndex,
|
||||
rlnRelayEthContractAddress: rlnRelayConf.ethContractAddress.string,
|
||||
rlnRelayChainId: rlnRelayConf.chainId,
|
||||
rlnRelayEthClientAddress: string(rlnRelayConf.ethClientAddress),
|
||||
rlnRelayCredPath: rlnRelayConf.credPath,
|
||||
rlnRelayCredPassword: rlnRelayConf.credPassword,
|
||||
rlnRelayTreePath: rlnRelayConf.treePath,
|
||||
rlnRelayUserMessageLimit: rlnRelayConf.userMessageLimit,
|
||||
rlnEpochSizeSec: rlnRelayConf.epochSizeSec,
|
||||
onFatalErrorAction: onFatalErrorAction,
|
||||
)
|
||||
|
||||
@ -358,7 +362,7 @@ proc setupProtocols(
|
||||
return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg())
|
||||
|
||||
# NOTE Must be mounted after relay
|
||||
if conf.lightpush:
|
||||
if conf.lightPush:
|
||||
try:
|
||||
await mountLightPush(node, node.rateLimitSettings.getSetting(LIGHTPUSH))
|
||||
await mountLegacyLightPush(node, node.rateLimitSettings.getSetting(LIGHTPUSH))
|
||||
|
||||
@ -13,7 +13,7 @@ import
|
||||
|
||||
const MessageWindowInSec = 5 * 60 # +- 5 minutes
|
||||
|
||||
import ./external_config, ../waku_relay/protocol, ../waku_core
|
||||
import ./waku_conf, ../waku_relay/protocol, ../waku_core
|
||||
|
||||
declarePublicCounter waku_msg_validator_signed_outcome,
|
||||
"number of messages for each validation outcome", ["result"]
|
||||
|
||||
@ -3,6 +3,7 @@ import
|
||||
chronicles,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/multiaddress,
|
||||
secp256k1,
|
||||
results
|
||||
|
||||
import ../common/logging
|
||||
@ -17,6 +18,11 @@ type
|
||||
NatStrategy* = distinct string
|
||||
DomainName* = distinct string
|
||||
|
||||
# TODO: should be defined in validator_signed.nim and imported here
|
||||
type ProtectedShard* = object
|
||||
shard*: uint16
|
||||
key*: secp256k1.SkPublicKey
|
||||
|
||||
# TODO: this should come from discv5 discovery module
|
||||
type Discv5Conf* = ref object
|
||||
# TODO: This should probably be an option on the builder
|
||||
@ -25,13 +31,20 @@ type Discv5Conf* = ref object
|
||||
bootstrapNodes*: seq[TextEnr]
|
||||
udpPort*: Port
|
||||
|
||||
type Storeconf* = ref object
|
||||
type StoreServiceConf* = ref object
|
||||
legacy*: bool
|
||||
dbURl*: string
|
||||
dbVacuum*: bool
|
||||
dbMigration*: bool
|
||||
maxNumDbConnections*: int
|
||||
retentionPolicy*: string
|
||||
resume*: bool
|
||||
|
||||
# TODO: this should come from RLN relay module
|
||||
type RlnRelayConf* = ref object
|
||||
ethContractAddress*: ContractAddress
|
||||
chainId*: uint
|
||||
credIndex*: Option[uint]
|
||||
dynamic*: bool
|
||||
bandwidthThreshold*: int
|
||||
epochSizeSec*: uint64
|
||||
@ -54,8 +67,12 @@ type WakuConf* = ref object
|
||||
nodeKey*: PrivateKey
|
||||
|
||||
clusterId*: uint16
|
||||
numShardsInNetwork*: uint32
|
||||
shards*: seq[uint16]
|
||||
protectedShards*: seq[ProtectedShard]
|
||||
|
||||
#TODO: move to an autoShardingConf
|
||||
numShardsInNetwork*: uint32
|
||||
contentTopics*: seq[string]
|
||||
|
||||
relay*: bool
|
||||
filter*: bool
|
||||
@ -64,13 +81,17 @@ type WakuConf* = ref object
|
||||
storeSync*: bool
|
||||
# TODO: remove relay peer exchange
|
||||
relayPeerExchange*: bool
|
||||
rendezvous*: bool
|
||||
|
||||
discv5Conf*: Option[Discv5Conf]
|
||||
|
||||
storeConf*: Option[StoreConf]
|
||||
storeServiceConf*: Option[StoreServiceConf]
|
||||
|
||||
rlnRelayConf*: Option[RlnRelayConf]
|
||||
|
||||
#TODO: could probably make it a `PeerRemoteInfo` here
|
||||
remoteStoreNode*: Option[string]
|
||||
|
||||
maxMessageSizeBytes*: int
|
||||
|
||||
logLevel*: logging.LogLevel
|
||||
@ -109,7 +130,7 @@ proc log*(conf: WakuConf) =
|
||||
info "Configuration: Enabled protocols",
|
||||
relay = conf.relay,
|
||||
rlnRelay = conf.rlnRelayConf.isSome,
|
||||
store = conf.storeConf.isSome,
|
||||
store = conf.storeServiceConf.isSome,
|
||||
filter = conf.filter,
|
||||
lightPush = conf.lightPush,
|
||||
peerExchange = conf.peerExchange
|
||||
@ -148,13 +169,17 @@ proc validateShards(wakuConf: WakuConf): Result[void, string] =
|
||||
return ok()
|
||||
|
||||
proc validateNoEmptyStrings(wakuConf: WakuConf): Result[void, string] =
|
||||
if wakuConf.dns4DomainName.isSome():
|
||||
if isEmptyOrWhiteSpace(wakuConf.dns4DomainName.get().string):
|
||||
return err("dns4DomainName is an empty string, set it to none(string) instead")
|
||||
if wakuConf.dns4DomainName.isSome and
|
||||
isEmptyOrWhiteSpace(wakuConf.dns4DomainName.get().string):
|
||||
return err("dns4DomainName is an empty string, set it to none(string) instead")
|
||||
|
||||
if isEmptyOrWhiteSpace(wakuConf.relayServiceRatio):
|
||||
return err("relayServiceRatio is an empty string")
|
||||
|
||||
if wakuConf.remoteStoreNode.isSome and
|
||||
isEmptyOrWhiteSpace(wakuConf.remoteStoreNode.get()):
|
||||
return err("store node is an empty string, set it to none(string) instead")
|
||||
|
||||
return ok()
|
||||
|
||||
proc validate*(wakuConf: WakuConf): Result[void, string] =
|
||||
|
||||
@ -32,11 +32,39 @@ proc generateWithProc(builderType, argName, argType, targetType: NimNode): NimNo
|
||||
proc `procName`*(`builderIdent`: var `builderType`, `resVar`: `argType`) =
|
||||
`builderVar` = some(`argName`.`targetType`)
|
||||
|
||||
## A simple macro to set a property on the builder.
|
||||
## For example:
|
||||
##
|
||||
## ```
|
||||
## with(RlnRelayConfbuilder, rlnRelay, bool)
|
||||
## ```
|
||||
##
|
||||
## Generates
|
||||
##
|
||||
## ```
|
||||
## proc withRlnRelay*(builder: var RlnRelayConfBuilder, rlnRelay: bool) =
|
||||
## builder.rlnRelay = some(rlnRelay)
|
||||
## ```
|
||||
macro with(
|
||||
builderType: untyped, argName: untyped, argType: untyped, targetType: untyped
|
||||
) =
|
||||
result = generateWithProc(builderType, argName, argType, targetType)
|
||||
|
||||
## A simple macro to set a property on the builder, and convert the property
|
||||
## to the right (distinct) type.
|
||||
##
|
||||
## For example:
|
||||
##
|
||||
## ```
|
||||
## with(RlnRelayConfBuilder, ethContractAddress, string, ContractAddress)
|
||||
## ```
|
||||
##
|
||||
## Generates
|
||||
##
|
||||
## ```
|
||||
## proc withRlnRelay*(builder: var RlnRelayConfBuilder, ethContractAddress: string) =
|
||||
## builder.ethContractAddress = some(ethContractAddress.ContractAddress)
|
||||
## ```
|
||||
macro with(builderType: untyped, argName: untyped, argType: untyped) =
|
||||
result = generateWithProc(builderType, argName, argType, argType)
|
||||
|
||||
@ -47,6 +75,7 @@ type RlnRelayConfBuilder = ref object
|
||||
rlnRelay: Option[bool]
|
||||
ethContractAddress: Option[ContractAddress]
|
||||
chainId: Option[uint]
|
||||
credIndex: Option[uint]
|
||||
dynamic: Option[bool]
|
||||
bandwidthThreshold: Option[int]
|
||||
epochSizeSec: Option[uint64]
|
||||
@ -58,6 +87,7 @@ proc init*(T: type RlnRelayConfBuilder): RlnRelayConfBuilder =
|
||||
|
||||
with(RlnRelayConfbuilder, rlnRelay, bool)
|
||||
with(RlnRelayConfBuilder, chainId, uint)
|
||||
with(RlnRelayConfBuilder, credIndex, uint)
|
||||
with(RlnRelayConfBuilder, dynamic, bool)
|
||||
with(RlnRelayConfBuilder, bandwidthThreshold, int)
|
||||
with(RlnRelayConfBuilder, epochSizeSec, uint64)
|
||||
@ -116,6 +146,7 @@ proc build*(builder: RlnRelayConfBuilder): Result[Option[RlnRelayConf], string]
|
||||
some(
|
||||
RlnRelayConf(
|
||||
chainId: chainId,
|
||||
credIndex: credIndex,
|
||||
dynamic: dynamic,
|
||||
ethContractAddress: ethContractAddress,
|
||||
epochSizeSec: epochSizeSec,
|
||||
@ -128,21 +159,21 @@ proc build*(builder: RlnRelayConfBuilder): Result[Option[RlnRelayConf], string]
|
||||
###########################
|
||||
## Store Config Builder ##
|
||||
###########################
|
||||
type StoreConfBuilder = ref object
|
||||
type StoreServiceConfBuilder = ref object
|
||||
store: Option[bool]
|
||||
legacy: Option[bool]
|
||||
|
||||
proc init(T: type StoreConfBuilder): StoreConfBuilder =
|
||||
StoreConfBuilder()
|
||||
proc init(T: type StoreServiceConfBuilder): StoreServiceConfBuilder =
|
||||
StoreServiceConfBuilder()
|
||||
|
||||
with(StoreConfBuilder, store, bool)
|
||||
with(StoreConfBuilder, legacy, bool)
|
||||
with(StoreServiceConfBuilder, store, bool)
|
||||
with(StoreServiceConfBuilder, legacy, bool)
|
||||
|
||||
proc build(builder: StoreConfBuilder): Result[Option[StoreConf], string] =
|
||||
proc build(builder: StoreServiceConfBuilder): Result[Option[StoreServiceConf], string] =
|
||||
if builder.store.get(false):
|
||||
return ok(none(StoreConf))
|
||||
return ok(none(StoreServiceConf))
|
||||
|
||||
return ok(some(StoreConf(legacy: builder.legacy.get(true))))
|
||||
return ok(some(StoreServiceConf(legacy: builder.legacy.get(true))))
|
||||
|
||||
###########################
|
||||
## Discv5 Config Builder ##
|
||||
@ -261,7 +292,7 @@ type WakuConfBuilder* = ref object
|
||||
|
||||
clusterConf: Option[ClusterConf]
|
||||
|
||||
storeConf: StoreConfBuilder
|
||||
storeServiceConf: StoreServiceConfBuilder
|
||||
rlnRelayConf*: RlnRelayConfBuilder
|
||||
|
||||
maxMessageSizeBytes: Option[int]
|
||||
@ -299,7 +330,7 @@ type WakuConfBuilder* = ref object
|
||||
|
||||
proc init*(T: type WakuConfBuilder): WakuConfBuilder =
|
||||
WakuConfBuilder(
|
||||
storeConf: StoreConfBuilder.init(),
|
||||
storeServiceConf: StoreServiceConfBuilder.init(),
|
||||
rlnRelayConf: RlnRelayConfBuilder.init(),
|
||||
discv5Conf: Discv5ConfBuilder.init(),
|
||||
webSocketConf: WebSocketConfBuilder.init(),
|
||||
@ -501,7 +532,7 @@ proc build*(
|
||||
let discv5Conf = builder.discv5Conf.build().valueOr:
|
||||
return err("Discv5 Conf building failed: " & $error)
|
||||
|
||||
let storeConf = builder.storeConf.build().valueOr:
|
||||
let storeServiceConf = builder.storeServiceConf.build().valueOr:
|
||||
return err("Store Conf building failed: " & $error)
|
||||
|
||||
let rlnRelayConf = builder.rlnRelayConf.build().valueOr:
|
||||
@ -630,12 +661,15 @@ proc build*(
|
||||
nodeKey: nodeKey,
|
||||
clusterId: clusterId,
|
||||
numShardsInNetwork: numShardsInNetwork,
|
||||
contentTopics: contentTopics,
|
||||
shards: shards,
|
||||
protectedShards: protectedShards,
|
||||
relay: relay,
|
||||
filter: filter,
|
||||
lightPush: lightPush,
|
||||
peerExchange: peerExchange,
|
||||
storeConf: storeConf,
|
||||
rendezvous: rendezvous,
|
||||
storeServiceConf: storeServiceConf,
|
||||
relayPeerExchange: relayPeerExchange,
|
||||
discv5Conf: discv5Conf,
|
||||
rlnRelayConf: rlnRelayConf,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user