feat: mix poc (#3284)

* feat: poc to integrate mix into waku and use lightpush to demonstrate
This commit is contained in:
Prem Chaitanya Prathi 2025-09-11 20:40:01 +05:30 committed by GitHub
parent 753891217e
commit 0740812816
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
42 changed files with 1947 additions and 35 deletions

4
.gitmodules vendored
View File

@ -184,3 +184,7 @@
url = https://github.com/waku-org/waku-rlnv2-contract.git
ignore = untracked
branch = master
[submodule "vendor/mix"]
path = vendor/mix
url = https://github.com/vacp2p/mix/
branch = main

View File

@ -0,0 +1,58 @@
# BUILD NIM APP ----------------------------------------------------------------
FROM rust:1.81.0-alpine3.19 AS nim-build
ARG NIMFLAGS
ARG MAKE_TARGET=lightpushwithmix
ARG NIM_COMMIT
ARG LOG_LEVEL=TRACE
# Get build tools and required header files
RUN apk add --no-cache bash git build-base openssl-dev pcre-dev linux-headers curl jq
WORKDIR /app
COPY . .
# workaround for alpine issue: https://github.com/alpinelinux/docker-alpine/issues/383
RUN apk update && apk upgrade
# Ran separately from 'make' to avoid re-doing
RUN git submodule update --init --recursive
# Slowest build step for the sake of caching layers
RUN make -j$(nproc) deps QUICK_AND_DIRTY_COMPILER=1 ${NIM_COMMIT}
# Build the final node binary
RUN make -j$(nproc) ${NIM_COMMIT} $MAKE_TARGET LOG_LEVEL=${LOG_LEVEL} NIMFLAGS="${NIMFLAGS}"
# REFERENCE IMAGE as BASE for specialized PRODUCTION IMAGES----------------------------------------
FROM alpine:3.18 AS base_lpt
ARG MAKE_TARGET=lightpushwithmix
LABEL maintainer="prem@waku.org"
LABEL source="https://github.com/waku-org/nwaku"
LABEL description="Lite Push With Mix: Waku light-client"
LABEL commit="unknown"
LABEL version="unknown"
# DevP2P, LibP2P, and JSON RPC ports
EXPOSE 30303 60000 8545
# Referenced in the binary
RUN apk add --no-cache libgcc pcre-dev libpq-dev \
wget \
iproute2 \
python3 \
jq
# Fix for 'Error loading shared library libpcre.so.3: No such file or directory'
RUN ln -s /usr/lib/libpcre.so /usr/lib/libpcre.so.3
COPY --from=nim-build /app/build/lightpush_publisher_mix /usr/bin/
RUN chmod +x /usr/bin/lightpush_publisher_mix
# Standalone image to be used manually and in lpt-runner -------------------------------------------
FROM base_lpt AS standalone_lpt
ENTRYPOINT ["/usr/bin/lightpush_publisher_mix"]

View File

@ -240,6 +240,10 @@ chat2: | build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim chat2 $(NIM_PARAMS) waku.nims
chat2mix: | build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim chat2mix $(NIM_PARAMS) waku.nims
rln-db-inspector: | build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim rln_db_inspector $(NIM_PARAMS) waku.nims
@ -252,6 +256,10 @@ liteprotocoltester: | build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim liteprotocoltester $(NIM_PARAMS) waku.nims
lightpushwithmix: | build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim lightpushwithmix $(NIM_PARAMS) waku.nims
build/%: | build deps librln
echo -e $(BUILD_MSG) "build/$*" && \
$(ENV_SCRIPT) nim buildone $(NIM_PARAMS) waku.nims $*

704
apps/chat2mix/chat2mix.nim Normal file
View File

@ -0,0 +1,704 @@
## chat2 is an example of usage of Waku v2. For suggested usage options, please
## see dingpu tutorial in docs folder.
when not (compileOption("threads")):
{.fatal: "Please, compile this program with the --threads:on option!".}
{.push raises: [].}
import std/[strformat, strutils, times, options, random, sequtils]
import
confutils,
chronicles,
chronos,
eth/keys,
bearssl,
stew/[byteutils, results],
metrics,
metrics/chronos_httpserver
import
libp2p/[
switch, # manage transports, a single entry point for dialing and listening
crypto/crypto, # cryptographic functions
stream/connection, # create and close stream read / write connections
multiaddress,
# encode different addressing schemes. For example, /ip4/7.7.7.7/tcp/6543 means it is using IPv4 protocol and TCP
peerinfo,
# manage the information of a peer, such as peer ID and public / private key
peerid, # Implement how peers interact
protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs
nameresolving/dnsresolver,
] # define DNS resolution
import mix/curve25519
import
waku/[
waku_core,
waku_lightpush/common,
waku_lightpush/rpc,
waku_enr,
discovery/waku_dnsdisc,
waku_node,
node/waku_metrics,
node/peer_manager,
factory/builder,
common/utils/nat,
waku_store/common,
waku_filter_v2/client,
common/logging,
],
./config_chat2mix
import libp2p/protocols/pubsub/rpc/messages, libp2p/protocols/pubsub/pubsub
import ../../waku/waku_rln_relay
logScope:
topics = "chat2 mix"
const Help =
"""
Commands: /[?|help|connect|nick|exit]
help: Prints this help
connect: dials a remote peer
nick: change nickname for current chat session
exit: exits chat session
"""
# XXX Connected is a bit annoying, because incoming connections don't trigger state change
# Could poll connection pool or something here, I suppose
# TODO Ensure connected turns true on incoming connections, or get rid of it
type Chat = ref object
node: WakuNode # waku node for publishing, subscribing, etc
transp: StreamTransport # transport streams between read & write file descriptor
subscribed: bool # indicates if a node is subscribed or not to a topic
connected: bool # if the node is connected to another peer
started: bool # if the node has started
nick: string # nickname for this chat session
prompt: bool # chat prompt is showing
contentTopic: string # default content topic for chat messages
conf: Chat2Conf # configuration for chat2
type
PrivateKey* = crypto.PrivateKey
Topic* = waku_core.PubsubTopic
#####################
## chat2 protobufs ##
#####################
type
SelectResult*[T] = Result[T, string]
Chat2Message* = object
timestamp*: int64
nick*: string
payload*: seq[byte]
proc getPubsubTopic*(
conf: Chat2Conf, node: WakuNode, contentTopic: string
): PubsubTopic =
let shard = node.wakuAutoSharding.get().getShard(contentTopic).valueOr:
echo "Could not parse content topic: " & error
return "" #TODO: fix this.
return $RelayShard(clusterId: conf.clusterId, shardId: shard.shardId)
proc init*(T: type Chat2Message, buffer: seq[byte]): ProtoResult[T] =
var msg = Chat2Message()
let pb = initProtoBuffer(buffer)
var timestamp: uint64
discard ?pb.getField(1, timestamp)
msg.timestamp = int64(timestamp)
discard ?pb.getField(2, msg.nick)
discard ?pb.getField(3, msg.payload)
ok(msg)
proc encode*(message: Chat2Message): ProtoBuffer =
var serialised = initProtoBuffer()
serialised.write(1, uint64(message.timestamp))
serialised.write(2, message.nick)
serialised.write(3, message.payload)
return serialised
proc toString*(message: Chat2Message): string =
# Get message date and timestamp in local time
let time = message.timestamp.fromUnix().local().format("'<'MMM' 'dd,' 'HH:mm'>'")
return time & " " & message.nick & ": " & string.fromBytes(message.payload)
#####################
proc connectToNodes(c: Chat, nodes: seq[string]) {.async.} =
echo "Connecting to nodes"
await c.node.connectToNodes(nodes)
c.connected = true
proc showChatPrompt(c: Chat) =
if not c.prompt:
try:
stdout.write(">> ")
stdout.flushFile()
c.prompt = true
except IOError:
discard
proc getChatLine(c: Chat, msg: WakuMessage): Result[string, string] =
# No payload encoding/encryption from Waku
let
pb = Chat2Message.init(msg.payload)
chatLine =
if pb.isOk:
pb[].toString()
else:
string.fromBytes(msg.payload)
return ok(chatline)
proc printReceivedMessage(c: Chat, msg: WakuMessage) =
let
pb = Chat2Message.init(msg.payload)
chatLine =
if pb.isOk:
pb[].toString()
else:
string.fromBytes(msg.payload)
try:
echo &"{chatLine}"
except ValueError:
# Formatting fail. Print chat line in any case.
echo chatLine
c.prompt = false
showChatPrompt(c)
trace "Printing message", chatLine, contentTopic = msg.contentTopic
proc readNick(transp: StreamTransport): Future[string] {.async.} =
# Chat prompt
stdout.write("Choose a nickname >> ")
stdout.flushFile()
return await transp.readLine()
proc startMetricsServer(
serverIp: IpAddress, serverPort: Port
): Result[MetricsHttpServerRef, string] =
info "Starting metrics HTTP server", serverIp = $serverIp, serverPort = $serverPort
let metricsServerRes = MetricsHttpServerRef.new($serverIp, serverPort)
if metricsServerRes.isErr():
return err("metrics HTTP server start failed: " & $metricsServerRes.error)
let server = metricsServerRes.value
try:
waitFor server.start()
except CatchableError:
return err("metrics HTTP server start failed: " & getCurrentExceptionMsg())
info "Metrics HTTP server started", serverIp = $serverIp, serverPort = $serverPort
ok(metricsServerRes.value)
proc publish(c: Chat, line: string) {.async.} =
# First create a Chat2Message protobuf with this line of text
let time = getTime().toUnix()
let chat2pb =
Chat2Message(timestamp: time, nick: c.nick, payload: line.toBytes()).encode()
## @TODO: error handling on failure
proc handler(response: LightPushResponse) {.gcsafe, closure.} =
trace "lightpush response received", response = response
var message = WakuMessage(
payload: chat2pb.buffer,
contentTopic: c.contentTopic,
version: 0,
timestamp: getNanosecondTime(time),
)
try:
if not c.node.wakuLightpushClient.isNil():
# Attempt lightpush with mix
(
waitFor c.node.lightpushPublish(
some(c.conf.getPubsubTopic(c.node, c.contentTopic)),
message,
none(RemotePeerInfo),
true,
)
).isOkOr:
error "failed to publish lightpush message", error = error
else:
error "failed to publish message as lightpush client is not initialized"
except CatchableError:
error "caught error publishing message: ", error = getCurrentExceptionMsg()
# TODO This should read or be subscribe handler subscribe
proc readAndPrint(c: Chat) {.async.} =
while true:
# while p.connected:
# # TODO: echo &"{p.id} -> "
#
# echo cast[string](await p.conn.readLp(1024))
#echo "readAndPrint subscribe NYI"
await sleepAsync(100)
# TODO Implement
proc writeAndPrint(c: Chat) {.async.} =
while true:
# Connect state not updated on incoming WakuRelay connections
# if not c.connected:
# echo "type an address or wait for a connection:"
# echo "type /[help|?] for help"
# Chat prompt
showChatPrompt(c)
let line = await c.transp.readLine()
if line.startsWith("/help") or line.startsWith("/?") or not c.started:
echo Help
continue
# if line.startsWith("/disconnect"):
# echo "Ending current session"
# if p.connected and p.conn.closed.not:
# await p.conn.close()
# p.connected = false
elif line.startsWith("/connect"):
# TODO Should be able to connect to multiple peers for Waku chat
if c.connected:
echo "already connected to at least one peer"
continue
echo "enter address of remote peer"
let address = await c.transp.readLine()
if address.len > 0:
await c.connectToNodes(@[address])
elif line.startsWith("/nick"):
# Set a new nickname
c.nick = await readNick(c.transp)
echo "You are now known as " & c.nick
elif line.startsWith("/exit"):
echo "quitting..."
try:
await c.node.stop()
except:
echo "exception happened when stopping: " & getCurrentExceptionMsg()
quit(QuitSuccess)
else:
# XXX connected state problematic
if c.started:
echo "publishing message: " & line
await c.publish(line)
# TODO Connect to peer logic?
else:
try:
if line.startsWith("/") and "p2p" in line:
await c.connectToNodes(@[line])
except:
echo &"unable to dial remote peer {line}"
echo getCurrentExceptionMsg()
proc readWriteLoop(c: Chat) {.async.} =
asyncSpawn c.writeAndPrint() # execute the async function but does not block
asyncSpawn c.readAndPrint()
proc readInput(wfd: AsyncFD) {.thread, raises: [Defect, CatchableError].} =
## This procedure performs reading from `stdin` and sends data over
## pipe to main thread.
let transp = fromPipe(wfd)
while true:
let line = stdin.readLine()
discard waitFor transp.write(line & "\r\n")
var alreadyUsedServicePeers {.threadvar.}: seq[RemotePeerInfo]
proc selectRandomServicePeer*(
pm: PeerManager, actualPeer: Option[RemotePeerInfo], codec: string
): Result[RemotePeerInfo, void] =
if actualPeer.isSome():
alreadyUsedServicePeers.add(actualPeer.get())
let supportivePeers = pm.switch.peerStore.getPeersByProtocol(codec).filterIt(
it notin alreadyUsedServicePeers
)
if supportivePeers.len == 0:
return err()
let rndPeerIndex = rand(0 .. supportivePeers.len - 1)
return ok(supportivePeers[rndPeerIndex])
proc maintainSubscription(
wakuNode: WakuNode,
filterPubsubTopic: PubsubTopic,
filterContentTopic: ContentTopic,
filterPeer: RemotePeerInfo,
preventPeerSwitch: bool,
) {.async.} =
var actualFilterPeer = filterPeer
const maxFailedSubscribes = 3
const maxFailedServiceNodeSwitches = 10
var noFailedSubscribes = 0
var noFailedServiceNodeSwitches = 0
while true:
info "maintaining subscription at", peer = constructMultiaddrStr(actualFilterPeer)
# First use filter-ping to check if we have an active subscription
let pingRes = await wakuNode.wakuFilterClient.ping(actualFilterPeer)
if pingRes.isErr():
# No subscription found. Let's subscribe.
error "ping failed.", err = pingRes.error
trace "no subscription found. Sending subscribe request"
let subscribeRes = await wakuNode.filterSubscribe(
some(filterPubsubTopic), filterContentTopic, actualFilterPeer
)
if subscribeRes.isErr():
noFailedSubscribes += 1
error "Subscribe request failed.",
err = subscribeRes.error,
peer = actualFilterPeer,
failCount = noFailedSubscribes
# TODO: disconnet from failed actualFilterPeer
# asyncSpawn(wakuNode.peerManager.switch.disconnect(p))
# wakunode.peerManager.peerStore.delete(actualFilterPeer)
if noFailedSubscribes < maxFailedSubscribes:
await sleepAsync(2000) # Wait a bit before retrying
continue
elif not preventPeerSwitch:
let peerOpt = selectRandomServicePeer(
wakuNode.peerManager, some(actualFilterPeer), WakuFilterSubscribeCodec
)
if peerOpt.isOk():
actualFilterPeer = peerOpt.get()
info "Found new peer for codec",
codec = filterPubsubTopic, peer = constructMultiaddrStr(actualFilterPeer)
noFailedSubscribes = 0
continue # try again with new peer without delay
else:
error "Failed to find new service peer. Exiting."
noFailedServiceNodeSwitches += 1
break
else:
if noFailedSubscribes > 0:
noFailedSubscribes -= 1
notice "subscribe request successful."
else:
info "subscription is live."
await sleepAsync(30000) # Subscription maintenance interval
proc processMixNodes(localnode: WakuNode, nodes: seq[string]) {.async.} =
if nodes.len == 0:
return
info "Processing mix nodes: ", nodes = $nodes
for node in nodes:
var enrRec: enr.Record
if enrRec.fromURI(node):
let peerInfo = enrRec.toRemotePeerInfo().valueOr:
error "Failed to parse mix node", error = error
continue
localnode.peermanager.addPeer(peerInfo, Discv5)
info "Added mix node", peer = peerInfo
else:
error "Failed to parse mix node ENR", node = node
{.pop.}
# @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
let
transp = fromPipe(rfd)
conf = Chat2Conf.load()
nodekey =
if conf.nodekey.isSome():
conf.nodekey.get()
else:
PrivateKey.random(Secp256k1, rng[]).tryGet()
# set log level
if conf.logLevel != LogLevel.NONE:
setLogLevel(conf.logLevel)
let natRes = setupNat(
conf.nat,
clientId,
Port(uint16(conf.tcpPort) + conf.portsShift),
Port(uint16(conf.udpPort) + conf.portsShift),
)
if natRes.isErr():
raise newException(ValueError, "setupNat error " & natRes.error)
let (extIp, extTcpPort, extUdpPort) = natRes.get()
var enrBuilder = EnrBuilder.init(nodeKey)
enrBuilder.withWakuRelaySharding(
RelayShards(clusterId: conf.clusterId, shardIds: conf.shards)
).isOkOr:
error "failed to add sharded topics to ENR", error = error
quit(QuitFailure)
let recordRes = enrBuilder.build()
let record =
if recordRes.isErr():
error "failed to create enr record", error = recordRes.error
quit(QuitFailure)
else:
recordRes.get()
let node = block:
var builder = WakuNodeBuilder.init()
builder.withNodeKey(nodeKey)
builder.withRecord(record)
builder
.withNetworkConfigurationDetails(
conf.listenAddress,
Port(uint16(conf.tcpPort) + conf.portsShift),
extIp,
extTcpPort,
wsBindPort = Port(uint16(conf.websocketPort) + conf.portsShift),
wsEnabled = conf.websocketSupport,
wssEnabled = conf.websocketSecureSupport,
)
.tryGet()
builder.build().tryGet()
node.mountAutoSharding(conf.clusterId, conf.numShardsInNetwork).isOkOr:
error "failed to mount waku sharding: ", error = error
quit(QuitFailure)
node.mountMetadata(conf.clusterId, conf.shards).isOkOr:
error "failed to mount waku metadata protocol: ", err = error
quit(QuitFailure)
try:
await node.mountPeerExchange()
except CatchableError:
error "failed to mount waku peer-exchange protocol",
error = getCurrentExceptionMsg()
quit(QuitFailure)
let (mixPrivKey, mixPubKey) = generateKeyPair().valueOr:
error "failed to generate mix key pair", error = error
return
(await node.mountMix(conf.clusterId, mixPrivKey)).isOkOr:
error "failed to mount waku mix protocol: ", error = $error
quit(QuitFailure)
if conf.mixnodes.len > 0:
await processMixNodes(node, conf.mixnodes)
await node.start()
node.peerManager.start()
#[ if conf.rlnRelayCredPath == "":
raise newException(ConfigurationError, "rln-relay-cred-path MUST be passed")
if conf.relay:
let shards =
conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it)))
(await node.mountRelay()).isOkOr:
echo "failed to mount relay: " & error
return
]#
await node.mountLibp2pPing()
let pubsubTopic = conf.getPubsubTopic(node, conf.contentTopic)
echo "pubsub topic is: " & pubsubTopic
let nick = await readNick(transp)
echo "Welcome, " & nick & "!"
var chat = Chat(
node: node,
transp: transp,
subscribed: true,
connected: false,
started: true,
nick: nick,
prompt: false,
contentTopic: conf.contentTopic,
conf: conf,
)
var dnsDiscoveryUrl = none(string)
if conf.fleet != Fleet.none:
# Use DNS discovery to connect to selected fleet
echo "Connecting to " & $conf.fleet & " fleet using DNS discovery..."
if conf.fleet == Fleet.test:
dnsDiscoveryUrl = some(
"enrtree://AOGYWMBYOUIMOENHXCHILPKY3ZRFEULMFI4DOM442QSZ73TT2A7VI@test.waku.nodes.status.im"
)
else:
# Connect to sandbox by default
dnsDiscoveryUrl = some(
"enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im"
)
elif conf.dnsDiscoveryUrl != "":
# No pre-selected fleet. Discover nodes via DNS using user config
debug "Discovering nodes using Waku DNS discovery", url = conf.dnsDiscoveryUrl
dnsDiscoveryUrl = some(conf.dnsDiscoveryUrl)
var discoveredNodes: seq[RemotePeerInfo]
if dnsDiscoveryUrl.isSome:
var nameServers: seq[TransportAddress]
for ip in conf.dnsDiscoveryNameServers:
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
let dnsResolver = DnsResolver.new(nameServers)
proc resolver(domain: string): Future[string] {.async, gcsafe.} =
trace "resolving", domain = domain
let resolved = await dnsResolver.resolveTxt(domain)
return resolved[0] # Use only first answer
var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl.get(), resolver)
if wakuDnsDiscovery.isOk:
let discoveredPeers = await wakuDnsDiscovery.get().findPeers()
if discoveredPeers.isOk:
info "Connecting to discovered peers"
discoveredNodes = discoveredPeers.get()
echo "Discovered and connecting to " & $discoveredNodes
waitFor chat.node.connectToNodes(discoveredNodes)
else:
warn "Failed to init Waku DNS discovery"
let peerInfo = node.switch.peerInfo
let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId
echo &"Listening on\n {listenStr}"
if (conf.storenode != "") or (conf.store == true):
await node.mountStore()
var storenode: Option[RemotePeerInfo]
if conf.storenode != "":
let peerInfo = parsePeerInfo(conf.storenode)
if peerInfo.isOk():
storenode = some(peerInfo.value)
else:
error "Incorrect conf.storenode", error = peerInfo.error
elif discoveredNodes.len > 0:
echo "Store enabled, but no store nodes configured. Choosing one at random from discovered peers"
storenode = some(discoveredNodes[rand(0 .. len(discoveredNodes) - 1)])
if storenode.isSome():
# We have a viable storenode. Let's query it for historical messages.
echo "Connecting to storenode: " & $(storenode.get())
node.mountStoreClient()
node.peerManager.addServicePeer(storenode.get(), WakuStoreCodec)
proc storeHandler(response: StoreQueryResponse) {.gcsafe.} =
for msg in response.messages:
let payload =
if msg.message.isSome():
msg.message.get().payload
else:
newSeq[byte](0)
let
pb = Chat2Message.init(payload)
chatLine =
if pb.isOk:
pb[].toString()
else:
string.fromBytes(payload)
echo &"{chatLine}"
info "Hit store handler"
let queryRes = await node.query(
StoreQueryRequest(contentTopics: @[chat.contentTopic]), storenode.get()
)
if queryRes.isOk():
storeHandler(queryRes.value)
if conf.edgemode: #Mount light protocol clients
node.mountLightPushClient()
await node.mountFilterClient()
let filterHandler = proc(
pubsubTopic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, closure.} =
trace "Hit filter handler", contentTopic = msg.contentTopic
chat.printReceivedMessage(msg)
node.wakuFilterClient.registerPushHandler(filterHandler)
if conf.serviceNode != "": #TODO: use one of discovered nodes if not present.
let peerInfo = parsePeerInfo(conf.serviceNode)
if peerInfo.isOk():
#await mountLegacyLightPush(node)
node.peerManager.addServicePeer(peerInfo.value, WakuLightpushCodec)
node.peerManager.addServicePeer(peerInfo.value, WakuPeerExchangeCodec)
# Start maintaining subscription
asyncSpawn maintainSubscription(
node, pubsubTopic, conf.contentTopic, peerInfo.value, false
)
else:
error "LightPushClient not mounted. Couldn't parse conf.serviceNode",
error = peerInfo.error
# TODO: Loop faster
node.startPeerExchangeLoop()
while node.getMixNodePoolSize() < 3:
info "waiting for mix nodes to be discovered",
currentpoolSize = node.getMixNodePoolSize()
await sleepAsync(1000)
notice "ready to publish with mix node pool size ",
currentpoolSize = node.getMixNodePoolSize()
echo "ready to publish messages now"
if conf.metricsLogging:
startMetricsLog()
if conf.metricsServer:
let metricsServer = startMetricsServer(
conf.metricsServerAddress, Port(conf.metricsServerPort + conf.portsShift)
)
await chat.readWriteLoop()
runForever()
proc main(rng: ref HmacDrbgContext) {.async.} =
let (rfd, wfd) = createAsyncPipe()
if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe:
raise newException(ValueError, "Could not initialize pipe!")
var thread: Thread[AsyncFD]
thread.createThread(readInput, wfd)
try:
await processInput(rfd, rng)
# Handle only ConfigurationError for now
# TODO: Throw other errors from the mounting procedure
except ConfigurationError as e:
raise e
when isMainModule: # isMainModule = true when the module is compiled as the main file
let rng = crypto.newRng()
try:
waitFor(main(rng))
except CatchableError as e:
raise e
## Dump of things that can be improved:
##
## - Incoming dialed peer does not change connected state (not relying on it for now)
## - Unclear if staticnode argument works (can enter manually)
## - Don't trigger self / double publish own messages
## - Test/default to cluster node connection (diff protocol version)
## - Redirect logs to separate file
## - Expose basic publish/subscribe etc commands with /syntax
## - Show part of peerid to know who sent message
## - Deal with protobuf messages (e.g. other chat protocol, or encrypted)

View File

@ -0,0 +1,282 @@
import chronicles, chronos, std/strutils, regex
import
eth/keys,
libp2p/crypto/crypto,
libp2p/crypto/secp,
nimcrypto/utils,
confutils,
confutils/defs,
confutils/std/net
import waku/waku_core
type
Fleet* = enum
none
prod
test
EthRpcUrl* = distinct string
Chat2Conf* = object ## General node config
edgemode* {.
defaultValue: true, desc: "Run the app in edge mode", name: "edge-mode"
.}: bool
logLevel* {.
desc: "Sets the log level.", defaultValue: LogLevel.INFO, name: "log-level"
.}: LogLevel
nodekey* {.desc: "P2P node private key as 64 char hex string.", name: "nodekey".}:
Option[crypto.PrivateKey]
listenAddress* {.
defaultValue: defaultListenAddress(config),
desc: "Listening address for the LibP2P traffic.",
name: "listen-address"
.}: IpAddress
tcpPort* {.desc: "TCP listening port.", defaultValue: 60000, name: "tcp-port".}:
Port
udpPort* {.desc: "UDP listening port.", defaultValue: 60000, name: "udp-port".}:
Port
portsShift* {.
desc: "Add a shift to all port numbers.", defaultValue: 0, name: "ports-shift"
.}: uint16
nat* {.
desc:
"Specify method to use for determining public address. " &
"Must be one of: any, none, upnp, pmp, extip:<IP>.",
defaultValue: "any"
.}: string
## Persistence config
dbPath* {.
desc: "The database path for peristent storage", defaultValue: "", name: "db-path"
.}: string
persistPeers* {.
desc: "Enable peer persistence: true|false",
defaultValue: false,
name: "persist-peers"
.}: bool
persistMessages* {.
desc: "Enable message persistence: true|false",
defaultValue: false,
name: "persist-messages"
.}: bool
## Relay config
relay* {.
desc: "Enable relay protocol: true|false", defaultValue: true, name: "relay"
.}: bool
staticnodes* {.
desc: "Peer multiaddr to directly connect with. Argument may be repeated.",
name: "staticnode"
.}: seq[string]
mixnodes* {.
desc: "Peer ENR to add as a mixnode. Argument may be repeated.", name: "mixnode"
.}: seq[string]
keepAlive* {.
desc: "Enable keep-alive for idle connections: true|false",
defaultValue: false,
name: "keep-alive"
.}: bool
clusterId* {.
desc:
"Cluster id that the node is running in. Node in a different cluster id is disconnected.",
defaultValue: 2,
name: "cluster-id"
.}: uint16
numShardsInNetwork* {.
desc: "Number of shards in the network",
defaultValue: 1,
name: "num-shards-in-network"
.}: uint32
shards* {.
desc:
"Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.",
defaultValue: @[uint16(0)],
name: "shard"
.}: seq[uint16]
## Store config
store* {.
desc: "Enable store protocol: true|false", defaultValue: false, name: "store"
.}: bool
storenode* {.
desc: "Peer multiaddr to query for storage.", defaultValue: "", name: "storenode"
.}: string
## Filter config
filter* {.
desc: "Enable filter protocol: true|false", defaultValue: false, name: "filter"
.}: bool
## Lightpush config
lightpush* {.
desc: "Enable lightpush protocol: true|false",
defaultValue: false,
name: "lightpush"
.}: bool
servicenode* {.
desc: "Peer multiaddr to request lightpush and filter services",
defaultValue: "",
name: "servicenode"
.}: string
## Metrics config
metricsServer* {.
desc: "Enable the metrics server: true|false",
defaultValue: false,
name: "metrics-server"
.}: bool
metricsServerAddress* {.
desc: "Listening address of the metrics server.",
defaultValue: parseIpAddress("127.0.0.1"),
name: "metrics-server-address"
.}: IpAddress
metricsServerPort* {.
desc: "Listening HTTP port of the metrics server.",
defaultValue: 8008,
name: "metrics-server-port"
.}: uint16
metricsLogging* {.
desc: "Enable metrics logging: true|false",
defaultValue: true,
name: "metrics-logging"
.}: bool
## DNS discovery config
dnsDiscovery* {.
desc:
"Deprecated, please set dns-discovery-url instead. Enable discovering nodes via DNS",
defaultValue: false,
name: "dns-discovery"
.}: bool
dnsDiscoveryUrl* {.
desc: "URL for DNS node list in format 'enrtree://<key>@<fqdn>'",
defaultValue: "",
name: "dns-discovery-url"
.}: string
dnsDiscoveryNameServers* {.
desc: "DNS name server IPs to query. Argument may be repeated.",
defaultValue: @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")],
name: "dns-discovery-name-server"
.}: seq[IpAddress]
## Chat2 configuration
fleet* {.
desc:
"Select the fleet to connect to. This sets the DNS discovery URL to the selected fleet.",
defaultValue: Fleet.none,
name: "fleet"
.}: Fleet
contentTopic* {.
desc: "Content topic for chat messages.",
defaultValue: "/toy-chat-mix/2/huilong/proto",
name: "content-topic"
.}: string
## Websocket Configuration
websocketSupport* {.
desc: "Enable websocket: true|false",
defaultValue: false,
name: "websocket-support"
.}: bool
websocketPort* {.
desc: "WebSocket listening port.", defaultValue: 8000, name: "websocket-port"
.}: Port
websocketSecureSupport* {.
desc: "WebSocket Secure Support.",
defaultValue: false,
name: "websocket-secure-support"
.}: bool ## rln-relay configuration
# NOTE: Keys are different in nim-libp2p
proc parseCmdArg*(T: type crypto.PrivateKey, p: string): T =
try:
let key = SkPrivateKey.init(utils.fromHex(p)).tryGet()
# XXX: Here at the moment
result = crypto.PrivateKey(scheme: Secp256k1, skkey: key)
except CatchableError as e:
raise newException(ValueError, "Invalid private key")
proc completeCmdArg*(T: type crypto.PrivateKey, val: string): seq[string] =
return @[]
proc parseCmdArg*(T: type IpAddress, p: string): T =
try:
result = parseIpAddress(p)
except CatchableError as e:
raise newException(ValueError, "Invalid IP address")
proc completeCmdArg*(T: type IpAddress, val: string): seq[string] =
return @[]
proc parseCmdArg*(T: type Port, p: string): T =
try:
result = Port(parseInt(p))
except CatchableError as e:
raise newException(ValueError, "Invalid Port number")
proc completeCmdArg*(T: type Port, val: string): seq[string] =
return @[]
proc parseCmdArg*(T: type Option[uint], p: string): T =
try:
some(parseUint(p))
except CatchableError:
raise newException(ValueError, "Invalid unsigned integer")
proc completeCmdArg*(T: type EthRpcUrl, val: string): seq[string] =
return @[]
proc parseCmdArg*(T: type EthRpcUrl, s: string): T =
## allowed patterns:
## http://url:port
## https://url:port
## http://url:port/path
## https://url:port/path
## http://url/with/path
## http://url:port/path?query
## https://url:port/path?query
## disallowed patterns:
## any valid/invalid ws or wss url
var httpPattern =
re2"^(https?):\/\/((localhost)|([\w_-]+(?:(?:\.[\w_-]+)+)))(:[0-9]{1,5})?([\w.,@?^=%&:\/~+#-]*[\w@?^=%&\/~+#-])*"
var wsPattern =
re2"^(wss?):\/\/((localhost)|([\w_-]+(?:(?:\.[\w_-]+)+)))(:[0-9]{1,5})?([\w.,@?^=%&:\/~+#-]*[\w@?^=%&\/~+#-])*"
if regex.match(s, wsPattern):
raise newException(
ValueError, "Websocket RPC URL is not supported, Please use an HTTP URL"
)
if not regex.match(s, httpPattern):
raise newException(ValueError, "Invalid HTTP RPC URL")
return EthRpcUrl(s)
func defaultListenAddress*(conf: Chat2Conf): IpAddress =
# TODO: How should we select between IPv4 and IPv6
# Maybe there should be a config option for this.
(static parseIpAddress("0.0.0.0"))

4
apps/chat2mix/nim.cfg Normal file
View File

@ -0,0 +1,4 @@
-d:chronicles_line_numbers
-d:chronicles_runtime_filtering:on
-d:discv5_protocol_id:d5waku
path = "../.."

View File

@ -0,0 +1,196 @@
import
std/[tables, times, sequtils, strutils],
stew/byteutils,
chronicles,
results,
chronos,
confutils,
libp2p/crypto/crypto,
libp2p/crypto/curve25519,
libp2p/multiaddress,
eth/keys,
eth/p2p/discoveryv5/enr,
metrics,
metrics/chronos_httpserver
import mix, mix/mix_protocol, mix/curve25519
import
waku/[
common/logging,
node/peer_manager,
waku_core,
waku_core/codecs,
waku_node,
waku_enr,
discovery/waku_discv5,
factory/builder,
waku_lightpush/client,
],
./lightpush_publisher_mix_config,
./lightpush_publisher_mix_metrics
const clusterId = 66
const shardId = @[0'u16]
const
LightpushPubsubTopic = PubsubTopic("/waku/2/rs/66/0")
LightpushContentTopic = ContentTopic("/examples/1/light-pubsub-mix-example/proto")
proc splitPeerIdAndAddr(maddr: string): (string, string) =
let parts = maddr.split("/p2p/")
if parts.len != 2:
error "Invalid multiaddress format", parts = parts
return
let
address = parts[0]
peerId = parts[1]
return (address, peerId)
proc setupAndPublish(rng: ref HmacDrbgContext, conf: LightPushMixConf) {.async.} =
# use notice to filter all waku messaging
setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)
notice "starting publisher", wakuPort = conf.port
let
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[]).get()
ip = parseIpAddress("0.0.0.0")
flags = CapabilitiesBitfield.init(relay = true)
let relayShards = RelayShards.init(clusterId, shardId).valueOr:
error "Relay shards initialization failed", error = error
quit(QuitFailure)
var enrBuilder = EnrBuilder.init(nodeKey)
enrBuilder.withWakuRelaySharding(relayShards).expect(
"Building ENR with relay sharding failed"
)
let record = enrBuilder.build().valueOr:
error "failed to create enr record", error = error
quit(QuitFailure)
setLogLevel(logging.LogLevel.TRACE)
var builder = WakuNodeBuilder.init()
builder.withNodeKey(nodeKey)
builder.withRecord(record)
builder.withNetworkConfigurationDetails(ip, Port(conf.port)).tryGet()
let node = builder.build().tryGet()
node.mountMetadata(clusterId, shardId).expect(
"failed to mount waku metadata protocol"
)
node.mountLightPushClient()
try:
await node.mountPeerExchange(some(uint16(clusterId)))
except CatchableError:
error "failed to mount waku peer-exchange protocol",
error = getCurrentExceptionMsg()
return
let (destPeerAddr, destPeerId) = splitPeerIdAndAddr(conf.destPeerAddr)
let (pxPeerAddr, pxPeerId) = splitPeerIdAndAddr(conf.pxAddr)
info "dest peer address", destPeerAddr = destPeerAddr, destPeerId = destPeerId
info "peer exchange address", pxPeerAddr = pxPeerAddr, pxPeerId = pxPeerId
let pxPeerInfo =
RemotePeerInfo.init(destPeerId, @[MultiAddress.init(destPeerAddr).get()])
node.peerManager.addServicePeer(pxPeerInfo, WakuPeerExchangeCodec)
let pxPeerInfo1 =
RemotePeerInfo.init(pxPeerId, @[MultiAddress.init(pxPeerAddr).get()])
node.peerManager.addServicePeer(pxPeerInfo1, WakuPeerExchangeCodec)
if not conf.mixDisabled:
let (mixPrivKey, mixPubKey) = generateKeyPair().valueOr:
error "failed to generate mix key pair", error = error
return
(await node.mountMix(clusterId, mixPrivKey)).isOkOr:
error "failed to mount waku mix protocol: ", error = $error
return
let dPeerId = PeerId.init(destPeerId).valueOr:
error "Failed to initialize PeerId", error = error
return
var conn: Connection
if not conf.mixDisabled:
conn = node.wakuMix.toConnection(
MixDestination.init(dPeerId, pxPeerInfo.addrs[0]), # destination lightpush peer
WakuLightPushCodec, # protocol codec which will be used over the mix connection
Opt.some(MixParameters(expectReply: Opt.some(true), numSurbs: Opt.some(byte(1)))),
# mix parameters indicating we expect a single reply
).valueOr:
error "failed to create mix connection", error = error
return
await node.start()
node.peerManager.start()
node.startPeerExchangeLoop()
try:
startMetricsHttpServer("0.0.0.0", Port(8008))
except Exception:
error "failed to start metrics server: ", error = getCurrentExceptionMsg()
(await node.fetchPeerExchangePeers()).isOkOr:
warn "Cannot fetch peers from peer exchange", cause = error
if not conf.mixDisabled:
while node.getMixNodePoolSize() < conf.minMixPoolSize:
info "waiting for mix nodes to be discovered",
currentpoolSize = node.getMixNodePoolSize()
await sleepAsync(1000)
notice "publisher service started with mix node pool size ",
currentpoolSize = node.getMixNodePoolSize()
var i = 0
while i < conf.numMsgs:
if conf.mixDisabled:
let connOpt = await node.peerManager.dialPeer(dPeerId, WakuLightPushCodec)
if connOpt.isNone():
error "failed to dial peer with WakuLightPushCodec", target_peer_id = dPeerId
return
conn = connOpt.get()
i = i + 1
let text =
"""Lorem ipsum dolor sit amet, consectetur adipiscing elit. Nullam venenatis magna ut tortor faucibus, in vestibulum nibh commodo. Aenean eget vestibulum augue. Nullam suscipit urna non nunc efficitur, at iaculis nisl consequat. Mauris quis ultrices elit. Suspendisse lobortis odio vitae laoreet facilisis. Cras ornare sem felis, at vulputate magna aliquam ac. Duis quis est ultricies, euismod nulla ac, interdum dui. Maecenas sit amet est vitae enim commodo gravida. Proin vitae elit nulla. Donec tempor dolor lectus, in faucibus velit elementum quis. Donec non mauris eu nibh faucibus cursus ut egestas dolor. Aliquam venenatis ligula id velit pulvinar malesuada. Vestibulum scelerisque, justo non porta gravida, nulla justo tempor purus, at sollicitudin erat erat vel libero.
Fusce nec eros eu metus tristique aliquet. Sed ut magna sagittis, vulputate diam sit amet, aliquam magna. Aenean sollicitudin velit lacus, eu ultrices magna semper at. Integer vitae felis ligula. In a eros nec risus condimentum tincidunt fermentum sit amet ex. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. Pellentesque habitant morbi tristique senectus et netus et malesuada fames ac turpis egestas. Nullam vitae justo maximus, fringilla tellus nec, rutrum purus. Etiam efficitur nisi dapibus euismod vestibulum. Phasellus at felis elementum, tristique nulla ac, consectetur neque.
Maecenas hendrerit nibh eget velit rutrum, in ornare mauris molestie. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia curae; Praesent dignissim efficitur eros, sit amet rutrum justo mattis a. Fusce mollis neque at erat placerat bibendum. Ut fringilla fringilla orci, ut fringilla metus fermentum vel. In hac habitasse platea dictumst. Donec hendrerit porttitor odio. Suspendisse ornare sollicitudin mauris, sodales pulvinar velit finibus vel. Fusce id pulvinar neque. Suspendisse eget tincidunt sapien, ac accumsan turpis.
Curabitur cursus tincidunt leo at aliquet. Nunc dapibus quam id venenatis varius. Aenean eget augue vel velit dapibus aliquam. Nulla facilisi. Curabitur cursus, turpis vel congue volutpat, tellus eros cursus lacus, eu fringilla turpis orci non ipsum. In hac habitasse platea dictumst. Nulla aliquam nisl a nunc placerat, eget dignissim felis pulvinar. Fusce sed porta mauris. Donec sodales arcu in nisl sodales, quis posuere massa ultricies. Nam feugiat massa eget felis ultricies finibus. Nunc magna nulla, interdum a elit vel, egestas efficitur urna. Ut posuere tincidunt odio in maximus. Sed at dignissim est.
Morbi accumsan elementum ligula ut fringilla. Praesent in ex metus. Phasellus urna est, tempus sit amet elementum vitae, sollicitudin vel ipsum. Fusce hendrerit eleifend dignissim. Maecenas tempor dapibus dui quis laoreet. Cras tincidunt sed ipsum sed pellentesque. Proin ut tellus nec ipsum varius interdum. Curabitur id velit ligula. Etiam sapien nulla, cursus sodales orci eu, porta lobortis nunc. Nunc at dapibus velit. Nulla et nunc vehicula, condimentum erat quis, elementum dolor. Quisque eu metus fermentum, vestibulum tellus at, sollicitudin odio. Ut vel neque justo.
Praesent porta porta velit, vel porttitor sem. Donec sagittis at nulla venenatis iaculis. Nullam vel eleifend felis. Nullam a pellentesque lectus. Aliquam tincidunt semper dui sed bibendum. Donec hendrerit, urna et cursus dictum, neque neque convallis magna, id condimentum sem urna quis massa. Fusce non quam vulputate, fermentum mauris at, malesuada ipsum. Mauris id pellentesque libero. Donec vel erat ullamcorper, dapibus quam id, imperdiet urna. Praesent sed ligula ut est pellentesque pharetra quis et diam. Ut placerat lorem eget mi fermentum aliquet.
This is message #""" &
$i & """ sent from a publisher using mix. End of transmission."""
let message = WakuMessage(
payload: toBytes(text), # content of the message
contentTopic: LightpushContentTopic, # content topic to publish to
ephemeral: true, # tell store nodes to not store it
timestamp: getNowInNanosecondTime(),
) # current timestamp
let res = await node.wakuLightpushClient.publishWithConn(
LightpushPubsubTopic, message, conn, dPeerId
)
if res.isOk():
lp_mix_success.inc()
notice "published message",
text = text,
timestamp = message.timestamp,
psTopic = LightpushPubsubTopic,
contentTopic = LightpushContentTopic
else:
error "failed to publish message", error = $res.error
lp_mix_failed.inc(labelValues = ["publish_error"])
if conf.mixDisabled:
await conn.close()
await sleepAsync(conf.msgIntervalMilliseconds)
info "###########Sent all messages via mix"
quit(0)
when isMainModule:
let conf = LightPushMixConf.load()
let rng = crypto.newRng()
asyncSpawn setupAndPublish(rng, conf)
runForever()

View File

@ -0,0 +1,28 @@
import confutils/defs
type LightPushMixConf* = object
destPeerAddr* {.desc: "Destination peer address with peerId.", name: "dp-addr".}:
string
pxAddr* {.desc: "Peer exchange address with peerId.", name: "px-addr".}: string
port* {.desc: "Port to listen on.", defaultValue: 50000, name: "port".}: int
numMsgs* {.desc: "Number of messages to send.", defaultValue: 1, name: "num-msgs".}:
int
msgIntervalMilliseconds* {.
desc: "Interval between messages in milliseconds.",
defaultValue: 1000,
name: "msg-interval"
.}: int
minMixPoolSize* {.
desc: "Number of mix nodes to be discovered before sending lightpush messages.",
defaultValue: 3,
name: "min-mix-pool-size"
.}: int
mixDisabled* {.
desc: "Do not use mix for publishing.", defaultValue: false, name: "without-mix"
.}: bool

View File

@ -0,0 +1,8 @@
{.push raises: [].}
import metrics
declarePublicCounter lp_mix_success, "number of lightpush messages sent via mix"
declarePublicCounter lp_mix_failed,
"number of lightpush messages failed via mix", labels = ["error"]

4
simulations/README.md Normal file
View File

@ -0,0 +1,4 @@
# Purpose
This is a place where any simulation related scripts and utilities can be stored.
Checkout mixnet folder to get an idea.

View File

@ -0,0 +1,70 @@
# Mixnet simulation
## Aim
Simulate a local mixnet along with a chat app to publish using mix.
This is helpful to test any changes while development.
It includes scripts that run a `4 node` mixnet along with a lightpush service node(without mix) that can be used to test quickly.
## Simulation Details
Note that before running the simulation both `wakunode2` and `chat2mix` have to be built.
```bash
cd <repo-root-dir>
make wakunode2
make chat2mix
```
Simulation includes scripts for:
1. a 4 waku-node mixnet where `node1` is bootstrap node for the other 3 nodes.
2. scripts to run chat app that publishes using lightpush protocol over the mixnet
## Usage
Start the service node with below command, which acts as bootstrap node for all other mix nodes.
`./run_lp_service_node.sh`
To run the nodes for mixnet run the 4 node scripts in different terminals as below.
`./run_mix_node1.sh`
Look for following 2 log lines to ensure node ran successfully and has also mounted mix protocol.
```log
INF 2025-08-01 14:51:05.445+05:30 mounting mix protocol topics="waku node" tid=39996871 file=waku_node.nim:231 nodeId="(listenAddresses: @[\"/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o\"], enrUri: \"enr:-NC4QKYtas8STkenlqBTJ3a1TTLzJA2DsGGbFlnxem9aSM2IXm-CSVZULdk2467bAyFnepnt8KP_QlfDzdaMXd_zqtwBgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCdCc5iT3bo9gYmXtucyit96bQXcqbXhL3a-S_6j7p9LIptdWx0aWFkZHJzgIJyc4UAAgEAAIlzZWNwMjU2azGhA6RFtVJVBh0SYOoP8xrgnXSlpiFARmQkF9d8Rn4fSeiog3RjcILqYYN1ZHCCIymFd2FrdTIt\")"
INF 2025-08-01 14:49:23.467+05:30 Node setup complete topics="wakunode main" tid=39994244 file=wakunode2.nim:104
```
Once all the 4 nodes are up without any issues, run the script to start the chat application.
`./run_chat_app.sh`
Enter a nickname to be used.
```bash
pubsub topic is: /waku/2/rs/2/0
Choose a nickname >>
```
Once you see below log, it means the app is ready for publishing messages over the mixnet.
```bash
Welcome, test!
Listening on
/ip4/192.168.68.64/tcp/60000/p2p/16Uiu2HAkxDGqix1ifY3wF1ZzojQWRAQEdKP75wn1LJMfoHhfHz57
ready to publish messages now
```
Follow similar instructions to run second instance of chat app.
Once both the apps run successfully, send a message and check if it is received by the other app.
You can exit the chat apps by entering `/exit` as below
```bash
>> /exit
quitting...
```

View File

@ -0,0 +1,25 @@
log-level = "INFO"
relay = true
#mix = true
filter = true
store = false
lightpush = true
max-connections = 150
peer-exchange = true
metrics-logging = false
cluster-id = 2
discv5-discovery = true
discv5-udp-port = 9000
discv5-enr-auto-update = true
rest = true
rest-admin = true
ports-shift = 1
num-shards-in-network = 1
shard = [0]
agent-string = "nwaku-mix"
nodekey = "f98e3fba96c32e8d1967d460f1b79457380e1a895f7971cecc8528abe733781a"
#mixkey = "a87db88246ec0eedda347b9b643864bee3d6933eb15ba41e6d58cb678d813258"
rendezvous = true
listen-address = "127.0.0.1"
nat = "extip:127.0.0.1"
ip-colocation-limit=0

View File

@ -0,0 +1,27 @@
log-level = "INFO"
relay = true
mix = true
filter = true
store = false
lightpush = true
max-connections = 150
peer-exchange = true
metrics-logging = false
cluster-id = 2
discv5-discovery = true
discv5-udp-port = 9001
discv5-enr-auto-update = true
discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"]
rest = true
rest-admin = true
ports-shift = 2
num-shards-in-network = 1
shard = [0]
agent-string = "nwaku-mix"
nodekey = "09e9d134331953357bd38bbfce8edb377f4b6308b4f3bfbe85c610497053d684"
mixkey = "c86029e02c05a7e25182974b519d0d52fcbafeca6fe191fbb64857fb05be1a53"
rendezvous = true
listen-address = "127.0.0.1"
nat = "extip:127.0.0.1"
ip-colocation-limit=0
#staticnode = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", "/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA","/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f","/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu"]

View File

@ -0,0 +1,27 @@
log-level = "INFO"
relay = true
mix = true
filter = true
store = false
lightpush = true
max-connections = 150
peer-exchange = true
metrics-logging = false
cluster-id = 2
discv5-discovery = true
discv5-udp-port = 9002
discv5-enr-auto-update = true
discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"]
rest = false
rest-admin = false
ports-shift = 3
num-shards-in-network = 1
shard = [0]
agent-string = "nwaku-mix"
nodekey = "ed54db994682e857d77cd6fb81be697382dc43aa5cd78e16b0ec8098549f860e"
mixkey = "b858ac16bbb551c4b2973313b1c8c8f7ea469fca03f1608d200bbf58d388ec7f"
rendezvous = true
listen-address = "127.0.0.1"
nat = "extip:127.0.0.1"
ip-colocation-limit=0
#staticnode = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", "/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF","/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f","/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu"]

View File

@ -0,0 +1,27 @@
log-level = "INFO"
relay = true
mix = true
filter = true
store = false
lightpush = true
max-connections = 150
peer-exchange = true
metrics-logging = false
cluster-id = 2
discv5-discovery = true
discv5-udp-port = 9003
discv5-enr-auto-update = true
discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"]
rest = false
rest-admin = false
ports-shift = 4
num-shards-in-network = 1
shard = [0]
agent-string = "nwaku-mix"
nodekey = "42f96f29f2d6670938b0864aced65a332dcf5774103b4c44ec4d0ea4ef3c47d6"
mixkey = "d8bd379bb394b0f22dd236d63af9f1a9bc45266beffc3fbbe19e8b6575f2535b"
rendezvous = true
listen-address = "127.0.0.1"
nat = "extip:127.0.0.1"
ip-colocation-limit=0
#staticnode = ["/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF", "/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA","/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o","/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu"]

View File

@ -0,0 +1,27 @@
log-level = "INFO"
relay = true
mix = true
filter = true
store = false
lightpush = true
max-connections = 150
peer-exchange = true
metrics-logging = false
cluster-id = 2
discv5-discovery = true
discv5-udp-port = 9004
discv5-enr-auto-update = true
discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"]
rest = false
rest-admin = false
ports-shift = 5
num-shards-in-network = 1
shard = [0]
agent-string = "nwaku-mix"
#nodekey = "3ce887b3c34b7a92dd2868af33941ed1dbec4893b054572cd5078da09dd923d4"
mixkey = "780fff09e51e98df574e266bf3266ec6a3a1ddfcf7da826a349a29c137009d49"
rendezvous = true
listen-address = "127.0.0.1"
nat = "extip:127.0.0.1"
ip-colocation-limit=0
#staticnode = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", "/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA","/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f","/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF"]

View File

@ -0,0 +1 @@
../../build/chat2mix --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --mixnode="enr:-Nq4QIPd6TbOWns1TsbSq2KB6g3hIClJa8qBUWFFwbGut9OBCwTHYshi0-iv1ilTMx4FkuSJ4NtkZVx0QSrrMRTGpEsDgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCSMehtpkMlApAKhPhnAEznhjKrUs2OMLHsMizXlXEMKoptdWx0aWFkZHJzigAIBMCoRD4G6mKCcnOFAAIBAACJc2VjcDI1NmsxoQN6R8gw1Pu8IwMlTap0_E7vVd1wcaFgg_VUaaeVWSZYVIN0Y3CC6mKDdWRwgiMrhXdha3UyLQ" --mixnode="enr:-Nq4QC6XyKXZSlJNFzTDPI118SBC2ilLqE05RR4o4OzEZxueGkYtExHtTBvmY-9pl17EXZtXvF_tIV_2g0K_fb2LmsoDgmlkgnY0gmlwhH8AAAGHbWl4LWtleaAnXNaInh8pykjlue24ANGpT0nxPTk6Ds8aB691NQbebIptdWx0aWFkZHJzigAIBMCoRD4G6mOCcnOFAAIBAACJc2VjcDI1NmsxoQPYhmrbTqylbdenVfvO2U0w6EC4A-l5lwvu3QWL7IqkO4N0Y3CC6mODdWRwgiMthXdha3UyLQ" --mixnode="enr:-Nq4QKoh8Ta8Q3zLLAkf4hyYzxpuTc-BRBGb_WYVIm6hRptKZFuIo3DNlWCpfIxJnNI5epjLWQWHFUo3dqpAoWhoXEUDgmlkgnY0gmlwhH8AAAGHbWl4LWtleaDg7VlKjVBmgb4HXo4jcjR4OI-xgkd_ekaTCaJecHb8GIptdWx0aWFkZHJzigAIBMCoRD4G6mSCcnOFAAIBAACJc2VjcDI1NmsxoQOnphVC3U5zmOCkjOI2tY0v8K5QkXSaE5xO37q3iFfKGIN0Y3CC6mSDdWRwgiMvhXdha3UyLQ" --mixnode="enr:-Nq4QN7ub3xi53eDyKKstEM2IjFo7oY5Kf4glFz45W2saWqNXPqJFruw08c9B_EIu1LoW4opwXId_4zvPmekZwYHKp8DgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCP16GnwZtAPSMUUqmx6kDrHMdvRV2RjviYDnaF-e7rH4ptdWx0aWFkZHJzigAIBMCoRD4G6mWCcnOFAAIBAACJc2VjcDI1NmsxoQLJtl9kA98YgBkVElkJgl9XyyRNco78oShb1hsv6Mlbs4N0Y3CC6mWDdWRwgiMxhXdha3UyLQ"

View File

@ -0,0 +1 @@
../../build/chat2mix --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --mixnode="enr:-Nq4QIPd6TbOWns1TsbSq2KB6g3hIClJa8qBUWFFwbGut9OBCwTHYshi0-iv1ilTMx4FkuSJ4NtkZVx0QSrrMRTGpEsDgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCSMehtpkMlApAKhPhnAEznhjKrUs2OMLHsMizXlXEMKoptdWx0aWFkZHJzigAIBMCoRD4G6mKCcnOFAAIBAACJc2VjcDI1NmsxoQN6R8gw1Pu8IwMlTap0_E7vVd1wcaFgg_VUaaeVWSZYVIN0Y3CC6mKDdWRwgiMrhXdha3UyLQ" --mixnode="enr:-Nq4QC6XyKXZSlJNFzTDPI118SBC2ilLqE05RR4o4OzEZxueGkYtExHtTBvmY-9pl17EXZtXvF_tIV_2g0K_fb2LmsoDgmlkgnY0gmlwhH8AAAGHbWl4LWtleaAnXNaInh8pykjlue24ANGpT0nxPTk6Ds8aB691NQbebIptdWx0aWFkZHJzigAIBMCoRD4G6mOCcnOFAAIBAACJc2VjcDI1NmsxoQPYhmrbTqylbdenVfvO2U0w6EC4A-l5lwvu3QWL7IqkO4N0Y3CC6mODdWRwgiMthXdha3UyLQ" --mixnode="enr:-Nq4QKoh8Ta8Q3zLLAkf4hyYzxpuTc-BRBGb_WYVIm6hRptKZFuIo3DNlWCpfIxJnNI5epjLWQWHFUo3dqpAoWhoXEUDgmlkgnY0gmlwhH8AAAGHbWl4LWtleaDg7VlKjVBmgb4HXo4jcjR4OI-xgkd_ekaTCaJecHb8GIptdWx0aWFkZHJzigAIBMCoRD4G6mSCcnOFAAIBAACJc2VjcDI1NmsxoQOnphVC3U5zmOCkjOI2tY0v8K5QkXSaE5xO37q3iFfKGIN0Y3CC6mSDdWRwgiMvhXdha3UyLQ" --mixnode="enr:-Nq4QN7ub3xi53eDyKKstEM2IjFo7oY5Kf4glFz45W2saWqNXPqJFruw08c9B_EIu1LoW4opwXId_4zvPmekZwYHKp8DgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCP16GnwZtAPSMUUqmx6kDrHMdvRV2RjviYDnaF-e7rH4ptdWx0aWFkZHJzigAIBMCoRD4G6mWCcnOFAAIBAACJc2VjcDI1NmsxoQLJtl9kA98YgBkVElkJgl9XyyRNco78oShb1hsv6Mlbs4N0Y3CC6mWDdWRwgiMxhXdha3UyLQ"

View File

@ -0,0 +1 @@
../../build/wakunode2 --config-file="config.toml"

View File

@ -0,0 +1 @@
../../build/wakunode2 --config-file="config1.toml"

View File

@ -0,0 +1 @@
../../build/wakunode2 --config-file="config2.toml"

View File

@ -0,0 +1 @@
../../build/wakunode2 --config-file="config3.toml"

View File

@ -0,0 +1 @@
../../build/wakunode2 --config-file="config4.toml"

1
vendor/mix vendored Submodule

@ -0,0 +1 @@
Subproject commit e45cd05bfdb775a4cb2c9443077a15b9da13c037

2
vendor/nim-libp2p vendored

@ -1 +1 @@
Subproject commit 5eaa43b8608221a615e442587f27520a49a56460
Subproject commit c3faabf5220d37822d0e7a9e40fe69515b368cb3

View File

@ -148,7 +148,19 @@ task chat2, "Build example Waku chat usage":
#buildBinary name, "examples/", "-d:chronicles_log_level=WARN"
let name = "chat2"
buildBinary name, "apps/chat2/", "-d:chronicles_sinks=textlines[file] -d:ssl"
buildBinary name,
"apps/chat2/",
"-d:chronicles_sinks=textlines[file] -d:ssl -d:chronicles_log_level='TRACE' "
task chat2mix, "Build example Waku chat mix usage":
# NOTE For debugging, set debug level. For chat usage we want minimal log
# output to STDOUT. Can be fixed by redirecting logs to file (e.g.)
#buildBinary name, "examples/", "-d:chronicles_log_level=WARN"
let name = "chat2mix"
buildBinary name,
"apps/chat2mix/",
"-d:chronicles_sinks=textlines[file] -d:ssl -d:chronicles_log_level='TRACE' "
task chat2bridge, "Build chat2bridge":
let name = "chat2bridge"
@ -158,6 +170,10 @@ task liteprotocoltester, "Build liteprotocoltester":
let name = "liteprotocoltester"
buildBinary name, "apps/liteprotocoltester/"
task lightpushwithmix, "Build lightpushwithmix":
let name = "lightpush_publisher_mix"
buildBinary name, "examples/lightpush_mix/"
task buildone, "Build custom target":
let filepath = paramStr(paramCount())
discard buildModule filepath

View File

@ -4,7 +4,7 @@ import
bearssl/rand,
libp2p/protocols/connectivity/autonat/client,
libp2p/protocols/connectivity/autonat/service,
libp2p/protocols/connectivity/autonat/core
libp2p/protocols/connectivity/autonat/types
const AutonatCheckInterval = Opt.some(chronos.seconds(30))

View File

@ -9,10 +9,11 @@ import
./web_socket_conf_builder,
./metrics_server_conf_builder,
./rate_limit_conf_builder,
./rln_relay_conf_builder
./rln_relay_conf_builder,
./mix_conf_builder
export
waku_conf_builder, filter_service_conf_builder, store_sync_conf_builder,
store_service_conf_builder, rest_server_conf_builder, dns_discovery_conf_builder,
discv5_conf_builder, web_socket_conf_builder, metrics_server_conf_builder,
rate_limit_conf_builder, rln_relay_conf_builder
rate_limit_conf_builder, rln_relay_conf_builder, mix_conf_builder

View File

@ -0,0 +1,35 @@
import chronicles, std/options, results
import libp2p/crypto/crypto, libp2p/crypto/curve25519, mix/curve25519
import ../waku_conf
logScope:
topics = "waku conf builder mix"
##################################
## Mix Config Builder ##
##################################
type MixConfBuilder* = object
enabled: Option[bool]
mixKey: Option[string]
proc init*(T: type MixConfBuilder): MixConfBuilder =
MixConfBuilder()
proc withEnabled*(b: var MixConfBuilder, enabled: bool) =
b.enabled = some(enabled)
proc withMixKey*(b: var MixConfBuilder, mixKey: string) =
b.mixKey = some(mixKey)
proc build*(b: MixConfBuilder): Result[Option[MixConf], string] =
if not b.enabled.get(false):
return ok(none[MixConf]())
else:
if b.mixKey.isSome():
let mixPrivKey = intoCurve25519Key(ncrutils.fromHex(b.mixKey.get()))
let mixPubKey = public(mixPrivKey)
return ok(some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey)))
else:
let (mixPrivKey, mixPubKey) = generateKeyPair().valueOr:
return err("Generate key pair error: " & $error)
return ok(some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey)))

View File

@ -24,7 +24,8 @@ import
./web_socket_conf_builder,
./metrics_server_conf_builder,
./rate_limit_conf_builder,
./rln_relay_conf_builder
./rln_relay_conf_builder,
./mix_conf_builder
logScope:
topics = "waku conf builder"
@ -74,6 +75,7 @@ type WakuConfBuilder* = object
restServerConf*: RestServerConfBuilder
rlnRelayConf*: RlnRelayConfBuilder
storeServiceConf*: StoreServiceConfBuilder
mixConf*: MixConfBuilder
webSocketConf*: WebSocketConfBuilder
rateLimitConf*: RateLimitConfBuilder
# End conf builders
@ -82,6 +84,7 @@ type WakuConfBuilder* = object
peerExchange: Option[bool]
storeSync: Option[bool]
relayPeerExchange: Option[bool]
mix: Option[bool]
# TODO: move within a relayConf
rendezvous: Option[bool]
@ -181,6 +184,9 @@ proc withRelayPeerExchange*(b: var WakuConfBuilder, relayPeerExchange: bool) =
proc withRendezvous*(b: var WakuConfBuilder, rendezvous: bool) =
b.rendezvous = some(rendezvous)
proc withMix*(builder: var WakuConfBuilder, mix: bool) =
builder.mix = some(mix)
proc withRemoteStoreNode*(b: var WakuConfBuilder, remoteStoreNode: string) =
b.remoteStoreNode = some(remoteStoreNode)
@ -430,6 +436,13 @@ proc build*(
warn "whether to mount rendezvous is not specified, defaulting to not mounting"
false
let mix =
if builder.mix.isSome():
builder.mix.get()
else:
warn "whether to mount mix is not specified, defaulting to not mounting"
false
let relayPeerExchange = builder.relayPeerExchange.get(false)
let nodeKey = ?nodeKey(builder, rng)
@ -485,6 +498,9 @@ proc build*(
let storeServiceConf = builder.storeServiceConf.build().valueOr:
return err("Store Conf building failed: " & $error)
let mixConf = builder.mixConf.build().valueOr:
return err("Mix Conf building failed: " & $error)
let webSocketConf = builder.webSocketConf.build().valueOr:
return err("WebSocket Conf building failed: " & $error)
@ -595,6 +611,7 @@ proc build*(
store = storeServiceConf.isSome,
relay = relay,
sync = storeServiceConf.isSome() and storeServiceConf.get().storeSyncConf.isSome,
mix = mix,
)
let wakuConf = WakuConf(
@ -606,6 +623,7 @@ proc build*(
metricsServerConf: metricsServerConf,
restServerConf: restServerConf,
dnsDiscoveryConf: dnsDiscoveryConf,
mixConf: mixConf,
# end confs
nodeKey: nodeKey,
clusterId: clusterId,

View File

@ -621,6 +621,16 @@ with the drawback of consuming some more bandwidth.""",
name: "rendezvous"
.}: bool
#Mix config
mix* {.desc: "Enable mix protocol: true|false", defaultValue: false, name: "mix".}:
bool
mixkey* {.
desc:
"ED25519 private key as 64 char hex string , without 0x. If not provided, a random key will be generated.",
name: "mixkey"
.}: Option[string]
## websocket config
websocketSupport* {.
desc: "Enable websocket: true|false",
@ -982,6 +992,11 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
b.storeServiceConf.storeSyncConf.withRangeSec(n.storeSyncRange)
b.storeServiceConf.storeSyncConf.withRelayJitterSec(n.storeSyncRelayJitter)
b.mixConf.withEnabled(n.mix)
b.withMix(n.mix)
if n.mixkey.isSome():
b.mixConf.withMixKey(n.mixkey.get())
b.filterServiceConf.withEnabled(n.filter)
b.filterServiceConf.withSubscriptionTimeout(n.filterSubscriptionTimeout)
b.filterServiceConf.withMaxPeersToServe(n.filterMaxPeersToServe)

View File

@ -2,10 +2,12 @@ import
chronicles,
chronos,
libp2p/crypto/crypto,
libp2p/crypto/curve25519,
libp2p/multiaddress,
libp2p/nameresolving/dnsresolver,
std/[options, sequtils, net],
std/[options, sequtils, net, strutils],
results
import ../common/utils/nat, ../node/net_config, ../waku_enr, ../waku_core, ./waku_conf
proc enrConfiguration*(
@ -27,6 +29,9 @@ proc enrConfiguration*(
).isOkOr:
return err("could not initialize ENR with shards")
if conf.mixConf.isSome():
enrBuilder.withMixKey(conf.mixConf.get().mixPubKey)
let recordRes = enrBuilder.build()
let record =
if recordRes.isErr():

View File

@ -431,6 +431,11 @@ proc setupProtocols(
if conf.peerExchangeDiscovery:
await node.mountPeerExchangeClient()
#mount mix
if conf.mixConf.isSome():
(await node.mountMix(conf.clusterId, conf.mixConf.get().mixKey)).isOkOr:
return err("failed to mount waku mix protocol: " & $error)
return ok()
## Start node

View File

@ -3,6 +3,7 @@ import
chronicles,
libp2p/crypto/crypto,
libp2p/multiaddress,
libp2p/crypto/curve25519,
secp256k1,
results
@ -44,6 +45,10 @@ type StoreSyncConf* {.requiresInit.} = object
intervalSec*: uint32
relayJitterSec*: uint32
type MixConf* = ref object
mixKey*: Curve25519Key
mixPubKey*: Curve25519Key
type StoreServiceConf* {.requiresInit.} = object
dbMigration*: bool
dbURl*: string
@ -101,6 +106,7 @@ type WakuConf* {.requiresInit.} = ref object
restServerConf*: Option[RestServerConf]
metricsServerConf*: Option[MetricsServerConf]
webSocketConf*: Option[WebSocketConf]
mixConf*: Option[MixConf]
portsShift*: uint16
dnsAddrsNameServers*: seq[IpAddress]

View File

@ -12,6 +12,8 @@ import
bearssl/rand,
eth/p2p/discoveryv5/enr,
libp2p/crypto/crypto,
libp2p/crypto/curve25519,
libp2p/[multiaddress, multicodec],
libp2p/protocols/ping,
libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/pubsub/rpc/messages,
@ -19,7 +21,11 @@ import
libp2p/transports/transport,
libp2p/transports/tcptransport,
libp2p/transports/wstransport,
libp2p/utility
libp2p/utility,
mix,
mix/mix_node,
mix/mix_protocol
import
../waku_core,
../waku_core/topics/sharding,
@ -49,7 +55,10 @@ import
./net_config,
./peer_manager,
../common/rate_limit/setting,
../common/callbacks
../common/callbacks,
../common/nimchronos,
../waku_enr/mix,
../waku_mix
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
declarePublicHistogram waku_histogram_message_size,
@ -123,6 +132,7 @@ type
started*: bool # Indicates that node has started listening
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
rateLimitSettings*: ProtocolRateLimitSettings
wakuMix*: WakuMix
proc getShardsGetter(node: WakuNode): GetShards =
return proc(): seq[uint16] {.closure, gcsafe, raises: [].} =
@ -226,6 +236,33 @@ proc mountAutoSharding*(
some(Sharding(clusterId: clusterId, shardCountGenZero: shardCount))
return ok()
proc getMixNodePoolSize*(node: WakuNode): int =
return node.wakuMix.getNodePoolSize()
proc mountMix*(
node: WakuNode, clusterId: uint16, mixPrivKey: Curve25519Key
): Future[Result[void, string]] {.async.} =
info "mounting mix protocol", nodeId = node.info #TODO log the config used
if node.announcedAddresses.len == 0:
return err("Trying to mount mix without having announced addresses")
let localaddrStr = node.announcedAddresses[0].toString().valueOr:
return err("Failed to convert multiaddress to string.")
info "local addr", localaddr = localaddrStr
let nodeAddr = localaddrStr & "/p2p/" & $node.peerId
# TODO: Pass bootnodes from config,
node.wakuMix = WakuMix.new(nodeAddr, node.peerManager, clusterId, mixPrivKey).valueOr:
error "Waku Mix protocol initialization failed", err = error
return
node.wakuMix.registerDestReadBehavior(WakuLightPushCodec, readLp(int(-1)))
let catchRes = catch:
node.switch.mount(node.wakuMix)
if catchRes.isErr():
return err(catchRes.error.msg)
return ok()
## Waku Sync
proc mountStoreSync*(
@ -1182,17 +1219,46 @@ proc lightpushPublishHandler(
pubsubTopic: PubsubTopic,
message: WakuMessage,
peer: RemotePeerInfo | PeerInfo,
mixify: bool = false,
): Future[lightpush_protocol.WakuLightPushResult] {.async.} =
let msgHash = pubsubTopic.computeMessageHash(message).to0xHex()
if not node.wakuLightpushClient.isNil():
notice "publishing message with lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
target_peer_id = peer.peerId,
msg_hash = msgHash
return await node.wakuLightpushClient.publish(some(pubsubTopic), message, peer)
msg_hash = msgHash,
mixify = mixify
if mixify: #indicates we want to use mix to send the message
#TODO: How to handle multiple addresses?
let conn = node.wakuMix.toConnection(
MixDestination.init(peer.peerId, peer.addrs[0]),
WakuLightPushCodec,
Opt.some(
MixParameters(expectReply: Opt.some(true), numSurbs: Opt.some(byte(1)))
# indicating we expect a single reply hence numSurbs = 1
),
).valueOr:
error "could not create mix connection"
return lighpushErrorResult(
LightPushErrorCode.SERVICE_NOT_AVAILABLE,
"Waku lightpush with mix not available",
)
return await node.wakuLightpushClient.publishWithConn(
pubsubTopic, message, conn, peer.peerId
)
else:
return await node.wakuLightpushClient.publish(some(pubsubTopic), message, peer)
if not node.wakuLightPush.isNil():
if mixify:
error "mixify is not supported with self hosted lightpush"
return lighpushErrorResult(
LightPushErrorCode.SERVICE_NOT_AVAILABLE,
"Waku lightpush with mix not available",
)
notice "publishing message with self hosted lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
@ -1206,13 +1272,18 @@ proc lightpushPublish*(
pubsubTopic: Option[PubsubTopic],
message: WakuMessage,
peerOpt: Option[RemotePeerInfo] = none(RemotePeerInfo),
mixify: bool = false,
): Future[lightpush_protocol.WakuLightPushResult] {.async.} =
if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil():
error "failed to publish message as lightpush not available"
return lighpushErrorResult(
LightPushErrorCode.SERVICE_NOT_AVAILABLE, "Waku lightpush not available"
)
if mixify and node.wakuMix.isNil():
error "failed to publish message using mix as mix protocol is not mounted"
return lighpushErrorResult(
LightPushErrorCode.SERVICE_NOT_AVAILABLE, "Waku lightpush with mix not available"
)
let toPeer: RemotePeerInfo = peerOpt.valueOr:
if not node.wakuLightPush.isNil():
RemotePeerInfo.init(node.peerId())
@ -1242,7 +1313,7 @@ proc lightpushPublish*(
error "lightpush publish error", error = msg
return lighpushErrorResult(LightPushErrorCode.INTERNAL_SERVER_ERROR, msg)
return await lightpushPublishHandler(node, pubsubForPublish, message, toPeer)
return await lightpushPublishHandler(node, pubsubForPublish, message, toPeer, mixify)
## Waku RLN Relay
proc mountRlnRelay*(
@ -1520,6 +1591,9 @@ proc start*(node: WakuNode) {.async.} =
if not node.wakuRelay.isNil():
await node.startRelay()
if not node.wakuMix.isNil():
node.wakuMix.start()
if not node.wakuMetadata.isNil():
node.wakuMetadata.start()

View File

@ -47,15 +47,10 @@ proc withWssTransport*(
): SwitchBuilder {.raises: [Defect, IOError].} =
let key: TLSPrivateKey = getSecureKey(secureKeyPath)
let cert: TLSCertificate = getSecureCert(secureCertPath)
b.withTransport(
proc(upgr: Upgrade, privateKey: crypto.PrivateKey): Transport =
WsTransport.new(
upgr,
tlsPrivateKey = key,
tlsCertificate = cert,
autotls = nil, # required 5th param
tlsFlags = {TLSFlags.NoVerifyHost, TLSFlags.NoVerifyServerName},
)
b.withWsTransport(
tlsPrivateKey = key,
tlsCertificate = cert,
{TLSFlags.NoVerifyHost, TLSFlags.NoVerifyServerName}, # THIS IS INSECURE, NO?
)
proc newWakuSwitch*(

View File

@ -1,3 +1,8 @@
import ./common/enr, ./waku_enr/capabilities, ./waku_enr/multiaddr, ./waku_enr/sharding
import
./common/enr,
./waku_enr/capabilities,
./waku_enr/multiaddr,
./waku_enr/sharding,
./waku_enr/mix
export enr, capabilities, multiaddr, sharding
export enr, capabilities, multiaddr, sharding, mix

View File

@ -3,6 +3,7 @@
import
std/[options, bitops, sequtils, net, tables], results, eth/keys, libp2p/crypto/crypto
import ../common/enr, ../waku_core/codecs
import mix/mix_protocol
const CapabilitiesEnrField* = "waku2"
@ -20,6 +21,7 @@ type
Filter = 2
Lightpush = 3
Sync = 4
Mix = 5
const capabilityToCodec = {
Capabilities.Relay: WakuRelayCodec,
@ -27,10 +29,12 @@ const capabilityToCodec = {
Capabilities.Filter: WakuFilterSubscribeCodec,
Capabilities.Lightpush: WakuLightPushCodec,
Capabilities.Sync: WakuReconciliationCodec,
Capabilities.Mix: MixProtocolID,
}.toTable
func init*(
T: type CapabilitiesBitfield, lightpush, filter, store, relay, sync: bool = false
T: type CapabilitiesBitfield,
lightpush, filter, store, relay, sync, mix: bool = false,
): T =
## Creates an waku2 ENR flag bit field according to RFC 31 (https://rfc.vac.dev/spec/31/)
var bitfield: uint8
@ -44,6 +48,8 @@ func init*(
bitfield.setBit(3)
if sync:
bitfield.setBit(4)
if mix:
bitfield.setBit(5)
CapabilitiesBitfield(bitfield)
func init*(T: type CapabilitiesBitfield, caps: varargs[Capabilities]): T =

20
waku/waku_enr/mix.nim Normal file
View File

@ -0,0 +1,20 @@
{.push raises: [].}
import std/[options], results, libp2p/crypto/curve25519, nimcrypto/utils as ncrutils
import ../common/enr
const MixKeyEnrField* = "mix-key"
func withMixKey*(builder: var EnrBuilder, mixPubKey: Curve25519Key) =
builder.addFieldPair(MixKeyEnrField, getBytes(mixPubKey))
func mixKey*(record: Record): Option[seq[byte]] =
let recordRes = record.toTyped()
if recordRes.isErr():
return none(seq[byte])
let field = recordRes.value.tryGet(MixKeyEnrField, seq[byte])
if field.isNone():
return none(seq[byte])
return field

View File

@ -1,7 +1,7 @@
{.push raises: [].}
import std/options, results, chronicles, chronos, metrics, bearssl/rand, stew/byteutils
import libp2p/peerid
import libp2p/peerid, libp2p/stream/connection
import
../waku_core/peers,
../node/peer_manager,
@ -30,14 +30,18 @@ proc addPublishObserver*(wl: WakuLightPushClient, obs: PublishObserver) =
wl.publishObservers.add(obs)
proc sendPushRequest(
wl: WakuLightPushClient, req: LightPushRequest, peer: PeerId | RemotePeerInfo
wl: WakuLightPushClient,
req: LightPushRequest,
peer: PeerId | RemotePeerInfo,
conn: Option[Connection] = none(Connection),
): Future[WakuLightPushResult] {.async.} =
let connection = (await wl.peerManager.dialPeer(peer, WakuLightPushCodec)).valueOr:
waku_lightpush_v3_errors.inc(labelValues = [dialFailure])
return lighpushErrorResult(
LightPushErrorCode.NO_PEERS_TO_RELAY,
dialFailure & ": " & $peer & " is not accessible",
)
let connection = conn.valueOr:
(await wl.peerManager.dialPeer(peer, WakuLightPushCodec)).valueOr:
waku_lightpush_v3_errors.inc(labelValues = [dialFailure])
return lighpushErrorResult(
LightPushErrorCode.NO_PEERS_TO_RELAY,
dialFailure & ": " & $peer & " is not accessible",
)
await connection.writeLP(req.encode().buffer)
@ -102,14 +106,18 @@ proc publishToAny*(
if message.timestamp == 0:
message.timestamp = getNowInNanosecondTime()
info "publishToAny", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex
let peer = wl.peerManager.selectPeer(WakuLightPushCodec).valueOr:
# TODO: check if it is matches the situation - shall we distinguish client side missing peers from server side?
return lighpushErrorResult(
LightPushErrorCode.NO_PEERS_TO_RELAY, "no suitable remote peers"
)
info "publishToAny",
my_peer_id = wl.peerManager.switch.peerInfo.peerId,
peer_id = peer.peerId,
msg_hash = computeMessageHash(pubsubTopic, message).to0xHex,
sentTime = getNowInNanosecondTime()
let pushRequest = LightpushRequest(
requestId: generateRequestId(wl.rng),
pubSubTopic: some(pubSubTopic),
@ -121,3 +129,30 @@ proc publishToAny*(
obs.onMessagePublished(pubSubTopic, message)
return lightpushSuccessResult(publishedCount)
proc publishWithConn*(
wl: WakuLightPushClient,
pubSubTopic: PubsubTopic,
message: WakuMessage,
conn: Connection,
destPeer: PeerId,
): Future[WakuLightPushResult] {.async, gcsafe.} =
info "publishWithConn",
my_peer_id = wl.peerManager.switch.peerInfo.peerId,
peer_id = destPeer,
msg_hash = computeMessageHash(pubsubTopic, message).to0xHex,
sentTime = getNowInNanosecondTime()
let pushRequest = LightpushRequest(
requestId: generateRequestId(wl.rng),
pubSubTopic: some(pubSubTopic),
message: message,
)
#TODO: figure out how to not pass destPeer as this is just a hack
let publishedCount =
?await wl.sendPushRequest(pushRequest, destPeer, conn = some(conn))
for obs in wl.publishObservers:
obs.onMessagePublished(pubSubTopic, message)
return lightpushSuccessResult(publishedCount)

3
waku/waku_mix.nim Normal file
View File

@ -0,0 +1,3 @@
import ./waku_mix/protocol
export protocol

166
waku/waku_mix/protocol.nim Normal file
View File

@ -0,0 +1,166 @@
{.push raises: [].}
import chronicles, std/[options, tables, sequtils], chronos, results, metrics
import
libp2p/crypto/curve25519,
mix/mix_protocol,
mix/mix_node,
mix/mix_metrics,
mix/tag_manager,
libp2p/[multiaddress, multicodec, peerid],
eth/common/keys
import
../node/peer_manager,
../waku_core,
../waku_enr/mix,
../waku_enr,
../node/peer_manager/waku_peer_store,
../common/nimchronos
logScope:
topics = "waku mix"
type
WakuMix* = ref object of MixProtocol
peerManager*: PeerManager
clusterId: uint16
nodePoolLoopHandle: Future[void]
WakuMixResult*[T] = Result[T, string]
proc mixPoolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool =
# Note that origin based(discv5) filtering is not done intentionally
# so that more mix nodes can be discovered.
if peer.enr.isNone():
trace "peer has no ENR", peer = $peer
return false
if cluster.isSome() and peer.enr.get().isClusterMismatched(cluster.get()):
debug "peer has mismatching cluster", peer = $peer
return false
# Filter if mix is enabled
if not peer.enr.get().supportsCapability(Capabilities.Mix):
debug "peer doesn't support mix", peer = $peer
return false
return true
proc appendPeerIdToMultiaddr*(multiaddr: MultiAddress, peerId: PeerId): MultiAddress =
if multiaddr.contains(multiCodec("p2p")).get():
return multiaddr
var maddrStr = multiaddr.toString().valueOr:
error "Failed to convert multiaddress to string.", err = error
return multiaddr
maddrStr.add("/p2p/" & $peerId)
var cleanAddr = MultiAddress.init(maddrStr).valueOr:
error "Failed to convert string to multiaddress.", err = error
return multiaddr
return cleanAddr
proc populateMixNodePool*(mix: WakuMix) =
# populate only peers that i) are reachable ii) share cluster iii) support mix
let remotePeers = mix.peerManager.switch.peerStore.getReachablePeers().filterIt(
mixPoolFilter(some(mix.clusterId), it)
)
var mixNodes = initTable[PeerId, MixPubInfo]()
for i in 0 ..< min(remotePeers.len, 100):
let remotePeerENR = remotePeers[i].enr.get()
# TODO: use the most exposed/external multiaddr of the peer, right now using the first
let maddrWithPeerId =
toString(appendPeerIdToMultiaddr(remotePeers[i].addrs[0], remotePeers[i].peerId))
trace "remote peer ENR",
peerId = remotePeers[i].peerId, enr = remotePeerENR, maddr = maddrWithPeerId
let peerMixPubKey = mixKey(remotePeerENR).get()
let mixNodePubInfo =
createMixPubInfo(maddrWithPeerId.value, intoCurve25519Key(peerMixPubKey))
mixNodes[remotePeers[i].peerId] = mixNodePubInfo
mix_pool_size.set(len(mixNodes))
# set the mix node pool
mix.setNodePool(mixNodes)
trace "mix node pool updated", poolSize = mix.getNodePoolSize()
proc startMixNodePoolMgr*(mix: WakuMix) {.async.} =
info "starting mix node pool manager"
# try more aggressively to populate the pool at startup
var attempts = 50
# TODO: make initial pool size configurable
while mix.getNodePoolSize() < 100 and attempts > 0:
attempts -= 1
mix.populateMixNodePool()
await sleepAsync(1.seconds)
# TODO: make interval configurable
heartbeat "Updating mix node pool", 5.seconds:
mix.populateMixNodePool()
#[ proc getBootStrapMixNodes*(node: WakuNode): Table[PeerId, MixPubInfo] =
var mixNodes = initTable[PeerId, MixPubInfo]()
# MixNode Multiaddrs and PublicKeys:
let bootNodesMultiaddrs = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o",
"/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF",
"/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA",
"/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f",
"/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu",
]
let bootNodesMixPubKeys = ["9d09ce624f76e8f606265edb9cca2b7de9b41772a6d784bddaf92ffa8fba7d2c",
"9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a",
"275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c",
"e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18",
"8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f"
]
for index, mixNodeMultiaddr in bootNodesMultiaddrs:
let peerIdRes = getPeerIdFromMultiAddr(mixNodeMultiaddr)
if peerIdRes.isErr:
error "Failed to get peer id from multiaddress: " , error = peerIdRes.error
let peerId = peerIdRes.get()
#if (not peerID == nil) and peerID == exceptPeerID:
# continue
let mixNodePubInfo = createMixPubInfo(mixNodeMultiaddr, intoCurve25519Key(ncrutils.fromHex(bootNodesMixPubKeys[index])))
mixNodes[peerId] = mixNodePubInfo
info "using mix bootstrap nodes ", bootNodes = mixNodes
return mixNodes
]#
proc new*(
T: type WakuMix,
nodeAddr: string,
peermgr: PeerManager,
clusterId: uint16,
mixPrivKey: Curve25519Key,
): WakuMixResult[T] =
let mixPubKey = public(mixPrivKey)
info "mixPrivKey", mixPrivKey = mixPrivKey, mixPubKey = mixPubKey
let localMixNodeInfo = initMixNodeInfo(
nodeAddr, mixPubKey, mixPrivKey, peermgr.switch.peerInfo.publicKey.skkey,
peermgr.switch.peerInfo.privateKey.skkey,
)
# TODO : ideally mix should not be marked ready until certain min pool of mixNodes are discovered
var m = WakuMix(peerManager: peermgr, clusterId: clusterId)
procCall MixProtocol(m).init(
localMixNodeInfo, initTable[PeerId, MixPubInfo](), peermgr.switch
)
return ok(m)
method start*(mix: WakuMix) =
mix.nodePoolLoopHandle = mix.startMixNodePoolMgr()
method stop*(mix: WakuMix) {.async.} =
if mix.nodePoolLoopHandle.isNil():
return
await mix.nodePoolLoopHandle.cancelAndWait()
mix.nodePoolLoopHandle = nil
#[ proc setMixBootStrapNodes*(node: WakuNode,){.async}=
node.mix.setNodePool(node.getBootStrapMixNodes())
]#
# Mix Protocol