Handling of fin packet (#421)

* Handling of connection finalization by sending and receiving FIN packets
This commit is contained in:
KonradStaniec 2021-11-09 15:29:59 +01:00 committed by GitHub
parent df802248d8
commit b671f6c901
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 494 additions and 88 deletions

View File

@ -157,7 +157,7 @@ proc ackPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSiz
let h = PacketHeaderV1( let h = PacketHeaderV1(
pType: ST_STATE, pType: ST_STATE,
version: protocolVersion, version: protocolVersion,
# ack packets always have extension field set to 0 # TODO Handle selective acks
extension: 0'u8, extension: 0'u8,
connectionId: sndConnectionId, connectionId: sndConnectionId,
timestamp: getMonoTimeTimeStamp(), timestamp: getMonoTimeTimeStamp(),
@ -206,3 +206,21 @@ proc resetPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16): Packet
) )
Packet(header: h, payload: @[]) Packet(header: h, payload: @[])
proc finPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSize: uint32): Packet =
let h = PacketHeaderV1(
pType: ST_FIN,
version: protocolVersion,
# fin packets always have extension field set to 0
extension: 0'u8,
connectionId: sndConnectionId,
timestamp: getMonoTimeTimeStamp(),
# TODO for not we are using 0, but this value should be calculated on socket
# level
timestampDiff: 0'u32,
wndSize: bufferSize,
seqNr: seqNr,
ackNr: ackNr
)
Packet(header: h, payload: @[])

View File

@ -47,4 +47,4 @@ when isMainModule:
runForever() runForever()
waitFor utpProt.closeWait() waitFor utpProt.shutdownWait()

View File

@ -73,3 +73,11 @@ proc new*(
proc connectTo*(r: UtpDiscv5Protocol, address: Node): Future[UtpSocket[Node]]= proc connectTo*(r: UtpDiscv5Protocol, address: Node): Future[UtpSocket[Node]]=
return r.router.connectTo(address) return r.router.connectTo(address)
proc shutdown*(r: UtpDiscv5Protocol) =
## closes all managed utp connections in background (not closed discovery, it is up to user)
r.router.shutdown()
proc shutdownWait*(r: UtpDiscv5Protocol) {.async.} =
## closes all managed utp connections in background (not closed discovery, it is up to user)
await r.router.shutdownWait()

View File

@ -91,11 +91,10 @@ proc new*(
router.sendCb = initSendCallback(ta) router.sendCb = initSendCallback(ta)
UtpProtocol(transport: ta, utpRouter: router) UtpProtocol(transport: ta, utpRouter: router)
proc closeWait*(p: UtpProtocol): Future[void] {.async.} = proc shutdownWait*(p: UtpProtocol): Future[void] {.async.} =
# TODO Rething all this when working on FIN and RESET packets and proper handling ## closes all managed utp sockets and then underlying transport
# of resources await p.utpRouter.shutdownWait()
await p.transport.closeWait() await p.transport.closeWait()
p.utpRouter.close()
proc connectTo*(r: UtpProtocol, address: TransportAddress): Future[UtpSocket[TransportAddress]] = proc connectTo*(r: UtpProtocol, address: TransportAddress): Future[UtpSocket[TransportAddress]] =
return r.utpRouter.connectTo(address) return r.utpRouter.connectTo(address)

View File

@ -25,6 +25,7 @@ type
sockets: Table[UtpSocketKey[A], UtpSocket[A]] sockets: Table[UtpSocketKey[A], UtpSocket[A]]
socketConfig: SocketConfig socketConfig: SocketConfig
acceptConnection: AcceptConnectionCallback[A] acceptConnection: AcceptConnectionCallback[A]
closed: bool
sendCb*: SendCallback[A] sendCb*: SendCallback[A]
rng*: ref BrHmacDrbgContext rng*: ref BrHmacDrbgContext
@ -105,7 +106,7 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}=
# state is that socket in destroy state is ultimatly deleted from active connection # state is that socket in destroy state is ultimatly deleted from active connection
# list but socket in reset state lingers there until user of library closes it # list but socket in reset state lingers there until user of library closes it
# explictly. # explictly.
socket.close() socket.destroy()
else: else:
notice "Received rst packet for not known connection" notice "Received rst packet for not known connection"
of ST_SYN: of ST_SYN:
@ -141,6 +142,7 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}=
await r.sendCb(sender, encodePacket(rstPacket)) await r.sendCb(sender, encodePacket(rstPacket))
proc processIncomingBytes*[A](r: UtpRouter[A], bytes: seq[byte], sender: A) {.async.} = proc processIncomingBytes*[A](r: UtpRouter[A], bytes: seq[byte], sender: A) {.async.} =
if (not r.closed):
let dec = decodePacket(bytes) let dec = decodePacket(bytes)
if (dec.isOk()): if (dec.isOk()):
await processPacket[A](r, dec.get(), sender) await processPacket[A](r, dec.get(), sender)
@ -156,8 +158,24 @@ proc connectTo*[A](r: UtpRouter[A], address: A): Future[UtpSocket[A]] {.async.}=
await socket.waitFotSocketToConnect() await socket.waitFotSocketToConnect()
return socket return socket
proc close*[A](r: UtpRouter[A]) = proc shutdown*[A](r: UtpRouter[A]) =
# TODO Rething all this when working on FIN and RESET packets and proper handling # stop processing any new packets and close all sockets in background without
# of resources # notifing remote peers
r.closed = true
for s in r.allSockets(): for s in r.allSockets():
s.close() s.destroy()
proc shutdownWait*[A](r: UtpRouter[A]) {.async.} =
var activeSockets: seq[UtpSocket[A]] = @[]
# stop processing any new packets and close all sockets without
# notifing remote peers
r.closed = true
# we need to make copy as calling socket.destroyWait() removes socket from the table
# and iterator throws error. Antother option would be to wait until number of opensockets
# go to 0
for s in r.allSockets():
activeSockets.add(s)
for s in activeSockets:
yield s.destroyWait()

View File

@ -9,6 +9,7 @@
import import
std/sugar, std/sugar,
chronos, chronicles, bearssl, chronos, chronicles, bearssl,
stew/results,
./growable_buffer, ./growable_buffer,
./packets ./packets
@ -16,7 +17,7 @@ logScope:
topics = "utp_socket" topics = "utp_socket"
type type
ConnectionState = enum ConnectionState* = enum
SynSent, SynSent,
SynRecv, SynRecv,
Connected, Connected,
@ -109,12 +110,30 @@ type
# number on consecutive re-transsmisions # number on consecutive re-transsmisions
retransmitCount: uint32 retransmitCount: uint32
# Event which will complete whenever socket gets in destory statate # Event which will complete whenever socket gets in destory state
closeEvent: AsyncEvent closeEvent: AsyncEvent
# All callback to be called whenever socket gets in destroy state # All callback to be called whenever socket gets in destroy state
closeCallbacks: seq[Future[void]] closeCallbacks: seq[Future[void]]
# socket is closed for reading
readShutdown: bool
# we sent out fin packet
finSent: bool
# have our fin been acked
finAcked: bool
# have we received remote fin
gotFin: bool
# have we reached remote fin packet
reachedFin: bool
# sequence number of remoted fin packet
eofPktNr: uint16
# socket identifier # socket identifier
socketKey*: UtpSocketKey[A] socketKey*: UtpSocketKey[A]
@ -126,6 +145,19 @@ type
ConnectionError* = object of CatchableError ConnectionError* = object of CatchableError
WriteErrorType* = enum
SocketNotWriteable,
FinSent
WriteError* = object
case kind*: WriteErrorType
of SocketNotWriteable:
currentState*: ConnectionState
of FinSent:
discard
WriteResult* = Result[int, WriteError]
const const
# Maximal number of payload bytes per packet. Total packet size will be equal to # Maximal number of payload bytes per packet. Total packet size will be equal to
# mtuSize + sizeof(header) = 600 bytes # mtuSize + sizeof(header) = 600 bytes
@ -254,7 +286,7 @@ proc checkTimeouts(socket: UtpSocket) {.async.} =
# client initiated connections, but did not send following data packet in rto # client initiated connections, but did not send following data packet in rto
# time. TODO this should be configurable # time. TODO this should be configurable
if (socket.state == SynRecv): if (socket.state == SynRecv):
socket.close() socket.destroy()
return return
if socket.shouldDisconnectFromFailedRemote(): if socket.shouldDisconnectFromFailedRemote():
@ -263,7 +295,7 @@ proc checkTimeouts(socket: UtpSocket) {.async.} =
# but maybe it would be more clean to use result # but maybe it would be more clean to use result
socket.connectionFuture.fail(newException(ConnectionError, "Connection to peer timed out")) socket.connectionFuture.fail(newException(ConnectionError, "Connection to peer timed out"))
socket.close() socket.destroy()
return return
let newTimeout = socket.retransmitTimeout * 2 let newTimeout = socket.retransmitTimeout * 2
@ -416,17 +448,19 @@ proc startIncomingSocket*(socket: UtpSocket) {.async.} =
proc isConnected*(socket: UtpSocket): bool = proc isConnected*(socket: UtpSocket): bool =
socket.state == Connected or socket.state == ConnectedFull socket.state == Connected or socket.state == ConnectedFull
proc close*(s: UtpSocket) = proc destroy*(s: UtpSocket) =
# TODO Rething all this when working on FIN packets and proper handling ## Moves socket to destroy state and clean all reasources.
# of resources ## Remote is not notified in any way about socket end of life
s.state = Destroy s.state = Destroy
s.checkTimeoutsLoop.cancel() s.checkTimeoutsLoop.cancel()
s.closeEvent.fire() s.closeEvent.fire()
proc closeWait*(s: UtpSocket) {.async.} = proc destroyWait*(s: UtpSocket) {.async.} =
# TODO Rething all this when working on FIN packets and proper handling ## Moves socket to destroy state and clean all reasources and wait for all registered
# of resources ## callback to fire
s.close() ## Remote is not notified in any way about socket end of life
s.destroy()
await s.closeEvent.wait()
await allFutures(s.closeCallbacks) await allFutures(s.closeCallbacks)
proc setCloseCallback(s: UtpSocket, cb: SocketCloseCallback) {.async.} = proc setCloseCallback(s: UtpSocket, cb: SocketCloseCallback) {.async.} =
@ -560,34 +594,66 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
notice "Received packet is totally of the mark" notice "Received packet is totally of the mark"
return return
# socket.curWindowPackets == acks means that this packet acked all remaining packets
# including the sent fin packets
if (socket.finSent and socket.curWindowPackets == acks):
notice "FIN acked, destroying socket"
socket.finAcked = true
# this bit of utp spec is a bit under specified (i.e there is not specification at all)
# reference implementation moves socket to destroy state in case that our fin was acked
# and socket is considered closed for reading and writing.
# but in theory remote could stil write some data on this socket (or even its own fin)
socket.destroy()
socket.ackPackets(acks) socket.ackPackets(acks)
case p.header.pType case p.header.pType
of ST_DATA: of ST_DATA, ST_FIN:
# To avoid amplification attacks, server socket is in SynRecv state until # To avoid amplification attacks, server socket is in SynRecv state until
# it receices first data transfer # it receices first data transfer
# https://www.usenix.org/system/files/conference/woot15/woot15-paper-adamsky.pdf # https://www.usenix.org/system/files/conference/woot15/woot15-paper-adamsky.pdf
# TODO when intgrating with discv5 this need to be configurable # TODO when intgrating with discv5 this need to be configurable
if (socket.state == SynRecv): if (socket.state == SynRecv and p.header.pType == ST_DATA):
socket.state = Connected socket.state = Connected
notice "Received ST_DATA on known socket" if (p.header.pType == ST_FIN and (not socket.gotFin)):
socket.gotFin = true
socket.eofPktNr = pkSeqNr
if (pastExpected == 0): # we got in order packet
if (pastExpected == 0 and (not socket.reachedFin)):
if (len(p.payload) > 0 and (not socket.readShutdown)):
# we are getting in order data packet, we can flush data directly to the incoming buffer # we are getting in order data packet, we can flush data directly to the incoming buffer
await upload(addr socket.buffer, unsafeAddr p.payload[0], p.payload.len()) await upload(addr socket.buffer, unsafeAddr p.payload[0], p.payload.len())
# Bytes have been passed to upper layer, we can increase number of last # Bytes have been passed to upper layer, we can increase number of last
# acked packet # acked packet
inc socket.ackNr inc socket.ackNr
# check if the following packets are in reorder buffer # check if the following packets are in reorder buffer
while true: while true:
# We are doing this in reoreder loop, to handle the case when we already received
# fin but there were some gaps before eof
# we have reached remote eof, and should not receive more packets from remote
if ((not socket.reachedFin) and socket.gotFin and socket.eofPktNr == socket.ackNr):
notice "Reached socket EOF"
# In case of reaching eof, it is up to user of library what to to with
# it. With the current implementation, the most apropriate way would be to
# destory it (as with our implementation we know that remote is destroying its acked fin)
# as any other send will either generate timeout, or socket will be forcefully
# closed by reset
socket.reachedFin = true
# this is not necessarily true, but as we have already reached eof we can
# ignore following packets
socket.reorderCount = 0
# notify all readers we have reached eof
socket.buffer.forget()
if socket.reorderCount == 0: if socket.reorderCount == 0:
break break
# TODO Handle case when we have reached eof becouse of fin packet
let nextPacketNum = socket.ackNr + 1 let nextPacketNum = socket.ackNr + 1
let maybePacket = socket.inBuffer.get(nextPacketNum) let maybePacket = socket.inBuffer.get(nextPacketNum)
if maybePacket.isNone(): if maybePacket.isNone():
@ -595,6 +661,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
let packet = maybePacket.unsafeGet() let packet = maybePacket.unsafeGet()
if (len(packet.payload) > 0 and (not socket.readShutdown)):
await upload(addr socket.buffer, unsafeAddr packet.payload[0], packet.payload.len()) await upload(addr socket.buffer, unsafeAddr packet.payload[0], packet.payload.len())
socket.inBuffer.delete(nextPacketNum) socket.inBuffer.delete(nextPacketNum)
@ -607,10 +674,15 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
# how many concurrent tasks there are and how to cancel them when socket # how many concurrent tasks there are and how to cancel them when socket
# is closed # is closed
asyncSpawn socket.sendAck() asyncSpawn socket.sendAck()
# we got packet out of order
else: else:
# TODO Handle case when out of order is out of eof range
notice "Got out of order packet" notice "Got out of order packet"
if (socket.gotFin and pkSeqNr > socket.eofPktNr):
notice "Got packet past eof"
return
# growing buffer before checking the packet is already there to avoid # growing buffer before checking the packet is already there to avoid
# looking at older packet due to indices wrap aroud # looking at older packet due to indices wrap aroud
socket.inBuffer.ensureSize(pkSeqNr + 1, pastExpected + 1) socket.inBuffer.ensureSize(pkSeqNr + 1, pastExpected + 1)
@ -623,12 +695,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
notice "added out of order packet in reorder buffer" notice "added out of order packet in reorder buffer"
# TODO for now we do not sent any ack as we do not handle selective acks # TODO for now we do not sent any ack as we do not handle selective acks
# add sending of selective acks # add sending of selective acks
of ST_FIN:
# TODO not implemented
notice "Received ST_FIN on known socket"
of ST_STATE: of ST_STATE:
notice "Received ST_STATE on known socket"
if (socket.state == SynSent and (not socket.connectionFuture.finished())): if (socket.state == SynSent and (not socket.connectionFuture.finished())):
socket.state = Connected socket.state = Connected
# TODO reference implementation sets ackNr (p.header.seqNr - 1), although # TODO reference implementation sets ackNr (p.header.seqNr - 1), although
@ -644,22 +711,17 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
# existence of application level handshake over utp. We may need to modify this # existence of application level handshake over utp. We may need to modify this
# to automaticly send ST_DATA . # to automaticly send ST_DATA .
of ST_RESET: of ST_RESET:
# TODO not implemented notice "Received ST_RESET on known socket, ignoring"
notice "Received ST_RESET on known socket"
of ST_SYN: of ST_SYN:
# TODO not implemented notice "Received ST_SYN on known socket, ignoring"
notice "Received ST_SYN on known socket"
template readLoop(body: untyped): untyped = proc atEof*(socket: UtpSocket): bool =
while true: # socket is considered at eof when remote side sent us fin packet
# TODO error handling # and we have processed all packets up to fin
let (consumed, done) = body socket.buffer.dataLen() == 0 and socket.reachedFin
socket.buffer.shift(consumed)
if done: proc readingClosed(socket: UtpSocket): bool =
break socket.atEof() or socket.state == Destroy
else:
# TODO add condition to handle socket closing
await socket.buffer.wait()
proc getPacketSize(socket: UtpSocket): int = proc getPacketSize(socket: UtpSocket): int =
# TODO currently returning constant, ultimatly it should be bases on mtu estimates # TODO currently returning constant, ultimatly it should be bases on mtu estimates
@ -669,15 +731,52 @@ proc resetSendTimeout(socket: UtpSocket) =
socket.retransmitTimeout = socket.rto socket.retransmitTimeout = socket.rto
socket.rtoTimeout = Moment.now() + socket.retransmitTimeout socket.rtoTimeout = Moment.now() + socket.retransmitTimeout
proc write*(socket: UtpSocket, data: seq[byte]): Future[int] {.async.} = proc close*(socket: UtpSocket) {.async.} =
## Gracefully closes conneciton (send FIN) if socket is in connected state
## does not wait for socket to close
if socket.state != Destroy:
case socket.state
of Connected, ConnectedFull:
socket.readShutdown = true
if (not socket.finSent):
if socket.curWindowPackets == 0:
socket.resetSendTimeout()
let finEncoded = encodePacket(finPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576))
socket.registerOutgoingPacket(OutgoingPacket.init(finEncoded, 1, true))
socket.finSent = true
await socket.sendData(finEncoded)
else:
# In any other case like connection is not established so sending fin make
# no sense, we can just out right close it
socket.destroy()
proc closeWait*(socket: UtpSocket) {.async.} =
## Gracefully closes conneciton (send FIN) if socket is in connected state
## and waits for socket to be closed.
## Warning: if FIN packet for some reason will be lost, then socket will be closed
## due to retransmission failure which may take some time.
## default is 4 retransmissions with doubling of rto between each retranssmision
await socket.close()
await socket.closeEvent.wait()
proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] {.async.} =
if (socket.state != Connected):
return err(WriteError(kind: SocketNotWriteable, currentState: socket.state))
# fin should be last packet received by remote side, therefore trying to write
# after sending fin is considered error
if socket.finSent:
return err(WriteError(kind: FinSent))
var bytesWritten = 0 var bytesWritten = 0
# TODO # TODO
# Handle different socket state i.e do not write when socket is full or not
# connected
# Handle growing of send window # Handle growing of send window
if len(data) == 0: if len(data) == 0:
return bytesWritten return ok(bytesWritten)
if socket.curWindowPackets == 0: if socket.curWindowPackets == 0:
socket.resetSendTimeout() socket.resetSendTimeout()
@ -694,7 +793,18 @@ proc write*(socket: UtpSocket, data: seq[byte]): Future[int] {.async.} =
bytesWritten = bytesWritten + len(dataSlice) bytesWritten = bytesWritten + len(dataSlice)
i = lastOrEnd + 1 i = lastOrEnd + 1
await socket.flushPackets() await socket.flushPackets()
return bytesWritten
return ok(bytesWritten)
template readLoop(body: untyped): untyped =
while true:
let (consumed, done) = body
socket.buffer.shift(consumed)
if done:
break
else:
if not(socket.readingClosed()):
await socket.buffer.wait()
proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] {.async.}= proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] {.async.}=
## Read all bytes `n` bytes from socket ``socket``. ## Read all bytes `n` bytes from socket ``socket``.
@ -706,13 +816,31 @@ proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] {.async.}=
return bytes return bytes
readLoop(): readLoop():
# TODO Add handling of socket closing if socket.readingClosed():
(0, true)
else:
let count = min(socket.buffer.dataLen(), n - len(bytes)) let count = min(socket.buffer.dataLen(), n - len(bytes))
bytes.add(socket.buffer.buffer.toOpenArray(0, count - 1)) bytes.add(socket.buffer.buffer.toOpenArray(0, count - 1))
(count, len(bytes) == n) (count, len(bytes) == n)
return bytes return bytes
proc read*(socket: UtpSocket): Future[seq[byte]] {.async.}=
## Read all bytes from socket ``socket``.
##
## This procedure allocates buffer seq[byte] and return it as result.
var bytes = newSeq[byte]()
readLoop():
if socket.readingClosed():
(0, true)
else:
let count = socket.buffer.dataLen()
bytes.add(socket.buffer.buffer.toOpenArray(0, count - 1))
(count, false)
return bytes
# Check how many packets are still in the out going buffer, usefull for tests or # Check how many packets are still in the out going buffer, usefull for tests or
# debugging. # debugging.
# It throws assertion error when number of elements in buffer do not equal kept counter # It throws assertion error when number of elements in buffer do not equal kept counter

View File

@ -78,7 +78,7 @@ procSuite "Utp protocol over discovery v5 tests":
check: check:
clientSocket.isConnected() clientSocket.isConnected()
clientSocket.close() await clientSocket.destroyWait()
await node1.closeWait() await node1.closeWait()
await node2.closeWait() await node2.closeWait()
@ -108,12 +108,12 @@ procSuite "Utp protocol over discovery v5 tests":
let received = await serverSocket.read(numOfBytes) let received = await serverSocket.read(numOfBytes)
check: check:
written == numOfBytes written.get() == numOfBytes
bytesToTransfer == received bytesToTransfer == received
clientSocket.isConnected() clientSocket.isConnected()
serverSocket.isConnected() serverSocket.isConnected()
clientSocket.close() await clientSocket.destroyWait()
serverSocket.close() await serverSocket.destroyWait()
await node1.closeWait() await node1.closeWait()
await node2.closeWait() await node2.closeWait()

View File

@ -32,7 +32,7 @@ proc registerIncomingSocketCallback(serverSockets: AsyncQueue): AcceptConnection
proc transferData(sender: UtpSocket[TransportAddress], receiver: UtpSocket[TransportAddress], data: seq[byte]): Future[seq[byte]] {.async.}= proc transferData(sender: UtpSocket[TransportAddress], receiver: UtpSocket[TransportAddress], data: seq[byte]): Future[seq[byte]] {.async.}=
let bytesWritten = await sender.write(data) let bytesWritten = await sender.write(data)
doAssert bytesWritten == len(data) doAssert bytesWritten.get() == len(data)
let received = await receiver.read(len(data)) let received = await receiver.read(len(data))
return received return received
@ -72,8 +72,10 @@ proc initClientServerScenario(): Future[ClientServerScenario] {.async.} =
) )
proc close(s: ClientServerScenario) {.async.} = proc close(s: ClientServerScenario) {.async.} =
await s.utp1.closeWait() await s.clientSocket.destroyWait()
await s.utp2.closeWait() await s.serverSocket.destroyWait()
await s.utp1.shutdownWait()
await s.utp2.shutdownWait()
proc init2ClientsServerScenario(): Future[TwoClientsServerScenario] {.async.} = proc init2ClientsServerScenario(): Future[TwoClientsServerScenario] {.async.} =
var serverSockets = newAsyncQueue[UtpSocket[TransportAddress]]() var serverSockets = newAsyncQueue[UtpSocket[TransportAddress]]()
@ -107,9 +109,9 @@ proc init2ClientsServerScenario(): Future[TwoClientsServerScenario] {.async.} =
) )
proc close(s: TwoClientsServerScenario) {.async.} = proc close(s: TwoClientsServerScenario) {.async.} =
await s.utp1.closeWait() await s.utp1.shutdownWait()
await s.utp2.closeWait() await s.utp2.shutdownWait()
await s.utp3.closeWait() await s.utp3.shutdownWait()
procSuite "Utp protocol over udp tests": procSuite "Utp protocol over udp tests":
let rng = newRng() let rng = newRng()
@ -136,8 +138,8 @@ procSuite "Utp protocol over udp tests":
server2Called.isSet() server2Called.isSet()
await utpProt1.closeWait() await utpProt1.shutdownWait()
await utpProt2.closeWait() await utpProt2.shutdownWait()
asyncTest "Fail to connect to offline remote host": asyncTest "Fail to connect to offline remote host":
let server1Called = newAsyncEvent() let server1Called = newAsyncEvent()
@ -158,7 +160,7 @@ procSuite "Utp protocol over udp tests":
check: check:
utpProt1.openSockets() == 0 utpProt1.openSockets() == 0
await utpProt1.closeWait() await utpProt1.shutdownWait()
asyncTest "Success connect to remote host which initialy was offline": asyncTest "Success connect to remote host which initialy was offline":
let server1Called = newAsyncEvent() let server1Called = newAsyncEvent()
@ -184,8 +186,8 @@ procSuite "Utp protocol over udp tests":
futSock.finished() and (not futsock.failed()) and (not futsock.cancelled()) futSock.finished() and (not futsock.failed()) and (not futsock.cancelled())
server2Called.isSet() server2Called.isSet()
await utpProt1.closeWait() await utpProt1.shutdownWait()
await utpProt2.closeWait() await utpProt2.shutdownWait()
asyncTest "Success data transfer when data fits into one packet": asyncTest "Success data transfer when data fits into one packet":
let s = await initClientServerScenario() let s = await initClientServerScenario()
@ -260,14 +262,14 @@ procSuite "Utp protocol over udp tests":
let written = await s.clientSocket.write(bytesToTransfer) let written = await s.clientSocket.write(bytesToTransfer)
check: check:
written == len(bytesToTransfer) written.get() == len(bytesToTransfer)
let bytesToTransfer1 = generateByteArray(rng[], 5000) let bytesToTransfer1 = generateByteArray(rng[], 5000)
let written1 = await s.clientSocket.write(bytesToTransfer1) let written1 = await s.clientSocket.write(bytesToTransfer1)
check: check:
written1 == len(bytesToTransfer) written1.get() == len(bytesToTransfer)
let bytesReceived = await s.serverSocket.read(len(bytesToTransfer) + len(bytesToTransfer1)) let bytesReceived = await s.serverSocket.read(len(bytesToTransfer) + len(bytesToTransfer1))
@ -304,3 +306,67 @@ procSuite "Utp protocol over udp tests":
client2Data == server2ReadBytes client2Data == server2ReadBytes
await s.close() await s.close()
asyncTest "Gracefull stop of the socket":
let s = await initClientServerScenario()
check:
s.clientSocket.isConnected()
# after successful connection outgoing buffer should be empty as syn packet
# should be correctly acked
s.clientSocket.numPacketsInOutGoingBuffer() == 0
# Server socket is not in connected state, until first data transfer
(not s.serverSocket.isConnected())
let bytesToTransfer = generateByteArray(rng[], 100)
let bytesReceivedFromClient = await transferData(s.clientSocket, s.serverSocket, bytesToTransfer)
check:
bytesToTransfer == bytesReceivedFromClient
s.serverSocket.isConnected()
await s.clientSocket.closeWait()
check:
not s.clientSocket.isConnected()
s.serverSocket.atEof()
s.utp1.openSockets() == 0
s.utp2.openSockets() == 1
await s.serverSocket.destroyWait()
check:
not s.serverSocket.isConnected()
s.utp2.openSockets() == 0
await s.close()
asyncTest "Reading data until eof":
let s = await initClientServerScenario()
check:
s.clientSocket.isConnected()
# after successful connection outgoing buffer should be empty as syn packet
# should be correctly acked
s.clientSocket.numPacketsInOutGoingBuffer() == 0
# Server socket is not in connected state, until first data transfer
(not s.serverSocket.isConnected())
let bytesToTransfer1 = generateByteArray(rng[], 1000)
let bytesToTransfer2 = generateByteArray(rng[], 1000)
let bytesToTransfer3 = generateByteArray(rng[], 1000)
let w1 = await s.clientSocket.write(bytesToTransfer1)
let w2 = await s.clientSocket.write(bytesToTransfer2)
let w3 = await s.clientSocket.write(bytesToTransfer3)
await s.clientSocket.closeWait()
let readData = await s.serverSocket.read()
check:
readData == concat(bytesToTransfer1, bytesToTransfer2, bytesToTransfer3)
s.serverSocket.atEof()
s.utp1.openSockets() == 0
await s.close()

View File

@ -131,7 +131,7 @@ procSuite "Utp router unit tests":
check: check:
router.len() == 1 router.len() == 1
await socket.closeWait() await socket.destroyWait()
check: check:
not socket.isConnected() not socket.isConnected()
@ -161,7 +161,7 @@ procSuite "Utp router unit tests":
outgoingSocket.isConnected() outgoingSocket.isConnected()
router.len() == 1 router.len() == 1
await outgoingSocket.closeWait() await outgoingSocket.destroyWait()
check: check:
not outgoingSocket.isConnected() not outgoingSocket.isConnected()

View File

@ -7,11 +7,12 @@
{.used.} {.used.}
import import
std/[algorithm, random], std/[algorithm, random, sequtils],
chronos, bearssl, chronicles, chronos, bearssl, chronicles,
testutils/unittests, testutils/unittests,
./test_utils, ./test_utils,
../../eth/utp/utp_router, ../../eth/utp/utp_router,
../../eth/utp/utp_socket,
../../eth/utp/packets, ../../eth/utp/packets,
../../eth/keys ../../eth/keys
@ -252,7 +253,7 @@ procSuite "Utp socket unit test":
let bytesWritten = await outgoingSocket.write(dataToWrite) let bytesWritten = await outgoingSocket.write(dataToWrite)
check: check:
bytesWritten == len(dataToWrite) bytesWritten.get() == len(dataToWrite)
let sentPacket = await q.get() let sentPacket = await q.get()
@ -294,7 +295,7 @@ procSuite "Utp socket unit test":
let bytesWritten = await outgoingSocket.write(dataToWrite) let bytesWritten = await outgoingSocket.write(dataToWrite)
check: check:
bytesWritten == len(dataToWrite) bytesWritten.get() == len(dataToWrite)
let sentPacket = await q.get() let sentPacket = await q.get()
@ -326,3 +327,171 @@ procSuite "Utp socket unit test":
check: check:
not outgoingSocket.isConnected() not outgoingSocket.isConnected()
len(q) == 0 len(q) == 0
asyncTest "Processing in order fin should make socket reach eof and ack this packet":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q)
let finP = finPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize)
await outgoingSocket.processPacket(finP)
let ack1 = await q.get()
check:
ack1.header.pType == ST_STATE
outgoingSocket.atEof()
asyncTest "Processing out of order fin should buffer it until receiving all remaining packets":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
let data = @[1'u8, 2'u8, 3'u8]
let data1 = @[4'u8, 5'u8, 6'u8]
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q)
let readF = outgoingSocket.read()
let dataP = dataPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data)
let dataP1 = dataPacket(initialRemoteSeq + 1, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data1)
let finP = finPacket(initialRemoteSeq + 2, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize)
await outgoingSocket.processPacket(finP)
check:
not readF.finished()
not outgoingSocket.atEof()
await outgoingSocket.processPacket(dataP1)
check:
not readF.finished()
not outgoingSocket.atEof()
await outgoingSocket.processPacket(dataP)
let bytes = await readF
check:
readF.finished()
outgoingSocket.atEof()
bytes == concat(data, data1)
asyncTest "Socket should ignore data past eof packet":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
let data = @[1'u8, 2'u8, 3'u8]
let data1 = @[4'u8, 5'u8, 6'u8]
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q)
let readF = outgoingSocket.read()
let dataP = dataPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data)
let finP = finPacket(initialRemoteSeq + 1, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize)
# dataP1 has seqNr larger than fin, there fore it should be considered past eof and never passed
# to user of library
let dataP1 = dataPacket(initialRemoteSeq + 2, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data1)
await outgoingSocket.processPacket(finP)
check:
not readF.finished()
not outgoingSocket.atEof()
# it is out of order dataP1 (as we still not processed dataP packet)
await outgoingSocket.processPacket(dataP1)
check:
not readF.finished()
not outgoingSocket.atEof()
await outgoingSocket.processPacket(dataP)
# it is in order dataP1, as we have now processed dataP + fin which came before
# but it is past eof so it should be ignored
await outgoingSocket.processPacket(dataP1)
let bytes = await readF
check:
readF.finished()
outgoingSocket.atEof()
bytes == concat(data)
asyncTest "Calling close should send fin packet":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q)
await outgoingSocket.close()
let sendFin = await q.get()
check:
sendFin.header.pType == ST_FIN
asyncTest "Receiving ack for fin packet should destroy socket":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q)
let closeF = outgoingSocket.close()
let sendFin = await q.get()
check:
sendFin.header.pType == ST_FIN
let responseAck = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, sendFin.header.seqNr, testBufferSize)
await outgoingSocket.processPacket(responseAck)
await closeF
check:
not outgoingSocket.isConnected()
asyncTest "Trying to write data onto closed socket should return error":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q)
await outgoingSocket.destroyWait()
let writeResult = await outgoingSocket.write(@[1'u8])
check:
writeResult.isErr()
let error = writeResult.error()
check:
error.kind == SocketNotWriteable
error.currentState == Destroy
asyncTest "Trying to write data onto closed socket which sent fin":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q)
await outgoingSocket.close()
let writeResult = await outgoingSocket.write(@[1'u8])
check:
writeResult.isErr()
let error = writeResult.error()
check:
error.kind == FinSent