From 34bac6e70392fefa581690760befd3c07c4816fc Mon Sep 17 00:00:00 2001 From: KonradStaniec Date: Thu, 28 Oct 2021 11:41:43 +0200 Subject: [PATCH] Utp code cleanup (#417) * Refactor tests and move socket to separate file * Move sockets handling to separate class * Abstract over underlying transport * Fix bug with receiving duplicated SYN packet * Fix race condition in connect --- eth/utp/utp.nim | 1 + eth/utp/utp_discov5_protocol.nim | 75 +++ eth/utp/utp_protocol.nim | 707 ++--------------------------- eth/utp/utp_router.nim | 126 +++++ eth/utp/utp_socket.nim | 630 +++++++++++++++++++++++++ tests/utp/all_utp_tests.nim | 1 + tests/utp/test_discv5_protocol.nim | 117 +++++ tests/utp/test_protocol.nim | 254 ++++++----- 8 files changed, 1127 insertions(+), 784 deletions(-) create mode 100644 eth/utp/utp_discov5_protocol.nim create mode 100644 eth/utp/utp_router.nim create mode 100644 eth/utp/utp_socket.nim create mode 100644 tests/utp/test_discv5_protocol.nim diff --git a/eth/utp/utp.nim b/eth/utp/utp.nim index 8875598..02004ec 100644 --- a/eth/utp/utp.nim +++ b/eth/utp/utp.nim @@ -8,6 +8,7 @@ import chronos, stew/byteutils, + ./utp_socket, ./utp_protocol # Exemple application to interact with reference implementation server to help with implementation diff --git a/eth/utp/utp_discov5_protocol.nim b/eth/utp/utp_discov5_protocol.nim new file mode 100644 index 0000000..76a9fd5 --- /dev/null +++ b/eth/utp/utp_discov5_protocol.nim @@ -0,0 +1,75 @@ +import + std/[hashes], + chronos, chronicles, + ../p2p/discoveryv5/[protocol, node], + ./utp_router, + ../keys + +type UtpDiscv5Protocol* = ref object of TalkProtocol + prot: protocol.Protocol + router: UtpRouter[Node] + +proc hash(x: UtpSocketKey[Node]): Hash = + var h = 0 + h = h !& x.remoteAddress.hash + h = h !& x.rcvId.hash + !$h + +proc initSendCallback(t: protocol.Protocol, subProtocolName: seq[byte]): SendCallback[Node] = + return ( + proc (to: Node, data: seq[byte]): Future[void] = + let fut = newFuture[void]() + # TODO In discvoveryv5 each talkreq wait for talkresp, but here we would really + # like the fire and forget semantics (similar to udp). + # For now start talkreq/response in background, and discard its result. + # That way we also lose information about any possible errors. + # Cosider adding talkreq proc which does not wait for response, + discard t.talkreq(to, subProtocolName, data) + fut.complete() + return fut + ) + +proc messageHandler*(protocol: TalkProtocol, request: seq[byte], + srcId: NodeId, srcUdpAddress: Address): seq[byte] = + let p = UtpDiscv5Protocol(protocol) + let maybeSender = p.prot.getNode(srcId) + + if maybeSender.isSome(): + let sender = maybeSender.unsafeGet() + # processIncomingBytes may respond to remote by using talkreq requests + asyncSpawn p.router.processIncomingBytes(request, sender) + # We always sending empty response as discv5 spec requires that talkreq always + # receive talkresp + @[] + else: + @[] + +proc new*( + T: type UtpDiscv5Protocol, + p: protocol.Protocol, + subProtocolName: seq[byte], + acceptConnectionCb: AcceptConnectionCallback[Node], + socketConfig: SocketConfig = SocketConfig.init(), + rng = newRng()): UtpDiscv5Protocol {.raises: [Defect, CatchableError].} = + doAssert(not(isNil(acceptConnectionCb))) + + let router = UtpRouter[Node].new( + acceptConnectionCb, + socketConfig, + rng + ) + router.sendCb = initSendCallback(p, subProtocolName) + + let prot = UtpDiscv5Protocol( + protocolHandler: messageHandler, + prot: p, + router: router + ) + + p.registerTalkProtocol(subProtocolName, prot).expect( + "Only one protocol should have this id" + ) + prot + +proc connectTo*(r: UtpDiscv5Protocol, address: Node): Future[UtpSocket[Node]]= + return r.router.connectTo(address) diff --git a/eth/utp/utp_protocol.nim b/eth/utp/utp_protocol.nim index 7fbf8f3..6c9bf92 100644 --- a/eth/utp/utp_protocol.nim +++ b/eth/utp/utp_protocol.nim @@ -9,159 +9,19 @@ import std/[tables, options, hashes, sugar, math], chronos, chronicles, bearssl, - ./packets, - ./growable_buffer, + ./utp_router, ../keys logScope: topics = "utp" type - ConnectionState = enum - Uninitialized, - Idle, - SynSent, - SynRecv, - Connected, - ConnectedFull, - Reset, - Destroy - - UtpSocketKey = object - remoteAddress: TransportAddress - rcvId: uint16 - - OutgoingPacket = object - packetBytes: seq[byte] - transmissions: uint16 - needResend: bool - timeSent: Moment - - UtpSocket* = ref object - remoteAddress*: TransportAddress - state: ConnectionState - # Connection id for packets we receive - connectionIdRcv: uint16 - # Connection id for packets we send - connectionIdSnd: uint16 - # Sequence number for the next packet to be sent. - seqNr: uint16 - # All seq number up to this havve been correctly acked by us - ackNr: uint16 - - # Should be completed after succesful connection to remote host or after timeout - # for the first syn packet - connectionFuture: Future[UtpSocket] - - # the number of packets in the send queue. Packets that haven't - # yet been sent count as well as packets marked as needing resend - # the oldest un-acked packet in the send queue is seq_nr - cur_window_packets - curWindowPackets: uint16 - - # out going buffer for all send packets - outBuffer: GrowableCircularBuffer[OutgoingPacket] - - # incoming buffer for out of order packets - inBuffer: GrowableCircularBuffer[Packet] - - # current retransmit Timeout used to calculate rtoTimeout - retransmitTimeout: Duration - - # calculated round trip time during communication with remote peer - rtt: Duration - # calculated round trip time variance - rttVar: Duration - # Round trip timeout dynamicaly updated based on acks received from remote - # peer - rto: Duration - - # RTO timeout will happen when currenTime > rtoTimeout - rtoTimeout: Moment - - # rcvBuffer - buffer: AsyncBuffer - - # loop called every 500ms to check for on going timeout status - checkTimeoutsLoop: Future[void] - - # number on consecutive re-transsmisions - retransmitCount: uint32 - - # Event which will complete whenever socket gets in destory statate - closeEvent: AsyncEvent - - # All callback to be called whenever socket gets in destroy state - closeCallbacks: seq[Future[void]] - - utpProt: UtpProtocol - - UtpSocketsContainerRef = ref object - sockets: Table[UtpSocketKey, UtpSocket] - - AckResult = enum - PacketAcked, PacketAlreadyAcked, PacketNotSentYet - - SocketConfig* = object - # This is configurable (in contrast to reference impl), as with standard 2 syn resends - # default timeout set to 3seconds and doubling of timeout with each re-send, it - # means that initial connection would timeout after 21s, which seems rather long - initialSynTimeout*: Duration - # For now utp protocol is tied to udp transport, but ultimatly we would like to # abstract underlying transport to be able to run utp over udp, discoveryv5 or # maybe some test transport UtpProtocol* = ref object transport: DatagramTransport - activeSockets: UtpSocketsContainerRef - acceptConnectionCb: AcceptConnectionCallback - socketConfig: SocketConfig - rng*: ref BrHmacDrbgContext - - # New remote client connection callback - # ``server`` - UtpProtocol object. - # ``client`` - accepted client utp socket. - AcceptConnectionCallback* = proc(server: UtpProtocol, - client: UtpSocket): Future[void] {.gcsafe, raises: [Defect].} - - - # Callback to be called whenever socket is closed - SocketCloseCallback = proc (): void {.gcsafe, raises: [Defect].} - - ConnectionError* = object of CatchableError - -const - # Maximal number of payload bytes per packet. Total packet size will be equal to - # mtuSize + sizeof(header) = 600 bytes - # TODO for now it is just some random value. Ultimatly this value should be dynamically - # adjusted based on traffic. - mtuSize = 580 - - # How often each socket check its different on going timers - checkTimeoutsLoopInterval = milliseconds(500) - - # Defualt initial timeout for first Syn packet - defaultInitialSynTimeout = milliseconds(3000) - - # Initial timeout to receive first Data data packet after receiving initial Syn - # packet. (TODO it should only be set when working over udp) - initialRcvRetransmitTimeout = milliseconds(10000) - -proc new(T: type UtpSocketsContainerRef): T = - UtpSocketsContainerRef(sockets: initTable[UtpSocketKey, UtpSocket]()) - -proc init(T: type UtpSocketKey, remoteAddress: TransportAddress, rcvId: uint16): T = - UtpSocketKey(remoteAddress: remoteAddress, rcvId: rcvId) - -proc init(T: type OutgoingPacket, packetBytes: seq[byte], transmissions: uint16, needResend: bool, timeSent: Moment = Moment.now()): T = - OutgoingPacket( - packetBytes: packetBytes, - transmissions: transmissions, - needResend: needResend, - timeSent: timeSent - ) - -proc init*(T: type SocketConfig, initialSynTimeout: Duration = defaultInitialSynTimeout): T = - SocketConfig(initialSynTimeout: initialSynTimeout) + utpRouter: UtpRouter[TransportAddress] # This should probably be defined in TransportAddress module, as hash function should # be consitent with equality function @@ -190,566 +50,55 @@ proc hash(x: TransportAddress): Hash = !$h # Required to use socketKey as key in hashtable -proc hash(x: UtpSocketKey): Hash = +proc hash(x: UtpSocketKey[TransportAddress]): Hash = var h = 0 h = h !& x.remoteAddress.hash h = h !& x.rcvId.hash !$h -proc setCloseCallback(s: UtpSocket, cb: SocketCloseCallback) {.async.} = - ## Set callback which will be called whenever the socket is permanently closed - try: - await s.closeEvent.wait() - cb() - except CancelledError: - trace "closeCallback cancelled" - -proc registerCloseCallback*(s: UtpSocket, cb: SocketCloseCallback) = - s.closeCallbacks.add(s.setCloseCallback(cb)) - -proc getUtpSocket(s: UtpSocketsContainerRef, k: UtpSocketKey): Option[UtpSocket] = - let s = s.sockets.getOrDefault(k) - if s == nil: - none[UtpSocket]() - else: - some(s) - -proc registerUtpSocket(s: UtpSocketsContainerRef, k: UtpSocketKey, socket: UtpSocket) = - # TODO Handle duplicates - s.sockets[k] = socket - -proc deRegisterUtpSocket(s: UtpSocketsContainerRef, k: UtpSocketKey) = - s.sockets.del(k) - -iterator allSockets(s: UtpSocketsContainerRef): UtpSocket = - for socket in s.sockets.values(): - yield socket - -proc len(s: UtpSocketsContainerRef): int = - len(s.sockets) - -# TODO extract similiar code between Outgoinhg and Incoming socket initialization -proc initOutgoingSocket(to: TransportAddress, p: UtpProtocol, cfg: SocketConfig, rng: var BrHmacDrbgContext): UtpSocket = - # TODO handle possible clashes and overflows - let rcvConnectionId = randUint16(rng) - let sndConnectionId = rcvConnectionId + 1 - let initialSeqNr = randUint16(rng) - - UtpSocket( - remoteAddress: to, - state: SynSent, - connectionIdRcv: rcvConnectionId, - connectionIdSnd: sndConnectionId, - seqNr: initialSeqNr, - connectionFuture: newFuture[UtpSocket](), - outBuffer: GrowableCircularBuffer[OutgoingPacket].init(), - inBuffer: GrowableCircularBuffer[Packet].init(), - retransmitTimeout: cfg.initialSynTimeout, - rtoTimeout: Moment.now() + cfg.initialSynTimeout, - # Initial timeout values taken from reference implemntation - rtt: milliseconds(0), - rttVar: milliseconds(800), - rto: milliseconds(3000), - # Default 1MB buffer - # TODO add posibility to configure buffer size - buffer: AsyncBuffer.init(1024 * 1024), - closeEvent: newAsyncEvent(), - closeCallbacks: newSeq[Future[void]](), - utpProt: p - ) - -proc initIncomingSocket(to: TransportAddress, p: UtpProtocol, connectionId: uint16, ackNr: uint16, rng: var BrHmacDrbgContext): UtpSocket = - let initialSeqNr = randUint16(rng) - UtpSocket( - remoteAddress: to, - state: SynRecv, - connectionIdRcv: connectionId + 1, - connectionIdSnd: connectionId, - seqNr: initialSeqNr, - ackNr: ackNr, - connectionFuture: newFuture[UtpSocket](), - outBuffer: GrowableCircularBuffer[OutgoingPacket].init(), - inBuffer: GrowableCircularBuffer[Packet].init(), - retransmitTimeout: initialRcvRetransmitTimeout, - rtoTimeout: Moment.now() + initialRcvRetransmitTimeout, - # Initial timeout values taken from reference implemntation - rtt: milliseconds(0), - rttVar: milliseconds(800), - rto: milliseconds(3000), - # Default 1MB buffer - # TODO add posibility to configure buffer size - buffer: AsyncBuffer.init(1024 * 1024), - closeEvent: newAsyncEvent(), - closeCallbacks: newSeq[Future[void]](), - utpProt: p - ) - -proc createAckPacket(socket: UtpSocket): Packet = - ## Creates ack packet based on the socket current state - ackPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576) - -proc max(a, b: Duration): Duration = - if (a > b): - a - else: - b - -proc updateTimeouts(socket: UtpSocket, timeSent: Moment, currentTime: Moment) = - ## Update timeouts according to spec: - ## delta = rtt - packet_rtt - ## rtt_var += (abs(delta) - rtt_var) / 4; - ## rtt += (packet_rtt - rtt) / 8; - - let packetRtt = currentTime - timeSent - - if (socket.rtt.isZero): - socket.rtt = packetRtt - socket.rttVar = packetRtt div 2 - else: - let packetRttMicro = packetRtt.microseconds() - let rttVarMicro = socket.rttVar.microseconds() - let rttMicro = socket.rtt.microseconds() - - let delta = rttMicro - packetRttMicro - - let newVar = microseconds(rttVarMicro + (abs(delta) - rttVarMicro) div 4) - let newRtt = socket.rtt - (socket.rtt div 8) + (packetRtt div 8) - - socket.rttVar = newVar - socket.rtt = newRtt - - # according to spec it should be: timeout = max(rtt + rtt_var * 4, 500) - # but usually spec lags after implementation so milliseconds(1000) is used - socket.rto = max(socket.rtt + (socket.rttVar * 4), milliseconds(1000)) - -proc ackPacket(socket: UtpSocket, seqNr: uint16): AckResult = - let packetOpt = socket.outBuffer.get(seqNr) - if packetOpt.isSome(): - let packet = packetOpt.get() - - if packet.transmissions == 0: - # according to reference impl it can happen when we get an ack_nr that - # does not exceed what we have stuffed into the outgoing buffer, - # but does exceed what we have sent - # TODO analyze if this case can happen with our impl - return PacketNotSentYet - - let currentTime = Moment.now() - - socket.outBuffer.delete(seqNr) - - # from spec: The rtt and rtt_var is only updated for packets that were sent only once. - # This avoids problems with figuring out which packet was acked, the first or the second one. - # it is standard solution to retransmission ambiguity problem - if packet.transmissions == 1: - socket.updateTimeouts(packet.timeSent, currentTime) - - socket.retransmitTimeout = socket.rto - socket.rtoTimeout = currentTime + socket.rto - - # TODO Add handlig of decreasing bytes window, whenadding handling of congestion control - - socket.retransmitCount = 0 - PacketAcked - else: - # the packet has already been acked (or not sent) - PacketAlreadyAcked - -proc ackPackets(socket: UtpSocket, nrPacketsToAck: uint16) = - var i = 0 - while i < int(nrPacketsToack): - let result = socket.ackPacket(socket.seqNr - socket.curWindowPackets) - case result - of PacketAcked: - dec socket.curWindowPackets - of PacketAlreadyAcked: - dec socket.curWindowPackets - of PacketNotSentYet: - debug "Tried to ack packed which was not sent yet" - break - - inc i - -proc getSocketKey(socket: UtpSocket): UtpSocketKey = - UtpSocketKey.init(socket.remoteAddress, socket.connectionIdRcv) - -proc isConnected*(socket: UtpSocket): bool = - socket.state == Connected - -template readLoop(body: untyped): untyped = - while true: - # TODO error handling - let (consumed, done) = body - socket.buffer.shift(consumed) - if done: - break - else: - # TODO add condition to handle socket closing - await socket.buffer.wait() - -# Check how many packets are still in the out going buffer, usefull for tests or -# debugging. -# It throws assertion error when number of elements in buffer do not equal kept counter -proc numPacketsInOutGoingBuffer*(socket: UtpSocket): int = - var num = 0 - for e in socket.outBuffer.items(): - if e.isSome(): - inc num - assert(num == int(socket.curWindowPackets)) - num - -proc sendData(socket: UtpSocket, data: seq[byte]): Future[void] = - socket.utpProt.transport.sendTo(socket.remoteAddress, data) - -proc sendPacket(socket: UtpSocket, packet: Packet): Future[void] = - socket.sendData(encodePacket(packet)) - -# Should be called before flushing data onto the socket -proc setSend(p: var OutgoingPacket): seq[byte] = - inc p.transmissions - p.needResend = false - p.timeSent = Moment.now() - return p.packetBytes - -proc flushPackets(socket: UtpSocket) {.async.} = - var i: uint16 = socket.seqNr - socket.curWindowPackets - while i != socket.seqNr: - # sending only packet which were not transmitted yet or need a resend - let shouldSendPacket = socket.outBuffer.exists(i, (p: OutgoingPacket) => (p.transmissions == 0 or p.needResend == true)) - if (shouldSendPacket): - let toSend = setSend(socket.outBuffer[i]) - await socket.sendData(toSend) - inc i - -proc getPacketSize(socket: UtpSocket): int = - # TODO currently returning constant, ultimatly it should be bases on mtu estimates - mtuSize - -proc resetSendTimeout(socket: UtpSocket) = - socket.retransmitTimeout = socket.rto - socket.rtoTimeout = Moment.now() + socket.retransmitTimeout - -proc write*(socket: UtpSocket, data: seq[byte]): Future[int] {.async.} = - var bytesWritten = 0 - # TODO - # Handle different socket state i.e do not write when socket is full or not - # connected - # Handle growing of send window - - if len(data) == 0: - return bytesWritten - - if socket.curWindowPackets == 0: - socket.resetSendTimeout() - - let pSize = socket.getPacketSize() - let endIndex = data.high() - var i = 0 - while i <= data.high: - let lastIndex = i + pSize - 1 - let lastOrEnd = min(lastIndex, endIndex) - let dataSlice = data[i..lastOrEnd] - let dataPacket = dataPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576, dataSlice) - socket.outBuffer.ensureSize(socket.seqNr, socket.curWindowPackets) - socket.outBuffer.put(socket.seqNr, OutgoingPacket.init(encodePacket(dataPacket), 0, false)) - inc socket.seqNr - inc socket.curWindowPackets - bytesWritten = bytesWritten + len(dataSlice) - i = lastOrEnd + 1 - await socket.flushPackets() - return bytesWritten - -proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] {.async.}= - ## Read all bytes `n` bytes from socket ``socket``. - ## - ## This procedure allocates buffer seq[byte] and return it as result. - var bytes = newSeq[byte]() - - if n == 0: - return bytes - - readLoop(): - # TODO Add handling of socket closing - let count = min(socket.buffer.dataLen(), n - len(bytes)) - bytes.add(socket.buffer.buffer.toOpenArray(0, count - 1)) - (count, len(bytes) == n) - - return bytes - -proc isOpened(socket:UtpSocket): bool = - return ( - socket.state == SynRecv or - socket.state == SynSent or - socket.state == Connected or - socket.state == ConnectedFull - ) - -proc markAllPacketAsLost(s: UtpSocket) = - var i = 0'u16 - while i < s.curWindowPackets: - - let packetSeqNr = s.seqNr - 1 - i - if (s.outBuffer.exists(packetSeqNr, (p: OutgoingPacket) => p. transmissions > 0 and p.needResend == false)): - s.outBuffer[packetSeqNr].needResend = true - # TODO here we should also decrease number of bytes in flight. This should be - # done when working on congestion control - - inc i - -proc checkTimeouts(socket: UtpSocket) {.async.} = - let currentTime = Moment.now() - # flush all packets which needs to be re-send - if socket.state != Destroy: - await socket.flushPackets() - - if socket.isOpened(): - if (currentTime > socket.rtoTimeout): - - # TODO add handling of probe time outs. Reference implemenation has mechanism - # of sending probes to determine mtu size. Probe timeouts do not count to standard - # timeouts calculations - - # client initiated connections, but did not send following data packet in rto - # time. TODO this should be configurable - if (socket.state == SynRecv): - socket.state = Destroy - socket.closeEvent.fire() - return - - if (socket.state == SynSent and socket.retransmitCount >= 2) or (socket.retransmitCount >= 4): - if socket.state == SynSent and (not socket.connectionFuture.finished()): - # TODO standard stream interface result in failed future in case of failed connections, - # but maybe it would be more clean to use result - socket.connectionFuture.fail(newException(ConnectionError, "Connection to peer timed out")) - - socket.state = Destroy - socket.closeEvent.fire() - return - - let newTimeout = socket.retransmitTimeout * 2 - socket.retransmitTimeout = newTimeout - socket.rtoTimeout = currentTime + newTimeout - - # TODO Add handling of congestion control - - # This will have much more sense when we will add handling of selective acks - # as then every selecivly acked packet restes timeout timer and removes packet - # from out buffer. - markAllPacketAsLost(socket) - - # resend oldest packet if there are some packets in flight - if (socket.curWindowPackets > 0): - notice "resending oldest packet in outBuffer" - inc socket.retransmitCount - let oldestPacketSeqNr = socket.seqNr - socket.curWindowPackets - # TODO add handling of fast timeout - - doAssert( - socket.outBuffer.get(oldestPacketSeqNr).isSome(), - "oldest packet should always be available when there is data in flight" - ) - let dataToSend = setSend(socket.outBuffer[oldestPacketSeqNr]) - await socket.sendData(dataToSend) - - # TODO add sending keep alives when necessary - -proc checkTimeoutsLoop(s: UtpSocket) {.async.} = - ## Loop that check timeoutsin the socket. - try: - while true: - await sleepAsync(checkTimeoutsLoopInterval) - await s.checkTimeouts() - except CancelledError: - trace "checkTimeoutsLoop canceled" - -proc startTimeoutLoop(s: UtpSocket) = - s.checkTimeoutsLoop = checkTimeoutsLoop(s) - -proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) {.async.}= - notice "Received packet ", packet = p - let socketKey = UtpSocketKey.init(sender, p.header.connectionId) - let maybeSocket = prot.activeSockets.getUtpSocket(socketKey) - let pkSeqNr = p.header.seqNr - let pkAckNr = p.header.ackNr - - if (maybeSocket.isSome()): - let socket = maybeSocket.unsafeGet() - - case p.header.pType - of ST_DATA: - # To avoid amplification attacks, server socket is in SynRecv state until - # it receices first data transfer - # https://www.usenix.org/system/files/conference/woot15/woot15-paper-adamsky.pdf - # TODO when intgrating with discv5 this need to be configurable - if (socket.state == SynRecv): - socket.state = Connected - - notice "Received ST_DATA on known socket" - # number of packets past the expected - # ack_nr is the last acked, seq_nr is the - # current. Subtracring 1 makes 0 mean "this is the next expected packet" - let pastExpected = pkSeqNr - socket.ackNr - 1 - - if (pastExpected == 0): - # we are getting in order data packet, we can flush data directly to the incoming buffer - await upload(addr socket.buffer, unsafeAddr p.payload[0], p.payload.len()) - - # TODO handle the case when there may be some packets in incoming buffer which - # are direct extension of this packet and therefore we could pass also their - # content to upper layer. This may need to be done when handling selective - # acks. - - # Bytes have been passed to upper layer, we can increase number of last - # acked packet - inc socket.ackNr - - # TODO for now we just schedule concurrent task with ack sending. It may - # need improvement, as with this approach there is no direct control over - # how many concurrent tasks there are and how to cancel them when socket - # is closed - let ack = socket.createAckPacket() - asyncSpawn socket.sendPacket(ack) - else: - # TODO handle out of order packets - notice "Got out of order packet" - of ST_FIN: - # TODO not implemented - notice "Received ST_FIN on known socket" - of ST_STATE: - notice "Received ST_STATE on known socket" - # acks is the number of packets that was acked, in normal case - no selective - # acks, no losses, no resends, it will usually be equal to 1 - var acks = pkAckNr - (socket.seqNr - 1 - socket.curWindowPackets) - - if acks > socket.curWindowPackets: - # this case happens if the we already received this ack nr - acks = 0 - - socket.ackPackets(acks) - - if (socket.state == SynSent and (not socket.connectionFuture.finished())): - socket.state = Connected - # TODO reference implementation sets ackNr (p.header.seqNr - 1), although - # spec mention that it should be equal p.header.seqNr. For now follow the - # reference impl to be compatible with it. Later investigate trin compatibility. - socket.ackNr = p.header.seqNr - 1 - # In case of SynSent complate the future as last thing to make sure user of libray will - # receive socket in correct state - socket.connectionFuture.complete(socket) - # TODO to finish handhske we should respond with ST_DATA packet, without it - # socket is left in half-open state. - # Actual reference implementation waits for user to send data, as it assumes - # existence of application level handshake over utp. We may need to modify this - # to automaticly send ST_DATA . - of ST_RESET: - # TODO not implemented - notice "Received ST_RESET on known socket" - of ST_SYN: - # TODO not implemented - notice "Received ST_SYN on known socket" - else: - # We got packet for which we do not have active socket. If the packet is not a - # SynPacket we should reject it and send rst packet to sender in some cases - if (p.header.pType == ST_SYN): - # Initial ackNr is set to incoming packer seqNr - let incomingSocket = initIncomingSocket(sender, prot, p.header.connectionId, p.header.seqNr, prot.rng[]) - let socketKey = incomingSocket.getSocketKey() - prot.activeSockets.registerUtpSocket(socketKey, incomingSocket) - # whenever socket get permanently closed, deregister it - incomingSocket.registerCloseCallback(proc () = prot.activeSockets.deRegisterUtpSocket(socketKey)) - # Make sure ack was flushed onto datagram socket before passing connction - # to upper layer - await incomingSocket.sendPacket(incomingSocket.createAckPacket()) - incomingSocket.startTimeoutLoop() - # TODO By default (when we have utp over udp) socket here is passed to upper layer - # in SynRecv state, which is not writeable i.e user of socket cannot write - # data to it unless some data will be received. This is counter measure to - # amplification attacks. - # During integration with discovery v5 (i.e utp over discovv5), we must re-think - # this. - asyncSpawn prot.acceptConnectionCb(prot, incomingSocket) - notice "Received ST_SYN and socket is not known" - else: - # TODO not implemented - notice "Received not ST_SYN and socket is not know" - -proc initSynPacket(socket: UtpSocket): OutgoingPacket = - ## creates syncPacket based on socket current state and put it in its outgoing - ## buffer - doAssert(socket.state == SynSent) - let packet = synPacket(socket.seqNr, socket.connectionIdRcv, 1048576) - # set number of transmissions to 1 as syn packet will be send just after - # initiliazation - let outgoingPacket = OutgoingPacket.init(encodePacket(packet), 1, false) - socket.outBuffer.ensureSize(socket.seqNr, socket.curWindowPackets) - socket.outBuffer.put(socket.seqNr, outgoingPacket) - inc socket.seqNr - inc socket.curWindowPackets - outgoingPacket - -proc openSockets*(p: UtpProtocol): int = - ## Returns number of currently active sockets - len(p.activeSockets) - -proc close*(s: UtpSocket) = - # TODO Rething all this when working on FIN and RESET packets and proper handling - # of resources - s.checkTimeoutsLoop.cancel() - s.closeEvent.fire() - -# Connect to provided address -# Reference implementation: https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732 -proc connectTo*(p: UtpProtocol, address: TransportAddress): Future[UtpSocket] = - let socket = initOutgoingSocket(address, p, p.socketConfig, p.rng[]) - let socketKey = socket.getSocketKey() - p.activeSockets.registerUtpSocket(socketKey, socket) - # whenever socket get permanently closed, deregister it - socket.registerCloseCallback(proc () = p.activeSockets.deRegisterUtpSocket(socketKey)) - var outgoingSyn = socket.initSynPacket() - notice "Sending syn packet packet", packet = outgoingSyn - # TODO add callback to handle errors and cancellation i.e unregister socket on - # send error and finish connection future with failure - # sending should be done from UtpSocketContext - discard socket.sendData(outgoingSyn.packetBytes) - socket.startTimeoutLoop() - return socket.connectionFuture - proc processDatagram(transp: DatagramTransport, raddr: TransportAddress): Future[void] {.async.} = - let utpProt = getUserData[UtpProtocol](transp) + let router = getUserData[UtpRouter[TransportAddress]](transp) # TODO: should we use `peekMessage()` to avoid allocation? let buf = try: transp.getMessage() except TransportOsError as e: # This is likely to be local network connection issues. return + await processIncomingBytes[TransportAddress](router, buf, raddr) - let dec = decodePacket(buf) - if (dec.isOk()): - await processPacket(utpProt, dec.get(), raddr) - else: - warn "failed to decode packet from address", address = raddr +proc initSendCallback(t: DatagramTransport): SendCallback[TransportAddress] = + return ( + proc (to: TransportAddress, data: seq[byte]): Future[void] = + t.sendTo(to, data) + ) proc new*( T: type UtpProtocol, - acceptConnectionCb: AcceptConnectionCallback, + acceptConnectionCb: AcceptConnectionCallback[TransportAddress], address: TransportAddress, socketConfig: SocketConfig = SocketConfig.init(), rng = newRng()): UtpProtocol {.raises: [Defect, CatchableError].} = + doAssert(not(isNil(acceptConnectionCb))) - let activeSockets = UtpSocketsContainerRef.new() - let utp = UtpProtocol( - activeSockets: activeSockets, - acceptConnectionCb: acceptConnectionCb, - socketConfig: socketConfig, - rng: rng + + let router = UtpRouter[TransportAddress].new( + acceptConnectionCb, + socketConfig, + rng ) - let ta = newDatagramTransport(processDatagram, udata = utp, local = address) - utp.transport = ta - utp + + let ta = newDatagramTransport(processDatagram, udata = router, local = address) + router.sendCb = initSendCallback(ta) + UtpProtocol(transport: ta, utpRouter: router) proc closeWait*(p: UtpProtocol): Future[void] {.async.} = # TODO Rething all this when working on FIN and RESET packets and proper handling # of resources await p.transport.closeWait() - for s in p.activeSockets.allSockets(): - s.close() + p.utpRouter.close() + +proc connectTo*(r: UtpProtocol, address: TransportAddress): Future[UtpSocket[TransportAddress]] = + return r.utpRouter.connectTo(address) + +proc openSockets*(r: UtpProtocol): int = + len(r.utpRouter) diff --git a/eth/utp/utp_router.nim b/eth/utp/utp_router.nim new file mode 100644 index 0000000..3ff09fa --- /dev/null +++ b/eth/utp/utp_router.nim @@ -0,0 +1,126 @@ +import + std/[tables, options], + chronos, bearssl, chronicles, + ../keys, + ./utp_socket, + ./packets + +logScope: + topics = "utp_router" + +export utp_socket + +type + # New remote client connection callback + # ``server`` - UtpProtocol object. + # ``client`` - accepted client utp socket. + AcceptConnectionCallback*[A] = proc(server: UtpRouter[A], + client: UtpSocket[A]): Future[void] {.gcsafe, raises: [Defect].} + + # Oject responsible for creating and maintaing table of of utp sockets. + # caller should use `processIncomingBytes` proc to feed it with incoming byte + # packets, based this input, proper utp sockets will be created, closed, or will + # receive data + UtpRouter*[A] = ref object + sockets: Table[UtpSocketKey[A], UtpSocket[A]] + socketConfig: SocketConfig + acceptConnection: AcceptConnectionCallback[A] + sendCb*: SendCallback[A] + rng*: ref BrHmacDrbgContext + +proc getUtpSocket[A](s: UtpRouter[A], k: UtpSocketKey[A]): Option[UtpSocket[A]] = + let s = s.sockets.getOrDefault(k) + if s == nil: + none[UtpSocket[A]]() + else: + some(s) + +proc deRegisterUtpSocket[A](s: UtpRouter[A], socket: UtpSocket[A]) = + s.sockets.del(socket.socketKey) + +iterator allSockets[A](s: UtpRouter[A]): UtpSocket[A] = + for socket in s.sockets.values(): + yield socket + +proc len*[A](s: UtpRouter[A]): int = + len(s.sockets) + +proc registerUtpSocket[A](p: UtpRouter, s: UtpSocket[A]) = + # TODO Handle duplicates + p.sockets[s.socketKey] = s + # Install deregister handler, so when socket will get closed, in will be promptly + # removed from open sockets table + s.registerCloseCallback(proc () = p.deRegisterUtpSocket(s)) + +proc new*[A]( + T: type UtpRouter[A], + acceptConnectionCb: AcceptConnectionCallback[A], + socketConfig: SocketConfig = SocketConfig.init(), + rng = newRng()): UtpRouter[A] {.raises: [Defect, CatchableError].} = + doAssert(not(isNil(acceptConnectionCb))) + UtpRouter[A]( + sockets: initTable[UtpSocketKey[A], UtpSocket[A]](), + acceptConnection: acceptConnectionCb, + socketConfig: socketConfig, + rng: rng + ) + +proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}= + notice "Received packet ", packet = p + let socketKey = UtpSocketKey[A].init(sender, p.header.connectionId) + let maybeSocket = r.getUtpSocket(socketKey) + + case p.header.pType + of ST_RESET: + # TODO Properly handle Reset packet, and close socket + notice "Received RESET packet" + of ST_SYN: + # Syn packet are special, and we need to add 1 to header connectionId + let socketKey = UtpSocketKey[A].init(sender, p.header.connectionId + 1) + let maybeSocket = r.getUtpSocket(socketKey) + if (maybeSocket.isSome()): + notice "Ignoring SYN for already existing connection" + else: + notice "Received SYN for not known connection. Initiating incoming connection" + # Initial ackNr is set to incoming packer seqNr + let incomingSocket = initIncomingSocket[A](sender, r.sendCb, p.header.connectionId, p.header.seqNr, r.rng[]) + r.registerUtpSocket(incomingSocket) + await incomingSocket.startIncomingSocket() + # TODO By default (when we have utp over udp) socket here is passed to upper layer + # in SynRecv state, which is not writeable i.e user of socket cannot write + # data to it unless some data will be received. This is counter measure to + # amplification attacks. + # During integration with discovery v5 (i.e utp over discovv5), we must re-think + # this. + asyncSpawn r.acceptConnection(r, incomingSocket) + else: + let socketKey = UtpSocketKey[A].init(sender, p.header.connectionId) + let maybeSocket = r.getUtpSocket(socketKey) + if (maybeSocket.isSome()): + let socket = maybeSocket.unsafeGet() + await socket.processPacket(p) + else: + # TODO add handling of respondig with reset + notice "Recevied FIN/DATA/ACK on not known socket" + +proc processIncomingBytes*[A](r: UtpRouter[A], bytes: seq[byte], sender: A) {.async.} = + let dec = decodePacket(bytes) + if (dec.isOk()): + await processPacket[A](r, dec.get(), sender) + else: + warn "failed to decode packet from address", address = sender + +# Connect to provided address +# Reference implementation: https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732 +proc connectTo*[A](r: UtpRouter[A], address: A): Future[UtpSocket[A]] {.async.}= + let socket = initOutgoingSocket[A](address, r.sendCb, r.socketConfig, r.rng[]) + r.registerUtpSocket(socket) + await socket.startOutgoingSocket() + await socket.waitFotSocketToConnect() + return socket + +proc close*[A](r: UtpRouter[A]) = + # TODO Rething all this when working on FIN and RESET packets and proper handling + # of resources + for s in r.allSockets(): + s.close() diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim new file mode 100644 index 0000000..c2e82db --- /dev/null +++ b/eth/utp/utp_socket.nim @@ -0,0 +1,630 @@ +# Copyright (c) 2020-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [Defect].} + +import + std/sugar, + chronos, chronicles, bearssl, + ./growable_buffer, + ./packets + +logScope: + topics = "utp_socket" + +type + ConnectionState = enum + SynSent, + SynRecv, + Connected, + ConnectedFull, + Reset, + Destroy + + UtpSocketKey*[A] = object + remoteAddress*: A + rcvId*: uint16 + + OutgoingPacket = object + packetBytes: seq[byte] + transmissions: uint16 + needResend: bool + timeSent: Moment + + AckResult = enum + PacketAcked, PacketAlreadyAcked, PacketNotSentYet + + # Socket callback to send data to remote peer + SendCallback*[A] = proc (to: A, data: seq[byte]): Future[void] {.gcsafe, raises: [Defect]} + + UtpSocket*[A] = ref object + remoteAddress*: A + state: ConnectionState + # Connection id for packets we receive + connectionIdRcv: uint16 + # Connection id for packets we send + connectionIdSnd: uint16 + # Sequence number for the next packet to be sent. + seqNr: uint16 + # All seq number up to this havve been correctly acked by us + ackNr: uint16 + + # Should be completed after succesful connection to remote host or after timeout + # for the first syn packet + connectionFuture: Future[void] + + # the number of packets in the send queue. Packets that haven't + # yet been sent count as well as packets marked as needing resend + # the oldest un-acked packet in the send queue is seq_nr - cur_window_packets + curWindowPackets: uint16 + + # out going buffer for all send packets + outBuffer: GrowableCircularBuffer[OutgoingPacket] + + # incoming buffer for out of order packets + inBuffer: GrowableCircularBuffer[Packet] + + # current retransmit Timeout used to calculate rtoTimeout + retransmitTimeout: Duration + + # calculated round trip time during communication with remote peer + rtt: Duration + # calculated round trip time variance + rttVar: Duration + # Round trip timeout dynamicaly updated based on acks received from remote + # peer + rto: Duration + + # RTO timeout will happen when currenTime > rtoTimeout + rtoTimeout: Moment + + # rcvBuffer + buffer: AsyncBuffer + + # loop called every 500ms to check for on going timeout status + checkTimeoutsLoop: Future[void] + + # number on consecutive re-transsmisions + retransmitCount: uint32 + + # Event which will complete whenever socket gets in destory statate + closeEvent: AsyncEvent + + # All callback to be called whenever socket gets in destroy state + closeCallbacks: seq[Future[void]] + + # socket identifier + socketKey*: UtpSocketKey[A] + + send: SendCallback[A] + + SocketConfig* = object + # This is configurable (in contrast to reference impl), as with standard 2 syn resends + # default timeout set to 3seconds and doubling of timeout with each re-send, it + # means that initial connection would timeout after 21s, which seems rather long + initialSynTimeout*: Duration + + # User driven call back to be called whenever socket is permanently closed i.e + # reaches destroy state + SocketCloseCallback* = proc (): void {.gcsafe, raises: [Defect].} + + ConnectionError* = object of CatchableError + +const + # Maximal number of payload bytes per packet. Total packet size will be equal to + # mtuSize + sizeof(header) = 600 bytes + # TODO for now it is just some random value. Ultimatly this value should be dynamically + # adjusted based on traffic. + mtuSize = 580 + + # How often each socket check its different on going timers + checkTimeoutsLoopInterval = milliseconds(500) + + # Defualt initial timeout for first Syn packet + defaultInitialSynTimeout = milliseconds(3000) + + # Initial timeout to receive first Data data packet after receiving initial Syn + # packet. (TODO it should only be set when working over udp) + initialRcvRetransmitTimeout = milliseconds(10000) + +proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T = + UtpSocketKey[A](remoteAddress: remoteAddress, rcvId: rcvId) + +proc init(T: type OutgoingPacket, packetBytes: seq[byte], transmissions: uint16, needResend: bool, timeSent: Moment = Moment.now()): T = + OutgoingPacket( + packetBytes: packetBytes, + transmissions: transmissions, + needResend: needResend, + timeSent: timeSent + ) + +proc init*(T: type SocketConfig, initialSynTimeout: Duration = defaultInitialSynTimeout): T = + SocketConfig(initialSynTimeout: initialSynTimeout) + +proc registerOutgoingPacket(socket: UtpSocket, oPacket: OutgoingPacket) = + ## Adds packet to outgoing buffer and updates all related fields + socket.outBuffer.ensureSize(socket.seqNr, socket.curWindowPackets) + socket.outBuffer.put(socket.seqNr, oPacket) + inc socket.seqNr + inc socket.curWindowPackets + +proc sendData(socket: UtpSocket, data: seq[byte]): Future[void] = + socket.send(socket.remoteAddress, data) + +proc sendAck(socket: UtpSocket): Future[void] = + ## Creates and sends ack, based on current socket state. Acks are different from + ## other packets as we do not track them in outgoing buffet + + let ackPacket = ackPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576) + socket.sendData(encodePacket(ackPacket)) + +proc sendSyn(socket: UtpSocket): Future[void] = + doAssert(socket.state == SynSent , "syn can only be send when in SynSent state") + let packet = synPacket(socket.seqNr, socket.connectionIdRcv, 1048576) + notice "Sending syn packet packet", packet = packet + # set number of transmissions to 1 as syn packet will be send just after + # initiliazation + let outgoingPacket = OutgoingPacket.init(encodePacket(packet), 1, false) + socket.registerOutgoingPacket(outgoingPacket) + socket.sendData(outgoingPacket.packetBytes) + +# Should be called before sending packet +proc setSend(p: var OutgoingPacket): seq[byte] = + inc p.transmissions + p.needResend = false + p.timeSent = Moment.now() + return p.packetBytes + +proc flushPackets(socket: UtpSocket) {.async.} = + var i: uint16 = socket.seqNr - socket.curWindowPackets + while i != socket.seqNr: + # sending only packet which were not transmitted yet or need a resend + let shouldSendPacket = socket.outBuffer.exists(i, (p: OutgoingPacket) => (p.transmissions == 0 or p.needResend == true)) + if (shouldSendPacket): + let toSend = setSend(socket.outBuffer[i]) + await socket.sendData(toSend) + inc i + +proc markAllPacketAsLost(s: UtpSocket) = + var i = 0'u16 + while i < s.curWindowPackets: + + let packetSeqNr = s.seqNr - 1 - i + if (s.outBuffer.exists(packetSeqNr, (p: OutgoingPacket) => p. transmissions > 0 and p.needResend == false)): + s.outBuffer[packetSeqNr].needResend = true + # TODO here we should also decrease number of bytes in flight. This should be + # done when working on congestion control + + inc i + +proc isOpened(socket:UtpSocket): bool = + return ( + socket.state == SynRecv or + socket.state == SynSent or + socket.state == Connected or + socket.state == ConnectedFull + ) + +proc checkTimeouts(socket: UtpSocket) {.async.} = + let currentTime = Moment.now() + # flush all packets which needs to be re-send + if socket.state != Destroy: + await socket.flushPackets() + + if socket.isOpened(): + if (currentTime > socket.rtoTimeout): + + # TODO add handling of probe time outs. Reference implemenation has mechanism + # of sending probes to determine mtu size. Probe timeouts do not count to standard + # timeouts calculations + + # client initiated connections, but did not send following data packet in rto + # time. TODO this should be configurable + if (socket.state == SynRecv): + socket.state = Destroy + socket.closeEvent.fire() + return + + if (socket.state == SynSent and socket.retransmitCount >= 2) or (socket.retransmitCount >= 4): + if socket.state == SynSent and (not socket.connectionFuture.finished()): + # TODO standard stream interface result in failed future in case of failed connections, + # but maybe it would be more clean to use result + socket.connectionFuture.fail(newException(ConnectionError, "Connection to peer timed out")) + + socket.state = Destroy + socket.closeEvent.fire() + return + + let newTimeout = socket.retransmitTimeout * 2 + socket.retransmitTimeout = newTimeout + socket.rtoTimeout = currentTime + newTimeout + + # TODO Add handling of congestion control + + # This will have much more sense when we will add handling of selective acks + # as then every selecivly acked packet restes timeout timer and removes packet + # from out buffer. + markAllPacketAsLost(socket) + + # resend oldest packet if there are some packets in flight + if (socket.curWindowPackets > 0): + notice "resending oldest packet in outBuffer" + inc socket.retransmitCount + let oldestPacketSeqNr = socket.seqNr - socket.curWindowPackets + # TODO add handling of fast timeout + + doAssert( + socket.outBuffer.get(oldestPacketSeqNr).isSome(), + "oldest packet should always be available when there is data in flight" + ) + let dataToSend = setSend(socket.outBuffer[oldestPacketSeqNr]) + await socket.sendData(dataToSend) + + # TODO add sending keep alives when necessary + +proc checkTimeoutsLoop(s: UtpSocket) {.async.} = + ## Loop that check timeoutsin the socket. + try: + while true: + await sleepAsync(checkTimeoutsLoopInterval) + await s.checkTimeouts() + except CancelledError: + trace "checkTimeoutsLoop canceled" + +proc startTimeoutLoop(s: UtpSocket) = + s.checkTimeoutsLoop = checkTimeoutsLoop(s) + +proc new[A]( + T: type UtpSocket[A], + to: A, + snd: SendCallback[A], + state: ConnectionState, + initialTimeout: Duration, + rcvId: uint16, + sndId: uint16, + initialSeqNr: uint16, + initialAckNr: uint16 +): T = + T( + remoteAddress: to, + state: state, + connectionIdRcv: rcvId, + connectionIdSnd: sndId, + seqNr: initialSeqNr, + ackNr: initialAckNr, + connectionFuture: newFuture[void](), + outBuffer: GrowableCircularBuffer[OutgoingPacket].init(), + inBuffer: GrowableCircularBuffer[Packet].init(), + retransmitTimeout: initialTimeout, + rtoTimeout: Moment.now() + initialTimeout, + # Initial timeout values taken from reference implemntation + rtt: milliseconds(0), + rttVar: milliseconds(800), + rto: milliseconds(3000), + # Default 1MB buffer + # TODO add posibility to configure buffer size + buffer: AsyncBuffer.init(1024 * 1024), + closeEvent: newAsyncEvent(), + closeCallbacks: newSeq[Future[void]](), + socketKey: UtpSocketKey.init(to, rcvId), + send: snd + ) + +proc initOutgoingSocket*[A]( + to: A, + snd: SendCallback[A], + cfg: SocketConfig, + rng: var BrHmacDrbgContext +): UtpSocket[A] = + # TODO handle possible clashes and overflows + let rcvConnectionId = randUint16(rng) + let sndConnectionId = rcvConnectionId + 1 + let initialSeqNr = randUint16(rng) + + UtpSocket[A].new( + to, + snd, + SynSent, + cfg.initialSynTimeout, + rcvConnectionId, + sndConnectionId, + initialSeqNr, + # Initialy ack nr is 0, as we do not know remote inital seqnr + 0 + ) + +proc initIncomingSocket*[A]( + to: A, + snd: SendCallback[A], + connectionId: uint16, + ackNr: uint16, + rng: var BrHmacDrbgContext +): UtpSocket[A] = + let initialSeqNr = randUint16(rng) + + UtpSocket[A].new( + to, + snd, + SynRecv, + initialRcvRetransmitTimeout, + connectionId + 1, + connectionId, + initialSeqNr, + ackNr + ) + +proc startOutgoingSocket*(socket: UtpSocket): Future[void] {.async.} = + doAssert(socket.state == SynSent) + # TODO add callback to handle errors and cancellation i.e unregister socket on + # send error and finish connection future with failure + # sending should be done from UtpSocketContext + await socket.sendSyn() + socket.startTimeoutLoop() + +proc waitFotSocketToConnect*(socket: UtpSocket): Future[void] {.async.} = + await socket.connectionFuture + +proc startIncomingSocket*(socket: UtpSocket) {.async.} = + doAssert(socket.state == SynRecv) + # Make sure ack was flushed before movig forward + await socket.sendAck() + socket.startTimeoutLoop() + +proc isConnected*(socket: UtpSocket): bool = + socket.state == Connected or socket.state == ConnectedFull + +proc close*(s: UtpSocket) = + # TODO Rething all this when working on FIN and RESET packets and proper handling + # of resources + s.checkTimeoutsLoop.cancel() + s.closeEvent.fire() + +proc setCloseCallback(s: UtpSocket, cb: SocketCloseCallback) {.async.} = + ## Set callback which will be called whenever the socket is permanently closed + try: + await s.closeEvent.wait() + cb() + except CancelledError: + trace "closeCallback cancelled" + +proc registerCloseCallback*(s: UtpSocket, cb: SocketCloseCallback) = + s.closeCallbacks.add(s.setCloseCallback(cb)) + +proc max(a, b: Duration): Duration = + if (a > b): + a + else: + b + +proc updateTimeouts(socket: UtpSocket, timeSent: Moment, currentTime: Moment) = + ## Update timeouts according to spec: + ## delta = rtt - packet_rtt + ## rtt_var += (abs(delta) - rtt_var) / 4; + ## rtt += (packet_rtt - rtt) / 8; + + let packetRtt = currentTime - timeSent + + if (socket.rtt.isZero): + socket.rtt = packetRtt + socket.rttVar = packetRtt div 2 + else: + let packetRttMicro = packetRtt.microseconds() + let rttVarMicro = socket.rttVar.microseconds() + let rttMicro = socket.rtt.microseconds() + + let delta = rttMicro - packetRttMicro + + let newVar = microseconds(rttVarMicro + (abs(delta) - rttVarMicro) div 4) + let newRtt = socket.rtt - (socket.rtt div 8) + (packetRtt div 8) + + socket.rttVar = newVar + socket.rtt = newRtt + + # according to spec it should be: timeout = max(rtt + rtt_var * 4, 500) + # but usually spec lags after implementation so milliseconds(1000) is used + socket.rto = max(socket.rtt + (socket.rttVar * 4), milliseconds(1000)) + +proc ackPacket(socket: UtpSocket, seqNr: uint16): AckResult = + let packetOpt = socket.outBuffer.get(seqNr) + if packetOpt.isSome(): + let packet = packetOpt.get() + + if packet.transmissions == 0: + # according to reference impl it can happen when we get an ack_nr that + # does not exceed what we have stuffed into the outgoing buffer, + # but does exceed what we have sent + # TODO analyze if this case can happen with our impl + return PacketNotSentYet + + let currentTime = Moment.now() + + socket.outBuffer.delete(seqNr) + + # from spec: The rtt and rtt_var is only updated for packets that were sent only once. + # This avoids problems with figuring out which packet was acked, the first or the second one. + # it is standard solution to retransmission ambiguity problem + if packet.transmissions == 1: + socket.updateTimeouts(packet.timeSent, currentTime) + + socket.retransmitTimeout = socket.rto + socket.rtoTimeout = currentTime + socket.rto + + # TODO Add handlig of decreasing bytes window, whenadding handling of congestion control + + socket.retransmitCount = 0 + PacketAcked + else: + # the packet has already been acked (or not sent) + PacketAlreadyAcked + +proc ackPackets(socket: UtpSocket, nrPacketsToAck: uint16) = + ## Ack packets in outgoing buffer based on ack number in the received packet + var i = 0 + while i < int(nrPacketsToack): + let result = socket.ackPacket(socket.seqNr - socket.curWindowPackets) + case result + of PacketAcked: + dec socket.curWindowPackets + of PacketAlreadyAcked: + dec socket.curWindowPackets + of PacketNotSentYet: + debug "Tried to ack packed which was not sent yet" + break + + inc i + +# TODO at socket level we should handle only FIN/DATA/ACK packets. Refactor to make +# it enforcable by type system +proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = + ## Updates socket state based on received packet, and sends ack when necessary. + ## Shoyuld be called in main packet receiving loop + let pkSeqNr = p.header.seqNr + let pkAckNr = p.header.ackNr + case p.header.pType + of ST_DATA: + # To avoid amplification attacks, server socket is in SynRecv state until + # it receices first data transfer + # https://www.usenix.org/system/files/conference/woot15/woot15-paper-adamsky.pdf + # TODO when intgrating with discv5 this need to be configurable + if (socket.state == SynRecv): + socket.state = Connected + + notice "Received ST_DATA on known socket" + # number of packets past the expected + # ack_nr is the last acked, seq_nr is the + # current. Subtracring 1 makes 0 mean "this is the next expected packet" + let pastExpected = pkSeqNr - socket.ackNr - 1 + + if (pastExpected == 0): + # we are getting in order data packet, we can flush data directly to the incoming buffer + await upload(addr socket.buffer, unsafeAddr p.payload[0], p.payload.len()) + + # TODO handle the case when there may be some packets in incoming buffer which + # are direct extension of this packet and therefore we could pass also their + # content to upper layer. This may need to be done when handling selective + # acks. + + # Bytes have been passed to upper layer, we can increase number of last + # acked packet + inc socket.ackNr + + # TODO for now we just schedule concurrent task with ack sending. It may + # need improvement, as with this approach there is no direct control over + # how many concurrent tasks there are and how to cancel them when socket + # is closed + asyncSpawn socket.sendAck() + else: + # TODO handle out of order packets + notice "Got out of order packet" + of ST_FIN: + # TODO not implemented + notice "Received ST_FIN on known socket" + of ST_STATE: + notice "Received ST_STATE on known socket" + # acks is the number of packets that was acked, in normal case - no selective + # acks, no losses, no resends, it will usually be equal to 1 + var acks = pkAckNr - (socket.seqNr - 1 - socket.curWindowPackets) + + if acks > socket.curWindowPackets: + # this case happens if the we already received this ack nr + acks = 0 + + socket.ackPackets(acks) + + if (socket.state == SynSent and (not socket.connectionFuture.finished())): + socket.state = Connected + # TODO reference implementation sets ackNr (p.header.seqNr - 1), although + # spec mention that it should be equal p.header.seqNr. For now follow the + # reference impl to be compatible with it. Later investigate trin compatibility. + socket.ackNr = p.header.seqNr - 1 + # In case of SynSent complate the future as last thing to make sure user of libray will + # receive socket in correct state + socket.connectionFuture.complete() + # TODO to finish handhske we should respond with ST_DATA packet, without it + # socket is left in half-open state. + # Actual reference implementation waits for user to send data, as it assumes + # existence of application level handshake over utp. We may need to modify this + # to automaticly send ST_DATA . + of ST_RESET: + # TODO not implemented + notice "Received ST_RESET on known socket" + of ST_SYN: + # TODO not implemented + notice "Received ST_SYN on known socket" + +template readLoop(body: untyped): untyped = + while true: + # TODO error handling + let (consumed, done) = body + socket.buffer.shift(consumed) + if done: + break + else: + # TODO add condition to handle socket closing + await socket.buffer.wait() + +proc getPacketSize(socket: UtpSocket): int = + # TODO currently returning constant, ultimatly it should be bases on mtu estimates + mtuSize + +proc resetSendTimeout(socket: UtpSocket) = + socket.retransmitTimeout = socket.rto + socket.rtoTimeout = Moment.now() + socket.retransmitTimeout + +proc write*(socket: UtpSocket, data: seq[byte]): Future[int] {.async.} = + var bytesWritten = 0 + # TODO + # Handle different socket state i.e do not write when socket is full or not + # connected + # Handle growing of send window + + if len(data) == 0: + return bytesWritten + + if socket.curWindowPackets == 0: + socket.resetSendTimeout() + + let pSize = socket.getPacketSize() + let endIndex = data.high() + var i = 0 + while i <= data.high: + let lastIndex = i + pSize - 1 + let lastOrEnd = min(lastIndex, endIndex) + let dataSlice = data[i..lastOrEnd] + let dataPacket = dataPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576, dataSlice) + socket.registerOutgoingPacket(OutgoingPacket.init(encodePacket(dataPacket), 0, false)) + bytesWritten = bytesWritten + len(dataSlice) + i = lastOrEnd + 1 + await socket.flushPackets() + return bytesWritten + +proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] {.async.}= + ## Read all bytes `n` bytes from socket ``socket``. + ## + ## This procedure allocates buffer seq[byte] and return it as result. + var bytes = newSeq[byte]() + + if n == 0: + return bytes + + readLoop(): + # TODO Add handling of socket closing + let count = min(socket.buffer.dataLen(), n - len(bytes)) + bytes.add(socket.buffer.buffer.toOpenArray(0, count - 1)) + (count, len(bytes) == n) + + return bytes + +# Check how many packets are still in the out going buffer, usefull for tests or +# debugging. +# It throws assertion error when number of elements in buffer do not equal kept counter +proc numPacketsInOutGoingBuffer*(socket: UtpSocket): int = + var num = 0 + for e in socket.outBuffer.items(): + if e.isSome(): + inc num + doAssert(num == int(socket.curWindowPackets)) + num diff --git a/tests/utp/all_utp_tests.nim b/tests/utp/all_utp_tests.nim index d2f23a8..a9ff4f2 100644 --- a/tests/utp/all_utp_tests.nim +++ b/tests/utp/all_utp_tests.nim @@ -9,4 +9,5 @@ import ./test_packets, ./test_protocol, + ./test_discv5_protocol, ./test_buffer diff --git a/tests/utp/test_discv5_protocol.nim b/tests/utp/test_discv5_protocol.nim new file mode 100644 index 0000000..bb192ed --- /dev/null +++ b/tests/utp/test_discv5_protocol.nim @@ -0,0 +1,117 @@ +# Copyright (c) 2020-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.used.} + +import + chronos, bearssl, + stew/shims/net, stew/byteutils, + testutils/unittests, + ../../eth/p2p/discoveryv5/[enr, node, routing_table], + ../../eth/p2p/discoveryv5/protocol as discv5_protocol, + ../../eth/utp/utp_router, + ../../eth/utp/utp_discov5_protocol, + ../../eth/keys + +proc localAddress*(port: int): Address = + Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(port)) + +proc initDiscoveryNode*(rng: ref BrHmacDrbgContext, + privKey: PrivateKey, + address: Address, + bootstrapRecords: openarray[Record] = [], + localEnrFields: openarray[(string, seq[byte])] = [], + previousRecord = none[enr.Record]()): discv5_protocol.Protocol = + # set bucketIpLimit to allow bucket split + let tableIpLimits = TableIpLimits(tableIpLimit: 1000, bucketIpLimit: 24) + + result = newProtocol(privKey, + some(address.ip), + some(address.port), some(address.port), + bindPort = address.port, + bootstrapRecords = bootstrapRecords, + localEnrFields = localEnrFields, + previousRecord = previousRecord, + tableIpLimits = tableIpLimits, + rng = rng) + + result.open() + +proc generateByteArray(rng: var BrHmacDrbgContext, length: int): seq[byte] = + var bytes = newSeq[byte](length) + brHmacDrbgGenerate(rng, bytes) + return bytes + +procSuite "Utp protocol over discovery v5 tests": + let rng = newRng() + let utpProtId = "test-utp".toBytes() + + proc registerIncomingSocketCallback(serverSockets: AsyncQueue): AcceptConnectionCallback[Node] = + return ( + proc(server: UtpRouter[Node], client: UtpSocket[Node]): Future[void] = + serverSockets.addLast(client) + ) + + # TODO Add more tests to discovery v5 suite, especially those which will differ + # from standard utp case + asyncTest "Success connect to remote host": + let + queue = newAsyncQueue[UtpSocket[Node]]() + node1 = initDiscoveryNode( + rng, PrivateKey.random(rng[]), localAddress(20302)) + node2 = initDiscoveryNode( + rng, PrivateKey.random(rng[]), localAddress(20303)) + + utp1 = UtpDiscv5Protocol.new(node1, utpProtId, registerIncomingSocketCallback(queue)) + utp2 = UtpDiscv5Protocol.new(node2, utpProtId, registerIncomingSocketCallback(queue)) + + # nodes must know about each other + check: + node1.addNode(node2.localNode) + node2.addNode(node1.localNode) + + let clientSocket = await utp1.connectTo(node2.localNode) + + check: + clientSocket.isConnected() + + await node1.closeWait() + await node2.closeWait() + + asyncTest "Success write data over packet size to remote host": + let + queue = newAsyncQueue[UtpSocket[Node]]() + node1 = initDiscoveryNode( + rng, PrivateKey.random(rng[]), localAddress(20302)) + node2 = initDiscoveryNode( + rng, PrivateKey.random(rng[]), localAddress(20303)) + + utp1 = UtpDiscv5Protocol.new(node1, utpProtId, registerIncomingSocketCallback(queue)) + utp2 = UtpDiscv5Protocol.new(node2, utpProtId, registerIncomingSocketCallback(queue)) + + # nodes must know about each other + check: + node1.addNode(node2.localNode) + node2.addNode(node1.localNode) + + let numOfBytes = 5000 + let clientSocket = await utp1.connectTo(node2.localNode) + let serverSocket = await queue.get() + + let bytesToTransfer = generateByteArray(rng[], numOfBytes) + let written = await clientSocket.write(bytesToTransfer) + + let received = await serverSocket.read(numOfBytes) + + check: + written == numOfBytes + bytesToTransfer == received + clientSocket.isConnected() + serverSocket.isConnected() + + + await node1.closeWait() + await node2.closeWait() diff --git a/tests/utp/test_protocol.nim b/tests/utp/test_protocol.nim index 2784b1c..4f15c8e 100644 --- a/tests/utp/test_protocol.nim +++ b/tests/utp/test_protocol.nim @@ -10,6 +10,7 @@ import sequtils, chronos, bearssl, testutils/unittests, + ../../eth/utp/utp_router, ../../eth/utp/utp_protocol, ../../eth/keys @@ -20,6 +21,21 @@ proc generateByteArray(rng: var BrHmacDrbgContext, length: int): seq[byte] = type AssertionCallback = proc(): bool {.gcsafe, raises: [Defect].} +proc setAcceptedCallback(event: AsyncEvent): AcceptConnectionCallback[TransportAddress] = + return ( + proc(server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] = + let fut = newFuture[void]() + event.fire() + fut.complete() + fut + ) + +proc registerIncomingSocketCallback(serverSockets: AsyncQueue): AcceptConnectionCallback[TransportAddress] = + return ( + proc(server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] = + serverSockets.addLast(client) + ) + proc waitUntil(f: AssertionCallback): Future[void] {.async.} = while true: let res = f() @@ -28,33 +44,90 @@ proc waitUntil(f: AssertionCallback): Future[void] {.async.} = else: await sleepAsync(milliseconds(50)) -proc transferData(sender: UtpSocket, receiver: UtpSocket, data: seq[byte]): Future[seq[byte]] {.async.}= +proc transferData(sender: UtpSocket[TransportAddress], receiver: UtpSocket[TransportAddress], data: seq[byte]): Future[seq[byte]] {.async.}= let bytesWritten = await sender.write(data) doAssert bytesWritten == len(data) let received = await receiver.read(len(data)) return received -procSuite "Utp protocol tests": +type + ClientServerScenario = object + utp1: UtpProtocol + utp2: UtpProtocol + clientSocket: UtpSocket[TransportAddress] + serverSocket: UtpSocket[TransportAddress] + + TwoClientsServerScenario = object + utp1: UtpProtocol + utp2: UtpProtocol + utp3: UtpProtocol + clientSocket1: UtpSocket[TransportAddress] + clientSocket2: UtpSocket[TransportAddress] + serverSocket1: UtpSocket[TransportAddress] + serverSocket2: UtpSocket[TransportAddress] + +proc initClientServerScenario(): Future[ClientServerScenario] {.async.} = + let q = newAsyncQueue[UtpSocket[TransportAddress]]() + var server1Called = newAsyncEvent() + let address = initTAddress("127.0.0.1", 9079) + let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address) + + let address1 = initTAddress("127.0.0.1", 9080) + let utpProt2 = UtpProtocol.new(registerIncomingSocketCallback(q), address1) + let clientSocket = await utpProt1.connectTo(address1) + # this future will be completed when we called accepted connection callback + let serverSocket = await q.popFirst() + + return ClientServerScenario( + utp1: utpProt1, + utp2: utpProt2, + clientSocket: clientSocket, + serverSocket: serverSocket + ) + +proc close(s: ClientServerScenario) {.async.} = + await s.utp1.closeWait() + await s.utp2.closeWait() + +proc init2ClientsServerScenario(): Future[TwoClientsServerScenario] {.async.} = + var serverSockets = newAsyncQueue[UtpSocket[TransportAddress]]() + var server1Called = newAsyncEvent() + let address1 = initTAddress("127.0.0.1", 9079) + let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address1) + + let address2 = initTAddress("127.0.0.1", 9080) + let utpProt2 = UtpProtocol.new(registerIncomingSocketCallback(serverSockets), address2) + + let address3 = initTAddress("127.0.0.1", 9081) + let utpProt3 = UtpProtocol.new(registerIncomingSocketCallback(serverSockets), address3) + + let clientSocket1 = await utpProt1.connectTo(address2) + let clientSocket2 = await utpProt1.connectTo(address3) + + await waitUntil(proc (): bool = len(serverSockets) == 2) + + # this future will be completed when we called accepted connection callback + let serverSocket1 = serverSockets[0] + let serverSocket2 = serverSockets[1] + + return TwoClientsServerScenario( + utp1: utpProt1, + utp2: utpProt2, + utp3: utpProt3, + clientSocket1: clientSocket1, + clientSocket2: clientSocket2, + serverSocket1: serverSocket1, + serverSocket2: serverSocket2 + ) + +proc close(s: TwoClientsServerScenario) {.async.} = + await s.utp1.closeWait() + await s.utp2.closeWait() + await s.utp3.closeWait() + +procSuite "Utp protocol over udp tests": let rng = newRng() - proc setAcceptedCallback(event: AsyncEvent): AcceptConnectionCallback = - return ( - proc(server: UtpProtocol, client: UtpSocket): Future[void] = - let fut = newFuture[void]() - event.fire() - fut.complete() - fut - ) - - proc setIncomingSocketCallback(socketPromise: Future[UtpSocket]): AcceptConnectionCallback = - return ( - proc(server: UtpProtocol, client: UtpSocket): Future[void] = - let fut = newFuture[void]() - socketPromise.complete(client) - fut.complete() - fut - ) - asyncTest "Success connect to remote host": let server1Called = newAsyncEvent() let address = initTAddress("127.0.0.1", 9079) @@ -129,148 +202,119 @@ procSuite "Utp protocol tests": await utpProt2.closeWait() asyncTest "Success data transfer when data fits into one packet": - var server1Called = newAsyncEvent() - let address = initTAddress("127.0.0.1", 9079) - let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address) - - var serverSocketFut = newFuture[UtpSocket]() - let address1 = initTAddress("127.0.0.1", 9080) - let utpProt2 = UtpProtocol.new(setIncomingSocketCallback(serverSocketFut), address1) - - let clientSocket = await utpProt1.connectTo(address1) - - # this future will be completed when we called accepted connection callback - discard await serverSocketFut - - let serverSocket = - try: - serverSocketFut.read() - except: - raiseAssert "Unexpected error when reading finished future" - + let s = await initClientServerScenario() + check: - clientSocket.isConnected() + s.clientSocket.isConnected() # after successful connection outgoing buffer should be empty as syn packet # should be correctly acked - clientSocket.numPacketsInOutGoingBuffer() == 0 + s.clientSocket.numPacketsInOutGoingBuffer() == 0 # Server socket is not in connected state, until first data transfer - (not serverSocket.isConnected()) + (not s.serverSocket.isConnected()) let bytesToTransfer = generateByteArray(rng[], 100) - let bytesReceivedFromClient = await transferData(clientSocket, serverSocket, bytesToTransfer) + let bytesReceivedFromClient = await transferData(s.clientSocket, s.serverSocket, bytesToTransfer) check: bytesToTransfer == bytesReceivedFromClient - serverSocket.isConnected() + s.serverSocket.isConnected() - let bytesReceivedFromServer = await transferData(serverSocket, clientSocket, bytesToTransfer) + let bytesReceivedFromServer = await transferData(s.serverSocket, s.clientSocket, bytesToTransfer) check: bytesToTransfer == bytesReceivedFromServer - await utpProt1.closeWait() - await utpProt2.closeWait() + await s.close() asyncTest "Success data transfer when data need to be sliced into multiple packets": - var server1Called = newAsyncEvent() - let address = initTAddress("127.0.0.1", 9079) - let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address) - - var serverSocketFut = newFuture[UtpSocket]() - let address1 = initTAddress("127.0.0.1", 9080) - let utpProt2 = UtpProtocol.new(setIncomingSocketCallback(serverSocketFut), address1) - - let clientSocket = await utpProt1.connectTo(address1) - - # this future will be completed when we called accepted connection callback - discard await serverSocketFut - - let serverSocket = - try: - serverSocketFut.read() - except: - raiseAssert "Unexpected error when reading finished future" - + let s = await initClientServerScenario() + check: - clientSocket.isConnected() + s.clientSocket.isConnected() # after successful connection outgoing buffer should be empty as syn packet # should be correctly acked - clientSocket.numPacketsInOutGoingBuffer() == 0 + s.clientSocket.numPacketsInOutGoingBuffer() == 0 - (not serverSocket.isConnected()) + (not s.serverSocket.isConnected()) # 5000 bytes is over maximal packet size let bytesToTransfer = generateByteArray(rng[], 5000) - let bytesReceivedFromClient = await transferData(clientSocket, serverSocket, bytesToTransfer) - let bytesReceivedFromServer = await transferData(serverSocket, clientSocket, bytesToTransfer) + let bytesReceivedFromClient = await transferData(s.clientSocket, s.serverSocket, bytesToTransfer) + let bytesReceivedFromServer = await transferData(s.serverSocket, s.clientSocket, bytesToTransfer) # ultimatly all send packets will acked, and outgoing buffer will be empty - await waitUntil(proc (): bool = clientSocket.numPacketsInOutGoingBuffer() == 0) - await waitUntil(proc (): bool = serverSocket.numPacketsInOutGoingBuffer() == 0) + await waitUntil(proc (): bool = s.clientSocket.numPacketsInOutGoingBuffer() == 0) + await waitUntil(proc (): bool = s.serverSocket.numPacketsInOutGoingBuffer() == 0) check: - serverSocket.isConnected() - clientSocket.numPacketsInOutGoingBuffer() == 0 - serverSocket.numPacketsInOutGoingBuffer() == 0 + s.serverSocket.isConnected() + s.clientSocket.numPacketsInOutGoingBuffer() == 0 + s.serverSocket.numPacketsInOutGoingBuffer() == 0 bytesReceivedFromClient == bytesToTransfer bytesReceivedFromServer == bytesToTransfer - await utpProt1.closeWait() - await utpProt2.closeWait() + await s.close() asyncTest "Success multiple data transfers when data need to be sliced into multiple packets": - var server1Called = newAsyncEvent() - let address = initTAddress("127.0.0.1", 9079) - let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address) - - var serverSocketFut = newFuture[UtpSocket]() - let address1 = initTAddress("127.0.0.1", 9080) - let utpProt2 = UtpProtocol.new(setIncomingSocketCallback(serverSocketFut), address1) - - let clientSocket = await utpProt1.connectTo(address1) - - # this future will be completed when we called accepted connection callback - discard await serverSocketFut - - let serverSocket = - try: - serverSocketFut.read() - except: - raiseAssert "Unexpected error when reading finished future" + let s = await initClientServerScenario() check: - clientSocket.isConnected() + s.clientSocket.isConnected() # after successful connection outgoing buffer should be empty as syn packet # should be correctly acked - clientSocket.numPacketsInOutGoingBuffer() == 0 + s.clientSocket.numPacketsInOutGoingBuffer() == 0 # 5000 bytes is over maximal packet size let bytesToTransfer = generateByteArray(rng[], 5000) - let written = await clientSocket.write(bytesToTransfer) + let written = await s.clientSocket.write(bytesToTransfer) check: written == len(bytesToTransfer) let bytesToTransfer1 = generateByteArray(rng[], 5000) - let written1 = await clientSocket.write(bytesToTransfer1) + let written1 = await s.clientSocket.write(bytesToTransfer1) check: written1 == len(bytesToTransfer) - let bytesReceived = await serverSocket.read(len(bytesToTransfer) + len(bytesToTransfer1)) + let bytesReceived = await s.serverSocket.read(len(bytesToTransfer) + len(bytesToTransfer1)) # ultimatly all send packets will acked, and outgoing buffer will be empty - await waitUntil(proc (): bool = clientSocket.numPacketsInOutGoingBuffer() == 0) + await waitUntil(proc (): bool = s.clientSocket.numPacketsInOutGoingBuffer() == 0) check: - clientSocket.numPacketsInOutGoingBuffer() == 0 + s.clientSocket.numPacketsInOutGoingBuffer() == 0 bytesToTransfer.concat(bytesToTransfer1) == bytesReceived + + await s.close() + + asyncTest "Success data transfers from multiple clients": + let s = await init2ClientsServerScenario() - await utpProt1.closeWait() - await utpProt2.closeWait() + check: + s.clientSocket1.isConnected() + s.clientSocket2.isConnected() + s.clientSocket1.numPacketsInOutGoingBuffer() == 0 + s.clientSocket2.numPacketsInOutGoingBuffer() == 0 + + let numBytesToTransfer = 5000 + let client1Data = generateByteArray(rng[], numBytesToTransfer) + let client2Data = generateByteArray(rng[], numBytesToTransfer) + + discard s.clientSocket1.write(client1Data) + discard s.clientSocket2.write(client2Data) + + let server1ReadBytes = await s.serverSocket1.read(numBytesToTransfer) + let server2ReadBytes = await s.serverSocket2.read(numBytesToTransfer) + + check: + client1Data == server1ReadBytes + client2Data == server2ReadBytes + + await s.close()