nim-eth/eth/utp/utp_socket.nim
KonradStaniec b56e19a590
Improve handling of timestamps (#446)
* Improve handling of timestamps
2021-12-10 16:28:00 +01:00

1259 lines
44 KiB
Nim

# Copyright (c) 2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
std/sugar,
chronos, chronicles, bearssl,
stew/results,
./send_buffer_tracker,
./growable_buffer,
./packets,
./ledbat_congestion_control,
./delay_histogram,
./utp_utils,
./clock_drift_calculator
logScope:
topics = "utp_socket"
type
ConnectionState* = enum
SynSent,
SynRecv,
Connected,
Destroy
ConnectionDirection = enum
Outgoing, Incoming
UtpSocketKey*[A] = object
remoteAddress*: A
rcvId*: uint16
OutgoingPacket = object
packetBytes: seq[byte]
transmissions: uint16
needResend: bool
payloadLength: uint32
timeSent: Moment
AckResult = enum
PacketAcked, PacketAlreadyAcked, PacketNotSentYet
# Socket callback to send data to remote peer
SendCallback*[A] = proc (to: A, data: seq[byte]): Future[void] {.gcsafe, raises: [Defect]}
SocketConfig* = object
# This is configurable (in contrast to reference impl), as with standard 2 syn resends
# default timeout set to 3seconds and doubling of timeout with each re-send, it
# means that initial connection would timeout after 21s, which seems rather long
initialSynTimeout*: Duration
# Number of resend re-tries of each data packet, before daclaring connection
# failed
dataResendsBeforeFailure*: uint16
# Maximnal size of receive buffer in bytes
optRcvBuffer*: uint32
# Maximnal size of send buffer in bytes
optSndBuffer*: uint32
# If set to some(`Duration`), the incoming socket will be initialized in
# `SynRecv` state and the remote peer will have `Duration` to transfer data
# to move the socket in `Connected` state.
# If set to none, the incoming socket will immediately be set to `Connected`
# state and will be able to transfer data.
incomingSocketReceiveTimeout*: Option[Duration]
# Timeout after which the send window will be reset to its minimal value after it dropped
# to zero. i.e when we received a packet from remote peer with `wndSize` set to 0.
remoteWindowResetTimeout*: Duration
WriteErrorType* = enum
SocketNotWriteable,
FinSent
WriteError* = object
case kind*: WriteErrorType
of SocketNotWriteable:
currentState*: ConnectionState
of FinSent:
discard
WriteResult* = Result[int, WriteError]
WriteRequestType = enum
Data, Close
WriteRequest = object
case kind: WriteRequestType
of Data:
data: seq[byte]
writer: Future[WriteResult]
of Close:
discard
UtpSocket*[A] = ref object
remoteAddress*: A
state: ConnectionState
direction: ConnectionDirection
socketConfig: SocketConfig
# Connection id for packets we receive
connectionIdRcv*: uint16
# Connection id for packets we send
connectionIdSnd*: uint16
# Sequence number for the next packet to be sent.
seqNr: uint16
# All seq number up to this havve been correctly acked by us
ackNr: uint16
# Should be completed after succesful connection to remote host or after timeout
# for the first syn packet
connectionFuture: Future[void]
# the number of packets in the send queue. Packets that haven't
# yet been sent count as well as packets marked as needing resend
# the oldest un-acked packet in the send queue is seq_nr - cur_window_packets
curWindowPackets: uint16
# out going buffer for all send packets
outBuffer: GrowableCircularBuffer[OutgoingPacket]
# incoming buffer for out of order packets
inBuffer: GrowableCircularBuffer[Packet]
# Number of packets waiting in reorder buffer
reorderCount: uint16
# current retransmit Timeout used to calculate rtoTimeout
retransmitTimeout: Duration
# calculated round trip time during communication with remote peer
rtt: Duration
# calculated round trip time variance
rttVar: Duration
# Round trip timeout dynamicaly updated based on acks received from remote
# peer
rto: Duration
# RTO timeout will happen when currenTime > rtoTimeout
rtoTimeout: Moment
# rcvBuffer
buffer: AsyncBuffer
# loop called every 500ms to check for on going timeout status
checkTimeoutsLoop: Future[void]
# number on consecutive re-transsmisions
retransmitCount: uint32
# Event which will complete whenever socket gets in destory state
closeEvent: AsyncEvent
# All callback to be called whenever socket gets in destroy state
closeCallbacks: seq[Future[void]]
# socket is closed for reading
readShutdown: bool
# we sent out fin packet
finSent: bool
# we requested to close the socket by sending fin packet
sendFinRequested: bool
# have our fin been acked
finAcked: bool
# have we received remote fin
gotFin: bool
# have we reached remote fin packet
reachedFin: bool
# sequence number of remoted fin packet
eofPktNr: uint16
sendBufferTracker: SendBufferTracker
writeQueue: AsyncQueue[WriteRequest]
writeLoop: Future[void]
zeroWindowTimer: Moment
# last measured delay between current local timestamp, and remote sent
# timestamp. In microseconds
replayMicro: uint32
# indicator if we're in slow-start (exponential growth) phase
slowStart: bool
#the slow-start threshold, in bytes
slowStartTreshold: uint32
# history of our delays
ourHistogram: DelayHistogram
# history of remote delays
remoteHistogram: DelayHistogram
# calculator of drifiting between local and remote clocks
driftCalculator: ClockDriftCalculator
# socket identifier
socketKey*: UtpSocketKey[A]
send: SendCallback[A]
# User driven call back to be called whenever socket is permanently closed i.e
# reaches destroy state
SocketCloseCallback* = proc (): void {.gcsafe, raises: [Defect].}
ConnectionError* = object of CatchableError
OutgoingConnectionErrorType* = enum
SocketAlreadyExists, ConnectionTimedOut, ErrorWhileSendingSyn
OutgoingConnectionError* = object
case kind*: OutgoingConnectionErrorType
of ErrorWhileSendingSyn:
error*: ref CatchableError
of SocketAlreadyExists, ConnectionTimedOut:
discard
ConnectionResult*[A] = Result[UtpSocket[A], OutgoingConnectionError]
const
# Maximal number of payload bytes per packet. Total packet size will be equal to
# mtuSize + sizeof(header) = 600 bytes
# TODO for now it is just some random value. Ultimatly this value should be dynamically
# adjusted based on traffic.
mtuSize = 580
# How often each socket check its different on going timers
checkTimeoutsLoopInterval = milliseconds(500)
# Defualt initial timeout for first Syn packet
defaultInitialSynTimeout = milliseconds(3000)
# Initial timeout to receive first Data data packet after receiving initial Syn
# packet.
defaultRcvRetransmitTimeout = milliseconds(10000)
# Number of times each data packet will be resend before declaring connection
# dead. 4 is taken from reference implementation
defaultDataResendsBeforeFailure = 4'u16
# default size of rcv buffer in bytes
# rationale form C reference impl:
# 1 MB of receive buffer (i.e. max bandwidth delay product)
# means that from a peer with 200 ms RTT, we cannot receive
# faster than 5 MB/s
# from a peer with 10 ms RTT, we cannot receive faster than
# 100 MB/s. This is assumed to be good enough, since bandwidth
# often is proportional to RTT anyway
defaultOptRcvBuffer: uint32 = 1024 * 1024
# rationale from C reference impl:
# Allow a reception window of at least 3 ack_nrs behind seq_nr
# A non-SYN packet with an ack_nr difference greater than this is
# considered suspicious and ignored
allowedAckWindow*: uint16 = 3
# Timeout after which the send window will be reset to its minimal value after it dropped
# to zero. i.e when we received a packet from remote peer with `wndSize` set to 0.
defaultResetWindowTimeout = seconds(15)
# If remote peer window drops to zero, then after some time we will reset it
# to this value even if we do not receive any more messages from remote peers.
# Reset period is configured in `SocketConfig`
minimalRemoteWindow: uint32 = 1500
# Initial max window size. Reference implementation uses value which enables one packet
# to be transfered.
# We use value two times higher as we do not yet have proper mtu estimation, and
# our impl should work over udp and discovery v5 (where proper estmation may be harder
# as packets already have discvoveryv5 envelope)
startMaxWindow* = 2 * mtuSize
reorderBufferMaxSize = 1024
proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T =
UtpSocketKey[A](remoteAddress: remoteAddress, rcvId: rcvId)
proc init(
T: type OutgoingPacket,
packetBytes: seq[byte],
transmissions: uint16,
needResend: bool,
payloadLength: uint32,
timeSent: Moment = getMonoTimestamp().moment): T =
OutgoingPacket(
packetBytes: packetBytes,
transmissions: transmissions,
needResend: needResend,
payloadLength: payloadLength,
timeSent: timeSent
)
proc init*(
T: type SocketConfig,
initialSynTimeout: Duration = defaultInitialSynTimeout,
dataResendsBeforeFailure: uint16 = defaultDataResendsBeforeFailure,
optRcvBuffer: uint32 = defaultOptRcvBuffer,
incomingSocketReceiveTimeout: Option[Duration] = some(defaultRcvRetransmitTimeout),
remoteWindowResetTimeout: Duration = defaultResetWindowTimeout,
optSndBuffer: uint32 = defaultOptRcvBuffer
): T =
SocketConfig(
initialSynTimeout: initialSynTimeout,
dataResendsBeforeFailure: dataResendsBeforeFailure,
optRcvBuffer: optRcvBuffer,
optSndBuffer: optSndBuffer,
incomingSocketReceiveTimeout: incomingSocketReceiveTimeout,
remoteWindowResetTimeout: remoteWindowResetTimeout
)
proc getRcvWindowSize(socket: UtpSocket): uint32 =
let currentDataSize = socket.buffer.dataLen()
if currentDataSize > int(socket.socketConfig.optRcvBuffer):
0'u32
else:
socket.socketConfig.optRcvBuffer - uint32(currentDataSize)
proc registerOutgoingPacket(socket: UtpSocket, oPacket: OutgoingPacket) =
## Adds packet to outgoing buffer and updates all related fields
socket.outBuffer.ensureSize(socket.seqNr, socket.curWindowPackets)
socket.outBuffer.put(socket.seqNr, oPacket)
inc socket.seqNr
inc socket.curWindowPackets
proc sendData(socket: UtpSocket, data: seq[byte]): Future[void] =
socket.send(socket.remoteAddress, data)
proc sendAck(socket: UtpSocket): Future[void] =
## Creates and sends ack, based on current socket state. Acks are different from
## other packets as we do not track them in outgoing buffet
let ackPacket =
ackPacket(
socket.seqNr,
socket.connectionIdSnd,
socket.ackNr,
socket.getRcvWindowSize(),
socket.replayMicro
)
socket.sendData(encodePacket(ackPacket))
# Should be called before sending packet
proc setSend(s: UtpSocket, p: var OutgoingPacket): seq[byte] =
let timestampInfo = getMonoTimestamp()
inc p.transmissions
p.needResend = false
p.timeSent = timestampInfo.moment
# all bytearrays in outgoing buffer should be properly encoded utp packets
# so it is safe to directly modify fields
modifyTimeStampAndAckNr(p.packetBytes, timestampInfo.timestamp, s.ackNr)
return p.packetBytes
proc flushPackets(socket: UtpSocket) {.async.} =
var i: uint16 = socket.seqNr - socket.curWindowPackets
while i != socket.seqNr:
# sending only packet which were not transmitted yet or need a resend
let shouldSendPacket = socket.outBuffer.exists(i, (p: OutgoingPacket) => (p.transmissions == 0 or p.needResend == true))
if (shouldSendPacket):
if socket.sendBufferTracker.reserveNBytes(socket.outBuffer[i].payloadLength):
let toSend = socket.setSend(socket.outBuffer[i])
await socket.sendData(toSend)
else:
# there is no place in send buffer, stop flushing
return
inc i
proc markAllPacketAsLost(s: UtpSocket) =
var i = 0'u16
while i < s.curWindowPackets:
let packetSeqNr = s.seqNr - 1 - i
if (s.outBuffer.exists(packetSeqNr, (p: OutgoingPacket) => p.transmissions > 0 and p.needResend == false)):
s.outBuffer[packetSeqNr].needResend = true
let packetPayloadLength = s.outBuffer[packetSeqNr].payloadLength
# lack of waiters notification in case of timeout effectivly means that
# we do not allow any new bytes to enter snd buffer in case of new free space
# due to timeout.
s.sendBufferTracker.decreaseCurrentWindow(packetPayloadLength, notifyWaiters = false)
inc i
proc isOpened(socket:UtpSocket): bool =
return (
socket.state == SynRecv or
socket.state == SynSent or
socket.state == Connected
)
proc shouldDisconnectFromFailedRemote(socket: UtpSocket): bool =
(socket.state == SynSent and socket.retransmitCount >= 2) or
(socket.retransmitCount >= socket.socketConfig.dataResendsBeforeFailure)
proc checkTimeouts(socket: UtpSocket) {.async.} =
let currentTime = getMonoTimestamp().moment
# flush all packets which needs to be re-send
if socket.state != Destroy:
await socket.flushPackets()
if socket.isOpened():
if (socket.sendBufferTracker.maxRemoteWindow == 0 and currentTime > socket.zeroWindowTimer):
debug "Reset remote window to minimal value"
socket.sendBufferTracker.updateMaxRemote(minimalRemoteWindow)
if (currentTime > socket.rtoTimeout):
# TODO add handling of probe time outs. Reference implemenation has mechanism
# of sending probes to determine mtu size. Probe timeouts do not count to standard
# timeouts calculations
# client initiated connections, but did not send following data packet in rto
# time and our socket is configured to start in SynRecv state.
if (socket.state == SynRecv):
socket.destroy()
return
if socket.shouldDisconnectFromFailedRemote():
if socket.state == SynSent and (not socket.connectionFuture.finished()):
socket.connectionFuture.fail(newException(ConnectionError, "Connection to peer timed out"))
socket.destroy()
return
let newTimeout = socket.retransmitTimeout * 2
socket.retransmitTimeout = newTimeout
socket.rtoTimeout = currentTime + newTimeout
let currentPacketSize = uint32(socket.getPacketSize())
if (socket.curWindowPackets == 0 and socket.sendBufferTracker.maxWindow > currentPacketSize):
# there are no packets in flight even though there is place for more than whole packet
# this means connection is just idling. Reset window by 1/3'rd but no more
# than to fit at least one packet.
let oldMaxWindow = socket.sendBufferTracker.maxWindow
let newMaxWindow = max((oldMaxWindow * 2) div 3, currentPacketSize)
socket.sendBufferTracker.updateMaxWindowSize(
# maxRemote window does not change
socket.sendBufferTracker.maxRemoteWindow,
newMaxWindow
)
else:
# delay was so high that window has shrunk below one packet. Reset window
# to fit a least one packet and start with slow start
socket.sendBufferTracker.updateMaxWindowSize(
# maxRemote window does not change
socket.sendBufferTracker.maxRemoteWindow,
currentPacketSize
)
socket.slowStart = true
# This will have much more sense when we will add handling of selective acks
# as then every selecivly acked packet restes timeout timer and removes packet
# from out buffer.
markAllPacketAsLost(socket)
# resend oldest packet if there are some packets in flight
if (socket.curWindowPackets > 0):
notice "resending oldest packet in outBuffer"
inc socket.retransmitCount
let oldestPacketSeqNr = socket.seqNr - socket.curWindowPackets
# TODO add handling of fast timeout
doAssert(
socket.outBuffer.get(oldestPacketSeqNr).isSome(),
"oldest packet should always be available when there is data in flight"
)
let payloadLength = socket.outBuffer[oldestPacketSeqNr].payloadLength
if (socket.sendBufferTracker.reserveNBytes(payloadLength)):
let dataToSend = socket.setSend(socket.outBuffer[oldestPacketSeqNr])
await socket.sendData(dataToSend)
# TODO add sending keep alives when necessary
proc checkTimeoutsLoop(s: UtpSocket) {.async.} =
## Loop that check timeouts in the socket.
try:
while true:
await sleepAsync(checkTimeoutsLoopInterval)
await s.checkTimeouts()
except CancelledError:
trace "checkTimeoutsLoop canceled"
proc startTimeoutLoop(s: UtpSocket) =
s.checkTimeoutsLoop = checkTimeoutsLoop(s)
proc getPacketSize*(socket: UtpSocket): int =
# TODO currently returning constant, ultimatly it should be bases on mtu estimates
mtuSize
proc resetSendTimeout(socket: UtpSocket) =
socket.retransmitTimeout = socket.rto
socket.rtoTimeout = getMonoTimestamp().moment + socket.retransmitTimeout
proc handleDataWrite(socket: UtpSocket, data: seq[byte], writeFut: Future[WriteResult]): Future[void] {.async.} =
if writeFut.finished():
# write future was cancelled befere we got chance to process it, short circuit
# processing and move to next loop iteration
return
let pSize = socket.getPacketSize()
let endIndex = data.high()
var i = 0
var bytesWritten = 0
let wndSize = socket.getRcvWindowSize()
while i <= endIndex:
let lastIndex = i + pSize - 1
let lastOrEnd = min(lastIndex, endIndex)
let dataSlice = data[i..lastOrEnd]
let payloadLength = uint32(len(dataSlice))
try:
await socket.sendBufferTracker.reserveNBytesWait(payloadLength)
if socket.curWindowPackets == 0:
socket.resetSendTimeout()
let dataPacket =
dataPacket(
socket.seqNr,
socket.connectionIdSnd,
socket.ackNr,
wndSize,
dataSlice,
socket.replayMicro
)
let outgoingPacket = OutgoingPacket.init(encodePacket(dataPacket), 1, false, payloadLength)
socket.registerOutgoingPacket(outgoingPacket)
await socket.sendData(outgoingPacket.packetBytes)
except CancelledError as exc:
# write loop has been cancelled in the middle of processing due to the
# socket closing
# this approach can create partial write in case destroyin socket in the
# the middle of the write
doAssert(socket.state == Destroy)
if (not writeFut.finished()):
let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state))
writeFut.complete(res)
# we need to re-raise exception so the outer loop will be properly cancelled too
raise exc
bytesWritten = bytesWritten + len(dataSlice)
i = lastOrEnd + 1
# Before completeing future with success (as all data was sent sucessfuly)
# we need to check if user did not cancel write on his end
if (not writeFut.finished()):
writeFut.complete(Result[int, WriteError].ok(bytesWritten))
proc handleClose(socket: UtpSocket): Future[void] {.async.} =
try:
if socket.curWindowPackets == 0:
socket.resetSendTimeout()
let finEncoded =
encodePacket(
finPacket(
socket.seqNr,
socket.connectionIdSnd,
socket.ackNr,
socket.getRcvWindowSize(),
socket.replayMicro
)
)
socket.registerOutgoingPacket(OutgoingPacket.init(finEncoded, 1, true, 0))
await socket.sendData(finEncoded)
socket.finSent = true
except CancelledError as exc:
raise exc
proc writeLoop(socket: UtpSocket): Future[void] {.async.} =
## Loop that processes writes on socket
try:
while true:
let req = await socket.writeQueue.get()
case req.kind
of Data:
await socket.handleDataWrite(req.data, req.writer)
of Close:
await socket.handleClose()
except CancelledError:
doAssert(socket.state == Destroy)
for req in socket.writeQueue.items:
if (req.kind == Data and not req.writer.finished()):
let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state))
req.writer.complete(res)
socket.writeQueue.clear()
trace "writeLoop canceled"
proc startWriteLoop(s: UtpSocket) =
s.writeLoop = writeLoop(s)
proc new[A](
T: type UtpSocket[A],
to: A,
snd: SendCallback[A],
state: ConnectionState,
cfg: SocketConfig,
direction: ConnectionDirection,
rcvId: uint16,
sndId: uint16,
initialSeqNr: uint16,
initialAckNr: uint16,
initialTimeout: Duration
): T =
let currentTime = getMonoTimestamp().moment
T(
remoteAddress: to,
state: state,
direction: direction,
socketConfig: cfg,
connectionIdRcv: rcvId,
connectionIdSnd: sndId,
seqNr: initialSeqNr,
ackNr: initialAckNr,
connectionFuture: newFuture[void](),
outBuffer: GrowableCircularBuffer[OutgoingPacket].init(),
inBuffer: GrowableCircularBuffer[Packet].init(),
retransmitTimeout: initialTimeout,
rtoTimeout: currentTime + initialTimeout,
# Initial timeout values taken from reference implemntation
rtt: milliseconds(0),
rttVar: milliseconds(800),
rto: milliseconds(3000),
buffer: AsyncBuffer.init(int(cfg.optRcvBuffer)),
closeEvent: newAsyncEvent(),
closeCallbacks: newSeq[Future[void]](),
# start with 1mb assumption, field will be updated with first received packet
sendBufferTracker: SendBufferTracker.new(0, 1024 * 1024, cfg.optSndBuffer, startMaxWindow),
# queue with infinite size
writeQueue: newAsyncQueue[WriteRequest](),
zeroWindowTimer: currentTime + cfg.remoteWindowResetTimeout,
socketKey: UtpSocketKey.init(to, rcvId),
slowStart: true,
slowStartTreshold: cfg.optSndBuffer,
ourHistogram: DelayHistogram.init(currentTime),
remoteHistogram: DelayHistogram.init(currentTime),
driftCalculator: ClockDriftCalculator.init(currentTime),
send: snd
)
proc newOutgoingSocket*[A](
to: A,
snd: SendCallback[A],
cfg: SocketConfig,
rcvConnectionId: uint16,
rng: var BrHmacDrbgContext
): UtpSocket[A] =
let sndConnectionId = rcvConnectionId + 1
let initialSeqNr = randUint16(rng)
UtpSocket[A].new(
to,
snd,
SynSent,
cfg,
Outgoing,
rcvConnectionId,
sndConnectionId,
initialSeqNr,
# Initialy ack nr is 0, as we do not know remote inital seqnr
0,
cfg.initialSynTimeout
)
proc newIncomingSocket*[A](
to: A,
snd: SendCallback[A],
cfg: SocketConfig,
connectionId: uint16,
ackNr: uint16,
rng: var BrHmacDrbgContext
): UtpSocket[A] =
let initialSeqNr = randUint16(rng)
let (initialState, initialTimeout) =
if (cfg.incomingSocketReceiveTimeout.isNone()):
# it does not matter what timeout value we put here, as socket will be in
# connected state without outgoing packets in buffer so any timeout hit will
# just double rto without any penalties
(Connected, milliseconds(0))
else:
let timeout = cfg.incomingSocketReceiveTimeout.unsafeGet()
(SynRecv, timeout)
UtpSocket[A].new(
to,
snd,
initialState,
cfg,
Incoming,
connectionId + 1,
connectionId,
initialSeqNr,
ackNr,
initialTimeout
)
proc startOutgoingSocket*(socket: UtpSocket): Future[void] {.async.} =
doAssert(socket.state == SynSent)
let packet = synPacket(socket.seqNr, socket.connectionIdRcv, socket.getRcvWindowSize())
notice "Sending syn packet packet", packet = packet
# set number of transmissions to 1 as syn packet will be send just after
# initiliazation
let outgoingPacket = OutgoingPacket.init(encodePacket(packet), 1, false, 0)
socket.registerOutgoingPacket(outgoingPacket)
socket.startWriteLoop()
socket.startTimeoutLoop()
await socket.sendData(outgoingPacket.packetBytes)
await socket.connectionFuture
proc startIncomingSocket*(socket: UtpSocket) {.async.} =
# Make sure ack was flushed before moving forward
await socket.sendAck()
socket.startWriteLoop()
socket.startTimeoutLoop()
proc isConnected*(socket: UtpSocket): bool =
socket.state == Connected
proc isClosed*(socket: UtpSocket): bool =
socket.state == Destroy and socket.closeEvent.isSet()
proc destroy*(s: UtpSocket) =
## Moves socket to destroy state and clean all reasources.
## Remote is not notified in any way about socket end of life
s.state = Destroy
s.writeLoop.cancel()
s.checkTimeoutsLoop.cancel()
s.closeEvent.fire()
proc destroyWait*(s: UtpSocket) {.async.} =
## Moves socket to destroy state and clean all reasources and wait for all registered
## callback to fire
## Remote is not notified in any way about socket end of life
s.destroy()
await s.closeEvent.wait()
await allFutures(s.closeCallbacks)
proc setCloseCallback(s: UtpSocket, cb: SocketCloseCallback) {.async.} =
## Set callback which will be called whenever the socket is permanently closed
try:
await s.closeEvent.wait()
cb()
except CancelledError:
trace "closeCallback cancelled"
proc registerCloseCallback*(s: UtpSocket, cb: SocketCloseCallback) =
s.closeCallbacks.add(s.setCloseCallback(cb))
proc updateTimeouts(socket: UtpSocket, timeSent: Moment, currentTime: Moment) =
## Update timeouts according to spec:
## delta = rtt - packet_rtt
## rtt_var += (abs(delta) - rtt_var) / 4;
## rtt += (packet_rtt - rtt) / 8;
let packetRtt = currentTime - timeSent
if (socket.rtt.isZero):
socket.rtt = packetRtt
socket.rttVar = packetRtt div 2
else:
let packetRttMicro = packetRtt.microseconds()
let rttVarMicro = socket.rttVar.microseconds()
let rttMicro = socket.rtt.microseconds()
let delta = rttMicro - packetRttMicro
let newVar = microseconds(rttVarMicro + (abs(delta) - rttVarMicro) div 4)
let newRtt = socket.rtt - (socket.rtt div 8) + (packetRtt div 8)
socket.rttVar = newVar
socket.rtt = newRtt
# according to spec it should be: timeout = max(rtt + rtt_var * 4, 500)
# but usually spec lags after implementation so milliseconds(1000) is used
socket.rto = max(socket.rtt + (socket.rttVar * 4), milliseconds(1000))
proc ackPacket(socket: UtpSocket, seqNr: uint16, currentTime: Moment): AckResult =
let packetOpt = socket.outBuffer.get(seqNr)
if packetOpt.isSome():
let packet = packetOpt.get()
if packet.transmissions == 0:
# according to reference impl it can happen when we get an ack_nr that
# does not exceed what we have stuffed into the outgoing buffer,
# but does exceed what we have sent
# TODO analyze if this case can happen with our impl
return PacketNotSentYet
socket.outBuffer.delete(seqNr)
# from spec: The rtt and rtt_var is only updated for packets that were sent only once.
# This avoids problems with figuring out which packet was acked, the first or the second one.
# it is standard solution to retransmission ambiguity problem
if packet.transmissions == 1:
socket.updateTimeouts(packet.timeSent, currentTime)
socket.retransmitTimeout = socket.rto
socket.rtoTimeout = currentTime + socket.rto
# if need_resend is set, this packet has already
# been considered timed-out, and is not included in
# the cur_window anymore
if (not packet.needResend):
socket.sendBufferTracker.decreaseCurrentWindow(packet.payloadLength, notifyWaiters = true)
socket.retransmitCount = 0
PacketAcked
else:
# the packet has already been acked (or not sent)
PacketAlreadyAcked
proc ackPackets(socket: UtpSocket, nrPacketsToAck: uint16, currentTime: Moment) =
## Ack packets in outgoing buffer based on ack number in the received packet
var i = 0
while i < int(nrPacketsToack):
let result = socket.ackPacket(socket.seqNr - socket.curWindowPackets, currentTime)
case result
of PacketAcked:
dec socket.curWindowPackets
of PacketAlreadyAcked:
dec socket.curWindowPackets
of PacketNotSentYet:
debug "Tried to ack packed which was not sent yet"
break
inc i
proc calculateAckedbytes(socket: UtpSocket, nrPacketsToAck: uint16, now: Moment): (uint32, Duration) =
var i: uint16 = 0
var ackedBytes: uint32 = 0
var minRtt: Duration = InfiniteDuration
while i < nrPacketsToack:
let seqNr = socket.seqNr - socket.curWindowPackets + i
let packetOpt = socket.outBuffer.get(seqNr)
if (packetOpt.isSome() and packetOpt.unsafeGet().transmissions > 0):
let packet = packetOpt.unsafeGet()
ackedBytes = ackedBytes + packet.payloadLength
# safety check in case clock is not monotonic
if packet.timeSent < now:
minRtt = min(minRtt, now - packet.timeSent)
else:
minRtt = min(minRtt, microseconds(50000))
inc i
(ackedBytes, minRtt)
proc initializeAckNr(socket: UtpSocket, packetSeqNr: uint16) =
if (socket.state == SynSent):
socket.ackNr = packetSeqNr - 1
proc isAckNrInvalid(socket: UtpSocket, packet: Packet): bool =
let ackWindow = max(socket.curWindowPackets + allowedAckWindow, allowedAckWindow)
(
(packet.header.pType != ST_SYN or socket.state != SynRecv) and
(
# packet ack number must be smaller than our last send packet i.e
# remote should not ack packets from the future
wrapCompareLess(socket.seqNr - 1, packet.header.ackNr) or
# packet ack number should not be too old
wrapCompareLess(packet.header.ackNr, socket.seqNr - 1 - ackWindow)
)
)
# TODO at socket level we should handle only FIN/DATA/ACK packets. Refactor to make
# it enforcable by type system
# TODO re-think synchronization of this procedure, as each await inside gives control
# to scheduler which means there could be potentialy several processPacket procs
# running
proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
let timestampInfo = getMonoTimestamp()
if socket.isAckNrInvalid(p):
notice "Received packet with invalid ack nr"
return
## Updates socket state based on received packet, and sends ack when necessary.
## Shoyuld be called in main packet receiving loop
let pkSeqNr = p.header.seqNr
let pkAckNr = p.header.ackNr
socket.initializeAckNr(pkSeqNr)
# number of packets past the expected
# ack_nr is the last acked, seq_nr is the
# current. Subtracring 1 makes 0 mean "this is the next expected packet"
let pastExpected = pkSeqNr - socket.ackNr - 1
# acks is the number of packets that was acked, in normal case - no selective
# acks, no losses, no resends, it will usually be equal to 1
# we can calculate it here and not only for ST_STATE packet, as each utp
# packet has info about remote side last acked packet.
var acks = pkAckNr - (socket.seqNr - 1 - socket.curWindowPackets)
if acks > socket.curWindowPackets:
# this case happens if the we already received this ack nr
acks = 0
# If packet is totally of the mark short circout the processing
if pastExpected >= reorderBufferMaxSize:
notice "Received packet is totally of the mark"
return
var (ackedBytes, minRtt) = socket.calculateAckedbytes(acks, timestampInfo.moment)
# TODO caluclate bytes acked by selective acks here (if thats the case)
let sentTimeRemote = p.header.timestamp
# we are using uint32 not a Duration, to wrap a round in case of
# sentTimeRemote > receipTimestamp. This can happen as local and remote
# clock can be not synchornized or even using different system clock.
# i.e this number itself does not tell anything and is only used to feedback it
# to remote peer with each sent packet
let remoteDelay =
if (sentTimeRemote == 0):
0'u32
else:
timestampInfo.timestamp - sentTimeRemote
socket.replayMicro = remoteDelay
let prevRemoteDelayBase = socket.remoteHistogram.delayBase
if (remoteDelay != 0):
socket.remoteHistogram.addSample(remoteDelay, timestampInfo.moment)
# remote new delay base is less than previous
# shift our delay base in other direction to take clock skew into account
# but no more than 10ms
if (prevRemoteDelayBase != 0 and
wrapCompareLess(socket.remoteHistogram.delayBase, prevRemoteDelayBase) and
prevRemoteDelayBase - socket.remoteHistogram.delayBase <= 10000'u32):
socket.ourHistogram.shift(prevRemoteDelayBase - socket.remoteHistogram.delayBase)
let actualDelay = p.header.timestampDiff
if actualDelay != 0:
socket.ourHistogram.addSample(actualDelay, timestampInfo.moment)
socket.driftCalculator.addSample(actualDelay, timestampInfo.moment)
# adjust base delay if delay estimates exceeds rtt
if (socket.ourHistogram.getValue() > minRtt):
let diff = uint32((socket.ourHistogram.getValue() - minRtt).microseconds())
socket.ourHistogram.shift(diff)
let (newMaxWindow, newSlowStartTreshold, newSlowStart) =
applyCongestionControl(
socket.sendBufferTracker.maxWindow,
socket.slowStart,
socket.slowStartTreshold,
socket.socketConfig.optSndBuffer,
uint32(socket.getPacketSize()),
microseconds(actualDelay),
ackedBytes,
minRtt,
socket.ourHistogram.getValue(),
socket.driftCalculator.clockDrift
)
# update remote window size and max window
socket.sendBufferTracker.updateMaxWindowSize(p.header.wndSize, newMaxWindow)
socket.slowStart = newSlowStart
socket.slowStartTreshold = newSlowStartTreshold
if (socket.sendBufferTracker.maxRemoteWindow == 0):
# when zeroWindowTimer will be hit and maxRemoteWindow still will be equal to 0
# then it will be reset to minimal value
socket.zeroWindowTimer = timestampInfo.moment + socket.socketConfig.remoteWindowResetTimeout
# socket.curWindowPackets == acks means that this packet acked all remaining packets
# including the sent fin packets
if (socket.finSent and socket.curWindowPackets == acks):
notice "FIN acked, destroying socket"
socket.finAcked = true
# this bit of utp spec is a bit under specified (i.e there is not specification at all)
# reference implementation moves socket to destroy state in case that our fin was acked
# and socket is considered closed for reading and writing.
# but in theory remote could stil write some data on this socket (or even its own fin)
socket.destroy()
socket.ackPackets(acks, timestampInfo.moment)
case p.header.pType
of ST_DATA, ST_FIN:
# To avoid amplification attacks, server socket is in SynRecv state until
# it receices first data transfer
# https://www.usenix.org/system/files/conference/woot15/woot15-paper-adamsky.pdf
# Socket is in SynRecv state only when recv timeout is configured
if (socket.state == SynRecv and p.header.pType == ST_DATA):
socket.state = Connected
if (p.header.pType == ST_FIN and (not socket.gotFin)):
socket.gotFin = true
socket.eofPktNr = pkSeqNr
# we got in order packet
if (pastExpected == 0 and (not socket.reachedFin)):
if (len(p.payload) > 0 and (not socket.readShutdown)):
# we are getting in order data packet, we can flush data directly to the incoming buffer
await upload(addr socket.buffer, unsafeAddr p.payload[0], p.payload.len())
# Bytes have been passed to upper layer, we can increase number of last
# acked packet
inc socket.ackNr
# check if the following packets are in reorder buffer
while true:
# We are doing this in reoreder loop, to handle the case when we already received
# fin but there were some gaps before eof
# we have reached remote eof, and should not receive more packets from remote
if ((not socket.reachedFin) and socket.gotFin and socket.eofPktNr == socket.ackNr):
notice "Reached socket EOF"
# In case of reaching eof, it is up to user of library what to to with
# it. With the current implementation, the most apropriate way would be to
# destory it (as with our implementation we know that remote is destroying its acked fin)
# as any other send will either generate timeout, or socket will be forcefully
# closed by reset
socket.reachedFin = true
# this is not necessarily true, but as we have already reached eof we can
# ignore following packets
socket.reorderCount = 0
# notify all readers we have reached eof
socket.buffer.forget()
if socket.reorderCount == 0:
break
let nextPacketNum = socket.ackNr + 1
let maybePacket = socket.inBuffer.get(nextPacketNum)
if maybePacket.isNone():
break
let packet = maybePacket.unsafeGet()
if (len(packet.payload) > 0 and (not socket.readShutdown)):
await upload(addr socket.buffer, unsafeAddr packet.payload[0], packet.payload.len())
socket.inBuffer.delete(nextPacketNum)
inc socket.ackNr
dec socket.reorderCount
# TODO for now we just schedule concurrent task with ack sending. It may
# need improvement, as with this approach there is no direct control over
# how many concurrent tasks there are and how to cancel them when socket
# is closed
asyncSpawn socket.sendAck()
# we got packet out of order
else:
notice "Got out of order packet"
if (socket.gotFin and pkSeqNr > socket.eofPktNr):
notice "Got packet past eof"
return
# growing buffer before checking the packet is already there to avoid
# looking at older packet due to indices wrap aroud
socket.inBuffer.ensureSize(pkSeqNr + 1, pastExpected + 1)
if (socket.inBuffer.get(pkSeqNr).isSome()):
notice "packet already received"
else:
socket.inBuffer.put(pkSeqNr, p)
inc socket.reorderCount
notice "added out of order packet in reorder buffer"
# TODO for now we do not sent any ack as we do not handle selective acks
# add sending of selective acks
of ST_STATE:
if (socket.state == SynSent and (not socket.connectionFuture.finished())):
socket.state = Connected
# TODO reference implementation sets ackNr (p.header.seqNr - 1), although
# spec mention that it should be equal p.header.seqNr. For now follow the
# reference impl to be compatible with it. Later investigate trin compatibility.
socket.ackNr = p.header.seqNr - 1
# In case of SynSent complate the future as last thing to make sure user of libray will
# receive socket in correct state
socket.connectionFuture.complete()
of ST_RESET:
notice "Received ST_RESET on known socket, ignoring"
of ST_SYN:
notice "Received ST_SYN on known socket, ignoring"
proc atEof*(socket: UtpSocket): bool =
# socket is considered at eof when remote side sent us fin packet
# and we have processed all packets up to fin
socket.buffer.dataLen() == 0 and socket.reachedFin
proc readingClosed(socket: UtpSocket): bool =
socket.atEof() or socket.state == Destroy
proc close*(socket: UtpSocket) =
## Gracefully closes conneciton (send FIN) if socket is in connected state
## does not wait for socket to close
if socket.state != Destroy:
case socket.state
of Connected:
socket.readShutdown = true
if (not socket.sendFinRequested):
try:
# with this approach, all pending writes will be executed before sending fin packet
# we could also and method which places close request as first one to process
# but it would complicate the write loop
socket.writeQueue.putNoWait(WriteRequest(kind: Close))
except AsyncQueueFullError as e:
# should not happen as our write queue is unbounded
raiseAssert e.msg
socket.sendFinRequested = true
else:
# In any other case like connection is not established so sending fin make
# no sense, we can just out right close it
socket.destroy()
proc closeWait*(socket: UtpSocket) {.async.} =
## Gracefully closes conneciton (send FIN) if socket is in connected state
## and waits for socket to be closed.
## Warning: if FIN packet for some reason will be lost, then socket will be closed
## due to retransmission failure which may take some time.
## default is 4 retransmissions with doubling of rto between each retranssmision
socket.close()
await socket.closeEvent.wait()
proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] =
let retFuture = newFuture[WriteResult]("UtpSocket.write")
if (socket.state != Connected):
let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state))
retFuture.complete(res)
return retFuture
# fin should be last packet received by remote side, therefore trying to write
# after sending fin is considered error
if socket.sendFinRequested or socket.finSent:
let res = Result[int, WriteError].err(WriteError(kind: FinSent))
retFuture.complete(res)
return retFuture
var bytesWritten = 0
if len(data) == 0:
let res = Result[int, WriteError].ok(bytesWritten)
retFuture.complete(res)
return retFuture
try:
socket.writeQueue.putNoWait(WriteRequest(kind: Data, data: data, writer: retFuture))
except AsyncQueueFullError as e:
# this should not happen as out write queue is unbounded
raiseAssert e.msg
return retFuture
template readLoop(body: untyped): untyped =
while true:
let (consumed, done) = body
socket.buffer.shift(consumed)
if done:
break
else:
if not(socket.readingClosed()):
await socket.buffer.wait()
proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] {.async.}=
## Read all bytes `n` bytes from socket ``socket``.
##
## This procedure allocates buffer seq[byte] and return it as result.
var bytes = newSeq[byte]()
if n == 0:
return bytes
readLoop():
if socket.readingClosed():
(0, true)
else:
let count = min(socket.buffer.dataLen(), n - len(bytes))
bytes.add(socket.buffer.buffer.toOpenArray(0, count - 1))
(count, len(bytes) == n)
return bytes
proc read*(socket: UtpSocket): Future[seq[byte]] {.async.}=
## Read all bytes from socket ``socket``.
##
## This procedure allocates buffer seq[byte] and return it as result.
var bytes = newSeq[byte]()
readLoop():
if socket.readingClosed():
(0, true)
else:
let count = socket.buffer.dataLen()
bytes.add(socket.buffer.buffer.toOpenArray(0, count - 1))
(count, false)
return bytes
# Check how many packets are still in the out going buffer, usefull for tests or
# debugging.
# It throws assertion error when number of elements in buffer do not equal kept counter
proc numPacketsInOutGoingBuffer*(socket: UtpSocket): int =
var num = 0
for e in socket.outBuffer.items():
if e.isSome():
inc num
doAssert(num == int(socket.curWindowPackets))
num
# Check how many payload bytes are still in flight
proc numOfBytesInFlight*(socket: UtpSocket): uint32 = socket.sendBufferTracker.currentBytesInFlight()
# Check how many packets are still in the reorder buffer, usefull for tests or
# debugging.
# It throws assertion error when number of elements in buffer do not equal kept counter
proc numPacketsInReordedBuffer*(socket: UtpSocket): int =
var num = 0
for e in socket.inBUffer.items():
if e.isSome():
inc num
doAssert(num == int(socket.reorderCount))
num
proc connectionId*[A](socket: UtpSocket[A]): uint16 =
## Connection id is id which is used in first SYN packet which establishes the connection
## so for Outgoing side it is actually its rcv_id, and for Incoming side it is
## its snd_id
case socket.direction
of Incoming:
socket.connectionIdSnd
of Outgoing:
socket.connectionIdRcv
# Check what is current available window size for this socket
proc currentMaxWindowSize*[A](socket: UtpSocket[A]): uint32 =
socket.sendBufferTracker.maxWindow