mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-05-02 08:33:10 +00:00
feat: poc to integrate mix into waku and use lightpush to demonstrate
This commit is contained in:
parent
26c2b96cfe
commit
8ea2d27685
4
.gitmodules
vendored
4
.gitmodules
vendored
@ -184,3 +184,7 @@
|
|||||||
url = https://github.com/waku-org/waku-rlnv2-contract.git
|
url = https://github.com/waku-org/waku-rlnv2-contract.git
|
||||||
ignore = untracked
|
ignore = untracked
|
||||||
branch = master
|
branch = master
|
||||||
|
[submodule "vendor/mix"]
|
||||||
|
path = vendor/mix
|
||||||
|
url = https://github.com/vacp2p/mix/
|
||||||
|
branch = mix-waku-integ
|
||||||
|
|||||||
127
examples/lightpush_publisher_mix.nim
Normal file
127
examples/lightpush_publisher_mix.nim
Normal file
@ -0,0 +1,127 @@
|
|||||||
|
import
|
||||||
|
std/[tables, times, sequtils],
|
||||||
|
stew/byteutils,
|
||||||
|
stew/shims/net,
|
||||||
|
chronicles,
|
||||||
|
results,
|
||||||
|
chronos,
|
||||||
|
confutils,
|
||||||
|
libp2p/crypto/crypto,
|
||||||
|
eth/keys,
|
||||||
|
eth/p2p/discoveryv5/enr
|
||||||
|
|
||||||
|
import ../vendor/mix/src/entry_connection,
|
||||||
|
../vendor/mix/src/protocol
|
||||||
|
|
||||||
|
import
|
||||||
|
waku/[
|
||||||
|
common/logging,
|
||||||
|
node/peer_manager,
|
||||||
|
waku_core,
|
||||||
|
waku_core/codecs,
|
||||||
|
waku_node,
|
||||||
|
waku_enr,
|
||||||
|
discovery/waku_discv5,
|
||||||
|
factory/builder,
|
||||||
|
waku_lightpush/client
|
||||||
|
]
|
||||||
|
|
||||||
|
proc now*(): Timestamp =
|
||||||
|
getNanosecondTime(getTime().toUnixFloat())
|
||||||
|
|
||||||
|
# careful if running pub and sub in the same machine
|
||||||
|
const wakuPort = 60000
|
||||||
|
|
||||||
|
const clusterId = 2
|
||||||
|
const shardId = @[0'u16]
|
||||||
|
|
||||||
|
const
|
||||||
|
LightpushPeer =
|
||||||
|
"/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu"
|
||||||
|
LightpushPubsubTopic = PubsubTopic("/waku/2/rs/2/0")
|
||||||
|
LightpushContentTopic = ContentTopic("/examples/1/light-pubsub-mix-example/proto")
|
||||||
|
|
||||||
|
proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
|
||||||
|
# use notice to filter all waku messaging
|
||||||
|
setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)
|
||||||
|
|
||||||
|
notice "starting publisher", wakuPort = wakuPort
|
||||||
|
|
||||||
|
let
|
||||||
|
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[]).get()
|
||||||
|
ip = parseIpAddress("0.0.0.0")
|
||||||
|
flags = CapabilitiesBitfield.init(relay = true)
|
||||||
|
|
||||||
|
let relayShards = RelayShards.init(clusterId, shardId).valueOr:
|
||||||
|
error "Relay shards initialization failed", error = error
|
||||||
|
quit(QuitFailure)
|
||||||
|
|
||||||
|
var enrBuilder = EnrBuilder.init(nodeKey)
|
||||||
|
enrBuilder.withWakuRelaySharding(relayShards).expect(
|
||||||
|
"Building ENR with relay sharding failed"
|
||||||
|
)
|
||||||
|
|
||||||
|
let recordRes = enrBuilder.build()
|
||||||
|
let record =
|
||||||
|
if recordRes.isErr():
|
||||||
|
error "failed to create enr record", error = recordRes.error
|
||||||
|
quit(QuitFailure)
|
||||||
|
else:
|
||||||
|
recordRes.get()
|
||||||
|
|
||||||
|
var builder = WakuNodeBuilder.init()
|
||||||
|
builder.withNodeKey(nodeKey)
|
||||||
|
builder.withRecord(record)
|
||||||
|
builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet()
|
||||||
|
let node = builder.build().tryGet()
|
||||||
|
|
||||||
|
node.mountMetadata(clusterId).expect("failed to mount waku metadata protocol")
|
||||||
|
node.mountLightPushClient()
|
||||||
|
(
|
||||||
|
await node.mountMix("401dd1eb5582f6dc9488d424aa26ed1092becefcf8543172e6d92c17ed07265a")
|
||||||
|
).isOkOr:
|
||||||
|
error "failed to mount waku mix protocol: ", error = $error
|
||||||
|
return
|
||||||
|
|
||||||
|
let destPeerId = PeerId.init("16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o").valueOr:
|
||||||
|
error "Failed to initialize PeerId", err = error
|
||||||
|
return
|
||||||
|
|
||||||
|
let conn = MixEntryConnection.newConn(
|
||||||
|
"/ip4/127.0.0.1/tcp/60001",
|
||||||
|
destPeerId,
|
||||||
|
ProtocolType.fromString(WakuLightPushCodec),
|
||||||
|
node.mix)
|
||||||
|
|
||||||
|
await node.start()
|
||||||
|
node.peerManager.start()
|
||||||
|
|
||||||
|
notice "publisher service started"
|
||||||
|
while true:
|
||||||
|
let text = "hi there i'm a publisher using mix"
|
||||||
|
let message = WakuMessage(
|
||||||
|
payload: toBytes(text), # content of the message
|
||||||
|
contentTopic: LightpushContentTopic, # content topic to publish to
|
||||||
|
ephemeral: true, # tell store nodes to not store it
|
||||||
|
timestamp: now(),
|
||||||
|
) # current timestamp
|
||||||
|
|
||||||
|
let res =
|
||||||
|
await node.wakuLightpushClient.publishWithConn(LightpushPubsubTopic, message, conn)
|
||||||
|
|
||||||
|
if res.isOk:
|
||||||
|
notice "published message",
|
||||||
|
text = text,
|
||||||
|
timestamp = message.timestamp,
|
||||||
|
psTopic = LightpushPubsubTopic,
|
||||||
|
contentTopic = LightpushContentTopic
|
||||||
|
else:
|
||||||
|
error "failed to publish message", error = res.error
|
||||||
|
|
||||||
|
await sleepAsync(5000)
|
||||||
|
break
|
||||||
|
|
||||||
|
when isMainModule:
|
||||||
|
let rng = crypto.newRng()
|
||||||
|
asyncSpawn setupAndPublish(rng)
|
||||||
|
runForever()
|
||||||
@ -140,6 +140,7 @@ task example2, "Build Waku examples":
|
|||||||
buildBinary "subscriber", "examples/"
|
buildBinary "subscriber", "examples/"
|
||||||
buildBinary "filter_subscriber", "examples/"
|
buildBinary "filter_subscriber", "examples/"
|
||||||
buildBinary "lightpush_publisher", "examples/"
|
buildBinary "lightpush_publisher", "examples/"
|
||||||
|
buildBinary "lightpush_publisher_mix", "examples/"
|
||||||
|
|
||||||
task chat2, "Build example Waku chat usage":
|
task chat2, "Build example Waku chat usage":
|
||||||
# NOTE For debugging, set debug level. For chat usage we want minimal log
|
# NOTE For debugging, set debug level. For chat usage we want minimal log
|
||||||
|
|||||||
@ -631,6 +631,16 @@ with the drawback of consuming some more bandwidth.""",
|
|||||||
name: "rendezvous"
|
name: "rendezvous"
|
||||||
.}: bool
|
.}: bool
|
||||||
|
|
||||||
|
#Mix config
|
||||||
|
mixkey* {.desc: "ED25519 private key as 64 char hex string.", name: "mixkey".}:
|
||||||
|
Option[string]
|
||||||
|
#TODO: Temp config for simulations.Ideally need to get this info from bootstrap ENRs
|
||||||
|
#[ mixBootstrapNodes* {.
|
||||||
|
desc:
|
||||||
|
"Text-encoded data for mix bootstrap node. Encoded in the format Multiaddress:libp2pPubKey:MixPubKey. Argument may be repeated.",
|
||||||
|
name: "mix-bootstrap-node"
|
||||||
|
.}: seq[string] ]#
|
||||||
|
|
||||||
## websocket config
|
## websocket config
|
||||||
websocketSupport* {.
|
websocketSupport* {.
|
||||||
desc: "Enable websocket: true|false",
|
desc: "Enable websocket: true|false",
|
||||||
|
|||||||
@ -415,6 +415,17 @@ proc setupProtocols(
|
|||||||
return
|
return
|
||||||
err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error)
|
err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error)
|
||||||
|
|
||||||
|
#mount mix
|
||||||
|
let mixPrivKey:string =
|
||||||
|
if conf.mixkey.isSome():
|
||||||
|
conf.mixkey.get()
|
||||||
|
else:
|
||||||
|
error "missing mix key"
|
||||||
|
return err("missing mix key")
|
||||||
|
(
|
||||||
|
await node.mountMix(mixPrivKey)
|
||||||
|
).isOkOr:
|
||||||
|
return err("failed to mount waku mix protocol: " & $error)
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
## Start node
|
## Start node
|
||||||
|
|||||||
@ -9,9 +9,11 @@ import
|
|||||||
stew/byteutils,
|
stew/byteutils,
|
||||||
eth/keys,
|
eth/keys,
|
||||||
nimcrypto,
|
nimcrypto,
|
||||||
|
nimcrypto/utils as ncrutils,
|
||||||
bearssl/rand,
|
bearssl/rand,
|
||||||
eth/p2p/discoveryv5/enr,
|
eth/p2p/discoveryv5/enr,
|
||||||
libp2p/crypto/crypto,
|
libp2p/crypto/crypto,
|
||||||
|
libp2p/crypto/curve25519,
|
||||||
libp2p/protocols/ping,
|
libp2p/protocols/ping,
|
||||||
libp2p/protocols/pubsub/gossipsub,
|
libp2p/protocols/pubsub/gossipsub,
|
||||||
libp2p/protocols/pubsub/rpc/messages,
|
libp2p/protocols/pubsub/rpc/messages,
|
||||||
@ -19,7 +21,13 @@ import
|
|||||||
libp2p/transports/transport,
|
libp2p/transports/transport,
|
||||||
libp2p/transports/tcptransport,
|
libp2p/transports/tcptransport,
|
||||||
libp2p/transports/wstransport,
|
libp2p/transports/wstransport,
|
||||||
libp2p/utility
|
libp2p/utility,
|
||||||
|
../../vendor/mix/src/mix_node,
|
||||||
|
../../vendor/mix/src/mix_protocol,
|
||||||
|
../../vendor/mix/src/curve25519,
|
||||||
|
../../vendor/mix/src/protocol
|
||||||
|
|
||||||
|
|
||||||
import
|
import
|
||||||
../waku_core,
|
../waku_core,
|
||||||
../waku_core/topics/sharding,
|
../waku_core/topics/sharding,
|
||||||
@ -117,6 +125,8 @@ type
|
|||||||
started*: bool # Indicates that node has started listening
|
started*: bool # Indicates that node has started listening
|
||||||
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
|
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
|
||||||
rateLimitSettings*: ProtocolRateLimitSettings
|
rateLimitSettings*: ProtocolRateLimitSettings
|
||||||
|
mix*: MixProtocol
|
||||||
|
mixbootNodes*: Table[PeerId, MixPubInfo]
|
||||||
|
|
||||||
proc new*(
|
proc new*(
|
||||||
T: type WakuNode,
|
T: type WakuNode,
|
||||||
@ -202,6 +212,65 @@ proc mountSharding*(
|
|||||||
node.wakuSharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount)
|
node.wakuSharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount)
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
|
proc getBootStrapMixNodes(node: WakuNode, exceptPeerID: PeerId): Table[PeerId, MixPubInfo] =
|
||||||
|
var mixNodes = initTable[PeerId, MixPubInfo]()
|
||||||
|
# MixNode Multiaddrs and PublicKeys:
|
||||||
|
let bootNodesMultiaddrs = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o",
|
||||||
|
"/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF",
|
||||||
|
"/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA",
|
||||||
|
"/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f",
|
||||||
|
"/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu",
|
||||||
|
]
|
||||||
|
let bootNodesMixPubKeys = ["9d09ce624f76e8f606265edb9cca2b7de9b41772a6d784bddaf92ffa8fba7d2c",
|
||||||
|
"9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a",
|
||||||
|
"275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c",
|
||||||
|
"e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18",
|
||||||
|
"8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f"
|
||||||
|
]
|
||||||
|
for index, mixNodeMultiaddr in bootNodesMultiaddrs:
|
||||||
|
let peerIdRes = getPeerIdFromMultiAddr(mixNodeMultiaddr)
|
||||||
|
if peerIdRes.isErr:
|
||||||
|
error "Failed to get peer id from multiaddress: " , error = peerIdRes.error
|
||||||
|
let peerId = peerIdRes.get()
|
||||||
|
if peerID == exceptPeerID:
|
||||||
|
continue
|
||||||
|
let mixNodePubInfo = createMixPubInfo(mixNodeMultiaddr, intoCurve25519Key(ncrutils.fromHex(bootNodesMixPubKeys[index])))
|
||||||
|
|
||||||
|
mixNodes[peerId] = mixNodePubInfo
|
||||||
|
info "using mix bootstrap nodes ", bootNodes = mixNodes
|
||||||
|
return mixNodes
|
||||||
|
|
||||||
|
|
||||||
|
# Mix Protocol
|
||||||
|
proc mountMix*(node: WakuNode, mixPrivKey: string): Future[Result[void, string]] {.async.} =
|
||||||
|
info "mounting mix protocol", nodeId = node.info #TODO log the config used
|
||||||
|
info "mixPrivKey", mixPrivKey = mixPrivKey
|
||||||
|
|
||||||
|
let mixKey = intoCurve25519Key(ncrutils.fromHex(mixPrivKey))
|
||||||
|
let mixPubKey = public(mixKey)
|
||||||
|
|
||||||
|
let localaddrStr = node.announcedAddresses[0].toString().valueOr:
|
||||||
|
return err("Failed to convert multiaddress to string.")
|
||||||
|
info "local addr", localaddr = localaddrStr
|
||||||
|
|
||||||
|
let localMixNodeInfo = initMixNodeInfo(
|
||||||
|
localaddrStr & "/p2p/" & $node.peerId, mixPubKey, mixKey, node.switch.peerInfo.publicKey.skkey,
|
||||||
|
node.switch.peerInfo.privateKey.skkey,
|
||||||
|
)
|
||||||
|
|
||||||
|
let protoRes = MixProtocol.initMix(localMixNodeInfo, node.switch, node.getBootStrapMixNodes(node.peerId))
|
||||||
|
if protoRes.isErr:
|
||||||
|
error "Mix protocol initialization failed", err = protoRes.error
|
||||||
|
return
|
||||||
|
node.mix = protoRes.value
|
||||||
|
|
||||||
|
let catchRes = catch:
|
||||||
|
node.switch.mount(node.mix)
|
||||||
|
if catchRes.isErr():
|
||||||
|
return err(catchRes.error.msg)
|
||||||
|
|
||||||
|
return ok()
|
||||||
|
|
||||||
## Waku Sync
|
## Waku Sync
|
||||||
|
|
||||||
proc mountStoreSync*(
|
proc mountStoreSync*(
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
import std/options, results, chronicles, chronos, metrics, bearssl/rand, stew/byteutils
|
import std/options, results, chronicles, chronos, metrics, bearssl/rand, stew/byteutils
|
||||||
import libp2p/peerid
|
import libp2p/peerid, libp2p/stream/connection
|
||||||
import
|
import
|
||||||
../waku_core/peers,
|
../waku_core/peers,
|
||||||
../node/peer_manager,
|
../node/peer_manager,
|
||||||
@ -118,3 +118,21 @@ proc publishToAny*(
|
|||||||
obs.onMessagePublished(pubSubTopic, message)
|
obs.onMessagePublished(pubSubTopic, message)
|
||||||
|
|
||||||
return lightpushSuccessResult(publishedCount)
|
return lightpushSuccessResult(publishedCount)
|
||||||
|
|
||||||
|
|
||||||
|
proc publishWithConn*(
|
||||||
|
wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage, conn: Connection
|
||||||
|
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
|
||||||
|
## This proc is similar to the publish one but in this case
|
||||||
|
## we use existing connection to publish.
|
||||||
|
|
||||||
|
info "publishWithConn", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex
|
||||||
|
|
||||||
|
let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
|
||||||
|
let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: some(pushRequest))
|
||||||
|
await conn.writeLP(rpc.encode().buffer)
|
||||||
|
|
||||||
|
for obs in wl.publishObservers:
|
||||||
|
obs.onMessagePublished(pubSubTopic, message)
|
||||||
|
|
||||||
|
return lightpushSuccessResult(1)
|
||||||
Loading…
x
Reference in New Issue
Block a user