Add rudimentary connect function (#405)

* Add rudimentary connect function
This commit is contained in:
KonradStaniec 2021-10-11 14:16:06 +02:00 committed by GitHub
parent 5125a438db
commit 7ae287ad1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 243 additions and 21 deletions

View File

@ -70,7 +70,7 @@ task test_ssz, "Run ssz tests":
runTest("tests/ssz/all_tests") runTest("tests/ssz/all_tests")
task test_utp, "Run utp tests": task test_utp, "Run utp tests":
runTest("tests/utp/test_packets") runTest("tests/utp/all_utp_tests")
task test, "Run all tests": task test, "Run all tests":
for filename in [ for filename in [

View File

@ -130,23 +130,39 @@ proc decodePacket*(bytes: openArray[byte]): Result[Packet, string] =
# connectionId - should be random not already used number # connectionId - should be random not already used number
# bufferSize - should be pre configured initial buffer size for socket # bufferSize - should be pre configured initial buffer size for socket
proc synPacket*(rng: var BrHmacDrbgContext, connectionId: uint16, bufferSize: uint32): Packet = # SYN packets are special, and should have the receive ID in the connid field,
# instead of conn_id_send.
proc synPacket*(seqNr: uint16, rcvConnectionId: uint16, bufferSize: uint32): Packet =
let h = PacketHeaderV1( let h = PacketHeaderV1(
pType: ST_SYN, pType: ST_SYN,
version: protocolVersion, version: protocolVersion,
# TODO for we do not handle extensions # TODO for we do not handle extensions
extension: 0'u8, extension: 0'u8,
# TODO should be random not used number connectionId: rcvConnectionId,
connectionId: connectionId,
timestamp: getMonoTimeTimeStamp(), timestamp: getMonoTimeTimeStamp(),
timestampDiff: 0'u32, timestampDiff: 0'u32,
# TODO shouldbe current available buffer size
wndSize: bufferSize, wndSize: bufferSize,
seqNr: randUint16(rng), seqNr: seqNr,
# Initialy we did not receive any acks # Initialy we did not receive any acks
ackNr: 0'u16 ackNr: 0'u16
) )
Packet(header: h, payload: @[]) Packet(header: h, payload: @[])
proc ackPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSize: uint32): Packet =
let h = PacketHeaderV1(
pType: ST_STATE,
version: protocolVersion,
# ack packets always have extension field set to 0
extension: 0'u8,
connectionId: sndConnectionId,
timestamp: getMonoTimeTimeStamp(),
# TODO for not we are using 0, but this value should be calculated on socket
# level
timestampDiff: 0'u32,
wndSize: bufferSize,
seqNr: seqNr,
ackNr: ackNr
)
Packet(header: h, payload: @[])

View File

@ -21,9 +21,7 @@ when isMainModule:
let localAddress = initTAddress("0.0.0.0", 9077) let localAddress = initTAddress("0.0.0.0", 9077)
let utpProt = UtpProtocol.new(localAddress) let utpProt = UtpProtocol.new(localAddress)
let remoteServer = initTAddress("0.0.0.0", 9078) let remoteServer = initTAddress("127.0.0.1", 9078)
let soc = waitFor utpProt.connectTo(remoteServer) let soc = waitFor utpProt.connectTo(remoteServer)
# Needed to wait for response from server
waitFor(sleepAsync(100))
waitFor utpProt.closeWait() waitFor utpProt.closeWait()

View File

@ -7,6 +7,7 @@
{.push raises: [Defect].} {.push raises: [Defect].}
import import
std/[tables, options, hashes],
chronos, chronicles, bearssl, chronos, chronicles, bearssl,
./packets, ./packets,
../keys ../keys
@ -15,32 +16,194 @@ logScope:
topics = "utp" topics = "utp"
type type
ConnectionState = enum
Uninitialized,
Idle,
SynSent,
SynRecv,
Connected,
ConnectedFull,
Reset,
Destroy
UtpSocketKey = object
remoteAddress: TransportAddress
rcvId: uint16
UtpSocket* = ref object
remoteAddress*: TransportAddress
state: ConnectionState
# Connection id for packets we receive
connectionIdRcv: uint16
# Connection id for packets we send
connectionIdSnd: uint16
# Sequence number for the next packet to be sent.
seqNr: uint16
# All seq number up to this havve been correctly acked by us
ackNr: uint16
# Should be completed after succesful connection to remote host.
# TODO check if nim gc handles properly cyclic references, as this future will
# contain reference to socket which hold this future.
# If that is not the case, then this future will need to be hold independly
connectionFuture: Future[UtpSocket]
UtpSocketsContainerRef = ref object
sockets: Table[UtpSocketKey, UtpSocket]
# For now utp protocol is tied to udp transport, but ultimatly we would like to # For now utp protocol is tied to udp transport, but ultimatly we would like to
# abstract underlying transport to be able to run utp over udp, discoveryv5 or # abstract underlying transport to be able to run utp over udp, discoveryv5 or
# maybe some test transport # maybe some test transport
UtpProtocol* = ref object UtpProtocol* = ref object
transport: DatagramTransport transport: DatagramTransport
activeSockets: UtpSocketsContainerRef
rng*: ref BrHmacDrbgContext rng*: ref BrHmacDrbgContext
UtpSocket* = ref object proc new(T: type UtpSocketsContainerRef): T =
UtpSocketsContainerRef(sockets: initTable[UtpSocketKey, UtpSocket]())
# This should probably be defined in TransportAddress module, as hash function should
# be consitent with equality function
# in nim zero arrays always have hash equal to 0, irrespectively of array size, to
# avoid clashes betweend different types of addresses, each type have mixed different
# magic number
proc hash(x: TransportAddress): Hash =
var h: Hash = 0
case x.family
of AddressFamily.None:
h = h !& 31
!$h
of AddressFamily.IPv4:
h = h !& x.address_v4.hash
h = h !& x.port.hash
h = h !& 37
!$h
of AddressFamily.IPv6:
h = h !& x.address_v6.hash
h = h !& x.port.hash
h = h !& 41
!$h
of AddressFamily.Unix:
h = h !& x.address_un.hash
h = h !& 43
!$h
# Required to use socketKey as key in hashtable
proc hash(x: UtpSocketKey): Hash =
var h = 0
h = h !& x.remoteAddress.hash
h = h !& x.rcvId.hash
!$h
proc getUtpSocket(s: UtpSocketsContainerRef, k: UtpSocketKey): Option[UtpSocket] =
let s = s.sockets.getOrDefault(k)
if s == nil:
none[UtpSocket]()
else:
some(s)
proc registerUtpSocket(s: UtpSocketsContainerRef, k: UtpSocketKey, socket: UtpSocket) =
# TODO Handle duplicates
s.sockets[k] = socket
proc initOutgoingSocket(to: TransportAddress, rng: var BrHmacDrbgContext): UtpSocket =
# TODO handle possible clashes and overflows
let rcvConnectionId = randUint16(rng)
let sndConnectionId = rcvConnectionId + 1
let initialSeqNr = randUint16(rng)
UtpSocket(
remoteAddress: to,
state: SynSent,
connectionIdRcv: rcvConnectionId,
connectionIdSnd: sndConnectionId,
seqNr: initialSeqNr,
connectionFuture: newFuture[UtpSocket]()
)
proc initIncomingSocket(to: TransportAddress, connectionId: uint16, ackNr: uint16, rng: var BrHmacDrbgContext): UtpSocket =
let initialSeqNr = randUint16(rng)
UtpSocket(
remoteAddress: to,
state: SynRecv,
connectionIdRcv: connectionId + 1,
connectionIdSnd: connectionId,
seqNr: initialSeqNr,
ackNr: ackNr,
connectionFuture: newFuture[UtpSocket]()
)
proc ack(socket: UtpSocket): Packet =
ackPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576)
proc isConnected*(socket: UtpSocket): bool =
socket.state == Connected
# TODO not implemented # TODO not implemented
# for now just log incoming packets # for now just log incoming packets
proc processPacket(p: Packet) = proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) =
notice "Received packet ", packet = p notice "Received packet ", packet = p
let socketKey = UtpSocketKey(remoteAddress: sender, rcvId: p.header.connectionId)
let maybeSocket = prot.activeSockets.getUtpSocket(socketKey)
if (maybeSocket.isSome()):
let socket = maybeSocket.unsafeGet()
case p.header.pType
of ST_DATA:
# TODO not implemented
notice "Received ST_DATA on known socket"
of ST_FIN:
# TODO not implemented
notice "Received ST_FIN on known socket"
of ST_STATE:
notice "Received ST_STATE on known socket"
if (socket.state == SynSent):
socket.state = Connected
socket.ackNr = p.header.seqNr
socket.connectionFuture.complete(socket)
# TODO to finish handhske we should respond with ST_DATA packet, without it
# socket is left in half-open state
of ST_RESET:
# TODO not implemented
notice "Received ST_RESET on known socket"
of ST_SYN:
# TODO not implemented
notice "Received ST_SYN on known socket"
else:
# We got packet for which we do not have active socket. If the packet is not a
# SynPacket we should reject it and send rst packet to sender in some cases
if (p.header.pType == ST_SYN):
# Initial ackNr is set to incoming packer seqNr
let incomingSocket = initIncomingSocket(sender, p.header.connectionId, p.header.seqNr, prot.rng[])
let socketKey = UtpSocketKey(remoteAddress: incomingSocket.remoteAddress, rcvId: incomingSocket.connectionIdRcv)
prot.activeSockets.registerUtpSocket(socketKey, incomingSocket)
let synAck = incomingSocket.ack()
let encoded = encodePacket(synAck)
# TODO sending should be done from UtpSocket context
discard prot.transport.sendTo(sender, encoded)
notice "Received ST_SYN and socket is not known"
else:
# TODO not implemented
notice "Received not ST_SYN and socket is not know"
# Connect to provided address # Connect to provided address
# Reference implementation: https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732 # Reference implementation: https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732
# TODO not implemented # TODO not implemented
proc connectTo*(p: UtpProtocol, address: TransportAddress): Future[UtpSocket] {.async.} = proc connectTo*(p: UtpProtocol, address: TransportAddress): Future[UtpSocket] =
let packet = synPacket(p.rng[], randUint16(p.rng[]), 1048576) let socket = initOutgoingSocket(address, p.rng[])
let socketKey = UtpSocketKey(remoteAddress: socket.remoteAddress, rcvId: socket.connectionIdRcv)
# TODO Buffer in syn packet should be based on our current buffer size
let packet = synPacket(socket.seqNr, socket.connectionIdRcv, 1048576)
notice "Sending packet", packet = packet notice "Sending packet", packet = packet
let packetEncoded = encodePacket(packet) let packetEncoded = encodePacket(packet)
await p.transport.sendTo(address, packetEncoded) p.activeSockets.registerUtpSocket(socketKey, socket)
return UtpSocket() # TODO add callback to handle errors and cancellation i.e unregister socket on
# send error and finish connection future with failure
# sending should be done from UtpSocketContext
discard p.transport.sendTo(address, packetEncoded)
return socket.connectionFuture
proc processDatagram(transp: DatagramTransport, raddr: TransportAddress): proc processDatagram(transp: DatagramTransport, raddr: TransportAddress):
Future[void] {.async.} = Future[void] {.async.} =
let utpProt = getUserData[UtpProtocol](transp)
# TODO: should we use `peekMessage()` to avoid allocation? # TODO: should we use `peekMessage()` to avoid allocation?
let buf = try: transp.getMessage() let buf = try: transp.getMessage()
except TransportOsError as e: except TransportOsError as e:
@ -49,13 +212,16 @@ proc processDatagram(transp: DatagramTransport, raddr: TransportAddress):
let dec = decodePacket(buf) let dec = decodePacket(buf)
if (dec.isOk()): if (dec.isOk()):
processPacket(dec.get()) processPacket(utpProt, dec.get(), raddr)
else: else:
warn "failed to decode packet from address", address = raddr warn "failed to decode packet from address", address = raddr
proc new*(T: type UtpProtocol, address: TransportAddress, rng = newRng()): UtpProtocol {.raises: [Defect, CatchableError].} = proc new*(T: type UtpProtocol, address: TransportAddress, rng = newRng()): UtpProtocol {.raises: [Defect, CatchableError].} =
let ta = newDatagramTransport(processDatagram, local = address) let activeSockets = UtpSocketsContainerRef.new()
UtpProtocol(transport: ta, rng: rng) let utp = UtpProtocol(activeSockets: activeSockets, rng: rng)
let ta = newDatagramTransport(processDatagram, udata = utp, local = address)
utp.transport = ta
utp
proc closeWait*(p: UtpProtocol): Future[void] = proc closeWait*(p: UtpProtocol): Future[void] =
p.transport.closeWait() p.transport.closeWait()

View File

@ -0,0 +1,11 @@
# Copyright (c) 2020-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.used.}
import
./test_packets,
./test_protocol

View File

@ -16,7 +16,7 @@ suite "Utp packets encoding/decoding":
let rng = newRng() let rng = newRng()
test "Encode/decode syn packet": test "Encode/decode syn packet":
let synPacket = synPacket(rng[], 10, 20) let synPacket = synPacket(5, 10, 20)
let encoded = encodePacket(synPacket) let encoded = encodePacket(synPacket)
let decoded = decodePacket(encoded) let decoded = decodePacket(encoded)

View File

@ -0,0 +1,31 @@
# Copyright (c) 2020-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.used.}
import
chronos,
testutils/unittests,
../../eth/utp/utp_protocol,
../../eth/keys
procSuite "Utp protocol tests":
let rng = newRng()
asyncTest "Success connect to remote host":
let address = initTAddress("127.0.0.1", 9079)
let utpProt1 = UtpProtocol.new(address)
let address1 = initTAddress("127.0.0.1", 9080)
let utpProt2 = UtpProtocol.new(address1)
let sock = await utpProt1.connectTo(address1)
check:
sock.isConnected()
await utpProt1.closeWait()
await utpProt2.closeWait()