mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-02 14:03:06 +00:00
feat: waku rendezvous wrapper (#2962)
This commit is contained in:
parent
b5edf6db98
commit
ae013e1928
@ -17,7 +17,7 @@ suite "Node Factory":
|
||||
node.wakuStore.isNil()
|
||||
node.wakuFilter.isNil()
|
||||
not node.wakuStoreClient.isNil()
|
||||
not node.rendezvous.isNil()
|
||||
not node.wakuRendezvous.isNil()
|
||||
|
||||
test "Set up a node with Store enabled":
|
||||
var conf = defaultTestWakuNodeConf()
|
||||
|
||||
@ -1,51 +1,63 @@
|
||||
{.used.}
|
||||
|
||||
import chronos, testutils/unittests, libp2p/builders, libp2p/protocols/rendezvous
|
||||
import std/options, chronos, testutils/unittests, libp2p/builders
|
||||
|
||||
import waku/node/waku_switch, ./testlib/common, ./testlib/wakucore
|
||||
|
||||
proc newRendezvousClientSwitch(rdv: RendezVous): Switch =
|
||||
SwitchBuilder
|
||||
.new()
|
||||
.withRng(rng())
|
||||
.withAddresses(@[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()])
|
||||
.withTcpTransport()
|
||||
.withMplex()
|
||||
.withNoise()
|
||||
.withRendezVous(rdv)
|
||||
.build()
|
||||
import
|
||||
waku/waku_core/peers,
|
||||
waku/node/waku_node,
|
||||
waku/node/peer_manager/peer_manager,
|
||||
waku/waku_rendezvous/protocol,
|
||||
./testlib/[wakucore, wakunode]
|
||||
|
||||
procSuite "Waku Rendezvous":
|
||||
asyncTest "Waku Switch uses Rendezvous":
|
||||
## Setup
|
||||
|
||||
asyncTest "Simple remote test":
|
||||
let
|
||||
wakuClient = RendezVous.new()
|
||||
sourceClient = RendezVous.new()
|
||||
destClient = RendezVous.new()
|
||||
wakuSwitch = newRendezvousClientSwitch(wakuClient) #rendezvous point
|
||||
sourceSwitch = newRendezvousClientSwitch(sourceClient) #client
|
||||
destSwitch = newRendezvousClientSwitch(destClient) #client
|
||||
clusterId = 10.uint16
|
||||
node1 = newTestWakuNode(
|
||||
generateSecp256k1Key(),
|
||||
parseIpAddress("0.0.0.0"),
|
||||
Port(0),
|
||||
clusterId = clusterId,
|
||||
)
|
||||
node2 = newTestWakuNode(
|
||||
generateSecp256k1Key(),
|
||||
parseIpAddress("0.0.0.0"),
|
||||
Port(0),
|
||||
clusterId = clusterId,
|
||||
)
|
||||
node3 = newTestWakuNode(
|
||||
generateSecp256k1Key(),
|
||||
parseIpAddress("0.0.0.0"),
|
||||
Port(0),
|
||||
clusterId = clusterId,
|
||||
)
|
||||
|
||||
# Setup client rendezvous
|
||||
wakuClient.setup(wakuSwitch)
|
||||
sourceClient.setup(sourceSwitch)
|
||||
destClient.setup(destSwitch)
|
||||
await allFutures(
|
||||
[node1.mountRendezvous(), node2.mountRendezvous(), node3.mountRendezvous()]
|
||||
)
|
||||
await allFutures([node1.start(), node2.start(), node3.start()])
|
||||
|
||||
await allFutures(wakuSwitch.start(), sourceSwitch.start(), destSwitch.start())
|
||||
let peerInfo1 = node1.switch.peerInfo.toRemotePeerInfo()
|
||||
let peerInfo2 = node2.switch.peerInfo.toRemotePeerInfo()
|
||||
let peerInfo3 = node3.switch.peerInfo.toRemotePeerInfo()
|
||||
|
||||
# Connect clients to the rendezvous point
|
||||
await sourceSwitch.connect(wakuSwitch.peerInfo.peerId, wakuSwitch.peerInfo.addrs)
|
||||
await destSwitch.connect(wakuSwitch.peerInfo.peerId, wakuSwitch.peerInfo.addrs)
|
||||
node1.peerManager.addPeer(peerInfo2)
|
||||
node2.peerManager.addPeer(peerInfo1)
|
||||
node2.peerManager.addPeer(peerInfo3)
|
||||
node3.peerManager.addPeer(peerInfo2)
|
||||
|
||||
let res0 = await sourceClient.request("empty")
|
||||
check res0.len == 0
|
||||
let namespace = "test/name/space"
|
||||
|
||||
let res = await node1.wakuRendezvous.batchAdvertise(
|
||||
namespace, 60.seconds, @[peerInfo2.peerId]
|
||||
)
|
||||
assert res.isOk(), $res.error
|
||||
|
||||
let response =
|
||||
await node3.wakuRendezvous.batchRequest(namespace, 1, @[peerInfo2.peerId])
|
||||
assert response.isOk(), $response.error
|
||||
let records = response.get()
|
||||
|
||||
# Check that source client gets peer info of dest client from rendezvous point
|
||||
await sourceClient.advertise("foo")
|
||||
let res1 = await destClient.request("foo")
|
||||
check:
|
||||
res1.len == 1
|
||||
res1[0] == sourceSwitch.peerInfo.signedPeerRecord.data
|
||||
|
||||
await allFutures(wakuSwitch.stop(), sourceSwitch.stop(), destSwitch.stop())
|
||||
records.len == 1
|
||||
records[0].peerId == peerInfo1.peerId
|
||||
|
||||
@ -40,6 +40,7 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf =
|
||||
clusterId: DefaultClusterId,
|
||||
shards: @[DefaultShardId],
|
||||
relay: true,
|
||||
rendezvous: true,
|
||||
storeMessageDbUrl: "sqlite://store.sqlite3",
|
||||
)
|
||||
|
||||
|
||||
@ -62,7 +62,7 @@ suite "Wakunode2 - Waku initialization":
|
||||
node.wakuArchive.isNil()
|
||||
node.wakuStore.isNil()
|
||||
not node.wakuStoreClient.isNil()
|
||||
not node.rendezvous.isNil()
|
||||
not node.wakuRendezvous.isNil()
|
||||
|
||||
## Cleanup
|
||||
waitFor waku.stop()
|
||||
@ -92,7 +92,7 @@ suite "Wakunode2 - Waku initialization":
|
||||
node.wakuArchive.isNil()
|
||||
node.wakuStore.isNil()
|
||||
not node.wakuStoreClient.isNil()
|
||||
not node.rendezvous.isNil()
|
||||
not node.wakuRendezvous.isNil()
|
||||
|
||||
# DS structures are updated with dynamic ports
|
||||
typedNodeEnr.get().tcp.get() != 0
|
||||
|
||||
@ -647,6 +647,13 @@ with the drawback of consuming some more bandwidth.""",
|
||||
name: "peer-exchange-node"
|
||||
.}: string
|
||||
|
||||
## Rendez vous
|
||||
rendezvous* {.
|
||||
desc: "Enable waku rendezvous discovery server",
|
||||
defaultValue: true,
|
||||
name: "rendezvous"
|
||||
.}: bool
|
||||
|
||||
## websocket config
|
||||
websocketSupport* {.
|
||||
desc: "Enable websocket: true|false",
|
||||
|
||||
@ -207,12 +207,9 @@ proc setupProtocols(
|
||||
protectedShard = shardKey.shard, publicKey = shardKey.key
|
||||
node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId)
|
||||
|
||||
# Enable Rendezvous Discovery protocol when Relay is enabled
|
||||
try:
|
||||
await mountRendezvous(node)
|
||||
except CatchableError:
|
||||
return
|
||||
err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg())
|
||||
# Only relay nodes should be rendezvous points.
|
||||
if conf.rendezvous:
|
||||
await node.mountRendezvous()
|
||||
|
||||
# Keepalive mounted on all nodes
|
||||
try:
|
||||
|
||||
@ -413,16 +413,6 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
|
||||
if not waku[].deliveryMonitor.isNil():
|
||||
waku[].deliveryMonitor.startDeliveryMonitor()
|
||||
|
||||
## libp2p DiscoveryManager
|
||||
waku[].discoveryMngr = DiscoveryManager()
|
||||
waku[].discoveryMngr.add(
|
||||
RendezVousInterface.new(rdv = waku[].node.rendezvous, tta = 1.minutes)
|
||||
)
|
||||
if not isNil(waku[].node.wakuRelay):
|
||||
for topic in waku[].node.wakuRelay.getSubscribedTopics():
|
||||
debug "advertise rendezvous namespace", topic
|
||||
waku[].discoveryMngr.advertise(RdvNamespace(topic))
|
||||
|
||||
return ok()
|
||||
|
||||
# Waku shutdown
|
||||
|
||||
@ -17,7 +17,6 @@ import
|
||||
libp2p/protocols/pubsub/rpc/messages,
|
||||
libp2p/protocols/connectivity/autonat/client,
|
||||
libp2p/protocols/connectivity/autonat/service,
|
||||
libp2p/protocols/rendezvous,
|
||||
libp2p/builders,
|
||||
libp2p/transports/transport,
|
||||
libp2p/transports/tcptransport,
|
||||
@ -39,6 +38,7 @@ import
|
||||
../waku_filter_v2/client as filter_client,
|
||||
../waku_filter_v2/subscriptions as filter_subscriptions,
|
||||
../waku_metadata,
|
||||
../waku_rendezvous/protocol,
|
||||
../waku_lightpush/client as lightpush_client,
|
||||
../waku_lightpush/common,
|
||||
../waku_lightpush/protocol,
|
||||
@ -110,7 +110,7 @@ type
|
||||
enr*: enr.Record
|
||||
libp2pPing*: Ping
|
||||
rng*: ref rand.HmacDrbgContext
|
||||
rendezvous*: RendezVous
|
||||
wakuRendezvous*: WakuRendezVous
|
||||
announcedAddresses*: seq[MultiAddress]
|
||||
started*: bool # Indicates that node has started listening
|
||||
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
|
||||
@ -1217,22 +1217,16 @@ proc startKeepalive*(node: WakuNode, keepalive = 2.minutes) =
|
||||
proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} =
|
||||
info "mounting rendezvous discovery protocol"
|
||||
|
||||
try:
|
||||
node.rendezvous = RendezVous.new(node.switch)
|
||||
except Exception as e:
|
||||
error "failed to create rendezvous", error = getCurrentExceptionMsg()
|
||||
node.wakuRendezvous = WakuRendezVous.new(node.switch, node.peerManager, node.enr).valueOr:
|
||||
error "initializing waku rendezvous failed", error = error
|
||||
return
|
||||
|
||||
if node.started:
|
||||
try:
|
||||
await node.rendezvous.start()
|
||||
except CatchableError:
|
||||
error "failed to start rendezvous", error = getCurrentExceptionMsg()
|
||||
# Always start discovering peers at startup
|
||||
(await node.wakuRendezvous.initialRequestAll()).isOkOr:
|
||||
error "rendezvous failed initial requests", error = error
|
||||
|
||||
try:
|
||||
node.switch.mount(node.rendezvous)
|
||||
except LPError:
|
||||
error "failed to mount rendezvous", error = getCurrentExceptionMsg()
|
||||
if node.started:
|
||||
await node.wakuRendezvous.start()
|
||||
|
||||
proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool =
|
||||
let inputStr = $inputMultiAdd
|
||||
@ -1304,6 +1298,9 @@ proc start*(node: WakuNode) {.async.} =
|
||||
if not node.wakuStoreResume.isNil():
|
||||
await node.wakuStoreResume.start()
|
||||
|
||||
if not node.wakuRendezvous.isNil():
|
||||
await node.wakuRendezvous.start()
|
||||
|
||||
## The switch uses this mapper to update peer info addrs
|
||||
## with announced addrs after start
|
||||
let addressMapper = proc(
|
||||
@ -1346,6 +1343,9 @@ proc stop*(node: WakuNode) {.async.} =
|
||||
if not node.wakuPeerExchange.isNil() and not node.wakuPeerExchange.pxLoopHandle.isNil():
|
||||
await node.wakuPeerExchange.pxLoopHandle.cancelAndWait()
|
||||
|
||||
if not node.wakuRendezvous.isNil():
|
||||
await node.wakuRendezvous.stopWait()
|
||||
|
||||
node.started = false
|
||||
|
||||
proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} =
|
||||
|
||||
3
waku/waku_rendezvous.nim
Normal file
3
waku/waku_rendezvous.nim
Normal file
@ -0,0 +1,3 @@
|
||||
import ./waku_rendezvous/protocol
|
||||
|
||||
export protocol
|
||||
36
waku/waku_rendezvous/common.nim
Normal file
36
waku/waku_rendezvous/common.nim
Normal file
@ -0,0 +1,36 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import std/options, chronos
|
||||
|
||||
import ../common/enr, ../waku_enr/capabilities, ../waku_enr/sharding
|
||||
|
||||
const DiscoverLimit* = 1000
|
||||
const DefaultRegistrationTTL* = 60.seconds
|
||||
const DefaultRegistrationInterval* = 10.seconds
|
||||
const PeersRequestedCount* = 12
|
||||
|
||||
proc computeNamespace*(clusterId: uint16, shard: uint16): string =
|
||||
var namespace = "rs/"
|
||||
|
||||
namespace &= $clusterId
|
||||
namespace &= '/'
|
||||
namespace &= $shard
|
||||
|
||||
return namespace
|
||||
|
||||
proc computeNamespace*(clusterId: uint16, shard: uint16, cap: Capabilities): string =
|
||||
var namespace = "rs/"
|
||||
|
||||
namespace &= $clusterId
|
||||
namespace &= '/'
|
||||
namespace &= $shard
|
||||
namespace &= '/'
|
||||
namespace &= $cap
|
||||
|
||||
return namespace
|
||||
|
||||
proc getRelayShards*(enr: enr.Record): Option[RelayShards] =
|
||||
let typedRecord = enr.toTyped().valueOr:
|
||||
return none(RelayShards)
|
||||
|
||||
return typedRecord.relaySharding()
|
||||
267
waku/waku_rendezvous/protocol.nim
Normal file
267
waku/waku_rendezvous/protocol.nim
Normal file
@ -0,0 +1,267 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[sugar, options],
|
||||
results,
|
||||
chronos,
|
||||
chronicles,
|
||||
metrics,
|
||||
libp2p/protocols/rendezvous,
|
||||
libp2p/switch,
|
||||
libp2p/utility
|
||||
|
||||
import
|
||||
../node/peer_manager,
|
||||
../common/enr,
|
||||
../waku_enr/capabilities,
|
||||
../waku_enr/sharding,
|
||||
../waku_core/peers,
|
||||
../waku_core/topics,
|
||||
./common
|
||||
|
||||
logScope:
|
||||
topics = "waku rendezvous"
|
||||
|
||||
declarePublicCounter rendezvousPeerFoundTotal,
|
||||
"total number of peers found via rendezvous"
|
||||
|
||||
type WakuRendezVous* = ref object
|
||||
rendezvous: Rendezvous
|
||||
peerManager: PeerManager
|
||||
|
||||
relayShard: RelayShards
|
||||
capabilities: seq[Capabilities]
|
||||
|
||||
periodicRegistrationFut: Future[void]
|
||||
|
||||
proc batchAdvertise*(
|
||||
self: WakuRendezVous,
|
||||
namespace: string,
|
||||
ttl: Duration = DefaultRegistrationTTL,
|
||||
peers: seq[PeerId],
|
||||
): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
## Register with all rendezvous peers under a namespace
|
||||
|
||||
# rendezvous.advertise except already opened connections
|
||||
# must dial first
|
||||
var futs = collect(newSeq):
|
||||
for peerId in peers:
|
||||
self.peerManager.dialPeer(peerId, RendezVousCodec)
|
||||
|
||||
let dialCatch = catch:
|
||||
await allFinished(futs)
|
||||
|
||||
if dialCatch.isErr():
|
||||
return err("batchAdvertise: " & dialCatch.error.msg)
|
||||
|
||||
futs = dialCatch.get()
|
||||
|
||||
let conns = collect(newSeq):
|
||||
for fut in futs:
|
||||
let catchable = catch:
|
||||
fut.read()
|
||||
|
||||
if catchable.isErr():
|
||||
error "rendezvous dial failed", error = catchable.error.msg
|
||||
continue
|
||||
|
||||
let connOpt = catchable.get()
|
||||
|
||||
let conn = connOpt.valueOr:
|
||||
continue
|
||||
|
||||
conn
|
||||
|
||||
let advertCatch = catch:
|
||||
await self.rendezvous.advertise(namespace, ttl, peers)
|
||||
|
||||
for conn in conns:
|
||||
await conn.close()
|
||||
|
||||
if advertCatch.isErr():
|
||||
return err("batchAdvertise: " & advertCatch.error.msg)
|
||||
|
||||
return ok()
|
||||
|
||||
proc batchRequest*(
|
||||
self: WakuRendezVous,
|
||||
namespace: string,
|
||||
count: int = DiscoverLimit,
|
||||
peers: seq[PeerId],
|
||||
): Future[Result[seq[PeerRecord], string]] {.async: (raises: []).} =
|
||||
## Request all records from all rendezvous peers matching a namespace
|
||||
|
||||
# rendezvous.request except already opened connections
|
||||
# must dial first
|
||||
var futs = collect(newSeq):
|
||||
for peerId in peers:
|
||||
self.peerManager.dialPeer(peerId, RendezVousCodec)
|
||||
|
||||
let dialCatch = catch:
|
||||
await allFinished(futs)
|
||||
|
||||
if dialCatch.isErr():
|
||||
return err("batchRequest: " & dialCatch.error.msg)
|
||||
|
||||
futs = dialCatch.get()
|
||||
|
||||
let conns = collect(newSeq):
|
||||
for fut in futs:
|
||||
let catchable = catch:
|
||||
fut.read()
|
||||
|
||||
if catchable.isErr():
|
||||
error "rendezvous dial failed", error = catchable.error.msg
|
||||
continue
|
||||
|
||||
let connOpt = catchable.get()
|
||||
|
||||
let conn = connOpt.valueOr:
|
||||
continue
|
||||
|
||||
conn
|
||||
|
||||
let reqCatch = catch:
|
||||
await self.rendezvous.request(namespace, count, peers)
|
||||
|
||||
for conn in conns:
|
||||
await conn.close()
|
||||
|
||||
if reqCatch.isErr():
|
||||
return err("batchRequest: " & reqCatch.error.msg)
|
||||
|
||||
return ok(reqCatch.get())
|
||||
|
||||
proc advertiseAll(
|
||||
self: WakuRendezVous
|
||||
): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
debug "waku rendezvous advertisements started"
|
||||
|
||||
let pubsubTopics = self.relayShard.topics()
|
||||
|
||||
let futs = collect(newSeq):
|
||||
for pubsubTopic in pubsubTopics:
|
||||
# Get a random RDV peer for that shard
|
||||
let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr:
|
||||
error "could not get a peer supporting RendezVousCodec"
|
||||
continue
|
||||
|
||||
let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId)
|
||||
|
||||
# Advertise yourself on that peer
|
||||
self.batchAdvertise(namespace, DefaultRegistrationTTL, @[rpi.peerId])
|
||||
|
||||
let catchable = catch:
|
||||
await allFinished(futs)
|
||||
|
||||
if catchable.isErr():
|
||||
return err(catchable.error.msg)
|
||||
|
||||
for fut in catchable.get():
|
||||
if fut.failed():
|
||||
error "rendezvous advertisement failed", error = fut.error.msg
|
||||
|
||||
debug "waku rendezvous advertisements finished"
|
||||
|
||||
return ok()
|
||||
|
||||
proc initialRequestAll*(
|
||||
self: WakuRendezVous
|
||||
): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
debug "waku rendezvous initial requests started"
|
||||
|
||||
let pubsubTopics = self.relayShard.topics()
|
||||
|
||||
let futs = collect(newSeq):
|
||||
for pubsubTopic in pubsubTopics:
|
||||
let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId)
|
||||
|
||||
# Get a random RDV peer for that shard
|
||||
let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr:
|
||||
error "could not get a peer supporting RendezVousCodec"
|
||||
continue
|
||||
|
||||
# Ask for peer records for that shard
|
||||
self.batchRequest(namespace, PeersRequestedCount, @[rpi.peerId])
|
||||
|
||||
let catchable = catch:
|
||||
await allFinished(futs)
|
||||
|
||||
if catchable.isErr():
|
||||
return err(catchable.error.msg)
|
||||
|
||||
for fut in catchable.get():
|
||||
if fut.failed():
|
||||
error "rendezvous request failed", error = fut.error.msg
|
||||
elif fut.finished():
|
||||
let res = fut.value()
|
||||
|
||||
let records = res.valueOr:
|
||||
return err($res.error)
|
||||
|
||||
for record in records:
|
||||
rendezvousPeerFoundTotal.inc()
|
||||
self.peerManager.addPeer(record)
|
||||
|
||||
debug "waku rendezvous initial requests finished"
|
||||
|
||||
return ok()
|
||||
|
||||
proc periodicRegistration(self: WakuRendezVous) {.async.} =
|
||||
debug "waku rendezvous periodic registration started",
|
||||
interval = DefaultRegistrationInterval
|
||||
|
||||
# infinite loop
|
||||
while true:
|
||||
await sleepAsync(DefaultRegistrationInterval)
|
||||
|
||||
(await self.advertiseAll()).isOkOr:
|
||||
debug "waku rendezvous advertisements failed", error = error
|
||||
|
||||
proc new*(
|
||||
T: type WakuRendezVous, switch: Switch, peerManager: PeerManager, enr: Record
|
||||
): Result[T, string] {.raises: [].} =
|
||||
let relayshard = getRelayShards(enr).valueOr:
|
||||
warn "Using default cluster id 0"
|
||||
RelayShards(clusterID: 0, shardIds: @[])
|
||||
|
||||
let capabilities = enr.getCapabilities()
|
||||
|
||||
let rvCatchable = catch:
|
||||
RendezVous.new(switch = switch, minDuration = DefaultRegistrationTTL)
|
||||
|
||||
if rvCatchable.isErr():
|
||||
return err(rvCatchable.error.msg)
|
||||
|
||||
let rv = rvCatchable.get()
|
||||
|
||||
let mountCatchable = catch:
|
||||
switch.mount(rv)
|
||||
|
||||
if mountCatchable.isErr():
|
||||
return err(mountCatchable.error.msg)
|
||||
|
||||
var wrv = WakuRendezVous()
|
||||
wrv.rendezvous = rv
|
||||
wrv.peerManager = peerManager
|
||||
wrv.relayshard = relayshard
|
||||
wrv.capabilities = capabilities
|
||||
|
||||
debug "waku rendezvous initialized",
|
||||
cluster = relayshard.clusterId,
|
||||
shards = relayshard.shardIds,
|
||||
capabilities = capabilities
|
||||
|
||||
return ok(wrv)
|
||||
|
||||
proc start*(self: WakuRendezVous) {.async: (raises: []).} =
|
||||
# start registering forever
|
||||
self.periodicRegistrationFut = self.periodicRegistration()
|
||||
|
||||
debug "waku rendezvous discovery started"
|
||||
|
||||
proc stopWait*(self: WakuRendezVous) {.async: (raises: []).} =
|
||||
if not self.periodicRegistrationFut.isNil():
|
||||
await self.periodicRegistrationFut.cancelAndWait()
|
||||
|
||||
debug "waku rendezvous discovery stopped"
|
||||
Loading…
x
Reference in New Issue
Block a user