mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-04 06:53:12 +00:00
chore: add peer manager config to builder (#1816)
This commit is contained in:
parent
002330f4cd
commit
a67bf22e61
@ -430,6 +430,7 @@ proc initNode(conf: WakuNodeConf,
|
|||||||
agentString = some(conf.agentString)
|
agentString = some(conf.agentString)
|
||||||
)
|
)
|
||||||
builder.withWakuDiscv5(wakuDiscv5.get(nil))
|
builder.withWakuDiscv5(wakuDiscv5.get(nil))
|
||||||
|
builder.withPeerManagerConfig(maxRelayPeers = some(conf.maxRelayPeers.int))
|
||||||
|
|
||||||
node = ? builder.build().mapErr(proc (err: string): string = "failed to create waku node instance: " & err)
|
node = ? builder.build().mapErr(proc (err: string): string = "failed to create waku node instance: " & err)
|
||||||
|
|
||||||
|
|||||||
@ -93,6 +93,11 @@ type
|
|||||||
defaultValue: 50
|
defaultValue: 50
|
||||||
name: "max-connections" }: uint16
|
name: "max-connections" }: uint16
|
||||||
|
|
||||||
|
maxRelayPeers* {.
|
||||||
|
desc: "Maximum allowed number of relay peers."
|
||||||
|
defaultValue: 50
|
||||||
|
name: "max-relay-peers" }: uint16
|
||||||
|
|
||||||
peerStoreCapacity* {.
|
peerStoreCapacity* {.
|
||||||
desc: "Maximum stored peers in the peerstore."
|
desc: "Maximum stored peers in the peerstore."
|
||||||
name: "peer-store-capacity" }: Option[int]
|
name: "peer-store-capacity" }: Option[int]
|
||||||
|
|||||||
@ -607,6 +607,7 @@ procSuite "Peer Manager":
|
|||||||
.withMaxConnections(5)
|
.withMaxConnections(5)
|
||||||
.build(),
|
.build(),
|
||||||
maxFailedAttempts = 1,
|
maxFailedAttempts = 1,
|
||||||
|
maxRelayPeers = 5,
|
||||||
storage = nil)
|
storage = nil)
|
||||||
|
|
||||||
# Create 15 peers and add them to the peerstore
|
# Create 15 peers and add them to the peerstore
|
||||||
@ -659,6 +660,7 @@ procSuite "Peer Manager":
|
|||||||
initialBackoffInSec = 1, # with InitialBackoffInSec = 1 backoffs are: 1, 2, 4, 8secs.
|
initialBackoffInSec = 1, # with InitialBackoffInSec = 1 backoffs are: 1, 2, 4, 8secs.
|
||||||
backoffFactor = 2,
|
backoffFactor = 2,
|
||||||
maxFailedAttempts = 10,
|
maxFailedAttempts = 10,
|
||||||
|
maxRelayPeers = 5,
|
||||||
storage = nil)
|
storage = nil)
|
||||||
var p1: PeerId
|
var p1: PeerId
|
||||||
require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW" & "1")
|
require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW" & "1")
|
||||||
@ -707,6 +709,7 @@ procSuite "Peer Manager":
|
|||||||
.withPeerStore(10)
|
.withPeerStore(10)
|
||||||
.withMaxConnections(5)
|
.withMaxConnections(5)
|
||||||
.build(),
|
.build(),
|
||||||
|
maxRelayPeers = 5,
|
||||||
maxFailedAttempts = 150,
|
maxFailedAttempts = 150,
|
||||||
storage = nil)
|
storage = nil)
|
||||||
|
|
||||||
@ -718,6 +721,7 @@ procSuite "Peer Manager":
|
|||||||
.withMaxConnections(5)
|
.withMaxConnections(5)
|
||||||
.build(),
|
.build(),
|
||||||
maxFailedAttempts = 10,
|
maxFailedAttempts = 10,
|
||||||
|
maxRelayPeers = 5,
|
||||||
storage = nil)
|
storage = nil)
|
||||||
|
|
||||||
let pm = PeerManager.new(
|
let pm = PeerManager.new(
|
||||||
@ -726,6 +730,7 @@ procSuite "Peer Manager":
|
|||||||
.withMaxConnections(5)
|
.withMaxConnections(5)
|
||||||
.build(),
|
.build(),
|
||||||
maxFailedAttempts = 5,
|
maxFailedAttempts = 5,
|
||||||
|
maxRelayPeers = 5,
|
||||||
storage = nil)
|
storage = nil)
|
||||||
|
|
||||||
asyncTest "colocationLimit is enforced by pruneConnsByIp()":
|
asyncTest "colocationLimit is enforced by pruneConnsByIp()":
|
||||||
|
|||||||
@ -67,7 +67,7 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
|
|||||||
secureKey = if secureKey != "": some(secureKey) else: none(string),
|
secureKey = if secureKey != "": some(secureKey) else: none(string),
|
||||||
secureCert = if secureCert != "": some(secureCert) else: none(string),
|
secureCert = if secureCert != "": some(secureCert) else: none(string),
|
||||||
agentString = agentString,
|
agentString = agentString,
|
||||||
|
|
||||||
)
|
)
|
||||||
builder.withWakuDiscv5(wakuDiscv5.get(nil))
|
builder.withWakuDiscv5(wakuDiscv5.get(nil))
|
||||||
|
|
||||||
|
|||||||
@ -10,13 +10,15 @@ import
|
|||||||
chronicles,
|
chronicles,
|
||||||
libp2p/crypto/crypto,
|
libp2p/crypto/crypto,
|
||||||
libp2p/builders,
|
libp2p/builders,
|
||||||
libp2p/nameresolving/nameresolver
|
libp2p/nameresolving/nameresolver,
|
||||||
|
libp2p/transports/wstransport
|
||||||
import
|
import
|
||||||
../waku_enr,
|
../waku_enr,
|
||||||
../waku_discv5,
|
../waku_discv5,
|
||||||
./config,
|
./config,
|
||||||
./peer_manager,
|
./peer_manager,
|
||||||
./waku_node
|
./waku_node,
|
||||||
|
./waku_switch
|
||||||
|
|
||||||
|
|
||||||
type
|
type
|
||||||
@ -30,7 +32,9 @@ type
|
|||||||
# Peer storage and peer manager
|
# Peer storage and peer manager
|
||||||
peerStorage: Option[PeerStorage]
|
peerStorage: Option[PeerStorage]
|
||||||
peerStorageCapacity: Option[int]
|
peerStorageCapacity: Option[int]
|
||||||
peerManager: Option[PeerManager]
|
|
||||||
|
# Peer manager config
|
||||||
|
maxRelayPeers: Option[int]
|
||||||
|
|
||||||
# Libp2p switch
|
# Libp2p switch
|
||||||
switchMaxConnections: Option[int]
|
switchMaxConnections: Option[int]
|
||||||
@ -104,8 +108,10 @@ proc withPeerStorage*(builder: var WakuNodeBuilder, peerStorage: PeerStorage, ca
|
|||||||
|
|
||||||
builder.peerStorageCapacity = capacity
|
builder.peerStorageCapacity = capacity
|
||||||
|
|
||||||
proc withPeerManager*(builder: var WakuNodeBuilder, peerManager: PeerManager) =
|
proc withPeerManagerConfig*(builder: var WakuNodeBuilder,
|
||||||
builder.peerManager = some(peerManager)
|
maxRelayPeers = none(int)) =
|
||||||
|
builder.maxRelayPeers = maxRelayPeers
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
## Waku switch
|
## Waku switch
|
||||||
@ -149,22 +155,50 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
|
|||||||
if builder.netConfig.isNone():
|
if builder.netConfig.isNone():
|
||||||
return err("network configuration is required")
|
return err("network configuration is required")
|
||||||
|
|
||||||
|
# fallbck to max connections if not set
|
||||||
|
var maxRelayPeers: int
|
||||||
|
if builder.maxRelayPeers.isNone():
|
||||||
|
maxRelayPeers = builder.switchMaxConnections.get(builders.MaxConnections)
|
||||||
|
else:
|
||||||
|
maxRelayPeers = builder.maxRelayPeers.get()
|
||||||
|
|
||||||
|
var switch: Switch
|
||||||
|
try:
|
||||||
|
switch = newWakuSwitch(
|
||||||
|
privKey = builder.nodekey,
|
||||||
|
address = builder.netConfig.get().hostAddress,
|
||||||
|
wsAddress = builder.netConfig.get().wsHostAddress,
|
||||||
|
transportFlags = {ServerFlags.ReuseAddr},
|
||||||
|
rng = rng,
|
||||||
|
maxConnections = builder.switchMaxConnections.get(builders.MaxConnections),
|
||||||
|
wssEnabled = builder.netConfig.get().wssEnabled,
|
||||||
|
secureKeyPath = builder.switchSslSecureKey.get(""),
|
||||||
|
secureCertPath = builder.switchSslSecureCert.get(""),
|
||||||
|
nameResolver = builder.switchNameResolver.get(nil),
|
||||||
|
sendSignedPeerRecord = builder.switchSendSignedPeerRecord.get(false),
|
||||||
|
agentString = builder.switchAgentString,
|
||||||
|
peerStoreCapacity = builder.peerStorageCapacity,
|
||||||
|
services = @[Service(getAutonatService(rng))],
|
||||||
|
)
|
||||||
|
except CatchableError:
|
||||||
|
return err("failed to create switch: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
let peerManager = PeerManager.new(
|
||||||
|
switch = switch,
|
||||||
|
storage = builder.peerStorage.get(nil),
|
||||||
|
maxRelayPeers = maxRelayPeers,
|
||||||
|
)
|
||||||
|
|
||||||
var node: WakuNode
|
var node: WakuNode
|
||||||
try:
|
try:
|
||||||
node = WakuNode.new(
|
node = WakuNode.new(
|
||||||
rng = rng,
|
|
||||||
nodeKey = builder.nodeKey.get(),
|
nodeKey = builder.nodeKey.get(),
|
||||||
netConfig = builder.netConfig.get(),
|
netConfig = builder.netConfig.get(),
|
||||||
enr = builder.record,
|
enr = builder.record,
|
||||||
peerStorage = builder.peerStorage.get(nil),
|
switch = switch,
|
||||||
peerStoreCapacity = builder.peerStorageCapacity,
|
|
||||||
maxConnections = builder.switchMaxConnections.get(builders.MaxConnections),
|
|
||||||
nameResolver = builder.switchNameResolver.get(nil),
|
|
||||||
agentString = builder.switchAgentString,
|
|
||||||
secureKey = builder.switchSslSecureKey.get(""),
|
|
||||||
secureCert = builder.switchSslSecureCert.get(""),
|
|
||||||
sendSignedPeerRecord = builder.switchSendSignedPeerRecord.get(false),
|
|
||||||
wakuDiscv5 = builder.wakuDiscv5,
|
wakuDiscv5 = builder.wakuDiscv5,
|
||||||
|
peerManager = peerManager,
|
||||||
|
rng = rng,
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
return err("failed to build WakuNode instance: " & getCurrentExceptionMsg())
|
return err("failed to build WakuNode instance: " & getCurrentExceptionMsg())
|
||||||
|
|||||||
@ -73,6 +73,7 @@ type
|
|||||||
maxFailedAttempts*: int
|
maxFailedAttempts*: int
|
||||||
storage: PeerStorage
|
storage: PeerStorage
|
||||||
serviceSlots*: Table[string, RemotePeerInfo]
|
serviceSlots*: Table[string, RemotePeerInfo]
|
||||||
|
maxRelayPeers*: int
|
||||||
outPeersTarget*: int
|
outPeersTarget*: int
|
||||||
ipTable*: Table[string, seq[PeerId]]
|
ipTable*: Table[string, seq[PeerId]]
|
||||||
colocationLimit*: int
|
colocationLimit*: int
|
||||||
@ -333,6 +334,7 @@ proc updateIpTable*(pm: PeerManager) =
|
|||||||
|
|
||||||
proc new*(T: type PeerManager,
|
proc new*(T: type PeerManager,
|
||||||
switch: Switch,
|
switch: Switch,
|
||||||
|
maxRelayPeers: int = 50,
|
||||||
storage: PeerStorage = nil,
|
storage: PeerStorage = nil,
|
||||||
initialBackoffInSec = InitialBackoffInSec,
|
initialBackoffInSec = InitialBackoffInSec,
|
||||||
backoffFactor = BackoffFactor,
|
backoffFactor = BackoffFactor,
|
||||||
@ -347,6 +349,17 @@ proc new*(T: type PeerManager,
|
|||||||
maxConnections = maxConnections
|
maxConnections = maxConnections
|
||||||
raise newException(Defect, "Max number of connections can't be greater than PeerManager capacity")
|
raise newException(Defect, "Max number of connections can't be greater than PeerManager capacity")
|
||||||
|
|
||||||
|
if maxRelayPeers > maxConnections:
|
||||||
|
error "Max number of relay peers can't be greater the max amount of connections",
|
||||||
|
maxConnections = maxConnections,
|
||||||
|
maxRelayPeers = maxRelayPeers
|
||||||
|
raise newException(Defect, "Max number of relay peers can't be greater the max amount of connections")
|
||||||
|
|
||||||
|
if maxRelayPeers == maxConnections:
|
||||||
|
warn "Max number of relay peers is equal to max amount of connections, peer wont be contribute to service peers",
|
||||||
|
maxConnections = maxConnections,
|
||||||
|
maxRelayPeers = maxRelayPeers
|
||||||
|
|
||||||
# attempt to calculate max backoff to prevent potential overflows or unreasonably high values
|
# attempt to calculate max backoff to prevent potential overflows or unreasonably high values
|
||||||
let backoff = calculateBackoff(initialBackoffInSec, backoffFactor, maxFailedAttempts)
|
let backoff = calculateBackoff(initialBackoffInSec, backoffFactor, maxFailedAttempts)
|
||||||
if backoff.weeks() > 1:
|
if backoff.weeks() > 1:
|
||||||
@ -361,7 +374,8 @@ proc new*(T: type PeerManager,
|
|||||||
backoffFactor: backoffFactor,
|
backoffFactor: backoffFactor,
|
||||||
outPeersTarget: max(maxConnections div 2, 10),
|
outPeersTarget: max(maxConnections div 2, 10),
|
||||||
maxFailedAttempts: maxFailedAttempts,
|
maxFailedAttempts: maxFailedAttempts,
|
||||||
colocationLimit: colocationLimit)
|
colocationLimit: colocationLimit,
|
||||||
|
maxRelayPeers: maxRelayPeers)
|
||||||
|
|
||||||
proc connHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
|
proc connHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
|
||||||
onConnEvent(pm, peerId, event)
|
onConnEvent(pm, peerId, event)
|
||||||
|
|||||||
@ -159,15 +159,9 @@ proc new*(T: type WakuNode,
|
|||||||
nodeKey: crypto.PrivateKey,
|
nodeKey: crypto.PrivateKey,
|
||||||
netConfig: NetConfig,
|
netConfig: NetConfig,
|
||||||
enr: Option[enr.Record],
|
enr: Option[enr.Record],
|
||||||
peerStorage: PeerStorage = nil,
|
switch: Switch,
|
||||||
maxConnections = builders.MaxConnections,
|
|
||||||
secureKey: string = "",
|
|
||||||
secureCert: string = "",
|
|
||||||
nameResolver: NameResolver = nil,
|
|
||||||
sendSignedPeerRecord = false,
|
|
||||||
wakuDiscv5 = none(WakuDiscoveryV5),
|
wakuDiscv5 = none(WakuDiscoveryV5),
|
||||||
agentString = none(string), # defaults to nim-libp2p version
|
peerManager: PeerManager,
|
||||||
peerStoreCapacity = none(int), # defaults to 1.25 maxConnections
|
|
||||||
# TODO: make this argument required after tests are updated
|
# TODO: make this argument required after tests are updated
|
||||||
rng: ref HmacDrbgContext = crypto.newRng()
|
rng: ref HmacDrbgContext = crypto.newRng()
|
||||||
): T {.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} =
|
): T {.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} =
|
||||||
@ -175,23 +169,6 @@ proc new*(T: type WakuNode,
|
|||||||
|
|
||||||
info "Initializing networking", addrs= $netConfig.announcedAddresses
|
info "Initializing networking", addrs= $netConfig.announcedAddresses
|
||||||
|
|
||||||
let switch = newWakuSwitch(
|
|
||||||
some(nodekey),
|
|
||||||
address = netConfig.hostAddress,
|
|
||||||
wsAddress = netConfig.wsHostAddress,
|
|
||||||
transportFlags = {ServerFlags.ReuseAddr},
|
|
||||||
rng = rng,
|
|
||||||
maxConnections = maxConnections,
|
|
||||||
wssEnabled = netConfig.wssEnabled,
|
|
||||||
secureKeyPath = secureKey,
|
|
||||||
secureCertPath = secureCert,
|
|
||||||
nameResolver = nameResolver,
|
|
||||||
sendSignedPeerRecord = sendSignedPeerRecord,
|
|
||||||
agentString = agentString,
|
|
||||||
peerStoreCapacity = peerStoreCapacity,
|
|
||||||
services = @[Service(getAutonatService(rng))],
|
|
||||||
)
|
|
||||||
|
|
||||||
let enr =
|
let enr =
|
||||||
if enr.isNone():
|
if enr.isNone():
|
||||||
let nodeEnrRes = getEnr(netConfig, wakuDiscv5, nodekey)
|
let nodeEnrRes = getEnr(netConfig, wakuDiscv5, nodekey)
|
||||||
@ -203,7 +180,7 @@ proc new*(T: type WakuNode,
|
|||||||
else: enr.get()
|
else: enr.get()
|
||||||
|
|
||||||
return WakuNode(
|
return WakuNode(
|
||||||
peerManager: PeerManager.new(switch, peerStorage),
|
peerManager: peerManager,
|
||||||
switch: switch,
|
switch: switch,
|
||||||
rng: rng,
|
rng: rng,
|
||||||
enr: enr,
|
enr: enr,
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user