diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 708f8c5..90bf33c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -65,7 +65,7 @@ jobs: uses: actions/cache@v1 with: path: rocks-db-cache-${{ matrix.target.cpu }} - key: 'rocksdb-${{ matrix.target.os }}-${{ matrix.target.cpu }}' + key: 'rocksdb-v1-${{ matrix.target.os }}-${{ matrix.target.cpu }}' - name: Build and install rocksdb (Linux i386) # no librocksdb-dev:i386 diff --git a/eth/utp/growable_buffer.nim b/eth/utp/growable_buffer.nim new file mode 100644 index 0000000..7ba4194 --- /dev/null +++ b/eth/utp/growable_buffer.nim @@ -0,0 +1,67 @@ +import + std/[options, math] + +export options + +# Buffer implementation similar to the one in in reference implementation. +# Main rationale for it, is to refer to items in buffer by their sequence number, +# and support out of order packets. +# Therefore it is super specific data structure, and it mostly usefull for +# utp implementation. +# Another alternative would be to use standard deque from deques module, and caluclate +# item indexes from their sequence numbers. +type GrowableCircularBuffer*[A] = object + items: seq[Option[A]] + mask: int + +# provided size will always be adjusted to next power of two +proc init*[A](T: type GrowableCircularBuffer[A], size: Natural = 16): T = + let powOfTwoSize = nextPowerOfTwo(size) + T( + items: newSeq[Option[A]](size), + mask: powOfTwoSize - 1 + ) + +proc get*[A](buff: GrowableCircularBuffer[A], i: Natural): Option[A] = + buff.items[i and buff.mask] + +proc putImpl[A](buff: var GrowableCircularBuffer[A], i: Natural, elem: Option[A]) = + buff.items[i and buff.mask] = elem + +proc put*[A](buff: var GrowableCircularBuffer[A], i: Natural, elem: A) = + buff.putImpl(i, some(elem)) + +proc delete*[A](buff: var GrowableCircularBuffer[A], i: Natural) = + buff.putImpl(i, none[A]()) + +proc len*[A](buff: GrowableCircularBuffer[A]): int = + buff.mask + 1 + +# Item contains the element we want to make space for +# index is the index in the list. +proc ensureSize*[A](buff: var GrowableCircularBuffer[A], item: Natural, index: Natural) = + # Increase size until is next power of 2 which consists given index + proc getNextSize(currentSize: int, index: int): int = + var newSize = currentSize + while true: + newSize = newSize * 2 + if not (index >= newSize): + break + newSize + + if (index > buff.mask): + let currentSize = buff.mask + 1 + let newSize = getNextSize(currentSize, index) + let newMask = newSize - 1 + var newSeq = newSeq[Option[A]](newSize) + var i = 0 + while i <= buff.mask: + let idx = item - index + i + newSeq[idx and newMask] = buff.get(idx) + inc i + buff.items = move(newSeq) + buff.mask = newMask + +iterator items*[A](buff: GrowableCircularBuffer[A]): Option[A] = + for e in buff.items: + yield e diff --git a/eth/utp/packets.nim b/eth/utp/packets.nim index a14ad6a..b9d40b7 100644 --- a/eth/utp/packets.nim +++ b/eth/utp/packets.nim @@ -33,9 +33,13 @@ type extension*: uint8 connectionId*: uint16 timestamp*: MicroSeconds + # This is the difference between the local time, at the time the last packet + # was received, and the timestamp in this last received packet timestampDiff*: MicroSeconds + # The window size is the number of bytes currently in-flight, i.e. sent but not acked wndSize*: uint32 seqNr*: uint16 + # sequence number the sender of the packet last received in the other direction ackNr*: uint16 Packet* = object diff --git a/eth/utp/utp.nim b/eth/utp/utp.nim index a460e63..bade34f 100644 --- a/eth/utp/utp.nim +++ b/eth/utp/utp.nim @@ -24,4 +24,6 @@ when isMainModule: let remoteServer = initTAddress("127.0.0.1", 9078) let soc = waitFor utpProt.connectTo(remoteServer) + doAssert(soc.numPacketsInOutGoingBuffer() == 0) + waitFor utpProt.closeWait() diff --git a/eth/utp/utp_protocol.nim b/eth/utp/utp_protocol.nim index 3ebd4d7..c2e72d4 100644 --- a/eth/utp/utp_protocol.nim +++ b/eth/utp/utp_protocol.nim @@ -10,6 +10,7 @@ import std/[tables, options, hashes], chronos, chronicles, bearssl, ./packets, + ./growable_buffer, ../keys logScope: @@ -41,16 +42,30 @@ type seqNr: uint16 # All seq number up to this havve been correctly acked by us ackNr: uint16 - + # Should be completed after succesful connection to remote host. # TODO check if nim gc handles properly cyclic references, as this future will # contain reference to socket which hold this future. # If that is not the case, then this future will need to be hold independly connectionFuture: Future[UtpSocket] + + # the number of packets in the send queue. Packets that haven't + # yet been sent count as well as packets marked as needing resend + # the oldest un-acked packet in the send queue is seq_nr - cur_window_packets + curWindowPackets: uint16 + + # out going buffer for all send packets + outBuffer: GrowableCircularBuffer[Packet] + + # incoming buffer for out of order packets + inBuffer: GrowableCircularBuffer[Packet] UtpSocketsContainerRef = ref object sockets: Table[UtpSocketKey, UtpSocket] + AckResult = enum + PacketAcked, PacketAlreadyAcked, PacketNotSentYet + # For now utp protocol is tied to udp transport, but ultimatly we would like to # abstract underlying transport to be able to run utp over udp, discoveryv5 or # maybe some test transport @@ -62,6 +77,9 @@ type proc new(T: type UtpSocketsContainerRef): T = UtpSocketsContainerRef(sockets: initTable[UtpSocketKey, UtpSocket]()) +proc init(T: type UtpSocketKey, remoteAddress: TransportAddress, rcvId: uint16): T = + UtpSocketKey(remoteAddress: remoteAddress, rcvId: rcvId) + # This should probably be defined in TransportAddress module, as hash function should # be consitent with equality function # in nim zero arrays always have hash equal to 0, irrespectively of array size, to @@ -117,7 +135,9 @@ proc initOutgoingSocket(to: TransportAddress, rng: var BrHmacDrbgContext): UtpSo connectionIdRcv: rcvConnectionId, connectionIdSnd: sndConnectionId, seqNr: initialSeqNr, - connectionFuture: newFuture[UtpSocket]() + connectionFuture: newFuture[UtpSocket](), + outBuffer: GrowableCircularBuffer[Packet].init(), + inBuffer: GrowableCircularBuffer[Packet].init() ) proc initIncomingSocket(to: TransportAddress, connectionId: uint16, ackNr: uint16, rng: var BrHmacDrbgContext): UtpSocket = @@ -129,21 +149,78 @@ proc initIncomingSocket(to: TransportAddress, connectionId: uint16, ackNr: uint1 connectionIdSnd: connectionId, seqNr: initialSeqNr, ackNr: ackNr, - connectionFuture: newFuture[UtpSocket]() + connectionFuture: newFuture[UtpSocket](), + outBuffer: GrowableCircularBuffer[Packet].init(), + inBuffer: GrowableCircularBuffer[Packet].init() ) -proc ack(socket: UtpSocket): Packet = +proc getAckPacket(socket: UtpSocket): Packet = ackPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576) +proc ackPacket(socket: UtpSocket, seqNr: uint16): AckResult = + let packetOpt = socket.outBuffer.get(seqNr) + if packetOpt.isSome(): + let packet = packetOpt.get() + # TODO Add number of transmision to each packet to track which packet was sent + # how many times, and handle here case when we try to ack packet which was not + # sent yet + socket.outBuffer.delete(seqNr) + # TODO Update estimates about roundtrip time, when we are acking packed which + # acked without re sends + PacketAcked + else: + # the packet has already been acked (or not sent) + PacketAlreadyAcked + +proc ackPackets(socket: UtpSocket, nrPacketsToAck: uint16) = + var i = 0 + while i < int(nrPacketsToack): + let result = socket.ackPacket(socket.seqNr - socket.curWindowPackets) + case result + of PacketAcked: + dec socket.curWindowPackets + of PacketAlreadyAcked: + dec socket.curWindowPackets + of PacketNotSentYet: + debug "Tried to ack packed which was not sent yet" + break + + inc i + +proc getSocketKey(socket: UtpSocket): UtpSocketKey = + UtpSocketKey.init(socket.remoteAddress, socket.connectionIdRcv) + +proc initSynPacket(socket: UtpSocket): seq[byte] = + assert(socket.state == SynSent) + let packet = synPacket(socket.seqNr, socket.connectionIdRcv, 1048576) + socket.outBuffer.ensureSize(socket.seqNr, socket.curWindowPackets) + socket.outBuffer.put(socket.seqNr, packet) + inc socket.seqNr + inc socket.curWindowPackets + encodePacket(packet) + proc isConnected*(socket: UtpSocket): bool = socket.state == Connected +# Check how many packets are still in the out going buffer, usefull for tests or +# debugging. +# It throws assertion error when number of elements in buffer do not equal kept counter +proc numPacketsInOutGoingBuffer*(socket: UtpSocket): int = + var num = 0 + for e in socket.outBuffer.items(): + if e.isSome(): + inc num + assert(num == int(socket.curWindowPackets)) + num + # TODO not implemented # for now just log incoming packets proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) = notice "Received packet ", packet = p - let socketKey = UtpSocketKey(remoteAddress: sender, rcvId: p.header.connectionId) + let socketKey = UtpSocketKey.init(sender, p.header.connectionId) let maybeSocket = prot.activeSockets.getUtpSocket(socketKey) + let pkSeqNr = p.header.seqNr + let pkAckNr = p.header.ackNr if (maybeSocket.isSome()): let socket = maybeSocket.unsafeGet() case p.header.pType @@ -155,12 +232,31 @@ proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) = notice "Received ST_FIN on known socket" of ST_STATE: notice "Received ST_STATE on known socket" + # acks is the number of packets that was acked, in normal case - no selective + # acks, no losses, no resends, it will usually be equal to 1 + let acks = pkAckNr - (socket.seqNr - 1 - socket.curWindowPackets) + socket.ackPackets(acks) + if (socket.state == SynSent): socket.state = Connected - socket.ackNr = p.header.seqNr + # TODO reference implementation sets ackNr (p.header.seqNr - 1), although + # spec mention that it should be equal p.header.seqNr. For now follow the + # reference impl to be compatible with it. Later investigate trin compatibility. + socket.ackNr = p.header.seqNr - 1 + # In case of SynSent complate the future as last thing to make sure user of libray will + # receive socket in correct state socket.connectionFuture.complete(socket) - # TODO to finish handhske we should respond with ST_DATA packet, without it - # socket is left in half-open state + + # number of packets past the expected + # ack_nr is the last acked, seq_nr is the + # current. Subtracring 1 makes 0 mean "this is the next expected packet" + let pastExpected = pkSeqNr - socket.ackNr - 1 + + # TODO to finish handhske we should respond with ST_DATA packet, without it + # socket is left in half-open state. + # Actual reference implementation waits for user to send data, as it assumes + # existence of application level handshake over utp. We may need to modify this + # to automaticly send ST_DATA . of ST_RESET: # TODO not implemented notice "Received ST_RESET on known socket" @@ -173,12 +269,10 @@ proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) = if (p.header.pType == ST_SYN): # Initial ackNr is set to incoming packer seqNr let incomingSocket = initIncomingSocket(sender, p.header.connectionId, p.header.seqNr, prot.rng[]) - let socketKey = UtpSocketKey(remoteAddress: incomingSocket.remoteAddress, rcvId: incomingSocket.connectionIdRcv) - prot.activeSockets.registerUtpSocket(socketKey, incomingSocket) - let synAck = incomingSocket.ack() - let encoded = encodePacket(synAck) + prot.activeSockets.registerUtpSocket(incomingSocket.getSocketKey(), incomingSocket) + let encodedAck= encodePacket(incomingSocket.getAckPacket()) # TODO sending should be done from UtpSocket context - discard prot.transport.sendTo(sender, encoded) + discard prot.transport.sendTo(sender, encodedAck) notice "Received ST_SYN and socket is not known" else: # TODO not implemented @@ -189,16 +283,13 @@ proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) = # TODO not implemented proc connectTo*(p: UtpProtocol, address: TransportAddress): Future[UtpSocket] = let socket = initOutgoingSocket(address, p.rng[]) - let socketKey = UtpSocketKey(remoteAddress: socket.remoteAddress, rcvId: socket.connectionIdRcv) - # TODO Buffer in syn packet should be based on our current buffer size - let packet = synPacket(socket.seqNr, socket.connectionIdRcv, 1048576) - notice "Sending packet", packet = packet - let packetEncoded = encodePacket(packet) - p.activeSockets.registerUtpSocket(socketKey, socket) + p.activeSockets.registerUtpSocket(socket.getSocketKey(), socket) + let synEncoded = socket.initSynPacket() + notice "Sending packet", packet = synEncoded # TODO add callback to handle errors and cancellation i.e unregister socket on # send error and finish connection future with failure # sending should be done from UtpSocketContext - discard p.transport.sendTo(address, packetEncoded) + discard p.transport.sendTo(address, synEncoded) return socket.connectionFuture proc processDatagram(transp: DatagramTransport, raddr: TransportAddress): diff --git a/tests/utp/all_utp_tests.nim b/tests/utp/all_utp_tests.nim index 6a3b401..d2f23a8 100644 --- a/tests/utp/all_utp_tests.nim +++ b/tests/utp/all_utp_tests.nim @@ -8,4 +8,5 @@ import ./test_packets, - ./test_protocol + ./test_protocol, + ./test_buffer diff --git a/tests/utp/test_buffer.nim b/tests/utp/test_buffer.nim new file mode 100644 index 0000000..0610036 --- /dev/null +++ b/tests/utp/test_buffer.nim @@ -0,0 +1,88 @@ +# Copyright (c) 2020-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.used.} + +import + unittest, + ../../eth/utp/growable_buffer + +suite "Utp ring buffer": + test "Empty buffer": + let buff = GrowableCircularBuffer[int].init(size = 4) + check: + buff.len() == 4 + buff.get(0).isNone() + + test "Adding elements to buffer": + var buff = GrowableCircularBuffer[int].init(size = 4) + buff.put(11, 11) + buff.put(12, 12) + buff.put(13, 13) + buff.put(14, 14) + + check: + buff.get(11) == some(11) + buff.get(12) == some(12) + buff.get(13) == some(13) + buff.get(14) == some(14) + + test "Deleting elements from buffer": + var buff = GrowableCircularBuffer[int].init(size = 4) + buff.put(11, 11) + + check: + buff.get(11) == some(11) + + buff.delete(11) + + check: + buff.get(11) == none[int]() + + test "Adding elements to buffer while ensuring proper size": + var buff = GrowableCircularBuffer[int].init(size = 4) + + buff.put(11, 11) + buff.put(12, 12) + buff.put(13, 13) + buff.put(14, 14) + + # next element will be 5 in buffer, so it has index equal to 4 + buff.ensureSize(15, 4) + buff.put(15, 15) + + check: + # it growed to next power of two + buff.len() == 8 + buff.get(11) == some(11) + buff.get(12) == some(12) + buff.get(13) == some(13) + buff.get(14) == some(14) + buff.get(15) == some(15) + + test "Adding out of order elements to buffer while ensuring proper size": + var buff = GrowableCircularBuffer[int].init(size = 4) + + buff.put(11, 11) + buff.put(12, 12) + buff.put(13, 13) + buff.put(14, 14) + + # element with nr 17 will be on needed on index 6 + buff.ensureSize(17, 6) + buff.put(17, 17) + + check: + # it growed to next power of two + buff.len() == 8 + buff.get(11) == some(11) + buff.get(12) == some(12) + buff.get(13) == some(13) + buff.get(14) == some(14) + # elements 15 and 16 are not present yet + buff.get(15) == none[int]() + buff.get(16) == none[int]() + buff.get(17) == some(17) diff --git a/tests/utp/test_packets.nim b/tests/utp/test_packets.nim index cd3249b..d10e017 100644 --- a/tests/utp/test_packets.nim +++ b/tests/utp/test_packets.nim @@ -8,8 +8,8 @@ import unittest, - ../eth/utp/packets, - ../../eth/keys + ../../eth/utp/packets, + ../../eth/keys suite "Utp packets encoding/decoding": diff --git a/tests/utp/test_protocol.nim b/tests/utp/test_protocol.nim index 65f804d..888cbbd 100644 --- a/tests/utp/test_protocol.nim +++ b/tests/utp/test_protocol.nim @@ -26,6 +26,9 @@ procSuite "Utp protocol tests": check: sock.isConnected() + # after successful connection outgoing buffer should be empty as syn packet + # should be correctly acked + sock.numPacketsInOutGoingBuffer() == 0 await utpProt1.closeWait() await utpProt2.closeWait()