logos-messaging-nim/examples/lightpush_publisher_mix.nim

163 lines
4.4 KiB
Nim
Raw Normal View History

import
std/[tables, times, sequtils],
stew/byteutils,
stew/shims/net,
chronicles,
results,
chronos,
confutils,
libp2p/crypto/crypto,
2025-03-26 10:43:00 +05:30
libp2p/crypto/curve25519,
libp2p/multiaddress,
eth/keys,
2025-03-29 17:01:19 +05:30
eth/p2p/discoveryv5/enr,
metrics
2025-03-29 19:35:56 +05:30
import mix/entry_connection, mix/protocol, mix/curve25519
import
waku/[
common/logging,
node/peer_manager,
waku_core,
waku_core/codecs,
waku_node,
waku_enr,
discovery/waku_discv5,
factory/builder,
2025-03-29 05:56:45 +05:30
waku_lightpush/client
],
2025-03-29 17:01:19 +05:30
./lightpush_publisher_mix_config,
./lightpush_publisher_mix_metrics
2025-03-29 05:56:45 +05:30
proc now*(): Timestamp =
getNanosecondTime(getTime().toUnixFloat())
2025-03-29 05:56:45 +05:30
const clusterId = 66
const shardId = @[0'u16]
const
2025-03-29 05:56:45 +05:30
LightpushPubsubTopic = PubsubTopic("/waku/2/rs/66/0")
LightpushContentTopic = ContentTopic("/examples/1/light-pubsub-mix-example/proto")
2025-03-29 05:56:45 +05:30
proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} =
# use notice to filter all waku messaging
setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)
2025-03-29 05:56:45 +05:30
notice "starting publisher", wakuPort = conf.port
let
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[]).get()
ip = parseIpAddress("0.0.0.0")
flags = CapabilitiesBitfield.init(relay = true)
let relayShards = RelayShards.init(clusterId, shardId).valueOr:
error "Relay shards initialization failed", error = error
quit(QuitFailure)
var enrBuilder = EnrBuilder.init(nodeKey)
enrBuilder.withWakuRelaySharding(relayShards).expect(
"Building ENR with relay sharding failed"
)
let recordRes = enrBuilder.build()
let record =
if recordRes.isErr():
error "failed to create enr record", error = recordRes.error
quit(QuitFailure)
else:
recordRes.get()
var builder = WakuNodeBuilder.init()
builder.withNodeKey(nodeKey)
builder.withRecord(record)
2025-03-29 05:56:45 +05:30
builder.withNetworkConfigurationDetails(ip, Port(conf.port)).tryGet()
let node = builder.build().tryGet()
node.mountMetadata(clusterId).expect("failed to mount waku metadata protocol")
node.mountLightPushClient()
try:
2025-03-28 15:50:09 +01:00
await node.mountPeerExchange(some(uint16(clusterId)))
except CatchableError:
2025-03-28 15:50:09 +01:00
error "failed to mount waku peer-exchange protocol: ",
errmsg = getCurrentExceptionMsg()
return
let pxPeerInfo = RemotePeerInfo.init(
2025-03-29 05:56:45 +05:30
conf.destPeerId,
@[MultiAddress.init(conf.destPeerAddr).get()],
)
node.peerManager.addServicePeer(pxPeerInfo, WakuPeerExchangeCodec)
2025-03-29 19:35:56 +05:30
let keyPairResult = generateKeyPair()
if keyPairResult.isErr:
return
let (mixPrivKey, mixPubKey) = keyPairResult.get()
2025-03-29 17:01:19 +05:30
(
2025-03-29 19:35:56 +05:30
await node.mountMix(mixPrivKey)
).isOkOr:
error "failed to mount waku mix protocol: ", error = $error
2025-03-28 15:50:09 +01:00
return
2025-03-29 05:56:45 +05:30
let destPeerId = PeerId.init(conf.destPeerId).valueOr:
error "Failed to initialize PeerId", err = error
return
let conn = MixEntryConnection.newConn(
2025-03-29 05:56:45 +05:30
conf.destPeerAddr,
destPeerId,
ProtocolType.fromString(WakuLightPushCodec),
2025-03-28 15:50:09 +01:00
node.mix,
)
await node.start()
node.peerManager.start()
node.startPeerExchangeLoop()
(await node.fetchPeerExchangePeers()).isOkOr:
warn "Cannot fetch peers from peer exchange", cause = error
2025-03-29 17:01:19 +05:30
while node.getMixNodePoolSize() < conf.minMixPoolSize:
2025-03-28 15:50:09 +01:00
info "waiting for mix nodes to be discovered",
currentpoolSize = node.getMixNodePoolSize()
await sleepAsync(1000)
2025-03-29 17:01:19 +05:30
notice "publisher service started with mix node pool size ", currentpoolSize = node.getMixNodePoolSize()
2025-03-13 16:05:51 +05:30
var i = 0
2025-03-29 17:01:19 +05:30
while i < conf.numMsgs:
2025-03-13 16:05:51 +05:30
i = i + 1
let text = "hi there i'm a publisher using mix, this is msg number " & $i
let message = WakuMessage(
payload: toBytes(text), # content of the message
contentTopic: LightpushContentTopic, # content topic to publish to
ephemeral: true, # tell store nodes to not store it
timestamp: now(),
) # current timestamp
2025-03-28 15:50:09 +01:00
let res = await node.wakuLightpushClient.publishWithConn(
LightpushPubsubTopic, message, conn
)
if res.isOk:
2025-03-29 17:01:19 +05:30
lp_mix_success.inc()
notice "published message",
text = text,
timestamp = message.timestamp,
psTopic = LightpushPubsubTopic,
contentTopic = LightpushContentTopic
else:
error "failed to publish message", error = res.error
2025-03-29 17:01:19 +05:30
lp_mix_failed.inc(labelValues = ["publish_error"])
await sleepAsync(1000)
2025-03-29 19:35:56 +05:30
info "###########Sent all messages via mix"
quit(0)
2025-03-13 16:05:51 +05:30
when isMainModule:
2025-03-29 05:56:45 +05:30
let conf = LPMixConf.load()
let rng = crypto.newRng()
2025-03-29 05:56:45 +05:30
asyncSpawn setupAndPublish(rng, conf)
runForever()