mirror of https://github.com/status-im/nim-eth.git
Add more logs to utp (#463)
* Add more logs to utp * Change initial timeout for inc socket to positive value
This commit is contained in:
parent
b2dab4be08
commit
45348e7664
|
@ -13,7 +13,10 @@ import
|
|||
./utp_router,
|
||||
../keys
|
||||
|
||||
export utp_router, protocol
|
||||
export utp_router, protocol, chronicles
|
||||
|
||||
logScope:
|
||||
topics = "utp_discv5_protocol"
|
||||
|
||||
type UtpDiscv5Protocol* = ref object of TalkProtocol
|
||||
prot: protocol.Protocol
|
||||
|
@ -52,6 +55,7 @@ proc messageHandler(protocol: TalkProtocol, request: seq[byte],
|
|||
let maybeSender = p.prot.getNode(srcId)
|
||||
|
||||
if maybeSender.isSome():
|
||||
debug "Received utp payload from known node. Start processing"
|
||||
let sender = maybeSender.unsafeGet()
|
||||
# processIncomingBytes may respond to remote by using talkreq requests
|
||||
asyncSpawn p.router.processIncomingBytes(request, sender)
|
||||
|
@ -59,6 +63,7 @@ proc messageHandler(protocol: TalkProtocol, request: seq[byte],
|
|||
# always receives a talkresp.
|
||||
@[]
|
||||
else:
|
||||
debug "Received utp payload from unknown node. Ignore"
|
||||
@[]
|
||||
|
||||
proc new*(
|
||||
|
|
|
@ -367,9 +367,13 @@ proc flushPackets(socket: UtpSocket) {.async.} =
|
|||
let shouldSendPacket = socket.outBuffer.exists(i, (p: OutgoingPacket) => (p.transmissions == 0 or p.needResend == true))
|
||||
if (shouldSendPacket):
|
||||
if socket.sendBufferTracker.reserveNBytes(socket.outBuffer[i].payloadLength):
|
||||
debug "Resending packet during flush",
|
||||
pkSeqNr = i
|
||||
let toSend = socket.setSend(socket.outBuffer[i])
|
||||
await socket.sendData(toSend)
|
||||
else:
|
||||
debug "Should resend packet during flush but there is no place in send buffer",
|
||||
pkSeqNr = i
|
||||
# there is no place in send buffer, stop flushing
|
||||
return
|
||||
inc i
|
||||
|
@ -380,6 +384,8 @@ proc markAllPacketAsLost(s: UtpSocket) =
|
|||
|
||||
let packetSeqNr = s.seqNr - 1 - i
|
||||
if (s.outBuffer.exists(packetSeqNr, (p: OutgoingPacket) => p.transmissions > 0 and p.needResend == false)):
|
||||
debug "Marking packet as lost",
|
||||
pkSeqNr = packetSeqNr
|
||||
s.outBuffer[packetSeqNr].needResend = true
|
||||
let packetPayloadLength = s.outBuffer[packetSeqNr].payloadLength
|
||||
# lack of waiters notification in case of timeout effectivly means that
|
||||
|
@ -453,12 +459,22 @@ proc checkTimeouts(socket: UtpSocket) {.async.} =
|
|||
# than to fit at least one packet.
|
||||
let oldMaxWindow = socket.sendBufferTracker.maxWindow
|
||||
let newMaxWindow = max((oldMaxWindow * 2) div 3, currentPacketSize)
|
||||
|
||||
debug "Decaying max window due to socket idling",
|
||||
oldMaxWindow = oldMaxWindow,
|
||||
newMaxWindow = newMaxWindow
|
||||
|
||||
socket.sendBufferTracker.updateMaxWindowSize(
|
||||
# maxRemote window does not change
|
||||
socket.sendBufferTracker.maxRemoteWindow,
|
||||
newMaxWindow
|
||||
)
|
||||
else:
|
||||
|
||||
debug "Reseting window size do fit a least one packet",
|
||||
oldWindowSize = socket.sendBufferTracker.maxWindow,
|
||||
newWindowSize = currentPacketSize
|
||||
|
||||
# delay was so high that window has shrunk below one packet. Reset window
|
||||
# to fit a least one packet and start with slow start
|
||||
socket.sendBufferTracker.updateMaxWindowSize(
|
||||
|
@ -479,18 +495,26 @@ proc checkTimeouts(socket: UtpSocket) {.async.} =
|
|||
let oldestPacketSeqNr = socket.seqNr - socket.curWindowPackets
|
||||
# TODO add handling of fast timeout
|
||||
|
||||
debug "Resending oldest packet in outBuffer",
|
||||
seqNr = oldestPacketSeqNr,
|
||||
curWindowPackets = socket.curWindowPackets
|
||||
|
||||
doAssert(
|
||||
socket.outBuffer.get(oldestPacketSeqNr).isSome(),
|
||||
"oldest packet should always be available when there is data in flight"
|
||||
)
|
||||
|
||||
let payloadLength = socket.outBuffer[oldestPacketSeqNr].payloadLength
|
||||
if (socket.sendBufferTracker.reserveNBytes(payloadLength)):
|
||||
debug "Resending oldest packet in outBuffer",
|
||||
seqNr = oldestPacketSeqNr,
|
||||
curWindowPackets = socket.curWindowPackets
|
||||
|
||||
let dataToSend = socket.setSend(socket.outBuffer[oldestPacketSeqNr])
|
||||
await socket.sendData(dataToSend)
|
||||
else:
|
||||
# TODO Logs added here to check if we need to check for spcae in send buffer
|
||||
# reference impl does not do it.
|
||||
debug "Should resend oldest packet in outBuffer but there is no place for more bytes in send buffer",
|
||||
seqNr = oldestPacketSeqNr,
|
||||
curWindowPackets = socket.curWindowPackets
|
||||
|
||||
|
||||
# TODO add sending keep alives when necessary
|
||||
|
||||
|
@ -704,7 +728,9 @@ proc newIncomingSocket*[A](
|
|||
# it does not matter what timeout value we put here, as socket will be in
|
||||
# connected state without outgoing packets in buffer so any timeout hit will
|
||||
# just double rto without any penalties
|
||||
(Connected, milliseconds(0))
|
||||
# although we cannont use 0, as then timeout will be constantly re-set to 500ms
|
||||
# and there will be a lot of not usefull work done
|
||||
(Connected, defaultInitialSynTimeout)
|
||||
else:
|
||||
let timeout = cfg.incomingSocketReceiveTimeout.unsafeGet()
|
||||
(SynRecv, timeout)
|
||||
|
@ -1002,6 +1028,12 @@ proc sendAck(socket: UtpSocket): Future[void] =
|
|||
## other packets as we do not track them in outgoing buffet
|
||||
|
||||
let ackPacket = socket.generateAckPacket()
|
||||
|
||||
debug "Sending STATE packet",
|
||||
pkSeqNr = ackPacket.header.seqNr,
|
||||
pkAckNr = ackPacket.header.ackNr,
|
||||
gotEACK = ackPacket.eack.isSome()
|
||||
|
||||
socket.sendData(encodePacket(ackPacket))
|
||||
|
||||
proc startIncomingSocket*(socket: UtpSocket) {.async.} =
|
||||
|
@ -1021,6 +1053,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
|
|||
socketKey = socket.socketKey,
|
||||
socketAckNr = socket.ackNr,
|
||||
socketSeqNr = socket.seqNr,
|
||||
windowPackets = socket.curWindowPackets,
|
||||
packetType = p.header.pType,
|
||||
seqNr = p.header.seqNr,
|
||||
ackNr = p.header.ackNr,
|
||||
|
@ -1059,6 +1092,10 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
|
|||
# this case happens if the we already received this ack nr
|
||||
acks = 0
|
||||
|
||||
debug "Packet state variables",
|
||||
pastExpected = pastExpected,
|
||||
acks = acks
|
||||
|
||||
# If packet is totally of the mark short circout the processing
|
||||
if pastExpected >= reorderBufferMaxSize:
|
||||
|
||||
|
@ -1080,9 +1117,12 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
|
|||
|
||||
var (ackedBytes, minRtt) = socket.calculateAckedbytes(acks, timestampInfo.moment)
|
||||
|
||||
debug "Bytes acked by classic ack",
|
||||
bytesAcked = ackedBytes
|
||||
|
||||
if (p.eack.isSome()):
|
||||
let selectiveAckedBytes = socket.calculateSelectiveAckBytes(pkAckNr, p.eack.unsafeGet())
|
||||
debug "Selective ack bytes",
|
||||
debug "Bytes acked by selective ack",
|
||||
bytesAcked = selectiveAckedBytes
|
||||
ackedBytes = ackedBytes + selectiveAckedBytes
|
||||
|
||||
|
@ -1176,6 +1216,8 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
|
|||
# a packet that is still waiting to be acked
|
||||
while (socket.curWindowPackets > 0 and socket.outBuffer.get(socket.seqNr - socket.curWindowPackets).isNone()):
|
||||
dec socket.curWindowPackets
|
||||
debug "Packet in front hase been acked by selective ack. Decrese window",
|
||||
windowPackets = socket.curWindowPackets
|
||||
|
||||
if (p.eack.isSome()):
|
||||
socket.selectiveAckPackets(pkAckNr, p.eack.unsafeGet(), timestampInfo.moment)
|
||||
|
@ -1260,6 +1302,12 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
|
|||
inc socket.ackNr
|
||||
dec socket.reorderCount
|
||||
|
||||
debug "Socket state after processing in order packet",
|
||||
socketKey = socket.socketKey,
|
||||
socketAckNr = socket.ackNr,
|
||||
reorderCount = socket.reorderCount,
|
||||
windowPackets = socket.curWindowPackets
|
||||
|
||||
# TODO for now we just schedule concurrent task with ack sending. It may
|
||||
# need improvement, as with this approach there is no direct control over
|
||||
# how many concurrent tasks there are and how to cancel them when socket
|
||||
|
|
Loading…
Reference in New Issue