mirror of
https://github.com/logos-co/nomos-specs.git
synced 2025-02-02 10:34:50 +00:00
46 lines
1.1 KiB
Python
46 lines
1.1 KiB
Python
from typing import Any, Awaitable, Coroutine
|
|
|
|
import usim
|
|
|
|
from mixnet.framework import framework
|
|
|
|
|
|
class Framework(framework.Framework):
    """An implementation of ``framework.Framework`` built on top of usim.

    Waiting, clock reads and task spawning are all delegated to usim: the
    clock is ``usim.time`` and spawned coroutines are attached to the
    ``usim.Scope`` handed to the constructor.
    """

    # Scope that every coroutine passed to `spawn` is started in.
    _scope: usim.Scope

    def __init__(self, scope: usim.Scope) -> None:
        """Store the usim scope that `spawn` will start coroutines in."""
        super().__init__()
        self._scope = scope

    def queue(self) -> framework.Queue:
        """Return a new, empty usim-backed ``Queue``."""
        return Queue()

    async def sleep(self, seconds: float) -> None:
        """Wait until the usim clock has advanced by ``seconds``."""
        await (usim.time + seconds)

    def now(self) -> float:
        """Return the current usim time, rounded to the nearest millisecond."""
        # round to milliseconds to make analysis not too heavy
        #
        # `round` is used rather than `int(t * 1000) / 1000`: truncation
        # combined with binary floating-point error can leave the scaled
        # value just below the intended integer and silently drop a whole
        # millisecond from the reported time.
        return round(usim.time.now, 3)

    def spawn(
        self, coroutine: Coroutine[Any, Any, framework.RT]
    ) -> Awaitable[framework.RT]:
        """Start ``coroutine`` in the stored scope and return what ``do`` returns.

        The returned object is expected to be awaitable for the coroutine's
        result (per the declared return type).
        """
        return self._scope.do(coroutine)
|
|
|
|
|
|
class Queue(framework.Queue):
    """An implementation of ``framework.Queue`` that wraps a ``usim.Queue``.

    Every operation is forwarded to the wrapped usim queue held in
    ``_queue``.
    """

    # The wrapped usim queue; items are annotated as `bytes`.
    _queue: usim.Queue[bytes]

    def __init__(self) -> None:
        """Create the wrapper around a fresh ``usim.Queue``."""
        super().__init__()
        self._queue = usim.Queue()

    async def put(self, data: bytes) -> None:
        """Await the wrapped queue's ``put`` with ``data``."""
        await self._queue.put(data)

    async def get(self) -> bytes:
        """Await the wrapped queue itself and return the item it yields."""
        item = await self._queue
        return item

    def empty(self) -> bool:
        """Return ``True`` when the wrapped queue's buffer holds no items."""
        # NOTE(review): this reads the private `_buffer` attribute of
        # `usim.Queue`; presumably no public emptiness check was available --
        # confirm against the usim version in use.
        pending = len(self._queue._buffer)
        return pending == 0
|