mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-02-13 02:13:12 +00:00
address review comments
This commit is contained in:
parent
e8a7559c33
commit
b943ece443
@ -40,7 +40,7 @@ import
|
||||
waku_lightpush/rpc,
|
||||
waku_enr,
|
||||
discovery/waku_dnsdisc,
|
||||
discovery/waku_ext_kademlia,
|
||||
discovery/waku_kademlia,
|
||||
waku_node,
|
||||
node/waku_metrics,
|
||||
node/peer_manager,
|
||||
@ -485,16 +485,19 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
kadBootstrapPeers.add(parsed)
|
||||
|
||||
if kadBootstrapPeers.len > 0:
|
||||
(
|
||||
await setupExtendedKademliaDiscovery(
|
||||
node,
|
||||
ExtendedKademliaDiscoveryParams(
|
||||
bootstrapNodes: kadBootstrapPeers,
|
||||
mixPubKey: some(mixPubKey),
|
||||
advertiseMix: false,
|
||||
),
|
||||
)
|
||||
).isOkOr:
|
||||
node.wakuKademlia = WakuKademlia.new(
|
||||
node.switch,
|
||||
ExtendedKademliaDiscoveryParams(
|
||||
bootstrapNodes: kadBootstrapPeers,
|
||||
mixPubKey: some(mixPubKey),
|
||||
advertiseMix: false,
|
||||
),
|
||||
node.peerManager,
|
||||
getMixNodePoolSize = proc(): int {.gcsafe, raises: [].} =
|
||||
if node.wakuMix.isNil(): 0 else: node.getMixNodePoolSize(),
|
||||
isNodeStarted = proc(): bool {.gcsafe, raises: [].} =
|
||||
node.started,
|
||||
).valueOr:
|
||||
error "failed to setup kademlia discovery", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
@ -503,7 +506,10 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
await node.start()
|
||||
|
||||
node.peerManager.start()
|
||||
node.startExtendedKademliaDiscoveryLoop(minMixPeers = MinMixNodePoolSize)
|
||||
if not node.wakuKademlia.isNil():
|
||||
(await node.wakuKademlia.start(minMixPeers = MinMixNodePoolSize)).isOkOr:
|
||||
error "failed to start kademlia discovery", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
await node.mountLibp2pPing()
|
||||
#await node.mountPeerExchangeClient()
|
||||
|
||||
@ -45,9 +45,9 @@ proc waku_lightpush_publish(
|
||||
else:
|
||||
some(PubsubTopic($pubsubTopic))
|
||||
|
||||
discard (await ctx.myLib[].node.lightpushPublish(topic, msg, peerOpt)).valueOr:
|
||||
let messageHash = (await ctx.myLib[].node.lightpushPublish(topic, msg, peerOpt)).valueOr:
|
||||
let errorMsg = error.desc.get($error.code.int)
|
||||
error "PUBLISH failed", error = errorMsg
|
||||
return err(errorMsg)
|
||||
|
||||
return ok("")
|
||||
return ok($messageHash)
|
||||
|
||||
@ -5,14 +5,15 @@ import
|
||||
chronos,
|
||||
chronicles,
|
||||
results,
|
||||
libp2p/[peerid, multiaddress],
|
||||
stew/byteutils,
|
||||
libp2p/[peerid, multiaddress, switch],
|
||||
libp2p/extended_peer_record,
|
||||
libp2p/crypto/curve25519,
|
||||
libp2p/protocols/[kademlia, kad_disco],
|
||||
libp2p/protocols/kademlia_discovery/types as kad_types,
|
||||
libp2p/protocols/mix/mix_protocol
|
||||
|
||||
import ../waku_core, ../node/waku_node, ../node/peer_manager
|
||||
import ../waku_core, ../node/peer_manager
|
||||
|
||||
logScope:
|
||||
topics = "waku extended kademlia discovery"
|
||||
@ -21,19 +22,36 @@ const
|
||||
DefaultExtendedKademliaDiscoveryInterval* = chronos.seconds(5)
|
||||
ExtendedKademliaDiscoveryStartupDelay* = chronos.seconds(5)
|
||||
|
||||
type ExtendedKademliaDiscoveryParams* = object
|
||||
bootstrapNodes*: seq[(PeerId, seq[MultiAddress])]
|
||||
mixPubKey*: Option[Curve25519Key]
|
||||
advertiseMix*: bool = false
|
||||
type
|
||||
MixNodePoolSizeProvider* = proc(): int {.gcsafe, raises: [].}
|
||||
NodeStartedProvider* = proc(): bool {.gcsafe, raises: [].}
|
||||
|
||||
proc setupExtendedKademliaDiscovery*(
|
||||
node: WakuNode, params: ExtendedKademliaDiscoveryParams
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
ExtendedKademliaDiscoveryParams* = object
|
||||
bootstrapNodes*: seq[(PeerId, seq[MultiAddress])]
|
||||
mixPubKey*: Option[Curve25519Key]
|
||||
advertiseMix*: bool = false
|
||||
|
||||
WakuKademlia* = ref object
|
||||
protocol*: KademliaDiscovery
|
||||
peerManager: PeerManager
|
||||
discoveryLoop: Future[void]
|
||||
running*: bool
|
||||
getMixNodePoolSize: MixNodePoolSizeProvider
|
||||
isNodeStarted: NodeStartedProvider
|
||||
|
||||
proc new*(
|
||||
T: type WakuKademlia,
|
||||
switch: Switch,
|
||||
params: ExtendedKademliaDiscoveryParams,
|
||||
peerManager: PeerManager,
|
||||
getMixNodePoolSize: MixNodePoolSizeProvider = nil,
|
||||
isNodeStarted: NodeStartedProvider = nil,
|
||||
): Result[T, string] =
|
||||
if params.bootstrapNodes.len == 0:
|
||||
info "starting kademlia discovery as seed node (no bootstrap nodes)"
|
||||
info "creating kademlia discovery as seed node (no bootstrap nodes)"
|
||||
|
||||
let kademlia = KademliaDiscovery.new(
|
||||
node.switch,
|
||||
switch,
|
||||
bootstrapNodes = params.bootstrapNodes,
|
||||
config = KadDHTConfig.new(
|
||||
validator = kad_types.ExtEntryValidator(), selector = kad_types.ExtEntrySelector()
|
||||
@ -42,7 +60,7 @@ proc setupExtendedKademliaDiscovery*(
|
||||
)
|
||||
|
||||
try:
|
||||
node.switch.mount(kademlia)
|
||||
switch.mount(kademlia)
|
||||
except CatchableError:
|
||||
return err("failed to mount kademlia discovery: " & getCurrentExceptionMsg())
|
||||
|
||||
@ -50,26 +68,27 @@ proc setupExtendedKademliaDiscovery*(
|
||||
# initial self-signed peer record published to the DHT
|
||||
if params.advertiseMix:
|
||||
if params.mixPubKey.isSome():
|
||||
discard kademlia.startAdvertising(
|
||||
let alreadyAdvertising = kademlia.startAdvertising(
|
||||
ServiceInfo(id: MixProtocolID, data: @(params.mixPubKey.get()))
|
||||
)
|
||||
if alreadyAdvertising:
|
||||
warn "mix service was already being advertised"
|
||||
debug "extended kademlia advertising mix service",
|
||||
keyHex = params.mixPubKey.get().toHex(),
|
||||
keyHex = byteutils.toHex(params.mixPubKey.get()),
|
||||
bootstrapNodes = params.bootstrapNodes.len
|
||||
else:
|
||||
warn "mix advertising enabled but no key provided"
|
||||
|
||||
try:
|
||||
await kademlia.start()
|
||||
except CatchableError:
|
||||
return err("failed to start kademlia discovery: " & getCurrentExceptionMsg())
|
||||
|
||||
node.wakuKademlia = kademlia
|
||||
|
||||
info "kademlia discovery started",
|
||||
info "kademlia discovery created",
|
||||
bootstrapNodes = params.bootstrapNodes.len, advertiseMix = params.advertiseMix
|
||||
|
||||
ok()
|
||||
ok(WakuKademlia(
|
||||
protocol: kademlia,
|
||||
peerManager: peerManager,
|
||||
running: false,
|
||||
getMixNodePoolSize: getMixNodePoolSize,
|
||||
isNodeStarted: isNodeStarted,
|
||||
))
|
||||
|
||||
proc extractMixPubKey(service: ServiceInfo): Option[Curve25519Key] =
|
||||
if service.id != MixProtocolID:
|
||||
@ -81,14 +100,14 @@ proc extractMixPubKey(service: ServiceInfo): Option[Curve25519Key] =
|
||||
warn "invalid mix pub key length from kademlia record",
|
||||
expected = Curve25519KeySize,
|
||||
actual = service.data.len,
|
||||
dataHex = service.data.toHex()
|
||||
dataHex = byteutils.toHex(service.data)
|
||||
return none(Curve25519Key)
|
||||
|
||||
debug "found mix protocol service",
|
||||
dataLen = service.data.len, expectedLen = Curve25519KeySize
|
||||
|
||||
let key = intoCurve25519Key(service.data)
|
||||
debug "successfully extracted mix pub key", keyHex = key.toHex()
|
||||
debug "successfully extracted mix pub key", keyHex = byteutils.toHex(key)
|
||||
some(key)
|
||||
|
||||
proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
|
||||
@ -132,20 +151,18 @@ proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
|
||||
)
|
||||
)
|
||||
|
||||
proc lookupMixPeers*(node: WakuNode): Future[int] {.async.} =
|
||||
proc lookupMixPeers*(wk: WakuKademlia): Future[Result[int, string]] {.async: (raises: []).} =
|
||||
## Lookup mix peers via kademlia and add them to the peer store.
|
||||
## Returns the number of mix peers found and added.
|
||||
if node.wakuKademlia.isNil():
|
||||
warn "cannot lookup mix peers: kademlia not mounted"
|
||||
return 0
|
||||
if wk.protocol.isNil():
|
||||
return err("cannot lookup mix peers: kademlia not mounted")
|
||||
|
||||
let mixService = ServiceInfo(id: MixProtocolID, data: @[])
|
||||
var records: seq[ExtendedPeerRecord]
|
||||
try:
|
||||
records = await node.wakuKademlia.lookup(mixService)
|
||||
records = await wk.protocol.lookup(mixService)
|
||||
except CatchableError:
|
||||
warn "mix peer lookup failed", error = getCurrentExceptionMsg()
|
||||
return 0
|
||||
return err("mix peer lookup failed: " & getCurrentExceptionMsg())
|
||||
|
||||
debug "mix peer lookup returned records", numRecords = records.len
|
||||
|
||||
@ -159,34 +176,31 @@ proc lookupMixPeers*(node: WakuNode): Future[int] {.async.} =
|
||||
if peerInfo.mixPubKey.isNone():
|
||||
continue
|
||||
|
||||
node.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
||||
wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
||||
info "mix peer added via kademlia lookup",
|
||||
peerId = $peerInfo.peerId, mixPubKey = peerInfo.mixPubKey.get().toHex()
|
||||
peerId = $peerInfo.peerId, mixPubKey = byteutils.toHex(peerInfo.mixPubKey.get())
|
||||
added.inc()
|
||||
|
||||
info "mix peer lookup complete", found = added
|
||||
return added
|
||||
return ok(added)
|
||||
|
||||
proc runExtendedKademliaDiscoveryLoop(
|
||||
node: WakuNode,
|
||||
interval = DefaultExtendedKademliaDiscoveryInterval,
|
||||
minMixPeers: int = 0,
|
||||
): {.async.} =
|
||||
proc runDiscoveryLoop(
|
||||
wk: WakuKademlia,
|
||||
interval: Duration,
|
||||
minMixPeers: int,
|
||||
) {.async: (raises: []).} =
|
||||
info "extended kademlia discovery loop started", interval = interval
|
||||
|
||||
try:
|
||||
while true:
|
||||
if node.wakuKademlia.isNil():
|
||||
info "extended kademlia discovery loop stopping: protocol disabled"
|
||||
return
|
||||
|
||||
if not node.started:
|
||||
while wk.running:
|
||||
# Wait for node to be started
|
||||
if not wk.isNodeStarted.isNil() and not wk.isNodeStarted():
|
||||
await sleepAsync(ExtendedKademliaDiscoveryStartupDelay)
|
||||
continue
|
||||
|
||||
var records: seq[ExtendedPeerRecord]
|
||||
try:
|
||||
records = await node.wakuKademlia.randomRecords()
|
||||
records = await wk.protocol.randomRecords()
|
||||
except CatchableError:
|
||||
warn "extended kademlia discovery failed", error = getCurrentExceptionMsg()
|
||||
await sleepAsync(interval)
|
||||
@ -201,7 +215,7 @@ proc runExtendedKademliaDiscoveryLoop(
|
||||
continue
|
||||
|
||||
let peerInfo = peerOpt.get()
|
||||
node.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
||||
wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
||||
debug "peer added via extended kademlia discovery",
|
||||
peerId = $peerInfo.peerId,
|
||||
addresses = peerInfo.addrs.mapIt($it),
|
||||
@ -213,10 +227,13 @@ proc runExtendedKademliaDiscoveryLoop(
|
||||
info "added peers from extended kademlia discovery", count = added
|
||||
|
||||
# Targeted mix peer lookup when pool is low
|
||||
if minMixPeers > 0 and node.getMixNodePoolSize() < minMixPeers:
|
||||
if minMixPeers > 0 and not wk.getMixNodePoolSize.isNil() and
|
||||
wk.getMixNodePoolSize() < minMixPeers:
|
||||
debug "mix node pool below threshold, performing targeted lookup",
|
||||
currentPoolSize = node.getMixNodePoolSize(), threshold = minMixPeers
|
||||
let found = await node.lookupMixPeers()
|
||||
currentPoolSize = wk.getMixNodePoolSize(), threshold = minMixPeers
|
||||
let found = (await wk.lookupMixPeers()).valueOr:
|
||||
warn "targeted mix peer lookup failed", error = error
|
||||
0
|
||||
if found > 0:
|
||||
info "found mix peers via targeted kademlia lookup", count = found
|
||||
|
||||
@ -226,18 +243,35 @@ proc runExtendedKademliaDiscoveryLoop(
|
||||
except CatchableError as e:
|
||||
error "extended kademlia discovery loop failed", error = e.msg
|
||||
|
||||
proc startExtendedKademliaDiscoveryLoop*(
|
||||
node: WakuNode,
|
||||
interval = DefaultExtendedKademliaDiscoveryInterval,
|
||||
proc start*(
|
||||
wk: WakuKademlia,
|
||||
interval: Duration = DefaultExtendedKademliaDiscoveryInterval,
|
||||
minMixPeers: int = 0,
|
||||
) =
|
||||
if node.wakuKademlia.isNil():
|
||||
trace "extended kademlia discovery not started: protocol not mounted"
|
||||
): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
if wk.running:
|
||||
return err("already running")
|
||||
|
||||
try:
|
||||
await wk.protocol.start()
|
||||
except CatchableError:
|
||||
return err("failed to start kademlia discovery: " & getCurrentExceptionMsg())
|
||||
|
||||
wk.running = true
|
||||
wk.discoveryLoop = wk.runDiscoveryLoop(interval, minMixPeers)
|
||||
|
||||
info "kademlia discovery started"
|
||||
ok()
|
||||
|
||||
proc stop*(wk: WakuKademlia): Future[void] {.async: (raises: []).} =
|
||||
if not wk.running:
|
||||
return
|
||||
|
||||
if not node.kademliaDiscoveryLoop.isNil():
|
||||
trace "extended kademlia discovery loop already running"
|
||||
return
|
||||
info "Stopping kademlia discovery"
|
||||
|
||||
node.kademliaDiscoveryLoop =
|
||||
node.runExtendedKademliaDiscoveryLoop(interval, minMixPeers)
|
||||
wk.running = false
|
||||
|
||||
if not wk.discoveryLoop.isNil():
|
||||
await wk.discoveryLoop.cancelAndWait()
|
||||
wk.discoveryLoop = nil
|
||||
|
||||
info "Successfully stopped kademlia discovery"
|
||||
@ -33,7 +33,7 @@ import
|
||||
../waku_store_legacy/common as legacy_common,
|
||||
../waku_filter_v2,
|
||||
../waku_peer_exchange,
|
||||
../discovery/waku_ext_kademlia,
|
||||
../discovery/waku_kademlia,
|
||||
../node/peer_manager,
|
||||
../node/peer_manager/peer_store/waku_peer_storage,
|
||||
../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
|
||||
@ -179,16 +179,19 @@ proc setupProtocols(
|
||||
else:
|
||||
none(Curve25519Key)
|
||||
|
||||
(
|
||||
await setupExtendedKademliaDiscovery(
|
||||
node,
|
||||
ExtendedKademliaDiscoveryParams(
|
||||
bootstrapNodes: conf.kademliaDiscoveryConf.get().bootstrapNodes,
|
||||
mixPubKey: mixPubKey,
|
||||
advertiseMix: conf.mixConf.isSome(),
|
||||
),
|
||||
)
|
||||
).isOkOr:
|
||||
node.wakuKademlia = WakuKademlia.new(
|
||||
node.switch,
|
||||
ExtendedKademliaDiscoveryParams(
|
||||
bootstrapNodes: conf.kademliaDiscoveryConf.get().bootstrapNodes,
|
||||
mixPubKey: mixPubKey,
|
||||
advertiseMix: conf.mixConf.isSome(),
|
||||
),
|
||||
node.peerManager,
|
||||
getMixNodePoolSize = proc(): int {.gcsafe, raises: [].} =
|
||||
if node.wakuMix.isNil(): 0 else: node.getMixNodePoolSize(),
|
||||
isNodeStarted = proc(): bool {.gcsafe, raises: [].} =
|
||||
node.started,
|
||||
).valueOr:
|
||||
return err("failed to setup kademlia discovery: " & error)
|
||||
|
||||
if conf.storeServiceConf.isSome():
|
||||
@ -496,8 +499,10 @@ proc startNode*(
|
||||
if conf.relay:
|
||||
node.peerManager.start()
|
||||
|
||||
let minMixPeers = if conf.mixConf.isSome(): 4 else: 0
|
||||
startExtendedKademliaDiscoveryLoop(node, minMixPeers = minMixPeers)
|
||||
if not node.wakuKademlia.isNil():
|
||||
let minMixPeers = if conf.mixConf.isSome(): 4 else: 0
|
||||
(await node.wakuKademlia.start(minMixPeers = minMixPeers)).isOkOr:
|
||||
return err("failed to start kademlia discovery: " & error)
|
||||
|
||||
return ok()
|
||||
|
||||
|
||||
@ -17,7 +17,6 @@ import
|
||||
libp2p/protocols/ping,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/protocols/pubsub/rpc/messages,
|
||||
libp2p/protocols/kad_disco,
|
||||
libp2p/builders,
|
||||
libp2p/transports/transport,
|
||||
libp2p/transports/tcptransport,
|
||||
@ -62,6 +61,7 @@ import
|
||||
requests/node_requests,
|
||||
common/broker/broker_context,
|
||||
],
|
||||
../discovery/waku_kademlia,
|
||||
./net_config,
|
||||
./peer_manager
|
||||
|
||||
@ -137,8 +137,7 @@ type
|
||||
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
|
||||
rateLimitSettings*: ProtocolRateLimitSettings
|
||||
wakuMix*: WakuMix
|
||||
wakuKademlia*: KademliaDiscovery
|
||||
kademliaDiscoveryLoop*: Future[void]
|
||||
wakuKademlia*: WakuKademlia
|
||||
|
||||
proc deduceRelayShard(
|
||||
node: WakuNode,
|
||||
@ -285,7 +284,7 @@ proc mountAutoSharding*(
|
||||
return ok()
|
||||
|
||||
proc getMixNodePoolSize*(node: WakuNode): int =
|
||||
return node.wakuMix.nodePool.len
|
||||
return node.wakuMix.poolSize
|
||||
|
||||
proc mountMix*(
|
||||
node: WakuNode,
|
||||
@ -585,9 +584,8 @@ proc stop*(node: WakuNode) {.async.} =
|
||||
await node.wakuPeerExchangeClient.pxLoopHandle.cancelAndWait()
|
||||
node.wakuPeerExchangeClient.pxLoopHandle = nil
|
||||
|
||||
if not node.kademliaDiscoveryLoop.isNil():
|
||||
await node.kademliaDiscoveryLoop.cancelAndWait()
|
||||
node.kademliaDiscoveryLoop = nil
|
||||
if not node.wakuKademlia.isNil():
|
||||
await node.wakuKademlia.stop()
|
||||
|
||||
if not node.wakuRendezvous.isNil():
|
||||
await node.wakuRendezvous.stopWait()
|
||||
|
||||
@ -100,6 +100,9 @@ proc new*(
|
||||
warn "publishing with mix won't work until atleast 3 mix nodes in node pool"
|
||||
return ok(m)
|
||||
|
||||
proc poolSize*(mix: WakuMix): int =
|
||||
mix.nodePool.len
|
||||
|
||||
method start*(mix: WakuMix) =
|
||||
info "starting waku mix protocol"
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user