Add initial handling of acks (#406)

* Add initial handling of acks

Add implemetaion of circular buffer based on reference implementation
Add way to test number of packet in flight
Add acking of initial syn packet
This commit is contained in:
KonradStaniec 2021-10-15 13:38:51 +02:00 committed by GitHub
parent 7ae287ad1b
commit 6fbf129ba9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 280 additions and 24 deletions

View File

@ -65,7 +65,7 @@ jobs:
uses: actions/cache@v1 uses: actions/cache@v1
with: with:
path: rocks-db-cache-${{ matrix.target.cpu }} path: rocks-db-cache-${{ matrix.target.cpu }}
key: 'rocksdb-${{ matrix.target.os }}-${{ matrix.target.cpu }}' key: 'rocksdb-v1-${{ matrix.target.os }}-${{ matrix.target.cpu }}'
- name: Build and install rocksdb (Linux i386) - name: Build and install rocksdb (Linux i386)
# no librocksdb-dev:i386 # no librocksdb-dev:i386

View File

@ -0,0 +1,67 @@
import
std/[options, math]
export options
# Buffer implementation similar to the one in in reference implementation.
# Main rationale for it, is to refer to items in buffer by their sequence number,
# and support out of order packets.
# Therefore it is super specific data structure, and it mostly usefull for
# utp implementation.
# Another alternative would be to use standard deque from deques module, and caluclate
# item indexes from their sequence numbers.
type GrowableCircularBuffer*[A] = object
items: seq[Option[A]]
mask: int
# provided size will always be adjusted to next power of two
proc init*[A](T: type GrowableCircularBuffer[A], size: Natural = 16): T =
let powOfTwoSize = nextPowerOfTwo(size)
T(
items: newSeq[Option[A]](size),
mask: powOfTwoSize - 1
)
proc get*[A](buff: GrowableCircularBuffer[A], i: Natural): Option[A] =
buff.items[i and buff.mask]
proc putImpl[A](buff: var GrowableCircularBuffer[A], i: Natural, elem: Option[A]) =
buff.items[i and buff.mask] = elem
proc put*[A](buff: var GrowableCircularBuffer[A], i: Natural, elem: A) =
buff.putImpl(i, some(elem))
proc delete*[A](buff: var GrowableCircularBuffer[A], i: Natural) =
buff.putImpl(i, none[A]())
proc len*[A](buff: GrowableCircularBuffer[A]): int =
buff.mask + 1
# Item contains the element we want to make space for
# index is the index in the list.
proc ensureSize*[A](buff: var GrowableCircularBuffer[A], item: Natural, index: Natural) =
# Increase size until is next power of 2 which consists given index
proc getNextSize(currentSize: int, index: int): int =
var newSize = currentSize
while true:
newSize = newSize * 2
if not (index >= newSize):
break
newSize
if (index > buff.mask):
let currentSize = buff.mask + 1
let newSize = getNextSize(currentSize, index)
let newMask = newSize - 1
var newSeq = newSeq[Option[A]](newSize)
var i = 0
while i <= buff.mask:
let idx = item - index + i
newSeq[idx and newMask] = buff.get(idx)
inc i
buff.items = move(newSeq)
buff.mask = newMask
iterator items*[A](buff: GrowableCircularBuffer[A]): Option[A] =
for e in buff.items:
yield e

View File

@ -33,9 +33,13 @@ type
extension*: uint8 extension*: uint8
connectionId*: uint16 connectionId*: uint16
timestamp*: MicroSeconds timestamp*: MicroSeconds
# This is the difference between the local time, at the time the last packet
# was received, and the timestamp in this last received packet
timestampDiff*: MicroSeconds timestampDiff*: MicroSeconds
# The window size is the number of bytes currently in-flight, i.e. sent but not acked
wndSize*: uint32 wndSize*: uint32
seqNr*: uint16 seqNr*: uint16
# sequence number the sender of the packet last received in the other direction
ackNr*: uint16 ackNr*: uint16
Packet* = object Packet* = object

View File

@ -24,4 +24,6 @@ when isMainModule:
let remoteServer = initTAddress("127.0.0.1", 9078) let remoteServer = initTAddress("127.0.0.1", 9078)
let soc = waitFor utpProt.connectTo(remoteServer) let soc = waitFor utpProt.connectTo(remoteServer)
doAssert(soc.numPacketsInOutGoingBuffer() == 0)
waitFor utpProt.closeWait() waitFor utpProt.closeWait()

View File

@ -10,6 +10,7 @@ import
std/[tables, options, hashes], std/[tables, options, hashes],
chronos, chronicles, bearssl, chronos, chronicles, bearssl,
./packets, ./packets,
./growable_buffer,
../keys ../keys
logScope: logScope:
@ -41,16 +42,30 @@ type
seqNr: uint16 seqNr: uint16
# All seq number up to this havve been correctly acked by us # All seq number up to this havve been correctly acked by us
ackNr: uint16 ackNr: uint16
# Should be completed after succesful connection to remote host. # Should be completed after succesful connection to remote host.
# TODO check if nim gc handles properly cyclic references, as this future will # TODO check if nim gc handles properly cyclic references, as this future will
# contain reference to socket which hold this future. # contain reference to socket which hold this future.
# If that is not the case, then this future will need to be hold independly # If that is not the case, then this future will need to be hold independly
connectionFuture: Future[UtpSocket] connectionFuture: Future[UtpSocket]
# the number of packets in the send queue. Packets that haven't
# yet been sent count as well as packets marked as needing resend
# the oldest un-acked packet in the send queue is seq_nr - cur_window_packets
curWindowPackets: uint16
# out going buffer for all send packets
outBuffer: GrowableCircularBuffer[Packet]
# incoming buffer for out of order packets
inBuffer: GrowableCircularBuffer[Packet]
UtpSocketsContainerRef = ref object UtpSocketsContainerRef = ref object
sockets: Table[UtpSocketKey, UtpSocket] sockets: Table[UtpSocketKey, UtpSocket]
AckResult = enum
PacketAcked, PacketAlreadyAcked, PacketNotSentYet
# For now utp protocol is tied to udp transport, but ultimatly we would like to # For now utp protocol is tied to udp transport, but ultimatly we would like to
# abstract underlying transport to be able to run utp over udp, discoveryv5 or # abstract underlying transport to be able to run utp over udp, discoveryv5 or
# maybe some test transport # maybe some test transport
@ -62,6 +77,9 @@ type
proc new(T: type UtpSocketsContainerRef): T = proc new(T: type UtpSocketsContainerRef): T =
UtpSocketsContainerRef(sockets: initTable[UtpSocketKey, UtpSocket]()) UtpSocketsContainerRef(sockets: initTable[UtpSocketKey, UtpSocket]())
proc init(T: type UtpSocketKey, remoteAddress: TransportAddress, rcvId: uint16): T =
UtpSocketKey(remoteAddress: remoteAddress, rcvId: rcvId)
# This should probably be defined in TransportAddress module, as hash function should # This should probably be defined in TransportAddress module, as hash function should
# be consitent with equality function # be consitent with equality function
# in nim zero arrays always have hash equal to 0, irrespectively of array size, to # in nim zero arrays always have hash equal to 0, irrespectively of array size, to
@ -117,7 +135,9 @@ proc initOutgoingSocket(to: TransportAddress, rng: var BrHmacDrbgContext): UtpSo
connectionIdRcv: rcvConnectionId, connectionIdRcv: rcvConnectionId,
connectionIdSnd: sndConnectionId, connectionIdSnd: sndConnectionId,
seqNr: initialSeqNr, seqNr: initialSeqNr,
connectionFuture: newFuture[UtpSocket]() connectionFuture: newFuture[UtpSocket](),
outBuffer: GrowableCircularBuffer[Packet].init(),
inBuffer: GrowableCircularBuffer[Packet].init()
) )
proc initIncomingSocket(to: TransportAddress, connectionId: uint16, ackNr: uint16, rng: var BrHmacDrbgContext): UtpSocket = proc initIncomingSocket(to: TransportAddress, connectionId: uint16, ackNr: uint16, rng: var BrHmacDrbgContext): UtpSocket =
@ -129,21 +149,78 @@ proc initIncomingSocket(to: TransportAddress, connectionId: uint16, ackNr: uint1
connectionIdSnd: connectionId, connectionIdSnd: connectionId,
seqNr: initialSeqNr, seqNr: initialSeqNr,
ackNr: ackNr, ackNr: ackNr,
connectionFuture: newFuture[UtpSocket]() connectionFuture: newFuture[UtpSocket](),
outBuffer: GrowableCircularBuffer[Packet].init(),
inBuffer: GrowableCircularBuffer[Packet].init()
) )
proc ack(socket: UtpSocket): Packet = proc getAckPacket(socket: UtpSocket): Packet =
ackPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576) ackPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576)
proc ackPacket(socket: UtpSocket, seqNr: uint16): AckResult =
let packetOpt = socket.outBuffer.get(seqNr)
if packetOpt.isSome():
let packet = packetOpt.get()
# TODO Add number of transmision to each packet to track which packet was sent
# how many times, and handle here case when we try to ack packet which was not
# sent yet
socket.outBuffer.delete(seqNr)
# TODO Update estimates about roundtrip time, when we are acking packed which
# acked without re sends
PacketAcked
else:
# the packet has already been acked (or not sent)
PacketAlreadyAcked
proc ackPackets(socket: UtpSocket, nrPacketsToAck: uint16) =
var i = 0
while i < int(nrPacketsToack):
let result = socket.ackPacket(socket.seqNr - socket.curWindowPackets)
case result
of PacketAcked:
dec socket.curWindowPackets
of PacketAlreadyAcked:
dec socket.curWindowPackets
of PacketNotSentYet:
debug "Tried to ack packed which was not sent yet"
break
inc i
proc getSocketKey(socket: UtpSocket): UtpSocketKey =
UtpSocketKey.init(socket.remoteAddress, socket.connectionIdRcv)
proc initSynPacket(socket: UtpSocket): seq[byte] =
assert(socket.state == SynSent)
let packet = synPacket(socket.seqNr, socket.connectionIdRcv, 1048576)
socket.outBuffer.ensureSize(socket.seqNr, socket.curWindowPackets)
socket.outBuffer.put(socket.seqNr, packet)
inc socket.seqNr
inc socket.curWindowPackets
encodePacket(packet)
proc isConnected*(socket: UtpSocket): bool = proc isConnected*(socket: UtpSocket): bool =
socket.state == Connected socket.state == Connected
# Check how many packets are still in the out going buffer, usefull for tests or
# debugging.
# It throws assertion error when number of elements in buffer do not equal kept counter
proc numPacketsInOutGoingBuffer*(socket: UtpSocket): int =
var num = 0
for e in socket.outBuffer.items():
if e.isSome():
inc num
assert(num == int(socket.curWindowPackets))
num
# TODO not implemented # TODO not implemented
# for now just log incoming packets # for now just log incoming packets
proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) = proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) =
notice "Received packet ", packet = p notice "Received packet ", packet = p
let socketKey = UtpSocketKey(remoteAddress: sender, rcvId: p.header.connectionId) let socketKey = UtpSocketKey.init(sender, p.header.connectionId)
let maybeSocket = prot.activeSockets.getUtpSocket(socketKey) let maybeSocket = prot.activeSockets.getUtpSocket(socketKey)
let pkSeqNr = p.header.seqNr
let pkAckNr = p.header.ackNr
if (maybeSocket.isSome()): if (maybeSocket.isSome()):
let socket = maybeSocket.unsafeGet() let socket = maybeSocket.unsafeGet()
case p.header.pType case p.header.pType
@ -155,12 +232,31 @@ proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) =
notice "Received ST_FIN on known socket" notice "Received ST_FIN on known socket"
of ST_STATE: of ST_STATE:
notice "Received ST_STATE on known socket" notice "Received ST_STATE on known socket"
# acks is the number of packets that was acked, in normal case - no selective
# acks, no losses, no resends, it will usually be equal to 1
let acks = pkAckNr - (socket.seqNr - 1 - socket.curWindowPackets)
socket.ackPackets(acks)
if (socket.state == SynSent): if (socket.state == SynSent):
socket.state = Connected socket.state = Connected
socket.ackNr = p.header.seqNr # TODO reference implementation sets ackNr (p.header.seqNr - 1), although
# spec mention that it should be equal p.header.seqNr. For now follow the
# reference impl to be compatible with it. Later investigate trin compatibility.
socket.ackNr = p.header.seqNr - 1
# In case of SynSent complate the future as last thing to make sure user of libray will
# receive socket in correct state
socket.connectionFuture.complete(socket) socket.connectionFuture.complete(socket)
# TODO to finish handhske we should respond with ST_DATA packet, without it
# socket is left in half-open state # number of packets past the expected
# ack_nr is the last acked, seq_nr is the
# current. Subtracring 1 makes 0 mean "this is the next expected packet"
let pastExpected = pkSeqNr - socket.ackNr - 1
# TODO to finish handhske we should respond with ST_DATA packet, without it
# socket is left in half-open state.
# Actual reference implementation waits for user to send data, as it assumes
# existence of application level handshake over utp. We may need to modify this
# to automaticly send ST_DATA .
of ST_RESET: of ST_RESET:
# TODO not implemented # TODO not implemented
notice "Received ST_RESET on known socket" notice "Received ST_RESET on known socket"
@ -173,12 +269,10 @@ proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) =
if (p.header.pType == ST_SYN): if (p.header.pType == ST_SYN):
# Initial ackNr is set to incoming packer seqNr # Initial ackNr is set to incoming packer seqNr
let incomingSocket = initIncomingSocket(sender, p.header.connectionId, p.header.seqNr, prot.rng[]) let incomingSocket = initIncomingSocket(sender, p.header.connectionId, p.header.seqNr, prot.rng[])
let socketKey = UtpSocketKey(remoteAddress: incomingSocket.remoteAddress, rcvId: incomingSocket.connectionIdRcv) prot.activeSockets.registerUtpSocket(incomingSocket.getSocketKey(), incomingSocket)
prot.activeSockets.registerUtpSocket(socketKey, incomingSocket) let encodedAck= encodePacket(incomingSocket.getAckPacket())
let synAck = incomingSocket.ack()
let encoded = encodePacket(synAck)
# TODO sending should be done from UtpSocket context # TODO sending should be done from UtpSocket context
discard prot.transport.sendTo(sender, encoded) discard prot.transport.sendTo(sender, encodedAck)
notice "Received ST_SYN and socket is not known" notice "Received ST_SYN and socket is not known"
else: else:
# TODO not implemented # TODO not implemented
@ -189,16 +283,13 @@ proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) =
# TODO not implemented # TODO not implemented
proc connectTo*(p: UtpProtocol, address: TransportAddress): Future[UtpSocket] = proc connectTo*(p: UtpProtocol, address: TransportAddress): Future[UtpSocket] =
let socket = initOutgoingSocket(address, p.rng[]) let socket = initOutgoingSocket(address, p.rng[])
let socketKey = UtpSocketKey(remoteAddress: socket.remoteAddress, rcvId: socket.connectionIdRcv) p.activeSockets.registerUtpSocket(socket.getSocketKey(), socket)
# TODO Buffer in syn packet should be based on our current buffer size let synEncoded = socket.initSynPacket()
let packet = synPacket(socket.seqNr, socket.connectionIdRcv, 1048576) notice "Sending packet", packet = synEncoded
notice "Sending packet", packet = packet
let packetEncoded = encodePacket(packet)
p.activeSockets.registerUtpSocket(socketKey, socket)
# TODO add callback to handle errors and cancellation i.e unregister socket on # TODO add callback to handle errors and cancellation i.e unregister socket on
# send error and finish connection future with failure # send error and finish connection future with failure
# sending should be done from UtpSocketContext # sending should be done from UtpSocketContext
discard p.transport.sendTo(address, packetEncoded) discard p.transport.sendTo(address, synEncoded)
return socket.connectionFuture return socket.connectionFuture
proc processDatagram(transp: DatagramTransport, raddr: TransportAddress): proc processDatagram(transp: DatagramTransport, raddr: TransportAddress):

View File

@ -8,4 +8,5 @@
import import
./test_packets, ./test_packets,
./test_protocol ./test_protocol,
./test_buffer

88
tests/utp/test_buffer.nim Normal file
View File

@ -0,0 +1,88 @@
# Copyright (c) 2020-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.used.}
import
unittest,
../../eth/utp/growable_buffer
suite "Utp ring buffer":
test "Empty buffer":
let buff = GrowableCircularBuffer[int].init(size = 4)
check:
buff.len() == 4
buff.get(0).isNone()
test "Adding elements to buffer":
var buff = GrowableCircularBuffer[int].init(size = 4)
buff.put(11, 11)
buff.put(12, 12)
buff.put(13, 13)
buff.put(14, 14)
check:
buff.get(11) == some(11)
buff.get(12) == some(12)
buff.get(13) == some(13)
buff.get(14) == some(14)
test "Deleting elements from buffer":
var buff = GrowableCircularBuffer[int].init(size = 4)
buff.put(11, 11)
check:
buff.get(11) == some(11)
buff.delete(11)
check:
buff.get(11) == none[int]()
test "Adding elements to buffer while ensuring proper size":
var buff = GrowableCircularBuffer[int].init(size = 4)
buff.put(11, 11)
buff.put(12, 12)
buff.put(13, 13)
buff.put(14, 14)
# next element will be 5 in buffer, so it has index equal to 4
buff.ensureSize(15, 4)
buff.put(15, 15)
check:
# it growed to next power of two
buff.len() == 8
buff.get(11) == some(11)
buff.get(12) == some(12)
buff.get(13) == some(13)
buff.get(14) == some(14)
buff.get(15) == some(15)
test "Adding out of order elements to buffer while ensuring proper size":
var buff = GrowableCircularBuffer[int].init(size = 4)
buff.put(11, 11)
buff.put(12, 12)
buff.put(13, 13)
buff.put(14, 14)
# element with nr 17 will be on needed on index 6
buff.ensureSize(17, 6)
buff.put(17, 17)
check:
# it growed to next power of two
buff.len() == 8
buff.get(11) == some(11)
buff.get(12) == some(12)
buff.get(13) == some(13)
buff.get(14) == some(14)
# elements 15 and 16 are not present yet
buff.get(15) == none[int]()
buff.get(16) == none[int]()
buff.get(17) == some(17)

View File

@ -8,8 +8,8 @@
import import
unittest, unittest,
../eth/utp/packets, ../../eth/utp/packets,
../../eth/keys ../../eth/keys
suite "Utp packets encoding/decoding": suite "Utp packets encoding/decoding":

View File

@ -26,6 +26,9 @@ procSuite "Utp protocol tests":
check: check:
sock.isConnected() sock.isConnected()
# after successful connection outgoing buffer should be empty as syn packet
# should be correctly acked
sock.numPacketsInOutGoingBuffer() == 0
await utpProt1.closeWait() await utpProt1.closeWait()
await utpProt2.closeWait() await utpProt2.closeWait()