mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-05 07:23:12 +00:00
feat: poc to integrate mix into waku and use lightpush to demonstrate
This commit is contained in:
parent
ed0b260c2d
commit
8ca9726081
4
.gitmodules
vendored
4
.gitmodules
vendored
@ -194,3 +194,7 @@
|
||||
url = https://github.com/waku-org/waku-rlnv2-contract.git
|
||||
ignore = untracked
|
||||
branch = master
|
||||
[submodule "vendor/mix"]
|
||||
path = vendor/mix
|
||||
url = https://github.com/vacp2p/mix/
|
||||
branch = mix-waku-integ
|
||||
|
||||
127
examples/lightpush_publisher_mix.nim
Normal file
127
examples/lightpush_publisher_mix.nim
Normal file
@ -0,0 +1,127 @@
|
||||
import
|
||||
std/[tables, times, sequtils],
|
||||
stew/byteutils,
|
||||
stew/shims/net,
|
||||
chronicles,
|
||||
results,
|
||||
chronos,
|
||||
confutils,
|
||||
libp2p/crypto/crypto,
|
||||
eth/keys,
|
||||
eth/p2p/discoveryv5/enr
|
||||
|
||||
import ../vendor/mix/src/entry_connection,
|
||||
../vendor/mix/src/protocol
|
||||
|
||||
import
|
||||
waku/[
|
||||
common/logging,
|
||||
node/peer_manager,
|
||||
waku_core,
|
||||
waku_core/codecs,
|
||||
waku_node,
|
||||
waku_enr,
|
||||
discovery/waku_discv5,
|
||||
factory/builder,
|
||||
waku_lightpush/client
|
||||
]
|
||||
|
||||
proc now*(): Timestamp =
|
||||
getNanosecondTime(getTime().toUnixFloat())
|
||||
|
||||
# careful if running pub and sub in the same machine
|
||||
const wakuPort = 60000
|
||||
|
||||
const clusterId = 2
|
||||
const shardId = @[0'u16]
|
||||
|
||||
const
|
||||
LightpushPeer =
|
||||
"/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu"
|
||||
LightpushPubsubTopic = PubsubTopic("/waku/2/rs/2/0")
|
||||
LightpushContentTopic = ContentTopic("/examples/1/light-pubsub-mix-example/proto")
|
||||
|
||||
proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
|
||||
# use notice to filter all waku messaging
|
||||
setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)
|
||||
|
||||
notice "starting publisher", wakuPort = wakuPort
|
||||
|
||||
let
|
||||
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[]).get()
|
||||
ip = parseIpAddress("0.0.0.0")
|
||||
flags = CapabilitiesBitfield.init(relay = true)
|
||||
|
||||
let relayShards = RelayShards.init(clusterId, shardId).valueOr:
|
||||
error "Relay shards initialization failed", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
var enrBuilder = EnrBuilder.init(nodeKey)
|
||||
enrBuilder.withWakuRelaySharding(relayShards).expect(
|
||||
"Building ENR with relay sharding failed"
|
||||
)
|
||||
|
||||
let recordRes = enrBuilder.build()
|
||||
let record =
|
||||
if recordRes.isErr():
|
||||
error "failed to create enr record", error = recordRes.error
|
||||
quit(QuitFailure)
|
||||
else:
|
||||
recordRes.get()
|
||||
|
||||
var builder = WakuNodeBuilder.init()
|
||||
builder.withNodeKey(nodeKey)
|
||||
builder.withRecord(record)
|
||||
builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet()
|
||||
let node = builder.build().tryGet()
|
||||
|
||||
node.mountMetadata(clusterId).expect("failed to mount waku metadata protocol")
|
||||
node.mountLightPushClient()
|
||||
(
|
||||
await node.mountMix("401dd1eb5582f6dc9488d424aa26ed1092becefcf8543172e6d92c17ed07265a")
|
||||
).isOkOr:
|
||||
error "failed to mount waku mix protocol: ", error = $error
|
||||
return
|
||||
|
||||
let destPeerId = PeerId.init("16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o").valueOr:
|
||||
error "Failed to initialize PeerId", err = error
|
||||
return
|
||||
|
||||
let conn = MixEntryConnection.newConn(
|
||||
"/ip4/127.0.0.1/tcp/60001",
|
||||
destPeerId,
|
||||
ProtocolType.fromString(WakuLightPushCodec),
|
||||
node.mix)
|
||||
|
||||
await node.start()
|
||||
node.peerManager.start()
|
||||
|
||||
notice "publisher service started"
|
||||
while true:
|
||||
let text = "hi there i'm a publisher using mix"
|
||||
let message = WakuMessage(
|
||||
payload: toBytes(text), # content of the message
|
||||
contentTopic: LightpushContentTopic, # content topic to publish to
|
||||
ephemeral: true, # tell store nodes to not store it
|
||||
timestamp: now(),
|
||||
) # current timestamp
|
||||
|
||||
let res =
|
||||
await node.wakuLightpushClient.publishWithConn(LightpushPubsubTopic, message, conn)
|
||||
|
||||
if res.isOk:
|
||||
notice "published message",
|
||||
text = text,
|
||||
timestamp = message.timestamp,
|
||||
psTopic = LightpushPubsubTopic,
|
||||
contentTopic = LightpushContentTopic
|
||||
else:
|
||||
error "failed to publish message", error = res.error
|
||||
|
||||
await sleepAsync(5000)
|
||||
break
|
||||
|
||||
when isMainModule:
|
||||
let rng = crypto.newRng()
|
||||
asyncSpawn setupAndPublish(rng)
|
||||
runForever()
|
||||
@ -132,6 +132,7 @@ task example2, "Build Waku examples":
|
||||
buildBinary "subscriber", "examples/"
|
||||
buildBinary "filter_subscriber", "examples/"
|
||||
buildBinary "lightpush_publisher", "examples/"
|
||||
buildBinary "lightpush_publisher_mix", "examples/"
|
||||
|
||||
task chat2, "Build example Waku chat usage":
|
||||
# NOTE For debugging, set debug level. For chat usage we want minimal log
|
||||
|
||||
@ -664,6 +664,16 @@ with the drawback of consuming some more bandwidth.""",
|
||||
name: "rendezvous"
|
||||
.}: bool
|
||||
|
||||
#Mix config
|
||||
mixkey* {.desc: "ED25519 private key as 64 char hex string.", name: "mixkey".}:
|
||||
Option[string]
|
||||
#TODO: Temp config for simulations.Ideally need to get this info from bootstrap ENRs
|
||||
#[ mixBootstrapNodes* {.
|
||||
desc:
|
||||
"Text-encoded data for mix bootstrap node. Encoded in the format Multiaddress:libp2pPubKey:MixPubKey. Argument may be repeated.",
|
||||
name: "mix-bootstrap-node"
|
||||
.}: seq[string] ]#
|
||||
|
||||
## websocket config
|
||||
websocketSupport* {.
|
||||
desc: "Enable websocket: true|false",
|
||||
|
||||
@ -426,6 +426,17 @@ proc setupProtocols(
|
||||
return
|
||||
err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error)
|
||||
|
||||
#mount mix
|
||||
let mixPrivKey:string =
|
||||
if conf.mixkey.isSome():
|
||||
conf.mixkey.get()
|
||||
else:
|
||||
error "missing mix key"
|
||||
return err("missing mix key")
|
||||
(
|
||||
await node.mountMix(mixPrivKey)
|
||||
).isOkOr:
|
||||
return err("failed to mount waku mix protocol: " & $error)
|
||||
return ok()
|
||||
|
||||
## Start node
|
||||
|
||||
@ -9,9 +9,11 @@ import
|
||||
stew/byteutils,
|
||||
eth/keys,
|
||||
nimcrypto,
|
||||
nimcrypto/utils as ncrutils,
|
||||
bearssl/rand,
|
||||
eth/p2p/discoveryv5/enr,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/crypto/curve25519,
|
||||
libp2p/protocols/ping,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/protocols/pubsub/rpc/messages,
|
||||
@ -21,7 +23,13 @@ import
|
||||
libp2p/transports/transport,
|
||||
libp2p/transports/tcptransport,
|
||||
libp2p/transports/wstransport,
|
||||
libp2p/utility
|
||||
libp2p/utility,
|
||||
../../vendor/mix/src/mix_node,
|
||||
../../vendor/mix/src/mix_protocol,
|
||||
../../vendor/mix/src/curve25519,
|
||||
../../vendor/mix/src/protocol
|
||||
|
||||
|
||||
import
|
||||
../waku_core,
|
||||
../waku_core/topics/sharding,
|
||||
@ -121,6 +129,8 @@ type
|
||||
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
|
||||
contentTopicHandlers: Table[ContentTopic, TopicHandler]
|
||||
rateLimitSettings*: ProtocolRateLimitSettings
|
||||
mix*: MixProtocol
|
||||
mixbootNodes*: Table[PeerId, MixPubInfo]
|
||||
|
||||
proc new*(
|
||||
T: type WakuNode,
|
||||
@ -206,6 +216,65 @@ proc mountSharding*(
|
||||
node.wakuSharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount)
|
||||
return ok()
|
||||
|
||||
proc getBootStrapMixNodes(node: WakuNode, exceptPeerID: PeerId): Table[PeerId, MixPubInfo] =
|
||||
var mixNodes = initTable[PeerId, MixPubInfo]()
|
||||
# MixNode Multiaddrs and PublicKeys:
|
||||
let bootNodesMultiaddrs = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o",
|
||||
"/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF",
|
||||
"/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA",
|
||||
"/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f",
|
||||
"/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu",
|
||||
]
|
||||
let bootNodesMixPubKeys = ["9d09ce624f76e8f606265edb9cca2b7de9b41772a6d784bddaf92ffa8fba7d2c",
|
||||
"9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a",
|
||||
"275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c",
|
||||
"e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18",
|
||||
"8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f"
|
||||
]
|
||||
for index, mixNodeMultiaddr in bootNodesMultiaddrs:
|
||||
let peerIdRes = getPeerIdFromMultiAddr(mixNodeMultiaddr)
|
||||
if peerIdRes.isErr:
|
||||
error "Failed to get peer id from multiaddress: " , error = peerIdRes.error
|
||||
let peerId = peerIdRes.get()
|
||||
if peerID == exceptPeerID:
|
||||
continue
|
||||
let mixNodePubInfo = createMixPubInfo(mixNodeMultiaddr, intoCurve25519Key(ncrutils.fromHex(bootNodesMixPubKeys[index])))
|
||||
|
||||
mixNodes[peerId] = mixNodePubInfo
|
||||
info "using mix bootstrap nodes ", bootNodes = mixNodes
|
||||
return mixNodes
|
||||
|
||||
|
||||
# Mix Protocol
|
||||
proc mountMix*(node: WakuNode, mixPrivKey: string): Future[Result[void, string]] {.async.} =
|
||||
info "mounting mix protocol", nodeId = node.info #TODO log the config used
|
||||
info "mixPrivKey", mixPrivKey = mixPrivKey
|
||||
|
||||
let mixKey = intoCurve25519Key(ncrutils.fromHex(mixPrivKey))
|
||||
let mixPubKey = public(mixKey)
|
||||
|
||||
let localaddrStr = node.announcedAddresses[0].toString().valueOr:
|
||||
return err("Failed to convert multiaddress to string.")
|
||||
info "local addr", localaddr = localaddrStr
|
||||
|
||||
let localMixNodeInfo = initMixNodeInfo(
|
||||
localaddrStr & "/p2p/" & $node.peerId, mixPubKey, mixKey, node.switch.peerInfo.publicKey.skkey,
|
||||
node.switch.peerInfo.privateKey.skkey,
|
||||
)
|
||||
|
||||
let protoRes = MixProtocol.initMix(localMixNodeInfo, node.switch, node.getBootStrapMixNodes(node.peerId))
|
||||
if protoRes.isErr:
|
||||
error "Mix protocol initialization failed", err = protoRes.error
|
||||
return
|
||||
node.mix = protoRes.value
|
||||
|
||||
let catchRes = catch:
|
||||
node.switch.mount(node.mix)
|
||||
if catchRes.isErr():
|
||||
return err(catchRes.error.msg)
|
||||
|
||||
return ok()
|
||||
|
||||
## Waku Sync
|
||||
|
||||
proc mountStoreSync*(
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import std/options, results, chronicles, chronos, metrics, bearssl/rand, stew/byteutils
|
||||
import libp2p/peerid
|
||||
import libp2p/peerid, libp2p/stream/connection
|
||||
import
|
||||
../waku_core/peers,
|
||||
../node/peer_manager,
|
||||
@ -110,3 +110,21 @@ proc publishToAny*(
|
||||
obs.onMessagePublished(pubSubTopic, message)
|
||||
|
||||
return lightpushSuccessResult(publishedCount)
|
||||
|
||||
|
||||
proc publishWithConn*(
|
||||
wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage, conn: Connection
|
||||
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
|
||||
## This proc is similar to the publish one but in this case
|
||||
## we use existing connection to publish.
|
||||
|
||||
info "publishWithConn", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex
|
||||
|
||||
let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
|
||||
let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: some(pushRequest))
|
||||
await conn.writeLP(rpc.encode().buffer)
|
||||
|
||||
for obs in wl.publishObservers:
|
||||
obs.onMessagePublished(pubSubTopic, message)
|
||||
|
||||
return lightpushSuccessResult(1)
|
||||
Loading…
x
Reference in New Issue
Block a user