From 22d0ac81e1495960e61df65d80d8a9c6f31d0726 Mon Sep 17 00:00:00 2001 From: KonradStaniec Date: Wed, 30 Nov 2022 09:34:08 +0100 Subject: [PATCH] Fix defect when writing over send buffer (#564) * Fix defect when writing over send buffer --- eth/utp/utp_socket.nim | 72 ++++++++++++++++++++++----------- tests/utp/test_utils.nim | 2 +- tests/utp/test_utp_socket.nim | 76 +++++++++++++++++++++++++++++++++++ 3 files changed, 126 insertions(+), 24 deletions(-) diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index 0a1bd17..3d2801e 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -1554,10 +1554,10 @@ proc onRead(socket: UtpSocket, readReq: var ReadReq): ReadResult = proc eventLoop(socket: UtpSocket) {.async.} = try: while true: - let ev = await socket.eventQueue.get() - case ev.kind + let socketEvent = await socket.eventQueue.get() + case socketEvent.kind of NewPacket: - socket.processPacketInternal(ev.packet) + socket.processPacketInternal(socketEvent.packet) # we processed a packet and rcv buffer size is larger than 0, # check if we can finish some pending readers @@ -1577,8 +1577,8 @@ proc eventLoop(socket: UtpSocket) {.async.} = # we processed packet, so there could more place in the send buffer while socket.pendingWrites.len() > 0: - let wr = socket.pendingWrites.popFirst() - case wr.kind + let pendingWrite = socket.pendingWrites.popFirst() + case pendingWrite.kind of Close: socket.handleClose() # close should be last packet send @@ -1587,15 +1587,24 @@ proc eventLoop(socket: UtpSocket) {.async.} = # check if writing was not cancelled in the mean time. This approach # can create partial writes as part of the data could be written with # with WriteReq - if (not wr.writer.finished()): - let bytesWritten = socket.handleDataWrite(wr.data) - if (bytesWritten == len(wr.data)): - # all bytes were written we can finish external future - wr.writer.complete(Result[int, WriteError].ok(bytesWritten)) + if (not pendingWrite.writer.finished()): + let bytesWritten = socket.handleDataWrite(pendingWrite.data) + if (bytesWritten == len(pendingWrite.data)): + # all bytes were written we can finish external future + pendingWrite.writer.complete( + Result[int, WriteError].ok(bytesWritten) + ) else: - let bytesLeft = wr.data[bytesWritten..ev.data.high] + let bytesLeft = + pendingWrite.data[bytesWritten..pendingWrite.data.high] # bytes partially written to buffer, schedule rest of data for later - socket.pendingWrites.addFirst(WriteRequest(kind: Data, data: bytesLeft, writer: ev.writer)) + socket.pendingWrites.addFirst( + WriteRequest( + kind: Data, + data: bytesLeft, + writer: pendingWrite.writer + ) + ) # there is no more place in the buffer break from the loop break of CheckTimeouts: @@ -1608,28 +1617,43 @@ proc eventLoop(socket: UtpSocket) {.async.} = socket.handleClose() of WriteReq: # check if the writer was not cancelled in mean time - if (not ev.writer.finished()): + if (not socketEvent.writer.finished()): if (socket.pendingWrites.len() > 0): # there are still some unfinished writes, waiting to be finished schdule this batch for later - socket.pendingWrites.addLast(WriteRequest(kind: Data, data: ev.data, writer: ev.writer)) + socket.pendingWrites.addLast( + WriteRequest( + kind: Data, + data: socketEvent.data, + writer: socketEvent.writer + ) + ) else: - let bytesWritten = socket.handleDataWrite(ev.data) - if (bytesWritten == len(ev.data)): + let bytesWritten = socket.handleDataWrite(socketEvent.data) + if (bytesWritten == len(socketEvent.data)): # all bytes were written we can finish external future - ev.writer.complete(Result[int, WriteError].ok(bytesWritten)) + socketEvent.writer.complete( + Result[int, WriteError].ok(bytesWritten) + ) else: - let bytesLeft = ev.data[bytesWritten..ev.data.high] + let bytesLeft = + socketEvent.data[bytesWritten..socketEvent.data.high] # bytes partially written to buffer, schedule rest of data for later - socket.pendingWrites.addLast(WriteRequest(kind: Data, data: bytesLeft, writer: ev.writer)) + socket.pendingWrites.addLast( + WriteRequest( + kind: Data, + data: bytesLeft, + writer: socketEvent.writer + ) + ) of ReadReqType: # check if the writer was not cancelled in mean time - if (not ev.readReq.reader.finished()): + if (not socketEvent.readReq.reader.finished()): if (socket.pendingReads.len() > 0): # there is already pending unfinished read request, schedule this one for # later - socket.pendingReads.addLast(ev.readReq) + socket.pendingReads.addLast(socketEvent.readReq) else: - var readReq = ev.readReq + var readReq = socketEvent.readReq let readResult = socket.onRead(readReq) case readResult of ReadNotFinished: @@ -1641,7 +1665,9 @@ proc eventLoop(socket: UtpSocket) {.async.} = except CancelledError as exc: for w in socket.pendingWrites.items(): if w.kind == Data and (not w.writer.finished()): - let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state)) + let res = Result[int, WriteError].err( + WriteError(kind: SocketNotWriteable, currentState: socket.state) + ) w.writer.complete(res) for r in socket.pendingReads.items(): # complete every reader with already read bytes diff --git a/tests/utp/test_utils.nim b/tests/utp/test_utils.nim index 962a31a..c94b71a 100644 --- a/tests/utp/test_utils.nim +++ b/tests/utp/test_utils.nim @@ -6,7 +6,7 @@ import type AssertionCallback = proc(): bool {.gcsafe, raises: [Defect].} -let testBufferSize = 1024'u32 +let testBufferSize* = 1024'u32 let defaultRcvOutgoingId = 314'u16 proc waitUntil*(f: AssertionCallback): Future[void] {.async.} = diff --git a/tests/utp/test_utp_socket.nim b/tests/utp/test_utp_socket.nim index 685e686..9cff6f3 100644 --- a/tests/utp/test_utp_socket.nim +++ b/tests/utp/test_utp_socket.nim @@ -1552,3 +1552,79 @@ procSuite "Utp socket unit test": len(dp2.payload) == int(maxPayloadSize) await outgoingSocket.destroyWait() + + proc sendAndDrainPacketByPacket( + socket: UtpSocket[TransportAddress], + socketSendQueue: AsyncQueue[Packet], + initalRemoteSeq: uint16, + initialPacket: Packet, + bytes: seq[byte]): Future[seq[byte]] {.async.} = + var sentAndAckedBytes: seq[byte] + let numBytesToSend = len(bytes) + + if numBytesToSend == 0: + return sentAndAckedBytes + + let writeFut = socket.write(bytes) + + while true: + if len(sentAndAckedBytes) == numBytesToSend: + check: + writeFut.finished() + + return sentAndAckedBytes + + let sentPacket = await socketSendQueue.get() + + check: + sentPacket.header.pType == ST_DATA + + let ackPacket = ackPacket( + initalRemoteSeq, + initialPacket.header.connectionId, + sentPacket.header.seqNr, + # remote buffer always allows for only one packet + uint32(socket.getPacketSize()), + 0 + ) + + await socket.processPacket(ackPacket) + + # remote received and acked packet add it sent and received bytes + sentAndAckedBytes.add(sentPacket.payload) + + asyncTest "Async block large write until there is space in snd buffer": + # remote is initialized with buffer to small to handle whole payload + let + sndBufferSize = 5000 + dataToWrite = 2 * sndBufferSize + q = newAsyncQueue[Packet]() + initialRemoteSeq = 10'u16 + customConfig = SocketConfig.init( + # small write buffer. Big write should async block, and process write + # as soon as buffer is freed by processing remote acks. + optSndBuffer = uint32(sndBufferSize) + ) + (outgoingSocket, initialPacket) = connectOutGoingSocket( + initialRemoteSeq, + q, + testBufferSize, + customConfig + ) + largeDataToWrite = rng[].generateBytes(dataToWrite) + + # As we are sending data larger than send buffer socket.write will not finish + # immediately but will progreass with each packet acked by remote. + let bytesReceivedByRemote = await sendAndDrainPacketByPacket( + outgoingSocket, + q, + initialRemoteSeq, + initialPacket, + largeDataToWrite + ) + + check: + bytesReceivedByRemote == largeDataToWrite + + await outgoingSocket.destroyWait() +