Adjust uTP log levels to be more user focused (#502)

Also cleanup several whitespace and line char limits
This commit is contained in:
Kim De Mey 2022-04-12 21:11:01 +02:00 committed by GitHub
parent 6d4b1f4fe1
commit 01684a2130
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 97 additions and 92 deletions

View File

@ -1,4 +1,4 @@
# Copyright (c) 2021 Status Research & Development GmbH # Copyright (c) 2021-2022 Status Research & Development GmbH
# Licensed and distributed under either of # Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@ -13,12 +13,12 @@ import
./utp_router, ./utp_router,
../keys ../keys
export utp_router, protocol, chronicles export utp_router, protocol
logScope: logScope:
topics = "utp_discv5_protocol" topics = "utp_discv5_protocol"
type type
NodeAddress* = object NodeAddress* = object
nodeId*: NodeId nodeId*: NodeId
address*: Address address*: Address
@ -27,10 +27,10 @@ type
prot: protocol.Protocol prot: protocol.Protocol
router: UtpRouter[NodeAddress] router: UtpRouter[NodeAddress]
proc init*(T: type NodeAddress, nodeId: NodeId, address: Address): NodeAddress = proc init*(T: type NodeAddress, nodeId: NodeId, address: Address): NodeAddress =
NodeAddress(nodeId: nodeId, address: address) NodeAddress(nodeId: nodeId, address: address)
proc init*(T: type NodeAddress, node: Node): Option[NodeAddress] = proc init*(T: type NodeAddress, node: Node): Option[NodeAddress] =
node.address.map((address: Address) => NodeAddress(nodeId: node.id, address: address)) node.address.map((address: Address) => NodeAddress(nodeId: node.id, address: address))
proc hash(x: NodeAddress): Hash = proc hash(x: NodeAddress): Hash =
@ -60,7 +60,7 @@ proc talkReqDirect(p: protocol.Protocol, n: NodeAddress, protocol, request: seq[
trace "Send message packet", dstId = n.nodeId, address = n.address, kind = MessageKind.talkreq trace "Send message packet", dstId = n.nodeId, address = n.address, kind = MessageKind.talkreq
p.send(n.address, data) p.send(n.address, data)
proc initSendCallback( proc initSendCallback(
t: protocol.Protocol, subProtocolName: seq[byte]): SendCallback[NodeAddress] = t: protocol.Protocol, subProtocolName: seq[byte]): SendCallback[NodeAddress] =
return ( return (
@ -77,7 +77,7 @@ proc initSendCallback(
proc messageHandler(protocol: TalkProtocol, request: seq[byte], proc messageHandler(protocol: TalkProtocol, request: seq[byte],
srcId: NodeId, srcUdpAddress: Address): seq[byte] = srcId: NodeId, srcUdpAddress: Address): seq[byte] =
let let
p = UtpDiscv5Protocol(protocol) p = UtpDiscv5Protocol(protocol)
nodeAddress = NodeAddress.init(srcId, srcUdpAddress) nodeAddress = NodeAddress.init(srcId, srcUdpAddress)
debug "Received utp payload from known node. Start processing", debug "Received utp payload from known node. Start processing",

View File

@ -1,4 +1,4 @@
# Copyright (c) 2021 Status Research & Development GmbH # Copyright (c) 2021-2022 Status Research & Development GmbH
# Licensed and distributed under either of # Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@ -13,6 +13,11 @@ import
./utp_socket, ./utp_socket,
./packets ./packets
export utp_socket
logScope:
topics = "utp_router"
declareCounter utp_received_packets, declareCounter utp_received_packets,
"All correct received uTP packets" "All correct received uTP packets"
declareCounter utp_failed_packets, declareCounter utp_failed_packets,
@ -28,11 +33,6 @@ declareCounter utp_success_outgoing,
declareCounter utp_failed_outgoing, declareCounter utp_failed_outgoing,
"Total number of failed outgoing connections" "Total number of failed outgoing connections"
logScope:
topics = "utp_router"
export utp_socket
type type
# New remote client connection callback # New remote client connection callback
# ``server`` - UtpProtocol object. # ``server`` - UtpProtocol object.
@ -132,7 +132,8 @@ proc new*[A](
# There are different possibilities on how the connection got established, need # There are different possibilities on how the connection got established, need
# to check every case. # to check every case.
proc getSocketOnReset[A](r: UtpRouter[A], sender: A, id: uint16): Option[UtpSocket[A]] = proc getSocketOnReset[A](
r: UtpRouter[A], sender: A, id: uint16): Option[UtpSocket[A]] =
# id is our recv id # id is our recv id
let recvKey = UtpSocketKey[A].init(sender, id) let recvKey = UtpSocketKey[A].init(sender, id)
@ -146,7 +147,8 @@ proc getSocketOnReset[A](r: UtpRouter[A], sender: A, id: uint16): Option[UtpSock
.orElse(r.getUtpSocket(sendInitKey).filter(s => s.connectionIdSnd == id)) .orElse(r.getUtpSocket(sendInitKey).filter(s => s.connectionIdSnd == id))
.orElse(r.getUtpSocket(sendNoInitKey).filter(s => s.connectionIdSnd == id)) .orElse(r.getUtpSocket(sendNoInitKey).filter(s => s.connectionIdSnd == id))
proc shouldAllowConnection[A](r: UtpRouter[A], remoteAddress: A, connectionId: uint16): bool = proc shouldAllowConnection[A](
r: UtpRouter[A], remoteAddress: A, connectionId: uint16): bool =
if r.allowConnection == nil: if r.allowConnection == nil:
# if the callback is not configured it means all incoming connections are allowed # if the callback is not configured it means all incoming connections are allowed
true true
@ -183,14 +185,15 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}=
debug "Received SYN for new connection. Initiating incoming connection", debug "Received SYN for new connection. Initiating incoming connection",
synSeqNr = p.header.seqNr synSeqNr = p.header.seqNr
# Initial ackNr is set to incoming packer seqNr # Initial ackNr is set to incoming packer seqNr
let incomingSocket = newIncomingSocket[A](sender, r.sendCb, r.socketConfig ,p.header.connectionId, p.header.seqNr, r.rng[]) let incomingSocket = newIncomingSocket[A](
sender, r.sendCb, r.socketConfig,
p.header.connectionId, p.header.seqNr, r.rng[])
r.registerUtpSocket(incomingSocket) r.registerUtpSocket(incomingSocket)
incomingSocket.startIncomingSocket() incomingSocket.startIncomingSocket()
# Based on configuration, socket is passed to upper layer either in SynRecv # Based on configuration, socket is passed to upper layer either in
# or Connected state # SynRecv or Connected state
utp_allowed_incoming.inc() utp_allowed_incoming.inc()
info "Accepting incoming connection", debug "Accepting incoming connection", src = incomingSocket.socketKey
to = incomingSocket.socketKey
asyncSpawn r.acceptConnection(r, incomingSocket) asyncSpawn r.acceptConnection(r, incomingSocket)
else: else:
utp_declined_incoming.inc() utp_declined_incoming.inc()
@ -203,13 +206,15 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}=
let socket = maybeSocket.unsafeGet() let socket = maybeSocket.unsafeGet()
await socket.processPacket(p) await socket.processPacket(p)
else: else:
# TODO add keeping track of recently send reset packets and do not send reset # TODO add keeping track of recently send reset packets and do not send
# to peers which we recently send reset to. # reset to peers which we recently send reset to.
debug "Received FIN/DATA/ACK on not known socket sending reset" debug "Received FIN/DATA/ACK on not known socket sending reset"
let rstPacket = resetPacket(randUint16(r.rng[]), p.header.connectionId, p.header.seqNr) let rstPacket = resetPacket(
randUint16(r.rng[]), p.header.connectionId, p.header.seqNr)
await r.sendCb(sender, encodePacket(rstPacket)) await r.sendCb(sender, encodePacket(rstPacket))
proc processIncomingBytes*[A](r: UtpRouter[A], bytes: seq[byte], sender: A) {.async.} = proc processIncomingBytes*[A](
r: UtpRouter[A], bytes: seq[byte], sender: A) {.async.} =
if (not r.closed): if (not r.closed):
let dec = decodePacket(bytes) let dec = decodePacket(bytes)
if (dec.isOk()): if (dec.isOk()):
@ -218,15 +223,17 @@ proc processIncomingBytes*[A](r: UtpRouter[A], bytes: seq[byte], sender: A) {.as
else: else:
utp_failed_packets.inc() utp_failed_packets.inc()
let err = dec.error() let err = dec.error()
warn "failed to decode packet from address", address = sender, msg = err warn "Failed to decode packet from address", address = sender, msg = err
proc generateNewUniqueSocket[A](r: UtpRouter[A], address: A): Option[UtpSocket[A]] = proc generateNewUniqueSocket[A](
## Tries to generate unique socket, gives up after maxSocketGenerationTries tries r: UtpRouter[A], address: A):Option[UtpSocket[A]] =
## Try to generate unique socket, give up after maxSocketGenerationTries tries
var tryCount = 0 var tryCount = 0
while tryCount < maxSocketGenerationTries: while tryCount < maxSocketGenerationTries:
let rcvId = randUint16(r.rng[]) let rcvId = randUint16(r.rng[])
let socket = newOutgoingSocket[A](address, r.sendCb, r.socketConfig, rcvId, r.rng[]) let socket = newOutgoingSocket[A](
address, r.sendCb, r.socketConfig, rcvId, r.rng[])
if r.registerIfAbsent(socket): if r.registerIfAbsent(socket):
return some(socket) return some(socket)
@ -236,32 +243,31 @@ proc generateNewUniqueSocket[A](r: UtpRouter[A], address: A): Option[UtpSocket[A
return none[UtpSocket[A]]() return none[UtpSocket[A]]()
proc connect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] {.async.}= proc connect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] {.async.}=
info "Initiating connection", debug "Initiating connection", dst = s.socketKey
to = s.socketKey
let startFut = s.startOutgoingSocket() let startFut = s.startOutgoingSocket()
startFut.cancelCallback = proc(udata: pointer) {.gcsafe.} = startFut.cancelCallback = proc(udata: pointer) {.gcsafe.} =
# if for some reason future will be cancelled, destory socket to clear it from # if for some reason the future is cancelled, destroy socket to clear it
# active socket list # from the active socket list
s.destroy() s.destroy()
try: try:
await startFut await startFut
utp_success_outgoing.inc() utp_success_outgoing.inc()
info "Outgoing connection successful", debug "Outgoing connection successful", dst = s.socketKey
to = s.socketKey
return ok(s) return ok(s)
except ConnectionError: except ConnectionError:
utp_failed_outgoing.inc() utp_failed_outgoing.inc()
info "Outgoing connection timed-out", debug "Outgoing connection timed-out", dst = s.socketKey
to = s.socketKey
s.destroy() s.destroy()
return err(OutgoingConnectionError(kind: ConnectionTimedOut)) return err(OutgoingConnectionError(kind: ConnectionTimedOut))
# Connect to provided address # Connect to provided address
# Reference implementation: https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732 # Reference implementation:
proc connectTo*[A](r: UtpRouter[A], address: A): Future[ConnectionResult[A]] {.async.} = # https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732
proc connectTo*[A](
r: UtpRouter[A], address: A): Future[ConnectionResult[A]] {.async.} =
let maybeSocket = r.generateNewUniqueSocket(address) let maybeSocket = r.generateNewUniqueSocket(address)
if (maybeSocket.isNone()): if (maybeSocket.isNone()):
@ -272,8 +278,11 @@ proc connectTo*[A](r: UtpRouter[A], address: A): Future[ConnectionResult[A]] {.a
# Connect to provided address with provided connection id, if socket with this id # Connect to provided address with provided connection id, if socket with this id
# and address already exsits return error # and address already exsits return error
proc connectTo*[A](r: UtpRouter[A], address: A, connectionId: uint16): Future[ConnectionResult[A]] {.async.} = proc connectTo*[A](
let socket = newOutgoingSocket[A](address, r.sendCb, r.socketConfig, connectionId, r.rng[]) r: UtpRouter[A], address: A, connectionId: uint16):
Future[ConnectionResult[A]] {.async.} =
let socket = newOutgoingSocket[A](
address, r.sendCb, r.socketConfig, connectionId, r.rng[])
if (r.registerIfAbsent(socket)): if (r.registerIfAbsent(socket)):
return await socket.connect() return await socket.connect()
@ -282,7 +291,7 @@ proc connectTo*[A](r: UtpRouter[A], address: A, connectionId: uint16): Future[Co
proc shutdown*[A](r: UtpRouter[A]) = proc shutdown*[A](r: UtpRouter[A]) =
# stop processing any new packets and close all sockets in background without # stop processing any new packets and close all sockets in background without
# notifing remote peers # notifying remote peers
r.closed = true r.closed = true
for s in r.allSockets(): for s in r.allSockets():
s.destroy() s.destroy()
@ -290,12 +299,12 @@ proc shutdown*[A](r: UtpRouter[A]) =
proc shutdownWait*[A](r: UtpRouter[A]) {.async.} = proc shutdownWait*[A](r: UtpRouter[A]) {.async.} =
var activeSockets: seq[UtpSocket[A]] = @[] var activeSockets: seq[UtpSocket[A]] = @[]
# stop processing any new packets and close all sockets without # stop processing any new packets and close all sockets without
# notifing remote peers # notifying remote peers
r.closed = true r.closed = true
# we need to make copy as calling socket.destroyWait() removes socket from the table # Need to make a copy as calling socket.destroyWait() removes the socket from
# and iterator throws error. Antother option would be to wait until number of opensockets # the table and iterator throws error. Another option would be to wait until
# go to 0 # the number of open sockets drops to 0
for s in r.allSockets(): for s in r.allSockets():
activeSockets.add(s) activeSockets.add(s)

View File

@ -1,4 +1,4 @@
# Copyright (c) 2021 Status Research & Development GmbH # Copyright (c) 2021-2022 Status Research & Development GmbH
# Licensed and distributed under either of # Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@ -203,7 +203,7 @@ type
# current size of rcv buffer # current size of rcv buffer
offset: int offset: int
# readers waiting for data # readers waiting for data
pendingReads: Deque[ReadReq] pendingReads: Deque[ReadReq]
@ -304,11 +304,11 @@ type
ConnectionResult*[A] = Result[UtpSocket[A], OutgoingConnectionError] ConnectionResult*[A] = Result[UtpSocket[A], OutgoingConnectionError]
const const
# Default maximum size of the data packet payload. With such configuration # Default maximum size of the data packet payload. With such configuration
# data packets will have 508 bytes (488 + 20 header). # data packets will have 508 bytes (488 + 20 header).
# 508 bytes of udp payload can translate into 576 bytes udp packet i.e # 508 bytes of udp payload can translate into 576 bytes udp packet i.e
# 508bytes paylaod + 60bytes (max ip header) + 8bytes (udp header) = 576bytes. # 508bytes paylaod + 60bytes (max ip header) + 8bytes (udp header) = 576bytes.
# 576bytes is defined as minimum reassembly buffer size i.e # 576bytes is defined as minimum reassembly buffer size i.e
# the minimum datagram size that we are guaranteed any implementation must support. # the minimum datagram size that we are guaranteed any implementation must support.
# from RFC791: All hosts must be prepared # from RFC791: All hosts must be prepared
# to accept datagrams of up to 576 octets (whether they arrive whole # to accept datagrams of up to 576 octets (whether they arrive whole
@ -346,7 +346,7 @@ const
allowedAckWindow*: uint16 = 3 allowedAckWindow*: uint16 = 3
# Timeout after which the send window will be reset to its minimal value after it dropped # Timeout after which the send window will be reset to its minimal value after it dropped
# lower than our current packet size. i.e when we received a packet # lower than our current packet size. i.e when we received a packet
# from remote peer with `wndSize` set to number <= current packet size # from remote peer with `wndSize` set to number <= current packet size
defaultResetWindowTimeout = seconds(15) defaultResetWindowTimeout = seconds(15)
@ -357,7 +357,7 @@ const
# minimal time before subseqent window decays # minimal time before subseqent window decays
maxWindowDecay = milliseconds(100) maxWindowDecay = milliseconds(100)
# Maximal size of reorder buffer as fraction of optRcvBuffer size following # Maximal size of reorder buffer as fraction of optRcvBuffer size following
# semantics apply bases on rcvBuffer set to 1000 bytes: # semantics apply bases on rcvBuffer set to 1000 bytes:
# if there are already 1000 bytes in rcv buffer no more bytes will be accepted to reorder buffer # if there are already 1000 bytes in rcv buffer no more bytes will be accepted to reorder buffer
# if there are already 500 bytes in reoreder buffer, no more bytes will be accepted # if there are already 500 bytes in reoreder buffer, no more bytes will be accepted
@ -411,7 +411,7 @@ proc init*(
) )
# number of bytes which will fit in current send window # number of bytes which will fit in current send window
proc freeWindowBytes(socket: UtpSocket): uint32 = proc freeWindowBytes(socket: UtpSocket): uint32 =
let maxSend = min(socket.maxRemoteWindow, socket.maxWindow) let maxSend = min(socket.maxRemoteWindow, socket.maxWindow)
if (maxSend <= socket.currentWindow): if (maxSend <= socket.currentWindow):
return 0 return 0
@ -439,7 +439,7 @@ proc sendData(socket: UtpSocket, data: seq[byte]) =
if f.failed: if f.failed:
warn "UTP send failed", msg = f.readError.msg warn "UTP send failed", msg = f.readError.msg
proc sendPacket(socket: UtpSocket, seqNr: uint16) = proc sendPacket(socket: UtpSocket, seqNr: uint16) =
proc setSend(p: var OutgoingPacket): seq[byte] = proc setSend(p: var OutgoingPacket): seq[byte] =
let timestampInfo = getMonoTimestamp() let timestampInfo = getMonoTimestamp()
@ -454,9 +454,9 @@ proc sendPacket(socket: UtpSocket, seqNr: uint16) =
modifyTimeStampAndAckNr(p.packetBytes, timestampInfo.timestamp, socket.ackNr) modifyTimeStampAndAckNr(p.packetBytes, timestampInfo.timestamp, socket.ackNr)
return p.packetBytes return p.packetBytes
socket.sendData(setSend(socket.outBuffer[seqNr])) socket.sendData(setSend(socket.outBuffer[seqNr]))
proc resetSendTimeout(socket: UtpSocket) = proc resetSendTimeout(socket: UtpSocket) =
socket.retransmitTimeout = socket.rto socket.retransmitTimeout = socket.rto
socket.rtoTimeout = getMonoTimestamp().moment + socket.retransmitTimeout socket.rtoTimeout = getMonoTimestamp().moment + socket.retransmitTimeout
@ -529,7 +529,7 @@ proc checkTimeouts(socket: UtpSocket) =
debug "Reset remote window to minimal value", debug "Reset remote window to minimal value",
minRemote = minimalRemoteWindow minRemote = minimalRemoteWindow
socket.zeroWindowTimer = none[Moment]() socket.zeroWindowTimer = none[Moment]()
if (currentTime > socket.rtoTimeout): if (currentTime > socket.rtoTimeout):
debug "CheckTimeouts rto timeout", debug "CheckTimeouts rto timeout",
socketKey = socket.socketKey, socketKey = socket.socketKey,
@ -551,7 +551,7 @@ proc checkTimeouts(socket: UtpSocket) =
if socket.shouldDisconnectFromFailedRemote(): if socket.shouldDisconnectFromFailedRemote():
debug "Remote host failed", debug "Remote host failed",
state = socket.state, state = socket.state,
retransmitCount = socket.retransmitCount retransmitCount = socket.retransmitCount
if socket.state == SynSent and (not socket.connectionFuture.finished()): if socket.state == SynSent and (not socket.connectionFuture.finished()):
socket.connectionFuture.fail(newException(ConnectionError, "Connection to peer timed out")) socket.connectionFuture.fail(newException(ConnectionError, "Connection to peer timed out"))
@ -577,7 +577,7 @@ proc checkTimeouts(socket: UtpSocket) =
oldMaxWindow = oldMaxWindow, oldMaxWindow = oldMaxWindow,
newMaxWindow = newMaxWindow newMaxWindow = newMaxWindow
socket.maxWindow = newMaxWindow socket.maxWindow = newMaxWindow
elif (socket.maxWindow < currentPacketSize): elif (socket.maxWindow < currentPacketSize):
# due to high delay window has shrunk below packet size # due to high delay window has shrunk below packet size
# which means that we cannot send more data # which means that we cannot send more data
@ -601,7 +601,7 @@ proc checkTimeouts(socket: UtpSocket) =
if (socket.curWindowPackets > 0 and socket.outBuffer[oldestPacketSeqNr].transmissions > 0): if (socket.curWindowPackets > 0 and socket.outBuffer[oldestPacketSeqNr].transmissions > 0):
inc socket.retransmitCount inc socket.retransmitCount
socket.fastTimeout = true socket.fastTimeout = true
debug "Resending oldest packet", debug "Resending oldest packet",
pkSeqNr = oldestPacketSeqNr, pkSeqNr = oldestPacketSeqNr,
retransmitCount = socket.retransmitCount, retransmitCount = socket.retransmitCount,
@ -610,7 +610,7 @@ proc checkTimeouts(socket: UtpSocket) =
# Oldest packet should always be present, so it is safe to call force # Oldest packet should always be present, so it is safe to call force
# resend # resend
socket.sendPacket(oldestPacketSeqNr) socket.sendPacket(oldestPacketSeqNr)
# TODO add sending keep alives when necessary # TODO add sending keep alives when necessary
proc checkTimeoutsLoop(s: UtpSocket) {.async.} = proc checkTimeoutsLoop(s: UtpSocket) {.async.} =
@ -692,12 +692,11 @@ proc isClosed*(socket: UtpSocket): bool =
socket.state == Destroy and socket.closeEvent.isSet() socket.state == Destroy and socket.closeEvent.isSet()
proc isClosedAndCleanedUpAllResources*(socket: UtpSocket): bool = proc isClosedAndCleanedUpAllResources*(socket: UtpSocket): bool =
## Test Api to check that all resources are properly cleaned up ## Test Api to check that all resources are properly cleaned up
socket.isClosed() and socket.eventLoop.cancelled() and socket.checkTimeoutsLoop.cancelled() socket.isClosed() and socket.eventLoop.cancelled() and socket.checkTimeoutsLoop.cancelled()
proc destroy*(s: UtpSocket) = proc destroy*(s: UtpSocket) =
info "Destroying socket", debug "Destroying socket", to = s.socketKey
to = s.socketKey
## Moves socket to destroy state and clean all reasources. ## Moves socket to destroy state and clean all reasources.
## Remote is not notified in any way about socket end of life ## Remote is not notified in any way about socket end of life
s.state = Destroy s.state = Destroy
@ -896,7 +895,7 @@ proc tryDecayWindow(socket: UtpSocket, now: Moment) =
if (now - socket.lastWindowDecay >= maxWindowDecay): if (now - socket.lastWindowDecay >= maxWindowDecay):
socket.lastWindowDecay = now socket.lastWindowDecay = now
let newMaxWindow = max(uint32(0.5 * float64(socket.maxWindow)), uint32(minWindowSize)) let newMaxWindow = max(uint32(0.5 * float64(socket.maxWindow)), uint32(minWindowSize))
debug "Decaying maxWindow", debug "Decaying maxWindow",
oldWindow = socket.maxWindow, oldWindow = socket.maxWindow,
newWindow = newMaxWindow newWindow = newMaxWindow
@ -904,7 +903,7 @@ proc tryDecayWindow(socket: UtpSocket, now: Moment) =
socket.maxWindow = newMaxWindow socket.maxWindow = newMaxWindow
socket.slowStart = false socket.slowStart = false
socket.slowStartTreshold = newMaxWindow socket.slowStartTreshold = newMaxWindow
# ack packets (removes them from out going buffer) based on selective ack extension header # ack packets (removes them from out going buffer) based on selective ack extension header
proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: SelectiveAckExtension, currentTime: Moment): void = proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: SelectiveAckExtension, currentTime: Moment): void =
# we add 2, as the first bit in the mask therefore represents ackNr + 2 becouse # we add 2, as the first bit in the mask therefore represents ackNr + 2 becouse
@ -930,7 +929,7 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S
if (socket.seqNr - v - 1) >= socket.curWindowPackets - 1: if (socket.seqNr - v - 1) >= socket.curWindowPackets - 1:
dec bits dec bits
continue continue
let bitSet: bool = getBit(ext.acks, bits) let bitSet: bool = getBit(ext.acks, bits)
if bitSet: if bitSet:
@ -950,7 +949,7 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S
discard socket.ackPacket(v, currentTime) discard socket.ackPacket(v, currentTime)
dec bits dec bits
continue continue
if counter >= duplicateAcksBeforeResend and (v - socket.fastResendSeqNr) <= reorderBufferMaxSize: if counter >= duplicateAcksBeforeResend and (v - socket.fastResendSeqNr) <= reorderBufferMaxSize:
debug "No ack for packet", debug "No ack for packet",
pkAckNr = v, pkAckNr = v,
@ -982,7 +981,7 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S
# packet is no longer in send buffer ignore whole further processing # packet is no longer in send buffer ignore whole further processing
dec i dec i
continue continue
registerLoss = true registerLoss = true
# it is safe to call as we already checked that packet is in send buffer # it is safe to call as we already checked that packet is in send buffer
@ -1043,7 +1042,7 @@ proc generateAckPacket*(socket: UtpSocket): Packet =
bitmask bitmask
) )
proc sendAck(socket: UtpSocket) = proc sendAck(socket: UtpSocket) =
## Creates and sends ack, based on current socket state. Acks are different from ## Creates and sends ack, based on current socket state. Acks are different from
## other packets as we do not track them in outgoing buffet ## other packets as we do not track them in outgoing buffet
@ -1138,8 +1137,8 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
# the fast-resend on duplicate-ack logic for bi-directional connections # the fast-resend on duplicate-ack logic for bi-directional connections
# (except in the case of a selective ACK). This is in line with BSD4.4 TCP # (except in the case of a selective ACK). This is in line with BSD4.4 TCP
# implementation. # implementation.
if socket.curWindowPackets > 0 and if socket.curWindowPackets > 0 and
pkAckNr == socket.seqNr - socket.curWindowPackets - 1 and pkAckNr == socket.seqNr - socket.curWindowPackets - 1 and
p.header.pType == ST_STATE: p.header.pType == ST_STATE:
inc socket.duplicateAck inc socket.duplicateAck
@ -1163,7 +1162,7 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
# if `pastExpected` is really big number (for example: uint16.high) then most # if `pastExpected` is really big number (for example: uint16.high) then most
# probably we are receiving packet which we already received # probably we are receiving packet which we already received
# example: we already received packet with `seqNr = 10` so our `socket.ackNr = 10` # example: we already received packet with `seqNr = 10` so our `socket.ackNr = 10`
# if we receive this packet once again then `pastExpected = 10 - 10 - 1` which # if we receive this packet once again then `pastExpected = 10 - 10 - 1` which
# equals (due to wrapping) 65535 # equals (due to wrapping) 65535
# this means that remote most probably did not receive our ack, so we need to resend # this means that remote most probably did not receive our ack, so we need to resend
# it. We are doing it for last `reorderBufferMaxSize` packets # it. We are doing it for last `reorderBufferMaxSize` packets
@ -1180,7 +1179,7 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
debug "Bytes acked by classic ack", debug "Bytes acked by classic ack",
bytesAcked = ackedBytes bytesAcked = ackedBytes
if (p.eack.isSome()): if (p.eack.isSome()):
let selectiveAckedBytes = socket.calculateSelectiveAckBytes(pkAckNr, p.eack.unsafeGet()) let selectiveAckedBytes = socket.calculateSelectiveAckBytes(pkAckNr, p.eack.unsafeGet())
debug "Bytes acked by selective ack", debug "Bytes acked by selective ack",
@ -1310,11 +1309,11 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
pkSeqNr = oldestOutstandingPktSeqNr pkSeqNr = oldestOutstandingPktSeqNr
inc socket.fastResendSeqNr inc socket.fastResendSeqNr
# Is is safe to call force resend as we already checked shouldReSendPacket # Is is safe to call force resend as we already checked shouldReSendPacket
# condition # condition
socket.sendPacket(oldestOutstandingPktSeqNr) socket.sendPacket(oldestOutstandingPktSeqNr)
if (p.eack.isSome()): if (p.eack.isSome()):
socket.selectiveAckPackets(pkAckNr, p.eack.unsafeGet(), timestampInfo.moment) socket.selectiveAckPackets(pkAckNr, p.eack.unsafeGet(), timestampInfo.moment)
@ -1360,7 +1359,7 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
# await upload(addr socket.buffer, unsafeAddr p.payload[0], p.payload.len()) # await upload(addr socket.buffer, unsafeAddr p.payload[0], p.payload.len())
moveMem(addr socket.rcvBuffer[socket.offset], unsafeAddr p.payload[0], payloadLength) moveMem(addr socket.rcvBuffer[socket.offset], unsafeAddr p.payload[0], payloadLength)
socket.offset = socket.offset + payloadLength socket.offset = socket.offset + payloadLength
# Bytes have been passed to upper layer, we can increase number of last # Bytes have been passed to upper layer, we can increase number of last
# acked packet # acked packet
inc socket.ackNr inc socket.ackNr
@ -1408,8 +1407,8 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
socektAckNr = socket.ackNr, socektAckNr = socket.ackNr,
rcvbufferSize = socket.offset, rcvbufferSize = socket.offset,
reorderBufferSize = socket.inBufferBytes reorderBufferSize = socket.inBufferBytes
# Rcv buffer and reorder buffer are sized that it is always possible to # Rcv buffer and reorder buffer are sized that it is always possible to
# move data from reorder buffer to rcv buffer without overflow # move data from reorder buffer to rcv buffer without overflow
moveMem(addr socket.rcvBuffer[socket.offset], unsafeAddr packet.payload[0], reorderPacketPayloadLength) moveMem(addr socket.rcvBuffer[socket.offset], unsafeAddr packet.payload[0], reorderPacketPayloadLength)
socket.offset = socket.offset + reorderPacketPayloadLength socket.offset = socket.offset + reorderPacketPayloadLength
@ -1456,7 +1455,7 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
let payloadLength = uint32(len(p.payload)) let payloadLength = uint32(len(p.payload))
if (socket.inBufferBytes + payloadLength <= socket.socketConfig.maxSizeOfReorderBuffer and if (socket.inBufferBytes + payloadLength <= socket.socketConfig.maxSizeOfReorderBuffer and
socket.inBufferBytes + uint32(socket.offset) + payloadLength <= socket.socketConfig.optRcvBuffer): socket.inBufferBytes + uint32(socket.offset) + payloadLength <= socket.socketConfig.optRcvBuffer):
debug "store packet in reorder buffer", debug "store packet in reorder buffer",
packetBytes = payloadLength, packetBytes = payloadLength,
packetSeqNr = p.header.seqNr, packetSeqNr = p.header.seqNr,
@ -1471,11 +1470,11 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
socket.inBufferBytes = socket.inBufferBytes + payloadLength socket.inBufferBytes = socket.inBufferBytes + payloadLength
debug "added out of order packet to reorder buffer", debug "added out of order packet to reorder buffer",
reorderCount = socket.reorderCount reorderCount = socket.reorderCount
# we send ack packet, as we reoreder count is > 0, so the eack bitmask will be # we send ack packet, as we reoreder count is > 0, so the eack bitmask will be
# generated # generated
socket.sendAck() socket.sendAck()
proc processPacket*(socket: UtpSocket, p: Packet): Future[void] = proc processPacket*(socket: UtpSocket, p: Packet): Future[void] =
socket.eventQueue.put(SocketEvent(kind: NewPacket, packet: p)) socket.eventQueue.put(SocketEvent(kind: NewPacket, packet: p))
template shiftBuffer(t, c: untyped) = template shiftBuffer(t, c: untyped) =
@ -1495,7 +1494,7 @@ proc onRead(socket: UtpSocket, readReq: var ReadReq): ReadResult =
if readReq.reader.finished(): if readReq.reader.finished():
return ReadCancelled return ReadCancelled
if socket.atEof(): if socket.atEof():
# buffer is already empty and we reached remote fin, just finish read with whatever # buffer is already empty and we reached remote fin, just finish read with whatever
# was already read # was already read
@ -1547,7 +1546,7 @@ proc eventLoop(socket: UtpSocket) {.async.} =
case ev.kind case ev.kind
of NewPacket: of NewPacket:
socket.processPacketInternal(ev.packet) socket.processPacketInternal(ev.packet)
# we processed a packet and rcv buffer size is larger than 0, # we processed a packet and rcv buffer size is larger than 0,
# check if we can finish some pending readers # check if we can finish some pending readers
while socket.pendingReads.len() > 0: while socket.pendingReads.len() > 0:
@ -1573,7 +1572,7 @@ proc eventLoop(socket: UtpSocket) {.async.} =
# close should be last packet send # close should be last packet send
break break
of Data: of Data:
# check if writing was not cancelled in the mean time. This approach # check if writing was not cancelled in the mean time. This approach
# can create partial writes as part of the data could be written with # can create partial writes as part of the data could be written with
# with WriteReq # with WriteReq
if (not wr.writer.finished()): if (not wr.writer.finished()):
@ -1624,7 +1623,7 @@ proc eventLoop(socket: UtpSocket) {.async.} =
of ReadNotFinished: of ReadNotFinished:
socket.pendingReads.addLast(readReq) socket.pendingReads.addLast(readReq)
else: else:
# in any other case we do not need to do any thing # in any other case we do not need to do any thing
discard discard
socket.checkTimeouts() socket.checkTimeouts()
except CancelledError as exc: except CancelledError as exc:
@ -1665,8 +1664,7 @@ proc close*(socket: UtpSocket) =
socket.readShutdown = true socket.readShutdown = true
if (not socket.sendFinRequested): if (not socket.sendFinRequested):
try: try:
info "Sending FIN", debug "Sending FIN", dst = socket.socketKey
to = socket.socketKey
# with this approach, all pending writes will be executed before sending fin packet # with this approach, all pending writes will be executed before sending fin packet
# we could also and method which places close request as first one to process # we could also and method which places close request as first one to process
# but it would complicate the write loop # but it would complicate the write loop
@ -1691,9 +1689,7 @@ proc closeWait*(socket: UtpSocket) {.async.} =
await socket.closeEvent.wait() await socket.closeEvent.wait()
proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] = proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] =
info "Write data", debug "Write data", dst = socket.socketKey, length = len(data)
to = socket.socketKey,
length = len(data)
let retFuture = newFuture[WriteResult]("UtpSocket.write") let retFuture = newFuture[WriteResult]("UtpSocket.write")
@ -1943,14 +1939,14 @@ proc getSocketConfig*(socket: UtpSocket): SocketConfig =
proc startIncomingSocket*(socket: UtpSocket) = proc startIncomingSocket*(socket: UtpSocket) =
# Make sure ack was flushed before moving forward # Make sure ack was flushed before moving forward
socket.sendAck() socket.sendAck()
socket.startEventLoop() socket.startEventLoop()
socket.startTimeoutLoop() socket.startTimeoutLoop()
proc startOutgoingSocket*(socket: UtpSocket): Future[void] = proc startOutgoingSocket*(socket: UtpSocket): Future[void] =
doAssert(socket.state == SynSent) doAssert(socket.state == SynSent)
let packet = synPacket(socket.seqNr, socket.connectionIdRcv, socket.getRcvWindowSize()) let packet = synPacket(socket.seqNr, socket.connectionIdRcv, socket.getRcvWindowSize())
debug "Sending SYN packet", debug "Sending SYN packet",
seqNr = packet.header.seqNr, seqNr = packet.header.seqNr,
connectionId = packet.header.connectionId connectionId = packet.header.connectionId
# set number of transmissions to 1 as syn packet will be send just after # set number of transmissions to 1 as syn packet will be send just after