mirror of https://github.com/status-im/nim-eth.git
Add config for max snd buffer size (#440)
* Add config for max snd buffer size
This commit is contained in:
parent
3c8915cae1
commit
e7bc10ab00
|
@ -20,13 +20,27 @@ type SendBufferTracker* = ref object
|
||||||
|
|
||||||
# remote receive window updated based on packed wndSize field
|
# remote receive window updated based on packed wndSize field
|
||||||
maxRemoteWindow*: uint32
|
maxRemoteWindow*: uint32
|
||||||
|
|
||||||
|
# configuration option for maxium number of bytes in snd buffer
|
||||||
|
maxSndBufferSize*: uint32
|
||||||
waiters: seq[(uint32, Future[void])]
|
waiters: seq[(uint32, Future[void])]
|
||||||
|
|
||||||
proc new*(T: type SendBufferTracker, currentWindow: uint32, maxRemoteWindow: uint32): T =
|
proc new*(
|
||||||
return SendBufferTracker(currentWindow: currentWindow, maxRemoteWindow: maxRemoteWindow, waiters: @[])
|
T: type SendBufferTracker,
|
||||||
|
currentWindow: uint32,
|
||||||
|
maxRemoteWindow: uint32,
|
||||||
|
maxSndBufferSize: uint32): T =
|
||||||
|
return (
|
||||||
|
SendBufferTracker(
|
||||||
|
currentWindow: currentWindow,
|
||||||
|
maxRemoteWindow: maxRemoteWindow,
|
||||||
|
maxSndBufferSize: maxSndBufferSize,
|
||||||
|
waiters: @[]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
proc currentFreeBytes*(t: SendBufferTracker): uint32 =
|
proc currentFreeBytes*(t: SendBufferTracker): uint32 =
|
||||||
let maxSend = t.maxRemoteWindow
|
let maxSend = min(t.maxRemoteWindow, t.maxSndBufferSize)
|
||||||
if (maxSend <= t.currentWindow):
|
if (maxSend <= t.currentWindow):
|
||||||
return 0
|
return 0
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -57,6 +57,9 @@ type
|
||||||
# Maximnal size of receive buffer in bytes
|
# Maximnal size of receive buffer in bytes
|
||||||
optRcvBuffer*: uint32
|
optRcvBuffer*: uint32
|
||||||
|
|
||||||
|
# Maximnal size of send buffer in bytes
|
||||||
|
optSndBuffer*: uint32
|
||||||
|
|
||||||
# If set to some(`Duration`), the incoming socket will be initialized in
|
# If set to some(`Duration`), the incoming socket will be initialized in
|
||||||
# `SynRecv` state and the remote peer will have `Duration` to transfer data
|
# `SynRecv` state and the remote peer will have `Duration` to transfer data
|
||||||
# to move the socket in `Connected` state.
|
# to move the socket in `Connected` state.
|
||||||
|
@ -278,12 +281,14 @@ proc init*(
|
||||||
dataResendsBeforeFailure: uint16 = defaultDataResendsBeforeFailure,
|
dataResendsBeforeFailure: uint16 = defaultDataResendsBeforeFailure,
|
||||||
optRcvBuffer: uint32 = defaultOptRcvBuffer,
|
optRcvBuffer: uint32 = defaultOptRcvBuffer,
|
||||||
incomingSocketReceiveTimeout: Option[Duration] = some(defaultRcvRetransmitTimeout),
|
incomingSocketReceiveTimeout: Option[Duration] = some(defaultRcvRetransmitTimeout),
|
||||||
remoteWindowResetTimeout: Duration = defaultResetWindowTimeout
|
remoteWindowResetTimeout: Duration = defaultResetWindowTimeout,
|
||||||
|
optSndBuffer: uint32 = defaultOptRcvBuffer
|
||||||
): T =
|
): T =
|
||||||
SocketConfig(
|
SocketConfig(
|
||||||
initialSynTimeout: initialSynTimeout,
|
initialSynTimeout: initialSynTimeout,
|
||||||
dataResendsBeforeFailure: dataResendsBeforeFailure,
|
dataResendsBeforeFailure: dataResendsBeforeFailure,
|
||||||
optRcvBuffer: optRcvBuffer,
|
optRcvBuffer: optRcvBuffer,
|
||||||
|
optSndBuffer: optSndBuffer,
|
||||||
incomingSocketReceiveTimeout: incomingSocketReceiveTimeout,
|
incomingSocketReceiveTimeout: incomingSocketReceiveTimeout,
|
||||||
remoteWindowResetTimeout: remoteWindowResetTimeout
|
remoteWindowResetTimeout: remoteWindowResetTimeout
|
||||||
)
|
)
|
||||||
|
@ -560,7 +565,7 @@ proc new[A](
|
||||||
closeEvent: newAsyncEvent(),
|
closeEvent: newAsyncEvent(),
|
||||||
closeCallbacks: newSeq[Future[void]](),
|
closeCallbacks: newSeq[Future[void]](),
|
||||||
# start with 1mb assumption, field will be updated with first received packet
|
# start with 1mb assumption, field will be updated with first received packet
|
||||||
sendBufferTracker: SendBufferTracker.new(0, 1024 * 1024),
|
sendBufferTracker: SendBufferTracker.new(0, 1024 * 1024, cfg.optSndBuffer),
|
||||||
# queue with infinite size
|
# queue with infinite size
|
||||||
writeQueue: newAsyncQueue[WriteRequest](),
|
writeQueue: newAsyncQueue[WriteRequest](),
|
||||||
zeroWindowTimer: Moment.now() + cfg.remoteWindowResetTimeout,
|
zeroWindowTimer: Moment.now() + cfg.remoteWindowResetTimeout,
|
||||||
|
|
|
@ -956,3 +956,51 @@ procSuite "Utp socket unit test":
|
||||||
p.payload == someData
|
p.payload == someData
|
||||||
|
|
||||||
await outgoingSocket.destroyWait()
|
await outgoingSocket.destroyWait()
|
||||||
|
|
||||||
|
asyncTest "Writing data should respect max snd buffer option":
|
||||||
|
let q = newAsyncQueue[Packet]()
|
||||||
|
let initialRemoteSeq = 10'u16
|
||||||
|
let someData1 = @[1'u8]
|
||||||
|
let somedata2 = @[2'u8]
|
||||||
|
let (outgoingSocket, initialPacket) =
|
||||||
|
connectOutGoingSocket(
|
||||||
|
initialRemoteSeq,
|
||||||
|
q,
|
||||||
|
cfg = SocketConfig.init(
|
||||||
|
optSndBuffer = 1
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
check:
|
||||||
|
outgoingSocket.isConnected()
|
||||||
|
|
||||||
|
# snd buffer got 1 byte of space so this future shold finish
|
||||||
|
let write1 = await outgoingSocket.write(someData1)
|
||||||
|
|
||||||
|
let writeFut2 = outgoingSocket.write(someData2)
|
||||||
|
|
||||||
|
# wait until 2 re-sends to check we do not accidently free buffer during re-sends
|
||||||
|
discard await q.get()
|
||||||
|
discard await q.get()
|
||||||
|
let firstPacket = await q.get()
|
||||||
|
|
||||||
|
check:
|
||||||
|
# this write still cannot progress as 1st write is not acked
|
||||||
|
not writeFut2.finished()
|
||||||
|
|
||||||
|
let someAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr + 1, 10)
|
||||||
|
|
||||||
|
# acks first write, so there is space in buffer for new data and second
|
||||||
|
# write should progress
|
||||||
|
await outgoingSocket.processPacket(someAckFromRemote)
|
||||||
|
|
||||||
|
yield writeFut2
|
||||||
|
|
||||||
|
let secondPacket = await q.get()
|
||||||
|
|
||||||
|
check:
|
||||||
|
writeFut2.finished()
|
||||||
|
firstPacket.payload == someData1
|
||||||
|
secondPacket.payload == somedata2
|
||||||
|
|
||||||
|
await outgoingSocket.destroyWait()
|
||||||
|
|
Loading…
Reference in New Issue