mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-05-13 05:49:29 +00:00
renaming & key fixes
This commit is contained in:
parent
01533189b1
commit
c1e0b33088
@ -24,7 +24,7 @@ const DefaultKademliaDiscoveryInterval* = chronos.seconds(10)
|
||||
type WakuKademlia* = ref object
|
||||
protocol*: KademliaDiscovery
|
||||
peerManager: PeerManager
|
||||
intervalFut: Future[void]
|
||||
walkIntervalFut: Future[void]
|
||||
|
||||
proc toRemotePeerInfo(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
|
||||
debug "processing kademlia record",
|
||||
@ -66,7 +66,7 @@ proc toRemotePeerInfo(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
|
||||
)
|
||||
|
||||
proc runDiscoveryLoop(
|
||||
wk: WakuKademlia, interval: Duration
|
||||
self: WakuKademlia, interval: Duration
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
info "kademlia discovery loop started", interval = interval
|
||||
|
||||
@ -74,7 +74,7 @@ proc runDiscoveryLoop(
|
||||
await sleepAsync(interval)
|
||||
|
||||
let res = catch:
|
||||
await wk.protocol.randomRecords()
|
||||
await self.protocol.randomRecords()
|
||||
let records = res.valueOr:
|
||||
error "kademlia discovery lookup failed", error = res.error.msg
|
||||
continue
|
||||
@ -83,15 +83,13 @@ proc runDiscoveryLoop(
|
||||
let peerInfo = toRemotePeerInfo(record).valueOr:
|
||||
continue
|
||||
|
||||
wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
||||
self.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
||||
|
||||
debug "peer added via kademlia discovery",
|
||||
debug "peer added via random walk",
|
||||
peerId = $peerInfo.peerId,
|
||||
addresses = peerInfo.addrs.mapIt($it),
|
||||
protocols = peerInfo.protocols
|
||||
|
||||
#TODO peer added metric
|
||||
|
||||
proc lookup*(
|
||||
self: WakuKademlia, codec: string
|
||||
): Future[seq[RemotePeerInfo]] {.async: (raises: []).} =
|
||||
@ -114,13 +112,12 @@ proc lookup*(
|
||||
|
||||
self.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
||||
|
||||
debug "peer added via kademlia discovery",
|
||||
debug "peer added via service discovery",
|
||||
service = codec,
|
||||
peerId = $peerInfo.peerId,
|
||||
addresses = peerInfo.addrs.mapIt($it),
|
||||
protocols = peerInfo.protocols
|
||||
|
||||
#TODO peer added metric
|
||||
|
||||
peerInfos.add(peerInfo)
|
||||
|
||||
return peerInfos
|
||||
@ -152,26 +149,28 @@ proc start*(
|
||||
self: WakuKademlia, interval: Duration = DefaultKademliaDiscoveryInterval
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
if self.protocol.started:
|
||||
warn "Starting kad-disco twice"
|
||||
warn "Starting waku kad twice"
|
||||
return
|
||||
|
||||
info "Starting Waku Kademlia"
|
||||
|
||||
await self.protocol.start()
|
||||
|
||||
self.intervalFut = self.runDiscoveryLoop(interval)
|
||||
self.walkIntervalFut = self.runDiscoveryLoop(interval)
|
||||
|
||||
info "kademlia discovery started"
|
||||
info "Waku Kademlia Started"
|
||||
|
||||
proc stop*(self: WakuKademlia) {.async: (raises: []).} =
|
||||
if not self.protocol.started:
|
||||
return
|
||||
|
||||
info "Stopping kademlia discovery"
|
||||
info "Stopping Waku Kademlia"
|
||||
|
||||
if not self.intervalFut.isNil():
|
||||
self.intervalFut.cancelSoon()
|
||||
self.intervalFut = nil
|
||||
if not self.walkIntervalFut.isNil():
|
||||
self.walkIntervalFut.cancelSoon()
|
||||
self.walkIntervalFut = nil
|
||||
|
||||
if not self.protocol.isNil():
|
||||
await self.protocol.stop()
|
||||
|
||||
info "Successfully stopped kademlia discovery"
|
||||
info "Successfully stopped Waku Kademlia"
|
||||
|
||||
@ -174,7 +174,7 @@ proc setupProtocols(
|
||||
if conf.mixConf.isSome():
|
||||
let mixConf = conf.mixConf.get()
|
||||
|
||||
let mixService = ServiceInfo(id: MixProtocolID, data: @(mixConf.mixKey))
|
||||
let mixService = ServiceInfo(id: MixProtocolID, data: @(mixConf.mixPubKey))
|
||||
providedServices.add(mixService)
|
||||
|
||||
(await node.mountMix(mixConf.mixKey)).isOkOr:
|
||||
|
||||
@ -36,6 +36,12 @@ type WakuMix* = ref object of MixProtocol
|
||||
maintenanceIntervalFut: Future[void]
|
||||
wakuKademlia: WakuKademlia
|
||||
|
||||
proc poolSize*(self: WakuMix): int =
|
||||
if self.nodePool.isNil():
|
||||
0
|
||||
else:
|
||||
self.nodePool.len()
|
||||
|
||||
proc mixPoolMaintenance(
|
||||
self: WakuMix, interval: Duration
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
@ -44,8 +50,7 @@ proc mixPoolMaintenance(
|
||||
while true:
|
||||
await sleepAsync(interval)
|
||||
|
||||
# Update current pool size from nodePool
|
||||
self.currentMixPoolSize = self.nodePool.len()
|
||||
self.currentMixPoolSize = self.poolSize()
|
||||
mix_pool_size.set(self.currentMixPoolSize.int64)
|
||||
|
||||
if self.currentMixPoolSize >= self.targetMixPoolSize:
|
||||
@ -61,8 +66,7 @@ proc mixPoolMaintenance(
|
||||
|
||||
let mixPeers = await self.wakuKademlia.lookup(MixProtocolID)
|
||||
|
||||
# Pool size will be updated on next iteration
|
||||
info "mix peer discovery completed", discoveredPeers = mixPeers.len
|
||||
debug "mix peer discovery completed", discoveredPeers = mixPeers.len
|
||||
|
||||
proc new*(
|
||||
T: typedesc[WakuMix],
|
||||
@ -106,12 +110,6 @@ proc new*(
|
||||
proc setKademlia*(self: WakuMix, wakuKademlia: WakuKademlia) =
|
||||
self.wakuKademlia = wakuKademlia
|
||||
|
||||
proc poolSize*(self: WakuMix): int =
|
||||
if self.nodePool.isNil():
|
||||
0
|
||||
else:
|
||||
self.nodePool.len()
|
||||
|
||||
method start*(self: WakuMix) {.async.} =
|
||||
if self.started:
|
||||
warn "Starting Waku Mix twice"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user