diff --git a/eth/utp/packets.nim b/eth/utp/packets.nim index 87f1b37..16f3f42 100644 --- a/eth/utp/packets.nim +++ b/eth/utp/packets.nim @@ -9,13 +9,16 @@ import std/[monotimes], faststreams, + chronos, stew/[endians2, results, objects, arrayops], bearssl, ../p2p/discoveryv5/random2 export results -const minimalHeaderSize = 20 -const protocolVersion = 1 +const + minimalHeaderSize = 20 + protocolVersion = 1 + zeroMoment = Moment.init(0, Nanosecond) type PacketType* = enum @@ -47,22 +50,28 @@ type header*: PacketHeaderV1 payload*: seq[uint8] + TimeStampInfo* = object + moment*: Moment + timestamp*: uint32 + # Important timing assumptions for utp protocol here: # 1. Microsecond precisions # 2. Monotonicity # Reference lib have a lot of checks to assume that this is monotonic on # every system, and warnings when monotonic clock is not avaialable. -# For now we can use basic monotime, later it would be good to analyze: -# https://github.com/bittorrent/libutp/blob/master/utp_utils.cpp, to check all the -# timing assumptions on different platforms -proc getMonoTimeTimeStamp*(): uint32 = - # TODO - # this value is equivalent of: - # uint32((Moment.now() - Moment.init(0, Microseconds)).microseconds()) - # on macOs - # so we we can use moment here and return (Moment, uint32) tuple from this function - let time = getMonoTime() - cast[uint32](time.ticks() div 1000) +proc getMonoTimestamp*(): TimeStampInfo = + let currentMoment = Moment.now() + + # Casting this value from int64 to uin32, my lead to some sudden spikes in + # timestamp numeric values i.e it is possible that timestamp can suddenly change + # from 4294967296 to for example 10, this may lead to sudden spikes in + # calculated delays + # uTP implementation is resistant to those spikes are as it keeps history of + # few last delays on uses smallest one for calculating ledbat window. + # so any outlier huge value will be ignored + # + let timestamp = uint32((currentMoment - zeroMoment).microseconds()) + TimeStampInfo(moment: currentMoment, timestamp: timestamp) # Simple generator, not useful for cryptography proc randUint16*(rng: var BrHmacDrbgContext): uint16 = @@ -157,7 +166,7 @@ proc synPacket*(seqNr: uint16, rcvConnectionId: uint16, bufferSize: uint32): Pac # TODO for we do not handle extensions extension: 0'u8, connectionId: rcvConnectionId, - timestamp: getMonoTimeTimeStamp(), + timestamp: getMonoTimestamp().timestamp, timestampDiff: 0'u32, wndSize: bufferSize, seqNr: seqNr, @@ -174,7 +183,7 @@ proc ackPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSiz # TODO Handle selective acks extension: 0'u8, connectionId: sndConnectionId, - timestamp: getMonoTimeTimeStamp(), + timestamp: getMonoTimestamp().timestamp, timestampDiff: timestampDiff, wndSize: bufferSize, seqNr: seqNr, @@ -197,7 +206,7 @@ proc dataPacket*( # data packets always have extension field set to 0 extension: 0'u8, connectionId: sndConnectionId, - timestamp: getMonoTimeTimeStamp(), + timestamp: getMonoTimestamp().timestamp, timestampDiff: timestampDiff, wndSize: bufferSize, seqNr: seqNr, @@ -213,7 +222,7 @@ proc resetPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16): Packet # data packets always have extension field set to 0 extension: 0'u8, connectionId: sndConnectionId, - timestamp: getMonoTimeTimeStamp(), + timestamp: getMonoTimestamp().timestamp, # reset packet informs remote about lack of state for given connection, therefore # we do not inform remote about its delay. timestampDiff: 0, @@ -237,7 +246,7 @@ proc finPacket*( # fin packets always have extension field set to 0 extension: 0'u8, connectionId: sndConnectionId, - timestamp: getMonoTimeTimeStamp(), + timestamp: getMonoTimestamp().timestamp, timestampDiff: timestampDiff, wndSize: bufferSize, seqNr: seqNr, diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index f91871a..20f48c3 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -297,7 +297,7 @@ proc init( transmissions: uint16, needResend: bool, payloadLength: uint32, - timeSent: Moment = Moment.now()): T = + timeSent: Moment = getMonoTimestamp().moment): T = OutgoingPacket( packetBytes: packetBytes, transmissions: transmissions, @@ -357,15 +357,14 @@ proc sendAck(socket: UtpSocket): Future[void] = # Should be called before sending packet proc setSend(s: UtpSocket, p: var OutgoingPacket): seq[byte] = - let currentMoment = Moment.now() - let currentTimeStamp = getMonoTimeTimeStamp() + let timestampInfo = getMonoTimestamp() inc p.transmissions p.needResend = false - p.timeSent = currentMoment + p.timeSent = timestampInfo.moment # all bytearrays in outgoing buffer should be properly encoded utp packets # so it is safe to directly modify fields - modifyTimeStampAndAckNr(p.packetBytes, currentTimeStamp, s.ackNr) + modifyTimeStampAndAckNr(p.packetBytes, timestampInfo.timestamp, s.ackNr) return p.packetBytes @@ -410,7 +409,7 @@ proc shouldDisconnectFromFailedRemote(socket: UtpSocket): bool = (socket.retransmitCount >= socket.socketConfig.dataResendsBeforeFailure) proc checkTimeouts(socket: UtpSocket) {.async.} = - let currentTime = Moment.now() + let currentTime = getMonoTimestamp().moment # flush all packets which needs to be re-send if socket.state != Destroy: await socket.flushPackets() @@ -508,7 +507,7 @@ proc getPacketSize*(socket: UtpSocket): int = proc resetSendTimeout(socket: UtpSocket) = socket.retransmitTimeout = socket.rto - socket.rtoTimeout = Moment.now() + socket.retransmitTimeout + socket.rtoTimeout = getMonoTimestamp().moment + socket.retransmitTimeout proc handleDataWrite(socket: UtpSocket, data: seq[byte], writeFut: Future[WriteResult]): Future[void] {.async.} = if writeFut.finished(): @@ -620,7 +619,7 @@ proc new[A]( initialAckNr: uint16, initialTimeout: Duration ): T = - let currentTime = Moment.now() + let currentTime = getMonoTimestamp().moment T( remoteAddress: to, state: state, @@ -793,7 +792,7 @@ proc updateTimeouts(socket: UtpSocket, timeSent: Moment, currentTime: Moment) = # but usually spec lags after implementation so milliseconds(1000) is used socket.rto = max(socket.rtt + (socket.rttVar * 4), milliseconds(1000)) -proc ackPacket(socket: UtpSocket, seqNr: uint16): AckResult = +proc ackPacket(socket: UtpSocket, seqNr: uint16, currentTime: Moment): AckResult = let packetOpt = socket.outBuffer.get(seqNr) if packetOpt.isSome(): let packet = packetOpt.get() @@ -805,8 +804,6 @@ proc ackPacket(socket: UtpSocket, seqNr: uint16): AckResult = # TODO analyze if this case can happen with our impl return PacketNotSentYet - let currentTime = Moment.now() - socket.outBuffer.delete(seqNr) # from spec: The rtt and rtt_var is only updated for packets that were sent only once. @@ -830,11 +827,11 @@ proc ackPacket(socket: UtpSocket, seqNr: uint16): AckResult = # the packet has already been acked (or not sent) PacketAlreadyAcked -proc ackPackets(socket: UtpSocket, nrPacketsToAck: uint16) = +proc ackPackets(socket: UtpSocket, nrPacketsToAck: uint16, currentTime: Moment) = ## Ack packets in outgoing buffer based on ack number in the received packet var i = 0 while i < int(nrPacketsToack): - let result = socket.ackPacket(socket.seqNr - socket.curWindowPackets) + let result = socket.ackPacket(socket.seqNr - socket.curWindowPackets, currentTime) case result of PacketAcked: dec socket.curWindowPackets @@ -890,7 +887,7 @@ proc isAckNrInvalid(socket: UtpSocket, packet: Packet): bool = # to scheduler which means there could be potentialy several processPacket procs # running proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = - let receiptTime = Moment.now() + let timestampInfo = getMonoTimestamp() if socket.isAckNrInvalid(p): notice "Received packet with invalid ack nr" @@ -923,11 +920,9 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = notice "Received packet is totally of the mark" return - var (ackedBytes, minRtt) = socket.calculateAckedbytes(acks, receiptTime) + var (ackedBytes, minRtt) = socket.calculateAckedbytes(acks, timestampInfo.moment) # TODO caluclate bytes acked by selective acks here (if thats the case) - let receiptTimestamp = getMonoTimeTimeStamp() - let sentTimeRemote = p.header.timestamp # we are using uint32 not a Duration, to wrap a round in case of @@ -939,14 +934,14 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = if (sentTimeRemote == 0): 0'u32 else: - receiptTimestamp - sentTimeRemote + timestampInfo.timestamp - sentTimeRemote socket.replayMicro = remoteDelay let prevRemoteDelayBase = socket.remoteHistogram.delayBase if (remoteDelay != 0): - socket.remoteHistogram.addSample(remoteDelay, receiptTime) + socket.remoteHistogram.addSample(remoteDelay, timestampInfo.moment) # remote new delay base is less than previous # shift our delay base in other direction to take clock skew into account @@ -959,8 +954,8 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = let actualDelay = p.header.timestampDiff if actualDelay != 0: - socket.ourHistogram.addSample(actualDelay, receiptTime) - socket.driftCalculator.addSample(actualDelay, receiptTime) + socket.ourHistogram.addSample(actualDelay, timestampInfo.moment) + socket.driftCalculator.addSample(actualDelay, timestampInfo.moment) # adjust base delay if delay estimates exceeds rtt if (socket.ourHistogram.getValue() > minRtt): @@ -989,7 +984,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = if (socket.sendBufferTracker.maxRemoteWindow == 0): # when zeroWindowTimer will be hit and maxRemoteWindow still will be equal to 0 # then it will be reset to minimal value - socket.zeroWindowTimer = Moment.now() + socket.socketConfig.remoteWindowResetTimeout + socket.zeroWindowTimer = timestampInfo.moment + socket.socketConfig.remoteWindowResetTimeout # socket.curWindowPackets == acks means that this packet acked all remaining packets # including the sent fin packets @@ -1002,7 +997,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = # but in theory remote could stil write some data on this socket (or even its own fin) socket.destroy() - socket.ackPackets(acks) + socket.ackPackets(acks, timestampInfo.moment) case p.header.pType of ST_DATA, ST_FIN: