From dff9040cc164d0ae0c2cb108b33dd81d010fb245 Mon Sep 17 00:00:00 2001 From: KonradStaniec Date: Thu, 3 Mar 2022 22:38:13 +0100 Subject: [PATCH] Fix bug when reading till eof (#483) --- eth/utp/utp_socket.nim | 26 ++++++++++++++++++++- tests/utp/test_protocol_integration.nim | 30 ++++++++++++++++++++++++- 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index fa5bf20..10acbd5 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -1048,6 +1048,7 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = socketAckNr = socket.ackNr, socketSeqNr = socket.seqNr, windowPackets = socket.curWindowPackets, + rcvBufferSize = socket.offset, packetType = p.header.pType, seqNr = p.header.seqNr, ackNr = p.header.ackNr, @@ -1463,6 +1464,12 @@ template shiftBuffer(t, c: untyped) = (t).offset = 0 proc onRead(socket: UtpSocket, readReq: var ReadReq): ReadResult = + debug "Handling incoming read", + rcvBufferSize = socket.offset, + reorderBufferSize = socket.inBufferBytes, + socketAtEOF = socket.atEof(), + readTillEOF = readReq.bytesToRead == 0 + if readReq.reader.finished(): return ReadCancelled @@ -1477,9 +1484,18 @@ proc onRead(socket: UtpSocket, readReq: var ReadReq): ReadResult = readReq.bytesAvailable.add(socket.rcvBuffer.toOpenArray(0, socket.offset - 1)) socket.shiftBuffer(socket.offset) if (socket.atEof()): + + debug "Read finished", + bytesRead = len(readReq.bytesAvailable), + socektAtEof = socket.atEof() + readReq.reader.complete(readReq.bytesAvailable) return ReadFinished else: + debug "Read not finished", + bytesRead = len(readReq.bytesAvailable), + socektAtEof = socket.atEof() + return ReadNotFinished else: let bytesAlreadyRead = len(readReq.bytesAvailable) @@ -1488,9 +1504,17 @@ proc onRead(socket: UtpSocket, readReq: var ReadReq): ReadResult = readReq.bytesAvailable.add(socket.rcvBuffer.toOpenArray(0, count - 1)) socket.shiftBuffer(count) if (len(readReq.bytesAvailable) == readReq.bytesToRead): + debug "Read finished", + bytesRead = len(readReq.bytesAvailable), + socektAtEof = socket.atEof() + readReq.reader.complete(readReq.bytesAvailable) return ReadFinished else: + debug "Read not finished", + bytesRead = len(readReq.bytesAvailable), + socektAtEof = socket.atEof() + return ReadNotFinished proc eventLoop(socket: UtpSocket) {.async.} = @@ -1503,7 +1527,7 @@ proc eventLoop(socket: UtpSocket) {.async.} = # we processed a packet and rcv buffer size is larger than 0, # check if we can finish some pending readers - while socket.pendingReads.len() > 0 and socket.offset > 0: + while socket.pendingReads.len() > 0: let readResult = socket.onRead(socket.pendingReads[0]) case readResult of ReadFinished: diff --git a/tests/utp/test_protocol_integration.nim b/tests/utp/test_protocol_integration.nim index d3121a9..63fad08 100644 --- a/tests/utp/test_protocol_integration.nim +++ b/tests/utp/test_protocol_integration.nim @@ -120,7 +120,6 @@ procSuite "Utp protocol over udp tests with loss and delays": bytesPerRead: int = 0): TestCase = TestCase(maxDelay: maxDelay, dropRate: dropRate, bytesToTransfer: bytesToTransfer, cfg: cfg, bytesPerRead: bytesPerRead) - let testCases = @[ TestCase.init(45, 10, 40000), TestCase.init(25, 15, 40000), @@ -228,3 +227,32 @@ procSuite "Utp protocol over udp tests with loss and delays": await clientProtocol.shutdownWait() await serverProtocol.shutdownWait() + let testCase2 = @[ + TestCase.init(45, 0, 40000), + TestCase.init(45, 0, 80000), + TestCase.init(25, 15, 40000), + TestCase.init(15, 5, 40000, SocketConfig.init(optRcvBuffer = uint32(10000), remoteWindowResetTimeout = seconds(5))) + ] + + asyncTest "Write large data and read till EOF": + for testCase in testCase2: + let ( + clientProtocol, + clientSocket, + serverProtocol, + serverSocket) = await testScenario(testCase.maxDelay, testCase.dropRate, testcase.cfg) + + + let numBytes = testCase.bytesToTransfer + let bytesToTransfer = generateByteArray(rng[], numBytes) + + discard await clientSocket.write(bytesToTransfer) + clientSocket.close() + + let read = await serverSocket.read() + + check: + read == bytesToTransfer + + await clientProtocol.shutdownWait() + await serverProtocol.shutdownWait()