commit df7003655b (parent 8cd71eebd1)
Author: Dmitriy Ryajov
Date: 2020-08-05 20:26:25 -06:00
GPG signature: key ID DA8C680CE7C657A4 (no known key found for this signature in database)
14 changed files with 75 additions and 57 deletions

View File

@@ -53,6 +53,9 @@ proc contains*(c: ConnManager, conn: Connection): bool =
if isNil(conn.peerInfo):
return
if conn.peerInfo.peerId notin c.conns:
return
return conn in c.conns[conn.peerInfo.peerId]
proc contains*(c: ConnManager, peerId: PeerID): bool =
@@ -121,7 +124,7 @@ proc onClose(c: ConnManager, conn: Connection) {.async.} =
## triggers the connections resource cleanup
##
await conn.closeEvent.wait()
await conn.join()
trace "triggering connection cleanup"
await c.cleanupConn(conn)

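The recurring change in this commit replaces direct waits on a connection's closeEvent with conn.join(). A minimal sketch of what such a helper can look like on top of chronos, assuming the stream type exposes its close event as an AsyncEvent field (the simplified StreamLike type and the cleanup body below are illustrative, not the real Connection):

import chronos

type
  StreamLike = ref object of RootObj
    closeEvent: AsyncEvent       # fired once when the stream is closed

proc join(s: StreamLike): Future[void] =
  # completes when the stream has closed; callers can await it or
  # attach a callback, as later hunks in this commit do
  s.closeEvent.wait()

proc onClose(s: StreamLike) {.async.} =
  await s.join()                 # same effect as awaiting closeEvent.wait()
  echo "triggering connection cleanup"
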
View File

@@ -71,5 +71,5 @@ proc writeMsg*(conn: Connection,
proc writeMsg*(conn: Connection,
id: uint64,
msgType: MessageType,
data: string): Future[void] =
data: string): Future[void] {.gcsafe.} =
conn.writeMsg(id, msgType, data.toBytes())

View File

@@ -83,6 +83,7 @@ proc closeMessage(s: LPChannel) {.async.} =
name = s.name
oid = $s.oid
peer = $s.conn.peerInfo
closed = s.closed
# stack = getStackTrace()
## send close message - this will not raise
@@ -99,6 +100,7 @@ proc resetMessage(s: LPChannel) {.async.} =
name = s.name
oid = $s.oid
peer = $s.conn.peerInfo
closed = s.closed
# stack = getStackTrace()
## send reset message - this will not raise
@@ -115,6 +117,7 @@ proc open*(s: LPChannel) {.async, gcsafe.} =
name = s.name
oid = $s.oid
peer = $s.conn.peerInfo
closed = s.closed
# stack = getStackTrace()
## NOTE: Don't call withExcAndLock or withWriteLock,
@@ -131,6 +134,7 @@ proc closeRemote*(s: LPChannel) {.async.} =
name = s.name
oid = $s.oid
peer = $s.conn.peerInfo
closed = s.closed
# stack = getStackTrace()
trace "got EOF, closing channel"
@@ -163,6 +167,7 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} =
name = s.name
oid = $s.oid
peer = $s.conn.peerInfo
closed = s.closed
# stack = getStackTrace()
if s.closedLocal and s.isEof:
@@ -198,6 +203,7 @@ method close*(s: LPChannel) {.async, gcsafe.} =
name = s.name
oid = $s.oid
peer = $s.conn.peerInfo
closed = s.closed
# stack = getStackTrace()
if s.closedLocal:
@@ -211,6 +217,7 @@ method close*(s: LPChannel) {.async, gcsafe.} =
await s.closeMessage().wait(2.minutes)
if s.atEof: # already closed by remote close parent buffer immediately
await procCall BufferStream(s).close()
trace "lpchannel closed local"
except CancelledError as exc:
await s.reset()
raise exc
@@ -218,8 +225,6 @@ method close*(s: LPChannel) {.async, gcsafe.} =
trace "exception closing channel", exc = exc.msg
await s.reset()
trace "lpchannel closed local"
s.closedLocal = true
asyncCheck closeInternal()

View File

@@ -96,7 +96,7 @@ proc newStreamInternal*(m: Mplex,
proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
## remove the local channel from the internal tables
##
await chann.closeEvent.wait()
await chann.join()
if not isNil(chann):
m.getChannelList(chann.initiator).del(chann.id)
trace "cleaned up channel", id = chann.id

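The same close-then-forget pattern drives channel cleanup: wait for the channel to go away, then drop it from the muxer's table. A hedged, self-contained sketch of that idea, with a simplified Chan type standing in for LPChannel and a hypothetical openChann showing where the background watcher would be spawned:

import chronos, std/tables

type
  Chan = ref object
    id: uint64
    closeEvent: AsyncEvent

var channels = initTable[uint64, Chan]()

proc cleanupChann(chann: Chan) {.async.} =
  # block until the channel signals close, then remove it from the table
  await chann.closeEvent.wait()
  channels.del(chann.id)

proc openChann(id: uint64): Chan =
  result = Chan(id: id, closeEvent: newAsyncEvent())
  channels[id] = result
  asyncCheck cleanupChann(result)  # fire-and-forget background watcher
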
View File

@@ -54,7 +54,7 @@ method subscribeTopic*(f: FloodSub,
method handleDisconnect*(f: FloodSub, peer: PubSubPeer) =
## handle peer disconnects
##
procCall PubSub(f).handleDisconnect(peer)
if not(isNil(peer)) and peer.peerInfo notin f.conns:

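procCall PubSub(f).handleDisconnect(peer) invokes the parent implementation without dynamic dispatch, so each layer (PubSub, FloodSub, GossipSub) can stack its own disconnect cleanup on top of the one below. A minimal, self-contained sketch of that chaining; the types and bodies are simplified stand-ins, not the real pubsub code:

type
  PubSub = ref object of RootObj
  FloodSub = ref object of PubSub
  GossipSub = ref object of FloodSub

method handleDisconnect(p: PubSub, peer: string) {.base.} =
  echo "pubsub: drop ", peer, " from the common tables"

method handleDisconnect(f: FloodSub, peer: string) =
  procCall PubSub(f).handleDisconnect(peer)    # run the base cleanup first
  echo "floodsub: drop ", peer, " from floodsub topics"

method handleDisconnect(g: GossipSub, peer: string) =
  procCall FloodSub(g).handleDisconnect(peer)  # chains up through FloodSub
  echo "gossipsub: drop ", peer, " from mesh/fanout/gossipsub"

handleDisconnect(GossipSub(), "peerA")  # prints all three lines, base first
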
View File

@@ -248,24 +248,27 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) =
##
procCall FloodSub(g).handleDisconnect(peer)
if not(isNil(peer)) and peer.peerInfo notin g.conns:
for t in toSeq(g.gossipsub.keys):
g.gossipsub.removePeer(t, peer)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub.peers(t).int64, labelValues = [t])
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub.peers(t).int64, labelValues = [t])
libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub.peers(t).int64, labelValues = [t])
for t in toSeq(g.mesh.keys):
g.mesh.removePeer(t, peer)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(t).int64, labelValues = [t])
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(t).int64, labelValues = [t])
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(t).int64, labelValues = [t])
for t in toSeq(g.fanout.keys):
g.fanout.removePeer(t, peer)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(t).int64, labelValues = [t])
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(t).int64, labelValues = [t])
method subscribePeer*(p: GossipSub,
conn: Connection) =
@@ -434,8 +437,7 @@ method rpcHandler*(g: GossipSub,
# forward the message to all peers interested in it
let published = await g.publishHelper(toSendPeers, m.messages, DefaultSendTimeout)
trace "forwared message to peers", peers = published
trace "forwared message to peers", peers = published, msgs = m.messages
var respControl: ControlMessage
if m.control.isSome:

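The per-topic gauges above sit behind when defined(libp2p_expensive_metrics), a compile-time gate: the updates only exist in builds compiled with -d:libp2p_expensive_metrics and cost nothing otherwise. A trivial sketch of the mechanism, using a plain variable rather than the metrics library from the diff:

var meshPeers = 0   # stand-in for a real gauge

proc updateMetrics(peers: int) =
  when defined(libp2p_expensive_metrics):
    # only compiled in when built with: nim c -d:libp2p_expensive_metrics ...
    meshPeers = peers
  else:
    discard

updateMetrics(12)
echo meshPeers      # 12 with the define enabled, 0 in a default build
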
View File

@@ -63,7 +63,7 @@ type
method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} =
## handle peer disconnects
##
if not(isNil(peer)) and peer.peerInfo notin p.conns:
trace "deleting peer", peer = peer.id
peer.onConnect.fire() # Make sure all pending sends are unblocked
@@ -76,7 +76,8 @@ method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} =
proc onConnClose(p: PubSub, conn: Connection) {.async.} =
try:
let peer = conn.peerInfo
await conn.closeEvent.wait()
await conn.join()
trace "triggering onConnClose cleanup", peer = $conn
if peer in p.conns:
p.conns[peer].excl(conn)
@@ -115,7 +116,7 @@ method rpcHandler*(p: PubSub,
trace "processing messages", msg = m.shortLog
if m.subscriptions.len > 0: # if there are any subscriptions
for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic
trace "about to subscribe to topic", topicId = s.topic
trace "about to subscribe to topic", topicId = s.topic, peer = $peer
await p.subscribeTopic(s.topic, s.subscribe, peer.id)
proc getOrCreatePeer(p: PubSub,
@@ -282,10 +283,13 @@ proc publishHelper*(p: PubSub,
let f = sent.filterIt(it.fut == s)
if f.len > 0:
if s.failed:
trace "sending messages to peer failed", peer = f[0].id
trace "sending messages to peer failed", peer = f[0].id,
exc = f[0].fut.readError().msg,
msgs = msgs
failed.add(f[0].id)
else:
trace "sending messages to peer succeeded", peer = f[0].id
trace "sending messages to peer succeeded", peer = f[0].id,
msgs = msgs
published.add(f[0].id)
for f in failed:

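The expanded traces above pull the failure reason out of each completed send future. A hedged sketch of inspecting chronos futures with failed and readError; using allFutures to wait for everything without raising is illustrative here, not necessarily how publishHelper gathers its sends:

import chronos

proc send(ok: bool) {.async.} =
  if not ok:
    raise newException(ValueError, "peer went away")

proc demo() {.async.} =
  let futs = @[send(true), send(false)]
  await allFutures(futs)           # completes once every future is done
  for f in futs:
    if f.failed:
      echo "sending messages to peer failed: ", f.readError().msg
    else:
      echo "sending messages to peer succeeded"

waitFor demo()
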
View File

@@ -180,7 +180,8 @@ proc send*(
let sendFut = sendToRemote()
try:
await sendFut.wait(timeout)
# await sendFut.wait(timeout)
await sendFut
except CatchableError as exc:
trace "unable to send to remote", exc = exc.msg
if not sendFut.finished:

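The change above swaps a bounded sendFut.wait(timeout) for a plain await sendFut. In chronos, wait fails with AsyncTimeoutError once the deadline passes, which the surrounding except CatchableError branch would have caught. A small sketch of the difference, with illustrative names:

import chronos

proc slowSend() {.async.} =
  await sleepAsync(2.seconds)

proc demo() {.async.} =
  try:
    # bounded: fails with AsyncTimeoutError if not done within 100 ms
    await slowSend().wait(100.milliseconds)
  except AsyncTimeoutError:
    echo "unable to send to remote: timed out"

  await slowSend()   # unbounded, as the patched line now does
  echo "sent"

waitFor demo()
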
View File

@@ -60,7 +60,7 @@ proc handleConn*(s: Secure,
initiator: bool): Future[Connection] {.async, gcsafe.} =
var sconn = await s.handshake(conn, initiator)
if not isNil(sconn):
conn.closeEvent.wait()
conn.join()
.addCallback do(udata: pointer = nil):
asyncCheck sconn.close()

View File

@@ -38,7 +38,7 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
transportFlags: set[ServerFlags] = {},
msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
rng = newRng(),
inTimeout: Duration = 5.minutes,
inTimeout: Duration = 1.minutes,
outTimeout: Duration = 5.minutes): Switch =
proc createMplex(conn: Connection): Muxer =
Mplex.init(

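The default inbound connection timeout drops from five minutes to one minute here; both timeouts remain overridable by the caller. A hedged usage sketch, assuming the parameter names from the signature above, default values for everything else, and this repo's libp2p/standard_setup module path:

import chronos
import libp2p/standard_setup

# illustrative only: choose explicit connection timeouts instead of the defaults
let switch = newStandardSwitch(
  inTimeout  = 30.seconds,   # drop idle inbound connections sooner
  outTimeout = 5.minutes)
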
View File

@@ -101,7 +101,7 @@ proc subscribePeerInternal(s: Switch, peerId: PeerID) {.async, gcsafe.}
proc cleanupPubSubPeer(s: Switch, conn: Connection) {.async.} =
try:
await conn.closeEvent.wait()
await conn.join()
trace "about to cleanup pubsub peer"
if s.pubSub.isSome:
let fut = s.pubsubMonitors.getOrDefault(conn.peerInfo.peerId)
@@ -305,7 +305,7 @@ proc internalConnect(s: Switch,
# make sure to assign the peer to the connection
conn.peerInfo = PeerInfo.init(peerId, addrs)
conn.closeEvent.wait()
conn.join()
.addCallback do(udata: pointer):
asyncCheck s.triggerHooks(
conn.peerInfo,
@@ -326,7 +326,10 @@ proc internalConnect(s: Switch,
s.connManager.storeOutgoing(uconn)
asyncCheck s.triggerHooks(uconn.peerInfo, Lifecycle.Upgraded)
conn = uconn
trace "dial successful", oid = $conn.oid, peer = $conn.peerInfo
trace "dial successful", oid = $conn.oid,
upgraddedOid = $uconn.oid,
peer = $conn.peerInfo
except CatchableError as exc:
if not(isNil(conn)):
await conn.close()
@@ -355,7 +358,6 @@ proc internalConnect(s: Switch,
peer = shortLog(conn.peerInfo)
asyncCheck s.cleanupPubSubPeer(conn)
asyncCheck s.subscribePeer(peerId)
trace "got connection", oid = $conn.oid,
direction = $conn.dir,
@@ -418,7 +420,7 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} =
try:
conn.closeEvent.wait()
conn.join()
.addCallback do(udata: pointer):
asyncCheck s.triggerHooks(
conn.peerInfo,
@@ -489,7 +491,7 @@ proc subscribePeerInternal(s: Switch, peerId: PeerID) {.async, gcsafe.} =
return
s.pubSub.get().subscribePeer(stream)
await stream.closeEvent.wait()
await stream.join()
except CancelledError as exc:
if not(isNil(stream)):
await stream.close()
@@ -498,6 +500,7 @@ proc subscribePeerInternal(s: Switch, peerId: PeerID) {.async, gcsafe.} =
except CatchableError as exc:
trace "exception in subscribe to peer", peer = peerId,
exc = exc.msg
finally:
if not(isNil(stream)):
await stream.close()
@@ -510,24 +513,25 @@ proc pubsubMonitor(s: Switch, peerId: PeerID) {.async.} =
try:
trace "subscribing to pubsub peer", peer = peerId
await s.subscribePeerInternal(peerId)
trace "sleeping before trying pubsub peer", peer = peerId
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in pubsub monitor", peer = peerId, exc = exc.msg
finally:
trace "sleeping before trying pubsub peer", peer = peerId
await sleepAsync(1.seconds) # allow the peer to cooldown
await sleepAsync(1.seconds) # allow the peer to cooldown
trace "exiting pubsub monitor", peer = peerId
proc subscribePeer*(s: Switch, peerInfo: PeerInfo): Future[void] {.gcsafe.} =
proc subscribePeer*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} =
## Waits until ``server`` is not closed.
##
var retFuture = newFuture[void]("stream.transport.server.join")
let pubsubFut = s.pubsubMonitors.mgetOrPut(
peerInfo.peerId,
s.pubsubMonitor(peerInfo))
peerId,
s.pubsubMonitor(peerId))
proc continuation(udata: pointer) {.gcsafe.} =
retFuture.complete()
@@ -630,7 +634,6 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
# try establishing a pubsub connection
asyncCheck s.cleanupPubSubPeer(muxer.connection)
asyncCheck s.subscribePeer(muxer.connection.peerInfo.peerId)
except CancelledError as exc:
await muxer.close()

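subscribePeer now keys the monitor table by PeerID and uses mgetOrPut, so there is at most one pubsubMonitor future per peer: an existing entry is returned as-is, otherwise the freshly created future is stored. A self-contained sketch of that dedup pattern with std/tables, simplified to string keys:

import chronos, std/tables

var monitors = initTable[string, Future[void]]()

proc pubsubMonitor(peer: string) {.async.} =
  # stand-in for the real resubscribe loop
  await sleepAsync(10.milliseconds)

proc subscribePeer(peer: string): Future[void] =
  # at most one monitor per peer: mgetOrPut returns the existing
  # future if present, otherwise inserts and returns the new one
  monitors.mgetOrPut(peer, pubsubMonitor(peer))

waitFor subscribePeer("peerA")
doAssert monitors.len == 1
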
View File

@@ -33,7 +33,7 @@ suite "Connection Manager":
connMngr.storeConn(conn)
check conn in connMngr
let peerConn = connMngr.selectConn(peer)
let peerConn = connMngr.selectConn(peer.peerID)
check peerConn == conn
check peerConn.dir == Direction.In
@@ -62,8 +62,8 @@ suite "Connection Manager":
check conn1 in connMngr
check conn2 in connMngr
let outConn = connMngr.selectConn(peer, Direction.Out)
let inConn = connMngr.selectConn(peer, Direction.In)
let outConn = connMngr.selectConn(peer.peerId, Direction.Out)
let inConn = connMngr.selectConn(peer.peerId, Direction.In)
check outConn != inConn
check outConn.dir == Direction.Out
@@ -83,7 +83,7 @@ suite "Connection Manager":
connMngr.storeMuxer(muxer)
check muxer in connMngr
let stream = await connMngr.getMuxedStream(peer)
let stream = await connMngr.getMuxedStream(peer.peerId)
check not(isNil(stream))
check stream.peerInfo == peer
@@ -103,8 +103,8 @@ suite "Connection Manager":
connMngr.storeMuxer(muxer)
check muxer in connMngr
check not(isNil((await connMngr.getMuxedStream(peer, Direction.In))))
check isNil((await connMngr.getMuxedStream(peer, Direction.Out)))
check not(isNil((await connMngr.getMuxedStream(peer.peerId, Direction.In))))
check isNil((await connMngr.getMuxedStream(peer.peerId, Direction.Out)))
waitFor(test())
@@ -179,14 +179,13 @@ suite "Connection Manager":
check conn in connMngr
check muxer in connMngr
check not(isNil(connMngr.selectConn(peer, dir)))
check not(isNil(connMngr.selectConn(peer.peerId, dir)))
check peer in connMngr.peers
await connMngr.dropPeer(peer)
check peer.peerId in connMngr
await connMngr.dropPeer(peer.peerId)
check peer notin connMngr.peers
check isNil(connMngr.selectConn(peer, Direction.In))
check isNil(connMngr.selectConn(peer, Direction.Out))
check connMngr.peers.len == 0
check peer.peerId notin connMngr
check isNil(connMngr.selectConn(peer.peerId, Direction.In))
check isNil(connMngr.selectConn(peer.peerId, Direction.Out))
waitFor(test())

View File

@@ -15,6 +15,7 @@ import testmultibase,
testpeer
import testtransport,
testconnmngr,
testmultistream,
testbufferstream,
testidentify,

View File

@@ -114,14 +114,14 @@ suite "Switch":
await sleepAsync(2.seconds) # wait a little for cleanup to happen
var bufferTracker = getTracker(BufferStreamTrackerName)
# echo bufferTracker.dump()
echo bufferTracker.dump()
# plus 4 for the pubsub streams
check (BufferStreamTracker(bufferTracker).opened ==
(BufferStreamTracker(bufferTracker).closed + 4.uint64))
var connTracker = getTracker(ConnectionTrackerName)
# echo connTracker.dump()
echo connTracker.dump()
# plus 8 is for the secured connection and the socket
# and the pubsub streams that won't clean up until