mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-02-14 02:43:12 +00:00
integrate extended kademlia discovery with xpr for mix
This commit is contained in:
parent
07c618248b
commit
d2ba5149af
@ -24,12 +24,14 @@ import
|
||||
stream/connection, # create and close stream read / write connections
|
||||
multiaddress,
|
||||
# encode different addressing schemes. For example, /ip4/7.7.7.7/tcp/6543 means it is using IPv4 protocol and TCP
|
||||
multicodec,
|
||||
peerinfo,
|
||||
# manage the information of a peer, such as peer ID and public / private key
|
||||
peerid, # Implement how peers interact
|
||||
protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs
|
||||
nameresolving/dnsresolver,
|
||||
protocols/mix/curve25519,
|
||||
protocols/mix/mix_protocol,
|
||||
] # define DNS resolution
|
||||
import
|
||||
waku/[
|
||||
@ -38,6 +40,7 @@ import
|
||||
waku_lightpush/rpc,
|
||||
waku_enr,
|
||||
discovery/waku_dnsdisc,
|
||||
discovery/waku_ext_kademlia,
|
||||
waku_node,
|
||||
node/waku_metrics,
|
||||
node/peer_manager,
|
||||
@ -84,6 +87,24 @@ type
|
||||
|
||||
const MinMixNodePoolSize = 4
|
||||
|
||||
proc parseKadBootstrapNode(
|
||||
multiAddrStr: string
|
||||
): Result[(PeerId, seq[MultiAddress]), string] =
|
||||
## Parse a multiaddr string that includes /p2p/<peerID> into (PeerId, seq[MultiAddress])
|
||||
let multiAddr = MultiAddress.init(multiAddrStr).valueOr:
|
||||
return err("Invalid multiaddress: " & multiAddrStr)
|
||||
|
||||
let peerIdPart = multiAddr.getPart(multiCodec("p2p")).valueOr:
|
||||
return err("Multiaddress must include /p2p/<peerID>: " & multiAddrStr)
|
||||
|
||||
let peerIdBytes = peerIdPart.protoArgument().valueOr:
|
||||
return err("Failed to extract peer ID from multiaddress: " & multiAddrStr)
|
||||
|
||||
let peerId = PeerId.init(peerIdBytes).valueOr:
|
||||
return err("Invalid peer ID in multiaddress: " & multiAddrStr)
|
||||
|
||||
ok((peerId, @[multiAddr]))
|
||||
|
||||
#####################
|
||||
## chat2 protobufs ##
|
||||
#####################
|
||||
@ -453,14 +474,39 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
(await node.mountMix(conf.clusterId, mixPrivKey, conf.mixnodes)).isOkOr:
|
||||
error "failed to mount waku mix protocol: ", error = $error
|
||||
quit(QuitFailure)
|
||||
await node.mountRendezvousClient(conf.clusterId)
|
||||
|
||||
# Setup extended kademlia discovery if bootstrap nodes are provided
|
||||
if conf.kadBootstrapNodes.len > 0:
|
||||
var kadBootstrapPeers: seq[(PeerId, seq[MultiAddress])]
|
||||
for nodeStr in conf.kadBootstrapNodes:
|
||||
let parsed = parseKadBootstrapNode(nodeStr).valueOr:
|
||||
error "Failed to parse kademlia bootstrap node", node = nodeStr, error = error
|
||||
continue
|
||||
kadBootstrapPeers.add(parsed)
|
||||
|
||||
if kadBootstrapPeers.len > 0:
|
||||
(
|
||||
await setupExtendedKademliaDiscovery(
|
||||
node,
|
||||
ExtendedKademliaDiscoveryParams(
|
||||
bootstrapNodes: kadBootstrapPeers,
|
||||
mixPubKey: some(mixPubKey),
|
||||
advertiseMix: false,
|
||||
),
|
||||
)
|
||||
).isOkOr:
|
||||
error "failed to setup kademlia discovery", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
#await node.mountRendezvousClient(conf.clusterId)
|
||||
|
||||
await node.start()
|
||||
|
||||
node.peerManager.start()
|
||||
node.startExtendedKademliaDiscoveryLoop()
|
||||
|
||||
await node.mountLibp2pPing()
|
||||
await node.mountPeerExchangeClient()
|
||||
#await node.mountPeerExchangeClient()
|
||||
let pubsubTopic = conf.getPubsubTopic(node, conf.contentTopic)
|
||||
echo "pubsub topic is: " & pubsubTopic
|
||||
let nick = await readNick(transp)
|
||||
@ -601,15 +647,14 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
node, pubsubTopic, conf.contentTopic, servicePeerInfo, false
|
||||
)
|
||||
echo "waiting for mix nodes to be discovered..."
|
||||
while true:
|
||||
if node.getMixNodePoolSize() >= MinMixNodePoolSize:
|
||||
break
|
||||
discard await node.fetchPeerExchangePeers()
|
||||
await sleepAsync(1000)
|
||||
|
||||
while node.getMixNodePoolSize() < MinMixNodePoolSize:
|
||||
info "waiting for mix nodes to be discovered",
|
||||
currentpoolSize = node.getMixNodePoolSize()
|
||||
# Try to lookup mix peers via kademlia if pool is low
|
||||
let found = await node.lookupMixPeers()
|
||||
if found > 0:
|
||||
info "found mix peers via kademlia lookup", count = found
|
||||
await sleepAsync(1000)
|
||||
notice "ready to publish with mix node pool size ",
|
||||
currentpoolSize = node.getMixNodePoolSize()
|
||||
|
||||
@ -203,7 +203,7 @@ type
|
||||
fleet* {.
|
||||
desc:
|
||||
"Select the fleet to connect to. This sets the DNS discovery URL to the selected fleet.",
|
||||
defaultValue: Fleet.test,
|
||||
defaultValue: Fleet.none,
|
||||
name: "fleet"
|
||||
.}: Fleet
|
||||
|
||||
@ -228,7 +228,14 @@ type
|
||||
desc: "WebSocket Secure Support.",
|
||||
defaultValue: false,
|
||||
name: "websocket-secure-support"
|
||||
.}: bool ## rln-relay configuration
|
||||
.}: bool
|
||||
|
||||
## Kademlia Discovery config
|
||||
kadBootstrapNodes* {.
|
||||
desc:
|
||||
"Peer multiaddr for kademlia discovery bootstrap node (must include /p2p/<peerID>). Argument may be repeated.",
|
||||
name: "kad-bootstrap-node"
|
||||
.}: seq[string]
|
||||
|
||||
proc parseCmdArg*(T: type MixNodePubInfo, p: string): T =
|
||||
let elements = p.split(":")
|
||||
|
||||
@ -621,6 +621,20 @@ with the drawback of consuming some more bandwidth.""",
|
||||
name: "mixnode"
|
||||
.}: seq[MixNodePubInfo]
|
||||
|
||||
# Kademlia Discovery config
|
||||
enableKadDiscovery* {.
|
||||
desc:
|
||||
"Enable extended kademlia discovery. Can be enabled without bootstrap nodes for the first node in the network.",
|
||||
defaultValue: false,
|
||||
name: "enable-kad-discovery"
|
||||
.}: bool
|
||||
|
||||
kadBootstrapNodes* {.
|
||||
desc:
|
||||
"Peer multiaddr for kademlia discovery bootstrap node (must include /p2p/<peerID>). Argument may be repeated.",
|
||||
name: "kad-bootstrap-node"
|
||||
.}: seq[string]
|
||||
|
||||
## websocket config
|
||||
websocketSupport* {.
|
||||
desc: "Enable websocket: true|false",
|
||||
@ -1057,4 +1071,7 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
|
||||
|
||||
b.rateLimitConf.withRateLimits(n.rateLimits)
|
||||
|
||||
b.kademliaDiscoveryConf.withEnabled(n.enableKadDiscovery)
|
||||
b.kademliaDiscoveryConf.withBootstrapNodes(n.kadBootstrapNodes)
|
||||
|
||||
return b.build()
|
||||
|
||||
224
waku/discovery/waku_ext_kademlia.nim
Normal file
224
waku/discovery/waku_ext_kademlia.nim
Normal file
@ -0,0 +1,224 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[options, sequtils],
|
||||
chronos,
|
||||
chronicles,
|
||||
results,
|
||||
libp2p/[peerid, multiaddress],
|
||||
libp2p/extended_peer_record,
|
||||
libp2p/crypto/curve25519,
|
||||
libp2p/protocols/[kademlia, kad_disco],
|
||||
libp2p/protocols/kademlia_discovery/types as kad_types,
|
||||
libp2p/protocols/mix/mix_protocol
|
||||
|
||||
import ../waku_core, ../node/waku_node, ../node/peer_manager
|
||||
|
||||
logScope:
|
||||
topics = "waku extended kademlia discovery"
|
||||
|
||||
const
|
||||
DefaultExtendedKademliaDiscoveryInterval* = chronos.seconds(5)
|
||||
ExtendedKademliaDiscoveryStartupDelay* = chronos.seconds(5)
|
||||
|
||||
type ExtendedKademliaDiscoveryParams* = object
|
||||
bootstrapNodes*: seq[(PeerId, seq[MultiAddress])]
|
||||
mixPubKey*: Option[Curve25519Key]
|
||||
advertiseMix*: bool = false
|
||||
|
||||
proc setupExtendedKademliaDiscovery*(
|
||||
node: WakuNode, params: ExtendedKademliaDiscoveryParams
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
if params.bootstrapNodes.len == 0:
|
||||
info "starting kademlia discovery as seed node (no bootstrap nodes)"
|
||||
|
||||
let kademlia = KademliaDiscovery.new(
|
||||
node.switch,
|
||||
bootstrapNodes = params.bootstrapNodes,
|
||||
config = KadDHTConfig.new(
|
||||
validator = kad_types.ExtEntryValidator(), selector = kad_types.ExtEntrySelector()
|
||||
),
|
||||
codec = ExtendedKademliaDiscoveryCodec,
|
||||
)
|
||||
|
||||
try:
|
||||
node.switch.mount(kademlia)
|
||||
except CatchableError:
|
||||
return err("failed to mount kademlia discovery: " & getCurrentExceptionMsg())
|
||||
|
||||
# Register services BEFORE starting kademlia so they are included in the
|
||||
# initial self-signed peer record published to the DHT
|
||||
if params.advertiseMix:
|
||||
if params.mixPubKey.isSome():
|
||||
discard kademlia.startAdvertising(
|
||||
ServiceInfo(id: MixProtocolID, data: @(params.mixPubKey.get()))
|
||||
)
|
||||
debug "extended kademlia advertising mix service",
|
||||
keyHex = params.mixPubKey.get().toHex(),
|
||||
bootstrapNodes = params.bootstrapNodes.len
|
||||
else:
|
||||
warn "mix advertising enabled but no key provided"
|
||||
|
||||
try:
|
||||
await kademlia.start()
|
||||
except CatchableError:
|
||||
return err("failed to start kademlia discovery: " & getCurrentExceptionMsg())
|
||||
|
||||
node.wakuKademlia = kademlia
|
||||
|
||||
info "kademlia discovery started",
|
||||
bootstrapNodes = params.bootstrapNodes.len, advertiseMix = params.advertiseMix
|
||||
|
||||
ok()
|
||||
|
||||
proc extractMixPubKey(service: ServiceInfo): Option[Curve25519Key] =
|
||||
if service.id != MixProtocolID:
|
||||
trace "service is not mix protocol", serviceId = service.id, mixProtocolId = MixProtocolID
|
||||
return none(Curve25519Key)
|
||||
|
||||
debug "found mix protocol service", dataLen = service.data.len, expectedLen = Curve25519KeySize
|
||||
|
||||
if service.data.len != Curve25519KeySize:
|
||||
warn "invalid mix pub key length from kademlia record",
|
||||
expected = Curve25519KeySize, actual = service.data.len, dataHex = service.data.toHex()
|
||||
return none(Curve25519Key)
|
||||
|
||||
let key = intoCurve25519Key(service.data)
|
||||
debug "successfully extracted mix pub key", keyHex = key.toHex()
|
||||
some(key)
|
||||
|
||||
proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
|
||||
debug "processing kademlia record",
|
||||
peerId = record.peerId,
|
||||
numAddresses = record.addresses.len,
|
||||
numServices = record.services.len,
|
||||
serviceIds = record.services.mapIt(it.id)
|
||||
|
||||
if record.addresses.len == 0:
|
||||
trace "kademlia record missing addresses", peerId = record.peerId
|
||||
return none(RemotePeerInfo)
|
||||
|
||||
let addrs = record.addresses.mapIt(it.address)
|
||||
if addrs.len == 0:
|
||||
trace "kademlia record produced no dialable addresses", peerId = record.peerId
|
||||
return none(RemotePeerInfo)
|
||||
|
||||
let protocols = record.services.mapIt(it.id)
|
||||
|
||||
var mixPubKey = none(Curve25519Key)
|
||||
for service in record.services:
|
||||
debug "checking service", peerId = record.peerId, serviceId = service.id, dataLen = service.data.len
|
||||
mixPubKey = extractMixPubKey(service)
|
||||
if mixPubKey.isSome():
|
||||
debug "extracted mix public key from service", peerId = record.peerId
|
||||
break
|
||||
|
||||
if record.services.len > 0 and mixPubKey.isNone():
|
||||
debug "record has services but no valid mix key",
|
||||
peerId = record.peerId,
|
||||
services = record.services.mapIt(it.id)
|
||||
|
||||
some(
|
||||
RemotePeerInfo.init(
|
||||
record.peerId,
|
||||
addrs = addrs,
|
||||
protocols = protocols,
|
||||
origin = PeerOrigin.Kademlia,
|
||||
mixPubKey = mixPubKey,
|
||||
)
|
||||
)
|
||||
|
||||
proc runExtendedKademliaDiscoveryLoop*(
|
||||
node: WakuNode, interval = DefaultExtendedKademliaDiscoveryInterval
|
||||
): Future[void] {.async.} =
|
||||
info "extended kademlia discovery loop started", interval = interval
|
||||
|
||||
while true:
|
||||
if node.wakuKademlia.isNil():
|
||||
info "extended kademlia discovery loop stopping: protocol disabled"
|
||||
return
|
||||
|
||||
if not node.started:
|
||||
await sleepAsync(ExtendedKademliaDiscoveryStartupDelay)
|
||||
continue
|
||||
|
||||
var records: seq[ExtendedPeerRecord]
|
||||
try:
|
||||
records = await node.wakuKademlia.randomRecords()
|
||||
except CatchableError:
|
||||
warn "extended kademlia discovery failed", error = getCurrentExceptionMsg()
|
||||
await sleepAsync(interval)
|
||||
continue
|
||||
|
||||
debug "received random records from kademlia", numRecords = records.len
|
||||
|
||||
var added = 0
|
||||
for record in records:
|
||||
let peerOpt = remotePeerInfoFrom(record)
|
||||
if peerOpt.isNone():
|
||||
continue
|
||||
|
||||
let peerInfo = peerOpt.get()
|
||||
node.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
||||
debug "peer added via extended kademlia discovery",
|
||||
peerId = $peerInfo.peerId,
|
||||
addresses = peerInfo.addrs.mapIt($it),
|
||||
protocols = peerInfo.protocols,
|
||||
hasMixPubKey = peerInfo.mixPubKey.isSome()
|
||||
added.inc()
|
||||
|
||||
if added > 0:
|
||||
info "added peers from extended kademlia discovery", count = added
|
||||
|
||||
await sleepAsync(interval)
|
||||
|
||||
proc startExtendedKademliaDiscoveryLoop*(
|
||||
node: WakuNode, interval = DefaultExtendedKademliaDiscoveryInterval
|
||||
) =
|
||||
if node.wakuKademlia.isNil():
|
||||
trace "extended kademlia discovery not started: protocol not mounted"
|
||||
return
|
||||
|
||||
if not node.kademliaDiscoveryLoop.isNil():
|
||||
trace "extended kademlia discovery loop already running"
|
||||
return
|
||||
|
||||
node.kademliaDiscoveryLoop = node.runExtendedKademliaDiscoveryLoop(interval)
|
||||
|
||||
proc lookupMixPeers*(
|
||||
node: WakuNode
|
||||
): Future[int] {.async.} =
|
||||
## Lookup mix peers via kademlia and add them to the peer store.
|
||||
## Returns the number of mix peers found and added.
|
||||
if node.wakuKademlia.isNil():
|
||||
warn "cannot lookup mix peers: kademlia not mounted"
|
||||
return 0
|
||||
|
||||
let mixService = ServiceInfo(id: MixProtocolID, data: @[])
|
||||
var records: seq[ExtendedPeerRecord]
|
||||
try:
|
||||
records = await node.wakuKademlia.lookup(mixService)
|
||||
except CatchableError:
|
||||
warn "mix peer lookup failed", error = getCurrentExceptionMsg()
|
||||
return 0
|
||||
|
||||
debug "mix peer lookup returned records", numRecords = records.len
|
||||
|
||||
var added = 0
|
||||
for record in records:
|
||||
let peerOpt = remotePeerInfoFrom(record)
|
||||
if peerOpt.isNone():
|
||||
continue
|
||||
|
||||
let peerInfo = peerOpt.get()
|
||||
if peerInfo.mixPubKey.isNone():
|
||||
continue
|
||||
|
||||
node.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
|
||||
info "mix peer added via kademlia lookup",
|
||||
peerId = $peerInfo.peerId,
|
||||
mixPubKey = peerInfo.mixPubKey.get().toHex()
|
||||
added.inc()
|
||||
|
||||
info "mix peer lookup complete", found = added
|
||||
return added
|
||||
@ -10,10 +10,12 @@ import
|
||||
./metrics_server_conf_builder,
|
||||
./rate_limit_conf_builder,
|
||||
./rln_relay_conf_builder,
|
||||
./mix_conf_builder
|
||||
./mix_conf_builder,
|
||||
./kademlia_discovery_conf_builder
|
||||
|
||||
export
|
||||
waku_conf_builder, filter_service_conf_builder, store_sync_conf_builder,
|
||||
store_service_conf_builder, rest_server_conf_builder, dns_discovery_conf_builder,
|
||||
discv5_conf_builder, web_socket_conf_builder, metrics_server_conf_builder,
|
||||
rate_limit_conf_builder, rln_relay_conf_builder, mix_conf_builder
|
||||
rate_limit_conf_builder, rln_relay_conf_builder, mix_conf_builder,
|
||||
kademlia_discovery_conf_builder
|
||||
|
||||
@ -0,0 +1,59 @@
|
||||
import chronicles, std/options, results
|
||||
import libp2p/[peerid, multiaddress, multicodec]
|
||||
import ../waku_conf
|
||||
|
||||
logScope:
|
||||
topics = "waku conf builder kademlia discovery"
|
||||
|
||||
#######################################
|
||||
## Kademlia Discovery Config Builder ##
|
||||
#######################################
|
||||
type KademliaDiscoveryConfBuilder* = object
|
||||
enabled*: Option[bool]
|
||||
bootstrapNodes*: seq[string]
|
||||
|
||||
proc init*(T: type KademliaDiscoveryConfBuilder): KademliaDiscoveryConfBuilder =
|
||||
KademliaDiscoveryConfBuilder()
|
||||
|
||||
proc withEnabled*(b: var KademliaDiscoveryConfBuilder, enabled: bool) =
|
||||
b.enabled = some(enabled)
|
||||
|
||||
proc withBootstrapNodes*(
|
||||
b: var KademliaDiscoveryConfBuilder, bootstrapNodes: seq[string]
|
||||
) =
|
||||
b.bootstrapNodes = bootstrapNodes
|
||||
|
||||
proc parseBootstrapNode(
|
||||
multiAddrStr: string
|
||||
): Result[(PeerId, seq[MultiAddress]), string] =
|
||||
## Parse a multiaddr string that includes /p2p/<peerID> into (PeerId, seq[MultiAddress])
|
||||
let multiAddr = MultiAddress.init(multiAddrStr).valueOr:
|
||||
return err("Invalid multiaddress: " & multiAddrStr)
|
||||
|
||||
let peerIdPart = multiAddr.getPart(multiCodec("p2p")).valueOr:
|
||||
return err("Multiaddress must include /p2p/<peerID>: " & multiAddrStr)
|
||||
|
||||
let peerIdBytes = peerIdPart.protoArgument().valueOr:
|
||||
return err("Failed to extract peer ID from multiaddress: " & multiAddrStr)
|
||||
|
||||
let peerId = PeerId.init(peerIdBytes).valueOr:
|
||||
return err("Invalid peer ID in multiaddress: " & multiAddrStr)
|
||||
|
||||
# Get the address without the p2p part for connecting
|
||||
ok((peerId, @[multiAddr]))
|
||||
|
||||
proc build*(
|
||||
b: KademliaDiscoveryConfBuilder
|
||||
): Result[Option[KademliaDiscoveryConf], string] =
|
||||
# Kademlia is enabled if explicitly enabled OR if bootstrap nodes are provided
|
||||
let enabled = b.enabled.get(false) or b.bootstrapNodes.len > 0
|
||||
if not enabled:
|
||||
return ok(none(KademliaDiscoveryConf))
|
||||
|
||||
var parsedNodes: seq[(PeerId, seq[MultiAddress])]
|
||||
for nodeStr in b.bootstrapNodes:
|
||||
let parsed = parseBootstrapNode(nodeStr).valueOr:
|
||||
return err("Failed to parse kademlia bootstrap node: " & error)
|
||||
parsedNodes.add(parsed)
|
||||
|
||||
return ok(some(KademliaDiscoveryConf(bootstrapNodes: parsedNodes)))
|
||||
@ -25,7 +25,8 @@ import
|
||||
./metrics_server_conf_builder,
|
||||
./rate_limit_conf_builder,
|
||||
./rln_relay_conf_builder,
|
||||
./mix_conf_builder
|
||||
./mix_conf_builder,
|
||||
./kademlia_discovery_conf_builder
|
||||
|
||||
logScope:
|
||||
topics = "waku conf builder"
|
||||
@ -80,6 +81,7 @@ type WakuConfBuilder* = object
|
||||
mixConf*: MixConfBuilder
|
||||
webSocketConf*: WebSocketConfBuilder
|
||||
rateLimitConf*: RateLimitConfBuilder
|
||||
kademliaDiscoveryConf*: KademliaDiscoveryConfBuilder
|
||||
# End conf builders
|
||||
relay: Option[bool]
|
||||
lightPush: Option[bool]
|
||||
@ -140,6 +142,7 @@ proc init*(T: type WakuConfBuilder): WakuConfBuilder =
|
||||
storeServiceConf: StoreServiceConfBuilder.init(),
|
||||
webSocketConf: WebSocketConfBuilder.init(),
|
||||
rateLimitConf: RateLimitConfBuilder.init(),
|
||||
kademliaDiscoveryConf: KademliaDiscoveryConfBuilder.init(),
|
||||
)
|
||||
|
||||
proc withNetworkConf*(b: var WakuConfBuilder, networkConf: NetworkConf) =
|
||||
@ -506,6 +509,9 @@ proc build*(
|
||||
let rateLimit = builder.rateLimitConf.build().valueOr:
|
||||
return err("Rate limits Conf building failed: " & $error)
|
||||
|
||||
let kademliaDiscoveryConf = builder.kademliaDiscoveryConf.build().valueOr:
|
||||
return err("Kademlia Discovery Conf building failed: " & $error)
|
||||
|
||||
# End - Build sub-configs
|
||||
|
||||
let logLevel =
|
||||
@ -628,6 +634,7 @@ proc build*(
|
||||
restServerConf: restServerConf,
|
||||
dnsDiscoveryConf: dnsDiscoveryConf,
|
||||
mixConf: mixConf,
|
||||
kademliaDiscoveryConf: kademliaDiscoveryConf,
|
||||
# end confs
|
||||
nodeKey: nodeKey,
|
||||
clusterId: clusterId,
|
||||
|
||||
@ -6,7 +6,8 @@ import
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/protocols/connectivity/relay/relay,
|
||||
libp2p/nameresolving/dnsresolver,
|
||||
libp2p/crypto/crypto
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/crypto/curve25519
|
||||
|
||||
import
|
||||
./internal_config,
|
||||
@ -32,6 +33,7 @@ import
|
||||
../waku_store_legacy/common as legacy_common,
|
||||
../waku_filter_v2,
|
||||
../waku_peer_exchange,
|
||||
../discovery/waku_ext_kademlia,
|
||||
../node/peer_manager,
|
||||
../node/peer_manager/peer_store/waku_peer_storage,
|
||||
../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
|
||||
@ -165,12 +167,29 @@ proc setupProtocols(
|
||||
|
||||
#mount mix
|
||||
if conf.mixConf.isSome():
|
||||
let mixConf = conf.mixConf.get()
|
||||
(await node.mountMix(conf.clusterId, mixConf.mixKey, mixConf.mixnodes)).isOkOr:
|
||||
return err("failed to mount waku mix protocol: " & $error)
|
||||
|
||||
# Setup extended kademlia discovery
|
||||
if conf.kademliaDiscoveryConf.isSome():
|
||||
let mixPubKey =
|
||||
if conf.mixConf.isSome():
|
||||
some(conf.mixConf.get().mixPubKey)
|
||||
else:
|
||||
none(Curve25519Key)
|
||||
|
||||
(
|
||||
await node.mountMix(
|
||||
conf.clusterId, conf.mixConf.get().mixKey, conf.mixConf.get().mixnodes
|
||||
await setupExtendedKademliaDiscovery(
|
||||
node,
|
||||
ExtendedKademliaDiscoveryParams(
|
||||
bootstrapNodes: conf.kademliaDiscoveryConf.get().bootstrapNodes,
|
||||
mixPubKey: mixPubKey,
|
||||
advertiseMix: conf.mixConf.isSome(),
|
||||
),
|
||||
)
|
||||
).isOkOr:
|
||||
return err("failed to mount waku mix protocol: " & $error)
|
||||
return err("failed to setup kademlia discovery: " & error)
|
||||
|
||||
if conf.storeServiceConf.isSome():
|
||||
let storeServiceConf = conf.storeServiceConf.get()
|
||||
@ -477,6 +496,8 @@ proc startNode*(
|
||||
if conf.relay:
|
||||
node.peerManager.start()
|
||||
|
||||
startExtendedKademliaDiscoveryLoop(node)
|
||||
|
||||
return ok()
|
||||
|
||||
proc setupNode*(
|
||||
|
||||
@ -4,6 +4,7 @@ import
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/multiaddress,
|
||||
libp2p/crypto/curve25519,
|
||||
libp2p/peerid,
|
||||
secp256k1,
|
||||
results
|
||||
|
||||
@ -51,6 +52,10 @@ type MixConf* = ref object
|
||||
mixPubKey*: Curve25519Key
|
||||
mixnodes*: seq[MixNodePubInfo]
|
||||
|
||||
type KademliaDiscoveryConf* = object
|
||||
bootstrapNodes*: seq[(PeerId, seq[MultiAddress])]
|
||||
## Bootstrap nodes for extended kademlia discovery.
|
||||
|
||||
type StoreServiceConf* {.requiresInit.} = object
|
||||
dbMigration*: bool
|
||||
dbURl*: string
|
||||
@ -109,6 +114,7 @@ type WakuConf* {.requiresInit.} = ref object
|
||||
metricsServerConf*: Option[MetricsServerConf]
|
||||
webSocketConf*: Option[WebSocketConf]
|
||||
mixConf*: Option[MixConf]
|
||||
kademliaDiscoveryConf*: Option[KademliaDiscoveryConf]
|
||||
|
||||
portsShift*: uint16
|
||||
dnsAddrsNameServers*: seq[IpAddress]
|
||||
|
||||
@ -17,6 +17,7 @@ import
|
||||
libp2p/protocols/ping,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/protocols/pubsub/rpc/messages,
|
||||
libp2p/protocols/kad_disco,
|
||||
libp2p/builders,
|
||||
libp2p/transports/transport,
|
||||
libp2p/transports/tcptransport,
|
||||
@ -131,6 +132,8 @@ type
|
||||
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
|
||||
rateLimitSettings*: ProtocolRateLimitSettings
|
||||
wakuMix*: WakuMix
|
||||
wakuKademlia*: KademliaDiscovery
|
||||
kademliaDiscoveryLoop*: Future[void]
|
||||
|
||||
proc getShardsGetter(node: WakuNode): GetShards =
|
||||
return proc(): seq[uint16] {.closure, gcsafe, raises: [].} =
|
||||
@ -534,6 +537,11 @@ proc stop*(node: WakuNode) {.async.} =
|
||||
if not node.wakuPeerExchangeClient.isNil() and
|
||||
not node.wakuPeerExchangeClient.pxLoopHandle.isNil():
|
||||
await node.wakuPeerExchangeClient.pxLoopHandle.cancelAndWait()
|
||||
node.wakuPeerExchangeClient.pxLoopHandle = nil
|
||||
|
||||
if not node.kademliaDiscoveryLoop.isNil():
|
||||
await node.kademliaDiscoveryLoop.cancelAndWait()
|
||||
node.kademliaDiscoveryLoop = nil
|
||||
|
||||
if not node.wakuRendezvous.isNil():
|
||||
await node.wakuRendezvous.stopWait()
|
||||
|
||||
@ -38,6 +38,7 @@ type
|
||||
Static
|
||||
PeerExchange
|
||||
Dns
|
||||
Kademlia
|
||||
|
||||
PeerDirection* = enum
|
||||
UnknownDirection
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user