From 6b196ad7b4bfb08a5308cabfa61de09288ea93b1 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Thu, 11 Jun 2020 08:45:59 -0600 Subject: [PATCH] remove pubsub peer on disconnect (#212) * remove pubsub peer on disconnect * make sure lock is aquired * add $ * count upgrades/dials/disconnects --- libp2p/muxers/mplex/lpchannel.nim | 3 ++- libp2p/peerinfo.nim | 18 +++++++++++++----- libp2p/protocols/pubsub/floodsub.nim | 2 ++ libp2p/protocols/pubsub/pubsub.nim | 17 ++++++++++------- libp2p/switch.nim | 23 +++++++++++++++++++++-- 5 files changed, 48 insertions(+), 15 deletions(-) diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index e2c5b69..96d58fe 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -64,7 +64,8 @@ template withWriteLock(lock: AsyncLock, body: untyped): untyped = await lock.acquire() body finally: - lock.release() + if not(isNil(lock)) and lock.locked: + lock.release() template withEOFExceptions(body: untyped): untyped = try: diff --git a/libp2p/peerinfo.nim b/libp2p/peerinfo.nim index e170a34..31c2a6b 100644 --- a/libp2p/peerinfo.nim +++ b/libp2p/peerinfo.nim @@ -40,6 +40,8 @@ type proc id*(p: PeerInfo): string = p.peerId.pretty() +proc `$`*(p: PeerInfo): string = p.id + proc shortLog*(p: PeerInfo): auto = ( id: p.id(), @@ -58,30 +60,36 @@ template postInit(peerinfo: PeerInfo, peerinfo.protocols = @protocols peerinfo.lifefut = newFuture[void]("libp2p.peerinfo.lifetime") -proc init*(p: typedesc[PeerInfo], key: PrivateKey, +proc init*(p: typedesc[PeerInfo], + key: PrivateKey, addrs: openarray[MultiAddress] = [], protocols: openarray[string] = []): PeerInfo {.inline.} = result = PeerInfo(keyType: HasPrivate, peerId: PeerID.init(key), privateKey: key) result.postInit(addrs, protocols) -proc init*(p: typedesc[PeerInfo], peerId: PeerID, +proc init*(p: typedesc[PeerInfo], + peerId: PeerID, addrs: openarray[MultiAddress] = [], protocols: openarray[string] = []): PeerInfo {.inline.} = result = PeerInfo(keyType: HasPublic, peerId: peerId) result.postInit(addrs, protocols) -proc init*(p: typedesc[PeerInfo], peerId: string, +proc init*(p: typedesc[PeerInfo], + peerId: string, addrs: openarray[MultiAddress] = [], protocols: openarray[string] = []): PeerInfo {.inline.} = result = PeerInfo(keyType: HasPublic, peerId: PeerID.init(peerId)) result.postInit(addrs, protocols) -proc init*(p: typedesc[PeerInfo], key: PublicKey, +proc init*(p: typedesc[PeerInfo], + key: PublicKey, addrs: openarray[MultiAddress] = [], protocols: openarray[string] = []): PeerInfo {.inline.} = - result = PeerInfo(keyType: HasPublic, peerId: PeerID.init(key), + result = PeerInfo(keyType: HasPublic, + peerId: PeerID.init(key), key: some(key)) + result.postInit(addrs, protocols) proc close*(p: PeerInfo) {.inline.} = diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 64e89e7..0fff586 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -49,6 +49,8 @@ method subscribeTopic*(f: FloodSub, f.floodsub[topic].excl(peerId) method handleDisconnect*(f: FloodSub, peer: PubSubPeer) {.async.} = + await procCall PubSub(f).handleDisconnect(peer) + ## handle peer disconnects for t in f.floodsub.keys: f.floodsub[t].excl(peer.id) diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 868289e..ce1e33d 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -96,16 +96,18 @@ method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.async, base.} = ## handle peer disconnects if peer.id in p.peers: p.peers.del(peer.id) + # metrics libp2p_pubsub_peers.dec() proc cleanUpHelper(p: PubSub, peer: PubSubPeer) {.async.} = - await p.cleanupLock.acquire() - if peer.refs == 0: - await p.handleDisconnect(peer) - - peer.refs.dec() # decrement refcount - p.cleanupLock.release() + try: + await p.cleanupLock.acquire() + peer.refs.dec() # decrement refcount + if peer.refs == 0: + await p.handleDisconnect(peer) + finally: + p.cleanupLock.release() proc getPeer(p: PubSub, peerInfo: PeerInfo, @@ -117,11 +119,12 @@ proc getPeer(p: PubSub, # create new pubsub peer let peer = newPubSubPeer(peerInfo, proto) trace "created new pubsub peer", peerId = peer.id + # metrics libp2p_pubsub_peers.inc() p.peers[peer.id] = peer - peer.refs.inc # increment reference cound + peer.refs.inc # increment reference count peer.observers = p.observers result = peer diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 9012cdd..234a1e2 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -8,7 +8,7 @@ ## those terms. import tables, sequtils, options, strformat, sets -import chronos, chronicles +import chronos, chronicles, metrics import connection, transports/transport, multistream, @@ -31,6 +31,11 @@ logScope: # and only if the channel has been secured (i.e. if a secure manager has been # previously provided) +declareGauge(libp2p_connected_peers, "total connected peers") +declareCounter(libp2p_dialed_peers, "dialed peers") +declareCounter(libp2p_failed_dials, "failed dials") +declareCounter(libp2p_failed_upgrade, "peers failed upgrade") + type NoPubSubException = object of CatchableError @@ -90,6 +95,7 @@ proc identify(s: Switch, conn: Connection): Future[PeerInfo] {.async, gcsafe.} = if info.protos.len > 0: result.protocols = info.protos + debug "identify", info = shortLog(result) except IdentityInvalidMsgError as exc: error "identify: invalid message", msg = exc.msg @@ -152,6 +158,7 @@ proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} = s.dialedPubSubPeers.excl(id) + libp2p_connected_peers.dec() # TODO: Investigate cleanupConn() always called twice for one peer. if not(conn.peerInfo.isClosed()): conn.peerInfo.close() @@ -245,17 +252,27 @@ proc internalConnect(s: Switch, for a in peer.addrs: # for each address if t.handles(a): # check if it can dial it trace "Dialing address", address = $a - conn = await t.dial(a) + try: + conn = await t.dial(a) + libp2p_dialed_peers.inc() + except CatchableError as exc: + trace "dialing failed", exc = exc.msg + libp2p_failed_dials.inc() + continue + # make sure to assign the peer to the connection conn.peerInfo = peer + conn = await s.upgradeOutgoing(conn) if isNil(conn): + libp2p_failed_upgrade.inc() continue conn.closeEvent.wait() .addCallback do(udata: pointer): asyncCheck s.cleanupConn(conn) + libp2p_connected_peers.inc() break else: trace "Reusing existing connection" @@ -308,6 +325,7 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} = try: try: + libp2p_connected_peers.inc() await s.upgradeIncoming(conn) # perform upgrade on incoming connection finally: await s.cleanupConn(conn) @@ -470,6 +488,7 @@ proc newSwitch*(peerInfo: PeerInfo, # try establishing a pubsub connection await s.subscribeToPeer(muxer.connection.peerInfo) except CatchableError as exc: + libp2p_failed_upgrade.inc() trace "exception in muxer handler", exc = exc.msg finally: if not(isNil(stream)):