nim-eth/eth/utp/utp_protocol.nim

157 lines
4.9 KiB
Nim
Raw Permalink Normal View History

# Copyright (c) 2021-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import
std/[tables, options, hashes, math],
2022-06-17 20:45:37 +00:00
chronos, chronicles,
./utp_router,
../common/keys
logScope:
topics = "eth utp"
type
2022-11-16 16:44:00 +00:00
# For now utp protocol is tied to udp transport, but ultimately we would like to
# abstract underlying transport to be able to run utp over udp, discoveryv5 or
# maybe some test transport
UtpProtocol* = ref object
transport: DatagramTransport
utpRouter: UtpRouter[TransportAddress]
SendCallbackBuilder* =
proc (d: DatagramTransport):
SendCallback[TransportAddress] {.gcsafe, raises: [].}
chronicles.formatIt(TransportAddress): $it
# This should probably be defined in TransportAddress module, as hash function should
2022-11-16 16:44:00 +00:00
# be consistent with equality function
# in nim zero arrays always have hash equal to 0, irrespectively of array size, to
2022-11-16 16:44:00 +00:00
# avoid clashes between different types of addresses, each type have mixed different
# magic number
proc hash(x: TransportAddress): Hash =
var h: Hash = 0
case x.family
of AddressFamily.None:
h = h !& 31
!$h
of AddressFamily.IPv4:
h = h !& x.address_v4.hash
h = h !& x.port.hash
h = h !& 37
!$h
of AddressFamily.IPv6:
h = h !& x.address_v6.hash
h = h !& x.port.hash
h = h !& 41
!$h
of AddressFamily.Unix:
h = h !& x.address_un.hash
h = h !& 43
!$h
# Required to use socketKey as key in hashtable
proc hash(x: UtpSocketKey[TransportAddress]): Hash =
var h = 0
h = h !& x.remoteAddress.hash
h = h !& x.rcvId.hash
!$h
proc processDatagram(transp: DatagramTransport, raddr: TransportAddress):
Future[void] {.async: (raises: []).} =
let router = getUserData[UtpRouter[TransportAddress]](transp)
# TODO: should we use `peekMessage()` to avoid allocation?
let buf = try: transp.getMessage()
except TransportError as e:
trace "Error reading datagram msg: ", error = e.msg
# This is likely to be local network connection issues.
return
try:
await processIncomingBytes[TransportAddress](router, buf, raddr)
except CancelledError:
debug "processIncomingBytes canceled"
proc initSendCallback(t: DatagramTransport): SendCallback[TransportAddress] =
return (
proc (
to: TransportAddress, data: seq[byte]
) {.raises: [], gcsafe.} =
let fut = t.sendTo(to, data)
let cb = proc(data: pointer) {.gcsafe.} =
if fut.failed:
debug "uTP send failed", msg = fut.readError.msg
fut.addCallback cb
)
proc new*(
2022-08-12 13:09:59 +00:00
T: type UtpProtocol,
acceptConnectionCb: AcceptConnectionCallback[TransportAddress],
address: TransportAddress,
udata: pointer = nil,
socketConfig: SocketConfig = SocketConfig.init(),
allowConnectionCb: AllowConnectionCallback[TransportAddress] = nil,
sendCallbackBuilder: SendCallbackBuilder = nil,
rng = newRng()): UtpProtocol {.raises: [TransportOsError].} =
doAssert(not(isNil(acceptConnectionCb)))
let router = UtpRouter[TransportAddress].new(
acceptConnectionCb,
allowConnectionCb,
2022-08-12 13:09:59 +00:00
udata,
socketConfig,
rng
)
let ta = newDatagramTransport(processDatagram, udata = router, local = address)
if (sendCallbackBuilder == nil):
router.sendCb = initSendCallback(ta)
else:
router.sendCb = sendCallbackBuilder(ta)
UtpProtocol(transport: ta, utpRouter: router)
2022-08-12 13:09:59 +00:00
proc new*(
T: type UtpProtocol,
acceptConnectionCb: AcceptConnectionCallback[TransportAddress],
address: TransportAddress,
udata: ref,
socketConfig: SocketConfig = SocketConfig.init(),
allowConnectionCb: AllowConnectionCallback[TransportAddress] = nil,
sendCallbackBuilder: SendCallbackBuilder = nil,
rng = newRng()): UtpProtocol {.raises: [TransportOsError].} =
2022-08-12 13:09:59 +00:00
GC_ref(udata)
UtpProtocol.new(
acceptConnectionCb,
address,
cast[pointer](udata),
socketConfig,
allowConnectionCb,
sendCallbackBuilder,
rng
)
proc shutdownWait*(p: UtpProtocol): Future[void] {.async: (raises: []).} =
## Closes all managed utp sockets and then underlying transport
await p.utpRouter.shutdownWait()
await p.transport.closeWait()
proc connectTo*(
r: UtpProtocol, address: TransportAddress
): Future[ConnectionResult[TransportAddress]] {.async: (raw: true, raises: [CancelledError]).} =
r.utpRouter.connectTo(address)
proc connectTo*(
r: UtpProtocol, address: TransportAddress, connectionId: uint16
): Future[ConnectionResult[TransportAddress]] {.async: (raw: true, raises: [CancelledError]).} =
r.utpRouter.connectTo(address, connectionId)
proc openSockets*(r: UtpProtocol): int =
len(r.utpRouter)