diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 3cdbc1394..a1947c7cb 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -14,6 +14,7 @@ import eth/p2p/discoveryv5/enr, libp2p/crypto/crypto, libp2p/crypto/curve25519, + libp2p/[multiaddress, multicodec], libp2p/protocols/ping, libp2p/protocols/pubsub/gossipsub, libp2p/protocols/pubsub/rpc/messages, @@ -27,7 +28,6 @@ import ../../vendor/mix/src/curve25519, ../../vendor/mix/src/protocol - import ../waku_core, ../waku_core/topics/sharding, @@ -56,7 +56,10 @@ import ../waku_rln_relay, ./net_config, ./peer_manager, - ../common/rate_limit/setting + ../common/rate_limit/setting, + ../discovery/autonat_service, + ../common/nimchronos, + ../waku_enr/mix declarePublicCounter waku_node_messages, "number of messages received", ["type"] declarePublicHistogram waku_histogram_message_size, @@ -212,7 +215,7 @@ proc mountSharding*( node.wakuSharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount) return ok() -proc getBootStrapMixNodes(node: WakuNode, exceptPeerID: PeerId): Table[PeerId, MixPubInfo] = +#[ proc getBootStrapMixNodes(node: WakuNode, exceptPeerID: PeerId): Table[PeerId, MixPubInfo] = var mixNodes = initTable[PeerId, MixPubInfo]() # MixNode Multiaddrs and PublicKeys: let bootNodesMultiaddrs = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", @@ -239,10 +242,86 @@ proc getBootStrapMixNodes(node: WakuNode, exceptPeerID: PeerId): Table[PeerId, M mixNodes[peerId] = mixNodePubInfo info "using mix bootstrap nodes ", bootNodes = mixNodes return mixNodes + ]# +#TODO: Ideally these procs should be moved out into mix specific file, but keeping it here for now. +proc mixPoolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool = + if peer.enr.isNone(): + debug "peer has no ENR", peer = $peer + return false - # Mix Protocol -proc mountMix*(node: WakuNode, mixPrivKey: string): Future[Result[void, string]] {.async.} = + if cluster.isSome() and peer.enr.get().isClusterMismatched(cluster.get()): + debug "peer has mismatching cluster", peer = $peer + return false + + #TODO: Filter if mix is enabled + + return true + +proc addPeerId*(multiaddr: MultiAddress, peerId: PeerId): MultiAddress = + if multiaddr.contains(multiCodec("p2p")).get(): + return multiaddr + + var maddrStr = multiaddr.toString().valueOr: + error "Failed to convert multiaddress to string.", err = error + return multiaddr + maddrStr.add("/p2p/" & $peerId) + var cleanAddr = MultiAddress.init(maddrStr).valueOr: + error "Failed to convert string to multiaddress.", err = error + return multiaddr + return cleanAddr + +proc populateMixNodePool*(node: WakuNode) {.async.} = + var cluster: uint16 + let enrRes = node.enr.toTyped() + if enrRes.isOk(): + let shardingRes = enrRes.get().relaySharding() + if shardingRes.isSome(): + let relayShard = shardingRes.get() + cluster = relayShard.clusterID + else: + error "could not get cluster from ENR", error = enrRes.error + + # populate only peers that i) are reachable ii) share cluster iii) support mix + let remotePeers = node.peerManager.wakuPeerStore.getReachablePeers().filterIt( + mixPoolFilter(some(cluster), it) + ) + var mixNodes = initTable[PeerId, MixPubInfo]() + + for i in 0 ..< min(remotePeers.len, 100): + let remotePeerENR = remotePeers[i].enr.get() + # TODO: use the most exposed/external multiaddr of the peer, right now using the first + let maddrWithPeerId = + toString(addPeerId(remotePeers[i].addrs[0], remotePeers[i].peerId)) + debug "remote peer ENR", + peerId = remotePeers[i].peerId, enr = remotePeerENR, maddr = maddrWithPeerId + + let peerMixPubKey = mixKey(remotePeerENR).get() + let mixNodePubInfo = + createMixPubInfo(maddrWithPeerId.value, intoCurve25519Key(peerMixPubKey)) + mixNodes[remotePeers[i].peerId] = mixNodePubInfo + + # set the mix node pool + node.mix.setNodePool(mixNodes) + debug "mix node pool updated", poolSize = node.mix.getNodePoolSize() + return + +proc startMixNodePoolMgr*(node: WakuNode) {.async.} = + # try more aggressively to populate the pool at startup + var attempts = 50 + while node.mix.getNodePoolSize() < 100 and attempts > 0: + attempts -= 1 + discard node.populateMixNodePool() + await sleepAsync(1.seconds) + + heartbeat "Updating mix node pool", 10.minutes: + discard node.populateMixNodePool() + + # Mix Protocol + +proc mountMix*( + node: WakuNode, mixPrivKey: string +): Future[Result[void, string]] {.async.} = info "mounting mix protocol", nodeId = node.info #TODO log the config used info "mixPrivKey", mixPrivKey = mixPrivKey @@ -254,11 +333,16 @@ proc mountMix*(node: WakuNode, mixPrivKey: string): Future[Result[void, string]] info "local addr", localaddr = localaddrStr let localMixNodeInfo = initMixNodeInfo( - localaddrStr & "/p2p/" & $node.peerId, mixPubKey, mixKey, node.switch.peerInfo.publicKey.skkey, + localaddrStr & "/p2p/" & $node.peerId, + mixPubKey, + mixKey, + node.switch.peerInfo.publicKey.skkey, node.switch.peerInfo.privateKey.skkey, ) - - let protoRes = MixProtocol.initMix(localMixNodeInfo, node.switch, node.getBootStrapMixNodes(node.peerId)) + # TODO: Pass bootnodes from config, + # TODO : ideally mix should not be marked ready until certain min pool of mixNodes are discovered + let protoRes = + MixProtocol.initMix(localMixNodeInfo, node.switch, initTable[PeerId, MixPubInfo]()) if protoRes.isErr: error "Mix protocol initialization failed", err = protoRes.error return @@ -269,6 +353,8 @@ proc mountMix*(node: WakuNode, mixPrivKey: string): Future[Result[void, string]] if catchRes.isErr(): return err(catchRes.error.msg) + discard startMixNodePoolMgr(node) + return ok() ## Waku Sync diff --git a/waku/waku_enr/capabilities.nim b/waku/waku_enr/capabilities.nim index ba6ee3a99..7eec92019 100644 --- a/waku/waku_enr/capabilities.nim +++ b/waku/waku_enr/capabilities.nim @@ -3,6 +3,7 @@ import std/[options, bitops, sequtils, net, tables], results, eth/keys, libp2p/crypto/crypto import ../common/enr, ../waku_core/codecs +import ../../vendor/mix/src/mix_protocol const CapabilitiesEnrField* = "waku2" @@ -28,10 +29,12 @@ const capabilityToCodec = { Capabilities.Filter: WakuFilterSubscribeCodec, Capabilities.Lightpush: WakuLightPushCodec, Capabilities.Sync: WakuReconciliationCodec, + Capabilities.Mix: MixProtocolID, }.toTable func init*( - T: type CapabilitiesBitfield, lightpush, filter, store, relay, sync, mix: bool = false + T: type CapabilitiesBitfield, + lightpush, filter, store, relay, sync, mix: bool = false, ): T = ## Creates an waku2 ENR flag bit field according to RFC 31 (https://rfc.vac.dev/spec/31/) var bitfield: uint8 diff --git a/waku/waku_enr/mix.nim b/waku/waku_enr/mix.nim index 4399d02af..6971b3e67 100644 --- a/waku/waku_enr/mix.nim +++ b/waku/waku_enr/mix.nim @@ -20,3 +20,13 @@ func mixKey*(record: TypedRecord): Option[seq[byte]] = if field.isNone(): return none(seq[byte]) return field + +func mixKey*(record: Record): Option[seq[byte]] = + let recordRes = record.toTyped() + if recordRes.isErr(): + return none(seq[byte]) + + let field = recordRes.value.tryGet(MixKeyEnrField, seq[byte]) + if field.isNone(): + return none(seq[byte]) + return field \ No newline at end of file