mirror of https://github.com/status-im/nim-eth.git
Fix bug when reading till eof (#483)
This commit is contained in:
parent
6c4d04562d
commit
dff9040cc1
|
@ -1048,6 +1048,7 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
|
||||||
socketAckNr = socket.ackNr,
|
socketAckNr = socket.ackNr,
|
||||||
socketSeqNr = socket.seqNr,
|
socketSeqNr = socket.seqNr,
|
||||||
windowPackets = socket.curWindowPackets,
|
windowPackets = socket.curWindowPackets,
|
||||||
|
rcvBufferSize = socket.offset,
|
||||||
packetType = p.header.pType,
|
packetType = p.header.pType,
|
||||||
seqNr = p.header.seqNr,
|
seqNr = p.header.seqNr,
|
||||||
ackNr = p.header.ackNr,
|
ackNr = p.header.ackNr,
|
||||||
|
@ -1463,6 +1464,12 @@ template shiftBuffer(t, c: untyped) =
|
||||||
(t).offset = 0
|
(t).offset = 0
|
||||||
|
|
||||||
proc onRead(socket: UtpSocket, readReq: var ReadReq): ReadResult =
|
proc onRead(socket: UtpSocket, readReq: var ReadReq): ReadResult =
|
||||||
|
debug "Handling incoming read",
|
||||||
|
rcvBufferSize = socket.offset,
|
||||||
|
reorderBufferSize = socket.inBufferBytes,
|
||||||
|
socketAtEOF = socket.atEof(),
|
||||||
|
readTillEOF = readReq.bytesToRead == 0
|
||||||
|
|
||||||
if readReq.reader.finished():
|
if readReq.reader.finished():
|
||||||
return ReadCancelled
|
return ReadCancelled
|
||||||
|
|
||||||
|
@ -1477,9 +1484,18 @@ proc onRead(socket: UtpSocket, readReq: var ReadReq): ReadResult =
|
||||||
readReq.bytesAvailable.add(socket.rcvBuffer.toOpenArray(0, socket.offset - 1))
|
readReq.bytesAvailable.add(socket.rcvBuffer.toOpenArray(0, socket.offset - 1))
|
||||||
socket.shiftBuffer(socket.offset)
|
socket.shiftBuffer(socket.offset)
|
||||||
if (socket.atEof()):
|
if (socket.atEof()):
|
||||||
|
|
||||||
|
debug "Read finished",
|
||||||
|
bytesRead = len(readReq.bytesAvailable),
|
||||||
|
socektAtEof = socket.atEof()
|
||||||
|
|
||||||
readReq.reader.complete(readReq.bytesAvailable)
|
readReq.reader.complete(readReq.bytesAvailable)
|
||||||
return ReadFinished
|
return ReadFinished
|
||||||
else:
|
else:
|
||||||
|
debug "Read not finished",
|
||||||
|
bytesRead = len(readReq.bytesAvailable),
|
||||||
|
socektAtEof = socket.atEof()
|
||||||
|
|
||||||
return ReadNotFinished
|
return ReadNotFinished
|
||||||
else:
|
else:
|
||||||
let bytesAlreadyRead = len(readReq.bytesAvailable)
|
let bytesAlreadyRead = len(readReq.bytesAvailable)
|
||||||
|
@ -1488,9 +1504,17 @@ proc onRead(socket: UtpSocket, readReq: var ReadReq): ReadResult =
|
||||||
readReq.bytesAvailable.add(socket.rcvBuffer.toOpenArray(0, count - 1))
|
readReq.bytesAvailable.add(socket.rcvBuffer.toOpenArray(0, count - 1))
|
||||||
socket.shiftBuffer(count)
|
socket.shiftBuffer(count)
|
||||||
if (len(readReq.bytesAvailable) == readReq.bytesToRead):
|
if (len(readReq.bytesAvailable) == readReq.bytesToRead):
|
||||||
|
debug "Read finished",
|
||||||
|
bytesRead = len(readReq.bytesAvailable),
|
||||||
|
socektAtEof = socket.atEof()
|
||||||
|
|
||||||
readReq.reader.complete(readReq.bytesAvailable)
|
readReq.reader.complete(readReq.bytesAvailable)
|
||||||
return ReadFinished
|
return ReadFinished
|
||||||
else:
|
else:
|
||||||
|
debug "Read not finished",
|
||||||
|
bytesRead = len(readReq.bytesAvailable),
|
||||||
|
socektAtEof = socket.atEof()
|
||||||
|
|
||||||
return ReadNotFinished
|
return ReadNotFinished
|
||||||
|
|
||||||
proc eventLoop(socket: UtpSocket) {.async.} =
|
proc eventLoop(socket: UtpSocket) {.async.} =
|
||||||
|
@ -1503,7 +1527,7 @@ proc eventLoop(socket: UtpSocket) {.async.} =
|
||||||
|
|
||||||
# we processed a packet and rcv buffer size is larger than 0,
|
# we processed a packet and rcv buffer size is larger than 0,
|
||||||
# check if we can finish some pending readers
|
# check if we can finish some pending readers
|
||||||
while socket.pendingReads.len() > 0 and socket.offset > 0:
|
while socket.pendingReads.len() > 0:
|
||||||
let readResult = socket.onRead(socket.pendingReads[0])
|
let readResult = socket.onRead(socket.pendingReads[0])
|
||||||
case readResult
|
case readResult
|
||||||
of ReadFinished:
|
of ReadFinished:
|
||||||
|
|
|
@ -120,7 +120,6 @@ procSuite "Utp protocol over udp tests with loss and delays":
|
||||||
bytesPerRead: int = 0): TestCase =
|
bytesPerRead: int = 0): TestCase =
|
||||||
TestCase(maxDelay: maxDelay, dropRate: dropRate, bytesToTransfer: bytesToTransfer, cfg: cfg, bytesPerRead: bytesPerRead)
|
TestCase(maxDelay: maxDelay, dropRate: dropRate, bytesToTransfer: bytesToTransfer, cfg: cfg, bytesPerRead: bytesPerRead)
|
||||||
|
|
||||||
|
|
||||||
let testCases = @[
|
let testCases = @[
|
||||||
TestCase.init(45, 10, 40000),
|
TestCase.init(45, 10, 40000),
|
||||||
TestCase.init(25, 15, 40000),
|
TestCase.init(25, 15, 40000),
|
||||||
|
@ -228,3 +227,32 @@ procSuite "Utp protocol over udp tests with loss and delays":
|
||||||
await clientProtocol.shutdownWait()
|
await clientProtocol.shutdownWait()
|
||||||
await serverProtocol.shutdownWait()
|
await serverProtocol.shutdownWait()
|
||||||
|
|
||||||
|
let testCase2 = @[
|
||||||
|
TestCase.init(45, 0, 40000),
|
||||||
|
TestCase.init(45, 0, 80000),
|
||||||
|
TestCase.init(25, 15, 40000),
|
||||||
|
TestCase.init(15, 5, 40000, SocketConfig.init(optRcvBuffer = uint32(10000), remoteWindowResetTimeout = seconds(5)))
|
||||||
|
]
|
||||||
|
|
||||||
|
asyncTest "Write large data and read till EOF":
|
||||||
|
for testCase in testCase2:
|
||||||
|
let (
|
||||||
|
clientProtocol,
|
||||||
|
clientSocket,
|
||||||
|
serverProtocol,
|
||||||
|
serverSocket) = await testScenario(testCase.maxDelay, testCase.dropRate, testcase.cfg)
|
||||||
|
|
||||||
|
|
||||||
|
let numBytes = testCase.bytesToTransfer
|
||||||
|
let bytesToTransfer = generateByteArray(rng[], numBytes)
|
||||||
|
|
||||||
|
discard await clientSocket.write(bytesToTransfer)
|
||||||
|
clientSocket.close()
|
||||||
|
|
||||||
|
let read = await serverSocket.read()
|
||||||
|
|
||||||
|
check:
|
||||||
|
read == bytesToTransfer
|
||||||
|
|
||||||
|
await clientProtocol.shutdownWait()
|
||||||
|
await serverProtocol.shutdownWait()
|
||||||
|
|
Loading…
Reference in New Issue