Add sending and receiving data procedures (#407)

* Add sending and receiving data procedures
This commit is contained in:
KonradStaniec 2021-10-19 13:36:57 +02:00 committed by GitHub
parent f101c83626
commit 88795c6477
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 404 additions and 33 deletions

View File

@ -54,7 +54,7 @@ type
# For now we can use basic monotime, later it would be good to analyze:
# https://github.com/bittorrent/libutp/blob/master/utp_utils.cpp, to check all the
# timing assumptions on different platforms
proc getMonoTimeTimeStamp(): uint32 =
proc getMonoTimeTimeStamp*(): uint32 =
let time = getMonoTime()
cast[uint32](time.ticks() div 1000)
@ -170,3 +170,21 @@ proc ackPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSiz
)
Packet(header: h, payload: @[])
proc dataPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSize: uint32, payload: seq[byte]): Packet =
let h = PacketHeaderV1(
pType: ST_DATA,
version: protocolVersion,
# data packets always have extension field set to 0
extension: 0'u8,
connectionId: sndConnectionId,
timestamp: getMonoTimeTimeStamp(),
# TODO for not we are using 0, but this value should be calculated on socket
# level
timestampDiff: 0'u32,
wndSize: bufferSize,
seqNr: seqNr,
ackNr: ackNr
)
Packet(header: h, payload: payload)

View File

@ -7,7 +7,7 @@
{.push raises: [Defect].}
import
chronos,
chronos, stew/byteutils,
./utp_protocol
# Exemple application to interact with reference implementation server to help with implementation
@ -17,13 +17,28 @@ import
# 3. make
# 4. ./ucat -ddddd -l -p 9078 - it will run utp server on port 9078
when isMainModule:
proc echoIncomingSocketCallBack(): AcceptConnectionCallback =
return (
proc (server: UtpProtocol, client: UtpSocket): Future[void] {.gcsafe, raises: [Defect].} =
echo "received incoming connection"
let fakeFuture = newFuture[void]()
fakeFuture.complete()
return fakeFuture
)
# TODO read client/server ports and address from cmd line or config file
let localAddress = initTAddress("0.0.0.0", 9077)
let utpProt = UtpProtocol.new(localAddress)
let utpProt = UtpProtocol.new(echoIncomingSocketCallBack(), localAddress)
let remoteServer = initTAddress("127.0.0.1", 9078)
let soc = waitFor utpProt.connectTo(remoteServer)
doAssert(soc.numPacketsInOutGoingBuffer() == 0)
let helloUtp = "Helllo from nim implementation"
let bytes = helloUtp.toBytes()
waitFor soc.write(bytes)
runForever()
waitFor utpProt.closeWait()

View File

@ -60,6 +60,11 @@ type
# incoming buffer for out of order packets
inBuffer: GrowableCircularBuffer[Packet]
# rcvBuffer
buffer: AsyncBuffer
utpProt: UtpProtocol
UtpSocketsContainerRef = ref object
sockets: Table[UtpSocketKey, UtpSocket]
@ -72,8 +77,22 @@ type
UtpProtocol* = ref object
transport: DatagramTransport
activeSockets: UtpSocketsContainerRef
acceptConnectionCb: AcceptConnectionCallback
rng*: ref BrHmacDrbgContext
## New remote client connection callback
## ``server`` - UtpProtocol object.
## ``client`` - accepted client utp socket.
AcceptConnectionCallback* = proc(server: UtpProtocol,
client: UtpSocket): Future[void] {.gcsafe, raises: [Defect].}
const
# Maximal number of payload bytes per packet. Total packet size will be equal to
# mtuSize + sizeof(header) = 600 bytes
# TODO for now it is just some random value. Ultimatly this value should be dynamically
# adjusted based on traffic.
mtuSize = 580
proc new(T: type UtpSocketsContainerRef): T =
UtpSocketsContainerRef(sockets: initTable[UtpSocketKey, UtpSocket]())
@ -124,7 +143,7 @@ proc registerUtpSocket(s: UtpSocketsContainerRef, k: UtpSocketKey, socket: UtpSo
# TODO Handle duplicates
s.sockets[k] = socket
proc initOutgoingSocket(to: TransportAddress, rng: var BrHmacDrbgContext): UtpSocket =
proc initOutgoingSocket(to: TransportAddress, p: UtpProtocol, rng: var BrHmacDrbgContext): UtpSocket =
# TODO handle possible clashes and overflows
let rcvConnectionId = randUint16(rng)
let sndConnectionId = rcvConnectionId + 1
@ -137,10 +156,14 @@ proc initOutgoingSocket(to: TransportAddress, rng: var BrHmacDrbgContext): UtpSo
seqNr: initialSeqNr,
connectionFuture: newFuture[UtpSocket](),
outBuffer: GrowableCircularBuffer[Packet].init(),
inBuffer: GrowableCircularBuffer[Packet].init()
inBuffer: GrowableCircularBuffer[Packet].init(),
# Default 1MB buffer
# TODO add posibility to configure buffer size
buffer: AsyncBuffer.init(1024 * 1024),
utpProt: p
)
proc initIncomingSocket(to: TransportAddress, connectionId: uint16, ackNr: uint16, rng: var BrHmacDrbgContext): UtpSocket =
proc initIncomingSocket(to: TransportAddress, p: UtpProtocol, connectionId: uint16, ackNr: uint16, rng: var BrHmacDrbgContext): UtpSocket =
let initialSeqNr = randUint16(rng)
UtpSocket(
remoteAddress: to,
@ -151,10 +174,15 @@ proc initIncomingSocket(to: TransportAddress, connectionId: uint16, ackNr: uint1
ackNr: ackNr,
connectionFuture: newFuture[UtpSocket](),
outBuffer: GrowableCircularBuffer[Packet].init(),
inBuffer: GrowableCircularBuffer[Packet].init()
inBuffer: GrowableCircularBuffer[Packet].init(),
# Default 1MB buffer
# TODO add posibility to configure buffer size
buffer: AsyncBuffer.init(1024 * 1024),
utpProt: p
)
proc getAckPacket(socket: UtpSocket): Packet =
proc createAckPacket(socket: UtpSocket): Packet =
## Creates ack packet based on the socket current state
ackPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576)
proc ackPacket(socket: UtpSocket, seqNr: uint16): AckResult =
@ -202,6 +230,17 @@ proc initSynPacket(socket: UtpSocket): seq[byte] =
proc isConnected*(socket: UtpSocket): bool =
socket.state == Connected
template readLoop(body: untyped): untyped =
while true:
# TODO error handling
let (consumed, done) = body
socket.buffer.shift(consumed)
if done:
break
else:
# TODO add condition to handle socket closing
await socket.buffer.wait()
# Check how many packets are still in the out going buffer, usefull for tests or
# debugging.
# It throws assertion error when number of elements in buffer do not equal kept counter
@ -213,20 +252,119 @@ proc numPacketsInOutGoingBuffer*(socket: UtpSocket): int =
assert(num == int(socket.curWindowPackets))
num
# TODO not implemented
# for now just log incoming packets
proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) =
proc sendData(socket: UtpSocket, data: seq[byte]): Future[void] =
socket.utpProt.transport.sendTo(socket.remoteAddress, data)
proc sendPacket(socket: UtpSocket, packet: Packet): Future[void] =
socket.sendData(encodePacket(packet))
proc flushPackets(socket: UtpSocket) {.async.} =
var i: uint16 = socket.seqNr - socket.curWindowPackets
while i != socket.seqNr:
let maybePacket = socket.outBuffer.get(i)
if (maybePacket.isSome()):
let p = maybePacket.get()
# TODO we should keep encoded packets in outgoing buffer to avoid, re-encoding
# them with each resend
await socket.sendData(encodePacket(p))
inc i
proc getPacketSize(socket: UtpSocket): int =
# TODO currently returning constant, ultimatly it should be bases on mtu estimates
mtuSize
proc write*(socket: UtpSocket, data: seq[byte]): Future[int] {.async.} =
var bytesWritten = 0
# TODO
# Handle different socket state i.e do not write when socket is full or not
# connected
# Handle growing of send window
if len(data) == 0:
return bytesWritten
let pSize = socket.getPacketSize()
let endIndex = data.high()
var i = 0
while i <= data.high:
let lastIndex = i + pSize - 1
let lastOrEnd = min(lastIndex, endIndex)
let dataSlice = data[i..lastOrEnd]
let dataPacket = dataPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576, dataSlice)
socket.outBuffer.ensureSize(socket.seqNr, socket.curWindowPackets)
socket.outBuffer.put(socket.seqNr, dataPacket)
inc socket.seqNr
inc socket.curWindowPackets
bytesWritten = bytesWritten + len(dataSlice)
i = lastOrEnd + 1
await socket.flushPackets()
return bytesWritten
proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] {.async.}=
## Read all bytes `n` bytes from socket ``socket``.
##
## This procedure allocates buffer seq[byte] and return it as result.
var bytes = newSeq[byte]()
if n == 0:
return bytes
readLoop():
# TODO Add handling of socket closing
let count = min(socket.buffer.dataLen(), n - len(bytes))
bytes.add(socket.buffer.buffer.toOpenArray(0, count - 1))
(count, len(bytes) == n)
return bytes
proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) {.async.}=
notice "Received packet ", packet = p
let socketKey = UtpSocketKey.init(sender, p.header.connectionId)
let maybeSocket = prot.activeSockets.getUtpSocket(socketKey)
let pkSeqNr = p.header.seqNr
let pkAckNr = p.header.ackNr
if (maybeSocket.isSome()):
let socket = maybeSocket.unsafeGet()
case p.header.pType
of ST_DATA:
# TODO not implemented
# To avoid amplification attacks, server socket is in SynRecv state until
# it receices first data transfer
# https://www.usenix.org/system/files/conference/woot15/woot15-paper-adamsky.pdf
# TODO when intgrating with discv5 this need to be configurable
if (socket.state == SynRecv):
socket.state = Connected
notice "Received ST_DATA on known socket"
# number of packets past the expected
# ack_nr is the last acked, seq_nr is the
# current. Subtracring 1 makes 0 mean "this is the next expected packet"
let pastExpected = pkSeqNr - socket.ackNr - 1
if (pastExpected == 0):
# we are getting in order data packet, we can flush data directly to the incoming buffer
await upload(addr socket.buffer, unsafeAddr p.payload[0], p.payload.len())
# TODO handle the case when there may be some packets in incoming buffer which
# are direct extension of this packet and therefore we could pass also their
# content to upper layer. This may need to be done when handling selective
# acks.
# Bytes have been passed to upper layer, we can increase number of last
# acked packet
inc socket.ackNr
# TODO for now we just schedule concurrent task with ack sending. It may
# need improvement, as with this approach there is no direct control over
# how many concurrent tasks there are and how to cancel them when socket
# is closed
let ack = socket.createAckPacket()
asyncSpawn socket.sendPacket(ack)
else:
# TODO handle out of order packets
notice "Got out of order packet"
of ST_FIN:
# TODO not implemented
notice "Received ST_FIN on known socket"
@ -246,12 +384,6 @@ proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) =
# In case of SynSent complate the future as last thing to make sure user of libray will
# receive socket in correct state
socket.connectionFuture.complete(socket)
# number of packets past the expected
# ack_nr is the last acked, seq_nr is the
# current. Subtracring 1 makes 0 mean "this is the next expected packet"
let pastExpected = pkSeqNr - socket.ackNr - 1
# TODO to finish handhske we should respond with ST_DATA packet, without it
# socket is left in half-open state.
# Actual reference implementation waits for user to send data, as it assumes
@ -268,11 +400,18 @@ proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) =
# SynPacket we should reject it and send rst packet to sender in some cases
if (p.header.pType == ST_SYN):
# Initial ackNr is set to incoming packer seqNr
let incomingSocket = initIncomingSocket(sender, p.header.connectionId, p.header.seqNr, prot.rng[])
let incomingSocket = initIncomingSocket(sender, prot, p.header.connectionId, p.header.seqNr, prot.rng[])
prot.activeSockets.registerUtpSocket(incomingSocket.getSocketKey(), incomingSocket)
let encodedAck= encodePacket(incomingSocket.getAckPacket())
# TODO sending should be done from UtpSocket context
discard prot.transport.sendTo(sender, encodedAck)
# Make sure ack was flushed onto datagram socket before passing connction
# to upper layer
await incomingSocket.sendPacket(incomingSocket.createAckPacket())
# TODO By default (when we have utp over udp) socket here is passed to upper layer
# in SynRecv state, which is not writeable i.e user of socket cannot write
# data to it unless some data will be received. This is counter measure to
# amplification attacks.
# During integration with discovery v5 (i.e utp over discovv5), we must re-think
# this.
asyncSpawn prot.acceptConnectionCb(prot, incomingSocket)
notice "Received ST_SYN and socket is not known"
else:
# TODO not implemented
@ -282,14 +421,14 @@ proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) =
# Reference implementation: https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732
# TODO not implemented
proc connectTo*(p: UtpProtocol, address: TransportAddress): Future[UtpSocket] =
let socket = initOutgoingSocket(address, p.rng[])
let socket = initOutgoingSocket(address, p, p.rng[])
p.activeSockets.registerUtpSocket(socket.getSocketKey(), socket)
let synEncoded = socket.initSynPacket()
notice "Sending packet", packet = synEncoded
# TODO add callback to handle errors and cancellation i.e unregister socket on
# send error and finish connection future with failure
# sending should be done from UtpSocketContext
discard p.transport.sendTo(address, synEncoded)
discard socket.sendData(synEncoded)
return socket.connectionFuture
proc processDatagram(transp: DatagramTransport, raddr: TransportAddress):
@ -303,13 +442,18 @@ proc processDatagram(transp: DatagramTransport, raddr: TransportAddress):
let dec = decodePacket(buf)
if (dec.isOk()):
processPacket(utpProt, dec.get(), raddr)
await processPacket(utpProt, dec.get(), raddr)
else:
warn "failed to decode packet from address", address = raddr
proc new*(T: type UtpProtocol, address: TransportAddress, rng = newRng()): UtpProtocol {.raises: [Defect, CatchableError].} =
proc new*(
T: type UtpProtocol,
acceptConnectionCb: AcceptConnectionCallback,
address: TransportAddress,
rng = newRng()): UtpProtocol {.raises: [Defect, CatchableError].} =
doAssert(not(isNil(acceptConnectionCb)))
let activeSockets = UtpSocketsContainerRef.new()
let utp = UtpProtocol(activeSockets: activeSockets, rng: rng)
let utp = UtpProtocol(activeSockets: activeSockets, acceptConnectionCb: acceptConnectionCb, rng: rng)
let ta = newDatagramTransport(processDatagram, udata = utp, local = address)
utp.transport = ta
utp

View File

@ -7,28 +7,222 @@
{.used.}
import
chronos,
sequtils,
chronos, bearssl,
testutils/unittests,
../../eth/utp/utp_protocol,
../../eth/keys
proc generateByteArray(rng: var BrHmacDrbgContext, length: int): seq[byte] =
var bytes = newSeq[byte](length)
brHmacDrbgGenerate(rng, bytes)
return bytes
type AssertionCallback = proc(): bool {.gcsafe, raises: [Defect].}
proc waitUntil(f: AssertionCallback): Future[void] {.async.} =
while true:
let res = f()
if res:
break
else:
await sleepAsync(milliseconds(50))
proc transferData(sender: UtpSocket, receiver: UtpSocket, data: seq[byte]): Future[seq[byte]] {.async.}=
let bytesWritten = await sender.write(data)
doAssert bytesWritten == len(data)
let received = await receiver.read(len(data))
return received
procSuite "Utp protocol tests":
let rng = newRng()
asyncTest "Success connect to remote host":
let address = initTAddress("127.0.0.1", 9079)
let utpProt1 = UtpProtocol.new(address)
proc setAcceptedCallback(event: AsyncEvent): AcceptConnectionCallback =
return (
proc(server: UtpProtocol, client: UtpSocket): Future[void] =
let fut = newFuture[void]()
event.fire()
fut.complete()
fut
)
proc setIncomingSocketCallback(socketPromise: Future[UtpSocket]): AcceptConnectionCallback =
return (
proc(server: UtpProtocol, client: UtpSocket): Future[void] =
let fut = newFuture[void]()
socketPromise.complete(client)
fut.complete()
fut
)
asyncTest "Success connect to remote host":
let server1Called = newAsyncEvent()
let address = initTAddress("127.0.0.1", 9079)
let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address)
var server2Called = newAsyncEvent()
let address1 = initTAddress("127.0.0.1", 9080)
let utpProt2 = UtpProtocol.new(address1)
let utpProt2 = UtpProtocol.new(setAcceptedCallback(server2Called), address1)
let sock = await utpProt1.connectTo(address1)
# this future will be completed when we called accepted connection callback
await server2Called.wait()
check:
sock.isConnected()
# after successful connection outgoing buffer should be empty as syn packet
# should be correctly acked
sock.numPacketsInOutGoingBuffer() == 0
server2Called.isSet()
await utpProt1.closeWait()
await utpProt2.closeWait()
asyncTest "Success data transfer when data fits into one packet":
var server1Called = newAsyncEvent()
let address = initTAddress("127.0.0.1", 9079)
let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address)
var serverSocketFut = newFuture[UtpSocket]()
let address1 = initTAddress("127.0.0.1", 9080)
let utpProt2 = UtpProtocol.new(setIncomingSocketCallback(serverSocketFut), address1)
let clientSocket = await utpProt1.connectTo(address1)
# this future will be completed when we called accepted connection callback
discard await serverSocketFut
let serverSocket =
try:
serverSocketFut.read()
except:
raiseAssert "Unexpected error when reading finished future"
check:
clientSocket.isConnected()
# after successful connection outgoing buffer should be empty as syn packet
# should be correctly acked
clientSocket.numPacketsInOutGoingBuffer() == 0
# Server socket is not in connected state, until first data transfer
(not serverSocket.isConnected())
let bytesToTransfer = generateByteArray(rng[], 100)
let bytesReceivedFromClient = await transferData(clientSocket, serverSocket, bytesToTransfer)
check:
bytesToTransfer == bytesReceivedFromClient
serverSocket.isConnected()
let bytesReceivedFromServer = await transferData(serverSocket, clientSocket, bytesToTransfer)
check:
bytesToTransfer == bytesReceivedFromServer
await utpProt1.closeWait()
await utpProt2.closeWait()
asyncTest "Success data transfer when data need to be sliced into multiple packets":
var server1Called = newAsyncEvent()
let address = initTAddress("127.0.0.1", 9079)
let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address)
var serverSocketFut = newFuture[UtpSocket]()
let address1 = initTAddress("127.0.0.1", 9080)
let utpProt2 = UtpProtocol.new(setIncomingSocketCallback(serverSocketFut), address1)
let clientSocket = await utpProt1.connectTo(address1)
# this future will be completed when we called accepted connection callback
discard await serverSocketFut
let serverSocket =
try:
serverSocketFut.read()
except:
raiseAssert "Unexpected error when reading finished future"
check:
clientSocket.isConnected()
# after successful connection outgoing buffer should be empty as syn packet
# should be correctly acked
clientSocket.numPacketsInOutGoingBuffer() == 0
(not serverSocket.isConnected())
# 5000 bytes is over maximal packet size
let bytesToTransfer = generateByteArray(rng[], 5000)
let bytesReceivedFromClient = await transferData(clientSocket, serverSocket, bytesToTransfer)
let bytesReceivedFromServer = await transferData(serverSocket, clientSocket, bytesToTransfer)
# ultimatly all send packets will acked, and outgoing buffer will be empty
await waitUntil(proc (): bool = clientSocket.numPacketsInOutGoingBuffer() == 0)
await waitUntil(proc (): bool = serverSocket.numPacketsInOutGoingBuffer() == 0)
check:
serverSocket.isConnected()
clientSocket.numPacketsInOutGoingBuffer() == 0
serverSocket.numPacketsInOutGoingBuffer() == 0
bytesReceivedFromClient == bytesToTransfer
bytesReceivedFromServer == bytesToTransfer
await utpProt1.closeWait()
await utpProt2.closeWait()
asyncTest "Success multiple data transfers when data need to be sliced into multiple packets":
var server1Called = newAsyncEvent()
let address = initTAddress("127.0.0.1", 9079)
let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address)
var serverSocketFut = newFuture[UtpSocket]()
let address1 = initTAddress("127.0.0.1", 9080)
let utpProt2 = UtpProtocol.new(setIncomingSocketCallback(serverSocketFut), address1)
let clientSocket = await utpProt1.connectTo(address1)
# this future will be completed when we called accepted connection callback
discard await serverSocketFut
let serverSocket =
try:
serverSocketFut.read()
except:
raiseAssert "Unexpected error when reading finished future"
check:
clientSocket.isConnected()
# after successful connection outgoing buffer should be empty as syn packet
# should be correctly acked
clientSocket.numPacketsInOutGoingBuffer() == 0
# 5000 bytes is over maximal packet size
let bytesToTransfer = generateByteArray(rng[], 5000)
let written = await clientSocket.write(bytesToTransfer)
check:
written == len(bytesToTransfer)
let bytesToTransfer1 = generateByteArray(rng[], 5000)
let written1 = await clientSocket.write(bytesToTransfer1)
check:
written1 == len(bytesToTransfer)
let bytesReceived = await serverSocket.read(len(bytesToTransfer) + len(bytesToTransfer1))
# ultimatly all send packets will acked, and outgoing buffer will be empty
await waitUntil(proc (): bool = clientSocket.numPacketsInOutGoingBuffer() == 0)
check:
clientSocket.numPacketsInOutGoingBuffer() == 0
bytesToTransfer.concat(bytesToTransfer1) == bytesReceived
await utpProt1.closeWait()
await utpProt2.closeWait()