mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-06-27 20:10:02 +00:00
feat(mix): adaptive service-peer discovery top-up via broker
Add a generic runServicePeerTopUp loop in waku_node that, while a service's provider count is below target, pulls more peers through the ServicePeersRequest broker; wired for mix in WakuNode.start(). Move the ServicePeersRequest provider registration into the startProvidersAndListeners/stopProvidersAndListeners lifecycle (gated on kademlia) so chat2mix gets it too, keeping WakuKademlia generic. Also fix pre-existing chat2mix build errors (toHashSet, valueOr, std/sets import).
This commit is contained in:
parent
920c2a9ad7
commit
eea07b1d47
@ -6,7 +6,7 @@ when not (compileOption("threads")):
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[strformat, strutils, times, options, random, sequtils]
|
||||
import std/[strformat, strutils, times, options, random, sequtils, sets]
|
||||
import
|
||||
confutils,
|
||||
chronicles,
|
||||
@ -563,7 +563,7 @@ proc processInput(rfd: AsyncFD, rng: crypto.Rng) {.async.} =
|
||||
var kadBootstrapPeers: seq[(PeerId, seq[MultiAddress])]
|
||||
for nodeStr in conf.kadBootstrapNodes:
|
||||
let (peerId, ma) = block:
|
||||
parseFullAddress(nodeStr).isOkOr:
|
||||
parseFullAddress(nodeStr).valueOr:
|
||||
error "Failed to parse kademlia bootstrap node", node = nodeStr, error
|
||||
continue
|
||||
|
||||
@ -573,7 +573,7 @@ proc processInput(rfd: AsyncFD, rng: crypto.Rng) {.async.} =
|
||||
node.mountKademlia(
|
||||
KademliaDiscoveryConf(
|
||||
bootstrapNodes: kadBootstrapPeers,
|
||||
servicesToDiscover: @[MixProtocolID],
|
||||
servicesToDiscover: toHashSet(@[MixProtocolID]),
|
||||
randomLookupInterval: chronos.seconds(60),
|
||||
serviceLookupInterval: chronos.seconds(60),
|
||||
kadDhtConfig: KadDHTConfig.new(),
|
||||
|
||||
@ -183,17 +183,6 @@ proc setupProtocols(
|
||||
node.mountKademlia(kadConf).isOkOr:
|
||||
return err("failed to setup service discovery: " & error)
|
||||
|
||||
# Register ServicePeersRequest provider
|
||||
ServicePeersRequest.setProvider(
|
||||
node.brokerCtx,
|
||||
proc(serviceId: string): Future[Result[ServicePeersRequest, string]] {.async.} =
|
||||
let peers = (await node.wakuKademlia.lookupServicePeers(serviceId)).valueOr:
|
||||
return err("failed call to lookupServicePeers: " & error)
|
||||
return ok(ServicePeersRequest(serviceId: serviceId, peers: peers)),
|
||||
).isOkOr:
|
||||
error "Can't set provider for ServicePeersRequest", error = error
|
||||
return err("Can't set provider for ServicePeersRequest: " & error)
|
||||
|
||||
if conf.storeServiceConf.isSome():
|
||||
let storeServiceConf = conf.storeServiceConf.get()
|
||||
|
||||
|
||||
@ -62,6 +62,7 @@ import
|
||||
events/health_events,
|
||||
events/message_events,
|
||||
events/peer_events,
|
||||
events/discovery_events,
|
||||
],
|
||||
logos_delivery/waku/discovery/waku_kademlia,
|
||||
logos_delivery/waku/net/[bound_ports, net_config],
|
||||
@ -123,6 +124,7 @@ type
|
||||
libp2pPing*: Ping
|
||||
rng*: crypto.Rng
|
||||
brokerCtx*: BrokerContext
|
||||
mixTopUpLoop: Future[void]
|
||||
wakuRendezvous*: WakuRendezVous
|
||||
wakuRendezvousClient*: rendezvous_client.WakuRendezVousClient
|
||||
announcedAddresses*: seq[MultiAddress]
|
||||
@ -394,6 +396,26 @@ proc mountKademlia*(
|
||||
|
||||
return ok()
|
||||
|
||||
proc runServicePeerTopUp(
|
||||
node: WakuNode,
|
||||
serviceId: string,
|
||||
target: int,
|
||||
currentCount: proc(): int {.gcsafe, raises: [].},
|
||||
interval: Duration,
|
||||
) {.async.} =
|
||||
## Adaptive service-peer discovery: while the number of usable providers for
|
||||
## `serviceId` is below `target`, pull more through the broker. The registered
|
||||
## ServicePeersRequest provider (kademlia) performs the lookup and fills the
|
||||
## pool. Generic — the caller supplies the service id, target and count getter.
|
||||
debug "service peer top-up loop started", serviceId, target, interval = $interval
|
||||
while true:
|
||||
await sleepAsync(interval)
|
||||
if currentCount() >= target:
|
||||
continue
|
||||
let res = await ServicePeersRequest.request(node.brokerCtx, serviceId)
|
||||
if res.isErr:
|
||||
debug "service peer top-up request failed", serviceId, error = res.error
|
||||
|
||||
## Waku Sync
|
||||
|
||||
proc mountStoreSync*(
|
||||
@ -620,10 +642,22 @@ proc startProvidersAndListeners*(node: WakuNode) =
|
||||
).isOkOr:
|
||||
error "Can't set provider for RequestContentTopicsHealth", error = error
|
||||
|
||||
# Service-peer lookups are answered by kademlia; register only when it's mounted.
|
||||
if not node.wakuKademlia.isNil():
|
||||
ServicePeersRequest.setProvider(
|
||||
node.brokerCtx,
|
||||
proc(serviceId: string): Future[Result[ServicePeersRequest, string]] {.async.} =
|
||||
let peers = (await node.wakuKademlia.lookupServicePeers(serviceId)).valueOr:
|
||||
return err("failed call to lookupServicePeers: " & error)
|
||||
return ok(ServicePeersRequest(serviceId: serviceId, peers: peers)),
|
||||
).isOkOr:
|
||||
error "Can't set provider for ServicePeersRequest", error = error
|
||||
|
||||
proc stopProvidersAndListeners*(node: WakuNode) =
|
||||
RequestRelayShard.clearProvider(node.brokerCtx)
|
||||
RequestContentTopicsHealth.clearProvider(node.brokerCtx)
|
||||
RequestShardTopicsHealth.clearProvider(node.brokerCtx)
|
||||
ServicePeersRequest.clearProvider(node.brokerCtx)
|
||||
|
||||
proc start*(node: WakuNode) {.async.} =
|
||||
## Starts a created Waku Node and
|
||||
@ -672,6 +706,20 @@ proc start*(node: WakuNode) {.async.} =
|
||||
if not node.wakuKademlia.isNil():
|
||||
await node.wakuKademlia.start()
|
||||
|
||||
# Keep the mix pool filled: the generic service-peer top-up wired with
|
||||
# mix's service id, target pool size and current pool-size getter.
|
||||
if not node.wakuMix.isNil():
|
||||
node.mixTopUpLoop = node.runServicePeerTopUp(
|
||||
MixProtocolID,
|
||||
minMixPoolSize,
|
||||
proc(): int {.gcsafe, raises: [].} =
|
||||
if node.wakuMix.isNil():
|
||||
0
|
||||
else:
|
||||
node.getMixNodePoolSize(),
|
||||
chronos.seconds(5),
|
||||
)
|
||||
|
||||
if not node.wakuFilterClient.isNil():
|
||||
node.wakuFilterClient.registerPushHandler(
|
||||
proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
@ -698,6 +746,10 @@ proc stop*(node: WakuNode) {.async.} =
|
||||
|
||||
node.stopProvidersAndListeners()
|
||||
|
||||
if not node.mixTopUpLoop.isNil():
|
||||
await node.mixTopUpLoop.cancelAndWait()
|
||||
node.mixTopUpLoop = nil
|
||||
|
||||
if not node.wakuKademlia.isNil():
|
||||
await node.wakuKademlia.stop()
|
||||
|
||||
|
||||
@ -27,7 +27,7 @@ import
|
||||
logScope:
|
||||
topics = "waku mix"
|
||||
|
||||
const minMixPoolSize = 4
|
||||
const minMixPoolSize* = 4
|
||||
|
||||
type
|
||||
PublishMessage* = proc(message: WakuMessage): Future[Result[void, string]] {.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user