mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-10 22:16:40 +00:00
fix: rejecting excess relay connections (#3065)
This commit is contained in:
parent
368bb3c199
commit
f8946b8263
@ -399,6 +399,24 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
|
|||||||
asyncSpawn(pm.switch.disconnect(peerId))
|
asyncSpawn(pm.switch.disconnect(peerId))
|
||||||
pm.peerStore.delete(peerId)
|
pm.peerStore.delete(peerId)
|
||||||
|
|
||||||
|
proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) =
|
||||||
|
## Returns the peerIds of physical connections (in and out)
|
||||||
|
## containing at least one stream with the given protocol.
|
||||||
|
|
||||||
|
var inPeers: seq[PeerId]
|
||||||
|
var outPeers: seq[PeerId]
|
||||||
|
|
||||||
|
for peerId, muxers in pm.switch.connManager.getConnections():
|
||||||
|
for peerConn in muxers:
|
||||||
|
let streams = peerConn.getStreams()
|
||||||
|
if streams.anyIt(it.protocol == protocol):
|
||||||
|
if peerConn.connection.transportDir == Direction.In:
|
||||||
|
inPeers.add(peerId)
|
||||||
|
elif peerConn.connection.transportDir == Direction.Out:
|
||||||
|
outPeers.add(peerId)
|
||||||
|
|
||||||
|
return (inPeers, outPeers)
|
||||||
|
|
||||||
# called when a peer i) first connects to us ii) disconnects all connections from us
|
# called when a peer i) first connects to us ii) disconnects all connections from us
|
||||||
proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
|
proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
|
||||||
if not pm.wakuMetadata.isNil() and event.kind == PeerEventKind.Joined:
|
if not pm.wakuMetadata.isNil() and event.kind == PeerEventKind.Joined:
|
||||||
@ -412,6 +430,17 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
|
|||||||
direction = if event.initiator: Outbound else: Inbound
|
direction = if event.initiator: Outbound else: Inbound
|
||||||
connectedness = Connected
|
connectedness = Connected
|
||||||
|
|
||||||
|
## Check max allowed in-relay peers
|
||||||
|
let inRelayPeers = pm.connectedPeers(WakuRelayCodec)[0]
|
||||||
|
if inRelayPeers.len > pm.inRelayPeersTarget and
|
||||||
|
pm.peerStore.hasPeer(peerId, WakuRelayCodec):
|
||||||
|
debug "disconnecting relay peer because reached max num in-relay peers",
|
||||||
|
peerId = peerId,
|
||||||
|
inRelayPeers = inRelayPeers.len,
|
||||||
|
inRelayPeersTarget = pm.inRelayPeersTarget
|
||||||
|
await pm.switch.disconnect(peerId)
|
||||||
|
|
||||||
|
## Apply max ip colocation limit
|
||||||
if (let ip = pm.getPeerIp(peerId); ip.isSome()):
|
if (let ip = pm.getPeerIp(peerId); ip.isSome()):
|
||||||
pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId)
|
pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId)
|
||||||
|
|
||||||
@ -494,7 +523,7 @@ proc new*(
|
|||||||
error "Max backoff time can't be over 1 week", maxBackoff = backoff
|
error "Max backoff time can't be over 1 week", maxBackoff = backoff
|
||||||
raise newException(Defect, "Max backoff time can't be over 1 week")
|
raise newException(Defect, "Max backoff time can't be over 1 week")
|
||||||
|
|
||||||
let outRelayPeersTarget = max(maxRelayPeersValue div 3, 10)
|
let outRelayPeersTarget = maxRelayPeersValue div 3
|
||||||
|
|
||||||
let pm = PeerManager(
|
let pm = PeerManager(
|
||||||
switch: switch,
|
switch: switch,
|
||||||
@ -560,46 +589,6 @@ proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: str
|
|||||||
|
|
||||||
pm.addPeer(remotePeerInfo)
|
pm.addPeer(remotePeerInfo)
|
||||||
|
|
||||||
proc reconnectPeers*(
|
|
||||||
pm: PeerManager, proto: string, backoff: chronos.Duration = chronos.seconds(0)
|
|
||||||
) {.async.} =
|
|
||||||
## Reconnect to peers registered for this protocol. This will update connectedness.
|
|
||||||
## Especially useful to resume connections from persistent storage after a restart.
|
|
||||||
|
|
||||||
trace "Reconnecting peers", proto = proto
|
|
||||||
|
|
||||||
# Proto is not persisted, we need to iterate over all peers.
|
|
||||||
for peerInfo in pm.peerStore.peers(protocolMatcher(proto)):
|
|
||||||
# Check that the peer can be connected
|
|
||||||
if peerInfo.connectedness == CannotConnect:
|
|
||||||
error "Not reconnecting to unreachable or non-existing peer",
|
|
||||||
peerId = peerInfo.peerId
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Respect optional backoff period where applicable.
|
|
||||||
let
|
|
||||||
# TODO: Add method to peerStore (eg isBackoffExpired())
|
|
||||||
disconnectTime = Moment.init(peerInfo.disconnectTime, Second) # Convert
|
|
||||||
currentTime = Moment.init(getTime().toUnix, Second)
|
|
||||||
# Current time comparable to persisted value
|
|
||||||
backoffTime = disconnectTime + backoff - currentTime
|
|
||||||
# Consider time elapsed since last disconnect
|
|
||||||
|
|
||||||
trace "Respecting backoff",
|
|
||||||
backoff = backoff,
|
|
||||||
disconnectTime = disconnectTime,
|
|
||||||
currentTime = currentTime,
|
|
||||||
backoffTime = backoffTime
|
|
||||||
|
|
||||||
# TODO: This blocks the whole function. Try to connect to another peer in the meantime.
|
|
||||||
if backoffTime > ZeroDuration:
|
|
||||||
trace "Backing off before reconnect...",
|
|
||||||
peerId = peerInfo.peerId, backoffTime = backoffTime
|
|
||||||
# We disconnected recently and still need to wait for a backoff period before connecting
|
|
||||||
await sleepAsync(backoffTime)
|
|
||||||
|
|
||||||
discard await pm.connectRelay(peerInfo)
|
|
||||||
|
|
||||||
####################
|
####################
|
||||||
# Dialer interface #
|
# Dialer interface #
|
||||||
####################
|
####################
|
||||||
@ -685,23 +674,29 @@ proc connectToNodes*(
|
|||||||
# later.
|
# later.
|
||||||
await sleepAsync(chronos.seconds(5))
|
await sleepAsync(chronos.seconds(5))
|
||||||
|
|
||||||
proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) =
|
proc reconnectPeers*(
|
||||||
## Returns the peerIds of physical connections (in and out)
|
pm: PeerManager, proto: string, backoffTime: chronos.Duration = chronos.seconds(0)
|
||||||
## containing at least one stream with the given protocol.
|
) {.async.} =
|
||||||
|
## Reconnect to peers registered for this protocol. This will update connectedness.
|
||||||
|
## Especially useful to resume connections from persistent storage after a restart.
|
||||||
|
|
||||||
var inPeers: seq[PeerId]
|
debug "Reconnecting peers", proto = proto
|
||||||
var outPeers: seq[PeerId]
|
|
||||||
|
|
||||||
for peerId, muxers in pm.switch.connManager.getConnections():
|
# Proto is not persisted, we need to iterate over all peers.
|
||||||
for peerConn in muxers:
|
for peerInfo in pm.peerStore.peers(protocolMatcher(proto)):
|
||||||
let streams = peerConn.getStreams()
|
# Check that the peer can be connected
|
||||||
if streams.anyIt(it.protocol == protocol):
|
if peerInfo.connectedness == CannotConnect:
|
||||||
if peerConn.connection.transportDir == Direction.In:
|
error "Not reconnecting to unreachable or non-existing peer",
|
||||||
inPeers.add(peerId)
|
peerId = peerInfo.peerId
|
||||||
elif peerConn.connection.transportDir == Direction.Out:
|
continue
|
||||||
outPeers.add(peerId)
|
|
||||||
|
|
||||||
return (inPeers, outPeers)
|
if backoffTime > ZeroDuration:
|
||||||
|
debug "Backing off before reconnect",
|
||||||
|
peerId = peerInfo.peerId, backoffTime = backoffTime
|
||||||
|
# We disconnected recently and still need to wait for a backoff period before connecting
|
||||||
|
await sleepAsync(backoffTime)
|
||||||
|
|
||||||
|
await pm.connectToNodes(@[peerInfo])
|
||||||
|
|
||||||
proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) =
|
proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) =
|
||||||
var
|
var
|
||||||
@ -730,9 +725,7 @@ proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} =
|
|||||||
|
|
||||||
proc connectToRelayPeers*(pm: PeerManager) {.async.} =
|
proc connectToRelayPeers*(pm: PeerManager) {.async.} =
|
||||||
var (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
|
var (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
|
||||||
let maxConnections = pm.switch.connManager.inSema.size
|
|
||||||
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
|
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
|
||||||
let inPeersTarget = maxConnections - pm.outRelayPeersTarget
|
|
||||||
|
|
||||||
if inRelayPeers.len > pm.inRelayPeersTarget:
|
if inRelayPeers.len > pm.inRelayPeersTarget:
|
||||||
await pm.pruneInRelayConns(inRelayPeers.len - pm.inRelayPeersTarget)
|
await pm.pruneInRelayConns(inRelayPeers.len - pm.inRelayPeersTarget)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user