From 88795c64771c5a24266ef309e31fb5b31b945aee Mon Sep 17 00:00:00 2001 From: KonradStaniec Date: Tue, 19 Oct 2021 13:36:57 +0200 Subject: [PATCH] Add sending and receiving data procedures (#407) * Add sending and receiving data procedures --- eth/utp/packets.nim | 20 +++- eth/utp/utp.nim | 19 +++- eth/utp/utp_protocol.nim | 192 ++++++++++++++++++++++++++++----- tests/utp/test_protocol.nim | 206 ++++++++++++++++++++++++++++++++++-- 4 files changed, 404 insertions(+), 33 deletions(-) diff --git a/eth/utp/packets.nim b/eth/utp/packets.nim index b9d40b7..38a20c5 100644 --- a/eth/utp/packets.nim +++ b/eth/utp/packets.nim @@ -54,7 +54,7 @@ type # For now we can use basic monotime, later it would be good to analyze: # https://github.com/bittorrent/libutp/blob/master/utp_utils.cpp, to check all the # timing assumptions on different platforms -proc getMonoTimeTimeStamp(): uint32 = +proc getMonoTimeTimeStamp*(): uint32 = let time = getMonoTime() cast[uint32](time.ticks() div 1000) @@ -170,3 +170,21 @@ proc ackPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSiz ) Packet(header: h, payload: @[]) + +proc dataPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSize: uint32, payload: seq[byte]): Packet = + let h = PacketHeaderV1( + pType: ST_DATA, + version: protocolVersion, + # data packets always have extension field set to 0 + extension: 0'u8, + connectionId: sndConnectionId, + timestamp: getMonoTimeTimeStamp(), + # TODO for not we are using 0, but this value should be calculated on socket + # level + timestampDiff: 0'u32, + wndSize: bufferSize, + seqNr: seqNr, + ackNr: ackNr + ) + + Packet(header: h, payload: payload) diff --git a/eth/utp/utp.nim b/eth/utp/utp.nim index bade34f..9c4dd51 100644 --- a/eth/utp/utp.nim +++ b/eth/utp/utp.nim @@ -7,7 +7,7 @@ {.push raises: [Defect].} import - chronos, + chronos, stew/byteutils, ./utp_protocol # Exemple application to interact with reference implementation server to help with implementation @@ -17,13 +17,28 @@ import # 3. make # 4. ./ucat -ddddd -l -p 9078 - it will run utp server on port 9078 when isMainModule: + proc echoIncomingSocketCallBack(): AcceptConnectionCallback = + return ( + proc (server: UtpProtocol, client: UtpSocket): Future[void] {.gcsafe, raises: [Defect].} = + echo "received incoming connection" + let fakeFuture = newFuture[void]() + fakeFuture.complete() + return fakeFuture + ) # TODO read client/server ports and address from cmd line or config file let localAddress = initTAddress("0.0.0.0", 9077) - let utpProt = UtpProtocol.new(localAddress) + let utpProt = UtpProtocol.new(echoIncomingSocketCallBack(), localAddress) let remoteServer = initTAddress("127.0.0.1", 9078) let soc = waitFor utpProt.connectTo(remoteServer) doAssert(soc.numPacketsInOutGoingBuffer() == 0) + let helloUtp = "Helllo from nim implementation" + let bytes = helloUtp.toBytes() + + waitFor soc.write(bytes) + + runForever() + waitFor utpProt.closeWait() diff --git a/eth/utp/utp_protocol.nim b/eth/utp/utp_protocol.nim index c2e72d4..24898f3 100644 --- a/eth/utp/utp_protocol.nim +++ b/eth/utp/utp_protocol.nim @@ -60,6 +60,11 @@ type # incoming buffer for out of order packets inBuffer: GrowableCircularBuffer[Packet] + # rcvBuffer + buffer: AsyncBuffer + + utpProt: UtpProtocol + UtpSocketsContainerRef = ref object sockets: Table[UtpSocketKey, UtpSocket] @@ -72,8 +77,22 @@ type UtpProtocol* = ref object transport: DatagramTransport activeSockets: UtpSocketsContainerRef + acceptConnectionCb: AcceptConnectionCallback rng*: ref BrHmacDrbgContext + ## New remote client connection callback + ## ``server`` - UtpProtocol object. + ## ``client`` - accepted client utp socket. + AcceptConnectionCallback* = proc(server: UtpProtocol, + client: UtpSocket): Future[void] {.gcsafe, raises: [Defect].} + +const + # Maximal number of payload bytes per packet. Total packet size will be equal to + # mtuSize + sizeof(header) = 600 bytes + # TODO for now it is just some random value. Ultimatly this value should be dynamically + # adjusted based on traffic. + mtuSize = 580 + proc new(T: type UtpSocketsContainerRef): T = UtpSocketsContainerRef(sockets: initTable[UtpSocketKey, UtpSocket]()) @@ -124,7 +143,7 @@ proc registerUtpSocket(s: UtpSocketsContainerRef, k: UtpSocketKey, socket: UtpSo # TODO Handle duplicates s.sockets[k] = socket -proc initOutgoingSocket(to: TransportAddress, rng: var BrHmacDrbgContext): UtpSocket = +proc initOutgoingSocket(to: TransportAddress, p: UtpProtocol, rng: var BrHmacDrbgContext): UtpSocket = # TODO handle possible clashes and overflows let rcvConnectionId = randUint16(rng) let sndConnectionId = rcvConnectionId + 1 @@ -137,10 +156,14 @@ proc initOutgoingSocket(to: TransportAddress, rng: var BrHmacDrbgContext): UtpSo seqNr: initialSeqNr, connectionFuture: newFuture[UtpSocket](), outBuffer: GrowableCircularBuffer[Packet].init(), - inBuffer: GrowableCircularBuffer[Packet].init() + inBuffer: GrowableCircularBuffer[Packet].init(), + # Default 1MB buffer + # TODO add posibility to configure buffer size + buffer: AsyncBuffer.init(1024 * 1024), + utpProt: p ) -proc initIncomingSocket(to: TransportAddress, connectionId: uint16, ackNr: uint16, rng: var BrHmacDrbgContext): UtpSocket = +proc initIncomingSocket(to: TransportAddress, p: UtpProtocol, connectionId: uint16, ackNr: uint16, rng: var BrHmacDrbgContext): UtpSocket = let initialSeqNr = randUint16(rng) UtpSocket( remoteAddress: to, @@ -151,10 +174,15 @@ proc initIncomingSocket(to: TransportAddress, connectionId: uint16, ackNr: uint1 ackNr: ackNr, connectionFuture: newFuture[UtpSocket](), outBuffer: GrowableCircularBuffer[Packet].init(), - inBuffer: GrowableCircularBuffer[Packet].init() + inBuffer: GrowableCircularBuffer[Packet].init(), + # Default 1MB buffer + # TODO add posibility to configure buffer size + buffer: AsyncBuffer.init(1024 * 1024), + utpProt: p ) -proc getAckPacket(socket: UtpSocket): Packet = +proc createAckPacket(socket: UtpSocket): Packet = + ## Creates ack packet based on the socket current state ackPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576) proc ackPacket(socket: UtpSocket, seqNr: uint16): AckResult = @@ -202,6 +230,17 @@ proc initSynPacket(socket: UtpSocket): seq[byte] = proc isConnected*(socket: UtpSocket): bool = socket.state == Connected +template readLoop(body: untyped): untyped = + while true: + # TODO error handling + let (consumed, done) = body + socket.buffer.shift(consumed) + if done: + break + else: + # TODO add condition to handle socket closing + await socket.buffer.wait() + # Check how many packets are still in the out going buffer, usefull for tests or # debugging. # It throws assertion error when number of elements in buffer do not equal kept counter @@ -213,20 +252,119 @@ proc numPacketsInOutGoingBuffer*(socket: UtpSocket): int = assert(num == int(socket.curWindowPackets)) num -# TODO not implemented -# for now just log incoming packets -proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) = +proc sendData(socket: UtpSocket, data: seq[byte]): Future[void] = + socket.utpProt.transport.sendTo(socket.remoteAddress, data) + +proc sendPacket(socket: UtpSocket, packet: Packet): Future[void] = + socket.sendData(encodePacket(packet)) + +proc flushPackets(socket: UtpSocket) {.async.} = + var i: uint16 = socket.seqNr - socket.curWindowPackets + while i != socket.seqNr: + let maybePacket = socket.outBuffer.get(i) + if (maybePacket.isSome()): + let p = maybePacket.get() + # TODO we should keep encoded packets in outgoing buffer to avoid, re-encoding + # them with each resend + await socket.sendData(encodePacket(p)) + inc i + +proc getPacketSize(socket: UtpSocket): int = + # TODO currently returning constant, ultimatly it should be bases on mtu estimates + mtuSize + +proc write*(socket: UtpSocket, data: seq[byte]): Future[int] {.async.} = + var bytesWritten = 0 + # TODO + # Handle different socket state i.e do not write when socket is full or not + # connected + # Handle growing of send window + + if len(data) == 0: + return bytesWritten + + let pSize = socket.getPacketSize() + let endIndex = data.high() + var i = 0 + while i <= data.high: + let lastIndex = i + pSize - 1 + let lastOrEnd = min(lastIndex, endIndex) + let dataSlice = data[i..lastOrEnd] + let dataPacket = dataPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576, dataSlice) + socket.outBuffer.ensureSize(socket.seqNr, socket.curWindowPackets) + socket.outBuffer.put(socket.seqNr, dataPacket) + inc socket.seqNr + inc socket.curWindowPackets + bytesWritten = bytesWritten + len(dataSlice) + i = lastOrEnd + 1 + await socket.flushPackets() + return bytesWritten + +proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] {.async.}= + ## Read all bytes `n` bytes from socket ``socket``. + ## + ## This procedure allocates buffer seq[byte] and return it as result. + var bytes = newSeq[byte]() + + if n == 0: + return bytes + + readLoop(): + # TODO Add handling of socket closing + let count = min(socket.buffer.dataLen(), n - len(bytes)) + bytes.add(socket.buffer.buffer.toOpenArray(0, count - 1)) + (count, len(bytes) == n) + + return bytes + +proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) {.async.}= notice "Received packet ", packet = p let socketKey = UtpSocketKey.init(sender, p.header.connectionId) let maybeSocket = prot.activeSockets.getUtpSocket(socketKey) let pkSeqNr = p.header.seqNr let pkAckNr = p.header.ackNr + if (maybeSocket.isSome()): let socket = maybeSocket.unsafeGet() + case p.header.pType of ST_DATA: - # TODO not implemented + # To avoid amplification attacks, server socket is in SynRecv state until + # it receices first data transfer + # https://www.usenix.org/system/files/conference/woot15/woot15-paper-adamsky.pdf + # TODO when intgrating with discv5 this need to be configurable + if (socket.state == SynRecv): + socket.state = Connected + notice "Received ST_DATA on known socket" + # number of packets past the expected + # ack_nr is the last acked, seq_nr is the + # current. Subtracring 1 makes 0 mean "this is the next expected packet" + let pastExpected = pkSeqNr - socket.ackNr - 1 + + if (pastExpected == 0): + # we are getting in order data packet, we can flush data directly to the incoming buffer + await upload(addr socket.buffer, unsafeAddr p.payload[0], p.payload.len()) + + # TODO handle the case when there may be some packets in incoming buffer which + # are direct extension of this packet and therefore we could pass also their + # content to upper layer. This may need to be done when handling selective + # acks. + + # Bytes have been passed to upper layer, we can increase number of last + # acked packet + inc socket.ackNr + + # TODO for now we just schedule concurrent task with ack sending. It may + # need improvement, as with this approach there is no direct control over + # how many concurrent tasks there are and how to cancel them when socket + # is closed + let ack = socket.createAckPacket() + asyncSpawn socket.sendPacket(ack) + else: + # TODO handle out of order packets + notice "Got out of order packet" + of ST_FIN: # TODO not implemented notice "Received ST_FIN on known socket" @@ -246,12 +384,6 @@ proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) = # In case of SynSent complate the future as last thing to make sure user of libray will # receive socket in correct state socket.connectionFuture.complete(socket) - - # number of packets past the expected - # ack_nr is the last acked, seq_nr is the - # current. Subtracring 1 makes 0 mean "this is the next expected packet" - let pastExpected = pkSeqNr - socket.ackNr - 1 - # TODO to finish handhske we should respond with ST_DATA packet, without it # socket is left in half-open state. # Actual reference implementation waits for user to send data, as it assumes @@ -268,11 +400,18 @@ proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) = # SynPacket we should reject it and send rst packet to sender in some cases if (p.header.pType == ST_SYN): # Initial ackNr is set to incoming packer seqNr - let incomingSocket = initIncomingSocket(sender, p.header.connectionId, p.header.seqNr, prot.rng[]) + let incomingSocket = initIncomingSocket(sender, prot, p.header.connectionId, p.header.seqNr, prot.rng[]) prot.activeSockets.registerUtpSocket(incomingSocket.getSocketKey(), incomingSocket) - let encodedAck= encodePacket(incomingSocket.getAckPacket()) - # TODO sending should be done from UtpSocket context - discard prot.transport.sendTo(sender, encodedAck) + # Make sure ack was flushed onto datagram socket before passing connction + # to upper layer + await incomingSocket.sendPacket(incomingSocket.createAckPacket()) + # TODO By default (when we have utp over udp) socket here is passed to upper layer + # in SynRecv state, which is not writeable i.e user of socket cannot write + # data to it unless some data will be received. This is counter measure to + # amplification attacks. + # During integration with discovery v5 (i.e utp over discovv5), we must re-think + # this. + asyncSpawn prot.acceptConnectionCb(prot, incomingSocket) notice "Received ST_SYN and socket is not known" else: # TODO not implemented @@ -282,14 +421,14 @@ proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) = # Reference implementation: https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732 # TODO not implemented proc connectTo*(p: UtpProtocol, address: TransportAddress): Future[UtpSocket] = - let socket = initOutgoingSocket(address, p.rng[]) + let socket = initOutgoingSocket(address, p, p.rng[]) p.activeSockets.registerUtpSocket(socket.getSocketKey(), socket) let synEncoded = socket.initSynPacket() notice "Sending packet", packet = synEncoded # TODO add callback to handle errors and cancellation i.e unregister socket on # send error and finish connection future with failure # sending should be done from UtpSocketContext - discard p.transport.sendTo(address, synEncoded) + discard socket.sendData(synEncoded) return socket.connectionFuture proc processDatagram(transp: DatagramTransport, raddr: TransportAddress): @@ -303,13 +442,18 @@ proc processDatagram(transp: DatagramTransport, raddr: TransportAddress): let dec = decodePacket(buf) if (dec.isOk()): - processPacket(utpProt, dec.get(), raddr) + await processPacket(utpProt, dec.get(), raddr) else: warn "failed to decode packet from address", address = raddr -proc new*(T: type UtpProtocol, address: TransportAddress, rng = newRng()): UtpProtocol {.raises: [Defect, CatchableError].} = +proc new*( + T: type UtpProtocol, + acceptConnectionCb: AcceptConnectionCallback, + address: TransportAddress, + rng = newRng()): UtpProtocol {.raises: [Defect, CatchableError].} = + doAssert(not(isNil(acceptConnectionCb))) let activeSockets = UtpSocketsContainerRef.new() - let utp = UtpProtocol(activeSockets: activeSockets, rng: rng) + let utp = UtpProtocol(activeSockets: activeSockets, acceptConnectionCb: acceptConnectionCb, rng: rng) let ta = newDatagramTransport(processDatagram, udata = utp, local = address) utp.transport = ta utp diff --git a/tests/utp/test_protocol.nim b/tests/utp/test_protocol.nim index 888cbbd..c14b4ed 100644 --- a/tests/utp/test_protocol.nim +++ b/tests/utp/test_protocol.nim @@ -7,28 +7,222 @@ {.used.} import - chronos, + sequtils, + chronos, bearssl, testutils/unittests, ../../eth/utp/utp_protocol, ../../eth/keys +proc generateByteArray(rng: var BrHmacDrbgContext, length: int): seq[byte] = + var bytes = newSeq[byte](length) + brHmacDrbgGenerate(rng, bytes) + return bytes + +type AssertionCallback = proc(): bool {.gcsafe, raises: [Defect].} + +proc waitUntil(f: AssertionCallback): Future[void] {.async.} = + while true: + let res = f() + if res: + break + else: + await sleepAsync(milliseconds(50)) + +proc transferData(sender: UtpSocket, receiver: UtpSocket, data: seq[byte]): Future[seq[byte]] {.async.}= + let bytesWritten = await sender.write(data) + doAssert bytesWritten == len(data) + let received = await receiver.read(len(data)) + return received + procSuite "Utp protocol tests": let rng = newRng() - asyncTest "Success connect to remote host": - let address = initTAddress("127.0.0.1", 9079) - let utpProt1 = UtpProtocol.new(address) + proc setAcceptedCallback(event: AsyncEvent): AcceptConnectionCallback = + return ( + proc(server: UtpProtocol, client: UtpSocket): Future[void] = + let fut = newFuture[void]() + event.fire() + fut.complete() + fut + ) + proc setIncomingSocketCallback(socketPromise: Future[UtpSocket]): AcceptConnectionCallback = + return ( + proc(server: UtpProtocol, client: UtpSocket): Future[void] = + let fut = newFuture[void]() + socketPromise.complete(client) + fut.complete() + fut + ) + + asyncTest "Success connect to remote host": + let server1Called = newAsyncEvent() + let address = initTAddress("127.0.0.1", 9079) + let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address) + + var server2Called = newAsyncEvent() let address1 = initTAddress("127.0.0.1", 9080) - let utpProt2 = UtpProtocol.new(address1) + let utpProt2 = UtpProtocol.new(setAcceptedCallback(server2Called), address1) let sock = await utpProt1.connectTo(address1) - + + # this future will be completed when we called accepted connection callback + await server2Called.wait() + check: sock.isConnected() # after successful connection outgoing buffer should be empty as syn packet # should be correctly acked sock.numPacketsInOutGoingBuffer() == 0 + + server2Called.isSet() await utpProt1.closeWait() await utpProt2.closeWait() + + asyncTest "Success data transfer when data fits into one packet": + var server1Called = newAsyncEvent() + let address = initTAddress("127.0.0.1", 9079) + let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address) + + var serverSocketFut = newFuture[UtpSocket]() + let address1 = initTAddress("127.0.0.1", 9080) + let utpProt2 = UtpProtocol.new(setIncomingSocketCallback(serverSocketFut), address1) + + let clientSocket = await utpProt1.connectTo(address1) + + # this future will be completed when we called accepted connection callback + discard await serverSocketFut + + let serverSocket = + try: + serverSocketFut.read() + except: + raiseAssert "Unexpected error when reading finished future" + + check: + clientSocket.isConnected() + # after successful connection outgoing buffer should be empty as syn packet + # should be correctly acked + clientSocket.numPacketsInOutGoingBuffer() == 0 + + # Server socket is not in connected state, until first data transfer + (not serverSocket.isConnected()) + + let bytesToTransfer = generateByteArray(rng[], 100) + + let bytesReceivedFromClient = await transferData(clientSocket, serverSocket, bytesToTransfer) + + check: + bytesToTransfer == bytesReceivedFromClient + serverSocket.isConnected() + + let bytesReceivedFromServer = await transferData(serverSocket, clientSocket, bytesToTransfer) + + check: + bytesToTransfer == bytesReceivedFromServer + + await utpProt1.closeWait() + await utpProt2.closeWait() + + asyncTest "Success data transfer when data need to be sliced into multiple packets": + var server1Called = newAsyncEvent() + let address = initTAddress("127.0.0.1", 9079) + let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address) + + var serverSocketFut = newFuture[UtpSocket]() + let address1 = initTAddress("127.0.0.1", 9080) + let utpProt2 = UtpProtocol.new(setIncomingSocketCallback(serverSocketFut), address1) + + let clientSocket = await utpProt1.connectTo(address1) + + # this future will be completed when we called accepted connection callback + discard await serverSocketFut + + let serverSocket = + try: + serverSocketFut.read() + except: + raiseAssert "Unexpected error when reading finished future" + + check: + clientSocket.isConnected() + # after successful connection outgoing buffer should be empty as syn packet + # should be correctly acked + clientSocket.numPacketsInOutGoingBuffer() == 0 + + (not serverSocket.isConnected()) + + # 5000 bytes is over maximal packet size + let bytesToTransfer = generateByteArray(rng[], 5000) + + let bytesReceivedFromClient = await transferData(clientSocket, serverSocket, bytesToTransfer) + let bytesReceivedFromServer = await transferData(serverSocket, clientSocket, bytesToTransfer) + + # ultimatly all send packets will acked, and outgoing buffer will be empty + await waitUntil(proc (): bool = clientSocket.numPacketsInOutGoingBuffer() == 0) + await waitUntil(proc (): bool = serverSocket.numPacketsInOutGoingBuffer() == 0) + + check: + serverSocket.isConnected() + clientSocket.numPacketsInOutGoingBuffer() == 0 + serverSocket.numPacketsInOutGoingBuffer() == 0 + bytesReceivedFromClient == bytesToTransfer + bytesReceivedFromServer == bytesToTransfer + + await utpProt1.closeWait() + await utpProt2.closeWait() + + asyncTest "Success multiple data transfers when data need to be sliced into multiple packets": + var server1Called = newAsyncEvent() + let address = initTAddress("127.0.0.1", 9079) + let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address) + + var serverSocketFut = newFuture[UtpSocket]() + let address1 = initTAddress("127.0.0.1", 9080) + let utpProt2 = UtpProtocol.new(setIncomingSocketCallback(serverSocketFut), address1) + + let clientSocket = await utpProt1.connectTo(address1) + + # this future will be completed when we called accepted connection callback + discard await serverSocketFut + + let serverSocket = + try: + serverSocketFut.read() + except: + raiseAssert "Unexpected error when reading finished future" + + check: + clientSocket.isConnected() + # after successful connection outgoing buffer should be empty as syn packet + # should be correctly acked + clientSocket.numPacketsInOutGoingBuffer() == 0 + + + # 5000 bytes is over maximal packet size + let bytesToTransfer = generateByteArray(rng[], 5000) + + let written = await clientSocket.write(bytesToTransfer) + + check: + written == len(bytesToTransfer) + + let bytesToTransfer1 = generateByteArray(rng[], 5000) + + let written1 = await clientSocket.write(bytesToTransfer1) + + check: + written1 == len(bytesToTransfer) + + let bytesReceived = await serverSocket.read(len(bytesToTransfer) + len(bytesToTransfer1)) + + # ultimatly all send packets will acked, and outgoing buffer will be empty + await waitUntil(proc (): bool = clientSocket.numPacketsInOutGoingBuffer() == 0) + + check: + clientSocket.numPacketsInOutGoingBuffer() == 0 + bytesToTransfer.concat(bytesToTransfer1) == bytesReceived + + await utpProt1.closeWait() + await utpProt2.closeWait()