first attempt to proper channel cleanup

This commit is contained in:
Dmitriy Ryajov 2019-09-07 17:34:40 -06:00
parent 65ce1a93fc
commit b5dcfa9bc4
1 changed files with 44 additions and 27 deletions

View File

@ -16,7 +16,7 @@
## This still needs to be implemented properly - I'm leaving it ## This still needs to be implemented properly - I'm leaving it
## here to not forget that this needs to be fixed ASAP. ## here to not forget that this needs to be fixed ASAP.
import tables, sequtils, strformat, options import tables, sequtils, options, strformat
import chronos import chronos
import coder, types, channel, import coder, types, channel,
../../varint, ../../varint,
@ -56,33 +56,50 @@ proc newStreamInternal*(m: Mplex,
result = newChannel(id, m.connection, initiator, name) result = newChannel(id, m.connection, initiator, name)
m.getChannelList(initiator)[id] = result m.getChannelList(initiator)[id] = result
method handle*(m: Mplex): Future[void] {.async, gcsafe.} = method handle*(m: Mplex) {.async, gcsafe.} =
try:
while not m.connection.closed: while not m.connection.closed:
try: let msgRes = await m.connection.readMsg()
let (id, msgType, data) = await m.connection.readMsg() if msgRes.isNone:
let initiator = bool(ord(msgType) and 1) await sleepAsync(100.millis)
var channel: Channel continue
if MessageType(msgType) != MessageType.New:
let channels = m.getChannelList(initiator)
if not channels.contains(id):
raise newMplexNoSuchChannel(id, msgType)
channel = channels[id]
case msgType: let (id, msgType, data) = msgRes.get()
of MessageType.New: let initiator = bool(ord(msgType) and 1)
channel = await m.newStreamInternal(false, id, cast[string](data)) var channel: Channel
if not isNil(m.streamHandler): if MessageType(msgType) != MessageType.New:
await m.streamHandler(newConnection(channel)) let channels = m.getChannelList(initiator)
of MessageType.MsgIn, MessageType.MsgOut: if not channels.contains(id):
await channel.pushTo(data) raise newMplexNoSuchChannel(id, msgType)
of MessageType.CloseIn, MessageType.CloseOut: channel = channels[id]
await channel.closedByRemote()
m.getChannelList(initiator).del(id) case msgType:
of MessageType.ResetIn, MessageType.ResetOut: of MessageType.New:
await channel.resetByRemote() channel = await m.newStreamInternal(false, id, cast[string](data))
else: raise newMplexUnknownMsgError() if not isNil(m.streamHandler):
finally: let handlerFut = m.streamHandler(newConnection(channel))
await m.connection.close() proc cleanUpChan(udata: pointer) {.gcsafe.} =
if handlerFut.finished:
channel.close().addCallback(
proc(udata: pointer) =
# TODO: is waitFor() OK here?
channel.cleanUp()
.addCallback(proc(udata: pointer) =
echo &"cleaned up channel {$id}")
)
handlerFut.addCallback(cleanUpChan)
continue
of MessageType.MsgIn, MessageType.MsgOut:
await channel.pushTo(data)
of MessageType.CloseIn, MessageType.CloseOut:
await channel.closedByRemote()
m.getChannelList(initiator).del(id)
of MessageType.ResetIn, MessageType.ResetOut:
await channel.resetByRemote()
break
else: raise newMplexUnknownMsgError()
finally:
await m.connection.close()
proc newMplex*(conn: Connection, proc newMplex*(conn: Connection,
maxChanns: uint = MaxChannels): Mplex = maxChanns: uint = MaxChannels): Mplex =
@ -94,7 +111,7 @@ proc newMplex*(conn: Connection,
method newStream*(m: Mplex, name: string = ""): Future[Connection] {.async, gcsafe.} = method newStream*(m: Mplex, name: string = ""): Future[Connection] {.async, gcsafe.} =
let channel = await m.newStreamInternal() let channel = await m.newStreamInternal()
await m.connection.writeMsg(channel.id, MessageType.New, cast[seq[byte]](toSeq(name.items))) await m.connection.writeMsg(channel.id, MessageType.New, name)
result = newConnection(channel) result = newConnection(channel)
method close*(m: Mplex) {.async, gcsafe.} = method close*(m: Mplex) {.async, gcsafe.} =