Utp code cleanup (#417)

* Refactor tests and move socket to separate file

* Move sockets handling to separate class

* Abstract over underlying transport

* Fix bug with receiving duplicated SYN packet

* Fix race condition in connect
This commit is contained in:
KonradStaniec 2021-10-28 11:41:43 +02:00 committed by GitHub
parent fd4f78d1c0
commit 34bac6e703
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 1127 additions and 784 deletions

View File

@ -8,6 +8,7 @@
import
chronos, stew/byteutils,
./utp_socket,
./utp_protocol
# Exemple application to interact with reference implementation server to help with implementation

View File

@ -0,0 +1,75 @@
import
std/[hashes],
chronos, chronicles,
../p2p/discoveryv5/[protocol, node],
./utp_router,
../keys
type UtpDiscv5Protocol* = ref object of TalkProtocol
prot: protocol.Protocol
router: UtpRouter[Node]
proc hash(x: UtpSocketKey[Node]): Hash =
var h = 0
h = h !& x.remoteAddress.hash
h = h !& x.rcvId.hash
!$h
proc initSendCallback(t: protocol.Protocol, subProtocolName: seq[byte]): SendCallback[Node] =
return (
proc (to: Node, data: seq[byte]): Future[void] =
let fut = newFuture[void]()
# TODO In discvoveryv5 each talkreq wait for talkresp, but here we would really
# like the fire and forget semantics (similar to udp).
# For now start talkreq/response in background, and discard its result.
# That way we also lose information about any possible errors.
# Cosider adding talkreq proc which does not wait for response,
discard t.talkreq(to, subProtocolName, data)
fut.complete()
return fut
)
proc messageHandler*(protocol: TalkProtocol, request: seq[byte],
srcId: NodeId, srcUdpAddress: Address): seq[byte] =
let p = UtpDiscv5Protocol(protocol)
let maybeSender = p.prot.getNode(srcId)
if maybeSender.isSome():
let sender = maybeSender.unsafeGet()
# processIncomingBytes may respond to remote by using talkreq requests
asyncSpawn p.router.processIncomingBytes(request, sender)
# We always sending empty response as discv5 spec requires that talkreq always
# receive talkresp
@[]
else:
@[]
proc new*(
T: type UtpDiscv5Protocol,
p: protocol.Protocol,
subProtocolName: seq[byte],
acceptConnectionCb: AcceptConnectionCallback[Node],
socketConfig: SocketConfig = SocketConfig.init(),
rng = newRng()): UtpDiscv5Protocol {.raises: [Defect, CatchableError].} =
doAssert(not(isNil(acceptConnectionCb)))
let router = UtpRouter[Node].new(
acceptConnectionCb,
socketConfig,
rng
)
router.sendCb = initSendCallback(p, subProtocolName)
let prot = UtpDiscv5Protocol(
protocolHandler: messageHandler,
prot: p,
router: router
)
p.registerTalkProtocol(subProtocolName, prot).expect(
"Only one protocol should have this id"
)
prot
proc connectTo*(r: UtpDiscv5Protocol, address: Node): Future[UtpSocket[Node]]=
return r.router.connectTo(address)

View File

@ -9,159 +9,19 @@
import
std/[tables, options, hashes, sugar, math],
chronos, chronicles, bearssl,
./packets,
./growable_buffer,
./utp_router,
../keys
logScope:
topics = "utp"
type
ConnectionState = enum
Uninitialized,
Idle,
SynSent,
SynRecv,
Connected,
ConnectedFull,
Reset,
Destroy
UtpSocketKey = object
remoteAddress: TransportAddress
rcvId: uint16
OutgoingPacket = object
packetBytes: seq[byte]
transmissions: uint16
needResend: bool
timeSent: Moment
UtpSocket* = ref object
remoteAddress*: TransportAddress
state: ConnectionState
# Connection id for packets we receive
connectionIdRcv: uint16
# Connection id for packets we send
connectionIdSnd: uint16
# Sequence number for the next packet to be sent.
seqNr: uint16
# All seq number up to this havve been correctly acked by us
ackNr: uint16
# Should be completed after succesful connection to remote host or after timeout
# for the first syn packet
connectionFuture: Future[UtpSocket]
# the number of packets in the send queue. Packets that haven't
# yet been sent count as well as packets marked as needing resend
# the oldest un-acked packet in the send queue is seq_nr - cur_window_packets
curWindowPackets: uint16
# out going buffer for all send packets
outBuffer: GrowableCircularBuffer[OutgoingPacket]
# incoming buffer for out of order packets
inBuffer: GrowableCircularBuffer[Packet]
# current retransmit Timeout used to calculate rtoTimeout
retransmitTimeout: Duration
# calculated round trip time during communication with remote peer
rtt: Duration
# calculated round trip time variance
rttVar: Duration
# Round trip timeout dynamicaly updated based on acks received from remote
# peer
rto: Duration
# RTO timeout will happen when currenTime > rtoTimeout
rtoTimeout: Moment
# rcvBuffer
buffer: AsyncBuffer
# loop called every 500ms to check for on going timeout status
checkTimeoutsLoop: Future[void]
# number on consecutive re-transsmisions
retransmitCount: uint32
# Event which will complete whenever socket gets in destory statate
closeEvent: AsyncEvent
# All callback to be called whenever socket gets in destroy state
closeCallbacks: seq[Future[void]]
utpProt: UtpProtocol
UtpSocketsContainerRef = ref object
sockets: Table[UtpSocketKey, UtpSocket]
AckResult = enum
PacketAcked, PacketAlreadyAcked, PacketNotSentYet
SocketConfig* = object
# This is configurable (in contrast to reference impl), as with standard 2 syn resends
# default timeout set to 3seconds and doubling of timeout with each re-send, it
# means that initial connection would timeout after 21s, which seems rather long
initialSynTimeout*: Duration
# For now utp protocol is tied to udp transport, but ultimatly we would like to
# abstract underlying transport to be able to run utp over udp, discoveryv5 or
# maybe some test transport
UtpProtocol* = ref object
transport: DatagramTransport
activeSockets: UtpSocketsContainerRef
acceptConnectionCb: AcceptConnectionCallback
socketConfig: SocketConfig
rng*: ref BrHmacDrbgContext
# New remote client connection callback
# ``server`` - UtpProtocol object.
# ``client`` - accepted client utp socket.
AcceptConnectionCallback* = proc(server: UtpProtocol,
client: UtpSocket): Future[void] {.gcsafe, raises: [Defect].}
# Callback to be called whenever socket is closed
SocketCloseCallback = proc (): void {.gcsafe, raises: [Defect].}
ConnectionError* = object of CatchableError
const
# Maximal number of payload bytes per packet. Total packet size will be equal to
# mtuSize + sizeof(header) = 600 bytes
# TODO for now it is just some random value. Ultimatly this value should be dynamically
# adjusted based on traffic.
mtuSize = 580
# How often each socket check its different on going timers
checkTimeoutsLoopInterval = milliseconds(500)
# Defualt initial timeout for first Syn packet
defaultInitialSynTimeout = milliseconds(3000)
# Initial timeout to receive first Data data packet after receiving initial Syn
# packet. (TODO it should only be set when working over udp)
initialRcvRetransmitTimeout = milliseconds(10000)
proc new(T: type UtpSocketsContainerRef): T =
UtpSocketsContainerRef(sockets: initTable[UtpSocketKey, UtpSocket]())
proc init(T: type UtpSocketKey, remoteAddress: TransportAddress, rcvId: uint16): T =
UtpSocketKey(remoteAddress: remoteAddress, rcvId: rcvId)
proc init(T: type OutgoingPacket, packetBytes: seq[byte], transmissions: uint16, needResend: bool, timeSent: Moment = Moment.now()): T =
OutgoingPacket(
packetBytes: packetBytes,
transmissions: transmissions,
needResend: needResend,
timeSent: timeSent
)
proc init*(T: type SocketConfig, initialSynTimeout: Duration = defaultInitialSynTimeout): T =
SocketConfig(initialSynTimeout: initialSynTimeout)
utpRouter: UtpRouter[TransportAddress]
# This should probably be defined in TransportAddress module, as hash function should
# be consitent with equality function
@ -190,566 +50,55 @@ proc hash(x: TransportAddress): Hash =
!$h
# Required to use socketKey as key in hashtable
proc hash(x: UtpSocketKey): Hash =
proc hash(x: UtpSocketKey[TransportAddress]): Hash =
var h = 0
h = h !& x.remoteAddress.hash
h = h !& x.rcvId.hash
!$h
proc setCloseCallback(s: UtpSocket, cb: SocketCloseCallback) {.async.} =
## Set callback which will be called whenever the socket is permanently closed
try:
await s.closeEvent.wait()
cb()
except CancelledError:
trace "closeCallback cancelled"
proc registerCloseCallback*(s: UtpSocket, cb: SocketCloseCallback) =
s.closeCallbacks.add(s.setCloseCallback(cb))
proc getUtpSocket(s: UtpSocketsContainerRef, k: UtpSocketKey): Option[UtpSocket] =
let s = s.sockets.getOrDefault(k)
if s == nil:
none[UtpSocket]()
else:
some(s)
proc registerUtpSocket(s: UtpSocketsContainerRef, k: UtpSocketKey, socket: UtpSocket) =
# TODO Handle duplicates
s.sockets[k] = socket
proc deRegisterUtpSocket(s: UtpSocketsContainerRef, k: UtpSocketKey) =
s.sockets.del(k)
iterator allSockets(s: UtpSocketsContainerRef): UtpSocket =
for socket in s.sockets.values():
yield socket
proc len(s: UtpSocketsContainerRef): int =
len(s.sockets)
# TODO extract similiar code between Outgoinhg and Incoming socket initialization
proc initOutgoingSocket(to: TransportAddress, p: UtpProtocol, cfg: SocketConfig, rng: var BrHmacDrbgContext): UtpSocket =
# TODO handle possible clashes and overflows
let rcvConnectionId = randUint16(rng)
let sndConnectionId = rcvConnectionId + 1
let initialSeqNr = randUint16(rng)
UtpSocket(
remoteAddress: to,
state: SynSent,
connectionIdRcv: rcvConnectionId,
connectionIdSnd: sndConnectionId,
seqNr: initialSeqNr,
connectionFuture: newFuture[UtpSocket](),
outBuffer: GrowableCircularBuffer[OutgoingPacket].init(),
inBuffer: GrowableCircularBuffer[Packet].init(),
retransmitTimeout: cfg.initialSynTimeout,
rtoTimeout: Moment.now() + cfg.initialSynTimeout,
# Initial timeout values taken from reference implemntation
rtt: milliseconds(0),
rttVar: milliseconds(800),
rto: milliseconds(3000),
# Default 1MB buffer
# TODO add posibility to configure buffer size
buffer: AsyncBuffer.init(1024 * 1024),
closeEvent: newAsyncEvent(),
closeCallbacks: newSeq[Future[void]](),
utpProt: p
)
proc initIncomingSocket(to: TransportAddress, p: UtpProtocol, connectionId: uint16, ackNr: uint16, rng: var BrHmacDrbgContext): UtpSocket =
let initialSeqNr = randUint16(rng)
UtpSocket(
remoteAddress: to,
state: SynRecv,
connectionIdRcv: connectionId + 1,
connectionIdSnd: connectionId,
seqNr: initialSeqNr,
ackNr: ackNr,
connectionFuture: newFuture[UtpSocket](),
outBuffer: GrowableCircularBuffer[OutgoingPacket].init(),
inBuffer: GrowableCircularBuffer[Packet].init(),
retransmitTimeout: initialRcvRetransmitTimeout,
rtoTimeout: Moment.now() + initialRcvRetransmitTimeout,
# Initial timeout values taken from reference implemntation
rtt: milliseconds(0),
rttVar: milliseconds(800),
rto: milliseconds(3000),
# Default 1MB buffer
# TODO add posibility to configure buffer size
buffer: AsyncBuffer.init(1024 * 1024),
closeEvent: newAsyncEvent(),
closeCallbacks: newSeq[Future[void]](),
utpProt: p
)
proc createAckPacket(socket: UtpSocket): Packet =
## Creates ack packet based on the socket current state
ackPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576)
proc max(a, b: Duration): Duration =
if (a > b):
a
else:
b
proc updateTimeouts(socket: UtpSocket, timeSent: Moment, currentTime: Moment) =
## Update timeouts according to spec:
## delta = rtt - packet_rtt
## rtt_var += (abs(delta) - rtt_var) / 4;
## rtt += (packet_rtt - rtt) / 8;
let packetRtt = currentTime - timeSent
if (socket.rtt.isZero):
socket.rtt = packetRtt
socket.rttVar = packetRtt div 2
else:
let packetRttMicro = packetRtt.microseconds()
let rttVarMicro = socket.rttVar.microseconds()
let rttMicro = socket.rtt.microseconds()
let delta = rttMicro - packetRttMicro
let newVar = microseconds(rttVarMicro + (abs(delta) - rttVarMicro) div 4)
let newRtt = socket.rtt - (socket.rtt div 8) + (packetRtt div 8)
socket.rttVar = newVar
socket.rtt = newRtt
# according to spec it should be: timeout = max(rtt + rtt_var * 4, 500)
# but usually spec lags after implementation so milliseconds(1000) is used
socket.rto = max(socket.rtt + (socket.rttVar * 4), milliseconds(1000))
proc ackPacket(socket: UtpSocket, seqNr: uint16): AckResult =
let packetOpt = socket.outBuffer.get(seqNr)
if packetOpt.isSome():
let packet = packetOpt.get()
if packet.transmissions == 0:
# according to reference impl it can happen when we get an ack_nr that
# does not exceed what we have stuffed into the outgoing buffer,
# but does exceed what we have sent
# TODO analyze if this case can happen with our impl
return PacketNotSentYet
let currentTime = Moment.now()
socket.outBuffer.delete(seqNr)
# from spec: The rtt and rtt_var is only updated for packets that were sent only once.
# This avoids problems with figuring out which packet was acked, the first or the second one.
# it is standard solution to retransmission ambiguity problem
if packet.transmissions == 1:
socket.updateTimeouts(packet.timeSent, currentTime)
socket.retransmitTimeout = socket.rto
socket.rtoTimeout = currentTime + socket.rto
# TODO Add handlig of decreasing bytes window, whenadding handling of congestion control
socket.retransmitCount = 0
PacketAcked
else:
# the packet has already been acked (or not sent)
PacketAlreadyAcked
proc ackPackets(socket: UtpSocket, nrPacketsToAck: uint16) =
var i = 0
while i < int(nrPacketsToack):
let result = socket.ackPacket(socket.seqNr - socket.curWindowPackets)
case result
of PacketAcked:
dec socket.curWindowPackets
of PacketAlreadyAcked:
dec socket.curWindowPackets
of PacketNotSentYet:
debug "Tried to ack packed which was not sent yet"
break
inc i
proc getSocketKey(socket: UtpSocket): UtpSocketKey =
UtpSocketKey.init(socket.remoteAddress, socket.connectionIdRcv)
proc isConnected*(socket: UtpSocket): bool =
socket.state == Connected
template readLoop(body: untyped): untyped =
while true:
# TODO error handling
let (consumed, done) = body
socket.buffer.shift(consumed)
if done:
break
else:
# TODO add condition to handle socket closing
await socket.buffer.wait()
# Check how many packets are still in the out going buffer, usefull for tests or
# debugging.
# It throws assertion error when number of elements in buffer do not equal kept counter
proc numPacketsInOutGoingBuffer*(socket: UtpSocket): int =
var num = 0
for e in socket.outBuffer.items():
if e.isSome():
inc num
assert(num == int(socket.curWindowPackets))
num
proc sendData(socket: UtpSocket, data: seq[byte]): Future[void] =
socket.utpProt.transport.sendTo(socket.remoteAddress, data)
proc sendPacket(socket: UtpSocket, packet: Packet): Future[void] =
socket.sendData(encodePacket(packet))
# Should be called before flushing data onto the socket
proc setSend(p: var OutgoingPacket): seq[byte] =
inc p.transmissions
p.needResend = false
p.timeSent = Moment.now()
return p.packetBytes
proc flushPackets(socket: UtpSocket) {.async.} =
var i: uint16 = socket.seqNr - socket.curWindowPackets
while i != socket.seqNr:
# sending only packet which were not transmitted yet or need a resend
let shouldSendPacket = socket.outBuffer.exists(i, (p: OutgoingPacket) => (p.transmissions == 0 or p.needResend == true))
if (shouldSendPacket):
let toSend = setSend(socket.outBuffer[i])
await socket.sendData(toSend)
inc i
proc getPacketSize(socket: UtpSocket): int =
# TODO currently returning constant, ultimatly it should be bases on mtu estimates
mtuSize
proc resetSendTimeout(socket: UtpSocket) =
socket.retransmitTimeout = socket.rto
socket.rtoTimeout = Moment.now() + socket.retransmitTimeout
proc write*(socket: UtpSocket, data: seq[byte]): Future[int] {.async.} =
var bytesWritten = 0
# TODO
# Handle different socket state i.e do not write when socket is full or not
# connected
# Handle growing of send window
if len(data) == 0:
return bytesWritten
if socket.curWindowPackets == 0:
socket.resetSendTimeout()
let pSize = socket.getPacketSize()
let endIndex = data.high()
var i = 0
while i <= data.high:
let lastIndex = i + pSize - 1
let lastOrEnd = min(lastIndex, endIndex)
let dataSlice = data[i..lastOrEnd]
let dataPacket = dataPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576, dataSlice)
socket.outBuffer.ensureSize(socket.seqNr, socket.curWindowPackets)
socket.outBuffer.put(socket.seqNr, OutgoingPacket.init(encodePacket(dataPacket), 0, false))
inc socket.seqNr
inc socket.curWindowPackets
bytesWritten = bytesWritten + len(dataSlice)
i = lastOrEnd + 1
await socket.flushPackets()
return bytesWritten
proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] {.async.}=
## Read all bytes `n` bytes from socket ``socket``.
##
## This procedure allocates buffer seq[byte] and return it as result.
var bytes = newSeq[byte]()
if n == 0:
return bytes
readLoop():
# TODO Add handling of socket closing
let count = min(socket.buffer.dataLen(), n - len(bytes))
bytes.add(socket.buffer.buffer.toOpenArray(0, count - 1))
(count, len(bytes) == n)
return bytes
proc isOpened(socket:UtpSocket): bool =
return (
socket.state == SynRecv or
socket.state == SynSent or
socket.state == Connected or
socket.state == ConnectedFull
)
proc markAllPacketAsLost(s: UtpSocket) =
var i = 0'u16
while i < s.curWindowPackets:
let packetSeqNr = s.seqNr - 1 - i
if (s.outBuffer.exists(packetSeqNr, (p: OutgoingPacket) => p. transmissions > 0 and p.needResend == false)):
s.outBuffer[packetSeqNr].needResend = true
# TODO here we should also decrease number of bytes in flight. This should be
# done when working on congestion control
inc i
proc checkTimeouts(socket: UtpSocket) {.async.} =
let currentTime = Moment.now()
# flush all packets which needs to be re-send
if socket.state != Destroy:
await socket.flushPackets()
if socket.isOpened():
if (currentTime > socket.rtoTimeout):
# TODO add handling of probe time outs. Reference implemenation has mechanism
# of sending probes to determine mtu size. Probe timeouts do not count to standard
# timeouts calculations
# client initiated connections, but did not send following data packet in rto
# time. TODO this should be configurable
if (socket.state == SynRecv):
socket.state = Destroy
socket.closeEvent.fire()
return
if (socket.state == SynSent and socket.retransmitCount >= 2) or (socket.retransmitCount >= 4):
if socket.state == SynSent and (not socket.connectionFuture.finished()):
# TODO standard stream interface result in failed future in case of failed connections,
# but maybe it would be more clean to use result
socket.connectionFuture.fail(newException(ConnectionError, "Connection to peer timed out"))
socket.state = Destroy
socket.closeEvent.fire()
return
let newTimeout = socket.retransmitTimeout * 2
socket.retransmitTimeout = newTimeout
socket.rtoTimeout = currentTime + newTimeout
# TODO Add handling of congestion control
# This will have much more sense when we will add handling of selective acks
# as then every selecivly acked packet restes timeout timer and removes packet
# from out buffer.
markAllPacketAsLost(socket)
# resend oldest packet if there are some packets in flight
if (socket.curWindowPackets > 0):
notice "resending oldest packet in outBuffer"
inc socket.retransmitCount
let oldestPacketSeqNr = socket.seqNr - socket.curWindowPackets
# TODO add handling of fast timeout
doAssert(
socket.outBuffer.get(oldestPacketSeqNr).isSome(),
"oldest packet should always be available when there is data in flight"
)
let dataToSend = setSend(socket.outBuffer[oldestPacketSeqNr])
await socket.sendData(dataToSend)
# TODO add sending keep alives when necessary
proc checkTimeoutsLoop(s: UtpSocket) {.async.} =
## Loop that check timeoutsin the socket.
try:
while true:
await sleepAsync(checkTimeoutsLoopInterval)
await s.checkTimeouts()
except CancelledError:
trace "checkTimeoutsLoop canceled"
proc startTimeoutLoop(s: UtpSocket) =
s.checkTimeoutsLoop = checkTimeoutsLoop(s)
proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) {.async.}=
notice "Received packet ", packet = p
let socketKey = UtpSocketKey.init(sender, p.header.connectionId)
let maybeSocket = prot.activeSockets.getUtpSocket(socketKey)
let pkSeqNr = p.header.seqNr
let pkAckNr = p.header.ackNr
if (maybeSocket.isSome()):
let socket = maybeSocket.unsafeGet()
case p.header.pType
of ST_DATA:
# To avoid amplification attacks, server socket is in SynRecv state until
# it receices first data transfer
# https://www.usenix.org/system/files/conference/woot15/woot15-paper-adamsky.pdf
# TODO when intgrating with discv5 this need to be configurable
if (socket.state == SynRecv):
socket.state = Connected
notice "Received ST_DATA on known socket"
# number of packets past the expected
# ack_nr is the last acked, seq_nr is the
# current. Subtracring 1 makes 0 mean "this is the next expected packet"
let pastExpected = pkSeqNr - socket.ackNr - 1
if (pastExpected == 0):
# we are getting in order data packet, we can flush data directly to the incoming buffer
await upload(addr socket.buffer, unsafeAddr p.payload[0], p.payload.len())
# TODO handle the case when there may be some packets in incoming buffer which
# are direct extension of this packet and therefore we could pass also their
# content to upper layer. This may need to be done when handling selective
# acks.
# Bytes have been passed to upper layer, we can increase number of last
# acked packet
inc socket.ackNr
# TODO for now we just schedule concurrent task with ack sending. It may
# need improvement, as with this approach there is no direct control over
# how many concurrent tasks there are and how to cancel them when socket
# is closed
let ack = socket.createAckPacket()
asyncSpawn socket.sendPacket(ack)
else:
# TODO handle out of order packets
notice "Got out of order packet"
of ST_FIN:
# TODO not implemented
notice "Received ST_FIN on known socket"
of ST_STATE:
notice "Received ST_STATE on known socket"
# acks is the number of packets that was acked, in normal case - no selective
# acks, no losses, no resends, it will usually be equal to 1
var acks = pkAckNr - (socket.seqNr - 1 - socket.curWindowPackets)
if acks > socket.curWindowPackets:
# this case happens if the we already received this ack nr
acks = 0
socket.ackPackets(acks)
if (socket.state == SynSent and (not socket.connectionFuture.finished())):
socket.state = Connected
# TODO reference implementation sets ackNr (p.header.seqNr - 1), although
# spec mention that it should be equal p.header.seqNr. For now follow the
# reference impl to be compatible with it. Later investigate trin compatibility.
socket.ackNr = p.header.seqNr - 1
# In case of SynSent complate the future as last thing to make sure user of libray will
# receive socket in correct state
socket.connectionFuture.complete(socket)
# TODO to finish handhske we should respond with ST_DATA packet, without it
# socket is left in half-open state.
# Actual reference implementation waits for user to send data, as it assumes
# existence of application level handshake over utp. We may need to modify this
# to automaticly send ST_DATA .
of ST_RESET:
# TODO not implemented
notice "Received ST_RESET on known socket"
of ST_SYN:
# TODO not implemented
notice "Received ST_SYN on known socket"
else:
# We got packet for which we do not have active socket. If the packet is not a
# SynPacket we should reject it and send rst packet to sender in some cases
if (p.header.pType == ST_SYN):
# Initial ackNr is set to incoming packer seqNr
let incomingSocket = initIncomingSocket(sender, prot, p.header.connectionId, p.header.seqNr, prot.rng[])
let socketKey = incomingSocket.getSocketKey()
prot.activeSockets.registerUtpSocket(socketKey, incomingSocket)
# whenever socket get permanently closed, deregister it
incomingSocket.registerCloseCallback(proc () = prot.activeSockets.deRegisterUtpSocket(socketKey))
# Make sure ack was flushed onto datagram socket before passing connction
# to upper layer
await incomingSocket.sendPacket(incomingSocket.createAckPacket())
incomingSocket.startTimeoutLoop()
# TODO By default (when we have utp over udp) socket here is passed to upper layer
# in SynRecv state, which is not writeable i.e user of socket cannot write
# data to it unless some data will be received. This is counter measure to
# amplification attacks.
# During integration with discovery v5 (i.e utp over discovv5), we must re-think
# this.
asyncSpawn prot.acceptConnectionCb(prot, incomingSocket)
notice "Received ST_SYN and socket is not known"
else:
# TODO not implemented
notice "Received not ST_SYN and socket is not know"
proc initSynPacket(socket: UtpSocket): OutgoingPacket =
## creates syncPacket based on socket current state and put it in its outgoing
## buffer
doAssert(socket.state == SynSent)
let packet = synPacket(socket.seqNr, socket.connectionIdRcv, 1048576)
# set number of transmissions to 1 as syn packet will be send just after
# initiliazation
let outgoingPacket = OutgoingPacket.init(encodePacket(packet), 1, false)
socket.outBuffer.ensureSize(socket.seqNr, socket.curWindowPackets)
socket.outBuffer.put(socket.seqNr, outgoingPacket)
inc socket.seqNr
inc socket.curWindowPackets
outgoingPacket
proc openSockets*(p: UtpProtocol): int =
## Returns number of currently active sockets
len(p.activeSockets)
proc close*(s: UtpSocket) =
# TODO Rething all this when working on FIN and RESET packets and proper handling
# of resources
s.checkTimeoutsLoop.cancel()
s.closeEvent.fire()
# Connect to provided address
# Reference implementation: https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732
proc connectTo*(p: UtpProtocol, address: TransportAddress): Future[UtpSocket] =
let socket = initOutgoingSocket(address, p, p.socketConfig, p.rng[])
let socketKey = socket.getSocketKey()
p.activeSockets.registerUtpSocket(socketKey, socket)
# whenever socket get permanently closed, deregister it
socket.registerCloseCallback(proc () = p.activeSockets.deRegisterUtpSocket(socketKey))
var outgoingSyn = socket.initSynPacket()
notice "Sending syn packet packet", packet = outgoingSyn
# TODO add callback to handle errors and cancellation i.e unregister socket on
# send error and finish connection future with failure
# sending should be done from UtpSocketContext
discard socket.sendData(outgoingSyn.packetBytes)
socket.startTimeoutLoop()
return socket.connectionFuture
proc processDatagram(transp: DatagramTransport, raddr: TransportAddress):
Future[void] {.async.} =
let utpProt = getUserData[UtpProtocol](transp)
let router = getUserData[UtpRouter[TransportAddress]](transp)
# TODO: should we use `peekMessage()` to avoid allocation?
let buf = try: transp.getMessage()
except TransportOsError as e:
# This is likely to be local network connection issues.
return
await processIncomingBytes[TransportAddress](router, buf, raddr)
let dec = decodePacket(buf)
if (dec.isOk()):
await processPacket(utpProt, dec.get(), raddr)
else:
warn "failed to decode packet from address", address = raddr
proc initSendCallback(t: DatagramTransport): SendCallback[TransportAddress] =
return (
proc (to: TransportAddress, data: seq[byte]): Future[void] =
t.sendTo(to, data)
)
proc new*(
T: type UtpProtocol,
acceptConnectionCb: AcceptConnectionCallback,
acceptConnectionCb: AcceptConnectionCallback[TransportAddress],
address: TransportAddress,
socketConfig: SocketConfig = SocketConfig.init(),
rng = newRng()): UtpProtocol {.raises: [Defect, CatchableError].} =
doAssert(not(isNil(acceptConnectionCb)))
let activeSockets = UtpSocketsContainerRef.new()
let utp = UtpProtocol(
activeSockets: activeSockets,
acceptConnectionCb: acceptConnectionCb,
socketConfig: socketConfig,
rng: rng
let router = UtpRouter[TransportAddress].new(
acceptConnectionCb,
socketConfig,
rng
)
let ta = newDatagramTransport(processDatagram, udata = utp, local = address)
utp.transport = ta
utp
let ta = newDatagramTransport(processDatagram, udata = router, local = address)
router.sendCb = initSendCallback(ta)
UtpProtocol(transport: ta, utpRouter: router)
proc closeWait*(p: UtpProtocol): Future[void] {.async.} =
# TODO Rething all this when working on FIN and RESET packets and proper handling
# of resources
await p.transport.closeWait()
for s in p.activeSockets.allSockets():
s.close()
p.utpRouter.close()
proc connectTo*(r: UtpProtocol, address: TransportAddress): Future[UtpSocket[TransportAddress]] =
return r.utpRouter.connectTo(address)
proc openSockets*(r: UtpProtocol): int =
len(r.utpRouter)

126
eth/utp/utp_router.nim Normal file
View File

@ -0,0 +1,126 @@
import
std/[tables, options],
chronos, bearssl, chronicles,
../keys,
./utp_socket,
./packets
logScope:
topics = "utp_router"
export utp_socket
type
# New remote client connection callback
# ``server`` - UtpProtocol object.
# ``client`` - accepted client utp socket.
AcceptConnectionCallback*[A] = proc(server: UtpRouter[A],
client: UtpSocket[A]): Future[void] {.gcsafe, raises: [Defect].}
# Oject responsible for creating and maintaing table of of utp sockets.
# caller should use `processIncomingBytes` proc to feed it with incoming byte
# packets, based this input, proper utp sockets will be created, closed, or will
# receive data
UtpRouter*[A] = ref object
sockets: Table[UtpSocketKey[A], UtpSocket[A]]
socketConfig: SocketConfig
acceptConnection: AcceptConnectionCallback[A]
sendCb*: SendCallback[A]
rng*: ref BrHmacDrbgContext
proc getUtpSocket[A](s: UtpRouter[A], k: UtpSocketKey[A]): Option[UtpSocket[A]] =
let s = s.sockets.getOrDefault(k)
if s == nil:
none[UtpSocket[A]]()
else:
some(s)
proc deRegisterUtpSocket[A](s: UtpRouter[A], socket: UtpSocket[A]) =
s.sockets.del(socket.socketKey)
iterator allSockets[A](s: UtpRouter[A]): UtpSocket[A] =
for socket in s.sockets.values():
yield socket
proc len*[A](s: UtpRouter[A]): int =
len(s.sockets)
proc registerUtpSocket[A](p: UtpRouter, s: UtpSocket[A]) =
# TODO Handle duplicates
p.sockets[s.socketKey] = s
# Install deregister handler, so when socket will get closed, in will be promptly
# removed from open sockets table
s.registerCloseCallback(proc () = p.deRegisterUtpSocket(s))
proc new*[A](
T: type UtpRouter[A],
acceptConnectionCb: AcceptConnectionCallback[A],
socketConfig: SocketConfig = SocketConfig.init(),
rng = newRng()): UtpRouter[A] {.raises: [Defect, CatchableError].} =
doAssert(not(isNil(acceptConnectionCb)))
UtpRouter[A](
sockets: initTable[UtpSocketKey[A], UtpSocket[A]](),
acceptConnection: acceptConnectionCb,
socketConfig: socketConfig,
rng: rng
)
proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}=
notice "Received packet ", packet = p
let socketKey = UtpSocketKey[A].init(sender, p.header.connectionId)
let maybeSocket = r.getUtpSocket(socketKey)
case p.header.pType
of ST_RESET:
# TODO Properly handle Reset packet, and close socket
notice "Received RESET packet"
of ST_SYN:
# Syn packet are special, and we need to add 1 to header connectionId
let socketKey = UtpSocketKey[A].init(sender, p.header.connectionId + 1)
let maybeSocket = r.getUtpSocket(socketKey)
if (maybeSocket.isSome()):
notice "Ignoring SYN for already existing connection"
else:
notice "Received SYN for not known connection. Initiating incoming connection"
# Initial ackNr is set to incoming packer seqNr
let incomingSocket = initIncomingSocket[A](sender, r.sendCb, p.header.connectionId, p.header.seqNr, r.rng[])
r.registerUtpSocket(incomingSocket)
await incomingSocket.startIncomingSocket()
# TODO By default (when we have utp over udp) socket here is passed to upper layer
# in SynRecv state, which is not writeable i.e user of socket cannot write
# data to it unless some data will be received. This is counter measure to
# amplification attacks.
# During integration with discovery v5 (i.e utp over discovv5), we must re-think
# this.
asyncSpawn r.acceptConnection(r, incomingSocket)
else:
let socketKey = UtpSocketKey[A].init(sender, p.header.connectionId)
let maybeSocket = r.getUtpSocket(socketKey)
if (maybeSocket.isSome()):
let socket = maybeSocket.unsafeGet()
await socket.processPacket(p)
else:
# TODO add handling of respondig with reset
notice "Recevied FIN/DATA/ACK on not known socket"
proc processIncomingBytes*[A](r: UtpRouter[A], bytes: seq[byte], sender: A) {.async.} =
let dec = decodePacket(bytes)
if (dec.isOk()):
await processPacket[A](r, dec.get(), sender)
else:
warn "failed to decode packet from address", address = sender
# Connect to provided address
# Reference implementation: https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732
proc connectTo*[A](r: UtpRouter[A], address: A): Future[UtpSocket[A]] {.async.}=
let socket = initOutgoingSocket[A](address, r.sendCb, r.socketConfig, r.rng[])
r.registerUtpSocket(socket)
await socket.startOutgoingSocket()
await socket.waitFotSocketToConnect()
return socket
proc close*[A](r: UtpRouter[A]) =
# TODO Rething all this when working on FIN and RESET packets and proper handling
# of resources
for s in r.allSockets():
s.close()

630
eth/utp/utp_socket.nim Normal file
View File

@ -0,0 +1,630 @@
# Copyright (c) 2020-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
std/sugar,
chronos, chronicles, bearssl,
./growable_buffer,
./packets
logScope:
topics = "utp_socket"
type
ConnectionState = enum
SynSent,
SynRecv,
Connected,
ConnectedFull,
Reset,
Destroy
UtpSocketKey*[A] = object
remoteAddress*: A
rcvId*: uint16
OutgoingPacket = object
packetBytes: seq[byte]
transmissions: uint16
needResend: bool
timeSent: Moment
AckResult = enum
PacketAcked, PacketAlreadyAcked, PacketNotSentYet
# Socket callback to send data to remote peer
SendCallback*[A] = proc (to: A, data: seq[byte]): Future[void] {.gcsafe, raises: [Defect]}
UtpSocket*[A] = ref object
remoteAddress*: A
state: ConnectionState
# Connection id for packets we receive
connectionIdRcv: uint16
# Connection id for packets we send
connectionIdSnd: uint16
# Sequence number for the next packet to be sent.
seqNr: uint16
# All seq number up to this havve been correctly acked by us
ackNr: uint16
# Should be completed after succesful connection to remote host or after timeout
# for the first syn packet
connectionFuture: Future[void]
# the number of packets in the send queue. Packets that haven't
# yet been sent count as well as packets marked as needing resend
# the oldest un-acked packet in the send queue is seq_nr - cur_window_packets
curWindowPackets: uint16
# out going buffer for all send packets
outBuffer: GrowableCircularBuffer[OutgoingPacket]
# incoming buffer for out of order packets
inBuffer: GrowableCircularBuffer[Packet]
# current retransmit Timeout used to calculate rtoTimeout
retransmitTimeout: Duration
# calculated round trip time during communication with remote peer
rtt: Duration
# calculated round trip time variance
rttVar: Duration
# Round trip timeout dynamicaly updated based on acks received from remote
# peer
rto: Duration
# RTO timeout will happen when currenTime > rtoTimeout
rtoTimeout: Moment
# rcvBuffer
buffer: AsyncBuffer
# loop called every 500ms to check for on going timeout status
checkTimeoutsLoop: Future[void]
# number on consecutive re-transsmisions
retransmitCount: uint32
# Event which will complete whenever socket gets in destory statate
closeEvent: AsyncEvent
# All callback to be called whenever socket gets in destroy state
closeCallbacks: seq[Future[void]]
# socket identifier
socketKey*: UtpSocketKey[A]
send: SendCallback[A]
SocketConfig* = object
# This is configurable (in contrast to reference impl), as with standard 2 syn resends
# default timeout set to 3seconds and doubling of timeout with each re-send, it
# means that initial connection would timeout after 21s, which seems rather long
initialSynTimeout*: Duration
# User driven call back to be called whenever socket is permanently closed i.e
# reaches destroy state
SocketCloseCallback* = proc (): void {.gcsafe, raises: [Defect].}
ConnectionError* = object of CatchableError
const
# Maximal number of payload bytes per packet. Total packet size will be equal to
# mtuSize + sizeof(header) = 600 bytes
# TODO for now it is just some random value. Ultimatly this value should be dynamically
# adjusted based on traffic.
mtuSize = 580
# How often each socket check its different on going timers
checkTimeoutsLoopInterval = milliseconds(500)
# Defualt initial timeout for first Syn packet
defaultInitialSynTimeout = milliseconds(3000)
# Initial timeout to receive first Data data packet after receiving initial Syn
# packet. (TODO it should only be set when working over udp)
initialRcvRetransmitTimeout = milliseconds(10000)
proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T =
UtpSocketKey[A](remoteAddress: remoteAddress, rcvId: rcvId)
proc init(T: type OutgoingPacket, packetBytes: seq[byte], transmissions: uint16, needResend: bool, timeSent: Moment = Moment.now()): T =
OutgoingPacket(
packetBytes: packetBytes,
transmissions: transmissions,
needResend: needResend,
timeSent: timeSent
)
proc init*(T: type SocketConfig, initialSynTimeout: Duration = defaultInitialSynTimeout): T =
SocketConfig(initialSynTimeout: initialSynTimeout)
proc registerOutgoingPacket(socket: UtpSocket, oPacket: OutgoingPacket) =
## Adds packet to outgoing buffer and updates all related fields
socket.outBuffer.ensureSize(socket.seqNr, socket.curWindowPackets)
socket.outBuffer.put(socket.seqNr, oPacket)
inc socket.seqNr
inc socket.curWindowPackets
proc sendData(socket: UtpSocket, data: seq[byte]): Future[void] =
socket.send(socket.remoteAddress, data)
proc sendAck(socket: UtpSocket): Future[void] =
## Creates and sends ack, based on current socket state. Acks are different from
## other packets as we do not track them in outgoing buffet
let ackPacket = ackPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576)
socket.sendData(encodePacket(ackPacket))
proc sendSyn(socket: UtpSocket): Future[void] =
doAssert(socket.state == SynSent , "syn can only be send when in SynSent state")
let packet = synPacket(socket.seqNr, socket.connectionIdRcv, 1048576)
notice "Sending syn packet packet", packet = packet
# set number of transmissions to 1 as syn packet will be send just after
# initiliazation
let outgoingPacket = OutgoingPacket.init(encodePacket(packet), 1, false)
socket.registerOutgoingPacket(outgoingPacket)
socket.sendData(outgoingPacket.packetBytes)
# Should be called before sending packet
proc setSend(p: var OutgoingPacket): seq[byte] =
inc p.transmissions
p.needResend = false
p.timeSent = Moment.now()
return p.packetBytes
proc flushPackets(socket: UtpSocket) {.async.} =
var i: uint16 = socket.seqNr - socket.curWindowPackets
while i != socket.seqNr:
# sending only packet which were not transmitted yet or need a resend
let shouldSendPacket = socket.outBuffer.exists(i, (p: OutgoingPacket) => (p.transmissions == 0 or p.needResend == true))
if (shouldSendPacket):
let toSend = setSend(socket.outBuffer[i])
await socket.sendData(toSend)
inc i
proc markAllPacketAsLost(s: UtpSocket) =
var i = 0'u16
while i < s.curWindowPackets:
let packetSeqNr = s.seqNr - 1 - i
if (s.outBuffer.exists(packetSeqNr, (p: OutgoingPacket) => p. transmissions > 0 and p.needResend == false)):
s.outBuffer[packetSeqNr].needResend = true
# TODO here we should also decrease number of bytes in flight. This should be
# done when working on congestion control
inc i
proc isOpened(socket:UtpSocket): bool =
return (
socket.state == SynRecv or
socket.state == SynSent or
socket.state == Connected or
socket.state == ConnectedFull
)
proc checkTimeouts(socket: UtpSocket) {.async.} =
let currentTime = Moment.now()
# flush all packets which needs to be re-send
if socket.state != Destroy:
await socket.flushPackets()
if socket.isOpened():
if (currentTime > socket.rtoTimeout):
# TODO add handling of probe time outs. Reference implemenation has mechanism
# of sending probes to determine mtu size. Probe timeouts do not count to standard
# timeouts calculations
# client initiated connections, but did not send following data packet in rto
# time. TODO this should be configurable
if (socket.state == SynRecv):
socket.state = Destroy
socket.closeEvent.fire()
return
if (socket.state == SynSent and socket.retransmitCount >= 2) or (socket.retransmitCount >= 4):
if socket.state == SynSent and (not socket.connectionFuture.finished()):
# TODO standard stream interface result in failed future in case of failed connections,
# but maybe it would be more clean to use result
socket.connectionFuture.fail(newException(ConnectionError, "Connection to peer timed out"))
socket.state = Destroy
socket.closeEvent.fire()
return
let newTimeout = socket.retransmitTimeout * 2
socket.retransmitTimeout = newTimeout
socket.rtoTimeout = currentTime + newTimeout
# TODO Add handling of congestion control
# This will have much more sense when we will add handling of selective acks
# as then every selecivly acked packet restes timeout timer and removes packet
# from out buffer.
markAllPacketAsLost(socket)
# resend oldest packet if there are some packets in flight
if (socket.curWindowPackets > 0):
notice "resending oldest packet in outBuffer"
inc socket.retransmitCount
let oldestPacketSeqNr = socket.seqNr - socket.curWindowPackets
# TODO add handling of fast timeout
doAssert(
socket.outBuffer.get(oldestPacketSeqNr).isSome(),
"oldest packet should always be available when there is data in flight"
)
let dataToSend = setSend(socket.outBuffer[oldestPacketSeqNr])
await socket.sendData(dataToSend)
# TODO add sending keep alives when necessary
proc checkTimeoutsLoop(s: UtpSocket) {.async.} =
## Loop that check timeoutsin the socket.
try:
while true:
await sleepAsync(checkTimeoutsLoopInterval)
await s.checkTimeouts()
except CancelledError:
trace "checkTimeoutsLoop canceled"
proc startTimeoutLoop(s: UtpSocket) =
s.checkTimeoutsLoop = checkTimeoutsLoop(s)
proc new[A](
T: type UtpSocket[A],
to: A,
snd: SendCallback[A],
state: ConnectionState,
initialTimeout: Duration,
rcvId: uint16,
sndId: uint16,
initialSeqNr: uint16,
initialAckNr: uint16
): T =
T(
remoteAddress: to,
state: state,
connectionIdRcv: rcvId,
connectionIdSnd: sndId,
seqNr: initialSeqNr,
ackNr: initialAckNr,
connectionFuture: newFuture[void](),
outBuffer: GrowableCircularBuffer[OutgoingPacket].init(),
inBuffer: GrowableCircularBuffer[Packet].init(),
retransmitTimeout: initialTimeout,
rtoTimeout: Moment.now() + initialTimeout,
# Initial timeout values taken from reference implemntation
rtt: milliseconds(0),
rttVar: milliseconds(800),
rto: milliseconds(3000),
# Default 1MB buffer
# TODO add posibility to configure buffer size
buffer: AsyncBuffer.init(1024 * 1024),
closeEvent: newAsyncEvent(),
closeCallbacks: newSeq[Future[void]](),
socketKey: UtpSocketKey.init(to, rcvId),
send: snd
)
proc initOutgoingSocket*[A](
to: A,
snd: SendCallback[A],
cfg: SocketConfig,
rng: var BrHmacDrbgContext
): UtpSocket[A] =
# TODO handle possible clashes and overflows
let rcvConnectionId = randUint16(rng)
let sndConnectionId = rcvConnectionId + 1
let initialSeqNr = randUint16(rng)
UtpSocket[A].new(
to,
snd,
SynSent,
cfg.initialSynTimeout,
rcvConnectionId,
sndConnectionId,
initialSeqNr,
# Initialy ack nr is 0, as we do not know remote inital seqnr
0
)
proc initIncomingSocket*[A](
to: A,
snd: SendCallback[A],
connectionId: uint16,
ackNr: uint16,
rng: var BrHmacDrbgContext
): UtpSocket[A] =
let initialSeqNr = randUint16(rng)
UtpSocket[A].new(
to,
snd,
SynRecv,
initialRcvRetransmitTimeout,
connectionId + 1,
connectionId,
initialSeqNr,
ackNr
)
proc startOutgoingSocket*(socket: UtpSocket): Future[void] {.async.} =
doAssert(socket.state == SynSent)
# TODO add callback to handle errors and cancellation i.e unregister socket on
# send error and finish connection future with failure
# sending should be done from UtpSocketContext
await socket.sendSyn()
socket.startTimeoutLoop()
proc waitFotSocketToConnect*(socket: UtpSocket): Future[void] {.async.} =
await socket.connectionFuture
proc startIncomingSocket*(socket: UtpSocket) {.async.} =
doAssert(socket.state == SynRecv)
# Make sure ack was flushed before movig forward
await socket.sendAck()
socket.startTimeoutLoop()
proc isConnected*(socket: UtpSocket): bool =
socket.state == Connected or socket.state == ConnectedFull
proc close*(s: UtpSocket) =
# TODO Rething all this when working on FIN and RESET packets and proper handling
# of resources
s.checkTimeoutsLoop.cancel()
s.closeEvent.fire()
proc setCloseCallback(s: UtpSocket, cb: SocketCloseCallback) {.async.} =
## Set callback which will be called whenever the socket is permanently closed
try:
await s.closeEvent.wait()
cb()
except CancelledError:
trace "closeCallback cancelled"
proc registerCloseCallback*(s: UtpSocket, cb: SocketCloseCallback) =
s.closeCallbacks.add(s.setCloseCallback(cb))
proc max(a, b: Duration): Duration =
if (a > b):
a
else:
b
proc updateTimeouts(socket: UtpSocket, timeSent: Moment, currentTime: Moment) =
## Update timeouts according to spec:
## delta = rtt - packet_rtt
## rtt_var += (abs(delta) - rtt_var) / 4;
## rtt += (packet_rtt - rtt) / 8;
let packetRtt = currentTime - timeSent
if (socket.rtt.isZero):
socket.rtt = packetRtt
socket.rttVar = packetRtt div 2
else:
let packetRttMicro = packetRtt.microseconds()
let rttVarMicro = socket.rttVar.microseconds()
let rttMicro = socket.rtt.microseconds()
let delta = rttMicro - packetRttMicro
let newVar = microseconds(rttVarMicro + (abs(delta) - rttVarMicro) div 4)
let newRtt = socket.rtt - (socket.rtt div 8) + (packetRtt div 8)
socket.rttVar = newVar
socket.rtt = newRtt
# according to spec it should be: timeout = max(rtt + rtt_var * 4, 500)
# but usually spec lags after implementation so milliseconds(1000) is used
socket.rto = max(socket.rtt + (socket.rttVar * 4), milliseconds(1000))
proc ackPacket(socket: UtpSocket, seqNr: uint16): AckResult =
let packetOpt = socket.outBuffer.get(seqNr)
if packetOpt.isSome():
let packet = packetOpt.get()
if packet.transmissions == 0:
# according to reference impl it can happen when we get an ack_nr that
# does not exceed what we have stuffed into the outgoing buffer,
# but does exceed what we have sent
# TODO analyze if this case can happen with our impl
return PacketNotSentYet
let currentTime = Moment.now()
socket.outBuffer.delete(seqNr)
# from spec: The rtt and rtt_var is only updated for packets that were sent only once.
# This avoids problems with figuring out which packet was acked, the first or the second one.
# it is standard solution to retransmission ambiguity problem
if packet.transmissions == 1:
socket.updateTimeouts(packet.timeSent, currentTime)
socket.retransmitTimeout = socket.rto
socket.rtoTimeout = currentTime + socket.rto
# TODO Add handlig of decreasing bytes window, whenadding handling of congestion control
socket.retransmitCount = 0
PacketAcked
else:
# the packet has already been acked (or not sent)
PacketAlreadyAcked
proc ackPackets(socket: UtpSocket, nrPacketsToAck: uint16) =
## Ack packets in outgoing buffer based on ack number in the received packet
var i = 0
while i < int(nrPacketsToack):
let result = socket.ackPacket(socket.seqNr - socket.curWindowPackets)
case result
of PacketAcked:
dec socket.curWindowPackets
of PacketAlreadyAcked:
dec socket.curWindowPackets
of PacketNotSentYet:
debug "Tried to ack packed which was not sent yet"
break
inc i
# TODO at socket level we should handle only FIN/DATA/ACK packets. Refactor to make
# it enforcable by type system
proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
## Updates socket state based on received packet, and sends ack when necessary.
## Shoyuld be called in main packet receiving loop
let pkSeqNr = p.header.seqNr
let pkAckNr = p.header.ackNr
case p.header.pType
of ST_DATA:
# To avoid amplification attacks, server socket is in SynRecv state until
# it receices first data transfer
# https://www.usenix.org/system/files/conference/woot15/woot15-paper-adamsky.pdf
# TODO when intgrating with discv5 this need to be configurable
if (socket.state == SynRecv):
socket.state = Connected
notice "Received ST_DATA on known socket"
# number of packets past the expected
# ack_nr is the last acked, seq_nr is the
# current. Subtracring 1 makes 0 mean "this is the next expected packet"
let pastExpected = pkSeqNr - socket.ackNr - 1
if (pastExpected == 0):
# we are getting in order data packet, we can flush data directly to the incoming buffer
await upload(addr socket.buffer, unsafeAddr p.payload[0], p.payload.len())
# TODO handle the case when there may be some packets in incoming buffer which
# are direct extension of this packet and therefore we could pass also their
# content to upper layer. This may need to be done when handling selective
# acks.
# Bytes have been passed to upper layer, we can increase number of last
# acked packet
inc socket.ackNr
# TODO for now we just schedule concurrent task with ack sending. It may
# need improvement, as with this approach there is no direct control over
# how many concurrent tasks there are and how to cancel them when socket
# is closed
asyncSpawn socket.sendAck()
else:
# TODO handle out of order packets
notice "Got out of order packet"
of ST_FIN:
# TODO not implemented
notice "Received ST_FIN on known socket"
of ST_STATE:
notice "Received ST_STATE on known socket"
# acks is the number of packets that was acked, in normal case - no selective
# acks, no losses, no resends, it will usually be equal to 1
var acks = pkAckNr - (socket.seqNr - 1 - socket.curWindowPackets)
if acks > socket.curWindowPackets:
# this case happens if the we already received this ack nr
acks = 0
socket.ackPackets(acks)
if (socket.state == SynSent and (not socket.connectionFuture.finished())):
socket.state = Connected
# TODO reference implementation sets ackNr (p.header.seqNr - 1), although
# spec mention that it should be equal p.header.seqNr. For now follow the
# reference impl to be compatible with it. Later investigate trin compatibility.
socket.ackNr = p.header.seqNr - 1
# In case of SynSent complate the future as last thing to make sure user of libray will
# receive socket in correct state
socket.connectionFuture.complete()
# TODO to finish handhske we should respond with ST_DATA packet, without it
# socket is left in half-open state.
# Actual reference implementation waits for user to send data, as it assumes
# existence of application level handshake over utp. We may need to modify this
# to automaticly send ST_DATA .
of ST_RESET:
# TODO not implemented
notice "Received ST_RESET on known socket"
of ST_SYN:
# TODO not implemented
notice "Received ST_SYN on known socket"
template readLoop(body: untyped): untyped =
while true:
# TODO error handling
let (consumed, done) = body
socket.buffer.shift(consumed)
if done:
break
else:
# TODO add condition to handle socket closing
await socket.buffer.wait()
proc getPacketSize(socket: UtpSocket): int =
# TODO currently returning constant, ultimatly it should be bases on mtu estimates
mtuSize
proc resetSendTimeout(socket: UtpSocket) =
socket.retransmitTimeout = socket.rto
socket.rtoTimeout = Moment.now() + socket.retransmitTimeout
proc write*(socket: UtpSocket, data: seq[byte]): Future[int] {.async.} =
var bytesWritten = 0
# TODO
# Handle different socket state i.e do not write when socket is full or not
# connected
# Handle growing of send window
if len(data) == 0:
return bytesWritten
if socket.curWindowPackets == 0:
socket.resetSendTimeout()
let pSize = socket.getPacketSize()
let endIndex = data.high()
var i = 0
while i <= data.high:
let lastIndex = i + pSize - 1
let lastOrEnd = min(lastIndex, endIndex)
let dataSlice = data[i..lastOrEnd]
let dataPacket = dataPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576, dataSlice)
socket.registerOutgoingPacket(OutgoingPacket.init(encodePacket(dataPacket), 0, false))
bytesWritten = bytesWritten + len(dataSlice)
i = lastOrEnd + 1
await socket.flushPackets()
return bytesWritten
proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] {.async.}=
## Read all bytes `n` bytes from socket ``socket``.
##
## This procedure allocates buffer seq[byte] and return it as result.
var bytes = newSeq[byte]()
if n == 0:
return bytes
readLoop():
# TODO Add handling of socket closing
let count = min(socket.buffer.dataLen(), n - len(bytes))
bytes.add(socket.buffer.buffer.toOpenArray(0, count - 1))
(count, len(bytes) == n)
return bytes
# Check how many packets are still in the out going buffer, usefull for tests or
# debugging.
# It throws assertion error when number of elements in buffer do not equal kept counter
proc numPacketsInOutGoingBuffer*(socket: UtpSocket): int =
var num = 0
for e in socket.outBuffer.items():
if e.isSome():
inc num
doAssert(num == int(socket.curWindowPackets))
num

View File

@ -9,4 +9,5 @@
import
./test_packets,
./test_protocol,
./test_discv5_protocol,
./test_buffer

View File

@ -0,0 +1,117 @@
# Copyright (c) 2020-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.used.}
import
chronos, bearssl,
stew/shims/net, stew/byteutils,
testutils/unittests,
../../eth/p2p/discoveryv5/[enr, node, routing_table],
../../eth/p2p/discoveryv5/protocol as discv5_protocol,
../../eth/utp/utp_router,
../../eth/utp/utp_discov5_protocol,
../../eth/keys
proc localAddress*(port: int): Address =
Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(port))
proc initDiscoveryNode*(rng: ref BrHmacDrbgContext,
privKey: PrivateKey,
address: Address,
bootstrapRecords: openarray[Record] = [],
localEnrFields: openarray[(string, seq[byte])] = [],
previousRecord = none[enr.Record]()): discv5_protocol.Protocol =
# set bucketIpLimit to allow bucket split
let tableIpLimits = TableIpLimits(tableIpLimit: 1000, bucketIpLimit: 24)
result = newProtocol(privKey,
some(address.ip),
some(address.port), some(address.port),
bindPort = address.port,
bootstrapRecords = bootstrapRecords,
localEnrFields = localEnrFields,
previousRecord = previousRecord,
tableIpLimits = tableIpLimits,
rng = rng)
result.open()
proc generateByteArray(rng: var BrHmacDrbgContext, length: int): seq[byte] =
var bytes = newSeq[byte](length)
brHmacDrbgGenerate(rng, bytes)
return bytes
procSuite "Utp protocol over discovery v5 tests":
let rng = newRng()
let utpProtId = "test-utp".toBytes()
proc registerIncomingSocketCallback(serverSockets: AsyncQueue): AcceptConnectionCallback[Node] =
return (
proc(server: UtpRouter[Node], client: UtpSocket[Node]): Future[void] =
serverSockets.addLast(client)
)
# TODO Add more tests to discovery v5 suite, especially those which will differ
# from standard utp case
asyncTest "Success connect to remote host":
let
queue = newAsyncQueue[UtpSocket[Node]]()
node1 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20302))
node2 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20303))
utp1 = UtpDiscv5Protocol.new(node1, utpProtId, registerIncomingSocketCallback(queue))
utp2 = UtpDiscv5Protocol.new(node2, utpProtId, registerIncomingSocketCallback(queue))
# nodes must know about each other
check:
node1.addNode(node2.localNode)
node2.addNode(node1.localNode)
let clientSocket = await utp1.connectTo(node2.localNode)
check:
clientSocket.isConnected()
await node1.closeWait()
await node2.closeWait()
asyncTest "Success write data over packet size to remote host":
let
queue = newAsyncQueue[UtpSocket[Node]]()
node1 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20302))
node2 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20303))
utp1 = UtpDiscv5Protocol.new(node1, utpProtId, registerIncomingSocketCallback(queue))
utp2 = UtpDiscv5Protocol.new(node2, utpProtId, registerIncomingSocketCallback(queue))
# nodes must know about each other
check:
node1.addNode(node2.localNode)
node2.addNode(node1.localNode)
let numOfBytes = 5000
let clientSocket = await utp1.connectTo(node2.localNode)
let serverSocket = await queue.get()
let bytesToTransfer = generateByteArray(rng[], numOfBytes)
let written = await clientSocket.write(bytesToTransfer)
let received = await serverSocket.read(numOfBytes)
check:
written == numOfBytes
bytesToTransfer == received
clientSocket.isConnected()
serverSocket.isConnected()
await node1.closeWait()
await node2.closeWait()

View File

@ -10,6 +10,7 @@ import
sequtils,
chronos, bearssl,
testutils/unittests,
../../eth/utp/utp_router,
../../eth/utp/utp_protocol,
../../eth/keys
@ -20,6 +21,21 @@ proc generateByteArray(rng: var BrHmacDrbgContext, length: int): seq[byte] =
type AssertionCallback = proc(): bool {.gcsafe, raises: [Defect].}
proc setAcceptedCallback(event: AsyncEvent): AcceptConnectionCallback[TransportAddress] =
return (
proc(server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] =
let fut = newFuture[void]()
event.fire()
fut.complete()
fut
)
proc registerIncomingSocketCallback(serverSockets: AsyncQueue): AcceptConnectionCallback[TransportAddress] =
return (
proc(server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] =
serverSockets.addLast(client)
)
proc waitUntil(f: AssertionCallback): Future[void] {.async.} =
while true:
let res = f()
@ -28,33 +44,90 @@ proc waitUntil(f: AssertionCallback): Future[void] {.async.} =
else:
await sleepAsync(milliseconds(50))
proc transferData(sender: UtpSocket, receiver: UtpSocket, data: seq[byte]): Future[seq[byte]] {.async.}=
proc transferData(sender: UtpSocket[TransportAddress], receiver: UtpSocket[TransportAddress], data: seq[byte]): Future[seq[byte]] {.async.}=
let bytesWritten = await sender.write(data)
doAssert bytesWritten == len(data)
let received = await receiver.read(len(data))
return received
procSuite "Utp protocol tests":
type
ClientServerScenario = object
utp1: UtpProtocol
utp2: UtpProtocol
clientSocket: UtpSocket[TransportAddress]
serverSocket: UtpSocket[TransportAddress]
TwoClientsServerScenario = object
utp1: UtpProtocol
utp2: UtpProtocol
utp3: UtpProtocol
clientSocket1: UtpSocket[TransportAddress]
clientSocket2: UtpSocket[TransportAddress]
serverSocket1: UtpSocket[TransportAddress]
serverSocket2: UtpSocket[TransportAddress]
proc initClientServerScenario(): Future[ClientServerScenario] {.async.} =
let q = newAsyncQueue[UtpSocket[TransportAddress]]()
var server1Called = newAsyncEvent()
let address = initTAddress("127.0.0.1", 9079)
let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address)
let address1 = initTAddress("127.0.0.1", 9080)
let utpProt2 = UtpProtocol.new(registerIncomingSocketCallback(q), address1)
let clientSocket = await utpProt1.connectTo(address1)
# this future will be completed when we called accepted connection callback
let serverSocket = await q.popFirst()
return ClientServerScenario(
utp1: utpProt1,
utp2: utpProt2,
clientSocket: clientSocket,
serverSocket: serverSocket
)
proc close(s: ClientServerScenario) {.async.} =
await s.utp1.closeWait()
await s.utp2.closeWait()
proc init2ClientsServerScenario(): Future[TwoClientsServerScenario] {.async.} =
var serverSockets = newAsyncQueue[UtpSocket[TransportAddress]]()
var server1Called = newAsyncEvent()
let address1 = initTAddress("127.0.0.1", 9079)
let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address1)
let address2 = initTAddress("127.0.0.1", 9080)
let utpProt2 = UtpProtocol.new(registerIncomingSocketCallback(serverSockets), address2)
let address3 = initTAddress("127.0.0.1", 9081)
let utpProt3 = UtpProtocol.new(registerIncomingSocketCallback(serverSockets), address3)
let clientSocket1 = await utpProt1.connectTo(address2)
let clientSocket2 = await utpProt1.connectTo(address3)
await waitUntil(proc (): bool = len(serverSockets) == 2)
# this future will be completed when we called accepted connection callback
let serverSocket1 = serverSockets[0]
let serverSocket2 = serverSockets[1]
return TwoClientsServerScenario(
utp1: utpProt1,
utp2: utpProt2,
utp3: utpProt3,
clientSocket1: clientSocket1,
clientSocket2: clientSocket2,
serverSocket1: serverSocket1,
serverSocket2: serverSocket2
)
proc close(s: TwoClientsServerScenario) {.async.} =
await s.utp1.closeWait()
await s.utp2.closeWait()
await s.utp3.closeWait()
procSuite "Utp protocol over udp tests":
let rng = newRng()
proc setAcceptedCallback(event: AsyncEvent): AcceptConnectionCallback =
return (
proc(server: UtpProtocol, client: UtpSocket): Future[void] =
let fut = newFuture[void]()
event.fire()
fut.complete()
fut
)
proc setIncomingSocketCallback(socketPromise: Future[UtpSocket]): AcceptConnectionCallback =
return (
proc(server: UtpProtocol, client: UtpSocket): Future[void] =
let fut = newFuture[void]()
socketPromise.complete(client)
fut.complete()
fut
)
asyncTest "Success connect to remote host":
let server1Called = newAsyncEvent()
let address = initTAddress("127.0.0.1", 9079)
@ -129,148 +202,119 @@ procSuite "Utp protocol tests":
await utpProt2.closeWait()
asyncTest "Success data transfer when data fits into one packet":
var server1Called = newAsyncEvent()
let address = initTAddress("127.0.0.1", 9079)
let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address)
var serverSocketFut = newFuture[UtpSocket]()
let address1 = initTAddress("127.0.0.1", 9080)
let utpProt2 = UtpProtocol.new(setIncomingSocketCallback(serverSocketFut), address1)
let clientSocket = await utpProt1.connectTo(address1)
# this future will be completed when we called accepted connection callback
discard await serverSocketFut
let serverSocket =
try:
serverSocketFut.read()
except:
raiseAssert "Unexpected error when reading finished future"
let s = await initClientServerScenario()
check:
clientSocket.isConnected()
s.clientSocket.isConnected()
# after successful connection outgoing buffer should be empty as syn packet
# should be correctly acked
clientSocket.numPacketsInOutGoingBuffer() == 0
s.clientSocket.numPacketsInOutGoingBuffer() == 0
# Server socket is not in connected state, until first data transfer
(not serverSocket.isConnected())
(not s.serverSocket.isConnected())
let bytesToTransfer = generateByteArray(rng[], 100)
let bytesReceivedFromClient = await transferData(clientSocket, serverSocket, bytesToTransfer)
let bytesReceivedFromClient = await transferData(s.clientSocket, s.serverSocket, bytesToTransfer)
check:
bytesToTransfer == bytesReceivedFromClient
serverSocket.isConnected()
s.serverSocket.isConnected()
let bytesReceivedFromServer = await transferData(serverSocket, clientSocket, bytesToTransfer)
let bytesReceivedFromServer = await transferData(s.serverSocket, s.clientSocket, bytesToTransfer)
check:
bytesToTransfer == bytesReceivedFromServer
await utpProt1.closeWait()
await utpProt2.closeWait()
await s.close()
asyncTest "Success data transfer when data need to be sliced into multiple packets":
var server1Called = newAsyncEvent()
let address = initTAddress("127.0.0.1", 9079)
let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address)
var serverSocketFut = newFuture[UtpSocket]()
let address1 = initTAddress("127.0.0.1", 9080)
let utpProt2 = UtpProtocol.new(setIncomingSocketCallback(serverSocketFut), address1)
let clientSocket = await utpProt1.connectTo(address1)
# this future will be completed when we called accepted connection callback
discard await serverSocketFut
let serverSocket =
try:
serverSocketFut.read()
except:
raiseAssert "Unexpected error when reading finished future"
let s = await initClientServerScenario()
check:
clientSocket.isConnected()
s.clientSocket.isConnected()
# after successful connection outgoing buffer should be empty as syn packet
# should be correctly acked
clientSocket.numPacketsInOutGoingBuffer() == 0
s.clientSocket.numPacketsInOutGoingBuffer() == 0
(not serverSocket.isConnected())
(not s.serverSocket.isConnected())
# 5000 bytes is over maximal packet size
let bytesToTransfer = generateByteArray(rng[], 5000)
let bytesReceivedFromClient = await transferData(clientSocket, serverSocket, bytesToTransfer)
let bytesReceivedFromServer = await transferData(serverSocket, clientSocket, bytesToTransfer)
let bytesReceivedFromClient = await transferData(s.clientSocket, s.serverSocket, bytesToTransfer)
let bytesReceivedFromServer = await transferData(s.serverSocket, s.clientSocket, bytesToTransfer)
# ultimatly all send packets will acked, and outgoing buffer will be empty
await waitUntil(proc (): bool = clientSocket.numPacketsInOutGoingBuffer() == 0)
await waitUntil(proc (): bool = serverSocket.numPacketsInOutGoingBuffer() == 0)
await waitUntil(proc (): bool = s.clientSocket.numPacketsInOutGoingBuffer() == 0)
await waitUntil(proc (): bool = s.serverSocket.numPacketsInOutGoingBuffer() == 0)
check:
serverSocket.isConnected()
clientSocket.numPacketsInOutGoingBuffer() == 0
serverSocket.numPacketsInOutGoingBuffer() == 0
s.serverSocket.isConnected()
s.clientSocket.numPacketsInOutGoingBuffer() == 0
s.serverSocket.numPacketsInOutGoingBuffer() == 0
bytesReceivedFromClient == bytesToTransfer
bytesReceivedFromServer == bytesToTransfer
await utpProt1.closeWait()
await utpProt2.closeWait()
await s.close()
asyncTest "Success multiple data transfers when data need to be sliced into multiple packets":
var server1Called = newAsyncEvent()
let address = initTAddress("127.0.0.1", 9079)
let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address)
var serverSocketFut = newFuture[UtpSocket]()
let address1 = initTAddress("127.0.0.1", 9080)
let utpProt2 = UtpProtocol.new(setIncomingSocketCallback(serverSocketFut), address1)
let clientSocket = await utpProt1.connectTo(address1)
# this future will be completed when we called accepted connection callback
discard await serverSocketFut
let serverSocket =
try:
serverSocketFut.read()
except:
raiseAssert "Unexpected error when reading finished future"
let s = await initClientServerScenario()
check:
clientSocket.isConnected()
s.clientSocket.isConnected()
# after successful connection outgoing buffer should be empty as syn packet
# should be correctly acked
clientSocket.numPacketsInOutGoingBuffer() == 0
s.clientSocket.numPacketsInOutGoingBuffer() == 0
# 5000 bytes is over maximal packet size
let bytesToTransfer = generateByteArray(rng[], 5000)
let written = await clientSocket.write(bytesToTransfer)
let written = await s.clientSocket.write(bytesToTransfer)
check:
written == len(bytesToTransfer)
let bytesToTransfer1 = generateByteArray(rng[], 5000)
let written1 = await clientSocket.write(bytesToTransfer1)
let written1 = await s.clientSocket.write(bytesToTransfer1)
check:
written1 == len(bytesToTransfer)
let bytesReceived = await serverSocket.read(len(bytesToTransfer) + len(bytesToTransfer1))
let bytesReceived = await s.serverSocket.read(len(bytesToTransfer) + len(bytesToTransfer1))
# ultimatly all send packets will acked, and outgoing buffer will be empty
await waitUntil(proc (): bool = clientSocket.numPacketsInOutGoingBuffer() == 0)
await waitUntil(proc (): bool = s.clientSocket.numPacketsInOutGoingBuffer() == 0)
check:
clientSocket.numPacketsInOutGoingBuffer() == 0
s.clientSocket.numPacketsInOutGoingBuffer() == 0
bytesToTransfer.concat(bytesToTransfer1) == bytesReceived
await s.close()
asyncTest "Success data transfers from multiple clients":
let s = await init2ClientsServerScenario()
await utpProt1.closeWait()
await utpProt2.closeWait()
check:
s.clientSocket1.isConnected()
s.clientSocket2.isConnected()
s.clientSocket1.numPacketsInOutGoingBuffer() == 0
s.clientSocket2.numPacketsInOutGoingBuffer() == 0
let numBytesToTransfer = 5000
let client1Data = generateByteArray(rng[], numBytesToTransfer)
let client2Data = generateByteArray(rng[], numBytesToTransfer)
discard s.clientSocket1.write(client1Data)
discard s.clientSocket2.write(client2Data)
let server1ReadBytes = await s.serverSocket1.read(numBytesToTransfer)
let server2ReadBytes = await s.serverSocket2.read(numBytesToTransfer)
check:
client1Data == server1ReadBytes
client2Data == server2ReadBytes
await s.close()