remove pubsub peer on disconnect (#212)

* remove pubsub peer on disconnect

* make sure lock is aquired

* add $

* count upgrades/dials/disconnects
This commit is contained in:
Dmitriy Ryajov 2020-06-11 08:45:59 -06:00 committed by GitHub
parent 92579435b6
commit 6b196ad7b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 48 additions and 15 deletions

View File

@ -64,6 +64,7 @@ template withWriteLock(lock: AsyncLock, body: untyped): untyped =
await lock.acquire()
body
finally:
if not(isNil(lock)) and lock.locked:
lock.release()
template withEOFExceptions(body: untyped): untyped =

View File

@ -40,6 +40,8 @@ type
proc id*(p: PeerInfo): string =
p.peerId.pretty()
proc `$`*(p: PeerInfo): string = p.id
proc shortLog*(p: PeerInfo): auto =
(
id: p.id(),
@ -58,30 +60,36 @@ template postInit(peerinfo: PeerInfo,
peerinfo.protocols = @protocols
peerinfo.lifefut = newFuture[void]("libp2p.peerinfo.lifetime")
proc init*(p: typedesc[PeerInfo], key: PrivateKey,
proc init*(p: typedesc[PeerInfo],
key: PrivateKey,
addrs: openarray[MultiAddress] = [],
protocols: openarray[string] = []): PeerInfo {.inline.} =
result = PeerInfo(keyType: HasPrivate, peerId: PeerID.init(key),
privateKey: key)
result.postInit(addrs, protocols)
proc init*(p: typedesc[PeerInfo], peerId: PeerID,
proc init*(p: typedesc[PeerInfo],
peerId: PeerID,
addrs: openarray[MultiAddress] = [],
protocols: openarray[string] = []): PeerInfo {.inline.} =
result = PeerInfo(keyType: HasPublic, peerId: peerId)
result.postInit(addrs, protocols)
proc init*(p: typedesc[PeerInfo], peerId: string,
proc init*(p: typedesc[PeerInfo],
peerId: string,
addrs: openarray[MultiAddress] = [],
protocols: openarray[string] = []): PeerInfo {.inline.} =
result = PeerInfo(keyType: HasPublic, peerId: PeerID.init(peerId))
result.postInit(addrs, protocols)
proc init*(p: typedesc[PeerInfo], key: PublicKey,
proc init*(p: typedesc[PeerInfo],
key: PublicKey,
addrs: openarray[MultiAddress] = [],
protocols: openarray[string] = []): PeerInfo {.inline.} =
result = PeerInfo(keyType: HasPublic, peerId: PeerID.init(key),
result = PeerInfo(keyType: HasPublic,
peerId: PeerID.init(key),
key: some(key))
result.postInit(addrs, protocols)
proc close*(p: PeerInfo) {.inline.} =

View File

@ -49,6 +49,8 @@ method subscribeTopic*(f: FloodSub,
f.floodsub[topic].excl(peerId)
method handleDisconnect*(f: FloodSub, peer: PubSubPeer) {.async.} =
await procCall PubSub(f).handleDisconnect(peer)
## handle peer disconnects
for t in f.floodsub.keys:
f.floodsub[t].excl(peer.id)

View File

@ -96,15 +96,17 @@ method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.async, base.} =
## handle peer disconnects
if peer.id in p.peers:
p.peers.del(peer.id)
# metrics
libp2p_pubsub_peers.dec()
proc cleanUpHelper(p: PubSub, peer: PubSubPeer) {.async.} =
try:
await p.cleanupLock.acquire()
peer.refs.dec() # decrement refcount
if peer.refs == 0:
await p.handleDisconnect(peer)
peer.refs.dec() # decrement refcount
finally:
p.cleanupLock.release()
proc getPeer(p: PubSub,
@ -117,11 +119,12 @@ proc getPeer(p: PubSub,
# create new pubsub peer
let peer = newPubSubPeer(peerInfo, proto)
trace "created new pubsub peer", peerId = peer.id
# metrics
libp2p_pubsub_peers.inc()
p.peers[peer.id] = peer
peer.refs.inc # increment reference cound
peer.refs.inc # increment reference count
peer.observers = p.observers
result = peer

View File

@ -8,7 +8,7 @@
## those terms.
import tables, sequtils, options, strformat, sets
import chronos, chronicles
import chronos, chronicles, metrics
import connection,
transports/transport,
multistream,
@ -31,6 +31,11 @@ logScope:
# and only if the channel has been secured (i.e. if a secure manager has been
# previously provided)
declareGauge(libp2p_connected_peers, "total connected peers")
declareCounter(libp2p_dialed_peers, "dialed peers")
declareCounter(libp2p_failed_dials, "failed dials")
declareCounter(libp2p_failed_upgrade, "peers failed upgrade")
type
NoPubSubException = object of CatchableError
@ -90,6 +95,7 @@ proc identify(s: Switch, conn: Connection): Future[PeerInfo] {.async, gcsafe.} =
if info.protos.len > 0:
result.protocols = info.protos
debug "identify", info = shortLog(result)
except IdentityInvalidMsgError as exc:
error "identify: invalid message", msg = exc.msg
@ -152,6 +158,7 @@ proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} =
s.dialedPubSubPeers.excl(id)
libp2p_connected_peers.dec()
# TODO: Investigate cleanupConn() always called twice for one peer.
if not(conn.peerInfo.isClosed()):
conn.peerInfo.close()
@ -245,17 +252,27 @@ proc internalConnect(s: Switch,
for a in peer.addrs: # for each address
if t.handles(a): # check if it can dial it
trace "Dialing address", address = $a
try:
conn = await t.dial(a)
libp2p_dialed_peers.inc()
except CatchableError as exc:
trace "dialing failed", exc = exc.msg
libp2p_failed_dials.inc()
continue
# make sure to assign the peer to the connection
conn.peerInfo = peer
conn = await s.upgradeOutgoing(conn)
if isNil(conn):
libp2p_failed_upgrade.inc()
continue
conn.closeEvent.wait()
.addCallback do(udata: pointer):
asyncCheck s.cleanupConn(conn)
libp2p_connected_peers.inc()
break
else:
trace "Reusing existing connection"
@ -308,6 +325,7 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} =
try:
try:
libp2p_connected_peers.inc()
await s.upgradeIncoming(conn) # perform upgrade on incoming connection
finally:
await s.cleanupConn(conn)
@ -470,6 +488,7 @@ proc newSwitch*(peerInfo: PeerInfo,
# try establishing a pubsub connection
await s.subscribeToPeer(muxer.connection.peerInfo)
except CatchableError as exc:
libp2p_failed_upgrade.inc()
trace "exception in muxer handler", exc = exc.msg
finally:
if not(isNil(stream)):