move events to conn manager (#373)

This commit is contained in:
Dmitriy Ryajov 2020-09-23 08:07:16 -06:00 committed by GitHub
parent 471e5906f6
commit abd234601b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 126 additions and 106 deletions

View File

@ -11,7 +11,8 @@ import std/[options, tables, sequtils, sets]
import chronos, chronicles, metrics import chronos, chronicles, metrics
import peerinfo, import peerinfo,
stream/connection, stream/connection,
muxers/muxer muxers/muxer,
errors
logScope: logScope:
topics = "connmanager" topics = "connmanager"
@ -23,18 +24,45 @@ const MaxConnectionsPerPeer = 5
type type
TooManyConnections* = object of CatchableError TooManyConnections* = object of CatchableError
ConnEventKind* {.pure.} = enum
Connected, # A connection was made and securely upgraded - there may be
# more than one concurrent connection thus more than one upgrade
# event per peer.
Disconnected # Peer disconnected - this event is fired once per upgrade
# when the associated connection is terminated.
ConnEvent* = object
case kind*: ConnEventKind
of ConnEventKind.Connected:
incoming*: bool
else:
discard
ConnEventHandler* =
proc(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.}
PeerEvent* {.pure.} = enum
Left,
Joined
PeerEventHandler* =
proc(peerId: PeerID, event: PeerEvent): Future[void] {.gcsafe.}
MuxerHolder = object MuxerHolder = object
muxer: Muxer muxer: Muxer
handle: Future[void] handle: Future[void]
ConnManager* = ref object of RootObj ConnManager* = ref object of RootObj
maxConns: int
# NOTE: don't change to PeerInfo here # NOTE: don't change to PeerInfo here
# the reference semantics on the PeerInfo # the reference semantics on the PeerInfo
# object itself make it susceptible to # object itself make it susceptible to
# copies and mangling by unrelated code. # copies and mangling by unrelated code.
conns: Table[PeerID, HashSet[Connection]] conns: Table[PeerID, HashSet[Connection]]
muxed: Table[Connection, MuxerHolder] muxed: Table[Connection, MuxerHolder]
maxConns: int connEvents: Table[ConnEventKind, OrderedSet[ConnEventHandler]]
peerEvents: Table[PeerEvent, OrderedSet[PeerEventHandler]]
proc newTooManyConnections(): ref TooManyConnections {.inline.} = proc newTooManyConnections(): ref TooManyConnections {.inline.} =
result = newException(TooManyConnections, "too many connections for peer") result = newException(TooManyConnections, "too many connections for peer")
@ -45,6 +73,81 @@ proc init*(C: type ConnManager,
conns: initTable[PeerID, HashSet[Connection]](), conns: initTable[PeerID, HashSet[Connection]](),
muxed: initTable[Connection, MuxerHolder]()) muxed: initTable[Connection, MuxerHolder]())
proc connCount*(c: ConnManager, peerId: PeerID): int =
c.conns.getOrDefault(peerId).len
proc addConnEventHandler*(c: ConnManager,
handler: ConnEventHandler, kind: ConnEventKind) =
## Add peer event handler - handlers must not raise exceptions!
##
if isNil(handler): return
c.connEvents.mgetOrPut(kind,
initOrderedSet[ConnEventHandler]()).incl(handler)
proc removeConnEventHandler*(c: ConnManager,
handler: ConnEventHandler, kind: ConnEventKind) =
c.connEvents.withValue(kind, handlers) do:
handlers[].excl(handler)
proc triggerConnEvent*(c: ConnManager, peerId: PeerID, event: ConnEvent) {.async, gcsafe.} =
try:
if event.kind in c.connEvents:
var connEvents: seq[Future[void]]
for h in c.connEvents[event.kind]:
connEvents.add(h(peerId, event))
checkFutures(await allFinished(connEvents))
except CancelledError as exc:
raise exc
except CatchableError as exc: # handlers should not raise!
warn "Exception in triggerConnEvents",
msg = exc.msg, peerId, event = $event
proc addPeerEventHandler*(c: ConnManager,
handler: PeerEventHandler,
kind: PeerEvent) =
## Add peer event handler - handlers must not raise exceptions!
##
if isNil(handler): return
c.peerEvents.mgetOrPut(kind,
initOrderedSet[PeerEventHandler]()).incl(handler)
proc removePeerEventHandler*(c: ConnManager,
handler: PeerEventHandler,
kind: PeerEvent) =
c.peerEvents.withValue(kind, handlers) do:
handlers[].excl(handler)
proc triggerPeerEvents*(c: ConnManager,
peerId: PeerID,
event: PeerEvent) {.async, gcsafe.} =
if event notin c.peerEvents:
return
try:
let count = c.connCount(peerId)
if event == PeerEvent.Joined and count != 1:
trace "peer already joined", peerId, event
return
elif event == PeerEvent.Left and count != 0:
trace "peer still connected or already left", peerId, event
return
trace "triggering peer events", peerId, event
var peerEvents: seq[Future[void]]
for h in c.peerEvents[event]:
peerEvents.add(h(peerId, event))
checkFutures(await allFinished(peerEvents))
except CancelledError as exc:
raise exc
except CatchableError as exc: # handlers should not raise!
warn "exception in triggerPeerEvents", exc = exc.msg, peerId
proc contains*(c: ConnManager, conn: Connection): bool = proc contains*(c: ConnManager, conn: Connection): bool =
## checks if a connection is being tracked by the ## checks if a connection is being tracked by the
## connection manager ## connection manager
@ -78,9 +181,6 @@ proc contains*(c: ConnManager, muxer: Muxer): bool =
return muxer == c.muxed[conn].muxer return muxer == c.muxed[conn].muxer
proc connCount*(c: ConnManager, peerId: PeerID): int =
c.conns.getOrDefault(peerId).len
proc closeMuxerHolder(muxerHolder: MuxerHolder) {.async.} = proc closeMuxerHolder(muxerHolder: MuxerHolder) {.async.} =
trace "Cleaning up muxer", m = muxerHolder.muxer trace "Cleaning up muxer", m = muxerHolder.muxer

View File

@ -30,6 +30,8 @@ import stream/connection,
peerid, peerid,
errors errors
export connmanager
logScope: logScope:
topics = "switch" topics = "switch"
@ -47,30 +49,6 @@ type
UpgradeFailedError* = object of CatchableError UpgradeFailedError* = object of CatchableError
DialFailedError* = object of CatchableError DialFailedError* = object of CatchableError
ConnEventKind* {.pure.} = enum
Connected, # A connection was made and securely upgraded - there may be
# more than one concurrent connection thus more than one upgrade
# event per peer.
Disconnected # Peer disconnected - this event is fired once per upgrade
# when the associated connection is terminated.
ConnEvent* = object
case kind*: ConnEventKind
of ConnEventKind.Connected:
incoming*: bool
else:
discard
ConnEventHandler* =
proc(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.}
PeerEvent* {.pure.} = enum
Left,
Joined
PeerEventHandler* =
proc(peerId: PeerID, event: PeerEvent): Future[void] {.gcsafe.}
Switch* = ref object of RootObj Switch* = ref object of RootObj
peerInfo*: PeerInfo peerInfo*: PeerInfo
connManager: ConnManager connManager: ConnManager
@ -82,83 +60,26 @@ type
streamHandler*: StreamHandler streamHandler*: StreamHandler
secureManagers*: seq[Secure] secureManagers*: seq[Secure]
dialLock: Table[PeerID, AsyncLock] dialLock: Table[PeerID, AsyncLock]
connEvents: Table[ConnEventKind, OrderedSet[ConnEventHandler]]
peerEvents: Table[PeerEvent, OrderedSet[PeerEventHandler]]
proc addConnEventHandler*(s: Switch, proc addConnEventHandler*(s: Switch,
handler: ConnEventHandler, kind: ConnEventKind) = handler: ConnEventHandler,
## Add peer event handler - handlers must not raise exceptions! kind: ConnEventKind) =
## s.connManager.addConnEventHandler(handler, kind)
if isNil(handler): return
s.connEvents.mgetOrPut(kind,
initOrderedSet[ConnEventHandler]()).incl(handler)
proc removeConnEventHandler*(s: Switch, proc removeConnEventHandler*(s: Switch,
handler: ConnEventHandler, kind: ConnEventKind) = handler: ConnEventHandler,
s.connEvents.withValue(kind, handlers) do: kind: ConnEventKind) =
handlers[].excl(handler) s.connManager.removeConnEventHandler(handler, kind)
proc triggerConnEvent(s: Switch, peerId: PeerID, event: ConnEvent) {.async, gcsafe.} =
try:
if event.kind in s.connEvents:
var connEvents: seq[Future[void]]
for h in s.connEvents[event.kind]:
connEvents.add(h(peerId, event))
checkFutures(await allFinished(connEvents))
except CancelledError as exc:
raise exc
except CatchableError as exc: # handlers should not raise!
warn "Exception in triggerConnEvents",
msg = exc.msg, peerId, event = $event
proc addPeerEventHandler*(s: Switch, proc addPeerEventHandler*(s: Switch,
handler: PeerEventHandler, handler: PeerEventHandler,
kind: PeerEvent) = kind: PeerEvent) =
## Add peer event handler - handlers must not raise exceptions! s.connManager.addPeerEventHandler(handler, kind)
##
if isNil(handler): return
s.peerEvents.mgetOrPut(kind,
initOrderedSet[PeerEventHandler]()).incl(handler)
proc removePeerEventHandler*(s: Switch, proc removePeerEventHandler*(s: Switch,
handler: PeerEventHandler, handler: PeerEventHandler,
kind: PeerEvent) = kind: PeerEvent) =
s.peerEvents.withValue(kind, handlers) do: s.connManager.removePeerEventHandler(handler, kind)
handlers[].excl(handler)
proc triggerPeerEvents(s: Switch,
peerId: PeerID,
event: PeerEvent) {.async, gcsafe.} =
if event notin s.peerEvents:
return
try:
let count = s.connManager.connCount(peerId)
if event == PeerEvent.Joined and count != 1:
trace "peer already joined", local = s.peerInfo.peerId,
remote = peerId, event
return
elif event == PeerEvent.Left and count != 0:
trace "peer still connected or already left", local = s.peerInfo.peerId,
remote = peerId, event
return
trace "triggering peer events", local = s.peerInfo.peerId,
remote = peerId, event
var peerEvents: seq[Future[void]]
for h in s.peerEvents[event]:
peerEvents.add(h(peerId, event))
checkFutures(await allFinished(peerEvents))
except CancelledError as exc:
raise exc
except CatchableError as exc: # handlers should not raise!
warn "exception in triggerPeerEvents", exc = exc.msg, peerId
proc disconnect*(s: Switch, peerId: PeerID) {.async, gcsafe.} proc disconnect*(s: Switch, peerId: PeerID) {.async, gcsafe.}
@ -412,16 +333,16 @@ proc internalConnect(s: Switch,
# unworthy and disconnects it # unworthy and disconnects it
raise newLPStreamClosedError() raise newLPStreamClosedError()
await s.triggerPeerEvents(peerId, PeerEvent.Joined) await s.connManager.triggerPeerEvents(peerId, PeerEvent.Joined)
await s.triggerConnEvent( await s.connManager.triggerConnEvent(
peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: false)) peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: false))
proc peerCleanup() {.async.} = proc peerCleanup() {.async.} =
try: try:
await conn.closeEvent.wait() await conn.closeEvent.wait()
await s.triggerConnEvent(peerId, await s.connManager.triggerConnEvent(
ConnEvent(kind: ConnEventKind.Disconnected)) peerId, ConnEvent(kind: ConnEventKind.Disconnected))
await s.triggerPeerEvents(peerId, PeerEvent.Left) await s.connManager.triggerPeerEvents(peerId, PeerEvent.Left)
except CatchableError as exc: except CatchableError as exc:
# This is top-level procedure which will work as separate task, so it # This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError and should handle other errors # do not need to propogate CancelledError and should handle other errors
@ -579,9 +500,9 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
proc peerCleanup() {.async.} = proc peerCleanup() {.async.} =
try: try:
await muxer.connection.join() await muxer.connection.join()
await s.triggerConnEvent(peerId, await s.connManager.triggerConnEvent(
ConnEvent(kind: ConnEventKind.Disconnected)) peerId, ConnEvent(kind: ConnEventKind.Disconnected))
await s.triggerPeerEvents(peerId, PeerEvent.Left) await s.connManager.triggerPeerEvents(peerId, PeerEvent.Left)
except CatchableError as exc: except CatchableError as exc:
# This is top-level procedure which will work as separate task, so it # This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError and shouldn't leak others # do not need to propogate CancelledError and shouldn't leak others
@ -590,10 +511,9 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
proc peerStartup() {.async.} = proc peerStartup() {.async.} =
try: try:
await s.triggerPeerEvents(peerId, PeerEvent.Joined) await s.connManager.triggerPeerEvents(peerId, PeerEvent.Joined)
await s.triggerConnEvent(peerId, await s.connManager.triggerConnEvent(peerId,
ConnEvent(kind: ConnEventKind.Connected, ConnEvent(kind: ConnEventKind.Connected, incoming: true))
incoming: true))
except CatchableError as exc: except CatchableError as exc:
# This is top-level procedure which will work as separate task, so it # This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError and shouldn't leak others # do not need to propogate CancelledError and shouldn't leak others