diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 39beffc5f..fe3252501 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -419,7 +419,7 @@ proc setupProtocols( #mount mix if conf.mix: - (await node.mountMix(mixPrivKey)).isOkOr: + (await node.mountMix(conf.clusterId, mixPrivKey)).isOkOr: return err("failed to mount waku mix protocol: " & $error) return ok() diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 2f3ab62a7..4200da20e 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -60,7 +60,8 @@ import ../common/rate_limit/setting, ../discovery/autonat_service, ../common/nimchronos, - ../waku_enr/mix + ../waku_enr/mix, + ../waku_mix declarePublicCounter waku_node_messages, "number of messages received", ["type"] declarePublicHistogram waku_histogram_message_size, @@ -129,7 +130,7 @@ type started*: bool # Indicates that node has started listening topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent] rateLimitSettings*: ProtocolRateLimitSettings - mix*: MixProtocol + mix*: WakuMix mixbootNodes*: Table[PeerId, MixPubInfo] proc new*( @@ -216,153 +217,24 @@ proc mountSharding*( node.wakuSharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount) return ok() -#[ proc getBootStrapMixNodes*(node: WakuNode): Table[PeerId, MixPubInfo] = - var mixNodes = initTable[PeerId, MixPubInfo]() - # MixNode Multiaddrs and PublicKeys: - let bootNodesMultiaddrs = [ - "/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", - "/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF", - "/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA", - "/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f", - "/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu", - ] - let bootNodesMixPubKeys = [ - "9d09ce624f76e8f606265edb9cca2b7de9b41772a6d784bddaf92ffa8fba7d2c", - "9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a", - "275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c", - "e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18", - "8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f", - ] - for index, mixNodeMultiaddr in bootNodesMultiaddrs: - let peerIdRes = getPeerIdFromMultiAddr(mixNodeMultiaddr) - if peerIdRes.isErr: - error "Failed to get peer id from multiaddress: ", error = peerIdRes.error - let peerId = peerIdRes.get() - #if (not peerID == nil) and peerID == exceptPeerID: - # continue - let mixNodePubInfo = createMixPubInfo( - mixNodeMultiaddr, intoCurve25519Key(ncrutils.fromHex(bootNodesMixPubKeys[index])) - ) - - mixNodes[peerId] = mixNodePubInfo - info "using mix bootstrap nodes ", bootNodes = mixNodes - return mixNodes - - ]# - -#TODO: Ideally these procs should be moved out into mix specific file, but keeping it here for now. -proc mixPoolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool = - # Note that origin based(discv5) filtering is not done intentionally - # so that more mix nodes can be discovered. - if peer.enr.isNone(): - trace "peer has no ENR", peer = $peer - return false - - if cluster.isSome() and peer.enr.get().isClusterMismatched(cluster.get()): - debug "peer has mismatching cluster", peer = $peer - return false - - # Filter if mix is enabled - if not peer.enr.get().supportsCapability(Capabilities.Mix): - debug "peer doesn't support mix", peer = $peer - return false - - return true - -proc appendPeerIdToMultiaddr*(multiaddr: MultiAddress, peerId: PeerId): MultiAddress = - if multiaddr.contains(multiCodec("p2p")).get(): - return multiaddr - - var maddrStr = multiaddr.toString().valueOr: - error "Failed to convert multiaddress to string.", err = error - return multiaddr - maddrStr.add("/p2p/" & $peerId) - var cleanAddr = MultiAddress.init(maddrStr).valueOr: - error "Failed to convert string to multiaddress.", err = error - return multiaddr - return cleanAddr - -proc populateMixNodePool*(node: WakuNode) {.async.} = - var cluster: uint16 - let enrRes = node.enr.toTyped() - if enrRes.isOk(): - let shardingRes = enrRes.get().relaySharding() - if shardingRes.isSome(): - let relayShard = shardingRes.get() - cluster = relayShard.clusterID - else: - error "could not get cluster from ENR", error = enrRes.error - - # populate only peers that i) are reachable ii) share cluster iii) support mix - let remotePeers = node.peerManager.wakuPeerStore.getReachablePeers().filterIt( - mixPoolFilter(some(cluster), it) - ) - var mixNodes = initTable[PeerId, MixPubInfo]() - - for i in 0 ..< min(remotePeers.len, 100): - let remotePeerENR = remotePeers[i].enr.get() - # TODO: use the most exposed/external multiaddr of the peer, right now using the first - let maddrWithPeerId = - toString(appendPeerIdToMultiaddr(remotePeers[i].addrs[0], remotePeers[i].peerId)) - trace "remote peer ENR", - peerId = remotePeers[i].peerId, enr = remotePeerENR, maddr = maddrWithPeerId - - let peerMixPubKey = mixKey(remotePeerENR).get() - let mixNodePubInfo = - createMixPubInfo(maddrWithPeerId.value, intoCurve25519Key(peerMixPubKey)) - mixNodes[remotePeers[i].peerId] = mixNodePubInfo - mix_pool_size.set(len(mixNodes)) - # set the mix node pool - node.mix.setNodePool(mixNodes) - trace "mix node pool updated", poolSize = node.mix.getNodePoolSize() - return - -proc startMixNodePoolMgr*(node: WakuNode) {.async.} = - info "starting mix node pool manager" - # try more aggressively to populate the pool at startup - var attempts = 50 - # TODO: make initial pool size configurable - while node.mix.getNodePoolSize() < 100 and attempts > 0: - attempts -= 1 - discard node.populateMixNodePool() - await sleepAsync(1.seconds) - - # TODO: make interval configurable - heartbeat "Updating mix node pool", 5.seconds: - discard node.populateMixNodePool() - proc getMixNodePoolSize*(node: WakuNode): int = return node.mix.getNodePoolSize() -#[ proc setMixBootStrapNodes*(node: WakuNode,){.async}= - node.mix.setNodePool(node.getBootStrapMixNodes()) - ]# -# Mix Protocol proc mountMix*( - node: WakuNode, mixPrivKey: Curve25519Key + node: WakuNode, clusterId: uint16, mixPrivKey: Curve25519Key ): Future[Result[void, string]] {.async.} = info "mounting mix protocol", nodeId = node.info #TODO log the config used - let mixPubKey = public(mixPrivKey) - - info "mixPrivKey", mixPrivKey = mixPrivKey, mixPubKey = mixPubKey let localaddrStr = node.announcedAddresses[0].toString().valueOr: return err("Failed to convert multiaddress to string.") info "local addr", localaddr = localaddrStr - let localMixNodeInfo = initMixNodeInfo( - localaddrStr & "/p2p/" & $node.peerId, - mixPubKey, - mixPrivKey, - node.switch.peerInfo.publicKey.skkey, - node.switch.peerInfo.privateKey.skkey, - ) + let nodeAddr = localaddrStr & "/p2p/" & $node.peerId # TODO: Pass bootnodes from config, - # TODO : ideally mix should not be marked ready until certain min pool of mixNodes are discovered let protoRes = - MixProtocol.initMix(localMixNodeInfo, node.switch, initTable[PeerId, MixPubInfo]()) + WakuMix.new(nodeAddr, node.switch, node.peerManager, clusterId, mixPrivKey) if protoRes.isErr: - error "Mix protocol initialization failed", err = protoRes.error + error "Waku Mix protocol initialization failed", err = protoRes.error return node.mix = protoRes.value @@ -370,9 +242,7 @@ proc mountMix*( node.switch.mount(node.mix) if catchRes.isErr(): return err(catchRes.error.msg) - - discard startMixNodePoolMgr(node) - + node.mix.start() return ok() ## Waku Sync diff --git a/waku/waku_mix.nim b/waku/waku_mix.nim new file mode 100644 index 000000000..0484e46db --- /dev/null +++ b/waku/waku_mix.nim @@ -0,0 +1,3 @@ +import ./waku_mix/[protocol] + +export protocol diff --git a/waku/waku_mix/protocol.nim b/waku/waku_mix/protocol.nim new file mode 100644 index 000000000..902cc0845 --- /dev/null +++ b/waku/waku_mix/protocol.nim @@ -0,0 +1,157 @@ +{.push raises: [].} + +import chronicles, std/[options], chronos, results + +import + libp2p/crypto/curve25519, + mix/mix_protocol, + mix/mix_node, + mix/mix_metrics, + libp2p/[multiaddress, multicodec, peerid] + +import + ../node/peer_manager, + ../waku_enr/mix, + ../waku_enr, + ../node/peer_manager/waku_peer_store + +logScope: + topics = "waku mix" + +type + WakuMix* = ref object of MixProtocol + peerManager*: PeerManager + clusterId: uint16 + + WakuMixResult*[T] = Result[T, string] + +proc mixPoolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool = + # Note that origin based(discv5) filtering is not done intentionally + # so that more mix nodes can be discovered. + if peer.enr.isNone(): + trace "peer has no ENR", peer = $peer + return false + + if cluster.isSome() and peer.enr.get().isClusterMismatched(cluster.get()): + debug "peer has mismatching cluster", peer = $peer + return false + + # Filter if mix is enabled + if not peer.enr.get().supportsCapability(Capabilities.Mix): + debug "peer doesn't support mix", peer = $peer + return false + + return true + +proc appendPeerIdToMultiaddr*(multiaddr: MultiAddress, peerId: PeerId): MultiAddress = + if multiaddr.contains(multiCodec("p2p")).get(): + return multiaddr + + var maddrStr = multiaddr.toString().valueOr: + error "Failed to convert multiaddress to string.", err = error + return multiaddr + maddrStr.add("/p2p/" & $peerId) + var cleanAddr = MultiAddress.init(maddrStr).valueOr: + error "Failed to convert string to multiaddress.", err = error + return multiaddr + return cleanAddr + +proc populateMixNodePool*(mix: WakuMix) {.async.} = + # populate only peers that i) are reachable ii) share cluster iii) support mix + let remotePeers = mix.peerManager.wakuPeerStore.getReachablePeers().filterIt( + mixPoolFilter(some(mix.clusterId), it) + ) + var mixNodes = initTable[PeerId, MixPubInfo]() + + for i in 0 ..< min(remotePeers.len, 100): + let remotePeerENR = remotePeers[i].enr.get() + # TODO: use the most exposed/external multiaddr of the peer, right now using the first + let maddrWithPeerId = + toString(appendPeerIdToMultiaddr(remotePeers[i].addrs[0], remotePeers[i].peerId)) + trace "remote peer ENR", + peerId = remotePeers[i].peerId, enr = remotePeerENR, maddr = maddrWithPeerId + + let peerMixPubKey = mixKey(remotePeerENR).get() + let mixNodePubInfo = + createMixPubInfo(maddrWithPeerId.value, intoCurve25519Key(peerMixPubKey)) + mixNodes[remotePeers[i].peerId] = mixNodePubInfo + + mix_pool_size.set(len(mixNodes)) + # set the mix node pool + mix.setNodePool(mixNodes) + trace "mix node pool updated", poolSize = mix.getNodePoolSize() + return + +proc startMixNodePoolMgr*(mix: WakuMix) {.async.} = + info "starting mix node pool manager" + # try more aggressively to populate the pool at startup + var attempts = 50 + # TODO: make initial pool size configurable + while mix.getNodePoolSize() < 100 and attempts > 0: + attempts -= 1 + discard mix.populateMixNodePool() + await sleepAsync(1.seconds) + + # TODO: make interval configurable + heartbeat "Updating mix node pool", 5.seconds: + discard mix.populateMixNodePool() + +#[ proc getBootStrapMixNodes*(node: WakuNode): Table[PeerId, MixPubInfo] = + var mixNodes = initTable[PeerId, MixPubInfo]() + # MixNode Multiaddrs and PublicKeys: + let bootNodesMultiaddrs = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", + "/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF", + "/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA", + "/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f", + "/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu", + ] + let bootNodesMixPubKeys = ["9d09ce624f76e8f606265edb9cca2b7de9b41772a6d784bddaf92ffa8fba7d2c", + "9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a", + "275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c", + "e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18", + "8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f" + ] + for index, mixNodeMultiaddr in bootNodesMultiaddrs: + let peerIdRes = getPeerIdFromMultiAddr(mixNodeMultiaddr) + if peerIdRes.isErr: + error "Failed to get peer id from multiaddress: " , error = peerIdRes.error + let peerId = peerIdRes.get() + #if (not peerID == nil) and peerID == exceptPeerID: + # continue + let mixNodePubInfo = createMixPubInfo(mixNodeMultiaddr, intoCurve25519Key(ncrutils.fromHex(bootNodesMixPubKeys[index]))) + + mixNodes[peerId] = mixNodePubInfo + info "using mix bootstrap nodes ", bootNodes = mixNodes + return mixNodes + ]# + +proc new*( + T: type WakuMix, + nodeAddr: string, + switch: Switch, + peermgr: PeerManager, + clusterId: uint16, + mixPrivKey: Curve25519Key, +): WakuMixResult[T] = + let mixPubKey = public(mixPrivKey) + info "mixPrivKey", mixPrivKey = mixPrivKey, mixPubKey = mixPubKey + + # TODO : ideally mix should not be marked ready until certain min pool of mixNodes are discovered + var m: WakuMix + + let localMixNodeInfo = initMixNodeInfo( + nodeAddr, mixPubKey, mixPrivKey, switch.peerInfo.publicKey.skkey, + switch.peerInfo.privateKey.skkey, + ) + m = initMix(localMixNodeInfo, switch, initTable[PeerId, MixPubInfo]()) + m.peerManager = peermgr + m.clusterId = clusterId + return + +proc start*(mix: Wakumix) = + discard mix.startMixNodePoolMgr() + +#[ proc setMixBootStrapNodes*(node: WakuNode,){.async}= + node.mix.setNodePool(node.getBootStrapMixNodes()) + ]# +# Mix Protocol