Fix defect when writing over send buffer (#564)

* Fix defect when writing over send buffer
This commit is contained in:
KonradStaniec 2022-11-30 09:34:08 +01:00 committed by GitHub
parent 8f4ef19fc9
commit 22d0ac81e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 126 additions and 24 deletions

View File

@ -1554,10 +1554,10 @@ proc onRead(socket: UtpSocket, readReq: var ReadReq): ReadResult =
proc eventLoop(socket: UtpSocket) {.async.} =
try:
while true:
let ev = await socket.eventQueue.get()
case ev.kind
let socketEvent = await socket.eventQueue.get()
case socketEvent.kind
of NewPacket:
socket.processPacketInternal(ev.packet)
socket.processPacketInternal(socketEvent.packet)
# we processed a packet and rcv buffer size is larger than 0,
# check if we can finish some pending readers
@ -1577,8 +1577,8 @@ proc eventLoop(socket: UtpSocket) {.async.} =
# we processed packet, so there could more place in the send buffer
while socket.pendingWrites.len() > 0:
let wr = socket.pendingWrites.popFirst()
case wr.kind
let pendingWrite = socket.pendingWrites.popFirst()
case pendingWrite.kind
of Close:
socket.handleClose()
# close should be last packet send
@ -1587,15 +1587,24 @@ proc eventLoop(socket: UtpSocket) {.async.} =
# check if writing was not cancelled in the mean time. This approach
# can create partial writes as part of the data could be written with
# with WriteReq
if (not wr.writer.finished()):
let bytesWritten = socket.handleDataWrite(wr.data)
if (bytesWritten == len(wr.data)):
# all bytes were written we can finish external future
wr.writer.complete(Result[int, WriteError].ok(bytesWritten))
if (not pendingWrite.writer.finished()):
let bytesWritten = socket.handleDataWrite(pendingWrite.data)
if (bytesWritten == len(pendingWrite.data)):
# all bytes were written we can finish external future
pendingWrite.writer.complete(
Result[int, WriteError].ok(bytesWritten)
)
else:
let bytesLeft = wr.data[bytesWritten..ev.data.high]
let bytesLeft =
pendingWrite.data[bytesWritten..pendingWrite.data.high]
# bytes partially written to buffer, schedule rest of data for later
socket.pendingWrites.addFirst(WriteRequest(kind: Data, data: bytesLeft, writer: ev.writer))
socket.pendingWrites.addFirst(
WriteRequest(
kind: Data,
data: bytesLeft,
writer: pendingWrite.writer
)
)
# there is no more place in the buffer break from the loop
break
of CheckTimeouts:
@ -1608,28 +1617,43 @@ proc eventLoop(socket: UtpSocket) {.async.} =
socket.handleClose()
of WriteReq:
# check if the writer was not cancelled in mean time
if (not ev.writer.finished()):
if (not socketEvent.writer.finished()):
if (socket.pendingWrites.len() > 0):
# there are still some unfinished writes, waiting to be finished schdule this batch for later
socket.pendingWrites.addLast(WriteRequest(kind: Data, data: ev.data, writer: ev.writer))
socket.pendingWrites.addLast(
WriteRequest(
kind: Data,
data: socketEvent.data,
writer: socketEvent.writer
)
)
else:
let bytesWritten = socket.handleDataWrite(ev.data)
if (bytesWritten == len(ev.data)):
let bytesWritten = socket.handleDataWrite(socketEvent.data)
if (bytesWritten == len(socketEvent.data)):
# all bytes were written we can finish external future
ev.writer.complete(Result[int, WriteError].ok(bytesWritten))
socketEvent.writer.complete(
Result[int, WriteError].ok(bytesWritten)
)
else:
let bytesLeft = ev.data[bytesWritten..ev.data.high]
let bytesLeft =
socketEvent.data[bytesWritten..socketEvent.data.high]
# bytes partially written to buffer, schedule rest of data for later
socket.pendingWrites.addLast(WriteRequest(kind: Data, data: bytesLeft, writer: ev.writer))
socket.pendingWrites.addLast(
WriteRequest(
kind: Data,
data: bytesLeft,
writer: socketEvent.writer
)
)
of ReadReqType:
# check if the writer was not cancelled in mean time
if (not ev.readReq.reader.finished()):
if (not socketEvent.readReq.reader.finished()):
if (socket.pendingReads.len() > 0):
# there is already pending unfinished read request, schedule this one for
# later
socket.pendingReads.addLast(ev.readReq)
socket.pendingReads.addLast(socketEvent.readReq)
else:
var readReq = ev.readReq
var readReq = socketEvent.readReq
let readResult = socket.onRead(readReq)
case readResult
of ReadNotFinished:
@ -1641,7 +1665,9 @@ proc eventLoop(socket: UtpSocket) {.async.} =
except CancelledError as exc:
for w in socket.pendingWrites.items():
if w.kind == Data and (not w.writer.finished()):
let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state))
let res = Result[int, WriteError].err(
WriteError(kind: SocketNotWriteable, currentState: socket.state)
)
w.writer.complete(res)
for r in socket.pendingReads.items():
# complete every reader with already read bytes

View File

@ -6,7 +6,7 @@ import
type AssertionCallback = proc(): bool {.gcsafe, raises: [Defect].}
let testBufferSize = 1024'u32
let testBufferSize* = 1024'u32
let defaultRcvOutgoingId = 314'u16
proc waitUntil*(f: AssertionCallback): Future[void] {.async.} =

View File

@ -1552,3 +1552,79 @@ procSuite "Utp socket unit test":
len(dp2.payload) == int(maxPayloadSize)
await outgoingSocket.destroyWait()
proc sendAndDrainPacketByPacket(
socket: UtpSocket[TransportAddress],
socketSendQueue: AsyncQueue[Packet],
initalRemoteSeq: uint16,
initialPacket: Packet,
bytes: seq[byte]): Future[seq[byte]] {.async.} =
var sentAndAckedBytes: seq[byte]
let numBytesToSend = len(bytes)
if numBytesToSend == 0:
return sentAndAckedBytes
let writeFut = socket.write(bytes)
while true:
if len(sentAndAckedBytes) == numBytesToSend:
check:
writeFut.finished()
return sentAndAckedBytes
let sentPacket = await socketSendQueue.get()
check:
sentPacket.header.pType == ST_DATA
let ackPacket = ackPacket(
initalRemoteSeq,
initialPacket.header.connectionId,
sentPacket.header.seqNr,
# remote buffer always allows for only one packet
uint32(socket.getPacketSize()),
0
)
await socket.processPacket(ackPacket)
# remote received and acked packet add it sent and received bytes
sentAndAckedBytes.add(sentPacket.payload)
asyncTest "Async block large write until there is space in snd buffer":
# remote is initialized with buffer to small to handle whole payload
let
sndBufferSize = 5000
dataToWrite = 2 * sndBufferSize
q = newAsyncQueue[Packet]()
initialRemoteSeq = 10'u16
customConfig = SocketConfig.init(
# small write buffer. Big write should async block, and process write
# as soon as buffer is freed by processing remote acks.
optSndBuffer = uint32(sndBufferSize)
)
(outgoingSocket, initialPacket) = connectOutGoingSocket(
initialRemoteSeq,
q,
testBufferSize,
customConfig
)
largeDataToWrite = rng[].generateBytes(dataToWrite)
# As we are sending data larger than send buffer socket.write will not finish
# immediately but will progreass with each packet acked by remote.
let bytesReceivedByRemote = await sendAndDrainPacketByPacket(
outgoingSocket,
q,
initialRemoteSeq,
initialPacket,
largeDataToWrite
)
check:
bytesReceivedByRemote == largeDataToWrite
await outgoingSocket.destroyWait()