mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-02 14:03:06 +00:00
chore: chat2 with mix (#3506)
* add mixify option to lightpush publish * initialize mix protocol and peerExchange to populate node pool in chat2 * scripts for mixnet sim and chat2mix
This commit is contained in:
parent
6bced60d3b
commit
a6892bfebe
4
Makefile
4
Makefile
@ -228,6 +228,10 @@ chat2: | build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
$(ENV_SCRIPT) nim chat2 $(NIM_PARAMS) waku.nims
|
||||
|
||||
chat2mix: | build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
$(ENV_SCRIPT) nim chat2mix $(NIM_PARAMS) waku.nims
|
||||
|
||||
rln-db-inspector: | build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
$(ENV_SCRIPT) nim rln_db_inspector $(NIM_PARAMS) waku.nims
|
||||
|
||||
704
apps/chat2mix/chat2mix.nim
Normal file
704
apps/chat2mix/chat2mix.nim
Normal file
@ -0,0 +1,704 @@
|
||||
## chat2 is an example of usage of Waku v2. For suggested usage options, please
|
||||
## see dingpu tutorial in docs folder.
|
||||
|
||||
when not (compileOption("threads")):
|
||||
{.fatal: "Please, compile this program with the --threads:on option!".}
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[strformat, strutils, times, options, random, sequtils]
|
||||
import
|
||||
confutils,
|
||||
chronicles,
|
||||
chronos,
|
||||
eth/keys,
|
||||
bearssl,
|
||||
stew/[byteutils, results],
|
||||
metrics,
|
||||
metrics/chronos_httpserver
|
||||
import
|
||||
libp2p/[
|
||||
switch, # manage transports, a single entry point for dialing and listening
|
||||
crypto/crypto, # cryptographic functions
|
||||
stream/connection, # create and close stream read / write connections
|
||||
multiaddress,
|
||||
# encode different addressing schemes. For example, /ip4/7.7.7.7/tcp/6543 means it is using IPv4 protocol and TCP
|
||||
peerinfo,
|
||||
# manage the information of a peer, such as peer ID and public / private key
|
||||
peerid, # Implement how peers interact
|
||||
protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs
|
||||
nameresolving/dnsresolver,
|
||||
] # define DNS resolution
|
||||
import mix/curve25519
|
||||
import
|
||||
waku/[
|
||||
waku_core,
|
||||
waku_lightpush_legacy/common,
|
||||
waku_lightpush_legacy/rpc,
|
||||
waku_enr,
|
||||
discovery/waku_dnsdisc,
|
||||
waku_store_legacy,
|
||||
waku_node,
|
||||
node/waku_metrics,
|
||||
node/peer_manager,
|
||||
factory/builder,
|
||||
common/utils/nat,
|
||||
waku_relay,
|
||||
waku_store/common,
|
||||
waku_filter_v2/client,
|
||||
],
|
||||
./config_chat2mix
|
||||
|
||||
import libp2p/protocols/pubsub/rpc/messages, libp2p/protocols/pubsub/pubsub
|
||||
import ../../waku/waku_rln_relay
|
||||
|
||||
logScope:
|
||||
topics = "chat2 mix"
|
||||
|
||||
const Help =
|
||||
"""
|
||||
Commands: /[?|help|connect|nick|exit]
|
||||
help: Prints this help
|
||||
connect: dials a remote peer
|
||||
nick: change nickname for current chat session
|
||||
exit: exits chat session
|
||||
"""
|
||||
|
||||
# XXX Connected is a bit annoying, because incoming connections don't trigger state change
|
||||
# Could poll connection pool or something here, I suppose
|
||||
# TODO Ensure connected turns true on incoming connections, or get rid of it
|
||||
type Chat = ref object
|
||||
node: WakuNode # waku node for publishing, subscribing, etc
|
||||
transp: StreamTransport # transport streams between read & write file descriptor
|
||||
subscribed: bool # indicates if a node is subscribed or not to a topic
|
||||
connected: bool # if the node is connected to another peer
|
||||
started: bool # if the node has started
|
||||
nick: string # nickname for this chat session
|
||||
prompt: bool # chat prompt is showing
|
||||
contentTopic: string # default content topic for chat messages
|
||||
conf: Chat2Conf # configuration for chat2
|
||||
|
||||
type
|
||||
PrivateKey* = crypto.PrivateKey
|
||||
Topic* = waku_core.PubsubTopic
|
||||
|
||||
#####################
|
||||
## chat2 protobufs ##
|
||||
#####################
|
||||
|
||||
type
|
||||
SelectResult*[T] = Result[T, string]
|
||||
|
||||
Chat2Message* = object
|
||||
timestamp*: int64
|
||||
nick*: string
|
||||
payload*: seq[byte]
|
||||
|
||||
proc getPubsubTopic*(
|
||||
conf: Chat2Conf, node: WakuNode, contentTopic: string
|
||||
): PubsubTopic =
|
||||
let shard = node.wakuSharding.getShard(contentTopic).valueOr:
|
||||
echo "Could not parse content topic: " & error
|
||||
return "" #TODO: fix this.
|
||||
return $RelayShard(clusterId: conf.clusterId, shardId: shard.shardId)
|
||||
|
||||
proc init*(T: type Chat2Message, buffer: seq[byte]): ProtoResult[T] =
|
||||
var msg = Chat2Message()
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
var timestamp: uint64
|
||||
discard ?pb.getField(1, timestamp)
|
||||
msg.timestamp = int64(timestamp)
|
||||
|
||||
discard ?pb.getField(2, msg.nick)
|
||||
discard ?pb.getField(3, msg.payload)
|
||||
|
||||
ok(msg)
|
||||
|
||||
proc encode*(message: Chat2Message): ProtoBuffer =
|
||||
var serialised = initProtoBuffer()
|
||||
|
||||
serialised.write(1, uint64(message.timestamp))
|
||||
serialised.write(2, message.nick)
|
||||
serialised.write(3, message.payload)
|
||||
|
||||
return serialised
|
||||
|
||||
proc toString*(message: Chat2Message): string =
|
||||
# Get message date and timestamp in local time
|
||||
let time = message.timestamp.fromUnix().local().format("'<'MMM' 'dd,' 'HH:mm'>'")
|
||||
|
||||
return time & " " & message.nick & ": " & string.fromBytes(message.payload)
|
||||
|
||||
#####################
|
||||
|
||||
proc connectToNodes(c: Chat, nodes: seq[string]) {.async.} =
|
||||
echo "Connecting to nodes"
|
||||
await c.node.connectToNodes(nodes)
|
||||
c.connected = true
|
||||
|
||||
proc showChatPrompt(c: Chat) =
|
||||
if not c.prompt:
|
||||
try:
|
||||
stdout.write(">> ")
|
||||
stdout.flushFile()
|
||||
c.prompt = true
|
||||
except IOError:
|
||||
discard
|
||||
|
||||
proc getChatLine(c: Chat, msg: WakuMessage): Result[string, string] =
|
||||
# No payload encoding/encryption from Waku
|
||||
let
|
||||
pb = Chat2Message.init(msg.payload)
|
||||
chatLine =
|
||||
if pb.isOk:
|
||||
pb[].toString()
|
||||
else:
|
||||
string.fromBytes(msg.payload)
|
||||
return ok(chatline)
|
||||
|
||||
proc printReceivedMessage(c: Chat, msg: WakuMessage) =
|
||||
let
|
||||
pb = Chat2Message.init(msg.payload)
|
||||
chatLine =
|
||||
if pb.isOk:
|
||||
pb[].toString()
|
||||
else:
|
||||
string.fromBytes(msg.payload)
|
||||
try:
|
||||
echo &"{chatLine}"
|
||||
except ValueError:
|
||||
# Formatting fail. Print chat line in any case.
|
||||
echo chatLine
|
||||
|
||||
c.prompt = false
|
||||
showChatPrompt(c)
|
||||
trace "Printing message", chatLine, contentTopic = msg.contentTopic
|
||||
|
||||
proc readNick(transp: StreamTransport): Future[string] {.async.} =
|
||||
# Chat prompt
|
||||
stdout.write("Choose a nickname >> ")
|
||||
stdout.flushFile()
|
||||
return await transp.readLine()
|
||||
|
||||
proc startMetricsServer(
|
||||
serverIp: IpAddress, serverPort: Port
|
||||
): Result[MetricsHttpServerRef, string] =
|
||||
info "Starting metrics HTTP server", serverIp = $serverIp, serverPort = $serverPort
|
||||
|
||||
let metricsServerRes = MetricsHttpServerRef.new($serverIp, serverPort)
|
||||
if metricsServerRes.isErr():
|
||||
return err("metrics HTTP server start failed: " & $metricsServerRes.error)
|
||||
|
||||
let server = metricsServerRes.value
|
||||
try:
|
||||
waitFor server.start()
|
||||
except CatchableError:
|
||||
return err("metrics HTTP server start failed: " & getCurrentExceptionMsg())
|
||||
|
||||
info "Metrics HTTP server started", serverIp = $serverIp, serverPort = $serverPort
|
||||
ok(metricsServerRes.value)
|
||||
|
||||
proc publish(c: Chat, line: string) =
|
||||
# First create a Chat2Message protobuf with this line of text
|
||||
let time = getTime().toUnix()
|
||||
let chat2pb =
|
||||
Chat2Message(timestamp: time, nick: c.nick, payload: line.toBytes()).encode()
|
||||
|
||||
## @TODO: error handling on failure
|
||||
proc handler(response: PushResponse) {.gcsafe, closure.} =
|
||||
trace "lightpush response received", response = response
|
||||
|
||||
var message = WakuMessage(
|
||||
payload: chat2pb.buffer,
|
||||
contentTopic: c.contentTopic,
|
||||
version: 0,
|
||||
timestamp: getNanosecondTime(time),
|
||||
)
|
||||
|
||||
try:
|
||||
if not c.node.wakuLightpushClient.isNil():
|
||||
# Attempt lightpush with mix
|
||||
#(
|
||||
discard c.node.lightpushPublish(
|
||||
some(c.conf.getPubsubTopic(c.node, c.contentTopic)),
|
||||
message,
|
||||
none(RemotePeerInfo),
|
||||
true,
|
||||
) #TODO: Not waiting for response, have to change once SURB is implmented
|
||||
#).isOkOr:
|
||||
# error "failed to publish lightpush message", error = error
|
||||
else:
|
||||
error "failed to publish message as lightpush client is not initialized"
|
||||
except CatchableError:
|
||||
error "caught error publishing message: ", error = getCurrentExceptionMsg()
|
||||
|
||||
# TODO This should read or be subscribe handler subscribe
|
||||
proc readAndPrint(c: Chat) {.async.} =
|
||||
while true:
|
||||
# while p.connected:
|
||||
# # TODO: echo &"{p.id} -> "
|
||||
#
|
||||
# echo cast[string](await p.conn.readLp(1024))
|
||||
#echo "readAndPrint subscribe NYI"
|
||||
await sleepAsync(100)
|
||||
|
||||
# TODO Implement
|
||||
proc writeAndPrint(c: Chat) {.async.} =
|
||||
while true:
|
||||
# Connect state not updated on incoming WakuRelay connections
|
||||
# if not c.connected:
|
||||
# echo "type an address or wait for a connection:"
|
||||
# echo "type /[help|?] for help"
|
||||
|
||||
# Chat prompt
|
||||
showChatPrompt(c)
|
||||
|
||||
let line = await c.transp.readLine()
|
||||
if line.startsWith("/help") or line.startsWith("/?") or not c.started:
|
||||
echo Help
|
||||
continue
|
||||
|
||||
# if line.startsWith("/disconnect"):
|
||||
# echo "Ending current session"
|
||||
# if p.connected and p.conn.closed.not:
|
||||
# await p.conn.close()
|
||||
# p.connected = false
|
||||
elif line.startsWith("/connect"):
|
||||
# TODO Should be able to connect to multiple peers for Waku chat
|
||||
if c.connected:
|
||||
echo "already connected to at least one peer"
|
||||
continue
|
||||
|
||||
echo "enter address of remote peer"
|
||||
let address = await c.transp.readLine()
|
||||
if address.len > 0:
|
||||
await c.connectToNodes(@[address])
|
||||
elif line.startsWith("/nick"):
|
||||
# Set a new nickname
|
||||
c.nick = await readNick(c.transp)
|
||||
echo "You are now known as " & c.nick
|
||||
elif line.startsWith("/exit"):
|
||||
echo "quitting..."
|
||||
|
||||
try:
|
||||
await c.node.stop()
|
||||
except:
|
||||
echo "exception happened when stopping: " & getCurrentExceptionMsg()
|
||||
|
||||
quit(QuitSuccess)
|
||||
else:
|
||||
# XXX connected state problematic
|
||||
if c.started:
|
||||
echo "publishing message: " & line
|
||||
c.publish(line)
|
||||
# TODO Connect to peer logic?
|
||||
else:
|
||||
try:
|
||||
if line.startsWith("/") and "p2p" in line:
|
||||
await c.connectToNodes(@[line])
|
||||
except:
|
||||
echo &"unable to dial remote peer {line}"
|
||||
echo getCurrentExceptionMsg()
|
||||
|
||||
proc readWriteLoop(c: Chat) {.async.} =
|
||||
asyncSpawn c.writeAndPrint() # execute the async function but does not block
|
||||
asyncSpawn c.readAndPrint()
|
||||
|
||||
proc readInput(wfd: AsyncFD) {.thread, raises: [Defect, CatchableError].} =
|
||||
## This procedure performs reading from `stdin` and sends data over
|
||||
## pipe to main thread.
|
||||
let transp = fromPipe(wfd)
|
||||
|
||||
while true:
|
||||
let line = stdin.readLine()
|
||||
discard waitFor transp.write(line & "\r\n")
|
||||
|
||||
var alreadyUsedServicePeers {.threadvar.}: seq[RemotePeerInfo]
|
||||
|
||||
proc selectRandomServicePeer*(
|
||||
pm: PeerManager, actualPeer: Option[RemotePeerInfo], codec: string
|
||||
): Result[RemotePeerInfo, void] =
|
||||
if actualPeer.isSome():
|
||||
alreadyUsedServicePeers.add(actualPeer.get())
|
||||
|
||||
let supportivePeers = pm.switch.peerStore.getPeersByProtocol(codec).filterIt(
|
||||
it notin alreadyUsedServicePeers
|
||||
)
|
||||
if supportivePeers.len == 0:
|
||||
return err()
|
||||
|
||||
let rndPeerIndex = rand(0 .. supportivePeers.len - 1)
|
||||
return ok(supportivePeers[rndPeerIndex])
|
||||
|
||||
proc maintainSubscription(
|
||||
wakuNode: WakuNode,
|
||||
filterPubsubTopic: PubsubTopic,
|
||||
filterContentTopic: ContentTopic,
|
||||
filterPeer: RemotePeerInfo,
|
||||
preventPeerSwitch: bool,
|
||||
) {.async.} =
|
||||
var actualFilterPeer = filterPeer
|
||||
const maxFailedSubscribes = 3
|
||||
const maxFailedServiceNodeSwitches = 10
|
||||
var noFailedSubscribes = 0
|
||||
var noFailedServiceNodeSwitches = 0
|
||||
while true:
|
||||
info "maintaining subscription at", peer = constructMultiaddrStr(actualFilterPeer)
|
||||
# First use filter-ping to check if we have an active subscription
|
||||
let pingRes = await wakuNode.wakuFilterClient.ping(actualFilterPeer)
|
||||
if pingRes.isErr():
|
||||
# No subscription found. Let's subscribe.
|
||||
error "ping failed.", err = pingRes.error
|
||||
trace "no subscription found. Sending subscribe request"
|
||||
|
||||
let subscribeRes = await wakuNode.filterSubscribe(
|
||||
some(filterPubsubTopic), filterContentTopic, actualFilterPeer
|
||||
)
|
||||
|
||||
if subscribeRes.isErr():
|
||||
noFailedSubscribes += 1
|
||||
error "Subscribe request failed.",
|
||||
err = subscribeRes.error,
|
||||
peer = actualFilterPeer,
|
||||
failCount = noFailedSubscribes
|
||||
|
||||
# TODO: disconnet from failed actualFilterPeer
|
||||
# asyncSpawn(wakuNode.peerManager.switch.disconnect(p))
|
||||
# wakunode.peerManager.peerStore.delete(actualFilterPeer)
|
||||
|
||||
if noFailedSubscribes < maxFailedSubscribes:
|
||||
await sleepAsync(2000) # Wait a bit before retrying
|
||||
continue
|
||||
elif not preventPeerSwitch:
|
||||
let peerOpt = selectRandomServicePeer(
|
||||
wakuNode.peerManager, some(actualFilterPeer), WakuFilterSubscribeCodec
|
||||
)
|
||||
if peerOpt.isOk():
|
||||
actualFilterPeer = peerOpt.get()
|
||||
|
||||
info "Found new peer for codec",
|
||||
codec = filterPubsubTopic, peer = constructMultiaddrStr(actualFilterPeer)
|
||||
|
||||
noFailedSubscribes = 0
|
||||
continue # try again with new peer without delay
|
||||
else:
|
||||
error "Failed to find new service peer. Exiting."
|
||||
noFailedServiceNodeSwitches += 1
|
||||
break
|
||||
else:
|
||||
if noFailedSubscribes > 0:
|
||||
noFailedSubscribes -= 1
|
||||
|
||||
notice "subscribe request successful."
|
||||
else:
|
||||
info "subscription is live."
|
||||
|
||||
await sleepAsync(30000) # Subscription maintenance interval
|
||||
|
||||
proc processMixNodes(localnode: WakuNode, nodes: seq[string]) {.async.} =
|
||||
if nodes.len == 0:
|
||||
return
|
||||
|
||||
info "Processing mix nodes: ", nodes = $nodes
|
||||
for node in nodes:
|
||||
var enrRec: enr.Record
|
||||
if enrRec.fromURI(node):
|
||||
let peerInfo = enrRec.toRemotePeerInfo().valueOr:
|
||||
error "Failed to parse mix node", error = error
|
||||
continue
|
||||
localnode.peermanager.addPeer(peerInfo, Discv5)
|
||||
info "Added mix node", peer = peerInfo
|
||||
else:
|
||||
error "Failed to parse mix node ENR", node = node
|
||||
|
||||
{.pop.}
|
||||
# @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
|
||||
proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
let
|
||||
transp = fromPipe(rfd)
|
||||
conf = Chat2Conf.load()
|
||||
nodekey =
|
||||
if conf.nodekey.isSome():
|
||||
conf.nodekey.get()
|
||||
else:
|
||||
PrivateKey.random(Secp256k1, rng[]).tryGet()
|
||||
|
||||
# set log level
|
||||
if conf.logLevel != LogLevel.NONE:
|
||||
setLogLevel(conf.logLevel)
|
||||
|
||||
let natRes = setupNat(
|
||||
conf.nat,
|
||||
clientId,
|
||||
Port(uint16(conf.tcpPort) + conf.portsShift),
|
||||
Port(uint16(conf.udpPort) + conf.portsShift),
|
||||
)
|
||||
|
||||
if natRes.isErr():
|
||||
raise newException(ValueError, "setupNat error " & natRes.error)
|
||||
|
||||
let (extIp, extTcpPort, extUdpPort) = natRes.get()
|
||||
|
||||
var enrBuilder = EnrBuilder.init(nodeKey)
|
||||
|
||||
enrBuilder.withWakuRelaySharding(
|
||||
RelayShards(clusterId: conf.clusterId, shardIds: conf.shards)
|
||||
).isOkOr:
|
||||
error "failed to add sharded topics to ENR", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
let recordRes = enrBuilder.build()
|
||||
let record =
|
||||
if recordRes.isErr():
|
||||
error "failed to create enr record", error = recordRes.error
|
||||
quit(QuitFailure)
|
||||
else:
|
||||
recordRes.get()
|
||||
|
||||
let node = block:
|
||||
var builder = WakuNodeBuilder.init()
|
||||
builder.withNodeKey(nodeKey)
|
||||
builder.withRecord(record)
|
||||
|
||||
builder
|
||||
.withNetworkConfigurationDetails(
|
||||
conf.listenAddress,
|
||||
Port(uint16(conf.tcpPort) + conf.portsShift),
|
||||
extIp,
|
||||
extTcpPort,
|
||||
wsBindPort = Port(uint16(conf.websocketPort) + conf.portsShift),
|
||||
wsEnabled = conf.websocketSupport,
|
||||
wssEnabled = conf.websocketSecureSupport,
|
||||
)
|
||||
.tryGet()
|
||||
builder.build().tryGet()
|
||||
|
||||
node.mountSharding(conf.clusterId, conf.numShardsInNetwork).isOkOr:
|
||||
error "failed to mount waku sharding: ", error = error
|
||||
quit(QuitFailure)
|
||||
node.mountMetadata(conf.clusterId).isOkOr:
|
||||
error "failed to mount waku metadata protocol: ", err = error
|
||||
quit(QuitFailure)
|
||||
|
||||
try:
|
||||
await node.mountPeerExchange()
|
||||
except CatchableError:
|
||||
error "failed to mount waku peer-exchange protocol",
|
||||
error = getCurrentExceptionMsg()
|
||||
quit(QuitFailure)
|
||||
|
||||
let (mixPrivKey, mixPubKey) = generateKeyPair().valueOr:
|
||||
error "failed to generate mix key pair", error = error
|
||||
return
|
||||
|
||||
(await node.mountMix(conf.clusterId, mixPrivKey)).isOkOr:
|
||||
error "failed to mount waku mix protocol: ", error = $error
|
||||
quit(QuitFailure)
|
||||
if conf.mixnodes.len > 0:
|
||||
await processMixNodes(node, conf.mixnodes)
|
||||
await node.start()
|
||||
|
||||
node.peerManager.start()
|
||||
|
||||
#[ if conf.rlnRelayCredPath == "":
|
||||
raise newException(ConfigurationError, "rln-relay-cred-path MUST be passed")
|
||||
|
||||
if conf.relay:
|
||||
let shards =
|
||||
conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it)))
|
||||
(await node.mountRelay()).isOkOr:
|
||||
echo "failed to mount relay: " & error
|
||||
return
|
||||
]#
|
||||
await node.mountLibp2pPing()
|
||||
let pubsubTopic = conf.getPubsubTopic(node, conf.contentTopic)
|
||||
echo "pubsub topic is: " & pubsubTopic
|
||||
let nick = await readNick(transp)
|
||||
echo "Welcome, " & nick & "!"
|
||||
|
||||
var chat = Chat(
|
||||
node: node,
|
||||
transp: transp,
|
||||
subscribed: true,
|
||||
connected: false,
|
||||
started: true,
|
||||
nick: nick,
|
||||
prompt: false,
|
||||
contentTopic: conf.contentTopic,
|
||||
conf: conf,
|
||||
)
|
||||
|
||||
var dnsDiscoveryUrl = none(string)
|
||||
|
||||
if conf.fleet != Fleet.none:
|
||||
# Use DNS discovery to connect to selected fleet
|
||||
echo "Connecting to " & $conf.fleet & " fleet using DNS discovery..."
|
||||
|
||||
if conf.fleet == Fleet.test:
|
||||
dnsDiscoveryUrl = some(
|
||||
"enrtree://AOGYWMBYOUIMOENHXCHILPKY3ZRFEULMFI4DOM442QSZ73TT2A7VI@test.waku.nodes.status.im"
|
||||
)
|
||||
else:
|
||||
# Connect to sandbox by default
|
||||
dnsDiscoveryUrl = some(
|
||||
"enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im"
|
||||
)
|
||||
elif conf.dnsDiscoveryUrl != "":
|
||||
# No pre-selected fleet. Discover nodes via DNS using user config
|
||||
debug "Discovering nodes using Waku DNS discovery", url = conf.dnsDiscoveryUrl
|
||||
dnsDiscoveryUrl = some(conf.dnsDiscoveryUrl)
|
||||
|
||||
var discoveredNodes: seq[RemotePeerInfo]
|
||||
|
||||
if dnsDiscoveryUrl.isSome:
|
||||
var nameServers: seq[TransportAddress]
|
||||
for ip in conf.dnsDiscoveryNameServers:
|
||||
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
|
||||
|
||||
let dnsResolver = DnsResolver.new(nameServers)
|
||||
|
||||
proc resolver(domain: string): Future[string] {.async, gcsafe.} =
|
||||
trace "resolving", domain = domain
|
||||
let resolved = await dnsResolver.resolveTxt(domain)
|
||||
return resolved[0] # Use only first answer
|
||||
|
||||
var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl.get(), resolver)
|
||||
if wakuDnsDiscovery.isOk:
|
||||
let discoveredPeers = await wakuDnsDiscovery.get().findPeers()
|
||||
if discoveredPeers.isOk:
|
||||
info "Connecting to discovered peers"
|
||||
discoveredNodes = discoveredPeers.get()
|
||||
echo "Discovered and connecting to " & $discoveredNodes
|
||||
waitFor chat.node.connectToNodes(discoveredNodes)
|
||||
else:
|
||||
warn "Failed to init Waku DNS discovery"
|
||||
|
||||
let peerInfo = node.switch.peerInfo
|
||||
let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId
|
||||
echo &"Listening on\n {listenStr}"
|
||||
|
||||
if (conf.storenode != "") or (conf.store == true):
|
||||
await node.mountStore()
|
||||
|
||||
var storenode: Option[RemotePeerInfo]
|
||||
|
||||
if conf.storenode != "":
|
||||
let peerInfo = parsePeerInfo(conf.storenode)
|
||||
if peerInfo.isOk():
|
||||
storenode = some(peerInfo.value)
|
||||
else:
|
||||
error "Incorrect conf.storenode", error = peerInfo.error
|
||||
elif discoveredNodes.len > 0:
|
||||
echo "Store enabled, but no store nodes configured. Choosing one at random from discovered peers"
|
||||
storenode = some(discoveredNodes[rand(0 .. len(discoveredNodes) - 1)])
|
||||
|
||||
if storenode.isSome():
|
||||
# We have a viable storenode. Let's query it for historical messages.
|
||||
echo "Connecting to storenode: " & $(storenode.get())
|
||||
|
||||
node.mountStoreClient()
|
||||
node.peerManager.addServicePeer(storenode.get(), WakuStoreCodec)
|
||||
|
||||
proc storeHandler(response: StoreQueryResponse) {.gcsafe.} =
|
||||
for msg in response.messages:
|
||||
let payload =
|
||||
if msg.message.isSome():
|
||||
msg.message.get().payload
|
||||
else:
|
||||
newSeq[byte](0)
|
||||
|
||||
let
|
||||
pb = Chat2Message.init(payload)
|
||||
chatLine =
|
||||
if pb.isOk:
|
||||
pb[].toString()
|
||||
else:
|
||||
string.fromBytes(payload)
|
||||
echo &"{chatLine}"
|
||||
info "Hit store handler"
|
||||
|
||||
let queryRes = await node.query(
|
||||
StoreQueryRequest(contentTopics: @[chat.contentTopic]), storenode.get()
|
||||
)
|
||||
if queryRes.isOk():
|
||||
storeHandler(queryRes.value)
|
||||
|
||||
if conf.edgemode: #Mount light protocol clients
|
||||
node.mountLightPushClient()
|
||||
await node.mountFilterClient()
|
||||
let filterHandler = proc(
|
||||
pubsubTopic: PubsubTopic, msg: WakuMessage
|
||||
): Future[void] {.async, closure.} =
|
||||
trace "Hit filter handler", contentTopic = msg.contentTopic
|
||||
chat.printReceivedMessage(msg)
|
||||
|
||||
node.wakuFilterClient.registerPushHandler(filterHandler)
|
||||
|
||||
if conf.serviceNode != "": #TODO: use one of discovered nodes if not present.
|
||||
let peerInfo = parsePeerInfo(conf.serviceNode)
|
||||
if peerInfo.isOk():
|
||||
#await mountLegacyLightPush(node)
|
||||
node.peerManager.addServicePeer(peerInfo.value, WakuLightpushCodec)
|
||||
node.peerManager.addServicePeer(peerInfo.value, WakuPeerExchangeCodec)
|
||||
# Start maintaining subscription
|
||||
asyncSpawn maintainSubscription(
|
||||
node, pubsubTopic, conf.contentTopic, peerInfo.value, false
|
||||
)
|
||||
else:
|
||||
error "LightPushClient not mounted. Couldn't parse conf.serviceNode",
|
||||
error = peerInfo.error
|
||||
# TODO: Loop faster
|
||||
node.startPeerExchangeLoop()
|
||||
|
||||
while node.getMixNodePoolSize() < 3:
|
||||
info "waiting for mix nodes to be discovered",
|
||||
currentpoolSize = node.getMixNodePoolSize()
|
||||
await sleepAsync(1000)
|
||||
notice "ready to publish with mix node pool size ",
|
||||
currentpoolSize = node.getMixNodePoolSize()
|
||||
echo "ready to publish messages now"
|
||||
|
||||
if conf.metricsLogging:
|
||||
startMetricsLog()
|
||||
|
||||
if conf.metricsServer:
|
||||
let metricsServer = startMetricsServer(
|
||||
conf.metricsServerAddress, Port(conf.metricsServerPort + conf.portsShift)
|
||||
)
|
||||
|
||||
await chat.readWriteLoop()
|
||||
|
||||
runForever()
|
||||
|
||||
proc main(rng: ref HmacDrbgContext) {.async.} =
|
||||
let (rfd, wfd) = createAsyncPipe()
|
||||
if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe:
|
||||
raise newException(ValueError, "Could not initialize pipe!")
|
||||
|
||||
var thread: Thread[AsyncFD]
|
||||
thread.createThread(readInput, wfd)
|
||||
try:
|
||||
await processInput(rfd, rng)
|
||||
# Handle only ConfigurationError for now
|
||||
# TODO: Throw other errors from the mounting procedure
|
||||
except ConfigurationError as e:
|
||||
raise e
|
||||
|
||||
when isMainModule: # isMainModule = true when the module is compiled as the main file
|
||||
let rng = crypto.newRng()
|
||||
try:
|
||||
waitFor(main(rng))
|
||||
except CatchableError as e:
|
||||
raise e
|
||||
|
||||
## Dump of things that can be improved:
|
||||
##
|
||||
## - Incoming dialed peer does not change connected state (not relying on it for now)
|
||||
## - Unclear if staticnode argument works (can enter manually)
|
||||
## - Don't trigger self / double publish own messages
|
||||
## - Test/default to cluster node connection (diff protocol version)
|
||||
## - Redirect logs to separate file
|
||||
## - Expose basic publish/subscribe etc commands with /syntax
|
||||
## - Show part of peerid to know who sent message
|
||||
## - Deal with protobuf messages (e.g. other chat protocol, or encrypted)
|
||||
282
apps/chat2mix/config_chat2mix.nim
Normal file
282
apps/chat2mix/config_chat2mix.nim
Normal file
@ -0,0 +1,282 @@
|
||||
import chronicles, chronos, std/strutils, regex
|
||||
|
||||
import
|
||||
eth/keys,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/crypto/secp,
|
||||
nimcrypto/utils,
|
||||
confutils,
|
||||
confutils/defs,
|
||||
confutils/std/net
|
||||
|
||||
import waku/waku_core
|
||||
|
||||
type
|
||||
Fleet* = enum
|
||||
none
|
||||
prod
|
||||
test
|
||||
|
||||
EthRpcUrl* = distinct string
|
||||
|
||||
Chat2Conf* = object ## General node config
|
||||
edgemode* {.
|
||||
defaultValue: true, desc: "Run the app in edge mode", name: "edge-mode"
|
||||
.}: bool
|
||||
|
||||
logLevel* {.
|
||||
desc: "Sets the log level.", defaultValue: LogLevel.INFO, name: "log-level"
|
||||
.}: LogLevel
|
||||
|
||||
nodekey* {.desc: "P2P node private key as 64 char hex string.", name: "nodekey".}:
|
||||
Option[crypto.PrivateKey]
|
||||
|
||||
listenAddress* {.
|
||||
defaultValue: defaultListenAddress(config),
|
||||
desc: "Listening address for the LibP2P traffic.",
|
||||
name: "listen-address"
|
||||
.}: IpAddress
|
||||
|
||||
tcpPort* {.desc: "TCP listening port.", defaultValue: 60000, name: "tcp-port".}:
|
||||
Port
|
||||
|
||||
udpPort* {.desc: "UDP listening port.", defaultValue: 60000, name: "udp-port".}:
|
||||
Port
|
||||
|
||||
portsShift* {.
|
||||
desc: "Add a shift to all port numbers.", defaultValue: 0, name: "ports-shift"
|
||||
.}: uint16
|
||||
|
||||
nat* {.
|
||||
desc:
|
||||
"Specify method to use for determining public address. " &
|
||||
"Must be one of: any, none, upnp, pmp, extip:<IP>.",
|
||||
defaultValue: "any"
|
||||
.}: string
|
||||
|
||||
## Persistence config
|
||||
dbPath* {.
|
||||
desc: "The database path for peristent storage", defaultValue: "", name: "db-path"
|
||||
.}: string
|
||||
|
||||
persistPeers* {.
|
||||
desc: "Enable peer persistence: true|false",
|
||||
defaultValue: false,
|
||||
name: "persist-peers"
|
||||
.}: bool
|
||||
|
||||
persistMessages* {.
|
||||
desc: "Enable message persistence: true|false",
|
||||
defaultValue: false,
|
||||
name: "persist-messages"
|
||||
.}: bool
|
||||
|
||||
## Relay config
|
||||
relay* {.
|
||||
desc: "Enable relay protocol: true|false", defaultValue: true, name: "relay"
|
||||
.}: bool
|
||||
|
||||
staticnodes* {.
|
||||
desc: "Peer multiaddr to directly connect with. Argument may be repeated.",
|
||||
name: "staticnode"
|
||||
.}: seq[string]
|
||||
|
||||
mixnodes* {.
|
||||
desc: "Peer ENR to add as a mixnode. Argument may be repeated.", name: "mixnode"
|
||||
.}: seq[string]
|
||||
|
||||
keepAlive* {.
|
||||
desc: "Enable keep-alive for idle connections: true|false",
|
||||
defaultValue: false,
|
||||
name: "keep-alive"
|
||||
.}: bool
|
||||
|
||||
clusterId* {.
|
||||
desc:
|
||||
"Cluster id that the node is running in. Node in a different cluster id is disconnected.",
|
||||
defaultValue: 2,
|
||||
name: "cluster-id"
|
||||
.}: uint16
|
||||
|
||||
numShardsInNetwork* {.
|
||||
desc: "Number of shards in the network",
|
||||
defaultValue: 1,
|
||||
name: "num-shards-in-network"
|
||||
.}: uint32
|
||||
|
||||
shards* {.
|
||||
desc:
|
||||
"Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.",
|
||||
defaultValue: @[uint16(0)],
|
||||
name: "shard"
|
||||
.}: seq[uint16]
|
||||
|
||||
## Store config
|
||||
store* {.
|
||||
desc: "Enable store protocol: true|false", defaultValue: false, name: "store"
|
||||
.}: bool
|
||||
|
||||
storenode* {.
|
||||
desc: "Peer multiaddr to query for storage.", defaultValue: "", name: "storenode"
|
||||
.}: string
|
||||
|
||||
## Filter config
|
||||
filter* {.
|
||||
desc: "Enable filter protocol: true|false", defaultValue: false, name: "filter"
|
||||
.}: bool
|
||||
|
||||
## Lightpush config
|
||||
lightpush* {.
|
||||
desc: "Enable lightpush protocol: true|false",
|
||||
defaultValue: false,
|
||||
name: "lightpush"
|
||||
.}: bool
|
||||
|
||||
servicenode* {.
|
||||
desc: "Peer multiaddr to request lightpush and filter services",
|
||||
defaultValue: "",
|
||||
name: "servicenode"
|
||||
.}: string
|
||||
|
||||
## Metrics config
|
||||
metricsServer* {.
|
||||
desc: "Enable the metrics server: true|false",
|
||||
defaultValue: false,
|
||||
name: "metrics-server"
|
||||
.}: bool
|
||||
|
||||
metricsServerAddress* {.
|
||||
desc: "Listening address of the metrics server.",
|
||||
defaultValue: parseIpAddress("127.0.0.1"),
|
||||
name: "metrics-server-address"
|
||||
.}: IpAddress
|
||||
|
||||
metricsServerPort* {.
|
||||
desc: "Listening HTTP port of the metrics server.",
|
||||
defaultValue: 8008,
|
||||
name: "metrics-server-port"
|
||||
.}: uint16
|
||||
|
||||
metricsLogging* {.
|
||||
desc: "Enable metrics logging: true|false",
|
||||
defaultValue: true,
|
||||
name: "metrics-logging"
|
||||
.}: bool
|
||||
|
||||
## DNS discovery config
|
||||
dnsDiscovery* {.
|
||||
desc:
|
||||
"Deprecated, please set dns-discovery-url instead. Enable discovering nodes via DNS",
|
||||
defaultValue: false,
|
||||
name: "dns-discovery"
|
||||
.}: bool
|
||||
|
||||
dnsDiscoveryUrl* {.
|
||||
desc: "URL for DNS node list in format 'enrtree://<key>@<fqdn>'",
|
||||
defaultValue: "",
|
||||
name: "dns-discovery-url"
|
||||
.}: string
|
||||
|
||||
dnsDiscoveryNameServers* {.
|
||||
desc: "DNS name server IPs to query. Argument may be repeated.",
|
||||
defaultValue: @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")],
|
||||
name: "dns-discovery-name-server"
|
||||
.}: seq[IpAddress]
|
||||
|
||||
## Chat2 configuration
|
||||
fleet* {.
|
||||
desc:
|
||||
"Select the fleet to connect to. This sets the DNS discovery URL to the selected fleet.",
|
||||
defaultValue: Fleet.none,
|
||||
name: "fleet"
|
||||
.}: Fleet
|
||||
|
||||
contentTopic* {.
|
||||
desc: "Content topic for chat messages.",
|
||||
defaultValue: "/toy-chat-mix/2/huilong/proto",
|
||||
name: "content-topic"
|
||||
.}: string
|
||||
|
||||
## Websocket Configuration
|
||||
websocketSupport* {.
|
||||
desc: "Enable websocket: true|false",
|
||||
defaultValue: false,
|
||||
name: "websocket-support"
|
||||
.}: bool
|
||||
|
||||
websocketPort* {.
|
||||
desc: "WebSocket listening port.", defaultValue: 8000, name: "websocket-port"
|
||||
.}: Port
|
||||
|
||||
websocketSecureSupport* {.
|
||||
desc: "WebSocket Secure Support.",
|
||||
defaultValue: false,
|
||||
name: "websocket-secure-support"
|
||||
.}: bool ## rln-relay configuration
|
||||
|
||||
# NOTE: Keys are different in nim-libp2p
|
||||
proc parseCmdArg*(T: type crypto.PrivateKey, p: string): T =
|
||||
try:
|
||||
let key = SkPrivateKey.init(utils.fromHex(p)).tryGet()
|
||||
# XXX: Here at the moment
|
||||
result = crypto.PrivateKey(scheme: Secp256k1, skkey: key)
|
||||
except CatchableError as e:
|
||||
raise newException(ValueError, "Invalid private key")
|
||||
|
||||
proc completeCmdArg*(T: type crypto.PrivateKey, val: string): seq[string] =
|
||||
return @[]
|
||||
|
||||
proc parseCmdArg*(T: type IpAddress, p: string): T =
|
||||
try:
|
||||
result = parseIpAddress(p)
|
||||
except CatchableError as e:
|
||||
raise newException(ValueError, "Invalid IP address")
|
||||
|
||||
proc completeCmdArg*(T: type IpAddress, val: string): seq[string] =
|
||||
return @[]
|
||||
|
||||
proc parseCmdArg*(T: type Port, p: string): T =
|
||||
try:
|
||||
result = Port(parseInt(p))
|
||||
except CatchableError as e:
|
||||
raise newException(ValueError, "Invalid Port number")
|
||||
|
||||
proc completeCmdArg*(T: type Port, val: string): seq[string] =
|
||||
return @[]
|
||||
|
||||
proc parseCmdArg*(T: type Option[uint], p: string): T =
|
||||
try:
|
||||
some(parseUint(p))
|
||||
except CatchableError:
|
||||
raise newException(ValueError, "Invalid unsigned integer")
|
||||
|
||||
proc completeCmdArg*(T: type EthRpcUrl, val: string): seq[string] =
|
||||
return @[]
|
||||
|
||||
proc parseCmdArg*(T: type EthRpcUrl, s: string): T =
|
||||
## allowed patterns:
|
||||
## http://url:port
|
||||
## https://url:port
|
||||
## http://url:port/path
|
||||
## https://url:port/path
|
||||
## http://url/with/path
|
||||
## http://url:port/path?query
|
||||
## https://url:port/path?query
|
||||
## disallowed patterns:
|
||||
## any valid/invalid ws or wss url
|
||||
var httpPattern =
|
||||
re2"^(https?):\/\/((localhost)|([\w_-]+(?:(?:\.[\w_-]+)+)))(:[0-9]{1,5})?([\w.,@?^=%&:\/~+#-]*[\w@?^=%&\/~+#-])*"
|
||||
var wsPattern =
|
||||
re2"^(wss?):\/\/((localhost)|([\w_-]+(?:(?:\.[\w_-]+)+)))(:[0-9]{1,5})?([\w.,@?^=%&:\/~+#-]*[\w@?^=%&\/~+#-])*"
|
||||
if regex.match(s, wsPattern):
|
||||
raise newException(
|
||||
ValueError, "Websocket RPC URL is not supported, Please use an HTTP URL"
|
||||
)
|
||||
if not regex.match(s, httpPattern):
|
||||
raise newException(ValueError, "Invalid HTTP RPC URL")
|
||||
return EthRpcUrl(s)
|
||||
|
||||
func defaultListenAddress*(conf: Chat2Conf): IpAddress =
|
||||
# TODO: How should we select between IPv4 and IPv6
|
||||
# Maybe there should be a config option for this.
|
||||
(static parseIpAddress("0.0.0.0"))
|
||||
4
apps/chat2mix/nim.cfg
Normal file
4
apps/chat2mix/nim.cfg
Normal file
@ -0,0 +1,4 @@
|
||||
-d:chronicles_line_numbers
|
||||
-d:chronicles_runtime_filtering:on
|
||||
-d:discv5_protocol_id:d5waku
|
||||
path = "../.."
|
||||
4
simulations/README.md
Normal file
4
simulations/README.md
Normal file
@ -0,0 +1,4 @@
|
||||
# Purpose
|
||||
|
||||
This is a place where any simulation related scripts and utilities can be stored.
|
||||
Checkout mixnet folder to get an idea.
|
||||
70
simulations/mixnet/README.md
Normal file
70
simulations/mixnet/README.md
Normal file
@ -0,0 +1,70 @@
|
||||
# Mixnet simulation
|
||||
|
||||
## Aim
|
||||
|
||||
Simulate a local mixnet along with a chat app to publish using mix.
|
||||
This is helpful to test any changes while development.
|
||||
It includes scripts that run a `4 node` mixnet along with a lightpush service node(without mix) that can be used to test quickly.
|
||||
|
||||
## Simulation Details
|
||||
|
||||
Note that before running the simulation both `wakunode2` and `chat2mix` have to be built.
|
||||
|
||||
```bash
|
||||
cd <repo-root-dir>
|
||||
make wakunode2
|
||||
make chat2mix
|
||||
```
|
||||
|
||||
Simulation includes scripts for:
|
||||
|
||||
1. a 4 waku-node mixnet where `node1` is bootstrap node for the other 3 nodes.
|
||||
2. scripts to run chat app that publishes using lightpush protocol over the mixnet
|
||||
|
||||
## Usage
|
||||
|
||||
Start the service node with below command, which acts as bootstrap node for all other mix nodes.
|
||||
|
||||
`./run_lp_service_node.sh`
|
||||
|
||||
To run the nodes for mixnet run the 4 node scripts in different terminals as below.
|
||||
|
||||
`./run_mix_node1.sh`
|
||||
|
||||
Look for following 2 log lines to ensure node ran successfully and has also mounted mix protocol.
|
||||
|
||||
```log
|
||||
INF 2025-08-01 14:51:05.445+05:30 mounting mix protocol topics="waku node" tid=39996871 file=waku_node.nim:231 nodeId="(listenAddresses: @[\"/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o\"], enrUri: \"enr:-NC4QKYtas8STkenlqBTJ3a1TTLzJA2DsGGbFlnxem9aSM2IXm-CSVZULdk2467bAyFnepnt8KP_QlfDzdaMXd_zqtwBgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCdCc5iT3bo9gYmXtucyit96bQXcqbXhL3a-S_6j7p9LIptdWx0aWFkZHJzgIJyc4UAAgEAAIlzZWNwMjU2azGhA6RFtVJVBh0SYOoP8xrgnXSlpiFARmQkF9d8Rn4fSeiog3RjcILqYYN1ZHCCIymFd2FrdTIt\")"
|
||||
|
||||
INF 2025-08-01 14:49:23.467+05:30 Node setup complete topics="wakunode main" tid=39994244 file=wakunode2.nim:104
|
||||
```
|
||||
|
||||
Once all the 4 nodes are up without any issues, run the script to start the chat application.
|
||||
|
||||
`./run_chat_app.sh`
|
||||
|
||||
Enter a nickname to be used.
|
||||
|
||||
```bash
|
||||
pubsub topic is: /waku/2/rs/2/0
|
||||
Choose a nickname >>
|
||||
```
|
||||
|
||||
Once you see below log, it means the app is ready for publishing messages over the mixnet.
|
||||
|
||||
```bash
|
||||
Welcome, test!
|
||||
Listening on
|
||||
/ip4/192.168.68.64/tcp/60000/p2p/16Uiu2HAkxDGqix1ifY3wF1ZzojQWRAQEdKP75wn1LJMfoHhfHz57
|
||||
ready to publish messages now
|
||||
```
|
||||
|
||||
Follow similar instructions to run second instance of chat app.
|
||||
Once both the apps run successfully, send a message and check if it is received by the other app.
|
||||
|
||||
You can exit the chat apps by entering `/exit` as below
|
||||
|
||||
```bash
|
||||
>> /exit
|
||||
quitting...
|
||||
```
|
||||
25
simulations/mixnet/config.toml
Normal file
25
simulations/mixnet/config.toml
Normal file
@ -0,0 +1,25 @@
|
||||
log-level = "INFO"
|
||||
relay = true
|
||||
#mix = true
|
||||
filter = true
|
||||
store = false
|
||||
lightpush = true
|
||||
max-connections = 150
|
||||
peer-exchange = true
|
||||
metrics-logging = false
|
||||
cluster-id = 2
|
||||
discv5-discovery = true
|
||||
discv5-udp-port = 9000
|
||||
discv5-enr-auto-update = true
|
||||
rest = true
|
||||
rest-admin = true
|
||||
ports-shift = 1
|
||||
num-shards-in-network = 1
|
||||
shard = [0]
|
||||
agent-string = "nwaku-mix"
|
||||
nodekey = "f98e3fba96c32e8d1967d460f1b79457380e1a895f7971cecc8528abe733781a"
|
||||
mixkey = "a87db88246ec0eedda347b9b643864bee3d6933eb15ba41e6d58cb678d813258"
|
||||
rendezvous = true
|
||||
listen-address = "127.0.0.1"
|
||||
nat = "extip:127.0.0.1"
|
||||
ip-colocation-limit=0
|
||||
27
simulations/mixnet/config1.toml
Normal file
27
simulations/mixnet/config1.toml
Normal file
@ -0,0 +1,27 @@
|
||||
log-level = "INFO"
|
||||
relay = true
|
||||
mix = true
|
||||
filter = true
|
||||
store = false
|
||||
lightpush = true
|
||||
max-connections = 150
|
||||
peer-exchange = true
|
||||
metrics-logging = false
|
||||
cluster-id = 2
|
||||
discv5-discovery = true
|
||||
discv5-udp-port = 9001
|
||||
discv5-enr-auto-update = true
|
||||
discv5-bootstrap-node = ["enr:-Nq4QHkkKSYWg12nFHlB-Yjb53I922uquTL26R7G6Kk7Z0gNJx2N4qCt_fq0RFg-_ZDX5ju8SatMjNYyImPprl37utMEgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCdCc5iT3bo9gYmXtucyit96bQXcqbXhL3a-S_6j7p9LIptdWx0aWFkZHJzigAIBMCoAWkG6mGCcnOFAAIBAACJc2VjcDI1NmsxoQOkRbVSVQYdEmDqD_Ma4J10paYhQEZkJBfXfEZ-H0noqIN0Y3CC6mGDdWRwgiMphXdha3UyLQ"]
|
||||
rest = true
|
||||
rest-admin = true
|
||||
ports-shift = 2
|
||||
num-shards-in-network = 1
|
||||
shard = [0]
|
||||
agent-string = "nwaku-mix"
|
||||
nodekey = "09e9d134331953357bd38bbfce8edb377f4b6308b4f3bfbe85c610497053d684"
|
||||
mixkey = "c86029e02c05a7e25182974b519d0d52fcbafeca6fe191fbb64857fb05be1a53"
|
||||
rendezvous = true
|
||||
listen-address = "127.0.0.1"
|
||||
nat = "extip:127.0.0.1"
|
||||
ip-colocation-limit=0
|
||||
#staticnode = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", "/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA","/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f","/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu"]
|
||||
27
simulations/mixnet/config2.toml
Normal file
27
simulations/mixnet/config2.toml
Normal file
@ -0,0 +1,27 @@
|
||||
log-level = "INFO"
|
||||
relay = true
|
||||
mix = true
|
||||
filter = true
|
||||
store = false
|
||||
lightpush = true
|
||||
max-connections = 150
|
||||
peer-exchange = true
|
||||
metrics-logging = false
|
||||
cluster-id = 2
|
||||
discv5-discovery = true
|
||||
discv5-udp-port = 9002
|
||||
discv5-enr-auto-update = true
|
||||
discv5-bootstrap-node = ["enr:-Nq4QHkkKSYWg12nFHlB-Yjb53I922uquTL26R7G6Kk7Z0gNJx2N4qCt_fq0RFg-_ZDX5ju8SatMjNYyImPprl37utMEgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCdCc5iT3bo9gYmXtucyit96bQXcqbXhL3a-S_6j7p9LIptdWx0aWFkZHJzigAIBMCoAWkG6mGCcnOFAAIBAACJc2VjcDI1NmsxoQOkRbVSVQYdEmDqD_Ma4J10paYhQEZkJBfXfEZ-H0noqIN0Y3CC6mGDdWRwgiMphXdha3UyLQ"]
|
||||
rest = false
|
||||
rest-admin = false
|
||||
ports-shift = 3
|
||||
num-shards-in-network = 1
|
||||
shard = [0]
|
||||
agent-string = "nwaku-mix"
|
||||
nodekey = "ed54db994682e857d77cd6fb81be697382dc43aa5cd78e16b0ec8098549f860e"
|
||||
mixkey = "b858ac16bbb551c4b2973313b1c8c8f7ea469fca03f1608d200bbf58d388ec7f"
|
||||
rendezvous = true
|
||||
listen-address = "127.0.0.1"
|
||||
nat = "extip:127.0.0.1"
|
||||
ip-colocation-limit=0
|
||||
#staticnode = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", "/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF","/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f","/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu"]
|
||||
27
simulations/mixnet/config3.toml
Normal file
27
simulations/mixnet/config3.toml
Normal file
@ -0,0 +1,27 @@
|
||||
log-level = "INFO"
|
||||
relay = true
|
||||
mix = true
|
||||
filter = true
|
||||
store = false
|
||||
lightpush = true
|
||||
max-connections = 150
|
||||
peer-exchange = true
|
||||
metrics-logging = false
|
||||
cluster-id = 2
|
||||
discv5-discovery = true
|
||||
discv5-udp-port = 9003
|
||||
discv5-enr-auto-update = true
|
||||
discv5-bootstrap-node = ["enr:-Nq4QHkkKSYWg12nFHlB-Yjb53I922uquTL26R7G6Kk7Z0gNJx2N4qCt_fq0RFg-_ZDX5ju8SatMjNYyImPprl37utMEgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCdCc5iT3bo9gYmXtucyit96bQXcqbXhL3a-S_6j7p9LIptdWx0aWFkZHJzigAIBMCoAWkG6mGCcnOFAAIBAACJc2VjcDI1NmsxoQOkRbVSVQYdEmDqD_Ma4J10paYhQEZkJBfXfEZ-H0noqIN0Y3CC6mGDdWRwgiMphXdha3UyLQ"]
|
||||
rest = false
|
||||
rest-admin = false
|
||||
ports-shift = 4
|
||||
num-shards-in-network = 1
|
||||
shard = [0]
|
||||
agent-string = "nwaku-mix"
|
||||
nodekey = "42f96f29f2d6670938b0864aced65a332dcf5774103b4c44ec4d0ea4ef3c47d6"
|
||||
mixkey = "d8bd379bb394b0f22dd236d63af9f1a9bc45266beffc3fbbe19e8b6575f2535b"
|
||||
rendezvous = true
|
||||
listen-address = "127.0.0.1"
|
||||
nat = "extip:127.0.0.1"
|
||||
ip-colocation-limit=0
|
||||
#staticnode = ["/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF", "/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA","/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o","/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu"]
|
||||
27
simulations/mixnet/config4.toml
Normal file
27
simulations/mixnet/config4.toml
Normal file
@ -0,0 +1,27 @@
|
||||
log-level = "DEBUG"
|
||||
relay = true
|
||||
mix = true
|
||||
filter = true
|
||||
store = false
|
||||
lightpush = true
|
||||
max-connections = 150
|
||||
peer-exchange = true
|
||||
metrics-logging = false
|
||||
cluster-id = 2
|
||||
discv5-discovery = true
|
||||
discv5-udp-port = 9004
|
||||
discv5-enr-auto-update = true
|
||||
discv5-bootstrap-node = ["enr:-Nq4QHkkKSYWg12nFHlB-Yjb53I922uquTL26R7G6Kk7Z0gNJx2N4qCt_fq0RFg-_ZDX5ju8SatMjNYyImPprl37utMEgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCdCc5iT3bo9gYmXtucyit96bQXcqbXhL3a-S_6j7p9LIptdWx0aWFkZHJzigAIBMCoAWkG6mGCcnOFAAIBAACJc2VjcDI1NmsxoQOkRbVSVQYdEmDqD_Ma4J10paYhQEZkJBfXfEZ-H0noqIN0Y3CC6mGDdWRwgiMphXdha3UyLQ"]
|
||||
rest = false
|
||||
rest-admin = false
|
||||
ports-shift = 5
|
||||
num-shards-in-network = 1
|
||||
shard = [0]
|
||||
agent-string = "nwaku-mix"
|
||||
#nodekey = "3ce887b3c34b7a92dd2868af33941ed1dbec4893b054572cd5078da09dd923d4"
|
||||
mixkey = "780fff09e51e98df574e266bf3266ec6a3a1ddfcf7da826a349a29c137009d49"
|
||||
rendezvous = true
|
||||
listen-address = "127.0.0.1"
|
||||
nat = "extip:127.0.0.1"
|
||||
ip-colocation-limit=0
|
||||
#staticnode = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", "/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA","/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f","/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF"]
|
||||
1
simulations/mixnet/run_chat_mix.sh
Executable file
1
simulations/mixnet/run_chat_mix.sh
Executable file
@ -0,0 +1 @@
|
||||
../../build/chat2mix --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --mixnode="enr:-Nq4QHkkKSYWg12nFHlB-Yjb53I922uquTL26R7G6Kk7Z0gNJx2N4qCt_fq0RFg-_ZDX5ju8SatMjNYyImPprl37utMEgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCdCc5iT3bo9gYmXtucyit96bQXcqbXhL3a-S_6j7p9LIptdWx0aWFkZHJzigAIBMCoAWkG6mGCcnOFAAIBAACJc2VjcDI1NmsxoQOkRbVSVQYdEmDqD_Ma4J10paYhQEZkJBfXfEZ-H0noqIN0Y3CC6mGDdWRwgiMphXdha3UyLQ" --mixnode="enr:-NC4QH3HbfXxl0emm33s-6ovpu4VEA959XDSMU7rQOMfS8w6U9WB39Y25Z_ZOcgegg2SQBoGsX4kwHpNdsd-ZWzuGasBgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCSMehtpkMlApAKhPhnAEznhjKrUs2OMLHsMizXlXEMKoptdWx0aWFkZHJzgIJyc4UAAgEAAIlzZWNwMjU2azGhA3pHyDDU-7wjAyVNqnT8Tu9V3XBxoWCD9VRpp5VZJlhUg3RjcILqYoN1ZHCCIyuFd2FrdTIt" --mixnode="enr:-Nq4QMAtUyBnD7j_o7qsXLuKWn2fxvSOC0EciyK91qSQyqCib_bPxRqYbjZLNh0YOSI0t0xN23Kp46OctZtzRhw_hxsCgmlkgnY0gmlwhH8AAAGHbWl4LWtleaAnXNaInh8pykjlue24ANGpT0nxPTk6Ds8aB691NQbebIptdWx0aWFkZHJzigAIBMCoREAG6mOCcnOFAAIBAACJc2VjcDI1NmsxoQPYhmrbTqylbdenVfvO2U0w6EC4A-l5lwvu3QWL7IqkO4N0Y3CC6mODdWRwgiMthXdha3UyLQ" --mixnode="enr:-Nq4QHo7054AuNsTZde5A5GcklDZmcrumkd32BBW3UUlLh7lBuYstu8dmClolil8g3nDQBqwU_B5-iEfVS1UVxRWoVoDgmlkgnY0gmlwhH8AAAGHbWl4LWtleaDg7VlKjVBmgb4HXo4jcjR4OI-xgkd_ekaTCaJecHb8GIptdWx0aWFkZHJzigAIBMCoREAG6mSCcnOFAAIBAACJc2VjcDI1NmsxoQOnphVC3U5zmOCkjOI2tY0v8K5QkXSaE5xO37q3iFfKGIN0Y3CC6mSDdWRwgiMvhXdha3UyLQ"
|
||||
1
simulations/mixnet/run_chat_mix1.sh
Executable file
1
simulations/mixnet/run_chat_mix1.sh
Executable file
@ -0,0 +1 @@
|
||||
../../build/chat2mix --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --mixnode="enr:-Nq4QHkkKSYWg12nFHlB-Yjb53I922uquTL26R7G6Kk7Z0gNJx2N4qCt_fq0RFg-_ZDX5ju8SatMjNYyImPprl37utMEgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCdCc5iT3bo9gYmXtucyit96bQXcqbXhL3a-S_6j7p9LIptdWx0aWFkZHJzigAIBMCoAWkG6mGCcnOFAAIBAACJc2VjcDI1NmsxoQOkRbVSVQYdEmDqD_Ma4J10paYhQEZkJBfXfEZ-H0noqIN0Y3CC6mGDdWRwgiMphXdha3UyLQ" --mixnode="enr:-NC4QH3HbfXxl0emm33s-6ovpu4VEA959XDSMU7rQOMfS8w6U9WB39Y25Z_ZOcgegg2SQBoGsX4kwHpNdsd-ZWzuGasBgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCSMehtpkMlApAKhPhnAEznhjKrUs2OMLHsMizXlXEMKoptdWx0aWFkZHJzgIJyc4UAAgEAAIlzZWNwMjU2azGhA3pHyDDU-7wjAyVNqnT8Tu9V3XBxoWCD9VRpp5VZJlhUg3RjcILqYoN1ZHCCIyuFd2FrdTIt" --mixnode="enr:-Nq4QMAtUyBnD7j_o7qsXLuKWn2fxvSOC0EciyK91qSQyqCib_bPxRqYbjZLNh0YOSI0t0xN23Kp46OctZtzRhw_hxsCgmlkgnY0gmlwhH8AAAGHbWl4LWtleaAnXNaInh8pykjlue24ANGpT0nxPTk6Ds8aB691NQbebIptdWx0aWFkZHJzigAIBMCoREAG6mOCcnOFAAIBAACJc2VjcDI1NmsxoQPYhmrbTqylbdenVfvO2U0w6EC4A-l5lwvu3QWL7IqkO4N0Y3CC6mODdWRwgiMthXdha3UyLQ" --mixnode="enr:-Nq4QHo7054AuNsTZde5A5GcklDZmcrumkd32BBW3UUlLh7lBuYstu8dmClolil8g3nDQBqwU_B5-iEfVS1UVxRWoVoDgmlkgnY0gmlwhH8AAAGHbWl4LWtleaDg7VlKjVBmgb4HXo4jcjR4OI-xgkd_ekaTCaJecHb8GIptdWx0aWFkZHJzigAIBMCoREAG6mSCcnOFAAIBAACJc2VjcDI1NmsxoQOnphVC3U5zmOCkjOI2tY0v8K5QkXSaE5xO37q3iFfKGIN0Y3CC6mSDdWRwgiMvhXdha3UyLQ" --ports-shift=1
|
||||
1
simulations/mixnet/run_lp_service_node.sh
Executable file
1
simulations/mixnet/run_lp_service_node.sh
Executable file
@ -0,0 +1 @@
|
||||
../../build/wakunode2 --config-file="config.toml"
|
||||
1
simulations/mixnet/run_mix_node1.sh
Executable file
1
simulations/mixnet/run_mix_node1.sh
Executable file
@ -0,0 +1 @@
|
||||
../../build/wakunode2 --config-file="config1.toml"
|
||||
1
simulations/mixnet/run_mix_node2.sh
Executable file
1
simulations/mixnet/run_mix_node2.sh
Executable file
@ -0,0 +1 @@
|
||||
../../build/wakunode2 --config-file="config2.toml"
|
||||
1
simulations/mixnet/run_mix_node3.sh
Executable file
1
simulations/mixnet/run_mix_node3.sh
Executable file
@ -0,0 +1 @@
|
||||
../../build/wakunode2 --config-file="config3.toml"
|
||||
1
simulations/mixnet/run_mix_node4.sh
Executable file
1
simulations/mixnet/run_mix_node4.sh
Executable file
@ -0,0 +1 @@
|
||||
../../build/wakunode2 --config-file="config4.toml"
|
||||
2
vendor/nim-libp2p
vendored
2
vendor/nim-libp2p
vendored
@ -1 +1 @@
|
||||
Subproject commit cd60b254a0700b0daac7a6cb2c0c48860b57c539
|
||||
Subproject commit 4c0d11c8b59dca7076e8e3acc4714d56a5d67c6e
|
||||
@ -149,6 +149,14 @@ task chat2, "Build example Waku chat usage":
|
||||
let name = "chat2"
|
||||
buildBinary name, "apps/chat2/", "-d:chronicles_sinks=textlines[file] -d:ssl"
|
||||
|
||||
task chat2mix, "Build example Waku chat mix usage":
|
||||
# NOTE For debugging, set debug level. For chat usage we want minimal log
|
||||
# output to STDOUT. Can be fixed by redirecting logs to file (e.g.)
|
||||
#buildBinary name, "examples/", "-d:chronicles_log_level=WARN"
|
||||
|
||||
let name = "chat2mix"
|
||||
buildBinary name, "apps/chat2mix/", "-d:chronicles_sinks=textlines[file] -d:ssl"
|
||||
|
||||
task chat2bridge, "Build chat2bridge":
|
||||
let name = "chat2bridge"
|
||||
buildBinary name, "apps/chat2bridge/"
|
||||
|
||||
@ -27,7 +27,8 @@ import
|
||||
mix/mix_protocol,
|
||||
mix/curve25519,
|
||||
mix/protocol,
|
||||
mix/mix_metrics
|
||||
mix/mix_metrics,
|
||||
mix/entry_connection
|
||||
|
||||
import
|
||||
../waku_core,
|
||||
@ -1182,17 +1183,36 @@ proc lightpushPublishHandler(
|
||||
pubsubTopic: PubsubTopic,
|
||||
message: WakuMessage,
|
||||
peer: RemotePeerInfo | PeerInfo,
|
||||
mixify: bool = false,
|
||||
): Future[lightpush_protocol.WakuLightPushResult] {.async.} =
|
||||
let msgHash = pubsubTopic.computeMessageHash(message).to0xHex()
|
||||
|
||||
if not node.wakuLightpushClient.isNil():
|
||||
notice "publishing message with lightpush",
|
||||
pubsubTopic = pubsubTopic,
|
||||
contentTopic = message.contentTopic,
|
||||
target_peer_id = peer.peerId,
|
||||
msg_hash = msgHash
|
||||
return await node.wakuLightpushClient.publish(some(pubsubTopic), message, peer)
|
||||
msg_hash = msgHash,
|
||||
mixify = mixify
|
||||
if mixify:
|
||||
let conn = MixEntryConnection.newConn(
|
||||
$peer.addrs[0], #TODO: How to handle multiple addresses?
|
||||
peer.peerId,
|
||||
ProtocolType.fromString(WakuLightPushCodec),
|
||||
node.mix,
|
||||
)
|
||||
return await node.wakuLightpushClient.publishWithConn(
|
||||
pubsubTopic, message, conn, peer.peerId
|
||||
)
|
||||
else:
|
||||
return await node.wakuLightpushClient.publish(some(pubsubTopic), message, peer)
|
||||
|
||||
if not node.wakuLightPush.isNil():
|
||||
if mixify:
|
||||
error "mixify is not supported with self hosted lightpush"
|
||||
return lighpushErrorResult(
|
||||
SERVICE_NOT_AVAILABLE, "Waku lightpush with mix not available"
|
||||
)
|
||||
notice "publishing message with self hosted lightpush",
|
||||
pubsubTopic = pubsubTopic,
|
||||
contentTopic = message.contentTopic,
|
||||
@ -1206,11 +1226,16 @@ proc lightpushPublish*(
|
||||
pubsubTopic: Option[PubsubTopic],
|
||||
message: WakuMessage,
|
||||
peerOpt: Option[RemotePeerInfo] = none(RemotePeerInfo),
|
||||
mixify: bool = false,
|
||||
): Future[lightpush_protocol.WakuLightPushResult] {.async.} =
|
||||
if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil():
|
||||
error "failed to publish message as lightpush not available"
|
||||
return lighpushErrorResult(SERVICE_NOT_AVAILABLE, "Waku lightpush not available")
|
||||
|
||||
if mixify and node.mix.isNil():
|
||||
error "failed to publish message using mix as mix protocol is not mounted"
|
||||
return lighpushErrorResult(
|
||||
SERVICE_NOT_AVAILABLE, "Waku lightpush with mix not available"
|
||||
)
|
||||
let toPeer: RemotePeerInfo = peerOpt.valueOr:
|
||||
if not node.wakuLightPush.isNil():
|
||||
RemotePeerInfo.init(node.peerId())
|
||||
@ -1233,7 +1258,7 @@ proc lightpushPublish*(
|
||||
error "lightpush publish error", error = msg
|
||||
return lighpushErrorResult(INTERNAL_SERVER_ERROR, msg)
|
||||
|
||||
return await lightpushPublishHandler(node, pubsubForPublish, message, toPeer)
|
||||
return await lightpushPublishHandler(node, pubsubForPublish, message, toPeer, mixify)
|
||||
|
||||
## Waku RLN Relay
|
||||
proc mountRlnRelay*(
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user