From 998f040fdbf4ce65bcee5b7c5cc711edba3859ab Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Mon, 27 Jan 2025 13:12:34 +0100 Subject: [PATCH] move mount store before relay and rln relay (#3257) This is needed to make a quick creation of the messages table, before the event rln sync kicks off. With that, we avoid having errors from postgres-exporter (nwaku-compose) complaining about non-existing messages table. --- waku/factory/node_factory.nim | 192 +++++++++++++++++----------------- 1 file changed, 96 insertions(+), 96 deletions(-) diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 852ec460d..625f1a77b 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -167,108 +167,12 @@ proc setupProtocols( node.mountMetadata(conf.clusterId).isOkOr: return err("failed to mount waku metadata protocol: " & error) - # If conf.numShardsInNetwork is not set, use the number of shards configured as numShardsInNetwork - let numShardsInNetwork = getNumShardsInNetwork(conf) - - if conf.numShardsInNetwork == 0: - warn "Number of shards in network not configured, setting it to", - numShardsInNetwork = $numShardsInNetwork - - node.mountSharding(conf.clusterId, numShardsInNetwork).isOkOr: - return err("failed to mount waku sharding: " & error) - - # Mount relay on all nodes - var peerExchangeHandler = none(RoutingRecordsHandler) - if conf.relayPeerExchange: - proc handlePeerExchange( - peer: PeerId, topic: string, peers: seq[RoutingRecordsPair] - ) {.gcsafe.} = - ## Handle peers received via gossipsub peer exchange - # TODO: Only consider peers on pubsub topics we subscribe to - let exchangedPeers = peers.filterIt(it.record.isSome()) - # only peers with populated records - .mapIt(toRemotePeerInfo(it.record.get())) - - debug "adding exchanged peers", - src = peer, topic = topic, numPeers = exchangedPeers.len - - for peer in exchangedPeers: - # Peers added are filtered by the peer manager - node.peerManager.addPeer(peer, PeerOrigin.PeerExchange) - - peerExchangeHandler = some(handlePeerExchange) - - let autoShards = node.getAutoshards(conf.contentTopics).valueOr: - return err("Could not get autoshards: " & error) - - debug "Shards created from content topics", - contentTopics = conf.contentTopics, shards = autoShards - - let confShards = - conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it))) - let shards = confShards & autoShards - - if conf.relay: - let parsedMaxMsgSize = parseMsgSize(conf.maxMessageSize).valueOr: - return err("failed to parse 'max-num-bytes-msg-size' param: " & $error) - - debug "Setting max message size", num_bytes = parsedMaxMsgSize - - try: - await mountRelay( - node, shards, peerExchangeHandler = peerExchangeHandler, int(parsedMaxMsgSize) - ) - except CatchableError: - return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg()) - - # Add validation keys to protected topics - var subscribedProtectedShards: seq[ProtectedShard] - for shardKey in conf.protectedShards: - if shardKey.shard notin conf.shards: - warn "protected shard not in subscribed shards, skipping adding validator", - protectedShard = shardKey.shard, subscribedShards = shards - continue - subscribedProtectedShards.add(shardKey) - notice "routing only signed traffic", - protectedShard = shardKey.shard, publicKey = shardKey.key - node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId) - - # Only relay nodes should be rendezvous points. - if conf.rendezvous: - await node.mountRendezvous() - - # Keepalive mounted on all nodes - try: - await mountLibp2pPing(node) - except CatchableError: - return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg()) - var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} = ## Action to be taken when an internal error occurs during the node run. ## e.g. the connection with the database is lost and not recovered. error "Unrecoverable error occurred", error = msg quit(QuitFailure) - if conf.rlnRelay: - let rlnConf = WakuRlnConfig( - rlnRelayDynamic: conf.rlnRelayDynamic, - rlnRelayCredIndex: conf.rlnRelayCredIndex, - rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress, - rlnRelayChainId: conf.rlnRelayChainId, - rlnRelayEthClientAddress: string(conf.rlnRelayethClientAddress), - rlnRelayCredPath: conf.rlnRelayCredPath, - rlnRelayCredPassword: conf.rlnRelayCredPassword, - rlnRelayTreePath: conf.rlnRelayTreePath, - rlnRelayUserMessageLimit: conf.rlnRelayUserMessageLimit, - rlnEpochSizeSec: conf.rlnEpochSizeSec, - onFatalErrorAction: onFatalErrorAction, - ) - - try: - waitFor node.mountRlnRelay(rlnConf) - except CatchableError: - return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg()) - if conf.store: if conf.legacyStore: let archiveDriverRes = waitFor legacy_driver.ArchiveDriver.new( @@ -355,6 +259,102 @@ proc setupProtocols( if conf.store and conf.storeResume: node.setupStoreResume() + # If conf.numShardsInNetwork is not set, use the number of shards configured as numShardsInNetwork + let numShardsInNetwork = getNumShardsInNetwork(conf) + + if conf.numShardsInNetwork == 0: + warn "Number of shards in network not configured, setting it to", + numShardsInNetwork = $numShardsInNetwork + + node.mountSharding(conf.clusterId, numShardsInNetwork).isOkOr: + return err("failed to mount waku sharding: " & error) + + # Mount relay on all nodes + var peerExchangeHandler = none(RoutingRecordsHandler) + if conf.relayPeerExchange: + proc handlePeerExchange( + peer: PeerId, topic: string, peers: seq[RoutingRecordsPair] + ) {.gcsafe.} = + ## Handle peers received via gossipsub peer exchange + # TODO: Only consider peers on pubsub topics we subscribe to + let exchangedPeers = peers.filterIt(it.record.isSome()) + # only peers with populated records + .mapIt(toRemotePeerInfo(it.record.get())) + + debug "adding exchanged peers", + src = peer, topic = topic, numPeers = exchangedPeers.len + + for peer in exchangedPeers: + # Peers added are filtered by the peer manager + node.peerManager.addPeer(peer, PeerOrigin.PeerExchange) + + peerExchangeHandler = some(handlePeerExchange) + + let autoShards = node.getAutoshards(conf.contentTopics).valueOr: + return err("Could not get autoshards: " & error) + + debug "Shards created from content topics", + contentTopics = conf.contentTopics, shards = autoShards + + let confShards = + conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it))) + let shards = confShards & autoShards + + if conf.relay: + let parsedMaxMsgSize = parseMsgSize(conf.maxMessageSize).valueOr: + return err("failed to parse 'max-num-bytes-msg-size' param: " & $error) + + debug "Setting max message size", num_bytes = parsedMaxMsgSize + + try: + await mountRelay( + node, shards, peerExchangeHandler = peerExchangeHandler, int(parsedMaxMsgSize) + ) + except CatchableError: + return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg()) + + # Add validation keys to protected topics + var subscribedProtectedShards: seq[ProtectedShard] + for shardKey in conf.protectedShards: + if shardKey.shard notin conf.shards: + warn "protected shard not in subscribed shards, skipping adding validator", + protectedShard = shardKey.shard, subscribedShards = shards + continue + subscribedProtectedShards.add(shardKey) + notice "routing only signed traffic", + protectedShard = shardKey.shard, publicKey = shardKey.key + node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId) + + # Only relay nodes should be rendezvous points. + if conf.rendezvous: + await node.mountRendezvous() + + # Keepalive mounted on all nodes + try: + await mountLibp2pPing(node) + except CatchableError: + return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg()) + + if conf.rlnRelay: + let rlnConf = WakuRlnConfig( + rlnRelayDynamic: conf.rlnRelayDynamic, + rlnRelayCredIndex: conf.rlnRelayCredIndex, + rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress, + rlnRelayChainId: conf.rlnRelayChainId, + rlnRelayEthClientAddress: string(conf.rlnRelayethClientAddress), + rlnRelayCredPath: conf.rlnRelayCredPath, + rlnRelayCredPassword: conf.rlnRelayCredPassword, + rlnRelayTreePath: conf.rlnRelayTreePath, + rlnRelayUserMessageLimit: conf.rlnRelayUserMessageLimit, + rlnEpochSizeSec: conf.rlnEpochSizeSec, + onFatalErrorAction: onFatalErrorAction, + ) + + try: + waitFor node.mountRlnRelay(rlnConf) + except CatchableError: + return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg()) + # NOTE Must be mounted after relay if conf.lightpush: try: