From 41d2d3c99195ffd1c8a5baf688127df973f5329e Mon Sep 17 00:00:00 2001 From: Kim De Mey Date: Fri, 10 Dec 2021 11:12:24 +0100 Subject: [PATCH] Utp top level raises & some clean-up (#445) * Add few missing top level raises Defect in uTP - Add top level {.push raises: [Defect].} - remove some local raises, including some unneeded CatchableErrors. - Don't export messageHandler (avoiding annoying naming collisions) - export utp_router as those connection callbacks are in the API * Add some missing copyright clauses * Some ident and max line length cleanup * Rename utp_discv5_protocol.nim to be more consistent --- eth/utp/growable_buffer.nim | 6 ++ eth/utp/ledbat_congestion_control.nim | 12 ++- eth/utp/utp_discov5_protocol.nim | 88 ---------------------- eth/utp/utp_discv5_protocol.nim | 103 ++++++++++++++++++++++++++ eth/utp/utp_protocol.nim | 2 +- eth/utp/utp_router.nim | 60 ++++++++------- eth/utp/utp_socket.nim | 2 +- tests/utp/test_discv5_protocol.nim | 3 +- 8 files changed, 154 insertions(+), 122 deletions(-) delete mode 100644 eth/utp/utp_discov5_protocol.nim create mode 100644 eth/utp/utp_discv5_protocol.nim diff --git a/eth/utp/growable_buffer.nim b/eth/utp/growable_buffer.nim index 07c0fe6..796ffda 100644 --- a/eth/utp/growable_buffer.nim +++ b/eth/utp/growable_buffer.nim @@ -1,3 +1,9 @@ +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + import std/[options, math, sugar] diff --git a/eth/utp/ledbat_congestion_control.nim b/eth/utp/ledbat_congestion_control.nim index 49baef4..41b0c01 100644 --- a/eth/utp/ledbat_congestion_control.nim +++ b/eth/utp/ledbat_congestion_control.nim @@ -1,8 +1,15 @@ +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [Defect].} + import chronos, ./utp_utils - const targetDelay = milliseconds(100) # explanation from reference impl: @@ -93,6 +100,3 @@ proc applyCongestionControl*( newMaxWindowSize = clamp(newMaxWindowSize, minWindowSize, maxSndBufferSize) (newMaxWindowSize, newSlowStartTreshold, newSlowStart) - - - diff --git a/eth/utp/utp_discov5_protocol.nim b/eth/utp/utp_discov5_protocol.nim deleted file mode 100644 index 791e129..0000000 --- a/eth/utp/utp_discov5_protocol.nim +++ /dev/null @@ -1,88 +0,0 @@ -import - std/[hashes], - chronos, chronicles, - ../p2p/discoveryv5/[protocol, node], - ./utp_router, - ../keys - -type UtpDiscv5Protocol* = ref object of TalkProtocol - prot: protocol.Protocol - router: UtpRouter[Node] - -proc hash(x: UtpSocketKey[Node]): Hash = - var h = 0 - h = h !& x.remoteAddress.hash - h = h !& x.rcvId.hash - !$h - -proc initSendCallback(t: protocol.Protocol, subProtocolName: seq[byte]): SendCallback[Node] = - return ( - proc (to: Node, data: seq[byte]): Future[void] = - let fut = newFuture[void]() - # TODO In discvoveryv5 each talkreq wait for talkresp, but here we would really - # like the fire and forget semantics (similar to udp). - # For now start talkreq/response in background, and discard its result. - # That way we also lose information about any possible errors. - # Cosider adding talkreq proc which does not wait for response, - discard t.talkreq(to, subProtocolName, data) - fut.complete() - return fut - ) - -proc messageHandler*(protocol: TalkProtocol, request: seq[byte], - srcId: NodeId, srcUdpAddress: Address): seq[byte] = - let p = UtpDiscv5Protocol(protocol) - let maybeSender = p.prot.getNode(srcId) - - if maybeSender.isSome(): - let sender = maybeSender.unsafeGet() - # processIncomingBytes may respond to remote by using talkreq requests - asyncSpawn p.router.processIncomingBytes(request, sender) - # We always sending empty response as discv5 spec requires that talkreq always - # receive talkresp - @[] - else: - @[] - -proc new*( - T: type UtpDiscv5Protocol, - p: protocol.Protocol, - subProtocolName: seq[byte], - acceptConnectionCb: AcceptConnectionCallback[Node], - socketConfig: SocketConfig = SocketConfig.init(), - allowConnectionCb: AllowConnectionCallback[Node] = nil, - rng = newRng()): UtpDiscv5Protocol {.raises: [Defect, CatchableError].} = - doAssert(not(isNil(acceptConnectionCb))) - - let router = UtpRouter[Node].new( - acceptConnectionCb, - allowConnectionCb, - socketConfig, - rng - ) - router.sendCb = initSendCallback(p, subProtocolName) - - let prot = UtpDiscv5Protocol( - protocolHandler: messageHandler, - prot: p, - router: router - ) - - p.registerTalkProtocol(subProtocolName, prot).expect( - "Only one protocol should have this id" - ) - prot - -proc connectTo*(r: UtpDiscv5Protocol, address: Node): Future[ConnectionResult[Node]]= - return r.router.connectTo(address) - -proc connectTo*(r: UtpDiscv5Protocol, address: Node, connectionId: uint16): Future[ConnectionResult[Node]]= - return r.router.connectTo(address, connectionId) - -proc shutdown*(r: UtpDiscv5Protocol) = - ## closes all managed utp connections in background (not closed discovery, it is up to user) - r.router.shutdown() - -proc shutdownWait*(r: UtpDiscv5Protocol) {.async.} = - ## closes all managed utp connections in background (not closed discovery, it is up to user) - await r.router.shutdownWait() diff --git a/eth/utp/utp_discv5_protocol.nim b/eth/utp/utp_discv5_protocol.nim new file mode 100644 index 0000000..a990e16 --- /dev/null +++ b/eth/utp/utp_discv5_protocol.nim @@ -0,0 +1,103 @@ +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [Defect].} + +import + std/[hashes], + chronos, chronicles, + ../p2p/discoveryv5/protocol, + ./utp_router, + ../keys + +export utp_router + +type UtpDiscv5Protocol* = ref object of TalkProtocol + prot: protocol.Protocol + router: UtpRouter[Node] + +proc hash(x: UtpSocketKey[Node]): Hash = + var h = 0 + h = h !& x.remoteAddress.hash + h = h !& x.rcvId.hash + !$h + +proc initSendCallback( + t: protocol.Protocol, subProtocolName: seq[byte]): SendCallback[Node] = + return ( + proc (to: Node, data: seq[byte]): Future[void] = + let fut = newFuture[void]() + # TODO: In discovery v5 each talkreq waits for a talkresp, but here we + # would really like the fire and forget semantics (similar to udp). + # For now start talkreq/talkresp in background, and discard its result. + # That way we also lose information about any possible errors. + # Consider adding talkreq proc which does not wait for the response. + discard t.talkreq(to, subProtocolName, data) + fut.complete() + return fut + ) + +proc messageHandler(protocol: TalkProtocol, request: seq[byte], + srcId: NodeId, srcUdpAddress: Address): seq[byte] = + let p = UtpDiscv5Protocol(protocol) + let maybeSender = p.prot.getNode(srcId) + + if maybeSender.isSome(): + let sender = maybeSender.unsafeGet() + # processIncomingBytes may respond to remote by using talkreq requests + asyncSpawn p.router.processIncomingBytes(request, sender) + # We always send empty responses as discv5 spec requires that talkreq + # always receives a talkresp. + @[] + else: + @[] + +proc new*( + T: type UtpDiscv5Protocol, + p: protocol.Protocol, + subProtocolName: seq[byte], + acceptConnectionCb: AcceptConnectionCallback[Node], + socketConfig: SocketConfig = SocketConfig.init(), + allowConnectionCb: AllowConnectionCallback[Node] = nil, + rng = newRng()): UtpDiscv5Protocol = + doAssert(not(isNil(acceptConnectionCb))) + + let router = UtpRouter[Node].new( + acceptConnectionCb, + allowConnectionCb, + socketConfig, + rng + ) + router.sendCb = initSendCallback(p, subProtocolName) + + let prot = UtpDiscv5Protocol( + protocolHandler: messageHandler, + prot: p, + router: router + ) + + p.registerTalkProtocol(subProtocolName, prot).expect( + "Only one protocol should have this id" + ) + prot + +proc connectTo*(r: UtpDiscv5Protocol, address: Node): + Future[ConnectionResult[Node]] = + return r.router.connectTo(address) + +proc connectTo*(r: UtpDiscv5Protocol, address: Node, connectionId: uint16): + Future[ConnectionResult[Node]] = + return r.router.connectTo(address, connectionId) + +proc shutdown*(r: UtpDiscv5Protocol) = + ## Closes all managed utp connections in background (does not close discovery, + ## this is up to user) + r.router.shutdown() + +proc shutdownWait*(r: UtpDiscv5Protocol) {.async.} = + ## Closes all managed utp connections in background (does not close discovery, + ## this is up to user) + await r.router.shutdownWait() diff --git a/eth/utp/utp_protocol.nim b/eth/utp/utp_protocol.nim index 8ef687e..70e1caf 100644 --- a/eth/utp/utp_protocol.nim +++ b/eth/utp/utp_protocol.nim @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021 Status Research & Development GmbH +# Copyright (c) 2021 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). diff --git a/eth/utp/utp_router.nim b/eth/utp/utp_router.nim index 58946bd..e9b107c 100644 --- a/eth/utp/utp_router.nim +++ b/eth/utp/utp_router.nim @@ -1,3 +1,11 @@ +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [Defect].} + import std/[tables, options, sugar], chronos, bearssl, chronicles, @@ -15,17 +23,17 @@ type # ``server`` - UtpProtocol object. # ``client`` - accepted client utp socket. AcceptConnectionCallback*[A] = proc(server: UtpRouter[A], - client: UtpSocket[A]): Future[void] {.gcsafe, raises: [Defect].} + client: UtpSocket[A]): Future[void] {.gcsafe, raises: [Defect].} - # Callback to act as fire wall for incoming peers. Should return true if peer is allowed - # to connect. - AllowConnectionCallback*[A] = - proc(r: UtpRouter[A], remoteAddress: A, connectionId: uint16): bool {.gcsafe, raises: [Defect], noSideEffect.} + # Callback to act as firewall for incoming peers. Should return true if peer + # is allowed to connect. + AllowConnectionCallback*[A] = proc(r: UtpRouter[A], remoteAddress: A, + connectionId: uint16): bool {.gcsafe, raises: [Defect], noSideEffect.} - # Oject responsible for creating and maintaing table of of utp sockets. - # caller should use `processIncomingBytes` proc to feed it with incoming byte - # packets, based this input, proper utp sockets will be created, closed, or will - # receive data + # Object responsible for creating and maintaining table of utp sockets. + # Caller should use `processIncomingBytes` proc to feed it with incoming byte + # packets. Based on this input, proper utp sockets will be created, closed, + # or will receive data. UtpRouter*[A] = ref object sockets: Table[UtpSocketKey[A], UtpSocket[A]] socketConfig: SocketConfig @@ -36,13 +44,13 @@ type rng*: ref BrHmacDrbgContext const - # Maximal number of tries to genearte unique socket while establishing outgoing - # connection. + # Maximal number of tries to generate unique socket while establishing + # outgoing connection. maxSocketGenerationTries = 1000 -# this should probably be in standard lib, it allows lazy composition of options i.e -# one can write: O1 orElse O2 orElse O3, and chain will be evaluated to first option -# which isSome() +# This should probably be in standard lib, it allows lazy composition of options +# i.e one can write: O1 orElse O2 orElse O3, and chain will be evaluated to +# first option which isSome() template orElse[A](a: Option[A], b: Option[A]): Option[A] = if (a.isSome()): a @@ -70,13 +78,13 @@ proc len*[A](s: UtpRouter[A]): int = proc registerUtpSocket[A](p: UtpRouter, s: UtpSocket[A]) = ## Register socket, overwriting already existing one p.sockets[s.socketKey] = s - # Install deregister handler, so when socket will get closed, in will be promptly + # Install deregister handler, so when socket gets closed, in will be promptly # removed from open sockets table s.registerCloseCallback(proc () = p.deRegisterUtpSocket(s)) proc registerIfAbsent[A](p: UtpRouter, s: UtpSocket[A]): bool = - ## Registers socket only if its not already exsiting in the active sockets table - ## return true is socket has been succesfuly registered + ## Registers socket only if it's not already existing in the active sockets + ## table. Returns true if socket has been succesfuly registered. if p.sockets.hasKey(s.socketKey): false else: @@ -84,11 +92,11 @@ proc registerIfAbsent[A](p: UtpRouter, s: UtpSocket[A]): bool = true proc new*[A]( - T: type UtpRouter[A], - acceptConnectionCb: AcceptConnectionCallback[A], - allowConnectionCb: AllowConnectionCallback[A], - socketConfig: SocketConfig = SocketConfig.init(), - rng = newRng()): UtpRouter[A] {.raises: [Defect, CatchableError].} = + T: type UtpRouter[A], + acceptConnectionCb: AcceptConnectionCallback[A], + allowConnectionCb: AllowConnectionCallback[A], + socketConfig: SocketConfig = SocketConfig.init(), + rng = newRng()): UtpRouter[A] = doAssert(not(isNil(acceptConnectionCb))) UtpRouter[A]( sockets: initTable[UtpSocketKey[A], UtpSocket[A]](), @@ -99,10 +107,10 @@ proc new*[A]( ) proc new*[A]( - T: type UtpRouter[A], - acceptConnectionCb: AcceptConnectionCallback[A], - socketConfig: SocketConfig = SocketConfig.init(), - rng = newRng()): UtpRouter[A] {.raises: [Defect, CatchableError].} = + T: type UtpRouter[A], + acceptConnectionCb: AcceptConnectionCallback[A], + socketConfig: SocketConfig = SocketConfig.init(), + rng = newRng()): UtpRouter[A] = UtpRouter[A].new(acceptConnectionCb, nil, socketConfig, rng) # There are different possiblites how connection was established, and we need to diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index 60c69b8..f91871a 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021 Status Research & Development GmbH +# Copyright (c) 2021 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). diff --git a/tests/utp/test_discv5_protocol.nim b/tests/utp/test_discv5_protocol.nim index 295f79b..391dce5 100644 --- a/tests/utp/test_discv5_protocol.nim +++ b/tests/utp/test_discv5_protocol.nim @@ -12,8 +12,7 @@ import testutils/unittests, ../../eth/p2p/discoveryv5/[enr, node, routing_table], ../../eth/p2p/discoveryv5/protocol as discv5_protocol, - ../../eth/utp/utp_router, - ../../eth/utp/utp_discov5_protocol, + ../../eth/utp/utp_discv5_protocol, ../../eth/keys proc localAddress*(port: int): Address =