Utp top level raises & some clean-up (#445)

* Add few missing top level raises Defect in uTP

- Add top level {.push raises: [Defect].}
- remove some local raises, including some unneeded
CatchableErrors.
- Don't export messageHandler (avoiding annoying naming collisions)
- export utp_router as those connection callbacks are in the API

* Add some missing copyright clauses

* Some ident and max line length cleanup

* Rename utp_discv5_protocol.nim to be more consistent
This commit is contained in:
Kim De Mey 2021-12-10 11:12:24 +01:00 committed by GitHub
parent 09959d2a3f
commit 41d2d3c991
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 154 additions and 122 deletions

View File

@ -1,3 +1,9 @@
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
std/[options, math, sugar]

View File

@ -1,8 +1,15 @@
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
chronos,
./utp_utils
const targetDelay = milliseconds(100)
# explanation from reference impl:
@ -93,6 +100,3 @@ proc applyCongestionControl*(
newMaxWindowSize = clamp(newMaxWindowSize, minWindowSize, maxSndBufferSize)
(newMaxWindowSize, newSlowStartTreshold, newSlowStart)

View File

@ -1,88 +0,0 @@
import
std/[hashes],
chronos, chronicles,
../p2p/discoveryv5/[protocol, node],
./utp_router,
../keys
type UtpDiscv5Protocol* = ref object of TalkProtocol
prot: protocol.Protocol
router: UtpRouter[Node]
proc hash(x: UtpSocketKey[Node]): Hash =
var h = 0
h = h !& x.remoteAddress.hash
h = h !& x.rcvId.hash
!$h
proc initSendCallback(t: protocol.Protocol, subProtocolName: seq[byte]): SendCallback[Node] =
return (
proc (to: Node, data: seq[byte]): Future[void] =
let fut = newFuture[void]()
# TODO In discvoveryv5 each talkreq wait for talkresp, but here we would really
# like the fire and forget semantics (similar to udp).
# For now start talkreq/response in background, and discard its result.
# That way we also lose information about any possible errors.
# Cosider adding talkreq proc which does not wait for response,
discard t.talkreq(to, subProtocolName, data)
fut.complete()
return fut
)
proc messageHandler*(protocol: TalkProtocol, request: seq[byte],
srcId: NodeId, srcUdpAddress: Address): seq[byte] =
let p = UtpDiscv5Protocol(protocol)
let maybeSender = p.prot.getNode(srcId)
if maybeSender.isSome():
let sender = maybeSender.unsafeGet()
# processIncomingBytes may respond to remote by using talkreq requests
asyncSpawn p.router.processIncomingBytes(request, sender)
# We always sending empty response as discv5 spec requires that talkreq always
# receive talkresp
@[]
else:
@[]
proc new*(
T: type UtpDiscv5Protocol,
p: protocol.Protocol,
subProtocolName: seq[byte],
acceptConnectionCb: AcceptConnectionCallback[Node],
socketConfig: SocketConfig = SocketConfig.init(),
allowConnectionCb: AllowConnectionCallback[Node] = nil,
rng = newRng()): UtpDiscv5Protocol {.raises: [Defect, CatchableError].} =
doAssert(not(isNil(acceptConnectionCb)))
let router = UtpRouter[Node].new(
acceptConnectionCb,
allowConnectionCb,
socketConfig,
rng
)
router.sendCb = initSendCallback(p, subProtocolName)
let prot = UtpDiscv5Protocol(
protocolHandler: messageHandler,
prot: p,
router: router
)
p.registerTalkProtocol(subProtocolName, prot).expect(
"Only one protocol should have this id"
)
prot
proc connectTo*(r: UtpDiscv5Protocol, address: Node): Future[ConnectionResult[Node]]=
return r.router.connectTo(address)
proc connectTo*(r: UtpDiscv5Protocol, address: Node, connectionId: uint16): Future[ConnectionResult[Node]]=
return r.router.connectTo(address, connectionId)
proc shutdown*(r: UtpDiscv5Protocol) =
## closes all managed utp connections in background (not closed discovery, it is up to user)
r.router.shutdown()
proc shutdownWait*(r: UtpDiscv5Protocol) {.async.} =
## closes all managed utp connections in background (not closed discovery, it is up to user)
await r.router.shutdownWait()

View File

@ -0,0 +1,103 @@
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
std/[hashes],
chronos, chronicles,
../p2p/discoveryv5/protocol,
./utp_router,
../keys
export utp_router
type UtpDiscv5Protocol* = ref object of TalkProtocol
prot: protocol.Protocol
router: UtpRouter[Node]
proc hash(x: UtpSocketKey[Node]): Hash =
var h = 0
h = h !& x.remoteAddress.hash
h = h !& x.rcvId.hash
!$h
proc initSendCallback(
t: protocol.Protocol, subProtocolName: seq[byte]): SendCallback[Node] =
return (
proc (to: Node, data: seq[byte]): Future[void] =
let fut = newFuture[void]()
# TODO: In discovery v5 each talkreq waits for a talkresp, but here we
# would really like the fire and forget semantics (similar to udp).
# For now start talkreq/talkresp in background, and discard its result.
# That way we also lose information about any possible errors.
# Consider adding talkreq proc which does not wait for the response.
discard t.talkreq(to, subProtocolName, data)
fut.complete()
return fut
)
proc messageHandler(protocol: TalkProtocol, request: seq[byte],
srcId: NodeId, srcUdpAddress: Address): seq[byte] =
let p = UtpDiscv5Protocol(protocol)
let maybeSender = p.prot.getNode(srcId)
if maybeSender.isSome():
let sender = maybeSender.unsafeGet()
# processIncomingBytes may respond to remote by using talkreq requests
asyncSpawn p.router.processIncomingBytes(request, sender)
# We always send empty responses as discv5 spec requires that talkreq
# always receives a talkresp.
@[]
else:
@[]
proc new*(
T: type UtpDiscv5Protocol,
p: protocol.Protocol,
subProtocolName: seq[byte],
acceptConnectionCb: AcceptConnectionCallback[Node],
socketConfig: SocketConfig = SocketConfig.init(),
allowConnectionCb: AllowConnectionCallback[Node] = nil,
rng = newRng()): UtpDiscv5Protocol =
doAssert(not(isNil(acceptConnectionCb)))
let router = UtpRouter[Node].new(
acceptConnectionCb,
allowConnectionCb,
socketConfig,
rng
)
router.sendCb = initSendCallback(p, subProtocolName)
let prot = UtpDiscv5Protocol(
protocolHandler: messageHandler,
prot: p,
router: router
)
p.registerTalkProtocol(subProtocolName, prot).expect(
"Only one protocol should have this id"
)
prot
proc connectTo*(r: UtpDiscv5Protocol, address: Node):
Future[ConnectionResult[Node]] =
return r.router.connectTo(address)
proc connectTo*(r: UtpDiscv5Protocol, address: Node, connectionId: uint16):
Future[ConnectionResult[Node]] =
return r.router.connectTo(address, connectionId)
proc shutdown*(r: UtpDiscv5Protocol) =
## Closes all managed utp connections in background (does not close discovery,
## this is up to user)
r.router.shutdown()
proc shutdownWait*(r: UtpDiscv5Protocol) {.async.} =
## Closes all managed utp connections in background (does not close discovery,
## this is up to user)
await r.router.shutdownWait()

View File

@ -1,4 +1,4 @@
# Copyright (c) 2020-2021 Status Research & Development GmbH
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).

View File

@ -1,3 +1,11 @@
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
std/[tables, options, sugar],
chronos, bearssl, chronicles,
@ -15,17 +23,17 @@ type
# ``server`` - UtpProtocol object.
# ``client`` - accepted client utp socket.
AcceptConnectionCallback*[A] = proc(server: UtpRouter[A],
client: UtpSocket[A]): Future[void] {.gcsafe, raises: [Defect].}
client: UtpSocket[A]): Future[void] {.gcsafe, raises: [Defect].}
# Callback to act as fire wall for incoming peers. Should return true if peer is allowed
# to connect.
AllowConnectionCallback*[A] =
proc(r: UtpRouter[A], remoteAddress: A, connectionId: uint16): bool {.gcsafe, raises: [Defect], noSideEffect.}
# Callback to act as firewall for incoming peers. Should return true if peer
# is allowed to connect.
AllowConnectionCallback*[A] = proc(r: UtpRouter[A], remoteAddress: A,
connectionId: uint16): bool {.gcsafe, raises: [Defect], noSideEffect.}
# Oject responsible for creating and maintaing table of of utp sockets.
# caller should use `processIncomingBytes` proc to feed it with incoming byte
# packets, based this input, proper utp sockets will be created, closed, or will
# receive data
# Object responsible for creating and maintaining table of utp sockets.
# Caller should use `processIncomingBytes` proc to feed it with incoming byte
# packets. Based on this input, proper utp sockets will be created, closed,
# or will receive data.
UtpRouter*[A] = ref object
sockets: Table[UtpSocketKey[A], UtpSocket[A]]
socketConfig: SocketConfig
@ -36,13 +44,13 @@ type
rng*: ref BrHmacDrbgContext
const
# Maximal number of tries to genearte unique socket while establishing outgoing
# connection.
# Maximal number of tries to generate unique socket while establishing
# outgoing connection.
maxSocketGenerationTries = 1000
# this should probably be in standard lib, it allows lazy composition of options i.e
# one can write: O1 orElse O2 orElse O3, and chain will be evaluated to first option
# which isSome()
# This should probably be in standard lib, it allows lazy composition of options
# i.e one can write: O1 orElse O2 orElse O3, and chain will be evaluated to
# first option which isSome()
template orElse[A](a: Option[A], b: Option[A]): Option[A] =
if (a.isSome()):
a
@ -70,13 +78,13 @@ proc len*[A](s: UtpRouter[A]): int =
proc registerUtpSocket[A](p: UtpRouter, s: UtpSocket[A]) =
## Register socket, overwriting already existing one
p.sockets[s.socketKey] = s
# Install deregister handler, so when socket will get closed, in will be promptly
# Install deregister handler, so when socket gets closed, in will be promptly
# removed from open sockets table
s.registerCloseCallback(proc () = p.deRegisterUtpSocket(s))
proc registerIfAbsent[A](p: UtpRouter, s: UtpSocket[A]): bool =
## Registers socket only if its not already exsiting in the active sockets table
## return true is socket has been succesfuly registered
## Registers socket only if it's not already existing in the active sockets
## table. Returns true if socket has been succesfuly registered.
if p.sockets.hasKey(s.socketKey):
false
else:
@ -84,11 +92,11 @@ proc registerIfAbsent[A](p: UtpRouter, s: UtpSocket[A]): bool =
true
proc new*[A](
T: type UtpRouter[A],
acceptConnectionCb: AcceptConnectionCallback[A],
allowConnectionCb: AllowConnectionCallback[A],
socketConfig: SocketConfig = SocketConfig.init(),
rng = newRng()): UtpRouter[A] {.raises: [Defect, CatchableError].} =
T: type UtpRouter[A],
acceptConnectionCb: AcceptConnectionCallback[A],
allowConnectionCb: AllowConnectionCallback[A],
socketConfig: SocketConfig = SocketConfig.init(),
rng = newRng()): UtpRouter[A] =
doAssert(not(isNil(acceptConnectionCb)))
UtpRouter[A](
sockets: initTable[UtpSocketKey[A], UtpSocket[A]](),
@ -99,10 +107,10 @@ proc new*[A](
)
proc new*[A](
T: type UtpRouter[A],
acceptConnectionCb: AcceptConnectionCallback[A],
socketConfig: SocketConfig = SocketConfig.init(),
rng = newRng()): UtpRouter[A] {.raises: [Defect, CatchableError].} =
T: type UtpRouter[A],
acceptConnectionCb: AcceptConnectionCallback[A],
socketConfig: SocketConfig = SocketConfig.init(),
rng = newRng()): UtpRouter[A] =
UtpRouter[A].new(acceptConnectionCb, nil, socketConfig, rng)
# There are different possiblites how connection was established, and we need to

View File

@ -1,4 +1,4 @@
# Copyright (c) 2020-2021 Status Research & Development GmbH
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).

View File

@ -12,8 +12,7 @@ import
testutils/unittests,
../../eth/p2p/discoveryv5/[enr, node, routing_table],
../../eth/p2p/discoveryv5/protocol as discv5_protocol,
../../eth/utp/utp_router,
../../eth/utp/utp_discov5_protocol,
../../eth/utp/utp_discv5_protocol,
../../eth/keys
proc localAddress*(port: int): Address =