diff --git a/eth.nimble b/eth.nimble index 165ef84..d8ffd8f 100644 --- a/eth.nimble +++ b/eth.nimble @@ -4,7 +4,7 @@ description = "Ethereum Common library" license = "MIT" skipDirs = @["tests"] -requires "nim >= 1.2.0 & <= 1.2.12", +requires "nim >= 1.2.0 & <= 1.2.14", "nimcrypto", "stint", "secp256k1", diff --git a/eth/utp/packets.nim b/eth/utp/packets.nim index ce465dd..fe6d534 100644 --- a/eth/utp/packets.nim +++ b/eth/utp/packets.nim @@ -37,6 +37,7 @@ type # was received, and the timestamp in this last received packet timestampDiff*: MicroSeconds # The window size is the number of bytes currently in-flight, i.e. sent but not acked + # When sending packets, this should be set to the number of bytes left in the socket's receive buffer. wndSize*: uint32 seqNr*: uint16 # sequence number the sender of the packet last received in the other direction diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index f95485c..a2d94c3 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -54,6 +54,9 @@ type # failed dataResendsBeforeFailure*: uint16 + # Maximnal size of receive buffer in bytes + optRcvBuffer*: uint32 + UtpSocket*[A] = ref object remoteAddress*: A state: ConnectionState @@ -179,12 +182,27 @@ const # dead. 4 is taken from reference implementation defaultDataResendsBeforeFailure = 4'u16 + # default size of rcv buffer in bytes + # rationale form C reference impl: + # 1 MB of receive buffer (i.e. max bandwidth delay product) + # means that from a peer with 200 ms RTT, we cannot receive + # faster than 5 MB/s + # from a peer with 10 ms RTT, we cannot receive faster than + # 100 MB/s. This is assumed to be good enough, since bandwidth + # often is proportional to RTT anyway + defaultOptRcvBuffer: uint32 = 1024 * 1024 + reorderBufferMaxSize = 1024 proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T = UtpSocketKey[A](remoteAddress: remoteAddress, rcvId: rcvId) -proc init(T: type OutgoingPacket, packetBytes: seq[byte], transmissions: uint16, needResend: bool, timeSent: Moment = Moment.now()): T = +proc init( + T: type OutgoingPacket, + packetBytes: seq[byte], + transmissions: uint16, + needResend: bool, + timeSent: Moment = Moment.now()): T = OutgoingPacket( packetBytes: packetBytes, transmissions: transmissions, @@ -195,13 +213,22 @@ proc init(T: type OutgoingPacket, packetBytes: seq[byte], transmissions: uint16, proc init*( T: type SocketConfig, initialSynTimeout: Duration = defaultInitialSynTimeout, - dataResendsBeforeFailure: uint16 = defaultDataResendsBeforeFailure + dataResendsBeforeFailure: uint16 = defaultDataResendsBeforeFailure, + optRcvBuffer: uint32 = defaultOptRcvBuffer ): T = SocketConfig( initialSynTimeout: initialSynTimeout, - dataResendsBeforeFailure: dataResendsBeforeFailure + dataResendsBeforeFailure: dataResendsBeforeFailure, + optRcvBuffer: optRcvBuffer ) +proc getRcvWindowSize(socket: UtpSocket): uint32 = + let currentDataSize = socket.buffer.dataLen() + if currentDataSize > int(socket.socketConfig.optRcvBuffer): + 0'u32 + else: + socket.socketConfig.optRcvBuffer - uint32(currentDataSize) + proc registerOutgoingPacket(socket: UtpSocket, oPacket: OutgoingPacket) = ## Adds packet to outgoing buffer and updates all related fields socket.outBuffer.ensureSize(socket.seqNr, socket.curWindowPackets) @@ -216,12 +243,18 @@ proc sendAck(socket: UtpSocket): Future[void] = ## Creates and sends ack, based on current socket state. Acks are different from ## other packets as we do not track them in outgoing buffet - let ackPacket = ackPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576) + let ackPacket = + ackPacket( + socket.seqNr, + socket.connectionIdSnd, + socket.ackNr, + socket.getRcvWindowSize() + ) socket.sendData(encodePacket(ackPacket)) proc sendSyn(socket: UtpSocket): Future[void] = doAssert(socket.state == SynSent , "syn can only be send when in SynSent state") - let packet = synPacket(socket.seqNr, socket.connectionIdRcv, 1048576) + let packet = synPacket(socket.seqNr, socket.connectionIdRcv, socket.getRcvWindowSize()) notice "Sending syn packet packet", packet = packet # set number of transmissions to 1 as syn packet will be send just after # initiliazation @@ -373,9 +406,7 @@ proc new[A]( rtt: milliseconds(0), rttVar: milliseconds(800), rto: milliseconds(3000), - # Default 1MB buffer - # TODO add posibility to configure buffer size - buffer: AsyncBuffer.init(1024 * 1024), + buffer: AsyncBuffer.init(int(cfg.optRcvBuffer)), closeEvent: newAsyncEvent(), closeCallbacks: newSeq[Future[void]](), socketKey: UtpSocketKey.init(to, rcvId), @@ -742,7 +773,7 @@ proc close*(socket: UtpSocket) {.async.} = if socket.curWindowPackets == 0: socket.resetSendTimeout() - let finEncoded = encodePacket(finPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576)) + let finEncoded = encodePacket(finPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, socket.getRcvWindowSize())) socket.registerOutgoingPacket(OutgoingPacket.init(finEncoded, 1, true)) socket.finSent = true await socket.sendData(finEncoded) @@ -784,11 +815,12 @@ proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] {.async.} = let pSize = socket.getPacketSize() let endIndex = data.high() var i = 0 + let wndSize = socket.getRcvWindowSize() while i <= data.high: let lastIndex = i + pSize - 1 let lastOrEnd = min(lastIndex, endIndex) let dataSlice = data[i..lastOrEnd] - let dataPacket = dataPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576, dataSlice) + let dataPacket = dataPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, wndSize, dataSlice) socket.registerOutgoingPacket(OutgoingPacket.init(encodePacket(dataPacket), 0, false)) bytesWritten = bytesWritten + len(dataSlice) i = lastOrEnd + 1 diff --git a/tests/utp/test_utp_socket.nim b/tests/utp/test_utp_socket.nim index c4edf6b..852a947 100644 --- a/tests/utp/test_utp_socket.nim +++ b/tests/utp/test_utp_socket.nim @@ -57,8 +57,11 @@ procSuite "Utp socket unit test": resultBytes.add(p.payload) return resultBytes - template connectOutGoingSocket(initialRemoteSeq: uint16, q: AsyncQueue[Packet]): (UtpSocket[TransportAddress], Packet) = - let sock1 = initOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), SocketConfig.init(), rng[]) + template connectOutGoingSocket( + initialRemoteSeq: uint16, + q: AsyncQueue[Packet], + cfg: SocketConfig = SocketConfig.init()): (UtpSocket[TransportAddress], Packet) = + let sock1 = initOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), cfg, rng[]) await sock1.startOutgoingSocket() let initialPacket = await q.get() @@ -76,12 +79,14 @@ procSuite "Utp socket unit test": asyncTest "Starting outgoing socket should send Syn packet": let q = newAsyncQueue[Packet]() - let sock1 = initOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), SocketConfig.init(), rng[]) + let defaultConfig = SocketConfig.init() + let sock1 = initOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), defaultConfig, rng[]) await sock1.startOutgoingSocket() let initialPacket = await q.get() check: initialPacket.header.pType == ST_SYN + initialPacket.header.wndSize == defaultConfig.optRcvBuffer asyncTest "Outgoing socket should re-send syn packet 2 times before declaring failure": let q = newAsyncQueue[Packet]() @@ -477,7 +482,6 @@ procSuite "Utp socket unit test": error.kind == SocketNotWriteable error.currentState == Destroy - asyncTest "Trying to write data onto closed socket which sent fin": let q = newAsyncQueue[Packet]() let initialRemoteSeq = 10'u16 @@ -495,3 +499,72 @@ procSuite "Utp socket unit test": check: error.kind == FinSent + + asyncTest "Processing data packet should update window size accordingly and use it in all send packets": + let q = newAsyncQueue[Packet]() + let initialRemoteSeqNr = 10'u16 + let initialRcvBufferSize = 10'u32 + let data = @[1'u8, 2'u8, 3'u8] + let sCfg = SocketConfig.init(optRcvBuffer = initialRcvBufferSize) + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeqNr, q, sCfg) + + let dataP1 = dataPacket(initialRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data) + + await outgoingSocket.processPacket(dataP1) + + let ack1 = await q.get() + + check: + ack1.header.pType == ST_STATE + ack1.header.ackNr == initialRemoteSeqNr + ack1.header.wndSize == initialRcvBufferSize - uint32(len(data)) + + let written = await outgoingSocket.write(data) + + let sentData = await q.get() + + check: + sentData.header.pType == ST_DATA + sentData.header.wndSize == initialRcvBufferSize - uint32(len(data)) + + await outgoingSocket.close() + + let sentFin = await q.get() + + check: + sentFin.header.pType == ST_FIN + sentFin.header.wndSize == initialRcvBufferSize - uint32(len(data)) + + asyncTest "Reading data from the buffer shoud increase receive window": + let q = newAsyncQueue[Packet]() + let initalRemoteSeqNr = 10'u16 + let initialRcvBufferSize = 10'u32 + let data = @[1'u8, 2'u8, 3'u8] + let sCfg = SocketConfig.init(optRcvBuffer = initialRcvBufferSize) + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q, sCfg) + + let dataP1 = dataPacket(initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data) + + await outgoingSocket.processPacket(dataP1) + + let ack1 = await q.get() + + check: + ack1.header.pType == ST_STATE + ack1.header.ackNr == initalRemoteSeqNr + ack1.header.wndSize == initialRcvBufferSize - uint32(len(data)) + + let readData = await outgoingSocket.read(data.len()) + + check: + readData == data + + discard await outgoingSocket.write(data) + + let sentData = await q.get() + + check: + sentData.header.pType == ST_DATA + # we have read all data from rcv buffer, advertised window should go back to + # initial size + sentData.header.wndSize == initialRcvBufferSize