mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-25 09:13:09 +00:00
update lightpush-mix example
This commit is contained in:
parent
dd713154b9
commit
1959f15625
@ -82,6 +82,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
|
||||
).isOkOr:
|
||||
error "failed to mount waku mix protocol: ", error = $error
|
||||
return
|
||||
discard node.setMixBootStrapNodes()
|
||||
|
||||
let destPeerId = PeerId.init("16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o").valueOr:
|
||||
error "Failed to initialize PeerId", err = error
|
||||
@ -97,8 +98,11 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
|
||||
node.peerManager.start()
|
||||
|
||||
notice "publisher service started"
|
||||
while true:
|
||||
let text = "hi there i'm a publisher using mix"
|
||||
var numMsgs = 2
|
||||
var i = 0
|
||||
while i < numMsgs:
|
||||
i = i + 1
|
||||
let text = "hi there i'm a publisher using mix, this is msg number " & $i
|
||||
let message = WakuMessage(
|
||||
payload: toBytes(text), # content of the message
|
||||
contentTopic: LightpushContentTopic, # content topic to publish to
|
||||
@ -118,8 +122,8 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
|
||||
else:
|
||||
error "failed to publish message", error = res.error
|
||||
|
||||
await sleepAsync(5000)
|
||||
break
|
||||
await sleepAsync(500)
|
||||
|
||||
|
||||
when isMainModule:
|
||||
let rng = crypto.newRng()
|
||||
|
||||
@ -215,39 +215,42 @@ proc mountSharding*(
|
||||
node.wakuSharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount)
|
||||
return ok()
|
||||
|
||||
#[ proc getBootStrapMixNodes(node: WakuNode, exceptPeerID: PeerId): Table[PeerId, MixPubInfo] =
|
||||
proc getBootStrapMixNodes*(node: WakuNode): Table[PeerId, MixPubInfo] =
|
||||
var mixNodes = initTable[PeerId, MixPubInfo]()
|
||||
# MixNode Multiaddrs and PublicKeys:
|
||||
let bootNodesMultiaddrs = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o",
|
||||
"/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF",
|
||||
"/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA",
|
||||
"/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f",
|
||||
"/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu",
|
||||
]
|
||||
let bootNodesMixPubKeys = ["9d09ce624f76e8f606265edb9cca2b7de9b41772a6d784bddaf92ffa8fba7d2c",
|
||||
"9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a",
|
||||
"275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c",
|
||||
"e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18",
|
||||
"8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f"
|
||||
]
|
||||
let bootNodesMultiaddrs = [
|
||||
"/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o",
|
||||
"/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF",
|
||||
"/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA",
|
||||
"/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f",
|
||||
"/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu",
|
||||
]
|
||||
let bootNodesMixPubKeys = [
|
||||
"9d09ce624f76e8f606265edb9cca2b7de9b41772a6d784bddaf92ffa8fba7d2c",
|
||||
"9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a",
|
||||
"275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c",
|
||||
"e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18",
|
||||
"8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f",
|
||||
]
|
||||
for index, mixNodeMultiaddr in bootNodesMultiaddrs:
|
||||
let peerIdRes = getPeerIdFromMultiAddr(mixNodeMultiaddr)
|
||||
if peerIdRes.isErr:
|
||||
error "Failed to get peer id from multiaddress: " , error = peerIdRes.error
|
||||
error "Failed to get peer id from multiaddress: ", error = peerIdRes.error
|
||||
let peerId = peerIdRes.get()
|
||||
if peerID == exceptPeerID:
|
||||
continue
|
||||
let mixNodePubInfo = createMixPubInfo(mixNodeMultiaddr, intoCurve25519Key(ncrutils.fromHex(bootNodesMixPubKeys[index])))
|
||||
#if (not peerID == nil) and peerID == exceptPeerID:
|
||||
# continue
|
||||
let mixNodePubInfo = createMixPubInfo(
|
||||
mixNodeMultiaddr, intoCurve25519Key(ncrutils.fromHex(bootNodesMixPubKeys[index]))
|
||||
)
|
||||
|
||||
mixNodes[peerId] = mixNodePubInfo
|
||||
info "using mix bootstrap nodes ", bootNodes = mixNodes
|
||||
return mixNodes
|
||||
]#
|
||||
|
||||
#TODO: Ideally these procs should be moved out into mix specific file, but keeping it here for now.
|
||||
proc mixPoolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool =
|
||||
if peer.enr.isNone():
|
||||
debug "peer has no ENR", peer = $peer
|
||||
trace "peer has no ENR", peer = $peer
|
||||
return false
|
||||
|
||||
if cluster.isSome() and peer.enr.get().isClusterMismatched(cluster.get()):
|
||||
@ -293,7 +296,7 @@ proc populateMixNodePool*(node: WakuNode) {.async.} =
|
||||
# TODO: use the most exposed/external multiaddr of the peer, right now using the first
|
||||
let maddrWithPeerId =
|
||||
toString(addPeerId(remotePeers[i].addrs[0], remotePeers[i].peerId))
|
||||
debug "remote peer ENR",
|
||||
trace "remote peer ENR",
|
||||
peerId = remotePeers[i].peerId, enr = remotePeerENR, maddr = maddrWithPeerId
|
||||
|
||||
let peerMixPubKey = mixKey(remotePeerENR).get()
|
||||
@ -303,17 +306,19 @@ proc populateMixNodePool*(node: WakuNode) {.async.} =
|
||||
|
||||
# set the mix node pool
|
||||
node.mix.setNodePool(mixNodes)
|
||||
debug "mix node pool updated", poolSize = node.mix.getNodePoolSize()
|
||||
trace "mix node pool updated", poolSize = node.mix.getNodePoolSize()
|
||||
return
|
||||
|
||||
proc startMixNodePoolMgr*(node: WakuNode) {.async.} =
|
||||
# try more aggressively to populate the pool at startup
|
||||
var attempts = 50
|
||||
# TODO: make initial pool size configurable
|
||||
while node.mix.getNodePoolSize() < 100 and attempts > 0:
|
||||
attempts -= 1
|
||||
discard node.populateMixNodePool()
|
||||
await sleepAsync(1.seconds)
|
||||
|
||||
# TODO: make interval configurable
|
||||
heartbeat "Updating mix node pool", 10.minutes:
|
||||
discard node.populateMixNodePool()
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user