mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-02-19 21:33:10 +00:00
feat: poc to integrate mix into waku and use lightpush to demonstrate
This commit is contained in:
parent
cc30666016
commit
ec2e9b4ed2
4
.gitmodules
vendored
4
.gitmodules
vendored
@ -184,3 +184,7 @@
|
||||
url = https://github.com/waku-org/waku-rlnv2-contract.git
|
||||
ignore = untracked
|
||||
branch = master
|
||||
[submodule "vendor/mix"]
|
||||
path = vendor/mix
|
||||
url = https://github.com/vacp2p/mix/
|
||||
branch = mix-waku-integ
|
||||
|
||||
58
Dockerfile.lightpushWithMix.compile
Normal file
58
Dockerfile.lightpushWithMix.compile
Normal file
@ -0,0 +1,58 @@
|
||||
# BUILD NIM APP ----------------------------------------------------------------
|
||||
FROM rust:1.81.0-alpine3.19 AS nim-build
|
||||
|
||||
ARG NIMFLAGS
|
||||
ARG MAKE_TARGET=lightpushwithmix
|
||||
ARG NIM_COMMIT
|
||||
ARG LOG_LEVEL=TRACE
|
||||
|
||||
# Get build tools and required header files
|
||||
RUN apk add --no-cache bash git build-base openssl-dev pcre-dev linux-headers curl jq
|
||||
|
||||
WORKDIR /app
|
||||
COPY . .
|
||||
|
||||
# workaround for alpine issue: https://github.com/alpinelinux/docker-alpine/issues/383
|
||||
RUN apk update && apk upgrade
|
||||
|
||||
# Ran separately from 'make' to avoid re-doing
|
||||
RUN git submodule update --init --recursive
|
||||
|
||||
# Slowest build step for the sake of caching layers
|
||||
RUN make -j$(nproc) deps QUICK_AND_DIRTY_COMPILER=1 ${NIM_COMMIT}
|
||||
|
||||
# Build the final node binary
|
||||
RUN make -j$(nproc) ${NIM_COMMIT} $MAKE_TARGET LOG_LEVEL=${LOG_LEVEL} NIMFLAGS="${NIMFLAGS}"
|
||||
|
||||
|
||||
# REFERENCE IMAGE as BASE for specialized PRODUCTION IMAGES----------------------------------------
|
||||
FROM alpine:3.18 AS base_lpt
|
||||
|
||||
ARG MAKE_TARGET=lightpushwithmix
|
||||
|
||||
LABEL maintainer="prem@waku.org"
|
||||
LABEL source="https://github.com/waku-org/nwaku"
|
||||
LABEL description="Lite Push With Mix: Waku light-client"
|
||||
LABEL commit="unknown"
|
||||
LABEL version="unknown"
|
||||
|
||||
# DevP2P, LibP2P, and JSON RPC ports
|
||||
EXPOSE 30303 60000 8545
|
||||
|
||||
# Referenced in the binary
|
||||
RUN apk add --no-cache libgcc pcre-dev libpq-dev \
|
||||
wget \
|
||||
iproute2 \
|
||||
python3 \
|
||||
jq
|
||||
|
||||
# Fix for 'Error loading shared library libpcre.so.3: No such file or directory'
|
||||
RUN ln -s /usr/lib/libpcre.so /usr/lib/libpcre.so.3
|
||||
|
||||
COPY --from=nim-build /app/build/lightpush_publisher_mix /usr/bin/
|
||||
RUN chmod +x /usr/bin/lightpush_publisher_mix
|
||||
|
||||
# Standalone image to be used manually and in lpt-runner -------------------------------------------
|
||||
FROM base_lpt AS standalone_lpt
|
||||
|
||||
ENTRYPOINT ["/usr/bin/lightpush_publisher_mix"]
|
||||
4
Makefile
4
Makefile
@ -240,6 +240,10 @@ liteprotocoltester: | build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
$(ENV_SCRIPT) nim liteprotocoltester $(NIM_PARAMS) waku.nims
|
||||
|
||||
lightpushwithmix: | build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
$(ENV_SCRIPT) nim lightpushwithmix $(NIM_PARAMS) waku.nims
|
||||
|
||||
build/%: | build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$*" && \
|
||||
$(ENV_SCRIPT) nim buildone $(NIM_PARAMS) waku.nims $*
|
||||
|
||||
189
examples/lightpush_mix/lightpush_publisher_mix.nim
Normal file
189
examples/lightpush_mix/lightpush_publisher_mix.nim
Normal file
@ -0,0 +1,189 @@
|
||||
import
|
||||
std/[tables, times, sequtils, strutils],
|
||||
stew/byteutils,
|
||||
chronicles,
|
||||
results,
|
||||
chronos,
|
||||
confutils,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/crypto/curve25519,
|
||||
libp2p/multiaddress,
|
||||
eth/keys,
|
||||
eth/p2p/discoveryv5/enr,
|
||||
metrics,
|
||||
metrics/chronos_httpserver
|
||||
|
||||
import mix/entry_connection, mix/protocol, mix/curve25519
|
||||
|
||||
import
|
||||
waku/[
|
||||
common/logging,
|
||||
node/peer_manager,
|
||||
waku_core,
|
||||
waku_core/codecs,
|
||||
waku_node,
|
||||
waku_enr,
|
||||
discovery/waku_discv5,
|
||||
factory/builder,
|
||||
waku_lightpush/client,
|
||||
],
|
||||
./lightpush_publisher_mix_config,
|
||||
./lightpush_publisher_mix_metrics
|
||||
|
||||
const clusterId = 66
|
||||
const shardId = @[0'u16]
|
||||
|
||||
const
|
||||
LightpushPubsubTopic = PubsubTopic("/waku/2/rs/66/0")
|
||||
LightpushContentTopic = ContentTopic("/examples/1/light-pubsub-mix-example/proto")
|
||||
|
||||
proc splitPeerIdAndAddr(maddr: string): (string, string) =
|
||||
let parts = maddr.split("/p2p/")
|
||||
if parts.len != 2:
|
||||
error "Invalid multiaddress format", parts = parts
|
||||
return
|
||||
|
||||
let
|
||||
address = parts[0]
|
||||
peerId = parts[1]
|
||||
return (address, peerId)
|
||||
|
||||
proc setupAndPublish(rng: ref HmacDrbgContext, conf: LightPushMixConf) {.async.} =
|
||||
# use notice to filter all waku messaging
|
||||
setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)
|
||||
|
||||
notice "starting publisher", wakuPort = conf.port
|
||||
|
||||
let
|
||||
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[]).get()
|
||||
ip = parseIpAddress("0.0.0.0")
|
||||
flags = CapabilitiesBitfield.init(relay = true)
|
||||
|
||||
let relayShards = RelayShards.init(clusterId, shardId).valueOr:
|
||||
error "Relay shards initialization failed", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
var enrBuilder = EnrBuilder.init(nodeKey)
|
||||
enrBuilder.withWakuRelaySharding(relayShards).expect(
|
||||
"Building ENR with relay sharding failed"
|
||||
)
|
||||
|
||||
let record = enrBuilder.build().valueOr:
|
||||
error "failed to create enr record", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
setLogLevel(logging.LogLevel.TRACE)
|
||||
var builder = WakuNodeBuilder.init()
|
||||
builder.withNodeKey(nodeKey)
|
||||
builder.withRecord(record)
|
||||
builder.withNetworkConfigurationDetails(ip, Port(conf.port)).tryGet()
|
||||
|
||||
let node = builder.build().tryGet()
|
||||
|
||||
node.mountMetadata(clusterId).expect("failed to mount waku metadata protocol")
|
||||
node.mountLightPushClient()
|
||||
try:
|
||||
await node.mountPeerExchange(some(uint16(clusterId)))
|
||||
except CatchableError:
|
||||
error "failed to mount waku peer-exchange protocol",
|
||||
error = getCurrentExceptionMsg()
|
||||
return
|
||||
|
||||
let (destPeerAddr, destPeerId) = splitPeerIdAndAddr(conf.destPeerAddr)
|
||||
let (pxPeerAddr, pxPeerId) = splitPeerIdAndAddr(conf.pxAddr)
|
||||
info "dest peer address", destPeerAddr = destPeerAddr, destPeerId = destPeerId
|
||||
info "peer exchange address", pxPeerAddr = pxPeerAddr, pxPeerId = pxPeerId
|
||||
let pxPeerInfo =
|
||||
RemotePeerInfo.init(destPeerId, @[MultiAddress.init(destPeerAddr).get()])
|
||||
node.peerManager.addServicePeer(pxPeerInfo, WakuPeerExchangeCodec)
|
||||
|
||||
let pxPeerInfo1 =
|
||||
RemotePeerInfo.init(pxPeerId, @[MultiAddress.init(pxPeerAddr).get()])
|
||||
node.peerManager.addServicePeer(pxPeerInfo1, WakuPeerExchangeCodec)
|
||||
|
||||
if not conf.mixDisabled:
|
||||
let (mixPrivKey, mixPubKey) = generateKeyPair().valueOr:
|
||||
error "failed to generate mix key pair", error = error
|
||||
return
|
||||
(await node.mountMix(clusterId, mixPrivKey)).isOkOr:
|
||||
error "failed to mount waku mix protocol: ", error = $error
|
||||
return
|
||||
|
||||
let dPeerId = PeerId.init(destPeerId).valueOr:
|
||||
error "Failed to initialize PeerId", err = error
|
||||
return
|
||||
var conn: Connection
|
||||
if not conf.mixDisabled:
|
||||
conn = MixEntryConnection.newConn(
|
||||
destPeerAddr, dPeerId, ProtocolType.fromString(WakuLightPushCodec), node.mix
|
||||
)
|
||||
|
||||
await node.start()
|
||||
node.peerManager.start()
|
||||
node.startPeerExchangeLoop()
|
||||
try:
|
||||
startMetricsHttpServer("0.0.0.0", Port(8008))
|
||||
except Exception:
|
||||
error "failed to start metrics server: ", error = getCurrentExceptionMsg()
|
||||
(await node.fetchPeerExchangePeers()).isOkOr:
|
||||
warn "Cannot fetch peers from peer exchange", cause = error
|
||||
|
||||
if not conf.mixDisabled:
|
||||
while node.getMixNodePoolSize() < conf.minMixPoolSize:
|
||||
info "waiting for mix nodes to be discovered",
|
||||
currentpoolSize = node.getMixNodePoolSize()
|
||||
await sleepAsync(1000)
|
||||
notice "publisher service started with mix node pool size ",
|
||||
currentpoolSize = node.getMixNodePoolSize()
|
||||
|
||||
var i = 0
|
||||
while i < conf.numMsgs:
|
||||
if conf.mixDisabled:
|
||||
let connOpt = await node.peerManager.dialPeer(dPeerId, WakuLightPushCodec)
|
||||
if connOpt.isNone():
|
||||
error "failed to dial peer with WakuLightPushCodec", target_peer_id = dPeerId
|
||||
return
|
||||
conn = connOpt.get()
|
||||
i = i + 1
|
||||
let text =
|
||||
"""Lorem ipsum dolor sit amet, consectetur adipiscing elit. Nullam venenatis magna ut tortor faucibus, in vestibulum nibh commodo. Aenean eget vestibulum augue. Nullam suscipit urna non nunc efficitur, at iaculis nisl consequat. Mauris quis ultrices elit. Suspendisse lobortis odio vitae laoreet facilisis. Cras ornare sem felis, at vulputate magna aliquam ac. Duis quis est ultricies, euismod nulla ac, interdum dui. Maecenas sit amet est vitae enim commodo gravida. Proin vitae elit nulla. Donec tempor dolor lectus, in faucibus velit elementum quis. Donec non mauris eu nibh faucibus cursus ut egestas dolor. Aliquam venenatis ligula id velit pulvinar malesuada. Vestibulum scelerisque, justo non porta gravida, nulla justo tempor purus, at sollicitudin erat erat vel libero.
|
||||
Fusce nec eros eu metus tristique aliquet. Sed ut magna sagittis, vulputate diam sit amet, aliquam magna. Aenean sollicitudin velit lacus, eu ultrices magna semper at. Integer vitae felis ligula. In a eros nec risus condimentum tincidunt fermentum sit amet ex. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. Pellentesque habitant morbi tristique senectus et netus et malesuada fames ac turpis egestas. Nullam vitae justo maximus, fringilla tellus nec, rutrum purus. Etiam efficitur nisi dapibus euismod vestibulum. Phasellus at felis elementum, tristique nulla ac, consectetur neque.
|
||||
Maecenas hendrerit nibh eget velit rutrum, in ornare mauris molestie. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia curae; Praesent dignissim efficitur eros, sit amet rutrum justo mattis a. Fusce mollis neque at erat placerat bibendum. Ut fringilla fringilla orci, ut fringilla metus fermentum vel. In hac habitasse platea dictumst. Donec hendrerit porttitor odio. Suspendisse ornare sollicitudin mauris, sodales pulvinar velit finibus vel. Fusce id pulvinar neque. Suspendisse eget tincidunt sapien, ac accumsan turpis.
|
||||
Curabitur cursus tincidunt leo at aliquet. Nunc dapibus quam id venenatis varius. Aenean eget augue vel velit dapibus aliquam. Nulla facilisi. Curabitur cursus, turpis vel congue volutpat, tellus eros cursus lacus, eu fringilla turpis orci non ipsum. In hac habitasse platea dictumst. Nulla aliquam nisl a nunc placerat, eget dignissim felis pulvinar. Fusce sed porta mauris. Donec sodales arcu in nisl sodales, quis posuere massa ultricies. Nam feugiat massa eget felis ultricies finibus. Nunc magna nulla, interdum a elit vel, egestas efficitur urna. Ut posuere tincidunt odio in maximus. Sed at dignissim est.
|
||||
Morbi accumsan elementum ligula ut fringilla. Praesent in ex metus. Phasellus urna est, tempus sit amet elementum vitae, sollicitudin vel ipsum. Fusce hendrerit eleifend dignissim. Maecenas tempor dapibus dui quis laoreet. Cras tincidunt sed ipsum sed pellentesque. Proin ut tellus nec ipsum varius interdum. Curabitur id velit ligula. Etiam sapien nulla, cursus sodales orci eu, porta lobortis nunc. Nunc at dapibus velit. Nulla et nunc vehicula, condimentum erat quis, elementum dolor. Quisque eu metus fermentum, vestibulum tellus at, sollicitudin odio. Ut vel neque justo.
|
||||
Praesent porta porta velit, vel porttitor sem. Donec sagittis at nulla venenatis iaculis. Nullam vel eleifend felis. Nullam a pellentesque lectus. Aliquam tincidunt semper dui sed bibendum. Donec hendrerit, urna et cursus dictum, neque neque convallis magna, id condimentum sem urna quis massa. Fusce non quam vulputate, fermentum mauris at, malesuada ipsum. Mauris id pellentesque libero. Donec vel erat ullamcorper, dapibus quam id, imperdiet urna. Praesent sed ligula ut est pellentesque pharetra quis et diam. Ut placerat lorem eget mi fermentum aliquet.
|
||||
This is message #""" &
|
||||
$i & """ sent from a publisher using mix. End of transmission."""
|
||||
let message = WakuMessage(
|
||||
payload: toBytes(text), # content of the message
|
||||
contentTopic: LightpushContentTopic, # content topic to publish to
|
||||
ephemeral: true, # tell store nodes to not store it
|
||||
timestamp: getNowInNanosecondTime(),
|
||||
) # current timestamp
|
||||
|
||||
let res = await node.wakuLightpushClient.publishWithConn(
|
||||
LightpushPubsubTopic, message, conn, dPeerId
|
||||
)
|
||||
|
||||
if res.isOk():
|
||||
lp_mix_success.inc()
|
||||
notice "published message",
|
||||
text = text,
|
||||
timestamp = message.timestamp,
|
||||
psTopic = LightpushPubsubTopic,
|
||||
contentTopic = LightpushContentTopic
|
||||
else:
|
||||
error "failed to publish message", error = res.error
|
||||
lp_mix_failed.inc(labelValues = ["publish_error"])
|
||||
|
||||
if conf.mixDisabled:
|
||||
await conn.close()
|
||||
await sleepAsync(conf.msgIntervalMilliseconds)
|
||||
info "###########Sent all messages via mix"
|
||||
quit(0)
|
||||
|
||||
when isMainModule:
|
||||
let conf = LightPushMixConf.load()
|
||||
let rng = crypto.newRng()
|
||||
asyncSpawn setupAndPublish(rng, conf)
|
||||
runForever()
|
||||
28
examples/lightpush_mix/lightpush_publisher_mix_config.nim
Normal file
28
examples/lightpush_mix/lightpush_publisher_mix_config.nim
Normal file
@ -0,0 +1,28 @@
|
||||
import confutils/defs
|
||||
|
||||
type LightPushMixConf* = object
|
||||
destPeerAddr* {.desc: "Destination peer address with peerId.", name: "dp-addr".}:
|
||||
string
|
||||
|
||||
pxAddr* {.desc: "Peer exchange address with peerId.", name: "px-addr".}: string
|
||||
|
||||
port* {.desc: "Port to listen on.", defaultValue: 50000, name: "port".}: int
|
||||
|
||||
numMsgs* {.desc: "Number of messages to send.", defaultValue: 1, name: "num-msgs".}:
|
||||
int
|
||||
|
||||
msgIntervalMilliseconds* {.
|
||||
desc: "Interval between messages in milliseconds.",
|
||||
defaultValue: 1000,
|
||||
name: "msg-interval"
|
||||
.}: int
|
||||
|
||||
minMixPoolSize* {.
|
||||
desc: "Number of messages to wait for before sending.",
|
||||
defaultValue: 3,
|
||||
name: "min-mix-pool-size"
|
||||
.}: int
|
||||
|
||||
mixDisabled* {.
|
||||
desc: "Do not use mix for publishing.", defaultValue: false, name: "without-mix"
|
||||
.}: bool
|
||||
@ -0,0 +1,8 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import metrics
|
||||
|
||||
declarePublicCounter lp_mix_success, "number of lightpush messages sent via mix"
|
||||
|
||||
declarePublicCounter lp_mix_failed,
|
||||
"number of lightpush messages failed via mix", labels = ["error"]
|
||||
1
vendor/mix
vendored
Submodule
1
vendor/mix
vendored
Submodule
@ -0,0 +1 @@
|
||||
Subproject commit 78cca77195d5bbd257cdd3a74c2dd79e2bafe7cf
|
||||
@ -157,6 +157,10 @@ task liteprotocoltester, "Build liteprotocoltester":
|
||||
let name = "liteprotocoltester"
|
||||
buildBinary name, "apps/liteprotocoltester/"
|
||||
|
||||
task lightpushwithmix, "Build lightpushwithmix":
|
||||
let name = "lightpush_publisher_mix"
|
||||
buildBinary name, "examples/lightpush_mix/"
|
||||
|
||||
task buildone, "Build custom target":
|
||||
let filepath = paramStr(paramCount())
|
||||
discard buildModule filepath
|
||||
|
||||
@ -8,10 +8,11 @@ import
|
||||
./discv5_conf_builder,
|
||||
./web_socket_conf_builder,
|
||||
./metrics_server_conf_builder,
|
||||
./rln_relay_conf_builder
|
||||
./rln_relay_conf_builder,
|
||||
./mix_conf_builder
|
||||
|
||||
export
|
||||
waku_conf_builder, filter_service_conf_builder, store_sync_conf_builder,
|
||||
store_service_conf_builder, rest_server_conf_builder, dns_discovery_conf_builder,
|
||||
discv5_conf_builder, web_socket_conf_builder, metrics_server_conf_builder,
|
||||
rln_relay_conf_builder
|
||||
rln_relay_conf_builder, mix_conf_builder
|
||||
|
||||
38
waku/factory/conf_builder/mix_conf_builder.nim
Normal file
38
waku/factory/conf_builder/mix_conf_builder.nim
Normal file
@ -0,0 +1,38 @@
|
||||
import chronicles, std/options, results
|
||||
import libp2p/crypto/crypto, libp2p/crypto/curve25519, mix/curve25519
|
||||
import ../waku_conf
|
||||
|
||||
logScope:
|
||||
topics = "waku conf builder mix"
|
||||
|
||||
##################################
|
||||
## Mix Config Builder ##
|
||||
##################################
|
||||
type MixConfBuilder* = object
|
||||
enabled*: Option[bool]
|
||||
mixKey*: Option[string]
|
||||
|
||||
proc init*(T: type MixConfBuilder): MixConfBuilder =
|
||||
MixConfBuilder()
|
||||
|
||||
proc withEnabled*(b: var MixConfBuilder, enabled: bool) =
|
||||
b.enabled = some(enabled)
|
||||
|
||||
proc withMixKey*(b: var MixConfBuilder, mixKey: string) =
|
||||
b.mixKey = some(mixKey)
|
||||
|
||||
proc build*(b: MixConfBuilder): Result[Option[MixConf], string] =
|
||||
if not b.enabled.get(false):
|
||||
return ok(none[MixConf]())
|
||||
else:
|
||||
if b.mixKey.isSome():
|
||||
let mixPrivKey = intoCurve25519Key(ncrutils.fromHex(b.mixKey.get()))
|
||||
let mixPubKey = public(mixPrivKey)
|
||||
return ok(some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey)))
|
||||
else:
|
||||
# Generate a new key pair if not provided
|
||||
let keyPairResult = generateKeyPair()
|
||||
if keyPairResult.isErr:
|
||||
return err("Generate key pair error: " & $keyPairResult.error)
|
||||
let (mixPrivKey, mixPubKey) = keyPairResult.get()
|
||||
return ok(some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey)))
|
||||
@ -23,7 +23,8 @@ import
|
||||
./discv5_conf_builder,
|
||||
./web_socket_conf_builder,
|
||||
./metrics_server_conf_builder,
|
||||
./rln_relay_conf_builder
|
||||
./rln_relay_conf_builder,
|
||||
./mix_conf_builder
|
||||
|
||||
logScope:
|
||||
topics = "waku conf builder"
|
||||
@ -72,6 +73,7 @@ type WakuConfBuilder* = object
|
||||
restServerConf*: RestServerConfBuilder
|
||||
rlnRelayConf*: RlnRelayConfBuilder
|
||||
storeServiceConf*: StoreServiceConfBuilder
|
||||
mixConf*: MixConfBuilder
|
||||
webSocketConf*: WebSocketConfBuilder
|
||||
# End conf builders
|
||||
relay: Option[bool]
|
||||
@ -79,6 +81,7 @@ type WakuConfBuilder* = object
|
||||
peerExchange: Option[bool]
|
||||
storeSync: Option[bool]
|
||||
relayPeerExchange: Option[bool]
|
||||
mix: Option[bool]
|
||||
|
||||
# TODO: move within a relayConf
|
||||
rendezvous: Option[bool]
|
||||
@ -269,6 +272,9 @@ proc withMaxMessageSize*(builder: var WakuConfBuilder, maxMessageSize: string) =
|
||||
proc withStaticNodes*(builder: var WakuConfBuilder, staticNodes: seq[string]) =
|
||||
builder.staticNodes = concat(builder.staticNodes, staticNodes)
|
||||
|
||||
proc withMix*(builder: var WakuConfBuilder, mix: bool) =
|
||||
builder.mix = some(mix)
|
||||
|
||||
proc nodeKey(
|
||||
builder: WakuConfBuilder, rng: ref HmacDrbgContext
|
||||
): Result[crypto.PrivateKey, string] =
|
||||
@ -398,6 +404,13 @@ proc build*(
|
||||
warn "whether to mount rendezvous is not specified, defaulting to not mounting"
|
||||
false
|
||||
|
||||
let mix =
|
||||
if builder.mix.isSome():
|
||||
builder.mix.get()
|
||||
else:
|
||||
warn "whether to mount mix is not specified, defaulting to not mounting"
|
||||
false
|
||||
|
||||
let relayPeerExchange = builder.relayPeerExchange.get(false)
|
||||
|
||||
let nodeKey = ?nodeKey(builder, rng)
|
||||
@ -463,6 +476,9 @@ proc build*(
|
||||
let storeServiceConf = builder.storeServiceConf.build().valueOr:
|
||||
return err("Store Conf building failed: " & $error)
|
||||
|
||||
let mixConf = builder.mixConf.build().valueOr:
|
||||
return err("Mix Conf building failed: " & $error)
|
||||
|
||||
let webSocketConf = builder.webSocketConf.build().valueOr:
|
||||
return err("WebSocket Conf building failed: " & $error)
|
||||
# End - Build sub-configs
|
||||
@ -570,6 +586,7 @@ proc build*(
|
||||
store = storeServiceConf.isSome,
|
||||
relay = relay,
|
||||
sync = storeServiceConf.isSome() and storeServiceConf.get().storeSyncConf.isSome,
|
||||
mix = mix,
|
||||
)
|
||||
|
||||
let wakuConf = WakuConf(
|
||||
@ -581,6 +598,7 @@ proc build*(
|
||||
metricsServerConf: metricsServerConf,
|
||||
restServerConf: restServerConf,
|
||||
dnsDiscoveryConf: dnsDiscoveryConf,
|
||||
mixConf: mixConf,
|
||||
# end confs
|
||||
nodeKey: nodeKey,
|
||||
clusterId: clusterId,
|
||||
|
||||
@ -625,6 +625,13 @@ with the drawback of consuming some more bandwidth.""",
|
||||
name: "rendezvous"
|
||||
.}: bool
|
||||
|
||||
#Mix config
|
||||
mix* {.desc: "Enable mix protocol: true|false", defaultValue: false, name: "mix".}:
|
||||
bool
|
||||
|
||||
mixkey* {.desc: "ED25519 private key as 64 char hex string.", name: "mixkey".}:
|
||||
Option[string]
|
||||
|
||||
## websocket config
|
||||
websocketSupport* {.
|
||||
desc: "Enable websocket: true|false",
|
||||
@ -976,6 +983,11 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
|
||||
b.storeServiceConf.storeSyncConf.withRangeSec(n.storeSyncRange)
|
||||
b.storeServiceConf.storeSyncConf.withRelayJitterSec(n.storeSyncRelayJitter)
|
||||
|
||||
b.mixConf.withEnabled(n.mix)
|
||||
b.withMix(n.mix)
|
||||
if n.mixkey.isSome():
|
||||
b.mixConf.withMixKey(n.mixkey.get())
|
||||
|
||||
b.filterServiceConf.withEnabled(n.filter)
|
||||
b.filterServiceConf.withSubscriptionTimeout(n.filterSubscriptionTimeout)
|
||||
b.filterServiceConf.withMaxPeersToServe(n.filterMaxPeersToServe)
|
||||
|
||||
@ -2,14 +2,16 @@ import
|
||||
chronicles,
|
||||
chronos,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/crypto/curve25519,
|
||||
libp2p/multiaddress,
|
||||
libp2p/nameresolving/dnsresolver,
|
||||
std/[options, sequtils, net],
|
||||
std/[options, sequtils, net, strutils],
|
||||
results
|
||||
import
|
||||
../common/utils/nat,
|
||||
../node/net_config,
|
||||
../waku_enr,
|
||||
../waku_enr/mix,
|
||||
../waku_core,
|
||||
./waku_conf,
|
||||
./networks_config
|
||||
@ -33,6 +35,9 @@ proc enrConfiguration*(
|
||||
).isOkOr:
|
||||
return err("could not initialize ENR with shards")
|
||||
|
||||
if conf.mixConf.isSome():
|
||||
enrBuilder.withMixKey(conf.mixConf.get().mixPubKey)
|
||||
|
||||
let recordRes = enrBuilder.build()
|
||||
let record =
|
||||
if recordRes.isErr():
|
||||
|
||||
@ -6,7 +6,8 @@ import
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/protocols/connectivity/relay/relay,
|
||||
libp2p/nameresolving/dnsresolver,
|
||||
libp2p/crypto/crypto
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/crypto/curve25519
|
||||
|
||||
import
|
||||
./internal_config,
|
||||
@ -415,6 +416,10 @@ proc setupProtocols(
|
||||
return
|
||||
err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error)
|
||||
|
||||
#mount mix
|
||||
if conf.mixConf.isSome():
|
||||
(await node.mountMix(conf.clusterId, conf.mixConf.get().mixKey)).isOkOr:
|
||||
return err("failed to mount waku mix protocol: " & $error)
|
||||
return ok()
|
||||
|
||||
## Start node
|
||||
|
||||
@ -9,6 +9,7 @@ import
|
||||
libp2p/protocols/connectivity/relay/client,
|
||||
libp2p/wire,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/crypto/curve25519,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/services/autorelayservice,
|
||||
libp2p/services/hpservice,
|
||||
|
||||
@ -3,6 +3,7 @@ import
|
||||
chronicles,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/multiaddress,
|
||||
libp2p/crypto/curve25519,
|
||||
secp256k1,
|
||||
results
|
||||
|
||||
@ -35,6 +36,10 @@ type StoreSyncConf* {.requiresInit.} = object
|
||||
intervalSec*: uint32
|
||||
relayJitterSec*: uint32
|
||||
|
||||
type MixConf* = ref object
|
||||
mixKey*: Curve25519Key
|
||||
mixPubKey*: Curve25519Key
|
||||
|
||||
type StoreServiceConf* {.requiresInit.} = object
|
||||
dbMigration*: bool
|
||||
dbURl*: string
|
||||
@ -92,6 +97,7 @@ type WakuConf* {.requiresInit.} = ref object
|
||||
restServerConf*: Option[RestServerConf]
|
||||
metricsServerConf*: Option[MetricsServerConf]
|
||||
webSocketConf*: Option[WebSocketConf]
|
||||
mixConf*: Option[MixConf]
|
||||
|
||||
portsShift*: uint16
|
||||
dnsAddrsNameServers*: seq[IpAddress]
|
||||
|
||||
@ -9,9 +9,12 @@ import
|
||||
stew/byteutils,
|
||||
eth/keys,
|
||||
nimcrypto,
|
||||
nimcrypto/utils as ncrutils,
|
||||
bearssl/rand,
|
||||
eth/p2p/discoveryv5/enr,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/crypto/curve25519,
|
||||
libp2p/[multiaddress, multicodec],
|
||||
libp2p/protocols/ping,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/protocols/pubsub/rpc/messages,
|
||||
@ -19,7 +22,13 @@ import
|
||||
libp2p/transports/transport,
|
||||
libp2p/transports/tcptransport,
|
||||
libp2p/transports/wstransport,
|
||||
libp2p/utility
|
||||
libp2p/utility,
|
||||
mix/mix_node,
|
||||
mix/mix_protocol,
|
||||
mix/curve25519,
|
||||
mix/protocol,
|
||||
mix/mix_metrics
|
||||
|
||||
import
|
||||
../waku_core,
|
||||
../waku_core/topics/sharding,
|
||||
@ -48,7 +57,11 @@ import
|
||||
../waku_rln_relay,
|
||||
./net_config,
|
||||
./peer_manager,
|
||||
../common/rate_limit/setting
|
||||
../common/rate_limit/setting,
|
||||
../discovery/autonat_service,
|
||||
../common/nimchronos,
|
||||
../waku_enr/mix,
|
||||
../waku_mix
|
||||
|
||||
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
|
||||
declarePublicHistogram waku_histogram_message_size,
|
||||
@ -121,6 +134,8 @@ type
|
||||
started*: bool # Indicates that node has started listening
|
||||
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
|
||||
rateLimitSettings*: ProtocolRateLimitSettings
|
||||
mix*: WakuMix
|
||||
mixbootNodes*: Table[PeerId, MixPubInfo]
|
||||
|
||||
proc new*(
|
||||
T: type WakuNode,
|
||||
@ -206,6 +221,33 @@ proc mountSharding*(
|
||||
node.wakuSharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount)
|
||||
return ok()
|
||||
|
||||
proc getMixNodePoolSize*(node: WakuNode): int =
|
||||
return node.mix.getNodePoolSize()
|
||||
|
||||
proc mountMix*(
|
||||
node: WakuNode, clusterId: uint16, mixPrivKey: Curve25519Key
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
info "mounting mix protocol", nodeId = node.info #TODO log the config used
|
||||
|
||||
let localaddrStr = node.announcedAddresses[0].toString().valueOr:
|
||||
return err("Failed to convert multiaddress to string.")
|
||||
info "local addr", localaddr = localaddrStr
|
||||
|
||||
let nodeAddr = localaddrStr & "/p2p/" & $node.peerId
|
||||
# TODO: Pass bootnodes from config,
|
||||
let protoRes = WakuMix.new(nodeAddr, node.peerManager, clusterId, mixPrivKey)
|
||||
if protoRes.isErr:
|
||||
error "Waku Mix protocol initialization failed", err = protoRes.error
|
||||
return
|
||||
node.mix = protoRes.value
|
||||
|
||||
let catchRes = catch:
|
||||
node.switch.mount(node.mix)
|
||||
if catchRes.isErr():
|
||||
return err(catchRes.error.msg)
|
||||
node.mix.start()
|
||||
return ok()
|
||||
|
||||
## Waku Sync
|
||||
|
||||
proc mountStoreSync*(
|
||||
|
||||
@ -3,6 +3,7 @@
|
||||
import
|
||||
std/[options, bitops, sequtils, net, tables], results, eth/keys, libp2p/crypto/crypto
|
||||
import ../common/enr, ../waku_core/codecs
|
||||
import mix/mix_protocol
|
||||
|
||||
const CapabilitiesEnrField* = "waku2"
|
||||
|
||||
@ -20,6 +21,7 @@ type
|
||||
Filter = 2
|
||||
Lightpush = 3
|
||||
Sync = 4
|
||||
Mix = 5
|
||||
|
||||
const capabilityToCodec = {
|
||||
Capabilities.Relay: WakuRelayCodec,
|
||||
@ -27,10 +29,12 @@ const capabilityToCodec = {
|
||||
Capabilities.Filter: WakuFilterSubscribeCodec,
|
||||
Capabilities.Lightpush: WakuLightPushCodec,
|
||||
Capabilities.Sync: WakuReconciliationCodec,
|
||||
Capabilities.Mix: MixProtocolID,
|
||||
}.toTable
|
||||
|
||||
func init*(
|
||||
T: type CapabilitiesBitfield, lightpush, filter, store, relay, sync: bool = false
|
||||
T: type CapabilitiesBitfield,
|
||||
lightpush, filter, store, relay, sync, mix: bool = false,
|
||||
): T =
|
||||
## Creates an waku2 ENR flag bit field according to RFC 31 (https://rfc.vac.dev/spec/31/)
|
||||
var bitfield: uint8
|
||||
@ -44,6 +48,8 @@ func init*(
|
||||
bitfield.setBit(3)
|
||||
if sync:
|
||||
bitfield.setBit(4)
|
||||
if mix:
|
||||
bitfield.setBit(5)
|
||||
CapabilitiesBitfield(bitfield)
|
||||
|
||||
func init*(T: type CapabilitiesBitfield, caps: varargs[Capabilities]): T =
|
||||
|
||||
26
waku/waku_enr/mix.nim
Normal file
26
waku/waku_enr/mix.nim
Normal file
@ -0,0 +1,26 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[options], results, libp2p/crypto/curve25519, nimcrypto/utils as ncrutils
|
||||
|
||||
import ../common/enr
|
||||
|
||||
const MixKeyEnrField* = "mix-key"
|
||||
|
||||
func withMixKey*(builder: var EnrBuilder, mixPubKey: Curve25519Key) =
|
||||
builder.addFieldPair(MixKeyEnrField, getBytes(mixPubKey))
|
||||
|
||||
func mixKey*(record: TypedRecord): Option[seq[byte]] =
|
||||
let field = record.tryGet(MixKeyEnrField, seq[byte])
|
||||
if field.isNone():
|
||||
return none(seq[byte])
|
||||
return field
|
||||
|
||||
func mixKey*(record: Record): Option[seq[byte]] =
|
||||
let recordRes = record.toTyped()
|
||||
if recordRes.isErr():
|
||||
return none(seq[byte])
|
||||
|
||||
let field = recordRes.value.tryGet(MixKeyEnrField, seq[byte])
|
||||
if field.isNone():
|
||||
return none(seq[byte])
|
||||
return field
|
||||
@ -1,7 +1,7 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import std/options, results, chronicles, chronos, metrics, bearssl/rand, stew/byteutils
|
||||
import libp2p/peerid
|
||||
import libp2p/peerid, libp2p/stream/connection
|
||||
import
|
||||
../waku_core/peers,
|
||||
../node/peer_manager,
|
||||
@ -101,12 +101,16 @@ proc publishToAny*(
|
||||
if message.timestamp == 0:
|
||||
message.timestamp = getNowInNanosecondTime()
|
||||
|
||||
info "publishToAny", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex
|
||||
|
||||
let peer = wl.peerManager.selectPeer(WakuLightPushCodec).valueOr:
|
||||
# TODO: check if it is matches the situation - shall we distinguish client side missing peers from server side?
|
||||
return lighpushErrorResult(NO_PEERS_TO_RELAY, "no suitable remote peers")
|
||||
|
||||
info "publishToAny",
|
||||
my_peer_id = wl.peerManager.switch.peerInfo.peerId,
|
||||
peer_id = peer.peerId,
|
||||
msg_hash = computeMessageHash(pubsubTopic, message).to0xHex,
|
||||
sentTime = getNowInNanosecondTime()
|
||||
|
||||
let pushRequest = LightpushRequest(
|
||||
requestId: generateRequestId(wl.rng),
|
||||
pubSubTopic: some(pubSubTopic),
|
||||
@ -118,3 +122,35 @@ proc publishToAny*(
|
||||
obs.onMessagePublished(pubSubTopic, message)
|
||||
|
||||
return lightpushSuccessResult(publishedCount)
|
||||
|
||||
#TODO: Remove multiple publishX procs and use Option pattern instead
|
||||
proc publishWithConn*(
|
||||
wl: WakuLightPushClient,
|
||||
pubSubTopic: PubsubTopic,
|
||||
message: WakuMessage,
|
||||
conn: Connection,
|
||||
destPeer: PeerId,
|
||||
): Future[WakuLightPushResult] {.async, gcsafe.} =
|
||||
## This proc is similar to the publish one but in this case
|
||||
## we use existing connection to publish.
|
||||
|
||||
info "publishWithConn",
|
||||
my_peer_id = wl.peerManager.switch.peerInfo.peerId,
|
||||
peer_id = destPeer,
|
||||
msg_hash = computeMessageHash(pubsubTopic, message).to0xHex,
|
||||
sentTime = getNowInNanosecondTime()
|
||||
|
||||
let pushRequest = LightpushRequest(
|
||||
requestId: generateRequestId(wl.rng),
|
||||
pubSubTopic: some(pubSubTopic),
|
||||
message: message,
|
||||
)
|
||||
|
||||
await conn.writeLP(pushRequest.encode().buffer)
|
||||
|
||||
for obs in wl.publishObservers:
|
||||
obs.onMessagePublished(pubSubTopic, message)
|
||||
|
||||
#TODO: Implement response handling.
|
||||
|
||||
return lightpushSuccessResult(1)
|
||||
|
||||
3
waku/waku_mix.nim
Normal file
3
waku/waku_mix.nim
Normal file
@ -0,0 +1,3 @@
|
||||
import ./waku_mix/[protocol]
|
||||
|
||||
export protocol
|
||||
159
waku/waku_mix/protocol.nim
Normal file
159
waku/waku_mix/protocol.nim
Normal file
@ -0,0 +1,159 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import chronicles, std/[options, sequtils], chronos, results, metrics
|
||||
|
||||
import
|
||||
libp2p/crypto/curve25519,
|
||||
mix/mix_protocol,
|
||||
mix/mix_node,
|
||||
mix/mix_metrics,
|
||||
mix/tag_manager,
|
||||
libp2p/[multiaddress, multicodec, peerid]
|
||||
|
||||
import
|
||||
../node/peer_manager,
|
||||
../waku_core,
|
||||
../waku_enr/mix,
|
||||
../waku_enr,
|
||||
../node/peer_manager/waku_peer_store,
|
||||
../common/nimchronos
|
||||
|
||||
logScope:
|
||||
topics = "waku mix"
|
||||
|
||||
type
|
||||
WakuMix* = ref object of MixProtocol
|
||||
peerManager*: PeerManager
|
||||
clusterId: uint16
|
||||
|
||||
WakuMixResult*[T] = Result[T, string]
|
||||
|
||||
proc mixPoolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool =
|
||||
# Note that origin based(discv5) filtering is not done intentionally
|
||||
# so that more mix nodes can be discovered.
|
||||
if peer.enr.isNone():
|
||||
trace "peer has no ENR", peer = $peer
|
||||
return false
|
||||
|
||||
if cluster.isSome() and peer.enr.get().isClusterMismatched(cluster.get()):
|
||||
debug "peer has mismatching cluster", peer = $peer
|
||||
return false
|
||||
|
||||
# Filter if mix is enabled
|
||||
if not peer.enr.get().supportsCapability(Capabilities.Mix):
|
||||
debug "peer doesn't support mix", peer = $peer
|
||||
return false
|
||||
|
||||
return true
|
||||
|
||||
proc appendPeerIdToMultiaddr*(multiaddr: MultiAddress, peerId: PeerId): MultiAddress =
|
||||
if multiaddr.contains(multiCodec("p2p")).get():
|
||||
return multiaddr
|
||||
|
||||
var maddrStr = multiaddr.toString().valueOr:
|
||||
error "Failed to convert multiaddress to string.", err = error
|
||||
return multiaddr
|
||||
maddrStr.add("/p2p/" & $peerId)
|
||||
var cleanAddr = MultiAddress.init(maddrStr).valueOr:
|
||||
error "Failed to convert string to multiaddress.", err = error
|
||||
return multiaddr
|
||||
return cleanAddr
|
||||
|
||||
proc populateMixNodePool*(mix: WakuMix) =
|
||||
# populate only peers that i) are reachable ii) share cluster iii) support mix
|
||||
let remotePeers = mix.peerManager.switch.peerStore.getReachablePeers().filterIt(
|
||||
mixPoolFilter(some(mix.clusterId), it)
|
||||
)
|
||||
var mixNodes = initTable[PeerId, MixPubInfo]()
|
||||
|
||||
for i in 0 ..< min(remotePeers.len, 100):
|
||||
let remotePeerENR = remotePeers[i].enr.get()
|
||||
# TODO: use the most exposed/external multiaddr of the peer, right now using the first
|
||||
let maddrWithPeerId =
|
||||
toString(appendPeerIdToMultiaddr(remotePeers[i].addrs[0], remotePeers[i].peerId))
|
||||
trace "remote peer ENR",
|
||||
peerId = remotePeers[i].peerId, enr = remotePeerENR, maddr = maddrWithPeerId
|
||||
|
||||
let peerMixPubKey = mixKey(remotePeerENR).get()
|
||||
let mixNodePubInfo =
|
||||
createMixPubInfo(maddrWithPeerId.value, intoCurve25519Key(peerMixPubKey))
|
||||
mixNodes[remotePeers[i].peerId] = mixNodePubInfo
|
||||
|
||||
mix_pool_size.set(len(mixNodes))
|
||||
# set the mix node pool
|
||||
mix.setNodePool(mixNodes)
|
||||
trace "mix node pool updated", poolSize = mix.getNodePoolSize()
|
||||
|
||||
proc startMixNodePoolMgr*(mix: WakuMix) {.async.} =
|
||||
info "starting mix node pool manager"
|
||||
# try more aggressively to populate the pool at startup
|
||||
var attempts = 50
|
||||
# TODO: make initial pool size configurable
|
||||
while mix.getNodePoolSize() < 100 and attempts > 0:
|
||||
attempts -= 1
|
||||
mix.populateMixNodePool()
|
||||
await sleepAsync(1.seconds)
|
||||
|
||||
# TODO: make interval configurable
|
||||
heartbeat "Updating mix node pool", 5.seconds:
|
||||
mix.populateMixNodePool()
|
||||
|
||||
#[ proc getBootStrapMixNodes*(node: WakuNode): Table[PeerId, MixPubInfo] =
|
||||
var mixNodes = initTable[PeerId, MixPubInfo]()
|
||||
# MixNode Multiaddrs and PublicKeys:
|
||||
let bootNodesMultiaddrs = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o",
|
||||
"/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF",
|
||||
"/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA",
|
||||
"/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f",
|
||||
"/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu",
|
||||
]
|
||||
let bootNodesMixPubKeys = ["9d09ce624f76e8f606265edb9cca2b7de9b41772a6d784bddaf92ffa8fba7d2c",
|
||||
"9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a",
|
||||
"275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c",
|
||||
"e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18",
|
||||
"8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f"
|
||||
]
|
||||
for index, mixNodeMultiaddr in bootNodesMultiaddrs:
|
||||
let peerIdRes = getPeerIdFromMultiAddr(mixNodeMultiaddr)
|
||||
if peerIdRes.isErr:
|
||||
error "Failed to get peer id from multiaddress: " , error = peerIdRes.error
|
||||
let peerId = peerIdRes.get()
|
||||
#if (not peerID == nil) and peerID == exceptPeerID:
|
||||
# continue
|
||||
let mixNodePubInfo = createMixPubInfo(mixNodeMultiaddr, intoCurve25519Key(ncrutils.fromHex(bootNodesMixPubKeys[index])))
|
||||
|
||||
mixNodes[peerId] = mixNodePubInfo
|
||||
info "using mix bootstrap nodes ", bootNodes = mixNodes
|
||||
return mixNodes
|
||||
]#
|
||||
|
||||
proc new*(
|
||||
T: type WakuMix,
|
||||
nodeAddr: string,
|
||||
peermgr: PeerManager,
|
||||
clusterId: uint16,
|
||||
mixPrivKey: Curve25519Key,
|
||||
): WakuMixResult[T] =
|
||||
let mixPubKey = public(mixPrivKey)
|
||||
info "mixPrivKey", mixPrivKey = mixPrivKey, mixPubKey = mixPubKey
|
||||
|
||||
let localMixNodeInfo = initMixNodeInfo(
|
||||
nodeAddr, mixPubKey, mixPrivKey, peermgr.switch.peerInfo.publicKey.skkey,
|
||||
peermgr.switch.peerInfo.privateKey.skkey,
|
||||
)
|
||||
|
||||
# TODO : ideally mix should not be marked ready until certain min pool of mixNodes are discovered
|
||||
var m = WakuMix(peerManager: peermgr, clusterId: clusterId)
|
||||
procCall MixProtocol(m).initialize(
|
||||
localMixNodeInfo, peermgr.switch, initTable[PeerId, MixPubInfo]()
|
||||
)
|
||||
|
||||
return ok(m)
|
||||
|
||||
proc start*(mix: Wakumix) =
|
||||
discard mix.startMixNodePoolMgr()
|
||||
|
||||
#[ proc setMixBootStrapNodes*(node: WakuNode,){.async}=
|
||||
node.mix.setNodePool(node.getBootStrapMixNodes())
|
||||
]#
|
||||
# Mix Protocol
|
||||
Loading…
x
Reference in New Issue
Block a user