move mount store before relay and rln relay (#3257)

This is needed to make a quick creation of the messages
table, before the event rln sync kicks off. With that,
we avoid having errors from postgres-exporter (nwaku-compose)
complaining about non-existing messages table.
This commit is contained in:
Ivan FB 2025-01-27 13:12:34 +01:00 committed by GitHub
parent a4d71718a2
commit 998f040fdb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -167,108 +167,12 @@ proc setupProtocols(
node.mountMetadata(conf.clusterId).isOkOr:
return err("failed to mount waku metadata protocol: " & error)
# If conf.numShardsInNetwork is not set, use the number of shards configured as numShardsInNetwork
let numShardsInNetwork = getNumShardsInNetwork(conf)
if conf.numShardsInNetwork == 0:
warn "Number of shards in network not configured, setting it to",
numShardsInNetwork = $numShardsInNetwork
node.mountSharding(conf.clusterId, numShardsInNetwork).isOkOr:
return err("failed to mount waku sharding: " & error)
# Mount relay on all nodes
var peerExchangeHandler = none(RoutingRecordsHandler)
if conf.relayPeerExchange:
proc handlePeerExchange(
peer: PeerId, topic: string, peers: seq[RoutingRecordsPair]
) {.gcsafe.} =
## Handle peers received via gossipsub peer exchange
# TODO: Only consider peers on pubsub topics we subscribe to
let exchangedPeers = peers.filterIt(it.record.isSome())
# only peers with populated records
.mapIt(toRemotePeerInfo(it.record.get()))
debug "adding exchanged peers",
src = peer, topic = topic, numPeers = exchangedPeers.len
for peer in exchangedPeers:
# Peers added are filtered by the peer manager
node.peerManager.addPeer(peer, PeerOrigin.PeerExchange)
peerExchangeHandler = some(handlePeerExchange)
let autoShards = node.getAutoshards(conf.contentTopics).valueOr:
return err("Could not get autoshards: " & error)
debug "Shards created from content topics",
contentTopics = conf.contentTopics, shards = autoShards
let confShards =
conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it)))
let shards = confShards & autoShards
if conf.relay:
let parsedMaxMsgSize = parseMsgSize(conf.maxMessageSize).valueOr:
return err("failed to parse 'max-num-bytes-msg-size' param: " & $error)
debug "Setting max message size", num_bytes = parsedMaxMsgSize
try:
await mountRelay(
node, shards, peerExchangeHandler = peerExchangeHandler, int(parsedMaxMsgSize)
)
except CatchableError:
return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg())
# Add validation keys to protected topics
var subscribedProtectedShards: seq[ProtectedShard]
for shardKey in conf.protectedShards:
if shardKey.shard notin conf.shards:
warn "protected shard not in subscribed shards, skipping adding validator",
protectedShard = shardKey.shard, subscribedShards = shards
continue
subscribedProtectedShards.add(shardKey)
notice "routing only signed traffic",
protectedShard = shardKey.shard, publicKey = shardKey.key
node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId)
# Only relay nodes should be rendezvous points.
if conf.rendezvous:
await node.mountRendezvous()
# Keepalive mounted on all nodes
try:
await mountLibp2pPing(node)
except CatchableError:
return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg())
var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =
## Action to be taken when an internal error occurs during the node run.
## e.g. the connection with the database is lost and not recovered.
error "Unrecoverable error occurred", error = msg
quit(QuitFailure)
if conf.rlnRelay:
let rlnConf = WakuRlnConfig(
rlnRelayDynamic: conf.rlnRelayDynamic,
rlnRelayCredIndex: conf.rlnRelayCredIndex,
rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress,
rlnRelayChainId: conf.rlnRelayChainId,
rlnRelayEthClientAddress: string(conf.rlnRelayethClientAddress),
rlnRelayCredPath: conf.rlnRelayCredPath,
rlnRelayCredPassword: conf.rlnRelayCredPassword,
rlnRelayTreePath: conf.rlnRelayTreePath,
rlnRelayUserMessageLimit: conf.rlnRelayUserMessageLimit,
rlnEpochSizeSec: conf.rlnEpochSizeSec,
onFatalErrorAction: onFatalErrorAction,
)
try:
waitFor node.mountRlnRelay(rlnConf)
except CatchableError:
return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg())
if conf.store:
if conf.legacyStore:
let archiveDriverRes = waitFor legacy_driver.ArchiveDriver.new(
@ -355,6 +259,102 @@ proc setupProtocols(
if conf.store and conf.storeResume:
node.setupStoreResume()
# If conf.numShardsInNetwork is not set, use the number of shards configured as numShardsInNetwork
let numShardsInNetwork = getNumShardsInNetwork(conf)
if conf.numShardsInNetwork == 0:
warn "Number of shards in network not configured, setting it to",
numShardsInNetwork = $numShardsInNetwork
node.mountSharding(conf.clusterId, numShardsInNetwork).isOkOr:
return err("failed to mount waku sharding: " & error)
# Mount relay on all nodes
var peerExchangeHandler = none(RoutingRecordsHandler)
if conf.relayPeerExchange:
proc handlePeerExchange(
peer: PeerId, topic: string, peers: seq[RoutingRecordsPair]
) {.gcsafe.} =
## Handle peers received via gossipsub peer exchange
# TODO: Only consider peers on pubsub topics we subscribe to
let exchangedPeers = peers.filterIt(it.record.isSome())
# only peers with populated records
.mapIt(toRemotePeerInfo(it.record.get()))
debug "adding exchanged peers",
src = peer, topic = topic, numPeers = exchangedPeers.len
for peer in exchangedPeers:
# Peers added are filtered by the peer manager
node.peerManager.addPeer(peer, PeerOrigin.PeerExchange)
peerExchangeHandler = some(handlePeerExchange)
let autoShards = node.getAutoshards(conf.contentTopics).valueOr:
return err("Could not get autoshards: " & error)
debug "Shards created from content topics",
contentTopics = conf.contentTopics, shards = autoShards
let confShards =
conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it)))
let shards = confShards & autoShards
if conf.relay:
let parsedMaxMsgSize = parseMsgSize(conf.maxMessageSize).valueOr:
return err("failed to parse 'max-num-bytes-msg-size' param: " & $error)
debug "Setting max message size", num_bytes = parsedMaxMsgSize
try:
await mountRelay(
node, shards, peerExchangeHandler = peerExchangeHandler, int(parsedMaxMsgSize)
)
except CatchableError:
return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg())
# Add validation keys to protected topics
var subscribedProtectedShards: seq[ProtectedShard]
for shardKey in conf.protectedShards:
if shardKey.shard notin conf.shards:
warn "protected shard not in subscribed shards, skipping adding validator",
protectedShard = shardKey.shard, subscribedShards = shards
continue
subscribedProtectedShards.add(shardKey)
notice "routing only signed traffic",
protectedShard = shardKey.shard, publicKey = shardKey.key
node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId)
# Only relay nodes should be rendezvous points.
if conf.rendezvous:
await node.mountRendezvous()
# Keepalive mounted on all nodes
try:
await mountLibp2pPing(node)
except CatchableError:
return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg())
if conf.rlnRelay:
let rlnConf = WakuRlnConfig(
rlnRelayDynamic: conf.rlnRelayDynamic,
rlnRelayCredIndex: conf.rlnRelayCredIndex,
rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress,
rlnRelayChainId: conf.rlnRelayChainId,
rlnRelayEthClientAddress: string(conf.rlnRelayethClientAddress),
rlnRelayCredPath: conf.rlnRelayCredPath,
rlnRelayCredPassword: conf.rlnRelayCredPassword,
rlnRelayTreePath: conf.rlnRelayTreePath,
rlnRelayUserMessageLimit: conf.rlnRelayUserMessageLimit,
rlnEpochSizeSec: conf.rlnEpochSizeSec,
onFatalErrorAction: onFatalErrorAction,
)
try:
waitFor node.mountRlnRelay(rlnConf)
except CatchableError:
return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg())
# NOTE Must be mounted after relay
if conf.lightpush:
try: