Add initial skeleton of utp protocol (#397)

* Add initial impl of utp over udp

* Add more comments

* Add licenses and push declarations

* Add tests to nimble task

* Pr comments

Use better random generator
Raise assert error in case of buffer io exception
This commit is contained in:
KonradStaniec 2021-09-13 14:54:06 +02:00 committed by GitHub
parent a95b205cf7
commit 9f2f101070
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 298 additions and 0 deletions

View File

@ -69,6 +69,9 @@ task test_db, "Run db tests":
task test_ssz, "Run ssz tests":
runTest("tests/ssz/all_tests")
task test_utp, "Run utp tests":
runTest("tests/utp/test_packets")
task test, "Run all tests":
for filename in [
"test_bloom",
@ -82,6 +85,7 @@ task test, "Run all tests":
test_trie_task()
test_db_task()
test_ssz_task()
test_utp_task()
task test_discv5_full, "Run discovery v5 and its dependencies tests":
test_keys_task()

156
eth/utp/packets.nim Normal file
View File

@ -0,0 +1,156 @@
# Copyright (c) 2020-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
std/[monotimes],
faststreams,
stew/[endians2, results, objects], bearssl,
../p2p/discoveryv5/random2
export results
const minimalHeaderSize = 20
const protocolVersion = 1
type
PacketType* = enum
ST_DATA = 0,
ST_FIN = 1,
ST_STATE = 2,
ST_RESET = 3,
ST_SYN = 4
MicroSeconds = uint32
PacketHeaderV1 = object
pType*: PacketType
version*: uint8
extension*: uint8
connectionId*: uint16
timestamp*: MicroSeconds
timestampDiff*: MicroSeconds
wndSize*: uint32
seqNr*: uint16
ackNr*: uint16
Packet* = object
header*: PacketHeaderV1
payload*: seq[uint8]
# Important timing assumptions for utp protocol here:
# 1. Microsecond precisions
# 2. Monotonicity
# Reference lib have a lot of checks to assume that this is monotonic on
# every system, and warnings when monotonic clock is not avaialable.
# For now we can use basic monotime, later it would be good to analyze:
# https://github.com/bittorrent/libutp/blob/master/utp_utils.cpp, to check all the
# timing assumptions on different platforms
proc getMonoTimeTimeStamp(): uint32 =
let time = getMonoTime()
cast[uint32](time.ticks() div 1000)
# Simple generator, not useful for cryptography
proc randUint16*(rng: var BrHmacDrbgContext): uint16 =
uint16(rand(rng, int(high(uint16))))
# Simple generator, not useful for cryptography
proc randUint32*(rng: var BrHmacDrbgContext): uint32 =
uint32(rand(rng, int(high(uint32))))
proc encodeTypeVer(h: PacketHeaderV1): uint8 =
var typeVer = 0'u8
let typeOrd = uint8(ord(h.pType))
typeVer = (typeVer and 0xf0) or (h.version and 0xf)
typeVer = (typeVer and 0xf) or (typeOrd shl 4)
typeVer
proc encodeHeader*(h: PacketHeaderV1): seq[byte] =
var mem = memoryOutput().s
try:
mem.write(encodeTypeVer(h))
mem.write(h.extension)
mem.write(h.connectionId.toBytesBE())
mem.write(h.timestamp.toBytesBE())
mem.write(h.timestampDiff.toBytesBE())
mem.write(h.wndSize.toBytesBE())
mem.write(h.seqNr.toBytesBE())
mem.write(h.ackNr.toBytesBE())
return mem.getOutput()
except IOError as e:
# TODO not sure how writing to memory buffer could throw. Raise assertion error if
# its happen for now
raiseAssert e.msg
proc encodePacket*(p: Packet): seq[byte] =
var mem = memoryOutput().s
try:
mem.write(encodeHeader(p.header))
if (len(p.payload) > 0):
mem.write(p.payload)
mem.getOutput()
except IOError as e:
# TODO not sure how writing to memory buffer could throw. Raise assertion error if
# its happen for now
raiseAssert e.msg
# TODO for now we do not handle extensions
proc decodePacket*(bytes: openArray[byte]): Result[Packet, string] =
if len(bytes) < minimalHeaderSize:
return err("invalid header size")
let version = bytes[0] and 0xf
if version != protocolVersion:
return err("invalid packet version")
var kind: PacketType
if not checkedEnumAssign(kind, (bytes[0] shr 4)):
return err("Invalid message type")
let header =
PacketHeaderV1(
pType: kind,
version: version,
extension: bytes[1],
connection_id: fromBytesBE(uint16, [bytes[2], bytes[3]]),
timestamp: fromBytesBE(uint32, [bytes[4], bytes[5], bytes[6], bytes[7]]),
timestamp_diff: fromBytesBE(uint32, [bytes[8], bytes[9], bytes[10], bytes[11]]),
wnd_size: fromBytesBE(uint32, [bytes[12], bytes[13], bytes[14], bytes[15]]),
seq_nr: fromBytesBE(uint16, [bytes[16], bytes[17]]),
ack_nr: fromBytesBE(uint16, [bytes[18], bytes[19]]),
)
let payload =
if (len(bytes) == 20):
@[]
else:
bytes[20..^1]
ok(Packet(header: header, payload: payload))
# connectionId - should be random not already used number
# bufferSize - should be pre configured initial buffer size for socket
proc synPacket*(rng: var BrHmacDrbgContext, connectionId: uint16, bufferSize: uint32): Packet =
let h = PacketHeaderV1(
pType: ST_SYN,
version: protocolVersion,
# TODO for we do not handle extensions
extension: 0'u8,
# TODO should be random not used number
connectionId: connectionId,
timestamp: getMonoTimeTimeStamp(),
timestampDiff: 0'u32,
# TODO shouldbe current available buffer size
wndSize: bufferSize,
seqNr: randUint16(rng),
# Initialy we did not receive any acks
ackNr: 0'u16
)
Packet(header: h, payload: @[])

29
eth/utp/utp.nim Normal file
View File

@ -0,0 +1,29 @@
# Copyright (c) 2020-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
chronos,
./utp_protocol
# Exemple application to interact with reference implementation server to help with implementation
# To run lib utp server:
# 1. git clone https://github.com/bittorrent/libutp.git
# 2. cd libutp
# 3. make
# 4. ./ucat -ddddd -l -p 9078 - it will run utp server on port 9078
when isMainModule:
# TODO read client/server ports and address from cmd line or config file
let localAddress = initTAddress("0.0.0.0", 9077)
let utpProt = UtpProtocol.new(localAddress)
let remoteServer = initTAddress("0.0.0.0", 9078)
let soc = waitFor utpProt.connectTo(remoteServer)
# Needed to wait for response from server
waitFor(sleepAsync(100))
waitFor utpProt.closeWait()

61
eth/utp/utp_protocol.nim Normal file
View File

@ -0,0 +1,61 @@
# Copyright (c) 2020-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
chronos, chronicles, bearssl,
./packets,
../keys
logScope:
topics = "utp"
type
# For now utp protocol is tied to udp transport, but ultimatly we would like to
# abstract underlying transport to be able to run utp over udp, discoveryv5 or
# maybe some test transport
UtpProtocol* = ref object
transport: DatagramTransport
rng*: ref BrHmacDrbgContext
UtpSocket* = ref object
# TODO not implemented
# for now just log incoming packets
proc processPacket(p: Packet) =
notice "Received packet ", packet = p
# Connect to provided address
# Reference implementation: https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732
# TODO not implemented
proc connectTo*(p: UtpProtocol, address: TransportAddress): Future[UtpSocket] {.async.} =
let packet = synPacket(p.rng[], randUint16(p.rng[]), 1048576)
notice "Sending packet", packet = packet
let packetEncoded = encodePacket(packet)
await p.transport.sendTo(address, packetEncoded)
return UtpSocket()
proc processDatagram(transp: DatagramTransport, raddr: TransportAddress):
Future[void] {.async.} =
# TODO: should we use `peekMessage()` to avoid allocation?
let buf = try: transp.getMessage()
except TransportOsError as e:
# This is likely to be local network connection issues.
return
let dec = decodePacket(buf)
if (dec.isOk()):
processPacket(dec.get())
else:
warn "failed to decode packet from address", address = raddr
proc new*(T: type UtpProtocol, address: TransportAddress, rng = newRng()): UtpProtocol {.raises: [Defect, CatchableError].} =
let ta = newDatagramTransport(processDatagram, local = address)
UtpProtocol(transport: ta, rng: rng)
proc closeWait*(p: UtpProtocol): Future[void] =
p.transport.closeWait()

View File

@ -0,0 +1,48 @@
# Copyright (c) 2020-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.used.}
import
unittest,
../eth/utp/packets,
../../eth/keys
suite "Utp packets encoding/decoding":
let rng = newRng()
test "Encode/decode syn packet":
let synPacket = synPacket(rng[], 10, 20)
let encoded = encodePacket(synPacket)
let decoded = decodePacket(encoded)
check:
decoded.isOk()
synPacket == decoded.get()
test "Decode state packet":
# Packet obtained by interaction with c reference implementation
let pack: array[20, uint8] = [
0x21'u8, 0x0, 0x15, 0x72, 0x00, 0xBA, 0x4D, 0x71, 0x0, 0x0, 0x0, 0x0, 0x0, 0x10, 0x0,
0x0, 0x41, 0xA7, 0x00, 0x01]
let decoded = decodePacket(pack)
check:
decoded.isOk()
let packet = decoded.get()
check:
packet.header.pType == ST_STATE
packet.header.version == 1
packet.header.extension == 0
packet.header.connectionId == 5490
packet.header.timestamp == 12209521
packet.header.timestampDiff == 0
packet.header.wndSize == 1048576
packet.header.seqNr == 16807
packet.header.ackNr == 1