From 9f2f1010707f9efc80d711190fdb23ce865413c0 Mon Sep 17 00:00:00 2001 From: KonradStaniec Date: Mon, 13 Sep 2021 14:54:06 +0200 Subject: [PATCH] Add initial skeleton of utp protocol (#397) * Add initial impl of utp over udp * Add more comments * Add licenses and push declarations * Add tests to nimble task * Pr comments Use better random generator Raise assert error in case of buffer io exception --- eth.nimble | 4 + eth/utp/packets.nim | 156 +++++++++++++++++++++++++++++++++++++ eth/utp/utp.nim | 29 +++++++ eth/utp/utp_protocol.nim | 61 +++++++++++++++ tests/utp/test_packets.nim | 48 ++++++++++++ 5 files changed, 298 insertions(+) create mode 100644 eth/utp/packets.nim create mode 100644 eth/utp/utp.nim create mode 100644 eth/utp/utp_protocol.nim create mode 100644 tests/utp/test_packets.nim diff --git a/eth.nimble b/eth.nimble index 85d801a..6016c80 100644 --- a/eth.nimble +++ b/eth.nimble @@ -69,6 +69,9 @@ task test_db, "Run db tests": task test_ssz, "Run ssz tests": runTest("tests/ssz/all_tests") +task test_utp, "Run utp tests": + runTest("tests/utp/test_packets") + task test, "Run all tests": for filename in [ "test_bloom", @@ -82,6 +85,7 @@ task test, "Run all tests": test_trie_task() test_db_task() test_ssz_task() + test_utp_task() task test_discv5_full, "Run discovery v5 and its dependencies tests": test_keys_task() diff --git a/eth/utp/packets.nim b/eth/utp/packets.nim new file mode 100644 index 0000000..0c03149 --- /dev/null +++ b/eth/utp/packets.nim @@ -0,0 +1,156 @@ +# Copyright (c) 2020-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [Defect].} + +import + std/[monotimes], + faststreams, + stew/[endians2, results, objects], bearssl, + ../p2p/discoveryv5/random2 + +export results + +const minimalHeaderSize = 20 +const protocolVersion = 1 + +type + PacketType* = enum + ST_DATA = 0, + ST_FIN = 1, + ST_STATE = 2, + ST_RESET = 3, + ST_SYN = 4 + + MicroSeconds = uint32 + + PacketHeaderV1 = object + pType*: PacketType + version*: uint8 + extension*: uint8 + connectionId*: uint16 + timestamp*: MicroSeconds + timestampDiff*: MicroSeconds + wndSize*: uint32 + seqNr*: uint16 + ackNr*: uint16 + + Packet* = object + header*: PacketHeaderV1 + payload*: seq[uint8] + +# Important timing assumptions for utp protocol here: +# 1. Microsecond precisions +# 2. Monotonicity +# Reference lib have a lot of checks to assume that this is monotonic on +# every system, and warnings when monotonic clock is not avaialable. +# For now we can use basic monotime, later it would be good to analyze: +# https://github.com/bittorrent/libutp/blob/master/utp_utils.cpp, to check all the +# timing assumptions on different platforms +proc getMonoTimeTimeStamp(): uint32 = + let time = getMonoTime() + cast[uint32](time.ticks() div 1000) + +# Simple generator, not useful for cryptography +proc randUint16*(rng: var BrHmacDrbgContext): uint16 = + uint16(rand(rng, int(high(uint16)))) + +# Simple generator, not useful for cryptography +proc randUint32*(rng: var BrHmacDrbgContext): uint32 = + uint32(rand(rng, int(high(uint32)))) + +proc encodeTypeVer(h: PacketHeaderV1): uint8 = + var typeVer = 0'u8 + let typeOrd = uint8(ord(h.pType)) + typeVer = (typeVer and 0xf0) or (h.version and 0xf) + typeVer = (typeVer and 0xf) or (typeOrd shl 4) + typeVer + +proc encodeHeader*(h: PacketHeaderV1): seq[byte] = + var mem = memoryOutput().s + try: + mem.write(encodeTypeVer(h)) + mem.write(h.extension) + mem.write(h.connectionId.toBytesBE()) + mem.write(h.timestamp.toBytesBE()) + mem.write(h.timestampDiff.toBytesBE()) + mem.write(h.wndSize.toBytesBE()) + mem.write(h.seqNr.toBytesBE()) + mem.write(h.ackNr.toBytesBE()) + return mem.getOutput() + except IOError as e: + # TODO not sure how writing to memory buffer could throw. Raise assertion error if + # its happen for now + raiseAssert e.msg + +proc encodePacket*(p: Packet): seq[byte] = + var mem = memoryOutput().s + try: + mem.write(encodeHeader(p.header)) + if (len(p.payload) > 0): + mem.write(p.payload) + mem.getOutput() + except IOError as e: + # TODO not sure how writing to memory buffer could throw. Raise assertion error if + # its happen for now + raiseAssert e.msg + +# TODO for now we do not handle extensions +proc decodePacket*(bytes: openArray[byte]): Result[Packet, string] = + if len(bytes) < minimalHeaderSize: + return err("invalid header size") + + let version = bytes[0] and 0xf + if version != protocolVersion: + return err("invalid packet version") + + var kind: PacketType + if not checkedEnumAssign(kind, (bytes[0] shr 4)): + return err("Invalid message type") + + let header = + PacketHeaderV1( + pType: kind, + version: version, + extension: bytes[1], + connection_id: fromBytesBE(uint16, [bytes[2], bytes[3]]), + timestamp: fromBytesBE(uint32, [bytes[4], bytes[5], bytes[6], bytes[7]]), + timestamp_diff: fromBytesBE(uint32, [bytes[8], bytes[9], bytes[10], bytes[11]]), + wnd_size: fromBytesBE(uint32, [bytes[12], bytes[13], bytes[14], bytes[15]]), + seq_nr: fromBytesBE(uint16, [bytes[16], bytes[17]]), + ack_nr: fromBytesBE(uint16, [bytes[18], bytes[19]]), + ) + + let payload = + if (len(bytes) == 20): + @[] + else: + bytes[20..^1] + + ok(Packet(header: header, payload: payload)) + +# connectionId - should be random not already used number +# bufferSize - should be pre configured initial buffer size for socket +proc synPacket*(rng: var BrHmacDrbgContext, connectionId: uint16, bufferSize: uint32): Packet = + let h = PacketHeaderV1( + pType: ST_SYN, + version: protocolVersion, + # TODO for we do not handle extensions + extension: 0'u8, + # TODO should be random not used number + connectionId: connectionId, + + timestamp: getMonoTimeTimeStamp(), + + timestampDiff: 0'u32, + # TODO shouldbe current available buffer size + wndSize: bufferSize, + seqNr: randUint16(rng), + # Initialy we did not receive any acks + ackNr: 0'u16 + ) + + Packet(header: h, payload: @[]) diff --git a/eth/utp/utp.nim b/eth/utp/utp.nim new file mode 100644 index 0000000..5f21725 --- /dev/null +++ b/eth/utp/utp.nim @@ -0,0 +1,29 @@ +# Copyright (c) 2020-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [Defect].} + +import + chronos, + ./utp_protocol + +# Exemple application to interact with reference implementation server to help with implementation +# To run lib utp server: +# 1. git clone https://github.com/bittorrent/libutp.git +# 2. cd libutp +# 3. make +# 4. ./ucat -ddddd -l -p 9078 - it will run utp server on port 9078 +when isMainModule: + # TODO read client/server ports and address from cmd line or config file + let localAddress = initTAddress("0.0.0.0", 9077) + let utpProt = UtpProtocol.new(localAddress) + + let remoteServer = initTAddress("0.0.0.0", 9078) + let soc = waitFor utpProt.connectTo(remoteServer) + + # Needed to wait for response from server + waitFor(sleepAsync(100)) + waitFor utpProt.closeWait() diff --git a/eth/utp/utp_protocol.nim b/eth/utp/utp_protocol.nim new file mode 100644 index 0000000..570e08e --- /dev/null +++ b/eth/utp/utp_protocol.nim @@ -0,0 +1,61 @@ +# Copyright (c) 2020-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [Defect].} + +import + chronos, chronicles, bearssl, + ./packets, + ../keys + +logScope: + topics = "utp" + +type + # For now utp protocol is tied to udp transport, but ultimatly we would like to + # abstract underlying transport to be able to run utp over udp, discoveryv5 or + # maybe some test transport + UtpProtocol* = ref object + transport: DatagramTransport + rng*: ref BrHmacDrbgContext + + UtpSocket* = ref object + +# TODO not implemented +# for now just log incoming packets +proc processPacket(p: Packet) = + notice "Received packet ", packet = p + +# Connect to provided address +# Reference implementation: https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732 +# TODO not implemented +proc connectTo*(p: UtpProtocol, address: TransportAddress): Future[UtpSocket] {.async.} = + let packet = synPacket(p.rng[], randUint16(p.rng[]), 1048576) + notice "Sending packet", packet = packet + let packetEncoded = encodePacket(packet) + await p.transport.sendTo(address, packetEncoded) + return UtpSocket() + +proc processDatagram(transp: DatagramTransport, raddr: TransportAddress): + Future[void] {.async.} = + # TODO: should we use `peekMessage()` to avoid allocation? + let buf = try: transp.getMessage() + except TransportOsError as e: + # This is likely to be local network connection issues. + return + + let dec = decodePacket(buf) + if (dec.isOk()): + processPacket(dec.get()) + else: + warn "failed to decode packet from address", address = raddr + +proc new*(T: type UtpProtocol, address: TransportAddress, rng = newRng()): UtpProtocol {.raises: [Defect, CatchableError].} = + let ta = newDatagramTransport(processDatagram, local = address) + UtpProtocol(transport: ta, rng: rng) + +proc closeWait*(p: UtpProtocol): Future[void] = + p.transport.closeWait() diff --git a/tests/utp/test_packets.nim b/tests/utp/test_packets.nim new file mode 100644 index 0000000..2506eed --- /dev/null +++ b/tests/utp/test_packets.nim @@ -0,0 +1,48 @@ +# Copyright (c) 2020-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.used.} + +import + unittest, + ../eth/utp/packets, + ../../eth/keys + +suite "Utp packets encoding/decoding": + + let rng = newRng() + + test "Encode/decode syn packet": + let synPacket = synPacket(rng[], 10, 20) + let encoded = encodePacket(synPacket) + let decoded = decodePacket(encoded) + + check: + decoded.isOk() + synPacket == decoded.get() + + test "Decode state packet": + # Packet obtained by interaction with c reference implementation + let pack: array[20, uint8] = [ + 0x21'u8, 0x0, 0x15, 0x72, 0x00, 0xBA, 0x4D, 0x71, 0x0, 0x0, 0x0, 0x0, 0x0, 0x10, 0x0, + 0x0, 0x41, 0xA7, 0x00, 0x01] + let decoded = decodePacket(pack) + + check: + decoded.isOk() + + let packet = decoded.get() + + check: + packet.header.pType == ST_STATE + packet.header.version == 1 + packet.header.extension == 0 + packet.header.connectionId == 5490 + packet.header.timestamp == 12209521 + packet.header.timestampDiff == 0 + packet.header.wndSize == 1048576 + packet.header.seqNr == 16807 + packet.header.ackNr == 1