Connection limits (#384)

* master merge

* wip

* avoid deadlocks

* tcp limits

* expose client field in chronosstream

* limit incoming connections

* update with new listen api

* fix release

* don't override peerinfo in connection

* rework transport with accept

* use semaphore to track resource usage

* rework with new transport accept api

* move events to conn manager (#373)

* use semaphore to track resource usage

* merge master

* expose api to acquire conn slots

* don't fail expensive metrics

* allow tracking and updating connections

* set global connection limits to 80

* add per peer connection limits

* make sure conn is closed if tracking failed

* more descriptive naming for handle

* rework with new transport accept api

* add `getStream`, hide `selectConn`

* add TransportClosedError

* make nil explicit

* don't make unnecessary copies of message

* logging

* error handling

* cleanup semaphore

* track connections properly

* throw `TooManyConnections` when tracking outgoing

* use proper exception and handle conventions

* check onCloseHandle for nil

* revert internalConnect changes

* adding upgraded flag

* await stream before closing

* simplify tracking

* wip

* logging

* split connection limits into incoming and outgoing

* further streamline connection limits split counts

* don't use closeWithEOF

* move peer and conn event triggers from switch

* wip

* wip

* wip

* merge master

* handle nil connections properly

* add clarifying comment

* don't raise exc on nil

* no finally

* add proper min/max connections logic

* rebase master

* merge master

* master merge

* remove request timeout

should be addressed in a separate PR

* merge master

* share semaphore when in/out limits aren't enforced

* merge master

* use import

* pass semaphore to trackConn

* don't close last conn

* use storeConn

* merge master

* use storeConn
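
The central idea running through these commits is gating connection setup on a counting semaphore: incoming accepts wait for a free slot, while outgoing dials fail fast. Below is a minimal sketch of that primitive in Nim with chronos futures; the names mirror the diff that follows, but this is an illustration, not the library's actual utils/semaphore module.

import chronos

type
  AsyncSemaphore = ref object
    size: int                 # maximum number of slots
    count: int                # currently free slots
    queue: seq[Future[void]]  # waiters parked in acquire()

proc newAsyncSemaphore(size: int): AsyncSemaphore =
  AsyncSemaphore(size: size, count: size)

proc tryAcquire(s: AsyncSemaphore): bool =
  ## Non-blocking acquire - the outgoing-dial path uses this
  ## so it can raise TooManyConnectionsError immediately.
  if s.count > 0:
    dec s.count
    return true

proc acquire(s: AsyncSemaphore): Future[void] =
  ## Blocking acquire - the incoming-accept path waits here
  ## until a connection slot frees up.
  let fut = newFuture[void]("AsyncSemaphore.acquire")
  if s.tryAcquire():
    fut.complete()
  else:
    s.queue.add(fut)
  fut

proc release(s: AsyncSemaphore) =
  ## Hand the slot to the oldest waiter, or return it to the pool.
  if s.queue.len > 0:
    let fut = s.queue[0]
    s.queue.delete(0)
    fut.complete()
  else:
    inc s.count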
Dmitriy Ryajov, 2021-01-20 22:00:24 -06:00, committed by GitHub
parent 96c01e5e69
commit 0959877b29
8 changed files with 197 additions and 74 deletions

View File

@@ -12,6 +12,7 @@ import chronos, chronicles, metrics
 import peerinfo,
        stream/connection,
        muxers/muxer,
+       utils/semaphore,
        errors
 
 logScope:
@@ -20,10 +21,13 @@ logScope:
 declareGauge(libp2p_peers, "total connected peers")
 
 const
-  MaxConnectionsPerPeer = 5
+  MaxConnections* = 50
+  MaxConnectionsPerPeer* = 5
 
 type
-  TooManyConnections* = object of CatchableError
+  TooManyConnectionsError* = object of CatchableError
+
+  ConnProvider* = proc(): Future[Connection] {.gcsafe, closure.}
 
   ConnEventKind* {.pure.} = enum
     Connected, # A connection was made and securely upgraded - there may be
@@ -62,24 +66,37 @@ type
     handle: Future[void]
 
   ConnManager* = ref object of RootObj
-    maxConns: int
-    # NOTE: don't change to PeerInfo here
-    # the reference semantics on the PeerInfo
-    # object itself make it susceptible to
-    # copies and mangling by unrelated code.
+    maxConnsPerPeer: int
+    inSema*: AsyncSemaphore
+    outSema*: AsyncSemaphore
     conns: Table[PeerID, HashSet[Connection]]
     muxed: Table[Connection, MuxerHolder]
    connEvents: Table[ConnEventKind, OrderedSet[ConnEventHandler]]
     peerEvents: Table[PeerEventKind, OrderedSet[PeerEventHandler]]
 
-proc newTooManyConnections(): ref TooManyConnections {.inline.} =
-  result = newException(TooManyConnections, "too many connections for peer")
+proc newTooManyConnectionsError(): ref TooManyConnectionsError {.inline.} =
+  result = newException(TooManyConnectionsError, "Too many connections")
 
 proc init*(C: type ConnManager,
-           maxConnsPerPeer: int = MaxConnectionsPerPeer): ConnManager =
-  C(maxConns: maxConnsPerPeer,
+           maxConnsPerPeer = MaxConnectionsPerPeer,
+           maxConnections = MaxConnections,
+           maxIn = -1,
+           maxOut = -1): ConnManager =
+  var inSema, outSema: AsyncSemaphore
+  if maxIn > 0 and maxOut > 0:
+    inSema = newAsyncSemaphore(maxIn)
+    outSema = newAsyncSemaphore(maxOut)
+  elif maxConnections > 0:
+    inSema = newAsyncSemaphore(maxConnections)
+    outSema = inSema
+  else:
+    raiseAssert "Invalid connection counts!"
+
+  C(maxConnsPerPeer: maxConnsPerPeer,
     conns: initTable[PeerID, HashSet[Connection]](),
-    muxed: initTable[Connection, MuxerHolder]())
+    muxed: initTable[Connection, MuxerHolder](),
+    inSema: inSema,
+    outSema: outSema)
 
 proc connCount*(c: ConnManager, peerId: PeerID): int =
   c.conns.getOrDefault(peerId).len
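
As a quick reading aid for the `init` branches above: separate `maxIn`/`maxOut` budgets get their own semaphores, otherwise a single semaphore sized to `maxConnections` is shared by both directions. A hypothetical usage sketch (not from the diff):

# Split budgets: independent semaphores per direction.
let split = ConnManager.init(maxIn = 50, maxOut = 30)
assert split.inSema != split.outSema

# No split requested: one semaphore shared by in and out.
let shared = ConnManager.init(maxConnsPerPeer = 5, maxConnections = 80)
assert shared.inSema == shared.outSema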
@@ -104,7 +121,9 @@ proc triggerConnEvent*(c: ConnManager,
                        peerId: PeerID,
                        event: ConnEvent) {.async, gcsafe.} =
   try:
+    trace "About to trigger connection events", peer = peerId
     if event.kind in c.connEvents:
+      trace "triggering connection events", peer = peerId, event = $event.kind
       var connEvents: seq[Future[void]]
       for h in c.connEvents[event.kind]:
         connEvents.add(h(peerId, event))
@@ -112,7 +131,7 @@ proc triggerConnEvent*(c: ConnManager,
       checkFutures(await allFinished(connEvents))
   except CancelledError as exc:
     raise exc
-  except CatchableError as exc: # handlers should not raise!
+  except CatchableError as exc:
     warn "Exception in triggerConnEvents",
       msg = exc.msg, peerId, event = $event
@@ -199,7 +218,10 @@ proc closeMuxerHolder(muxerHolder: MuxerHolder) {.async.} =
   await muxerHolder.muxer.close()
   if not(isNil(muxerHolder.handle)):
-    await muxerHolder.handle # TODO noraises?
+    try:
+      await muxerHolder.handle # TODO noraises?
+    except CatchableError as exc:
+      trace "Exception in close muxer handler", exc = exc.msg
 
   trace "Cleaned up muxer", m = muxerHolder.muxer
 
 proc delConn(c: ConnManager, conn: Connection) =
@@ -217,9 +239,11 @@ proc cleanupConn(c: ConnManager, conn: Connection) {.async.} =
   ## clean connection's resources such as muxers and streams
 
   if isNil(conn):
+    trace "Wont cleanup a nil connection"
     return
 
   if isNil(conn.peerInfo):
+    trace "No peer info for connection"
     return
 
   # Remove connection from all tables without async breaks
@@ -329,42 +353,115 @@ proc storeConn*(c: ConnManager, conn: Connection) =
   ##
 
   if isNil(conn):
-    raise newException(CatchableError, "connection cannot be nil")
+    raise newException(CatchableError, "Connection cannot be nil")
 
-  if conn.closed() or conn.atEof():
-    trace "Can't store dead connection", conn
-    raise newException(CatchableError, "can't store dead connection")
+  if conn.closed or conn.atEof:
+    raise newException(CatchableError, "Connection closed or EOF")
 
   if isNil(conn.peerInfo):
-    raise newException(CatchableError, "empty peer info")
+    raise newException(CatchableError, "Empty peer info")
 
   let peerId = conn.peerInfo.peerId
-  if c.conns.getOrDefault(peerId).len > c.maxConns:
-    debug "too many connections",
+  if c.conns.getOrDefault(peerId).len > c.maxConnsPerPeer:
+    debug "Too many connections for peer",
       conn, conns = c.conns.getOrDefault(peerId).len
 
-    raise newTooManyConnections()
+    raise newTooManyConnectionsError()
 
   if peerId notin c.conns:
     c.conns[peerId] = initHashSet[Connection]()
 
   c.conns[peerId].incl(conn)
-  libp2p_peers.set(c.conns.len.int64)
 
   # Launch on close listener
   # All the errors are handled inside `onClose()` procedure.
   asyncSpawn c.onClose(conn)
+  libp2p_peers.set(c.conns.len.int64)
 
   trace "Stored connection",
     conn, direction = $conn.dir, connections = c.conns.len
 
-proc storeOutgoing*(c: ConnManager, conn: Connection) =
-  conn.dir = Direction.Out
-  c.storeConn(conn)
-
-proc storeIncoming*(c: ConnManager, conn: Connection) =
-  conn.dir = Direction.In
-  c.storeConn(conn)
+proc trackConn(c: ConnManager,
+               provider: ConnProvider,
+               sema: AsyncSemaphore):
+               Future[Connection] {.async.} =
+  var conn: Connection
+  try:
+    conn = await provider()
+
+    if isNil(conn):
+      return
+
+    trace "Got connection", conn
+
+    proc semaphoreMonitor() {.async.} =
+      try:
+        await conn.join()
+      except CatchableError as exc:
+        trace "Exception in semaphore monitor, ignoring", exc = exc.msg
+
+      sema.release()
+
+    asyncSpawn semaphoreMonitor()
+  except CatchableError as exc:
+    trace "Exception tracking connection", exc = exc.msg
+    if not isNil(conn):
+      await conn.close()
+
+    raise exc
+
+  return conn
+
+proc trackIncomingConn*(c: ConnManager,
+                        provider: ConnProvider):
+                        Future[Connection] {.async.} =
+  ## await for a connection slot before attempting
+  ## to call the connection provider
+  ##
+
+  var conn: Connection
+  try:
+    trace "Tracking incoming connection"
+    await c.inSema.acquire()
+    conn = await c.trackConn(provider, c.inSema)
+    if isNil(conn):
+      trace "Couldn't acquire connection, releasing semaphore slot", dir = $Direction.In
+      c.inSema.release()
+
+    return conn
+  except CatchableError as exc:
+    trace "Exception tracking connection", exc = exc.msg
+    c.inSema.release()
+    raise exc
+
+proc trackOutgoingConn*(c: ConnManager,
+                        provider: ConnProvider):
+                        Future[Connection] {.async.} =
+  ## try acquiring a connection if all slots
+  ## are already taken, raise TooManyConnectionsError
+  ## exception
+  ##
+
+  trace "Tracking outgoing connection", count = c.outSema.count,
+                                        max = c.outSema.size
+
+  if not c.outSema.tryAcquire():
+    trace "Too many outgoing connections!", count = c.outSema.count,
+                                            max = c.outSema.size
+    raise newTooManyConnectionsError()
+
+  var conn: Connection
+  try:
+    conn = await c.trackConn(provider, c.outSema)
+    if isNil(conn):
+      trace "Couldn't acquire connection, releasing semaphore slot", dir = $Direction.Out
+      c.outSema.release()
+
+    return conn
+  except CatchableError as exc:
+    trace "Exception tracking connection", exc = exc.msg
+    c.outSema.release()
+    raise exc
 
 proc storeMuxer*(c: ConnManager,
                  muxer: Muxer,
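
The two `track*` entry points above take a `ConnProvider` closure so the manager controls when the underlying dial or accept actually runs; `trackConn` then ties the slot's lifetime to the connection through `semaphoreMonitor`, releasing it once `conn.join()` completes. A hedged usage sketch, assuming a `connManager`, a `transport`, and a `remoteAddr` in scope:

import sugar

# Outgoing: tryAcquire up front, so this raises
# TooManyConnectionsError instead of queuing.
let outgoing = await connManager.trackOutgoingConn(
  () => transport.dial(remoteAddr))

# Incoming: acquire() parks until a slot frees up,
# then the provider (accept) is invoked.
let incoming = await connManager.trackIncomingConn(
  () => transport.accept())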
@@ -391,8 +488,8 @@ proc storeMuxer*(c: ConnManager,
   asyncSpawn c.onConnUpgraded(muxer.connection)
 
 proc getStream*(c: ConnManager,
-                peerId: PeerID,
-                dir: Direction): Future[Connection] {.async, gcsafe.} =
+               peerId: PeerID,
+               dir: Direction): Future[Connection] {.async, gcsafe.} =
   ## get a muxed stream for the provided peer
   ## with the given direction
   ##
@@ -402,7 +499,7 @@ proc getStream*(c: ConnManager,
   return await muxer.newStream()
 
 proc getStream*(c: ConnManager,
-                peerId: PeerID): Future[Connection] {.async, gcsafe.} =
+               peerId: PeerID): Future[Connection] {.async, gcsafe.} =
   ## get a muxed stream for the passed peer from any connection
   ##
@@ -411,7 +508,7 @@ proc getStream*(c: ConnManager,
   return await muxer.newStream()
 
 proc getStream*(c: ConnManager,
-                conn: Connection): Future[Connection] {.async, gcsafe.} =
+               conn: Connection): Future[Connection] {.async, gcsafe.} =
   ## get a muxed stream for the passed connection
   ##

View File

@@ -27,7 +27,11 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
                         transportFlags: set[ServerFlags] = {},
                         rng = newRng(),
                         inTimeout: Duration = 5.minutes,
-                        outTimeout: Duration = 5.minutes): Switch =
+                        outTimeout: Duration = 5.minutes,
+                        maxConnections = MaxConnections,
+                        maxIn = -1,
+                        maxOut = -1,
+                        maxConnsPerPeer = MaxConnectionsPerPeer): Switch =
   proc createMplex(conn: Connection): Muxer =
     Mplex.init(
       conn,
@@ -59,6 +63,10 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
     transports,
     identify,
     muxers,
-    secureManagers = secureManagerInstances)
+    secureManagers = secureManagerInstances,
+    maxConnections = maxConnections,
+    maxIn = maxIn,
+    maxOut = maxOut,
+    maxConnsPerPeer = maxConnsPerPeer)
 
   return switch
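
End to end, a caller opts into the limits through the new `newStandardSwitch` parameters. A sketch with illustrative values:

let switch = newStandardSwitch(
  maxConnections = 80,  # single shared budget...
  maxIn = -1,           # ...because the per-direction caps stay disabled
  maxOut = -1,
  maxConnsPerPeer = 5)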

View File

@@ -145,7 +145,7 @@ method closeImpl*(s: ChronosStream) {.async.} =
     raise exc
   except CatchableError as exc:
     trace "Error closing chronosstream", s, msg = exc.msg
 
   when defined(libp2p_agents_metrics):
     # do this after closing!
     s.untrackPeerIdentity()

View File

@@ -7,11 +7,13 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.
 
-import tables,
-       sequtils,
-       options,
-       sets,
-       oids
+import std/[tables,
+            sequtils,
+            options,
+            sets,
+            oids,
+            sugar,
+            math]
 
 import chronos,
        chronicles,
@@ -107,7 +109,14 @@ proc dialAndUpgrade(s: Switch,
       trace "Dialing address", address = $a, peerId
       let dialed = try:
           libp2p_total_dial_attempts.inc()
-          await t.dial(a)
+          # await a connection slot when the total
+          # connection count is equal to `maxConns`
+          await s.connManager.trackOutgoingConn(
+            () => t.dial(a)
+          )
+        except TooManyConnectionsError as exc:
+          trace "Connection limit reached!"
+          raise exc
         except CancelledError as exc:
           debug "Dialing canceled", msg = exc.msg, peerId
           raise exc
@@ -138,7 +147,8 @@ proc dialAndUpgrade(s: Switch,
 
 proc internalConnect(s: Switch,
                      peerId: PeerID,
-                     addrs: seq[MultiAddress]): Future[Connection] {.async.} =
+                     addrs: seq[MultiAddress]):
+                     Future[Connection] {.async.} =
 
   if s.peerInfo.peerId == peerId:
     raise newException(CatchableError, "can't dial self!")
@@ -182,6 +192,13 @@ proc internalConnect(s: Switch,
     lock.release()
 
 proc connect*(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]) {.async.} =
+  ## attempt to establish a connection
+  ## with a remote peer
+  ##
+
+  if s.connManager.connCount(peerId) > 0:
+    return
+
   discard await s.internalConnect(peerId, addrs)
 
 proc negotiateStream(s: Switch, conn: Connection, protos: seq[string]): Future[Connection] {.async.} =
@@ -205,17 +222,17 @@ proc dial*(s: Switch,
 
 proc dial*(s: Switch,
            peerId: PeerID,
-           proto: string): Future[Connection] = dial(s, peerId, @[proto])
+           proto: string): Future[Connection] =
+  dial(s, peerId, @[proto])
 
 proc dial*(s: Switch,
            peerId: PeerID,
            addrs: seq[MultiAddress],
           protos: seq[string]):
            Future[Connection] {.async.} =
-  trace "Dialing (new)", peerId, protos
-  let conn = await s.internalConnect(peerId, addrs)
-  trace "Opening stream", conn
-  let stream = await s.connManager.getStream(conn)
+  var
+    conn: Connection
+    stream: Connection
 
   proc cleanup() {.async.} =
     if not(isNil(stream)):
@@ -225,9 +242,14 @@ proc dial*(s: Switch,
       await conn.close()
 
   try:
+    trace "Dialing (new)", peerId, protos
+    conn = await s.internalConnect(peerId, addrs)
+    trace "Opening stream", conn
+    stream = await s.connManager.getStream(conn)
+
     if isNil(stream):
-      await conn.close()
-      raise newException(DialFailedError, "Couldn't get muxed stream")
+      raise newException(DialFailedError,
+        "Couldn't get muxed stream")
 
     return await s.negotiateStream(stream, protos)
   except CancelledError as exc:
@@ -287,8 +309,11 @@ proc accept(s: Switch, transport: Transport) {.async.} = # noraises
       # remember to always release the slot when
       # the upgrade succeeds or fails, this is
       # currently done by the `upgradeMonitor`
       await upgrades.acquire()    # first wait for an upgrade slot to become available
-      conn = await transport.accept() # next attempt to get a connection
+      conn = await s.connManager  # next attempt to get an incoming connection
+        .trackIncomingConn(
+          () => transport.accept()
+        )
 
       if isNil(conn):
         # A nil connection means that we might have hit a
         # file-handle limit (or another non-fatal error),
@@ -360,12 +385,16 @@ proc newSwitch*(peerInfo: PeerInfo,
                 transports: seq[Transport],
                 identity: Identify,
                 muxers: Table[string, MuxerProvider],
-                secureManagers: openarray[Secure] = []): Switch =
+                secureManagers: openarray[Secure] = [],
+                maxConnections = MaxConnections,
+                maxIn = -1,
+                maxOut = -1,
+                maxConnsPerPeer = MaxConnectionsPerPeer): Switch =
   if secureManagers.len == 0:
     raise (ref CatchableError)(msg: "Provide at least one secure manager")
 
   let ms = newMultistream()
-  let connManager = ConnManager.init()
+  let connManager = ConnManager.init(maxConnsPerPeer, maxConnections, maxIn, maxOut)
   let upgrade = MuxedUpgrade.init(identity, muxers, secureManagers, connManager, ms)
 
   let switch = Switch(
let switch = Switch( let switch = Switch(

View File

@@ -52,7 +52,7 @@ proc mux*(u: MuxedUpgrade, conn: Connection): Future[Muxer] {.async, gcsafe.} =
   # install stream handler
   muxer.streamHandler = u.streamHandler
 
-  u.connManager.storeOutgoing(conn)
+  u.connManager.storeConn(conn)
 
   # store it in muxed connections if we have a peer for it
   u.connManager.storeMuxer(muxer, muxer.handle()) # store muxer and start read loop
@@ -155,7 +155,7 @@ proc muxerHandler(u: MuxedUpgrade, muxer: Muxer) {.async, gcsafe.} =
     return
 
   # store incoming connection
-  u.connManager.storeIncoming(conn)
+  u.connManager.storeConn(conn)
 
   # store muxer and muxed connection
   u.connManager.storeMuxer(muxer)

View File

@@ -471,7 +471,7 @@ suite "GossipSub internal":
       peer.score = gossipSub.parameters.graylistThreshold - 1
       gossipSub.gossipsub.mgetOrPut(topic, initHashSet[PubSubPeer]()).incl(peer)
       gossipSub.peers[peerInfo.peerId] = peer
-      gossipSub.switch.connManager.storeIncoming(conn)
+      gossipSub.switch.connManager.storeConn(conn)
 
     gossipSub.updateScores()

View File

@@ -172,7 +172,7 @@ suite "Connection Manager":
     await stream.close()
 
   asyncTest "should raise on too many connections":
-    let connMngr = ConnManager.init(1)
+    let connMngr = ConnManager.init(maxConnsPerPeer = 1)
     let peer = PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet())
 
     connMngr.storeConn(Connection.init(peer, Direction.In))
@@ -181,7 +181,7 @@ suite "Connection Manager":
       Connection.init(peer, Direction.In),
       Connection.init(peer, Direction.In)]
 
-    expect TooManyConnections:
+    expect TooManyConnectionsError:
       connMngr.storeConn(conns[0])
      connMngr.storeConn(conns[1])
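
The suite above covers the per-peer cap; a companion test for the global outgoing cap could look like the following hypothetical sketch in the same asyncTest style (it relies on `trackOutgoingConn` holding a slot for as long as the returned connection stays open):

  asyncTest "should raise when outgoing slots are exhausted":
    let connMngr = ConnManager.init(maxConnections = 1)
    let peer = PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet())

    proc provider(): Future[Connection] {.async.} =
      return Connection.init(peer, Direction.Out)

    # the first dial takes the only slot and keeps it while the conn is open
    let conn = await connMngr.trackOutgoingConn(provider)

    expect TooManyConnectionsError:
      discard await connMngr.trackOutgoingConn(provider)

    await conn.close()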

View File

@@ -183,11 +183,8 @@ suite "TCP transport":
       let transport2: TcpTransport = TcpTransport.init()
       let cancellation = transport2.dial(transport1.ma)
 
-      try:
-        cancellation.cancel()
-      except CancelledError as exc:
-        await sleepAsync(100.millis)
-        check cancellation.cancelled
+      await cancellation.cancelAndWait()
+      check cancellation.cancelled
 
       await transport2.stop()
       await transport1.stop()
@@ -198,16 +195,8 @@ suite "TCP transport":
       let transport1: TcpTransport = TcpTransport.init()
       await transport1.start(ma)
 
-      let transport2: TcpTransport = TcpTransport.init()
-      let connFut = transport2.dial(transport1.ma)
 
       let acceptHandler = transport1.accept()
-      try:
-        acceptHandler.cancel()
-      except CancelledError as exc:
-        await sleepAsync(100.millis)
-        check acceptHandler.cancelled
-      check isNil((await connFut))
+      await acceptHandler.cancelAndWait()
+      check acceptHandler.cancelled
 
       await transport1.stop()
-      await transport2.stop()