Advertise correct rcv buffer size (#423)

* Advertise correct rcv buffer size
This commit is contained in:
KonradStaniec 2021-11-12 10:58:49 +01:00 committed by GitHub
parent b671f6c901
commit 8139aae346
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 121 additions and 15 deletions

View File

@ -4,7 +4,7 @@ description = "Ethereum Common library"
license = "MIT"
skipDirs = @["tests"]
requires "nim >= 1.2.0 & <= 1.2.12",
requires "nim >= 1.2.0 & <= 1.2.14",
"nimcrypto",
"stint",
"secp256k1",

View File

@ -37,6 +37,7 @@ type
# was received, and the timestamp in this last received packet
timestampDiff*: MicroSeconds
# The window size is the number of bytes currently in-flight, i.e. sent but not acked
# When sending packets, this should be set to the number of bytes left in the socket's receive buffer.
wndSize*: uint32
seqNr*: uint16
# sequence number the sender of the packet last received in the other direction

View File

@ -54,6 +54,9 @@ type
# failed
dataResendsBeforeFailure*: uint16
# Maximnal size of receive buffer in bytes
optRcvBuffer*: uint32
UtpSocket*[A] = ref object
remoteAddress*: A
state: ConnectionState
@ -179,12 +182,27 @@ const
# dead. 4 is taken from reference implementation
defaultDataResendsBeforeFailure = 4'u16
# default size of rcv buffer in bytes
# rationale form C reference impl:
# 1 MB of receive buffer (i.e. max bandwidth delay product)
# means that from a peer with 200 ms RTT, we cannot receive
# faster than 5 MB/s
# from a peer with 10 ms RTT, we cannot receive faster than
# 100 MB/s. This is assumed to be good enough, since bandwidth
# often is proportional to RTT anyway
defaultOptRcvBuffer: uint32 = 1024 * 1024
reorderBufferMaxSize = 1024
proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T =
UtpSocketKey[A](remoteAddress: remoteAddress, rcvId: rcvId)
proc init(T: type OutgoingPacket, packetBytes: seq[byte], transmissions: uint16, needResend: bool, timeSent: Moment = Moment.now()): T =
proc init(
T: type OutgoingPacket,
packetBytes: seq[byte],
transmissions: uint16,
needResend: bool,
timeSent: Moment = Moment.now()): T =
OutgoingPacket(
packetBytes: packetBytes,
transmissions: transmissions,
@ -195,13 +213,22 @@ proc init(T: type OutgoingPacket, packetBytes: seq[byte], transmissions: uint16,
proc init*(
T: type SocketConfig,
initialSynTimeout: Duration = defaultInitialSynTimeout,
dataResendsBeforeFailure: uint16 = defaultDataResendsBeforeFailure
dataResendsBeforeFailure: uint16 = defaultDataResendsBeforeFailure,
optRcvBuffer: uint32 = defaultOptRcvBuffer
): T =
SocketConfig(
initialSynTimeout: initialSynTimeout,
dataResendsBeforeFailure: dataResendsBeforeFailure
dataResendsBeforeFailure: dataResendsBeforeFailure,
optRcvBuffer: optRcvBuffer
)
proc getRcvWindowSize(socket: UtpSocket): uint32 =
let currentDataSize = socket.buffer.dataLen()
if currentDataSize > int(socket.socketConfig.optRcvBuffer):
0'u32
else:
socket.socketConfig.optRcvBuffer - uint32(currentDataSize)
proc registerOutgoingPacket(socket: UtpSocket, oPacket: OutgoingPacket) =
## Adds packet to outgoing buffer and updates all related fields
socket.outBuffer.ensureSize(socket.seqNr, socket.curWindowPackets)
@ -216,12 +243,18 @@ proc sendAck(socket: UtpSocket): Future[void] =
## Creates and sends ack, based on current socket state. Acks are different from
## other packets as we do not track them in outgoing buffet
let ackPacket = ackPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576)
let ackPacket =
ackPacket(
socket.seqNr,
socket.connectionIdSnd,
socket.ackNr,
socket.getRcvWindowSize()
)
socket.sendData(encodePacket(ackPacket))
proc sendSyn(socket: UtpSocket): Future[void] =
doAssert(socket.state == SynSent , "syn can only be send when in SynSent state")
let packet = synPacket(socket.seqNr, socket.connectionIdRcv, 1048576)
let packet = synPacket(socket.seqNr, socket.connectionIdRcv, socket.getRcvWindowSize())
notice "Sending syn packet packet", packet = packet
# set number of transmissions to 1 as syn packet will be send just after
# initiliazation
@ -373,9 +406,7 @@ proc new[A](
rtt: milliseconds(0),
rttVar: milliseconds(800),
rto: milliseconds(3000),
# Default 1MB buffer
# TODO add posibility to configure buffer size
buffer: AsyncBuffer.init(1024 * 1024),
buffer: AsyncBuffer.init(int(cfg.optRcvBuffer)),
closeEvent: newAsyncEvent(),
closeCallbacks: newSeq[Future[void]](),
socketKey: UtpSocketKey.init(to, rcvId),
@ -742,7 +773,7 @@ proc close*(socket: UtpSocket) {.async.} =
if socket.curWindowPackets == 0:
socket.resetSendTimeout()
let finEncoded = encodePacket(finPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576))
let finEncoded = encodePacket(finPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, socket.getRcvWindowSize()))
socket.registerOutgoingPacket(OutgoingPacket.init(finEncoded, 1, true))
socket.finSent = true
await socket.sendData(finEncoded)
@ -784,11 +815,12 @@ proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] {.async.} =
let pSize = socket.getPacketSize()
let endIndex = data.high()
var i = 0
let wndSize = socket.getRcvWindowSize()
while i <= data.high:
let lastIndex = i + pSize - 1
let lastOrEnd = min(lastIndex, endIndex)
let dataSlice = data[i..lastOrEnd]
let dataPacket = dataPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576, dataSlice)
let dataPacket = dataPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, wndSize, dataSlice)
socket.registerOutgoingPacket(OutgoingPacket.init(encodePacket(dataPacket), 0, false))
bytesWritten = bytesWritten + len(dataSlice)
i = lastOrEnd + 1

View File

@ -57,8 +57,11 @@ procSuite "Utp socket unit test":
resultBytes.add(p.payload)
return resultBytes
template connectOutGoingSocket(initialRemoteSeq: uint16, q: AsyncQueue[Packet]): (UtpSocket[TransportAddress], Packet) =
let sock1 = initOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), SocketConfig.init(), rng[])
template connectOutGoingSocket(
initialRemoteSeq: uint16,
q: AsyncQueue[Packet],
cfg: SocketConfig = SocketConfig.init()): (UtpSocket[TransportAddress], Packet) =
let sock1 = initOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), cfg, rng[])
await sock1.startOutgoingSocket()
let initialPacket = await q.get()
@ -76,12 +79,14 @@ procSuite "Utp socket unit test":
asyncTest "Starting outgoing socket should send Syn packet":
let q = newAsyncQueue[Packet]()
let sock1 = initOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), SocketConfig.init(), rng[])
let defaultConfig = SocketConfig.init()
let sock1 = initOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), defaultConfig, rng[])
await sock1.startOutgoingSocket()
let initialPacket = await q.get()
check:
initialPacket.header.pType == ST_SYN
initialPacket.header.wndSize == defaultConfig.optRcvBuffer
asyncTest "Outgoing socket should re-send syn packet 2 times before declaring failure":
let q = newAsyncQueue[Packet]()
@ -477,7 +482,6 @@ procSuite "Utp socket unit test":
error.kind == SocketNotWriteable
error.currentState == Destroy
asyncTest "Trying to write data onto closed socket which sent fin":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
@ -495,3 +499,72 @@ procSuite "Utp socket unit test":
check:
error.kind == FinSent
asyncTest "Processing data packet should update window size accordingly and use it in all send packets":
let q = newAsyncQueue[Packet]()
let initialRemoteSeqNr = 10'u16
let initialRcvBufferSize = 10'u32
let data = @[1'u8, 2'u8, 3'u8]
let sCfg = SocketConfig.init(optRcvBuffer = initialRcvBufferSize)
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeqNr, q, sCfg)
let dataP1 = dataPacket(initialRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data)
await outgoingSocket.processPacket(dataP1)
let ack1 = await q.get()
check:
ack1.header.pType == ST_STATE
ack1.header.ackNr == initialRemoteSeqNr
ack1.header.wndSize == initialRcvBufferSize - uint32(len(data))
let written = await outgoingSocket.write(data)
let sentData = await q.get()
check:
sentData.header.pType == ST_DATA
sentData.header.wndSize == initialRcvBufferSize - uint32(len(data))
await outgoingSocket.close()
let sentFin = await q.get()
check:
sentFin.header.pType == ST_FIN
sentFin.header.wndSize == initialRcvBufferSize - uint32(len(data))
asyncTest "Reading data from the buffer shoud increase receive window":
let q = newAsyncQueue[Packet]()
let initalRemoteSeqNr = 10'u16
let initialRcvBufferSize = 10'u32
let data = @[1'u8, 2'u8, 3'u8]
let sCfg = SocketConfig.init(optRcvBuffer = initialRcvBufferSize)
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q, sCfg)
let dataP1 = dataPacket(initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data)
await outgoingSocket.processPacket(dataP1)
let ack1 = await q.get()
check:
ack1.header.pType == ST_STATE
ack1.header.ackNr == initalRemoteSeqNr
ack1.header.wndSize == initialRcvBufferSize - uint32(len(data))
let readData = await outgoingSocket.read(data.len())
check:
readData == data
discard await outgoingSocket.write(data)
let sentData = await q.get()
check:
sentData.header.pType == ST_DATA
# we have read all data from rcv buffer, advertised window should go back to
# initial size
sentData.header.wndSize == initialRcvBufferSize