Cleanup resources (#246)

* consolidate reading in lpstream

* remove debug echo

* tune log level

* add channel cleanup and cancelation handling

* cancelation handling

* cancelation handling

* cancelation handling

* cancelation handling

* cleanup and cancelation handling

* cancelation handling

* cancelation

* tests

* rename isConnected to connected

* remove testing trace

* comment out debug stacktraces

* explicit raises
This commit is contained in:
Dmitriy Ryajov 2020-06-29 09:15:31 -06:00 committed by GitHub
parent aa6756dfe0
commit c788a6a3c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 619 additions and 532 deletions

View File

@ -37,12 +37,6 @@ type
handlers*: seq[HandlerHolder]
codec*: string
MultistreamHandshakeException* = object of CatchableError
proc newMultistreamHandshakeException*(): ref CatchableError {.inline.} =
result = newException(MultistreamHandshakeException,
"could not perform multistream handshake")
proc newMultistream*(): MultistreamSelect =
new result
result.codec = MSCodec
@ -62,7 +56,7 @@ proc select*(m: MultistreamSelect,
s.removeSuffix("\n")
if s != Codec:
notice "handshake failed", codec = s.toHex()
raise newMultistreamHandshakeException()
return ""
if proto.len() == 0: # no protocols, must be a handshake call
return Codec
@ -152,8 +146,12 @@ proc handle*(m: MultistreamSelect, conn: Connection) {.async, gcsafe.} =
return
debug "no handlers for ", protocol = ms
await conn.write(Na)
except CancelledError as exc:
await conn.close()
raise exc
except CatchableError as exc:
trace "exception in multistream", exc = exc.msg
await conn.close()
finally:
trace "leaving multistream loop"

View File

@ -15,7 +15,8 @@ import types,
../../stream/connection,
../../stream/bufferstream,
../../utility,
../../errors
../../errors,
../../peerinfo
export connection
@ -90,87 +91,104 @@ proc newChannel*(id: uint64,
name: string = "",
size: int = DefaultBufferSize,
lazy: bool = false): LPChannel =
new result
result.id = id
result.name = name
result.conn = conn
result.initiator = initiator
result.msgCode = if initiator: MessageType.MsgOut else: MessageType.MsgIn
result.closeCode = if initiator: MessageType.CloseOut else: MessageType.CloseIn
result.resetCode = if initiator: MessageType.ResetOut else: MessageType.ResetIn
result.isLazy = lazy
result = LPChannel(id: id,
name: name,
conn: conn,
initiator: initiator,
msgCode: if initiator: MessageType.MsgOut else: MessageType.MsgIn,
closeCode: if initiator: MessageType.CloseOut else: MessageType.CloseIn,
resetCode: if initiator: MessageType.ResetOut else: MessageType.ResetIn,
isLazy: lazy)
let chan = result
logScope:
id = chan.id
initiator = chan.initiator
name = chan.name
oid = $chan.oid
peer = $chan.conn.peerInfo
# stack = getStackTrace()
proc writeHandler(data: seq[byte]): Future[void] {.async, gcsafe.} =
try:
if chan.isLazy and not(chan.isOpen):
await chan.open()
# writes should happen in sequence
trace "sending data", data = data.shortLog,
id = chan.id,
initiator = chan.initiator,
name = chan.name,
oid = chan.oid
trace "sending data"
try:
await conn.writeMsg(chan.id,
chan.msgCode,
data).wait(2.minutes) # write header
except AsyncTimeoutError:
trace "timeout writing channel, resetting"
asyncCheck chan.reset()
except CatchableError as exc:
trace "unable to write in bufferstream handler", exc = exc.msg
trace "exception in lpchannel write handler", exc = exc.msg
await chan.reset()
raise exc
result.initBufferStream(writeHandler, size)
when chronicles.enabledLogLevel == LogLevel.TRACE:
result.name = if result.name.len > 0: result.name else: $result.oid
trace "created new lpchannel", id = result.id,
oid = result.oid,
initiator = result.initiator,
name = result.name
trace "created new lpchannel"
proc closeMessage(s: LPChannel) {.async.} =
logScope:
id = s.id
initiator = s.initiator
name = s.name
oid = $s.oid
peer = $s.conn.peerInfo
# stack = getStackTrace()
## send close message - this will not raise
## on EOF or Closed
withEOFExceptions:
withWriteLock(s.writeLock):
trace "sending close message", id = s.id,
initiator = s.initiator,
name = s.name,
oid = s.oid
trace "sending close message"
await s.conn.writeMsg(s.id, s.closeCode) # write close
proc resetMessage(s: LPChannel) {.async.} =
logScope:
id = s.id
initiator = s.initiator
name = s.name
oid = $s.oid
peer = $s.conn.peerInfo
# stack = getStackTrace()
## send reset message - this will not raise
withEOFExceptions:
withWriteLock(s.writeLock):
trace "sending reset message", id = s.id,
initiator = s.initiator,
name = s.name,
oid = s.oid
trace "sending reset message"
await s.conn.writeMsg(s.id, s.resetCode) # write reset
proc open*(s: LPChannel) {.async, gcsafe.} =
logScope:
id = s.id
initiator = s.initiator
name = s.name
oid = $s.oid
peer = $s.conn.peerInfo
# stack = getStackTrace()
## NOTE: Don't call withExcAndLock or withWriteLock,
## because this already gets called from writeHandler
## which is locked
withEOFExceptions:
await s.conn.writeMsg(s.id, MessageType.New, s.name)
trace "opened channel", oid = s.oid,
name = s.name,
initiator = s.initiator
trace "opened channel"
s.isOpen = true
proc closeRemote*(s: LPChannel) {.async.} =
trace "got EOF, closing channel", id = s.id,
initiator = s.initiator,
name = s.name,
oid = s.oid
logScope:
id = s.id
initiator = s.initiator
name = s.name
oid = $s.oid
peer = $s.conn.peerInfo
# stack = getStackTrace()
trace "got EOF, closing channel"
# wait for all data in the buffer to be consumed
while s.len > 0:
@ -181,11 +199,7 @@ proc closeRemote*(s: LPChannel) {.async.} =
await s.close() # close local end
# call to avoid leaks
await procCall BufferStream(s).close() # close parent bufferstream
trace "channel closed on EOF", id = s.id,
initiator = s.initiator,
oid = s.oid,
name = s.name
trace "channel closed on EOF"
method closed*(s: LPChannel): bool =
## this emulates half-closed behavior
@ -195,6 +209,20 @@ method closed*(s: LPChannel): bool =
s.closedLocal
method reset*(s: LPChannel) {.base, async, gcsafe.} =
logScope:
id = s.id
initiator = s.initiator
name = s.name
oid = $s.oid
peer = $s.conn.peerInfo
# stack = getStackTrace()
trace "resetting channel"
if s.closedLocal and s.isEof:
trace "channel already closed or reset"
return
# we asyncCheck here because the other end
# might be dead already - reset is always
# optimistic
@ -203,33 +231,36 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} =
s.isEof = true
s.closedLocal = true
trace "channel reset"
method close*(s: LPChannel) {.async, gcsafe.} =
logScope:
id = s.id
initiator = s.initiator
name = s.name
oid = $s.oid
peer = $s.conn.peerInfo
# stack = getStackTrace()
if s.closedLocal:
trace "channel already closed", id = s.id,
initiator = s.initiator,
name = s.name,
oid = s.oid
trace "channel already closed"
return
proc closeRemote() {.async.} =
trace "closing local lpchannel"
proc closeInternal() {.async.} =
try:
trace "closing local lpchannel", id = s.id,
initiator = s.initiator,
name = s.name,
oid = s.oid
await s.closeMessage().wait(2.minutes)
if s.atEof: # already closed by remote close parent buffer immediately
await procCall BufferStream(s).close()
except AsyncTimeoutError:
trace "close timed out, reset channel"
asyncCheck s.reset() # reset on timeout
except CancelledError as exc:
await s.reset() # reset on timeout
raise exc
except CatchableError as exc:
trace "exception closing channel"
await s.reset() # reset on timeout
trace "lpchannel closed local", id = s.id,
initiator = s.initiator,
name = s.name,
oid = s.oid
trace "lpchannel closed local"
s.closedLocal = true
asyncCheck closeRemote()
asyncCheck closeInternal()

View File

@ -8,12 +8,13 @@
## those terms.
import tables, sequtils, oids
import chronos, chronicles, stew/byteutils
import chronos, chronicles, stew/byteutils, metrics
import ../muxer,
../../stream/connection,
../../stream/bufferstream,
../../utility,
../../errors,
../../peerinfo,
coder,
types,
lpchannel
@ -21,6 +22,8 @@ import ../muxer,
logScope:
topics = "mplex"
declareGauge(libp2p_mplex_channels, "mplex channels", labels = ["initiator", "peer"])
type
Mplex* = ref object of Muxer
remote: Table[uint64, LPChannel]
@ -33,10 +36,10 @@ type
proc getChannelList(m: Mplex, initiator: bool): var Table[uint64, LPChannel] =
if initiator:
trace "picking local channels", initiator = initiator, oid = m.oid
trace "picking local channels", initiator = initiator, oid = $m.oid
result = m.local
else:
trace "picking remote channels", initiator = initiator, oid = m.oid
trace "picking remote channels", initiator = initiator, oid = $m.oid
result = m.remote
proc newStreamInternal*(m: Mplex,
@ -46,6 +49,7 @@ proc newStreamInternal*(m: Mplex,
lazy: bool = false):
Future[LPChannel] {.async, gcsafe.} =
## create new channel/stream
##
let id = if initiator:
m.currentId.inc(); m.currentId
else: chanId
@ -53,7 +57,7 @@ proc newStreamInternal*(m: Mplex,
trace "creating new channel", channelId = id,
initiator = initiator,
name = name,
oid = m.oid
oid = $m.oid
result = newChannel(id,
m.connection,
initiator,
@ -63,28 +67,58 @@ proc newStreamInternal*(m: Mplex,
result.peerInfo = m.connection.peerInfo
result.observedAddr = m.connection.observedAddr
m.getChannelList(initiator)[id] = result
doAssert(id notin m.getChannelList(initiator),
"channel slot already taken!")
proc handleStream(m: Muxer, chann: LPChannel) {.async.} =
m.getChannelList(initiator)[id] = result
libp2p_mplex_channels.set(
m.getChannelList(initiator).len.int64,
labelValues = [$initiator,
$m.connection.peerInfo])
proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
## remove the local channel from the internal tables
##
await chann.closeEvent.wait()
if not isNil(chann):
m.getChannelList(chann.initiator).del(chann.id)
trace "cleaned up channel", id = chann.id
libp2p_mplex_channels.set(
m.getChannelList(chann.initiator).len.int64,
labelValues = [$chann.initiator,
$m.connection.peerInfo])
proc handleStream(m: Mplex, chann: LPChannel) {.async.} =
## call the muxer stream handler for this channel
##
try:
await m.streamHandler(chann)
trace "finished handling stream"
doAssert(chann.closed, "connection not closed by handler!")
except CancelledError as exc:
trace "cancling stream handler", exc = exc.msg
await chann.reset()
raise
except CatchableError as exc:
trace "exception in stream handler", exc = exc.msg
await chann.reset()
await m.cleanupChann(chann)
method handle*(m: Mplex) {.async, gcsafe.} =
trace "starting mplex main loop", oid = m.oid
try:
trace "starting mplex main loop", oid = $m.oid
try:
defer:
trace "stopping mplex main loop", oid = $m.oid
await m.close()
while not m.connection.closed:
trace "waiting for data", oid = m.oid
trace "waiting for data", oid = $m.oid
let (id, msgType, data) = await m.connection.readMsg()
trace "read message from connection", id = id,
msgType = msgType,
data = data.shortLog,
oid = m.oid
oid = $m.oid
let initiator = bool(ord(msgType) and 1)
var channel: LPChannel
@ -95,7 +129,7 @@ method handle*(m: Mplex) {.async, gcsafe.} =
trace "Channel not found, skipping", id = id,
initiator = initiator,
msg = msgType,
oid = m.oid
oid = $m.oid
continue
channel = channels[id]
@ -104,7 +138,7 @@ method handle*(m: Mplex) {.async, gcsafe.} =
initiator = initiator
msgType = msgType
size = data.len
muxer_oid = m.oid
muxer_oid = $m.oid
case msgType:
of MessageType.New:
@ -112,7 +146,7 @@ method handle*(m: Mplex) {.async, gcsafe.} =
channel = await m.newStreamInternal(false, id, name)
trace "created channel", name = channel.name,
oid = channel.oid
oid = $channel.oid
if not isNil(m.streamHandler):
# launch handler task
@ -121,38 +155,40 @@ method handle*(m: Mplex) {.async, gcsafe.} =
of MessageType.MsgIn, MessageType.MsgOut:
logScope:
name = channel.name
oid = channel.oid
oid = $channel.oid
trace "pushing data to channel"
if data.len > MaxMsgSize:
raise newLPStreamLimitError()
await channel.pushTo(data)
of MessageType.CloseIn, MessageType.CloseOut:
logScope:
name = channel.name
oid = channel.oid
oid = $channel.oid
trace "closing channel"
await channel.closeRemote()
m.getChannelList(initiator).del(id)
await m.cleanupChann(channel)
trace "deleted channel"
of MessageType.ResetIn, MessageType.ResetOut:
logScope:
name = channel.name
oid = channel.oid
oid = $channel.oid
trace "resetting channel"
await channel.reset()
m.getChannelList(initiator).del(id)
await m.cleanupChann(channel)
trace "deleted channel"
finally:
trace "stopping mplex main loop", oid = m.oid
await m.close()
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "Exception occurred", exception = exc.msg, oid = m.oid
trace "Exception occurred", exception = exc.msg, oid = $m.oid
proc newMplex*(conn: Connection,
maxChanns: uint = MaxChannels): Mplex =
@ -165,14 +201,6 @@ proc newMplex*(conn: Connection,
when chronicles.enabledLogLevel == LogLevel.TRACE:
result.oid = genOid()
proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
## remove the local channel from the internal tables
##
await chann.closeEvent.wait()
if not isNil(chann):
m.getChannelList(true).del(chann.id)
trace "cleaned up channel", id = chann.id
method newStream*(m: Mplex,
name: string = "",
lazy: bool = false): Future[Connection] {.async, gcsafe.} =
@ -187,19 +215,17 @@ method close*(m: Mplex) {.async, gcsafe.} =
if m.isClosed:
return
try:
trace "closing mplex muxer", oid = m.oid
defer:
m.remote.clear()
m.local.clear()
m.isClosed = true
trace "closing mplex muxer", oid = $m.oid
let channs = toSeq(m.remote.values) &
toSeq(m.local.values)
for chann in channs:
try:
await chann.reset()
except CatchableError as exc:
warn "error resetting channel", exc = exc.msg
await m.cleanupChann(chann)
await m.connection.close()
finally:
m.remote.clear()
m.local.clear()
m.isClosed = true

View File

@ -63,8 +63,12 @@ method init(c: MuxerProvider) =
futs &= c.muxerHandler(muxer)
checkFutures(await allFinished(futs))
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in muxer handler", exc = exc.msg, peer = $conn, proto=proto
finally:
await conn.close()
c.handler = handler

View File

@ -38,7 +38,8 @@ type
key: Option[PublicKey]
proc id*(p: PeerInfo): string =
p.peerId.pretty()
if not(isNil(p)):
return p.peerId.pretty()
proc `$`*(p: PeerInfo): string = p.id

View File

@ -113,13 +113,15 @@ proc newIdentify*(peerInfo: PeerInfo): Identify =
method init*(p: Identify) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
try:
try:
defer:
trace "exiting identify handler", oid = conn.oid
await conn.close()
trace "handling identify request", oid = conn.oid
var pb = encodeMsg(p.peerInfo, conn.observedAddr)
await conn.writeLp(pb.buffer)
finally:
trace "exiting identify handler", oid = conn.oid
await conn.close()
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in identify handler", exc = exc.msg

View File

@ -159,6 +159,8 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
trace "mesh balanced, got peers", peers = g.mesh.getOrDefault(topic).len,
topicId = topic
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception occurred re-balancing mesh", exc = exc.msg
@ -227,12 +229,10 @@ proc heartbeat(g: GossipSub) {.async.} =
checkFutures(await allFinished(sent))
g.mcache.shift() # shift the cache
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception ocurred in gossipsub heartbeat", exc = exc.msg
# sleep less in the case of an error
# but still throttle
await sleepAsync(100.millis)
continue
await sleepAsync(1.seconds)

View File

@ -104,13 +104,13 @@ method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.async, base.} =
p.peers.del(peer.id)
# metrics
libp2p_pubsub_peers.dec()
libp2p_pubsub_peers.set(p.peers.len.int64)
proc cleanUpHelper(p: PubSub, peer: PubSubPeer) {.async.} =
try:
await p.cleanupLock.acquire()
peer.refs.dec() # decrement refcount
if peer.refs == 0:
if peer.refs <= 0:
await p.handleDisconnect(peer)
finally:
p.cleanupLock.release()
@ -119,24 +119,23 @@ proc getPeer(p: PubSub,
peerInfo: PeerInfo,
proto: string): PubSubPeer =
if peerInfo.id in p.peers:
result = p.peers[peerInfo.id]
return
return p.peers[peerInfo.id]
# create new pubsub peer
let peer = newPubSubPeer(peerInfo, proto)
trace "created new pubsub peer", peerId = peer.id
# metrics
libp2p_pubsub_peers.inc()
p.peers[peer.id] = peer
peer.refs.inc # increment reference count
peer.observers = p.observers
result = peer
libp2p_pubsub_peers.set(p.peers.len.int64)
return peer
proc internalCleanup(p: PubSub, conn: Connection) {.async.} =
# handle connection close
if conn.closed:
if isNil(conn):
return
var peer = p.getPeer(conn.peerInfo, p.codec)
@ -168,6 +167,7 @@ method handleConn*(p: PubSub,
# call pubsub rpc handler
await p.rpcHandler(peer, msgs)
asyncCheck p.internalCleanup(conn)
let peer = p.getPeer(conn.peerInfo, proto)
let topics = toSeq(p.topics.keys)
if topics.len > 0:
@ -176,19 +176,28 @@ method handleConn*(p: PubSub,
peer.handler = handler
await peer.handle(conn) # spawn peer read loop
trace "pubsub peer handler ended, cleaning up"
await p.internalCleanup(conn)
except CancelledError as exc:
await conn.close()
raise exc
except CatchableError as exc:
trace "exception ocurred in pubsub handle", exc = exc.msg
await conn.close()
method subscribeToPeer*(p: PubSub,
conn: Connection) {.base, async.} =
var peer = p.getPeer(conn.peerInfo, p.codec)
if not(isNil(conn)):
let peer = p.getPeer(conn.peerInfo, p.codec)
trace "setting connection for peer", peerId = conn.peerInfo.id
if not peer.isConnected:
if not peer.connected:
peer.conn = conn
asyncCheck p.internalCleanup(conn)
proc connected*(p: PubSub, peer: PeerInfo): bool =
let peer = p.getPeer(peer, p.codec)
if not(isNil(peer)):
return peer.connected
method unsubscribe*(p: PubSub,
topics: seq[TopicPair]) {.base, async.} =
## unsubscribe from a list of ``topic`` strings
@ -309,7 +318,8 @@ proc newPubSub*(P: typedesc[PubSub],
msgIdProvider: msgIdProvider)
result.initPubSub()
proc addObserver*(p: PubSub; observer: PubSubObserver) = p.observers[] &= observer
proc addObserver*(p: PubSub; observer: PubSubObserver) =
p.observers[] &= observer
proc removeObserver*(p: PubSub; observer: PubSubObserver) =
let idx = p.observers[].find(observer)

View File

@ -47,8 +47,8 @@ type
proc id*(p: PubSubPeer): string = p.peerInfo.id
proc isConnected*(p: PubSubPeer): bool =
(not isNil(p.sendConn))
proc connected*(p: PubSubPeer): bool =
not(isNil(p.sendConn))
proc `conn=`*(p: PubSubPeer, conn: Connection) =
if not(isNil(conn)):
@ -126,8 +126,10 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
try:
trace "about to send message", peer = p.id,
encoded = digest
if not p.onConnect.isSet:
await p.onConnect.wait()
if p.isConnected: # this can happen if the remote disconnected
if p.connected: # this can happen if the remote disconnected
trace "sending encoded msgs to peer", peer = p.id,
encoded = encoded.buffer.shortLog
await p.sendConn.writeLp(encoded.buffer)
@ -139,8 +141,12 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
# metrics
libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t])
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "unable to send to remote", exc = exc.msg
if not(isNil(p.sendConn)):
await p.sendConn.close()
p.sendConn = nil
p.onConnect.clear()

View File

@ -72,6 +72,10 @@ method init*(s: Secure) {.gcsafe.} =
# We don't need the result but we definitely need to await the handshake
discard await s.handleConn(conn, false)
trace "connection secured"
except CancelledError as exc:
warn "securing connection canceled"
await conn.close()
raise
except CatchableError as exc:
warn "securing connection failed", msg = exc.msg
await conn.close()
@ -79,19 +83,12 @@ method init*(s: Secure) {.gcsafe.} =
s.handler = handle
method secure*(s: Secure, conn: Connection, initiator: bool): Future[Connection] {.async, base, gcsafe.} =
try:
result = await s.handleConn(conn, initiator)
except CancelledError as exc:
raise exc
except CatchableError as exc:
warn "securing connection failed", msg = exc.msg
return nil
method readOnce*(s: SecureConn,
pbytes: pointer,
nbytes: int):
Future[int] {.async, gcsafe.} =
try:
if nbytes == 0:
return 0
@ -103,7 +100,3 @@ method readOnce*(s: SecureConn,
var p = cast[ptr UncheckedArray[byte]](pbytes)
return s.buf.consumeTo(toOpenArray(p, 0, nbytes - 1))
except CatchableError as exc:
trace "exception reading from secure connection", exc = exc.msg, oid = s.oid
await s.close() # make sure to close the wrapped connection
raise exc

View File

@ -128,7 +128,9 @@ proc initBufferStream*(s: BufferStream,
if not(isNil(handler)):
s.writeHandler = proc (data: seq[byte]) {.async, gcsafe.} =
try:
defer:
s.writeLock.release()
# Using a lock here to guarantee
# proper write ordering. This is
# specially important when
@ -137,10 +139,8 @@ proc initBufferStream*(s: BufferStream,
# strict message ordering
await s.writeLock.acquire()
await handler(data)
finally:
s.writeLock.release()
trace "created bufferstream", oid = s.oid
trace "created bufferstream", oid = $s.oid
proc newBufferStream*(handler: WriteHandler = nil,
size: int = DefaultBufferSize): BufferStream =
@ -173,7 +173,10 @@ method pushTo*(s: BufferStream, data: seq[byte]) {.base, async.} =
if s.atEof:
raise newLPStreamEOFError()
try:
defer:
# trace "ended", size = s.len
s.lock.release()
await s.lock.acquire()
var index = 0
while not s.closed():
@ -194,9 +197,6 @@ method pushTo*(s: BufferStream, data: seq[byte]) {.base, async.} =
# internal buf wait on a read event
await s.dataReadEvent.wait()
s.dataReadEvent.clear()
finally:
# trace "ended", size = s.len
s.lock.release()
method readOnce*(s: BufferStream,
pbytes: pointer,
@ -290,8 +290,10 @@ method close*(s: BufferStream) {.async, gcsafe.} =
await procCall Connection(s).close()
inc getBufferStreamTracker().closed
trace "bufferstream closed", oid = s.oid
trace "bufferstream closed", oid = $s.oid
else:
trace "attempt to close an already closed bufferstream", trace = getStackTrace()
except CancelledError as exc:
raise
except CatchableError as exc:
trace "error closing buffer stream", exc = exc.msg

View File

@ -74,7 +74,7 @@ method initStream*(s: LPStream) {.base.} =
s.oid = genOid()
libp2p_open_streams.inc(labelValues = [s.objName])
trace "stream created", oid = s.oid, name = s.objName
trace "stream created", oid = $s.oid, name = s.objName
# TODO: debuging aid to troubleshoot streams open/close
# try:
@ -150,7 +150,6 @@ proc readVarint*(conn: LPStream): Future[uint64] {.async, gcsafe.} =
for i in 0..<len(buffer):
await conn.readExactly(addr buffer[i], 1)
trace "BUFFER ", buffer
let res = PB.getUVarint(buffer.toOpenArray(0, i), length, varint)
if res.isOk():
return varint
@ -191,12 +190,13 @@ proc write*(s: LPStream, pbytes: pointer, nbytes: int): Future[void] {.deprecate
proc write*(s: LPStream, msg: string): Future[void] =
s.write(@(toOpenArrayByte(msg, 0, msg.high)))
# TODO: split `close` into `close` and `dispose/destroy`
method close*(s: LPStream) {.base, async.} =
if not s.isClosed:
s.isClosed = true
s.closeEvent.fire()
libp2p_open_streams.dec(labelValues = [s.objName])
trace "stream destroyed", oid = s.oid, name = s.objName
trace "stream destroyed", oid = $s.oid, name = s.objName
# TODO: debuging aid to troubleshoot streams open/close
# try:

View File

@ -77,7 +77,6 @@ type
streamHandler*: StreamHandler
secureManagers*: seq[Secure]
pubSub*: Option[PubSub]
dialedPubSubPeers: HashSet[string]
dialLock: Table[string, AsyncLock]
proc newNoPubSubException(): ref NoPubSubException {.inline.} =
@ -87,6 +86,7 @@ proc newTooManyConnections(): ref TooManyConnections {.inline.} =
result = newException(TooManyConnections, "too many connections for peer")
proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.}
proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.}
proc selectConn(s: Switch, peerInfo: PeerInfo): Connection =
## select the "best" connection according to some criteria
@ -134,15 +134,18 @@ proc storeConn(s: Switch,
handle: Future[void] = nil) {.async.} =
## store the connection and muxer
##
if not(isNil(muxer)):
if isNil(muxer):
return
let conn = muxer.connection
if not(isNil(conn)):
if isNil(conn):
return
let id = conn.peerInfo.id
if s.connections.getOrDefault(id).len > MaxConnectionsPerPeer:
warn "disconnecting peer, too many connections", peer = $conn.peerInfo,
conns = s.connections
.getOrDefault(id).len
await muxer.close()
await s.disconnect(conn.peerInfo)
raise newTooManyConnections()
@ -171,43 +174,34 @@ proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
doAssert(secureProtocol.len > 0)
result = await secureProtocol[0].secure(conn, true)
proc identify(s: Switch, conn: Connection): Future[PeerInfo] {.async, gcsafe.} =
proc identify(s: Switch, conn: Connection) {.async, gcsafe.} =
## identify the connection
if not isNil(conn.peerInfo):
result = conn.peerInfo
try:
if (await s.ms.select(conn, s.identity.codec)):
let info = await s.identity.identify(conn, conn.peerInfo)
if info.pubKey.isNone and isNil(result):
if info.pubKey.isNone and isNil(conn):
raise newException(CatchableError,
"no public key provided and no existing peer identity found")
if info.pubKey.isSome:
result = PeerInfo.init(info.pubKey.get())
trace "identify: identified remote peer", peer = result.id
if isNil(conn.peerInfo):
conn.peerInfo = PeerInfo.init(info.pubKey.get())
if info.addrs.len > 0:
result.addrs = info.addrs
conn.peerInfo.addrs = info.addrs
if info.agentVersion.isSome:
result.agentVersion = info.agentVersion.get()
conn.peerInfo.agentVersion = info.agentVersion.get()
if info.protoVersion.isSome:
result.protoVersion = info.protoVersion.get()
conn.peerInfo.protoVersion = info.protoVersion.get()
if info.protos.len > 0:
result.protocols = info.protos
conn.peerInfo.protocols = info.protos
trace "identify", info = shortLog(result)
except IdentityInvalidMsgError as exc:
debug "identify: invalid message", msg = exc.msg
except IdentityNoMatchError as exc:
debug "identify: peer's public keys don't match ", msg = exc.msg
trace "identify: identified remote peer", peer = $conn.peerInfo
proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} =
proc mux(s: Switch, conn: Connection) {.async, gcsafe.} =
## mux incoming connection
trace "muxing connection", peer = $conn
@ -231,28 +225,40 @@ proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} =
# new stream for identify
var stream = await muxer.newStream()
var handlerFut: Future[void]
defer:
if not(isNil(stream)):
await stream.close() # close identify stream
# call muxer handler, this should
# not end until muxer ends
let handlerFut = muxer.handle()
handlerFut = muxer.handle()
try:
# do identify first, so that we have a
# PeerInfo in case we didn't before
conn.peerInfo = await s.identify(stream)
finally:
await stream.close() # close identify stream
await s.identify(stream)
if isNil(conn.peerInfo):
await muxer.close()
return
raise newException(CatchableError,
"unable to identify peer, aborting upgrade")
# store it in muxed connections if we have a peer for it
trace "adding muxer for peer", peer = conn.peerInfo.id
await s.storeConn(muxer, Direction.Out, handlerFut)
proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} =
try:
if not isNil(conn.peerInfo):
if isNil(conn):
return
defer:
await conn.close()
libp2p_peers.set(s.connections.len.int64)
if isNil(conn.peerInfo):
return
let id = conn.peerInfo.id
trace "cleaning up connection for peer", peerId = id
if id in s.muxed:
@ -266,6 +272,7 @@ proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} =
if not(isNil(muxerHolder[0].handle)):
await muxerHolder[0].handle
if id in s.muxed:
s.muxed[id].keepItIf(
it.muxer.connection != conn
)
@ -281,18 +288,10 @@ proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} =
if s.connections[id].len == 0:
s.connections.del(id)
await conn.close()
s.dialedPubSubPeers.excl(id)
# TODO: Investigate cleanupConn() always called twice for one peer.
if not(conn.peerInfo.isClosed()):
conn.peerInfo.close()
except CatchableError as exc:
trace "exception cleaning up connection", exc = exc.msg
finally:
libp2p_peers.set(s.connections.len.int64)
proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} =
let connections = s.connections.getOrDefault(peer.id)
for connHolder in connections:
@ -308,27 +307,25 @@ proc getMuxedStream(s: Switch, peerInfo: PeerInfo): Future[Connection] {.async,
return await muxer.newStream()
proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
trace "handling connection", conn = $conn, oid = conn.oid
logScope:
conn = $conn
oid = $conn.oid
let sconn = await s.secure(conn) # secure the connection
if isNil(sconn):
trace "unable to secure connection, stopping upgrade", conn = $conn,
oid = conn.oid
await conn.close()
return
raise newException(CatchableError,
"unable to secure connection, stopping upgrade")
trace "upgrading connection"
await s.mux(sconn) # mux it if possible
if isNil(conn.peerInfo):
trace "unable to mux connection, stopping upgrade", conn = $conn,
oid = conn.oid
if isNil(sconn.peerInfo):
await sconn.close()
return
raise newException(CatchableError,
"unable to mux connection, stopping upgrade")
libp2p_peers.set(s.connections.len.int64)
trace "succesfully upgraded outgoing connection", conn = $conn,
oid = conn.oid,
uoid = sconn.oid
result = sconn
trace "succesfully upgraded outgoing connection", uoid = sconn.oid
return sconn
proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
trace "upgrading incoming connection", conn = $conn, oid = conn.oid
@ -338,30 +335,31 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
proc securedHandler (conn: Connection,
proto: string)
{.async, gcsafe, closure.} =
try:
var sconn: Connection
trace "Securing connection", oid = conn.oid
let secure = s.secureManagers.filterIt(it.codec == proto)[0]
let sconn = await secure.secure(conn, false)
if sconn.isNil:
try:
sconn = await secure.secure(conn, false)
if isNil(sconn):
return
defer:
await sconn.close()
# add the muxer
for muxer in s.muxers.values:
ms.addHandler(muxer.codec, muxer)
# handle subsequent requests
try:
await ms.handle(sconn)
finally:
await sconn.close()
except CancelledError as exc:
raise exc
except CatchableError as exc:
debug "ending secured handler", err = exc.msg
try:
try:
if (await ms.select(conn)): # just handshake
# add the secure handlers
for k in s.secureManagers:
@ -369,14 +367,6 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
# handle secured connections
await ms.handle(conn)
finally:
await conn.close()
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "error in multistream", err = exc.msg
proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.}
proc internalConnect(s: Switch,
peer: PeerInfo): Future[Connection] {.async.} =
@ -388,7 +378,10 @@ proc internalConnect(s: Switch,
let lock = s.dialLock.mgetOrPut(id, newAsyncLock())
var conn: Connection
try:
defer:
if lock.locked():
lock.release()
await lock.acquire()
trace "about to dial peer", peer = id
conn = s.selectConn(peer)
@ -401,6 +394,9 @@ proc internalConnect(s: Switch,
try:
conn = await t.dial(a)
libp2p_dialed_peers.inc()
except CancelledError as exc:
trace "dialing canceled", exc = exc.msg
raise
except CatchableError as exc:
trace "dialing failed", exc = exc.msg
libp2p_failed_dials.inc()
@ -408,7 +404,15 @@ proc internalConnect(s: Switch,
# make sure to assign the peer to the connection
conn.peerInfo = peer
try:
conn = await s.upgradeOutgoing(conn)
except CatchableError as exc:
if not(isNil(conn)):
await conn.close()
trace "Unable to establish outgoing link", exc = exc.msg
raise exc
if isNil(conn):
libp2p_failed_upgrade.inc()
continue
@ -419,48 +423,43 @@ proc internalConnect(s: Switch,
break
else:
trace "Reusing existing connection", oid = conn.oid
except CatchableError as exc:
trace "exception connecting to peer", exc = exc.msg
if not(isNil(conn)):
if isNil(conn):
raise newException(CatchableError,
"Unable to establish outgoing link")
if conn.closed or conn.atEof:
await conn.close()
raise newException(CatchableError,
"Connection dead on arrival")
raise exc # re-raise
finally:
if lock.locked():
lock.release()
doAssert(conn.peerInfo.id in s.connections,
"connection not tracked!")
if not isNil(conn):
doAssert(conn.peerInfo.id in s.connections, "connection not tracked!")
trace "dial succesfull", oid = conn.oid
await s.subscribeToPeer(peer)
result = conn
return conn
proc connect*(s: Switch, peer: PeerInfo) {.async.} =
var conn = await s.internalConnect(peer)
if isNil(conn):
raise newException(CatchableError, "Unable to connect to peer")
proc dial*(s: Switch,
peer: PeerInfo,
proto: string):
Future[Connection] {.async.} =
var conn = await s.internalConnect(peer)
if isNil(conn):
raise newException(CatchableError, "Unable to establish outgoing link")
if conn.closed:
raise newException(CatchableError, "Connection dead on arrival")
result = conn
let stream = await s.getMuxedStream(peer)
if not isNil(stream):
trace "Connection is muxed, return muxed stream", oid = conn.oid
result = stream
trace "Attempting to select remote", proto = proto, oid = conn.oid
if isNil(stream):
await conn.close()
raise newException(CatchableError, "Couldn't get muxed stream")
if not await s.ms.select(result, proto):
trace "Attempting to select remote", proto = proto, oid = conn.oid
if not await s.ms.select(stream, proto):
await stream.close()
raise newException(CatchableError, "Unable to select sub-protocol " & proto)
return stream
proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} =
if isNil(proto.handler):
raise newException(CatchableError,
@ -477,10 +476,10 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} =
try:
try:
await s.upgradeIncoming(conn) # perform upgrade on incoming connection
finally:
defer:
await s.cleanupConn(conn)
await s.upgradeIncoming(conn) # perform upgrade on incoming connection
except CancelledError as exc:
raise exc
except CatchableError as exc:
@ -501,7 +500,6 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
result = startFuts # listen for incoming connections
proc stop*(s: Switch) {.async.} =
try:
trace "stopping switch"
# we want to report errors but we do not want to fail
@ -514,39 +512,49 @@ proc stop*(s: Switch) {.async.} =
for conn in conns:
try:
await s.cleanupConn(conn.conn)
except CancelledError as exc:
raise exc
except CatchableError as exc:
warn "error cleaning up connections"
for t in s.transports:
try:
await t.close()
except CancelledError as exc:
raise exc
except CatchableError as exc:
warn "error cleaning up transports"
trace "switch stopped"
except CatchableError as exc:
warn "error stopping switch", exc = exc.msg
proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
trace "about to subscribe to pubsub peer", peer = peerInfo.shortLog()
## Subscribe to pub sub peer
if s.pubSub.isSome and (peerInfo.id notin s.dialedPubSubPeers):
let conn = await s.getMuxedStream(peerInfo)
if isNil(conn):
if s.pubSub.isSome and not(s.pubSub.get().connected(peerInfo)):
trace "about to subscribe to pubsub peer", peer = peerInfo.shortLog()
var stream: Connection
try:
stream = await s.getMuxedStream(peerInfo)
except CancelledError as exc:
if not(isNil(stream)):
await stream.close()
raise exc
except CatchableError as exc:
trace "exception in subscribe to peer", peer = peerInfo.shortLog,
exc = exc.msg
if not(isNil(stream)):
await stream.close()
if isNil(stream):
trace "unable to subscribe to peer", peer = peerInfo.shortLog
return
s.dialedPubSubPeers.incl(peerInfo.id)
try:
if (await s.ms.select(conn, s.pubSub.get().codec)):
await s.pubSub.get().subscribeToPeer(conn)
else:
await conn.close()
except CatchableError as exc:
trace "exception in subscribe to peer", peer = peerInfo.shortLog, exc = exc.msg
await conn.close()
finally:
s.dialedPubSubPeers.excl(peerInfo.id)
if not await s.ms.select(stream, s.pubSub.get().codec):
if not(isNil(stream)):
await stream.close()
return
await s.pubSub.get().subscribeToPeer(stream)
proc subscribe*(s: Switch, topic: string,
handler: TopicHandler): Future[void] =
@ -594,6 +602,43 @@ proc removeValidator*(s: Switch,
s.pubSub.get().removeValidator(topics, hook)
proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
var stream = await muxer.newStream()
defer:
if not(isNil(stream)):
await stream.close()
trace "got new muxer"
try:
# once we got a muxed connection, attempt to
# identify it
await s.identify(stream)
if isNil(stream.peerInfo):
await muxer.close()
return
muxer.connection.peerInfo = stream.peerInfo
# store muxer and muxed connection
await s.storeConn(muxer, Direction.In)
libp2p_peers.set(s.connections.len.int64)
muxer.connection.closeEvent.wait()
.addCallback do(udata: pointer):
asyncCheck s.cleanupConn(muxer.connection)
# try establishing a pubsub connection
await s.subscribeToPeer(muxer.connection.peerInfo)
except CancelledError as exc:
await muxer.close()
raise exc
except CatchableError as exc:
await muxer.close()
libp2p_failed_upgrade.inc()
trace "exception in muxer handler", exc = exc.msg
proc newSwitch*(peerInfo: PeerInfo,
transports: seq[Transport],
identity: Identify,
@ -609,49 +654,25 @@ proc newSwitch*(peerInfo: PeerInfo,
result.identity = identity
result.muxers = muxers
result.secureManagers = @secureManagers
result.dialedPubSubPeers = initHashSet[string]()
let s = result # can't capture result
result.streamHandler = proc(stream: Connection) {.async, gcsafe.} =
try:
trace "handling connection for", peerInfo = $stream
try:
await s.ms.handle(stream) # handle incoming connection
finally:
if not(stream.closed):
defer:
if not(isNil(stream)):
await stream.close()
await s.ms.handle(stream) # handle incoming connection
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in stream handler", exc = exc.msg
result.mount(identity)
for key, val in muxers:
val.streamHandler = result.streamHandler
val.muxerHandler = proc(muxer: Muxer) {.async, gcsafe.} =
var stream: Connection
try:
trace "got new muxer"
stream = await muxer.newStream()
# once we got a muxed connection, attempt to
# identify it
muxer.connection.peerInfo = await s.identify(stream)
# store muxer and muxed connection
await s.storeConn(muxer, Direction.In)
libp2p_peers.set(s.connections.len.int64)
muxer.connection.closeEvent.wait()
.addCallback do(udata: pointer):
asyncCheck s.cleanupConn(muxer.connection)
# try establishing a pubsub connection
await s.subscribeToPeer(muxer.connection.peerInfo)
except CatchableError as exc:
libp2p_failed_upgrade.inc()
trace "exception in muxer handler", exc = exc.msg
finally:
if not(isNil(stream)):
await stream.close()
val.muxerHandler = proc(muxer: Muxer): Future[void] =
s.muxerHandler(muxer)
if result.secureManagers.len <= 0:
# use plain text if no secure managers are provided

View File

@ -97,14 +97,7 @@ proc connCb(server: StreamServer,
raise exc
except CatchableError as err:
debug "Connection setup failed", err = err.msg
if not client.closed:
try:
client.close()
except CancelledError as err:
raise err
except CatchableError as err:
# shouldn't happen but..
warn "Error closing connection", err = err.msg
proc init*(T: type TcpTransport, flags: set[ServerFlags] = {}): T =
result = T(flags: flags)

View File

@ -243,7 +243,7 @@ suite "Mplex":
await done.wait(1.seconds)
await conn.close()
await mplexDialFut
await mplexDialFut.wait(1.seconds)
await allFuturesThrowing(
transport1.close(),
transport2.close())

View File

@ -71,8 +71,8 @@ proc createSwitch(ma: MultiAddress; outgoing: bool): (Switch, PeerInfo) =
suite "Noise":
teardown:
for tracker in testTrackers():
echo tracker.dump()
# check tracker.isLeaked() == false
# echo tracker.dump()
check tracker.isLeaked() == false
test "e2e: handle write + noise":
proc testListenerDialer(): Future[bool] {.async.} =