diff --git a/examples/lightpush_publisher_mix.nim b/examples/lightpush_publisher_mix.nim index 8f70a0cf2..cedbea689 100644 --- a/examples/lightpush_publisher_mix.nim +++ b/examples/lightpush_publisher_mix.nim @@ -12,8 +12,7 @@ import eth/keys, eth/p2p/discoveryv5/enr -import entry_connection, - app_protocols +import mix/entry_connection, mix/protocol import waku/[ @@ -25,7 +24,7 @@ import waku_enr, discovery/waku_discv5, factory/builder, - waku_lightpush/client + waku_lightpush/client, ] proc now*(): Timestamp = @@ -81,28 +80,34 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = node.mountMetadata(clusterId).expect("failed to mount waku metadata protocol") node.mountLightPushClient() try: - await node.mountPeerExchange( - some(uint16(clusterId)) - ) + await node.mountPeerExchange(some(uint16(clusterId))) except CatchableError: - error "failed to mount waku peer-exchange protocol: ", errmsg = getCurrentExceptionMsg() + error "failed to mount waku peer-exchange protocol: ", + errmsg = getCurrentExceptionMsg() return let pxPeerInfo = RemotePeerInfo.init( "16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", - @[MultiAddress.init("/ip4/127.0.0.1/tcp/60001").get()] + @[MultiAddress.init("/ip4/127.0.0.1/tcp/60001").get()], ) node.peerManager.addServicePeer(pxPeerInfo, WakuPeerExchangeCodec) let pxPeerInfo2 = RemotePeerInfo.init( "16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu", - @[MultiAddress.init("/ip4/127.0.0.1/tcp/60005").get()] + @[MultiAddress.init("/ip4/127.0.0.1/tcp/60005").get()], ) node.peerManager.addServicePeer(pxPeerInfo2, WakuPeerExchangeCodec) + ( - await node.mountMix(intoCurve25519Key(ncrutils.fromHex("401dd1eb5582f6dc9488d424aa26ed1092becefcf8543172e6d92c17ed07265a"))) + await node.mountMix( + intoCurve25519Key( + ncrutils.fromHex( + "401dd1eb5582f6dc9488d424aa26ed1092becefcf8543172e6d92c17ed07265a" + ) + ) + ) ).isOkOr: error "failed to mount waku mix protocol: ", error = $error - return + return #discard node.setMixBootStrapNodes() let destPeerId = PeerId.init("16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o").valueOr: @@ -113,7 +118,8 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = "/ip4/127.0.0.1/tcp/60001", destPeerId, ProtocolType.fromString(WakuLightPushCodec), - node.mix) + node.mix, + ) await node.start() node.peerManager.start() @@ -123,7 +129,8 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = warn "Cannot fetch peers from peer exchange", cause = error while node.getMixNodePoolSize() < 3: - info "waiting for mix nodes to be discovered", currentpoolSize = node.getMixNodePoolSize() + info "waiting for mix nodes to be discovered", + currentpoolSize = node.getMixNodePoolSize() await sleepAsync(1000) notice "publisher service started" @@ -139,8 +146,9 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = timestamp: now(), ) # current timestamp - let res = - await node.wakuLightpushClient.publishWithConn(LightpushPubsubTopic, message, conn) + let res = await node.wakuLightpushClient.publishWithConn( + LightpushPubsubTopic, message, conn + ) if res.isOk: notice "published message", @@ -153,7 +161,6 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = await sleepAsync(1000) - when isMainModule: let rng = crypto.newRng() asyncSpawn setupAndPublish(rng) diff --git a/vendor/mix b/vendor/mix index 6b4787e58..5f9633df5 160000 --- a/vendor/mix +++ b/vendor/mix @@ -1 +1 @@ -Subproject commit 6b4787e5899a839deca84c11c6bcf7000353cb9b +Subproject commit 5f9633df51e557ff99f030bc0ad9b3fca29566ab diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 22e002e60..9da1994c7 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -39,7 +39,7 @@ import ../common/utils/parse_size_units, ../common/rate_limit/setting, ../common/databases/dburl, - curve25519_utils + mix/curve25519_utils ## Peer persistence diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 4a41d2635..eaecc7713 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -25,11 +25,10 @@ import libp2p/transports/tcptransport, libp2p/transports/wstransport, libp2p/utility, - mix_node, - mix_protocol, - curve25519_utils, - app_protocols - + mix/mix_node, + mix/mix_protocol, + mix/curve25519_utils, + mix/app_protocols import ../waku_core, @@ -250,7 +249,7 @@ proc mountSharding*( #TODO: Ideally these procs should be moved out into mix specific file, but keeping it here for now. proc mixPoolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool = - # Note that origin based(discv5) filtering is not done intentionally + # Note that origin based(discv5) filtering is not done intentionally # so that more mix nodes can be discovered. if peer.enr.isNone(): trace "peer has no ENR", peer = $peer @@ -267,7 +266,7 @@ proc mixPoolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool = return true -proc appendPeerIdToMultiaddr*(multiaddr: MultiAddress, peerId:PeerId): MultiAddress = +proc appendPeerIdToMultiaddr*(multiaddr: MultiAddress, peerId: PeerId): MultiAddress = if multiaddr.contains(multiCodec("p2p")).get(): return multiaddr @@ -280,7 +279,7 @@ proc appendPeerIdToMultiaddr*(multiaddr: MultiAddress, peerId:PeerId): MultiAddr return multiaddr return cleanAddr -proc populateMixNodePool*(node: WakuNode){.async} = +proc populateMixNodePool*(node: WakuNode) {.async.} = var cluster: uint16 let enrRes = node.enr.toTyped() if enrRes.isOk(): @@ -293,18 +292,21 @@ proc populateMixNodePool*(node: WakuNode){.async} = # populate only peers that i) are reachable ii) share cluster iii) support mix let remotePeers = node.peerManager.wakuPeerStore.getReachablePeers().filterIt( - mixPoolFilter(some(cluster), it) + mixPoolFilter(some(cluster), it) ) var mixNodes = initTable[PeerId, MixPubInfo]() for i in 0 ..< min(remotePeers.len, 100): let remotePeerENR = remotePeers[i].enr.get() # TODO: use the most exposed/external multiaddr of the peer, right now using the first - let maddrWithPeerId = toString(appendPeerIdToMultiaddr(remotePeers[i].addrs[0],remotePeers[i].peerId)) - trace "remote peer ENR", peerId = remotePeers[i].peerId, enr = remotePeerENR, maddr = maddrWithPeerId + let maddrWithPeerId = + toString(appendPeerIdToMultiaddr(remotePeers[i].addrs[0], remotePeers[i].peerId)) + trace "remote peer ENR", + peerId = remotePeers[i].peerId, enr = remotePeerENR, maddr = maddrWithPeerId let peerMixPubKey = mixKey(remotePeerENR).get() - let mixNodePubInfo = createMixPubInfo(maddrWithPeerId.value, intoCurve25519Key(peerMixPubKey)) + let mixNodePubInfo = + createMixPubInfo(maddrWithPeerId.value, intoCurve25519Key(peerMixPubKey)) mixNodes[remotePeers[i].peerId] = mixNodePubInfo # set the mix node pool @@ -312,8 +314,7 @@ proc populateMixNodePool*(node: WakuNode){.async} = trace "mix node pool updated", poolSize = node.mix.getNodePoolSize() return - -proc startMixNodePoolMgr*(node: WakuNode ){.async} = +proc startMixNodePoolMgr*(node: WakuNode) {.async.} = info "starting mix node pool manager" # try more aggressively to populate the pool at startup var attempts = 50 @@ -333,10 +334,12 @@ proc getMixNodePoolSize*(node: WakuNode): int = #[ proc setMixBootStrapNodes*(node: WakuNode,){.async}= node.mix.setNodePool(node.getBootStrapMixNodes()) ]# - # Mix Protocol -proc mountMix*(node: WakuNode, mixPrivKey: Curve25519Key): Future[Result[void, string]] {.async.} = +# Mix Protocol +proc mountMix*( + node: WakuNode, mixPrivKey: Curve25519Key +): Future[Result[void, string]] {.async.} = info "mounting mix protocol", nodeId = node.info #TODO log the config used - let mixPubKey = public(mixPrivKey) + let mixPubKey = public(mixPrivKey) info "mixPrivKey", mixPrivKey = mixPrivKey, mixPubKey = mixPubKey @@ -345,12 +348,16 @@ proc mountMix*(node: WakuNode, mixPrivKey: Curve25519Key): Future[Result[void, s info "local addr", localaddr = localaddrStr let localMixNodeInfo = initMixNodeInfo( - localaddrStr & "/p2p/" & $node.peerId, mixPubKey, mixPrivKey, node.switch.peerInfo.publicKey.skkey, + localaddrStr & "/p2p/" & $node.peerId, + mixPubKey, + mixPrivKey, + node.switch.peerInfo.publicKey.skkey, node.switch.peerInfo.privateKey.skkey, ) - # TODO: Pass bootnodes from config, + # TODO: Pass bootnodes from config, # TODO : ideally mix should not be marked ready until certain min pool of mixNodes are discovered - let protoRes = MixProtocol.initMix(localMixNodeInfo, node.switch, initTable[PeerId, MixPubInfo]()) + let protoRes = + MixProtocol.initMix(localMixNodeInfo, node.switch, initTable[PeerId, MixPubInfo]()) if protoRes.isErr: error "Mix protocol initialization failed", err = protoRes.error return diff --git a/waku/waku_enr/capabilities.nim b/waku/waku_enr/capabilities.nim index 343a1e78e..f74b8b177 100644 --- a/waku/waku_enr/capabilities.nim +++ b/waku/waku_enr/capabilities.nim @@ -3,7 +3,7 @@ import std/[options, bitops, sequtils, net, tables], results, eth/keys, libp2p/crypto/crypto import ../common/enr, ../waku_core/codecs -import mix_protocol +import mix/mix_protocol const CapabilitiesEnrField* = "waku2" @@ -33,7 +33,8 @@ const capabilityToCodec = { }.toTable func init*( - T: type CapabilitiesBitfield, lightpush, filter, store, relay, sync, mix: bool = false + T: type CapabilitiesBitfield, + lightpush, filter, store, relay, sync, mix: bool = false, ): T = ## Creates an waku2 ENR flag bit field according to RFC 31 (https://rfc.vac.dev/spec/31/) var bitfield: uint8