diff --git a/apps/chat2mix/chat2mix.nim b/apps/chat2mix/chat2mix.nim index 51baf7806..ad6b6e87a 100644 --- a/apps/chat2mix/chat2mix.nim +++ b/apps/chat2mix/chat2mix.nim @@ -6,7 +6,7 @@ when not (compileOption("threads")): {.push raises: [].} -import std/[strformat, strutils, times, options, random, sequtils] +import std/[strformat, strutils, times, options, random, sequtils, sets] import confutils, chronicles, @@ -563,7 +563,7 @@ proc processInput(rfd: AsyncFD, rng: crypto.Rng) {.async.} = var kadBootstrapPeers: seq[(PeerId, seq[MultiAddress])] for nodeStr in conf.kadBootstrapNodes: let (peerId, ma) = block: - parseFullAddress(nodeStr).isOkOr: + parseFullAddress(nodeStr).valueOr: error "Failed to parse kademlia bootstrap node", node = nodeStr, error continue @@ -573,7 +573,7 @@ proc processInput(rfd: AsyncFD, rng: crypto.Rng) {.async.} = node.mountKademlia( KademliaDiscoveryConf( bootstrapNodes: kadBootstrapPeers, - servicesToDiscover: @[MixProtocolID], + servicesToDiscover: toHashSet(@[MixProtocolID]), randomLookupInterval: chronos.seconds(60), serviceLookupInterval: chronos.seconds(60), kadDhtConfig: KadDHTConfig.new(), diff --git a/logos_delivery/waku/factory/node_factory.nim b/logos_delivery/waku/factory/node_factory.nim index 30e37850a..e01a7e1be 100644 --- a/logos_delivery/waku/factory/node_factory.nim +++ b/logos_delivery/waku/factory/node_factory.nim @@ -183,17 +183,6 @@ proc setupProtocols( node.mountKademlia(kadConf).isOkOr: return err("failed to setup service discovery: " & error) - # Register ServicePeersRequest provider - ServicePeersRequest.setProvider( - node.brokerCtx, - proc(serviceId: string): Future[Result[ServicePeersRequest, string]] {.async.} = - let peers = (await node.wakuKademlia.lookupServicePeers(serviceId)).valueOr: - return err("failed call to lookupServicePeers: " & error) - return ok(ServicePeersRequest(serviceId: serviceId, peers: peers)), - ).isOkOr: - error "Can't set provider for ServicePeersRequest", error = error - return err("Can't set provider for ServicePeersRequest: " & error) - if conf.storeServiceConf.isSome(): let storeServiceConf = conf.storeServiceConf.get() diff --git a/logos_delivery/waku/node/waku_node.nim b/logos_delivery/waku/node/waku_node.nim index 74052cd62..fc5f3f402 100644 --- a/logos_delivery/waku/node/waku_node.nim +++ b/logos_delivery/waku/node/waku_node.nim @@ -62,6 +62,7 @@ import events/health_events, events/message_events, events/peer_events, + events/discovery_events, ], logos_delivery/waku/discovery/waku_kademlia, logos_delivery/waku/net/[bound_ports, net_config], @@ -123,6 +124,7 @@ type libp2pPing*: Ping rng*: crypto.Rng brokerCtx*: BrokerContext + mixTopUpLoop: Future[void] wakuRendezvous*: WakuRendezVous wakuRendezvousClient*: rendezvous_client.WakuRendezVousClient announcedAddresses*: seq[MultiAddress] @@ -394,6 +396,26 @@ proc mountKademlia*( return ok() +proc runServicePeerTopUp( + node: WakuNode, + serviceId: string, + target: int, + currentCount: proc(): int {.gcsafe, raises: [].}, + interval: Duration, +) {.async.} = + ## Adaptive service-peer discovery: while the number of usable providers for + ## `serviceId` is below `target`, pull more through the broker. The registered + ## ServicePeersRequest provider (kademlia) performs the lookup and fills the + ## pool. Generic — the caller supplies the service id, target and count getter. + debug "service peer top-up loop started", serviceId, target, interval = $interval + while true: + await sleepAsync(interval) + if currentCount() >= target: + continue + let res = await ServicePeersRequest.request(node.brokerCtx, serviceId) + if res.isErr: + debug "service peer top-up request failed", serviceId, error = res.error + ## Waku Sync proc mountStoreSync*( @@ -620,10 +642,22 @@ proc startProvidersAndListeners*(node: WakuNode) = ).isOkOr: error "Can't set provider for RequestContentTopicsHealth", error = error + # Service-peer lookups are answered by kademlia; register only when it's mounted. + if not node.wakuKademlia.isNil(): + ServicePeersRequest.setProvider( + node.brokerCtx, + proc(serviceId: string): Future[Result[ServicePeersRequest, string]] {.async.} = + let peers = (await node.wakuKademlia.lookupServicePeers(serviceId)).valueOr: + return err("failed call to lookupServicePeers: " & error) + return ok(ServicePeersRequest(serviceId: serviceId, peers: peers)), + ).isOkOr: + error "Can't set provider for ServicePeersRequest", error = error + proc stopProvidersAndListeners*(node: WakuNode) = RequestRelayShard.clearProvider(node.brokerCtx) RequestContentTopicsHealth.clearProvider(node.brokerCtx) RequestShardTopicsHealth.clearProvider(node.brokerCtx) + ServicePeersRequest.clearProvider(node.brokerCtx) proc start*(node: WakuNode) {.async.} = ## Starts a created Waku Node and @@ -672,6 +706,20 @@ proc start*(node: WakuNode) {.async.} = if not node.wakuKademlia.isNil(): await node.wakuKademlia.start() + # Keep the mix pool filled: the generic service-peer top-up wired with + # mix's service id, target pool size and current pool-size getter. + if not node.wakuMix.isNil(): + node.mixTopUpLoop = node.runServicePeerTopUp( + MixProtocolID, + minMixPoolSize, + proc(): int {.gcsafe, raises: [].} = + if node.wakuMix.isNil(): + 0 + else: + node.getMixNodePoolSize(), + chronos.seconds(5), + ) + if not node.wakuFilterClient.isNil(): node.wakuFilterClient.registerPushHandler( proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = @@ -698,6 +746,10 @@ proc stop*(node: WakuNode) {.async.} = node.stopProvidersAndListeners() + if not node.mixTopUpLoop.isNil(): + await node.mixTopUpLoop.cancelAndWait() + node.mixTopUpLoop = nil + if not node.wakuKademlia.isNil(): await node.wakuKademlia.stop() diff --git a/logos_delivery/waku/waku_mix/protocol.nim b/logos_delivery/waku/waku_mix/protocol.nim index 5776f3eef..1026de2a1 100644 --- a/logos_delivery/waku/waku_mix/protocol.nim +++ b/logos_delivery/waku/waku_mix/protocol.nim @@ -27,7 +27,7 @@ import logScope: topics = "waku mix" -const minMixPoolSize = 4 +const minMixPoolSize* = 4 type PublishMessage* = proc(message: WakuMessage): Future[Result[void, string]] {.