mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-09 01:13:08 +00:00
maintain mix node pool based on discovered nodes
This commit is contained in:
parent
fa16884e51
commit
4072fa7a50
@ -14,6 +14,7 @@ import
|
||||
eth/p2p/discoveryv5/enr,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/crypto/curve25519,
|
||||
libp2p/[multiaddress, multicodec],
|
||||
libp2p/protocols/ping,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/protocols/pubsub/rpc/messages,
|
||||
@ -59,7 +60,9 @@ import
|
||||
./config,
|
||||
./peer_manager,
|
||||
../common/rate_limit/setting,
|
||||
../discovery/autonat_service
|
||||
../discovery/autonat_service,
|
||||
../common/nimchronos,
|
||||
../waku_enr/mix
|
||||
|
||||
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
|
||||
declarePublicHistogram waku_histogram_message_size,
|
||||
@ -216,7 +219,7 @@ proc mountSharding*(
|
||||
node.wakuSharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount)
|
||||
return ok()
|
||||
|
||||
proc getBootStrapMixNodes(node: WakuNode, exceptPeerID: PeerId): Table[PeerId, MixPubInfo] =
|
||||
#[ proc getBootStrapMixNodes(node: WakuNode, exceptPeerID: PeerId): Table[PeerId, MixPubInfo] =
|
||||
var mixNodes = initTable[PeerId, MixPubInfo]()
|
||||
# MixNode Multiaddrs and PublicKeys:
|
||||
let bootNodesMultiaddrs = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o",
|
||||
@ -243,7 +246,79 @@ proc getBootStrapMixNodes(node: WakuNode, exceptPeerID: PeerId): Table[PeerId, M
|
||||
mixNodes[peerId] = mixNodePubInfo
|
||||
info "using mix bootstrap nodes ", bootNodes = mixNodes
|
||||
return mixNodes
|
||||
]#
|
||||
|
||||
#TODO: Ideally these procs should be moved out into mix specific file, but keeping it here for now.
|
||||
proc mixPoolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool =
|
||||
|
||||
if peer.enr.isNone():
|
||||
debug "peer has no ENR", peer = $peer
|
||||
return false
|
||||
|
||||
if cluster.isSome() and peer.enr.get().isClusterMismatched(cluster.get()):
|
||||
debug "peer has mismatching cluster", peer = $peer
|
||||
return false
|
||||
|
||||
#TODO: Filter if mix is enabled
|
||||
|
||||
return true
|
||||
|
||||
proc addPeerId*(multiaddr: MultiAddress, peerId:PeerId): MultiAddress =
|
||||
if multiaddr.contains(multiCodec("p2p")).get():
|
||||
return multiaddr
|
||||
|
||||
var maddrStr = multiaddr.toString().valueOr:
|
||||
error "Failed to convert multiaddress to string.", err = error
|
||||
return multiaddr
|
||||
maddrStr.add("/p2p/" & $peerId)
|
||||
var cleanAddr = MultiAddress.init(maddrStr).valueOr:
|
||||
error "Failed to convert string to multiaddress.", err = error
|
||||
return multiaddr
|
||||
return cleanAddr
|
||||
|
||||
proc populateMixNodePool*(node: WakuNode){.async} =
|
||||
var cluster: uint16
|
||||
let enrRes = node.enr.toTyped()
|
||||
if enrRes.isOk():
|
||||
let shardingRes = enrRes.get().relaySharding()
|
||||
if shardingRes.isSome():
|
||||
let relayShard = shardingRes.get()
|
||||
cluster = relayShard.clusterID
|
||||
else:
|
||||
error "could not get cluster from ENR", error = enrRes.error
|
||||
|
||||
# populate only peers that i) are reachable ii) share cluster iii) support mix
|
||||
let remotePeers = node.peerManager.wakuPeerStore.getReachablePeers().filterIt(
|
||||
mixPoolFilter(some(cluster), it)
|
||||
)
|
||||
var mixNodes = initTable[PeerId, MixPubInfo]()
|
||||
|
||||
for i in 0 ..< min(remotePeers.len, 100):
|
||||
let remotePeerENR = remotePeers[i].enr.get()
|
||||
# TODO: use the most exposed/external multiaddr of the peer, right now using the first
|
||||
let maddrWithPeerId = toString(addPeerId(remotePeers[i].addrs[0],remotePeers[i].peerId))
|
||||
debug "remote peer ENR", peerId = remotePeers[i].peerId, enr = remotePeerENR, maddr = maddrWithPeerId
|
||||
|
||||
let peerMixPubKey = mixKey(remotePeerENR).get()
|
||||
let mixNodePubInfo = createMixPubInfo(maddrWithPeerId.value, intoCurve25519Key(peerMixPubKey))
|
||||
mixNodes[remotePeers[i].peerId] = mixNodePubInfo
|
||||
|
||||
# set the mix node pool
|
||||
node.mix.setNodePool(mixNodes)
|
||||
debug "mix node pool updated", poolSize = node.mix.getNodePoolSize()
|
||||
return
|
||||
|
||||
|
||||
proc startMixNodePoolMgr*(node: WakuNode ){.async} =
|
||||
# try more aggressively to populate the pool at startup
|
||||
var attempts = 50
|
||||
while node.mix.getNodePoolSize() < 100 and attempts > 0:
|
||||
attempts -= 1
|
||||
discard node.populateMixNodePool()
|
||||
await sleepAsync(1.seconds)
|
||||
|
||||
heartbeat "Updating mix node pool", 10.minutes:
|
||||
discard node.populateMixNodePool()
|
||||
|
||||
# Mix Protocol
|
||||
proc mountMix*(node: WakuNode, mixPrivKey: string): Future[Result[void, string]] {.async.} =
|
||||
@ -261,8 +336,9 @@ proc mountMix*(node: WakuNode, mixPrivKey: string): Future[Result[void, string]]
|
||||
localaddrStr & "/p2p/" & $node.peerId, mixPubKey, mixKey, node.switch.peerInfo.publicKey.skkey,
|
||||
node.switch.peerInfo.privateKey.skkey,
|
||||
)
|
||||
|
||||
let protoRes = MixProtocol.initMix(localMixNodeInfo, node.switch, node.getBootStrapMixNodes(node.peerId))
|
||||
# TODO: Pass bootnodes from config,
|
||||
# TODO : ideally mix should not be marked ready until certain min pool of mixNodes are discovered
|
||||
let protoRes = MixProtocol.initMix(localMixNodeInfo, node.switch, initTable[PeerId, MixPubInfo]())
|
||||
if protoRes.isErr:
|
||||
error "Mix protocol initialization failed", err = protoRes.error
|
||||
return
|
||||
@ -273,6 +349,8 @@ proc mountMix*(node: WakuNode, mixPrivKey: string): Future[Result[void, string]]
|
||||
if catchRes.isErr():
|
||||
return err(catchRes.error.msg)
|
||||
|
||||
discard startMixNodePoolMgr(node)
|
||||
|
||||
return ok()
|
||||
|
||||
## Waku Sync
|
||||
|
||||
@ -3,6 +3,7 @@
|
||||
import
|
||||
std/[options, bitops, sequtils, net, tables], results, eth/keys, libp2p/crypto/crypto
|
||||
import ../common/enr, ../waku_core/codecs
|
||||
import ../../vendor/mix/src/mix_protocol
|
||||
|
||||
const CapabilitiesEnrField* = "waku2"
|
||||
|
||||
@ -28,6 +29,7 @@ const capabilityToCodec = {
|
||||
Capabilities.Filter: WakuFilterSubscribeCodec,
|
||||
Capabilities.Lightpush: WakuLightPushCodec,
|
||||
Capabilities.Sync: WakuSyncCodec,
|
||||
Capabilities.Mix: MixProtocolID
|
||||
}.toTable
|
||||
|
||||
func init*(
|
||||
|
||||
@ -20,3 +20,13 @@ func mixKey*(record: TypedRecord): Option[seq[byte]] =
|
||||
if field.isNone():
|
||||
return none(seq[byte])
|
||||
return field
|
||||
|
||||
func mixKey*(record: Record): Option[seq[byte]] =
|
||||
let recordRes = record.toTyped()
|
||||
if recordRes.isErr():
|
||||
return none(seq[byte])
|
||||
|
||||
let field = recordRes.value.tryGet(MixKeyEnrField, seq[byte])
|
||||
if field.isNone():
|
||||
return none(seq[byte])
|
||||
return field
|
||||
Loading…
x
Reference in New Issue
Block a user