mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-02-13 02:13:12 +00:00
integrate mix protocol with extended kademlia discovery, libwaku API updates for logos-chat-poc integration
This commit is contained in:
parent
dd8dc7429d
commit
0f1efbab76
@ -24,12 +24,14 @@ import
|
||||
stream/connection, # create and close stream read / write connections
|
||||
multiaddress,
|
||||
# encode different addressing schemes. For example, /ip4/7.7.7.7/tcp/6543 means it is using IPv4 protocol and TCP
|
||||
multicodec,
|
||||
peerinfo,
|
||||
# manage the information of a peer, such as peer ID and public / private key
|
||||
peerid, # Implement how peers interact
|
||||
protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs
|
||||
nameresolving/dnsresolver,
|
||||
protocols/mix/curve25519,
|
||||
protocols/mix/mix_protocol,
|
||||
] # define DNS resolution
|
||||
import
|
||||
waku/[
|
||||
@ -38,6 +40,7 @@ import
|
||||
waku_lightpush/rpc,
|
||||
waku_enr,
|
||||
discovery/waku_dnsdisc,
|
||||
discovery/waku_ext_kademlia,
|
||||
waku_node,
|
||||
node/waku_metrics,
|
||||
node/peer_manager,
|
||||
@ -84,6 +87,24 @@ type
|
||||
|
||||
const MinMixNodePoolSize = 4
|
||||
|
||||
proc parseKadBootstrapNode(
|
||||
multiAddrStr: string
|
||||
): Result[(PeerId, seq[MultiAddress]), string] =
|
||||
## Parse a multiaddr string that includes /p2p/<peerID> into (PeerId, seq[MultiAddress])
|
||||
let multiAddr = MultiAddress.init(multiAddrStr).valueOr:
|
||||
return err("Invalid multiaddress: " & multiAddrStr)
|
||||
|
||||
let peerIdPart = multiAddr.getPart(multiCodec("p2p")).valueOr:
|
||||
return err("Multiaddress must include /p2p/<peerID>: " & multiAddrStr)
|
||||
|
||||
let peerIdBytes = peerIdPart.protoArgument().valueOr:
|
||||
return err("Failed to extract peer ID from multiaddress: " & multiAddrStr)
|
||||
|
||||
let peerId = PeerId.init(peerIdBytes).valueOr:
|
||||
return err("Invalid peer ID in multiaddress: " & multiAddrStr)
|
||||
|
||||
ok((peerId, @[multiAddr]))
|
||||
|
||||
#####################
|
||||
## chat2 protobufs ##
|
||||
#####################
|
||||
@ -453,14 +474,39 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
(await node.mountMix(conf.clusterId, mixPrivKey, conf.mixnodes)).isOkOr:
|
||||
error "failed to mount waku mix protocol: ", error = $error
|
||||
quit(QuitFailure)
|
||||
await node.mountRendezvousClient(conf.clusterId)
|
||||
|
||||
# Setup extended kademlia discovery if bootstrap nodes are provided
|
||||
if conf.kadBootstrapNodes.len > 0:
|
||||
var kadBootstrapPeers: seq[(PeerId, seq[MultiAddress])]
|
||||
for nodeStr in conf.kadBootstrapNodes:
|
||||
let parsed = parseKadBootstrapNode(nodeStr).valueOr:
|
||||
error "Failed to parse kademlia bootstrap node", node = nodeStr, error = error
|
||||
continue
|
||||
kadBootstrapPeers.add(parsed)
|
||||
|
||||
if kadBootstrapPeers.len > 0:
|
||||
(
|
||||
await setupExtendedKademliaDiscovery(
|
||||
node,
|
||||
ExtendedKademliaDiscoveryParams(
|
||||
bootstrapNodes: kadBootstrapPeers,
|
||||
mixPubKey: some(mixPubKey),
|
||||
advertiseMix: false,
|
||||
),
|
||||
)
|
||||
).isOkOr:
|
||||
error "failed to setup kademlia discovery", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
#await node.mountRendezvousClient(conf.clusterId)
|
||||
|
||||
await node.start()
|
||||
|
||||
node.peerManager.start()
|
||||
node.startExtendedKademliaDiscoveryLoop(minMixPeers = MinMixNodePoolSize)
|
||||
|
||||
await node.mountLibp2pPing()
|
||||
await node.mountPeerExchangeClient()
|
||||
#await node.mountPeerExchangeClient()
|
||||
let pubsubTopic = conf.getPubsubTopic(node, conf.contentTopic)
|
||||
echo "pubsub topic is: " & pubsubTopic
|
||||
let nick = await readNick(transp)
|
||||
@ -601,11 +647,6 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
node, pubsubTopic, conf.contentTopic, servicePeerInfo, false
|
||||
)
|
||||
echo "waiting for mix nodes to be discovered..."
|
||||
while true:
|
||||
if node.getMixNodePoolSize() >= MinMixNodePoolSize:
|
||||
break
|
||||
discard await node.fetchPeerExchangePeers()
|
||||
await sleepAsync(1000)
|
||||
|
||||
while node.getMixNodePoolSize() < MinMixNodePoolSize:
|
||||
info "waiting for mix nodes to be discovered",
|
||||
|
||||
@ -203,13 +203,13 @@ type
|
||||
fleet* {.
|
||||
desc:
|
||||
"Select the fleet to connect to. This sets the DNS discovery URL to the selected fleet.",
|
||||
defaultValue: Fleet.test,
|
||||
defaultValue: Fleet.none,
|
||||
name: "fleet"
|
||||
.}: Fleet
|
||||
|
||||
contentTopic* {.
|
||||
desc: "Content topic for chat messages.",
|
||||
defaultValue: "/toy-chat-mix/2/huilong/proto",
|
||||
defaultValue: "/toy-chat/2/baixa-chiado/proto",
|
||||
name: "content-topic"
|
||||
.}: string
|
||||
|
||||
@ -228,7 +228,14 @@ type
|
||||
desc: "WebSocket Secure Support.",
|
||||
defaultValue: false,
|
||||
name: "websocket-secure-support"
|
||||
.}: bool ## rln-relay configuration
|
||||
.}: bool
|
||||
|
||||
## Kademlia Discovery config
|
||||
kadBootstrapNodes* {.
|
||||
desc:
|
||||
"Peer multiaddr for kademlia discovery bootstrap node (must include /p2p/<peerID>). Argument may be repeated.",
|
||||
name: "kad-bootstrap-node"
|
||||
.}: seq[string]
|
||||
|
||||
proc parseCmdArg*(T: type MixNodePubInfo, p: string): T =
|
||||
let elements = p.split(":")
|
||||
|
||||
@ -9,7 +9,11 @@ import
|
||||
metrics,
|
||||
ffi
|
||||
import
|
||||
waku/factory/waku, waku/node/waku_node, waku/node/health_monitor, library/declare_lib
|
||||
waku/factory/waku,
|
||||
waku/node/waku_node,
|
||||
waku/node/health_monitor,
|
||||
library/declare_lib,
|
||||
waku/waku_core/codecs
|
||||
|
||||
proc getMultiaddresses(node: WakuNode): seq[string] =
|
||||
return node.info().listenAddresses
|
||||
@ -48,3 +52,19 @@ proc waku_is_online(
|
||||
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
|
||||
) {.ffi.} =
|
||||
return ok($ctx.myLib[].healthMonitor.onlineMonitor.amIOnline())
|
||||
|
||||
proc waku_get_mixnode_pool_size(
|
||||
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
|
||||
) {.ffi.} =
|
||||
## Returns the number of mix nodes in the pool
|
||||
if ctx.myLib[].node.wakuMix.isNil():
|
||||
return ok("0")
|
||||
return ok($ctx.myLib[].node.getMixNodePoolSize())
|
||||
|
||||
proc waku_get_lightpush_peers_count(
|
||||
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
|
||||
) {.ffi.} =
|
||||
## Returns the count of all peers in peerstore supporting lightpush protocol
|
||||
let peers =
|
||||
ctx.myLib[].node.peerManager.switch.peerStore.getPeersByProtocol(WakuLightPushCodec)
|
||||
return ok($peers.len)
|
||||
|
||||
@ -47,8 +47,10 @@ proc waku_filter_subscribe(
|
||||
error "fail filter subscribe", error = errorMsg
|
||||
return err(errorMsg)
|
||||
|
||||
let pubsubTopicOpt =
|
||||
if ($pubsubTopic).len > 0: some(PubsubTopic($pubsubTopic)) else: none(PubsubTopic)
|
||||
let subFut = ctx.myLib[].node.filterSubscribe(
|
||||
some(PubsubTopic($pubsubTopic)),
|
||||
pubsubTopicOpt,
|
||||
($contentTopics).split(",").mapIt(ContentTopic(it)),
|
||||
peer,
|
||||
)
|
||||
|
||||
@ -39,13 +39,15 @@ proc waku_lightpush_publish(
|
||||
let errorMsg = "failed to lightpublish message, no suitable remote peers"
|
||||
error "PUBLISH failed", error = errorMsg
|
||||
return err(errorMsg)
|
||||
let topic =
|
||||
if ($pubsubTopic).len == 0:
|
||||
none(PubsubTopic)
|
||||
else:
|
||||
some(PubsubTopic($pubsubTopic))
|
||||
|
||||
let msgHashHex = (
|
||||
await ctx.myLib[].node.wakuLegacyLightpushClient.publish(
|
||||
$pubsubTopic, msg, peer = peerOpt.get()
|
||||
)
|
||||
).valueOr:
|
||||
error "PUBLISH failed", error = error
|
||||
return err($error)
|
||||
discard (await ctx.myLib[].node.lightpushPublish(topic, msg, peerOpt)).valueOr:
|
||||
let errorMsg = error.desc.get($error.code.int)
|
||||
error "PUBLISH failed", error = errorMsg
|
||||
return err(errorMsg)
|
||||
|
||||
return ok(msgHashHex)
|
||||
return ok("")
|
||||
|
||||
@ -85,8 +85,8 @@ proc waku_relay_subscribe(
|
||||
callback: FFICallBack,
|
||||
userData: pointer,
|
||||
pubSubTopic: cstring,
|
||||
contentTopic: cstring,
|
||||
) {.ffi.} =
|
||||
echo "Subscribing to topic: " & $pubSubTopic & " ..."
|
||||
proc onReceivedMessage(ctx: ptr FFIContext[Waku]): WakuRelayHandler =
|
||||
return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} =
|
||||
callEventCallback(ctx, "onReceivedMessage"):
|
||||
@ -94,21 +94,38 @@ proc waku_relay_subscribe(
|
||||
|
||||
var cb = onReceivedMessage(ctx)
|
||||
|
||||
ctx.myLib[].node.subscribe(
|
||||
(kind: SubscriptionKind.PubsubSub, topic: $pubsubTopic),
|
||||
handler = WakuRelayHandler(cb),
|
||||
).isOkOr:
|
||||
# If contentTopic is provided and non-empty, use ContentSub, otherwise use PubsubSub
|
||||
let subscription =
|
||||
if contentTopic != nil and len($contentTopic) > 0:
|
||||
echo "Subscribing to content topic: " & $contentTopic & " ..."
|
||||
(kind: SubscriptionKind.ContentSub, topic: $contentTopic)
|
||||
else:
|
||||
echo "Subscribing to pubsub topic: " & $pubSubTopic & " ..."
|
||||
(kind: SubscriptionKind.PubsubSub, topic: $pubsubTopic)
|
||||
|
||||
ctx.myLib[].node.subscribe(subscription, handler = WakuRelayHandler(cb)).isOkOr:
|
||||
error "SUBSCRIBE failed", error = error
|
||||
return err($error)
|
||||
return ok("")
|
||||
|
||||
# NOTE: When unsubscribing via contentTopic, this will unsubscribe from the entire
|
||||
# underlying pubsub topic/shard that the content topic maps to. This affects ALL
|
||||
# content topics on the same shard, not just the specified content topic.
|
||||
proc waku_relay_unsubscribe(
|
||||
ctx: ptr FFIContext[Waku],
|
||||
callback: FFICallBack,
|
||||
userData: pointer,
|
||||
pubSubTopic: cstring,
|
||||
contentTopic: cstring,
|
||||
) {.ffi.} =
|
||||
ctx.myLib[].node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: $pubsubTopic)).isOkOr:
|
||||
# If contentTopic is provided and non-empty, use ContentUnsub, otherwise use PubsubUnsub
|
||||
let subscription =
|
||||
if contentTopic != nil and len($contentTopic) > 0:
|
||||
(kind: SubscriptionKind.ContentUnsub, topic: $contentTopic)
|
||||
else:
|
||||
(kind: SubscriptionKind.PubsubUnsub, topic: $pubsubTopic)
|
||||
|
||||
ctx.myLib[].node.unsubscribe(subscription).isOkOr:
|
||||
error "UNSUBSCRIBE failed", error = error
|
||||
return err($error)
|
||||
|
||||
|
||||
@ -8,8 +8,13 @@ import
|
||||
waku/waku_store/common,
|
||||
waku/waku_store/client,
|
||||
waku/common/paging,
|
||||
waku/common/base64,
|
||||
library/declare_lib
|
||||
|
||||
# Custom JSON serialization for seq[byte] to avoid ambiguity
|
||||
proc `%`*(data: seq[byte]): JsonNode =
|
||||
%base64.encode(data)
|
||||
|
||||
func fromJsonNode(jsonContent: JsonNode): Result[StoreQueryRequest, string] =
|
||||
var contentTopics: seq[string]
|
||||
if jsonContent.contains("contentTopics"):
|
||||
@ -90,5 +95,6 @@ proc waku_store_query(
|
||||
).valueOr:
|
||||
return err("StoreRequest failed store query: " & $error)
|
||||
|
||||
let res = $(%*(queryResponse.toHex()))
|
||||
let hexResponse = queryResponse.toHex()
|
||||
let res = $(%*hexResponse)
|
||||
return ok(res) ## returning the response in json format
|
||||
|
||||
@ -85,7 +85,8 @@ extern "C"
|
||||
int waku_relay_subscribe(void *ctx,
|
||||
FFICallBack callback,
|
||||
void *userData,
|
||||
const char *pubSubTopic);
|
||||
const char *pubSubTopic,
|
||||
const char *contentTopic);
|
||||
|
||||
int waku_relay_add_protected_shard(void *ctx,
|
||||
FFICallBack callback,
|
||||
@ -97,7 +98,8 @@ extern "C"
|
||||
int waku_relay_unsubscribe(void *ctx,
|
||||
FFICallBack callback,
|
||||
void *userData,
|
||||
const char *pubSubTopic);
|
||||
const char *pubSubTopic,
|
||||
const char *contentTopic);
|
||||
|
||||
int waku_filter_subscribe(void *ctx,
|
||||
FFICallBack callback,
|
||||
@ -247,6 +249,14 @@ extern "C"
|
||||
FFICallBack callback,
|
||||
void *userData);
|
||||
|
||||
int waku_get_mixnode_pool_size(void *ctx,
|
||||
FFICallBack callback,
|
||||
void *userData);
|
||||
|
||||
int waku_get_lightpush_peers_count(void *ctx,
|
||||
FFICallBack callback,
|
||||
void *userData);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
@ -45,9 +45,6 @@ in stdenv.mkDerivation {
|
||||
pkgs.darwin.cctools gcc # Necessary for libbacktrace
|
||||
];
|
||||
|
||||
# Environment variables required for Android builds
|
||||
ANDROID_SDK_ROOT="${pkgs.androidPkgs.sdk}";
|
||||
ANDROID_NDK_HOME="${pkgs.androidPkgs.ndk}";
|
||||
NIMFLAGS = "-d:disableMarchNative -d:git_revision_override=${revision}";
|
||||
XDG_CACHE_HOME = "/tmp";
|
||||
|
||||
@ -61,6 +58,15 @@ in stdenv.mkDerivation {
|
||||
|
||||
configurePhase = ''
|
||||
patchShebangs . vendor/nimbus-build-system > /dev/null
|
||||
|
||||
# build_nim.sh guards "rm -rf dist/checksums" with NIX_BUILD_TOP != "/build",
|
||||
# but on macOS the nix sandbox uses /private/tmp/... so the check fails and
|
||||
# dist/checksums (provided via preBuild) gets deleted. Fix the check to skip
|
||||
# the removal whenever NIX_BUILD_TOP is set (i.e. any nix build).
|
||||
substituteInPlace vendor/nimbus-build-system/scripts/build_nim.sh \
|
||||
--replace 'if [[ "''${NIX_BUILD_TOP}" != "/build" ]]; then' \
|
||||
'if [[ -z "''${NIX_BUILD_TOP}" ]]; then'
|
||||
|
||||
make nimbus-build-system-paths
|
||||
make nimbus-build-system-nimble-dir
|
||||
'';
|
||||
@ -98,6 +104,9 @@ in stdenv.mkDerivation {
|
||||
cp library/libwaku.h $out/include/
|
||||
'';
|
||||
|
||||
ANDROID_SDK_ROOT = "${pkgs.androidPkgs.sdk}";
|
||||
ANDROID_NDK_HOME = "${pkgs.androidPkgs.ndk}";
|
||||
|
||||
meta = with pkgs.lib; {
|
||||
description = "NWaku derivation to build libwaku for mobile targets using Android NDK and Rust.";
|
||||
homepage = "https://github.com/status-im/nwaku";
|
||||
|
||||
@ -1,16 +1,17 @@
|
||||
log-level = "INFO"
|
||||
log-level = "TRACE"
|
||||
relay = true
|
||||
mix = true
|
||||
filter = true
|
||||
store = false
|
||||
lightpush = true
|
||||
max-connections = 150
|
||||
peer-exchange = true
|
||||
peer-exchange = false
|
||||
metrics-logging = false
|
||||
cluster-id = 2
|
||||
discv5-discovery = true
|
||||
discv5-discovery = false
|
||||
discv5-udp-port = 9000
|
||||
discv5-enr-auto-update = true
|
||||
enable-kad-discovery = true
|
||||
rest = true
|
||||
rest-admin = true
|
||||
ports-shift = 1
|
||||
@ -19,7 +20,9 @@ shard = [0]
|
||||
agent-string = "nwaku-mix"
|
||||
nodekey = "f98e3fba96c32e8d1967d460f1b79457380e1a895f7971cecc8528abe733781a"
|
||||
mixkey = "a87db88246ec0eedda347b9b643864bee3d6933eb15ba41e6d58cb678d813258"
|
||||
rendezvous = true
|
||||
rendezvous = false
|
||||
listen-address = "127.0.0.1"
|
||||
nat = "extip:127.0.0.1"
|
||||
ext-multiaddr = ["/ip4/127.0.0.1/tcp/60001"]
|
||||
ext-multiaddr-only = true
|
||||
ip-colocation-limit=0
|
||||
|
||||
@ -1,17 +1,18 @@
|
||||
log-level = "INFO"
|
||||
log-level = "TRACE"
|
||||
relay = true
|
||||
mix = true
|
||||
filter = true
|
||||
store = false
|
||||
lightpush = true
|
||||
max-connections = 150
|
||||
peer-exchange = true
|
||||
peer-exchange = false
|
||||
metrics-logging = false
|
||||
cluster-id = 2
|
||||
discv5-discovery = true
|
||||
discv5-discovery = false
|
||||
discv5-udp-port = 9001
|
||||
discv5-enr-auto-update = true
|
||||
discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"]
|
||||
kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"]
|
||||
rest = true
|
||||
rest-admin = true
|
||||
ports-shift = 2
|
||||
@ -20,8 +21,10 @@ shard = [0]
|
||||
agent-string = "nwaku-mix"
|
||||
nodekey = "09e9d134331953357bd38bbfce8edb377f4b6308b4f3bfbe85c610497053d684"
|
||||
mixkey = "c86029e02c05a7e25182974b519d0d52fcbafeca6fe191fbb64857fb05be1a53"
|
||||
rendezvous = true
|
||||
rendezvous = false
|
||||
listen-address = "127.0.0.1"
|
||||
nat = "extip:127.0.0.1"
|
||||
ext-multiaddr = ["/ip4/127.0.0.1/tcp/60002"]
|
||||
ext-multiaddr-only = true
|
||||
ip-colocation-limit=0
|
||||
#staticnode = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", "/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA","/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f","/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu"]
|
||||
|
||||
@ -1,17 +1,18 @@
|
||||
log-level = "INFO"
|
||||
log-level = "TRACE"
|
||||
relay = true
|
||||
mix = true
|
||||
filter = true
|
||||
store = false
|
||||
lightpush = true
|
||||
max-connections = 150
|
||||
peer-exchange = true
|
||||
peer-exchange = false
|
||||
metrics-logging = false
|
||||
cluster-id = 2
|
||||
discv5-discovery = true
|
||||
discv5-discovery = false
|
||||
discv5-udp-port = 9002
|
||||
discv5-enr-auto-update = true
|
||||
discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"]
|
||||
kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"]
|
||||
rest = false
|
||||
rest-admin = false
|
||||
ports-shift = 3
|
||||
@ -20,8 +21,10 @@ shard = [0]
|
||||
agent-string = "nwaku-mix"
|
||||
nodekey = "ed54db994682e857d77cd6fb81be697382dc43aa5cd78e16b0ec8098549f860e"
|
||||
mixkey = "b858ac16bbb551c4b2973313b1c8c8f7ea469fca03f1608d200bbf58d388ec7f"
|
||||
rendezvous = true
|
||||
rendezvous = false
|
||||
listen-address = "127.0.0.1"
|
||||
nat = "extip:127.0.0.1"
|
||||
ext-multiaddr = ["/ip4/127.0.0.1/tcp/60003"]
|
||||
ext-multiaddr-only = true
|
||||
ip-colocation-limit=0
|
||||
#staticnode = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", "/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF","/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f","/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu"]
|
||||
|
||||
@ -1,17 +1,18 @@
|
||||
log-level = "INFO"
|
||||
log-level = "TRACE"
|
||||
relay = true
|
||||
mix = true
|
||||
filter = true
|
||||
store = false
|
||||
lightpush = true
|
||||
max-connections = 150
|
||||
peer-exchange = true
|
||||
peer-exchange = false
|
||||
metrics-logging = false
|
||||
cluster-id = 2
|
||||
discv5-discovery = true
|
||||
discv5-discovery = false
|
||||
discv5-udp-port = 9003
|
||||
discv5-enr-auto-update = true
|
||||
discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"]
|
||||
kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"]
|
||||
rest = false
|
||||
rest-admin = false
|
||||
ports-shift = 4
|
||||
@ -20,8 +21,10 @@ shard = [0]
|
||||
agent-string = "nwaku-mix"
|
||||
nodekey = "42f96f29f2d6670938b0864aced65a332dcf5774103b4c44ec4d0ea4ef3c47d6"
|
||||
mixkey = "d8bd379bb394b0f22dd236d63af9f1a9bc45266beffc3fbbe19e8b6575f2535b"
|
||||
rendezvous = true
|
||||
rendezvous = false
|
||||
listen-address = "127.0.0.1"
|
||||
nat = "extip:127.0.0.1"
|
||||
ext-multiaddr = ["/ip4/127.0.0.1/tcp/60004"]
|
||||
ext-multiaddr-only = true
|
||||
ip-colocation-limit=0
|
||||
#staticnode = ["/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF", "/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA","/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o","/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu"]
|
||||
|
||||
@ -1,17 +1,18 @@
|
||||
log-level = "INFO"
|
||||
log-level = "TRACE"
|
||||
relay = true
|
||||
mix = true
|
||||
filter = true
|
||||
store = false
|
||||
lightpush = true
|
||||
max-connections = 150
|
||||
peer-exchange = true
|
||||
peer-exchange = false
|
||||
metrics-logging = false
|
||||
cluster-id = 2
|
||||
discv5-discovery = true
|
||||
discv5-discovery = false
|
||||
discv5-udp-port = 9004
|
||||
discv5-enr-auto-update = true
|
||||
discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"]
|
||||
kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"]
|
||||
rest = false
|
||||
rest-admin = false
|
||||
ports-shift = 5
|
||||
@ -20,8 +21,10 @@ shard = [0]
|
||||
agent-string = "nwaku-mix"
|
||||
nodekey = "3ce887b3c34b7a92dd2868af33941ed1dbec4893b054572cd5078da09dd923d4"
|
||||
mixkey = "780fff09e51e98df574e266bf3266ec6a3a1ddfcf7da826a349a29c137009d49"
|
||||
rendezvous = true
|
||||
rendezvous = false
|
||||
listen-address = "127.0.0.1"
|
||||
nat = "extip:127.0.0.1"
|
||||
ext-multiaddr = ["/ip4/127.0.0.1/tcp/60005"]
|
||||
ext-multiaddr-only = true
|
||||
ip-colocation-limit=0
|
||||
#staticnode = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", "/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA","/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f","/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF"]
|
||||
|
||||
@ -1,2 +1,2 @@
|
||||
../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE
|
||||
../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --kad-bootstrap-node="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"
|
||||
#--mixnode="/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF:9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a" --mixnode="/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA:275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c" --mixnode="/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f:e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18" --mixnode="/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu:8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f"
|
||||
|
||||
@ -1 +1 @@
|
||||
../../build/wakunode2 --config-file="config.toml"
|
||||
../../build/wakunode2 --config-file="config.toml" 2>&1 | tee mix_node.log
|
||||
|
||||
@ -1 +1 @@
|
||||
../../build/wakunode2 --config-file="config1.toml"
|
||||
../../build/wakunode2 --config-file="config1.toml" 2>&1 | tee mix_node1.log
|
||||
|
||||
@ -1 +1 @@
|
||||
../../build/wakunode2 --config-file="config2.toml"
|
||||
../../build/wakunode2 --config-file="config2.toml" 2>&1 | tee mix_node2.log
|
||||
|
||||
@ -1 +1 @@
|
||||
../../build/wakunode2 --config-file="config3.toml"
|
||||
../../build/wakunode2 --config-file="config3.toml" 2>&1 | tee mix_node3.log
|
||||
|
||||
@ -1 +1 @@
|
||||
../../build/wakunode2 --config-file="config4.toml"
|
||||
../../build/wakunode2 --config-file="config4.toml" 2>&1 | tee mix_node4.log
|
||||
|
||||
@ -621,6 +621,20 @@ with the drawback of consuming some more bandwidth.""",
|
||||
name: "mixnode"
|
||||
.}: seq[MixNodePubInfo]
|
||||
|
||||
# Kademlia Discovery config
|
||||
enableKadDiscovery* {.
|
||||
desc:
|
||||
"Enable extended kademlia discovery. Can be enabled without bootstrap nodes for the first node in the network.",
|
||||
defaultValue: false,
|
||||
name: "enable-kad-discovery"
|
||||
.}: bool
|
||||
|
||||
kadBootstrapNodes* {.
|
||||
desc:
|
||||
"Peer multiaddr for kademlia discovery bootstrap node (must include /p2p/<peerID>). Argument may be repeated.",
|
||||
name: "kad-bootstrap-node"
|
||||
.}: seq[string]
|
||||
|
||||
## websocket config
|
||||
websocketSupport* {.
|
||||
desc: "Enable websocket: true|false",
|
||||
@ -1057,4 +1071,7 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
|
||||
|
||||
b.rateLimitConf.withRateLimits(n.rateLimits)
|
||||
|
||||
b.kademliaDiscoveryConf.withEnabled(n.enableKadDiscovery)
|
||||
b.kademliaDiscoveryConf.withBootstrapNodes(n.kadBootstrapNodes)
|
||||
|
||||
return b.build()
|
||||
|
||||
2
vendor/nim-libp2p
vendored
2
vendor/nim-libp2p
vendored
@ -1 +1 @@
|
||||
Subproject commit ca48c3718246bb411ff0e354a70cb82d9a28de0d
|
||||
Subproject commit ff8d51857b4b79a68468e7bcc27b2026cca02996
|
||||
@ -24,7 +24,7 @@ requires "nim >= 2.2.4",
|
||||
"stew",
|
||||
"stint",
|
||||
"metrics",
|
||||
"libp2p >= 1.14.3",
|
||||
"libp2p >= 1.15.0",
|
||||
"web3",
|
||||
"presto",
|
||||
"regex",
|
||||
|
||||
243
waku/discovery/waku_ext_kademlia.nim
Normal file
243
waku/discovery/waku_ext_kademlia.nim
Normal file
@ -0,0 +1,243 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[options, sequtils],
|
||||
chronos,
|
||||
chronicles,
|
||||
results,
|
||||
libp2p/[peerid, multiaddress],
|
||||
libp2p/extended_peer_record,
|
||||
libp2p/crypto/curve25519,
|
||||
libp2p/protocols/[kademlia, kad_disco],
|
||||
libp2p/protocols/kademlia_discovery/types as kad_types,
|
||||
libp2p/protocols/mix/mix_protocol
|
||||
|
||||
import ../waku_core, ../node/waku_node, ../node/peer_manager
|
||||
|
||||
logScope:
|
||||
topics = "waku extended kademlia discovery"
|
||||
|
||||
const
|
||||
DefaultExtendedKademliaDiscoveryInterval* = chronos.seconds(5)
|
||||
ExtendedKademliaDiscoveryStartupDelay* = chronos.seconds(5)
|
||||
|
||||
type ExtendedKademliaDiscoveryParams* = object
|
||||
bootstrapNodes*: seq[(PeerId, seq[MultiAddress])]
|
||||
mixPubKey*: Option[Curve25519Key]
|
||||
advertiseMix*: bool = false
|
||||
|
||||
proc setupExtendedKademliaDiscovery*(
|
||||
node: WakuNode, params: ExtendedKademliaDiscoveryParams
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
if params.bootstrapNodes.len == 0:
|
||||
info "starting kademlia discovery as seed node (no bootstrap nodes)"
|
||||
|
||||
let kademlia = KademliaDiscovery.new(
|
||||
node.switch,
|
||||
bootstrapNodes = params.bootstrapNodes,
|
||||
config = KadDHTConfig.new(
|
||||
validator = kad_types.ExtEntryValidator(), selector = kad_types.ExtEntrySelector()
|
||||
),
|
||||
codec = ExtendedKademliaDiscoveryCodec,
|
||||
)
|
||||
|
||||
try:
|
||||
node.switch.mount(kademlia)
|
||||
except CatchableError:
|
||||
return err("failed to mount kademlia discovery: " & getCurrentExceptionMsg())
|
||||
|
||||
# Register services BEFORE starting kademlia so they are included in the
|
||||
# initial self-signed peer record published to the DHT
|
||||
if params.advertiseMix:
|
||||
if params.mixPubKey.isSome():
|
||||
discard kademlia.startAdvertising(
|
||||
ServiceInfo(id: MixProtocolID, data: @(params.mixPubKey.get()))
|
||||
)
|
||||
debug "extended kademlia advertising mix service",
|
||||
keyHex = params.mixPubKey.get().toHex(),
|
||||
bootstrapNodes = params.bootstrapNodes.len
|
||||
else:
|
||||
warn "mix advertising enabled but no key provided"
|
||||
|
||||
try:
|
||||
await kademlia.start()
|
||||
except CatchableError:
|
||||
return err("failed to start kademlia discovery: " & getCurrentExceptionMsg())
|
||||
|
||||
node.wakuKademlia = kademlia
|
||||
|
||||
info "kademlia discovery started",
|
||||
bootstrapNodes = params.bootstrapNodes.len, advertiseMix = params.advertiseMix
|
||||
|
||||
ok()
|
||||
|
||||
proc extractMixPubKey(service: ServiceInfo): Option[Curve25519Key] =
|
||||
if service.id != MixProtocolID:
|
||||
trace "service is not mix protocol",
|
||||
serviceId = service.id, mixProtocolId = MixProtocolID
|
||||
return none(Curve25519Key)
|
||||
|
||||
debug "found mix protocol service",
|
||||
dataLen = service.data.len, expectedLen = Curve25519KeySize
|
||||
|
||||
if service.data.len != Curve25519KeySize:
|
||||
warn "invalid mix pub key length from kademlia record",
|
||||
expected = Curve25519KeySize,
|
||||
actual = service.data.len,
|
||||
dataHex = service.data.toHex()
|
||||
return none(Curve25519Key)
|
||||
|
||||
let key = intoCurve25519Key(service.data)
|
||||
debug "successfully extracted mix pub key", keyHex = key.toHex()
|
||||
some(key)
|
||||
|
||||
proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
|
||||
debug "processing kademlia record",
|
||||
peerId = record.peerId,
|
||||
numAddresses = record.addresses.len,
|
||||
numServices = record.services.len,
|
||||
serviceIds = record.services.mapIt(it.id)
|
||||
|
||||
if record.addresses.len == 0:
|
||||
trace "kademlia record missing addresses", peerId = record.peerId
|
||||
return none(RemotePeerInfo)
|
||||
|
||||
let addrs = record.addresses.mapIt(it.address)
|
||||
if addrs.len == 0:
|
||||
trace "kademlia record produced no dialable addresses", peerId = record.peerId
|
||||
return none(RemotePeerInfo)
|
||||
|
||||
let protocols = record.services.mapIt(it.id)
|
||||
|
||||
var mixPubKey = none(Curve25519Key)
|
||||
for service in record.services:
|
||||
debug "checking service",
|
||||
peerId = record.peerId, serviceId = service.id, dataLen = service.data.len
|
||||
mixPubKey = extractMixPubKey(service)
|
||||
if mixPubKey.isSome():
|
||||
debug "extracted mix public key from service", peerId = record.peerId
|
||||
break
|
||||
|
||||
if record.services.len > 0 and mixPubKey.isNone():
|
||||
debug "record has services but no valid mix key",
|
||||
peerId = record.peerId, services = record.services.mapIt(it.id)
|
||||
|
||||
some(
|
||||
RemotePeerInfo.init(
|
||||
record.peerId,
|
||||
addrs = addrs,
|
||||
protocols = protocols,
|
||||
origin = PeerOrigin.Kademlia,
|
||||
mixPubKey = mixPubKey,
|
||||
)
|
||||
)
|
||||
|
||||
proc lookupMixPeers*(node: WakuNode): Future[int] {.async.} =
|
||||
## Lookup mix peers via kademlia and add them to the peer store.
|
||||
## Returns the number of mix peers found and added.
|
||||
if node.wakuKademlia.isNil():
|
||||
warn "cannot lookup mix peers: kademlia not mounted"
|
||||
return 0
|
||||
|
||||
let mixService = ServiceInfo(id: MixProtocolID, data: @[])
|
||||
var records: seq[ExtendedPeerRecord]
|
||||
try:
|
||||
records = await node.wakuKademlia.lookup(mixService)
|
||||
except CatchableError:
|
||||
warn "mix peer lookup failed", error = getCurrentExceptionMsg()
|
||||
return 0
|
||||
|
||||
debug "mix peer lookup returned records", numRecords = records.len
|
||||
|
||||
var added = 0
|
||||
for record in records:
|
||||
let peerOpt = remotePeerInfoFrom(record)
|
||||
if peerOpt.isNone():
|
||||
continue
|
||||
|
||||
let peerInfo = peerOpt.get()
|
||||
if peerInfo.mixPubKey.isNone():
|
||||
continue
|
||||
|
||||
node.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
||||
info "mix peer added via kademlia lookup",
|
||||
peerId = $peerInfo.peerId, mixPubKey = peerInfo.mixPubKey.get().toHex()
|
||||
added.inc()
|
||||
|
||||
info "mix peer lookup complete", found = added
|
||||
return added
|
||||
|
||||
proc runExtendedKademliaDiscoveryLoop*(
|
||||
node: WakuNode,
|
||||
interval = DefaultExtendedKademliaDiscoveryInterval,
|
||||
minMixPeers: int = 0,
|
||||
): Future[void] {.async.} =
|
||||
info "extended kademlia discovery loop started", interval = interval
|
||||
|
||||
try:
|
||||
while true:
|
||||
if node.wakuKademlia.isNil():
|
||||
info "extended kademlia discovery loop stopping: protocol disabled"
|
||||
return
|
||||
|
||||
if not node.started:
|
||||
await sleepAsync(ExtendedKademliaDiscoveryStartupDelay)
|
||||
continue
|
||||
|
||||
var records: seq[ExtendedPeerRecord]
|
||||
try:
|
||||
records = await node.wakuKademlia.randomRecords()
|
||||
except CatchableError:
|
||||
warn "extended kademlia discovery failed", error = getCurrentExceptionMsg()
|
||||
await sleepAsync(interval)
|
||||
continue
|
||||
|
||||
debug "received random records from kademlia", numRecords = records.len
|
||||
|
||||
var added = 0
|
||||
for record in records:
|
||||
let peerOpt = remotePeerInfoFrom(record)
|
||||
if peerOpt.isNone():
|
||||
continue
|
||||
|
||||
let peerInfo = peerOpt.get()
|
||||
node.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
||||
debug "peer added via extended kademlia discovery",
|
||||
peerId = $peerInfo.peerId,
|
||||
addresses = peerInfo.addrs.mapIt($it),
|
||||
protocols = peerInfo.protocols,
|
||||
hasMixPubKey = peerInfo.mixPubKey.isSome()
|
||||
added.inc()
|
||||
|
||||
if added > 0:
|
||||
info "added peers from extended kademlia discovery", count = added
|
||||
|
||||
# Targeted mix peer lookup when pool is low
|
||||
if minMixPeers > 0 and node.getMixNodePoolSize() < minMixPeers:
|
||||
debug "mix node pool below threshold, performing targeted lookup",
|
||||
currentPoolSize = node.getMixNodePoolSize(), threshold = minMixPeers
|
||||
let found = await node.lookupMixPeers()
|
||||
if found > 0:
|
||||
info "found mix peers via targeted kademlia lookup", count = found
|
||||
|
||||
await sleepAsync(interval)
|
||||
except CancelledError:
|
||||
debug "extended kademlia discovery loop cancelled"
|
||||
except CatchableError as e:
|
||||
error "extended kademlia discovery loop failed", error = e.msg
|
||||
|
||||
proc startExtendedKademliaDiscoveryLoop*(
|
||||
node: WakuNode,
|
||||
interval = DefaultExtendedKademliaDiscoveryInterval,
|
||||
minMixPeers: int = 0,
|
||||
) =
|
||||
if node.wakuKademlia.isNil():
|
||||
trace "extended kademlia discovery not started: protocol not mounted"
|
||||
return
|
||||
|
||||
if not node.kademliaDiscoveryLoop.isNil():
|
||||
trace "extended kademlia discovery loop already running"
|
||||
return
|
||||
|
||||
node.kademliaDiscoveryLoop =
|
||||
node.runExtendedKademliaDiscoveryLoop(interval, minMixPeers)
|
||||
@ -10,10 +10,12 @@ import
|
||||
./metrics_server_conf_builder,
|
||||
./rate_limit_conf_builder,
|
||||
./rln_relay_conf_builder,
|
||||
./mix_conf_builder
|
||||
./mix_conf_builder,
|
||||
./kademlia_discovery_conf_builder
|
||||
|
||||
export
|
||||
waku_conf_builder, filter_service_conf_builder, store_sync_conf_builder,
|
||||
store_service_conf_builder, rest_server_conf_builder, dns_discovery_conf_builder,
|
||||
discv5_conf_builder, web_socket_conf_builder, metrics_server_conf_builder,
|
||||
rate_limit_conf_builder, rln_relay_conf_builder, mix_conf_builder
|
||||
rate_limit_conf_builder, rln_relay_conf_builder, mix_conf_builder,
|
||||
kademlia_discovery_conf_builder
|
||||
|
||||
@ -0,0 +1,62 @@
|
||||
import chronicles, std/options, results
|
||||
import libp2p/[peerid, multiaddress, multicodec]
|
||||
import ../waku_conf
|
||||
|
||||
logScope:
|
||||
topics = "waku conf builder kademlia discovery"
|
||||
|
||||
#######################################
|
||||
## Kademlia Discovery Config Builder ##
|
||||
#######################################
|
||||
type KademliaDiscoveryConfBuilder* = object
|
||||
enabled*: Option[bool]
|
||||
bootstrapNodes*: seq[string]
|
||||
|
||||
proc init*(T: type KademliaDiscoveryConfBuilder): KademliaDiscoveryConfBuilder =
|
||||
KademliaDiscoveryConfBuilder()
|
||||
|
||||
proc withEnabled*(b: var KademliaDiscoveryConfBuilder, enabled: bool) =
|
||||
b.enabled = some(enabled)
|
||||
|
||||
proc withBootstrapNodes*(
|
||||
b: var KademliaDiscoveryConfBuilder, bootstrapNodes: seq[string]
|
||||
) =
|
||||
b.bootstrapNodes = bootstrapNodes
|
||||
|
||||
proc parseBootstrapNode(
|
||||
multiAddrStr: string
|
||||
): Result[(PeerId, seq[MultiAddress]), string] =
|
||||
## Parse a multiaddr string that includes /p2p/<peerID> into (PeerId, seq[MultiAddress])
|
||||
let multiAddr = MultiAddress.init(multiAddrStr).valueOr:
|
||||
return err("Invalid multiaddress: " & multiAddrStr)
|
||||
|
||||
let peerIdPart = multiAddr.getPart(multiCodec("p2p")).valueOr:
|
||||
return err("Multiaddress must include /p2p/<peerID>: " & multiAddrStr)
|
||||
|
||||
let peerIdBytes = peerIdPart.protoArgument().valueOr:
|
||||
return err("Failed to extract peer ID from multiaddress: " & multiAddrStr)
|
||||
|
||||
let peerId = PeerId.init(peerIdBytes).valueOr:
|
||||
return err("Invalid peer ID in multiaddress: " & multiAddrStr)
|
||||
|
||||
# Get the address without the p2p part for connecting
|
||||
let addrWithoutP2p = multiAddr[0 ..^ 2].valueOr:
|
||||
return err("Failed to strip /p2p/ part from multiaddress: " & multiAddrStr)
|
||||
|
||||
ok((peerId, @[addrWithoutP2p]))
|
||||
|
||||
proc build*(
|
||||
b: KademliaDiscoveryConfBuilder
|
||||
): Result[Option[KademliaDiscoveryConf], string] =
|
||||
# Kademlia is enabled if explicitly enabled OR if bootstrap nodes are provided
|
||||
let enabled = b.enabled.get(false) or b.bootstrapNodes.len > 0
|
||||
if not enabled:
|
||||
return ok(none(KademliaDiscoveryConf))
|
||||
|
||||
var parsedNodes: seq[(PeerId, seq[MultiAddress])]
|
||||
for nodeStr in b.bootstrapNodes:
|
||||
let parsed = parseBootstrapNode(nodeStr).valueOr:
|
||||
return err("Failed to parse kademlia bootstrap node: " & error)
|
||||
parsedNodes.add(parsed)
|
||||
|
||||
return ok(some(KademliaDiscoveryConf(bootstrapNodes: parsedNodes)))
|
||||
@ -25,7 +25,8 @@ import
|
||||
./metrics_server_conf_builder,
|
||||
./rate_limit_conf_builder,
|
||||
./rln_relay_conf_builder,
|
||||
./mix_conf_builder
|
||||
./mix_conf_builder,
|
||||
./kademlia_discovery_conf_builder
|
||||
|
||||
logScope:
|
||||
topics = "waku conf builder"
|
||||
@ -80,6 +81,7 @@ type WakuConfBuilder* = object
|
||||
mixConf*: MixConfBuilder
|
||||
webSocketConf*: WebSocketConfBuilder
|
||||
rateLimitConf*: RateLimitConfBuilder
|
||||
kademliaDiscoveryConf*: KademliaDiscoveryConfBuilder
|
||||
# End conf builders
|
||||
relay: Option[bool]
|
||||
lightPush: Option[bool]
|
||||
@ -140,6 +142,7 @@ proc init*(T: type WakuConfBuilder): WakuConfBuilder =
|
||||
storeServiceConf: StoreServiceConfBuilder.init(),
|
||||
webSocketConf: WebSocketConfBuilder.init(),
|
||||
rateLimitConf: RateLimitConfBuilder.init(),
|
||||
kademliaDiscoveryConf: KademliaDiscoveryConfBuilder.init(),
|
||||
)
|
||||
|
||||
proc withNetworkConf*(b: var WakuConfBuilder, networkConf: NetworkConf) =
|
||||
@ -506,6 +509,9 @@ proc build*(
|
||||
let rateLimit = builder.rateLimitConf.build().valueOr:
|
||||
return err("Rate limits Conf building failed: " & $error)
|
||||
|
||||
let kademliaDiscoveryConf = builder.kademliaDiscoveryConf.build().valueOr:
|
||||
return err("Kademlia Discovery Conf building failed: " & $error)
|
||||
|
||||
# End - Build sub-configs
|
||||
|
||||
let logLevel =
|
||||
@ -628,6 +634,7 @@ proc build*(
|
||||
restServerConf: restServerConf,
|
||||
dnsDiscoveryConf: dnsDiscoveryConf,
|
||||
mixConf: mixConf,
|
||||
kademliaDiscoveryConf: kademliaDiscoveryConf,
|
||||
# end confs
|
||||
nodeKey: nodeKey,
|
||||
clusterId: clusterId,
|
||||
|
||||
@ -6,7 +6,8 @@ import
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/protocols/connectivity/relay/relay,
|
||||
libp2p/nameresolving/dnsresolver,
|
||||
libp2p/crypto/crypto
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/crypto/curve25519
|
||||
|
||||
import
|
||||
./internal_config,
|
||||
@ -32,6 +33,7 @@ import
|
||||
../waku_store_legacy/common as legacy_common,
|
||||
../waku_filter_v2,
|
||||
../waku_peer_exchange,
|
||||
../discovery/waku_ext_kademlia,
|
||||
../node/peer_manager,
|
||||
../node/peer_manager/peer_store/waku_peer_storage,
|
||||
../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
|
||||
@ -165,12 +167,29 @@ proc setupProtocols(
|
||||
|
||||
#mount mix
|
||||
if conf.mixConf.isSome():
|
||||
let mixConf = conf.mixConf.get()
|
||||
(await node.mountMix(conf.clusterId, mixConf.mixKey, mixConf.mixnodes)).isOkOr:
|
||||
return err("failed to mount waku mix protocol: " & $error)
|
||||
|
||||
# Setup extended kademlia discovery
|
||||
if conf.kademliaDiscoveryConf.isSome():
|
||||
let mixPubKey =
|
||||
if conf.mixConf.isSome():
|
||||
some(conf.mixConf.get().mixPubKey)
|
||||
else:
|
||||
none(Curve25519Key)
|
||||
|
||||
(
|
||||
await node.mountMix(
|
||||
conf.clusterId, conf.mixConf.get().mixKey, conf.mixConf.get().mixnodes
|
||||
await setupExtendedKademliaDiscovery(
|
||||
node,
|
||||
ExtendedKademliaDiscoveryParams(
|
||||
bootstrapNodes: conf.kademliaDiscoveryConf.get().bootstrapNodes,
|
||||
mixPubKey: mixPubKey,
|
||||
advertiseMix: conf.mixConf.isSome(),
|
||||
),
|
||||
)
|
||||
).isOkOr:
|
||||
return err("failed to mount waku mix protocol: " & $error)
|
||||
return err("failed to setup kademlia discovery: " & error)
|
||||
|
||||
if conf.storeServiceConf.isSome():
|
||||
let storeServiceConf = conf.storeServiceConf.get()
|
||||
@ -477,6 +496,9 @@ proc startNode*(
|
||||
if conf.relay:
|
||||
node.peerManager.start()
|
||||
|
||||
let minMixPeers = if conf.mixConf.isSome(): 4 else: 0
|
||||
startExtendedKademliaDiscoveryLoop(node, minMixPeers = minMixPeers)
|
||||
|
||||
return ok()
|
||||
|
||||
proc setupNode*(
|
||||
|
||||
@ -192,6 +192,9 @@ proc new*(
|
||||
else:
|
||||
nil
|
||||
|
||||
# Set the extMultiAddrsOnly flag so the node knows not to replace explicit addresses
|
||||
node.extMultiAddrsOnly = wakuConf.endpointConf.extMultiAddrsOnly
|
||||
|
||||
node.setupAppCallbacks(wakuConf, appCallbacks).isOkOr:
|
||||
error "Failed setting up app callbacks", error = error
|
||||
return err("Failed setting up app callbacks: " & $error)
|
||||
|
||||
@ -4,6 +4,7 @@ import
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/multiaddress,
|
||||
libp2p/crypto/curve25519,
|
||||
libp2p/peerid,
|
||||
secp256k1,
|
||||
results
|
||||
|
||||
@ -51,6 +52,10 @@ type MixConf* = ref object
|
||||
mixPubKey*: Curve25519Key
|
||||
mixnodes*: seq[MixNodePubInfo]
|
||||
|
||||
type KademliaDiscoveryConf* = object
|
||||
bootstrapNodes*: seq[(PeerId, seq[MultiAddress])]
|
||||
## Bootstrap nodes for extended kademlia discovery.
|
||||
|
||||
type StoreServiceConf* {.requiresInit.} = object
|
||||
dbMigration*: bool
|
||||
dbURl*: string
|
||||
@ -109,6 +114,7 @@ type WakuConf* {.requiresInit.} = ref object
|
||||
metricsServerConf*: Option[MetricsServerConf]
|
||||
webSocketConf*: Option[WebSocketConf]
|
||||
mixConf*: Option[MixConf]
|
||||
kademliaDiscoveryConf*: Option[KademliaDiscoveryConf]
|
||||
|
||||
portsShift*: uint16
|
||||
dnsAddrsNameServers*: seq[IpAddress]
|
||||
|
||||
@ -247,11 +247,14 @@ proc lightpushPublish*(
|
||||
return lighpushErrorResult(
|
||||
LightPushErrorCode.SERVICE_NOT_AVAILABLE, "Waku lightpush not available"
|
||||
)
|
||||
if mixify and node.wakuMix.isNil():
|
||||
error "failed to publish message using mix as mix protocol is not mounted"
|
||||
var lmixify = mixify
|
||||
if not node.wakuMix.isNil():
|
||||
lmixify = true
|
||||
|
||||
#[ error "failed to publish message using mix as mix protocol is not mounted"
|
||||
return lighpushErrorResult(
|
||||
LightPushErrorCode.SERVICE_NOT_AVAILABLE, "Waku lightpush with mix not available"
|
||||
)
|
||||
) ]#
|
||||
let toPeer: RemotePeerInfo = peerOpt.valueOr:
|
||||
if not node.wakuLightPush.isNil():
|
||||
RemotePeerInfo.init(node.peerId())
|
||||
@ -281,4 +284,4 @@ proc lightpushPublish*(
|
||||
error "lightpush publish error", error = msg
|
||||
return lighpushErrorResult(LightPushErrorCode.INTERNAL_SERVER_ERROR, msg)
|
||||
|
||||
return await lightpushPublishHandler(node, pubsubForPublish, message, toPeer, mixify)
|
||||
return await lightpushPublishHandler(node, pubsubForPublish, message, toPeer, lmixify)
|
||||
|
||||
@ -43,9 +43,6 @@ type
|
||||
# Keeps track of peer shards
|
||||
ShardBook* = ref object of PeerBook[seq[uint16]]
|
||||
|
||||
# Keeps track of Mix protocol public keys of peers
|
||||
MixPubKeyBook* = ref object of PeerBook[Curve25519Key]
|
||||
|
||||
proc getPeer*(peerStore: PeerStore, peerId: PeerId): RemotePeerInfo =
|
||||
let addresses =
|
||||
if peerStore[LastSeenBook][peerId].isSome():
|
||||
@ -85,7 +82,7 @@ proc delete*(peerStore: PeerStore, peerId: PeerId) =
|
||||
|
||||
proc peers*(peerStore: PeerStore): seq[RemotePeerInfo] =
|
||||
let allKeys = concat(
|
||||
toSeq(peerStore[LastSeenBook].book.keys()),
|
||||
toSeq(peerStore[LastSeenOutboundBook].book.keys()),
|
||||
toSeq(peerStore[AddressBook].book.keys()),
|
||||
toSeq(peerStore[ProtoBook].book.keys()),
|
||||
toSeq(peerStore[KeyBook].book.keys()),
|
||||
|
||||
@ -17,6 +17,7 @@ import
|
||||
libp2p/protocols/ping,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/protocols/pubsub/rpc/messages,
|
||||
libp2p/protocols/kad_disco,
|
||||
libp2p/builders,
|
||||
libp2p/transports/transport,
|
||||
libp2p/transports/tcptransport,
|
||||
@ -131,10 +132,13 @@ type
|
||||
wakuRendezvous*: WakuRendezVous
|
||||
wakuRendezvousClient*: rendezvous_client.WakuRendezVousClient
|
||||
announcedAddresses*: seq[MultiAddress]
|
||||
extMultiAddrsOnly*: bool # When true, skip automatic IP address replacement
|
||||
started*: bool # Indicates that node has started listening
|
||||
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
|
||||
rateLimitSettings*: ProtocolRateLimitSettings
|
||||
wakuMix*: WakuMix
|
||||
wakuKademlia*: KademliaDiscovery
|
||||
kademliaDiscoveryLoop*: Future[void]
|
||||
|
||||
proc deduceRelayShard(
|
||||
node: WakuNode,
|
||||
@ -281,7 +285,7 @@ proc mountAutoSharding*(
|
||||
return ok()
|
||||
|
||||
proc getMixNodePoolSize*(node: WakuNode): int =
|
||||
return node.wakuMix.getNodePoolSize()
|
||||
return node.wakuMix.nodePool.len
|
||||
|
||||
proc mountMix*(
|
||||
node: WakuNode,
|
||||
@ -429,6 +433,11 @@ proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool =
|
||||
return false
|
||||
|
||||
proc updateAnnouncedAddrWithPrimaryIpAddr*(node: WakuNode): Result[void, string] =
|
||||
# Skip automatic IP replacement if extMultiAddrsOnly is set
|
||||
# This respects the user's explicitly configured announced addresses
|
||||
if node.extMultiAddrsOnly:
|
||||
return ok()
|
||||
|
||||
let peerInfo = node.switch.peerInfo
|
||||
var announcedStr = ""
|
||||
var listenStr = ""
|
||||
@ -574,6 +583,11 @@ proc stop*(node: WakuNode) {.async.} =
|
||||
if not node.wakuPeerExchangeClient.isNil() and
|
||||
not node.wakuPeerExchangeClient.pxLoopHandle.isNil():
|
||||
await node.wakuPeerExchangeClient.pxLoopHandle.cancelAndWait()
|
||||
node.wakuPeerExchangeClient.pxLoopHandle = nil
|
||||
|
||||
if not node.kademliaDiscoveryLoop.isNil():
|
||||
await node.kademliaDiscoveryLoop.cancelAndWait()
|
||||
node.kademliaDiscoveryLoop = nil
|
||||
|
||||
if not node.wakuRendezvous.isNil():
|
||||
await node.wakuRendezvous.stopWait()
|
||||
|
||||
@ -38,6 +38,7 @@ type
|
||||
Static
|
||||
PeerExchange
|
||||
Dns
|
||||
Kademlia
|
||||
|
||||
PeerDirection* = enum
|
||||
UnknownDirection
|
||||
|
||||
@ -1,22 +1,20 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import chronicles, std/[options, tables, sequtils], chronos, results, metrics, strutils
|
||||
import chronicles, std/options, chronos, results, metrics
|
||||
|
||||
import
|
||||
libp2p/crypto/curve25519,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/protocols/mix,
|
||||
libp2p/protocols/mix/mix_node,
|
||||
libp2p/protocols/mix/mix_protocol,
|
||||
libp2p/protocols/mix/mix_metrics,
|
||||
libp2p/[multiaddress, multicodec, peerid],
|
||||
libp2p/protocols/mix/delay_strategy,
|
||||
libp2p/[multiaddress, peerid],
|
||||
eth/common/keys
|
||||
|
||||
import
|
||||
../node/peer_manager,
|
||||
../waku_core,
|
||||
../waku_enr,
|
||||
../node/peer_manager/waku_peer_store,
|
||||
../common/nimchronos
|
||||
../node/peer_manager, ../waku_core, ../waku_enr, ../node/peer_manager/waku_peer_store
|
||||
|
||||
logScope:
|
||||
topics = "waku mix"
|
||||
@ -27,7 +25,6 @@ type
|
||||
WakuMix* = ref object of MixProtocol
|
||||
peerManager*: PeerManager
|
||||
clusterId: uint16
|
||||
nodePoolLoopHandle: Future[void]
|
||||
pubKey*: Curve25519Key
|
||||
|
||||
WakuMixResult*[T] = Result[T, string]
|
||||
@ -36,106 +33,10 @@ type
|
||||
multiAddr*: string
|
||||
pubKey*: Curve25519Key
|
||||
|
||||
proc filterMixNodes(cluster: Option[uint16], peer: RemotePeerInfo): bool =
|
||||
# Note that origin based(discv5) filtering is not done intentionally
|
||||
# so that more mix nodes can be discovered.
|
||||
if peer.mixPubKey.isNone():
|
||||
trace "remote peer has no mix Pub Key", peer = $peer
|
||||
return false
|
||||
|
||||
if cluster.isSome() and peer.enr.isSome() and
|
||||
peer.enr.get().isClusterMismatched(cluster.get()):
|
||||
trace "peer has mismatching cluster", peer = $peer
|
||||
return false
|
||||
|
||||
return true
|
||||
|
||||
proc appendPeerIdToMultiaddr*(multiaddr: MultiAddress, peerId: PeerId): MultiAddress =
|
||||
if multiaddr.contains(multiCodec("p2p")).get():
|
||||
return multiaddr
|
||||
|
||||
var maddrStr = multiaddr.toString().valueOr:
|
||||
error "Failed to convert multiaddress to string.", err = error
|
||||
return multiaddr
|
||||
maddrStr.add("/p2p/" & $peerId)
|
||||
var cleanAddr = MultiAddress.init(maddrStr).valueOr:
|
||||
error "Failed to convert string to multiaddress.", err = error
|
||||
return multiaddr
|
||||
return cleanAddr
|
||||
|
||||
func getIPv4Multiaddr*(maddrs: seq[MultiAddress]): Option[MultiAddress] =
|
||||
for multiaddr in maddrs:
|
||||
trace "checking multiaddr", addr = $multiaddr
|
||||
if multiaddr.contains(multiCodec("ip4")).get():
|
||||
trace "found ipv4 multiaddr", addr = $multiaddr
|
||||
return some(multiaddr)
|
||||
trace "no ipv4 multiaddr found"
|
||||
return none(MultiAddress)
|
||||
|
||||
proc populateMixNodePool*(mix: WakuMix) =
|
||||
# populate only peers that i) are reachable ii) share cluster iii) support mix
|
||||
let remotePeers = mix.peerManager.switch.peerStore.peers().filterIt(
|
||||
filterMixNodes(some(mix.clusterId), it)
|
||||
)
|
||||
var mixNodes = initTable[PeerId, MixPubInfo]()
|
||||
|
||||
for i in 0 ..< min(remotePeers.len, 100):
|
||||
let ipv4addr = getIPv4Multiaddr(remotePeers[i].addrs).valueOr:
|
||||
trace "peer has no ipv4 address", peer = $remotePeers[i]
|
||||
continue
|
||||
let maddrWithPeerId = appendPeerIdToMultiaddr(ipv4addr, remotePeers[i].peerId)
|
||||
trace "remote peer info", info = remotePeers[i]
|
||||
|
||||
if remotePeers[i].mixPubKey.isNone():
|
||||
trace "peer has no mix Pub Key", remotePeerId = $remotePeers[i]
|
||||
continue
|
||||
|
||||
let peerMixPubKey = remotePeers[i].mixPubKey.get()
|
||||
var peerPubKey: crypto.PublicKey
|
||||
if not remotePeers[i].peerId.extractPublicKey(peerPubKey):
|
||||
warn "Failed to extract public key from peerId, skipping node",
|
||||
remotePeerId = remotePeers[i].peerId
|
||||
continue
|
||||
|
||||
if peerPubKey.scheme != PKScheme.Secp256k1:
|
||||
warn "Peer public key is not Secp256k1, skipping node",
|
||||
remotePeerId = remotePeers[i].peerId, scheme = peerPubKey.scheme
|
||||
continue
|
||||
|
||||
let mixNodePubInfo = MixPubInfo.init(
|
||||
remotePeers[i].peerId,
|
||||
ipv4addr,
|
||||
intoCurve25519Key(peerMixPubKey),
|
||||
peerPubKey.skkey,
|
||||
)
|
||||
trace "adding mix node to pool",
|
||||
remotePeerId = remotePeers[i].peerId, multiAddr = $ipv4addr
|
||||
mixNodes[remotePeers[i].peerId] = mixNodePubInfo
|
||||
|
||||
# set the mix node pool
|
||||
mix.setNodePool(mixNodes)
|
||||
mix_pool_size.set(len(mixNodes))
|
||||
trace "mix node pool updated", poolSize = mix.getNodePoolSize()
|
||||
|
||||
# Once mix protocol starts to use info from PeerStore, then this can be removed.
|
||||
proc startMixNodePoolMgr*(mix: WakuMix) {.async.} =
|
||||
info "starting mix node pool manager"
|
||||
# try more aggressively to populate the pool at startup
|
||||
var attempts = 50
|
||||
# TODO: make initial pool size configurable
|
||||
while mix.getNodePoolSize() < 100 and attempts > 0:
|
||||
attempts -= 1
|
||||
mix.populateMixNodePool()
|
||||
await sleepAsync(1.seconds)
|
||||
|
||||
# TODO: make interval configurable
|
||||
heartbeat "Updating mix node pool", 5.seconds:
|
||||
mix.populateMixNodePool()
|
||||
|
||||
proc processBootNodes(
|
||||
bootnodes: seq[MixNodePubInfo], peermgr: PeerManager
|
||||
): Table[PeerId, MixPubInfo] =
|
||||
var mixNodes = initTable[PeerId, MixPubInfo]()
|
||||
bootnodes: seq[MixNodePubInfo], peermgr: PeerManager, mix: WakuMix
|
||||
) =
|
||||
var count = 0
|
||||
for node in bootnodes:
|
||||
let pInfo = parsePeerInfo(node.multiAddr).valueOr:
|
||||
error "Failed to get peer id from multiaddress: ",
|
||||
@ -156,14 +57,15 @@ proc processBootNodes(
|
||||
error "Failed to parse multiaddress", multiAddr = node.multiAddr, error = error
|
||||
continue
|
||||
|
||||
mixNodes[peerId] = MixPubInfo.init(peerId, multiAddr, node.pubKey, peerPubKey.skkey)
|
||||
let mixPubInfo = MixPubInfo.init(peerId, multiAddr, node.pubKey, peerPubKey.skkey)
|
||||
mix.nodePool.add(mixPubInfo)
|
||||
count += 1
|
||||
|
||||
peermgr.addPeer(
|
||||
RemotePeerInfo.init(peerId, @[multiAddr], mixPubKey = some(node.pubKey))
|
||||
)
|
||||
mix_pool_size.set(len(mixNodes))
|
||||
info "using mix bootstrap nodes ", bootNodes = mixNodes
|
||||
return mixNodes
|
||||
mix_pool_size.set(count)
|
||||
info "using mix bootstrap nodes ", count = count
|
||||
|
||||
proc new*(
|
||||
T: type WakuMix,
|
||||
@ -183,22 +85,25 @@ proc new*(
|
||||
)
|
||||
if bootnodes.len < minMixPoolSize:
|
||||
warn "publishing with mix won't work until atleast 3 mix nodes in node pool"
|
||||
let initTable = processBootNodes(bootnodes, peermgr)
|
||||
|
||||
if len(initTable) < minMixPoolSize:
|
||||
warn "publishing with mix won't work until atleast 3 mix nodes in node pool"
|
||||
var m = WakuMix(peerManager: peermgr, clusterId: clusterId, pubKey: mixPubKey)
|
||||
procCall MixProtocol(m).init(localMixNodeInfo, initTable, peermgr.switch)
|
||||
procCall MixProtocol(m).init(
|
||||
localMixNodeInfo,
|
||||
peermgr.switch,
|
||||
delayStrategy =
|
||||
ExponentialDelayStrategy.new(meanDelayMs = 50, rng = crypto.newRng()),
|
||||
)
|
||||
|
||||
processBootNodes(bootnodes, peermgr, m)
|
||||
|
||||
if m.nodePool.len < minMixPoolSize:
|
||||
warn "publishing with mix won't work until atleast 3 mix nodes in node pool"
|
||||
return ok(m)
|
||||
|
||||
method start*(mix: WakuMix) =
|
||||
info "starting waku mix protocol"
|
||||
mix.nodePoolLoopHandle = mix.startMixNodePoolMgr()
|
||||
|
||||
method stop*(mix: WakuMix) {.async.} =
|
||||
if mix.nodePoolLoopHandle.isNil():
|
||||
return
|
||||
await mix.nodePoolLoopHandle.cancelAndWait()
|
||||
mix.nodePoolLoopHandle = nil
|
||||
discard
|
||||
|
||||
# Mix Protocol
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user