feat(networking): prune peers from same ip beyond colocation limit (#1765)

This commit is contained in:
Alvaro Revuelta 2023-05-31 09:47:56 +02:00 committed by GitHub
parent ffac77611c
commit 047d1cf095
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 116 additions and 32 deletions

View File

@ -727,3 +727,40 @@ procSuite "Peer Manager":
.build(),
maxFailedAttempts = 5,
storage = nil)
asyncTest "colocationLimit is enforced by pruneConnsByIp()":
# Create 5 nodes
let nodes = toSeq(0..<5).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
# Start them with relay + filter
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
# 2 in connections
discard await nodes[1].peerManager.connectRelay(pInfos[0])
discard await nodes[2].peerManager.connectRelay(pInfos[0])
# 2 out connections
discard await nodes[0].peerManager.connectRelay(pInfos[3])
discard await nodes[0].peerManager.connectRelay(pInfos[4])
# force max 1 conn per ip
nodes[0].peerManager.colocationLimit = 1
nodes[0].peerManager.updateIpTable()
# table is updated and we have 4 conns (2in 2out)
check:
nodes[0].peerManager.ipTable["127.0.0.1"].len == 4
nodes[0].peerManager.switch.connManager.getConnections().len == 4
nodes[0].peerManager.peerStore.peers().len == 4
await nodes[0].peerManager.pruneConnsByIp()
# peers are pruned, max 1 conn per ip
nodes[0].peerManager.updateIpTable()
check:
nodes[0].peerManager.ipTable["127.0.0.1"].len == 1
nodes[0].peerManager.switch.connManager.getConnections().len == 1
nodes[0].peerManager.peerStore.peers().len == 1

View File

@ -10,7 +10,8 @@ import
chronicles,
metrics,
libp2p/multistream,
libp2p/muxers/muxer
libp2p/muxers/muxer,
libp2p/nameresolving/nameresolver
import
../../../common/nimchronos,
../../waku_core,
@ -54,11 +55,14 @@ const
# How often the peer store is pruned
PrunePeerStoreInterval = chronos.minutes(5)
# How often the peer store is updated with metrics
UpdateMetricsInterval = chronos.seconds(15)
# How often metrics and logs are shown/updated
LogAndMetricsInterval = chronos.seconds(60)
# How often to log peer manager metrics
LogSummaryInterval = chronos.seconds(60)
# Prune by ip interval
PruneByIpInterval = chronos.seconds(30)
# Max peers that we allow from the same IP
ColocationLimit = 5
type
PeerManager* = ref object of RootObj
@ -70,6 +74,8 @@ type
storage: PeerStorage
serviceSlots*: Table[string, RemotePeerInfo]
outPeersTarget*: int
ipTable*: Table[string, seq[PeerId]]
colocationLimit*: int
started: bool
proc protocolMatcher*(codec: string): Matcher =
@ -278,13 +284,8 @@ proc canBeConnected*(pm: PeerManager,
# Initialisation #
##################
# currently disabled. note that peer connection state connected/disconnected
# cant be tracked using this handler when more than one conn is allowed and
# when using autonat. eg if a peer has 2 conns and one is disconnected we cant
# assume that the peer is disconnected, because the other one might still be active.
# note that even with maxconn = 1, autonat forces more than one connection.
# called when a connection i) is created or ii) is closed
proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
case event.kind
of ConnEventKind.Connected:
let direction = if event.incoming: Inbound else: Outbound
@ -292,30 +293,51 @@ proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
of ConnEventKind.Disconnected:
discard
# called when a peer i) first connects to us ii) disconnects all connections from us
proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
var direction: PeerDirection
var connectedness: Connectedness
if event.kind == PeerEventKind.Joined:
let direction = if event.initiator: Outbound else: Inbound
pm.peerStore[ConnectionBook][peerId] = Connected
pm.peerStore[DirectionBook][peerId] = direction
if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), Connected)
return
direction = if event.initiator: Outbound else: Inbound
connectedness = Connected
elif event.kind == PeerEventKind.Left:
pm.peerStore[DirectionBook][peerId] = UnknownDirection
pm.peerStore[ConnectionBook][peerId] = CanConnect
direction = UnknownDirection
connectedness = CanConnect
pm.peerStore[ConnectionBook][peerId] = connectedness
pm.peerStore[DirectionBook][peerId] = direction
if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), connectedness, getTime().toUnix)
proc updateIpTable*(pm: PeerManager) =
# clean table
pm.ipTable = initTable[string, seq[PeerId]]()
# populate ip->peerIds from existing out/in connections
for peerId, conn in pm.switch.connManager.getConnections():
if conn.len == 0:
continue
# we may want to enable it only in inbound peers
#if conn[0].connection.transportDir != In:
# continue
# assumes just one physical connection per peer
let observedAddr = conn[0].connection.observedAddr
if observedAddr.isSome:
# TODO: think if circuit relay ips should be handled differently
let ip = observedAddr.get.getHostname()
pm.ipTable.mgetOrPut(ip, newSeq[PeerId]()).add(peerId)
if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix)
return
proc new*(T: type PeerManager,
switch: Switch,
storage: PeerStorage = nil,
initialBackoffInSec = InitialBackoffInSec,
backoffFactor = BackoffFactor,
maxFailedAttempts = MaxFailedAttempts,): PeerManager =
maxFailedAttempts = MaxFailedAttempts,
colocationLimit = ColocationLimit,): PeerManager =
let capacity = switch.peerStore.capacity
let maxConnections = switch.connManager.inSema.size
@ -338,7 +360,8 @@ proc new*(T: type PeerManager,
initialBackoffInSec: initialBackoffInSec,
backoffFactor: backoffFactor,
outPeersTarget: max(maxConnections div 10, 10),
maxFailedAttempts: maxFailedAttempts)
maxFailedAttempts: maxFailedAttempts,
colocationLimit: colocationLimit)
proc connHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
onConnEvent(pm, peerId, event)
@ -360,6 +383,7 @@ proc new*(T: type PeerManager,
pm.peerStore[AddressBook].addHandler(peerStoreChanged)
pm.serviceSlots = initTable[string, RemotePeerInfo]()
pm.ipTable = initTable[string, seq[PeerId]]()
if not storage.isNil():
debug "found persistent peer storage"
@ -520,6 +544,23 @@ proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} =
for p in inRelayPeers[0..<connsToPrune]:
await pm.switch.disconnect(p)
proc pruneConnsByIp*(pm: PeerManager) {.async.} =
## prunes connections based on ip colocation, allowing no more
## than ColocationLimit inbound connections from same ip
##
# update the table tracking ip and the connected peers
pm.updateIpTable()
# trigger disconnections based on colocationLimit
for ip, peersInIp in pm.ipTable.pairs:
if peersInIp.len > pm.colocationLimit:
let connsToPrune = peersInIp.len - pm.colocationLimit
for peerId in peersInIp[0..<connsToPrune]:
debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip
await pm.switch.disconnect(peerId)
pm.peerStore.delete(peerId)
proc connectToRelayPeers*(pm: PeerManager) {.async.} =
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let maxConnections = pm.switch.connManager.inSema.size
@ -603,6 +644,12 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
debug "No peer found for protocol", protocol=proto
return none(RemotePeerInfo)
proc pruneConnsByIpLoop(pm: PeerManager) {.async.} =
debug "Starting prune peer by ip loop"
while pm.started:
await pm.pruneConnsByIp()
await sleepAsync(PruneByIpInterval)
# Prunes peers from peerstore to remove old/stale ones
proc prunePeerStoreLoop(pm: PeerManager) {.async.} =
debug "Starting prune peerstore loop"
@ -617,8 +664,9 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
await pm.connectToRelayPeers()
await sleepAsync(ConnectivityLoopInterval)
proc logSummary*(pm: PeerManager) {.async.} =
heartbeat "Log peer manager summary", LogSummaryInterval:
proc logAndMetrics(pm: PeerManager) {.async.} =
heartbeat "Scheduling log and metrics run", LogAndMetricsInterval:
# log metrics
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let maxConnections = pm.switch.connManager.inSema.size
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
@ -634,8 +682,7 @@ proc logSummary*(pm: PeerManager) {.async.} =
notConnectedPeers = notConnectedPeers.len,
outsideBackoffPeers = outsideBackoffPeers.len
proc updateMetrics(pm: PeerManager) {.async.} =
heartbeat "Scheduling updateMetrics run", UpdateMetricsInterval:
# update prometheus metrics
for proto in pm.peerStore.getWakuProtos():
let (protoConnsIn, protoConnsOut) = pm.connectedPeers(proto)
let (protoStreamsIn, protoStreamsOut) = pm.getNumStreams(proto)
@ -646,10 +693,10 @@ proc updateMetrics(pm: PeerManager) {.async.} =
proc start*(pm: PeerManager) =
pm.started = true
asyncSpawn pm.updateMetrics()
asyncSpawn pm.relayConnectivityLoop()
asyncSpawn pm.prunePeerStoreLoop()
asyncSpawn pm.logSummary()
asyncSpawn pm.pruneConnsByIpLoop()
asyncSpawn pm.logAndMetrics()
proc stop*(pm: PeerManager) =
pm.started = false