From 7ae287ad1bf9e88bd3424f07642bf5a906102958 Mon Sep 17 00:00:00 2001 From: KonradStaniec Date: Mon, 11 Oct 2021 14:16:06 +0200 Subject: [PATCH] Add rudimentary connect function (#405) * Add rudimentary connect function --- eth.nimble | 2 +- eth/utp/packets.nim | 30 ++++-- eth/utp/utp.nim | 4 +- eth/utp/utp_protocol.nim | 184 ++++++++++++++++++++++++++++++++++-- tests/utp/all_utp_tests.nim | 11 +++ tests/utp/test_packets.nim | 2 +- tests/utp/test_protocol.nim | 31 ++++++ 7 files changed, 243 insertions(+), 21 deletions(-) create mode 100644 tests/utp/all_utp_tests.nim create mode 100644 tests/utp/test_protocol.nim diff --git a/eth.nimble b/eth.nimble index 6016c80..165ef84 100644 --- a/eth.nimble +++ b/eth.nimble @@ -70,7 +70,7 @@ task test_ssz, "Run ssz tests": runTest("tests/ssz/all_tests") task test_utp, "Run utp tests": - runTest("tests/utp/test_packets") + runTest("tests/utp/all_utp_tests") task test, "Run all tests": for filename in [ diff --git a/eth/utp/packets.nim b/eth/utp/packets.nim index 2a00de6..a14ad6a 100644 --- a/eth/utp/packets.nim +++ b/eth/utp/packets.nim @@ -130,23 +130,39 @@ proc decodePacket*(bytes: openArray[byte]): Result[Packet, string] = # connectionId - should be random not already used number # bufferSize - should be pre configured initial buffer size for socket -proc synPacket*(rng: var BrHmacDrbgContext, connectionId: uint16, bufferSize: uint32): Packet = +# SYN packets are special, and should have the receive ID in the connid field, +# instead of conn_id_send. +proc synPacket*(seqNr: uint16, rcvConnectionId: uint16, bufferSize: uint32): Packet = let h = PacketHeaderV1( pType: ST_SYN, version: protocolVersion, # TODO for we do not handle extensions extension: 0'u8, - # TODO should be random not used number - connectionId: connectionId, - + connectionId: rcvConnectionId, timestamp: getMonoTimeTimeStamp(), - timestampDiff: 0'u32, - # TODO shouldbe current available buffer size wndSize: bufferSize, - seqNr: randUint16(rng), + seqNr: seqNr, # Initialy we did not receive any acks ackNr: 0'u16 ) Packet(header: h, payload: @[]) + +proc ackPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSize: uint32): Packet = + let h = PacketHeaderV1( + pType: ST_STATE, + version: protocolVersion, + # ack packets always have extension field set to 0 + extension: 0'u8, + connectionId: sndConnectionId, + timestamp: getMonoTimeTimeStamp(), + # TODO for not we are using 0, but this value should be calculated on socket + # level + timestampDiff: 0'u32, + wndSize: bufferSize, + seqNr: seqNr, + ackNr: ackNr + ) + + Packet(header: h, payload: @[]) diff --git a/eth/utp/utp.nim b/eth/utp/utp.nim index 5f21725..a460e63 100644 --- a/eth/utp/utp.nim +++ b/eth/utp/utp.nim @@ -21,9 +21,7 @@ when isMainModule: let localAddress = initTAddress("0.0.0.0", 9077) let utpProt = UtpProtocol.new(localAddress) - let remoteServer = initTAddress("0.0.0.0", 9078) + let remoteServer = initTAddress("127.0.0.1", 9078) let soc = waitFor utpProt.connectTo(remoteServer) - # Needed to wait for response from server - waitFor(sleepAsync(100)) waitFor utpProt.closeWait() diff --git a/eth/utp/utp_protocol.nim b/eth/utp/utp_protocol.nim index 570e08e..3ebd4d7 100644 --- a/eth/utp/utp_protocol.nim +++ b/eth/utp/utp_protocol.nim @@ -7,6 +7,7 @@ {.push raises: [Defect].} import + std/[tables, options, hashes], chronos, chronicles, bearssl, ./packets, ../keys @@ -15,32 +16,194 @@ logScope: topics = "utp" type + ConnectionState = enum + Uninitialized, + Idle, + SynSent, + SynRecv, + Connected, + ConnectedFull, + Reset, + Destroy + + UtpSocketKey = object + remoteAddress: TransportAddress + rcvId: uint16 + + UtpSocket* = ref object + remoteAddress*: TransportAddress + state: ConnectionState + # Connection id for packets we receive + connectionIdRcv: uint16 + # Connection id for packets we send + connectionIdSnd: uint16 + # Sequence number for the next packet to be sent. + seqNr: uint16 + # All seq number up to this havve been correctly acked by us + ackNr: uint16 + + # Should be completed after succesful connection to remote host. + # TODO check if nim gc handles properly cyclic references, as this future will + # contain reference to socket which hold this future. + # If that is not the case, then this future will need to be hold independly + connectionFuture: Future[UtpSocket] + + UtpSocketsContainerRef = ref object + sockets: Table[UtpSocketKey, UtpSocket] + # For now utp protocol is tied to udp transport, but ultimatly we would like to # abstract underlying transport to be able to run utp over udp, discoveryv5 or # maybe some test transport UtpProtocol* = ref object transport: DatagramTransport + activeSockets: UtpSocketsContainerRef rng*: ref BrHmacDrbgContext - UtpSocket* = ref object +proc new(T: type UtpSocketsContainerRef): T = + UtpSocketsContainerRef(sockets: initTable[UtpSocketKey, UtpSocket]()) + +# This should probably be defined in TransportAddress module, as hash function should +# be consitent with equality function +# in nim zero arrays always have hash equal to 0, irrespectively of array size, to +# avoid clashes betweend different types of addresses, each type have mixed different +# magic number +proc hash(x: TransportAddress): Hash = + var h: Hash = 0 + case x.family + of AddressFamily.None: + h = h !& 31 + !$h + of AddressFamily.IPv4: + h = h !& x.address_v4.hash + h = h !& x.port.hash + h = h !& 37 + !$h + of AddressFamily.IPv6: + h = h !& x.address_v6.hash + h = h !& x.port.hash + h = h !& 41 + !$h + of AddressFamily.Unix: + h = h !& x.address_un.hash + h = h !& 43 + !$h + +# Required to use socketKey as key in hashtable +proc hash(x: UtpSocketKey): Hash = + var h = 0 + h = h !& x.remoteAddress.hash + h = h !& x.rcvId.hash + !$h + +proc getUtpSocket(s: UtpSocketsContainerRef, k: UtpSocketKey): Option[UtpSocket] = + let s = s.sockets.getOrDefault(k) + if s == nil: + none[UtpSocket]() + else: + some(s) + +proc registerUtpSocket(s: UtpSocketsContainerRef, k: UtpSocketKey, socket: UtpSocket) = + # TODO Handle duplicates + s.sockets[k] = socket + +proc initOutgoingSocket(to: TransportAddress, rng: var BrHmacDrbgContext): UtpSocket = + # TODO handle possible clashes and overflows + let rcvConnectionId = randUint16(rng) + let sndConnectionId = rcvConnectionId + 1 + let initialSeqNr = randUint16(rng) + UtpSocket( + remoteAddress: to, + state: SynSent, + connectionIdRcv: rcvConnectionId, + connectionIdSnd: sndConnectionId, + seqNr: initialSeqNr, + connectionFuture: newFuture[UtpSocket]() + ) + +proc initIncomingSocket(to: TransportAddress, connectionId: uint16, ackNr: uint16, rng: var BrHmacDrbgContext): UtpSocket = + let initialSeqNr = randUint16(rng) + UtpSocket( + remoteAddress: to, + state: SynRecv, + connectionIdRcv: connectionId + 1, + connectionIdSnd: connectionId, + seqNr: initialSeqNr, + ackNr: ackNr, + connectionFuture: newFuture[UtpSocket]() + ) + +proc ack(socket: UtpSocket): Packet = + ackPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576) + +proc isConnected*(socket: UtpSocket): bool = + socket.state == Connected # TODO not implemented # for now just log incoming packets -proc processPacket(p: Packet) = +proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) = notice "Received packet ", packet = p + let socketKey = UtpSocketKey(remoteAddress: sender, rcvId: p.header.connectionId) + let maybeSocket = prot.activeSockets.getUtpSocket(socketKey) + if (maybeSocket.isSome()): + let socket = maybeSocket.unsafeGet() + case p.header.pType + of ST_DATA: + # TODO not implemented + notice "Received ST_DATA on known socket" + of ST_FIN: + # TODO not implemented + notice "Received ST_FIN on known socket" + of ST_STATE: + notice "Received ST_STATE on known socket" + if (socket.state == SynSent): + socket.state = Connected + socket.ackNr = p.header.seqNr + socket.connectionFuture.complete(socket) + # TODO to finish handhske we should respond with ST_DATA packet, without it + # socket is left in half-open state + of ST_RESET: + # TODO not implemented + notice "Received ST_RESET on known socket" + of ST_SYN: + # TODO not implemented + notice "Received ST_SYN on known socket" + else: + # We got packet for which we do not have active socket. If the packet is not a + # SynPacket we should reject it and send rst packet to sender in some cases + if (p.header.pType == ST_SYN): + # Initial ackNr is set to incoming packer seqNr + let incomingSocket = initIncomingSocket(sender, p.header.connectionId, p.header.seqNr, prot.rng[]) + let socketKey = UtpSocketKey(remoteAddress: incomingSocket.remoteAddress, rcvId: incomingSocket.connectionIdRcv) + prot.activeSockets.registerUtpSocket(socketKey, incomingSocket) + let synAck = incomingSocket.ack() + let encoded = encodePacket(synAck) + # TODO sending should be done from UtpSocket context + discard prot.transport.sendTo(sender, encoded) + notice "Received ST_SYN and socket is not known" + else: + # TODO not implemented + notice "Received not ST_SYN and socket is not know" # Connect to provided address # Reference implementation: https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732 # TODO not implemented -proc connectTo*(p: UtpProtocol, address: TransportAddress): Future[UtpSocket] {.async.} = - let packet = synPacket(p.rng[], randUint16(p.rng[]), 1048576) +proc connectTo*(p: UtpProtocol, address: TransportAddress): Future[UtpSocket] = + let socket = initOutgoingSocket(address, p.rng[]) + let socketKey = UtpSocketKey(remoteAddress: socket.remoteAddress, rcvId: socket.connectionIdRcv) + # TODO Buffer in syn packet should be based on our current buffer size + let packet = synPacket(socket.seqNr, socket.connectionIdRcv, 1048576) notice "Sending packet", packet = packet let packetEncoded = encodePacket(packet) - await p.transport.sendTo(address, packetEncoded) - return UtpSocket() + p.activeSockets.registerUtpSocket(socketKey, socket) + # TODO add callback to handle errors and cancellation i.e unregister socket on + # send error and finish connection future with failure + # sending should be done from UtpSocketContext + discard p.transport.sendTo(address, packetEncoded) + return socket.connectionFuture proc processDatagram(transp: DatagramTransport, raddr: TransportAddress): Future[void] {.async.} = + let utpProt = getUserData[UtpProtocol](transp) # TODO: should we use `peekMessage()` to avoid allocation? let buf = try: transp.getMessage() except TransportOsError as e: @@ -49,13 +212,16 @@ proc processDatagram(transp: DatagramTransport, raddr: TransportAddress): let dec = decodePacket(buf) if (dec.isOk()): - processPacket(dec.get()) + processPacket(utpProt, dec.get(), raddr) else: warn "failed to decode packet from address", address = raddr proc new*(T: type UtpProtocol, address: TransportAddress, rng = newRng()): UtpProtocol {.raises: [Defect, CatchableError].} = - let ta = newDatagramTransport(processDatagram, local = address) - UtpProtocol(transport: ta, rng: rng) + let activeSockets = UtpSocketsContainerRef.new() + let utp = UtpProtocol(activeSockets: activeSockets, rng: rng) + let ta = newDatagramTransport(processDatagram, udata = utp, local = address) + utp.transport = ta + utp proc closeWait*(p: UtpProtocol): Future[void] = p.transport.closeWait() diff --git a/tests/utp/all_utp_tests.nim b/tests/utp/all_utp_tests.nim new file mode 100644 index 0000000..6a3b401 --- /dev/null +++ b/tests/utp/all_utp_tests.nim @@ -0,0 +1,11 @@ +# Copyright (c) 2020-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.used.} + +import + ./test_packets, + ./test_protocol diff --git a/tests/utp/test_packets.nim b/tests/utp/test_packets.nim index 2506eed..cd3249b 100644 --- a/tests/utp/test_packets.nim +++ b/tests/utp/test_packets.nim @@ -16,7 +16,7 @@ suite "Utp packets encoding/decoding": let rng = newRng() test "Encode/decode syn packet": - let synPacket = synPacket(rng[], 10, 20) + let synPacket = synPacket(5, 10, 20) let encoded = encodePacket(synPacket) let decoded = decodePacket(encoded) diff --git a/tests/utp/test_protocol.nim b/tests/utp/test_protocol.nim new file mode 100644 index 0000000..65f804d --- /dev/null +++ b/tests/utp/test_protocol.nim @@ -0,0 +1,31 @@ +# Copyright (c) 2020-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.used.} + +import + chronos, + testutils/unittests, + ../../eth/utp/utp_protocol, + ../../eth/keys + +procSuite "Utp protocol tests": + let rng = newRng() + + asyncTest "Success connect to remote host": + let address = initTAddress("127.0.0.1", 9079) + let utpProt1 = UtpProtocol.new(address) + + let address1 = initTAddress("127.0.0.1", 9080) + let utpProt2 = UtpProtocol.new(address1) + + let sock = await utpProt1.connectTo(address1) + + check: + sock.isConnected() + + await utpProt1.closeWait() + await utpProt2.closeWait()