diff --git a/eth/utp/utp_discv5_protocol.nim b/eth/utp/utp_discv5_protocol.nim index 4cd26f8..5dacebc 100644 --- a/eth/utp/utp_discv5_protocol.nim +++ b/eth/utp/utp_discv5_protocol.nim @@ -25,6 +25,12 @@ proc hash(x: UtpSocketKey[Node]): Hash = h = h !& x.rcvId.hash !$h +func `$`*(x: UtpSocketKey[Node]): string = + "(remoteId: " & $x.remoteAddress.id & + ", remoteAddress: " & $x.remoteAddress.address & + ", rcvId: "& $x.rcvId & + ")" + proc initSendCallback( t: protocol.Protocol, subProtocolName: seq[byte]): SendCallback[Node] = return ( diff --git a/eth/utp/utp_router.nim b/eth/utp/utp_router.nim index aefb1be..f967ea9 100644 --- a/eth/utp/utp_router.nim +++ b/eth/utp/utp_router.nim @@ -137,13 +137,15 @@ proc shouldAllowConnection[A](r: UtpRouter[A], remoteAddress: A, connectionId: u r.allowConnection(r, remoteAddress, connectionId) proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}= - notice "Received packet ", packet = p + debug "Received packet ", + sender = sender, + packetType = p.header.pType case p.header.pType of ST_RESET: let maybeSocket = r.getSocketOnReset(sender, p.header.connectionId) if maybeSocket.isSome(): - notice "Received rst packet on known connection closing" + debug "Received RST packet on known connection, closing socket" let socket = maybeSocket.unsafeGet() # reference implementation acutally changes the socket state to reset state unless # user explicitly closed socket before. The only difference between reset and destroy @@ -152,35 +154,38 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}= # explictly. socket.destroy() else: - notice "Received rst packet for not known connection" + debug "Received RST packet for unknown connection, ignoring" of ST_SYN: # Syn packet are special, and we need to add 1 to header connectionId let socketKey = UtpSocketKey[A].init(sender, p.header.connectionId + 1) let maybeSocket = r.getUtpSocket(socketKey) if (maybeSocket.isSome()): - notice "Ignoring SYN for already existing connection" + debug "Ignoring SYN for already existing connection" else: if (r.shouldAllowConnection(sender, p.header.connectionId)): - notice "Received SYN for not known connection. Initiating incoming connection" + debug "Received SYN for new connection. Initiating incoming connection" # Initial ackNr is set to incoming packer seqNr let incomingSocket = newIncomingSocket[A](sender, r.sendCb, r.socketConfig ,p.header.connectionId, p.header.seqNr, r.rng[]) r.registerUtpSocket(incomingSocket) await incomingSocket.startIncomingSocket() # Based on configuration, socket is passed to upper layer either in SynRecv # or Connected state + info "Accepting incoming connection", + to = incomingSocket.socketKey asyncSpawn r.acceptConnection(r, incomingSocket) else: - notice "Connection declined" + debug "Connection declined" else: let socketKey = UtpSocketKey[A].init(sender, p.header.connectionId) let maybeSocket = r.getUtpSocket(socketKey) if (maybeSocket.isSome()): + debug "Received FIN/DATA/ACK packet on existing socket" let socket = maybeSocket.unsafeGet() await socket.processPacket(p) else: # TODO add keeping track of recently send reset packets and do not send reset # to peers which we recently send reset to. - notice "Recevied FIN/DATA/ACK on not known socket sending reset" + debug "Received FIN/DATA/ACK on not known socket sending reset" let rstPacket = resetPacket(randUint16(r.rng[]), p.header.connectionId, p.header.seqNr) await r.sendCb(sender, encodePacket(rstPacket)) @@ -190,7 +195,8 @@ proc processIncomingBytes*[A](r: UtpRouter[A], bytes: seq[byte], sender: A) {.as if (dec.isOk()): await processPacket[A](r, dec.get(), sender) else: - warn "failed to decode packet from address", address = sender + let err = dec.error() + warn "failed to decode packet from address", address = sender, msg = err proc generateNewUniqueSocket[A](r: UtpRouter[A], address: A): Option[UtpSocket[A]] = ## Tries to generate unique socket, gives up after maxSocketGenerationTries tries @@ -208,6 +214,9 @@ proc generateNewUniqueSocket[A](r: UtpRouter[A], address: A): Option[UtpSocket[A return none[UtpSocket[A]]() proc connect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] {.async.}= + info "Initiating connection", + to = s.socketKey + let startFut = s.startOutgoingSocket() startFut.cancelCallback = proc(udata: pointer) {.gcsafe.} = @@ -217,11 +226,17 @@ proc connect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] {.async.}= try: await startFut + info "Outgoing connection successful", + to = s.socketKey return ok(s) except ConnectionError: + info "Outgoing connection timed-out", + to = s.socketKey s.destroy() return err(OutgoingConnectionError(kind: ConnectionTimedOut)) except CatchableError as e: + info "Outgoing connection failed due to send error", + to = s.socketKey s.destroy() # this may only happen if user provided callback will for some reason fail return err(OutgoingConnectionError(kind: ErrorWhileSendingSyn, error: e)) diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index b0f7154..418b848 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -18,6 +18,8 @@ import ./utp_utils, ./clock_drift_calculator +export + chronicles logScope: topics = "utp_socket" @@ -339,7 +341,11 @@ proc registerOutgoingPacket(socket: UtpSocket, oPacket: OutgoingPacket) = inc socket.curWindowPackets proc sendData(socket: UtpSocket, data: seq[byte]): Future[void] = - socket.send(socket.remoteAddress, data) + let f = socket.send(socket.remoteAddress, data) + f.callback = proc(data: pointer) {.gcsafe.} = + if f.failed: + warn "UTP send failed", msg = f.readError.msg + return f # Should be called before sending packet proc setSend(s: UtpSocket, p: var OutgoingPacket): seq[byte] = @@ -407,6 +413,12 @@ proc checkTimeouts(socket: UtpSocket) {.async.} = socket.sendBufferTracker.updateMaxRemote(minimalRemoteWindow) if (currentTime > socket.rtoTimeout): + debug "CheckTimeouts rto timeout", + socketKey = socket.socketKey, + state = socket.state, + maxWindow = socket.sendBufferTracker.maxWindow, + curWindowPackets = socket.curWindowPackets, + curWindowBytes = socket.sendBufferTracker.currentWindow # TODO add handling of probe time outs. Reference implemenation has mechanism # of sending probes to determine mtu size. Probe timeouts do not count to standard @@ -419,6 +431,10 @@ proc checkTimeouts(socket: UtpSocket) {.async.} = return if socket.shouldDisconnectFromFailedRemote(): + debug "Remote host failed", + state = socket.state, + retransmitCount = socket.retransmitCount + if socket.state == SynSent and (not socket.connectionFuture.finished()): socket.connectionFuture.fail(newException(ConnectionError, "Connection to peer timed out")) @@ -459,11 +475,14 @@ proc checkTimeouts(socket: UtpSocket) {.async.} = # resend oldest packet if there are some packets in flight if (socket.curWindowPackets > 0): - notice "resending oldest packet in outBuffer" inc socket.retransmitCount let oldestPacketSeqNr = socket.seqNr - socket.curWindowPackets # TODO add handling of fast timeout + debug "Resending oldest packet in outBuffer", + seqNr = oldestPacketSeqNr, + curWindowPackets = socket.curWindowPackets + doAssert( socket.outBuffer.get(oldestPacketSeqNr).isSome(), "oldest packet should always be available when there is data in flight" @@ -577,8 +596,13 @@ proc writeLoop(socket: UtpSocket): Future[void] {.async.} = case req.kind of Data: await socket.handleDataWrite(req.data, req.writer) + info "Written data to remote", + to = socket.socketKey, + bytesWritten = len(req.data) of Close: await socket.handleClose() + info "Sent FIN to remote", + to = socket.socketKey except CancelledError: doAssert(socket.state == Destroy) @@ -701,7 +725,9 @@ proc newIncomingSocket*[A]( proc startOutgoingSocket*(socket: UtpSocket): Future[void] {.async.} = doAssert(socket.state == SynSent) let packet = synPacket(socket.seqNr, socket.connectionIdRcv, socket.getRcvWindowSize()) - notice "Sending syn packet packet", packet = packet + debug "Sending SYN packet", + seqNr = packet.header.seqNr, + connectionId = packet.header.connectionId # set number of transmissions to 1 as syn packet will be send just after # initiliazation let outgoingPacket = OutgoingPacket.init(encodePacket(packet), 1, false, 0) @@ -718,6 +744,8 @@ proc isClosed*(socket: UtpSocket): bool = socket.state == Destroy and socket.closeEvent.isSet() proc destroy*(s: UtpSocket) = + info "Destroying socket", + to = s.socketKey ## Moves socket to destroy state and clean all reasources. ## Remote is not notified in any way about socket end of life s.state = Destroy @@ -786,6 +814,11 @@ proc ackPacket(socket: UtpSocket, seqNr: uint16, currentTime: Moment): AckResult socket.outBuffer.delete(seqNr) + debug "Acked packet (deleted from outgoing buffer)", + pkSeqNr = seqNr, + pkTransmissions = packet.transmissions, + pkNeedReesend = packet.needResend + # from spec: The rtt and rtt_var is only updated for packets that were sent only once. # This avoids problems with figuring out which packet was acked, the first or the second one. # it is standard solution to retransmission ambiguity problem @@ -804,6 +837,7 @@ proc ackPacket(socket: UtpSocket, seqNr: uint16, currentTime: Moment): AckResult socket.retransmitCount = 0 PacketAcked else: + debug "Tried to ack packet which was already acked or not sent yet" # the packet has already been acked (or not sent) PacketAlreadyAcked @@ -982,10 +1016,23 @@ proc startIncomingSocket*(socket: UtpSocket) {.async.} = # to scheduler which means there could be potentialy several processPacket procs # running proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = + + debug "Process packet", + socketKey = socket.socketKey, + packetType = p.header.pType, + seqNr = p.header.seqNr, + ackNr = p.header.ackNr, + timestamp = p.header.timestamp, + timestampDiff = p.header.timestampDiff + let timestampInfo = getMonoTimestamp() if socket.isAckNrInvalid(p): - notice "Received packet with invalid ack nr" + debug "Received packet with invalid ack number", + ackNr = p.header.ackNr, + localSeqNr = socket.seqNr, + lastUnacked = socket.seqNr - socket.curWindowPackets + return ## Updates socket state based on received packet, and sends ack when necessary. @@ -1012,14 +1059,16 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = # If packet is totally of the mark short circout the processing if pastExpected >= reorderBufferMaxSize: - notice "Received packet is totally of the mark" + debug "Got an invalid packet sequence number, too far off", + pastExpected = pastExpected return var (ackedBytes, minRtt) = socket.calculateAckedbytes(acks, timestampInfo.moment) - # TODO caluclate bytes acked by selective acks here (if thats the case) if (p.eack.isSome()): let selectiveAckedBytes = socket.calculateSelectiveAckBytes(pkAckNr, p.eack.unsafeGet()) + debug "Selective ack bytes", + bytesAcked = selectiveAckedBytes ackedBytes = ackedBytes + selectiveAckedBytes let sentTimeRemote = p.header.timestamp @@ -1080,15 +1129,25 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = socket.slowStart = newSlowStart socket.slowStartTreshold = newSlowStartTreshold + debug "Applied ledbat congestion controller", + maxWindow = newMaxWindow, + remoteWindow = p.header.wndSize, + slowStartTreshold = newSlowStartTreshold, + slowstart = newSlowStart + if (socket.sendBufferTracker.maxRemoteWindow == 0): # when zeroWindowTimer will be hit and maxRemoteWindow still will be equal to 0 # then it will be reset to minimal value socket.zeroWindowTimer = timestampInfo.moment + socket.socketConfig.remoteWindowResetTimeout + debug "Remote window size dropped to 0", + currentTime = timestampInfo.moment, + resetZeroWindowTime = socket.zeroWindowTimer + # socket.curWindowPackets == acks means that this packet acked all remaining packets # including the sent fin packets if (socket.finSent and socket.curWindowPackets == acks): - notice "FIN acked, destroying socket" + debug "FIN acked, destroying socket" socket.finAcked = true # this bit of utp spec is a bit under specified (i.e there is not specification at all) # reference implementation moves socket to destroy state in case that our fin was acked @@ -1116,13 +1175,19 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = socket.state = Connected if (p.header.pType == ST_FIN and (not socket.gotFin)): + debug "Received FIN packet", + eofPktNr = pkSeqNr, + curAckNr = socket.ackNr + socket.gotFin = true socket.eofPktNr = pkSeqNr # we got in order packet if (pastExpected == 0 and (not socket.reachedFin)): - notice "Got in order packet" + debug "Received in order packet" if (len(p.payload) > 0 and (not socket.readShutdown)): + debug "Received data packet", + bytesReceived = len(p.payload) # we are getting in order data packet, we can flush data directly to the incoming buffer await upload(addr socket.buffer, unsafeAddr p.payload[0], p.payload.len()) # Bytes have been passed to upper layer, we can increase number of last @@ -1130,12 +1195,16 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = inc socket.ackNr # check if the following packets are in reorder buffer + + debug "Looking for packets in re-order buffer", + reorderCount = socket.reorderCount + while true: # We are doing this in reoreder loop, to handle the case when we already received # fin but there were some gaps before eof # we have reached remote eof, and should not receive more packets from remote if ((not socket.reachedFin) and socket.gotFin and socket.eofPktNr == socket.ackNr): - notice "Reached socket EOF" + debug "Reached socket EOF" # In case of reaching eof, it is up to user of library what to to with # it. With the current implementation, the most apropriate way would be to # destory it (as with our implementation we know that remote is destroying its acked fin) @@ -1177,10 +1246,13 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = # we got packet out of order else: - notice "Got out of order packet" + debug "Got out of order packet" if (socket.gotFin and pkSeqNr > socket.eofPktNr): - notice "Got packet past eof" + debug "Got packet past eof", + pkSeqNr = pkSeqNr, + eofPktNr = socket.eofPktNr + return # growing buffer before checking the packet is already there to avoid @@ -1188,11 +1260,13 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = socket.inBuffer.ensureSize(pkSeqNr + 1, pastExpected + 1) if (socket.inBuffer.get(pkSeqNr).isSome()): - notice "packet already received" + debug "Packet with seqNr already received", + seqNr = pkSeqNr else: socket.inBuffer.put(pkSeqNr, p) inc socket.reorderCount - notice "added out of order packet in reorder buffer" + debug "added out of order packet to reorder buffer", + reorderCount = socket.reorderCount # we send ack packet, as we reoreder count is > 0, so the eack bitmask will be # generated asyncSpawn socket.sendAck() @@ -1209,9 +1283,9 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = socket.connectionFuture.complete() of ST_RESET: - notice "Received ST_RESET on known socket, ignoring" + debug "Received ST_RESET on known socket, ignoring" of ST_SYN: - notice "Received ST_SYN on known socket, ignoring" + debug "Received ST_SYN on known socket, ignoring" proc atEof*(socket: UtpSocket): bool = # socket is considered at eof when remote side sent us fin packet @@ -1230,6 +1304,8 @@ proc close*(socket: UtpSocket) = socket.readShutdown = true if (not socket.sendFinRequested): try: + info "Sending FIN", + to = socket.socketKey # with this approach, all pending writes will be executed before sending fin packet # we could also and method which places close request as first one to process # but it would complicate the write loop @@ -1254,6 +1330,10 @@ proc closeWait*(socket: UtpSocket) {.async.} = await socket.closeEvent.wait() proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] = + info "Write data", + to = socket.socketKey, + length = len(data) + let retFuture = newFuture[WriteResult]("UtpSocket.write") if (socket.state != Connected): @@ -1310,6 +1390,10 @@ proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] {.async.}= bytes.add(socket.buffer.buffer.toOpenArray(0, count - 1)) (count, len(bytes) == n) + debug "Read data ", + remote = socket.socketKey, + length = n + return bytes proc read*(socket: UtpSocket): Future[seq[byte]] {.async.}= @@ -1326,6 +1410,10 @@ proc read*(socket: UtpSocket): Future[seq[byte]] {.async.}= bytes.add(socket.buffer.buffer.toOpenArray(0, count - 1)) (count, false) + debug "Read data ", + remote = socket.socketKey, + length = len(bytes) + return bytes # Check how many packets are still in the out going buffer, usefull for tests or