mirror of https://github.com/waku-org/nwaku.git
feat: limit relay connections below max conns (#1813)
This commit is contained in:
parent
661638daaa
commit
17b24cded6
|
@ -291,7 +291,7 @@ proc initNode(conf: WakuNodeConf,
|
|||
sendSignedPeerRecord = conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled
|
||||
agentString = some(conf.agentString)
|
||||
)
|
||||
builder.withPeerManagerConfig(maxRelayPeers = some(conf.maxRelayPeers.int))
|
||||
builder.withPeerManagerConfig(maxRelayPeers = conf.maxRelayPeers)
|
||||
|
||||
node = ? builder.build().mapErr(proc (err: string): string = "failed to create waku node instance: " & err)
|
||||
|
||||
|
|
|
@ -95,8 +95,7 @@ type
|
|||
|
||||
maxRelayPeers* {.
|
||||
desc: "Maximum allowed number of relay peers."
|
||||
defaultValue: 50
|
||||
name: "max-relay-peers" }: uint16
|
||||
name: "max-relay-peers" }: Option[int]
|
||||
|
||||
peerStoreCapacity* {.
|
||||
desc: "Maximum stored peers in the peerstore."
|
||||
|
|
|
@ -607,7 +607,7 @@ procSuite "Peer Manager":
|
|||
.withMaxConnections(5)
|
||||
.build(),
|
||||
maxFailedAttempts = 1,
|
||||
maxRelayPeers = 5,
|
||||
maxRelayPeers = some(5),
|
||||
storage = nil)
|
||||
|
||||
# Create 15 peers and add them to the peerstore
|
||||
|
@ -660,7 +660,7 @@ procSuite "Peer Manager":
|
|||
initialBackoffInSec = 1, # with InitialBackoffInSec = 1 backoffs are: 1, 2, 4, 8secs.
|
||||
backoffFactor = 2,
|
||||
maxFailedAttempts = 10,
|
||||
maxRelayPeers = 5,
|
||||
maxRelayPeers = some(5),
|
||||
storage = nil)
|
||||
var p1: PeerId
|
||||
require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW" & "1")
|
||||
|
@ -709,7 +709,7 @@ procSuite "Peer Manager":
|
|||
.withPeerStore(10)
|
||||
.withMaxConnections(5)
|
||||
.build(),
|
||||
maxRelayPeers = 5,
|
||||
maxRelayPeers = some(5),
|
||||
maxFailedAttempts = 150,
|
||||
storage = nil)
|
||||
|
||||
|
@ -721,7 +721,7 @@ procSuite "Peer Manager":
|
|||
.withMaxConnections(5)
|
||||
.build(),
|
||||
maxFailedAttempts = 10,
|
||||
maxRelayPeers = 5,
|
||||
maxRelayPeers = some(5),
|
||||
storage = nil)
|
||||
|
||||
let pm = PeerManager.new(
|
||||
|
@ -730,7 +730,7 @@ procSuite "Peer Manager":
|
|||
.withMaxConnections(5)
|
||||
.build(),
|
||||
maxFailedAttempts = 5,
|
||||
maxRelayPeers = 5,
|
||||
maxRelayPeers = some(5),
|
||||
storage = nil)
|
||||
|
||||
asyncTest "colocationLimit is enforced by pruneConnsByIp()":
|
||||
|
|
|
@ -145,13 +145,6 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
|
|||
if builder.record.isNone():
|
||||
return err("node record is required")
|
||||
|
||||
# fallbck to max connections if not set
|
||||
var maxRelayPeers: int
|
||||
if builder.maxRelayPeers.isNone():
|
||||
maxRelayPeers = builder.switchMaxConnections.get(builders.MaxConnections)
|
||||
else:
|
||||
maxRelayPeers = builder.maxRelayPeers.get()
|
||||
|
||||
var switch: Switch
|
||||
try:
|
||||
switch = newWakuSwitch(
|
||||
|
@ -176,7 +169,7 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
|
|||
let peerManager = PeerManager.new(
|
||||
switch = switch,
|
||||
storage = builder.peerStorage.get(nil),
|
||||
maxRelayPeers = maxRelayPeers,
|
||||
maxRelayPeers = builder.maxRelayPeers,
|
||||
)
|
||||
|
||||
var node: WakuNode
|
||||
|
|
|
@ -56,7 +56,7 @@ const
|
|||
PrunePeerStoreInterval = chronos.minutes(5)
|
||||
|
||||
# How often metrics and logs are shown/updated
|
||||
LogAndMetricsInterval = chronos.seconds(60)
|
||||
LogAndMetricsInterval = chronos.minutes(3)
|
||||
|
||||
# Max peers that we allow from the same IP
|
||||
ColocationLimit = 5
|
||||
|
@ -71,7 +71,8 @@ type
|
|||
storage: PeerStorage
|
||||
serviceSlots*: Table[string, RemotePeerInfo]
|
||||
maxRelayPeers*: int
|
||||
outPeersTarget*: int
|
||||
outRelayPeersTarget: int
|
||||
inRelayPeersTarget: int
|
||||
ipTable*: Table[string, seq[PeerId]]
|
||||
colocationLimit*: int
|
||||
started: bool
|
||||
|
@ -343,7 +344,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
|
|||
|
||||
proc new*(T: type PeerManager,
|
||||
switch: Switch,
|
||||
maxRelayPeers: int = 50,
|
||||
maxRelayPeers: Option[int] = none(int),
|
||||
storage: PeerStorage = nil,
|
||||
initialBackoffInSec = InitialBackoffInSec,
|
||||
backoffFactor = BackoffFactor,
|
||||
|
@ -358,16 +359,22 @@ proc new*(T: type PeerManager,
|
|||
maxConnections = maxConnections
|
||||
raise newException(Defect, "Max number of connections can't be greater than PeerManager capacity")
|
||||
|
||||
if maxRelayPeers > maxConnections:
|
||||
error "Max number of relay peers can't be greater the max amount of connections",
|
||||
maxConnections = maxConnections,
|
||||
maxRelayPeers = maxRelayPeers
|
||||
raise newException(Defect, "Max number of relay peers can't be greater the max amount of connections")
|
||||
var maxRelayPeersValue = 0
|
||||
if maxRelayPeers.isSome():
|
||||
if maxRelayPeers.get() > maxConnections:
|
||||
error "Max number of relay peers can't be greater than the max amount of connections",
|
||||
maxConnections = maxConnections,
|
||||
maxRelayPeers = maxRelayPeers.get()
|
||||
raise newException(Defect, "Max number of relay peers can't be greater than the max amount of connections")
|
||||
|
||||
if maxRelayPeers == maxConnections:
|
||||
warn "Max number of relay peers is equal to max amount of connections, peer wont be contribute to service peers",
|
||||
maxConnections = maxConnections,
|
||||
maxRelayPeers = maxRelayPeers
|
||||
if maxRelayPeers.get() == maxConnections:
|
||||
warn "Max number of relay peers is equal to max amount of connections, peer won't be contributing to service peers",
|
||||
maxConnections = maxConnections,
|
||||
maxRelayPeers = maxRelayPeers.get()
|
||||
maxRelayPeersValue = maxRelayPeers.get()
|
||||
else:
|
||||
# Leave by default 20% of connections for service peers
|
||||
maxRelayPeersValue = maxConnections - (maxConnections div 5)
|
||||
|
||||
# attempt to calculate max backoff to prevent potential overflows or unreasonably high values
|
||||
let backoff = calculateBackoff(initialBackoffInSec, backoffFactor, maxFailedAttempts)
|
||||
|
@ -376,15 +383,18 @@ proc new*(T: type PeerManager,
|
|||
maxBackoff=backoff
|
||||
raise newException(Defect, "Max backoff time can't be over 1 week")
|
||||
|
||||
let outRelayPeersTarget = max(maxRelayPeersValue div 3, 10)
|
||||
|
||||
let pm = PeerManager(switch: switch,
|
||||
peerStore: switch.peerStore,
|
||||
storage: storage,
|
||||
initialBackoffInSec: initialBackoffInSec,
|
||||
backoffFactor: backoffFactor,
|
||||
outPeersTarget: max(maxConnections div 2, 10),
|
||||
outRelayPeersTarget: outRelayPeersTarget,
|
||||
inRelayPeersTarget: maxRelayPeersValue - outRelayPeersTarget,
|
||||
maxRelayPeers: maxRelayPeersValue,
|
||||
maxFailedAttempts: maxFailedAttempts,
|
||||
colocationLimit: colocationLimit,
|
||||
maxRelayPeers: maxRelayPeers)
|
||||
colocationLimit: colocationLimit)
|
||||
|
||||
proc connHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
|
||||
onConnEvent(pm, peerId, event)
|
||||
|
@ -571,16 +581,12 @@ proc connectToRelayPeers*(pm: PeerManager) {.async.} =
|
|||
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
|
||||
let maxConnections = pm.switch.connManager.inSema.size
|
||||
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
|
||||
let inPeersTarget = maxConnections - pm.outPeersTarget
|
||||
let inPeersTarget = maxConnections - pm.outRelayPeersTarget
|
||||
|
||||
if inRelayPeers.len > inPeersTarget:
|
||||
await pm.pruneInRelayConns(inRelayPeers.len-inPeersTarget)
|
||||
if inRelayPeers.len > pm.inRelayPeersTarget:
|
||||
await pm.pruneInRelayConns(inRelayPeers.len - pm.inRelayPeersTarget)
|
||||
|
||||
if outRelayPeers.len >= pm.outPeersTarget:
|
||||
return
|
||||
|
||||
# Leave some room for service peers
|
||||
if totalRelayPeers >= (maxConnections - 5):
|
||||
if outRelayPeers.len >= pm.outRelayPeersTarget:
|
||||
return
|
||||
|
||||
let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
|
||||
|
@ -670,15 +676,14 @@ proc logAndMetrics(pm: PeerManager) {.async.} =
|
|||
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
|
||||
let maxConnections = pm.switch.connManager.inSema.size
|
||||
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
|
||||
let inPeersTarget = maxConnections - pm.outPeersTarget
|
||||
let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
|
||||
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
|
||||
let totalConnections = pm.switch.connManager.getConnections().len
|
||||
|
||||
info "Relay peer connections",
|
||||
inRelayConns = $inRelayPeers.len & "/" & $inPeersTarget,
|
||||
outRelayConns = $outRelayPeers.len & "/" & $pm.outPeersTarget,
|
||||
totalRelayConns = totalRelayPeers,
|
||||
maxConnections = maxConnections,
|
||||
inRelayConns = $inRelayPeers.len & "/" & $pm.inRelayPeersTarget,
|
||||
outRelayConns = $outRelayPeers.len & "/" & $pm.outRelayPeersTarget,
|
||||
totalConnections = $totalConnections & "/" & $maxConnections,
|
||||
notConnectedPeers = notConnectedPeers.len,
|
||||
outsideBackoffPeers = outsideBackoffPeers.len
|
||||
|
||||
|
|
Loading…
Reference in New Issue