Add more tests stressing conccurent reading and writing on utp socket (#474)

* Add more tests stressing concurrent reading and writing

* Fix bug when remote window dropped below packet size
This commit is contained in:
KonradStaniec 2022-02-10 08:05:44 +01:00 committed by GitHub
parent 05ef9a8e00
commit 779d767b02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 320 additions and 66 deletions

View File

@ -23,6 +23,8 @@ type
transport: DatagramTransport transport: DatagramTransport
utpRouter: UtpRouter[TransportAddress] utpRouter: UtpRouter[TransportAddress]
SendCallbackBuilder* = proc (d: DatagramTransport): SendCallback[TransportAddress] {.gcsafe, raises: [Defect].}
# This should probably be defined in TransportAddress module, as hash function should # This should probably be defined in TransportAddress module, as hash function should
# be consitent with equality function # be consitent with equality function
# in nim zero arrays always have hash equal to 0, irrespectively of array size, to # in nim zero arrays always have hash equal to 0, irrespectively of array size, to
@ -78,6 +80,7 @@ proc new*(
address: TransportAddress, address: TransportAddress,
socketConfig: SocketConfig = SocketConfig.init(), socketConfig: SocketConfig = SocketConfig.init(),
allowConnectionCb: AllowConnectionCallback[TransportAddress] = nil, allowConnectionCb: AllowConnectionCallback[TransportAddress] = nil,
sendCallbackBuilder: SendCallbackBuilder = nil,
rng = newRng()): UtpProtocol {.raises: [Defect, CatchableError].} = rng = newRng()): UtpProtocol {.raises: [Defect, CatchableError].} =
doAssert(not(isNil(acceptConnectionCb))) doAssert(not(isNil(acceptConnectionCb)))
@ -90,7 +93,12 @@ proc new*(
) )
let ta = newDatagramTransport(processDatagram, udata = router, local = address) let ta = newDatagramTransport(processDatagram, udata = router, local = address)
router.sendCb = initSendCallback(ta)
if (sendCallbackBuilder == nil):
router.sendCb = initSendCallback(ta)
else:
router.sendCb = sendCallbackBuilder(ta)
UtpProtocol(transport: ta, utpRouter: router) UtpProtocol(transport: ta, utpRouter: router)
proc shutdownWait*(p: UtpProtocol): Future[void] {.async.} = proc shutdownWait*(p: UtpProtocol): Future[void] {.async.} =

View File

@ -191,7 +191,8 @@ type
writeLoop: Future[void] writeLoop: Future[void]
zeroWindowTimer: Moment # timer which is started when peer max window drops below current packet size
zeroWindowTimer: Option[Moment]
# last measured delay between current local timestamp, and remote sent # last measured delay between current local timestamp, and remote sent
# timestamp. In microseconds # timestamp. In microseconds
@ -287,7 +288,8 @@ const
allowedAckWindow*: uint16 = 3 allowedAckWindow*: uint16 = 3
# Timeout after which the send window will be reset to its minimal value after it dropped # Timeout after which the send window will be reset to its minimal value after it dropped
# to zero. i.e when we received a packet from remote peer with `wndSize` set to 0. # lower than our current packet size. i.e when we received a packet
# from remote peer with `wndSize` set to number <= current packet size
defaultResetWindowTimeout = seconds(15) defaultResetWindowTimeout = seconds(15)
# If remote peer window drops to zero, then after some time we will reset it # If remote peer window drops to zero, then after some time we will reset it
@ -446,10 +448,15 @@ proc checkTimeouts(socket: UtpSocket) {.async.} =
await socket.flushPackets() await socket.flushPackets()
if socket.isOpened(): if socket.isOpened():
let currentPacketSize = uint32(socket.getPacketSize())
if (socket.zeroWindowTimer.isSome() and currentTime > socket.zeroWindowTimer.unsafeGet()):
if socket.sendBufferTracker.maxRemoteWindow <= currentPacketSize:
socket.sendBufferTracker.updateMaxRemote(minimalRemoteWindow)
socket.zeroWindowTimer = none[Moment]()
debug "Reset remote window to minimal value",
minRemote = minimalRemoteWindow
if (socket.sendBufferTracker.maxRemoteWindow == 0 and currentTime > socket.zeroWindowTimer):
debug "Reset remote window to minimal value"
socket.sendBufferTracker.updateMaxRemote(minimalRemoteWindow)
if (currentTime > socket.rtoTimeout): if (currentTime > socket.rtoTimeout):
debug "CheckTimeouts rto timeout", debug "CheckTimeouts rto timeout",
@ -487,7 +494,7 @@ proc checkTimeouts(socket: UtpSocket) {.async.} =
# on timeout reset duplicate ack counter # on timeout reset duplicate ack counter
socket.duplicateAck = 0 socket.duplicateAck = 0
let currentPacketSize = uint32(socket.getPacketSize())
if (socket.curWindowPackets == 0 and socket.sendBufferTracker.maxWindow > currentPacketSize): if (socket.curWindowPackets == 0 and socket.sendBufferTracker.maxWindow > currentPacketSize):
# there are no packets in flight even though there is place for more than whole packet # there are no packets in flight even though there is place for more than whole packet
@ -566,57 +573,59 @@ proc resetSendTimeout(socket: UtpSocket) =
socket.rtoTimeout = getMonoTimestamp().moment + socket.retransmitTimeout socket.rtoTimeout = getMonoTimestamp().moment + socket.retransmitTimeout
proc handleDataWrite(socket: UtpSocket, data: seq[byte], writeFut: Future[WriteResult]): Future[void] {.async.} = proc handleDataWrite(socket: UtpSocket, data: seq[byte], writeFut: Future[WriteResult]): Future[void] {.async.} =
if writeFut.finished(): if writeFut.finished():
# write future was cancelled befere we got chance to process it, short circuit # write future was cancelled befere we got chance to process it, short circuit
# processing and move to next loop iteration # processing and move to next loop iteration
return return
let pSize = socket.getPacketSize()
let endIndex = data.high()
var i = 0
var bytesWritten = 0
while i <= endIndex:
let lastIndex = i + pSize - 1
let lastOrEnd = min(lastIndex, endIndex)
let dataSlice = data[i..lastOrEnd]
let payloadLength = uint32(len(dataSlice))
try:
await socket.sendBufferTracker.reserveNBytesWait(payloadLength)
if socket.curWindowPackets == 0:
socket.resetSendTimeout()
let pSize = socket.getPacketSize()
let endIndex = data.high()
var i = 0
var bytesWritten = 0
let wndSize = socket.getRcvWindowSize() let wndSize = socket.getRcvWindowSize()
while i <= endIndex: let dataPacket =
let lastIndex = i + pSize - 1 dataPacket(
let lastOrEnd = min(lastIndex, endIndex) socket.seqNr,
let dataSlice = data[i..lastOrEnd] socket.connectionIdSnd,
let payloadLength = uint32(len(dataSlice)) socket.ackNr,
try: wndSize,
await socket.sendBufferTracker.reserveNBytesWait(payloadLength) dataSlice,
if socket.curWindowPackets == 0: socket.replayMicro
socket.resetSendTimeout() )
let outgoingPacket = OutgoingPacket.init(encodePacket(dataPacket), 1, false, payloadLength)
let dataPacket = socket.registerOutgoingPacket(outgoingPacket)
dataPacket( await socket.sendData(outgoingPacket.packetBytes)
socket.seqNr, except CancelledError as exc:
socket.connectionIdSnd, # write loop has been cancelled in the middle of processing due to the
socket.ackNr, # socket closing
wndSize, # this approach can create partial write in when destroying the socket in the
dataSlice, # the middle of the write
socket.replayMicro doAssert(socket.state == Destroy)
)
let outgoingPacket = OutgoingPacket.init(encodePacket(dataPacket), 1, false, payloadLength)
socket.registerOutgoingPacket(outgoingPacket)
await socket.sendData(outgoingPacket.packetBytes)
except CancelledError as exc:
# write loop has been cancelled in the middle of processing due to the
# socket closing
# this approach can create partial write in case destroyin socket in the
# the middle of the write
doAssert(socket.state == Destroy)
if (not writeFut.finished()):
let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state))
writeFut.complete(res)
# we need to re-raise exception so the outer loop will be properly cancelled too
raise exc
bytesWritten = bytesWritten + len(dataSlice)
i = lastOrEnd + 1
# Before completeing future with success (as all data was sent sucessfuly)
# we need to check if user did not cancel write on his end
if (not writeFut.finished()): if (not writeFut.finished()):
writeFut.complete(Result[int, WriteError].ok(bytesWritten)) let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state))
writeFut.complete(res)
# we need to re-raise exception so the outer loop will be properly cancelled too
raise exc
bytesWritten = bytesWritten + len(dataSlice)
i = lastOrEnd + 1
# Before completing the future with success (as all data was sent successfully)
# we need to check if user did not cancel write on his end
if (not writeFut.finished()):
writeFut.complete(Result[int, WriteError].ok(bytesWritten))
proc handleClose(socket: UtpSocket): Future[void] {.async.} = proc handleClose(socket: UtpSocket): Future[void] {.async.} =
try: try:
@ -706,7 +715,7 @@ proc new[A](
sendBufferTracker: SendBufferTracker.new(0, 1024 * 1024, cfg.optSndBuffer, startMaxWindow), sendBufferTracker: SendBufferTracker.new(0, 1024 * 1024, cfg.optSndBuffer, startMaxWindow),
# queue with infinite size # queue with infinite size
writeQueue: newAsyncQueue[WriteRequest](), writeQueue: newAsyncQueue[WriteRequest](),
zeroWindowTimer: currentTime + cfg.remoteWindowResetTimeout, zeroWindowTimer: none[Moment](),
socketKey: UtpSocketKey.init(to, rcvId), socketKey: UtpSocketKey.init(to, rcvId),
slowStart: true, slowStart: true,
fastTimeout: false, fastTimeout: false,
@ -1131,11 +1140,13 @@ proc generateAckPacket*(socket: UtpSocket): Packet =
else: else:
none[array[4, byte]]() none[array[4, byte]]()
let bufferSize = socket.getRcvWindowSize()
ackPacket( ackPacket(
socket.seqNr, socket.seqNr,
socket.connectionIdSnd, socket.connectionIdSnd,
socket.ackNr, socket.ackNr,
socket.getRcvWindowSize(), bufferSize,
socket.replayMicro, socket.replayMicro,
bitmask bitmask
) )
@ -1175,7 +1186,8 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
seqNr = p.header.seqNr, seqNr = p.header.seqNr,
ackNr = p.header.ackNr, ackNr = p.header.ackNr,
timestamp = p.header.timestamp, timestamp = p.header.timestamp,
timestampDiff = p.header.timestampDiff timestampDiff = p.header.timestampDiff,
remoteWindow = p.header.wndSize
let timestampInfo = getMonoTimestamp() let timestampInfo = getMonoTimestamp()
@ -1255,7 +1267,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
let isPossibleDuplicatedOldPacket = pastExpected >= (int(uint16.high) + 1) - reorderBufferMaxSize let isPossibleDuplicatedOldPacket = pastExpected >= (int(uint16.high) + 1) - reorderBufferMaxSize
if (isPossibleDuplicatedOldPacket and p.header.pType != ST_STATE): if (isPossibleDuplicatedOldPacket and p.header.pType != ST_STATE):
asyncSpawn socket.sendAck() discard socket.sendAck()
debug "Got an invalid packet sequence number, too far off", debug "Got an invalid packet sequence number, too far off",
pastExpected = pastExpected pastExpected = pastExpected
@ -1311,13 +1323,14 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
let diff = uint32((socket.ourHistogram.getValue() - minRtt).microseconds()) let diff = uint32((socket.ourHistogram.getValue() - minRtt).microseconds())
socket.ourHistogram.shift(diff) socket.ourHistogram.shift(diff)
let currentPacketSize = uint32(socket.getPacketSize())
let (newMaxWindow, newSlowStartTreshold, newSlowStart) = let (newMaxWindow, newSlowStartTreshold, newSlowStart) =
applyCongestionControl( applyCongestionControl(
socket.sendBufferTracker.maxWindow, socket.sendBufferTracker.maxWindow,
socket.slowStart, socket.slowStart,
socket.slowStartTreshold, socket.slowStartTreshold,
socket.socketConfig.optSndBuffer, socket.socketConfig.optSndBuffer,
uint32(socket.getPacketSize()), currentPacketSize,
microseconds(actualDelay), microseconds(actualDelay),
ackedBytes, ackedBytes,
minRtt, minRtt,
@ -1336,14 +1349,15 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
slowStartTreshold = newSlowStartTreshold, slowStartTreshold = newSlowStartTreshold,
slowstart = newSlowStart slowstart = newSlowStart
if (socket.sendBufferTracker.maxRemoteWindow == 0): if (socket.zeroWindowTimer.isNone() and socket.sendBufferTracker.maxRemoteWindow <= currentPacketSize):
# when zeroWindowTimer will be hit and maxRemoteWindow still will be equal to 0 # when zeroWindowTimer will be hit and maxRemoteWindow still will be equal to 0
# then it will be reset to minimal value # then it will be reset to minimal value
socket.zeroWindowTimer = timestampInfo.moment + socket.socketConfig.remoteWindowResetTimeout socket.zeroWindowTimer = some(timestampInfo.moment + socket.socketConfig.remoteWindowResetTimeout)
debug "Remote window size dropped to 0", debug "Remote window size dropped below packet size",
currentTime = timestampInfo.moment, currentTime = timestampInfo.moment,
resetZeroWindowTime = socket.zeroWindowTimer resetZeroWindowTime = socket.zeroWindowTimer,
currentPacketSize = currentPacketSize
# socket.curWindowPackets == acks means that this packet acked all remaining packets # socket.curWindowPackets == acks means that this packet acked all remaining packets
# including the sent fin packets # including the sent fin packets
@ -1488,7 +1502,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
# need improvement, as with this approach there is no direct control over # need improvement, as with this approach there is no direct control over
# how many concurrent tasks there are and how to cancel them when socket # how many concurrent tasks there are and how to cancel them when socket
# is closed # is closed
asyncSpawn socket.sendAck() discard socket.sendAck()
# we got packet out of order # we got packet out of order
else: else:
@ -1515,7 +1529,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
reorderCount = socket.reorderCount reorderCount = socket.reorderCount
# we send ack packet, as we reoreder count is > 0, so the eack bitmask will be # we send ack packet, as we reoreder count is > 0, so the eack bitmask will be
# generated # generated
asyncSpawn socket.sendAck() discard socket.sendAck()
of ST_STATE: of ST_STATE:
if (socket.state == SynSent and (not socket.connectionFuture.finished())): if (socket.state == SynSent and (not socket.connectionFuture.finished())):

View File

@ -15,4 +15,5 @@ import
./test_utp_socket, ./test_utp_socket,
./test_utp_socket_sack, ./test_utp_socket_sack,
./test_utp_router, ./test_utp_router,
./test_clock_drift_calculator ./test_clock_drift_calculator,
./test_protocol_integration

View File

@ -0,0 +1,231 @@
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.used.}
import
std/[sequtils, tables, options, sugar],
chronos, bearssl,
testutils/unittests,
./test_utils,
../../eth/utp/utp_router,
../../eth/utp/utp_protocol,
../../eth/keys,
../../eth/p2p/discoveryv5/random2
proc connectTillSuccess(p: UtpProtocol, to: TransportAddress, maxTries: int = 20): Future[UtpSocket[TransportAddress]] {.async.} =
var i = 0
while true:
let res = await p.connectTo(to)
if res.isOk():
return res.unsafeGet()
else:
inc i
if i >= maxTries:
raise newException(CatchableError, "Connection failed")
proc buildAcceptConnection(
t: ref Table[UtpSocketKey[TransportAddress], UtpSocket[TransportAddress]]
): AcceptConnectionCallback[TransportAddress] =
return (
proc (server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] =
let fut = newFuture[void]()
let key = client.socketKey
t[key] = client
fut.complete()
return fut
)
proc getServerSocket(
t: ref Table[UtpSocketKey[TransportAddress], UtpSocket[TransportAddress]],
clientAddress: TransportAddress,
clientConnectionId: uint16): Option[UtpSocket[TransportAddress]] =
let serverSocketKey = UtpSocketKey[TransportAddress](remoteAddress: clientAddress, rcvId: clientConnectionId + 1)
let srvSocket = t.getOrDefault(serverSocketKey)
if srvSocket == nil:
return none[UtpSocket[TransportAddress]]()
else:
return some(srvSocket)
procSuite "Utp protocol over udp tests with loss and delays":
let rng = newRng()
proc sendBuilder(maxDelay: int, packetDropRate: int): SendCallbackBuilder =
return (
proc (d: DatagramTransport): SendCallback[TransportAddress] =
return (
proc (to: TransportAddress, data: seq[byte]): Future[void] {.async.} =
let i = rand(rng[], 99)
if i >= packetDropRate:
let delay = milliseconds(rand(rng[], maxDelay))
await sleepAsync(delay)
await d.sendTo(to, data)
)
)
proc testScenario(maxDelay: int, dropRate: int, cfg: SocketConfig = SocketConfig.init()):
Future[(
UtpProtocol,
UtpSocket[TransportAddress],
UtpProtocol,
UtpSocket[TransportAddress])
] {.async.} =
var connections1 = newTable[UtpSocketKey[TransportAddress], UtpSocket[TransportAddress]]()
let address1 = initTAddress("127.0.0.1", 9080)
let utpProt1 =
UtpProtocol.new(
buildAcceptConnection(connections1),
address1,
socketConfig = cfg,
sendCallbackBuilder = sendBuilder(maxDelay, dropRate),
rng = rng)
var connections2 = newTable[UtpSocketKey[TransportAddress], UtpSocket[TransportAddress]]()
let address2 = initTAddress("127.0.0.1", 9081)
let utpProt2 =
UtpProtocol.new(
buildAcceptConnection(connections2),
address2,
socketConfig = cfg,
sendCallbackBuilder = sendBuilder(maxDelay, dropRate),
rng = rng)
let clientSocket = await utpProt1.connectTillSuccess(address2)
let maybeServerSocket = connections2.getServerSocket(address1, clientSocket.socketKey.rcvId)
let serverSocket = maybeServerSocket.unsafeGet()
return (utpProt1, clientSocket, utpProt2, serverSocket)
type TestCase = object
# in miliseconds
maxDelay: int
dropRate: int
bytesToTransfer: int
bytesPerRead: int
cfg: SocketConfig
proc init(
T: type TestCase,
maxDelay: int,
dropRate: int,
bytesToTransfer: int,
cfg: SocketConfig = SocketConfig.init(),
bytesPerRead: int = 0): TestCase =
TestCase(maxDelay: maxDelay, dropRate: dropRate, bytesToTransfer: bytesToTransfer, cfg: cfg, bytesPerRead: bytesPerRead)
let testCases = @[
TestCase.init(45, 10, 40000),
TestCase.init(45, 15, 40000),
TestCase.init(50, 20, 20000),
# super small recv buffer which will be constantly on the brink of being full
TestCase.init(15, 5, 80000, SocketConfig.init(optRcvBuffer = uint32(2000), remoteWindowResetTimeout = seconds(5))),
TestCase.init(15, 10, 80000, SocketConfig.init(optRcvBuffer = uint32(2000), remoteWindowResetTimeout = seconds(5)))
]
asyncTest "Write and Read large data in different network conditions":
for testCase in testCases:
let (
clientProtocol,
clientSocket,
serverProtocol,
serverSocket) = await testScenario(testCase.maxDelay, testCase.dropRate, testcase.cfg)
let smallBytes = 10
let smallBytesToTransfer = generateByteArray(rng[], smallBytes)
# first transfer and read to make server socket connecteced
let write1 = await clientSocket.write(smallBytesToTransfer)
let read1 = await serverSocket.read(smallBytes)
check:
write1.isOk()
read1 == smallBytesToTransfer
let numBytes = testCase.bytesToTransfer
let bytesToTransfer = generateByteArray(rng[], numBytes)
discard clientSocket.write(bytesToTransfer)
discard serverSocket.write(bytesToTransfer)
let serverReadFut = serverSocket.read(numBytes)
let clientReadFut = clientSocket.read(numBytes)
yield serverReadFut
yield clientReadFut
let clientRead = clientReadFut.read()
let serverRead = serverReadFut.read()
check:
clientRead == bytesToTransfer
serverRead == bytesToTransfer
await clientProtocol.shutdownWait()
await serverProtocol.shutdownWait()
let testCases1 = @[
# small buffers so it will fill up between reads
TestCase.init(15, 5, 60000, SocketConfig.init(optRcvBuffer = uint32(2000), remoteWindowResetTimeout = seconds(5)), 10000),
TestCase.init(15, 10, 60000, SocketConfig.init(optRcvBuffer = uint32(2000), remoteWindowResetTimeout = seconds(5)), 10000),
TestCase.init(15, 15, 60000, SocketConfig.init(optRcvBuffer = uint32(2000), remoteWindowResetTimeout = seconds(5)), 10000)
]
proc readWithMultipleReads(s: UtpSocket[TransportAddress], numOfReads: int, bytesPerRead: int): Future[seq[byte]] {.async.}=
var i = 0
var res: seq[byte] = @[]
while i < numOfReads:
let bytes = await s.read(bytesPerRead)
res.add(bytes)
inc i
return res
asyncTest "Write and Read large data in different network conditions split over several reads":
for testCase in testCases1:
let (
clientProtocol,
clientSocket,
serverProtocol,
serverSocket) = await testScenario(testCase.maxDelay, testCase.dropRate, testcase.cfg)
let smallBytes = 10
let smallBytesToTransfer = generateByteArray(rng[], smallBytes)
# first transfer and read to make server socket connecteced
let write1 = await clientSocket.write(smallBytesToTransfer)
let read1 = await serverSocket.read(smallBytes)
check:
read1 == smallBytesToTransfer
let numBytes = testCase.bytesToTransfer
let bytesToTransfer = generateByteArray(rng[], numBytes)
discard clientSocket.write(bytesToTransfer)
discard serverSocket.write(bytesToTransfer)
let numOfReads = int(testCase.bytesToTransfer / testCase.bytesPerRead)
let serverReadFut = serverSocket.readWithMultipleReads(numOfReads, testCase.bytesPerRead)
let clientReadFut = clientSocket.readWithMultipleReads(numOfReads, testCase.bytesPerRead)
yield serverReadFut
yield clientReadFut
let clientRead = clientReadFut.read()
let serverRead = serverReadFut.read()
check:
clientRead == bytesToTransfer
serverRead == bytesToTransfer
await clientProtocol.shutdownWait()
await serverProtocol.shutdownWait()