mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-05-12 05:19:33 +00:00
peer manager not disconnect abruptly ongoing service peers streams
This commit is contained in:
parent
4d314b376d
commit
7d3e0469be
@ -404,6 +404,16 @@ proc disconnectNode*(pm: PeerManager, peer: RemotePeerInfo) {.async.} =
|
||||
let peerId = peer.peerId
|
||||
await pm.disconnectNode(peerId)
|
||||
|
||||
proc evictPeer(pm: PeerManager, peerId: PeerId) {.async.} =
|
||||
## Policy-based eviction (relay-peer limit, IP colocation, pruning).
|
||||
## Skips the disconnect when the peer has an active store-query stream so
|
||||
## that in-flight store requests are not aborted by unrelated limit checks.
|
||||
let (storeInPeers, storeOutPeers) = pm.connectedPeers(WakuStoreCodec)
|
||||
if storeInPeers.contains(peerId) or storeOutPeers.contains(peerId):
|
||||
debug "skipping peer eviction: active store stream", peerId = peerId
|
||||
return
|
||||
await pm.switch.disconnect(peerId)
|
||||
|
||||
# Dialing should be used for just protocols that require a stream to write and read
|
||||
# This shall not be used to dial Relay protocols, since that would create
|
||||
# unneccesary unused streams.
|
||||
@ -770,11 +780,11 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
|
||||
let inRelayPeers = pm.connectedPeers(WakuRelayCodec)[0]
|
||||
if inRelayPeers.len > pm.inRelayPeersTarget and
|
||||
peerStore.hasPeer(peerId, WakuRelayCodec):
|
||||
info "disconnecting relay peer because reached max num in-relay peers",
|
||||
info "relay peer limit reached, evicting peer",
|
||||
peerId = peerId,
|
||||
inRelayPeers = inRelayPeers.len,
|
||||
inRelayPeersTarget = pm.inRelayPeersTarget
|
||||
await pm.switch.disconnect(peerId)
|
||||
await pm.evictPeer(peerId)
|
||||
|
||||
## Apply max ip colocation limit
|
||||
if (let ip = pm.getPeerIp(peerId); ip.isSome()):
|
||||
@ -787,7 +797,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
|
||||
if pm.colocationLimit != 0 and peersBehindIp.len > pm.colocationLimit:
|
||||
for peerId in peersBehindIp[0 ..< (peersBehindIp.len - pm.colocationLimit)]:
|
||||
info "Pruning connection due to ip colocation", peerId = peerId, ip = ip
|
||||
asyncSpawn(pm.switch.disconnect(peerId))
|
||||
asyncSpawn(pm.evictPeer(peerId))
|
||||
peerStore.delete(peerId)
|
||||
|
||||
WakuPeerEvent.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventConnected)
|
||||
@ -1100,7 +1110,7 @@ proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} =
|
||||
|
||||
for p in inRelayPeers[0 ..< connsToPrune]:
|
||||
trace "Pruning Peer", Peer = $p
|
||||
asyncSpawn(pm.switch.disconnect(p))
|
||||
asyncSpawn(pm.evictPeer(p))
|
||||
|
||||
proc addExtPeerEventHandler*(
|
||||
pm: PeerManager, eventHandler: PeerEventHandler, eventKind: PeerEventKind
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user