exceptions and resource cleanup
This commit is contained in:
parent
d4bdb42046
commit
d83ce4c932
|
@ -50,35 +50,24 @@ proc writeMsg*(conn: Connection,
|
||||||
trace "sending data over mplex", id,
|
trace "sending data over mplex", id,
|
||||||
msgType,
|
msgType,
|
||||||
data = data.len
|
data = data.len
|
||||||
try:
|
var
|
||||||
var
|
left = data.len
|
||||||
left = data.len
|
offset = 0
|
||||||
offset = 0
|
while left > 0 or data.len == 0:
|
||||||
while left > 0 or data.len == 0:
|
let
|
||||||
let
|
chunkSize = if left > MaxMsgSize: MaxMsgSize - 64 else: left
|
||||||
chunkSize = if left > MaxMsgSize: MaxMsgSize - 64 else: left
|
chunk = if chunkSize > 0 : data[offset..(offset + chunkSize - 1)] else: data
|
||||||
chunk = if chunkSize > 0 : data[offset..(offset + chunkSize - 1)] else: data
|
## write lenght prefixed
|
||||||
## write lenght prefixed
|
var buf = initVBuffer()
|
||||||
var buf = initVBuffer()
|
buf.writePBVarint(id shl 3 or ord(msgType).uint64)
|
||||||
buf.writePBVarint(id shl 3 or ord(msgType).uint64)
|
buf.writePBVarint(chunkSize.uint64) # size should be always sent
|
||||||
buf.writePBVarint(chunkSize.uint64) # size should be always sent
|
buf.finish()
|
||||||
buf.finish()
|
left = left - chunkSize
|
||||||
left = left - chunkSize
|
offset = offset + chunkSize
|
||||||
offset = offset + chunkSize
|
await conn.write(buf.buffer & chunk)
|
||||||
await conn.write(buf.buffer & chunk)
|
|
||||||
|
|
||||||
if data.len == 0:
|
if data.len == 0:
|
||||||
return
|
return
|
||||||
except LPStreamEOFError:
|
|
||||||
trace "Ignoring EOF while writing"
|
|
||||||
except CancelledError as exc:
|
|
||||||
raise exc
|
|
||||||
except CatchableError as exc:
|
|
||||||
# TODO these exceptions are ignored since it's likely that if writes are
|
|
||||||
# are failing, the underlying connection is already closed - this needs
|
|
||||||
# more cleanup though
|
|
||||||
debug "Could not write to connection", error = exc.name
|
|
||||||
trace "Could not write to connection - verbose", msg = exc.msg
|
|
||||||
|
|
||||||
proc writeMsg*(conn: Connection,
|
proc writeMsg*(conn: Connection,
|
||||||
id: uint64,
|
id: uint64,
|
||||||
|
|
|
@ -63,6 +63,8 @@ template withEOFExceptions(body: untyped): untyped =
|
||||||
except LPStreamIncompleteError as exc:
|
except LPStreamIncompleteError as exc:
|
||||||
trace "incomplete message", exc = exc.msg
|
trace "incomplete message", exc = exc.msg
|
||||||
|
|
||||||
|
method reset*(s: LPChannel) {.base, async, gcsafe.}
|
||||||
|
|
||||||
proc newChannel*(id: uint64,
|
proc newChannel*(id: uint64,
|
||||||
conn: Connection,
|
conn: Connection,
|
||||||
initiator: bool,
|
initiator: bool,
|
||||||
|
@ -81,17 +83,26 @@ proc newChannel*(id: uint64,
|
||||||
|
|
||||||
let chan = result
|
let chan = result
|
||||||
proc writeHandler(data: seq[byte]): Future[void] {.async, gcsafe.} =
|
proc writeHandler(data: seq[byte]): Future[void] {.async, gcsafe.} =
|
||||||
if chan.isLazy and not(chan.isOpen):
|
try:
|
||||||
await chan.open()
|
if chan.isLazy and not(chan.isOpen):
|
||||||
|
await chan.open()
|
||||||
|
|
||||||
# writes should happen in sequence
|
# writes should happen in sequence
|
||||||
trace "sending data", data = data.shortLog,
|
trace "sending data", data = data.shortLog,
|
||||||
id = chan.id,
|
id = chan.id,
|
||||||
initiator = chan.initiator,
|
initiator = chan.initiator,
|
||||||
name = chan.name,
|
name = chan.name,
|
||||||
oid = chan.oid
|
oid = chan.oid
|
||||||
|
|
||||||
await conn.writeMsg(chan.id, chan.msgCode, data) # write header
|
try:
|
||||||
|
await conn.writeMsg(chan.id,
|
||||||
|
chan.msgCode,
|
||||||
|
data).wait(2.minutes) # write header
|
||||||
|
except AsyncTimeoutError:
|
||||||
|
trace "timeout writting channel, resetting"
|
||||||
|
asyncCheck chan.reset()
|
||||||
|
except CatchableError as exc:
|
||||||
|
trace "unable to write in bufferstream handler", exc = exc.msg
|
||||||
|
|
||||||
result.initBufferStream(writeHandler, size)
|
result.initBufferStream(writeHandler, size)
|
||||||
when chronicles.enabledLogLevel == LogLevel.TRACE:
|
when chronicles.enabledLogLevel == LogLevel.TRACE:
|
||||||
|
@ -106,9 +117,9 @@ proc closeMessage(s: LPChannel) {.async.} =
|
||||||
withEOFExceptions:
|
withEOFExceptions:
|
||||||
withWriteLock(s.writeLock):
|
withWriteLock(s.writeLock):
|
||||||
trace "sending close message", id = s.id,
|
trace "sending close message", id = s.id,
|
||||||
initiator = s.initiator,
|
initiator = s.initiator,
|
||||||
name = s.name,
|
name = s.name,
|
||||||
oid = s.oid
|
oid = s.oid
|
||||||
|
|
||||||
await s.conn.writeMsg(s.id, s.closeCode) # write close
|
await s.conn.writeMsg(s.id, s.closeCode) # write close
|
||||||
|
|
||||||
|
@ -116,9 +127,9 @@ proc resetMessage(s: LPChannel) {.async.} =
|
||||||
withEOFExceptions:
|
withEOFExceptions:
|
||||||
withWriteLock(s.writeLock):
|
withWriteLock(s.writeLock):
|
||||||
trace "sending reset message", id = s.id,
|
trace "sending reset message", id = s.id,
|
||||||
initiator = s.initiator,
|
initiator = s.initiator,
|
||||||
name = s.name,
|
name = s.name,
|
||||||
oid = s.oid
|
oid = s.oid
|
||||||
|
|
||||||
await s.conn.writeMsg(s.id, s.resetCode) # write reset
|
await s.conn.writeMsg(s.id, s.resetCode) # write reset
|
||||||
|
|
||||||
|
@ -129,8 +140,8 @@ proc open*(s: LPChannel) {.async, gcsafe.} =
|
||||||
withEOFExceptions:
|
withEOFExceptions:
|
||||||
await s.conn.writeMsg(s.id, MessageType.New, s.name)
|
await s.conn.writeMsg(s.id, MessageType.New, s.name)
|
||||||
trace "oppened channel", oid = s.oid,
|
trace "oppened channel", oid = s.oid,
|
||||||
name = s.name,
|
name = s.name,
|
||||||
initiator = s.initiator
|
initiator = s.initiator
|
||||||
s.isOpen = true
|
s.isOpen = true
|
||||||
|
|
||||||
proc closeRemote*(s: LPChannel) {.async.} =
|
proc closeRemote*(s: LPChannel) {.async.} =
|
||||||
|
@ -144,9 +155,9 @@ proc closeRemote*(s: LPChannel) {.async.} =
|
||||||
await s.dataReadEvent.wait()
|
await s.dataReadEvent.wait()
|
||||||
s.dataReadEvent.clear()
|
s.dataReadEvent.clear()
|
||||||
|
|
||||||
# TODO: Not sure if this needs to be set here or bfore consuming
|
await s.close() # close local end
|
||||||
# the buffer
|
|
||||||
s.isEof = true # set EOF immediately to prevent further reads
|
s.isEof = true # set EOF immediately to prevent further reads
|
||||||
|
# call to avoid leacks
|
||||||
await procCall BufferStream(s).close() # close parent bufferstream
|
await procCall BufferStream(s).close() # close parent bufferstream
|
||||||
|
|
||||||
trace "channel closed on EOF", id = s.id,
|
trace "channel closed on EOF", id = s.id,
|
||||||
|
@ -161,33 +172,49 @@ method closed*(s: LPChannel): bool =
|
||||||
## header of the file
|
## header of the file
|
||||||
s.closedLocal
|
s.closedLocal
|
||||||
|
|
||||||
method close*(s: LPChannel) {.async, gcsafe.} =
|
method reset*(s: LPChannel) {.base, async, gcsafe.} =
|
||||||
if s.closedLocal:
|
|
||||||
return
|
|
||||||
|
|
||||||
trace "closing local lpchannel", id = s.id,
|
|
||||||
initiator = s.initiator,
|
|
||||||
name = s.name,
|
|
||||||
oid = s.oid
|
|
||||||
# TODO: we should install a timer that on expire
|
|
||||||
# will make sure the channel did close by the remote
|
|
||||||
# so the hald-closed flow completed, if it didn't
|
|
||||||
# we should send a `reset` and move on.
|
|
||||||
await s.closeMessage()
|
|
||||||
s.closedLocal = true
|
|
||||||
if s.atEof: # already closed by remote close parent buffer imediately
|
|
||||||
await procCall BufferStream(s).close()
|
|
||||||
|
|
||||||
trace "lpchannel closed local", id = s.id,
|
|
||||||
initiator = s.initiator,
|
|
||||||
name = s.name,
|
|
||||||
oid = s.oid
|
|
||||||
|
|
||||||
method reset*(s: LPChannel) {.base, async.} =
|
|
||||||
# we asyncCheck here because the other end
|
# we asyncCheck here because the other end
|
||||||
# might be dead already - reset is always
|
# might be dead already - reset is always
|
||||||
# optimistic
|
# optimistic
|
||||||
asyncCheck s.resetMessage()
|
asyncCheck s.resetMessage()
|
||||||
|
# # because of the async check above,
|
||||||
|
# # give the message time to depart
|
||||||
|
# await sleepAsync(100.millis)
|
||||||
await procCall BufferStream(s).close()
|
await procCall BufferStream(s).close()
|
||||||
s.isEof = true
|
s.isEof = true
|
||||||
s.closedLocal = true
|
s.closedLocal = true
|
||||||
|
|
||||||
|
method close*(s: LPChannel) {.async, gcsafe.} =
|
||||||
|
if s.closedLocal:
|
||||||
|
trace "channel already closed", id = s.id,
|
||||||
|
initiator = s.initiator,
|
||||||
|
name = s.name,
|
||||||
|
oid = s.oid
|
||||||
|
return
|
||||||
|
|
||||||
|
proc closeRemote() {.async.} =
|
||||||
|
try:
|
||||||
|
trace "closing local lpchannel", id = s.id,
|
||||||
|
initiator = s.initiator,
|
||||||
|
name = s.name,
|
||||||
|
oid = s.oid
|
||||||
|
# TODO: we should install a timer that on expire
|
||||||
|
# will make sure the channel did close by the remote
|
||||||
|
# so the hald-closed flow completed, if it didn't
|
||||||
|
# we should send a `reset` and move on.
|
||||||
|
await s.closeMessage().wait(2.minutes)
|
||||||
|
s.closedLocal = true
|
||||||
|
if s.atEof: # already closed by remote close parent buffer imediately
|
||||||
|
await procCall BufferStream(s).close()
|
||||||
|
except AsyncTimeoutError:
|
||||||
|
trace "close timeoud, reset channel"
|
||||||
|
asyncCheck s.reset() # reset on timeout
|
||||||
|
except CatchableError as exc:
|
||||||
|
trace "exception closing channel"
|
||||||
|
|
||||||
|
trace "lpchannel closed local", id = s.id,
|
||||||
|
initiator = s.initiator,
|
||||||
|
name = s.name,
|
||||||
|
oid = s.oid
|
||||||
|
|
||||||
|
asyncCheck closeRemote()
|
||||||
|
|
|
@ -26,6 +26,7 @@ type
|
||||||
Mplex* = ref object of Muxer
|
Mplex* = ref object of Muxer
|
||||||
remote: Table[uint64, LPChannel]
|
remote: Table[uint64, LPChannel]
|
||||||
local: Table[uint64, LPChannel]
|
local: Table[uint64, LPChannel]
|
||||||
|
conns: seq[Connection]
|
||||||
handlerFuts: seq[Future[void]]
|
handlerFuts: seq[Future[void]]
|
||||||
currentId*: uint64
|
currentId*: uint64
|
||||||
maxChannels*: uint64
|
maxChannels*: uint64
|
||||||
|
@ -93,17 +94,25 @@ method handle*(m: Mplex) {.async, gcsafe.} =
|
||||||
oid = m.oid
|
oid = m.oid
|
||||||
if not isNil(m.streamHandler):
|
if not isNil(m.streamHandler):
|
||||||
let stream = newConnection(channel)
|
let stream = newConnection(channel)
|
||||||
|
m.conns.add(stream)
|
||||||
stream.peerInfo = m.connection.peerInfo
|
stream.peerInfo = m.connection.peerInfo
|
||||||
|
|
||||||
var fut = newFuture[void]()
|
var fut = newFuture[void]()
|
||||||
proc handler() {.async.} =
|
proc handler() {.async.} =
|
||||||
tryAndWarn "mplex channel handler":
|
try:
|
||||||
await m.streamHandler(stream)
|
try:
|
||||||
|
await m.streamHandler(stream)
|
||||||
|
trace "streamhandler ended", oid = stream.oid
|
||||||
|
finally:
|
||||||
|
if not(stream.closed):
|
||||||
|
await stream.close()
|
||||||
|
except CatchableError as exc:
|
||||||
|
trace "exception in stream handler", exc = exc.msg
|
||||||
|
finally:
|
||||||
|
m.conns.keepItIf(it != stream)
|
||||||
|
m.handlerFuts.keepItIf(it != fut)
|
||||||
|
|
||||||
fut = handler()
|
fut = handler()
|
||||||
m.handlerFuts.add(fut)
|
|
||||||
fut.addCallback do(udata: pointer):
|
|
||||||
m.handlerFuts.keepItIf(it != fut)
|
|
||||||
|
|
||||||
of MessageType.MsgIn, MessageType.MsgOut:
|
of MessageType.MsgIn, MessageType.MsgOut:
|
||||||
trace "pushing data to channel", id = id,
|
trace "pushing data to channel", id = id,
|
||||||
|
@ -191,16 +200,16 @@ method close*(m: Mplex) {.async, gcsafe.} =
|
||||||
return
|
return
|
||||||
|
|
||||||
trace "closing mplex muxer", oid = m.oid
|
trace "closing mplex muxer", oid = m.oid
|
||||||
|
await all(
|
||||||
|
toSeq(m.remote.values).mapIt(it.reset()) &
|
||||||
|
toSeq(m.local.values).mapIt(it.reset()))
|
||||||
|
|
||||||
checkFutures(
|
await all(m.conns.mapIt(it.close())) # dispose of channe's connections
|
||||||
await allFinished(
|
await all(m.handlerFuts)
|
||||||
toSeq(m.remote.values).mapIt(it.reset()) &
|
|
||||||
toSeq(m.local.values).mapIt(it.reset())))
|
|
||||||
|
|
||||||
checkFutures(await allFinished(m.handlerFuts))
|
|
||||||
|
|
||||||
await m.connection.close()
|
await m.connection.close()
|
||||||
m.remote.clear()
|
m.remote.clear()
|
||||||
m.local.clear()
|
m.local.clear()
|
||||||
|
m.conns = @[]
|
||||||
m.handlerFuts = @[]
|
m.handlerFuts = @[]
|
||||||
m.isClosed = true
|
m.isClosed = true
|
||||||
|
|
|
@ -46,22 +46,23 @@ proc newMuxerProvider*(creator: MuxerConstructor, codec: string): MuxerProvider
|
||||||
|
|
||||||
method init(c: MuxerProvider) =
|
method init(c: MuxerProvider) =
|
||||||
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||||
let
|
try:
|
||||||
muxer = c.newMuxer(conn)
|
let
|
||||||
|
muxer = c.newMuxer(conn)
|
||||||
|
|
||||||
if not isNil(c.streamHandler):
|
if not isNil(c.streamHandler):
|
||||||
muxer.streamHandler = c.streamHandler
|
muxer.streamHandler = c.streamHandler
|
||||||
|
|
||||||
var futs = newSeq[Future[void]]()
|
var futs = newSeq[Future[void]]()
|
||||||
|
futs &= muxer.handle()
|
||||||
|
|
||||||
futs &= muxer.handle()
|
# finally await both the futures
|
||||||
|
if not isNil(c.muxerHandler):
|
||||||
|
futs &= c.muxerHandler(muxer)
|
||||||
|
|
||||||
# finally await both the futures
|
# log and re-raise on errors
|
||||||
if not isNil(c.muxerHandler):
|
await all(futs)
|
||||||
futs &= c.muxerHandler(muxer)
|
except CatchableError as exc:
|
||||||
|
trace "exception in muxer handler", exc = exc.msg
|
||||||
# log and re-raise on errors
|
|
||||||
futs = await allFinished(futs)
|
|
||||||
checkFutures(futs)
|
|
||||||
|
|
||||||
c.handler = handler
|
c.handler = handler
|
||||||
|
|
Loading…
Reference in New Issue