does not finish write futures while the channel is being closed

This commit is contained in:
Diego 2024-02-26 19:56:00 +01:00
parent e9b456162a
commit 242f516b5b
No known key found for this signature in database
GPG Key ID: C9DAC9BF68D1F806

View File

@ -61,6 +61,7 @@ type
closeCode*: MessageType # cached in/out close code closeCode*: MessageType # cached in/out close code
resetCode*: MessageType # cached in/out reset code resetCode*: MessageType # cached in/out reset code
writes*: int # In-flight writes writes*: int # In-flight writes
closing: Future[void]
func shortLog*(s: LPChannel): auto = func shortLog*(s: LPChannel): auto =
try: try:
@ -99,7 +100,7 @@ proc reset*(s: LPChannel) {.async.} =
if s.isClosed: if s.isClosed:
trace "Already closed", s trace "Already closed", s
return return
s.closing = newFuture[void]()
s.isClosed = true s.isClosed = true
s.closedLocal = true s.closedLocal = true
s.localReset = not s.remoteReset s.localReset = not s.remoteReset
@ -117,10 +118,11 @@ proc reset*(s: LPChannel) {.async.} =
await s.conn.close() await s.conn.close()
trace "Can't send reset message", s, conn = s.conn, msg = exc.msg trace "Can't send reset message", s, conn = s.conn, msg = exc.msg
asyncSpawn resetMessage() await resetMessage()
await s.closeImpl() # noraises, nocancels await s.closeImpl() # noraises, nocancels
if not s.closing.finished:
s.closing.complete()
trace "Channel reset", s trace "Channel reset", s
method close*(s: LPChannel) {.async.} = method close*(s: LPChannel) {.async.} =
@ -130,6 +132,8 @@ method close*(s: LPChannel) {.async.} =
if s.closedLocal: if s.closedLocal:
trace "Already closed", s trace "Already closed", s
return return
s.closing = newFuture[void]()
s.closedLocal = true s.closedLocal = true
trace "Closing channel", s, conn = s.conn, len = s.len trace "Closing channel", s, conn = s.conn, len = s.len
@ -147,7 +151,8 @@ method close*(s: LPChannel) {.async.} =
trace "Cannot send close message", s, id = s.id, msg = exc.msg trace "Cannot send close message", s, id = s.id, msg = exc.msg
await s.closeUnderlying() # maybe already eofed await s.closeUnderlying() # maybe already eofed
if not s.closing.finished:
s.closing.complete()
trace "Closed channel", s, len = s.len trace "Closed channel", s, len = s.len
method initStream*(s: LPChannel) = method initStream*(s: LPChannel) =
@ -200,6 +205,7 @@ proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
if s.remoteReset: if s.remoteReset:
raise newLPStreamResetError() raise newLPStreamResetError()
if s.closedLocal: if s.closedLocal:
await s.closing
raise newLPStreamClosedError() raise newLPStreamClosedError()
if s.conn.closed: if s.conn.closed:
raise newLPStreamConnDownError() raise newLPStreamConnDownError()
@ -293,8 +299,9 @@ proc init*(
msgCode: if initiator: MessageType.MsgOut else: MessageType.MsgIn, msgCode: if initiator: MessageType.MsgOut else: MessageType.MsgIn,
closeCode: if initiator: MessageType.CloseOut else: MessageType.CloseIn, closeCode: if initiator: MessageType.CloseOut else: MessageType.CloseIn,
resetCode: if initiator: MessageType.ResetOut else: MessageType.ResetIn, resetCode: if initiator: MessageType.ResetOut else: MessageType.ResetIn,
dir: if initiator: Direction.Out else: Direction.In) dir: if initiator: Direction.Out else: Direction.In,
closing: newFuture[void]())
chann.closing.complete()
chann.initStream() chann.initStream()
when chronicles.enabledLogLevel == LogLevel.TRACE: when chronicles.enabledLogLevel == LogLevel.TRACE: