Track send buffer and properly handle back pressoure when window is to small to process data (#437)

* Add separate datastructure to keep track of window

* Asynchronously block write until until new space in snd buffer

* Introduce write loop

* Properly handle write cancellation

* Proper handling of sending fin packet

* Reset remote window after configured amount of time
This commit is contained in:
KonradStaniec 2021-12-02 15:46:18 +01:00 committed by GitHub
parent 6e21b32f0d
commit 3c8915cae1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 512 additions and 95 deletions

View File

@ -0,0 +1,78 @@
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
chronos
# Internal Utp data structure to track send window and properly block when there is
# no free space when trying to send more bytes
type SendBufferTracker* = ref object
# number of payload bytes in-flight (i.e not counting header sizes)
# packets that have not yet been sent do not count, packets
# that are marked as needing to be re-sent (due to a timeout)
# don't count either
currentWindow*: uint32
# remote receive window updated based on packed wndSize field
maxRemoteWindow*: uint32
waiters: seq[(uint32, Future[void])]
proc new*(T: type SendBufferTracker, currentWindow: uint32, maxRemoteWindow: uint32): T =
return SendBufferTracker(currentWindow: currentWindow, maxRemoteWindow: maxRemoteWindow, waiters: @[])
proc currentFreeBytes*(t: SendBufferTracker): uint32 =
let maxSend = t.maxRemoteWindow
if (maxSend <= t.currentWindow):
return 0
else:
return maxSend - t.currentWindow
proc checkWaiters(t: SendBufferTracker) =
var i = 0
while i < len(t.waiters):
let freeSpace = t.currentFreeBytes()
let (required, fut) = t.waiters[i]
if (required <= freeSpace):
# in case future was cancelled
if (not fut.finished()):
t.currentWindow = t.currentWindow + required
fut.complete()
t.waiters.del(i)
else:
# we do not have place for next waiter, just finish processing
return
proc updateMaxRemote*(t: SendBufferTracker, newRemoteWindow: uint32) =
t.maxRemoteWindow = newRemoteWindow
t.checkWaiters()
proc decreaseCurrentWindow*(t: SendBufferTracker, value: uint32, notifyWaiters: bool) =
doAssert(t.currentWindow >= value)
t.currentWindow = t.currentWindow - value
if (notifyWaiters):
t.checkWaiters()
proc reserveNBytesWait*(t: SendBufferTracker, n: uint32): Future[void] =
let fut = newFuture[void]("SendBufferTracker.reserveNBytesWait")
let free = t.currentFreeBytes()
if (n <= free):
t.currentWindow = t.currentWindow + n
fut.complete()
else:
t.waiters.add((n, fut))
fut
proc reserveNBytes*(t: SendBufferTracker, n: uint32): bool =
let free = t.currentFreeBytes()
if (n <= free):
t.currentWindow = t.currentWindow + n
return true
else:
return false
proc currentBytesInFlight*(t: SendBufferTracker): uint32 = t.currentWindow

View File

@ -10,6 +10,7 @@ import
std/sugar, std/sugar,
chronos, chronicles, bearssl, chronos, chronicles, bearssl,
stew/results, stew/results,
./send_buffer_tracker,
./growable_buffer, ./growable_buffer,
./packets ./packets
@ -21,8 +22,6 @@ type
SynSent, SynSent,
SynRecv, SynRecv,
Connected, Connected,
ConnectedFull,
Reset,
Destroy Destroy
ConnectionDirection = enum ConnectionDirection = enum
@ -65,6 +64,34 @@ type
# state and will be able to transfer data. # state and will be able to transfer data.
incomingSocketReceiveTimeout*: Option[Duration] incomingSocketReceiveTimeout*: Option[Duration]
# Timeout after which the send window will be reset to its minimal value after it dropped
# to zero. i.e when we received a packet from remote peer with `wndSize` set to 0.
remoteWindowResetTimeout*: Duration
WriteErrorType* = enum
SocketNotWriteable,
FinSent
WriteError* = object
case kind*: WriteErrorType
of SocketNotWriteable:
currentState*: ConnectionState
of FinSent:
discard
WriteResult* = Result[int, WriteError]
WriteRequestType = enum
Data, Close
WriteRequest = object
case kind: WriteRequestType
of Data:
data: seq[byte]
writer: Future[WriteResult]
of Close:
discard
UtpSocket*[A] = ref object UtpSocket*[A] = ref object
remoteAddress*: A remoteAddress*: A
state: ConnectionState state: ConnectionState
@ -133,6 +160,9 @@ type
# we sent out fin packet # we sent out fin packet
finSent: bool finSent: bool
# we requested to close the socket by sending fin packet
sendFinRequested: bool
# have our fin been acked # have our fin been acked
finAcked: bool finAcked: bool
@ -145,11 +175,13 @@ type
# sequence number of remoted fin packet # sequence number of remoted fin packet
eofPktNr: uint16 eofPktNr: uint16
# number payload bytes in-flight (i.e not countig header sizes) sendBufferTracker: SendBufferTracker
# packets that have not yet been sent do not count, packets
# that are marked as needing to be re-sent (due to a timeout) writeQueue: AsyncQueue[WriteRequest]
# don't count either
currentWindow: uint32 writeLoop: Future[void]
zeroWindowTimer: Moment
# socket identifier # socket identifier
socketKey*: UtpSocketKey[A] socketKey*: UtpSocketKey[A]
@ -162,19 +194,6 @@ type
ConnectionError* = object of CatchableError ConnectionError* = object of CatchableError
WriteErrorType* = enum
SocketNotWriteable,
FinSent
WriteError* = object
case kind*: WriteErrorType
of SocketNotWriteable:
currentState*: ConnectionState
of FinSent:
discard
WriteResult* = Result[int, WriteError]
OutgoingConnectionErrorType* = enum OutgoingConnectionErrorType* = enum
SocketAlreadyExists, ConnectionTimedOut, ErrorWhileSendingSyn SocketAlreadyExists, ConnectionTimedOut, ErrorWhileSendingSyn
@ -224,6 +243,15 @@ const
# considered suspicious and ignored # considered suspicious and ignored
allowedAckWindow*: uint16 = 3 allowedAckWindow*: uint16 = 3
# Timeout after which the send window will be reset to its minimal value after it dropped
# to zero. i.e when we received a packet from remote peer with `wndSize` set to 0.
defaultResetWindowTimeout = seconds(15)
# If remote peer window drops to zero, then after some time we will reset it
# to this value even if we do not receive any more messages from remote peers.
# Reset period is configured in `SocketConfig`
minimalRemoteWindow: uint32 = 1500
reorderBufferMaxSize = 1024 reorderBufferMaxSize = 1024
proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T = proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T =
@ -249,13 +277,15 @@ proc init*(
initialSynTimeout: Duration = defaultInitialSynTimeout, initialSynTimeout: Duration = defaultInitialSynTimeout,
dataResendsBeforeFailure: uint16 = defaultDataResendsBeforeFailure, dataResendsBeforeFailure: uint16 = defaultDataResendsBeforeFailure,
optRcvBuffer: uint32 = defaultOptRcvBuffer, optRcvBuffer: uint32 = defaultOptRcvBuffer,
incomingSocketReceiveTimeout: Option[Duration] = some(defaultRcvRetransmitTimeout) incomingSocketReceiveTimeout: Option[Duration] = some(defaultRcvRetransmitTimeout),
remoteWindowResetTimeout: Duration = defaultResetWindowTimeout
): T = ): T =
SocketConfig( SocketConfig(
initialSynTimeout: initialSynTimeout, initialSynTimeout: initialSynTimeout,
dataResendsBeforeFailure: dataResendsBeforeFailure, dataResendsBeforeFailure: dataResendsBeforeFailure,
optRcvBuffer: optRcvBuffer, optRcvBuffer: optRcvBuffer,
incomingSocketReceiveTimeout: incomingSocketReceiveTimeout incomingSocketReceiveTimeout: incomingSocketReceiveTimeout,
remoteWindowResetTimeout: remoteWindowResetTimeout
) )
proc getRcvWindowSize(socket: UtpSocket): uint32 = proc getRcvWindowSize(socket: UtpSocket): uint32 =
@ -290,10 +320,6 @@ proc sendAck(socket: UtpSocket): Future[void] =
# Should be called before sending packet # Should be called before sending packet
proc setSend(s: UtpSocket, p: var OutgoingPacket): seq[byte] = proc setSend(s: UtpSocket, p: var OutgoingPacket): seq[byte] =
if (p.transmissions == 0 or p.needResend):
s.currentWindow = s.currentWindow + p.payloadLength
inc p.transmissions inc p.transmissions
p.needResend = false p.needResend = false
p.timeSent = Moment.now() p.timeSent = Moment.now()
@ -305,8 +331,12 @@ proc flushPackets(socket: UtpSocket) {.async.} =
# sending only packet which were not transmitted yet or need a resend # sending only packet which were not transmitted yet or need a resend
let shouldSendPacket = socket.outBuffer.exists(i, (p: OutgoingPacket) => (p.transmissions == 0 or p.needResend == true)) let shouldSendPacket = socket.outBuffer.exists(i, (p: OutgoingPacket) => (p.transmissions == 0 or p.needResend == true))
if (shouldSendPacket): if (shouldSendPacket):
let toSend = socket.setSend(socket.outBuffer[i]) if socket.sendBufferTracker.reserveNBytes(socket.outBuffer[i].payloadLength):
await socket.sendData(toSend) let toSend = socket.setSend(socket.outBuffer[i])
await socket.sendData(toSend)
else:
# there is no place in send buffer, stop flushing
return
inc i inc i
proc markAllPacketAsLost(s: UtpSocket) = proc markAllPacketAsLost(s: UtpSocket) =
@ -314,11 +344,13 @@ proc markAllPacketAsLost(s: UtpSocket) =
while i < s.curWindowPackets: while i < s.curWindowPackets:
let packetSeqNr = s.seqNr - 1 - i let packetSeqNr = s.seqNr - 1 - i
if (s.outBuffer.exists(packetSeqNr, (p: OutgoingPacket) => p. transmissions > 0 and p.needResend == false)): if (s.outBuffer.exists(packetSeqNr, (p: OutgoingPacket) => p.transmissions > 0 and p.needResend == false)):
s.outBuffer[packetSeqNr].needResend = true s.outBuffer[packetSeqNr].needResend = true
let packetPayloadLength = s.outBuffer[packetSeqNr].payloadLength let packetPayloadLength = s.outBuffer[packetSeqNr].payloadLength
doAssert(s.currentWindow >= packetPayloadLength) # lack of waiters notification in case of timeout effectivly means that
s.currentWindow = s.currentWindow - packetPayloadLength # we do not allow any new bytes to enter snd buffer in case of new free space
# due to timeout.
s.sendBufferTracker.decreaseCurrentWindow(packetPayloadLength, notifyWaiters = false)
inc i inc i
@ -326,8 +358,7 @@ proc isOpened(socket:UtpSocket): bool =
return ( return (
socket.state == SynRecv or socket.state == SynRecv or
socket.state == SynSent or socket.state == SynSent or
socket.state == Connected or socket.state == Connected
socket.state == ConnectedFull
) )
proc shouldDisconnectFromFailedRemote(socket: UtpSocket): bool = proc shouldDisconnectFromFailedRemote(socket: UtpSocket): bool =
@ -341,6 +372,11 @@ proc checkTimeouts(socket: UtpSocket) {.async.} =
await socket.flushPackets() await socket.flushPackets()
if socket.isOpened(): if socket.isOpened():
if (socket.sendBufferTracker.maxRemoteWindow == 0 and currentTime > socket.zeroWindowTimer):
debug "Reset remote window to minimal value"
socket.sendBufferTracker.updateMaxRemote(minimalRemoteWindow)
if (currentTime > socket.rtoTimeout): if (currentTime > socket.rtoTimeout):
# TODO add handling of probe time outs. Reference implemenation has mechanism # TODO add handling of probe time outs. Reference implemenation has mechanism
@ -382,8 +418,10 @@ proc checkTimeouts(socket: UtpSocket) {.async.} =
socket.outBuffer.get(oldestPacketSeqNr).isSome(), socket.outBuffer.get(oldestPacketSeqNr).isSome(),
"oldest packet should always be available when there is data in flight" "oldest packet should always be available when there is data in flight"
) )
let dataToSend = socket.setSend(socket.outBuffer[oldestPacketSeqNr]) let payloadLength = socket.outBuffer[oldestPacketSeqNr].payloadLength
await socket.sendData(dataToSend) if (socket.sendBufferTracker.reserveNBytes(payloadLength)):
let dataToSend = socket.setSend(socket.outBuffer[oldestPacketSeqNr])
await socket.sendData(dataToSend)
# TODO add sending keep alives when necessary # TODO add sending keep alives when necessary
@ -399,6 +437,94 @@ proc checkTimeoutsLoop(s: UtpSocket) {.async.} =
proc startTimeoutLoop(s: UtpSocket) = proc startTimeoutLoop(s: UtpSocket) =
s.checkTimeoutsLoop = checkTimeoutsLoop(s) s.checkTimeoutsLoop = checkTimeoutsLoop(s)
proc getPacketSize*(socket: UtpSocket): int =
# TODO currently returning constant, ultimatly it should be bases on mtu estimates
mtuSize
proc resetSendTimeout(socket: UtpSocket) =
socket.retransmitTimeout = socket.rto
socket.rtoTimeout = Moment.now() + socket.retransmitTimeout
proc handleDataWrite(socket: UtpSocket, data: seq[byte], writeFut: Future[WriteResult]): Future[void] {.async.} =
if writeFut.finished():
# write future was cancelled befere we got chance to process it, short circuit
# processing and move to next loop iteration
return
let pSize = socket.getPacketSize()
let endIndex = data.high()
var i = 0
var bytesWritten = 0
let wndSize = socket.getRcvWindowSize()
while i <= endIndex:
let lastIndex = i + pSize - 1
let lastOrEnd = min(lastIndex, endIndex)
let dataSlice = data[i..lastOrEnd]
let payloadLength = uint32(len(dataSlice))
try:
await socket.sendBufferTracker.reserveNBytesWait(payloadLength)
if socket.curWindowPackets == 0:
socket.resetSendTimeout()
let dataPacket = dataPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, wndSize, dataSlice)
let outgoingPacket = OutgoingPacket.init(encodePacket(dataPacket), 1, false, payloadLength)
socket.registerOutgoingPacket(outgoingPacket)
await socket.sendData(outgoingPacket.packetBytes)
except CancelledError as exc:
# write loop has been cancelled in the middle of processing due to the
# socket closing
# this approach can create partial write in case destroyin socket in the
# the middle of the write
doAssert(socket.state == Destroy)
if (not writeFut.finished()):
let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state))
writeFut.complete(res)
# we need to re-raise exception so the outer loop will be properly cancelled too
raise exc
bytesWritten = bytesWritten + len(dataSlice)
i = lastOrEnd + 1
# Before completeing future with success (as all data was sent sucessfuly)
# we need to check if user did not cancel write on his end
if (not writeFut.finished()):
writeFut.complete(Result[int, WriteError].ok(bytesWritten))
proc handleClose(socket: UtpSocket): Future[void] {.async.} =
try:
if socket.curWindowPackets == 0:
socket.resetSendTimeout()
let finEncoded = encodePacket(finPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, socket.getRcvWindowSize()))
socket.registerOutgoingPacket(OutgoingPacket.init(finEncoded, 1, true, 0))
await socket.sendData(finEncoded)
socket.finSent = true
except CancelledError as exc:
raise exc
proc writeLoop(socket: UtpSocket): Future[void] {.async.} =
## Loop that processes writes on socket
try:
while true:
let req = await socket.writeQueue.get()
case req.kind
of Data:
await socket.handleDataWrite(req.data, req.writer)
of Close:
await socket.handleClose()
except CancelledError:
doAssert(socket.state == Destroy)
for req in socket.writeQueue.items:
if (req.kind == Data and not req.writer.finished()):
let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state))
req.writer.complete(res)
socket.writeQueue.clear()
trace "writeLoop canceled"
proc startWriteLoop(s: UtpSocket) =
s.writeLoop = writeLoop(s)
proc new[A]( proc new[A](
T: type UtpSocket[A], T: type UtpSocket[A],
to: A, to: A,
@ -433,6 +559,11 @@ proc new[A](
buffer: AsyncBuffer.init(int(cfg.optRcvBuffer)), buffer: AsyncBuffer.init(int(cfg.optRcvBuffer)),
closeEvent: newAsyncEvent(), closeEvent: newAsyncEvent(),
closeCallbacks: newSeq[Future[void]](), closeCallbacks: newSeq[Future[void]](),
# start with 1mb assumption, field will be updated with first received packet
sendBufferTracker: SendBufferTracker.new(0, 1024 * 1024),
# queue with infinite size
writeQueue: newAsyncQueue[WriteRequest](),
zeroWindowTimer: Moment.now() + cfg.remoteWindowResetTimeout,
socketKey: UtpSocketKey.init(to, rcvId), socketKey: UtpSocketKey.init(to, rcvId),
send: snd send: snd
) )
@ -502,6 +633,7 @@ proc startOutgoingSocket*(socket: UtpSocket): Future[void] {.async.} =
# initiliazation # initiliazation
let outgoingPacket = OutgoingPacket.init(encodePacket(packet), 1, false, 0) let outgoingPacket = OutgoingPacket.init(encodePacket(packet), 1, false, 0)
socket.registerOutgoingPacket(outgoingPacket) socket.registerOutgoingPacket(outgoingPacket)
socket.startWriteLoop()
socket.startTimeoutLoop() socket.startTimeoutLoop()
await socket.sendData(outgoingPacket.packetBytes) await socket.sendData(outgoingPacket.packetBytes)
await socket.connectionFuture await socket.connectionFuture
@ -509,10 +641,11 @@ proc startOutgoingSocket*(socket: UtpSocket): Future[void] {.async.} =
proc startIncomingSocket*(socket: UtpSocket) {.async.} = proc startIncomingSocket*(socket: UtpSocket) {.async.} =
# Make sure ack was flushed before moving forward # Make sure ack was flushed before moving forward
await socket.sendAck() await socket.sendAck()
socket.startWriteLoop()
socket.startTimeoutLoop() socket.startTimeoutLoop()
proc isConnected*(socket: UtpSocket): bool = proc isConnected*(socket: UtpSocket): bool =
socket.state == Connected or socket.state == ConnectedFull socket.state == Connected
proc isClosed*(socket: UtpSocket): bool = proc isClosed*(socket: UtpSocket): bool =
socket.state == Destroy and socket.closeEvent.isSet() socket.state == Destroy and socket.closeEvent.isSet()
@ -521,6 +654,7 @@ proc destroy*(s: UtpSocket) =
## Moves socket to destroy state and clean all reasources. ## Moves socket to destroy state and clean all reasources.
## Remote is not notified in any way about socket end of life ## Remote is not notified in any way about socket end of life
s.state = Destroy s.state = Destroy
s.writeLoop.cancel()
s.checkTimeoutsLoop.cancel() s.checkTimeoutsLoop.cancel()
s.closeEvent.fire() s.closeEvent.fire()
@ -606,8 +740,7 @@ proc ackPacket(socket: UtpSocket, seqNr: uint16): AckResult =
# been considered timed-out, and is not included in # been considered timed-out, and is not included in
# the cur_window anymore # the cur_window anymore
if (not packet.needResend): if (not packet.needResend):
doAssert(socket.currentWindow >= packet.payloadLength) socket.sendBufferTracker.decreaseCurrentWindow(packet.payloadLength, notifyWaiters = true)
socket.currentWindow = socket.currentWindow - packet.payloadLength
socket.retransmitCount = 0 socket.retransmitCount = 0
PacketAcked PacketAcked
@ -696,6 +829,15 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
notice "Received packet is totally of the mark" notice "Received packet is totally of the mark"
return return
# update remote window size
socket.sendBufferTracker.updateMaxRemote(p.header.wndSize)
if (socket.sendBufferTracker.maxRemoteWindow == 0):
# when zeroWindowTimer will be hit and maxRemoteWindow still will be equal to 0
# then it will be reset to minimal value
socket.zeroWindowTimer = Moment.now() + socket.socketConfig.remoteWindowResetTimeout
# socket.curWindowPackets == acks means that this packet acked all remaining packets # socket.curWindowPackets == acks means that this packet acked all remaining packets
# including the sent fin packets # including the sent fin packets
if (socket.finSent and socket.curWindowPackets == acks): if (socket.finSent and socket.curWindowPackets == acks):
@ -821,29 +963,24 @@ proc atEof*(socket: UtpSocket): bool =
proc readingClosed(socket: UtpSocket): bool = proc readingClosed(socket: UtpSocket): bool =
socket.atEof() or socket.state == Destroy socket.atEof() or socket.state == Destroy
proc getPacketSize(socket: UtpSocket): int = proc close*(socket: UtpSocket) =
# TODO currently returning constant, ultimatly it should be bases on mtu estimates
mtuSize
proc resetSendTimeout(socket: UtpSocket) =
socket.retransmitTimeout = socket.rto
socket.rtoTimeout = Moment.now() + socket.retransmitTimeout
proc close*(socket: UtpSocket) {.async.} =
## Gracefully closes conneciton (send FIN) if socket is in connected state ## Gracefully closes conneciton (send FIN) if socket is in connected state
## does not wait for socket to close ## does not wait for socket to close
if socket.state != Destroy: if socket.state != Destroy:
case socket.state case socket.state
of Connected, ConnectedFull: of Connected:
socket.readShutdown = true socket.readShutdown = true
if (not socket.finSent): if (not socket.sendFinRequested):
if socket.curWindowPackets == 0: try:
socket.resetSendTimeout() # with this approach, all pending writes will be executed before sending fin packet
# we could also and method which places close request as first one to process
let finEncoded = encodePacket(finPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, socket.getRcvWindowSize())) # but it would complicate the write loop
socket.registerOutgoingPacket(OutgoingPacket.init(finEncoded, 1, true, 0)) socket.writeQueue.putNoWait(WriteRequest(kind: Close))
socket.finSent = true except AsyncQueueFullError as e:
await socket.sendData(finEncoded) # should not happen as our write queue is unbounded
raiseAssert e.msg
socket.sendFinRequested = true
else: else:
# In any other case like connection is not established so sending fin make # In any other case like connection is not established so sending fin make
# no sense, we can just out right close it # no sense, we can just out right close it
@ -855,46 +992,38 @@ proc closeWait*(socket: UtpSocket) {.async.} =
## Warning: if FIN packet for some reason will be lost, then socket will be closed ## Warning: if FIN packet for some reason will be lost, then socket will be closed
## due to retransmission failure which may take some time. ## due to retransmission failure which may take some time.
## default is 4 retransmissions with doubling of rto between each retranssmision ## default is 4 retransmissions with doubling of rto between each retranssmision
await socket.close() socket.close()
await socket.closeEvent.wait() await socket.closeEvent.wait()
proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] {.async.} = proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] =
let retFuture = newFuture[WriteResult]("UtpSocket.write")
if (socket.state != Connected): if (socket.state != Connected):
return err(WriteError(kind: SocketNotWriteable, currentState: socket.state)) let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state))
retFuture.complete(res)
return retFuture
# fin should be last packet received by remote side, therefore trying to write # fin should be last packet received by remote side, therefore trying to write
# after sending fin is considered error # after sending fin is considered error
if socket.finSent: if socket.sendFinRequested or socket.finSent:
return err(WriteError(kind: FinSent)) let res = Result[int, WriteError].err(WriteError(kind: FinSent))
retFuture.complete(res)
return retFuture
var bytesWritten = 0 var bytesWritten = 0
# TODO
# Handle growing of send window
if len(data) == 0: if len(data) == 0:
return ok(bytesWritten) let res = Result[int, WriteError].ok(bytesWritten)
retFuture.complete(res)
return retFuture
try:
socket.writeQueue.putNoWait(WriteRequest(kind: Data, data: data, writer: retFuture))
except AsyncQueueFullError as e:
# this should not happen as out write queue is unbounded
raiseAssert e.msg
if socket.curWindowPackets == 0: return retFuture
socket.resetSendTimeout()
let pSize = socket.getPacketSize()
let endIndex = data.high()
var i = 0
let wndSize = socket.getRcvWindowSize()
while i <= data.high:
let lastIndex = i + pSize - 1
let lastOrEnd = min(lastIndex, endIndex)
let dataSlice = data[i..lastOrEnd]
let dataPacket = dataPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, wndSize, dataSlice)
let payloadLength = uint32(len(dataSlice))
socket.registerOutgoingPacket(OutgoingPacket.init(encodePacket(dataPacket), 0, false, payloadLength))
bytesWritten = bytesWritten + len(dataSlice)
i = lastOrEnd + 1
await socket.flushPackets()
return ok(bytesWritten)
template readLoop(body: untyped): untyped = template readLoop(body: untyped): untyped =
while true: while true:
@ -953,7 +1082,7 @@ proc numPacketsInOutGoingBuffer*(socket: UtpSocket): int =
num num
# Check how many payload bytes are still in flight # Check how many payload bytes are still in flight
proc numOfBytesInFlight*(socket: UtpSocket): uint32 = socket.currentWindow proc numOfBytesInFlight*(socket: UtpSocket): uint32 = socket.sendBufferTracker.currentBytesInFlight()
# Check how many packets are still in the reorder buffer, usefull for tests or # Check how many packets are still in the reorder buffer, usefull for tests or
# debugging. # debugging.

View File

@ -61,6 +61,7 @@ procSuite "Utp socket unit test":
template connectOutGoingSocket( template connectOutGoingSocket(
initialRemoteSeq: uint16, initialRemoteSeq: uint16,
q: AsyncQueue[Packet], q: AsyncQueue[Packet],
remoteReceiveBuffer: uint32 = testBufferSize,
cfg: SocketConfig = SocketConfig.init()): (UtpSocket[TransportAddress], Packet) = cfg: SocketConfig = SocketConfig.init()): (UtpSocket[TransportAddress], Packet) =
let sock1 = newOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), cfg, defaultRcvOutgoingId, rng[]) let sock1 = newOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), cfg, defaultRcvOutgoingId, rng[])
asyncSpawn sock1.startOutgoingSocket() asyncSpawn sock1.startOutgoingSocket()
@ -69,7 +70,7 @@ procSuite "Utp socket unit test":
check: check:
initialPacket.header.pType == ST_SYN initialPacket.header.pType == ST_SYN
let responseAck = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize) let responseAck = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, remoteReceiveBuffer)
await sock1.processPacket(responseAck) await sock1.processPacket(responseAck)
@ -313,6 +314,80 @@ procSuite "Utp socket unit test":
await outgoingSocket.destroyWait() await outgoingSocket.destroyWait()
asyncTest "Blocked writing futures should be properly finished when socket is closed":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
let dataToWrite1 = @[0'u8]
let dataToWrite2 = @[1'u8]
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, 0)
let writeFut1 = outgoingSocket.write(dataToWrite1)
let writeFut2 = outgoingSocket.write(dataToWrite2)
# wait a little to show that futures are not progressing
await sleepAsync(seconds(1))
check:
not writeFut1.finished()
not writeFut2.finished()
outgoingSocket.destroy()
yield writeFut1
yield writeFut2
check:
writeFut1.completed()
writeFut2.completed()
writeFut1.read().isErr()
writeFut2.read().isErr()
await outgoingSocket.destroyWait()
asyncTest "Cancelled write futures should not be processed if cancelled before processing":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
let dataToWrite1 = @[0'u8]
let dataToWrite2 = @[1'u8]
let dataToWrite3 = @[2'u8]
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, 0)
# only writeFut1 will progress as to processing stage, writeFut2 and writeFut3
# will be blocked in queue
let writeFut1 = outgoingSocket.write(dataToWrite1)
let writeFut2 = outgoingSocket.write(dataToWrite2)
let writeFut3 = outgoingSocket.write(dataToWrite3)
# user decided to cancel second write
await writeFut2.cancelAndWait()
# remote increased wnd size enough for all writes
let someAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, 10)
await outgoingSocket.processPacket(someAckFromRemote)
yield writeFut1
yield writeFut2
yield writeFut3
check:
writeFut1.completed()
writeFut2.cancelled()
writeFut3.completed()
let p1 = await q.get()
let p2 = await q.get
check:
# we produce only two data packets as write with dataToWrite2 was cancelled
p1.payload == dataToWrite1
p2.payload == dataToWrite3
await outgoingSocket.destroyWait()
asyncTest "Socket should re-send data packet configurable number of times before declaring failure": asyncTest "Socket should re-send data packet configurable number of times before declaring failure":
let q = newAsyncQueue[Packet]() let q = newAsyncQueue[Packet]()
let initalRemoteSeqNr = 10'u16 let initalRemoteSeqNr = 10'u16
@ -486,7 +561,7 @@ procSuite "Utp socket unit test":
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q)
await outgoingSocket.close() outgoingSocket.close()
let sendFin = await q.get() let sendFin = await q.get()
@ -501,7 +576,7 @@ procSuite "Utp socket unit test":
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q)
let closeF = outgoingSocket.close() outgoingSocket.close()
let sendFin = await q.get() let sendFin = await q.get()
@ -512,8 +587,6 @@ procSuite "Utp socket unit test":
await outgoingSocket.processPacket(responseAck) await outgoingSocket.processPacket(responseAck)
await closeF
check: check:
not outgoingSocket.isConnected() not outgoingSocket.isConnected()
@ -544,7 +617,7 @@ procSuite "Utp socket unit test":
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q)
await outgoingSocket.close() outgoingSocket.close()
let writeResult = await outgoingSocket.write(@[1'u8]) let writeResult = await outgoingSocket.write(@[1'u8])
@ -564,7 +637,7 @@ procSuite "Utp socket unit test":
let initialRcvBufferSize = 10'u32 let initialRcvBufferSize = 10'u32
let data = @[1'u8, 2'u8, 3'u8] let data = @[1'u8, 2'u8, 3'u8]
let sCfg = SocketConfig.init(optRcvBuffer = initialRcvBufferSize) let sCfg = SocketConfig.init(optRcvBuffer = initialRcvBufferSize)
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeqNr, q, sCfg) let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeqNr, q, testBufferSize, sCfg)
let dataP1 = dataPacket(initialRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data) let dataP1 = dataPacket(initialRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data)
@ -585,7 +658,7 @@ procSuite "Utp socket unit test":
sentData.header.pType == ST_DATA sentData.header.pType == ST_DATA
sentData.header.wndSize == initialRcvBufferSize - uint32(len(data)) sentData.header.wndSize == initialRcvBufferSize - uint32(len(data))
await outgoingSocket.close() outgoingSocket.close()
let sentFin = await q.get() let sentFin = await q.get()
@ -601,7 +674,7 @@ procSuite "Utp socket unit test":
let initialRcvBufferSize = 10'u32 let initialRcvBufferSize = 10'u32
let data = @[1'u8, 2'u8, 3'u8] let data = @[1'u8, 2'u8, 3'u8]
let sCfg = SocketConfig.init(optRcvBuffer = initialRcvBufferSize) let sCfg = SocketConfig.init(optRcvBuffer = initialRcvBufferSize)
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q, sCfg) let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q, testBufferSize, sCfg)
let dataP1 = dataPacket(initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data) let dataP1 = dataPacket(initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data)
@ -746,3 +819,140 @@ procSuite "Utp socket unit test":
int(outgoingSocket.numOfBytesInFlight) == len(dataToWrite) int(outgoingSocket.numOfBytesInFlight) == len(dataToWrite)
await outgoingSocket.destroyWait() await outgoingSocket.destroyWait()
asyncTest "Writing data should asynchronously block until there is enough space in snd buffer":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
let dataToWrite = @[1'u8, 2, 3, 4, 5]
# remote is initialized with buffer to small to handle whole payload
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, uint32(len(dataToWrite) - 1))
let writeFut = outgoingSocket.write(dataToWrite)
# wait some time to check future is not finished
await sleepAsync(seconds(2))
# write is not finished as future is blocked from progressing due to to small
# send window
check:
not writeFut.finished()
let someAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, uint32(len(dataToWrite)))
await outgoingSocket.processPacket(someAckFromRemote)
# after processing packet with increased buffer size write should complete and
# packet should be sent
let sentPacket = await q.get()
check:
sentPacket.payload == dataToWrite
writeFut.finished()
await outgoingSocket.destroyWait()
asyncTest "Writing data should not progress in case of timeouting packets and small snd window":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
let dataToWrite = @[1'u8, 2, 3, 4, 5]
# remote is initialized with buffer to small to handle whole payload
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q)
let remoteRcvWindowSize = uint32(outgoingSocket.getPacketSize())
let someAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, remoteRcvWindowSize)
# we are using ack from remote to setup our snd window size to one packet size on one packet
await outgoingSocket.processPacket(someAckFromRemote)
let twoPacketData = generateByteArray(rng[], int(2 * remoteRcvWindowSize))
let writeFut = outgoingSocket.write(twoPacketData)
# after this time first packet will be send and will timeout, but the write should not
# finish, as timeouting packets do not notify writing about new space in snd
# buffer
await sleepAsync(seconds(2))
check:
not writeFut.finished()
await outgoingSocket.destroyWait()
asyncTest "Writing data should respect remote rcv window size":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
let dataToWrite = @[1'u8, 2, 3, 4, 5]
# remote is initialized with buffer to small to handle whole payload
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q)
let remoteRcvWindowSize = uint32(outgoingSocket.getPacketSize())
let someAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, remoteRcvWindowSize)
# we are using ack from remote to setup our snd window size to one packet size on one packet
await outgoingSocket.processPacket(someAckFromRemote)
let twoPacketData = generateByteArray(rng[], int(2 * remoteRcvWindowSize))
let writeFut = outgoingSocket.write(twoPacketData)
let firstAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr + 1, remoteRcvWindowSize)
let packet = await q.get()
check:
packet.header.pType == ST_DATA
uint32(len(packet.payload)) == remoteRcvWindowSize
not writeFut.finished
await outgoingSocket.processPacket(firstAckFromRemote)
let packet1 = await q.get()
let writeResult = await writeFut
check:
packet1.header.pType == ST_DATA
writeFut.finished
await outgoingSocket.destroyWait()
asyncTest "Remote window should be reseted to minimal value after configured amount of time":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
let someData = @[1'u8]
let (outgoingSocket, packet) =
connectOutGoingSocket(
initialRemoteSeq,
q,
remoteReceiveBuffer = 0,
cfg = SocketConfig.init(
remoteWindowResetTimeout = seconds(3)
)
)
check:
outgoingSocket.isConnected()
let writeFut = outgoingSocket.write(someData)
await sleepAsync(seconds(1))
check:
# Even after 1 second write is not finished as we did not receive any message
# so remote rcv window is still zero
not writeFut.finished()
# Ultimately, after 3 second remote rcv window will be reseted to minimal value
# and write will be able to progress
let writeResult = await writeFut
let p = await q.get()
check:
writeResult.isOk()
p.payload == someData
await outgoingSocket.destroyWait()