mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-02 14:03:06 +00:00
remove mixPubKey from ENR and provide config param to pass mix nodes statically (#3587)
This commit is contained in:
parent
63f3234876
commit
797370ec80
@ -397,22 +397,6 @@ proc maintainSubscription(
|
||||
|
||||
await sleepAsync(30000) # Subscription maintenance interval
|
||||
|
||||
proc processMixNodes(localnode: WakuNode, nodes: seq[string]) {.async.} =
|
||||
if nodes.len == 0:
|
||||
return
|
||||
|
||||
info "Processing mix nodes: ", nodes = $nodes
|
||||
for node in nodes:
|
||||
var enrRec: enr.Record
|
||||
if enrRec.fromURI(node):
|
||||
let peerInfo = enrRec.toRemotePeerInfo().valueOr:
|
||||
error "Failed to parse mix node", error = error
|
||||
continue
|
||||
localnode.peermanager.addPeer(peerInfo, Discv5)
|
||||
info "Added mix node", peer = peerInfo
|
||||
else:
|
||||
error "Failed to parse mix node ENR", node = node
|
||||
|
||||
{.pop.}
|
||||
# @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
|
||||
proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
@ -486,11 +470,9 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
error "failed to generate mix key pair", error = error
|
||||
return
|
||||
|
||||
(await node.mountMix(conf.clusterId, mixPrivKey)).isOkOr:
|
||||
(await node.mountMix(conf.clusterId, mixPrivKey, conf.mixnodes)).isOkOr:
|
||||
error "failed to mount waku mix protocol: ", error = $error
|
||||
quit(QuitFailure)
|
||||
if conf.mixnodes.len > 0:
|
||||
await processMixNodes(node, conf.mixnodes)
|
||||
await node.start()
|
||||
|
||||
node.peerManager.start()
|
||||
@ -624,7 +606,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
servicePeerInfo = parsePeerInfo(conf.serviceNode).valueOr:
|
||||
error "Couldn't parse conf.serviceNode", error = error
|
||||
RemotePeerInfo()
|
||||
if $servicePeerInfo.peerId == "":
|
||||
if servicePeerInfo == nil or $servicePeerInfo.peerId == "":
|
||||
# Assuming that service node supports all services
|
||||
servicePeerInfo = selectRandomServicePeer(
|
||||
node.peerManager, none(RemotePeerInfo), WakuLightpushCodec
|
||||
|
||||
@ -4,12 +4,15 @@ import
|
||||
eth/keys,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/crypto/secp,
|
||||
libp2p/crypto/curve25519,
|
||||
libp2p/multiaddress,
|
||||
libp2p/multicodec,
|
||||
nimcrypto/utils,
|
||||
confutils,
|
||||
confutils/defs,
|
||||
confutils/std/net
|
||||
|
||||
import waku/waku_core
|
||||
import waku/waku_core, waku/waku_mix
|
||||
|
||||
type
|
||||
Fleet* = enum
|
||||
@ -83,8 +86,10 @@ type
|
||||
.}: seq[string]
|
||||
|
||||
mixnodes* {.
|
||||
desc: "Peer ENR to add as a mixnode. Argument may be repeated.", name: "mixnode"
|
||||
.}: seq[string]
|
||||
desc:
|
||||
"Multiaddress and mix-key of mix node to be statically specified in format multiaddr:mixPubKey. Argument may be repeated.",
|
||||
name: "mixnode"
|
||||
.}: seq[MixNodePubInfo]
|
||||
|
||||
keepAlive* {.
|
||||
desc: "Enable keep-alive for idle connections: true|false",
|
||||
@ -225,6 +230,23 @@ type
|
||||
name: "websocket-secure-support"
|
||||
.}: bool ## rln-relay configuration
|
||||
|
||||
proc parseCmdArg*(T: type MixNodePubInfo, p: string): T =
|
||||
let elements = p.split(":")
|
||||
if elements.len != 2:
|
||||
raise newException(
|
||||
ValueError, "Invalid format for mix node expected multiaddr:mixPublicKey"
|
||||
)
|
||||
let multiaddr = MultiAddress.init(elements[0]).valueOr:
|
||||
raise newException(ValueError, "Invalid multiaddress format")
|
||||
if not multiaddr.contains(multiCodec("ip4")).get():
|
||||
raise newException(
|
||||
ValueError, "Invalid format for ip address, expected a ipv4 multiaddress"
|
||||
)
|
||||
|
||||
return MixNodePubInfo(
|
||||
multiaddr: elements[0], pubKey: intoCurve25519Key(ncrutils.fromHex(elements[1]))
|
||||
)
|
||||
|
||||
# NOTE: Keys are different in nim-libp2p
|
||||
proc parseCmdArg*(T: type crypto.PrivateKey, p: string): T =
|
||||
try:
|
||||
|
||||
@ -107,7 +107,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LightPushMixConf) {.async.}
|
||||
let (mixPrivKey, mixPubKey) = generateKeyPair().valueOr:
|
||||
error "failed to generate mix key pair", error = error
|
||||
return
|
||||
(await node.mountMix(clusterId, mixPrivKey)).isOkOr:
|
||||
(await node.mountMix(clusterId, mixPrivKey, conf.mixnodes)).isOkOr:
|
||||
error "failed to mount waku mix protocol: ", error = $error
|
||||
return
|
||||
|
||||
|
||||
@ -1,4 +1,11 @@
|
||||
import confutils/defs
|
||||
import
|
||||
confutils/defs,
|
||||
libp2p/crypto/curve25519,
|
||||
libp2p/multiaddress,
|
||||
libp2p/multicodec,
|
||||
nimcrypto/utils as ncrutils
|
||||
|
||||
import waku/waku_mix
|
||||
|
||||
type LightPushMixConf* = object
|
||||
destPeerAddr* {.desc: "Destination peer address with peerId.", name: "dp-addr".}:
|
||||
@ -26,3 +33,26 @@ type LightPushMixConf* = object
|
||||
mixDisabled* {.
|
||||
desc: "Do not use mix for publishing.", defaultValue: false, name: "without-mix"
|
||||
.}: bool
|
||||
|
||||
mixnodes* {.
|
||||
desc:
|
||||
"Multiaddress and mix-key of mix node to be statically specified in format multiaddr:mixPubKey. Argument may be repeated.",
|
||||
name: "mixnode"
|
||||
.}: seq[MixNodePubInfo]
|
||||
|
||||
proc parseCmdArg*(T: typedesc[MixNodePubInfo], p: string): T =
|
||||
let elements = p.split(":")
|
||||
if elements.len != 2:
|
||||
raise newException(
|
||||
ValueError, "Invalid format for mix node expected multiaddr:mixPublicKey"
|
||||
)
|
||||
|
||||
let multiaddr = MultiAddress.init(elements[0]).valueOr:
|
||||
raise newException(ValueError, "Invalid multiaddress format")
|
||||
if not multiaddr.contains(multiCodec("ip4")).get():
|
||||
raise newException(
|
||||
ValueError, "Invalid format for ip address, expected a ipv4 multiaddress"
|
||||
)
|
||||
return MixNodePubInfo(
|
||||
multiaddr: elements[0], pubKey: intoCurve25519Key(ncrutils.fromHex(elements[1]))
|
||||
)
|
||||
|
||||
@ -1 +1 @@
|
||||
../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --mixnode="enr:-Nq4QIPd6TbOWns1TsbSq2KB6g3hIClJa8qBUWFFwbGut9OBCwTHYshi0-iv1ilTMx4FkuSJ4NtkZVx0QSrrMRTGpEsDgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCSMehtpkMlApAKhPhnAEznhjKrUs2OMLHsMizXlXEMKoptdWx0aWFkZHJzigAIBMCoRD4G6mKCcnOFAAIBAACJc2VjcDI1NmsxoQN6R8gw1Pu8IwMlTap0_E7vVd1wcaFgg_VUaaeVWSZYVIN0Y3CC6mKDdWRwgiMrhXdha3UyLQ" --mixnode="enr:-Nq4QC6XyKXZSlJNFzTDPI118SBC2ilLqE05RR4o4OzEZxueGkYtExHtTBvmY-9pl17EXZtXvF_tIV_2g0K_fb2LmsoDgmlkgnY0gmlwhH8AAAGHbWl4LWtleaAnXNaInh8pykjlue24ANGpT0nxPTk6Ds8aB691NQbebIptdWx0aWFkZHJzigAIBMCoRD4G6mOCcnOFAAIBAACJc2VjcDI1NmsxoQPYhmrbTqylbdenVfvO2U0w6EC4A-l5lwvu3QWL7IqkO4N0Y3CC6mODdWRwgiMthXdha3UyLQ" --mixnode="enr:-Nq4QKoh8Ta8Q3zLLAkf4hyYzxpuTc-BRBGb_WYVIm6hRptKZFuIo3DNlWCpfIxJnNI5epjLWQWHFUo3dqpAoWhoXEUDgmlkgnY0gmlwhH8AAAGHbWl4LWtleaDg7VlKjVBmgb4HXo4jcjR4OI-xgkd_ekaTCaJecHb8GIptdWx0aWFkZHJzigAIBMCoRD4G6mSCcnOFAAIBAACJc2VjcDI1NmsxoQOnphVC3U5zmOCkjOI2tY0v8K5QkXSaE5xO37q3iFfKGIN0Y3CC6mSDdWRwgiMvhXdha3UyLQ" --mixnode="enr:-Nq4QN7ub3xi53eDyKKstEM2IjFo7oY5Kf4glFz45W2saWqNXPqJFruw08c9B_EIu1LoW4opwXId_4zvPmekZwYHKp8DgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCP16GnwZtAPSMUUqmx6kDrHMdvRV2RjviYDnaF-e7rH4ptdWx0aWFkZHJzigAIBMCoRD4G6mWCcnOFAAIBAACJc2VjcDI1NmsxoQLJtl9kA98YgBkVElkJgl9XyyRNco78oShb1hsv6Mlbs4N0Y3CC6mWDdWRwgiMxhXdha3UyLQ"
|
||||
../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --mixnode="/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF:9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a" --mixnode="/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA:275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c" --mixnode="/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f:e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18" --mixnode="/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu:8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f"
|
||||
|
||||
@ -1 +1 @@
|
||||
../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --mixnode="enr:-Nq4QIPd6TbOWns1TsbSq2KB6g3hIClJa8qBUWFFwbGut9OBCwTHYshi0-iv1ilTMx4FkuSJ4NtkZVx0QSrrMRTGpEsDgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCSMehtpkMlApAKhPhnAEznhjKrUs2OMLHsMizXlXEMKoptdWx0aWFkZHJzigAIBMCoRD4G6mKCcnOFAAIBAACJc2VjcDI1NmsxoQN6R8gw1Pu8IwMlTap0_E7vVd1wcaFgg_VUaaeVWSZYVIN0Y3CC6mKDdWRwgiMrhXdha3UyLQ" --mixnode="enr:-Nq4QC6XyKXZSlJNFzTDPI118SBC2ilLqE05RR4o4OzEZxueGkYtExHtTBvmY-9pl17EXZtXvF_tIV_2g0K_fb2LmsoDgmlkgnY0gmlwhH8AAAGHbWl4LWtleaAnXNaInh8pykjlue24ANGpT0nxPTk6Ds8aB691NQbebIptdWx0aWFkZHJzigAIBMCoRD4G6mOCcnOFAAIBAACJc2VjcDI1NmsxoQPYhmrbTqylbdenVfvO2U0w6EC4A-l5lwvu3QWL7IqkO4N0Y3CC6mODdWRwgiMthXdha3UyLQ" --mixnode="enr:-Nq4QKoh8Ta8Q3zLLAkf4hyYzxpuTc-BRBGb_WYVIm6hRptKZFuIo3DNlWCpfIxJnNI5epjLWQWHFUo3dqpAoWhoXEUDgmlkgnY0gmlwhH8AAAGHbWl4LWtleaDg7VlKjVBmgb4HXo4jcjR4OI-xgkd_ekaTCaJecHb8GIptdWx0aWFkZHJzigAIBMCoRD4G6mSCcnOFAAIBAACJc2VjcDI1NmsxoQOnphVC3U5zmOCkjOI2tY0v8K5QkXSaE5xO37q3iFfKGIN0Y3CC6mSDdWRwgiMvhXdha3UyLQ" --mixnode="enr:-Nq4QN7ub3xi53eDyKKstEM2IjFo7oY5Kf4glFz45W2saWqNXPqJFruw08c9B_EIu1LoW4opwXId_4zvPmekZwYHKp8DgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCP16GnwZtAPSMUUqmx6kDrHMdvRV2RjviYDnaF-e7rH4ptdWx0aWFkZHJzigAIBMCoRD4G6mWCcnOFAAIBAACJc2VjcDI1NmsxoQLJtl9kA98YgBkVElkJgl9XyyRNco78oShb1hsv6Mlbs4N0Y3CC6mWDdWRwgiMxhXdha3UyLQ"
|
||||
../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --mixnode="/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF:9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a" --mixnode="/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA:275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c" --mixnode="/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f:e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18" --mixnode="/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu:8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f"
|
||||
|
||||
@ -94,7 +94,7 @@ suite "Waku v2 REST API - health":
|
||||
response.status == 200
|
||||
$response.contentType == $MIMETYPE_JSON
|
||||
response.data.nodeHealth == HealthStatus.READY
|
||||
response.data.protocolsHealth.len() == 14
|
||||
response.data.protocolsHealth.len() == 15
|
||||
response.data.protocolsHealth[0].protocol == "Relay"
|
||||
response.data.protocolsHealth[0].health == HealthStatus.NOT_READY
|
||||
response.data.protocolsHealth[0].desc == some("No connected peers")
|
||||
@ -114,19 +114,21 @@ suite "Waku v2 REST API - health":
|
||||
response.data.protocolsHealth[7].health == HealthStatus.NOT_MOUNTED
|
||||
response.data.protocolsHealth[8].protocol == "Rendezvous"
|
||||
response.data.protocolsHealth[8].health == HealthStatus.NOT_MOUNTED
|
||||
response.data.protocolsHealth[9].protocol == "Lightpush Client"
|
||||
response.data.protocolsHealth[9].health == HealthStatus.NOT_READY
|
||||
response.data.protocolsHealth[9].desc ==
|
||||
response.data.protocolsHealth[9].protocol == "Mix"
|
||||
response.data.protocolsHealth[9].health == HealthStatus.NOT_MOUNTED
|
||||
response.data.protocolsHealth[10].protocol == "Lightpush Client"
|
||||
response.data.protocolsHealth[10].health == HealthStatus.NOT_READY
|
||||
response.data.protocolsHealth[10].desc ==
|
||||
some("No Lightpush service peer available yet")
|
||||
response.data.protocolsHealth[10].protocol == "Legacy Lightpush Client"
|
||||
response.data.protocolsHealth[10].health == HealthStatus.NOT_MOUNTED
|
||||
response.data.protocolsHealth[11].protocol == "Store Client"
|
||||
response.data.protocolsHealth[11].protocol == "Legacy Lightpush Client"
|
||||
response.data.protocolsHealth[11].health == HealthStatus.NOT_MOUNTED
|
||||
response.data.protocolsHealth[12].protocol == "Legacy Store Client"
|
||||
response.data.protocolsHealth[12].protocol == "Store Client"
|
||||
response.data.protocolsHealth[12].health == HealthStatus.NOT_MOUNTED
|
||||
response.data.protocolsHealth[13].protocol == "Filter Client"
|
||||
response.data.protocolsHealth[13].health == HealthStatus.NOT_READY
|
||||
response.data.protocolsHealth[13].desc ==
|
||||
response.data.protocolsHealth[13].protocol == "Legacy Store Client"
|
||||
response.data.protocolsHealth[13].health == HealthStatus.NOT_MOUNTED
|
||||
response.data.protocolsHealth[14].protocol == "Filter Client"
|
||||
response.data.protocolsHealth[14].health == HealthStatus.NOT_READY
|
||||
response.data.protocolsHealth[14].desc ==
|
||||
some("No Filter service peer available yet")
|
||||
|
||||
await restServer.stop()
|
||||
|
||||
@ -11,9 +11,11 @@ import
|
||||
confutils/std/net,
|
||||
confutils/toml/defs as confTomlDefs,
|
||||
confutils/toml/std/net as confTomlNet,
|
||||
libp2p/crypto/curve25519,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/crypto/secp,
|
||||
libp2p/multiaddress,
|
||||
libp2p/multicodec,
|
||||
nimcrypto/utils,
|
||||
secp256k1,
|
||||
json
|
||||
@ -26,6 +28,7 @@ import
|
||||
node/peer_manager,
|
||||
waku_core/topics/pubsub_topic,
|
||||
waku_core/message/default_values,
|
||||
waku_mix,
|
||||
],
|
||||
../../tools/rln_keystore_generator/rln_keystore_generator
|
||||
|
||||
@ -615,6 +618,12 @@ with the drawback of consuming some more bandwidth.""",
|
||||
name: "mixkey"
|
||||
.}: Option[string]
|
||||
|
||||
mixnodes* {.
|
||||
desc:
|
||||
"Multiaddress and mix-key of mix node to be statically specified in format multiaddr:mixPubKey. Argument may be repeated.",
|
||||
name: "mixnode"
|
||||
.}: seq[MixNodePubInfo]
|
||||
|
||||
## websocket config
|
||||
websocketSupport* {.
|
||||
desc: "Enable websocket: true|false",
|
||||
@ -694,6 +703,22 @@ proc isNumber(x: string): bool =
|
||||
except ValueError:
|
||||
result = false
|
||||
|
||||
proc parseCmdArg*(T: type MixNodePubInfo, p: string): T =
|
||||
let elements = p.split(":")
|
||||
if elements.len != 2:
|
||||
raise newException(
|
||||
ValueError, "Invalid format for mix node expected multiaddr:mixPublicKey"
|
||||
)
|
||||
let multiaddr = MultiAddress.init(elements[0]).valueOr:
|
||||
raise newException(ValueError, "Invalid multiaddress format")
|
||||
if not multiaddr.contains(multiCodec("ip4")).get():
|
||||
raise newException(
|
||||
ValueError, "Invalid format for ip address, expected a ipv4 multiaddress"
|
||||
)
|
||||
return MixNodePubInfo(
|
||||
multiaddr: elements[0], pubKey: intoCurve25519Key(ncrutils.fromHex(elements[1]))
|
||||
)
|
||||
|
||||
proc parseCmdArg*(T: type ProtectedShard, p: string): T =
|
||||
let elements = p.split(":")
|
||||
if elements.len != 2:
|
||||
@ -778,6 +803,22 @@ proc readValue*(
|
||||
except CatchableError:
|
||||
raise newException(SerializationError, getCurrentExceptionMsg())
|
||||
|
||||
proc readValue*(
|
||||
r: var TomlReader, value: var MixNodePubInfo
|
||||
) {.raises: [SerializationError].} =
|
||||
try:
|
||||
value = parseCmdArg(MixNodePubInfo, r.readValue(string))
|
||||
except CatchableError:
|
||||
raise newException(SerializationError, getCurrentExceptionMsg())
|
||||
|
||||
proc readValue*(
|
||||
r: var EnvvarReader, value: var MixNodePubInfo
|
||||
) {.raises: [SerializationError].} =
|
||||
try:
|
||||
value = parseCmdArg(MixNodePubInfo, r.readValue(string))
|
||||
except CatchableError:
|
||||
raise newException(SerializationError, getCurrentExceptionMsg())
|
||||
|
||||
proc readValue*(
|
||||
r: var TomlReader, value: var ProtectedShard
|
||||
) {.raises: [SerializationError].} =
|
||||
@ -972,6 +1013,7 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
|
||||
b.storeServiceConf.storeSyncConf.withRelayJitterSec(n.storeSyncRelayJitter)
|
||||
|
||||
b.mixConf.withEnabled(n.mix)
|
||||
b.mixConf.withMixNodes(n.mixnodes)
|
||||
b.withMix(n.mix)
|
||||
if n.mixkey.isSome():
|
||||
b.mixConf.withMixKey(n.mixkey.get())
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import chronicles, std/options, results
|
||||
import libp2p/crypto/crypto, libp2p/crypto/curve25519, mix/curve25519
|
||||
import ../waku_conf
|
||||
import ../waku_conf, waku/waku_mix
|
||||
|
||||
logScope:
|
||||
topics = "waku conf builder mix"
|
||||
@ -11,6 +11,7 @@ logScope:
|
||||
type MixConfBuilder* = object
|
||||
enabled: Option[bool]
|
||||
mixKey: Option[string]
|
||||
mixNodes: seq[MixNodePubInfo]
|
||||
|
||||
proc init*(T: type MixConfBuilder): MixConfBuilder =
|
||||
MixConfBuilder()
|
||||
@ -21,6 +22,9 @@ proc withEnabled*(b: var MixConfBuilder, enabled: bool) =
|
||||
proc withMixKey*(b: var MixConfBuilder, mixKey: string) =
|
||||
b.mixKey = some(mixKey)
|
||||
|
||||
proc withMixNodes*(b: var MixConfBuilder, mixNodes: seq[MixNodePubInfo]) =
|
||||
b.mixNodes = mixNodes
|
||||
|
||||
proc build*(b: MixConfBuilder): Result[Option[MixConf], string] =
|
||||
if not b.enabled.get(false):
|
||||
return ok(none[MixConf]())
|
||||
@ -28,8 +32,12 @@ proc build*(b: MixConfBuilder): Result[Option[MixConf], string] =
|
||||
if b.mixKey.isSome():
|
||||
let mixPrivKey = intoCurve25519Key(ncrutils.fromHex(b.mixKey.get()))
|
||||
let mixPubKey = public(mixPrivKey)
|
||||
return ok(some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey)))
|
||||
return ok(
|
||||
some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey, mixNodes: b.mixNodes))
|
||||
)
|
||||
else:
|
||||
let (mixPrivKey, mixPubKey) = generateKeyPair().valueOr:
|
||||
return err("Generate key pair error: " & $error)
|
||||
return ok(some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey)))
|
||||
return ok(
|
||||
some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey, mixNodes: b.mixNodes))
|
||||
)
|
||||
|
||||
@ -29,9 +29,6 @@ proc enrConfiguration*(
|
||||
).isOkOr:
|
||||
return err("could not initialize ENR with shards")
|
||||
|
||||
if conf.mixConf.isSome():
|
||||
enrBuilder.withMixKey(conf.mixConf.get().mixPubKey)
|
||||
|
||||
let recordRes = enrBuilder.build()
|
||||
let record =
|
||||
if recordRes.isErr():
|
||||
|
||||
@ -434,7 +434,11 @@ proc setupProtocols(
|
||||
|
||||
#mount mix
|
||||
if conf.mixConf.isSome():
|
||||
(await node.mountMix(conf.clusterId, conf.mixConf.get().mixKey)).isOkOr:
|
||||
(
|
||||
await node.mountMix(
|
||||
conf.clusterId, conf.mixConf.get().mixKey, conf.mixConf.get().mixnodes
|
||||
)
|
||||
).isOkOr:
|
||||
return err("failed to mount waku mix protocol: " & $error)
|
||||
return ok()
|
||||
|
||||
|
||||
@ -15,7 +15,8 @@ import
|
||||
../common/logging,
|
||||
../common/rate_limit/setting,
|
||||
../waku_enr/capabilities,
|
||||
./networks_config
|
||||
./networks_config,
|
||||
../waku_mix
|
||||
|
||||
export RlnRelayConf, RlnRelayCreds, RestServerConf, Discv5Conf, MetricsServerConf
|
||||
|
||||
@ -48,6 +49,7 @@ type StoreSyncConf* {.requiresInit.} = object
|
||||
type MixConf* = ref object
|
||||
mixKey*: Curve25519Key
|
||||
mixPubKey*: Curve25519Key
|
||||
mixnodes*: seq[MixNodePubInfo]
|
||||
|
||||
type StoreServiceConf* {.requiresInit.} = object
|
||||
dbMigration*: bool
|
||||
|
||||
@ -233,6 +233,15 @@ proc getRendezvousHealth(hm: NodeHealthMonitor): ProtocolHealth =
|
||||
|
||||
return p.ready()
|
||||
|
||||
proc getMixHealth(hm: NodeHealthMonitor): ProtocolHealth =
|
||||
var p = ProtocolHealth.init("Mix")
|
||||
checkWakuNodeNotNil(hm.node, p)
|
||||
|
||||
if hm.node.wakuMix.isNil():
|
||||
return p.notMounted()
|
||||
|
||||
return p.ready()
|
||||
|
||||
proc selectRandomPeersForKeepalive(
|
||||
node: WakuNode, outPeers: seq[PeerId], numRandomPeers: int
|
||||
): Future[seq[PeerId]] {.async.} =
|
||||
@ -387,6 +396,7 @@ proc getNodeHealthReport*(hm: NodeHealthMonitor): Future[HealthReport] {.async.}
|
||||
report.protocolsHealth.add(hm.getLegacyStoreHealth())
|
||||
report.protocolsHealth.add(hm.getPeerExchangeHealth())
|
||||
report.protocolsHealth.add(hm.getRendezvousHealth())
|
||||
report.protocolsHealth.add(hm.getMixHealth())
|
||||
|
||||
report.protocolsHealth.add(hm.getLightpushClientHealth(relayHealth.health))
|
||||
report.protocolsHealth.add(hm.getLegacyLightpushClientHealth(relayHealth.health))
|
||||
|
||||
@ -97,6 +97,7 @@ type
|
||||
WakuInfo* = object # NOTE One for simplicity, can extend later as needed
|
||||
listenAddresses*: seq[string]
|
||||
enrUri*: string #multiaddrStrings*: seq[string]
|
||||
mixPubKey*: Option[string]
|
||||
|
||||
# NOTE based on Eth2Node in NBC eth2_network.nim
|
||||
WakuNode* = ref object
|
||||
@ -201,7 +202,11 @@ proc info*(node: WakuNode): WakuInfo =
|
||||
var fulladdr = $address & "/p2p/" & $peerInfo.peerId
|
||||
listenStr &= fulladdr
|
||||
let enrUri = node.enr.toUri()
|
||||
let wakuInfo = WakuInfo(listenAddresses: listenStr, enrUri: enrUri)
|
||||
var wakuInfo = WakuInfo(listenAddresses: listenStr, enrUri: enrUri)
|
||||
if not node.wakuMix.isNil():
|
||||
let keyStr = node.wakuMix.pubKey.to0xHex()
|
||||
wakuInfo.mixPubKey = some(keyStr)
|
||||
info "node info", wakuInfo
|
||||
return wakuInfo
|
||||
|
||||
proc connectToNodes*(
|
||||
@ -245,7 +250,10 @@ proc getMixNodePoolSize*(node: WakuNode): int =
|
||||
return node.wakuMix.getNodePoolSize()
|
||||
|
||||
proc mountMix*(
|
||||
node: WakuNode, clusterId: uint16, mixPrivKey: Curve25519Key
|
||||
node: WakuNode,
|
||||
clusterId: uint16,
|
||||
mixPrivKey: Curve25519Key,
|
||||
mixnodes: seq[MixNodePubInfo],
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
info "mounting mix protocol", nodeId = node.info #TODO log the config used
|
||||
|
||||
@ -257,8 +265,9 @@ proc mountMix*(
|
||||
info "local addr", localaddr = localaddrStr
|
||||
|
||||
let nodeAddr = localaddrStr & "/p2p/" & $node.peerId
|
||||
# TODO: Pass bootnodes from config,
|
||||
node.wakuMix = WakuMix.new(nodeAddr, node.peerManager, clusterId, mixPrivKey).valueOr:
|
||||
node.wakuMix = WakuMix.new(
|
||||
nodeAddr, node.peerManager, clusterId, mixPrivKey, mixnodes
|
||||
).valueOr:
|
||||
error "Waku Mix protocol initialization failed", err = error
|
||||
return
|
||||
node.wakuMix.registerDestReadBehavior(WakuLightPushCodec, readLp(int(-1)))
|
||||
|
||||
@ -9,12 +9,15 @@ import std/typetraits
|
||||
type DebugWakuInfo* = object
|
||||
listenAddresses*: seq[string]
|
||||
enrUri*: Option[string]
|
||||
mixPubKey*: Option[string]
|
||||
|
||||
#### Type conversion
|
||||
|
||||
proc toDebugWakuInfo*(nodeInfo: WakuInfo): DebugWakuInfo =
|
||||
DebugWakuInfo(
|
||||
listenAddresses: nodeInfo.listenAddresses, enrUri: some(nodeInfo.enrUri)
|
||||
listenAddresses: nodeInfo.listenAddresses,
|
||||
enrUri: some(nodeInfo.enrUri),
|
||||
mixPubKey: nodeInfo.mixPubKey,
|
||||
)
|
||||
|
||||
#### Serialization and deserialization
|
||||
@ -26,6 +29,8 @@ proc writeValue*(
|
||||
writer.writeField("listenAddresses", value.listenAddresses)
|
||||
if value.enrUri.isSome():
|
||||
writer.writeField("enrUri", value.enrUri.get())
|
||||
if value.mixPubKey.isSome():
|
||||
writer.writeField("mixPubKey", value.mixPubKey.get())
|
||||
writer.endRecord()
|
||||
|
||||
proc readValue*(
|
||||
@ -47,10 +52,18 @@ proc readValue*(
|
||||
if enrUri.isSome():
|
||||
reader.raiseUnexpectedField("Multiple `enrUri` fields found", "DebugWakuInfo")
|
||||
enrUri = some(reader.readValue(string))
|
||||
of "mixPubKey":
|
||||
if value.mixPubKey.isSome():
|
||||
reader.raiseUnexpectedField(
|
||||
"Multiple `mixPubKey` fields found", "DebugWakuInfo"
|
||||
)
|
||||
value.mixPubKey = some(reader.readValue(string))
|
||||
else:
|
||||
unrecognizedFieldWarning(value)
|
||||
|
||||
if listenAddresses.isNone():
|
||||
reader.raiseUnexpectedValue("Field `listenAddresses` is missing")
|
||||
|
||||
value = DebugWakuInfo(listenAddresses: listenAddresses.get, enrUri: enrUri)
|
||||
value = DebugWakuInfo(
|
||||
listenAddresses: listenAddresses.get, enrUri: enrUri, mixPubKey: value.mixPubKey
|
||||
)
|
||||
|
||||
@ -1,8 +1,3 @@
|
||||
import
|
||||
./common/enr,
|
||||
./waku_enr/capabilities,
|
||||
./waku_enr/multiaddr,
|
||||
./waku_enr/sharding,
|
||||
./waku_enr/mix
|
||||
import ./common/enr, ./waku_enr/capabilities, ./waku_enr/multiaddr, ./waku_enr/sharding
|
||||
|
||||
export enr, capabilities, multiaddr, sharding, mix
|
||||
export enr, capabilities, multiaddr, sharding
|
||||
|
||||
@ -1,20 +0,0 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[options], results, libp2p/crypto/curve25519, nimcrypto/utils as ncrutils
|
||||
|
||||
import ../common/enr
|
||||
|
||||
const MixKeyEnrField* = "mix-key"
|
||||
|
||||
func withMixKey*(builder: var EnrBuilder, mixPubKey: Curve25519Key) =
|
||||
builder.addFieldPair(MixKeyEnrField, getBytes(mixPubKey))
|
||||
|
||||
func mixKey*(record: Record): Option[seq[byte]] =
|
||||
let recordRes = record.toTyped()
|
||||
if recordRes.isErr():
|
||||
return none(seq[byte])
|
||||
|
||||
let field = recordRes.value.tryGet(MixKeyEnrField, seq[byte])
|
||||
if field.isNone():
|
||||
return none(seq[byte])
|
||||
return field
|
||||
@ -14,7 +14,6 @@ import
|
||||
import
|
||||
../node/peer_manager,
|
||||
../waku_core,
|
||||
../waku_enr/mix,
|
||||
../waku_enr,
|
||||
../node/peer_manager/waku_peer_store,
|
||||
../common/nimchronos
|
||||
@ -22,14 +21,21 @@ import
|
||||
logScope:
|
||||
topics = "waku mix"
|
||||
|
||||
const mixMixPoolSize = 3
|
||||
|
||||
type
|
||||
WakuMix* = ref object of MixProtocol
|
||||
peerManager*: PeerManager
|
||||
clusterId: uint16
|
||||
nodePoolLoopHandle: Future[void]
|
||||
pubKey*: Curve25519Key
|
||||
|
||||
WakuMixResult*[T] = Result[T, string]
|
||||
|
||||
MixNodePubInfo* = object
|
||||
multiAddr*: string
|
||||
pubKey*: Curve25519Key
|
||||
|
||||
proc mixPoolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool =
|
||||
# Note that origin based(discv5) filtering is not done intentionally
|
||||
# so that more mix nodes can be discovered.
|
||||
@ -70,7 +76,8 @@ func getIPv4Multiaddr*(maddrs: seq[MultiAddress]): Option[MultiAddress] =
|
||||
trace "no ipv4 multiaddr found"
|
||||
return none(MultiAddress)
|
||||
|
||||
proc populateMixNodePool*(mix: WakuMix) =
|
||||
#[ Not deleting as these can be reused once discovery is sorted
|
||||
proc populateMixNodePool*(mix: WakuMix) =
|
||||
# populate only peers that i) are reachable ii) share cluster iii) support mix
|
||||
let remotePeers = mix.peerManager.switch.peerStore.peers().filterIt(
|
||||
mixPoolFilter(some(mix.clusterId), it)
|
||||
@ -110,35 +117,17 @@ proc startMixNodePoolMgr*(mix: WakuMix) {.async.} =
|
||||
# TODO: make interval configurable
|
||||
heartbeat "Updating mix node pool", 5.seconds:
|
||||
mix.populateMixNodePool()
|
||||
|
||||
#[ proc getBootStrapMixNodes*(node: WakuNode): Table[PeerId, MixPubInfo] =
|
||||
]#
|
||||
proc toMixNodeTable(bootnodes: seq[MixNodePubInfo]): Table[PeerId, MixPubInfo] =
|
||||
var mixNodes = initTable[PeerId, MixPubInfo]()
|
||||
# MixNode Multiaddrs and PublicKeys:
|
||||
let bootNodesMultiaddrs = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o",
|
||||
"/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF",
|
||||
"/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA",
|
||||
"/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f",
|
||||
"/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu",
|
||||
]
|
||||
let bootNodesMixPubKeys = ["9d09ce624f76e8f606265edb9cca2b7de9b41772a6d784bddaf92ffa8fba7d2c",
|
||||
"9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a",
|
||||
"275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c",
|
||||
"e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18",
|
||||
"8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f"
|
||||
]
|
||||
for index, mixNodeMultiaddr in bootNodesMultiaddrs:
|
||||
let peerIdRes = getPeerIdFromMultiAddr(mixNodeMultiaddr)
|
||||
if peerIdRes.isErr:
|
||||
error "Failed to get peer id from multiaddress: " , error = peerIdRes.error
|
||||
let peerId = peerIdRes.get()
|
||||
#if (not peerID == nil) and peerID == exceptPeerID:
|
||||
# continue
|
||||
let mixNodePubInfo = createMixPubInfo(mixNodeMultiaddr, intoCurve25519Key(ncrutils.fromHex(bootNodesMixPubKeys[index])))
|
||||
|
||||
mixNodes[peerId] = mixNodePubInfo
|
||||
for node in bootnodes:
|
||||
let peerId = getPeerIdFromMultiAddr(node.multiAddr).valueOr:
|
||||
error "Failed to get peer id from multiaddress: ",
|
||||
error = error, multiAddr = $node.multiAddr
|
||||
continue
|
||||
mixNodes[peerId] = createMixPubInfo(node.multiAddr, node.pubKey)
|
||||
info "using mix bootstrap nodes ", bootNodes = mixNodes
|
||||
return mixNodes
|
||||
]#
|
||||
|
||||
proc new*(
|
||||
T: type WakuMix,
|
||||
@ -146,6 +135,7 @@ proc new*(
|
||||
peermgr: PeerManager,
|
||||
clusterId: uint16,
|
||||
mixPrivKey: Curve25519Key,
|
||||
bootnodes: seq[MixNodePubInfo],
|
||||
): WakuMixResult[T] =
|
||||
let mixPubKey = public(mixPrivKey)
|
||||
info "mixPrivKey", mixPrivKey = mixPrivKey, mixPubKey = mixPubKey
|
||||
@ -154,16 +144,18 @@ proc new*(
|
||||
nodeAddr, mixPubKey, mixPrivKey, peermgr.switch.peerInfo.publicKey.skkey,
|
||||
peermgr.switch.peerInfo.privateKey.skkey,
|
||||
)
|
||||
|
||||
# TODO : ideally mix should not be marked ready until certain min pool of mixNodes are discovered
|
||||
var m = WakuMix(peerManager: peermgr, clusterId: clusterId)
|
||||
procCall MixProtocol(m).init(
|
||||
localMixNodeInfo, initTable[PeerId, MixPubInfo](), peermgr.switch
|
||||
)
|
||||
if bootnodes.len < mixMixPoolSize:
|
||||
warn "publishing with mix won't work as there are less than 3 mix nodes in node pool"
|
||||
let initTable = toMixNodeTable(bootnodes)
|
||||
if len(initTable) < mixMixPoolSize:
|
||||
warn "publishing with mix won't work as there are less than 3 mix nodes in node pool"
|
||||
var m = WakuMix(peerManager: peermgr, clusterId: clusterId, pubKey: mixPubKey)
|
||||
procCall MixProtocol(m).init(localMixNodeInfo, initTable, peermgr.switch)
|
||||
return ok(m)
|
||||
|
||||
method start*(mix: WakuMix) =
|
||||
mix.nodePoolLoopHandle = mix.startMixNodePoolMgr()
|
||||
info "starting waku mix protocol"
|
||||
#mix.nodePoolLoopHandle = mix.startMixNodePoolMgr() This can be re-enabled once discovery is addressed
|
||||
|
||||
method stop*(mix: WakuMix) {.async.} =
|
||||
if mix.nodePoolLoopHandle.isNil():
|
||||
@ -171,7 +163,4 @@ method stop*(mix: WakuMix) {.async.} =
|
||||
await mix.nodePoolLoopHandle.cancelAndWait()
|
||||
mix.nodePoolLoopHandle = nil
|
||||
|
||||
#[ proc setMixBootStrapNodes*(node: WakuNode,){.async}=
|
||||
node.mix.setNodePool(node.getBootStrapMixNodes())
|
||||
]#
|
||||
# Mix Protocol
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user