Add timeout loop (#416)

* Modify outbuffer

Each element of outbuffer keeps encoded packet ,number
of transmissions of givern packet and information if
given packet needs to be re-send.

* Add initial handling of timeouts

* Add tests for syn re-sends
This commit is contained in:
KonradStaniec 2021-10-25 09:58:13 +02:00 committed by GitHub
parent d34d3409da
commit fd4f78d1c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 449 additions and 53 deletions

View File

@ -1,5 +1,5 @@
import import
std/[options, math] std/[options, math, sugar]
export options export options
@ -10,9 +10,10 @@ export options
# utp implementation. # utp implementation.
# Another alternative would be to use standard deque from deques module, and caluclate # Another alternative would be to use standard deque from deques module, and caluclate
# item indexes from their sequence numbers. # item indexes from their sequence numbers.
type GrowableCircularBuffer*[A] = object type
items: seq[Option[A]] GrowableCircularBuffer*[A] = object
mask: int items: seq[Option[A]]
mask: int
# provided size will always be adjusted to next power of two # provided size will always be adjusted to next power of two
proc init*[A](T: type GrowableCircularBuffer[A], size: Natural = 16): T = proc init*[A](T: type GrowableCircularBuffer[A], size: Natural = 16): T =
@ -34,6 +35,22 @@ proc put*[A](buff: var GrowableCircularBuffer[A], i: Natural, elem: A) =
proc delete*[A](buff: var GrowableCircularBuffer[A], i: Natural) = proc delete*[A](buff: var GrowableCircularBuffer[A], i: Natural) =
buff.putImpl(i, none[A]()) buff.putImpl(i, none[A]())
proc hasKey*[A](buff: GrowableCircularBuffer[A], i: Natural): bool =
buff.get(i).isSome()
proc exists*[A](buff: GrowableCircularBuffer[A], i: Natural, check: proc (x: A): bool): bool =
let maybeElem = buff.get(i)
if (maybeElem.isSome()):
let elem = maybeElem.unsafeGet()
check(elem)
else:
false
proc `[]`*[A](buff: var GrowableCircularBuffer[A], i: Natural): var A =
## Returns contents of the `var GrowableCircularBuffer`. If it is not set, then an exception
## is thrown.
buff.items[i and buff.mask].get()
proc len*[A](buff: GrowableCircularBuffer[A]): int = proc len*[A](buff: GrowableCircularBuffer[A]): int =
buff.mask + 1 buff.mask + 1

View File

@ -37,7 +37,7 @@ when isMainModule:
let helloUtp = "Helllo from nim implementation" let helloUtp = "Helllo from nim implementation"
let bytes = helloUtp.toBytes() let bytes = helloUtp.toBytes()
waitFor soc.write(bytes) discard waitFor soc.write(bytes)
runForever() runForever()

View File

@ -7,7 +7,7 @@
{.push raises: [Defect].} {.push raises: [Defect].}
import import
std/[tables, options, hashes], std/[tables, options, hashes, sugar, math],
chronos, chronicles, bearssl, chronos, chronicles, bearssl,
./packets, ./packets,
./growable_buffer, ./growable_buffer,
@ -30,6 +30,12 @@ type
UtpSocketKey = object UtpSocketKey = object
remoteAddress: TransportAddress remoteAddress: TransportAddress
rcvId: uint16 rcvId: uint16
OutgoingPacket = object
packetBytes: seq[byte]
transmissions: uint16
needResend: bool
timeSent: Moment
UtpSocket* = ref object UtpSocket* = ref object
remoteAddress*: TransportAddress remoteAddress*: TransportAddress
@ -43,10 +49,8 @@ type
# All seq number up to this havve been correctly acked by us # All seq number up to this havve been correctly acked by us
ackNr: uint16 ackNr: uint16
# Should be completed after succesful connection to remote host. # Should be completed after succesful connection to remote host or after timeout
# TODO check if nim gc handles properly cyclic references, as this future will # for the first syn packet
# contain reference to socket which hold this future.
# If that is not the case, then this future will need to be hold independly
connectionFuture: Future[UtpSocket] connectionFuture: Future[UtpSocket]
# the number of packets in the send queue. Packets that haven't # the number of packets in the send queue. Packets that haven't
@ -55,14 +59,40 @@ type
curWindowPackets: uint16 curWindowPackets: uint16
# out going buffer for all send packets # out going buffer for all send packets
outBuffer: GrowableCircularBuffer[Packet] outBuffer: GrowableCircularBuffer[OutgoingPacket]
# incoming buffer for out of order packets # incoming buffer for out of order packets
inBuffer: GrowableCircularBuffer[Packet] inBuffer: GrowableCircularBuffer[Packet]
# current retransmit Timeout used to calculate rtoTimeout
retransmitTimeout: Duration
# calculated round trip time during communication with remote peer
rtt: Duration
# calculated round trip time variance
rttVar: Duration
# Round trip timeout dynamicaly updated based on acks received from remote
# peer
rto: Duration
# RTO timeout will happen when currenTime > rtoTimeout
rtoTimeout: Moment
# rcvBuffer # rcvBuffer
buffer: AsyncBuffer buffer: AsyncBuffer
# loop called every 500ms to check for on going timeout status
checkTimeoutsLoop: Future[void]
# number on consecutive re-transsmisions
retransmitCount: uint32
# Event which will complete whenever socket gets in destory statate
closeEvent: AsyncEvent
# All callback to be called whenever socket gets in destroy state
closeCallbacks: seq[Future[void]]
utpProt: UtpProtocol utpProt: UtpProtocol
UtpSocketsContainerRef = ref object UtpSocketsContainerRef = ref object
@ -71,6 +101,12 @@ type
AckResult = enum AckResult = enum
PacketAcked, PacketAlreadyAcked, PacketNotSentYet PacketAcked, PacketAlreadyAcked, PacketNotSentYet
SocketConfig* = object
# This is configurable (in contrast to reference impl), as with standard 2 syn resends
# default timeout set to 3seconds and doubling of timeout with each re-send, it
# means that initial connection would timeout after 21s, which seems rather long
initialSynTimeout*: Duration
# For now utp protocol is tied to udp transport, but ultimatly we would like to # For now utp protocol is tied to udp transport, but ultimatly we would like to
# abstract underlying transport to be able to run utp over udp, discoveryv5 or # abstract underlying transport to be able to run utp over udp, discoveryv5 or
# maybe some test transport # maybe some test transport
@ -78,14 +114,21 @@ type
transport: DatagramTransport transport: DatagramTransport
activeSockets: UtpSocketsContainerRef activeSockets: UtpSocketsContainerRef
acceptConnectionCb: AcceptConnectionCallback acceptConnectionCb: AcceptConnectionCallback
socketConfig: SocketConfig
rng*: ref BrHmacDrbgContext rng*: ref BrHmacDrbgContext
## New remote client connection callback # New remote client connection callback
## ``server`` - UtpProtocol object. # ``server`` - UtpProtocol object.
## ``client`` - accepted client utp socket. # ``client`` - accepted client utp socket.
AcceptConnectionCallback* = proc(server: UtpProtocol, AcceptConnectionCallback* = proc(server: UtpProtocol,
client: UtpSocket): Future[void] {.gcsafe, raises: [Defect].} client: UtpSocket): Future[void] {.gcsafe, raises: [Defect].}
# Callback to be called whenever socket is closed
SocketCloseCallback = proc (): void {.gcsafe, raises: [Defect].}
ConnectionError* = object of CatchableError
const const
# Maximal number of payload bytes per packet. Total packet size will be equal to # Maximal number of payload bytes per packet. Total packet size will be equal to
# mtuSize + sizeof(header) = 600 bytes # mtuSize + sizeof(header) = 600 bytes
@ -93,12 +136,33 @@ const
# adjusted based on traffic. # adjusted based on traffic.
mtuSize = 580 mtuSize = 580
# How often each socket check its different on going timers
checkTimeoutsLoopInterval = milliseconds(500)
# Defualt initial timeout for first Syn packet
defaultInitialSynTimeout = milliseconds(3000)
# Initial timeout to receive first Data data packet after receiving initial Syn
# packet. (TODO it should only be set when working over udp)
initialRcvRetransmitTimeout = milliseconds(10000)
proc new(T: type UtpSocketsContainerRef): T = proc new(T: type UtpSocketsContainerRef): T =
UtpSocketsContainerRef(sockets: initTable[UtpSocketKey, UtpSocket]()) UtpSocketsContainerRef(sockets: initTable[UtpSocketKey, UtpSocket]())
proc init(T: type UtpSocketKey, remoteAddress: TransportAddress, rcvId: uint16): T = proc init(T: type UtpSocketKey, remoteAddress: TransportAddress, rcvId: uint16): T =
UtpSocketKey(remoteAddress: remoteAddress, rcvId: rcvId) UtpSocketKey(remoteAddress: remoteAddress, rcvId: rcvId)
proc init(T: type OutgoingPacket, packetBytes: seq[byte], transmissions: uint16, needResend: bool, timeSent: Moment = Moment.now()): T =
OutgoingPacket(
packetBytes: packetBytes,
transmissions: transmissions,
needResend: needResend,
timeSent: timeSent
)
proc init*(T: type SocketConfig, initialSynTimeout: Duration = defaultInitialSynTimeout): T =
SocketConfig(initialSynTimeout: initialSynTimeout)
# This should probably be defined in TransportAddress module, as hash function should # This should probably be defined in TransportAddress module, as hash function should
# be consitent with equality function # be consitent with equality function
# in nim zero arrays always have hash equal to 0, irrespectively of array size, to # in nim zero arrays always have hash equal to 0, irrespectively of array size, to
@ -132,6 +196,17 @@ proc hash(x: UtpSocketKey): Hash =
h = h !& x.rcvId.hash h = h !& x.rcvId.hash
!$h !$h
proc setCloseCallback(s: UtpSocket, cb: SocketCloseCallback) {.async.} =
## Set callback which will be called whenever the socket is permanently closed
try:
await s.closeEvent.wait()
cb()
except CancelledError:
trace "closeCallback cancelled"
proc registerCloseCallback*(s: UtpSocket, cb: SocketCloseCallback) =
s.closeCallbacks.add(s.setCloseCallback(cb))
proc getUtpSocket(s: UtpSocketsContainerRef, k: UtpSocketKey): Option[UtpSocket] = proc getUtpSocket(s: UtpSocketsContainerRef, k: UtpSocketKey): Option[UtpSocket] =
let s = s.sockets.getOrDefault(k) let s = s.sockets.getOrDefault(k)
if s == nil: if s == nil:
@ -143,11 +218,23 @@ proc registerUtpSocket(s: UtpSocketsContainerRef, k: UtpSocketKey, socket: UtpSo
# TODO Handle duplicates # TODO Handle duplicates
s.sockets[k] = socket s.sockets[k] = socket
proc initOutgoingSocket(to: TransportAddress, p: UtpProtocol, rng: var BrHmacDrbgContext): UtpSocket = proc deRegisterUtpSocket(s: UtpSocketsContainerRef, k: UtpSocketKey) =
s.sockets.del(k)
iterator allSockets(s: UtpSocketsContainerRef): UtpSocket =
for socket in s.sockets.values():
yield socket
proc len(s: UtpSocketsContainerRef): int =
len(s.sockets)
# TODO extract similiar code between Outgoinhg and Incoming socket initialization
proc initOutgoingSocket(to: TransportAddress, p: UtpProtocol, cfg: SocketConfig, rng: var BrHmacDrbgContext): UtpSocket =
# TODO handle possible clashes and overflows # TODO handle possible clashes and overflows
let rcvConnectionId = randUint16(rng) let rcvConnectionId = randUint16(rng)
let sndConnectionId = rcvConnectionId + 1 let sndConnectionId = rcvConnectionId + 1
let initialSeqNr = randUint16(rng) let initialSeqNr = randUint16(rng)
UtpSocket( UtpSocket(
remoteAddress: to, remoteAddress: to,
state: SynSent, state: SynSent,
@ -155,11 +242,19 @@ proc initOutgoingSocket(to: TransportAddress, p: UtpProtocol, rng: var BrHmacDrb
connectionIdSnd: sndConnectionId, connectionIdSnd: sndConnectionId,
seqNr: initialSeqNr, seqNr: initialSeqNr,
connectionFuture: newFuture[UtpSocket](), connectionFuture: newFuture[UtpSocket](),
outBuffer: GrowableCircularBuffer[Packet].init(), outBuffer: GrowableCircularBuffer[OutgoingPacket].init(),
inBuffer: GrowableCircularBuffer[Packet].init(), inBuffer: GrowableCircularBuffer[Packet].init(),
retransmitTimeout: cfg.initialSynTimeout,
rtoTimeout: Moment.now() + cfg.initialSynTimeout,
# Initial timeout values taken from reference implemntation
rtt: milliseconds(0),
rttVar: milliseconds(800),
rto: milliseconds(3000),
# Default 1MB buffer # Default 1MB buffer
# TODO add posibility to configure buffer size # TODO add posibility to configure buffer size
buffer: AsyncBuffer.init(1024 * 1024), buffer: AsyncBuffer.init(1024 * 1024),
closeEvent: newAsyncEvent(),
closeCallbacks: newSeq[Future[void]](),
utpProt: p utpProt: p
) )
@ -173,11 +268,19 @@ proc initIncomingSocket(to: TransportAddress, p: UtpProtocol, connectionId: uin
seqNr: initialSeqNr, seqNr: initialSeqNr,
ackNr: ackNr, ackNr: ackNr,
connectionFuture: newFuture[UtpSocket](), connectionFuture: newFuture[UtpSocket](),
outBuffer: GrowableCircularBuffer[Packet].init(), outBuffer: GrowableCircularBuffer[OutgoingPacket].init(),
inBuffer: GrowableCircularBuffer[Packet].init(), inBuffer: GrowableCircularBuffer[Packet].init(),
retransmitTimeout: initialRcvRetransmitTimeout,
rtoTimeout: Moment.now() + initialRcvRetransmitTimeout,
# Initial timeout values taken from reference implemntation
rtt: milliseconds(0),
rttVar: milliseconds(800),
rto: milliseconds(3000),
# Default 1MB buffer # Default 1MB buffer
# TODO add posibility to configure buffer size # TODO add posibility to configure buffer size
buffer: AsyncBuffer.init(1024 * 1024), buffer: AsyncBuffer.init(1024 * 1024),
closeEvent: newAsyncEvent(),
closeCallbacks: newSeq[Future[void]](),
utpProt: p utpProt: p
) )
@ -185,16 +288,68 @@ proc createAckPacket(socket: UtpSocket): Packet =
## Creates ack packet based on the socket current state ## Creates ack packet based on the socket current state
ackPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576) ackPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576)
proc max(a, b: Duration): Duration =
if (a > b):
a
else:
b
proc updateTimeouts(socket: UtpSocket, timeSent: Moment, currentTime: Moment) =
## Update timeouts according to spec:
## delta = rtt - packet_rtt
## rtt_var += (abs(delta) - rtt_var) / 4;
## rtt += (packet_rtt - rtt) / 8;
let packetRtt = currentTime - timeSent
if (socket.rtt.isZero):
socket.rtt = packetRtt
socket.rttVar = packetRtt div 2
else:
let packetRttMicro = packetRtt.microseconds()
let rttVarMicro = socket.rttVar.microseconds()
let rttMicro = socket.rtt.microseconds()
let delta = rttMicro - packetRttMicro
let newVar = microseconds(rttVarMicro + (abs(delta) - rttVarMicro) div 4)
let newRtt = socket.rtt - (socket.rtt div 8) + (packetRtt div 8)
socket.rttVar = newVar
socket.rtt = newRtt
# according to spec it should be: timeout = max(rtt + rtt_var * 4, 500)
# but usually spec lags after implementation so milliseconds(1000) is used
socket.rto = max(socket.rtt + (socket.rttVar * 4), milliseconds(1000))
proc ackPacket(socket: UtpSocket, seqNr: uint16): AckResult = proc ackPacket(socket: UtpSocket, seqNr: uint16): AckResult =
let packetOpt = socket.outBuffer.get(seqNr) let packetOpt = socket.outBuffer.get(seqNr)
if packetOpt.isSome(): if packetOpt.isSome():
let packet = packetOpt.get() let packet = packetOpt.get()
# TODO Add number of transmision to each packet to track which packet was sent
# how many times, and handle here case when we try to ack packet which was not if packet.transmissions == 0:
# sent yet # according to reference impl it can happen when we get an ack_nr that
# does not exceed what we have stuffed into the outgoing buffer,
# but does exceed what we have sent
# TODO analyze if this case can happen with our impl
return PacketNotSentYet
let currentTime = Moment.now()
socket.outBuffer.delete(seqNr) socket.outBuffer.delete(seqNr)
# TODO Update estimates about roundtrip time, when we are acking packed which
# acked without re sends # from spec: The rtt and rtt_var is only updated for packets that were sent only once.
# This avoids problems with figuring out which packet was acked, the first or the second one.
# it is standard solution to retransmission ambiguity problem
if packet.transmissions == 1:
socket.updateTimeouts(packet.timeSent, currentTime)
socket.retransmitTimeout = socket.rto
socket.rtoTimeout = currentTime + socket.rto
# TODO Add handlig of decreasing bytes window, whenadding handling of congestion control
socket.retransmitCount = 0
PacketAcked PacketAcked
else: else:
# the packet has already been acked (or not sent) # the packet has already been acked (or not sent)
@ -218,15 +373,6 @@ proc ackPackets(socket: UtpSocket, nrPacketsToAck: uint16) =
proc getSocketKey(socket: UtpSocket): UtpSocketKey = proc getSocketKey(socket: UtpSocket): UtpSocketKey =
UtpSocketKey.init(socket.remoteAddress, socket.connectionIdRcv) UtpSocketKey.init(socket.remoteAddress, socket.connectionIdRcv)
proc initSynPacket(socket: UtpSocket): seq[byte] =
assert(socket.state == SynSent)
let packet = synPacket(socket.seqNr, socket.connectionIdRcv, 1048576)
socket.outBuffer.ensureSize(socket.seqNr, socket.curWindowPackets)
socket.outBuffer.put(socket.seqNr, packet)
inc socket.seqNr
inc socket.curWindowPackets
encodePacket(packet)
proc isConnected*(socket: UtpSocket): bool = proc isConnected*(socket: UtpSocket): bool =
socket.state == Connected socket.state == Connected
@ -258,21 +404,31 @@ proc sendData(socket: UtpSocket, data: seq[byte]): Future[void] =
proc sendPacket(socket: UtpSocket, packet: Packet): Future[void] = proc sendPacket(socket: UtpSocket, packet: Packet): Future[void] =
socket.sendData(encodePacket(packet)) socket.sendData(encodePacket(packet))
# Should be called before flushing data onto the socket
proc setSend(p: var OutgoingPacket): seq[byte] =
inc p.transmissions
p.needResend = false
p.timeSent = Moment.now()
return p.packetBytes
proc flushPackets(socket: UtpSocket) {.async.} = proc flushPackets(socket: UtpSocket) {.async.} =
var i: uint16 = socket.seqNr - socket.curWindowPackets var i: uint16 = socket.seqNr - socket.curWindowPackets
while i != socket.seqNr: while i != socket.seqNr:
let maybePacket = socket.outBuffer.get(i) # sending only packet which were not transmitted yet or need a resend
if (maybePacket.isSome()): let shouldSendPacket = socket.outBuffer.exists(i, (p: OutgoingPacket) => (p.transmissions == 0 or p.needResend == true))
let p = maybePacket.get() if (shouldSendPacket):
# TODO we should keep encoded packets in outgoing buffer to avoid, re-encoding let toSend = setSend(socket.outBuffer[i])
# them with each resend await socket.sendData(toSend)
await socket.sendData(encodePacket(p))
inc i inc i
proc getPacketSize(socket: UtpSocket): int = proc getPacketSize(socket: UtpSocket): int =
# TODO currently returning constant, ultimatly it should be bases on mtu estimates # TODO currently returning constant, ultimatly it should be bases on mtu estimates
mtuSize mtuSize
proc resetSendTimeout(socket: UtpSocket) =
socket.retransmitTimeout = socket.rto
socket.rtoTimeout = Moment.now() + socket.retransmitTimeout
proc write*(socket: UtpSocket, data: seq[byte]): Future[int] {.async.} = proc write*(socket: UtpSocket, data: seq[byte]): Future[int] {.async.} =
var bytesWritten = 0 var bytesWritten = 0
# TODO # TODO
@ -283,6 +439,9 @@ proc write*(socket: UtpSocket, data: seq[byte]): Future[int] {.async.} =
if len(data) == 0: if len(data) == 0:
return bytesWritten return bytesWritten
if socket.curWindowPackets == 0:
socket.resetSendTimeout()
let pSize = socket.getPacketSize() let pSize = socket.getPacketSize()
let endIndex = data.high() let endIndex = data.high()
var i = 0 var i = 0
@ -292,7 +451,7 @@ proc write*(socket: UtpSocket, data: seq[byte]): Future[int] {.async.} =
let dataSlice = data[i..lastOrEnd] let dataSlice = data[i..lastOrEnd]
let dataPacket = dataPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576, dataSlice) let dataPacket = dataPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576, dataSlice)
socket.outBuffer.ensureSize(socket.seqNr, socket.curWindowPackets) socket.outBuffer.ensureSize(socket.seqNr, socket.curWindowPackets)
socket.outBuffer.put(socket.seqNr, dataPacket) socket.outBuffer.put(socket.seqNr, OutgoingPacket.init(encodePacket(dataPacket), 0, false))
inc socket.seqNr inc socket.seqNr
inc socket.curWindowPackets inc socket.curWindowPackets
bytesWritten = bytesWritten + len(dataSlice) bytesWritten = bytesWritten + len(dataSlice)
@ -317,6 +476,95 @@ proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] {.async.}=
return bytes return bytes
proc isOpened(socket:UtpSocket): bool =
return (
socket.state == SynRecv or
socket.state == SynSent or
socket.state == Connected or
socket.state == ConnectedFull
)
proc markAllPacketAsLost(s: UtpSocket) =
var i = 0'u16
while i < s.curWindowPackets:
let packetSeqNr = s.seqNr - 1 - i
if (s.outBuffer.exists(packetSeqNr, (p: OutgoingPacket) => p. transmissions > 0 and p.needResend == false)):
s.outBuffer[packetSeqNr].needResend = true
# TODO here we should also decrease number of bytes in flight. This should be
# done when working on congestion control
inc i
proc checkTimeouts(socket: UtpSocket) {.async.} =
let currentTime = Moment.now()
# flush all packets which needs to be re-send
if socket.state != Destroy:
await socket.flushPackets()
if socket.isOpened():
if (currentTime > socket.rtoTimeout):
# TODO add handling of probe time outs. Reference implemenation has mechanism
# of sending probes to determine mtu size. Probe timeouts do not count to standard
# timeouts calculations
# client initiated connections, but did not send following data packet in rto
# time. TODO this should be configurable
if (socket.state == SynRecv):
socket.state = Destroy
socket.closeEvent.fire()
return
if (socket.state == SynSent and socket.retransmitCount >= 2) or (socket.retransmitCount >= 4):
if socket.state == SynSent and (not socket.connectionFuture.finished()):
# TODO standard stream interface result in failed future in case of failed connections,
# but maybe it would be more clean to use result
socket.connectionFuture.fail(newException(ConnectionError, "Connection to peer timed out"))
socket.state = Destroy
socket.closeEvent.fire()
return
let newTimeout = socket.retransmitTimeout * 2
socket.retransmitTimeout = newTimeout
socket.rtoTimeout = currentTime + newTimeout
# TODO Add handling of congestion control
# This will have much more sense when we will add handling of selective acks
# as then every selecivly acked packet restes timeout timer and removes packet
# from out buffer.
markAllPacketAsLost(socket)
# resend oldest packet if there are some packets in flight
if (socket.curWindowPackets > 0):
notice "resending oldest packet in outBuffer"
inc socket.retransmitCount
let oldestPacketSeqNr = socket.seqNr - socket.curWindowPackets
# TODO add handling of fast timeout
doAssert(
socket.outBuffer.get(oldestPacketSeqNr).isSome(),
"oldest packet should always be available when there is data in flight"
)
let dataToSend = setSend(socket.outBuffer[oldestPacketSeqNr])
await socket.sendData(dataToSend)
# TODO add sending keep alives when necessary
proc checkTimeoutsLoop(s: UtpSocket) {.async.} =
## Loop that check timeoutsin the socket.
try:
while true:
await sleepAsync(checkTimeoutsLoopInterval)
await s.checkTimeouts()
except CancelledError:
trace "checkTimeoutsLoop canceled"
proc startTimeoutLoop(s: UtpSocket) =
s.checkTimeoutsLoop = checkTimeoutsLoop(s)
proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) {.async.}= proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) {.async.}=
notice "Received packet ", packet = p notice "Received packet ", packet = p
let socketKey = UtpSocketKey.init(sender, p.header.connectionId) let socketKey = UtpSocketKey.init(sender, p.header.connectionId)
@ -364,7 +612,6 @@ proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) {.asy
else: else:
# TODO handle out of order packets # TODO handle out of order packets
notice "Got out of order packet" notice "Got out of order packet"
of ST_FIN: of ST_FIN:
# TODO not implemented # TODO not implemented
notice "Received ST_FIN on known socket" notice "Received ST_FIN on known socket"
@ -372,10 +619,15 @@ proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) {.asy
notice "Received ST_STATE on known socket" notice "Received ST_STATE on known socket"
# acks is the number of packets that was acked, in normal case - no selective # acks is the number of packets that was acked, in normal case - no selective
# acks, no losses, no resends, it will usually be equal to 1 # acks, no losses, no resends, it will usually be equal to 1
let acks = pkAckNr - (socket.seqNr - 1 - socket.curWindowPackets) var acks = pkAckNr - (socket.seqNr - 1 - socket.curWindowPackets)
if acks > socket.curWindowPackets:
# this case happens if the we already received this ack nr
acks = 0
socket.ackPackets(acks) socket.ackPackets(acks)
if (socket.state == SynSent): if (socket.state == SynSent and (not socket.connectionFuture.finished())):
socket.state = Connected socket.state = Connected
# TODO reference implementation sets ackNr (p.header.seqNr - 1), although # TODO reference implementation sets ackNr (p.header.seqNr - 1), although
# spec mention that it should be equal p.header.seqNr. For now follow the # spec mention that it should be equal p.header.seqNr. For now follow the
@ -401,10 +653,14 @@ proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) {.asy
if (p.header.pType == ST_SYN): if (p.header.pType == ST_SYN):
# Initial ackNr is set to incoming packer seqNr # Initial ackNr is set to incoming packer seqNr
let incomingSocket = initIncomingSocket(sender, prot, p.header.connectionId, p.header.seqNr, prot.rng[]) let incomingSocket = initIncomingSocket(sender, prot, p.header.connectionId, p.header.seqNr, prot.rng[])
prot.activeSockets.registerUtpSocket(incomingSocket.getSocketKey(), incomingSocket) let socketKey = incomingSocket.getSocketKey()
prot.activeSockets.registerUtpSocket(socketKey, incomingSocket)
# whenever socket get permanently closed, deregister it
incomingSocket.registerCloseCallback(proc () = prot.activeSockets.deRegisterUtpSocket(socketKey))
# Make sure ack was flushed onto datagram socket before passing connction # Make sure ack was flushed onto datagram socket before passing connction
# to upper layer # to upper layer
await incomingSocket.sendPacket(incomingSocket.createAckPacket()) await incomingSocket.sendPacket(incomingSocket.createAckPacket())
incomingSocket.startTimeoutLoop()
# TODO By default (when we have utp over udp) socket here is passed to upper layer # TODO By default (when we have utp over udp) socket here is passed to upper layer
# in SynRecv state, which is not writeable i.e user of socket cannot write # in SynRecv state, which is not writeable i.e user of socket cannot write
# data to it unless some data will be received. This is counter measure to # data to it unless some data will be received. This is counter measure to
@ -417,18 +673,45 @@ proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) {.asy
# TODO not implemented # TODO not implemented
notice "Received not ST_SYN and socket is not know" notice "Received not ST_SYN and socket is not know"
proc initSynPacket(socket: UtpSocket): OutgoingPacket =
## creates syncPacket based on socket current state and put it in its outgoing
## buffer
doAssert(socket.state == SynSent)
let packet = synPacket(socket.seqNr, socket.connectionIdRcv, 1048576)
# set number of transmissions to 1 as syn packet will be send just after
# initiliazation
let outgoingPacket = OutgoingPacket.init(encodePacket(packet), 1, false)
socket.outBuffer.ensureSize(socket.seqNr, socket.curWindowPackets)
socket.outBuffer.put(socket.seqNr, outgoingPacket)
inc socket.seqNr
inc socket.curWindowPackets
outgoingPacket
proc openSockets*(p: UtpProtocol): int =
## Returns number of currently active sockets
len(p.activeSockets)
proc close*(s: UtpSocket) =
# TODO Rething all this when working on FIN and RESET packets and proper handling
# of resources
s.checkTimeoutsLoop.cancel()
s.closeEvent.fire()
# Connect to provided address # Connect to provided address
# Reference implementation: https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732 # Reference implementation: https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732
# TODO not implemented
proc connectTo*(p: UtpProtocol, address: TransportAddress): Future[UtpSocket] = proc connectTo*(p: UtpProtocol, address: TransportAddress): Future[UtpSocket] =
let socket = initOutgoingSocket(address, p, p.rng[]) let socket = initOutgoingSocket(address, p, p.socketConfig, p.rng[])
p.activeSockets.registerUtpSocket(socket.getSocketKey(), socket) let socketKey = socket.getSocketKey()
let synEncoded = socket.initSynPacket() p.activeSockets.registerUtpSocket(socketKey, socket)
notice "Sending packet", packet = synEncoded # whenever socket get permanently closed, deregister it
socket.registerCloseCallback(proc () = p.activeSockets.deRegisterUtpSocket(socketKey))
var outgoingSyn = socket.initSynPacket()
notice "Sending syn packet packet", packet = outgoingSyn
# TODO add callback to handle errors and cancellation i.e unregister socket on # TODO add callback to handle errors and cancellation i.e unregister socket on
# send error and finish connection future with failure # send error and finish connection future with failure
# sending should be done from UtpSocketContext # sending should be done from UtpSocketContext
discard socket.sendData(synEncoded) discard socket.sendData(outgoingSyn.packetBytes)
socket.startTimeoutLoop()
return socket.connectionFuture return socket.connectionFuture
proc processDatagram(transp: DatagramTransport, raddr: TransportAddress): proc processDatagram(transp: DatagramTransport, raddr: TransportAddress):
@ -449,14 +732,24 @@ proc processDatagram(transp: DatagramTransport, raddr: TransportAddress):
proc new*( proc new*(
T: type UtpProtocol, T: type UtpProtocol,
acceptConnectionCb: AcceptConnectionCallback, acceptConnectionCb: AcceptConnectionCallback,
address: TransportAddress, address: TransportAddress,
socketConfig: SocketConfig = SocketConfig.init(),
rng = newRng()): UtpProtocol {.raises: [Defect, CatchableError].} = rng = newRng()): UtpProtocol {.raises: [Defect, CatchableError].} =
doAssert(not(isNil(acceptConnectionCb))) doAssert(not(isNil(acceptConnectionCb)))
let activeSockets = UtpSocketsContainerRef.new() let activeSockets = UtpSocketsContainerRef.new()
let utp = UtpProtocol(activeSockets: activeSockets, acceptConnectionCb: acceptConnectionCb, rng: rng) let utp = UtpProtocol(
activeSockets: activeSockets,
acceptConnectionCb: acceptConnectionCb,
socketConfig: socketConfig,
rng: rng
)
let ta = newDatagramTransport(processDatagram, udata = utp, local = address) let ta = newDatagramTransport(processDatagram, udata = utp, local = address)
utp.transport = ta utp.transport = ta
utp utp
proc closeWait*(p: UtpProtocol): Future[void] = proc closeWait*(p: UtpProtocol): Future[void] {.async.} =
p.transport.closeWait() # TODO Rething all this when working on FIN and RESET packets and proper handling
# of resources
await p.transport.closeWait()
for s in p.activeSockets.allSockets():
s.close()

View File

@ -7,9 +7,14 @@
{.used.} {.used.}
import import
std/sugar,
unittest, unittest,
../../eth/utp/growable_buffer ../../eth/utp/growable_buffer
type TestObj = object
foo: string
suite "Utp ring buffer": suite "Utp ring buffer":
test "Empty buffer": test "Empty buffer":
let buff = GrowableCircularBuffer[int].init(size = 4) let buff = GrowableCircularBuffer[int].init(size = 4)
@ -30,6 +35,39 @@ suite "Utp ring buffer":
buff.get(13) == some(13) buff.get(13) == some(13)
buff.get(14) == some(14) buff.get(14) == some(14)
test "Modifing existing element in buffer":
var buff = GrowableCircularBuffer[TestObj].init(size = 4)
let oldText = "test"
let newText = "testChanged"
buff.put(11, TestObj(foo: oldText))
check:
buff.get(11).get() == TestObj(foo: oldText)
buff[11].foo = newText
check:
buff.get(11).get() == TestObj(foo: newText)
test "Checking if element exists and has some properties":
var buff = GrowableCircularBuffer[TestObj].init(size = 4)
let text = "test"
let textIdx = 11
check:
not buff.exists(textIdx, x => x.foo == text)
buff.put(textIdx, TestObj(foo: "old"))
check:
not buff.exists(textIdx, x => x.foo == text)
buff[textIdx].foo = text
check:
buff.exists(textIdx, x => x.foo == text)
test "Deleting elements from buffer": test "Deleting elements from buffer":
var buff = GrowableCircularBuffer[int].init(size = 4) var buff = GrowableCircularBuffer[int].init(size = 4)
buff.put(11, 11) buff.put(11, 11)

View File

@ -80,6 +80,54 @@ procSuite "Utp protocol tests":
await utpProt1.closeWait() await utpProt1.closeWait()
await utpProt2.closeWait() await utpProt2.closeWait()
asyncTest "Fail to connect to offline remote host":
let server1Called = newAsyncEvent()
let address = initTAddress("127.0.0.1", 9079)
let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address , SocketConfig.init(milliseconds(200)))
let address1 = initTAddress("127.0.0.1", 9080)
let fut = utpProt1.connectTo(address1)
yield fut
check:
fut.failed()
await waitUntil(proc (): bool = utpProt1.openSockets() == 0)
check:
utpProt1.openSockets() == 0
await utpProt1.closeWait()
asyncTest "Success connect to remote host which initialy was offline":
let server1Called = newAsyncEvent()
let address = initTAddress("127.0.0.1", 9079)
let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address, SocketConfig.init(milliseconds(500)))
let address1 = initTAddress("127.0.0.1", 9080)
let futSock = utpProt1.connectTo(address1)
# waiting 400 milisecond will trigger at least one re-send
await sleepAsync(milliseconds(400))
var server2Called = newAsyncEvent()
let utpProt2 = UtpProtocol.new(setAcceptedCallback(server2Called), address1)
# this future will be completed when we called accepted connection callback
await server2Called.wait()
yield futSock
check:
futSock.finished() and (not futsock.failed()) and (not futsock.cancelled())
server2Called.isSet()
await utpProt1.closeWait()
await utpProt2.closeWait()
asyncTest "Success data transfer when data fits into one packet": asyncTest "Success data transfer when data fits into one packet":
var server1Called = newAsyncEvent() var server1Called = newAsyncEvent()
let address = initTAddress("127.0.0.1", 9079) let address = initTAddress("127.0.0.1", 9079)