diff --git a/eth/utp/utp_discv5_protocol.nim b/eth/utp/utp_discv5_protocol.nim index 3cdc906..9241b23 100644 --- a/eth/utp/utp_discv5_protocol.nim +++ b/eth/utp/utp_discv5_protocol.nim @@ -1,4 +1,4 @@ -# Copyright (c) 2021 Status Research & Development GmbH +# Copyright (c) 2021-2022 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -13,12 +13,12 @@ import ./utp_router, ../keys -export utp_router, protocol, chronicles +export utp_router, protocol logScope: topics = "utp_discv5_protocol" -type +type NodeAddress* = object nodeId*: NodeId address*: Address @@ -27,10 +27,10 @@ type prot: protocol.Protocol router: UtpRouter[NodeAddress] -proc init*(T: type NodeAddress, nodeId: NodeId, address: Address): NodeAddress = +proc init*(T: type NodeAddress, nodeId: NodeId, address: Address): NodeAddress = NodeAddress(nodeId: nodeId, address: address) -proc init*(T: type NodeAddress, node: Node): Option[NodeAddress] = +proc init*(T: type NodeAddress, node: Node): Option[NodeAddress] = node.address.map((address: Address) => NodeAddress(nodeId: node.id, address: address)) proc hash(x: NodeAddress): Hash = @@ -60,7 +60,7 @@ proc talkReqDirect(p: protocol.Protocol, n: NodeAddress, protocol, request: seq[ trace "Send message packet", dstId = n.nodeId, address = n.address, kind = MessageKind.talkreq p.send(n.address, data) - + proc initSendCallback( t: protocol.Protocol, subProtocolName: seq[byte]): SendCallback[NodeAddress] = return ( @@ -77,7 +77,7 @@ proc initSendCallback( proc messageHandler(protocol: TalkProtocol, request: seq[byte], srcId: NodeId, srcUdpAddress: Address): seq[byte] = - let + let p = UtpDiscv5Protocol(protocol) nodeAddress = NodeAddress.init(srcId, srcUdpAddress) debug "Received utp payload from known node. Start processing", diff --git a/eth/utp/utp_router.nim b/eth/utp/utp_router.nim index c934840..03e3579 100644 --- a/eth/utp/utp_router.nim +++ b/eth/utp/utp_router.nim @@ -1,4 +1,4 @@ -# Copyright (c) 2021 Status Research & Development GmbH +# Copyright (c) 2021-2022 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -13,6 +13,11 @@ import ./utp_socket, ./packets +export utp_socket + +logScope: + topics = "utp_router" + declareCounter utp_received_packets, "All correct received uTP packets" declareCounter utp_failed_packets, @@ -28,11 +33,6 @@ declareCounter utp_success_outgoing, declareCounter utp_failed_outgoing, "Total number of failed outgoing connections" -logScope: - topics = "utp_router" - -export utp_socket - type # New remote client connection callback # ``server`` - UtpProtocol object. @@ -132,7 +132,8 @@ proc new*[A]( # There are different possibilities on how the connection got established, need # to check every case. -proc getSocketOnReset[A](r: UtpRouter[A], sender: A, id: uint16): Option[UtpSocket[A]] = +proc getSocketOnReset[A]( + r: UtpRouter[A], sender: A, id: uint16): Option[UtpSocket[A]] = # id is our recv id let recvKey = UtpSocketKey[A].init(sender, id) @@ -146,7 +147,8 @@ proc getSocketOnReset[A](r: UtpRouter[A], sender: A, id: uint16): Option[UtpSock .orElse(r.getUtpSocket(sendInitKey).filter(s => s.connectionIdSnd == id)) .orElse(r.getUtpSocket(sendNoInitKey).filter(s => s.connectionIdSnd == id)) -proc shouldAllowConnection[A](r: UtpRouter[A], remoteAddress: A, connectionId: uint16): bool = +proc shouldAllowConnection[A]( + r: UtpRouter[A], remoteAddress: A, connectionId: uint16): bool = if r.allowConnection == nil: # if the callback is not configured it means all incoming connections are allowed true @@ -183,14 +185,15 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}= debug "Received SYN for new connection. Initiating incoming connection", synSeqNr = p.header.seqNr # Initial ackNr is set to incoming packer seqNr - let incomingSocket = newIncomingSocket[A](sender, r.sendCb, r.socketConfig ,p.header.connectionId, p.header.seqNr, r.rng[]) + let incomingSocket = newIncomingSocket[A]( + sender, r.sendCb, r.socketConfig, + p.header.connectionId, p.header.seqNr, r.rng[]) r.registerUtpSocket(incomingSocket) incomingSocket.startIncomingSocket() - # Based on configuration, socket is passed to upper layer either in SynRecv - # or Connected state + # Based on configuration, socket is passed to upper layer either in + # SynRecv or Connected state utp_allowed_incoming.inc() - info "Accepting incoming connection", - to = incomingSocket.socketKey + debug "Accepting incoming connection", src = incomingSocket.socketKey asyncSpawn r.acceptConnection(r, incomingSocket) else: utp_declined_incoming.inc() @@ -203,13 +206,15 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}= let socket = maybeSocket.unsafeGet() await socket.processPacket(p) else: - # TODO add keeping track of recently send reset packets and do not send reset - # to peers which we recently send reset to. + # TODO add keeping track of recently send reset packets and do not send + # reset to peers which we recently send reset to. debug "Received FIN/DATA/ACK on not known socket sending reset" - let rstPacket = resetPacket(randUint16(r.rng[]), p.header.connectionId, p.header.seqNr) + let rstPacket = resetPacket( + randUint16(r.rng[]), p.header.connectionId, p.header.seqNr) await r.sendCb(sender, encodePacket(rstPacket)) -proc processIncomingBytes*[A](r: UtpRouter[A], bytes: seq[byte], sender: A) {.async.} = +proc processIncomingBytes*[A]( + r: UtpRouter[A], bytes: seq[byte], sender: A) {.async.} = if (not r.closed): let dec = decodePacket(bytes) if (dec.isOk()): @@ -218,15 +223,17 @@ proc processIncomingBytes*[A](r: UtpRouter[A], bytes: seq[byte], sender: A) {.as else: utp_failed_packets.inc() let err = dec.error() - warn "failed to decode packet from address", address = sender, msg = err + warn "Failed to decode packet from address", address = sender, msg = err -proc generateNewUniqueSocket[A](r: UtpRouter[A], address: A): Option[UtpSocket[A]] = - ## Tries to generate unique socket, gives up after maxSocketGenerationTries tries +proc generateNewUniqueSocket[A]( + r: UtpRouter[A], address: A):Option[UtpSocket[A]] = + ## Try to generate unique socket, give up after maxSocketGenerationTries tries var tryCount = 0 while tryCount < maxSocketGenerationTries: let rcvId = randUint16(r.rng[]) - let socket = newOutgoingSocket[A](address, r.sendCb, r.socketConfig, rcvId, r.rng[]) + let socket = newOutgoingSocket[A]( + address, r.sendCb, r.socketConfig, rcvId, r.rng[]) if r.registerIfAbsent(socket): return some(socket) @@ -236,32 +243,31 @@ proc generateNewUniqueSocket[A](r: UtpRouter[A], address: A): Option[UtpSocket[A return none[UtpSocket[A]]() proc connect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] {.async.}= - info "Initiating connection", - to = s.socketKey + debug "Initiating connection", dst = s.socketKey let startFut = s.startOutgoingSocket() startFut.cancelCallback = proc(udata: pointer) {.gcsafe.} = - # if for some reason future will be cancelled, destory socket to clear it from - # active socket list + # if for some reason the future is cancelled, destroy socket to clear it + # from the active socket list s.destroy() try: await startFut utp_success_outgoing.inc() - info "Outgoing connection successful", - to = s.socketKey + debug "Outgoing connection successful", dst = s.socketKey return ok(s) except ConnectionError: utp_failed_outgoing.inc() - info "Outgoing connection timed-out", - to = s.socketKey + debug "Outgoing connection timed-out", dst = s.socketKey s.destroy() return err(OutgoingConnectionError(kind: ConnectionTimedOut)) # Connect to provided address -# Reference implementation: https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732 -proc connectTo*[A](r: UtpRouter[A], address: A): Future[ConnectionResult[A]] {.async.} = +# Reference implementation: +# https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732 +proc connectTo*[A]( + r: UtpRouter[A], address: A): Future[ConnectionResult[A]] {.async.} = let maybeSocket = r.generateNewUniqueSocket(address) if (maybeSocket.isNone()): @@ -272,8 +278,11 @@ proc connectTo*[A](r: UtpRouter[A], address: A): Future[ConnectionResult[A]] {.a # Connect to provided address with provided connection id, if socket with this id # and address already exsits return error -proc connectTo*[A](r: UtpRouter[A], address: A, connectionId: uint16): Future[ConnectionResult[A]] {.async.} = - let socket = newOutgoingSocket[A](address, r.sendCb, r.socketConfig, connectionId, r.rng[]) +proc connectTo*[A]( + r: UtpRouter[A], address: A, connectionId: uint16): + Future[ConnectionResult[A]] {.async.} = + let socket = newOutgoingSocket[A]( + address, r.sendCb, r.socketConfig, connectionId, r.rng[]) if (r.registerIfAbsent(socket)): return await socket.connect() @@ -282,7 +291,7 @@ proc connectTo*[A](r: UtpRouter[A], address: A, connectionId: uint16): Future[Co proc shutdown*[A](r: UtpRouter[A]) = # stop processing any new packets and close all sockets in background without - # notifing remote peers + # notifying remote peers r.closed = true for s in r.allSockets(): s.destroy() @@ -290,12 +299,12 @@ proc shutdown*[A](r: UtpRouter[A]) = proc shutdownWait*[A](r: UtpRouter[A]) {.async.} = var activeSockets: seq[UtpSocket[A]] = @[] # stop processing any new packets and close all sockets without - # notifing remote peers + # notifying remote peers r.closed = true - # we need to make copy as calling socket.destroyWait() removes socket from the table - # and iterator throws error. Antother option would be to wait until number of opensockets - # go to 0 + # Need to make a copy as calling socket.destroyWait() removes the socket from + # the table and iterator throws error. Another option would be to wait until + # the number of open sockets drops to 0 for s in r.allSockets(): activeSockets.add(s) diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index 4364261..7b24fcf 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -1,4 +1,4 @@ -# Copyright (c) 2021 Status Research & Development GmbH +# Copyright (c) 2021-2022 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -203,7 +203,7 @@ type # current size of rcv buffer offset: int - + # readers waiting for data pendingReads: Deque[ReadReq] @@ -304,11 +304,11 @@ type ConnectionResult*[A] = Result[UtpSocket[A], OutgoingConnectionError] const - # Default maximum size of the data packet payload. With such configuration + # Default maximum size of the data packet payload. With such configuration # data packets will have 508 bytes (488 + 20 header). # 508 bytes of udp payload can translate into 576 bytes udp packet i.e # 508bytes paylaod + 60bytes (max ip header) + 8bytes (udp header) = 576bytes. - # 576bytes is defined as minimum reassembly buffer size i.e + # 576bytes is defined as minimum reassembly buffer size i.e # the minimum datagram size that we are guaranteed any implementation must support. # from RFC791: All hosts must be prepared # to accept datagrams of up to 576 octets (whether they arrive whole @@ -346,7 +346,7 @@ const allowedAckWindow*: uint16 = 3 # Timeout after which the send window will be reset to its minimal value after it dropped - # lower than our current packet size. i.e when we received a packet + # lower than our current packet size. i.e when we received a packet # from remote peer with `wndSize` set to number <= current packet size defaultResetWindowTimeout = seconds(15) @@ -357,7 +357,7 @@ const # minimal time before subseqent window decays maxWindowDecay = milliseconds(100) - # Maximal size of reorder buffer as fraction of optRcvBuffer size following + # Maximal size of reorder buffer as fraction of optRcvBuffer size following # semantics apply bases on rcvBuffer set to 1000 bytes: # if there are already 1000 bytes in rcv buffer no more bytes will be accepted to reorder buffer # if there are already 500 bytes in reoreder buffer, no more bytes will be accepted @@ -411,7 +411,7 @@ proc init*( ) # number of bytes which will fit in current send window -proc freeWindowBytes(socket: UtpSocket): uint32 = +proc freeWindowBytes(socket: UtpSocket): uint32 = let maxSend = min(socket.maxRemoteWindow, socket.maxWindow) if (maxSend <= socket.currentWindow): return 0 @@ -439,7 +439,7 @@ proc sendData(socket: UtpSocket, data: seq[byte]) = if f.failed: warn "UTP send failed", msg = f.readError.msg -proc sendPacket(socket: UtpSocket, seqNr: uint16) = +proc sendPacket(socket: UtpSocket, seqNr: uint16) = proc setSend(p: var OutgoingPacket): seq[byte] = let timestampInfo = getMonoTimestamp() @@ -454,9 +454,9 @@ proc sendPacket(socket: UtpSocket, seqNr: uint16) = modifyTimeStampAndAckNr(p.packetBytes, timestampInfo.timestamp, socket.ackNr) return p.packetBytes - + socket.sendData(setSend(socket.outBuffer[seqNr])) - + proc resetSendTimeout(socket: UtpSocket) = socket.retransmitTimeout = socket.rto socket.rtoTimeout = getMonoTimestamp().moment + socket.retransmitTimeout @@ -529,7 +529,7 @@ proc checkTimeouts(socket: UtpSocket) = debug "Reset remote window to minimal value", minRemote = minimalRemoteWindow socket.zeroWindowTimer = none[Moment]() - + if (currentTime > socket.rtoTimeout): debug "CheckTimeouts rto timeout", socketKey = socket.socketKey, @@ -551,7 +551,7 @@ proc checkTimeouts(socket: UtpSocket) = if socket.shouldDisconnectFromFailedRemote(): debug "Remote host failed", state = socket.state, - retransmitCount = socket.retransmitCount + retransmitCount = socket.retransmitCount if socket.state == SynSent and (not socket.connectionFuture.finished()): socket.connectionFuture.fail(newException(ConnectionError, "Connection to peer timed out")) @@ -577,7 +577,7 @@ proc checkTimeouts(socket: UtpSocket) = oldMaxWindow = oldMaxWindow, newMaxWindow = newMaxWindow - socket.maxWindow = newMaxWindow + socket.maxWindow = newMaxWindow elif (socket.maxWindow < currentPacketSize): # due to high delay window has shrunk below packet size # which means that we cannot send more data @@ -601,7 +601,7 @@ proc checkTimeouts(socket: UtpSocket) = if (socket.curWindowPackets > 0 and socket.outBuffer[oldestPacketSeqNr].transmissions > 0): inc socket.retransmitCount socket.fastTimeout = true - + debug "Resending oldest packet", pkSeqNr = oldestPacketSeqNr, retransmitCount = socket.retransmitCount, @@ -610,7 +610,7 @@ proc checkTimeouts(socket: UtpSocket) = # Oldest packet should always be present, so it is safe to call force # resend socket.sendPacket(oldestPacketSeqNr) - + # TODO add sending keep alives when necessary proc checkTimeoutsLoop(s: UtpSocket) {.async.} = @@ -692,12 +692,11 @@ proc isClosed*(socket: UtpSocket): bool = socket.state == Destroy and socket.closeEvent.isSet() proc isClosedAndCleanedUpAllResources*(socket: UtpSocket): bool = - ## Test Api to check that all resources are properly cleaned up + ## Test Api to check that all resources are properly cleaned up socket.isClosed() and socket.eventLoop.cancelled() and socket.checkTimeoutsLoop.cancelled() proc destroy*(s: UtpSocket) = - info "Destroying socket", - to = s.socketKey + debug "Destroying socket", to = s.socketKey ## Moves socket to destroy state and clean all reasources. ## Remote is not notified in any way about socket end of life s.state = Destroy @@ -896,7 +895,7 @@ proc tryDecayWindow(socket: UtpSocket, now: Moment) = if (now - socket.lastWindowDecay >= maxWindowDecay): socket.lastWindowDecay = now let newMaxWindow = max(uint32(0.5 * float64(socket.maxWindow)), uint32(minWindowSize)) - + debug "Decaying maxWindow", oldWindow = socket.maxWindow, newWindow = newMaxWindow @@ -904,7 +903,7 @@ proc tryDecayWindow(socket: UtpSocket, now: Moment) = socket.maxWindow = newMaxWindow socket.slowStart = false socket.slowStartTreshold = newMaxWindow - + # ack packets (removes them from out going buffer) based on selective ack extension header proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: SelectiveAckExtension, currentTime: Moment): void = # we add 2, as the first bit in the mask therefore represents ackNr + 2 becouse @@ -930,7 +929,7 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S if (socket.seqNr - v - 1) >= socket.curWindowPackets - 1: dec bits continue - + let bitSet: bool = getBit(ext.acks, bits) if bitSet: @@ -950,7 +949,7 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S discard socket.ackPacket(v, currentTime) dec bits continue - + if counter >= duplicateAcksBeforeResend and (v - socket.fastResendSeqNr) <= reorderBufferMaxSize: debug "No ack for packet", pkAckNr = v, @@ -982,7 +981,7 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S # packet is no longer in send buffer ignore whole further processing dec i continue - + registerLoss = true # it is safe to call as we already checked that packet is in send buffer @@ -1043,7 +1042,7 @@ proc generateAckPacket*(socket: UtpSocket): Packet = bitmask ) -proc sendAck(socket: UtpSocket) = +proc sendAck(socket: UtpSocket) = ## Creates and sends ack, based on current socket state. Acks are different from ## other packets as we do not track them in outgoing buffet @@ -1138,8 +1137,8 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = # the fast-resend on duplicate-ack logic for bi-directional connections # (except in the case of a selective ACK). This is in line with BSD4.4 TCP # implementation. - if socket.curWindowPackets > 0 and - pkAckNr == socket.seqNr - socket.curWindowPackets - 1 and + if socket.curWindowPackets > 0 and + pkAckNr == socket.seqNr - socket.curWindowPackets - 1 and p.header.pType == ST_STATE: inc socket.duplicateAck @@ -1163,7 +1162,7 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = # if `pastExpected` is really big number (for example: uint16.high) then most # probably we are receiving packet which we already received # example: we already received packet with `seqNr = 10` so our `socket.ackNr = 10` - # if we receive this packet once again then `pastExpected = 10 - 10 - 1` which + # if we receive this packet once again then `pastExpected = 10 - 10 - 1` which # equals (due to wrapping) 65535 # this means that remote most probably did not receive our ack, so we need to resend # it. We are doing it for last `reorderBufferMaxSize` packets @@ -1180,7 +1179,7 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = debug "Bytes acked by classic ack", bytesAcked = ackedBytes - + if (p.eack.isSome()): let selectiveAckedBytes = socket.calculateSelectiveAckBytes(pkAckNr, p.eack.unsafeGet()) debug "Bytes acked by selective ack", @@ -1310,11 +1309,11 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = pkSeqNr = oldestOutstandingPktSeqNr inc socket.fastResendSeqNr - + # Is is safe to call force resend as we already checked shouldReSendPacket # condition socket.sendPacket(oldestOutstandingPktSeqNr) - + if (p.eack.isSome()): socket.selectiveAckPackets(pkAckNr, p.eack.unsafeGet(), timestampInfo.moment) @@ -1360,7 +1359,7 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = # await upload(addr socket.buffer, unsafeAddr p.payload[0], p.payload.len()) moveMem(addr socket.rcvBuffer[socket.offset], unsafeAddr p.payload[0], payloadLength) socket.offset = socket.offset + payloadLength - + # Bytes have been passed to upper layer, we can increase number of last # acked packet inc socket.ackNr @@ -1408,8 +1407,8 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = socektAckNr = socket.ackNr, rcvbufferSize = socket.offset, reorderBufferSize = socket.inBufferBytes - - # Rcv buffer and reorder buffer are sized that it is always possible to + + # Rcv buffer and reorder buffer are sized that it is always possible to # move data from reorder buffer to rcv buffer without overflow moveMem(addr socket.rcvBuffer[socket.offset], unsafeAddr packet.payload[0], reorderPacketPayloadLength) socket.offset = socket.offset + reorderPacketPayloadLength @@ -1456,7 +1455,7 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = let payloadLength = uint32(len(p.payload)) if (socket.inBufferBytes + payloadLength <= socket.socketConfig.maxSizeOfReorderBuffer and socket.inBufferBytes + uint32(socket.offset) + payloadLength <= socket.socketConfig.optRcvBuffer): - + debug "store packet in reorder buffer", packetBytes = payloadLength, packetSeqNr = p.header.seqNr, @@ -1471,11 +1470,11 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = socket.inBufferBytes = socket.inBufferBytes + payloadLength debug "added out of order packet to reorder buffer", reorderCount = socket.reorderCount - # we send ack packet, as we reoreder count is > 0, so the eack bitmask will be + # we send ack packet, as we reoreder count is > 0, so the eack bitmask will be # generated socket.sendAck() -proc processPacket*(socket: UtpSocket, p: Packet): Future[void] = +proc processPacket*(socket: UtpSocket, p: Packet): Future[void] = socket.eventQueue.put(SocketEvent(kind: NewPacket, packet: p)) template shiftBuffer(t, c: untyped) = @@ -1495,7 +1494,7 @@ proc onRead(socket: UtpSocket, readReq: var ReadReq): ReadResult = if readReq.reader.finished(): return ReadCancelled - + if socket.atEof(): # buffer is already empty and we reached remote fin, just finish read with whatever # was already read @@ -1547,7 +1546,7 @@ proc eventLoop(socket: UtpSocket) {.async.} = case ev.kind of NewPacket: socket.processPacketInternal(ev.packet) - + # we processed a packet and rcv buffer size is larger than 0, # check if we can finish some pending readers while socket.pendingReads.len() > 0: @@ -1573,7 +1572,7 @@ proc eventLoop(socket: UtpSocket) {.async.} = # close should be last packet send break of Data: - # check if writing was not cancelled in the mean time. This approach + # check if writing was not cancelled in the mean time. This approach # can create partial writes as part of the data could be written with # with WriteReq if (not wr.writer.finished()): @@ -1624,7 +1623,7 @@ proc eventLoop(socket: UtpSocket) {.async.} = of ReadNotFinished: socket.pendingReads.addLast(readReq) else: - # in any other case we do not need to do any thing + # in any other case we do not need to do any thing discard socket.checkTimeouts() except CancelledError as exc: @@ -1665,8 +1664,7 @@ proc close*(socket: UtpSocket) = socket.readShutdown = true if (not socket.sendFinRequested): try: - info "Sending FIN", - to = socket.socketKey + debug "Sending FIN", dst = socket.socketKey # with this approach, all pending writes will be executed before sending fin packet # we could also and method which places close request as first one to process # but it would complicate the write loop @@ -1691,9 +1689,7 @@ proc closeWait*(socket: UtpSocket) {.async.} = await socket.closeEvent.wait() proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] = - info "Write data", - to = socket.socketKey, - length = len(data) + debug "Write data", dst = socket.socketKey, length = len(data) let retFuture = newFuture[WriteResult]("UtpSocket.write") @@ -1943,14 +1939,14 @@ proc getSocketConfig*(socket: UtpSocket): SocketConfig = proc startIncomingSocket*(socket: UtpSocket) = # Make sure ack was flushed before moving forward - socket.sendAck() + socket.sendAck() socket.startEventLoop() socket.startTimeoutLoop() proc startOutgoingSocket*(socket: UtpSocket): Future[void] = doAssert(socket.state == SynSent) let packet = synPacket(socket.seqNr, socket.connectionIdRcv, socket.getRcvWindowSize()) - debug "Sending SYN packet", + debug "Sending SYN packet", seqNr = packet.header.seqNr, connectionId = packet.header.connectionId # set number of transmissions to 1 as syn packet will be send just after