Change closing proc (#492)

This commit is contained in:
KonradStaniec 2022-03-28 12:35:08 +02:00 committed by GitHub
parent 4c7cdcaaf2
commit c28597fee5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 83 additions and 6 deletions

View File

@ -612,9 +612,13 @@ proc checkTimeoutsLoop(s: UtpSocket) {.async.} =
try: try:
while true: while true:
await sleepAsync(checkTimeoutsLoopInterval) await sleepAsync(checkTimeoutsLoopInterval)
await s.eventQueue.put(SocketEvent(kind: CheckTimeouts)) s.eventQueue.putNoWait(SocketEvent(kind: CheckTimeouts))
except CancelledError: except CancelledError as exc:
# check timeouts loop is last running future managed by socket, if its
# cancelled we can fire closeEvent
s.closeEvent.fire()
trace "checkTimeoutsLoop canceled" trace "checkTimeoutsLoop canceled"
raise exc
proc startTimeoutLoop(s: UtpSocket) = proc startTimeoutLoop(s: UtpSocket) =
s.checkTimeoutsLoop = checkTimeoutsLoop(s) s.checkTimeoutsLoop = checkTimeoutsLoop(s)
@ -682,6 +686,10 @@ proc isConnected*(socket: UtpSocket): bool =
proc isClosed*(socket: UtpSocket): bool = proc isClosed*(socket: UtpSocket): bool =
socket.state == Destroy and socket.closeEvent.isSet() socket.state == Destroy and socket.closeEvent.isSet()
proc isClosedAndCleanedUpAllResources*(socket: UtpSocket): bool =
## Test Api to check that all resources are properly cleaned up
socket.isClosed() and socket.eventLoop.cancelled() and socket.checkTimeoutsLoop.cancelled()
proc destroy*(s: UtpSocket) = proc destroy*(s: UtpSocket) =
info "Destroying socket", info "Destroying socket",
to = s.socketKey to = s.socketKey
@ -689,8 +697,12 @@ proc destroy*(s: UtpSocket) =
## Remote is not notified in any way about socket end of life ## Remote is not notified in any way about socket end of life
s.state = Destroy s.state = Destroy
s.eventLoop.cancel() s.eventLoop.cancel()
s.checkTimeoutsLoop.cancel() # This procedure initiate cleanup process which goes like:
s.closeEvent.fire() # Cancel EventLoop -> Cancel timeoutsLoop -> Fire closeEvent
# This is necessary due to how evenLoop look like i.e it has only one await
# point on `eventQueue.get` which trigger cancellation excepion only when
# someone will try run `eventQueue.put`. Without `eventQueue.put` , eventLoop
# future shows as cancelled, but handler for CancelledError is not run
proc destroyWait*(s: UtpSocket) {.async.} = proc destroyWait*(s: UtpSocket) {.async.} =
## Moves socket to destroy state and clean all reasources and wait for all registered ## Moves socket to destroy state and clean all reasources and wait for all registered
@ -1609,9 +1621,8 @@ proc eventLoop(socket: UtpSocket) {.async.} =
else: else:
# in any other case we do not need to do any thing # in any other case we do not need to do any thing
discard discard
socket.checkTimeouts() socket.checkTimeouts()
except CancelledError: except CancelledError as exc:
for w in socket.pendingWrites.items(): for w in socket.pendingWrites.items():
if w.kind == Data and (not w.writer.finished()): if w.kind == Data and (not w.writer.finished()):
let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state)) let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state))
@ -1624,7 +1635,10 @@ proc eventLoop(socket: UtpSocket) {.async.} =
r.reader.complete(r.bytesAvailable) r.reader.complete(r.bytesAvailable)
socket.pendingWrites.clear() socket.pendingWrites.clear()
socket.pendingReads.clear() socket.pendingReads.clear()
# main eventLoop has been cancelled, try to cancel check timeouts loop
socket.checkTimeoutsLoop.cancel()
trace "main socket event loop cancelled" trace "main socket event loop cancelled"
raise exc
proc startEventLoop(s: UtpSocket) = proc startEventLoop(s: UtpSocket) =
s.eventLoop = eventLoop(s) s.eventLoop = eventLoop(s)

View File

@ -312,6 +312,8 @@ procSuite "Utp router unit tests":
let connectResult = await connectFuture let connectResult = await connectFuture
await waitUntil(proc (): bool = router.len() == 0)
check: check:
connectResult.isErr() connectResult.isErr()
connectResult.error().kind == ConnectionTimedOut connectResult.error().kind == ConnectionTimedOut

View File

@ -1465,3 +1465,64 @@ procSuite "Utp socket unit test":
receivedData == dataRecived receivedData == dataRecived
await sock1.destroyWait() await sock1.destroyWait()
asyncTest "Clean up all resources when closing due to timeout failure":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
let dataToWrite = @[1'u8]
let customCfg = SocketConfig.init(dataResendsBeforeFailure = 2, optSndBuffer = 1)
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, cfg = customCfg)
let bytesWritten = await outgoingSocket.write(dataToWrite)
# this future will never finish as there is not place in write buffer
# although it should get properly clearead up when socket is closed
let writeFut = outgoingSocket.write(dataToWrite)
check:
bytesWritten.get() == len(dataToWrite)
let sentPacket = await q.get()
# wait for failure and cleanup of all resources
await waitUntil(proc (): bool = outgoingSocket.isClosedAndCleanedUpAllResources())
check:
# main event loop handler should fire and clean up dangling future
writeFut.finished()
await outgoingSocket.destroyWait()
asyncTest "Receiving ack for fin packet should destroy socket and clean up all resources":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q)
outgoingSocket.close()
let sendFin = await q.get()
check:
sendFin.header.pType == ST_FIN
let responseAck =
ackPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
sendFin.header.seqNr,
testBufferSize,
0
)
await outgoingSocket.processPacket(responseAck)
await waitUntil(proc (): bool = not outgoingSocket.isConnected())
check:
not outgoingSocket.isConnected()
await waitUntil(proc (): bool = outgoingSocket.isClosedAndCleanedUpAllResources())
check:
outgoingSocket.isClosedAndCleanedUpAllResources()
await outgoingSocket.destroyWait()