uTP clarification on SYN-ACK seqnr and some cleanup (#596)

- Clarify why on SYN-ACK seqNr gets -1
- Cleanup comments
- Style fixes
- Reuse WriteResult
This commit is contained in:
Kim De Mey 2023-04-04 17:11:36 +02:00 committed by GitHub
parent 4754543605
commit ea3c164a00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 141 additions and 115 deletions

View File

@ -1,4 +1,4 @@
# Copyright (c) 2021-2022 Status Research & Development GmbH # Copyright (c) 2021-2023 Status Research & Development GmbH
# Licensed and distributed under either of # Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@ -97,9 +97,10 @@ proc registerUtpSocket[A](p: UtpRouter, s: UtpSocket[A]) =
## Register socket, overwriting already existing one ## Register socket, overwriting already existing one
p.sockets[s.socketKey] = s p.sockets[s.socketKey] = s
utp_established_connections.set(int64(len(p.sockets))) utp_established_connections.set(int64(len(p.sockets)))
debug "Registered new utp socket", dst = s.socketKey, lenSockets = len(p.sockets) debug "Registered new uTP socket",
# Install deregister handler, so when socket gets closed, in will be promptly dst = s.socketKey, totalSockets = len(p.sockets)
# removed from open sockets table # Install deregister handler so that when the socket gets closed, it gets
# removed from open sockets table.
s.registerCloseCallback(proc () = p.deRegisterUtpSocket(s)) s.registerCloseCallback(proc () = p.deRegisterUtpSocket(s))
proc registerIfAbsent[A](p: UtpRouter, s: UtpSocket[A]): bool = proc registerIfAbsent[A](p: UtpRouter, s: UtpSocket[A]): bool =
@ -144,7 +145,9 @@ proc new*[A](
rng = newRng()): UtpRouter[A] = rng = newRng()): UtpRouter[A] =
doAssert(not(isNil(acceptConnectionCb))) doAssert(not(isNil(acceptConnectionCb)))
GC_ref(udata) GC_ref(udata)
UtpRouter[A].new(acceptConnectionCb, allowConnectionCb, cast[pointer](udata), socketConfig, rng) UtpRouter[A].new(
acceptConnectionCb, allowConnectionCb,
cast[pointer](udata), socketConfig, rng)
proc new*[A]( proc new*[A](
T: type UtpRouter[A], T: type UtpRouter[A],
@ -168,7 +171,8 @@ proc getSocketOnReset[A](
# id is our send id, and we did initiate the connection, our recv id is id - 1 # id is our send id, and we did initiate the connection, our recv id is id - 1
let sendInitKey = UtpSocketKey[A].init(sender, id - 1) let sendInitKey = UtpSocketKey[A].init(sender, id - 1)
# id is our send id, and we did not initiate the connection, so our recv id is id + 1 # id is our send id, and we did not initiate the connection,
# our recv id is id + 1
let sendNoInitKey = UtpSocketKey[A].init(sender, id + 1) let sendNoInitKey = UtpSocketKey[A].init(sender, id + 1)
r.getUtpSocket(recvKey) r.getUtpSocket(recvKey)
@ -178,7 +182,7 @@ proc getSocketOnReset[A](
proc shouldAllowConnection[A]( proc shouldAllowConnection[A](
r: UtpRouter[A], remoteAddress: A, connectionId: uint16): bool = r: UtpRouter[A], remoteAddress: A, connectionId: uint16): bool =
if r.allowConnection == nil: if r.allowConnection == nil:
# if the callback is not configured it means all incoming connections are allowed # if the callback is not configured all incoming connections are allowed
true true
else: else:
r.allowConnection(r, remoteAddress, connectionId) r.allowConnection(r, remoteAddress, connectionId)
@ -194,16 +198,17 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}=
if maybeSocket.isSome(): if maybeSocket.isSome():
debug "Received RST packet on known connection, closing socket" debug "Received RST packet on known connection, closing socket"
let socket = maybeSocket.unsafeGet() let socket = maybeSocket.unsafeGet()
# reference implementation actually changes the socket state to reset state unless # The reference implementation actually changes the socket state to reset
# user explicitly closed socket before. The only difference between reset and destroy # state unless the user explicitly closed the socket before. The only
# state is that socket in destroy state is ultimately deleted from active connection # difference between the reset and the destroy state is that a socket in
# list but socket in reset state lingers there until user of library closes it # the destroy state is ultimately deleted from active connection list but
# explicitly. # a socket in reset state lingers there until the user of library closes
# it explicitly.
socket.destroy() socket.destroy()
else: else:
debug "Received RST packet for unknown connection, ignoring" debug "Received RST packet for unknown connection, ignoring"
of ST_SYN: of ST_SYN:
# Syn packet are special, and we need to add 1 to header connectionId # SYN packets are special and need an addition of 1 to header connectionId
let socketKey = UtpSocketKey[A].init(sender, p.header.connectionId + 1) let socketKey = UtpSocketKey[A].init(sender, p.header.connectionId + 1)
let maybeSocket = r.getUtpSocket(socketKey) let maybeSocket = r.getUtpSocket(socketKey)
if (maybeSocket.isSome()): if (maybeSocket.isSome()):
@ -220,7 +225,7 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}=
if (r.shouldAllowConnection(sender, p.header.connectionId)): if (r.shouldAllowConnection(sender, p.header.connectionId)):
debug "Received SYN for new connection. Initiating incoming connection", debug "Received SYN for new connection. Initiating incoming connection",
synSeqNr = p.header.seqNr synSeqNr = p.header.seqNr
# Initial ackNr is set to incoming packer seqNr # Initial ackNr is set to incoming packet seqNr
let incomingSocket = newIncomingSocket[A]( let incomingSocket = newIncomingSocket[A](
sender, r.sendCb, r.socketConfig, sender, r.sendCb, r.socketConfig,
p.header.connectionId, p.header.seqNr, r.rng[]) p.header.connectionId, p.header.seqNr, r.rng[])
@ -242,7 +247,7 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}=
let socket = maybeSocket.unsafeGet() let socket = maybeSocket.unsafeGet()
await socket.processPacket(p) await socket.processPacket(p)
else: else:
# TODO add keeping track of recently send reset packets and do not send # TODO: add keeping track of recently send reset packets and do not send
# reset to peers which we recently send reset to. # reset to peers which we recently send reset to.
debug "Received FIN/DATA/ACK on not known socket sending reset" debug "Received FIN/DATA/ACK on not known socket sending reset"
let rstPacket = resetPacket( let rstPacket = resetPacket(
@ -334,8 +339,8 @@ proc connectTo*[A](
let connFut = socket.connect() let connFut = socket.connect()
return connFut return connFut
# Connect to provided address with provided connection id, if socket with this id # Connect to provided address with provided connection id. If the socket with
# and address already exists return error # this id and address already exists, return error
proc connectTo*[A]( proc connectTo*[A](
r: UtpRouter[A], address: A, connectionId: uint16): r: UtpRouter[A], address: A, connectionId: uint16):
Future[ConnectionResult[A]] = Future[ConnectionResult[A]] =

View File

@ -1,4 +1,4 @@
# Copyright (c) 2021-2022 Status Research & Development GmbH # Copyright (c) 2021-2023 Status Research & Development GmbH
# Licensed and distributed under either of # Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@ -48,16 +48,18 @@ type
PacketAcked, PacketAlreadyAcked, PacketNotSentYet PacketAcked, PacketAlreadyAcked, PacketNotSentYet
# Socket callback to send data to remote peer # Socket callback to send data to remote peer
SendCallback*[A] = proc (to: A, data: seq[byte]): Future[void] {.gcsafe, raises: [Defect]} SendCallback*[A] =
proc (to: A, data: seq[byte]): Future[void] {.gcsafe, raises: [Defect]}
SocketConfig* = object SocketConfig* = object
# This is configurable (in contrast to reference impl), as with standard 2 syn resends # This is configurable (in contrast to reference impl), as with standard 2
# default timeout set to 3seconds and doubling of timeout with each re-send, it # SYN resends, the default timeout of 3 seconds and the doubling of the
# means that initial connection would timeout after 21s, which seems rather long # timeout with each resend, it means that the initial connection would
# timeout only after 21s, which seems rather long.
initialSynTimeout*: Duration initialSynTimeout*: Duration
# Number of resend re-tries of each data packet, before declaring connection # Number of resend retries of each data packet, before declaring the
# failed # connection as failed.
dataResendsBeforeFailure*: uint16 dataResendsBeforeFailure*: uint16
# Maximal size of receive buffer in bytes # Maximal size of receive buffer in bytes
@ -73,23 +75,24 @@ type
# state and will be able to transfer data. # state and will be able to transfer data.
incomingSocketReceiveTimeout*: Option[Duration] incomingSocketReceiveTimeout*: Option[Duration]
# Timeout after which the send window will be reset to its minimal value after it dropped # Timeout after which the send window will be reset to its minimal value
# to zero. i.e when we received a packet from remote peer with `wndSize` set to 0. # after it dropped to zero.
# i.e when a packet is received from a peer with `wndSize` set to 0.
remoteWindowResetTimeout*: Duration remoteWindowResetTimeout*: Duration
# Size of reorder buffer calculated as fraction of optRcvBuffer # Size of reorder buffer calculated as fraction of optRcvBuffer
maxSizeOfReorderBuffer: uint32 maxSizeOfReorderBuffer: uint32
# Maximal number of payload bytes per data packet. Total packet size will be equal to # Maximal number of payload bytes per data packet. Total packet size will be
# payloadSize + 20 (size of header of data packet) # equal to payloadSize + 20 (size of header of data packet).
# TODO for now we enable only static configuration of packet sizes. In the future # TODO: for now we enable only static configuration of packet sizes. In the
# it would be nice to add option which enables automatic packet size discovery # future it would be nice to add option which enables automatic packet size
# based on traffic # discovery based on traffic.
payloadSize*: uint32 payloadSize*: uint32
# Maximal number of open uTP connections. When hit, no more incoming connections # Maximal number of open uTP connections. When hit, no more incoming
# will be allowed, but it will still be possible to open new outgoing uTP # connections will be allowed, but it will still be possible to open new
# connections # outgoing uTP connections.
maxNumberOfOpenConnections*: int maxNumberOfOpenConnections*: int
WriteErrorType* = enum WriteErrorType* = enum
@ -147,25 +150,25 @@ type
direction: ConnectionDirection direction: ConnectionDirection
socketConfig: SocketConfig socketConfig: SocketConfig
# Connection id for packets we receive # Connection id for received packets
connectionIdRcv*: uint16 connectionIdRcv*: uint16
# Connection id for packets we send # Connection id for send packets
connectionIdSnd*: uint16 connectionIdSnd*: uint16
# Sequence number for the next packet to be sent. # Sequence number for the next packet to be sent.
seqNr: uint16 seqNr: uint16
# All seq number up to this have been correctly acked by us # All sequence numbers up to this have been correctly acked by us.
ackNr: uint16 ackNr: uint16
# Should be completed after successful connection to remote host or after timeout # Should be completed after successful connection to remote host or after
# for the first syn packet # timeout for the first SYN packet.
connectionFuture: Future[void] connectionFuture: Future[void]
# the number of packets in the send queue. Packets that haven't # The number of packets in the send queue. Packets that haven't
# yet been sent count as well as packets marked as needing resend # been sent yet and packets marked as needing to be resend count.
# the oldest un-acked packet in the send queue is seq_nr - cur_window_packets # The oldest un-acked packet in the send queue is seq_nr - cur_window_packets
curWindowPackets: uint16 curWindowPackets: uint16
# out going buffer for all send packets # outgoing buffer for all send packets
outBuffer: GrowableCircularBuffer[OutgoingPacket] outBuffer: GrowableCircularBuffer[OutgoingPacket]
# current number of bytes in send buffer # current number of bytes in send buffer
@ -309,29 +312,28 @@ type
ConnectionResult*[A] = Result[UtpSocket[A], OutgoingConnectionError] ConnectionResult*[A] = Result[UtpSocket[A], OutgoingConnectionError]
const const
# Default maximum size of the data packet payload. With such configuration # Default maximum size of the data packet payload. With this configuration
# data packets will have 508 bytes (488 + 20 header). # data packets will have 508 bytes (488 + 20 header).
# 508 bytes of udp payload can translate into 576 bytes udp packet i.e # 508 bytes of UDP payload can translate into 576 bytes UDP packet i.e
# 508bytes payload + 60bytes (max ip header) + 8bytes (udp header) = 576bytes. # 508 bytes + 60 bytes (max IP header) + 8 bytes (UDP header) = 576 bytes.
# 576bytes is defined as minimum reassembly buffer size i.e # 576 bytes is defined as minimum reassembly buffer size, i.e the minimum
# the minimum datagram size that we are guaranteed any implementation must support. # datagram size that any implementation must support.
# from RFC791: All hosts must be prepared # From RFC791: All hosts must be prepared to accept datagrams of up to 576
# to accept datagrams of up to 576 octets (whether they arrive whole # octets (whether they arrive whole or in fragments).
# or in fragments).
defaultPayloadSize = 488 defaultPayloadSize = 488
# How often each socket check its different on going timers # How often each socket check its different ongoing timers
checkTimeoutsLoopInterval = milliseconds(500) checkTimeoutsLoopInterval = milliseconds(500)
# Default initial timeout for first Syn packet # Default initial timeout for first SYN packet
defaultInitialSynTimeout = milliseconds(3000) defaultInitialSynTimeout = milliseconds(3000)
# Initial timeout to receive first Data data packet after receiving initial Syn # Initial timeout to receive first Data data packet after receiving initial
# packet. # SYN packet.
defaultRcvRetransmitTimeout = milliseconds(10000) defaultRcvRetransmitTimeout = milliseconds(10000)
# Number of times each data packet will be resend before declaring connection # Number of times each data packet will be resend before declaring connection
# dead. 4 is taken from reference implementation # dead. 4 is taken from reference implementation.
defaultDataResendsBeforeFailure = 4'u16 defaultDataResendsBeforeFailure = 4'u16
# default size of rcv buffer in bytes # default size of rcv buffer in bytes
@ -350,9 +352,9 @@ const
# considered suspicious and ignored # considered suspicious and ignored
allowedAckWindow*: uint16 = 3 allowedAckWindow*: uint16 = 3
# Timeout after which the send window will be reset to its minimal value after it dropped # Timeout after which the send window will be reset to its minimal value after
# lower than our current packet size. i.e when we received a packet # it dropped lower than our current packet size. i.e when we received a packet
# from remote peer with `wndSize` set to number <= current packet size # from remote peer with `wndSize` set to number <= current packet size.
defaultResetWindowTimeout = seconds(15) defaultResetWindowTimeout = seconds(15)
reorderBufferMaxSize = 1024 reorderBufferMaxSize = 1024
@ -362,18 +364,19 @@ const
# minimal time before subsequent window decays # minimal time before subsequent window decays
maxWindowDecay = milliseconds(100) maxWindowDecay = milliseconds(100)
# Maximal size of reorder buffer as fraction of optRcvBuffer size following # Maximal size of reorder buffer as fraction of optRcvBuffer size.
# semantics apply bases on rcvBuffer set to 1000 bytes: # Following semantics apply based on a rcvBuffer set to 1000 bytes:
# if there are already 1000 bytes in rcv buffer no more bytes will be accepted to reorder buffer # - if there are already 1000 bytes in rcvBuffer no more bytes will be
# if there are already 500 bytes in reorder buffer, no more bytes will be accepted # accepted to reorder buffer
# to it, and only 500 bytes can be accepted to rcv buffer # - if there are already 500 bytes in reorder buffer, no more bytes will be
# this way there is always a space in rcv buffer to fit new data if the reordering # accepted to it, and only 500 bytes can be accepted to rcvBuffer
# happens # This way there is always a space in rcvBuffer to fit new data if the
# reordering happens.
maxReorderBufferSize = 0.5 maxReorderBufferSize = 0.5
# Default number of of open utp connections # Default number of of open utp connections
# libutp uses 3000 # - libutp uses 3000
# libtorrent uses ~16000 # - libtorrent uses ~16000
defaultMaxOpenConnections = 8000 defaultMaxOpenConnections = 8000
proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T = proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T =
@ -405,9 +408,11 @@ proc init*(
payloadSize: uint32 = defaultPayloadSize, payloadSize: uint32 = defaultPayloadSize,
maxNumberOfOpenConnections: int = defaultMaxOpenConnections maxNumberOfOpenConnections: int = defaultMaxOpenConnections
): T = ): T =
# make sure there is always some payload in data packets, and that packets are not to large. # Make sure there is always some payload in data packets, and that packets are
# with 1480 packet boundary, data packets will have 1500 bytes which seems reasonable # not to large. With 1480 packet boundary, data packets will have 1500 bytes
doAssert(payloadSize > 0 and payloadSize <= 1480, "payloadSize should always be positive number <= 1480") # which seems reasonable.
doAssert(payloadSize > 0 and payloadSize <= 1480,
"payloadSize should always be positive number <= 1480")
# TODO make sure optRcvBuffer is nicely divisible by maxReorderBufferSize # TODO make sure optRcvBuffer is nicely divisible by maxReorderBufferSize
let reorderBufferSize = uint32(maxReorderBufferSize * float64(optRcvBuffer)) let reorderBufferSize = uint32(maxReorderBufferSize * float64(optRcvBuffer))
SocketConfig( SocketConfig(
@ -478,11 +483,14 @@ proc flushPackets(socket: UtpSocket) =
var i: uint16 = oldestOutgoingPacketSeqNr var i: uint16 = oldestOutgoingPacketSeqNr
while i != socket.seqNr: while i != socket.seqNr:
# sending only packet which were not transmitted yet or need a resend # sending only packet which were not transmitted yet or need a resend
let shouldSendPacket = socket.outBuffer.exists(i, (p: OutgoingPacket) => (p.transmissions == 0 or p.needResend == true)) let shouldSendPacket = socket.outBuffer.exists(
i, (p: OutgoingPacket) => (p.transmissions == 0 or p.needResend == true))
if (shouldSendPacket): if (shouldSendPacket):
if (socket.freeWindowBytes() > 0): if (socket.freeWindowBytes() > 0):
# this our first send packet reset rto timeout # this our first send packet reset rto timeout
if i == oldestOutgoingPacketSeqNr and socket.curWindowPackets == 1 and socket.outBuffer[i].transmissions == 0: if i == oldestOutgoingPacketSeqNr and
socket.curWindowPackets == 1 and
socket.outBuffer[i].transmissions == 0:
socket.resetSendTimeout() socket.resetSendTimeout()
debug "Flushing packet", debug "Flushing packet",
@ -501,14 +509,16 @@ proc flushPackets(socket: UtpSocket) =
proc markAllPacketAsLost(s: UtpSocket) = proc markAllPacketAsLost(s: UtpSocket) =
var i = 0'u16 var i = 0'u16
while i < s.curWindowPackets: while i < s.curWindowPackets:
let packetSeqNr = s.seqNr - 1 - i let packetSeqNr = s.seqNr - 1 - i
if (s.outBuffer.exists(packetSeqNr, (p: OutgoingPacket) => p.transmissions > 0 and p.needResend == false)): if (s.outBuffer.exists(
packetSeqNr,
(p: OutgoingPacket) => p.transmissions > 0 and p.needResend == false)):
debug "Marking packet as lost", debug "Marking packet as lost",
pkSeqNr = packetSeqNr pkSeqNr = packetSeqNr
s.outBuffer[packetSeqNr].needResend = true s.outBuffer[packetSeqNr].needResend = true
let packetPayloadLength = s.outBuffer[packetSeqNr].payloadLength let packetPayloadLength = s.outBuffer[packetSeqNr].payloadLength
doAssert(s.currentWindow >= packetPayloadLength, "Window should always be larger than packet length") doAssert(s.currentWindow >= packetPayloadLength,
"Window should always be larger than packet length")
s.currentWindow = s.currentWindow - packetPayloadLength s.currentWindow = s.currentWindow - packetPayloadLength
inc i inc i
@ -655,7 +665,6 @@ proc handleDataWrite(socket: UtpSocket, data: seq[byte]): int =
let lastOrEnd = min(lastIndex, endIndex) let lastOrEnd = min(lastIndex, endIndex)
let dataSlice = data[i..lastOrEnd] let dataSlice = data[i..lastOrEnd]
let payloadLength = uint32(len(dataSlice)) let payloadLength = uint32(len(dataSlice))
if (socket.outBufferBytes + payloadLength <= socket.socketConfig.optSndBuffer): if (socket.outBufferBytes + payloadLength <= socket.socketConfig.optSndBuffer):
let wndSize = socket.getRcvWindowSize() let wndSize = socket.getRcvWindowSize()
let dataPacket = let dataPacket =
@ -667,9 +676,12 @@ proc handleDataWrite(socket: UtpSocket, data: seq[byte]): int =
dataSlice, dataSlice,
socket.replayMicro socket.replayMicro
) )
let outgoingPacket = OutgoingPacket.init(encodePacket(dataPacket), 0, false, payloadLength) let outgoingPacket = OutgoingPacket.init(
encodePacket(dataPacket), 0, false, payloadLength)
socket.registerOutgoingPacket(outgoingPacket) socket.registerOutgoingPacket(outgoingPacket)
bytesWritten = bytesWritten + len(dataSlice) bytesWritten = bytesWritten + len(dataSlice)
# TODO: When flushPackets early ended because of send window being full,
# it keeps trying here again for each dataSlice. Sounds waistfull?
socket.flushPackets() socket.flushPackets()
else: else:
debug "No more place in write buffer", debug "No more place in write buffer",
@ -851,6 +863,14 @@ proc calculateAckedbytes(socket: UtpSocket, nrPacketsToAck: uint16, now: Moment)
proc initializeAckNr(socket: UtpSocket, packetSeqNr: uint16) = proc initializeAckNr(socket: UtpSocket, packetSeqNr: uint16) =
if (socket.state == SynSent): if (socket.state == SynSent):
# Different from the uTP spec but in accordance with libutp and libtorrent.
# When receiving the ACK of a SYN packet, the socket ackNr gets initialized
# as the packet seqNr - 1. This way, the socket ackNr is set up as one less
# the next seqNr for an incoming DATA packet. The seqNr in STATE packets
# should basically be seen as the seqNr for the next DATA or FIN packet.
# See also:
# - libutp: https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L1874
# - libtorrent: https://github.com/arvidn/libtorrent/blob/RC_2_0/src/utp_stream.cpp#L2924
socket.ackNr = packetSeqNr - 1 socket.ackNr = packetSeqNr - 1
proc isAckNrInvalid(socket: UtpSocket, packet: Packet): bool = proc isAckNrInvalid(socket: UtpSocket, packet: Packet): bool =
@ -1055,9 +1075,8 @@ proc generateAckPacket*(socket: UtpSocket): Packet =
) )
proc sendAck(socket: UtpSocket) = proc sendAck(socket: UtpSocket) =
## Creates and sends ack, based on current socket state. Acks are different from ## Creates and sends ack, based on current socket state. Acks are different
## other packets as we do not track them in outgoing buffet ## from other packets as we do not track them in outgoing buffer.
let ackPacket = socket.generateAckPacket() let ackPacket = socket.generateAckPacket()
debug "Sending STATE packet", debug "Sending STATE packet",
@ -1086,10 +1105,9 @@ proc tryfinalizeConnection(socket: UtpSocket, p: Packet) =
if (not socket.connectionFuture.finished()): if (not socket.connectionFuture.finished()):
socket.connectionFuture.complete() socket.connectionFuture.complete()
# TODO at socket level we should handle only FIN/DATA/ACK packets. Refactor to make # TODO: at socket level we should handle only FIN/DATA/ACK packets. Refactor to
# it enforceable by type system # make it enforceable by type system
proc processPacketInternal(socket: UtpSocket, p: Packet) = proc processPacketInternal(socket: UtpSocket, p: Packet) =
debug "Process packet", debug "Process packet",
socketKey = socket.socketKey, socketKey = socket.socketKey,
socketAckNr = socket.ackNr, socketAckNr = socket.ackNr,
@ -1571,8 +1589,8 @@ proc eventLoop(socket: UtpSocket) {.async.} =
# stop processing further reads # stop processing further reads
break break
else: else:
# read was cancelled or socket is already finished move on to next read # read was cancelled or socket is already finished move on to next
# request # read request
discard socket.pendingReads.popFirst() discard socket.pendingReads.popFirst()
# we processed packet, so there could more place in the send buffer # we processed packet, so there could more place in the send buffer
@ -1592,12 +1610,13 @@ proc eventLoop(socket: UtpSocket) {.async.} =
if (bytesWritten == len(pendingWrite.data)): if (bytesWritten == len(pendingWrite.data)):
# all bytes were written we can finish external future # all bytes were written we can finish external future
pendingWrite.writer.complete( pendingWrite.writer.complete(
Result[int, WriteError].ok(bytesWritten) WriteResult.ok(bytesWritten)
) )
else: else:
let bytesLeft = let bytesLeft =
pendingWrite.data[bytesWritten..pendingWrite.data.high] pendingWrite.data[bytesWritten..pendingWrite.data.high]
# bytes partially written to buffer, schedule rest of data for later # bytes partially written to buffer, schedule rest of data for
# later
socket.pendingWrites.addFirst( socket.pendingWrites.addFirst(
WriteRequest( WriteRequest(
kind: Data, kind: Data,
@ -1619,7 +1638,8 @@ proc eventLoop(socket: UtpSocket) {.async.} =
# check if the writer was not cancelled in mean time # check if the writer was not cancelled in mean time
if (not socketEvent.writer.finished()): if (not socketEvent.writer.finished()):
if (socket.pendingWrites.len() > 0): if (socket.pendingWrites.len() > 0):
# there are still some unfinished writes, waiting to be finished schdule this batch for later # there are still some unfinished writes, waiting to be finished
# schedule this batch for later
socket.pendingWrites.addLast( socket.pendingWrites.addLast(
WriteRequest( WriteRequest(
kind: Data, kind: Data,
@ -1632,7 +1652,7 @@ proc eventLoop(socket: UtpSocket) {.async.} =
if (bytesWritten == len(socketEvent.data)): if (bytesWritten == len(socketEvent.data)):
# all bytes were written we can finish external future # all bytes were written we can finish external future
socketEvent.writer.complete( socketEvent.writer.complete(
Result[int, WriteError].ok(bytesWritten) WriteResult.ok(bytesWritten)
) )
else: else:
let bytesLeft = let bytesLeft =
@ -1649,8 +1669,8 @@ proc eventLoop(socket: UtpSocket) {.async.} =
# check if the writer was not cancelled in mean time # check if the writer was not cancelled in mean time
if (not socketEvent.readReq.reader.finished()): if (not socketEvent.readReq.reader.finished()):
if (socket.pendingReads.len() > 0): if (socket.pendingReads.len() > 0):
# there is already pending unfinished read request, schedule this one for # there is already pending unfinished read request, schedule this
# later # one for later
socket.pendingReads.addLast(socketEvent.readReq) socket.pendingReads.addLast(socketEvent.readReq)
else: else:
var readReq = socketEvent.readReq var readReq = socketEvent.readReq
@ -1665,19 +1685,19 @@ proc eventLoop(socket: UtpSocket) {.async.} =
except CancelledError as exc: except CancelledError as exc:
for w in socket.pendingWrites.items(): for w in socket.pendingWrites.items():
if w.kind == Data and (not w.writer.finished()): if w.kind == Data and (not w.writer.finished()):
let res = Result[int, WriteError].err( let res = WriteResult.err(
WriteError(kind: SocketNotWriteable, currentState: socket.state) WriteError(kind: SocketNotWriteable, currentState: socket.state)
) )
w.writer.complete(res) w.writer.complete(res)
for r in socket.pendingReads.items(): for r in socket.pendingReads.items():
# complete every reader with already read bytes # complete every reader with already read bytes
# TODO: it maybe better to refine read api to return Future[Result[seq[byte], E]] # TODO: it may be better to refine read API to return
# and return errors for not finished reads # Future[Result[seq[byte], E]] and return errors for not finished reads
if (not r.reader.finished()): if (not r.reader.finished()):
r.reader.complete(r.bytesAvailable) r.reader.complete(r.bytesAvailable)
socket.pendingWrites.clear() socket.pendingWrites.clear()
socket.pendingReads.clear() socket.pendingReads.clear()
# main eventLoop has been cancelled, try to cancel check timeouts loop # main eventLoop has been cancelled, try to cancel `checkTimeoutsLoop`
socket.checkTimeoutsLoop.cancel() socket.checkTimeoutsLoop.cancel()
trace "main socket event loop cancelled" trace "main socket event loop cancelled"
raise exc raise exc
@ -1686,16 +1706,16 @@ proc startEventLoop(s: UtpSocket) =
s.eventLoop = eventLoop(s) s.eventLoop = eventLoop(s)
proc atEof*(socket: UtpSocket): bool = proc atEof*(socket: UtpSocket): bool =
# socket is considered at eof when remote side sent us fin packet # The socket is considered at eof when the remote side sent us a FIN packet
# and we have processed all packets up to fin # and all packets up to the FIN have been processed.
socket.offset == 0 and socket.reachedFin socket.offset == 0 and socket.reachedFin
proc readingClosed(socket: UtpSocket): bool = proc readingClosed(socket: UtpSocket): bool =
socket.atEof() or socket.state == Destroy socket.atEof() or socket.state == Destroy
proc close*(socket: UtpSocket) = proc close*(socket: UtpSocket) =
## Gracefully closes connection (send FIN) if socket is in connected state ## Gracefully close the connection (send FIN) if the socket is in the
## does not wait for socket to close ## connected state. Does not wait for the socket to close.
if socket.state != Destroy: if socket.state != Destroy:
case socket.state case socket.state
of Connected: of Connected:
@ -1703,26 +1723,26 @@ proc close*(socket: UtpSocket) =
if (not socket.sendFinRequested): if (not socket.sendFinRequested):
try: try:
debug "Sending FIN", dst = socket.socketKey debug "Sending FIN", dst = socket.socketKey
# with this approach, all pending writes will be executed before sending fin packet # With this approach, all pending writes will be executed before
# we could also and method which places close request as first one to process # sending the FIN packet.
# but it would complicate the write loop
socket.eventQueue.putNoWait(SocketEvent(kind: CloseReq)) socket.eventQueue.putNoWait(SocketEvent(kind: CloseReq))
except AsyncQueueFullError as e: except AsyncQueueFullError as e:
# should not happen as our write queue is unbounded # Should not happen as our write queue is unbounded.
raiseAssert e.msg raiseAssert e.msg
socket.sendFinRequested = true socket.sendFinRequested = true
else: else:
# In any other case like connection is not established so sending fin make # When connection is not established, sending FIN makes no sense, just
# no sense, we can just out right close it # destroy the socket.
socket.destroy() socket.destroy()
proc closeWait*(socket: UtpSocket) {.async.} = proc closeWait*(socket: UtpSocket) {.async.} =
## Gracefully closes connection (send FIN) if socket is in connected state ## Gracefully close the connection (send FIN) if the socket is in the
## and waits for socket to be closed. ## connected state and wait for the socket to be closed.
## Warning: if FIN packet for some reason will be lost, then socket will be closed ## Warning: if the FIN packet is lost, then the socket might get closed due to
## due to retransmission failure which may take some time. ## retransmission failures, which will take some time.
## default is 4 retransmissions with doubling of rto between each retransmission ## The default is 4 retransmissions with doubling of rto between each
## retransmission.
socket.close() socket.close()
await socket.closeEvent.wait() await socket.closeEvent.wait()
@ -1732,26 +1752,27 @@ proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] =
let retFuture = newFuture[WriteResult]("UtpSocket.write") let retFuture = newFuture[WriteResult]("UtpSocket.write")
if (socket.state != Connected): if (socket.state != Connected):
let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state)) let res = WriteResult.err(WriteError(kind: SocketNotWriteable, currentState: socket.state))
retFuture.complete(res) retFuture.complete(res)
return retFuture return retFuture
# fin should be last packet received by remote side, therefore trying to write # fin should be last packet received by remote side, therefore trying to write
# after sending fin is considered error # after sending fin is considered error
if socket.sendFinRequested or socket.finSent: if socket.sendFinRequested or socket.finSent:
let res = Result[int, WriteError].err(WriteError(kind: FinSent)) let res = WriteResult.err(WriteError(kind: FinSent))
retFuture.complete(res) retFuture.complete(res)
return retFuture return retFuture
var bytesWritten = 0 var bytesWritten = 0
if len(data) == 0: if len(data) == 0:
let res = Result[int, WriteError].ok(bytesWritten) let res = WriteResult.ok(bytesWritten)
retFuture.complete(res) retFuture.complete(res)
return retFuture return retFuture
try: try:
socket.eventQueue.putNoWait(SocketEvent(kind: WriteReq, data: data, writer: retFuture)) socket.eventQueue.putNoWait(SocketEvent(
kind: WriteReq, data: data, writer: retFuture))
except AsyncQueueFullError as e: except AsyncQueueFullError as e:
# this should not happen as out write queue is unbounded # this should not happen as out write queue is unbounded
raiseAssert e.msg raiseAssert e.msg