mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-02-05 22:43:26 +00:00
update waku mix to not have local node pool as it is moved to mix protocol
This commit is contained in:
parent
9b3af3864d
commit
92fbb68d1d
2
vendor/nim-libp2p
vendored
2
vendor/nim-libp2p
vendored
@ -1 +1 @@
|
||||
Subproject commit 2ef14415f31ecb598fc4e1a2f9761677076aefc5
|
||||
Subproject commit fd7bbc9dfd49af1b93b8b60cccf129d2ac6e8ece
|
||||
@ -256,7 +256,7 @@ proc mountAutoSharding*(
|
||||
return ok()
|
||||
|
||||
proc getMixNodePoolSize*(node: WakuNode): int =
|
||||
return node.wakuMix.getNodePoolSize()
|
||||
return node.wakuMix.nodePool.len
|
||||
|
||||
proc mountMix*(
|
||||
node: WakuNode,
|
||||
|
||||
@ -1,22 +1,23 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import chronicles, std/[options, tables, sequtils], chronos, results, metrics, strutils
|
||||
import chronicles, std/options, chronos, results, metrics
|
||||
|
||||
import
|
||||
libp2p/crypto/curve25519,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/protocols/mix,
|
||||
libp2p/protocols/mix/mix_node,
|
||||
libp2p/protocols/mix/mix_protocol,
|
||||
libp2p/protocols/mix/mix_metrics,
|
||||
libp2p/[multiaddress, multicodec, peerid],
|
||||
libp2p/protocols/mix/delay_strategy,
|
||||
libp2p/[multiaddress, peerid],
|
||||
eth/common/keys
|
||||
|
||||
import
|
||||
../node/peer_manager,
|
||||
../waku_core,
|
||||
../waku_enr,
|
||||
../node/peer_manager/waku_peer_store,
|
||||
../common/nimchronos
|
||||
../node/peer_manager/waku_peer_store
|
||||
|
||||
logScope:
|
||||
topics = "waku mix"
|
||||
@ -27,7 +28,6 @@ type
|
||||
WakuMix* = ref object of MixProtocol
|
||||
peerManager*: PeerManager
|
||||
clusterId: uint16
|
||||
nodePoolLoopHandle: Future[void]
|
||||
pubKey*: Curve25519Key
|
||||
|
||||
WakuMixResult*[T] = Result[T, string]
|
||||
@ -36,106 +36,10 @@ type
|
||||
multiAddr*: string
|
||||
pubKey*: Curve25519Key
|
||||
|
||||
proc filterMixNodes(cluster: Option[uint16], peer: RemotePeerInfo): bool =
|
||||
# Note that origin based(discv5) filtering is not done intentionally
|
||||
# so that more mix nodes can be discovered.
|
||||
if peer.mixPubKey.isNone():
|
||||
trace "remote peer has no mix Pub Key", peer = $peer
|
||||
return false
|
||||
|
||||
if cluster.isSome() and peer.enr.isSome() and
|
||||
peer.enr.get().isClusterMismatched(cluster.get()):
|
||||
trace "peer has mismatching cluster", peer = $peer
|
||||
return false
|
||||
|
||||
return true
|
||||
|
||||
proc appendPeerIdToMultiaddr*(multiaddr: MultiAddress, peerId: PeerId): MultiAddress =
|
||||
if multiaddr.contains(multiCodec("p2p")).get():
|
||||
return multiaddr
|
||||
|
||||
var maddrStr = multiaddr.toString().valueOr:
|
||||
error "Failed to convert multiaddress to string.", err = error
|
||||
return multiaddr
|
||||
maddrStr.add("/p2p/" & $peerId)
|
||||
var cleanAddr = MultiAddress.init(maddrStr).valueOr:
|
||||
error "Failed to convert string to multiaddress.", err = error
|
||||
return multiaddr
|
||||
return cleanAddr
|
||||
|
||||
func getIPv4Multiaddr*(maddrs: seq[MultiAddress]): Option[MultiAddress] =
|
||||
for multiaddr in maddrs:
|
||||
trace "checking multiaddr", addr = $multiaddr
|
||||
if multiaddr.contains(multiCodec("ip4")).get():
|
||||
trace "found ipv4 multiaddr", addr = $multiaddr
|
||||
return some(multiaddr)
|
||||
trace "no ipv4 multiaddr found"
|
||||
return none(MultiAddress)
|
||||
|
||||
proc populateMixNodePool*(mix: WakuMix) =
|
||||
# populate only peers that i) are reachable ii) share cluster iii) support mix
|
||||
let remotePeers = mix.peerManager.switch.peerStore.peers().filterIt(
|
||||
filterMixNodes(some(mix.clusterId), it)
|
||||
)
|
||||
var mixNodes = initTable[PeerId, MixPubInfo]()
|
||||
|
||||
for i in 0 ..< min(remotePeers.len, 100):
|
||||
let ipv4addr = getIPv4Multiaddr(remotePeers[i].addrs).valueOr:
|
||||
trace "peer has no ipv4 address", peer = $remotePeers[i]
|
||||
continue
|
||||
let maddrWithPeerId = appendPeerIdToMultiaddr(ipv4addr, remotePeers[i].peerId)
|
||||
trace "remote peer info", info = remotePeers[i]
|
||||
|
||||
if remotePeers[i].mixPubKey.isNone():
|
||||
trace "peer has no mix Pub Key", remotePeerId = $remotePeers[i]
|
||||
continue
|
||||
|
||||
let peerMixPubKey = remotePeers[i].mixPubKey.get()
|
||||
var peerPubKey: crypto.PublicKey
|
||||
if not remotePeers[i].peerId.extractPublicKey(peerPubKey):
|
||||
warn "Failed to extract public key from peerId, skipping node",
|
||||
remotePeerId = remotePeers[i].peerId
|
||||
continue
|
||||
|
||||
if peerPubKey.scheme != PKScheme.Secp256k1:
|
||||
warn "Peer public key is not Secp256k1, skipping node",
|
||||
remotePeerId = remotePeers[i].peerId, scheme = peerPubKey.scheme
|
||||
continue
|
||||
|
||||
let mixNodePubInfo = MixPubInfo.init(
|
||||
remotePeers[i].peerId,
|
||||
ipv4addr,
|
||||
intoCurve25519Key(peerMixPubKey),
|
||||
peerPubKey.skkey,
|
||||
)
|
||||
trace "adding mix node to pool",
|
||||
remotePeerId = remotePeers[i].peerId, multiAddr = $ipv4addr
|
||||
mixNodes[remotePeers[i].peerId] = mixNodePubInfo
|
||||
|
||||
# set the mix node pool
|
||||
mix.setNodePool(mixNodes)
|
||||
mix_pool_size.set(len(mixNodes))
|
||||
trace "mix node pool updated", poolSize = mix.getNodePoolSize()
|
||||
|
||||
# Once mix protocol starts to use info from PeerStore, then this can be removed.
|
||||
proc startMixNodePoolMgr*(mix: WakuMix) {.async.} =
|
||||
info "starting mix node pool manager"
|
||||
# try more aggressively to populate the pool at startup
|
||||
var attempts = 50
|
||||
# TODO: make initial pool size configurable
|
||||
while mix.getNodePoolSize() < 100 and attempts > 0:
|
||||
attempts -= 1
|
||||
mix.populateMixNodePool()
|
||||
await sleepAsync(1.seconds)
|
||||
|
||||
# TODO: make interval configurable
|
||||
heartbeat "Updating mix node pool", 5.seconds:
|
||||
mix.populateMixNodePool()
|
||||
|
||||
proc processBootNodes(
|
||||
bootnodes: seq[MixNodePubInfo], peermgr: PeerManager
|
||||
): Table[PeerId, MixPubInfo] =
|
||||
var mixNodes = initTable[PeerId, MixPubInfo]()
|
||||
bootnodes: seq[MixNodePubInfo], peermgr: PeerManager, mix: WakuMix
|
||||
) =
|
||||
var count = 0
|
||||
for node in bootnodes:
|
||||
let pInfo = parsePeerInfo(node.multiAddr).valueOr:
|
||||
error "Failed to get peer id from multiaddress: ",
|
||||
@ -156,14 +60,15 @@ proc processBootNodes(
|
||||
error "Failed to parse multiaddress", multiAddr = node.multiAddr, error = error
|
||||
continue
|
||||
|
||||
mixNodes[peerId] = MixPubInfo.init(peerId, multiAddr, node.pubKey, peerPubKey.skkey)
|
||||
let mixPubInfo = MixPubInfo.init(peerId, multiAddr, node.pubKey, peerPubKey.skkey)
|
||||
mix.nodePool.add(mixPubInfo)
|
||||
count += 1
|
||||
|
||||
peermgr.addPeer(
|
||||
RemotePeerInfo.init(peerId, @[multiAddr], mixPubKey = some(node.pubKey))
|
||||
)
|
||||
mix_pool_size.set(len(mixNodes))
|
||||
info "using mix bootstrap nodes ", bootNodes = mixNodes
|
||||
return mixNodes
|
||||
mix_pool_size.set(count)
|
||||
info "using mix bootstrap nodes ", count = count
|
||||
|
||||
proc new*(
|
||||
T: type WakuMix,
|
||||
@ -183,22 +88,22 @@ proc new*(
|
||||
)
|
||||
if bootnodes.len < minMixPoolSize:
|
||||
warn "publishing with mix won't work until atleast 3 mix nodes in node pool"
|
||||
let initTable = processBootNodes(bootnodes, peermgr)
|
||||
|
||||
if len(initTable) < minMixPoolSize:
|
||||
warn "publishing with mix won't work until atleast 3 mix nodes in node pool"
|
||||
var m = WakuMix(peerManager: peermgr, clusterId: clusterId, pubKey: mixPubKey)
|
||||
procCall MixProtocol(m).init(localMixNodeInfo, initTable, peermgr.switch)
|
||||
procCall MixProtocol(m).init(
|
||||
localMixNodeInfo, peermgr.switch, delayStrategy = NoSamplingDelayStrategy.new(crypto.newRng())
|
||||
)
|
||||
|
||||
processBootNodes(bootnodes, peermgr, m)
|
||||
|
||||
if m.nodePool.len < minMixPoolSize:
|
||||
warn "publishing with mix won't work until atleast 3 mix nodes in node pool"
|
||||
return ok(m)
|
||||
|
||||
method start*(mix: WakuMix) =
|
||||
info "starting waku mix protocol"
|
||||
mix.nodePoolLoopHandle = mix.startMixNodePoolMgr()
|
||||
|
||||
method stop*(mix: WakuMix) {.async.} =
|
||||
if mix.nodePoolLoopHandle.isNil():
|
||||
return
|
||||
await mix.nodePoolLoopHandle.cancelAndWait()
|
||||
mix.nodePoolLoopHandle = nil
|
||||
discard
|
||||
|
||||
# Mix Protocol
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user