mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-02-12 01:43:13 +00:00
move lookup logic into extended kad disco
This commit is contained in:
parent
31d5591a97
commit
169eac1be8
@ -503,7 +503,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
await node.start()
|
||||
|
||||
node.peerManager.start()
|
||||
node.startExtendedKademliaDiscoveryLoop()
|
||||
node.startExtendedKademliaDiscoveryLoop(minMixPeers = MinMixNodePoolSize)
|
||||
|
||||
await node.mountLibp2pPing()
|
||||
#await node.mountPeerExchangeClient()
|
||||
@ -651,10 +651,6 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
while node.getMixNodePoolSize() < MinMixNodePoolSize:
|
||||
info "waiting for mix nodes to be discovered",
|
||||
currentpoolSize = node.getMixNodePoolSize()
|
||||
# Try to lookup mix peers via kademlia if pool is low
|
||||
let found = await node.lookupMixPeers()
|
||||
if found > 0:
|
||||
info "found mix peers via kademlia lookup", count = found
|
||||
await sleepAsync(1000)
|
||||
notice "ready to publish with mix node pool size ",
|
||||
currentpoolSize = node.getMixNodePoolSize()
|
||||
|
||||
@ -132,8 +132,45 @@ proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
|
||||
)
|
||||
)
|
||||
|
||||
proc lookupMixPeers*(node: WakuNode): Future[int] {.async.} =
|
||||
## Lookup mix peers via kademlia and add them to the peer store.
|
||||
## Returns the number of mix peers found and added.
|
||||
if node.wakuKademlia.isNil():
|
||||
warn "cannot lookup mix peers: kademlia not mounted"
|
||||
return 0
|
||||
|
||||
let mixService = ServiceInfo(id: MixProtocolID, data: @[])
|
||||
var records: seq[ExtendedPeerRecord]
|
||||
try:
|
||||
records = await node.wakuKademlia.lookup(mixService)
|
||||
except CatchableError:
|
||||
warn "mix peer lookup failed", error = getCurrentExceptionMsg()
|
||||
return 0
|
||||
|
||||
debug "mix peer lookup returned records", numRecords = records.len
|
||||
|
||||
var added = 0
|
||||
for record in records:
|
||||
let peerOpt = remotePeerInfoFrom(record)
|
||||
if peerOpt.isNone():
|
||||
continue
|
||||
|
||||
let peerInfo = peerOpt.get()
|
||||
if peerInfo.mixPubKey.isNone():
|
||||
continue
|
||||
|
||||
node.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
||||
info "mix peer added via kademlia lookup",
|
||||
peerId = $peerInfo.peerId, mixPubKey = peerInfo.mixPubKey.get().toHex()
|
||||
added.inc()
|
||||
|
||||
info "mix peer lookup complete", found = added
|
||||
return added
|
||||
|
||||
proc runExtendedKademliaDiscoveryLoop*(
|
||||
node: WakuNode, interval = DefaultExtendedKademliaDiscoveryInterval
|
||||
node: WakuNode,
|
||||
interval = DefaultExtendedKademliaDiscoveryInterval,
|
||||
minMixPeers: int = 0,
|
||||
): Future[void] {.async.} =
|
||||
info "extended kademlia discovery loop started", interval = interval
|
||||
|
||||
@ -175,6 +212,14 @@ proc runExtendedKademliaDiscoveryLoop*(
|
||||
if added > 0:
|
||||
info "added peers from extended kademlia discovery", count = added
|
||||
|
||||
# Targeted mix peer lookup when pool is low
|
||||
if minMixPeers > 0 and node.getMixNodePoolSize() < minMixPeers:
|
||||
debug "mix node pool below threshold, performing targeted lookup",
|
||||
currentPoolSize = node.getMixNodePoolSize(), threshold = minMixPeers
|
||||
let found = await node.lookupMixPeers()
|
||||
if found > 0:
|
||||
info "found mix peers via targeted kademlia lookup", count = found
|
||||
|
||||
await sleepAsync(interval)
|
||||
except CancelledError:
|
||||
debug "extended kademlia discovery loop cancelled"
|
||||
@ -182,7 +227,9 @@ proc runExtendedKademliaDiscoveryLoop*(
|
||||
error "extended kademlia discovery loop failed", error = e.msg
|
||||
|
||||
proc startExtendedKademliaDiscoveryLoop*(
|
||||
node: WakuNode, interval = DefaultExtendedKademliaDiscoveryInterval
|
||||
node: WakuNode,
|
||||
interval = DefaultExtendedKademliaDiscoveryInterval,
|
||||
minMixPeers: int = 0,
|
||||
) =
|
||||
if node.wakuKademlia.isNil():
|
||||
trace "extended kademlia discovery not started: protocol not mounted"
|
||||
@ -192,39 +239,4 @@ proc startExtendedKademliaDiscoveryLoop*(
|
||||
trace "extended kademlia discovery loop already running"
|
||||
return
|
||||
|
||||
node.kademliaDiscoveryLoop = node.runExtendedKademliaDiscoveryLoop(interval)
|
||||
|
||||
proc lookupMixPeers*(node: WakuNode): Future[int] {.async.} =
|
||||
## Lookup mix peers via kademlia and add them to the peer store.
|
||||
## Returns the number of mix peers found and added.
|
||||
if node.wakuKademlia.isNil():
|
||||
warn "cannot lookup mix peers: kademlia not mounted"
|
||||
return 0
|
||||
|
||||
let mixService = ServiceInfo(id: MixProtocolID, data: @[])
|
||||
var records: seq[ExtendedPeerRecord]
|
||||
try:
|
||||
records = await node.wakuKademlia.lookup(mixService)
|
||||
except CatchableError:
|
||||
warn "mix peer lookup failed", error = getCurrentExceptionMsg()
|
||||
return 0
|
||||
|
||||
debug "mix peer lookup returned records", numRecords = records.len
|
||||
|
||||
var added = 0
|
||||
for record in records:
|
||||
let peerOpt = remotePeerInfoFrom(record)
|
||||
if peerOpt.isNone():
|
||||
continue
|
||||
|
||||
let peerInfo = peerOpt.get()
|
||||
if peerInfo.mixPubKey.isNone():
|
||||
continue
|
||||
|
||||
node.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
||||
info "mix peer added via kademlia lookup",
|
||||
peerId = $peerInfo.peerId, mixPubKey = peerInfo.mixPubKey.get().toHex()
|
||||
added.inc()
|
||||
|
||||
info "mix peer lookup complete", found = added
|
||||
return added
|
||||
node.kademliaDiscoveryLoop = node.runExtendedKademliaDiscoveryLoop(interval, minMixPeers)
|
||||
|
||||
@ -496,7 +496,10 @@ proc startNode*(
|
||||
if conf.relay:
|
||||
node.peerManager.start()
|
||||
|
||||
startExtendedKademliaDiscoveryLoop(node)
|
||||
let minMixPeers =
|
||||
if conf.mixConf.isSome(): 4
|
||||
else: 0
|
||||
startExtendedKademliaDiscoveryLoop(node, minMixPeers = minMixPeers)
|
||||
|
||||
return ok()
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user