From abd234601b3a11c2b2356793dd7b40469c8f136f Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 23 Sep 2020 08:07:16 -0600 Subject: [PATCH] move events to conn manager (#373) --- libp2p/connmanager.nim | 110 +++++++++++++++++++++++++++++++++++-- libp2p/switch.nim | 122 +++++++---------------------------------- 2 files changed, 126 insertions(+), 106 deletions(-) diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index fe09df2..cd49e56 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -11,7 +11,8 @@ import std/[options, tables, sequtils, sets] import chronos, chronicles, metrics import peerinfo, stream/connection, - muxers/muxer + muxers/muxer, + errors logScope: topics = "connmanager" @@ -23,18 +24,45 @@ const MaxConnectionsPerPeer = 5 type TooManyConnections* = object of CatchableError + ConnEventKind* {.pure.} = enum + Connected, # A connection was made and securely upgraded - there may be + # more than one concurrent connection thus more than one upgrade + # event per peer. + + Disconnected # Peer disconnected - this event is fired once per upgrade + # when the associated connection is terminated. + + ConnEvent* = object + case kind*: ConnEventKind + of ConnEventKind.Connected: + incoming*: bool + else: + discard + + ConnEventHandler* = + proc(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} + + PeerEvent* {.pure.} = enum + Left, + Joined + + PeerEventHandler* = + proc(peerId: PeerID, event: PeerEvent): Future[void] {.gcsafe.} + MuxerHolder = object muxer: Muxer handle: Future[void] ConnManager* = ref object of RootObj + maxConns: int # NOTE: don't change to PeerInfo here # the reference semantics on the PeerInfo # object itself make it susceptible to # copies and mangling by unrelated code. conns: Table[PeerID, HashSet[Connection]] muxed: Table[Connection, MuxerHolder] - maxConns: int + connEvents: Table[ConnEventKind, OrderedSet[ConnEventHandler]] + peerEvents: Table[PeerEvent, OrderedSet[PeerEventHandler]] proc newTooManyConnections(): ref TooManyConnections {.inline.} = result = newException(TooManyConnections, "too many connections for peer") @@ -45,6 +73,81 @@ proc init*(C: type ConnManager, conns: initTable[PeerID, HashSet[Connection]](), muxed: initTable[Connection, MuxerHolder]()) +proc connCount*(c: ConnManager, peerId: PeerID): int = + c.conns.getOrDefault(peerId).len + +proc addConnEventHandler*(c: ConnManager, + handler: ConnEventHandler, kind: ConnEventKind) = + ## Add peer event handler - handlers must not raise exceptions! + ## + + if isNil(handler): return + c.connEvents.mgetOrPut(kind, + initOrderedSet[ConnEventHandler]()).incl(handler) + +proc removeConnEventHandler*(c: ConnManager, + handler: ConnEventHandler, kind: ConnEventKind) = + c.connEvents.withValue(kind, handlers) do: + handlers[].excl(handler) + +proc triggerConnEvent*(c: ConnManager, peerId: PeerID, event: ConnEvent) {.async, gcsafe.} = + try: + if event.kind in c.connEvents: + var connEvents: seq[Future[void]] + for h in c.connEvents[event.kind]: + connEvents.add(h(peerId, event)) + + checkFutures(await allFinished(connEvents)) + except CancelledError as exc: + raise exc + except CatchableError as exc: # handlers should not raise! + warn "Exception in triggerConnEvents", + msg = exc.msg, peerId, event = $event + +proc addPeerEventHandler*(c: ConnManager, + handler: PeerEventHandler, + kind: PeerEvent) = + ## Add peer event handler - handlers must not raise exceptions! + ## + + if isNil(handler): return + c.peerEvents.mgetOrPut(kind, + initOrderedSet[PeerEventHandler]()).incl(handler) + +proc removePeerEventHandler*(c: ConnManager, + handler: PeerEventHandler, + kind: PeerEvent) = + c.peerEvents.withValue(kind, handlers) do: + handlers[].excl(handler) + +proc triggerPeerEvents*(c: ConnManager, + peerId: PeerID, + event: PeerEvent) {.async, gcsafe.} = + + if event notin c.peerEvents: + return + + try: + let count = c.connCount(peerId) + if event == PeerEvent.Joined and count != 1: + trace "peer already joined", peerId, event + return + elif event == PeerEvent.Left and count != 0: + trace "peer still connected or already left", peerId, event + return + + trace "triggering peer events", peerId, event + + var peerEvents: seq[Future[void]] + for h in c.peerEvents[event]: + peerEvents.add(h(peerId, event)) + + checkFutures(await allFinished(peerEvents)) + except CancelledError as exc: + raise exc + except CatchableError as exc: # handlers should not raise! + warn "exception in triggerPeerEvents", exc = exc.msg, peerId + proc contains*(c: ConnManager, conn: Connection): bool = ## checks if a connection is being tracked by the ## connection manager @@ -78,9 +181,6 @@ proc contains*(c: ConnManager, muxer: Muxer): bool = return muxer == c.muxed[conn].muxer -proc connCount*(c: ConnManager, peerId: PeerID): int = - c.conns.getOrDefault(peerId).len - proc closeMuxerHolder(muxerHolder: MuxerHolder) {.async.} = trace "Cleaning up muxer", m = muxerHolder.muxer diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 16400cb..6d47e54 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -30,6 +30,8 @@ import stream/connection, peerid, errors +export connmanager + logScope: topics = "switch" @@ -47,30 +49,6 @@ type UpgradeFailedError* = object of CatchableError DialFailedError* = object of CatchableError - ConnEventKind* {.pure.} = enum - Connected, # A connection was made and securely upgraded - there may be - # more than one concurrent connection thus more than one upgrade - # event per peer. - Disconnected # Peer disconnected - this event is fired once per upgrade - # when the associated connection is terminated. - - ConnEvent* = object - case kind*: ConnEventKind - of ConnEventKind.Connected: - incoming*: bool - else: - discard - - ConnEventHandler* = - proc(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} - - PeerEvent* {.pure.} = enum - Left, - Joined - - PeerEventHandler* = - proc(peerId: PeerID, event: PeerEvent): Future[void] {.gcsafe.} - Switch* = ref object of RootObj peerInfo*: PeerInfo connManager: ConnManager @@ -82,83 +60,26 @@ type streamHandler*: StreamHandler secureManagers*: seq[Secure] dialLock: Table[PeerID, AsyncLock] - connEvents: Table[ConnEventKind, OrderedSet[ConnEventHandler]] - peerEvents: Table[PeerEvent, OrderedSet[PeerEventHandler]] proc addConnEventHandler*(s: Switch, - handler: ConnEventHandler, kind: ConnEventKind) = - ## Add peer event handler - handlers must not raise exceptions! - ## - - if isNil(handler): return - s.connEvents.mgetOrPut(kind, - initOrderedSet[ConnEventHandler]()).incl(handler) + handler: ConnEventHandler, + kind: ConnEventKind) = + s.connManager.addConnEventHandler(handler, kind) proc removeConnEventHandler*(s: Switch, - handler: ConnEventHandler, kind: ConnEventKind) = - s.connEvents.withValue(kind, handlers) do: - handlers[].excl(handler) - -proc triggerConnEvent(s: Switch, peerId: PeerID, event: ConnEvent) {.async, gcsafe.} = - try: - if event.kind in s.connEvents: - var connEvents: seq[Future[void]] - for h in s.connEvents[event.kind]: - connEvents.add(h(peerId, event)) - - checkFutures(await allFinished(connEvents)) - except CancelledError as exc: - raise exc - except CatchableError as exc: # handlers should not raise! - warn "Exception in triggerConnEvents", - msg = exc.msg, peerId, event = $event + handler: ConnEventHandler, + kind: ConnEventKind) = + s.connManager.removeConnEventHandler(handler, kind) proc addPeerEventHandler*(s: Switch, handler: PeerEventHandler, kind: PeerEvent) = - ## Add peer event handler - handlers must not raise exceptions! - ## - - if isNil(handler): return - s.peerEvents.mgetOrPut(kind, - initOrderedSet[PeerEventHandler]()).incl(handler) + s.connManager.addPeerEventHandler(handler, kind) proc removePeerEventHandler*(s: Switch, handler: PeerEventHandler, kind: PeerEvent) = - s.peerEvents.withValue(kind, handlers) do: - handlers[].excl(handler) - -proc triggerPeerEvents(s: Switch, - peerId: PeerID, - event: PeerEvent) {.async, gcsafe.} = - - if event notin s.peerEvents: - return - - try: - let count = s.connManager.connCount(peerId) - if event == PeerEvent.Joined and count != 1: - trace "peer already joined", local = s.peerInfo.peerId, - remote = peerId, event - return - elif event == PeerEvent.Left and count != 0: - trace "peer still connected or already left", local = s.peerInfo.peerId, - remote = peerId, event - return - - trace "triggering peer events", local = s.peerInfo.peerId, - remote = peerId, event - - var peerEvents: seq[Future[void]] - for h in s.peerEvents[event]: - peerEvents.add(h(peerId, event)) - - checkFutures(await allFinished(peerEvents)) - except CancelledError as exc: - raise exc - except CatchableError as exc: # handlers should not raise! - warn "exception in triggerPeerEvents", exc = exc.msg, peerId + s.connManager.removePeerEventHandler(handler, kind) proc disconnect*(s: Switch, peerId: PeerID) {.async, gcsafe.} @@ -412,16 +333,16 @@ proc internalConnect(s: Switch, # unworthy and disconnects it raise newLPStreamClosedError() - await s.triggerPeerEvents(peerId, PeerEvent.Joined) - await s.triggerConnEvent( + await s.connManager.triggerPeerEvents(peerId, PeerEvent.Joined) + await s.connManager.triggerConnEvent( peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: false)) proc peerCleanup() {.async.} = try: await conn.closeEvent.wait() - await s.triggerConnEvent(peerId, - ConnEvent(kind: ConnEventKind.Disconnected)) - await s.triggerPeerEvents(peerId, PeerEvent.Left) + await s.connManager.triggerConnEvent( + peerId, ConnEvent(kind: ConnEventKind.Disconnected)) + await s.connManager.triggerPeerEvents(peerId, PeerEvent.Left) except CatchableError as exc: # This is top-level procedure which will work as separate task, so it # do not need to propogate CancelledError and should handle other errors @@ -579,9 +500,9 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = proc peerCleanup() {.async.} = try: await muxer.connection.join() - await s.triggerConnEvent(peerId, - ConnEvent(kind: ConnEventKind.Disconnected)) - await s.triggerPeerEvents(peerId, PeerEvent.Left) + await s.connManager.triggerConnEvent( + peerId, ConnEvent(kind: ConnEventKind.Disconnected)) + await s.connManager.triggerPeerEvents(peerId, PeerEvent.Left) except CatchableError as exc: # This is top-level procedure which will work as separate task, so it # do not need to propogate CancelledError and shouldn't leak others @@ -590,10 +511,9 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = proc peerStartup() {.async.} = try: - await s.triggerPeerEvents(peerId, PeerEvent.Joined) - await s.triggerConnEvent(peerId, - ConnEvent(kind: ConnEventKind.Connected, - incoming: true)) + await s.connManager.triggerPeerEvents(peerId, PeerEvent.Joined) + await s.connManager.triggerConnEvent(peerId, + ConnEvent(kind: ConnEventKind.Connected, incoming: true)) except CatchableError as exc: # This is top-level procedure which will work as separate task, so it # do not need to propogate CancelledError and shouldn't leak others