From 0959877b29be802b857cb10ab194f5cf29e3649f Mon Sep 17 00:00:00 2001
From: Dmitriy Ryajov
Date: Wed, 20 Jan 2021 22:00:24 -0600
Subject: [PATCH] Connection limits (#384)

* master merge
* wip
* avoid deadlocks
* tcp limits
* expose client field in chronosstream
* limit incoming connections
* update with new listen api
* fix release
* don't override peerinfo in connection
* rework transport with accept
* use semaphore to track resource usage
* rework with new transport accept api
* move events to conn manager (#373)
* use semaphore to track resource usage
* merge master
* expose api to acquire conn slots
* don't fail expensive metrics
* allow tracking and updating connections
* set global connection limits to 80
* add per peer connection limits
* make sure conn is closed if tracking failed
* more descriptive naming for handle
* rework with new transport accept api
* add `getStream` hide `selectConn`
* add TransportClosedError
* make nil explicit
* don't make unnecessary copies of message
* logging
* error handling
* cleanup semaphore
* track connections properly
* throw `TooManyConnections` when tracking outgoing
* use proper exception and handle conventions
* check onCloseHandle for nil
* revert internalConnect changes
* adding upgraded flag
* await stream before closing
* simplify tracking
* wip
* logging
* split connection limits into incoming and outgoing
* further streamline connection limits split counts
* don't use closeWithEOF
* move peer and conn event triggers from switch
* wip
* wip
* wip
* merge master
* handle nil connections properly
* add clarifying comment
* don't raise exc on nil
* no finally
* add proper min/max connections logic
* rebase master
* merge master
* master merge
* remove request timeout, should be addressed in separate PR
* merge master
* share semaphore when in/out limits aren't enforced
* merge master
* use import
* pass semaphore to trackConn
* don't close last conn
* use storeConn
* merge master
* use storeConn
---
 libp2p/connmanager.nim               | 163 +++++++++++++++++++++------
 libp2p/standard_setup.nim            |  12 +-
 libp2p/stream/chronosstream.nim      |   2 +-
 libp2p/switch.nim                    |  65 ++++++++---
 libp2p/upgrademngrs/muxedupgrade.nim |   4 +-
 tests/pubsub/testgossipinternal.nim  |   2 +-
 tests/testconnmngr.nim               |   4 +-
 tests/testtransport.nim              |  19 +---
 8 files changed, 197 insertions(+), 74 deletions(-)

diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim
index b9d2855ed..e20642e65 100644
--- a/libp2p/connmanager.nim
+++ b/libp2p/connmanager.nim
@@ -12,6 +12,7 @@ import chronos, chronicles, metrics
 import peerinfo,
        stream/connection,
        muxers/muxer,
+       utils/semaphore,
        errors
 
 logScope:
@@ -20,10 +21,13 @@ logScope:
 declareGauge(libp2p_peers, "total connected peers")
 
 const
-  MaxConnectionsPerPeer = 5
+  MaxConnections* = 50
+  MaxConnectionsPerPeer* = 5
 
 type
-  TooManyConnections* = object of CatchableError
+  TooManyConnectionsError* = object of CatchableError
+
+  ConnProvider* = proc(): Future[Connection] {.gcsafe, closure.}
 
   ConnEventKind* {.pure.} = enum
     Connected, # A connection was made and securely upgraded - there may be
@@ -62,24 +66,37 @@ type
     handle: Future[void]
 
   ConnManager* = ref object of RootObj
-    maxConns: int
-    # NOTE: don't change to PeerInfo here
-    # the reference semantics on the PeerInfo
-    # object itself make it susceptible to
-    # copies and mangling by unrelated code.
+    maxConnsPerPeer: int
+    inSema*: AsyncSemaphore
+    outSema*: AsyncSemaphore
     conns: Table[PeerID, HashSet[Connection]]
     muxed: Table[Connection, MuxerHolder]
     connEvents: Table[ConnEventKind, OrderedSet[ConnEventHandler]]
     peerEvents: Table[PeerEventKind, OrderedSet[PeerEventHandler]]
 
-proc newTooManyConnections(): ref TooManyConnections {.inline.} =
-  result = newException(TooManyConnections, "too many connections for peer")
+proc newTooManyConnectionsError(): ref TooManyConnectionsError {.inline.} =
+  result = newException(TooManyConnectionsError, "Too many connections")
 
 proc init*(C: type ConnManager,
-           maxConnsPerPeer: int = MaxConnectionsPerPeer): ConnManager =
-  C(maxConns: maxConnsPerPeer,
+           maxConnsPerPeer = MaxConnectionsPerPeer,
+           maxConnections = MaxConnections,
+           maxIn = -1,
+           maxOut = -1): ConnManager =
+  var inSema, outSema: AsyncSemaphore
+  if maxIn > 0 and maxOut > 0:
+    inSema = newAsyncSemaphore(maxIn)
+    outSema = newAsyncSemaphore(maxOut)
+  elif maxConnections > 0:
+    inSema = newAsyncSemaphore(maxConnections)
+    outSema = inSema
+  else:
+    raiseAssert "Invalid connection counts!"
+
+  C(maxConnsPerPeer: maxConnsPerPeer,
     conns: initTable[PeerID, HashSet[Connection]](),
-    muxed: initTable[Connection, MuxerHolder]())
+    muxed: initTable[Connection, MuxerHolder](),
+    inSema: inSema,
+    outSema: outSema)
 
 proc connCount*(c: ConnManager, peerId: PeerID): int =
   c.conns.getOrDefault(peerId).len
@@ -104,7 +121,9 @@ proc triggerConnEvent*(c: ConnManager,
                        peerId: PeerID,
                        event: ConnEvent) {.async, gcsafe.} =
   try:
+    trace "About to trigger connection events", peer = peerId
     if event.kind in c.connEvents:
+      trace "triggering connection events", peer = peerId, event = $event.kind
       var connEvents: seq[Future[void]]
       for h in c.connEvents[event.kind]:
         connEvents.add(h(peerId, event))
@@ -112,7 +131,7 @@ proc triggerConnEvent*(c: ConnManager,
       checkFutures(await allFinished(connEvents))
   except CancelledError as exc:
     raise exc
-  except CatchableError as exc: # handlers should not raise!
+  except CatchableError as exc:
     warn "Exception in triggerConnEvents",
       msg = exc.msg, peerId, event = $event
 
@@ -199,7 +218,10 @@ proc closeMuxerHolder(muxerHolder: MuxerHolder) {.async.} =
   await muxerHolder.muxer.close()
   if not(isNil(muxerHolder.handle)):
-    await muxerHolder.handle # TODO noraises?
+    try:
+      await muxerHolder.handle # TODO noraises?
+    except CatchableError as exc:
+      trace "Exception in close muxer handler", exc = exc.msg
 
   trace "Cleaned up muxer", m = muxerHolder.muxer
 
 proc delConn(c: ConnManager, conn: Connection) =
@@ -217,9 +239,11 @@ proc cleanupConn(c: ConnManager, conn: Connection) {.async.} =
   ## clean connection's resources such as muxers and streams
 
   if isNil(conn):
+    trace "Wont cleanup a nil connection"
     return
 
   if isNil(conn.peerInfo):
+    trace "No peer info for connection"
     return
 
   # Remove connection from all tables without async breaks
@@ -329,42 +353,115 @@ proc storeConn*(c: ConnManager, conn: Connection) =
   ##
 
   if isNil(conn):
-    raise newException(CatchableError, "connection cannot be nil")
+    raise newException(CatchableError, "Connection cannot be nil")
 
-  if conn.closed() or conn.atEof():
-    trace "Can't store dead connection", conn
-    raise newException(CatchableError, "can't store dead connection")
+  if conn.closed or conn.atEof:
+    raise newException(CatchableError, "Connection closed or EOF")
 
   if isNil(conn.peerInfo):
-    raise newException(CatchableError, "empty peer info")
+    raise newException(CatchableError, "Empty peer info")
 
   let peerId = conn.peerInfo.peerId
-  if c.conns.getOrDefault(peerId).len > c.maxConns:
-    debug "too many connections",
+  if c.conns.getOrDefault(peerId).len > c.maxConnsPerPeer:
+    debug "Too many connections for peer",
       conn, conns = c.conns.getOrDefault(peerId).len
 
-    raise newTooManyConnections()
+    raise newTooManyConnectionsError()
 
   if peerId notin c.conns:
     c.conns[peerId] = initHashSet[Connection]()
 
   c.conns[peerId].incl(conn)
+  libp2p_peers.set(c.conns.len.int64)
 
   # Launch on close listener
   # All the errors are handled inside `onClose()` procedure.
   asyncSpawn c.onClose(conn)
-  libp2p_peers.set(c.conns.len.int64)
 
   trace "Stored connection",
     conn, direction = $conn.dir, connections = c.conns.len
 
-proc storeOutgoing*(c: ConnManager, conn: Connection) =
-  conn.dir = Direction.Out
-  c.storeConn(conn)
+proc trackConn(c: ConnManager,
+               provider: ConnProvider,
+               sema: AsyncSemaphore):
+               Future[Connection] {.async.} =
+  var conn: Connection
+  try:
+    conn = await provider()
 
-proc storeIncoming*(c: ConnManager, conn: Connection) =
-  conn.dir = Direction.In
-  c.storeConn(conn)
+    if isNil(conn):
+      return
+
+    trace "Got connection", conn
+
+    proc semaphoreMonitor() {.async.} =
+      try:
+        await conn.join()
+      except CatchableError as exc:
+        trace "Exception in semaphore monitor, ignoring", exc = exc.msg
+
+      sema.release()
+
+    asyncSpawn semaphoreMonitor()
+  except CatchableError as exc:
+    trace "Exception tracking connection", exc = exc.msg
+    if not isNil(conn):
+      await conn.close()
+
+    raise exc
+
+  return conn
+
+proc trackIncomingConn*(c: ConnManager,
+                        provider: ConnProvider):
+                        Future[Connection] {.async.} =
+  ## await for a connection slot before attempting
+  ## to call the connection provider
+  ##
+
+  var conn: Connection
+  try:
+    trace "Tracking incoming connection"
+    await c.inSema.acquire()
+    conn = await c.trackConn(provider, c.inSema)
+    if isNil(conn):
+      trace "Couldn't acquire connection, releasing semaphore slot", dir = $Direction.In
+      c.inSema.release()
+
+    return conn
+  except CatchableError as exc:
+    trace "Exception tracking connection", exc = exc.msg
+    c.inSema.release()
+    raise exc
+
+proc trackOutgoingConn*(c: ConnManager,
+                        provider: ConnProvider):
+                        Future[Connection] {.async.} =
+  ## try acquiring a connection if all slots
+  ## are already taken, raise TooManyConnectionsError
+  ## exception
+  ##
+
+  trace "Tracking outgoing connection", count = c.outSema.count,
+                                        max = c.outSema.size
+
+  if not c.outSema.tryAcquire():
+    trace "Too many outgoing connections!", count = c.outSema.count,
+                                            max = c.outSema.size
+    raise newTooManyConnectionsError()
+
+  var conn: Connection
+  try:
+    conn = await c.trackConn(provider, c.outSema)
+    if isNil(conn):
+      trace "Couldn't acquire connection, releasing semaphore slot", dir = $Direction.Out
+      c.outSema.release()
+
+    return conn
+  except CatchableError as exc:
+    trace "Exception tracking connection", exc = exc.msg
+    c.outSema.release()
+    raise exc
 
 proc storeMuxer*(c: ConnManager,
                  muxer: Muxer,
@@ -391,8 +488,8 @@ proc storeMuxer*(c: ConnManager,
   asyncSpawn c.onConnUpgraded(muxer.connection)
 
 proc getStream*(c: ConnManager,
-               peerId: PeerID,
-               dir: Direction): Future[Connection] {.async, gcsafe.} =
+                peerId: PeerID,
+                dir: Direction): Future[Connection] {.async, gcsafe.} =
   ## get a muxed stream for the provided peer
   ## with the given direction
   ##
@@ -402,7 +499,7 @@ proc getStream*(c: ConnManager,
     return await muxer.newStream()
 
 proc getStream*(c: ConnManager,
-               peerId: PeerID): Future[Connection] {.async, gcsafe.} =
+                peerId: PeerID): Future[Connection] {.async, gcsafe.} =
   ## get a muxed stream for the passed peer from any connection
   ##
 
@@ -411,7 +508,7 @@ proc getStream*(c: ConnManager,
     return await muxer.newStream()
 
 proc getStream*(c: ConnManager,
-               conn: Connection): Future[Connection] {.async, gcsafe.} =
+                conn: Connection): Future[Connection] {.async, gcsafe.} =
   ## get a muxed stream for the passed connection
   ##
 
diff --git a/libp2p/standard_setup.nim b/libp2p/standard_setup.nim
index 4d7f469c2..b61a45454 100644
--- a/libp2p/standard_setup.nim
+++ b/libp2p/standard_setup.nim
@@ -27,7 +27,11 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
                         transportFlags: set[ServerFlags] = {},
                         rng = newRng(),
                         inTimeout: Duration = 5.minutes,
-                        outTimeout: Duration = 5.minutes): Switch =
+                        outTimeout: Duration = 5.minutes,
+                        maxConnections = MaxConnections,
+                        maxIn = -1,
+                        maxOut = -1,
+                        maxConnsPerPeer = MaxConnectionsPerPeer): Switch =
   proc createMplex(conn: Connection): Muxer =
     Mplex.init(
       conn,
@@ -59,6 +63,10 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
     transports,
     identify,
     muxers,
-    secureManagers = secureManagerInstances)
+    secureManagers = secureManagerInstances,
+    maxConnections = maxConnections,
+    maxIn = maxIn,
+    maxOut = maxOut,
+    maxConnsPerPeer = maxConnsPerPeer)
 
   return switch
diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim
index aef0936ed..c2ac0741a 100644
--- a/libp2p/stream/chronosstream.nim
+++ b/libp2p/stream/chronosstream.nim
@@ -145,7 +145,7 @@ method closeImpl*(s: ChronosStream) {.async.} =
     raise exc
   except CatchableError as exc:
     trace "Error closing chronosstream", s, msg = exc.msg
-  
+
   when defined(libp2p_agents_metrics):
     # do this after closing!
     s.untrackPeerIdentity()
diff --git a/libp2p/switch.nim b/libp2p/switch.nim
index 49559eba1..e742d7860 100644
--- a/libp2p/switch.nim
+++ b/libp2p/switch.nim
@@ -7,11 +7,13 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.
 
-import tables,
-       sequtils,
-       options,
-       sets,
-       oids
+import std/[tables,
+            sequtils,
+            options,
+            sets,
+            oids,
+            sugar,
+            math]
 
 import chronos,
        chronicles,
@@ -107,7 +109,14 @@ proc dialAndUpgrade(s: Switch,
       trace "Dialing address", address = $a, peerId
       let dialed = try:
           libp2p_total_dial_attempts.inc()
-          await t.dial(a)
+          # await a connection slot when the total
+          # connection count is equal to `maxConns`
+          await s.connManager.trackOutgoingConn(
+            () => t.dial(a)
+          )
+        except TooManyConnectionsError as exc:
+          trace "Connection limit reached!"
+          raise exc
         except CancelledError as exc:
           debug "Dialing canceled", msg = exc.msg, peerId
           raise exc
@@ -138,7 +147,8 @@ proc dialAndUpgrade(s: Switch,
 
 proc internalConnect(s: Switch,
                      peerId: PeerID,
-                     addrs: seq[MultiAddress]): Future[Connection] {.async.} =
+                     addrs: seq[MultiAddress]):
+                     Future[Connection] {.async.} =
   if s.peerInfo.peerId == peerId:
     raise newException(CatchableError, "can't dial self!")
 
@@ -182,6 +192,13 @@ proc internalConnect(s: Switch,
     lock.release()
 
 proc connect*(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]) {.async.} =
+  ## attempt to create establish a connection
+  ## with a remote peer
+  ##
+
+  if s.connManager.connCount(peerId) > 0:
+    return
+
   discard await s.internalConnect(peerId, addrs)
 
 proc negotiateStream(s: Switch, conn: Connection, protos: seq[string]): Future[Connection] {.async.} =
@@ -205,17 +222,17 @@ proc dial*(s: Switch,
 
 proc dial*(s: Switch,
            peerId: PeerID,
-           proto: string): Future[Connection] = dial(s, peerId, @[proto])
+           proto: string): Future[Connection] =
+  dial(s, peerId, @[proto])
 
 proc dial*(s: Switch,
            peerId: PeerID,
            addrs: seq[MultiAddress],
           protos: seq[string]): Future[Connection] {.async.} =
-  trace "Dialing (new)", peerId, protos
-  let conn = await s.internalConnect(peerId, addrs)
-  trace "Opening stream", conn
-  let stream = await s.connManager.getStream(conn)
+  var
+    conn: Connection
+    stream: Connection
 
   proc cleanup() {.async.} =
     if not(isNil(stream)):
@@ -225,9 +242,14 @@ proc dial*(s: Switch,
       await conn.close()
 
   try:
+    trace "Dialing (new)", peerId, protos
+    conn = await s.internalConnect(peerId, addrs)
+    trace "Opening stream", conn
+    stream = await s.connManager.getStream(conn)
+
     if isNil(stream):
-      await conn.close()
-      raise newException(DialFailedError, "Couldn't get muxed stream")
+      raise newException(DialFailedError,
+        "Couldn't get muxed stream")
 
     return await s.negotiateStream(stream, protos)
   except CancelledError as exc:
@@ -287,8 +309,11 @@ proc accept(s: Switch, transport: Transport) {.async.} = # noraises
       # remember to always release the slot when
      # the upgrade succeeds or fails, this is
       # currently done by the `upgradeMonitor`
-      await upgrades.acquire()    # first wait for an upgrade slot to become available
-      conn = await transport.accept() # next attempt to get a connection
+      await upgrades.acquire()    # first wait for an upgrade slot to become available
+      conn = await s.connManager  # next attempt to get an incoming connection
+        .trackIncomingConn(
+          () => transport.accept()
+        )
       if isNil(conn):
         # A nil connection means that we might have hit a
         # file-handle limit (or another non-fatal error),
@@ -360,12 +385,16 @@ proc newSwitch*(peerInfo: PeerInfo,
                 transports: seq[Transport],
                 identity: Identify,
                 muxers: Table[string, MuxerProvider],
-                secureManagers: openarray[Secure] = []): Switch =
+                secureManagers: openarray[Secure] = [],
+                maxConnections = MaxConnections,
+                maxIn = -1,
+                maxOut = -1,
+                maxConnsPerPeer = MaxConnectionsPerPeer): Switch =
   if secureManagers.len == 0:
     raise (ref CatchableError)(msg: "Provide at least one secure manager")
 
   let ms = newMultistream()
-  let connManager = ConnManager.init()
+  let connManager = ConnManager.init(maxConnsPerPeer, maxConnections, maxIn, maxOut)
   let upgrade = MuxedUpgrade.init(identity, muxers, secureManagers, connManager, ms)
 
   let switch = Switch(
diff --git a/libp2p/upgrademngrs/muxedupgrade.nim b/libp2p/upgrademngrs/muxedupgrade.nim
index feddc95e0..1f57dfbbd 100644
--- a/libp2p/upgrademngrs/muxedupgrade.nim
+++ b/libp2p/upgrademngrs/muxedupgrade.nim
@@ -52,7 +52,7 @@ proc mux*(u: MuxedUpgrade, conn: Connection): Future[Muxer] {.async, gcsafe.} =
   # install stream handler
   muxer.streamHandler = u.streamHandler
 
-  u.connManager.storeOutgoing(conn)
+  u.connManager.storeConn(conn)
 
   # store it in muxed connections if we have a peer for it
   u.connManager.storeMuxer(muxer, muxer.handle()) # store muxer and start read loop
@@ -155,7 +155,7 @@ proc muxerHandler(u: MuxedUpgrade, muxer: Muxer) {.async, gcsafe.} =
     return
 
   # store incoming connection
-  u.connManager.storeIncoming(conn)
+  u.connManager.storeConn(conn)
 
   # store muxer and muxed connection
   u.connManager.storeMuxer(muxer)
diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim
index 810a9f847..8a64fb954 100644
--- a/tests/pubsub/testgossipinternal.nim
+++ b/tests/pubsub/testgossipinternal.nim
@@ -471,7 +471,7 @@ suite "GossipSub internal":
       peer.score = gossipSub.parameters.graylistThreshold - 1
       gossipSub.gossipsub.mgetOrPut(topic, initHashSet[PubSubPeer]()).incl(peer)
       gossipSub.peers[peerInfo.peerId] = peer
-      gossipSub.switch.connManager.storeIncoming(conn)
+      gossipSub.switch.connManager.storeConn(conn)
 
     gossipSub.updateScores()
 
diff --git a/tests/testconnmngr.nim b/tests/testconnmngr.nim
index f700200d8..d427bcb9a 100644
--- a/tests/testconnmngr.nim
+++ b/tests/testconnmngr.nim
@@ -172,7 +172,7 @@ suite "Connection Manager":
     await stream.close()
 
   asyncTest "should raise on too many connections":
-    let connMngr = ConnManager.init(1)
+    let connMngr = ConnManager.init(maxConnsPerPeer = 1)
     let peer = PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet())
 
     connMngr.storeConn(Connection.init(peer, Direction.In))
@@ -181,7 +181,7 @@ suite "Connection Manager":
       Connection.init(peer, Direction.In),
       Connection.init(peer, Direction.In)]
 
-    expect TooManyConnections:
+    expect TooManyConnectionsError:
       connMngr.storeConn(conns[0])
       connMngr.storeConn(conns[1])
 
diff --git a/tests/testtransport.nim b/tests/testtransport.nim
index 96c76ef4f..dc0690668 100644
--- a/tests/testtransport.nim
+++ b/tests/testtransport.nim
@@ -183,11 +183,8 @@ suite "TCP transport":
     let transport2: TcpTransport = TcpTransport.init()
     let cancellation = transport2.dial(transport1.ma)
 
-    try:
-      cancellation.cancel()
-    except CancelledError as exc:
-      await sleepAsync(100.millis)
-    check cancellation.cancelled
+    await cancellation.cancelAndWait()
+    check cancellation.cancelled
 
     await transport2.stop()
     await transport1.stop()
@@ -198,16 +195,8 @@ suite "TCP transport":
     let transport1: TcpTransport = TcpTransport.init()
     await transport1.start(ma)
 
-    let transport2: TcpTransport = TcpTransport.init()
-    let connFut = transport2.dial(transport1.ma)
-
     let acceptHandler = transport1.accept()
-    try:
-      acceptHandler.cancel()
-    except CancelledError as exc:
-      await sleepAsync(100.millis)
-    check acceptHandler.cancelled
-    check isNil((await connFut))
+    await acceptHandler.cancelAndWait()
+    check acceptHandler.cancelled
 
     await transport1.stop()
-    await transport2.stop()
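
Usage note (sketch, not part of the patch): after this change the connection budget lives in two AsyncSemaphores inside `ConnManager`, either one shared pool of `maxConnections` slots (the default) or independent incoming/outgoing pools when both `maxIn` and `maxOut` are positive, as `ConnManager.init` above shows. Below is a minimal Nim sketch of wiring the new knobs; the numbers are arbitrary and the `libp2p/...` import paths assume the installed package layout.

```nim
import libp2p/[connmanager, standard_setup]

# One shared pool: 80 slots for incoming and outgoing combined,
# and at most 2 connections to any single peer.
let sharedManager = ConnManager.init(maxConnections = 80, maxConnsPerPeer = 2)

# Separate budgets: 60 incoming and 20 outgoing slots; when both maxIn and
# maxOut are positive they take precedence over maxConnections.
let splitManager = ConnManager.init(maxIn = 60, maxOut = 20)

# newStandardSwitch forwards the same parameters down to newSwitch.
let node = newStandardSwitch(maxConnections = 80, maxConnsPerPeer = 2)
```

If every limit is zero or negative, `ConnManager.init` hits the `raiseAssert "Invalid connection counts!"` branch, so at least one of the two configurations has to be supplied.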
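
The outgoing side is fail-fast: `trackOutgoingConn` uses `tryAcquire` on `outSema` and raises `TooManyConnectionsError` before the provider closure is ever called, which is what `dialAndUpgrade` relies on above. A hedged sketch of calling it directly; `dialOnce`, `connMngr`, `transport` and `address` are illustrative names, and the imports assume the installed `libp2p` package layout.

```nim
import std/sugar
import chronos
import libp2p/[connmanager, multiaddress, stream/connection, transports/transport]

proc dialOnce(connMngr: ConnManager,
              transport: Transport,
              address: MultiAddress): Future[Connection] {.async.} =
  try:
    # The dial closure only runs once an outgoing slot has been reserved;
    # the slot is handed back when the returned connection closes
    # (see semaphoreMonitor inside trackConn).
    return await connMngr.trackOutgoingConn(
      () => transport.dial(address))
  except TooManyConnectionsError:
    # No dial was attempted: every outgoing slot is currently in use.
    raise
```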
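
The incoming side throttles instead of rejecting: `trackIncomingConn` awaits `inSema.acquire()` before invoking the provider, so an accept loop simply pauses once the limit is reached, mirroring the `switch.accept` change above. A simplified sketch under the same import assumptions; the real accept loop additionally handles transport errors and nil connections, and `acceptOne` is an illustrative name.

```nim
import std/sugar
import chronos
import libp2p/[connmanager, stream/connection, transports/transport]

proc acceptOne(connMngr: ConnManager,
               transport: Transport): Future[Connection] {.async.} =
  # Blocks until an incoming slot frees up, then calls transport.accept();
  # the slot travels with the returned connection and is released on close.
  return await connMngr.trackIncomingConn(
    () => transport.accept())
```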