diff --git a/eth/utp/utp_router.nim b/eth/utp/utp_router.nim index 09e7023..3fa301f 100644 --- a/eth/utp/utp_router.nim +++ b/eth/utp/utp_router.nim @@ -163,7 +163,8 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}= debug "Ignoring SYN for already existing connection" else: if (r.shouldAllowConnection(sender, p.header.connectionId)): - debug "Received SYN for new connection. Initiating incoming connection" + debug "Received SYN for new connection. Initiating incoming connection", + synSeqNr = p.header.seqNr # Initial ackNr is set to incoming packer seqNr let incomingSocket = newIncomingSocket[A](sender, r.sendCb, r.socketConfig ,p.header.connectionId, p.header.seqNr, r.rng[]) r.registerUtpSocket(incomingSocket) diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index 949f58e..febc639 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -1019,6 +1019,8 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = debug "Process packet", socketKey = socket.socketKey, + socketAckNr = socket.ackNr, + socketSeqNr = socket.seqNr, packetType = p.header.pType, seqNr = p.header.seqNr, ackNr = p.header.ackNr, @@ -1059,6 +1061,19 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = # If packet is totally of the mark short circout the processing if pastExpected >= reorderBufferMaxSize: + + # if `pastExpected` is really big number (for example: uint16.high) then most + # probably we are receiving packet which we already received + # example: we already received packet with `seqNr = 10` so our `socket.ackNr = 10` + # if we receive this packet once again then `pastExpected = 10 - 10 - 1` which + # equals (due to wrapping) 65535 + # this means that remote most probably did not receive our ack, so we need to resend + # it. We are doing it for last `reorderBufferMaxSize` packets + let isPossibleDuplicatedOldPacket = pastExpected >= (int(uint16.high) + 1) - reorderBufferMaxSize + + if (isPossibleDuplicatedOldPacket and p.header.pType != ST_STATE): + asyncSpawn socket.sendAck() + debug "Got an invalid packet sequence number, too far off", pastExpected = pastExpected return @@ -1231,6 +1246,13 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = let packet = maybePacket.unsafeGet() if (len(packet.payload) > 0 and (not socket.readShutdown)): + debug "Got packet from reorder buffer", + packetBytes = len(packet.payload), + packetSeqNr = packet.header.seqNr, + packetAckNr = packet.header.ackNr, + socketSeqNr = socket.seqNr, + socekrAckNr = socket.ackNr + await upload(addr socket.buffer, unsafeAddr packet.payload[0], packet.payload.len()) socket.inBuffer.delete(nextPacketNum) @@ -1428,6 +1450,9 @@ proc numPacketsInOutGoingBuffer*(socket: UtpSocket): int = # Check how many payload bytes are still in flight proc numOfBytesInFlight*(socket: UtpSocket): uint32 = socket.sendBufferTracker.currentBytesInFlight() +# Check how many bytes are in incoming buffer +proc numOfBytesInIncomingBuffer*(socket: UtpSocket): uint32 = uint32(socket.buffer.dataLen()) + # Check how many packets are still in the reorder buffer, usefull for tests or # debugging. # It throws assertion error when number of elements in buffer do not equal kept counter diff --git a/tests/utp/test_utp_socket.nim b/tests/utp/test_utp_socket.nim index fdf617f..d27b449 100644 --- a/tests/utp/test_utp_socket.nim +++ b/tests/utp/test_utp_socket.nim @@ -124,6 +124,49 @@ procSuite "Utp socket unit test": await outgoingSocket.destroyWait() + asyncTest "Processing duplicated fresh data packet should ack it and stop processing": + let q = newAsyncQueue[Packet]() + let initalRemoteSeqNr = 10'u16 + let data = @[1'u8, 2'u8, 3'u8] + + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q) + + let dataP1 = + dataPacket( + initalRemoteSeqNr, + initialPacket.header.connectionId, + initialPacket.header.seqNr, + testBufferSize, + data, + 0 + ) + + await outgoingSocket.processPacket(dataP1) + + let ack1 = await q.get() + + check: + ack1.header.pType == ST_STATE + ack1.header.ackNr == initalRemoteSeqNr + + let receivedBytes = await outgoingSocket.read(len(data)) + + check: + receivedBytes == data + + # remote re-send data packet, most probably due to lost ack + await outgoingSocket.processPacket(dataP1) + + let ack2 = await q.get() + + check: + ack2.header.pType == ST_STATE + ack2.header.ackNr == initalRemoteSeqNr + # we do not upload data one more time + outgoingSocket.numOfBytesInIncomingBuffer() == 0'u32 + + await outgoingSocket.destroyWait() + asyncTest "Processing out of order data packet should buffer it until receiving in order one": # TODO test is valid until implementing selective acks let q = newAsyncQueue[Packet]()