refactor: re-arrange functions based on responsibility of peer-manager (#3086)

Darshan K 2024-10-04 15:23:20 +05:30 committed by GitHub
parent eb2bbae665
commit 0f8e874000
1 changed file with 434 additions and 418 deletions


@@ -88,6 +88,17 @@ type PeerManager* = ref object of RootObj
started: bool
shardedPeerManagement: bool # temp feature flag
#~~~~~~~~~~~~~~~~~~~#
# Helper Functions #
#~~~~~~~~~~~~~~~~~~~#
proc calculateBackoff(
initialBackoffInSec: int, backoffFactor: int, failedAttempts: int
): timer.Duration =
if failedAttempts == 0:
return chronos.seconds(0)
return chronos.seconds(initialBackoffInSec * (backoffFactor ^ (failedAttempts - 1)))
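The delay above grows as initialBackoffInSec * backoffFactor^(failedAttempts - 1) and is later compared against the time of the last failed attempt in canBeConnected. A minimal standalone sketch of that arithmetic, using illustrative values rather than the real InitialBackoffInSec/BackoffFactor defaults:

import std/math

proc backoffSeconds(initialBackoffInSec, backoffFactor, failedAttempts: int): int =
  # Same formula as calculateBackoff above, on plain ints instead of chronos Durations.
  if failedAttempts == 0:
    return 0
  return initialBackoffInSec * backoffFactor ^ (failedAttempts - 1)

# With assumed values of 120 s and factor 4: 0, 120, 480, 1920, 7680 seconds.
doAssert backoffSeconds(120, 4, 0) == 0
doAssert backoffSeconds(120, 4, 3) == 1920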
proc protocolMatcher*(codec: string): Matcher =
## Returns a protocol matcher function for the provided codec
proc match(proto: string): bool {.gcsafe.} =
@@ -98,16 +109,9 @@ proc protocolMatcher*(codec: string): Matcher =
return match

proc calculateBackoff(
initialBackoffInSec: int, backoffFactor: int, failedAttempts: int
): timer.Duration =
if failedAttempts == 0:
return chronos.seconds(0)
return chronos.seconds(initialBackoffInSec * (backoffFactor ^ (failedAttempts - 1)))

####################
# Helper functions #
####################

#~~~~~~~~~~~~~~~~~~~~~~~~~~#
# Peer Storage Management #
#~~~~~~~~~~~~~~~~~~~~~~~~~~#
proc insertOrReplace(ps: PeerStorage, remotePeerInfo: RemotePeerInfo) {.gcsafe.} =
## Insert peer entry into persistent storage, or replace existing entry with updated info
@@ -167,6 +171,109 @@ proc addPeer*(
pm.storage.insertOrReplace(remotePeerInfo)
proc loadFromStorage(pm: PeerManager) {.gcsafe.} =
## Load peers from storage, if available
trace "loading peers from storage"
var amount = 0
proc onData(remotePeerInfo: RemotePeerInfo) =
let peerId = remotePeerInfo.peerId
if pm.switch.peerInfo.peerId == peerId:
# Do not manage self
return
trace "loading peer",
peerId = peerId,
address = remotePeerInfo.addrs,
protocols = remotePeerInfo.protocols,
agent = remotePeerInfo.agent,
version = remotePeerInfo.protoVersion
# nim-libp2p books
pm.wakuPeerStore[AddressBook][peerId] = remotePeerInfo.addrs
pm.wakuPeerStore[ProtoBook][peerId] = remotePeerInfo.protocols
pm.wakuPeerStore[KeyBook][peerId] = remotePeerInfo.publicKey
pm.wakuPeerStore[AgentBook][peerId] = remotePeerInfo.agent
pm.wakuPeerStore[ProtoVersionBook][peerId] = remotePeerInfo.protoVersion
# custom books
pm.wakuPeerStore[ConnectionBook][peerId] = NotConnected # Reset connectedness state
pm.wakuPeerStore[DisconnectBook][peerId] = remotePeerInfo.disconnectTime
pm.wakuPeerStore[SourceBook][peerId] = remotePeerInfo.origin
if remotePeerInfo.enr.isSome():
pm.wakuPeerStore[ENRBook][peerId] = remotePeerInfo.enr.get()
amount.inc()
pm.storage.getAll(onData).isOkOr:
warn "loading peers from storage failed", err = error
waku_peers_errors.inc(labelValues = ["storage_load_failure"])
return
trace "recovered peers from storage", amount = amount
proc selectPeer*(
pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic)
): Option[RemotePeerInfo] =
trace "Selecting peer from peerstore", protocol = proto
# Selects the best peer for a given protocol
var peers = pm.wakuPeerStore.getPeersByProtocol(proto)
if shard.isSome():
peers.keepItIf((it.enr.isSome() and it.enr.get().containsShard(shard.get())))
# No criteria for selecting a peer for WakuRelay, random one
if proto == WakuRelayCodec:
# TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
if peers.len > 0:
trace "Got peer from peerstore",
peerId = peers[0].peerId, multi = peers[0].addrs[0], protocol = proto
return some(peers[0])
trace "No peer found for protocol", protocol = proto
return none(RemotePeerInfo)
# For other protocols, we select the peer that is slotted for the given protocol
pm.serviceSlots.withValue(proto, serviceSlot):
trace "Got peer from service slots",
peerId = serviceSlot[].peerId, multi = serviceSlot[].addrs[0], protocol = proto
return some(serviceSlot[])
# If not slotted, we select a random peer for the given protocol
if peers.len > 0:
trace "Got peer from peerstore",
peerId = peers[0].peerId, multi = peers[0].addrs[0], protocol = proto
return some(peers[0])
trace "No peer found for protocol", protocol = proto
return none(RemotePeerInfo)
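A standalone sketch of the selection order implemented above (slotted service peer first, otherwise the first known peer for the protocol), ignoring the WakuRelay and shard-filtering branches; the Table and strings are hypothetical stand-ins for the Waku peer store and RemotePeerInfo:

import std/[tables, options]

var serviceSlots = {"/vac/waku/example/1.0.0": "slottedPeer"}.toTable
let peersByProto = @["peerA", "peerB"]

proc select(proto: string): Option[string] =
  # Slotted service peer wins; otherwise fall back to the first peer supporting the protocol.
  serviceSlots.withValue(proto, slot):
    return some(slot[])
  if peersByProto.len > 0:
    return some(peersByProto[0])
  return none(string)

doAssert select("/vac/waku/example/1.0.0") == some("slottedPeer")
doAssert select("/vac/waku/other/1.0.0") == some("peerA")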
# Adds a peer to the service slots, which map each supported service protocol to a designated peer
proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
# Do not add relay peers
if proto == WakuRelayCodec:
warn "Can't add relay peer to service peers slots"
return
info "Adding peer to service slots",
peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], service = proto
waku_service_peers.set(1, labelValues = [$proto, $remotePeerInfo.addrs[0]])
# Set peer for service slot
pm.serviceSlots[proto] = remotePeerInfo
pm.addPeer(remotePeerInfo)
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
# Connection Lifecycle Management #
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
# Forward declaration; connectToRelayPeers below calls this before its full definition in the pruning section
proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.}
# Connects to a given node. Note that this function uses `connect` and
# does not provide a protocol. Streams for relay (gossipsub) are created
# automatically without needing to dial.
@@ -227,6 +334,53 @@ proc connectRelay*(
return false
proc connectToNodes*(
pm: PeerManager,
nodes: seq[string] | seq[RemotePeerInfo],
dialTimeout = DefaultDialTimeout,
source = "api",
) {.async.} =
if nodes.len == 0:
return
info "Dialing multiple peers", numOfPeers = nodes.len, nodes = $nodes
var futConns: seq[Future[bool]]
var connectedPeers: seq[RemotePeerInfo]
for node in nodes:
let node = parsePeerInfo(node)
if node.isOk():
futConns.add(pm.connectRelay(node.value))
connectedPeers.add(node.value)
else:
error "Couldn't parse node info", error = node.error
await allFutures(futConns)
# Filtering successful connectedPeers based on futConns
let combined = zip(connectedPeers, futConns)
connectedPeers = combined.filterIt(it[1].read() == true).mapIt(it[0])
when defined(debugDiscv5):
let peerIds = connectedPeers.mapIt(it.peerId)
let origin = connectedPeers.mapIt(it.origin)
if peerIds.len > 0:
notice "established connections with found peers",
peerIds = peerIds.mapIt(shortLog(it)), origin = origin
else:
notice "could not connect to new peers", attempted = nodes.len
info "Finished dialing multiple peers",
successfulConns = connectedPeers.len, attempted = nodes.len
# The issue seems to be around peers not being fully connected when
# trying to subscribe. So what we do is sleep to guarantee nodes are
# fully connected.
#
# This issue was known to Dmitry on nim-libp2p and may be resolvable
# later.
await sleepAsync(chronos.seconds(5))
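The zip/filterIt step above keeps only the peers whose dial future returned true. A self-contained sketch of that filtering pattern, with plain booleans standing in for the awaited futConns results:

import std/sequtils

let attemptedPeers = @["peerA", "peerB", "peerC"]
let dialSucceeded = @[true, false, true]  # stand-ins for futConns[i].read()

# Keep only the peers whose paired dial attempt succeeded.
let connected = zip(attemptedPeers, dialSucceeded).filterIt(it[1]).mapIt(it[0])
doAssert connected == @["peerA", "peerC"]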
proc disconnectNode*(pm: PeerManager, peer: RemotePeerInfo) {.async.} =
let peerId = peer.peerId
await pm.switch.disconnect(peerId)
@@ -267,50 +421,39 @@ proc dialPeer(
return none(Connection)

proc loadFromStorage(pm: PeerManager) {.gcsafe.} =
## Load peers from storage, if available
trace "loading peers from storage"
var amount = 0
proc onData(remotePeerInfo: RemotePeerInfo) =
let peerId = remotePeerInfo.peerId
if pm.switch.peerInfo.peerId == peerId:
# Do not manage self
return
trace "loading peer",
peerId = peerId,
address = remotePeerInfo.addrs,
protocols = remotePeerInfo.protocols,
agent = remotePeerInfo.agent,
version = remotePeerInfo.protoVersion
# nim-libp2p books
pm.wakuPeerStore[AddressBook][peerId] = remotePeerInfo.addrs
pm.wakuPeerStore[ProtoBook][peerId] = remotePeerInfo.protocols
pm.wakuPeerStore[KeyBook][peerId] = remotePeerInfo.publicKey
pm.wakuPeerStore[AgentBook][peerId] = remotePeerInfo.agent
pm.wakuPeerStore[ProtoVersionBook][peerId] = remotePeerInfo.protoVersion
# custom books
pm.wakuPeerStore[ConnectionBook][peerId] = NotConnected # Reset connectedness state
pm.wakuPeerStore[DisconnectBook][peerId] = remotePeerInfo.disconnectTime
pm.wakuPeerStore[SourceBook][peerId] = remotePeerInfo.origin
if remotePeerInfo.enr.isSome():
pm.wakuPeerStore[ENRBook][peerId] = remotePeerInfo.enr.get()
amount.inc()
pm.storage.getAll(onData).isOkOr:
warn "loading peers from storage failed", err = error
waku_peers_errors.inc(labelValues = ["storage_load_failure"])
return
trace "recovered peers from storage", amount = amount

proc dialPeer*(
pm: PeerManager,
remotePeerInfo: RemotePeerInfo,
proto: string,
dialTimeout = DefaultDialTimeout,
source = "api",
): Future[Option[Connection]] {.async.} =
# Dial a given peer and add it to the list of known peers
# TODO: check peer validity and score before continuing. Limit number of peers to be managed.
# First add dialed peer info to peer store, if it does not exist yet..
# TODO: nim libp2p peerstore already adds them
if not pm.wakuPeerStore.hasPeer(remotePeerInfo.peerId, proto):
trace "Adding newly dialed peer to manager",
peerId = $remotePeerInfo.peerId, address = $remotePeerInfo.addrs[0], proto = proto
pm.addPeer(remotePeerInfo)
return await pm.dialPeer(
remotePeerInfo.peerId, remotePeerInfo.addrs, proto, dialTimeout, source
)

proc dialPeer*(
pm: PeerManager,
peerId: PeerID,
proto: string,
dialTimeout = DefaultDialTimeout,
source = "api",
): Future[Option[Connection]] {.async.} =
# Dial an existing peer by looking up its existing addrs in the switch's peerStore
# TODO: check peer validity and score before continuing. Limit number of peers to be managed.
let addrs = pm.switch.peerStore[AddressBook][peerId]
return await pm.dialPeer(peerId, addrs, proto, dialTimeout, source)
proc canBeConnected*(pm: PeerManager, peerId: PeerId): bool =
# Returns if we can try to connect to this peer, based on past failed attempts
@@ -335,9 +478,93 @@ proc canBeConnected*(pm: PeerManager, peerId: PeerId): bool =
return now >= (lastFailed + backoff)

##################
# Initialisation #
##################

proc connectedPeers*(
pm: PeerManager, protocol: string = ""
): (seq[PeerId], seq[PeerId]) =
## Returns the peerIds of physical connections (in and out)
## If a protocol is specified, only returns peers with at least one stream of that protocol
var inPeers: seq[PeerId]
var outPeers: seq[PeerId]
for peerId, muxers in pm.switch.connManager.getConnections():
for peerConn in muxers:
let streams = peerConn.getStreams()
if protocol.len == 0 or streams.anyIt(it.protocol == protocol):
if peerConn.connection.transportDir == Direction.In:
inPeers.add(peerId)
elif peerConn.connection.transportDir == Direction.Out:
outPeers.add(peerId)
return (inPeers, outPeers)
proc connectToRelayPeers*(pm: PeerManager) {.async.} =
var (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
if inRelayPeers.len > pm.inRelayPeersTarget:
await pm.pruneInRelayConns(inRelayPeers.len - pm.inRelayPeersTarget)
if outRelayPeers.len >= pm.outRelayPeersTarget:
return
let notConnectedPeers = pm.wakuPeerStore.getDisconnectedPeers()
var outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
shuffle(outsideBackoffPeers)
var index = 0
var numPendingConnReqs =
min(outsideBackoffPeers.len, pm.outRelayPeersTarget - outRelayPeers.len)
## number of outstanding connection requests
while numPendingConnReqs > 0 and outRelayPeers.len < pm.outRelayPeersTarget:
let numPeersToConnect = min(numPendingConnReqs, MaxParallelDials)
await pm.connectToNodes(outsideBackoffPeers[index ..< (index + numPeersToConnect)])
(inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
index += numPeersToConnect
numPendingConnReqs -= numPeersToConnect
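A standalone sketch of the dialing-budget arithmetic above, using assumed example numbers (the real MaxParallelDials and relay-peer targets come from configuration): each round dials at most MaxParallelDials of the eligible candidates until the outbound target is met.

let maxParallelDials = 10         # assumed stand-in for MaxParallelDials
let outRelayPeersTarget = 30
var outRelayPeers = 18            # outbound relay connections we already have
let outsideBackoffPeers = 25      # eligible disconnected candidates

var index = 0
var numPendingConnReqs = min(outsideBackoffPeers, outRelayPeersTarget - outRelayPeers)
while numPendingConnReqs > 0 and outRelayPeers < outRelayPeersTarget:
  let numPeersToConnect = min(numPendingConnReqs, maxParallelDials)
  # connectToNodes would be awaited here; assume every dial in the batch succeeds
  outRelayPeers += numPeersToConnect
  index += numPeersToConnect
  numPendingConnReqs -= numPeersToConnect

doAssert index == 12 and outRelayPeers == 30   # two rounds: 10 peers, then 2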
proc reconnectPeers*(
pm: PeerManager, proto: string, backoffTime: chronos.Duration = chronos.seconds(0)
) {.async.} =
## Reconnect to peers registered for this protocol. This will update connectedness.
## Especially useful to resume connections from persistent storage after a restart.
debug "Reconnecting peers", proto = proto
# Proto is not persisted, we need to iterate over all peers.
for peerInfo in pm.wakuPeerStore.peers(protocolMatcher(proto)):
# Check that the peer can be connected
if peerInfo.connectedness == CannotConnect:
error "Not reconnecting to unreachable or non-existing peer",
peerId = peerInfo.peerId
continue
if backoffTime > ZeroDuration:
debug "Backing off before reconnect",
peerId = peerInfo.peerId, backoffTime = backoffTime
# We disconnected recently and still need to wait for a backoff period before connecting
await sleepAsync(backoffTime)
await pm.connectToNodes(@[peerInfo])
proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) =
var
numStreamsIn = 0
numStreamsOut = 0
for peerId, muxers in pm.switch.connManager.getConnections():
for peerConn in muxers:
for stream in peerConn.getStreams():
if stream.protocol == protocol:
if stream.dir == Direction.In:
numStreamsIn += 1
elif stream.dir == Direction.Out:
numStreamsOut += 1
return (numStreamsIn, numStreamsOut)
proc getPeerIp(pm: PeerManager, peerId: PeerId): Option[string] =
if not pm.switch.connManager.getConnections().hasKey(peerId):
@@ -354,14 +581,9 @@ proc getPeerIp(pm: PeerManager, peerId: PeerId): Option[string] =
return some(obAddr.getHostname())

# called when a connection i) is created or ii) is closed
proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
case event.kind
of ConnEventKind.Connected:
#let direction = if event.incoming: Inbound else: Outbound
discard
of ConnEventKind.Disconnected:
discard

#~~~~~~~~~~~~~~~~~#
# Event Handling #
#~~~~~~~~~~~~~~~~~#

proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
let res = catch:
@@ -404,25 +626,14 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
asyncSpawn(pm.switch.disconnect(peerId))
pm.wakuPeerStore.delete(peerId)

proc connectedPeers*(
pm: PeerManager, protocol: string = ""
): (seq[PeerId], seq[PeerId]) =
## Returns the peerIds of physical connections (in and out)
## If a protocol is specified, only returns peers with at least one stream of that protocol
var inPeers: seq[PeerId]
var outPeers: seq[PeerId]
for peerId, muxers in pm.switch.connManager.getConnections():
for peerConn in muxers:
let streams = peerConn.getStreams()
if protocol.len == 0 or streams.anyIt(it.protocol == protocol):
if peerConn.connection.transportDir == Direction.In:
inPeers.add(peerId)
elif peerConn.connection.transportDir == Direction.Out:
outPeers.add(peerId)
return (inPeers, outPeers)

# called when a connection i) is created or ii) is closed
proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
case event.kind
of ConnEventKind.Connected:
#let direction = if event.incoming: Inbound else: Outbound
discard
of ConnEventKind.Disconnected:
discard

# called when a peer i) first connects to us ii) disconnects all connections from us
proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
@@ -485,280 +696,48 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
pm.storage.insertOrReplace(remotePeerInfo)

#~~~~~~~~~~~~~~~~~#
# Metrics Logging #
#~~~~~~~~~~~~~~~~~#

proc logAndMetrics(pm: PeerManager) {.async.} =
heartbeat "Scheduling log and metrics run", LogAndMetricsInterval:
# log metrics
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let maxConnections = pm.switch.connManager.inSema.size
let notConnectedPeers = pm.wakuPeerStore.getDisconnectedPeers().mapIt(
RemotePeerInfo.init(it.peerId, it.addrs)
)
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
let totalConnections = pm.switch.connManager.getConnections().len
info "Relay peer connections",
inRelayConns = $inRelayPeers.len & "/" & $pm.inRelayPeersTarget,
outRelayConns = $outRelayPeers.len & "/" & $pm.outRelayPeersTarget,
totalConnections = $totalConnections & "/" & $maxConnections,
notConnectedPeers = notConnectedPeers.len,
outsideBackoffPeers = outsideBackoffPeers.len
# update prometheus metrics
for proto in pm.wakuPeerStore.getWakuProtos():
let (protoConnsIn, protoConnsOut) = pm.connectedPeers(proto)
let (protoStreamsIn, protoStreamsOut) = pm.getNumStreams(proto)
waku_connected_peers.set(
protoConnsIn.len.float64, labelValues = [$Direction.In, proto]
)
waku_connected_peers.set(
protoConnsOut.len.float64, labelValues = [$Direction.Out, proto]
)
waku_streams_peers.set(
protoStreamsIn.float64, labelValues = [$Direction.In, proto]
)
waku_streams_peers.set(
protoStreamsOut.float64, labelValues = [$Direction.Out, proto]
)

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
# Pruning and Maintenance (Stale Peers Management) #
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#

proc new*(
T: type PeerManager,
switch: Switch,
wakuMetadata: WakuMetadata = nil,
maxRelayPeers: Option[int] = none(int),
storage: PeerStorage = nil,
initialBackoffInSec = InitialBackoffInSec,
backoffFactor = BackoffFactor,
maxFailedAttempts = MaxFailedAttempts,
colocationLimit = DefaultColocationLimit,
shardedPeerManagement = false,
): PeerManager {.gcsafe.} =
let capacity = switch.peerStore.capacity
let maxConnections = switch.connManager.inSema.size
if maxConnections > capacity:
error "Max number of connections can't be greater than PeerManager capacity",
capacity = capacity, maxConnections = maxConnections
raise newException(
Defect, "Max number of connections can't be greater than PeerManager capacity"
)
var maxRelayPeersValue = 0
if maxRelayPeers.isSome():
if maxRelayPeers.get() > maxConnections:
error "Max number of relay peers can't be greater than the max amount of connections",
maxConnections = maxConnections, maxRelayPeers = maxRelayPeers.get()
raise newException(
Defect,
"Max number of relay peers can't be greater than the max amount of connections",
)
if maxRelayPeers.get() == maxConnections:
warn "Max number of relay peers is equal to max amount of connections, peer won't be contributing to service peers",
maxConnections = maxConnections, maxRelayPeers = maxRelayPeers.get()
maxRelayPeersValue = maxRelayPeers.get()
else:
# Leave by default 20% of connections for service peers
maxRelayPeersValue = maxConnections - (maxConnections div 5)
# attempt to calculate max backoff to prevent potential overflows or unreasonably high values
let backoff = calculateBackoff(initialBackoffInSec, backoffFactor, maxFailedAttempts)
if backoff.weeks() > 1:
error "Max backoff time can't be over 1 week", maxBackoff = backoff
raise newException(Defect, "Max backoff time can't be over 1 week")
let outRelayPeersTarget = maxRelayPeersValue div 3
let pm = PeerManager(
switch: switch,
wakuMetadata: wakuMetadata,
wakuPeerStore: createWakuPeerStore(switch.peerStore),
storage: storage,
initialBackoffInSec: initialBackoffInSec,
backoffFactor: backoffFactor,
outRelayPeersTarget: outRelayPeersTarget,
inRelayPeersTarget: maxRelayPeersValue - outRelayPeersTarget,
maxRelayPeers: maxRelayPeersValue,
maxFailedAttempts: maxFailedAttempts,
colocationLimit: colocationLimit,
shardedPeerManagement: shardedPeerManagement,
)
proc connHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
onConnEvent(pm, peerId, event)
proc peerHook(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe.} =
onPeerEvent(pm, peerId, event)
proc peerStoreChanged(peerId: PeerId) {.gcsafe.} =
waku_peer_store_size.set(toSeq(pm.wakuPeerStore[AddressBook].book.keys).len.int64)
# currently disabled
#pm.switch.addConnEventHandler(connHook, ConnEventKind.Connected)
#pm.switch.addConnEventHandler(connHook, ConnEventKind.Disconnected)
pm.switch.addPeerEventHandler(peerHook, PeerEventKind.Joined)
pm.switch.addPeerEventHandler(peerHook, PeerEventKind.Left)
# called every time the peerstore is updated
pm.wakuPeerStore[AddressBook].addHandler(peerStoreChanged)
pm.serviceSlots = initTable[string, RemotePeerInfo]()
pm.ipTable = initTable[string, seq[PeerId]]()
if not storage.isNil():
trace "found persistent peer storage"
pm.loadFromStorage() # Load previously managed peers.
else:
trace "no peer storage found"
return pm
#####################
# Manager interface #
#####################
proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
# Do not add relay peers
if proto == WakuRelayCodec:
warn "Can't add relay peer to service peers slots"
return
info "Adding peer to service slots",
peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], service = proto
waku_service_peers.set(1, labelValues = [$proto, $remotePeerInfo.addrs[0]])
# Set peer for service slot
pm.serviceSlots[proto] = remotePeerInfo
pm.addPeer(remotePeerInfo)
####################
# Dialer interface #
####################
proc dialPeer*(
pm: PeerManager,
remotePeerInfo: RemotePeerInfo,
proto: string,
dialTimeout = DefaultDialTimeout,
source = "api",
): Future[Option[Connection]] {.async.} =
# Dial a given peer and add it to the list of known peers
# TODO: check peer validity and score before continuing. Limit number of peers to be managed.
# First add dialed peer info to peer store, if it does not exist yet..
# TODO: nim libp2p peerstore already adds them
if not pm.wakuPeerStore.hasPeer(remotePeerInfo.peerId, proto):
trace "Adding newly dialed peer to manager",
peerId = $remotePeerInfo.peerId, address = $remotePeerInfo.addrs[0], proto = proto
pm.addPeer(remotePeerInfo)
return await pm.dialPeer(
remotePeerInfo.peerId, remotePeerInfo.addrs, proto, dialTimeout, source
)
proc dialPeer*(
pm: PeerManager,
peerId: PeerID,
proto: string,
dialTimeout = DefaultDialTimeout,
source = "api",
): Future[Option[Connection]] {.async.} =
# Dial an existing peer by looking up its existing addrs in the switch's peerStore
# TODO: check peer validity and score before continuing. Limit number of peers to be managed.
let addrs = pm.switch.peerStore[AddressBook][peerId]
return await pm.dialPeer(peerId, addrs, proto, dialTimeout, source)
proc connectToNodes*(
pm: PeerManager,
nodes: seq[string] | seq[RemotePeerInfo],
dialTimeout = DefaultDialTimeout,
source = "api",
) {.async.} =
if nodes.len == 0:
return
info "Dialing multiple peers", numOfPeers = nodes.len, nodes = $nodes
var futConns: seq[Future[bool]]
var connectedPeers: seq[RemotePeerInfo]
for node in nodes:
let node = parsePeerInfo(node)
if node.isOk():
futConns.add(pm.connectRelay(node.value))
connectedPeers.add(node.value)
else:
error "Couldn't parse node info", error = node.error
await allFutures(futConns)
# Filtering successful connectedPeers based on futConns
let combined = zip(connectedPeers, futConns)
connectedPeers = combined.filterIt(it[1].read() == true).mapIt(it[0])
when defined(debugDiscv5):
let peerIds = connectedPeers.mapIt(it.peerId)
let origin = connectedPeers.mapIt(it.origin)
if peerIds.len > 0:
notice "established connections with found peers",
peerIds = peerIds.mapIt(shortLog(it)), origin = origin
else:
notice "could not connect to new peers", attempted = nodes.len
info "Finished dialing multiple peers",
successfulConns = connectedPeers.len, attempted = nodes.len
# The issue seems to be around peers not being fully connected when
# trying to subscribe. So what we do is sleep to guarantee nodes are
# fully connected.
#
# This issue was known to Dmitry on nim-libp2p and may be resolvable
# later.
await sleepAsync(chronos.seconds(5))
proc reconnectPeers*(
pm: PeerManager, proto: string, backoffTime: chronos.Duration = chronos.seconds(0)
) {.async.} =
## Reconnect to peers registered for this protocol. This will update connectedness.
## Especially useful to resume connections from persistent storage after a restart.
debug "Reconnecting peers", proto = proto
# Proto is not persisted, we need to iterate over all peers.
for peerInfo in pm.wakuPeerStore.peers(protocolMatcher(proto)):
# Check that the peer can be connected
if peerInfo.connectedness == CannotConnect:
error "Not reconnecting to unreachable or non-existing peer",
peerId = peerInfo.peerId
continue
if backoffTime > ZeroDuration:
debug "Backing off before reconnect",
peerId = peerInfo.peerId, backoffTime = backoffTime
# We disconnected recently and still need to wait for a backoff period before connecting
await sleepAsync(backoffTime)
await pm.connectToNodes(@[peerInfo])
proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) =
var
numStreamsIn = 0
numStreamsOut = 0
for peerId, muxers in pm.switch.connManager.getConnections():
for peerConn in muxers:
for stream in peerConn.getStreams():
if stream.protocol == protocol:
if stream.dir == Direction.In:
numStreamsIn += 1
elif stream.dir == Direction.Out:
numStreamsOut += 1
return (numStreamsIn, numStreamsOut)
proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} =
if amount <= 0:
return
let (inRelayPeers, _) = pm.connectedPeers(WakuRelayCodec)
let connsToPrune = min(amount, inRelayPeers.len)
for p in inRelayPeers[0 ..< connsToPrune]:
trace "Pruning Peer", Peer = $p
asyncSpawn(pm.switch.disconnect(p))
proc connectToRelayPeers*(pm: PeerManager) {.async.} =
var (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
if inRelayPeers.len > pm.inRelayPeersTarget:
await pm.pruneInRelayConns(inRelayPeers.len - pm.inRelayPeersTarget)
if outRelayPeers.len >= pm.outRelayPeersTarget:
return
let notConnectedPeers = pm.wakuPeerStore.getDisconnectedPeers()
var outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
shuffle(outsideBackoffPeers)
var index = 0
var numPendingConnReqs =
min(outsideBackoffPeers.len, pm.outRelayPeersTarget - outRelayPeers.len)
## number of outstanding connection requests
while numPendingConnReqs > 0 and outRelayPeers.len < pm.outRelayPeersTarget:
let numPeersToConnect = min(numPendingConnReqs, MaxParallelDials)
await pm.connectToNodes(outsideBackoffPeers[index ..< (index + numPeersToConnect)])
(inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
index += numPeersToConnect
numPendingConnReqs -= numPeersToConnect
proc manageRelayPeers*(pm: PeerManager) {.async.} =
if pm.wakuMetadata.shards.len == 0:
@@ -915,41 +894,6 @@ proc prunePeerStore*(pm: PeerManager) =
capacity = capacity,
pruned = peersToPrune.len
proc selectPeer*(
pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic)
): Option[RemotePeerInfo] =
trace "Selecting peer from peerstore", protocol = proto
# Selects the best peer for a given protocol
var peers = pm.wakuPeerStore.getPeersByProtocol(proto)
if shard.isSome():
peers.keepItIf((it.enr.isSome() and it.enr.get().containsShard(shard.get())))
# No criteria for selecting a peer for WakuRelay, random one
if proto == WakuRelayCodec:
# TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
if peers.len > 0:
trace "Got peer from peerstore",
peerId = peers[0].peerId, multi = peers[0].addrs[0], protocol = proto
return some(peers[0])
trace "No peer found for protocol", protocol = proto
return none(RemotePeerInfo)
# For other protocols, we select the peer that is slotted for the given protocol
pm.serviceSlots.withValue(proto, serviceSlot):
trace "Got peer from service slots",
peerId = serviceSlot[].peerId, multi = serviceSlot[].addrs[0], protocol = proto
return some(serviceSlot[])
# If not slotted, we select a random peer for the given protocol
if peers.len > 0:
trace "Got peer from peerstore",
peerId = peers[0].peerId, multi = peers[0].addrs[0], protocol = proto
return some(peers[0])
trace "No peer found for protocol", protocol = proto
return none(RemotePeerInfo)
# Prunes peers from peerstore to remove old/stale ones
proc prunePeerStoreLoop(pm: PeerManager) {.async.} =
trace "Starting prune peerstore loop"
@@ -981,40 +925,20 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
# Shorten the connectivity loop interval dynamically based on percentage of peers to fill or connections to prune
await sleepAsync(dynamicSleepInterval)

proc logAndMetrics(pm: PeerManager) {.async.} =
heartbeat "Scheduling log and metrics run", LogAndMetricsInterval:
# log metrics
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let maxConnections = pm.switch.connManager.inSema.size
let notConnectedPeers = pm.wakuPeerStore.getDisconnectedPeers().mapIt(
RemotePeerInfo.init(it.peerId, it.addrs)
)
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
let totalConnections = pm.switch.connManager.getConnections().len
info "Relay peer connections",
inRelayConns = $inRelayPeers.len & "/" & $pm.inRelayPeersTarget,
outRelayConns = $outRelayPeers.len & "/" & $pm.outRelayPeersTarget,
totalConnections = $totalConnections & "/" & $maxConnections,
notConnectedPeers = notConnectedPeers.len,
outsideBackoffPeers = outsideBackoffPeers.len
# update prometheus metrics
for proto in pm.wakuPeerStore.getWakuProtos():
let (protoConnsIn, protoConnsOut) = pm.connectedPeers(proto)
let (protoStreamsIn, protoStreamsOut) = pm.getNumStreams(proto)
waku_connected_peers.set(
protoConnsIn.len.float64, labelValues = [$Direction.In, proto]
)
waku_connected_peers.set(
protoConnsOut.len.float64, labelValues = [$Direction.Out, proto]
)
waku_streams_peers.set(
protoStreamsIn.float64, labelValues = [$Direction.In, proto]
)
waku_streams_peers.set(
protoStreamsOut.float64, labelValues = [$Direction.Out, proto]
)

proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} =
if amount <= 0:
return
let (inRelayPeers, _) = pm.connectedPeers(WakuRelayCodec)
let connsToPrune = min(amount, inRelayPeers.len)
for p in inRelayPeers[0 ..< connsToPrune]:
trace "Pruning Peer", Peer = $p
asyncSpawn(pm.switch.disconnect(p))

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
# Initialization and Constructor #
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#

proc start*(pm: PeerManager) =
pm.started = true
@@ -1024,3 +948,95 @@ proc start*(pm: PeerManager) =
proc stop*(pm: PeerManager) =
pm.started = false
proc new*(
T: type PeerManager,
switch: Switch,
wakuMetadata: WakuMetadata = nil,
maxRelayPeers: Option[int] = none(int),
storage: PeerStorage = nil,
initialBackoffInSec = InitialBackoffInSec,
backoffFactor = BackoffFactor,
maxFailedAttempts = MaxFailedAttempts,
colocationLimit = DefaultColocationLimit,
shardedPeerManagement = false,
): PeerManager {.gcsafe.} =
let capacity = switch.peerStore.capacity
let maxConnections = switch.connManager.inSema.size
if maxConnections > capacity:
error "Max number of connections can't be greater than PeerManager capacity",
capacity = capacity, maxConnections = maxConnections
raise newException(
Defect, "Max number of connections can't be greater than PeerManager capacity"
)
var maxRelayPeersValue = 0
if maxRelayPeers.isSome():
if maxRelayPeers.get() > maxConnections:
error "Max number of relay peers can't be greater than the max amount of connections",
maxConnections = maxConnections, maxRelayPeers = maxRelayPeers.get()
raise newException(
Defect,
"Max number of relay peers can't be greater than the max amount of connections",
)
if maxRelayPeers.get() == maxConnections:
warn "Max number of relay peers is equal to max amount of connections, peer won't be contributing to service peers",
maxConnections = maxConnections, maxRelayPeers = maxRelayPeers.get()
maxRelayPeersValue = maxRelayPeers.get()
else:
# Leave by default 20% of connections for service peers
maxRelayPeersValue = maxConnections - (maxConnections div 5)
# attempt to calculate max backoff to prevent potential overflows or unreasonably high values
let backoff = calculateBackoff(initialBackoffInSec, backoffFactor, maxFailedAttempts)
if backoff.weeks() > 1:
error "Max backoff time can't be over 1 week", maxBackoff = backoff
raise newException(Defect, "Max backoff time can't be over 1 week")
let outRelayPeersTarget = maxRelayPeersValue div 3
let pm = PeerManager(
switch: switch,
wakuMetadata: wakuMetadata,
wakuPeerStore: createWakuPeerStore(switch.peerStore),
storage: storage,
initialBackoffInSec: initialBackoffInSec,
backoffFactor: backoffFactor,
outRelayPeersTarget: outRelayPeersTarget,
inRelayPeersTarget: maxRelayPeersValue - outRelayPeersTarget,
maxRelayPeers: maxRelayPeersValue,
maxFailedAttempts: maxFailedAttempts,
colocationLimit: colocationLimit,
shardedPeerManagement: shardedPeerManagement,
)
proc connHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
onConnEvent(pm, peerId, event)
proc peerHook(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe.} =
onPeerEvent(pm, peerId, event)
proc peerStoreChanged(peerId: PeerId) {.gcsafe.} =
waku_peer_store_size.set(toSeq(pm.wakuPeerStore[AddressBook].book.keys).len.int64)
# currently disabled
#pm.switch.addConnEventHandler(connHook, ConnEventKind.Connected)
#pm.switch.addConnEventHandler(connHook, ConnEventKind.Disconnected)
pm.switch.addPeerEventHandler(peerHook, PeerEventKind.Joined)
pm.switch.addPeerEventHandler(peerHook, PeerEventKind.Left)
# called every time the peerstore is updated
pm.wakuPeerStore[AddressBook].addHandler(peerStoreChanged)
pm.serviceSlots = initTable[string, RemotePeerInfo]()
pm.ipTable = initTable[string, seq[PeerId]]()
if not storage.isNil():
trace "found persistent peer storage"
pm.loadFromStorage() # Load previously managed peers.
else:
trace "no peer storage found"
return pm
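Worked example of the connection budgeting above, assuming maxConnections = 50 and no explicit maxRelayPeers: 20% of connections (10) stay reserved for service peers, and the remaining relay budget is split one third outbound, two thirds inbound. A standalone sketch of just that arithmetic:

let maxConnections = 50                 # assumed example value
let maxRelayPeersValue = maxConnections - (maxConnections div 5)  # keep 20% for service peers
let outRelayPeersTarget = maxRelayPeersValue div 3
let inRelayPeersTarget = maxRelayPeersValue - outRelayPeersTarget

doAssert maxRelayPeersValue == 40
doAssert outRelayPeersTarget == 13 and inRelayPeersTarget == 27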