diff --git a/libp2pdht/private/eth/p2p/discoveryv5/chronosim.nim b/libp2pdht/private/eth/p2p/discoveryv5/chronosim.nim new file mode 100644 index 0000000..77c5d27 --- /dev/null +++ b/libp2pdht/private/eth/p2p/discoveryv5/chronosim.nim @@ -0,0 +1,94 @@ +# Copyright (c) 2023 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +# ChronoSim: simulation/emulation wrapper around Chronos + +import + std/[tables, deques], + chronos, + chronicles + +when(true): #enable network emulator + type + DatagramCallback* = proc(transp: DatagramTransport, + remote: TransportAddress): Future[void] {. + gcsafe, raises: [Defect].} + + DatagramTransport* = ref object + udata*: pointer # User-driven pointer + local: TransportAddress # Local address + callback: DatagramCallback # Receive data callback + ingress: Deque[seq[byte]] + egress: Deque[(TransportAddress, seq[byte])] # simple FIFO for now + + var network = initTable[Port, DatagramTransport]() + + proc `$`*(transp: DatagramTransport): string = + $transp.local + + proc recvFrom[T](transp: DatagramTransport, remote: TransportAddress, + msg: sink seq[T], msglen = -1) = + info "recv from ", remote + {.gcsafe.}: + transp.ingress.addLast(msg) + # call the callback on remote + asyncCheck transp.callback(transp, remote) + + proc getLatency(src: TransportAddress, dst: TransportAddress) : Duration = + 50.milliseconds + + proc getLineTime(transp: DatagramTransport, msg: seq[byte]) : Duration = + # let bandwith = transp.bandwidth + let bandwidth = 100 # Bytes/ms = KB/sec + (msg.len div bandwidth).milliseconds + + proc sendTo*[T](transp: DatagramTransport, remote: TransportAddress, + msg: sink seq[T], msglen = -1) {.async.} = + + #transp.egress.addLast(remote, msg) + #await sleepAsync(getLineTime(transp, msg)) + + await sleepAsync(getLatency(transp.local, remote)) + {.gcsafe.}: + network[remote.port].recvFrom(transp.local, msg) + + proc getMessage*(t: DatagramTransport,): seq[byte] {. + raises: [Defect, CatchableError].} = + #echo "getMessage " + t.ingress.popFirst() + + proc close*(transp: DatagramTransport) = + debug "close" + + proc closed*(transp: DatagramTransport): bool {.inline.} = + result = false + + proc closeWait*(transp: DatagramTransport) {.async.} = + debug "closeWait " + + proc getUserData*[T](transp: DatagramTransport): T {.inline.} = + ## Obtain user data stored in ``transp`` object. + result = cast[T](transp.udata) + + proc newDatagramTransport*[T](cbproc: DatagramCallback, + udata: ref T, + local: TransportAddress = AnyAddress, + ): DatagramTransport {. + raises: [Defect, CatchableError].} = + debug "new" + result = DatagramTransport() + GC_ref(udata) + result.udata = cast[pointer](udata) + result.local = local + result.callback = cbproc + {.gcsafe.}: + network[local.port] = result + +export seconds, milliseconds +export TransportAddress, initTAddress +export async, sleepAsync, complete, await +export Future, FutureBase, newFuture, futureContinue +export TransportOsError \ No newline at end of file diff --git a/libp2pdht/private/eth/p2p/discoveryv5/transport.nim b/libp2pdht/private/eth/p2p/discoveryv5/transport.nim index a39a7b2..d58f8a1 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/transport.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/transport.nim @@ -8,7 +8,7 @@ import std/[tables, options, deques], bearssl/rand, - chronos, + chronosim, chronicles, libp2p/crypto/crypto, stew/shims/net, @@ -20,83 +20,6 @@ const responseTimeout* = 4.seconds ## timeout for the response of a request-response ## call -when(true): #enable network emulator - type - DatagramCallback* = proc(transp: DatagramTransport, - remote: TransportAddress): Future[void] {. - gcsafe, raises: [Defect].} - - DatagramTransport = ref object - udata*: pointer # User-driven pointer - local: TransportAddress # Local address - callback: DatagramCallback # Receive data callback - ingress: Deque[seq[byte]] - egress: Deque[(TransportAddress, seq[byte])] # simple FIFO for now - - var network = initTable[Port, DatagramTransport]() - - proc `$`*(transp: DatagramTransport): string = - $transp.local - - proc recvFrom[T](transp: DatagramTransport, remote: TransportAddress, - msg: sink seq[T], msglen = -1) = - #echo "recv from ", remote - {.gcsafe.}: - transp.ingress.addLast(msg) - # call the callback on remote - asyncCheck transp.callback(transp, remote) - - proc getLatency(src: TransportAddress, dst: TransportAddress) : Duration = - 50.milliseconds - - proc getLineTime(transp: DatagramTransport, msg: seq[byte]) : Duration = - # let bandwith = transp.bandwidth - let bandwidth = 100 # Bytes/ms = KB/sec - (msg.len div bandwidth).milliseconds - - proc sendTo*[T](transp: DatagramTransport, remote: TransportAddress, - msg: sink seq[T], msglen = -1) {.async.} = - - #transp.egress.addLast(remote, msg) - #await sleepAsync(getLineTime(transp, msg)) - - await sleepAsync(getLatency(transp.local, remote)) - {.gcsafe.}: - network[remote.port].recvFrom(transp.local, msg) - - proc getMessage*(t: DatagramTransport,): seq[byte] {. - raises: [Defect, CatchableError].} = - #echo "getMessage " - t.ingress.popFirst() - - proc close*(transp: DatagramTransport) = - debug "close" - - proc closed*(transp: DatagramTransport): bool {.inline.} = - result = false - - proc closeWait*(transp: DatagramTransport) {.async.} = - debug "closeWait " - - proc getUserData*[T](transp: DatagramTransport): T {.inline.} = - ## Obtain user data stored in ``transp`` object. - result = cast[T](transp.udata) - - proc newDatagramTransport*[T](cbproc: DatagramCallback, - udata: ref T, - local: TransportAddress = AnyAddress, - ): DatagramTransport {. - raises: [Defect, CatchableError].} = - debug "new" - result = DatagramTransport() - GC_ref(udata) - result.udata = cast[pointer](udata) - result.local = local - result.callback = cbproc - {.gcsafe.}: - network[local.port] = result - - type Transport* [Client] = ref object client: Client