mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-04-14 04:03:20 +00:00
Merge c7b124d85e603e65135c57eff93b2d58e0547a4a into 549bf8bc4309aadb30530a181fa1933bced2c9a7
This commit is contained in:
commit
d060d88faf
2
.gitmodules
vendored
2
.gitmodules
vendored
@ -12,7 +12,7 @@
|
||||
path = vendor/nim-libp2p
|
||||
url = https://github.com/vacp2p/nim-libp2p.git
|
||||
ignore = dirty
|
||||
branch = master
|
||||
branch = feat--logos-capability-discovery
|
||||
[submodule "vendor/nim-stew"]
|
||||
path = vendor/nim-stew
|
||||
url = https://github.com/status-im/nim-stew.git
|
||||
|
||||
@ -31,6 +31,7 @@ import
|
||||
nameresolving/dnsresolver,
|
||||
protocols/mix/curve25519,
|
||||
protocols/mix/mix_protocol,
|
||||
extended_peer_record,
|
||||
] # define DNS resolution
|
||||
import
|
||||
waku/[
|
||||
@ -48,6 +49,7 @@ import
|
||||
waku_store/common,
|
||||
waku_filter_v2/client,
|
||||
common/logging,
|
||||
waku_mix,
|
||||
],
|
||||
./config_chat2mix
|
||||
|
||||
@ -57,7 +59,8 @@ import ../../waku/waku_rln_relay
|
||||
logScope:
|
||||
topics = "chat2 mix"
|
||||
|
||||
const Help = """
|
||||
const Help =
|
||||
"""
|
||||
Commands: /[?|help|connect|nick|exit]
|
||||
help: Prints this help
|
||||
connect: dials a remote peer
|
||||
@ -428,16 +431,16 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
builder.withRecord(record)
|
||||
|
||||
builder
|
||||
.withNetworkConfigurationDetails(
|
||||
conf.listenAddress,
|
||||
Port(uint16(conf.tcpPort) + conf.portsShift),
|
||||
extIp,
|
||||
extTcpPort,
|
||||
wsBindPort = Port(uint16(conf.websocketPort) + conf.portsShift),
|
||||
wsEnabled = conf.websocketSupport,
|
||||
wssEnabled = conf.websocketSecureSupport,
|
||||
)
|
||||
.tryGet()
|
||||
.withNetworkConfigurationDetails(
|
||||
conf.listenAddress,
|
||||
Port(uint16(conf.tcpPort) + conf.portsShift),
|
||||
extIp,
|
||||
extTcpPort,
|
||||
wsBindPort = Port(uint16(conf.websocketPort) + conf.portsShift),
|
||||
wsEnabled = conf.websocketSupport,
|
||||
wssEnabled = conf.websocketSecureSupport,
|
||||
)
|
||||
.tryGet()
|
||||
builder.build().tryGet()
|
||||
|
||||
node.mountAutoSharding(conf.clusterId, conf.numShardsInNetwork).isOkOr:
|
||||
@ -447,15 +450,20 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
error "failed to mount waku metadata protocol: ", err = error
|
||||
quit(QuitFailure)
|
||||
|
||||
var providedServices: seq[ServiceInfo] = @[]
|
||||
|
||||
let (mixPrivKey, mixPubKey) = generateKeyPair().valueOr:
|
||||
error "failed to generate mix key pair", error = error
|
||||
return
|
||||
|
||||
(await node.mountMix(conf.clusterId, mixPrivKey, conf.mixnodes)).isOkOr:
|
||||
let mixService = ServiceInfo(id: MixProtocolID, data: @(mixPubKey))
|
||||
providedServices.add(mixService)
|
||||
|
||||
(await node.mountMix(mixPrivKey)).isOkOr:
|
||||
error "failed to mount waku mix protocol: ", error = $error
|
||||
quit(QuitFailure)
|
||||
|
||||
# Setup extended kademlia discovery if bootstrap nodes are provided
|
||||
# Setup kademlia discovery if bootstrap nodes are provided
|
||||
if conf.kadBootstrapNodes.len > 0:
|
||||
var kadBootstrapPeers: seq[(PeerId, seq[MultiAddress])]
|
||||
for nodeStr in conf.kadBootstrapNodes:
|
||||
@ -466,23 +474,8 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
|
||||
if kadBootstrapPeers.len > 0:
|
||||
node.wakuKademlia = WakuKademlia.new(
|
||||
node.switch,
|
||||
ExtendedKademliaDiscoveryParams(
|
||||
bootstrapNodes: kadBootstrapPeers,
|
||||
mixPubKey: some(mixPubKey),
|
||||
advertiseMix: false,
|
||||
),
|
||||
node.peerManager,
|
||||
getMixNodePoolSize = proc(): int {.gcsafe, raises: [].} =
|
||||
if node.wakuMix.isNil():
|
||||
0
|
||||
else:
|
||||
node.getMixNodePoolSize(),
|
||||
isNodeStarted = proc(): bool {.gcsafe, raises: [].} =
|
||||
node.started,
|
||||
).valueOr:
|
||||
error "failed to setup kademlia discovery", error = error
|
||||
quit(QuitFailure)
|
||||
node.switch, node.peerManager, kadBootstrapPeers, providedServices
|
||||
)
|
||||
|
||||
#await node.mountRendezvousClient(conf.clusterId)
|
||||
|
||||
@ -490,9 +483,11 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
|
||||
node.peerManager.start()
|
||||
if not node.wakuKademlia.isNil():
|
||||
(await node.wakuKademlia.start(minMixPeers = MinMixNodePoolSize)).isOkOr:
|
||||
error "failed to start kademlia discovery", error = error
|
||||
quit(QuitFailure)
|
||||
await node.wakuKademlia.start()
|
||||
|
||||
# Wire mix protocol with kademlia for peer discovery
|
||||
if not node.wakuMix.isNil() and not node.wakuKademlia.isNil():
|
||||
node.wakuMix.setKademlia(node.wakuKademlia)
|
||||
|
||||
await node.mountLibp2pPing()
|
||||
#await node.mountPeerExchangeClient()
|
||||
@ -645,8 +640,8 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
currentpoolSize = node.getMixNodePoolSize()
|
||||
echo "ready to publish messages now"
|
||||
|
||||
# Once min mixnodes are discovered loop as per default setting
|
||||
node.startPeerExchangeLoop()
|
||||
# Peer exchange disabled - using Kademlia discovery only
|
||||
# node.startPeerExchangeLoop()
|
||||
|
||||
if conf.metricsLogging:
|
||||
startMetricsLog()
|
||||
|
||||
@ -85,12 +85,6 @@ type
|
||||
defaultValue: @[]
|
||||
.}: seq[string]
|
||||
|
||||
mixnodes* {.
|
||||
desc:
|
||||
"Multiaddress and mix-key of mix node to be statically specified in format multiaddr:mixPubKey. Argument may be repeated.",
|
||||
name: "mixnode"
|
||||
.}: seq[MixNodePubInfo]
|
||||
|
||||
keepAlive* {.
|
||||
desc: "Enable keep-alive for idle connections: true|false",
|
||||
defaultValue: false,
|
||||
@ -236,23 +230,6 @@ type
|
||||
name: "kad-bootstrap-node"
|
||||
.}: seq[string]
|
||||
|
||||
proc parseCmdArg*(T: type MixNodePubInfo, p: string): T =
|
||||
let elements = p.split(":")
|
||||
if elements.len != 2:
|
||||
raise newException(
|
||||
ValueError, "Invalid format for mix node expected multiaddr:mixPublicKey"
|
||||
)
|
||||
let multiaddr = MultiAddress.init(elements[0]).valueOr:
|
||||
raise newException(ValueError, "Invalid multiaddress format")
|
||||
if not multiaddr.contains(multiCodec("ip4")).get():
|
||||
raise newException(
|
||||
ValueError, "Invalid format for ip address, expected a ipv4 multiaddress"
|
||||
)
|
||||
|
||||
return MixNodePubInfo(
|
||||
multiaddr: elements[0], pubKey: intoCurve25519Key(ncrutils.fromHex(elements[1]))
|
||||
)
|
||||
|
||||
# NOTE: Keys are different in nim-libp2p
|
||||
proc parseCmdArg*(T: type crypto.PrivateKey, p: string): T =
|
||||
try:
|
||||
|
||||
@ -95,6 +95,9 @@ if not defined(macosx) and not defined(android):
|
||||
nimStackTraceOverride
|
||||
switch("import", "libbacktrace")
|
||||
|
||||
# Shim to provide valueOr and withValue for Option[T]
|
||||
switch("import", "waku/common/option_shim")
|
||||
|
||||
--define:
|
||||
nimOldCaseObjects
|
||||
# https://github.com/status-im/nim-confutils/issues/9
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[sequtils, strutils, net],
|
||||
std/[options, sequtils, strutils, net],
|
||||
stew/byteutils,
|
||||
testutils/unittests,
|
||||
chronicles,
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[os, strutils, sequtils, sysrand, math],
|
||||
std/[options, os, strutils, sequtils, sysrand, math],
|
||||
stew/byteutils,
|
||||
testutils/unittests,
|
||||
chronos,
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/sequtils,
|
||||
std/[options, sequtils],
|
||||
testutils/unittests,
|
||||
chronicles,
|
||||
chronos,
|
||||
|
||||
@ -263,7 +263,8 @@ type WakuNodeConf* = object
|
||||
|
||||
## Circuit-relay config
|
||||
isRelayClient* {.
|
||||
desc: """Set the node as a relay-client.
|
||||
desc:
|
||||
"""Set the node as a relay-client.
|
||||
Set it to true for nodes that run behind a NAT or firewall and
|
||||
hence would have reachability issues.""",
|
||||
defaultValue: false,
|
||||
@ -631,12 +632,6 @@ with the drawback of consuming some more bandwidth.""",
|
||||
name: "mixkey"
|
||||
.}: Option[string]
|
||||
|
||||
mixnodes* {.
|
||||
desc:
|
||||
"Multiaddress and mix-key of mix node to be statically specified in format multiaddr:mixPubKey. Argument may be repeated.",
|
||||
name: "mixnode"
|
||||
.}: seq[MixNodePubInfo]
|
||||
|
||||
# Kademlia Discovery config
|
||||
enableKadDiscovery* {.
|
||||
desc:
|
||||
@ -730,22 +725,6 @@ proc isNumber(x: string): bool =
|
||||
except ValueError:
|
||||
result = false
|
||||
|
||||
proc parseCmdArg*(T: type MixNodePubInfo, p: string): T =
|
||||
let elements = p.split(":")
|
||||
if elements.len != 2:
|
||||
raise newException(
|
||||
ValueError, "Invalid format for mix node expected multiaddr:mixPublicKey"
|
||||
)
|
||||
let multiaddr = MultiAddress.init(elements[0]).valueOr:
|
||||
raise newException(ValueError, "Invalid multiaddress format")
|
||||
if not multiaddr.contains(multiCodec("ip4")).get():
|
||||
raise newException(
|
||||
ValueError, "Invalid format for ip address, expected a ipv4 multiaddress"
|
||||
)
|
||||
return MixNodePubInfo(
|
||||
multiaddr: elements[0], pubKey: intoCurve25519Key(ncrutils.fromHex(elements[1]))
|
||||
)
|
||||
|
||||
proc parseCmdArg*(T: type ProtectedShard, p: string): T =
|
||||
let elements = p.split(":")
|
||||
if elements.len != 2:
|
||||
@ -829,22 +808,6 @@ proc readValue*(
|
||||
except CatchableError:
|
||||
raise newException(SerializationError, getCurrentExceptionMsg())
|
||||
|
||||
proc readValue*(
|
||||
r: var TomlReader, value: var MixNodePubInfo
|
||||
) {.raises: [SerializationError].} =
|
||||
try:
|
||||
value = parseCmdArg(MixNodePubInfo, r.readValue(string))
|
||||
except CatchableError:
|
||||
raise newException(SerializationError, getCurrentExceptionMsg())
|
||||
|
||||
proc readValue*(
|
||||
r: var EnvvarReader, value: var MixNodePubInfo
|
||||
) {.raises: [SerializationError].} =
|
||||
try:
|
||||
value = parseCmdArg(MixNodePubInfo, r.readValue(string))
|
||||
except CatchableError:
|
||||
raise newException(SerializationError, getCurrentExceptionMsg())
|
||||
|
||||
proc readValue*(
|
||||
r: var TomlReader, value: var ProtectedShard
|
||||
) {.raises: [SerializationError].} =
|
||||
@ -1062,7 +1025,6 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
|
||||
b.storeServiceConf.storeSyncConf.withRelayJitterSec(n.storeSyncRelayJitter)
|
||||
|
||||
b.mixConf.withEnabled(n.mix)
|
||||
b.mixConf.withMixNodes(n.mixnodes)
|
||||
b.withMix(n.mix)
|
||||
if n.mixkey.isSome():
|
||||
b.mixConf.withMixKey(n.mixkey.get())
|
||||
|
||||
2
vendor/nim-libp2p
vendored
2
vendor/nim-libp2p
vendored
@ -1 +1 @@
|
||||
Subproject commit ff8d51857b4b79a68468e7bcc27b2026cca02996
|
||||
Subproject commit 1a71bc57036306a26ba58e7b57891f993ebb73bd
|
||||
26
waku/common/option_shim.nim
Normal file
26
waku/common/option_shim.nim
Normal file
@ -0,0 +1,26 @@
|
||||
# Shim to provide valueOr and withValue for Option[T]
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import std/options
|
||||
|
||||
template valueOr*[T](self: Option[T], def: untyped): T =
|
||||
let s = self
|
||||
if s.isSome():
|
||||
s.get()
|
||||
else:
|
||||
def
|
||||
|
||||
template withValue*[T](self: Option[T], value, body: untyped) =
|
||||
let s = self
|
||||
if s.isSome():
|
||||
let value {.inject.} = s.get()
|
||||
body
|
||||
|
||||
template withValue*[T](self: Option[T], value, body, elseStmt: untyped) =
|
||||
let s = self
|
||||
if s.isSome():
|
||||
let value {.inject.} = s.get()
|
||||
body
|
||||
else:
|
||||
elseStmt
|
||||
@ -5,114 +5,29 @@ import
|
||||
chronos,
|
||||
chronicles,
|
||||
results,
|
||||
stew/byteutils,
|
||||
libp2p/crypto/curve25519,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/protocols/mix/mix_protocol,
|
||||
libp2p/[peerid, multiaddress, switch],
|
||||
libp2p/extended_peer_record,
|
||||
libp2p/crypto/curve25519,
|
||||
libp2p/protocols/[kademlia, kad_disco],
|
||||
libp2p/protocols/kademlia_discovery/types as kad_types,
|
||||
libp2p/protocols/mix/mix_protocol
|
||||
libp2p/protocols/service_discovery/types
|
||||
|
||||
import waku/waku_core, waku/node/peer_manager
|
||||
|
||||
logScope:
|
||||
topics = "waku extended kademlia discovery"
|
||||
topics = "waku kademlia"
|
||||
|
||||
const
|
||||
DefaultExtendedKademliaDiscoveryInterval* = chronos.seconds(5)
|
||||
ExtendedKademliaDiscoveryStartupDelay* = chronos.seconds(5)
|
||||
const DefaultKademliaDiscoveryInterval* = chronos.seconds(10)
|
||||
|
||||
type
|
||||
MixNodePoolSizeProvider* = proc(): int {.gcsafe, raises: [].}
|
||||
NodeStartedProvider* = proc(): bool {.gcsafe, raises: [].}
|
||||
type WakuKademlia* = ref object
|
||||
protocol*: KademliaDiscovery
|
||||
peerManager: PeerManager
|
||||
loopInterval: Duration
|
||||
walkIntervalFut: Future[void]
|
||||
|
||||
ExtendedKademliaDiscoveryParams* = object
|
||||
bootstrapNodes*: seq[(PeerId, seq[MultiAddress])]
|
||||
mixPubKey*: Option[Curve25519Key]
|
||||
advertiseMix*: bool = false
|
||||
|
||||
WakuKademlia* = ref object
|
||||
protocol*: KademliaDiscovery
|
||||
peerManager: PeerManager
|
||||
discoveryLoop: Future[void]
|
||||
running*: bool
|
||||
getMixNodePoolSize: MixNodePoolSizeProvider
|
||||
isNodeStarted: NodeStartedProvider
|
||||
|
||||
proc new*(
|
||||
T: type WakuKademlia,
|
||||
switch: Switch,
|
||||
params: ExtendedKademliaDiscoveryParams,
|
||||
peerManager: PeerManager,
|
||||
getMixNodePoolSize: MixNodePoolSizeProvider = nil,
|
||||
isNodeStarted: NodeStartedProvider = nil,
|
||||
): Result[T, string] =
|
||||
if params.bootstrapNodes.len == 0:
|
||||
info "creating kademlia discovery as seed node (no bootstrap nodes)"
|
||||
|
||||
let kademlia = KademliaDiscovery.new(
|
||||
switch,
|
||||
bootstrapNodes = params.bootstrapNodes,
|
||||
config = KadDHTConfig.new(
|
||||
validator = kad_types.ExtEntryValidator(), selector = kad_types.ExtEntrySelector()
|
||||
),
|
||||
codec = ExtendedKademliaDiscoveryCodec,
|
||||
)
|
||||
|
||||
try:
|
||||
switch.mount(kademlia)
|
||||
except CatchableError:
|
||||
return err("failed to mount kademlia discovery: " & getCurrentExceptionMsg())
|
||||
|
||||
# Register services BEFORE starting kademlia so they are included in the
|
||||
# initial self-signed peer record published to the DHT
|
||||
if params.advertiseMix:
|
||||
if params.mixPubKey.isSome():
|
||||
let alreadyAdvertising = kademlia.startAdvertising(
|
||||
ServiceInfo(id: MixProtocolID, data: @(params.mixPubKey.get()))
|
||||
)
|
||||
if alreadyAdvertising:
|
||||
warn "mix service was already being advertised"
|
||||
debug "extended kademlia advertising mix service",
|
||||
keyHex = byteutils.toHex(params.mixPubKey.get()),
|
||||
bootstrapNodes = params.bootstrapNodes.len
|
||||
else:
|
||||
warn "mix advertising enabled but no key provided"
|
||||
|
||||
info "kademlia discovery created",
|
||||
bootstrapNodes = params.bootstrapNodes.len, advertiseMix = params.advertiseMix
|
||||
|
||||
return ok(
|
||||
WakuKademlia(
|
||||
protocol: kademlia,
|
||||
peerManager: peerManager,
|
||||
running: false,
|
||||
getMixNodePoolSize: getMixNodePoolSize,
|
||||
isNodeStarted: isNodeStarted,
|
||||
)
|
||||
)
|
||||
|
||||
proc extractMixPubKey(service: ServiceInfo): Option[Curve25519Key] =
|
||||
if service.id != MixProtocolID:
|
||||
trace "service is not mix protocol",
|
||||
serviceId = service.id, mixProtocolId = MixProtocolID
|
||||
return none(Curve25519Key)
|
||||
|
||||
if service.data.len != Curve25519KeySize:
|
||||
warn "invalid mix pub key length from kademlia record",
|
||||
expected = Curve25519KeySize,
|
||||
actual = service.data.len,
|
||||
dataHex = byteutils.toHex(service.data)
|
||||
return none(Curve25519Key)
|
||||
|
||||
debug "found mix protocol service",
|
||||
dataLen = service.data.len, expectedLen = Curve25519KeySize
|
||||
|
||||
let key = intoCurve25519Key(service.data)
|
||||
debug "successfully extracted mix pub key", keyHex = byteutils.toHex(key)
|
||||
return some(key)
|
||||
|
||||
proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
|
||||
proc toRemotePeerInfo(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
|
||||
debug "processing kademlia record",
|
||||
peerId = record.peerId,
|
||||
numAddresses = record.addresses.len,
|
||||
@ -132,17 +47,15 @@ proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
|
||||
|
||||
var mixPubKey = none(Curve25519Key)
|
||||
for service in record.services:
|
||||
debug "checking service",
|
||||
peerId = record.peerId, serviceId = service.id, dataLen = service.data.len
|
||||
mixPubKey = extractMixPubKey(service)
|
||||
if mixPubKey.isSome():
|
||||
debug "extracted mix public key from service", peerId = record.peerId
|
||||
break
|
||||
if service.id != MixProtocolID:
|
||||
continue
|
||||
|
||||
if service.data.len != Curve25519KeySize:
|
||||
continue
|
||||
|
||||
mixPubKey = some(intoCurve25519Key(service.data))
|
||||
break
|
||||
|
||||
if record.services.len > 0 and mixPubKey.isNone():
|
||||
debug "record has services but no valid mix key",
|
||||
peerId = record.peerId, services = record.services.mapIt(it.id)
|
||||
return none(RemotePeerInfo)
|
||||
return some(
|
||||
RemotePeerInfo.init(
|
||||
record.peerId,
|
||||
@ -153,128 +66,111 @@ proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
|
||||
)
|
||||
)
|
||||
|
||||
proc lookupMixPeers*(
|
||||
wk: WakuKademlia
|
||||
): Future[Result[int, string]] {.async: (raises: []).} =
|
||||
## Lookup mix peers via kademlia and add them to the peer store.
|
||||
## Returns the number of mix peers found and added.
|
||||
if wk.protocol.isNil():
|
||||
return err("cannot lookup mix peers: kademlia not mounted")
|
||||
|
||||
let mixService = ServiceInfo(id: MixProtocolID, data: @[])
|
||||
var records: seq[ExtendedPeerRecord]
|
||||
try:
|
||||
records = await wk.protocol.lookup(mixService)
|
||||
except CatchableError:
|
||||
return err("mix peer lookup failed: " & getCurrentExceptionMsg())
|
||||
|
||||
debug "mix peer lookup returned records", numRecords = records.len
|
||||
|
||||
var added = 0
|
||||
for record in records:
|
||||
let peerOpt = remotePeerInfoFrom(record)
|
||||
if peerOpt.isNone():
|
||||
continue
|
||||
|
||||
let peerInfo = peerOpt.get()
|
||||
if peerInfo.mixPubKey.isNone():
|
||||
continue
|
||||
|
||||
wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
||||
info "mix peer added via kademlia lookup",
|
||||
peerId = $peerInfo.peerId, mixPubKey = byteutils.toHex(peerInfo.mixPubKey.get())
|
||||
added.inc()
|
||||
|
||||
info "mix peer lookup complete", found = added
|
||||
return ok(added)
|
||||
|
||||
proc runDiscoveryLoop(
|
||||
wk: WakuKademlia, interval: Duration, minMixPeers: int
|
||||
) {.async: (raises: []).} =
|
||||
info "extended kademlia discovery loop started", interval = interval
|
||||
self: WakuKademlia, interval: Duration
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
debug "kademlia discovery loop started", interval = interval
|
||||
|
||||
try:
|
||||
while true:
|
||||
# Wait for node to be started
|
||||
if not wk.isNodeStarted.isNil() and not wk.isNodeStarted():
|
||||
await sleepAsync(ExtendedKademliaDiscoveryStartupDelay)
|
||||
while true:
|
||||
await sleepAsync(interval)
|
||||
|
||||
let res = catch:
|
||||
await self.protocol.randomRecords()
|
||||
let records = res.valueOr:
|
||||
error "kademlia discovery lookup failed", error = res.error.msg
|
||||
continue
|
||||
|
||||
for record in records:
|
||||
let peerInfo = toRemotePeerInfo(record).valueOr:
|
||||
continue
|
||||
|
||||
var records: seq[ExtendedPeerRecord]
|
||||
try:
|
||||
records = await wk.protocol.randomRecords()
|
||||
except CatchableError as e:
|
||||
warn "extended kademlia discovery failed", error = e.msg
|
||||
await sleepAsync(interval)
|
||||
continue
|
||||
self.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
||||
|
||||
debug "received random records from kademlia", numRecords = records.len
|
||||
debug "peer added via random walk",
|
||||
peerId = $peerInfo.peerId,
|
||||
addresses = peerInfo.addrs.mapIt($it),
|
||||
protocols = peerInfo.protocols
|
||||
|
||||
var added = 0
|
||||
for record in records:
|
||||
let peerOpt = remotePeerInfoFrom(record)
|
||||
if peerOpt.isNone():
|
||||
continue
|
||||
proc lookup*(
|
||||
self: WakuKademlia, codec: string
|
||||
): Future[seq[RemotePeerInfo]] {.async: (raises: []).} =
|
||||
let serviceId = hashServiceId(codec)
|
||||
|
||||
let peerInfo = peerOpt.get()
|
||||
wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
||||
debug "peer added via extended kademlia discovery",
|
||||
peerId = $peerInfo.peerId,
|
||||
addresses = peerInfo.addrs.mapIt($it),
|
||||
protocols = peerInfo.protocols,
|
||||
hasMixPubKey = peerInfo.mixPubKey.isSome()
|
||||
added.inc()
|
||||
|
||||
if added > 0:
|
||||
info "added peers from extended kademlia discovery", count = added
|
||||
|
||||
# Targeted mix peer lookup when pool is low
|
||||
if minMixPeers > 0 and not wk.getMixNodePoolSize.isNil() and
|
||||
wk.getMixNodePoolSize() < minMixPeers:
|
||||
debug "mix node pool below threshold, performing targeted lookup",
|
||||
currentPoolSize = wk.getMixNodePoolSize(), threshold = minMixPeers
|
||||
let found = (await wk.lookupMixPeers()).valueOr:
|
||||
warn "targeted mix peer lookup failed", error = error
|
||||
0
|
||||
if found > 0:
|
||||
info "found mix peers via targeted kademlia lookup", count = found
|
||||
|
||||
await sleepAsync(interval)
|
||||
except CancelledError as e:
|
||||
debug "extended kademlia discovery loop cancelled", error = e.msg
|
||||
except CatchableError as e:
|
||||
error "extended kademlia discovery loop failed", error = e.msg
|
||||
|
||||
proc start*(
|
||||
wk: WakuKademlia,
|
||||
interval: Duration = DefaultExtendedKademliaDiscoveryInterval,
|
||||
minMixPeers: int = 0,
|
||||
): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
if wk.running:
|
||||
return err("already running")
|
||||
|
||||
try:
|
||||
await wk.protocol.start()
|
||||
except CatchableError as e:
|
||||
return err("failed to start kademlia discovery: " & e.msg)
|
||||
|
||||
wk.discoveryLoop = wk.runDiscoveryLoop(interval, minMixPeers)
|
||||
|
||||
info "kademlia discovery started"
|
||||
return ok()
|
||||
|
||||
proc stop*(wk: WakuKademlia) {.async: (raises: []).} =
|
||||
if not wk.running:
|
||||
let catchRes = catch:
|
||||
await self.protocol.lookup(serviceId)
|
||||
let lookupRes = catchRes.valueOr:
|
||||
error "kademlia discovery lookup failed", error = catchRes.error.msg
|
||||
return
|
||||
|
||||
info "Stopping kademlia discovery"
|
||||
let ads = lookupRes.valueOr:
|
||||
error "kademlia discovery lookup failed", error
|
||||
return
|
||||
|
||||
wk.running = false
|
||||
var peerInfos = newSeqOfCap[RemotePeerInfo](ads.len)
|
||||
for ad in ads:
|
||||
let peerInfo = toRemotePeerInfo(ad.data).valueOr:
|
||||
continue
|
||||
|
||||
if not wk.discoveryLoop.isNil():
|
||||
await wk.discoveryLoop.cancelAndWait()
|
||||
wk.discoveryLoop = nil
|
||||
self.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
||||
|
||||
if not wk.protocol.isNil():
|
||||
await wk.protocol.stop()
|
||||
info "Successfully stopped kademlia discovery"
|
||||
debug "peer added via service discovery",
|
||||
service = codec,
|
||||
peerId = $peerInfo.peerId,
|
||||
addresses = peerInfo.addrs.mapIt($it),
|
||||
protocols = peerInfo.protocols
|
||||
|
||||
peerInfos.add(peerInfo)
|
||||
|
||||
return peerInfos
|
||||
|
||||
proc new*(
|
||||
T: type WakuKademlia,
|
||||
switch: Switch,
|
||||
peerManager: PeerManager,
|
||||
bootstrapNodes: seq[(PeerId, seq[MultiAddress])],
|
||||
providedServices: var seq[ServiceInfo],
|
||||
loopInterval: Duration = DefaultKademliaDiscoveryInterval,
|
||||
): T =
|
||||
if bootstrapNodes.len == 0:
|
||||
debug "creating kademlia discovery as seed node (no bootstrap nodes)"
|
||||
|
||||
let kademlia = KademliaDiscovery.new(
|
||||
switch,
|
||||
bootstrapNodes = bootstrapNodes,
|
||||
config = KadDHTConfig.new(
|
||||
validator = kad_types.ExtEntryValidator(), selector = kad_types.ExtEntrySelector()
|
||||
),
|
||||
services = providedServices,
|
||||
)
|
||||
|
||||
return WakuKademlia(
|
||||
protocol: kademlia, peerManager: peerManager, loopInterval: loopInterval
|
||||
)
|
||||
|
||||
proc start*(self: WakuKademlia) {.async.} =
|
||||
if self.protocol.started:
|
||||
warn "Starting waku kad twice"
|
||||
return
|
||||
|
||||
info "Starting Waku Kademlia"
|
||||
|
||||
await self.protocol.start()
|
||||
|
||||
self.walkIntervalFut = self.runDiscoveryLoop(self.loopInterval)
|
||||
|
||||
info "Waku Kademlia Started"
|
||||
|
||||
proc stop*(self: WakuKademlia) {.async.} =
|
||||
if not self.protocol.started:
|
||||
return
|
||||
|
||||
info "Stopping Waku Kademlia"
|
||||
|
||||
if not self.walkIntervalFut.isNil():
|
||||
self.walkIntervalFut.cancelSoon()
|
||||
self.walkIntervalFut = nil
|
||||
|
||||
if not self.protocol.isNil():
|
||||
await self.protocol.stop()
|
||||
|
||||
info "Successfully stopped Waku Kademlia"
|
||||
|
||||
@ -11,7 +11,6 @@ logScope:
|
||||
type MixConfBuilder* = object
|
||||
enabled: Option[bool]
|
||||
mixKey: Option[string]
|
||||
mixNodes: seq[MixNodePubInfo]
|
||||
|
||||
proc init*(T: type MixConfBuilder): MixConfBuilder =
|
||||
MixConfBuilder()
|
||||
@ -22,9 +21,6 @@ proc withEnabled*(b: var MixConfBuilder, enabled: bool) =
|
||||
proc withMixKey*(b: var MixConfBuilder, mixKey: string) =
|
||||
b.mixKey = some(mixKey)
|
||||
|
||||
proc withMixNodes*(b: var MixConfBuilder, mixNodes: seq[MixNodePubInfo]) =
|
||||
b.mixNodes = mixNodes
|
||||
|
||||
proc build*(b: MixConfBuilder): Result[Option[MixConf], string] =
|
||||
if not b.enabled.get(false):
|
||||
return ok(none[MixConf]())
|
||||
@ -32,12 +28,8 @@ proc build*(b: MixConfBuilder): Result[Option[MixConf], string] =
|
||||
if b.mixKey.isSome():
|
||||
let mixPrivKey = intoCurve25519Key(ncrutils.fromHex(b.mixKey.get()))
|
||||
let mixPubKey = public(mixPrivKey)
|
||||
return ok(
|
||||
some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey, mixNodes: b.mixNodes))
|
||||
)
|
||||
return ok(some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey)))
|
||||
else:
|
||||
let (mixPrivKey, mixPubKey) = generateKeyPair().valueOr:
|
||||
return err("Generate key pair error: " & $error)
|
||||
return ok(
|
||||
some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey, mixNodes: b.mixNodes))
|
||||
)
|
||||
return ok(some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey)))
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
import
|
||||
std/[options, sequtils],
|
||||
std/[options, sequtils, sugar],
|
||||
chronicles,
|
||||
chronos,
|
||||
libp2p/peerid,
|
||||
@ -7,7 +7,9 @@ import
|
||||
libp2p/protocols/connectivity/relay/relay,
|
||||
libp2p/nameresolving/dnsresolver,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/crypto/curve25519
|
||||
libp2p/crypto/curve25519,
|
||||
libp2p/extended_peer_record,
|
||||
libp2p/protocols/mix/mix_protocol
|
||||
|
||||
import
|
||||
./internal_config,
|
||||
@ -30,6 +32,7 @@ import
|
||||
../waku_filter_v2,
|
||||
../waku_peer_exchange,
|
||||
../discovery/waku_kademlia,
|
||||
../waku_mix/protocol,
|
||||
../node/peer_manager,
|
||||
../node/peer_manager/peer_store/waku_peer_storage,
|
||||
../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
|
||||
@ -121,10 +124,11 @@ proc initNode(
|
||||
builder.withRateLimit(conf.rateLimit)
|
||||
builder.withCircuitRelay(relay)
|
||||
|
||||
let node = ?builder.build().mapErr(
|
||||
proc(err: string): string =
|
||||
"failed to create waku node instance: " & err
|
||||
)
|
||||
let node =
|
||||
?builder.build().mapErr(
|
||||
proc(err: string): string =
|
||||
"failed to create waku node instance: " & err
|
||||
)
|
||||
|
||||
ok(node)
|
||||
|
||||
@ -159,38 +163,18 @@ proc setupProtocols(
|
||||
error "Unrecoverable error occurred", error = msg
|
||||
quit(QuitFailure)
|
||||
|
||||
var providedServices: seq[ServiceInfo]
|
||||
|
||||
#mount mix
|
||||
if conf.mixConf.isSome():
|
||||
let mixConf = conf.mixConf.get()
|
||||
(await node.mountMix(conf.clusterId, mixConf.mixKey, mixConf.mixnodes)).isOkOr:
|
||||
|
||||
let mixService = ServiceInfo(id: MixProtocolID, data: @(mixConf.mixPubKey))
|
||||
providedServices.add(mixService)
|
||||
|
||||
(await node.mountMix(mixConf.mixKey)).isOkOr:
|
||||
return err("failed to mount waku mix protocol: " & $error)
|
||||
|
||||
# Setup extended kademlia discovery
|
||||
if conf.kademliaDiscoveryConf.isSome():
|
||||
let mixPubKey =
|
||||
if conf.mixConf.isSome():
|
||||
some(conf.mixConf.get().mixPubKey)
|
||||
else:
|
||||
none(Curve25519Key)
|
||||
|
||||
node.wakuKademlia = WakuKademlia.new(
|
||||
node.switch,
|
||||
ExtendedKademliaDiscoveryParams(
|
||||
bootstrapNodes: conf.kademliaDiscoveryConf.get().bootstrapNodes,
|
||||
mixPubKey: mixPubKey,
|
||||
advertiseMix: conf.mixConf.isSome(),
|
||||
),
|
||||
node.peerManager,
|
||||
getMixNodePoolSize = proc(): int {.gcsafe, raises: [].} =
|
||||
if node.wakuMix.isNil():
|
||||
0
|
||||
else:
|
||||
node.getMixNodePoolSize(),
|
||||
isNodeStarted = proc(): bool {.gcsafe, raises: [].} =
|
||||
node.started,
|
||||
).valueOr:
|
||||
return err("failed to setup kademlia discovery: " & error)
|
||||
|
||||
if conf.storeServiceConf.isSome():
|
||||
let storeServiceConf = conf.storeServiceConf.get()
|
||||
|
||||
@ -214,6 +198,9 @@ proc setupProtocols(
|
||||
except CatchableError:
|
||||
return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())
|
||||
|
||||
let storeService = ServiceInfo(id: WakuStoreCodec, data: @[])
|
||||
providedServices.add(storeService)
|
||||
|
||||
if storeServiceConf.storeSyncConf.isSome():
|
||||
let confStoreSync = storeServiceConf.storeSyncConf.get()
|
||||
|
||||
@ -226,6 +213,11 @@ proc setupProtocols(
|
||||
).isOkOr:
|
||||
return err("failed to mount waku store sync protocol: " & $error)
|
||||
|
||||
let reconciliationService = ServiceInfo(id: WakuReconciliationCodec, data: @[])
|
||||
let transferService = ServiceInfo(id: WakuTransferCodec, data: @[])
|
||||
providedServices.add(reconciliationService)
|
||||
providedServices.add(transferService)
|
||||
|
||||
if conf.remoteStoreNode.isSome():
|
||||
let storeNode = parsePeerInfo(conf.remoteStoreNode.get()).valueOr:
|
||||
return err("failed to set node waku store-sync peer: " & error)
|
||||
@ -309,10 +301,16 @@ proc setupProtocols(
|
||||
protectedShard = shardKey.shard, publicKey = shardKey.key
|
||||
node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId)
|
||||
|
||||
let relayService = ServiceInfo(id: WakuRelayCodec, data: @[])
|
||||
providedServices.add(relayService)
|
||||
|
||||
if conf.rendezvous:
|
||||
await node.mountRendezvous(conf.clusterId, shards)
|
||||
await node.mountRendezvousClient(conf.clusterId)
|
||||
|
||||
let rendezvousService = ServiceInfo(id: WakuRendezVousCodec, data: @[])
|
||||
providedServices.add(rendezvousService)
|
||||
|
||||
# Keepalive mounted on all nodes
|
||||
try:
|
||||
await mountLibp2pPing(node)
|
||||
@ -349,6 +347,9 @@ proc setupProtocols(
|
||||
except CatchableError:
|
||||
return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg())
|
||||
|
||||
let lightpushService = ServiceInfo(id: WakuLightPushCodec, data: @[])
|
||||
providedServices.add(lightpushService)
|
||||
|
||||
mountLightPushClient(node)
|
||||
mountLegacyLightPushClient(node)
|
||||
if conf.remoteLightPushNode.isSome():
|
||||
@ -371,6 +372,9 @@ proc setupProtocols(
|
||||
except CatchableError:
|
||||
return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg())
|
||||
|
||||
let filterService = ServiceInfo(id: WakuFilterPushCodec, data: @[])
|
||||
providedServices.add(filterService)
|
||||
|
||||
await node.mountFilterClient()
|
||||
if conf.remoteFilterNode.isSome():
|
||||
let filterNode = parsePeerInfo(conf.remoteFilterNode.get()).valueOr:
|
||||
@ -391,6 +395,9 @@ proc setupProtocols(
|
||||
return
|
||||
err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg())
|
||||
|
||||
let peerXchangeService = ServiceInfo(id: WakuPeerExchangeCodec, data: @[])
|
||||
providedServices.add(peerXchangeService)
|
||||
|
||||
if conf.remotePeerExchangeNode.isSome():
|
||||
let peerExchangeNode = parsePeerInfo(conf.remotePeerExchangeNode.get()).valueOr:
|
||||
return err("failed to set node waku peer-exchange peer: " & error)
|
||||
@ -399,6 +406,25 @@ proc setupProtocols(
|
||||
if conf.peerExchangeDiscovery:
|
||||
await node.mountPeerExchangeClient()
|
||||
|
||||
if conf.kademliaDiscoveryConf.isSome():
|
||||
let kademlia = WakuKademlia.new(
|
||||
node.switch,
|
||||
node.peerManager,
|
||||
conf.kademliaDiscoveryConf.get().bootstrapNodes,
|
||||
providedServices,
|
||||
)
|
||||
|
||||
let catchRes = catch:
|
||||
node.switch.mount(kademlia.protocol)
|
||||
if catchRes.isErr():
|
||||
return err("failed to mount kademlia discovery: " & catchRes.error.msg)
|
||||
|
||||
node.wakuKademlia = kademlia
|
||||
|
||||
# Connect kademlia to mix for peer discovery
|
||||
if not node.wakuMix.isNil() and not node.wakuKademlia.isNil():
|
||||
node.wakuMix.setKademlia(node.wakuKademlia)
|
||||
|
||||
return ok()
|
||||
|
||||
## Start node
|
||||
@ -450,11 +476,6 @@ proc startNode*(
|
||||
if conf.relay:
|
||||
node.peerManager.start()
|
||||
|
||||
if not node.wakuKademlia.isNil():
|
||||
let minMixPeers = if conf.mixConf.isSome(): 4 else: 0
|
||||
(await node.wakuKademlia.start(minMixPeers = minMixPeers)).isOkOr:
|
||||
return err("failed to start kademlia discovery: " & error)
|
||||
|
||||
return ok()
|
||||
|
||||
proc setupNode*(
|
||||
|
||||
@ -50,7 +50,6 @@ type StoreSyncConf* {.requiresInit.} = object
|
||||
type MixConf* = ref object
|
||||
mixKey*: Curve25519Key
|
||||
mixPubKey*: Curve25519Key
|
||||
mixnodes*: seq[MixNodePubInfo]
|
||||
|
||||
type KademliaDiscoveryConf* = object
|
||||
bootstrapNodes*: seq[(PeerId, seq[MultiAddress])]
|
||||
|
||||
@ -297,10 +297,7 @@ proc getMixNodePoolSize*(node: WakuNode): int =
|
||||
return node.wakuMix.poolSize()
|
||||
|
||||
proc mountMix*(
|
||||
node: WakuNode,
|
||||
clusterId: uint16,
|
||||
mixPrivKey: Curve25519Key,
|
||||
mixnodes: seq[MixNodePubInfo],
|
||||
node: WakuNode, mixPrivKey: Curve25519Key
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
info "mounting mix protocol", nodeId = node.info #TODO log the config used
|
||||
|
||||
@ -312,10 +309,13 @@ proc mountMix*(
|
||||
info "local addr", localaddr = localaddrStr
|
||||
|
||||
node.wakuMix = WakuMix.new(
|
||||
localaddrStr, node.peerManager, clusterId, mixPrivKey, mixnodes
|
||||
mixPrivKey = mixPrivKey,
|
||||
nodeAddr = localaddrStr,
|
||||
switch = node.switch,
|
||||
wakuKademlia = node.wakuKademlia,
|
||||
).valueOr:
|
||||
error "Waku Mix protocol initialization failed", err = error
|
||||
return
|
||||
return err("Waku Mix protocol initialization failed: " & error)
|
||||
#TODO: should we do the below only for exit node? Also, what if multiple protocols use mix?
|
||||
node.wakuMix.registerDestReadBehavior(WakuLightPushCodec, readLp(int(-1)))
|
||||
let catchRes = catch:
|
||||
@ -341,11 +341,12 @@ proc mountStoreSync*(
|
||||
|
||||
let pubsubTopics = shards.mapIt($RelayShard(clusterId: cluster, shardId: it))
|
||||
|
||||
let recon = ?await SyncReconciliation.new(
|
||||
pubsubTopics, contentTopics, node.peerManager, node.wakuArchive,
|
||||
storeSyncRange.seconds, storeSyncInterval.seconds, storeSyncRelayJitter.seconds,
|
||||
idsChannel, wantsChannel, needsChannel,
|
||||
)
|
||||
let recon =
|
||||
?await SyncReconciliation.new(
|
||||
pubsubTopics, contentTopics, node.peerManager, node.wakuArchive,
|
||||
storeSyncRange.seconds, storeSyncInterval.seconds, storeSyncRelayJitter.seconds,
|
||||
idsChannel, wantsChannel, needsChannel,
|
||||
)
|
||||
|
||||
node.wakuStoreReconciliation = recon
|
||||
|
||||
@ -582,12 +583,6 @@ proc start*(node: WakuNode) {.async.} =
|
||||
if not node.wakuRelay.isNil():
|
||||
await node.startRelay()
|
||||
|
||||
if not node.wakuMix.isNil():
|
||||
node.wakuMix.start()
|
||||
|
||||
if not node.wakuMetadata.isNil():
|
||||
node.wakuMetadata.start()
|
||||
|
||||
if not node.wakuStoreResume.isNil():
|
||||
await node.wakuStoreResume.start()
|
||||
|
||||
@ -597,12 +592,6 @@ proc start*(node: WakuNode) {.async.} =
|
||||
if not node.wakuRendezvousClient.isNil():
|
||||
await node.wakuRendezvousClient.start()
|
||||
|
||||
if not node.wakuStoreReconciliation.isNil():
|
||||
node.wakuStoreReconciliation.start()
|
||||
|
||||
if not node.wakuStoreTransfer.isNil():
|
||||
node.wakuStoreTransfer.start()
|
||||
|
||||
## The switch uses this mapper to update peer info addrs
|
||||
## with announced addrs after start
|
||||
let addressMapper = proc(
|
||||
@ -653,19 +642,10 @@ proc stop*(node: WakuNode) {.async.} =
|
||||
if not node.wakuStoreResume.isNil():
|
||||
await node.wakuStoreResume.stopWait()
|
||||
|
||||
if not node.wakuStoreReconciliation.isNil():
|
||||
node.wakuStoreReconciliation.stop()
|
||||
|
||||
if not node.wakuStoreTransfer.isNil():
|
||||
node.wakuStoreTransfer.stop()
|
||||
|
||||
if not node.wakuPeerExchangeClient.isNil() and
|
||||
not node.wakuPeerExchangeClient.pxLoopHandle.isNil():
|
||||
await node.wakuPeerExchangeClient.pxLoopHandle.cancelAndWait()
|
||||
|
||||
if not node.wakuKademlia.isNil():
|
||||
await node.wakuKademlia.stop()
|
||||
|
||||
if not node.wakuRendezvous.isNil():
|
||||
await node.wakuRendezvous.stopWait()
|
||||
|
||||
|
||||
@ -10,104 +10,129 @@ import
|
||||
libp2p/protocols/mix/mix_protocol,
|
||||
libp2p/protocols/mix/mix_metrics,
|
||||
libp2p/protocols/mix/delay_strategy,
|
||||
libp2p/[multiaddress, peerid],
|
||||
libp2p/[multiaddress, peerid, switch],
|
||||
libp2p/extended_peer_record,
|
||||
eth/common/keys
|
||||
|
||||
import
|
||||
waku/node/peer_manager,
|
||||
waku/waku_core,
|
||||
waku/waku_enr,
|
||||
waku/node/peer_manager/waku_peer_store
|
||||
waku/node/peer_manager/waku_peer_store,
|
||||
waku/discovery/waku_kademlia
|
||||
|
||||
logScope:
|
||||
topics = "waku mix"
|
||||
|
||||
const minMixPoolSize = 4
|
||||
const
|
||||
MinimumMixPoolSize = 4
|
||||
DefaultMixPoolMaintenanceInterval = chronos.seconds(10)
|
||||
|
||||
type
|
||||
WakuMix* = ref object of MixProtocol
|
||||
peerManager*: PeerManager
|
||||
clusterId: uint16
|
||||
pubKey*: Curve25519Key
|
||||
type WakuMix* = ref object of MixProtocol
|
||||
pubKey*: Curve25519Key
|
||||
targetMixPoolSize: int
|
||||
currentMixPoolSize: int
|
||||
maintenanceInterval: Duration
|
||||
maintenanceIntervalFut: Future[void]
|
||||
wakuKademlia: WakuKademlia
|
||||
|
||||
WakuMixResult*[T] = Result[T, string]
|
||||
proc poolSize*(self: WakuMix): int =
|
||||
if self.nodePool.isNil():
|
||||
0
|
||||
else:
|
||||
self.nodePool.len()
|
||||
|
||||
MixNodePubInfo* = object
|
||||
multiAddr*: string
|
||||
pubKey*: Curve25519Key
|
||||
proc mixPoolMaintenance(
|
||||
self: WakuMix, interval: Duration
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
debug "mix pool maintenance loop started", interval = interval
|
||||
|
||||
proc processBootNodes(
|
||||
bootnodes: seq[MixNodePubInfo], peermgr: PeerManager, mix: WakuMix
|
||||
) =
|
||||
var count = 0
|
||||
for node in bootnodes:
|
||||
let pInfo = parsePeerInfo(node.multiAddr).valueOr:
|
||||
error "Failed to get peer id from multiaddress: ",
|
||||
error = error, multiAddr = $node.multiAddr
|
||||
continue
|
||||
let peerId = pInfo.peerId
|
||||
var peerPubKey: crypto.PublicKey
|
||||
if not peerId.extractPublicKey(peerPubKey):
|
||||
warn "Failed to extract public key from peerId, skipping node", peerId = peerId
|
||||
while true:
|
||||
await sleepAsync(interval)
|
||||
|
||||
self.currentMixPoolSize = self.poolSize()
|
||||
mix_pool_size.set(self.currentMixPoolSize.int64)
|
||||
|
||||
if self.currentMixPoolSize >= self.targetMixPoolSize:
|
||||
continue
|
||||
|
||||
if peerPubKey.scheme != PKScheme.Secp256k1:
|
||||
warn "Peer public key is not Secp256k1, skipping node",
|
||||
peerId = peerId, scheme = peerPubKey.scheme
|
||||
# Skip discovery if kademlia not available
|
||||
if self.wakuKademlia.isNil():
|
||||
debug "kademlia not available for mix peer discovery"
|
||||
continue
|
||||
|
||||
let multiAddr = MultiAddress.init(node.multiAddr).valueOr:
|
||||
error "Failed to parse multiaddress", multiAddr = node.multiAddr, error = error
|
||||
continue
|
||||
trace "mix node pool below threshold, performing targeted lookup",
|
||||
currentPoolSize = self.currentMixPoolSize, threshold = self.targetMixPoolSize
|
||||
|
||||
let mixPubInfo = MixPubInfo.init(peerId, multiAddr, node.pubKey, peerPubKey.skkey)
|
||||
mix.nodePool.add(mixPubInfo)
|
||||
count.inc()
|
||||
let mixPeers = await self.wakuKademlia.lookup(MixProtocolID)
|
||||
|
||||
peermgr.addPeer(
|
||||
RemotePeerInfo.init(peerId, @[multiAddr], mixPubKey = some(node.pubKey))
|
||||
)
|
||||
mix_pool_size.set(count)
|
||||
info "using mix bootstrap nodes ", count = count
|
||||
trace "mix peer discovery completed", discoveredPeers = mixPeers.len
|
||||
|
||||
proc new*(
|
||||
T: typedesc[WakuMix],
|
||||
nodeAddr: string,
|
||||
peermgr: PeerManager,
|
||||
clusterId: uint16,
|
||||
mixPrivKey: Curve25519Key,
|
||||
bootnodes: seq[MixNodePubInfo],
|
||||
): WakuMixResult[T] =
|
||||
nodeAddr: string,
|
||||
switch: Switch,
|
||||
targetMixPoolSize: int = MinimumMixPoolSize,
|
||||
maintenanceInterval: Duration = DefaultMixPoolMaintenanceInterval,
|
||||
wakuKademlia: WakuKademlia = nil,
|
||||
): Result[T, string] =
|
||||
let mixPubKey = public(mixPrivKey)
|
||||
info "mixPubKey", mixPubKey = mixPubKey
|
||||
|
||||
debug "Mix Public Key", mixPubKey = mixPubKey
|
||||
|
||||
let nodeMultiAddr = MultiAddress.init(nodeAddr).valueOr:
|
||||
return err("failed to parse mix node address: " & $nodeAddr & ", error: " & error)
|
||||
|
||||
let localMixNodeInfo = initMixNodeInfo(
|
||||
peermgr.switch.peerInfo.peerId, nodeMultiAddr, mixPubKey, mixPrivKey,
|
||||
peermgr.switch.peerInfo.publicKey.skkey, peermgr.switch.peerInfo.privateKey.skkey,
|
||||
switch.peerInfo.peerId, nodeMultiAddr, mixPubKey, mixPrivKey,
|
||||
switch.peerInfo.publicKey.skkey, switch.peerInfo.privateKey.skkey,
|
||||
)
|
||||
|
||||
var m = WakuMix(peerManager: peermgr, clusterId: clusterId, pubKey: mixPubKey)
|
||||
procCall MixProtocol(m).init(
|
||||
let mix = WakuMix(
|
||||
pubKey: mixPubKey,
|
||||
targetMixPoolSize: targetMixPoolSize,
|
||||
currentMixPoolSize: 0,
|
||||
maintenanceInterval: maintenanceInterval,
|
||||
maintenanceIntervalFut: nil,
|
||||
wakuKademlia: wakuKademlia,
|
||||
)
|
||||
|
||||
procCall MixProtocol(mix).init(
|
||||
localMixNodeInfo,
|
||||
peermgr.switch,
|
||||
switch,
|
||||
delayStrategy =
|
||||
ExponentialDelayStrategy.new(meanDelayMs = 50, rng = crypto.newRng()),
|
||||
)
|
||||
|
||||
processBootNodes(bootnodes, peermgr, m)
|
||||
return ok(mix)
|
||||
|
||||
if m.nodePool.len < minMixPoolSize:
|
||||
warn "publishing with mix won't work until atleast 3 mix nodes in node pool"
|
||||
return ok(m)
|
||||
proc setKademlia*(self: WakuMix, wakuKademlia: WakuKademlia) =
|
||||
self.wakuKademlia = wakuKademlia
|
||||
|
||||
proc poolSize*(mix: WakuMix): int =
|
||||
mix.nodePool.len
|
||||
method start*(self: WakuMix) {.async.} =
|
||||
if self.started:
|
||||
warn "Starting Waku Mix twice"
|
||||
return
|
||||
|
||||
method start*(mix: WakuMix) =
|
||||
info "starting waku mix protocol"
|
||||
info "Starting Waku Mix"
|
||||
|
||||
method stop*(mix: WakuMix) {.async.} =
|
||||
discard
|
||||
await procCall start(MixProtocol(self))
|
||||
|
||||
# Mix Protocol
|
||||
self.maintenanceIntervalFut = self.mixPoolMaintenance(self.maintenanceInterval)
|
||||
|
||||
info "Waku Mix Started"
|
||||
|
||||
method stop*(self: WakuMix) {.async.} =
|
||||
if not self.started:
|
||||
return
|
||||
|
||||
info "Stopping Waku Mix"
|
||||
|
||||
if not self.maintenanceIntervalFut.isNil():
|
||||
self.maintenanceIntervalFut.cancelSoon()
|
||||
self.maintenanceIntervalFut = nil
|
||||
|
||||
await procCall stop(MixProtocol(self))
|
||||
|
||||
info "Successfully stopped Waku Mix"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user