mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-05-18 08:19:26 +00:00
Mix usage of service discovery
This commit is contained in:
parent
4ee846357b
commit
a6aee7cc71
@ -266,7 +266,8 @@ type WakuNodeConf* = object
|
|||||||
|
|
||||||
## Circuit-relay config
|
## Circuit-relay config
|
||||||
isRelayClient* {.
|
isRelayClient* {.
|
||||||
desc: """Set the node as a relay-client.
|
desc:
|
||||||
|
"""Set the node as a relay-client.
|
||||||
Set it to true for nodes that run behind a NAT or firewall and
|
Set it to true for nodes that run behind a NAT or firewall and
|
||||||
hence would have reachability issues.""",
|
hence would have reachability issues.""",
|
||||||
defaultValue: false,
|
defaultValue: false,
|
||||||
@ -636,12 +637,6 @@ with the drawback of consuming some more bandwidth.""",
|
|||||||
name: "mixkey"
|
name: "mixkey"
|
||||||
.}: Option[string]
|
.}: Option[string]
|
||||||
|
|
||||||
mixnodes* {.
|
|
||||||
desc:
|
|
||||||
"Multiaddress and mix-key of mix node to be statically specified in format multiaddr:mixPubKey. Argument may be repeated.",
|
|
||||||
name: "mixnode"
|
|
||||||
.}: seq[MixNodePubInfo]
|
|
||||||
|
|
||||||
# Kademlia Discovery config
|
# Kademlia Discovery config
|
||||||
enableKadDiscovery* {.
|
enableKadDiscovery* {.
|
||||||
desc:
|
desc:
|
||||||
@ -735,22 +730,6 @@ proc isNumber(x: string): bool =
|
|||||||
except ValueError:
|
except ValueError:
|
||||||
result = false
|
result = false
|
||||||
|
|
||||||
proc parseCmdArg*(T: type MixNodePubInfo, p: string): T =
|
|
||||||
let elements = p.split(":")
|
|
||||||
if elements.len != 2:
|
|
||||||
raise newException(
|
|
||||||
ValueError, "Invalid format for mix node expected multiaddr:mixPublicKey"
|
|
||||||
)
|
|
||||||
let multiaddr = MultiAddress.init(elements[0]).valueOr:
|
|
||||||
raise newException(ValueError, "Invalid multiaddress format")
|
|
||||||
if not multiaddr.contains(multiCodec("ip4")).get():
|
|
||||||
raise newException(
|
|
||||||
ValueError, "Invalid format for ip address, expected a ipv4 multiaddress"
|
|
||||||
)
|
|
||||||
return MixNodePubInfo(
|
|
||||||
multiaddr: elements[0], pubKey: intoCurve25519Key(ncrutils.fromHex(elements[1]))
|
|
||||||
)
|
|
||||||
|
|
||||||
proc parseCmdArg*(T: type ProtectedShard, p: string): T =
|
proc parseCmdArg*(T: type ProtectedShard, p: string): T =
|
||||||
let elements = p.split(":")
|
let elements = p.split(":")
|
||||||
if elements.len != 2:
|
if elements.len != 2:
|
||||||
@ -834,22 +813,6 @@ proc readValue*(
|
|||||||
except CatchableError:
|
except CatchableError:
|
||||||
raise newException(SerializationError, getCurrentExceptionMsg())
|
raise newException(SerializationError, getCurrentExceptionMsg())
|
||||||
|
|
||||||
proc readValue*(
|
|
||||||
r: var TomlReader, value: var MixNodePubInfo
|
|
||||||
) {.raises: [SerializationError].} =
|
|
||||||
try:
|
|
||||||
value = parseCmdArg(MixNodePubInfo, r.readValue(string))
|
|
||||||
except CatchableError:
|
|
||||||
raise newException(SerializationError, getCurrentExceptionMsg())
|
|
||||||
|
|
||||||
proc readValue*(
|
|
||||||
r: var EnvvarReader, value: var MixNodePubInfo
|
|
||||||
) {.raises: [SerializationError].} =
|
|
||||||
try:
|
|
||||||
value = parseCmdArg(MixNodePubInfo, r.readValue(string))
|
|
||||||
except CatchableError:
|
|
||||||
raise newException(SerializationError, getCurrentExceptionMsg())
|
|
||||||
|
|
||||||
proc readValue*(
|
proc readValue*(
|
||||||
r: var TomlReader, value: var ProtectedShard
|
r: var TomlReader, value: var ProtectedShard
|
||||||
) {.raises: [SerializationError].} =
|
) {.raises: [SerializationError].} =
|
||||||
@ -1067,7 +1030,6 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
|
|||||||
b.storeServiceConf.storeSyncConf.withRelayJitterSec(n.storeSyncRelayJitter)
|
b.storeServiceConf.storeSyncConf.withRelayJitterSec(n.storeSyncRelayJitter)
|
||||||
|
|
||||||
b.mixConf.withEnabled(n.mix)
|
b.mixConf.withEnabled(n.mix)
|
||||||
b.mixConf.withMixNodes(n.mixnodes)
|
|
||||||
b.withMix(n.mix)
|
b.withMix(n.mix)
|
||||||
if n.mixkey.isSome():
|
if n.mixkey.isSome():
|
||||||
b.mixConf.withMixKey(n.mixkey.get())
|
b.mixConf.withMixKey(n.mixkey.get())
|
||||||
|
|||||||
@ -1,11 +1,13 @@
|
|||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
import std/[options, sequtils, sugar]
|
import std/[options, sequtils]
|
||||||
import
|
import
|
||||||
chronos,
|
chronos,
|
||||||
chronicles,
|
chronicles,
|
||||||
results,
|
results,
|
||||||
stew/byteutils,
|
libp2p/crypto/curve25519,
|
||||||
|
libp2p/crypto/crypto,
|
||||||
|
libp2p/protocols/mix/mix_protocol,
|
||||||
libp2p/[peerid, multiaddress, switch],
|
libp2p/[peerid, multiaddress, switch],
|
||||||
libp2p/extended_peer_record,
|
libp2p/extended_peer_record,
|
||||||
libp2p/protocols/[kademlia, kad_disco],
|
libp2p/protocols/[kademlia, kad_disco],
|
||||||
@ -15,14 +17,14 @@ import
|
|||||||
import waku/waku_core, waku/node/peer_manager
|
import waku/waku_core, waku/node/peer_manager
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "waku kademlia discovery"
|
topics = "waku kademlia"
|
||||||
|
|
||||||
const DefaultKademliaDiscoveryInterval* = chronos.seconds(10)
|
const DefaultKademliaDiscoveryInterval* = chronos.seconds(10)
|
||||||
|
|
||||||
type WakuKademlia* = ref object
|
type WakuKademlia* = ref object
|
||||||
protocol*: KademliaDiscovery
|
protocol*: KademliaDiscovery
|
||||||
peerManager: PeerManager
|
peerManager: PeerManager
|
||||||
intervalFut: Future[void]
|
walkIntervalFut: Future[void]
|
||||||
|
|
||||||
proc toRemotePeerInfo(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
|
proc toRemotePeerInfo(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
|
||||||
debug "processing kademlia record",
|
debug "processing kademlia record",
|
||||||
@ -42,22 +44,37 @@ proc toRemotePeerInfo(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
|
|||||||
|
|
||||||
let protocols = record.services.mapIt(it.id)
|
let protocols = record.services.mapIt(it.id)
|
||||||
|
|
||||||
|
var mixPubKey = none(Curve25519Key)
|
||||||
|
for service in record.services:
|
||||||
|
if service.id != MixProtocolID:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if service.data.len != Curve25519KeySize:
|
||||||
|
continue
|
||||||
|
|
||||||
|
mixPubKey = some(intoCurve25519Key(service.data))
|
||||||
|
break
|
||||||
|
|
||||||
return some(
|
return some(
|
||||||
RemotePeerInfo.init(
|
RemotePeerInfo.init(
|
||||||
record.peerId, addrs = addrs, protocols = protocols, origin = PeerOrigin.Kademlia
|
record.peerId,
|
||||||
|
addrs = addrs,
|
||||||
|
protocols = protocols,
|
||||||
|
origin = PeerOrigin.Kademlia,
|
||||||
|
mixPubKey = mixPubKey,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
proc runDiscoveryLoop(
|
proc runDiscoveryLoop(
|
||||||
wk: WakuKademlia, interval: Duration
|
self: WakuKademlia, interval: Duration
|
||||||
) {.async: (raises: [CancelledError]).} =
|
) {.async: (raises: [CancelledError]).} =
|
||||||
info "kademlia discovery loop started", interval = interval
|
debug "kademlia discovery loop started", interval = interval
|
||||||
|
|
||||||
while true:
|
while true:
|
||||||
await sleepAsync(interval)
|
await sleepAsync(interval)
|
||||||
|
|
||||||
let res = catch:
|
let res = catch:
|
||||||
await wk.protocol.randomRecords()
|
await self.protocol.randomRecords()
|
||||||
let records = res.valueOr:
|
let records = res.valueOr:
|
||||||
error "kademlia discovery lookup failed", error = res.error.msg
|
error "kademlia discovery lookup failed", error = res.error.msg
|
||||||
continue
|
continue
|
||||||
@ -66,15 +83,13 @@ proc runDiscoveryLoop(
|
|||||||
let peerInfo = toRemotePeerInfo(record).valueOr:
|
let peerInfo = toRemotePeerInfo(record).valueOr:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
self.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
||||||
|
|
||||||
debug "peer added via kademlia discovery",
|
debug "peer added via random walk",
|
||||||
peerId = $peerInfo.peerId,
|
peerId = $peerInfo.peerId,
|
||||||
addresses = peerInfo.addrs.mapIt($it),
|
addresses = peerInfo.addrs.mapIt($it),
|
||||||
protocols = peerInfo.protocols
|
protocols = peerInfo.protocols
|
||||||
|
|
||||||
#TODO peer added metric
|
|
||||||
|
|
||||||
proc lookup*(
|
proc lookup*(
|
||||||
self: WakuKademlia, codec: string
|
self: WakuKademlia, codec: string
|
||||||
): Future[seq[RemotePeerInfo]] {.async: (raises: []).} =
|
): Future[seq[RemotePeerInfo]] {.async: (raises: []).} =
|
||||||
@ -97,13 +112,12 @@ proc lookup*(
|
|||||||
|
|
||||||
self.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
self.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
||||||
|
|
||||||
debug "peer added via kademlia discovery",
|
debug "peer added via service discovery",
|
||||||
|
service = codec,
|
||||||
peerId = $peerInfo.peerId,
|
peerId = $peerInfo.peerId,
|
||||||
addresses = peerInfo.addrs.mapIt($it),
|
addresses = peerInfo.addrs.mapIt($it),
|
||||||
protocols = peerInfo.protocols
|
protocols = peerInfo.protocols
|
||||||
|
|
||||||
#TODO peer added metric
|
|
||||||
|
|
||||||
peerInfos.add(peerInfo)
|
peerInfos.add(peerInfo)
|
||||||
|
|
||||||
return peerInfos
|
return peerInfos
|
||||||
@ -116,7 +130,7 @@ proc new*(
|
|||||||
providedServices: var seq[ServiceInfo],
|
providedServices: var seq[ServiceInfo],
|
||||||
): T =
|
): T =
|
||||||
if bootstrapNodes.len == 0:
|
if bootstrapNodes.len == 0:
|
||||||
info "creating kademlia discovery as seed node (no bootstrap nodes)"
|
debug "creating kademlia discovery as seed node (no bootstrap nodes)"
|
||||||
|
|
||||||
let kademlia = KademliaDiscovery.new(
|
let kademlia = KademliaDiscovery.new(
|
||||||
switch,
|
switch,
|
||||||
@ -127,34 +141,34 @@ proc new*(
|
|||||||
services = providedServices,
|
services = providedServices,
|
||||||
)
|
)
|
||||||
|
|
||||||
info "kademlia service discovery created", bootstrapNodes = bootstrapNodes.len
|
|
||||||
|
|
||||||
return WakuKademlia(protocol: kademlia, peerManager: peerManager)
|
return WakuKademlia(protocol: kademlia, peerManager: peerManager)
|
||||||
|
|
||||||
proc start*(
|
proc start*(
|
||||||
self: WakuKademlia, interval: Duration = DefaultKademliaDiscoveryInterval
|
self: WakuKademlia, interval: Duration = DefaultKademliaDiscoveryInterval
|
||||||
) {.async: (raises: [CancelledError]).} =
|
) {.async.} =
|
||||||
if self.protocol.started:
|
if self.protocol.started:
|
||||||
warn "Starting kad-disco twice"
|
warn "Starting waku kad twice"
|
||||||
return
|
return
|
||||||
|
|
||||||
|
info "Starting Waku Kademlia"
|
||||||
|
|
||||||
await self.protocol.start()
|
await self.protocol.start()
|
||||||
|
|
||||||
self.intervalFut = self.runDiscoveryLoop(interval)
|
self.walkIntervalFut = self.runDiscoveryLoop(interval)
|
||||||
|
|
||||||
info "kademlia discovery started"
|
info "Waku Kademlia Started"
|
||||||
|
|
||||||
proc stop*(self: WakuKademlia) {.async: (raises: []).} =
|
proc stop*(self: WakuKademlia) {.async.} =
|
||||||
if not self.protocol.started:
|
if not self.protocol.started:
|
||||||
return
|
return
|
||||||
|
|
||||||
info "Stopping kademlia discovery"
|
info "Stopping Waku Kademlia"
|
||||||
|
|
||||||
if not self.intervalFut.isNil():
|
if not self.walkIntervalFut.isNil():
|
||||||
self.intervalFut.cancelSoon()
|
self.walkIntervalFut.cancelSoon()
|
||||||
self.intervalFut = nil
|
self.walkIntervalFut = nil
|
||||||
|
|
||||||
if not self.protocol.isNil():
|
if not self.protocol.isNil():
|
||||||
await self.protocol.stop()
|
await self.protocol.stop()
|
||||||
|
|
||||||
info "Successfully stopped kademlia discovery"
|
info "Successfully stopped Waku Kademlia"
|
||||||
|
|||||||
@ -11,7 +11,6 @@ logScope:
|
|||||||
type MixConfBuilder* = object
|
type MixConfBuilder* = object
|
||||||
enabled: Option[bool]
|
enabled: Option[bool]
|
||||||
mixKey: Option[string]
|
mixKey: Option[string]
|
||||||
mixNodes: seq[MixNodePubInfo]
|
|
||||||
|
|
||||||
proc init*(T: type MixConfBuilder): MixConfBuilder =
|
proc init*(T: type MixConfBuilder): MixConfBuilder =
|
||||||
MixConfBuilder()
|
MixConfBuilder()
|
||||||
@ -22,9 +21,6 @@ proc withEnabled*(b: var MixConfBuilder, enabled: bool) =
|
|||||||
proc withMixKey*(b: var MixConfBuilder, mixKey: string) =
|
proc withMixKey*(b: var MixConfBuilder, mixKey: string) =
|
||||||
b.mixKey = some(mixKey)
|
b.mixKey = some(mixKey)
|
||||||
|
|
||||||
proc withMixNodes*(b: var MixConfBuilder, mixNodes: seq[MixNodePubInfo]) =
|
|
||||||
b.mixNodes = mixNodes
|
|
||||||
|
|
||||||
proc build*(b: MixConfBuilder): Result[Option[MixConf], string] =
|
proc build*(b: MixConfBuilder): Result[Option[MixConf], string] =
|
||||||
if not b.enabled.get(false):
|
if not b.enabled.get(false):
|
||||||
return ok(none[MixConf]())
|
return ok(none[MixConf]())
|
||||||
@ -32,12 +28,8 @@ proc build*(b: MixConfBuilder): Result[Option[MixConf], string] =
|
|||||||
if b.mixKey.isSome():
|
if b.mixKey.isSome():
|
||||||
let mixPrivKey = intoCurve25519Key(ncrutils.fromHex(b.mixKey.get()))
|
let mixPrivKey = intoCurve25519Key(ncrutils.fromHex(b.mixKey.get()))
|
||||||
let mixPubKey = public(mixPrivKey)
|
let mixPubKey = public(mixPrivKey)
|
||||||
return ok(
|
return ok(some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey)))
|
||||||
some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey, mixNodes: b.mixNodes))
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
let (mixPrivKey, mixPubKey) = generateKeyPair().valueOr:
|
let (mixPrivKey, mixPubKey) = generateKeyPair().valueOr:
|
||||||
return err("Generate key pair error: " & $error)
|
return err("Generate key pair error: " & $error)
|
||||||
return ok(
|
return ok(some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey)))
|
||||||
some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey, mixNodes: b.mixNodes))
|
|
||||||
)
|
|
||||||
|
|||||||
@ -32,6 +32,7 @@ import
|
|||||||
../waku_filter_v2,
|
../waku_filter_v2,
|
||||||
../waku_peer_exchange,
|
../waku_peer_exchange,
|
||||||
../discovery/waku_kademlia,
|
../discovery/waku_kademlia,
|
||||||
|
../waku_mix/protocol,
|
||||||
../node/peer_manager,
|
../node/peer_manager,
|
||||||
../node/peer_manager/peer_store/waku_peer_storage,
|
../node/peer_manager/peer_store/waku_peer_storage,
|
||||||
../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
|
../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
|
||||||
@ -168,10 +169,10 @@ proc setupProtocols(
|
|||||||
if conf.mixConf.isSome():
|
if conf.mixConf.isSome():
|
||||||
let mixConf = conf.mixConf.get()
|
let mixConf = conf.mixConf.get()
|
||||||
|
|
||||||
let mixService = ServiceInfo(id: MixProtocolID, data: @(mixConf.mixKey))
|
let mixService = ServiceInfo(id: MixProtocolID, data: @(mixConf.mixPubKey))
|
||||||
providedServices.add(mixService)
|
providedServices.add(mixService)
|
||||||
|
|
||||||
(await node.mountMix(conf.clusterId, mixConf.mixKey, mixConf.mixnodes)).isOkOr:
|
(await node.mountMix(mixConf.mixKey)).isOkOr:
|
||||||
return err("failed to mount waku mix protocol: " & $error)
|
return err("failed to mount waku mix protocol: " & $error)
|
||||||
|
|
||||||
if conf.storeServiceConf.isSome():
|
if conf.storeServiceConf.isSome():
|
||||||
@ -420,6 +421,10 @@ proc setupProtocols(
|
|||||||
|
|
||||||
node.wakuKademlia = kademlia
|
node.wakuKademlia = kademlia
|
||||||
|
|
||||||
|
# Connect kademlia to mix for peer discovery
|
||||||
|
if not node.wakuMix.isNil() and not node.wakuKademlia.isNil():
|
||||||
|
node.wakuMix.setKademlia(node.wakuKademlia)
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
## Start node
|
## Start node
|
||||||
@ -471,12 +476,6 @@ proc startNode*(
|
|||||||
if conf.relay:
|
if conf.relay:
|
||||||
node.peerManager.start()
|
node.peerManager.start()
|
||||||
|
|
||||||
if not node.wakuKademlia.isNil():
|
|
||||||
let catchRes = catch:
|
|
||||||
await node.wakuKademlia.start()
|
|
||||||
if catchRes.isErr():
|
|
||||||
return err("failed to start kademlia discovery: " & catchRes.error.msg)
|
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
proc setupNode*(
|
proc setupNode*(
|
||||||
|
|||||||
@ -50,7 +50,6 @@ type StoreSyncConf* {.requiresInit.} = object
|
|||||||
type MixConf* = ref object
|
type MixConf* = ref object
|
||||||
mixKey*: Curve25519Key
|
mixKey*: Curve25519Key
|
||||||
mixPubKey*: Curve25519Key
|
mixPubKey*: Curve25519Key
|
||||||
mixnodes*: seq[MixNodePubInfo]
|
|
||||||
|
|
||||||
type KademliaDiscoveryConf* = object
|
type KademliaDiscoveryConf* = object
|
||||||
bootstrapNodes*: seq[(PeerId, seq[MultiAddress])]
|
bootstrapNodes*: seq[(PeerId, seq[MultiAddress])]
|
||||||
|
|||||||
@ -299,10 +299,7 @@ proc getMixNodePoolSize*(node: WakuNode): int =
|
|||||||
return node.wakuMix.poolSize()
|
return node.wakuMix.poolSize()
|
||||||
|
|
||||||
proc mountMix*(
|
proc mountMix*(
|
||||||
node: WakuNode,
|
node: WakuNode, mixPrivKey: Curve25519Key
|
||||||
clusterId: uint16,
|
|
||||||
mixPrivKey: Curve25519Key,
|
|
||||||
mixnodes: seq[MixNodePubInfo],
|
|
||||||
): Future[Result[void, string]] {.async.} =
|
): Future[Result[void, string]] {.async.} =
|
||||||
info "mounting mix protocol", nodeId = node.info #TODO log the config used
|
info "mounting mix protocol", nodeId = node.info #TODO log the config used
|
||||||
|
|
||||||
@ -314,10 +311,13 @@ proc mountMix*(
|
|||||||
info "local addr", localaddr = localaddrStr
|
info "local addr", localaddr = localaddrStr
|
||||||
|
|
||||||
node.wakuMix = WakuMix.new(
|
node.wakuMix = WakuMix.new(
|
||||||
localaddrStr, node.peerManager, clusterId, mixPrivKey, mixnodes
|
mixPrivKey = mixPrivKey,
|
||||||
|
nodeAddr = localaddrStr,
|
||||||
|
switch = node.switch,
|
||||||
|
wakuKademlia = node.wakuKademlia,
|
||||||
).valueOr:
|
).valueOr:
|
||||||
error "Waku Mix protocol initialization failed", err = error
|
error "Waku Mix protocol initialization failed", err = error
|
||||||
return
|
return err("Waku Mix protocol initialization failed: " & error)
|
||||||
#TODO: should we do the below only for exit node? Also, what if multiple protocols use mix?
|
#TODO: should we do the below only for exit node? Also, what if multiple protocols use mix?
|
||||||
node.wakuMix.registerDestReadBehavior(WakuLightPushCodec, readLp(int(-1)))
|
node.wakuMix.registerDestReadBehavior(WakuLightPushCodec, readLp(int(-1)))
|
||||||
let catchRes = catch:
|
let catchRes = catch:
|
||||||
@ -640,6 +640,9 @@ proc stop*(node: WakuNode) {.async.} =
|
|||||||
if not node.wakuRendezvousClient.isNil():
|
if not node.wakuRendezvousClient.isNil():
|
||||||
await node.wakuRendezvousClient.stopWait()
|
await node.wakuRendezvousClient.stopWait()
|
||||||
|
|
||||||
|
if not node.wakuKademlia.isNil():
|
||||||
|
await node.wakuKademlia.stop()
|
||||||
|
|
||||||
node.started = false
|
node.started = false
|
||||||
|
|
||||||
proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} =
|
proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} =
|
||||||
|
|||||||
@ -10,98 +10,129 @@ import
|
|||||||
libp2p/protocols/mix/mix_protocol,
|
libp2p/protocols/mix/mix_protocol,
|
||||||
libp2p/protocols/mix/mix_metrics,
|
libp2p/protocols/mix/mix_metrics,
|
||||||
libp2p/protocols/mix/delay_strategy,
|
libp2p/protocols/mix/delay_strategy,
|
||||||
libp2p/[multiaddress, peerid],
|
libp2p/[multiaddress, peerid, switch],
|
||||||
|
libp2p/extended_peer_record,
|
||||||
eth/common/keys
|
eth/common/keys
|
||||||
|
|
||||||
import
|
import
|
||||||
waku/node/peer_manager,
|
waku/node/peer_manager,
|
||||||
waku/waku_core,
|
waku/waku_core,
|
||||||
waku/waku_enr,
|
waku/waku_enr,
|
||||||
waku/node/peer_manager/waku_peer_store
|
waku/node/peer_manager/waku_peer_store,
|
||||||
|
waku/discovery/waku_kademlia
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "waku mix"
|
topics = "waku mix"
|
||||||
|
|
||||||
const minMixPoolSize = 4
|
const
|
||||||
|
MinimumMixPoolSize = 4
|
||||||
|
DefaultMixPoolMaintenanceInterval = chronos.seconds(10)
|
||||||
|
|
||||||
type
|
type WakuMix* = ref object of MixProtocol
|
||||||
WakuMix* = ref object of MixProtocol
|
pubKey*: Curve25519Key
|
||||||
peerManager*: PeerManager
|
targetMixPoolSize: int
|
||||||
clusterId: uint16
|
currentMixPoolSize: int
|
||||||
pubKey*: Curve25519Key
|
maintenanceInterval: Duration
|
||||||
|
maintenanceIntervalFut: Future[void]
|
||||||
|
wakuKademlia: WakuKademlia
|
||||||
|
|
||||||
WakuMixResult*[T] = Result[T, string]
|
proc poolSize*(self: WakuMix): int =
|
||||||
|
if self.nodePool.isNil():
|
||||||
|
0
|
||||||
|
else:
|
||||||
|
self.nodePool.len()
|
||||||
|
|
||||||
MixNodePubInfo* = object
|
proc mixPoolMaintenance(
|
||||||
multiAddr*: string
|
self: WakuMix, interval: Duration
|
||||||
pubKey*: Curve25519Key
|
) {.async: (raises: [CancelledError]).} =
|
||||||
|
debug "mix pool maintenance loop started", interval = interval
|
||||||
|
|
||||||
proc processBootNodes(
|
while true:
|
||||||
bootnodes: seq[MixNodePubInfo], peermgr: PeerManager, mix: WakuMix
|
await sleepAsync(interval)
|
||||||
) =
|
|
||||||
var count = 0
|
self.currentMixPoolSize = self.poolSize()
|
||||||
for node in bootnodes:
|
mix_pool_size.set(self.currentMixPoolSize.int64)
|
||||||
let pInfo = parsePeerInfo(node.multiAddr).valueOr:
|
|
||||||
error "Failed to get peer id from multiaddress: ",
|
if self.currentMixPoolSize >= self.targetMixPoolSize:
|
||||||
error = error, multiAddr = $node.multiAddr
|
|
||||||
continue
|
|
||||||
let peerId = pInfo.peerId
|
|
||||||
var peerPubKey: crypto.PublicKey
|
|
||||||
if not peerId.extractPublicKey(peerPubKey):
|
|
||||||
warn "Failed to extract public key from peerId, skipping node", peerId = peerId
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if peerPubKey.scheme != PKScheme.Secp256k1:
|
# Skip discovery if kademlia not available
|
||||||
warn "Peer public key is not Secp256k1, skipping node",
|
if self.wakuKademlia.isNil():
|
||||||
peerId = peerId, scheme = peerPubKey.scheme
|
debug "kademlia not available for mix peer discovery"
|
||||||
continue
|
continue
|
||||||
|
|
||||||
let multiAddr = MultiAddress.init(node.multiAddr).valueOr:
|
trace "mix node pool below threshold, performing targeted lookup",
|
||||||
error "Failed to parse multiaddress", multiAddr = node.multiAddr, error = error
|
currentPoolSize = self.currentMixPoolSize, threshold = self.targetMixPoolSize
|
||||||
continue
|
|
||||||
|
|
||||||
let mixPubInfo = MixPubInfo.init(peerId, multiAddr, node.pubKey, peerPubKey.skkey)
|
let mixPeers = await self.wakuKademlia.lookup(MixProtocolID)
|
||||||
mix.nodePool.add(mixPubInfo)
|
|
||||||
count.inc()
|
|
||||||
|
|
||||||
peermgr.addPeer(
|
trace "mix peer discovery completed", discoveredPeers = mixPeers.len
|
||||||
RemotePeerInfo.init(peerId, @[multiAddr], mixPubKey = some(node.pubKey))
|
|
||||||
)
|
|
||||||
mix_pool_size.set(count)
|
|
||||||
info "using mix bootstrap nodes ", count = count
|
|
||||||
|
|
||||||
proc new*(
|
proc new*(
|
||||||
T: typedesc[WakuMix],
|
T: typedesc[WakuMix],
|
||||||
nodeAddr: string,
|
|
||||||
peermgr: PeerManager,
|
|
||||||
clusterId: uint16,
|
|
||||||
mixPrivKey: Curve25519Key,
|
mixPrivKey: Curve25519Key,
|
||||||
bootnodes: seq[MixNodePubInfo],
|
nodeAddr: string,
|
||||||
): WakuMixResult[T] =
|
switch: Switch,
|
||||||
|
targetMixPoolSize: int = MinimumMixPoolSize,
|
||||||
|
maintenanceInterval: Duration = DefaultMixPoolMaintenanceInterval,
|
||||||
|
wakuKademlia: WakuKademlia = nil,
|
||||||
|
): Result[T, string] =
|
||||||
let mixPubKey = public(mixPrivKey)
|
let mixPubKey = public(mixPrivKey)
|
||||||
info "mixPubKey", mixPubKey = mixPubKey
|
|
||||||
|
debug "Mix Public Key", mixPubKey = mixPubKey
|
||||||
|
|
||||||
let nodeMultiAddr = MultiAddress.init(nodeAddr).valueOr:
|
let nodeMultiAddr = MultiAddress.init(nodeAddr).valueOr:
|
||||||
return err("failed to parse mix node address: " & $nodeAddr & ", error: " & error)
|
return err("failed to parse mix node address: " & $nodeAddr & ", error: " & error)
|
||||||
|
|
||||||
let localMixNodeInfo = initMixNodeInfo(
|
let localMixNodeInfo = initMixNodeInfo(
|
||||||
peermgr.switch.peerInfo.peerId, nodeMultiAddr, mixPubKey, mixPrivKey,
|
switch.peerInfo.peerId, nodeMultiAddr, mixPubKey, mixPrivKey,
|
||||||
peermgr.switch.peerInfo.publicKey.skkey, peermgr.switch.peerInfo.privateKey.skkey,
|
switch.peerInfo.publicKey.skkey, switch.peerInfo.privateKey.skkey,
|
||||||
)
|
)
|
||||||
|
|
||||||
var m = WakuMix(peerManager: peermgr, clusterId: clusterId, pubKey: mixPubKey)
|
let mix = WakuMix(
|
||||||
procCall MixProtocol(m).init(
|
pubKey: mixPubKey,
|
||||||
|
targetMixPoolSize: targetMixPoolSize,
|
||||||
|
currentMixPoolSize: 0,
|
||||||
|
maintenanceInterval: maintenanceInterval,
|
||||||
|
maintenanceIntervalFut: nil,
|
||||||
|
wakuKademlia: wakuKademlia,
|
||||||
|
)
|
||||||
|
|
||||||
|
procCall MixProtocol(mix).init(
|
||||||
localMixNodeInfo,
|
localMixNodeInfo,
|
||||||
peermgr.switch,
|
switch,
|
||||||
delayStrategy =
|
delayStrategy =
|
||||||
ExponentialDelayStrategy.new(meanDelayMs = 50, rng = crypto.newRng()),
|
ExponentialDelayStrategy.new(meanDelayMs = 50, rng = crypto.newRng()),
|
||||||
)
|
)
|
||||||
|
|
||||||
processBootNodes(bootnodes, peermgr, m)
|
return ok(mix)
|
||||||
|
|
||||||
if m.nodePool.len < minMixPoolSize:
|
proc setKademlia*(self: WakuMix, wakuKademlia: WakuKademlia) =
|
||||||
warn "publishing with mix won't work until atleast 3 mix nodes in node pool"
|
self.wakuKademlia = wakuKademlia
|
||||||
return ok(m)
|
|
||||||
|
|
||||||
proc poolSize*(mix: WakuMix): int =
|
method start*(self: WakuMix) {.async.} =
|
||||||
mix.nodePool.len
|
if self.started:
|
||||||
|
warn "Starting Waku Mix twice"
|
||||||
|
return
|
||||||
|
|
||||||
# Mix Protocol
|
info "Starting Waku Mix"
|
||||||
|
|
||||||
|
await procCall start(MixProtocol(self))
|
||||||
|
|
||||||
|
self.maintenanceIntervalFut = self.mixPoolMaintenance(self.maintenanceInterval)
|
||||||
|
|
||||||
|
info "Waku Mix Started"
|
||||||
|
|
||||||
|
method stop*(self: WakuMix) {.async.} =
|
||||||
|
if not self.started:
|
||||||
|
return
|
||||||
|
|
||||||
|
info "Stopping Waku Mix"
|
||||||
|
|
||||||
|
if not self.maintenanceIntervalFut.isNil():
|
||||||
|
self.maintenanceIntervalFut.cancelSoon()
|
||||||
|
self.maintenanceIntervalFut = nil
|
||||||
|
|
||||||
|
await procCall stop(MixProtocol(self))
|
||||||
|
|
||||||
|
info "Successfully stopped Waku Mix"
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user