mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-05 15:33:08 +00:00
Fix mix imports
This commit is contained in:
parent
09f109ebcb
commit
a3b1435854
@ -12,8 +12,7 @@ import
|
||||
eth/keys,
|
||||
eth/p2p/discoveryv5/enr
|
||||
|
||||
import entry_connection,
|
||||
app_protocols
|
||||
import mix/entry_connection, mix/protocol
|
||||
|
||||
import
|
||||
waku/[
|
||||
@ -25,7 +24,7 @@ import
|
||||
waku_enr,
|
||||
discovery/waku_discv5,
|
||||
factory/builder,
|
||||
waku_lightpush/client
|
||||
waku_lightpush/client,
|
||||
]
|
||||
|
||||
proc now*(): Timestamp =
|
||||
@ -81,28 +80,34 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
|
||||
node.mountMetadata(clusterId).expect("failed to mount waku metadata protocol")
|
||||
node.mountLightPushClient()
|
||||
try:
|
||||
await node.mountPeerExchange(
|
||||
some(uint16(clusterId))
|
||||
)
|
||||
await node.mountPeerExchange(some(uint16(clusterId)))
|
||||
except CatchableError:
|
||||
error "failed to mount waku peer-exchange protocol: ", errmsg = getCurrentExceptionMsg()
|
||||
error "failed to mount waku peer-exchange protocol: ",
|
||||
errmsg = getCurrentExceptionMsg()
|
||||
return
|
||||
|
||||
let pxPeerInfo = RemotePeerInfo.init(
|
||||
"16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o",
|
||||
@[MultiAddress.init("/ip4/127.0.0.1/tcp/60001").get()]
|
||||
@[MultiAddress.init("/ip4/127.0.0.1/tcp/60001").get()],
|
||||
)
|
||||
node.peerManager.addServicePeer(pxPeerInfo, WakuPeerExchangeCodec)
|
||||
let pxPeerInfo2 = RemotePeerInfo.init(
|
||||
"16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu",
|
||||
@[MultiAddress.init("/ip4/127.0.0.1/tcp/60005").get()]
|
||||
@[MultiAddress.init("/ip4/127.0.0.1/tcp/60005").get()],
|
||||
)
|
||||
node.peerManager.addServicePeer(pxPeerInfo2, WakuPeerExchangeCodec)
|
||||
|
||||
(
|
||||
await node.mountMix(intoCurve25519Key(ncrutils.fromHex("401dd1eb5582f6dc9488d424aa26ed1092becefcf8543172e6d92c17ed07265a")))
|
||||
await node.mountMix(
|
||||
intoCurve25519Key(
|
||||
ncrutils.fromHex(
|
||||
"401dd1eb5582f6dc9488d424aa26ed1092becefcf8543172e6d92c17ed07265a"
|
||||
)
|
||||
)
|
||||
)
|
||||
).isOkOr:
|
||||
error "failed to mount waku mix protocol: ", error = $error
|
||||
return
|
||||
return
|
||||
#discard node.setMixBootStrapNodes()
|
||||
|
||||
let destPeerId = PeerId.init("16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o").valueOr:
|
||||
@ -113,7 +118,8 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
|
||||
"/ip4/127.0.0.1/tcp/60001",
|
||||
destPeerId,
|
||||
ProtocolType.fromString(WakuLightPushCodec),
|
||||
node.mix)
|
||||
node.mix,
|
||||
)
|
||||
|
||||
await node.start()
|
||||
node.peerManager.start()
|
||||
@ -123,7 +129,8 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
|
||||
warn "Cannot fetch peers from peer exchange", cause = error
|
||||
|
||||
while node.getMixNodePoolSize() < 3:
|
||||
info "waiting for mix nodes to be discovered", currentpoolSize = node.getMixNodePoolSize()
|
||||
info "waiting for mix nodes to be discovered",
|
||||
currentpoolSize = node.getMixNodePoolSize()
|
||||
await sleepAsync(1000)
|
||||
|
||||
notice "publisher service started"
|
||||
@ -139,8 +146,9 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
|
||||
timestamp: now(),
|
||||
) # current timestamp
|
||||
|
||||
let res =
|
||||
await node.wakuLightpushClient.publishWithConn(LightpushPubsubTopic, message, conn)
|
||||
let res = await node.wakuLightpushClient.publishWithConn(
|
||||
LightpushPubsubTopic, message, conn
|
||||
)
|
||||
|
||||
if res.isOk:
|
||||
notice "published message",
|
||||
@ -153,7 +161,6 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
|
||||
|
||||
await sleepAsync(1000)
|
||||
|
||||
|
||||
when isMainModule:
|
||||
let rng = crypto.newRng()
|
||||
asyncSpawn setupAndPublish(rng)
|
||||
|
||||
2
vendor/mix
vendored
2
vendor/mix
vendored
@ -1 +1 @@
|
||||
Subproject commit 6b4787e5899a839deca84c11c6bcf7000353cb9b
|
||||
Subproject commit 5f9633df51e557ff99f030bc0ad9b3fca29566ab
|
||||
@ -39,7 +39,7 @@ import
|
||||
../common/utils/parse_size_units,
|
||||
../common/rate_limit/setting,
|
||||
../common/databases/dburl,
|
||||
curve25519_utils
|
||||
mix/curve25519_utils
|
||||
|
||||
## Peer persistence
|
||||
|
||||
|
||||
@ -25,11 +25,10 @@ import
|
||||
libp2p/transports/tcptransport,
|
||||
libp2p/transports/wstransport,
|
||||
libp2p/utility,
|
||||
mix_node,
|
||||
mix_protocol,
|
||||
curve25519_utils,
|
||||
app_protocols
|
||||
|
||||
mix/mix_node,
|
||||
mix/mix_protocol,
|
||||
mix/curve25519_utils,
|
||||
mix/app_protocols
|
||||
|
||||
import
|
||||
../waku_core,
|
||||
@ -250,7 +249,7 @@ proc mountSharding*(
|
||||
|
||||
#TODO: Ideally these procs should be moved out into mix specific file, but keeping it here for now.
|
||||
proc mixPoolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool =
|
||||
# Note that origin based(discv5) filtering is not done intentionally
|
||||
# Note that origin based(discv5) filtering is not done intentionally
|
||||
# so that more mix nodes can be discovered.
|
||||
if peer.enr.isNone():
|
||||
trace "peer has no ENR", peer = $peer
|
||||
@ -267,7 +266,7 @@ proc mixPoolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool =
|
||||
|
||||
return true
|
||||
|
||||
proc appendPeerIdToMultiaddr*(multiaddr: MultiAddress, peerId:PeerId): MultiAddress =
|
||||
proc appendPeerIdToMultiaddr*(multiaddr: MultiAddress, peerId: PeerId): MultiAddress =
|
||||
if multiaddr.contains(multiCodec("p2p")).get():
|
||||
return multiaddr
|
||||
|
||||
@ -280,7 +279,7 @@ proc appendPeerIdToMultiaddr*(multiaddr: MultiAddress, peerId:PeerId): MultiAddr
|
||||
return multiaddr
|
||||
return cleanAddr
|
||||
|
||||
proc populateMixNodePool*(node: WakuNode){.async} =
|
||||
proc populateMixNodePool*(node: WakuNode) {.async.} =
|
||||
var cluster: uint16
|
||||
let enrRes = node.enr.toTyped()
|
||||
if enrRes.isOk():
|
||||
@ -293,18 +292,21 @@ proc populateMixNodePool*(node: WakuNode){.async} =
|
||||
|
||||
# populate only peers that i) are reachable ii) share cluster iii) support mix
|
||||
let remotePeers = node.peerManager.wakuPeerStore.getReachablePeers().filterIt(
|
||||
mixPoolFilter(some(cluster), it)
|
||||
mixPoolFilter(some(cluster), it)
|
||||
)
|
||||
var mixNodes = initTable[PeerId, MixPubInfo]()
|
||||
|
||||
for i in 0 ..< min(remotePeers.len, 100):
|
||||
let remotePeerENR = remotePeers[i].enr.get()
|
||||
# TODO: use the most exposed/external multiaddr of the peer, right now using the first
|
||||
let maddrWithPeerId = toString(appendPeerIdToMultiaddr(remotePeers[i].addrs[0],remotePeers[i].peerId))
|
||||
trace "remote peer ENR", peerId = remotePeers[i].peerId, enr = remotePeerENR, maddr = maddrWithPeerId
|
||||
let maddrWithPeerId =
|
||||
toString(appendPeerIdToMultiaddr(remotePeers[i].addrs[0], remotePeers[i].peerId))
|
||||
trace "remote peer ENR",
|
||||
peerId = remotePeers[i].peerId, enr = remotePeerENR, maddr = maddrWithPeerId
|
||||
|
||||
let peerMixPubKey = mixKey(remotePeerENR).get()
|
||||
let mixNodePubInfo = createMixPubInfo(maddrWithPeerId.value, intoCurve25519Key(peerMixPubKey))
|
||||
let mixNodePubInfo =
|
||||
createMixPubInfo(maddrWithPeerId.value, intoCurve25519Key(peerMixPubKey))
|
||||
mixNodes[remotePeers[i].peerId] = mixNodePubInfo
|
||||
|
||||
# set the mix node pool
|
||||
@ -312,8 +314,7 @@ proc populateMixNodePool*(node: WakuNode){.async} =
|
||||
trace "mix node pool updated", poolSize = node.mix.getNodePoolSize()
|
||||
return
|
||||
|
||||
|
||||
proc startMixNodePoolMgr*(node: WakuNode ){.async} =
|
||||
proc startMixNodePoolMgr*(node: WakuNode) {.async.} =
|
||||
info "starting mix node pool manager"
|
||||
# try more aggressively to populate the pool at startup
|
||||
var attempts = 50
|
||||
@ -333,10 +334,12 @@ proc getMixNodePoolSize*(node: WakuNode): int =
|
||||
#[ proc setMixBootStrapNodes*(node: WakuNode,){.async}=
|
||||
node.mix.setNodePool(node.getBootStrapMixNodes())
|
||||
]#
|
||||
# Mix Protocol
|
||||
proc mountMix*(node: WakuNode, mixPrivKey: Curve25519Key): Future[Result[void, string]] {.async.} =
|
||||
# Mix Protocol
|
||||
proc mountMix*(
|
||||
node: WakuNode, mixPrivKey: Curve25519Key
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
info "mounting mix protocol", nodeId = node.info #TODO log the config used
|
||||
let mixPubKey = public(mixPrivKey)
|
||||
let mixPubKey = public(mixPrivKey)
|
||||
|
||||
info "mixPrivKey", mixPrivKey = mixPrivKey, mixPubKey = mixPubKey
|
||||
|
||||
@ -345,12 +348,16 @@ proc mountMix*(node: WakuNode, mixPrivKey: Curve25519Key): Future[Result[void, s
|
||||
info "local addr", localaddr = localaddrStr
|
||||
|
||||
let localMixNodeInfo = initMixNodeInfo(
|
||||
localaddrStr & "/p2p/" & $node.peerId, mixPubKey, mixPrivKey, node.switch.peerInfo.publicKey.skkey,
|
||||
localaddrStr & "/p2p/" & $node.peerId,
|
||||
mixPubKey,
|
||||
mixPrivKey,
|
||||
node.switch.peerInfo.publicKey.skkey,
|
||||
node.switch.peerInfo.privateKey.skkey,
|
||||
)
|
||||
# TODO: Pass bootnodes from config,
|
||||
# TODO: Pass bootnodes from config,
|
||||
# TODO : ideally mix should not be marked ready until certain min pool of mixNodes are discovered
|
||||
let protoRes = MixProtocol.initMix(localMixNodeInfo, node.switch, initTable[PeerId, MixPubInfo]())
|
||||
let protoRes =
|
||||
MixProtocol.initMix(localMixNodeInfo, node.switch, initTable[PeerId, MixPubInfo]())
|
||||
if protoRes.isErr:
|
||||
error "Mix protocol initialization failed", err = protoRes.error
|
||||
return
|
||||
|
||||
@ -3,7 +3,7 @@
|
||||
import
|
||||
std/[options, bitops, sequtils, net, tables], results, eth/keys, libp2p/crypto/crypto
|
||||
import ../common/enr, ../waku_core/codecs
|
||||
import mix_protocol
|
||||
import mix/mix_protocol
|
||||
|
||||
const CapabilitiesEnrField* = "waku2"
|
||||
|
||||
@ -33,7 +33,8 @@ const capabilityToCodec = {
|
||||
}.toTable
|
||||
|
||||
func init*(
|
||||
T: type CapabilitiesBitfield, lightpush, filter, store, relay, sync, mix: bool = false
|
||||
T: type CapabilitiesBitfield,
|
||||
lightpush, filter, store, relay, sync, mix: bool = false,
|
||||
): T =
|
||||
## Creates an waku2 ENR flag bit field according to RFC 31 (https://rfc.vac.dev/spec/31/)
|
||||
var bitfield: uint8
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user