Make initial state of socket configurable (#428)

* Make initial state of socket configurable
This commit is contained in:
KonradStaniec 2021-11-19 11:36:46 +01:00 committed by GitHub
parent d5e5ec9f90
commit ce296ff76e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 228 additions and 42 deletions

View File

@ -155,15 +155,11 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}=
if (r.shouldAllowConnection(sender, p.header.connectionId)):
notice "Received SYN for not known connection. Initiating incoming connection"
# Initial ackNr is set to incoming packer seqNr
let incomingSocket = initIncomingSocket[A](sender, r.sendCb, r.socketConfig ,p.header.connectionId, p.header.seqNr, r.rng[])
let incomingSocket = newIncomingSocket[A](sender, r.sendCb, r.socketConfig ,p.header.connectionId, p.header.seqNr, r.rng[])
r.registerUtpSocket(incomingSocket)
await incomingSocket.startIncomingSocket()
# TODO By default (when we have utp over udp) socket here is passed to upper layer
# in SynRecv state, which is not writeable i.e user of socket cannot write
# data to it unless some data will be received. This is counter measure to
# amplification attacks.
# During integration with discovery v5 (i.e utp over discovv5), we must re-think
# this.
# Based on configuration, socket is passed to upper layer either in SynRecv
# or Connected state
asyncSpawn r.acceptConnection(r, incomingSocket)
else:
notice "Connection declined"
@ -194,7 +190,7 @@ proc generateNewUniqueSocket[A](r: UtpRouter[A], address: A): Option[UtpSocket[A
while tryCount < maxSocketGenerationTries:
let rcvId = randUint16(r.rng[])
let socket = initOutgoingSocket[A](address, r.sendCb, r.socketConfig, rcvId, r.rng[])
let socket = newOutgoingSocket[A](address, r.sendCb, r.socketConfig, rcvId, r.rng[])
if r.registerIfAbsent(socket):
return some(socket)
@ -236,7 +232,7 @@ proc connectTo*[A](r: UtpRouter[A], address: A): Future[ConnectionResult[A]] {.a
# Connect to provided address with provided connection id, if socket with this id
# and address already exsits return error
proc connectTo*[A](r: UtpRouter[A], address: A, connectionId: uint16): Future[ConnectionResult[A]] {.async.} =
let socket = initOutgoingSocket[A](address, r.sendCb, r.socketConfig, connectionId, r.rng[])
let socket = newOutgoingSocket[A](address, r.sendCb, r.socketConfig, connectionId, r.rng[])
if (r.registerIfAbsent(socket)):
return await socket.connect()

View File

@ -57,6 +57,13 @@ type
# Maximnal size of receive buffer in bytes
optRcvBuffer*: uint32
# If set to some(`Duration`), the incoming socket will be initialized in
# `SynRecv` state and the remote peer will have `Duration` to transfer data
# to move the socket in `Connected` state.
# If set to none, the incoming socket will immediately be set to `Connected`
# state and will be able to transfer data.
incomingSocketReceiveTimeout*: Option[Duration]
UtpSocket*[A] = ref object
remoteAddress*: A
state: ConnectionState
@ -187,8 +194,8 @@ const
defaultInitialSynTimeout = milliseconds(3000)
# Initial timeout to receive first Data data packet after receiving initial Syn
# packet. (TODO it should only be set when working over udp)
initialRcvRetransmitTimeout = milliseconds(10000)
# packet.
defaultRcvRetransmitTimeout = milliseconds(10000)
# Number of times each data packet will be resend before declaring connection
# dead. 4 is taken from reference implementation
@ -232,12 +239,14 @@ proc init*(
T: type SocketConfig,
initialSynTimeout: Duration = defaultInitialSynTimeout,
dataResendsBeforeFailure: uint16 = defaultDataResendsBeforeFailure,
optRcvBuffer: uint32 = defaultOptRcvBuffer
optRcvBuffer: uint32 = defaultOptRcvBuffer,
incomingSocketReceiveTimeout: Option[Duration] = some(defaultRcvRetransmitTimeout)
): T =
SocketConfig(
initialSynTimeout: initialSynTimeout,
dataResendsBeforeFailure: dataResendsBeforeFailure,
optRcvBuffer: optRcvBuffer
optRcvBuffer: optRcvBuffer,
incomingSocketReceiveTimeout: incomingSocketReceiveTimeout
)
proc getRcvWindowSize(socket: UtpSocket): uint32 =
@ -325,15 +334,13 @@ proc checkTimeouts(socket: UtpSocket) {.async.} =
# timeouts calculations
# client initiated connections, but did not send following data packet in rto
# time. TODO this should be configurable
# time and our socket is configured to start in SynRecv state.
if (socket.state == SynRecv):
socket.destroy()
return
if socket.shouldDisconnectFromFailedRemote():
if socket.state == SynSent and (not socket.connectionFuture.finished()):
# TODO standard stream interface result in failed future in case of failed connections,
# but maybe it would be more clean to use result
socket.connectionFuture.fail(newException(ConnectionError, "Connection to peer timed out"))
socket.destroy()
@ -367,7 +374,7 @@ proc checkTimeouts(socket: UtpSocket) {.async.} =
# TODO add sending keep alives when necessary
proc checkTimeoutsLoop(s: UtpSocket) {.async.} =
## Loop that check timeoutsin the socket.
## Loop that check timeouts in the socket.
try:
while true:
await sleepAsync(checkTimeoutsLoopInterval)
@ -388,14 +395,9 @@ proc new[A](
rcvId: uint16,
sndId: uint16,
initialSeqNr: uint16,
initialAckNr: uint16
initialAckNr: uint16,
initialTimeout: Duration
): T =
let initialTimeout =
if direction == Outgoing:
cfg.initialSynTimeout
else :
initialRcvRetransmitTimeout
T(
remoteAddress: to,
state: state,
@ -421,7 +423,7 @@ proc new[A](
send: snd
)
proc initOutgoingSocket*[A](
proc newOutgoingSocket*[A](
to: A,
snd: SendCallback[A],
cfg: SocketConfig,
@ -441,10 +443,11 @@ proc initOutgoingSocket*[A](
sndConnectionId,
initialSeqNr,
# Initialy ack nr is 0, as we do not know remote inital seqnr
0
0,
cfg.initialSynTimeout
)
proc initIncomingSocket*[A](
proc newIncomingSocket*[A](
to: A,
snd: SendCallback[A],
cfg: SocketConfig,
@ -454,16 +457,27 @@ proc initIncomingSocket*[A](
): UtpSocket[A] =
let initialSeqNr = randUint16(rng)
let (initialState, initialTimeout) =
if (cfg.incomingSocketReceiveTimeout.isNone()):
# it does not matter what timeout value we put here, as socket will be in
# connected state without outgoing packets in buffer so any timeout hit will
# just double rto without any penalties
(Connected, milliseconds(0))
else:
let timeout = cfg.incomingSocketReceiveTimeout.unsafeGet()
(SynRecv, timeout)
UtpSocket[A].new(
to,
snd,
SynRecv,
initialState,
cfg,
Incoming,
connectionId + 1,
connectionId,
initialSeqNr,
ackNr
ackNr,
initialTimeout
)
proc startOutgoingSocket*(socket: UtpSocket): Future[void] {.async.} =
@ -479,7 +493,6 @@ proc startOutgoingSocket*(socket: UtpSocket): Future[void] {.async.} =
await socket.connectionFuture
proc startIncomingSocket*(socket: UtpSocket) {.async.} =
doAssert(socket.state == SynRecv)
# Make sure ack was flushed before moving forward
await socket.sendAck()
socket.startTimeoutLoop()
@ -487,6 +500,9 @@ proc startIncomingSocket*(socket: UtpSocket) {.async.} =
proc isConnected*(socket: UtpSocket): bool =
socket.state == Connected or socket.state == ConnectedFull
proc isClosed*(socket: UtpSocket): bool =
socket.state == Destroy and socket.closeEvent.isSet()
proc destroy*(s: UtpSocket) =
## Moves socket to destroy state and clean all reasources.
## Remote is not notified in any way about socket end of life
@ -679,7 +695,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
# To avoid amplification attacks, server socket is in SynRecv state until
# it receices first data transfer
# https://www.usenix.org/system/files/conference/woot15/woot15-paper-adamsky.pdf
# TODO when intgrating with discv5 this need to be configurable
# Socket is in SynRecv state only when recv timeout is configured
if (socket.state == SynRecv and p.header.pType == ST_DATA):
socket.state = Connected
@ -772,11 +788,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
# In case of SynSent complate the future as last thing to make sure user of libray will
# receive socket in correct state
socket.connectionFuture.complete()
# TODO to finish handhske we should respond with ST_DATA packet, without it
# socket is left in half-open state.
# Actual reference implementation waits for user to send data, as it assumes
# existence of application level handshake over utp. We may need to modify this
# to automaticly send ST_DATA .
of ST_RESET:
notice "Received ST_RESET on known socket, ignoring"
of ST_SYN:

View File

@ -81,11 +81,16 @@ procSuite "Utp protocol over discovery v5 tests":
let clientSocketResult = await utp1.connectTo(node2.localNode)
let clientSocket = clientSocketResult.get()
let serverSocket = await queue.get()
check:
clientSocket.isConnected()
# in this test we do not configure the socket to be connected just after
# accepting incoming connection
not serverSocket.isConnected()
await clientSocket.destroyWait()
await serverSocket.destroyWait()
await node1.closeWait()
await node2.closeWait()
@ -169,5 +174,53 @@ procSuite "Utp protocol over discovery v5 tests":
clientSocket.connectionId() == allowedId
serverSocket.connectionId() == allowedId
await clientSocket.destroyWait()
await serverSocket.destroyWait()
await node1.closeWait()
await node2.closeWait()
asyncTest "Configure incoming connections to be in connected state":
let
queue = newAsyncQueue[UtpSocket[Node]]()
node1 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20302))
node2 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20303))
utp1 = UtpDiscv5Protocol.new(node1, utpProtId, registerIncomingSocketCallback(queue))
utp2 = UtpDiscv5Protocol.new(
node2,
utpProtId,
registerIncomingSocketCallback(queue),
SocketConfig.init(incomingSocketReceiveTimeout = none[Duration]())
)
# nodes must know about each other
check:
node1.addNode(node2.localNode)
node2.addNode(node1.localNode)
let clientSocketResult = await utp1.connectTo(node2.localNode)
let clientSocket = clientSocketResult.get()
let serverSocket = await queue.get()
check:
clientSocket.isConnected()
serverSocket.isConnected()
let serverData = @[1'u8]
let wResult = await serverSocket.write(serverData)
check:
wResult.isOk()
let readData = await clientSocket.read(len(serverData))
check:
readData == serverData
await clientSocket.destroyWait()
await serverSocket.destroyWait()
await node1.closeWait()
await node2.closeWait()

View File

@ -7,7 +7,7 @@
{.used.}
import
std/hashes,
std/[hashes, options],
chronos, bearssl, chronicles,
testutils/unittests,
./test_utils,
@ -89,6 +89,92 @@ procSuite "Utp router unit tests":
check:
router.len() == 1
asyncTest "Incoming connection should be closed when not receving data for period of time when configured":
let q = newAsyncQueue[UtpSocket[int]]()
let router =
UtpRouter[int].new(
registerIncomingSocketCallback(q),
SocketConfig.init(incomingSocketReceiveTimeout = some(seconds(2))),
rng
)
router.sendCb = testSend
let encodedSyn = encodePacket(synPacket(10, 10, 10))
await router.processIncomingBytes(encodedSyn, testSender)
let socket = await q.get()
check:
router.len() == 1
# socket is not configured to be connected until receiving data
not socket.isConnected()
await waitUntil(proc (): bool = socket.isClosed())
check:
router.len() == 0
asyncTest "Incoming connection should be in connected state when configured":
let q = newAsyncQueue[UtpSocket[int]]()
let router =
UtpRouter[int].new(
registerIncomingSocketCallback(q),
SocketConfig.init(incomingSocketReceiveTimeout = none[Duration]()),
rng
)
router.sendCb = testSend
let encodedSyn = encodePacket(synPacket(10, 10, 10))
await router.processIncomingBytes(encodedSyn, testSender)
let socket = await q.get()
check:
router.len() == 1
socket.isConnected()
# wait a while to trigger timeout and check that socket is still connected
await sleepAsync(seconds(3))
check:
router.len() == 1
socket.isConnected()
asyncTest "Incoming connection should change state to connected when receiving data packet":
let q = newAsyncQueue[UtpSocket[int]]()
let pq = newAsyncQueue[(Packet, int)]()
let router =
UtpRouter[int].new(
registerIncomingSocketCallback(q),
SocketConfig.init(incomingSocketReceiveTimeout = some(seconds(3))),
rng
)
router.sendCb = initTestSnd(pq)
let dataToSend = @[1'u8]
let initSeq: uint16 = 10
let initConnId: uint16 = 10
let encodedSyn = encodePacket(synPacket(initSeq, initConnId, 10))
await router.processIncomingBytes(encodedSyn, testSender)
let (initialPacket, _) = await pq.get()
let socket = await q.get()
check:
router.len() == 1
# socket is not configured to be connected until receiving data
not socket.isConnected()
let encodedData = encodePacket(dataPacket(initSeq + 1, initConnId + 1, initialPacket.header.seqNr - 1, 10, dataToSend))
await router.processIncomingBytes(encodedData, testSender)
check:
socket.isConnected()
asyncTest "Router should create new incoming socket when receiving same syn packet from diffrent sender":
let q = newAsyncQueue[UtpSocket[int]]()
let router = UtpRouter[int].new(registerIncomingSocketCallback(q), SocketConfig.init(), rng)

View File

@ -62,7 +62,7 @@ procSuite "Utp socket unit test":
initialRemoteSeq: uint16,
q: AsyncQueue[Packet],
cfg: SocketConfig = SocketConfig.init()): (UtpSocket[TransportAddress], Packet) =
let sock1 = initOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), cfg, defaultRcvOutgoingId, rng[])
let sock1 = newOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), cfg, defaultRcvOutgoingId, rng[])
asyncSpawn sock1.startOutgoingSocket()
let initialPacket = await q.get()
@ -81,7 +81,7 @@ procSuite "Utp socket unit test":
asyncTest "Starting outgoing socket should send Syn packet":
let q = newAsyncQueue[Packet]()
let defaultConfig = SocketConfig.init()
let sock1 = initOutgoingSocket[TransportAddress](
let sock1 = newOutgoingSocket[TransportAddress](
testAddress,
initTestSnd(q),
defaultConfig,
@ -95,11 +95,12 @@ procSuite "Utp socket unit test":
initialPacket.header.pType == ST_SYN
initialPacket.header.wndSize == defaultConfig.optRcvBuffer
await sock1.destroyWait()
fut1.cancel()
asyncTest "Outgoing socket should re-send syn packet 2 times before declaring failure":
let q = newAsyncQueue[Packet]()
let sock1 = initOutgoingSocket[TransportAddress](
let sock1 = newOutgoingSocket[TransportAddress](
testAddress,
initTestSnd(q),
SocketConfig.init(milliseconds(100)),
@ -128,13 +129,19 @@ procSuite "Utp socket unit test":
check:
not sock1.isConnected()
await sock1.destroyWait()
fut1.cancel()
asyncTest "Processing in order ack should make socket connected":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
discard connectOutGoingSocket(initialRemoteSeq, q)
let (sock1, packet) = connectOutGoingSocket(initialRemoteSeq, q)
check:
sock1.isConnected()
await sock1.destroyWait()
asyncTest "Processing in order data packet should upload it to buffer and ack packet":
let q = newAsyncQueue[Packet]()
@ -157,6 +164,8 @@ procSuite "Utp socket unit test":
check:
receivedBytes == data
await outgoingSocket.destroyWait()
asyncTest "Processing out of order data packet should buffer it until receiving in order one":
# TODO test is valid until implementing selective acks
let q = newAsyncQueue[Packet]()
@ -185,6 +194,8 @@ procSuite "Utp socket unit test":
check:
receivedData == data
await outgoingSocket.destroyWait()
asyncTest "Processing out of order data packet should ignore duplicated not ordered packets":
# TODO test is valid until implementing selective acks
@ -218,6 +229,8 @@ procSuite "Utp socket unit test":
check:
receivedData == data
await outgoingSocket.destroyWait()
asyncTest "Processing packets in random order":
# TODO test is valid until implementing selective acks
@ -244,6 +257,8 @@ procSuite "Utp socket unit test":
# as they can be fired at any point. What matters is that data is passed
# in same order as received.
receivedData == data
await outgoingSocket.destroyWait()
asyncTest "Ignoring totally out of order packet":
# TODO test is valid until implementing selective acks
@ -264,6 +279,8 @@ procSuite "Utp socket unit test":
check:
outgoingSocket.numPacketsInReordedBuffer() == 1
await outgoingSocket.destroyWait()
asyncTest "Writing small enough data should produce 1 data packet":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
@ -294,11 +311,13 @@ procSuite "Utp socket unit test":
check:
outgoingSocket.numPacketsInOutGoingBuffer() == 0
await outgoingSocket.destroyWait()
asyncTest "Socket should re-send data packet configurable number of times before declaring failure":
let q = newAsyncQueue[Packet]()
let initalRemoteSeqNr = 10'u16
let outgoingSocket = initOutgoingSocket[TransportAddress](
let outgoingSocket = newOutgoingSocket[TransportAddress](
testAddress,
initTestSnd(q),
SocketConfig.init(milliseconds(3000), 2),
@ -358,6 +377,8 @@ procSuite "Utp socket unit test":
not outgoingSocket.isConnected()
len(q) == 0
await outgoingSocket.destroyWait()
asyncTest "Processing in order fin should make socket reach eof and ack this packet":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
@ -373,6 +394,8 @@ procSuite "Utp socket unit test":
ack1.header.pType == ST_STATE
outgoingSocket.atEof()
await outgoingSocket.destroyWait()
asyncTest "Processing out of order fin should buffer it until receiving all remaining packets":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
@ -409,6 +432,8 @@ procSuite "Utp socket unit test":
outgoingSocket.atEof()
bytes == concat(data, data1)
await outgoingSocket.destroyWait()
asyncTest "Socket should ignore data past eof packet":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
@ -453,6 +478,8 @@ procSuite "Utp socket unit test":
outgoingSocket.atEof()
bytes == concat(data)
await outgoingSocket.destroyWait()
asyncTest "Calling close should send fin packet":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
@ -466,6 +493,8 @@ procSuite "Utp socket unit test":
check:
sendFin.header.pType == ST_FIN
await outgoingSocket.destroyWait()
asyncTest "Receiving ack for fin packet should destroy socket":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
@ -487,6 +516,8 @@ procSuite "Utp socket unit test":
check:
not outgoingSocket.isConnected()
await outgoingSocket.destroyWait()
asyncTest "Trying to write data onto closed socket should return error":
let q = newAsyncQueue[Packet]()
@ -525,6 +556,8 @@ procSuite "Utp socket unit test":
check:
error.kind == FinSent
await outgoingSocket.destroyWait()
asyncTest "Processing data packet should update window size accordingly and use it in all send packets":
let q = newAsyncQueue[Packet]()
let initialRemoteSeqNr = 10'u16
@ -560,6 +593,8 @@ procSuite "Utp socket unit test":
sentFin.header.pType == ST_FIN
sentFin.header.wndSize == initialRcvBufferSize - uint32(len(data))
await outgoingSocket.destroyWait()
asyncTest "Reading data from the buffer shoud increase receive window":
let q = newAsyncQueue[Packet]()
let initalRemoteSeqNr = 10'u16
@ -593,6 +628,8 @@ procSuite "Utp socket unit test":
# we have read all data from rcv buffer, advertised window should go back to
# initial size
sentData.header.wndSize == initialRcvBufferSize
await outgoingSocket.destroyWait()
asyncTest "Socket should ignore packets with bad ack number":
let q = newAsyncQueue[Packet]()
@ -619,3 +656,5 @@ procSuite "Utp socket unit test":
check:
# data1 and data2 were sent in bad packets we should only receive data3
receivedBytes == data3
await outgoingSocket.destroyWait()