Add event loop to socket (#475)

- add eventLoop to control all incoming events
- change semantic of write to asynchronously block only when send buffer is full, and not when bytes do not fit into send window
- change handling of receive buffer, to start dropping packets if the reorder buffer and receive buffer are full. Old behaviour was to async block unless there is space which could lead to resource exhaustion attacks
This commit is contained in:
KonradStaniec 2022-02-24 18:22:44 +01:00 committed by GitHub
parent f947827c70
commit 8ef6b13b1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 637 additions and 557 deletions

View File

@ -1,107 +0,0 @@
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
chronos
# Internal Utp data structure to track send window and properly block when there is
# no free space when trying to send more bytes
type SendBufferTracker* = ref object
# number of payload bytes in-flight (i.e not counting header sizes)
# packets that have not yet been sent do not count, packets
# that are marked as needing to be re-sent (due to a timeout)
# don't count either
currentWindow*: uint32
# remote receive window updated based on packed wndSize field
maxRemoteWindow*: uint32
# maximum window size, in bytes, calculated by local congestion controller
maxWindow*: uint32
# configuration option for maxium number of bytes in snd buffer
maxSndBufferSize*: uint32
waiters: seq[(uint32, Future[void])]
proc new*(
T: type SendBufferTracker,
currentWindow: uint32,
maxRemoteWindow: uint32,
maxSndBufferSize: uint32,
maxWindow: uint32): T =
return (
SendBufferTracker(
currentWindow: currentWindow,
maxRemoteWindow: maxRemoteWindow,
maxSndBufferSize: maxSndBufferSize,
maxWindow: maxWindow,
waiters: @[]
)
)
proc currentFreeBytes*(t: SendBufferTracker): uint32 =
let maxSend = min(min(t.maxRemoteWindow, t.maxSndBufferSize), t.maxWindow)
if (maxSend <= t.currentWindow):
return 0
else:
return maxSend - t.currentWindow
proc notifyWaiters*(t: SendBufferTracker) =
var i = 0
while i < len(t.waiters):
let freeSpace = t.currentFreeBytes()
let (required, fut) = t.waiters[i]
if (required <= freeSpace):
# in case future was cancelled
if (not fut.finished()):
t.currentWindow = t.currentWindow + required
fut.complete()
t.waiters.del(i)
else:
# we do not have place for next waiter, just finish processing
return
proc updateMaxRemote*(t: SendBufferTracker, newRemoteWindow: uint32) =
t.maxRemoteWindow = newRemoteWindow
t.notifyWaiters()
proc updateMaxWindow*(t: SendBufferTracker, maxWindow: uint32) =
t.maxWindow = maxWindow
t.notifyWaiters()
proc updateMaxWindowSize*(t: SendBufferTracker, newRemoteWindow: uint32, maxWindow: uint32) =
t.maxRemoteWindow = newRemoteWindow
t.maxWindow = maxWindow
t.notifyWaiters()
proc decreaseCurrentWindow*(t: SendBufferTracker, value: uint32) =
doAssert(t.currentWindow >= value)
t.currentWindow = t.currentWindow - value
proc reserveNBytesWait*(t: SendBufferTracker, n: uint32): Future[void] =
let fut = newFuture[void]("SendBufferTracker.reserveNBytesWait")
let free = t.currentFreeBytes()
if (n <= free):
t.currentWindow = t.currentWindow + n
fut.complete()
else:
t.waiters.add((n, fut))
fut
proc reserveNBytes*(t: SendBufferTracker, n: uint32): bool =
let free = t.currentFreeBytes()
if (n <= free):
t.currentWindow = t.currentWindow + n
return true
else:
return false
proc forceReserveNBytes*(t: SendBufferTracker, n: uint32) =
t.currentWindow = t.currentWindow + n
proc currentBytesInFlight*(t: SendBufferTracker): uint32 = t.currentWindow

View File

@ -168,7 +168,7 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}=
# Initial ackNr is set to incoming packer seqNr # Initial ackNr is set to incoming packer seqNr
let incomingSocket = newIncomingSocket[A](sender, r.sendCb, r.socketConfig ,p.header.connectionId, p.header.seqNr, r.rng[]) let incomingSocket = newIncomingSocket[A](sender, r.sendCb, r.socketConfig ,p.header.connectionId, p.header.seqNr, r.rng[])
r.registerUtpSocket(incomingSocket) r.registerUtpSocket(incomingSocket)
await incomingSocket.startIncomingSocket() incomingSocket.startIncomingSocket()
# Based on configuration, socket is passed to upper layer either in SynRecv # Based on configuration, socket is passed to upper layer either in SynRecv
# or Connected state # or Connected state
info "Accepting incoming connection", info "Accepting incoming connection",
@ -235,12 +235,6 @@ proc connect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] {.async.}=
to = s.socketKey to = s.socketKey
s.destroy() s.destroy()
return err(OutgoingConnectionError(kind: ConnectionTimedOut)) return err(OutgoingConnectionError(kind: ConnectionTimedOut))
except CatchableError as e:
info "Outgoing connection failed due to send error",
to = s.socketKey
s.destroy()
# this may only happen if user provided callback will for some reason fail
return err(OutgoingConnectionError(kind: ErrorWhileSendingSyn, error: e))
# Connect to provided address # Connect to provided address
# Reference implementation: https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732 # Reference implementation: https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732

File diff suppressed because it is too large Load Diff

View File

@ -123,11 +123,10 @@ procSuite "Utp protocol over udp tests with loss and delays":
let testCases = @[ let testCases = @[
TestCase.init(45, 10, 40000), TestCase.init(45, 10, 40000),
TestCase.init(45, 15, 40000), TestCase.init(25, 15, 40000),
TestCase.init(50, 20, 20000),
# super small recv buffer which will be constantly on the brink of being full # super small recv buffer which will be constantly on the brink of being full
TestCase.init(15, 5, 80000, SocketConfig.init(optRcvBuffer = uint32(2000), remoteWindowResetTimeout = seconds(5))), TestCase.init(15, 5, 40000, SocketConfig.init(optRcvBuffer = uint32(6000), remoteWindowResetTimeout = seconds(5))),
TestCase.init(15, 10, 80000, SocketConfig.init(optRcvBuffer = uint32(2000), remoteWindowResetTimeout = seconds(5))) TestCase.init(15, 10, 40000, SocketConfig.init(optRcvBuffer = uint32(6000), remoteWindowResetTimeout = seconds(5)))
] ]
asyncTest "Write and Read large data in different network conditions": asyncTest "Write and Read large data in different network conditions":
@ -173,9 +172,9 @@ procSuite "Utp protocol over udp tests with loss and delays":
let testCases1 = @[ let testCases1 = @[
# small buffers so it will fill up between reads # small buffers so it will fill up between reads
TestCase.init(15, 5, 60000, SocketConfig.init(optRcvBuffer = uint32(2000), remoteWindowResetTimeout = seconds(5)), 10000), TestCase.init(15, 5, 40000, SocketConfig.init(optRcvBuffer = uint32(6000), remoteWindowResetTimeout = seconds(5)), 10000),
TestCase.init(15, 10, 60000, SocketConfig.init(optRcvBuffer = uint32(2000), remoteWindowResetTimeout = seconds(5)), 10000), TestCase.init(15, 10, 40000, SocketConfig.init(optRcvBuffer = uint32(6000), remoteWindowResetTimeout = seconds(5)), 10000),
TestCase.init(15, 15, 60000, SocketConfig.init(optRcvBuffer = uint32(2000), remoteWindowResetTimeout = seconds(5)), 10000) TestCase.init(15, 15, 40000, SocketConfig.init(optRcvBuffer = uint32(6000), remoteWindowResetTimeout = seconds(5)), 10000)
] ]
proc readWithMultipleReads(s: UtpSocket[TransportAddress], numOfReads: int, bytesPerRead: int): Future[seq[byte]] {.async.}= proc readWithMultipleReads(s: UtpSocket[TransportAddress], numOfReads: int, bytesPerRead: int): Future[seq[byte]] {.async.}=

View File

@ -44,7 +44,7 @@ template connectOutGoingSocket*(
) )
await sock1.processPacket(responseAck) await sock1.processPacket(responseAck)
await waitUntil(proc (): bool = sock1.isConnected())
check: check:
sock1.isConnected() sock1.isConnected()
@ -72,12 +72,14 @@ template connectOutGoingSocketWithIncoming*(
rng[] rng[]
) )
await incomingSocket.startIncomingSocket() incomingSocket.startIncomingSocket()
let responseAck = await incomingQueue.get() let responseAck = await incomingQueue.get()
await outgoingSocket.processPacket(responseAck) await outgoingSocket.processPacket(responseAck)
await waitUntil(proc (): bool = outgoingSocket.isConnected())
check: check:
outgoingSocket.isConnected() outgoingSocket.isConnected()

View File

@ -188,6 +188,8 @@ procSuite "Utp router unit tests":
await router.processIncomingBytes(encodedData, testSender) await router.processIncomingBytes(encodedData, testSender)
await waitUntil(proc (): bool = socket.numOfEventsInEventQueue() == 0)
check: check:
socket.isConnected() socket.isConnected()
@ -350,8 +352,8 @@ procSuite "Utp router unit tests":
check: check:
connectResult.isErr() connectResult.isErr()
connectResult.error().kind == ErrorWhileSendingSyn # even though send is failing we will just finish with timeout,
cast[TestError](connectResult.error().error) is TestError connectResult.error().kind == ConnectionTimedOut
router.len() == 0 router.len() == 0
asyncTest "Router should clear closed outgoing connections": asyncTest "Router should clear closed outgoing connections":

View File

@ -295,7 +295,6 @@ procSuite "Utp socket unit test":
await outgoingSocket.destroyWait() await outgoingSocket.destroyWait()
asyncTest "Ignoring totally out of order packet": asyncTest "Ignoring totally out of order packet":
# TODO test is valid until implementing selective acks
let q = newAsyncQueue[Packet]() let q = newAsyncQueue[Packet]()
let initalRemoteSeqNr = 10'u16 let initalRemoteSeqNr = 10'u16
@ -305,11 +304,11 @@ procSuite "Utp socket unit test":
await outgoingSocket.processPacket(packets[1024]) await outgoingSocket.processPacket(packets[1024])
check:
outgoingSocket.numPacketsInReordedBuffer() == 0
await outgoingSocket.processPacket(packets[1023]) await outgoingSocket.processPacket(packets[1023])
# give some time to process those packets
await sleepAsync(milliseconds(500))
check: check:
outgoingSocket.numPacketsInReordedBuffer() == 1 outgoingSocket.numPacketsInReordedBuffer() == 1
@ -349,6 +348,8 @@ procSuite "Utp socket unit test":
await outgoingSocket.processPacket(responseAck) await outgoingSocket.processPacket(responseAck)
await waitUntil(proc (): bool = outgoingSocket.numPacketsInOutGoingBuffer() == 0)
check: check:
outgoingSocket.numPacketsInOutGoingBuffer() == 0 outgoingSocket.numPacketsInOutGoingBuffer() == 0
@ -427,7 +428,7 @@ procSuite "Utp socket unit test":
let dataToWrite1 = @[0'u8] let dataToWrite1 = @[0'u8]
let dataToWrite2 = @[1'u8] let dataToWrite2 = @[1'u8]
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, 0) let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, cfg = SocketConfig.init(optSndBuffer = 0))
let writeFut1 = outgoingSocket.write(dataToWrite1) let writeFut1 = outgoingSocket.write(dataToWrite1)
let writeFut2 = outgoingSocket.write(dataToWrite2) let writeFut2 = outgoingSocket.write(dataToWrite2)
@ -531,6 +532,8 @@ procSuite "Utp socket unit test":
await outgoingSocket.processPacket(responseAck) await outgoingSocket.processPacket(responseAck)
await waitUntil(proc (): bool = outgoingSocket.isConnected())
check: check:
outgoingSocket.isConnected() outgoingSocket.isConnected()
@ -768,6 +771,8 @@ procSuite "Utp socket unit test":
await outgoingSocket.processPacket(responseAck) await outgoingSocket.processPacket(responseAck)
await waitUntil(proc (): bool = not outgoingSocket.isConnected())
check: check:
not outgoingSocket.isConnected() not outgoingSocket.isConnected()
@ -1005,6 +1010,8 @@ procSuite "Utp socket unit test":
await outgoingSocket.processPacket(responseAck) await outgoingSocket.processPacket(responseAck)
await waitUntil(proc (): bool = int(outgoingSocket.numOfBytesInFlight) == len(dataToWrite))
check: check:
# only first packet has been acked so there should still by 5 bytes left # only first packet has been acked so there should still by 5 bytes left
int(outgoingSocket.numOfBytesInFlight) == len(dataToWrite) int(outgoingSocket.numOfBytesInFlight) == len(dataToWrite)
@ -1052,18 +1059,18 @@ procSuite "Utp socket unit test":
let q = newAsyncQueue[Packet]() let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16 let initialRemoteSeq = 10'u16
let dataToWrite = @[1'u8, 2, 3, 4, 5] let dataToWrite = generateByteArray(rng[], 1001)
# remote is initialized with buffer to small to handle whole payload # remote is initialized with buffer to small to handle whole payload
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, uint32(len(dataToWrite) - 1)) let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, cfg = SocketConfig.init(optSndBuffer = 1000))
let writeFut = outgoingSocket.write(dataToWrite) let writeFut = outgoingSocket.write(dataToWrite)
# wait some time to check future is not finished # wait some time to check future is not finished
await sleepAsync(seconds(2)) await sleepAsync(seconds(2))
# write is not finished as future is blocked from progressing due to to small # write is not finished as future is blocked from progressing due to to full
# send window # send buffer
check: check:
not writeFut.finished() not writeFut.finished()
@ -1071,20 +1078,18 @@ procSuite "Utp socket unit test":
ackPacket( ackPacket(
initialRemoteSeq, initialRemoteSeq,
initialPacket.header.connectionId, initialPacket.header.connectionId,
initialPacket.header.seqNr, initialPacket.header.seqNr + 1,
uint32(len(dataToWrite)), testBufferSize,
0 0
) )
await outgoingSocket.processPacket(someAckFromRemote) await outgoingSocket.processPacket(someAckFromRemote)
# after processing packet with increased buffer size write should complete and # only after processing ack write will progress
# packet should be sent let writeResult = await writeFut
let sentPacket = await q.get()
check: check:
sentPacket.payload == dataToWrite writeResult.isOK()
writeFut.finished()
await outgoingSocket.destroyWait() await outgoingSocket.destroyWait()
@ -1092,30 +1097,21 @@ procSuite "Utp socket unit test":
let q = newAsyncQueue[Packet]() let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16 let initialRemoteSeq = 10'u16
let dataToWrite = @[1'u8, 2, 3, 4, 5] let dataToWirte = 1160
# remote is initialized with buffer to small to handle whole payload # remote is initialized with buffer to small to handle whole payload
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, cfg = SocketConfig.init(optSndBuffer = 1160))
let remoteRcvWindowSize = uint32(outgoingSocket.getPacketSize())
let someAckFromRemote =
ackPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
remoteRcvWindowSize,
0
)
# we are using ack from remote to setup our snd window size to one packet size on one packet let twoPacketData = generateByteArray(rng[], int(dataToWirte))
await outgoingSocket.processPacket(someAckFromRemote)
let twoPacketData = generateByteArray(rng[], int(2 * remoteRcvWindowSize)) let writeResult = await outgoingSocket.write(twoPacketData)
check:
writeResult.isOk()
# this write will not progress as snd buffer is full
let writeFut = outgoingSocket.write(twoPacketData) let writeFut = outgoingSocket.write(twoPacketData)
# after this time first packet will be send and will timeout, but the write should not # we wait for packets to timeout
# finish, as timeouting packets do not notify writing about new space in snd
# buffer
await sleepAsync(seconds(2)) await sleepAsync(seconds(2))
check: check:
@ -1162,15 +1158,22 @@ procSuite "Utp socket unit test":
check: check:
packet.header.pType == ST_DATA packet.header.pType == ST_DATA
uint32(len(packet.payload)) == remoteRcvWindowSize uint32(len(packet.payload)) == remoteRcvWindowSize
not writeFut.finished
let packet1Fut = q.get()
await sleepAsync(milliseconds(500))
check:
not packet1Fut.finished()
await outgoingSocket.processPacket(firstAckFromRemote) await outgoingSocket.processPacket(firstAckFromRemote)
let packet1 = await q.get() # packet is sent only after first packet is acked
let writeResult = await writeFut let packet1 = await packet1Fut
check: check:
packet1.header.pType == ST_DATA packet1.header.pType == ST_DATA
packet1.header.seqNr == packet.header.seqNr + 1
writeFut.finished writeFut.finished
await outgoingSocket.destroyWait() await outgoingSocket.destroyWait()
@ -1192,19 +1195,10 @@ procSuite "Utp socket unit test":
check: check:
outgoingSocket.isConnected() outgoingSocket.isConnected()
let writeFut = outgoingSocket.write(someData) # write result will be successfull as send buffer has space
let writeResult = await outgoingSocket.write(someData)
await sleepAsync(seconds(1))
check:
# Even after 1 second write is not finished as we did not receive any message
# so remote rcv window is still zero
not writeFut.finished()
# Ultimately, after 3 second remote rcv window will be reseted to minimal value
# and write will be able to progress
let writeResult = await writeFut
# this will finish in seconds(3) as only after this time window will be set to min value
let p = await q.get() let p = await q.get()
check: check:

View File

@ -48,6 +48,8 @@ procSuite "Utp socket selective acks unit test":
for p in dataPackets: for p in dataPackets:
await outgoingSocket.processPacket(p) await outgoingSocket.processPacket(p)
await waitUntil(proc (): bool = outgoingSocket.numOfEventsInEventQueue() == 0)
let extArray = outgoingSocket.generateSelectiveAckBitMask() let extArray = outgoingSocket.generateSelectiveAckBitMask()
await outgoingSocket.destroyWait() await outgoingSocket.destroyWait()
@ -170,6 +172,7 @@ procSuite "Utp socket selective acks unit test":
for toDeliver in testCase.packetsDelivered: for toDeliver in testCase.packetsDelivered:
await incomingSocket.processPacket(packets[toDeliver]) await incomingSocket.processPacket(packets[toDeliver])
await waitUntil(proc (): bool = incomingSocket.numOfEventsInEventQueue() == 0)
return (outgoingSocket, incomingSocket, packets) return (outgoingSocket, incomingSocket, packets)
@ -248,8 +251,12 @@ procSuite "Utp socket selective acks unit test":
await outgoingSocket.processPacket(finalAck) await outgoingSocket.processPacket(finalAck)
let expectedPackets = testCase.numOfPackets - len(testCase.packetsDelivered)
await waitUntil(proc (): bool = outgoingSocket.numPacketsInOutGoingBuffer() == expectedPackets)
check: check:
outgoingSocket.numPacketsInOutGoingBuffer() == testCase.numOfPackets - len(testCase.packetsDelivered) outgoingSocket.numPacketsInOutGoingBuffer() == expectedPackets
await outgoingSocket.destroyWait() await outgoingSocket.destroyWait()
await incomingSocket.destroyWait() await incomingSocket.destroyWait()
@ -299,6 +306,8 @@ procSuite "Utp socket selective acks unit test":
await outgoingSocket.processPacket(finalAck) await outgoingSocket.processPacket(finalAck)
await waitUntil(proc (): bool = outgoingSocket.numOfEventsInEventQueue() == 0)
for idx in testCase.packetsResent: for idx in testCase.packetsResent:
let resentPacket = await outgoingQueue.get() let resentPacket = await outgoingQueue.get()
check: check: