chore: capping mechanism for relay and service connections (#3184)

This commit is contained in:
Darshan K 2025-01-21 11:29:52 +05:30 committed by GitHub
parent d7cbe83b19
commit dd1a70bdb7
11 changed files with 182 additions and 92 deletions

View File

@ -463,7 +463,12 @@ proc initAndStartApp(
nodeBuilder.withNodeKey(key)
nodeBuilder.withRecord(record)
nodeBUilder.withSwitchConfiguration(maxConnections = some(MaxConnectedPeers))
nodeBuilder.withPeerManagerConfig(maxRelayPeers = some(20), shardAware = true)
nodeBuilder.withPeerManagerConfig(
maxConnections = MaxConnectedPeers,
relayServiceRatio = "13.33:86.67",
shardAware = true,
)
let res = nodeBuilder.withNetworkConfigurationDetails(bindIp, nodeTcpPort)
if res.isErr():
return err("node building error" & $res.error)

View File

@ -938,8 +938,8 @@ procSuite "Peer Manager":
test "peer manager cant have more max connections than peerstore size":
# Peerstore size can't be smaller than max connections
let peerStoreSize = 5
let maxConnections = 10
let peerStoreSize = 20
let maxConnections = 25
expect(Defect):
let pm = PeerManager.new(
@ -962,54 +962,61 @@ procSuite "Peer Manager":
.withRng(rng)
.withMplex()
.withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.withPeerStore(25)
.withMaxConnections(20)
.build(),
maxFailedAttempts = 1,
maxRelayPeers = some(5),
storage = nil,
)
# Create 15 peers and add them to the peerstore
let peers = toSeq(1 .. 15)
# Create 30 peers and add them to the peerstore
let peers = toSeq(1 .. 30)
.mapIt(parsePeerInfo("/ip4/0.0.0.0/tcp/0/p2p/" & $PeerId.random().get()))
.filterIt(it.isOk())
.mapIt(it.value)
for p in peers:
pm.addPeer(p)
# Check that we have 15 peers in the peerstore
# Check that we have 30 peers in the peerstore
check:
pm.wakuPeerStore.peers.len == 15
pm.wakuPeerStore.peers.len == 30
# fake that some peers failed to connected
pm.wakuPeerStore[NumberFailedConnBook][peers[0].peerId] = 2
pm.wakuPeerStore[NumberFailedConnBook][peers[1].peerId] = 2
pm.wakuPeerStore[NumberFailedConnBook][peers[2].peerId] = 2
pm.wakuPeerStore[NumberFailedConnBook][peers[3].peerId] = 2
pm.wakuPeerStore[NumberFailedConnBook][peers[4].peerId] = 2
# fake that some peers are connected
pm.wakuPeerStore[ConnectionBook][peers[5].peerId] = Connected
pm.wakuPeerStore[ConnectionBook][peers[8].peerId] = Connected
pm.wakuPeerStore[ConnectionBook][peers[10].peerId] = Connected
pm.wakuPeerStore[ConnectionBook][peers[12].peerId] = Connected
pm.wakuPeerStore[ConnectionBook][peers[15].peerId] = Connected
pm.wakuPeerStore[ConnectionBook][peers[18].peerId] = Connected
pm.wakuPeerStore[ConnectionBook][peers[24].peerId] = Connected
pm.wakuPeerStore[ConnectionBook][peers[29].peerId] = Connected
# Prune the peerstore (current=15, target=5)
# Prune the peerstore (current=30, target=25)
pm.prunePeerStore()
check:
# ensure peerstore was pruned
pm.wakuPeerStore.peers.len == 10
pm.wakuPeerStore.peers.len == 25
# ensure connected peers were not pruned
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[5].peerId)
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[8].peerId)
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[10].peerId)
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[12].peerId)
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[15].peerId)
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[18].peerId)
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[24].peerId)
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[29].peerId)
# ensure peers that failed were the first to be pruned
not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[0].peerId)
not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[1].peerId)
not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[2].peerId)
not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[3].peerId)
not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[4].peerId)
asyncTest "canBeConnected() returns correct value":
let pm = PeerManager.new(
@ -1018,14 +1025,13 @@ procSuite "Peer Manager":
.withRng(rng)
.withMplex()
.withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.withPeerStore(25)
.withMaxConnections(20)
.build(),
initialBackoffInSec = 1,
# with InitialBackoffInSec = 1 backoffs are: 1, 2, 4, 8secs.
backoffFactor = 2,
maxFailedAttempts = 10,
maxRelayPeers = some(5),
storage = nil,
)
var p1: PeerId
@ -1075,10 +1081,9 @@ procSuite "Peer Manager":
.withRng(rng)
.withMplex()
.withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.withPeerStore(25)
.withMaxConnections(20)
.build(),
maxRelayPeers = some(5),
maxFailedAttempts = 150,
storage = nil,
)
@ -1091,11 +1096,10 @@ procSuite "Peer Manager":
.withRng(rng)
.withMplex()
.withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.withPeerStore(25)
.withMaxConnections(20)
.build(),
maxFailedAttempts = 10,
maxRelayPeers = some(5),
storage = nil,
)
@ -1105,11 +1109,10 @@ procSuite "Peer Manager":
.withRng(rng)
.withMplex()
.withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.withPeerStore(25)
.withMaxConnections(20)
.build(),
maxFailedAttempts = 5,
maxRelayPeers = some(5),
storage = nil,
)

View File

@ -106,49 +106,52 @@ suite "WakuNode":
await allFutures([node1.stop(), node2.stop()])
asyncTest "Maximum connections can be configured":
asyncTest "Maximum connections can be configured with 20 nodes":
let
maxConnections = 2
maxConnections = 20
nodeKey1 = generateSecp256k1Key()
node1 = newTestWakuNode(
nodeKey1,
parseIpAddress("0.0.0.0"),
parseIpAddress("127.0.0.1"),
Port(60010),
maxConnections = maxConnections,
)
nodeKey2 = generateSecp256k1Key()
node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), Port(60012))
nodeKey3 = generateSecp256k1Key()
node3 = newTestWakuNode(nodeKey3, parseIpAddress("0.0.0.0"), Port(60013))
check:
# Sanity check, to verify config was applied
node1.switch.connManager.inSema.size == maxConnections
# Node with connection limit set to 1
# Initialize and start node1
await node1.start()
await node1.mountRelay()
# Remote node 1
await node2.start()
await node2.mountRelay()
# Create an array to hold the other nodes
var otherNodes: seq[WakuNode] = @[]
# Remote node 2
await node3.start()
await node3.mountRelay()
# Create and start 20 other nodes
for i in 0 ..< maxConnections + 1:
let
nodeKey = generateSecp256k1Key()
port = 60012 + i * 2 # Ensure unique ports for each node
node = newTestWakuNode(nodeKey, parseIpAddress("127.0.0.1"), Port(port))
await node.start()
await node.mountRelay()
otherNodes.add(node)
discard
await node1.peerManager.connectPeer(node2.switch.peerInfo.toRemotePeerInfo())
await sleepAsync(3.seconds)
discard
await node1.peerManager.connectPeer(node3.switch.peerInfo.toRemotePeerInfo())
# Connect all other nodes to node1
for node in otherNodes:
discard
await node1.peerManager.connectPeer(node.switch.peerInfo.toRemotePeerInfo())
await sleepAsync(2.seconds) # Small delay to avoid hammering the connection process
# Check that the number of connections matches the maxConnections
check:
# Verify that only the first connection succeeded
node1.switch.isConnected(node2.switch.peerInfo.peerId)
node1.switch.isConnected(node3.switch.peerInfo.peerId) == false
node1.switch.isConnected(otherNodes[0].switch.peerInfo.peerId)
node1.switch.isConnected(otherNodes[8].switch.peerInfo.peerId)
node1.switch.isConnected(otherNodes[14].switch.peerInfo.peerId)
node1.switch.isConnected(otherNodes[20].switch.peerInfo.peerId) == false
await allFutures([node1.stop(), node2.stop(), node3.stop()])
# Stop all nodes
var stopFutures = @[node1.stop()]
for node in otherNodes:
stopFutures.add(node.stop())
await allFutures(stopFutures)
asyncTest "Messages fails with wrong key path":
let nodeKey1 = generateSecp256k1Key()

View File

@ -36,6 +36,7 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf =
dnsAddrsNameServers: @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")],
nat: "any",
maxConnections: 50,
relayServiceRatio: "60:40",
maxMessageSize: "1024 KiB",
clusterId: DefaultClusterId,
shards: @[DefaultShardId],

View File

@ -1,4 +1,4 @@
import std/strutils, results, regex
import std/[strutils, math], results, regex
proc parseMsgSize*(input: string): Result[uint64, string] =
## Parses size strings such as "1.2 KiB" or "3Kb" and returns the equivalent number of bytes
@ -49,3 +49,26 @@ proc parseCorrectMsgSize*(input: string): uint64 =
let ret = parseMsgSize(input).valueOr:
return 0
return ret
proc parseRelayServiceRatio*(ratio: string): Result[(float, float), string] =
  ## Parses a "relay:service" percentage ratio string into a pair of
  ## fractions in [0, 1]. The two percentages must sum to 100.
  ## e.g., parseRelayServiceRatio("40:60") == ok((0.4, 0.6))
  ##
  ## Returns an error when the format is not 'X:Y', when either side is
  ## not a number, when a side is negative, or when the sum is not 100.
  let elements = ratio.split(":")
  if elements.len != 2:
    return err("expected format 'X:Y', ratio = " & ratio)

  var relayRatio, serviceRatio: float
  try:
    # strip() tolerates surrounding whitespace, e.g. "40 : 60"
    relayRatio = parseFloat(elements[0].strip())
    serviceRatio = parseFloat(elements[1].strip())
  except ValueError:
    return err("failed to parse ratio numbers: " & ratio)

  if relayRatio < 0 or serviceRatio < 0:
    return err("relay service ratio must be non-negative, ratio = " & ratio)

  let total = relayRatio + serviceRatio
  # Compare with a tolerance instead of int truncation: fractional ratios
  # such as "13.33:86.67" can sum to 99.999999... in floating point, and
  # int(total) would truncate that to 99 and wrongly reject a valid input.
  if abs(total - 100.0) > 1e-6:
    return err("total ratio should be 100, total = " & $total)

  ok((relayRatio / 100.0, serviceRatio / 100.0))

View File

@ -1,7 +1,7 @@
{.push raises: [].}
import
std/[options, net],
std/[options, net, math],
results,
chronicles,
libp2p/crypto/crypto,
@ -15,7 +15,8 @@ import
../discovery/waku_discv5,
../waku_node,
../node/peer_manager,
../common/rate_limit/setting
../common/rate_limit/setting,
../common/utils/parse_size_units
type
WakuNodeBuilder* = object # General
@ -29,7 +30,8 @@ type
peerStorageCapacity: Option[int]
# Peer manager config
maxRelayPeers: Option[int]
maxRelayPeers: int
maxServicePeers: int
colocationLimit: int
shardAware: bool
@ -108,9 +110,17 @@ proc withPeerStorage*(
builder.peerStorageCapacity = capacity
proc withPeerManagerConfig*(
builder: var WakuNodeBuilder, maxRelayPeers = none(int), shardAware = false
builder: var WakuNodeBuilder,
maxConnections: int,
relayServiceRatio: string,
shardAware = false,
) =
builder.maxRelayPeers = maxRelayPeers
let (relayRatio, serviceRatio) = parseRelayServiceRatio(relayServiceRatio).get()
var relayPeers = int(ceil(float(maxConnections) * relayRatio))
var servicePeers = int(floor(float(maxConnections) * serviceRatio))
builder.maxServicePeers = servicePeers
builder.maxRelayPeers = relayPeers
builder.shardAware = shardAware
proc withColocationLimit*(builder: var WakuNodeBuilder, colocationLimit: int) =
@ -190,7 +200,8 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
let peerManager = PeerManager.new(
switch = switch,
storage = builder.peerStorage.get(nil),
maxRelayPeers = builder.maxRelayPeers,
maxRelayPeers = some(builder.maxRelayPeers),
maxServicePeers = some(builder.maxServicePeers),
colocationLimit = builder.colocationLimit,
shardedPeerManagement = builder.shardAware,
)

View File

@ -197,7 +197,20 @@ type WakuNodeConf* = object
desc: "Maximum allowed number of libp2p connections.",
defaultValue: 50,
name: "max-connections"
.}: uint16
.}: int
maxRelayPeers* {.
desc:
"Deprecated. Use relay-service-ratio instead. It represents the maximum allowed number of relay peers.",
name: "max-relay-peers"
.}: Option[int]
relayServiceRatio* {.
desc:
"This percentage ratio represents the relay peers to service peers. For example, 60:40, tells that 60% of the max-connections will be used for relay protocol and the other 40% of max-connections will be reserved for other service protocols (e.g., filter, lightpush, store, metadata, etc.)",
name: "relay-service-ratio",
defaultValue: "60:40" # 60:40 ratio of relay to service peers
.}: string
colocationLimit* {.
desc:
@ -206,10 +219,6 @@ type WakuNodeConf* = object
name: "ip-colocation-limit"
.}: int
maxRelayPeers* {.
desc: "Maximum allowed number of relay peers.", name: "max-relay-peers"
.}: Option[int]
peerStoreCapacity* {.
desc: "Maximum stored peers in the peerstore.", name: "peer-store-capacity"
.}: Option[int]

View File

@ -102,9 +102,27 @@ proc initNode(
agentString = some(conf.agentString),
)
builder.withColocationLimit(conf.colocationLimit)
builder.withPeerManagerConfig(
maxRelayPeers = conf.maxRelayPeers, shardAware = conf.relayShardedPeerManagement
)
if conf.maxRelayPeers.isSome():
let
maxRelayPeers = conf.maxRelayPeers.get()
maxConnections = conf.maxConnections
# Calculate the ratio as percentages
relayRatio = (maxRelayPeers.float / maxConnections.float) * 100
serviceRatio = 100 - relayRatio
builder.withPeerManagerConfig(
maxConnections = conf.maxConnections,
relayServiceRatio = $relayRatio & ":" & $serviceRatio,
shardAware = conf.relayShardedPeerManagement,
)
error "maxRelayPeers is deprecated. It is recommended to use relayServiceRatio instead. If relayServiceRatio is not set, it will be automatically calculated based on maxConnections and maxRelayPeers."
else:
builder.withPeerManagerConfig(
maxConnections = conf.maxConnections,
relayServiceRatio = conf.relayServiceRatio,
shardAware = conf.relayShardedPeerManagement,
)
builder.withRateLimit(conf.rateLimits)
builder.withCircuitRelay(relay)

View File

@ -79,7 +79,7 @@ proc logConfig(conf: WakuNodeConf) =
lightpush = conf.lightpush,
peerExchange = conf.peerExchange
info "Configuration. Network", cluster = conf.clusterId, maxPeers = conf.maxRelayPeers
info "Configuration. Network", cluster = conf.clusterId
for shard in conf.shards:
info "Configuration. Shards", shard = shard

View File

@ -1,7 +1,7 @@
{.push raises: [].}
import
std/[options, sets, sequtils, times, strutils, math, random],
std/[options, sets, sequtils, times, strformat, strutils, math, random],
chronos,
chronicles,
metrics,
@ -13,8 +13,10 @@ import
import
../../common/nimchronos,
../../common/enr,
../../common/utils/parse_size_units,
../../waku_core,
../../waku_relay,
../../waku_relay/protocol,
../../waku_enr/sharding,
../../waku_enr/capabilities,
../../waku_metadata,
@ -84,7 +86,9 @@ type PeerManager* = ref object of RootObj
maxFailedAttempts*: int
storage*: PeerStorage
serviceSlots*: Table[string, RemotePeerInfo]
relayServiceRatio*: string
maxRelayPeers*: int
maxServicePeers*: int
outRelayPeersTarget: int
inRelayPeersTarget: int
ipTable*: Table[string, seq[PeerId]]
@ -265,6 +269,12 @@ proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: str
warn "Can't add relay peer to service peers slots"
return
# Check if the number of service peers has reached the maximum limit
if pm.serviceSlots.len >= pm.maxServicePeers:
warn "Maximum number of service peers reached. Cannot add more.",
peerId = remotePeerInfo.peerId, service = proto
return
info "Adding peer to service slots",
peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], service = proto
waku_service_peers.set(1, labelValues = [$proto, $remotePeerInfo.addrs[0]])
@ -970,6 +980,8 @@ proc new*(
switch: Switch,
wakuMetadata: WakuMetadata = nil,
maxRelayPeers: Option[int] = none(int),
maxServicePeers: Option[int] = none(int),
relayServiceRatio: string = "60:40",
storage: PeerStorage = nil,
initialBackoffInSec = InitialBackoffInSec,
backoffFactor = BackoffFactor,
@ -986,23 +998,26 @@ proc new*(
Defect, "Max number of connections can't be greater than PeerManager capacity"
)
var maxRelayPeersValue = 0
if maxRelayPeers.isSome():
if maxRelayPeers.get() > maxConnections:
error "Max number of relay peers can't be greater than the max amount of connections",
maxConnections = maxConnections, maxRelayPeers = maxRelayPeers.get()
raise newException(
Defect,
"Max number of relay peers can't be greater than the max amount of connections",
)
var relayRatio: float64
var serviceRatio: float64
(relayRatio, serviceRatio) = parseRelayServiceRatio(relayServiceRatio).get()
if maxRelayPeers.get() == maxConnections:
warn "Max number of relay peers is equal to max amount of connections, peer won't be contributing to service peers",
maxConnections = maxConnections, maxRelayPeers = maxRelayPeers.get()
maxRelayPeersValue = maxRelayPeers.get()
else:
# Leave by default 20% of connections for service peers
maxRelayPeersValue = maxConnections - (maxConnections div 5)
var relayPeers = int(ceil(float(maxConnections) * relayRatio))
var servicePeers = int(floor(float(maxConnections) * serviceRatio))
let minRelayPeers = WakuRelay.getDHigh()
if relayPeers < minRelayPeers:
let errorMsg =
fmt"""Doesn't fulfill minimum criteria for relay (which increases the chance of the node becoming isolated.)
relayPeers: {relayPeers}, should be greater or equal than minRelayPeers: {minRelayPeers}
relayServiceRatio: {relayServiceRatio}
maxConnections: {maxConnections}"""
error "Wrong relay peers config", error = errorMsg
return
let outRelayPeersTarget = relayPeers div 3
let inRelayPeersTarget = relayPeers - outRelayPeersTarget
# attempt to calculate max backoff to prevent potential overflows or unreasonably high values
let backoff = calculateBackoff(initialBackoffInSec, backoffFactor, maxFailedAttempts)
@ -1010,8 +1025,6 @@ proc new*(
error "Max backoff time can't be over 1 week", maxBackoff = backoff
raise newException(Defect, "Max backoff time can't be over 1 week")
let outRelayPeersTarget = maxRelayPeersValue div 3
let pm = PeerManager(
switch: switch,
wakuMetadata: wakuMetadata,
@ -1019,9 +1032,10 @@ proc new*(
storage: storage,
initialBackoffInSec: initialBackoffInSec,
backoffFactor: backoffFactor,
maxRelayPeers: relayPeers,
maxServicePeers: servicePeers,
outRelayPeersTarget: outRelayPeersTarget,
inRelayPeersTarget: maxRelayPeersValue - outRelayPeersTarget,
maxRelayPeers: maxRelayPeersValue,
inRelayPeersTarget: inRelayPeersTarget,
maxFailedAttempts: maxFailedAttempts,
colocationLimit: colocationLimit,
shardedPeerManagement: shardedPeerManagement,

View File

@ -314,6 +314,9 @@ proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
## Observes when a message is sent/received from the GossipSub PoV
procCall GossipSub(w).addObserver(observer)
proc getDHigh*(T: type WakuRelay): int =
  ## Returns the configured GossipSub dHigh parameter (upper bound on the
  ## mesh peer count) from this module's GossipsubParameters.
  GossipsubParameters.dHigh
proc getNumPeersInMesh*(w: WakuRelay, pubsubTopic: PubsubTopic): Result[int, string] =
## Returns the number of peers in a mesh defined by the passed pubsub topic.
## The 'mesh' atribute is defined in the GossipSub ref object.