diff --git a/eth/utp/utp_discv5_protocol.nim b/eth/utp/utp_discv5_protocol.nim index 2412971..abe4292 100644 --- a/eth/utp/utp_discv5_protocol.nim +++ b/eth/utp/utp_discv5_protocol.nim @@ -13,7 +13,10 @@ import ./utp_router, ../keys -export utp_router, protocol +export utp_router, protocol, chronicles + +logScope: + topics = "utp_discv5_protocol" type UtpDiscv5Protocol* = ref object of TalkProtocol prot: protocol.Protocol @@ -52,6 +55,7 @@ proc messageHandler(protocol: TalkProtocol, request: seq[byte], let maybeSender = p.prot.getNode(srcId) if maybeSender.isSome(): + debug "Received utp payload from known node. Start processing" let sender = maybeSender.unsafeGet() # processIncomingBytes may respond to remote by using talkreq requests asyncSpawn p.router.processIncomingBytes(request, sender) @@ -59,6 +63,7 @@ proc messageHandler(protocol: TalkProtocol, request: seq[byte], # always receives a talkresp. @[] else: + debug "Received utp payload from unknown node. Ignore" @[] proc new*( diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index febc639..867152e 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -367,9 +367,13 @@ proc flushPackets(socket: UtpSocket) {.async.} = let shouldSendPacket = socket.outBuffer.exists(i, (p: OutgoingPacket) => (p.transmissions == 0 or p.needResend == true)) if (shouldSendPacket): if socket.sendBufferTracker.reserveNBytes(socket.outBuffer[i].payloadLength): + debug "Resending packet during flush", + pkSeqNr = i let toSend = socket.setSend(socket.outBuffer[i]) await socket.sendData(toSend) else: + debug "Should resend packet during flush but there is no place in send buffer", + pkSeqNr = i # there is no place in send buffer, stop flushing return inc i @@ -380,6 +384,8 @@ proc markAllPacketAsLost(s: UtpSocket) = let packetSeqNr = s.seqNr - 1 - i if (s.outBuffer.exists(packetSeqNr, (p: OutgoingPacket) => p.transmissions > 0 and p.needResend == false)): + debug "Marking packet as lost", + pkSeqNr = packetSeqNr s.outBuffer[packetSeqNr].needResend = true let packetPayloadLength = s.outBuffer[packetSeqNr].payloadLength # lack of waiters notification in case of timeout effectivly means that @@ -453,12 +459,22 @@ proc checkTimeouts(socket: UtpSocket) {.async.} = # than to fit at least one packet. let oldMaxWindow = socket.sendBufferTracker.maxWindow let newMaxWindow = max((oldMaxWindow * 2) div 3, currentPacketSize) + + debug "Decaying max window due to socket idling", + oldMaxWindow = oldMaxWindow, + newMaxWindow = newMaxWindow + socket.sendBufferTracker.updateMaxWindowSize( # maxRemote window does not change socket.sendBufferTracker.maxRemoteWindow, newMaxWindow ) else: + + debug "Reseting window size do fit a least one packet", + oldWindowSize = socket.sendBufferTracker.maxWindow, + newWindowSize = currentPacketSize + # delay was so high that window has shrunk below one packet. Reset window # to fit a least one packet and start with slow start socket.sendBufferTracker.updateMaxWindowSize( @@ -479,18 +495,26 @@ proc checkTimeouts(socket: UtpSocket) {.async.} = let oldestPacketSeqNr = socket.seqNr - socket.curWindowPackets # TODO add handling of fast timeout - debug "Resending oldest packet in outBuffer", - seqNr = oldestPacketSeqNr, - curWindowPackets = socket.curWindowPackets - doAssert( socket.outBuffer.get(oldestPacketSeqNr).isSome(), "oldest packet should always be available when there is data in flight" ) + let payloadLength = socket.outBuffer[oldestPacketSeqNr].payloadLength if (socket.sendBufferTracker.reserveNBytes(payloadLength)): + debug "Resending oldest packet in outBuffer", + seqNr = oldestPacketSeqNr, + curWindowPackets = socket.curWindowPackets + let dataToSend = socket.setSend(socket.outBuffer[oldestPacketSeqNr]) await socket.sendData(dataToSend) + else: + # TODO Logs added here to check if we need to check for spcae in send buffer + # reference impl does not do it. + debug "Should resend oldest packet in outBuffer but there is no place for more bytes in send buffer", + seqNr = oldestPacketSeqNr, + curWindowPackets = socket.curWindowPackets + # TODO add sending keep alives when necessary @@ -704,7 +728,9 @@ proc newIncomingSocket*[A]( # it does not matter what timeout value we put here, as socket will be in # connected state without outgoing packets in buffer so any timeout hit will # just double rto without any penalties - (Connected, milliseconds(0)) + # although we cannont use 0, as then timeout will be constantly re-set to 500ms + # and there will be a lot of not usefull work done + (Connected, defaultInitialSynTimeout) else: let timeout = cfg.incomingSocketReceiveTimeout.unsafeGet() (SynRecv, timeout) @@ -1002,6 +1028,12 @@ proc sendAck(socket: UtpSocket): Future[void] = ## other packets as we do not track them in outgoing buffet let ackPacket = socket.generateAckPacket() + + debug "Sending STATE packet", + pkSeqNr = ackPacket.header.seqNr, + pkAckNr = ackPacket.header.ackNr, + gotEACK = ackPacket.eack.isSome() + socket.sendData(encodePacket(ackPacket)) proc startIncomingSocket*(socket: UtpSocket) {.async.} = @@ -1021,6 +1053,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = socketKey = socket.socketKey, socketAckNr = socket.ackNr, socketSeqNr = socket.seqNr, + windowPackets = socket.curWindowPackets, packetType = p.header.pType, seqNr = p.header.seqNr, ackNr = p.header.ackNr, @@ -1059,6 +1092,10 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = # this case happens if the we already received this ack nr acks = 0 + debug "Packet state variables", + pastExpected = pastExpected, + acks = acks + # If packet is totally of the mark short circout the processing if pastExpected >= reorderBufferMaxSize: @@ -1080,9 +1117,12 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = var (ackedBytes, minRtt) = socket.calculateAckedbytes(acks, timestampInfo.moment) + debug "Bytes acked by classic ack", + bytesAcked = ackedBytes + if (p.eack.isSome()): let selectiveAckedBytes = socket.calculateSelectiveAckBytes(pkAckNr, p.eack.unsafeGet()) - debug "Selective ack bytes", + debug "Bytes acked by selective ack", bytesAcked = selectiveAckedBytes ackedBytes = ackedBytes + selectiveAckedBytes @@ -1176,6 +1216,8 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = # a packet that is still waiting to be acked while (socket.curWindowPackets > 0 and socket.outBuffer.get(socket.seqNr - socket.curWindowPackets).isNone()): dec socket.curWindowPackets + debug "Packet in front hase been acked by selective ack. Decrese window", + windowPackets = socket.curWindowPackets if (p.eack.isSome()): socket.selectiveAckPackets(pkAckNr, p.eack.unsafeGet(), timestampInfo.moment) @@ -1260,6 +1302,12 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = inc socket.ackNr dec socket.reorderCount + debug "Socket state after processing in order packet", + socketKey = socket.socketKey, + socketAckNr = socket.ackNr, + reorderCount = socket.reorderCount, + windowPackets = socket.curWindowPackets + # TODO for now we just schedule concurrent task with ack sending. It may # need improvement, as with this approach there is no direct control over # how many concurrent tasks there are and how to cancel them when socket