diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index 2bffc2de..c47d5891 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -10,6 +10,7 @@ import std/sequtils import pkg/chronos +import ../../asyncyeah import pkg/chronicles import pkg/libp2p import pkg/metrics @@ -64,7 +65,7 @@ type inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests advertiseType*: BlockType # Advertice blocks, manifests or both -proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} = +proc discoveryQueueLoop(b: DiscoveryEngine) {.asyncyeah.} = while b.discEngineRunning: for cid in toSeq(b.pendingBlocks.wantList): try: @@ -79,11 +80,11 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} = trace "About to sleep discovery loop" await sleepAsync(b.discoveryLoopSleep) -proc heartbeatLoop(b: DiscoveryEngine) {.async.} = +proc heartbeatLoop(b: DiscoveryEngine) {.asyncyeah.} = while b.discEngineRunning: await sleepAsync(1.seconds) -proc advertiseQueueLoop*(b: DiscoveryEngine) {.async.} = +proc advertiseQueueLoop*(b: DiscoveryEngine) {.asyncyeah.} = while b.discEngineRunning: if cids =? await b.localStore.listBlocks(blockType = b.advertiseType): for c in cids: @@ -96,7 +97,7 @@ proc advertiseQueueLoop*(b: DiscoveryEngine) {.async.} = trace "Exiting advertise task loop" -proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} = +proc advertiseTaskLoop(b: DiscoveryEngine) {.asyncyeah.} = ## Run advertise tasks ## @@ -127,7 +128,7 @@ proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} = trace "Exiting advertise task runner" -proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} = +proc discoveryTaskLoop(b: DiscoveryEngine) {.asyncyeah.} = ## Run discovery tasks ## @@ -192,7 +193,7 @@ proc queueProvideBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} = except CatchableError as exc: trace "Exception queueing discovery request", exc = exc.msg -proc start*(b: DiscoveryEngine) {.async.} = +proc start*(b: DiscoveryEngine) {.asyncyeah.} = ## Start the discengine task ## @@ -213,7 +214,7 @@ proc start*(b: DiscoveryEngine) {.async.} = b.discoveryLoop = discoveryQueueLoop(b) b.heartbeatLoop = heartbeatLoop(b) -proc stop*(b: DiscoveryEngine) {.async.} = +proc stop*(b: DiscoveryEngine) {.asyncyeah.} = ## Stop the discovery engine ## diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index d1fe1dfe..1280c254 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -13,6 +13,7 @@ import std/options import std/algorithm import pkg/chronos +import ../../asyncyeah import pkg/chronicles import pkg/libp2p import pkg/stint @@ -83,7 +84,7 @@ proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe} = proc blockexcTaskRunner(b: BlockExcEngine): Future[void] {.gcsafe.} -proc start*(b: BlockExcEngine) {.async.} = +proc start*(b: BlockExcEngine) {.asyncyeah.} = ## Start the blockexc task ## @@ -98,7 +99,7 @@ proc start*(b: BlockExcEngine) {.async.} = for i in 0.. bool: not b.peers.anyIt( cid in it.peerHave )) -proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} = +proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) {.asyncyeah.} = trace "Schedule a task for new blocks", items = blocks.len let @@ -270,7 +271,7 @@ proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} = break # do next peer -proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} = +proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) {.asyncyeah.} = ## Resolve pending blocks from the pending blocks manager ## and schedule any new task to be ran ## @@ -283,7 +284,7 @@ proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} = proc payForBlocks(engine: BlockExcEngine, peer: BlockExcPeerCtx, - blocks: seq[bt.Block]) {.async.} = + blocks: seq[bt.Block]) {.asyncyeah.} = trace "Paying for blocks", blocks = blocks.len let @@ -297,7 +298,7 @@ proc payForBlocks(engine: BlockExcEngine, proc blocksHandler*( b: BlockExcEngine, peer: PeerId, - blocks: seq[bt.Block]) {.async.} = + blocks: seq[bt.Block]) {.asyncyeah.} = ## handle incoming blocks ## @@ -318,7 +319,7 @@ proc blocksHandler*( proc wantListHandler*( b: BlockExcEngine, peer: PeerId, - wantList: Wantlist) {.async.} = + wantList: Wantlist) {.asyncyeah.} = ## Handle incoming want lists ## @@ -386,7 +387,7 @@ proc wantListHandler*( proc accountHandler*( engine: BlockExcEngine, peer: PeerId, - account: Account) {.async.} = + account: Account) {.asyncyeah.} = let context = engine.peers.get(peer) if context.isNil: return @@ -396,7 +397,7 @@ proc accountHandler*( proc paymentHandler*( engine: BlockExcEngine, peer: PeerId, - payment: SignedState) {.async.} = + payment: SignedState) {.asyncyeah.} = trace "Handling payments", peer without context =? engine.peers.get(peer).option and @@ -410,7 +411,7 @@ proc paymentHandler*( else: context.paymentChannel = engine.wallet.acceptChannel(payment).option -proc setupPeer*(b: BlockExcEngine, peer: PeerId) {.async.} = +proc setupPeer*(b: BlockExcEngine, peer: PeerId) {.asyncyeah.} = ## Perform initial setup, such as want ## list exchange ## @@ -439,7 +440,7 @@ proc dropPeer*(b: BlockExcEngine, peer: PeerId) = # drop the peer from the peers table b.peers.remove(peer) -proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = +proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, asyncyeah.} = trace "Handling task for peer", peer = task.id # Send to the peer blocks he wants to get, @@ -483,7 +484,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = ) trace "Removed entries from peerWants", items = task.peerWants.len -proc blockexcTaskRunner(b: BlockExcEngine) {.async.} = +proc blockexcTaskRunner(b: BlockExcEngine) {.asyncyeah.} = ## process tasks ## @@ -512,7 +513,7 @@ proc new*( peersPerRequest = DefaultMaxPeersPerRequest ): BlockExcEngine = ## Create new block exchange engine instance - ## + ## let engine = BlockExcEngine( diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index 171311f4..fbd21fea 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -15,6 +15,7 @@ push: {.upraises: [].} import pkg/chronicles import pkg/chronos +import ../../asyncyeah import pkg/libp2p import ../../blocktype @@ -38,7 +39,7 @@ proc getWantHandle*( cid: Cid, timeout = DefaultBlockTimeout, inFlight = false -): Future[Block] {.async.} = +): Future[Block] {.asyncyeah.} = ## Add an event for a block ## diff --git a/codex/blockexchange/network/network.nim b/codex/blockexchange/network/network.nim index ef048e45..18a5de26 100644 --- a/codex/blockexchange/network/network.nim +++ b/codex/blockexchange/network/network.nim @@ -12,6 +12,7 @@ import std/sequtils import pkg/chronicles import pkg/chronos +import ../../asyncyeah import pkg/libp2p import pkg/libp2p/utils/semaphore @@ -75,7 +76,7 @@ type getConn: ConnProvider inflightSema: AsyncSemaphore -proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} = +proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.asyncyeah.} = ## Send message to peer ## @@ -92,7 +93,7 @@ proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} = proc handleWantList( b: BlockExcNetwork, peer: NetworkPeer, - list: Wantlist) {.async.} = + list: Wantlist) {.asyncyeah.} = ## Handle incoming want list ## @@ -110,8 +111,8 @@ proc makeWantList*( sendDontHave: bool = false ): Wantlist = ## make list of wanted entries - ## - + ## + Wantlist( entries: cids.mapIt( Entry( @@ -150,7 +151,7 @@ proc handleBlocks( b: BlockExcNetwork, peer: NetworkPeer, blocks: seq[pb.Block] -) {.async.} = +) {.asyncyeah.} = ## Handle incoming blocks ## @@ -191,7 +192,7 @@ proc sendBlocks*( proc handleBlockPresence( b: BlockExcNetwork, peer: NetworkPeer, - presence: seq[BlockPresence]) {.async.} = + presence: seq[BlockPresence]) {.asyncyeah.} = ## Handle block presence ## @@ -212,7 +213,7 @@ proc handleAccount( network: BlockExcNetwork, peer: NetworkPeer, account: Account -) {.async.} = +) {.asyncyeah.} = ## Handle account info ## @@ -243,7 +244,7 @@ proc handlePayment( network: BlockExcNetwork, peer: NetworkPeer, payment: SignedState -) {.async.} = +) {.asyncyeah.} = ## Handle payment ## @@ -254,9 +255,9 @@ proc rpcHandler( b: BlockExcNetwork, peer: NetworkPeer, msg: Message -) {.async.} = +) {.asyncyeah.} = ## handle rpc messages - ## + ## try: if msg.wantlist.entries.len > 0: asyncSpawn b.handleWantList(peer, msg.wantlist) @@ -310,7 +311,7 @@ proc setupPeer*(b: BlockExcNetwork, peer: PeerId) = discard b.getOrCreatePeer(peer) -proc dialPeer*(b: BlockExcNetwork, peer: PeerRecord) {.async.} = +proc dialPeer*(b: BlockExcNetwork, peer: PeerRecord) {.asyncyeah.} = await b.switch.connect(peer.peerId, peer.addresses.mapIt(it.address)) proc dropPeer*(b: BlockExcNetwork, peer: PeerId) = @@ -323,7 +324,7 @@ method init*(b: BlockExcNetwork) = ## Perform protocol initialization ## - proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} = + proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.asyncyeah.} = if event.kind == PeerEventKind.Joined: b.setupPeer(peerId) else: @@ -332,7 +333,7 @@ method init*(b: BlockExcNetwork) = b.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined) b.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left) - proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = + proc handle(conn: Connection, proto: string) {.asyncyeah, gcsafe, closure.} = let peerId = conn.peerId let blockexcPeer = b.getOrCreatePeer(peerId) await blockexcPeer.readLoop(conn) # attach read loop diff --git a/codex/blockexchange/network/networkpeer.nim b/codex/blockexchange/network/networkpeer.nim index 2c8890b0..63979751 100644 --- a/codex/blockexchange/network/networkpeer.nim +++ b/codex/blockexchange/network/networkpeer.nim @@ -11,6 +11,7 @@ import pkg/upraises push: {.upraises: [].} import pkg/chronos +import ../../asyncyeah import pkg/chronicles import pkg/libp2p @@ -38,7 +39,7 @@ proc connected*(b: NetworkPeer): bool = not(isNil(b.sendConn)) and not(b.sendConn.closed or b.sendConn.atEof) -proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} = +proc readLoop*(b: NetworkPeer, conn: Connection) {.asyncyeah.} = if isNil(conn): return @@ -54,7 +55,7 @@ proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} = finally: await conn.close() -proc connect*(b: NetworkPeer): Future[Connection] {.async.} = +proc connect*(b: NetworkPeer): Future[Connection] {.asyncyeah.} = if b.connected: return b.sendConn @@ -62,7 +63,7 @@ proc connect*(b: NetworkPeer): Future[Connection] {.async.} = asyncSpawn b.readLoop(b.sendConn) return b.sendConn -proc send*(b: NetworkPeer, msg: Message) {.async.} = +proc send*(b: NetworkPeer, msg: Message) {.asyncyeah.} = let conn = await b.connect() if isNil(conn): @@ -73,7 +74,7 @@ proc send*(b: NetworkPeer, msg: Message) {.async.} = await conn.writeLp(protobufEncode(msg)) proc broadcast*(b: NetworkPeer, msg: Message) = - proc sendAwaiter() {.async.} = + proc sendAwaiter() {.asyncyeah.} = try: await b.send(msg) except CatchableError as exc: diff --git a/codex/blockexchange/peers/peercontext.nim b/codex/blockexchange/peers/peercontext.nim index 6c9eac1c..963ac157 100644 --- a/codex/blockexchange/peers/peercontext.nim +++ b/codex/blockexchange/peers/peercontext.nim @@ -13,6 +13,7 @@ import std/tables import pkg/chronicles import pkg/libp2p import pkg/chronos +import ../../asyncyeah import pkg/nitro import pkg/questionable diff --git a/codex/blockexchange/peers/peerctxstore.nim b/codex/blockexchange/peers/peerctxstore.nim index a38196b2..5866da0a 100644 --- a/codex/blockexchange/peers/peerctxstore.nim +++ b/codex/blockexchange/peers/peerctxstore.nim @@ -16,6 +16,7 @@ import pkg/upraises push: {.upraises: [].} import pkg/chronos +import ../../asyncyeah import pkg/chronicles import pkg/libp2p diff --git a/codex/chunker.nim b/codex/chunker.nim index 58fcb4a3..7b5a7d9e 100644 --- a/codex/chunker.nim +++ b/codex/chunker.nim @@ -17,6 +17,7 @@ import pkg/chronicles import pkg/questionable import pkg/questionable/results import pkg/chronos +import ./asyncyeah import pkg/libp2p except shuffle import ./blocktype @@ -41,7 +42,7 @@ type FileChunker* = Chunker LPStreamChunker* = Chunker -proc getBytes*(c: Chunker): Future[seq[byte]] {.async.} = +proc getBytes*(c: Chunker): Future[seq[byte]] {.asyncyeah.} = ## returns a chunk of bytes from ## the instantiated chunker ## @@ -83,7 +84,7 @@ proc new*( ## proc reader(data: ChunkBuffer, len: int): Future[int] - {.gcsafe, async, raises: [Defect].} = + {.gcsafe, asyncyeah, raises: [Defect].} = var res = 0 try: while res < len: @@ -111,7 +112,7 @@ proc new*( ## proc reader(data: ChunkBuffer, len: int): Future[int] - {.gcsafe, async, raises: [Defect].} = + {.gcsafe, asyncyeah, raises: [Defect].} = var total = 0 try: while total < len: diff --git a/codex/clock.nim b/codex/clock.nim index 88df94da..c1d173e6 100644 --- a/codex/clock.nim +++ b/codex/clock.nim @@ -1,4 +1,5 @@ import pkg/chronos +import ./asyncyeah import pkg/stew/endians2 import pkg/upraises @@ -10,12 +11,12 @@ type method now*(clock: Clock): SecondsSince1970 {.base, upraises: [].} = raiseAssert "not implemented" -method waitUntil*(clock: Clock, time: SecondsSince1970) {.base, async.} = +method waitUntil*(clock: Clock, time: SecondsSince1970) {.base, asyncyeah.} = raiseAssert "not implemented" proc withTimeout*(future: Future[void], clock: Clock, - expiry: SecondsSince1970) {.async.} = + expiry: SecondsSince1970) {.asyncyeah.} = let timeout = clock.waitUntil(expiry) try: await future or timeout diff --git a/codex/codex.nim b/codex/codex.nim index a7de75f4..95f0454b 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -13,6 +13,7 @@ import std/tables import pkg/chronicles import pkg/chronos +import ./asyncyeah import pkg/presto import pkg/libp2p import pkg/confutils @@ -55,7 +56,7 @@ type proc bootstrapInteractions( config: CodexConf, repo: RepoStore -): Future[Contracts] {.async.} = +): Future[Contracts] {.asyncyeah.} = ## bootstrap interactions and return contracts ## using clients, hosts, validators pairings ## @@ -105,7 +106,7 @@ proc bootstrapInteractions( return (client, host, validator) -proc start*(s: CodexServer) {.async.} = +proc start*(s: CodexServer) {.asyncyeah.} = notice "Starting codex node" await s.repoStore.start() @@ -143,7 +144,7 @@ proc start*(s: CodexServer) {.async.} = s.runHandle = newFuture[void]("codex.runHandle") await s.runHandle -proc stop*(s: CodexServer) {.async.} = +proc stop*(s: CodexServer) {.asyncyeah.} = notice "Stopping codex node" await allFuturesThrowing( diff --git a/codex/conf.nim b/codex/conf.nim index 5eedd463..4032c9d7 100644 --- a/codex/conf.nim +++ b/codex/conf.nim @@ -18,6 +18,7 @@ import std/strutils import std/typetraits import pkg/chronos +import ./asyncyeah import pkg/chronicles import pkg/chronicles/helpers import pkg/chronicles/topics_registry diff --git a/codex/contracts/clock.nim b/codex/contracts/clock.nim index d7136573..a2630a2c 100644 --- a/codex/contracts/clock.nim +++ b/codex/contracts/clock.nim @@ -1,6 +1,7 @@ import std/times import pkg/ethers import pkg/chronos +import ../asyncyeah import pkg/stint import ../clock @@ -17,12 +18,12 @@ type proc new*(_: type OnChainClock, provider: Provider): OnChainClock = OnChainClock(provider: provider, newBlock: newAsyncEvent()) -proc start*(clock: OnChainClock) {.async.} = +proc start*(clock: OnChainClock) {.asyncyeah.} = if clock.started: return clock.started = true - proc onBlock(blck: Block) {.async, upraises:[].} = + proc onBlock(blck: Block) {.asyncyeah, upraises:[].} = let blockTime = initTime(blck.timestamp.truncate(int64), 0) let computerTime = getTime() clock.offset = blockTime - computerTime @@ -33,7 +34,7 @@ proc start*(clock: OnChainClock) {.async.} = clock.subscription = await clock.provider.subscribe(onBlock) -proc stop*(clock: OnChainClock) {.async.} = +proc stop*(clock: OnChainClock) {.asyncyeah.} = if not clock.started: return clock.started = false @@ -44,7 +45,7 @@ method now*(clock: OnChainClock): SecondsSince1970 = doAssert clock.started, "clock should be started before calling now()" toUnix(getTime() + clock.offset) -method waitUntil*(clock: OnChainClock, time: SecondsSince1970) {.async.} = +method waitUntil*(clock: OnChainClock, time: SecondsSince1970) {.asyncyeah.} = while (let difference = time - clock.now(); difference > 0): clock.newBlock.clear() discard await clock.newBlock.wait().withTimeout(chronos.seconds(difference)) diff --git a/codex/contracts/deployment.nim b/codex/contracts/deployment.nim index c5526064..8ccc8da9 100644 --- a/codex/contracts/deployment.nim +++ b/codex/contracts/deployment.nim @@ -7,6 +7,7 @@ import pkg/chronicles import ../conf import ./marketplace +import ../asyncyeah type Deployment* = ref object provider: Provider @@ -31,7 +32,7 @@ proc getKnownAddress(T: type, chainId: UInt256): ?Address = proc new*(_: type Deployment, provider: Provider, config: CodexConf): Deployment = Deployment(provider: provider, config: config) -proc address*(deployment: Deployment, contract: type): Future[?Address] {.async.} = +proc address*(deployment: Deployment, contract: type): Future[?Address] {.asyncyeah.} = when contract is Marketplace: if address =? deployment.config.marketplaceAddress: return some address diff --git a/codex/contracts/interactions/clientinteractions.nim b/codex/contracts/interactions/clientinteractions.nim index c30a22e4..08a9fa92 100644 --- a/codex/contracts/interactions/clientinteractions.nim +++ b/codex/contracts/interactions/clientinteractions.nim @@ -5,6 +5,7 @@ import ../../purchasing import ../market import ../clock import ./interactions +import ../../asyncyeah export purchasing export chronicles @@ -18,10 +19,10 @@ proc new*(_: type ClientInteractions, purchasing: Purchasing): ClientInteractions = ClientInteractions(clock: clock, purchasing: purchasing) -proc start*(self: ClientInteractions) {.async.} = +proc start*(self: ClientInteractions) {.asyncyeah.} = await procCall ContractInteractions(self).start() await self.purchasing.start() -proc stop*(self: ClientInteractions) {.async.} = +proc stop*(self: ClientInteractions) {.asyncyeah.} = await self.purchasing.stop() await procCall ContractInteractions(self).stop() diff --git a/codex/contracts/interactions/hostinteractions.nim b/codex/contracts/interactions/hostinteractions.nim index 5e8beac5..1b1490c1 100644 --- a/codex/contracts/interactions/hostinteractions.nim +++ b/codex/contracts/interactions/hostinteractions.nim @@ -4,6 +4,7 @@ import pkg/chronicles import ../../sales import ../../proving import ./interactions +import ../../asyncyeah export sales export proving @@ -21,15 +22,15 @@ proc new*( proving: Proving ): HostInteractions = ## Create a new HostInteractions instance - ## + ## HostInteractions(clock: clock, sales: sales, proving: proving) -method start*(self: HostInteractions) {.async.} = +method start*(self: HostInteractions) {.asyncyeah.} = await procCall ContractInteractions(self).start() await self.sales.start() await self.proving.start() -method stop*(self: HostInteractions) {.async.} = +method stop*(self: HostInteractions) {.asyncyeah.} = await self.sales.stop() await self.proving.stop() await procCall ContractInteractions(self).start() diff --git a/codex/contracts/interactions/interactions.nim b/codex/contracts/interactions/interactions.nim index 3ad67991..62dc5014 100644 --- a/codex/contracts/interactions/interactions.nim +++ b/codex/contracts/interactions/interactions.nim @@ -2,6 +2,7 @@ import pkg/ethers import ../clock import ../marketplace import ../market +import ../../asyncyeah export clock @@ -9,8 +10,8 @@ type ContractInteractions* = ref object of RootObj clock*: OnChainClock -method start*(self: ContractInteractions) {.async, base.} = +method start*(self: ContractInteractions) {.asyncyeah, base.} = await self.clock.start() -method stop*(self: ContractInteractions) {.async, base.} = +method stop*(self: ContractInteractions) {.asyncyeah, base.} = await self.clock.stop() diff --git a/codex/contracts/interactions/validatorinteractions.nim b/codex/contracts/interactions/validatorinteractions.nim index 1aa4026c..365abc54 100644 --- a/codex/contracts/interactions/validatorinteractions.nim +++ b/codex/contracts/interactions/validatorinteractions.nim @@ -1,5 +1,6 @@ import ./interactions import ../../validation +import ../../asyncyeah export validation @@ -12,10 +13,10 @@ proc new*(_: type ValidatorInteractions, validation: Validation): ValidatorInteractions = ValidatorInteractions(clock: clock, validation: validation) -proc start*(self: ValidatorInteractions) {.async.} = +proc start*(self: ValidatorInteractions) {.asyncyeah.} = await procCall ContractInteractions(self).start() await self.validation.start() -proc stop*(self: ValidatorInteractions) {.async.} = +proc stop*(self: ValidatorInteractions) {.asyncyeah.} = await self.validation.stop() await procCall ContractInteractions(self).stop() diff --git a/codex/contracts/market.nim b/codex/contracts/market.nim index b7b7c94f..c666288b 100644 --- a/codex/contracts/market.nim +++ b/codex/contracts/market.nim @@ -8,6 +8,7 @@ import pkg/questionable import pkg/chronicles import ../market import ./marketplace +import ../asyncyeah export market @@ -31,41 +32,41 @@ func new*(_: type OnChainMarket, contract: Marketplace): OnChainMarket = signer: signer, ) -proc approveFunds(market: OnChainMarket, amount: UInt256) {.async.} = +proc approveFunds(market: OnChainMarket, amount: UInt256) {.asyncyeah.} = debug "Approving tokens", amount let tokenAddress = await market.contract.token() let token = Erc20Token.new(tokenAddress, market.signer) await token.approve(market.contract.address(), amount) -method getSigner*(market: OnChainMarket): Future[Address] {.async.} = +method getSigner*(market: OnChainMarket): Future[Address] {.asyncyeah.} = return await market.signer.getAddress() -method periodicity*(market: OnChainMarket): Future[Periodicity] {.async.} = +method periodicity*(market: OnChainMarket): Future[Periodicity] {.asyncyeah.} = let config = await market.contract.config() let period = config.proofs.period return Periodicity(seconds: period) -method proofTimeout*(market: OnChainMarket): Future[UInt256] {.async.} = +method proofTimeout*(market: OnChainMarket): Future[UInt256] {.asyncyeah.} = let config = await market.contract.config() return config.proofs.timeout -method myRequests*(market: OnChainMarket): Future[seq[RequestId]] {.async.} = +method myRequests*(market: OnChainMarket): Future[seq[RequestId]] {.asyncyeah.} = return await market.contract.myRequests -method mySlots*(market: OnChainMarket): Future[seq[SlotId]] {.async.} = +method mySlots*(market: OnChainMarket): Future[seq[SlotId]] {.asyncyeah.} = let slots = await market.contract.mySlots() debug "Fetched my slots", numSlots=len(slots) return slots -method requestStorage(market: OnChainMarket, request: StorageRequest){.async.} = +method requestStorage(market: OnChainMarket, request: StorageRequest){.asyncyeah.} = debug "Requesting storage" await market.approveFunds(request.price()) await market.contract.requestStorage(request) method getRequest(market: OnChainMarket, - id: RequestId): Future[?StorageRequest] {.async.} = + id: RequestId): Future[?StorageRequest] {.asyncyeah.} = try: return some await market.contract.getRequest(id) except ProviderError as e: @@ -74,7 +75,7 @@ method getRequest(market: OnChainMarket, raise e method requestState*(market: OnChainMarket, - requestId: RequestId): Future[?RequestState] {.async.} = + requestId: RequestId): Future[?RequestState] {.asyncyeah.} = try: return some await market.contract.requestState(requestId) except ProviderError as e: @@ -83,16 +84,16 @@ method requestState*(market: OnChainMarket, raise e method slotState*(market: OnChainMarket, - slotId: SlotId): Future[SlotState] {.async.} = + slotId: SlotId): Future[SlotState] {.asyncyeah.} = return await market.contract.slotState(slotId) method getRequestEnd*(market: OnChainMarket, - id: RequestId): Future[SecondsSince1970] {.async.} = + id: RequestId): Future[SecondsSince1970] {.asyncyeah.} = return await market.contract.requestEnd(id) method getHost(market: OnChainMarket, requestId: RequestId, - slotIndex: UInt256): Future[?Address] {.async.} = + slotIndex: UInt256): Future[?Address] {.asyncyeah.} = let slotId = slotId(requestId, slotIndex) let address = await market.contract.getHost(slotId) if address != Address.default: @@ -101,7 +102,7 @@ method getHost(market: OnChainMarket, return none Address method getActiveSlot*(market: OnChainMarket, - slotId: SlotId): Future[?Slot] {.async.} = + slotId: SlotId): Future[?Slot] {.asyncyeah.} = try: return some await market.contract.getActiveSlot(slotId) @@ -114,19 +115,19 @@ method fillSlot(market: OnChainMarket, requestId: RequestId, slotIndex: UInt256, proof: seq[byte], - collateral: UInt256) {.async.} = + collateral: UInt256) {.asyncyeah.} = await market.approveFunds(collateral) await market.contract.fillSlot(requestId, slotIndex, proof) -method freeSlot*(market: OnChainMarket, slotId: SlotId) {.async.} = +method freeSlot*(market: OnChainMarket, slotId: SlotId) {.asyncyeah.} = await market.contract.freeSlot(slotId) method withdrawFunds(market: OnChainMarket, - requestId: RequestId) {.async.} = + requestId: RequestId) {.asyncyeah.} = await market.contract.withdrawFunds(requestId) method isProofRequired*(market: OnChainMarket, - id: SlotId): Future[bool] {.async.} = + id: SlotId): Future[bool] {.asyncyeah.} = try: return await market.contract.isProofRequired(id) except ProviderError as e: @@ -135,7 +136,7 @@ method isProofRequired*(market: OnChainMarket, raise e method willProofBeRequired*(market: OnChainMarket, - id: SlotId): Future[bool] {.async.} = + id: SlotId): Future[bool] {.asyncyeah.} = try: return await market.contract.willProofBeRequired(id) except ProviderError as e: @@ -145,19 +146,19 @@ method willProofBeRequired*(market: OnChainMarket, method submitProof*(market: OnChainMarket, id: SlotId, - proof: seq[byte]) {.async.} = + proof: seq[byte]) {.asyncyeah.} = await market.contract.submitProof(id, proof) method markProofAsMissing*(market: OnChainMarket, id: SlotId, - period: Period) {.async.} = + period: Period) {.asyncyeah.} = await market.contract.markProofAsMissing(id, period) method canProofBeMarkedAsMissing*( market: OnChainMarket, id: SlotId, period: Period -): Future[bool] {.async.} = +): Future[bool] {.asyncyeah.} = let provider = market.contract.provider let contractWithoutSigner = market.contract.connect(provider) let overrides = CallOverrides(blockTag: some BlockTag.pending) @@ -170,7 +171,7 @@ method canProofBeMarkedAsMissing*( method subscribeRequests(market: OnChainMarket, callback: OnRequest): - Future[MarketSubscription] {.async.} = + Future[MarketSubscription] {.asyncyeah.} = proc onEvent(event: StorageRequested) {.upraises:[].} = callback(event.requestId, event.ask) let subscription = await market.contract.subscribe(StorageRequested, onEvent) @@ -178,7 +179,7 @@ method subscribeRequests(market: OnChainMarket, method subscribeSlotFilled*(market: OnChainMarket, callback: OnSlotFilled): - Future[MarketSubscription] {.async.} = + Future[MarketSubscription] {.asyncyeah.} = proc onEvent(event: SlotFilled) {.upraises:[].} = callback(event.requestId, event.slotIndex) let subscription = await market.contract.subscribe(SlotFilled, onEvent) @@ -188,7 +189,7 @@ method subscribeSlotFilled*(market: OnChainMarket, requestId: RequestId, slotIndex: UInt256, callback: OnSlotFilled): - Future[MarketSubscription] {.async.} = + Future[MarketSubscription] {.asyncyeah.} = proc onSlotFilled(eventRequestId: RequestId, eventSlotIndex: UInt256) = if eventRequestId == requestId and eventSlotIndex == slotIndex: callback(requestId, slotIndex) @@ -196,7 +197,7 @@ method subscribeSlotFilled*(market: OnChainMarket, method subscribeSlotFreed*(market: OnChainMarket, callback: OnSlotFreed): - Future[MarketSubscription] {.async.} = + Future[MarketSubscription] {.asyncyeah.} = proc onEvent(event: SlotFreed) {.upraises:[].} = callback(event.slotId) let subscription = await market.contract.subscribe(SlotFreed, onEvent) @@ -205,7 +206,7 @@ method subscribeSlotFreed*(market: OnChainMarket, method subscribeFulfillment(market: OnChainMarket, requestId: RequestId, callback: OnFulfillment): - Future[MarketSubscription] {.async.} = + Future[MarketSubscription] {.asyncyeah.} = proc onEvent(event: RequestFulfilled) {.upraises:[].} = if event.requestId == requestId: callback(event.requestId) @@ -215,7 +216,7 @@ method subscribeFulfillment(market: OnChainMarket, method subscribeRequestCancelled*(market: OnChainMarket, requestId: RequestId, callback: OnRequestCancelled): - Future[MarketSubscription] {.async.} = + Future[MarketSubscription] {.asyncyeah.} = proc onEvent(event: RequestCancelled) {.upraises:[].} = if event.requestId == requestId: callback(event.requestId) @@ -225,7 +226,7 @@ method subscribeRequestCancelled*(market: OnChainMarket, method subscribeRequestFailed*(market: OnChainMarket, requestId: RequestId, callback: OnRequestFailed): - Future[MarketSubscription] {.async.} = + Future[MarketSubscription] {.asyncyeah.} = proc onEvent(event: RequestFailed) {.upraises:[]} = if event.requestId == requestId: callback(event.requestId) @@ -234,11 +235,11 @@ method subscribeRequestFailed*(market: OnChainMarket, method subscribeProofSubmission*(market: OnChainMarket, callback: OnProofSubmitted): - Future[MarketSubscription] {.async.} = + Future[MarketSubscription] {.asyncyeah.} = proc onEvent(event: ProofSubmitted) {.upraises: [].} = callback(event.id, event.proof) let subscription = await market.contract.subscribe(ProofSubmitted, onEvent) return OnChainMarketSubscription(eventSubscription: subscription) -method unsubscribe*(subscription: OnChainMarketSubscription) {.async.} = +method unsubscribe*(subscription: OnChainMarketSubscription) {.asyncyeah.} = await subscription.eventSubscription.unsubscribe() diff --git a/codex/contracts/marketplace.nim b/codex/contracts/marketplace.nim index 45d92335..14b36c3d 100644 --- a/codex/contracts/marketplace.nim +++ b/codex/contracts/marketplace.nim @@ -3,6 +3,7 @@ import pkg/ethers/erc20 import pkg/json_rpc/rpcclient import pkg/stint import pkg/chronos +import ../asyncyeah import ../clock import ./requests import ./config diff --git a/codex/discovery.nim b/codex/discovery.nim index 73fd712e..2aec4c74 100644 --- a/codex/discovery.nim +++ b/codex/discovery.nim @@ -10,6 +10,7 @@ import std/algorithm import pkg/chronos +import ./asyncyeah import pkg/chronicles import pkg/libp2p import pkg/libp2p/routing_record @@ -58,7 +59,7 @@ proc toNodeId*(host: ca.Address): NodeId = proc findPeer*( d: Discovery, peerId: PeerId -): Future[?PeerRecord] {.async.} = +): Future[?PeerRecord] {.asyncyeah.} = trace "protocol.resolve..." ## Find peer using the given Discovery object ## @@ -74,9 +75,9 @@ proc findPeer*( method find*( d: Discovery, cid: Cid -): Future[seq[SignedPeerRecord]] {.async, base.} = +): Future[seq[SignedPeerRecord]] {.asyncyeah, base.} = ## Find block providers - ## + ## trace "Finding providers for block", cid without providers =? @@ -85,7 +86,7 @@ method find*( return providers -method provide*(d: Discovery, cid: Cid) {.async, base.} = +method provide*(d: Discovery, cid: Cid) {.asyncyeah, base.} = ## Provide a bock Cid ## @@ -102,7 +103,7 @@ method provide*(d: Discovery, cid: Cid) {.async, base.} = method find*( d: Discovery, host: ca.Address -): Future[seq[SignedPeerRecord]] {.async, base.} = +): Future[seq[SignedPeerRecord]] {.asyncyeah, base.} = ## Find host providers ## @@ -121,7 +122,7 @@ method find*( return providers -method provide*(d: Discovery, host: ca.Address) {.async, base.} = +method provide*(d: Discovery, host: ca.Address) {.asyncyeah, base.} = ## Provide hosts ## @@ -169,11 +170,11 @@ proc updateDhtRecord*(d: Discovery, ip: ValidIpAddress, port: Port) = IpTransportProtocol.udpProtocol, port)])).expect("Should construct signed record").some -proc start*(d: Discovery) {.async.} = +proc start*(d: Discovery) {.asyncyeah.} = d.protocol.open() await d.protocol.start() -proc stop*(d: Discovery) {.async.} = +proc stop*(d: Discovery) {.asyncyeah.} = await d.protocol.closeWait() proc new*( @@ -185,8 +186,8 @@ proc new*( bootstrapNodes: openArray[SignedPeerRecord] = [], store: Datastore = SQLiteDatastore.new(Memory).expect("Should not fail!") ): Discovery = - ## Create a new Discovery node instance for the given key and datastore - ## + ## Create a new Discovery node instance for the given key and datastore + ## var self = Discovery( diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 9b4df58d..5f40eab0 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -14,6 +14,7 @@ push: {.upraises: [].} import std/sequtils import pkg/chronos +import ../asyncyeah import pkg/chronicles import ../manifest @@ -67,7 +68,7 @@ proc encode*( manifest: Manifest, blocks: int, parity: int -): Future[?!Manifest] {.async.} = +): Future[?!Manifest] {.asyncyeah.} = ## Encode a manifest into one that is erasure protected. ## ## `manifest` - the original manifest to be encoded @@ -156,7 +157,7 @@ proc encode*( proc decode*( self: Erasure, encoded: Manifest -): Future[?!Manifest] {.async.} = +): Future[?!Manifest] {.asyncyeah.} = ## Decode a protected manifest into it's original ## manifest ## @@ -260,10 +261,10 @@ proc decode*( return decoded.success -proc start*(self: Erasure) {.async.} = +proc start*(self: Erasure) {.asyncyeah.} = return -proc stop*(self: Erasure) {.async.} = +proc stop*(self: Erasure) {.asyncyeah.} = return proc new*( diff --git a/codex/manifest/coders.nim b/codex/manifest/coders.nim index e0515ba0..7f2a8987 100644 --- a/codex/manifest/coders.nim +++ b/codex/manifest/coders.nim @@ -20,6 +20,7 @@ import pkg/questionable import pkg/questionable/results import pkg/chronicles import pkg/chronos +import ../asyncyeah import ./manifest import ../errors diff --git a/codex/market.nim b/codex/market.nim index e2a233a6..630e7e8d 100644 --- a/codex/market.nim +++ b/codex/market.nim @@ -1,4 +1,5 @@ import pkg/chronos +import ./asyncyeah import pkg/upraises import pkg/questionable import pkg/ethers/erc20 @@ -23,50 +24,50 @@ type OnRequestFailed* = proc(requestId: RequestId) {.gcsafe, upraises:[].} OnProofSubmitted* = proc(id: SlotId, proof: seq[byte]) {.gcsafe, upraises:[].} -method getSigner*(market: Market): Future[Address] {.base, async.} = +method getSigner*(market: Market): Future[Address] {.base, asyncyeah.} = raiseAssert("not implemented") -method periodicity*(market: Market): Future[Periodicity] {.base, async.} = +method periodicity*(market: Market): Future[Periodicity] {.base, asyncyeah.} = raiseAssert("not implemented") -method proofTimeout*(market: Market): Future[UInt256] {.base, async.} = +method proofTimeout*(market: Market): Future[UInt256] {.base, asyncyeah.} = raiseAssert("not implemented") method requestStorage*(market: Market, - request: StorageRequest) {.base, async.} = + request: StorageRequest) {.base, asyncyeah.} = raiseAssert("not implemented") -method myRequests*(market: Market): Future[seq[RequestId]] {.base, async.} = +method myRequests*(market: Market): Future[seq[RequestId]] {.base, asyncyeah.} = raiseAssert("not implemented") -method mySlots*(market: Market): Future[seq[SlotId]] {.base, async.} = +method mySlots*(market: Market): Future[seq[SlotId]] {.base, asyncyeah.} = raiseAssert("not implemented") method getRequest*(market: Market, id: RequestId): - Future[?StorageRequest] {.base, async.} = + Future[?StorageRequest] {.base, asyncyeah.} = raiseAssert("not implemented") method requestState*(market: Market, - requestId: RequestId): Future[?RequestState] {.base, async.} = + requestId: RequestId): Future[?RequestState] {.base, asyncyeah.} = raiseAssert("not implemented") method slotState*(market: Market, - slotId: SlotId): Future[SlotState] {.base, async.} = + slotId: SlotId): Future[SlotState] {.base, asyncyeah.} = raiseAssert("not implemented") method getRequestEnd*(market: Market, - id: RequestId): Future[SecondsSince1970] {.base, async.} = + id: RequestId): Future[SecondsSince1970] {.base, asyncyeah.} = raiseAssert("not implemented") method getHost*(market: Market, requestId: RequestId, - slotIndex: UInt256): Future[?Address] {.base, async.} = + slotIndex: UInt256): Future[?Address] {.base, asyncyeah.} = raiseAssert("not implemented") method getActiveSlot*( market: Market, - slotId: SlotId): Future[?Slot] {.base, async.} = + slotId: SlotId): Future[?Slot] {.base, asyncyeah.} = raiseAssert("not implemented") @@ -74,83 +75,83 @@ method fillSlot*(market: Market, requestId: RequestId, slotIndex: UInt256, proof: seq[byte], - collateral: UInt256) {.base, async.} = + collateral: UInt256) {.base, asyncyeah.} = raiseAssert("not implemented") -method freeSlot*(market: Market, slotId: SlotId) {.base, async.} = +method freeSlot*(market: Market, slotId: SlotId) {.base, asyncyeah.} = raiseAssert("not implemented") method withdrawFunds*(market: Market, - requestId: RequestId) {.base, async.} = + requestId: RequestId) {.base, asyncyeah.} = raiseAssert("not implemented") method subscribeRequests*(market: Market, callback: OnRequest): - Future[Subscription] {.base, async.} = + Future[Subscription] {.base, asyncyeah.} = raiseAssert("not implemented") method isProofRequired*(market: Market, - id: SlotId): Future[bool] {.base, async.} = + id: SlotId): Future[bool] {.base, asyncyeah.} = raiseAssert("not implemented") method willProofBeRequired*(market: Market, - id: SlotId): Future[bool] {.base, async.} = + id: SlotId): Future[bool] {.base, asyncyeah.} = raiseAssert("not implemented") method submitProof*(market: Market, id: SlotId, - proof: seq[byte]) {.base, async.} = + proof: seq[byte]) {.base, asyncyeah.} = raiseAssert("not implemented") method markProofAsMissing*(market: Market, id: SlotId, - period: Period) {.base, async.} = + period: Period) {.base, asyncyeah.} = raiseAssert("not implemented") method canProofBeMarkedAsMissing*(market: Market, id: SlotId, - period: Period): Future[bool] {.base, async.} = + period: Period): Future[bool] {.base, asyncyeah.} = raiseAssert("not implemented") method subscribeFulfillment*(market: Market, requestId: RequestId, callback: OnFulfillment): - Future[Subscription] {.base, async.} = + Future[Subscription] {.base, asyncyeah.} = raiseAssert("not implemented") method subscribeSlotFilled*(market: Market, callback: OnSlotFilled): - Future[Subscription] {.base, async.} = + Future[Subscription] {.base, asyncyeah.} = raiseAssert("not implemented") method subscribeSlotFilled*(market: Market, requestId: RequestId, slotIndex: UInt256, callback: OnSlotFilled): - Future[Subscription] {.base, async.} = + Future[Subscription] {.base, asyncyeah.} = raiseAssert("not implemented") method subscribeSlotFreed*(market: Market, callback: OnSlotFreed): - Future[Subscription] {.base, async.} = + Future[Subscription] {.base, asyncyeah.} = raiseAssert("not implemented") method subscribeRequestCancelled*(market: Market, requestId: RequestId, callback: OnRequestCancelled): - Future[Subscription] {.base, async.} = + Future[Subscription] {.base, asyncyeah.} = raiseAssert("not implemented") method subscribeRequestFailed*(market: Market, requestId: RequestId, callback: OnRequestFailed): - Future[Subscription] {.base, async.} = + Future[Subscription] {.base, asyncyeah.} = raiseAssert("not implemented") method subscribeProofSubmission*(market: Market, callback: OnProofSubmitted): - Future[Subscription] {.base, async.} = + Future[Subscription] {.base, asyncyeah.} = raiseAssert("not implemented") -method unsubscribe*(subscription: Subscription) {.base, async, upraises:[].} = +method unsubscribe*(subscription: Subscription) {.base, asyncyeah, upraises:[].} = raiseAssert("not implemented") diff --git a/codex/node.nim b/codex/node.nim index db4c18cd..c4ea3899 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -16,6 +16,7 @@ import pkg/questionable import pkg/questionable/results import pkg/chronicles import pkg/chronos +import ./asyncyeah import pkg/libp2p # TODO: remove once exported by libp2p @@ -61,9 +62,9 @@ type proc findPeer*( node: CodexNodeRef, peerId: PeerId -): Future[?PeerRecord] {.async.} = +): Future[?PeerRecord] {.asyncyeah.} = ## Find peer using the discovery service from the given CodexNode - ## + ## return await node.discovery.findPeer(peerId) proc connect*( @@ -76,7 +77,7 @@ proc connect*( proc fetchManifest*( node: CodexNodeRef, cid: Cid -): Future[?!Manifest] {.async.} = +): Future[?!Manifest] {.asyncyeah.} = ## Fetch and decode a manifest block ## @@ -102,7 +103,7 @@ proc fetchBatched*( manifest: Manifest, batchSize = FetchBatch, onBatch: BatchProc = nil -): Future[?!void] {.async, gcsafe.} = +): Future[?!void] {.asyncyeah, gcsafe.} = ## Fetch manifest in batches of `batchSize` ## @@ -130,7 +131,7 @@ proc fetchBatched*( proc retrieve*( node: CodexNodeRef, cid: Cid -): Future[?!LPStream] {.async.} = +): Future[?!LPStream] {.asyncyeah.} = ## Retrieve by Cid a single block or an entire dataset described by manifest ## @@ -138,7 +139,7 @@ proc retrieve*( trace "Retrieving blocks from manifest", cid if manifest.protected: # Retrieve, decode and save to the local store all EС groups - proc erasureJob(): Future[void] {.async.} = + proc erasureJob(): Future[void] {.asyncyeah.} = try: # Spawn an erasure decoding job without res =? (await node.erasure.decode(manifest)), error: @@ -149,7 +150,7 @@ proc retrieve*( asyncSpawn erasureJob() # else: # # Prefetch the entire dataset into the local store - # proc prefetchBlocks() {.async, raises: [Defect].} = + # proc prefetchBlocks() {.asyncyeah, raises: [Defect].} = # try: # discard await node.fetchBatched(manifest) # except CatchableError as exc: @@ -167,7 +168,7 @@ proc retrieve*( without blk =? (await node.blockStore.getBlock(cid)), err: return failure(err) - proc streamOneBlock(): Future[void] {.async.} = + proc streamOneBlock(): Future[void] {.asyncyeah.} = try: await stream.pushData(blk.data) except CatchableError as exc: @@ -185,7 +186,7 @@ proc store*( self: CodexNodeRef, stream: LPStream, blockSize = DefaultBlockSize -): Future[?!Cid] {.async.} = +): Future[?!Cid] {.asyncyeah.} = ## Save stream contents as dataset with given blockSize ## to nodes's BlockStore, and return Cid of its manifest ## @@ -256,7 +257,7 @@ proc requestStorage*( reward: UInt256, collateral: UInt256, expiry = UInt256.none -): Future[?!PurchaseId] {.async.} = +): Future[?!PurchaseId] {.asyncyeah.} = ## Initiate a request for storage sequence, this might ## be a multistep procedure. ## @@ -330,7 +331,7 @@ proc new*( contracts = Contracts.default ): CodexNodeRef = ## Create new instance of a Codex node, call `start` to run it - ## + ## CodexNodeRef( switch: switch, blockStore: store, @@ -339,7 +340,7 @@ proc new*( discovery: discovery, contracts: contracts) -proc start*(node: CodexNodeRef) {.async.} = +proc start*(node: CodexNodeRef) {.asyncyeah.} = if not node.switch.isNil: await node.switch.start() @@ -413,7 +414,7 @@ proc start*(node: CodexNodeRef) {.async.} = node.networkId = node.switch.peerInfo.peerId notice "Started codex node", id = $node.networkId, addrs = node.switch.peerInfo.addrs -proc stop*(node: CodexNodeRef) {.async.} = +proc stop*(node: CodexNodeRef) {.asyncyeah.} = trace "Stopping node" if not node.engine.isNil: diff --git a/codex/node/batch.nim b/codex/node/batch.nim index b6482989..683aaf54 100644 --- a/codex/node/batch.nim +++ b/codex/node/batch.nim @@ -1,4 +1,5 @@ import pkg/chronos +import ../asyncyeah import pkg/upraises import ../blocktype as bt diff --git a/codex/proving/proving.nim b/codex/proving/proving.nim index 0bad7184..ae0d8311 100644 --- a/codex/proving/proving.nim +++ b/codex/proving/proving.nim @@ -4,6 +4,7 @@ import pkg/questionable import pkg/chronicles import ../market import ../clock +import ../asyncyeah export sets @@ -31,15 +32,15 @@ proc `onProve=`*(proving: Proving, callback: OnProve) = func add*(proving: Proving, slot: Slot) = proving.slots.incl(slot) -proc getCurrentPeriod*(proving: Proving): Future[Period] {.async.} = +proc getCurrentPeriod*(proving: Proving): Future[Period] {.asyncyeah.} = let periodicity = await proving.market.periodicity() return periodicity.periodOf(proving.clock.now().u256) -proc waitUntilPeriod(proving: Proving, period: Period) {.async.} = +proc waitUntilPeriod(proving: Proving, period: Period) {.asyncyeah.} = let periodicity = await proving.market.periodicity() await proving.clock.waitUntil(periodicity.periodStart(period).truncate(int64)) -proc removeEndedContracts(proving: Proving) {.async.} = +proc removeEndedContracts(proving: Proving) {.asyncyeah.} = var ended: HashSet[Slot] for slot in proving.slots: let state = await proving.market.slotState(slot.id) @@ -52,7 +53,7 @@ proc removeEndedContracts(proving: Proving) {.async.} = ended.incl(slot) proving.slots.excl(ended) -method prove*(proving: Proving, slot: Slot) {.base, async.} = +method prove*(proving: Proving, slot: Slot) {.base, asyncyeah.} = logScope: currentPeriod = await proving.getCurrentPeriod() @@ -66,7 +67,7 @@ method prove*(proving: Proving, slot: Slot) {.base, async.} = except CatchableError as e: error "Submitting proof failed", msg = e.msg -proc run(proving: Proving) {.async.} = +proc run(proving: Proving) {.asyncyeah.} = try: while true: let currentPeriod = await proving.getCurrentPeriod() @@ -83,13 +84,13 @@ proc run(proving: Proving) {.async.} = except CatchableError as e: error "Proving failed", msg = e.msg -proc start*(proving: Proving) {.async.} = +proc start*(proving: Proving) {.asyncyeah.} = if proving.loop.isSome: return proving.loop = some proving.run() -proc stop*(proving: Proving) {.async.} = +proc stop*(proving: Proving) {.asyncyeah.} = if loop =? proving.loop: proving.loop = Future[void].none if not loop.finished: diff --git a/codex/proving/simulated.nim b/codex/proving/simulated.nim index ba314eb6..7b299c48 100644 --- a/codex/proving/simulated.nim +++ b/codex/proving/simulated.nim @@ -29,7 +29,7 @@ when codex_enable_proof_failures: proc onSubmitProofError(error: ref CatchableError, period: UInt256) = error "Submitting invalid proof failed", period, msg = error.msg - method prove(proving: SimulatedProving, slot: Slot) {.async.} = + method prove(proving: SimulatedProving, slot: Slot) {.asyncyeah.} = let period = await proving.getCurrentPeriod() proving.proofCount += 1 if proving.failEveryNProofs > 0'u and diff --git a/codex/purchasing.nim b/codex/purchasing.nim index c7a5f7fe..cfd716a6 100644 --- a/codex/purchasing.nim +++ b/codex/purchasing.nim @@ -1,6 +1,7 @@ import std/tables import pkg/stint import pkg/chronos +import ./asyncyeah import pkg/questionable import pkg/nimcrypto import ./market @@ -32,7 +33,7 @@ proc new*(_: type Purchasing, market: Market, clock: Clock): Purchasing = requestExpiryInterval: DefaultRequestExpiryInterval, ) -proc load*(purchasing: Purchasing) {.async.} = +proc load*(purchasing: Purchasing) {.asyncyeah.} = let market = purchasing.market let requestIds = await market.myRequests() for requestId in requestIds: @@ -40,15 +41,15 @@ proc load*(purchasing: Purchasing) {.async.} = purchase.load() purchasing.purchases[purchase.id] = purchase -proc start*(purchasing: Purchasing) {.async.} = +proc start*(purchasing: Purchasing) {.asyncyeah.} = await purchasing.load() -proc stop*(purchasing: Purchasing) {.async.} = +proc stop*(purchasing: Purchasing) {.asyncyeah.} = discard proc populate*(purchasing: Purchasing, request: StorageRequest - ): Future[StorageRequest] {.async.} = + ): Future[StorageRequest] {.asyncyeah.} = result = request if result.ask.proofProbability == 0.u256: result.ask.proofProbability = purchasing.proofProbability @@ -62,7 +63,7 @@ proc populate*(purchasing: Purchasing, proc purchase*(purchasing: Purchasing, request: StorageRequest - ): Future[Purchase] {.async.} = + ): Future[Purchase] {.asyncyeah.} = let request = await purchasing.populate(request) let purchase = Purchase.new(request, purchasing.market, purchasing.clock) purchase.start() diff --git a/codex/purchasing/purchase.nim b/codex/purchasing/purchase.nim index 07498044..394f8528 100644 --- a/codex/purchasing/purchase.nim +++ b/codex/purchasing/purchase.nim @@ -2,6 +2,7 @@ import ./statemachine import ./states/pending import ./states/unknown import ./purchaseid +import ../asyncyeah # Purchase is implemented as a state machine. # @@ -31,7 +32,7 @@ func new*( clock: Clock ): Purchase = ## create a new instance of a Purchase - ## + ## Purchase( future: Future[void].new(), requestId: requestId, @@ -56,7 +57,7 @@ proc start*(purchase: Purchase) = proc load*(purchase: Purchase) = purchase.start(PurchaseUnknown()) -proc wait*(purchase: Purchase) {.async.} = +proc wait*(purchase: Purchase) {.asyncyeah.} = await purchase.future func id*(purchase: Purchase): PurchaseId = diff --git a/codex/purchasing/states/cancelled.nim b/codex/purchasing/states/cancelled.nim index a0d8315b..464567d4 100644 --- a/codex/purchasing/states/cancelled.nim +++ b/codex/purchasing/states/cancelled.nim @@ -1,13 +1,14 @@ import ../statemachine import ./errorhandling import ./error +import ../../asyncyeah type PurchaseCancelled* = ref object of ErrorHandlingState method `$`*(state: PurchaseCancelled): string = "cancelled" -method run*(state: PurchaseCancelled, machine: Machine): Future[?State] {.async.} = +method run*(state: PurchaseCancelled, machine: Machine): Future[?State] {.asyncyeah.} = let purchase = Purchase(machine) await purchase.market.withdrawFunds(purchase.requestId) let error = newException(Timeout, "Purchase cancelled due to timeout") diff --git a/codex/purchasing/states/error.nim b/codex/purchasing/states/error.nim index df1c8d5c..72a3e5ac 100644 --- a/codex/purchasing/states/error.nim +++ b/codex/purchasing/states/error.nim @@ -1,4 +1,5 @@ import ../statemachine +import ../../asyncyeah type PurchaseErrored* = ref object of PurchaseState error*: ref CatchableError @@ -6,6 +7,6 @@ type PurchaseErrored* = ref object of PurchaseState method `$`*(state: PurchaseErrored): string = "errored" -method run*(state: PurchaseErrored, machine: Machine): Future[?State] {.async.} = +method run*(state: PurchaseErrored, machine: Machine): Future[?State] {.asyncyeah.} = let purchase = Purchase(machine) purchase.future.fail(state.error) diff --git a/codex/purchasing/states/failed.nim b/codex/purchasing/states/failed.nim index 3fbe36f7..66b0ddfa 100644 --- a/codex/purchasing/states/failed.nim +++ b/codex/purchasing/states/failed.nim @@ -1,5 +1,6 @@ import ../statemachine import ./error +import ../../asyncyeah type PurchaseFailed* = ref object of PurchaseState @@ -7,6 +8,6 @@ type method `$`*(state: PurchaseFailed): string = "failed" -method run*(state: PurchaseFailed, machine: Machine): Future[?State] {.async.} = +method run*(state: PurchaseFailed, machine: Machine): Future[?State] {.asyncyeah.} = let error = newException(PurchaseError, "Purchase failed") return some State(PurchaseErrored(error: error)) diff --git a/codex/purchasing/states/finished.nim b/codex/purchasing/states/finished.nim index 93e8b4f0..aa99636d 100644 --- a/codex/purchasing/states/finished.nim +++ b/codex/purchasing/states/finished.nim @@ -1,10 +1,11 @@ import ../statemachine +import ../../asyncyeah type PurchaseFinished* = ref object of PurchaseState method `$`*(state: PurchaseFinished): string = "finished" -method run*(state: PurchaseFinished, machine: Machine): Future[?State] {.async.} = +method run*(state: PurchaseFinished, machine: Machine): Future[?State] {.asyncyeah.} = let purchase = Purchase(machine) purchase.future.complete() diff --git a/codex/purchasing/states/pending.nim b/codex/purchasing/states/pending.nim index 64a7fdd5..254c8972 100644 --- a/codex/purchasing/states/pending.nim +++ b/codex/purchasing/states/pending.nim @@ -1,13 +1,14 @@ import ../statemachine import ./errorhandling import ./submitted +import ../../asyncyeah type PurchasePending* = ref object of ErrorHandlingState method `$`*(state: PurchasePending): string = "pending" -method run*(state: PurchasePending, machine: Machine): Future[?State] {.async.} = +method run*(state: PurchasePending, machine: Machine): Future[?State] {.asyncyeah.} = let purchase = Purchase(machine) let request = !purchase.request await purchase.market.requestStorage(request) diff --git a/codex/purchasing/states/started.nim b/codex/purchasing/states/started.nim index 27d28ddf..e526a90f 100644 --- a/codex/purchasing/states/started.nim +++ b/codex/purchasing/states/started.nim @@ -2,13 +2,14 @@ import ../statemachine import ./errorhandling import ./finished import ./failed +import ../../asyncyeah type PurchaseStarted* = ref object of ErrorHandlingState method `$`*(state: PurchaseStarted): string = "started" -method run*(state: PurchaseStarted, machine: Machine): Future[?State] {.async.} = +method run*(state: PurchaseStarted, machine: Machine): Future[?State] {.asyncyeah.} = let purchase = Purchase(machine) let clock = purchase.clock diff --git a/codex/purchasing/states/submitted.nim b/codex/purchasing/states/submitted.nim index 5e6dd892..7575a11c 100644 --- a/codex/purchasing/states/submitted.nim +++ b/codex/purchasing/states/submitted.nim @@ -2,19 +2,20 @@ import ../statemachine import ./errorhandling import ./started import ./cancelled +import ../../asyncyeah type PurchaseSubmitted* = ref object of ErrorHandlingState method `$`*(state: PurchaseSubmitted): string = "submitted" -method run*(state: PurchaseSubmitted, machine: Machine): Future[?State] {.async.} = +method run*(state: PurchaseSubmitted, machine: Machine): Future[?State] {.asyncyeah.} = let purchase = Purchase(machine) let request = !purchase.request let market = purchase.market let clock = purchase.clock - proc wait {.async.} = + proc wait {.asyncyeah.} = let done = newFuture[void]() proc callback(_: RequestId) = done.complete() @@ -22,7 +23,7 @@ method run*(state: PurchaseSubmitted, machine: Machine): Future[?State] {.async. await done await subscription.unsubscribe() - proc withTimeout(future: Future[void]) {.async.} = + proc withTimeout(future: Future[void]) {.asyncyeah.} = let expiry = request.expiry.truncate(int64) await future.withTimeout(clock, expiry) diff --git a/codex/purchasing/states/unknown.nim b/codex/purchasing/states/unknown.nim index 38628334..3e276f85 100644 --- a/codex/purchasing/states/unknown.nim +++ b/codex/purchasing/states/unknown.nim @@ -5,13 +5,14 @@ import ./started import ./cancelled import ./finished import ./failed +import ../../asyncyeah type PurchaseUnknown* = ref object of ErrorHandlingState method `$`*(state: PurchaseUnknown): string = "unknown" -method run*(state: PurchaseUnknown, machine: Machine): Future[?State] {.async.} = +method run*(state: PurchaseUnknown, machine: Machine): Future[?State] {.asyncyeah.} = let purchase = Purchase(machine) if (request =? await purchase.market.getRequest(purchase.requestId)) and (requestState =? await purchase.market.requestState(purchase.requestId)): diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 1a2d69f2..2440f8a5 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -18,6 +18,7 @@ import pkg/questionable import pkg/questionable/results import pkg/chronicles import pkg/chronos +import ../asyncyeah import pkg/presto import pkg/libp2p import pkg/stew/base10 diff --git a/codex/rest/coders.nim b/codex/rest/coders.nim index 66b41ee3..c6047cd9 100644 --- a/codex/rest/coders.nim +++ b/codex/rest/coders.nim @@ -11,6 +11,7 @@ import std/sugar import pkg/presto import pkg/chronos +import ../asyncyeah import pkg/libp2p import pkg/stew/base10 import pkg/stew/byteutils diff --git a/codex/sales.nim b/codex/sales.nim index 8efeaf61..23ee98cf 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -15,6 +15,8 @@ import ./sales/salesagent import ./sales/statemachine import ./sales/states/downloading import ./sales/states/unknown +import ./asyncyeah + ## Sales holds a list of available storage that it may sell. ## @@ -100,7 +102,7 @@ proc handleRequest(sales: Sales, agent.start(SaleDownloading()) sales.agents.add agent -proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} = +proc mySlots*(sales: Sales): Future[seq[Slot]] {.asyncyeah.} = let market = sales.context.market let slotIds = await market.mySlots() var slots: seq[Slot] = @[] @@ -111,7 +113,7 @@ proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} = return slots -proc load*(sales: Sales) {.async.} = +proc load*(sales: Sales) {.asyncyeah.} = let slots = await sales.mySlots() for slot in slots: @@ -123,7 +125,7 @@ proc load*(sales: Sales) {.async.} = agent.start(SaleUnknown()) sales.agents.add agent -proc start*(sales: Sales) {.async.} = +proc start*(sales: Sales) {.asyncyeah.} = doAssert sales.subscription.isNone, "Sales already started" proc onRequest(requestId: RequestId, ask: StorageAsk) {.gcsafe, upraises:[].} = @@ -134,7 +136,7 @@ proc start*(sales: Sales) {.async.} = except CatchableError as e: error "Unable to start sales", msg = e.msg -proc stop*(sales: Sales) {.async.} = +proc stop*(sales: Sales) {.asyncyeah.} = if subscription =? sales.subscription: sales.subscription = market.Subscription.none try: diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim index 86aacd64..60921a2d 100644 --- a/codex/sales/reservations.nim +++ b/codex/sales/reservations.nim @@ -10,6 +10,7 @@ import std/typetraits import pkg/chronos +import ../asyncyeah import pkg/chronicles import pkg/upraises import pkg/json_serialization @@ -121,7 +122,7 @@ func hasAvailable*(self: Reservations, bytes: uint): bool = proc exists*( self: Reservations, - id: AvailabilityId): Future[?!bool] {.async.} = + id: AvailabilityId): Future[?!bool] {.asyncyeah.} = without key =? id.key, err: return failure(err) @@ -131,7 +132,7 @@ proc exists*( proc get*( self: Reservations, - id: AvailabilityId): Future[?!Availability] {.async.} = + id: AvailabilityId): Future[?!Availability] {.asyncyeah.} = if exists =? (await self.exists(id)) and not exists: let err = newException(AvailabilityGetFailedError, @@ -151,7 +152,7 @@ proc get*( proc update( self: Reservations, - availability: Availability): Future[?!void] {.async.} = + availability: Availability): Future[?!void] {.asyncyeah.} = trace "updating availability", id = availability.id, size = availability.size, used = availability.used @@ -168,7 +169,7 @@ proc update( proc delete( self: Reservations, - id: AvailabilityId): Future[?!void] {.async.} = + id: AvailabilityId): Future[?!void] {.asyncyeah.} = trace "deleting availability", id @@ -185,7 +186,7 @@ proc delete( proc reserve*( self: Reservations, - availability: Availability): Future[?!void] {.async.} = + availability: Availability): Future[?!void] {.asyncyeah.} = if exists =? (await self.exists(availability.id)) and exists: let err = newException(AvailabilityAlreadyExistsError, @@ -215,7 +216,7 @@ proc reserve*( proc release*( self: Reservations, id: AvailabilityId, - bytes: uint): Future[?!void] {.async.} = + bytes: uint): Future[?!void] {.asyncyeah.} = trace "releasing bytes and updating availability", bytes, id @@ -254,7 +255,7 @@ proc release*( proc markUsed*( self: Reservations, - id: AvailabilityId): Future[?!void] {.async.} = + id: AvailabilityId): Future[?!void] {.asyncyeah.} = without var availability =? (await self.get(id)), err: return failure(err) @@ -267,7 +268,7 @@ proc markUsed*( proc markUnused*( self: Reservations, - id: AvailabilityId): Future[?!void] {.async.} = + id: AvailabilityId): Future[?!void] {.asyncyeah.} = without var availability =? (await self.get(id)), err: return failure(err) @@ -283,7 +284,7 @@ iterator items*(self: AvailabilityIter): Future[?Availability] = yield self.next() proc availabilities*( - self: Reservations): Future[?!AvailabilityIter] {.async.} = + self: Reservations): Future[?!AvailabilityIter] {.asyncyeah.} = var iter = AvailabilityIter() let query = Query.init(ReservationsKey) @@ -291,7 +292,7 @@ proc availabilities*( without results =? await self.repo.metaDs.query(query), err: return failure(err) - proc next(): Future[?Availability] {.async.} = + proc next(): Future[?Availability] {.asyncyeah.} = await idleAsync() iter.finished = results.finished if not results.finished and @@ -306,7 +307,7 @@ proc availabilities*( iter.next = next return success iter -proc unused*(r: Reservations): Future[?!seq[Availability]] {.async.} = +proc unused*(r: Reservations): Future[?!seq[Availability]] {.asyncyeah.} = var ret: seq[Availability] = @[] without availabilities =? (await r.availabilities), err: @@ -321,7 +322,7 @@ proc unused*(r: Reservations): Future[?!seq[Availability]] {.async.} = proc find*( self: Reservations, size, duration, minPrice: UInt256, collateral: UInt256, - used: bool): Future[?Availability] {.async.} = + used: bool): Future[?Availability] {.asyncyeah.} = without availabilities =? (await self.availabilities), err: diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim index 7a1f7876..4c7af8df 100644 --- a/codex/sales/salesagent.nim +++ b/codex/sales/salesagent.nim @@ -1,4 +1,5 @@ import pkg/chronos +import ../asyncyeah import pkg/chronicles import pkg/stint import ../contracts/requests @@ -33,18 +34,18 @@ proc newSalesAgent*(context: SalesContext, slotIndex: slotIndex, request: request)) -proc retrieveRequest*(agent: SalesAgent) {.async.} = +proc retrieveRequest*(agent: SalesAgent) {.asyncyeah.} = let data = agent.data let market = agent.context.market if data.request.isNone: data.request = await market.getRequest(data.requestId) -proc subscribeCancellation(agent: SalesAgent) {.async.} = +proc subscribeCancellation(agent: SalesAgent) {.asyncyeah.} = let data = agent.data let market = agent.context.market let clock = agent.context.clock - proc onCancelled() {.async.} = + proc onCancelled() {.asyncyeah.} = without request =? data.request: return @@ -61,7 +62,7 @@ proc subscribeCancellation(agent: SalesAgent) {.async.} = data.fulfilled = await market.subscribeFulfillment(data.requestId, onFulfilled) -proc subscribeFailure(agent: SalesAgent) {.async.} = +proc subscribeFailure(agent: SalesAgent) {.asyncyeah.} = let data = agent.data let market = agent.context.market @@ -74,7 +75,7 @@ proc subscribeFailure(agent: SalesAgent) {.async.} = data.failed = await market.subscribeRequestFailed(data.requestId, onFailed) -proc subscribeSlotFilled(agent: SalesAgent) {.async.} = +proc subscribeSlotFilled(agent: SalesAgent) {.asyncyeah.} = let data = agent.data let market = agent.context.market @@ -87,7 +88,7 @@ proc subscribeSlotFilled(agent: SalesAgent) {.async.} = data.slotIndex, onSlotFilled) -proc subscribe*(agent: SalesAgent) {.async.} = +proc subscribe*(agent: SalesAgent) {.asyncyeah.} = if agent.subscribed: return @@ -96,7 +97,7 @@ proc subscribe*(agent: SalesAgent) {.async.} = await agent.subscribeSlotFilled() agent.subscribed = true -proc unsubscribe*(agent: SalesAgent) {.async.} = +proc unsubscribe*(agent: SalesAgent) {.asyncyeah.} = if not agent.subscribed: return @@ -125,6 +126,6 @@ proc unsubscribe*(agent: SalesAgent) {.async.} = agent.subscribed = false -proc stop*(agent: SalesAgent) {.async.} = +proc stop*(agent: SalesAgent) {.asyncyeah.} = procCall Machine(agent).stop() await agent.unsubscribe() diff --git a/codex/sales/salesdata.nim b/codex/sales/salesdata.nim index d8226877..b98a10d0 100644 --- a/codex/sales/salesdata.nim +++ b/codex/sales/salesdata.nim @@ -1,4 +1,5 @@ import pkg/chronos +import ../asyncyeah import ../contracts/requests import ../market import ./reservations diff --git a/codex/sales/states/cancelled.nim b/codex/sales/states/cancelled.nim index fd2b7caa..5c8647d4 100644 --- a/codex/sales/states/cancelled.nim +++ b/codex/sales/states/cancelled.nim @@ -1,6 +1,7 @@ import ../statemachine import ./errorhandling import ./errored +import ../../asyncyeah type SaleCancelled* = ref object of ErrorHandlingState @@ -9,6 +10,6 @@ type method `$`*(state: SaleCancelled): string = "SaleCancelled" -method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} = +method run*(state: SaleCancelled, machine: Machine): Future[?State] {.asyncyeah.} = let error = newException(SaleTimeoutError, "Sale cancelled due to timeout") return some State(SaleErrored(error: error)) diff --git a/codex/sales/states/downloading.nim b/codex/sales/states/downloading.nim index efae5e25..a582518f 100644 --- a/codex/sales/states/downloading.nim +++ b/codex/sales/states/downloading.nim @@ -12,6 +12,7 @@ import ./filled import ./ignored import ./proving import ./errored +import ../../asyncyeah type SaleDownloading* = ref object of ErrorHandlingState @@ -31,7 +32,7 @@ method onSlotFilled*(state: SaleDownloading, requestId: RequestId, slotIndex: UInt256): ?State = return some State(SaleFilled()) -method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} = +method run*(state: SaleDownloading, machine: Machine): Future[?State] {.asyncyeah.} = let agent = SalesAgent(machine) let data = agent.data let context = agent.context @@ -66,7 +67,7 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} if markUsedErr =? (await reservations.markUsed(availability.id)).errorOption: return some State(SaleErrored(error: markUsedErr)) - proc onBatch(blocks: seq[bt.Block]) {.async.} = + proc onBatch(blocks: seq[bt.Block]) {.asyncyeah.} = # release batches of blocks as they are written to disk and # update availability size var bytes: uint = 0 diff --git a/codex/sales/states/errored.nim b/codex/sales/states/errored.nim index 78dbcb6c..36a6b513 100644 --- a/codex/sales/states/errored.nim +++ b/codex/sales/states/errored.nim @@ -4,6 +4,7 @@ import pkg/upraises import pkg/chronicles import ../statemachine import ../salesagent +import ../../asyncyeah logScope: topics = "marketplace sales errored" @@ -16,7 +17,7 @@ method `$`*(state: SaleErrored): string = "SaleErrored" method onError*(state: SaleState, err: ref CatchableError): ?State {.upraises:[].} = error "error during SaleErrored run", error = err.msg -method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} = +method run*(state: SaleErrored, machine: Machine): Future[?State] {.asyncyeah.} = let agent = SalesAgent(machine) let data = agent.data let context = agent.context diff --git a/codex/sales/states/errorhandling.nim b/codex/sales/states/errorhandling.nim index 01711540..4237bdb7 100644 --- a/codex/sales/states/errorhandling.nim +++ b/codex/sales/states/errorhandling.nim @@ -1,6 +1,7 @@ import pkg/questionable import ../statemachine import ./errored +import ../../asyncyeah type ErrorHandlingState* = ref object of SaleState diff --git a/codex/sales/states/failed.nim b/codex/sales/states/failed.nim index a6a710c0..e09c499a 100644 --- a/codex/sales/states/failed.nim +++ b/codex/sales/states/failed.nim @@ -1,6 +1,7 @@ import ../statemachine import ./errorhandling import ./errored +import ../../asyncyeah type SaleFailed* = ref object of ErrorHandlingState @@ -8,6 +9,6 @@ type method `$`*(state: SaleFailed): string = "SaleFailed" -method run*(state: SaleFailed, machine: Machine): Future[?State] {.async.} = +method run*(state: SaleFailed, machine: Machine): Future[?State] {.asyncyeah.} = let error = newException(SaleFailedError, "Sale failed") return some State(SaleErrored(error: error)) diff --git a/codex/sales/states/filled.nim b/codex/sales/states/filled.nim index c5318cb2..c58ffa32 100644 --- a/codex/sales/states/filled.nim +++ b/codex/sales/states/filled.nim @@ -6,6 +6,7 @@ import ./errored import ./finished import ./cancelled import ./failed +import ../../asyncyeah type SaleFilled* = ref object of ErrorHandlingState @@ -19,7 +20,7 @@ method onFailed*(state: SaleFilled, request: StorageRequest): ?State = method `$`*(state: SaleFilled): string = "SaleFilled" -method run*(state: SaleFilled, machine: Machine): Future[?State] {.async.} = +method run*(state: SaleFilled, machine: Machine): Future[?State] {.asyncyeah.} = let data = SalesAgent(machine).data let market = SalesAgent(machine).context.market diff --git a/codex/sales/states/filling.nim b/codex/sales/states/filling.nim index 266f65e4..b69ba4cb 100644 --- a/codex/sales/states/filling.nim +++ b/codex/sales/states/filling.nim @@ -6,6 +6,7 @@ import ./errorhandling import ./filled import ./cancelled import ./failed +import ../../asyncyeah logScope: topics = "marketplace sales filling" @@ -26,7 +27,7 @@ method onSlotFilled*(state: SaleFilling, requestId: RequestId, slotIndex: UInt256): ?State = return some State(SaleFilled()) -method run(state: SaleFilling, machine: Machine): Future[?State] {.async.} = +method run(state: SaleFilling, machine: Machine): Future[?State] {.asyncyeah.} = let data = SalesAgent(machine).data let market = SalesAgent(machine).context.market without (collateral =? data.request.?ask.?collateral): diff --git a/codex/sales/states/finished.nim b/codex/sales/states/finished.nim index acbe548c..cdfa063d 100644 --- a/codex/sales/states/finished.nim +++ b/codex/sales/states/finished.nim @@ -5,6 +5,7 @@ import ../salesagent import ./errorhandling import ./cancelled import ./failed +import ../../asyncyeah logScope: topics = "marketplace sales finished" @@ -20,7 +21,7 @@ method onCancelled*(state: SaleFinished, request: StorageRequest): ?State = method onFailed*(state: SaleFinished, request: StorageRequest): ?State = return some State(SaleFailed()) -method run*(state: SaleFinished, machine: Machine): Future[?State] {.async.} = +method run*(state: SaleFinished, machine: Machine): Future[?State] {.asyncyeah.} = let agent = SalesAgent(machine) let data = agent.data let context = agent.context diff --git a/codex/sales/states/ignored.nim b/codex/sales/states/ignored.nim index c6f35e42..dfa818cf 100644 --- a/codex/sales/states/ignored.nim +++ b/codex/sales/states/ignored.nim @@ -2,13 +2,14 @@ import pkg/chronos import ../statemachine import ../salesagent import ./errorhandling +import ../../asyncyeah type SaleIgnored* = ref object of ErrorHandlingState method `$`*(state: SaleIgnored): string = "SaleIgnored" -method run*(state: SaleIgnored, machine: Machine): Future[?State] {.async.} = +method run*(state: SaleIgnored, machine: Machine): Future[?State] {.asyncyeah.} = let agent = SalesAgent(machine) let context = agent.context diff --git a/codex/sales/states/proving.nim b/codex/sales/states/proving.nim index 242852f9..a7aa7f7c 100644 --- a/codex/sales/states/proving.nim +++ b/codex/sales/states/proving.nim @@ -6,6 +6,7 @@ import ./filling import ./cancelled import ./failed import ./filled +import ../../asyncyeah logScope: topics = "marketplace sales proving" @@ -25,7 +26,7 @@ method onSlotFilled*(state: SaleProving, requestId: RequestId, slotIndex: UInt256): ?State = return some State(SaleFilled()) -method run*(state: SaleProving, machine: Machine): Future[?State] {.async.} = +method run*(state: SaleProving, machine: Machine): Future[?State] {.asyncyeah.} = let data = SalesAgent(machine).data let context = SalesAgent(machine).context diff --git a/codex/sales/states/unknown.nim b/codex/sales/states/unknown.nim index 6cf4164e..7240c9e6 100644 --- a/codex/sales/states/unknown.nim +++ b/codex/sales/states/unknown.nim @@ -5,6 +5,7 @@ import ./finished import ./failed import ./errored import ./cancelled +import ../../asyncyeah type SaleUnknown* = ref object of SaleState @@ -19,7 +20,7 @@ method onCancelled*(state: SaleUnknown, request: StorageRequest): ?State = method onFailed*(state: SaleUnknown, request: StorageRequest): ?State = return some State(SaleFailed()) -method run*(state: SaleUnknown, machine: Machine): Future[?State] {.async.} = +method run*(state: SaleUnknown, machine: Machine): Future[?State] {.asyncyeah.} = let agent = SalesAgent(machine) let data = agent.data let market = agent.context.market diff --git a/codex/storageproofs/por/por.nim b/codex/storageproofs/por/por.nim index a4ead3c3..712345b5 100644 --- a/codex/storageproofs/por/por.nim +++ b/codex/storageproofs/por/por.nim @@ -80,6 +80,7 @@ import std/endians import pkg/chronos +import ../asyncyeah import pkg/blscurve import pkg/blscurve/blst/blst_abi @@ -152,7 +153,7 @@ proc getSector( stream: SeekableStream, blockId: int64, sectorId: int64, - spb: int64): Future[ZChar] {.async.} = + spb: int64): Future[ZChar] {.asyncyeah.} = ## Read file sector at given postion ## @@ -268,7 +269,7 @@ proc generateAuthenticatorOpt( i: int64, s: int64, t: TauZero, - ubase: seq[blst_scalar]): Future[blst_p1] {.async.} = + ubase: seq[blst_scalar]): Future[blst_p1] {.asyncyeah.} = ## Optimized implementation of authenticator generation ## This implementation is reduces the number of scalar multiplications ## from s+1 to 1+1 , using knowledge about the scalars (r_j) @@ -325,7 +326,7 @@ proc generateProof*( q: seq[QElement], authenticators: seq[blst_p1], s: int64 -): Future[Proof] {.async.} = +): Future[Proof] {.asyncyeah.} = ## Generata BLS proofs for a given query ## @@ -438,7 +439,7 @@ proc init*( ssk: SecretKey, spk: PublicKey, blockSize: int64 -): Future[PoR] {.async.} = +): Future[PoR] {.asyncyeah.} = ## Set up the POR scheme by generating tags and metadata ## diff --git a/codex/storageproofs/storageproofs.nim b/codex/storageproofs/storageproofs.nim index aec6532b..251b980d 100644 --- a/codex/storageproofs/storageproofs.nim +++ b/codex/storageproofs/storageproofs.nim @@ -8,6 +8,7 @@ ## those terms. import pkg/chronos +import ../asyncyeah import pkg/chronicles import pkg/questionable import pkg/questionable/results @@ -36,7 +37,7 @@ proc upload*( cid: Cid, indexes: seq[int], host: ca.Address -): Future[?!void] {.async.} = +): Future[?!void] {.asyncyeah.} = ## Upload authenticators ## @@ -59,7 +60,7 @@ proc upload*( proc setupProofs*( self: StorageProofs, manifest: Manifest -): Future[?!void] {.async.} = +): Future[?!void] {.asyncyeah.} = ## Setup storage authentication ## @@ -89,7 +90,7 @@ proc init*( stpStore: stpStore, network: network) - proc tagsHandler(msg: TagsMessage) {.async, gcsafe.} = + proc tagsHandler(msg: TagsMessage) {.asyncyeah, gcsafe.} = try: await self.stpStore.store(msg.cid, msg.tags).tryGet() trace "Stored tags", cid = $msg.cid, tags = msg.tags.len diff --git a/codex/storageproofs/stpnetwork.nim b/codex/storageproofs/stpnetwork.nim index 5b0d1be8..9c703be2 100644 --- a/codex/storageproofs/stpnetwork.nim +++ b/codex/storageproofs/stpnetwork.nim @@ -10,6 +10,7 @@ import std/sequtils import pkg/chronos +import ../asyncyeah import pkg/libp2p import pkg/chronicles import pkg/questionable/results @@ -41,7 +42,7 @@ proc uploadTags*( indexes: seq[int], tags: seq[seq[byte]], host: ca.Address -): Future[?!void] {.async.} = +): Future[?!void] {.asyncyeah.} = # Upload tags to `host` # @@ -74,7 +75,7 @@ method init*(self: StpNetwork) = ## Perform protocol initialization ## - proc handle(conn: Connection, proto: string) {.async, gcsafe.} = + proc handle(conn: Connection, proto: string) {.asyncyeah, gcsafe.} = try: let msg = await conn.readLp(MaxMessageSize) diff --git a/codex/storageproofs/stpstore.nim b/codex/storageproofs/stpstore.nim index ff9ceb54..8ac34b0a 100644 --- a/codex/storageproofs/stpstore.nim +++ b/codex/storageproofs/stpstore.nim @@ -12,6 +12,7 @@ import std/strformat import pkg/libp2p import pkg/chronos +import ../asyncyeah import pkg/chronicles import pkg/stew/io2 import pkg/questionable @@ -34,7 +35,7 @@ template stpPath*(self: StpStore, cid: Cid): string = proc retrieve*( self: StpStore, cid: Cid -): Future[?!PorMessage] {.async.} = +): Future[?!PorMessage] {.asyncyeah.} = ## Retrieve authenticators from data store ## @@ -53,7 +54,7 @@ proc store*( self: StpStore, por: PorMessage, cid: Cid -): Future[?!void] {.async.} = +): Future[?!void] {.asyncyeah.} = ## Persist storage proofs ## @@ -79,7 +80,7 @@ proc retrieve*( self: StpStore, cid: Cid, blocks: seq[int] -): Future[?!seq[Tag]] {.async.} = +): Future[?!seq[Tag]] {.asyncyeah.} = var tags: seq[Tag] for b in blocks: var tag = Tag(idx: b) @@ -98,7 +99,7 @@ proc store*( self: StpStore, tags: seq[Tag], cid: Cid -): Future[?!void] {.async.} = +): Future[?!void] {.asyncyeah.} = let dir = self.stpPath(cid) @@ -124,7 +125,7 @@ proc init*( postfixLen: int = 2 ): StpStore = ## Init StpStore - ## + ## StpStore( authDir: authDir, postfixLen: postfixLen) diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim index 32ed91e8..4691f52a 100644 --- a/codex/stores/blockstore.nim +++ b/codex/stores/blockstore.nim @@ -12,6 +12,7 @@ import pkg/upraises push: {.upraises: [].} import pkg/chronos +import ../asyncyeah import pkg/libp2p import pkg/questionable import pkg/questionable/results @@ -81,7 +82,7 @@ method close*(self: BlockStore): Future[void] {.base.} = raiseAssert("Not implemented!") -proc contains*(self: BlockStore, blk: Cid): Future[bool] {.async.} = +proc contains*(self: BlockStore, blk: Cid): Future[bool] {.asyncyeah.} = ## Check if the block exists in the blockstore. ## Return false if error encountered ## diff --git a/codex/stores/cachestore.nim b/codex/stores/cachestore.nim index 1f7adb4a..892b1a33 100644 --- a/codex/stores/cachestore.nim +++ b/codex/stores/cachestore.nim @@ -15,6 +15,7 @@ import std/options import pkg/chronicles import pkg/chronos +import ../asyncyeah import pkg/libp2p import pkg/lrucache import pkg/questionable @@ -42,7 +43,7 @@ type const DefaultCacheSize*: NBytes = 5.MiBs -method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} = +method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.asyncyeah.} = ## Get a block from the stores ## @@ -61,7 +62,7 @@ method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} = trace "Error requesting block from cache", cid, error = exc.msg return failure exc -method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} = +method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.asyncyeah.} = ## Check if the block exists in the blockstore ## @@ -80,7 +81,7 @@ func cids(self: CacheStore): (iterator: Cid {.gcsafe.}) = method listBlocks*( self: CacheStore, blockType = BlockType.Manifest -): Future[?!BlocksIter] {.async.} = +): Future[?!BlocksIter] {.asyncyeah.} = ## Get the list of blocks in the BlockStore. This is an intensive operation ## @@ -90,7 +91,7 @@ method listBlocks*( let cids = self.cids() - proc next(): Future[?Cid] {.async.} = + proc next(): Future[?Cid] {.asyncyeah.} = await idleAsync() var cid: Cid @@ -155,7 +156,7 @@ func putBlockSync(self: CacheStore, blk: Block): bool = method putBlock*( self: CacheStore, blk: Block, - ttl = Duration.none): Future[?!void] {.async.} = + ttl = Duration.none): Future[?!void] {.asyncyeah.} = ## Put a block to the blockstore ## @@ -167,7 +168,7 @@ method putBlock*( discard self.putBlockSync(blk) return success() -method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} = +method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.asyncyeah.} = ## Delete a block from the blockstore ## @@ -182,7 +183,7 @@ method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} = return success() -method close*(self: CacheStore): Future[void] {.async.} = +method close*(self: CacheStore): Future[void] {.asyncyeah.} = ## Close the blockstore, a no-op for this implementation ## @@ -195,9 +196,9 @@ proc new*( chunkSize: NBytes = DefaultChunkSize ): CacheStore {.raises: [Defect, ValueError].} = ## Create a new CacheStore instance - ## + ## ## `cacheSize` and `chunkSize` are both in bytes - ## + ## if cacheSize < chunkSize: raise newException(ValueError, "cacheSize cannot be less than chunkSize") diff --git a/codex/stores/maintenance.nim b/codex/stores/maintenance.nim index 061228f2..481105b6 100644 --- a/codex/stores/maintenance.nim +++ b/codex/stores/maintenance.nim @@ -11,6 +11,7 @@ ## Looks for and removes expired blocks from blockstores. import pkg/chronos +import ../asyncyeah import pkg/chronicles import pkg/questionable import pkg/questionable/results @@ -54,7 +55,7 @@ proc new*( clock: clock, offset: 0) -proc deleteExpiredBlock(self: BlockMaintainer, cid: Cid): Future[void] {.async.} = +proc deleteExpiredBlock(self: BlockMaintainer, cid: Cid): Future[void] {.asyncyeah.} = if isErr (await self.repoStore.delBlock(cid)): trace "Unable to delete block from repoStore" @@ -86,7 +87,7 @@ proc runBlockCheck(self: BlockMaintainer): Future[void] {.asyncyeah.} = self.offset = 0 proc start*(self: BlockMaintainer) = - proc onTimer(): Future[void] {.async.} = + proc onTimer(): Future[void] {.asyncyeah.} = try: await self.runBlockCheck() except CatchableError as exc: @@ -94,5 +95,5 @@ proc start*(self: BlockMaintainer) = self.timer.start(onTimer, self.interval) -proc stop*(self: BlockMaintainer): Future[void] {.async.} = +proc stop*(self: BlockMaintainer): Future[void] {.asyncyeah.} = await self.timer.stop() diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index e31fbbd2..6da7e27a 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -13,6 +13,7 @@ push: {.upraises: [].} import pkg/chronicles import pkg/chronos +import ../asyncyeah import pkg/libp2p import ../blocktype as bt @@ -31,7 +32,7 @@ type engine*: BlockExcEngine # blockexc decision engine localStore*: BlockStore # local block store -method getBlock*(self: NetworkStore, cid: Cid): Future[?!bt.Block] {.async.} = +method getBlock*(self: NetworkStore, cid: Cid): Future[?!bt.Block] {.asyncyeah.} = ## Get a block from a remote peer ## @@ -50,7 +51,7 @@ method putBlock*( self: NetworkStore, blk: bt.Block, ttl = Duration.none -): Future[?!void] {.async.} = +): Future[?!void] {.asyncyeah.} = ## Store block locally and notify the network ## @@ -72,14 +73,14 @@ method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] = {.pop.} -method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.async.} = +method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.asyncyeah.} = ## Check if the block exists in the blockstore ## trace "Checking network store for block existence", cid return await self.localStore.hasBlock(cid) -method close*(self: NetworkStore): Future[void] {.async.} = +method close*(self: NetworkStore): Future[void] {.asyncyeah.} = ## Close the underlying local blockstore ## @@ -91,8 +92,8 @@ proc new*( engine: BlockExcEngine, localStore: BlockStore ): NetworkStore = - ## Create new instance of a NetworkStore - ## + ## Create new instance of a NetworkStore + ## NetworkStore( localStore: localStore, engine: engine) diff --git a/codex/stores/repostore.nim b/codex/stores/repostore.nim index 6e2731a4..bac9fb9e 100644 --- a/codex/stores/repostore.nim +++ b/codex/stores/repostore.nim @@ -12,6 +12,7 @@ import pkg/upraises push: {.upraises: [].} import pkg/chronos +import ../asyncyeah import pkg/chronicles import pkg/libp2p import pkg/questionable @@ -70,7 +71,7 @@ func available*(self: RepoStore): uint = func available*(self: RepoStore, bytes: uint): bool = return bytes < self.available() -method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} = +method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.asyncyeah.} = ## Get a block from the blockstore ## @@ -109,7 +110,7 @@ method putBlock*( self: RepoStore, blk: Block, ttl = Duration.none -): Future[?!void] {.async.} = +): Future[?!void] {.asyncyeah.} = ## Put a block to the blockstore ## @@ -158,7 +159,7 @@ method putBlock*( self.quotaUsedBytes = used return success() -proc updateQuotaBytesUsed(self: RepoStore, blk: Block): Future[?!void] {.async.} = +proc updateQuotaBytesUsed(self: RepoStore, blk: Block): Future[?!void] {.asyncyeah.} = let used = self.quotaUsedBytes - blk.data.len.uint if err =? (await self.metaDs.put( QuotaUsedKey, @@ -168,12 +169,12 @@ proc updateQuotaBytesUsed(self: RepoStore, blk: Block): Future[?!void] {.async.} self.quotaUsedBytes = used return success() -proc removeBlockExpirationEntry(self: RepoStore, cid: Cid): Future[?!void] {.async.} = +proc removeBlockExpirationEntry(self: RepoStore, cid: Cid): Future[?!void] {.asyncyeah.} = without key =? createBlockExpirationMetadataKey(cid), err: return failure(err) return await self.metaDs.delete(key) -method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = +method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.asyncyeah.} = ## Delete a block from the blockstore ## @@ -197,7 +198,7 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = return success() -method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} = +method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.asyncyeah.} = ## Check if the block exists in the blockstore ## @@ -210,7 +211,7 @@ method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} = method listBlocks*( self: RepoStore, blockType = BlockType.Manifest -): Future[?!BlocksIter] {.async.} = +): Future[?!BlocksIter] {.asyncyeah.} = ## Get the list of blocks in the RepoStore. ## This is an intensive operation ## @@ -228,7 +229,7 @@ method listBlocks*( trace "Error querying cids in repo", blockType, err = err.msg return failure(err) - proc next(): Future[?Cid] {.async.} = + proc next(): Future[?Cid] {.asyncyeah.} = await idleAsync() iter.finished = queryIter.finished if not queryIter.finished: @@ -249,9 +250,9 @@ method getBlockExpirations*( self: RepoStore, maxNumber: int, offset: int -): Future[?!BlockExpirationIter] {.async, base.} = +): Future[?!BlockExpirationIter] {.asyncyeah, base.} = ## Get block experiartions from the given RepoStore - ## + ## without query =? createBlockExpirationQuery(maxNumber, offset), err: trace "Unable to format block expirations query" return failure(err) @@ -262,7 +263,7 @@ method getBlockExpirations*( var iter = BlockExpirationIter() - proc next(): Future[?BlockExpiration] {.async.} = + proc next(): Future[?BlockExpiration] {.asyncyeah.} = if not queryIter.finished: if pair =? (await queryIter.next()) and blockKey =? pair.key: let expirationTimestamp = pair.data @@ -281,14 +282,14 @@ method getBlockExpirations*( iter.next = next return success iter -method close*(self: RepoStore): Future[void] {.async.} = +method close*(self: RepoStore): Future[void] {.asyncyeah.} = ## Close the blockstore, cleaning up resources managed by it. ## For some implementations this may be a no-op ## (await self.repoDs.close()).expect("Should close datastore") -proc hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} = +proc hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.asyncyeah.} = ## Check if the block exists in the blockstore. ## Return false if error encountered ## @@ -299,7 +300,7 @@ proc hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} = return await self.repoDs.has(key) -proc reserve*(self: RepoStore, bytes: uint): Future[?!void] {.async.} = +proc reserve*(self: RepoStore, bytes: uint): Future[?!void] {.asyncyeah.} = ## Reserve bytes ## @@ -322,7 +323,7 @@ proc reserve*(self: RepoStore, bytes: uint): Future[?!void] {.async.} = return success() -proc release*(self: RepoStore, bytes: uint): Future[?!void] {.async.} = +proc release*(self: RepoStore, bytes: uint): Future[?!void] {.asyncyeah.} = ## Release bytes ## @@ -348,7 +349,7 @@ proc release*(self: RepoStore, bytes: uint): Future[?!void] {.async.} = trace "Released bytes", bytes return success() -proc start*(self: RepoStore): Future[void] {.async.} = +proc start*(self: RepoStore): Future[void] {.asyncyeah.} = ## Start repo ## @@ -388,7 +389,7 @@ proc start*(self: RepoStore): Future[void] {.async.} = self.started = true -proc stop*(self: RepoStore): Future[void] {.async.} = +proc stop*(self: RepoStore): Future[void] {.asyncyeah.} = ## Stop repo ## if not self.started: @@ -410,8 +411,8 @@ func new*( quotaMaxBytes = DefaultQuotaBytes, blockTtl = DefaultBlockTtl ): RepoStore = - ## Create new instance of a RepoStore - ## + ## Create new instance of a RepoStore + ## RepoStore( repoDs: repoDs, metaDs: metaDs, diff --git a/codex/streams/asyncstreamwrapper.nim b/codex/streams/asyncstreamwrapper.nim index f2491e8d..6719c5a5 100644 --- a/codex/streams/asyncstreamwrapper.nim +++ b/codex/streams/asyncstreamwrapper.nim @@ -11,6 +11,7 @@ import pkg/upraises push: {.upraises: [].} import pkg/chronos +import ../asyncyeah import pkg/chronicles import pkg/libp2p @@ -37,7 +38,7 @@ proc new*( writer: AsyncStreamWriter = nil ): AsyncStreamWrapper = ## Create new instance of an asynchronous stream wrapper - ## + ## let stream = C(reader: reader, writer: writer) @@ -63,7 +64,7 @@ method readOnce*( self: AsyncStreamWrapper, pbytes: pointer, nbytes: int -): Future[int] {.async.} = +): Future[int] {.asyncyeah.} = trace "Reading bytes from reader", bytes = nbytes if isNil(self.reader): @@ -80,13 +81,13 @@ proc completeWrite( self: AsyncStreamWrapper, fut: Future[void], msgLen: int -): Future[void] {.async.} = +): Future[void] {.asyncyeah.} = withExceptions: await fut method write*(self: AsyncStreamWrapper, msg: seq[byte]): Future[void] = - # Avoid a copy of msg being kept in the closure created by `{.async.}` as this + # Avoid a copy of msg being kept in the closure created by `{.asyncyeah.}` as this # drives up memory usage trace "Writing bytes to writer", bytes = msg.len @@ -117,7 +118,7 @@ method closed*(self: AsyncStreamWrapper): bool = method atEof*(self: AsyncStreamWrapper): bool = self.reader.atEof() -method closeImpl*(self: AsyncStreamWrapper) {.async.} = +method closeImpl*(self: AsyncStreamWrapper) {.asyncyeah.} = try: trace "Shutting down async chronos stream" if not self.closed(): diff --git a/codex/streams/seekablestream.nim b/codex/streams/seekablestream.nim index 54c13380..31b3deda 100644 --- a/codex/streams/seekablestream.nim +++ b/codex/streams/seekablestream.nim @@ -9,6 +9,7 @@ import pkg/libp2p import pkg/chronos +import ../asyncyeah import pkg/chronicles export libp2p, chronos, chronicles diff --git a/codex/streams/storestream.nim b/codex/streams/storestream.nim index 0f174aee..4cfba557 100644 --- a/codex/streams/storestream.nim +++ b/codex/streams/storestream.nim @@ -15,6 +15,7 @@ push: {.upraises: [].} import pkg/libp2p import pkg/chronos +import ../asyncyeah import pkg/chronicles import pkg/stew/ptrops @@ -53,7 +54,7 @@ proc new*( pad = true ): StoreStream = ## Create a new StoreStream instance for a given store and manifest - ## + ## result = StoreStream( store: store, manifest: manifest, @@ -76,11 +77,11 @@ method readOnce*( self: StoreStream, pbytes: pointer, nbytes: int -): Future[int] {.async.} = +): Future[int] {.asyncyeah.} = ## Read `nbytes` from current position in the StoreStream into output buffer pointed by `pbytes`. ## Return how many bytes were actually read before EOF was encountered. ## Raise exception if we are already at EOF. - ## + ## trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.len if self.atEof: @@ -117,7 +118,7 @@ method readOnce*( return read -method closeImpl*(self: StoreStream) {.async.} = +method closeImpl*(self: StoreStream) {.asyncyeah.} = trace "Closing StoreStream" self.offset = self.size # set Eof await procCall LPStream(self).closeImpl() diff --git a/codex/utils.nim b/codex/utils.nim index 70547c5a..c122463e 100644 --- a/codex/utils.nim +++ b/codex/utils.nim @@ -6,11 +6,12 @@ ## at your option. ## This file may not be copied, modified, or distributed except according to ## those terms. -## +## import std/parseutils import pkg/chronos +import ./asyncyeah import ./utils/asyncheapqueue import ./utils/fileutils diff --git a/codex/utils/asyncheapqueue.nim b/codex/utils/asyncheapqueue.nim index 17ca1f78..919a86a1 100644 --- a/codex/utils/asyncheapqueue.nim +++ b/codex/utils/asyncheapqueue.nim @@ -9,6 +9,7 @@ import std/sequtils import pkg/chronos +import ../asyncyeah import pkg/stew/results # Based on chronos AsyncHeapQueue and std/heapqueue @@ -136,7 +137,7 @@ proc pushNoWait*[T](heap: AsyncHeapQueue[T], item: T): Result[void, AsyncHQError return ok() -proc push*[T](heap: AsyncHeapQueue[T], item: T) {.async, gcsafe.} = +proc push*[T](heap: AsyncHeapQueue[T], item: T) {.asyncyeah, gcsafe.} = ## Push item into the queue, awaiting for an available slot ## when it's full ## @@ -171,7 +172,7 @@ proc popNoWait*[T](heap: AsyncHeapQueue[T]): Result[T, AsyncHQErrors] = heap.putters.wakeupNext() -proc pop*[T](heap: AsyncHeapQueue[T]): Future[T] {.async.} = +proc pop*[T](heap: AsyncHeapQueue[T]): Future[T] {.asyncyeah.} = ## Remove and return an ``item`` from the beginning of the queue ``heap``. ## If the queue is empty, wait until an item is available. while heap.empty(): @@ -234,7 +235,7 @@ proc pushOrUpdateNoWait*[T](heap: AsyncHeapQueue[T], item: T): Result[void, Asyn return heap.pushNoWait(item) -proc pushOrUpdate*[T](heap: AsyncHeapQueue[T], item: T) {.async.} = +proc pushOrUpdate*[T](heap: AsyncHeapQueue[T], item: T) {.asyncyeah.} = ## Update an item if it exists or push a new one ## awaiting until a slot becomes available ## diff --git a/codex/utils/asyncspawn.nim b/codex/utils/asyncspawn.nim index 6717e5e1..fd70735f 100644 --- a/codex/utils/asyncspawn.nim +++ b/codex/utils/asyncspawn.nim @@ -1,7 +1,8 @@ import pkg/chronos +import ../asyncyeah proc asyncSpawn*(future: Future[void], ignore: type CatchableError) = - proc ignoringError {.async.} = + proc ignoringError {.asyncyeah.} = try: await future except ignore: diff --git a/codex/utils/asyncstatemachine.nim b/codex/utils/asyncstatemachine.nim index 13392008..44fa2013 100644 --- a/codex/utils/asyncstatemachine.nim +++ b/codex/utils/asyncstatemachine.nim @@ -1,5 +1,6 @@ import pkg/questionable import pkg/chronos +import ../asyncyeah import pkg/chronicles import pkg/upraises @@ -42,7 +43,7 @@ proc schedule*(machine: Machine, event: Event) = except AsyncQueueFullError: raiseAssert "unlimited queue is full?!" -method run*(state: State, machine: Machine): Future[?State] {.base, async.} = +method run*(state: State, machine: Machine): Future[?State] {.base, asyncyeah.} = discard method onError*(state: State, error: ref CatchableError): ?State {.base.} = @@ -52,14 +53,14 @@ proc onError(machine: Machine, error: ref CatchableError): Event = return proc (state: State): ?State = state.onError(error) -proc run(machine: Machine, state: State) {.async.} = +proc run(machine: Machine, state: State) {.asyncyeah.} = try: if next =? await state.run(machine): machine.schedule(Event.transition(state, next)) except CancelledError: discard -proc scheduler(machine: Machine) {.async.} = +proc scheduler(machine: Machine) {.asyncyeah.} = proc onRunComplete(udata: pointer) {.gcsafe.} = var fut = cast[FutureBase](udata) if fut.failed(): diff --git a/codex/utils/timer.nim b/codex/utils/timer.nim index 4eefc599..4743c88a 100644 --- a/codex/utils/timer.nim +++ b/codex/utils/timer.nim @@ -11,6 +11,7 @@ ## Used to execute a callback in a loop import pkg/chronos +import ../asyncyeah import pkg/chronicles import pkg/upraises @@ -26,7 +27,7 @@ proc new*(T: type Timer, timerName = "Unnamed Timer"): Timer = ## Create a new Timer intance with the given name Timer(name: timerName) -proc timerLoop(timer: Timer) {.async.} = +proc timerLoop(timer: Timer) {.asyncyeah.} = try: while true: await timer.callback() @@ -44,7 +45,7 @@ method start*(timer: Timer, callback: TimerCallback, interval: Duration) {.base. timer.interval = interval timer.loopFuture = timerLoop(timer) -method stop*(timer: Timer) {.async, base.} = +method stop*(timer: Timer) {.asyncyeah, base.} = if timer.loopFuture != nil: trace "Timer stopping: ", name=timer.name await timer.loopFuture.cancelAndWait() diff --git a/codex/validation.nim b/codex/validation.nim index a3f69296..43b818ff 100644 --- a/codex/validation.nim +++ b/codex/validation.nim @@ -1,6 +1,7 @@ import std/sets import std/sequtils import pkg/chronos +import ./asyncyeah import pkg/chronicles import ./market import ./clock @@ -37,13 +38,13 @@ proc slots*(validation: Validation): seq[SlotId] = proc getCurrentPeriod(validation: Validation): UInt256 = return validation.periodicity.periodOf(validation.clock.now().u256) -proc waitUntilNextPeriod(validation: Validation) {.async.} = +proc waitUntilNextPeriod(validation: Validation) {.asyncyeah.} = let period = validation.getCurrentPeriod() let periodEnd = validation.periodicity.periodEnd(period) trace "Waiting until next period", currentPeriod = period await validation.clock.waitUntil(periodEnd.truncate(int64) + 1) -proc subscribeSlotFilled(validation: Validation) {.async.} = +proc subscribeSlotFilled(validation: Validation) {.asyncyeah.} = proc onSlotFilled(requestId: RequestId, slotIndex: UInt256) = let slotId = slotId(requestId, slotIndex) if slotId notin validation.slots: @@ -53,7 +54,7 @@ proc subscribeSlotFilled(validation: Validation) {.async.} = let subscription = await validation.market.subscribeSlotFilled(onSlotFilled) validation.subscriptions.add(subscription) -proc removeSlotsThatHaveEnded(validation: Validation) {.async.} = +proc removeSlotsThatHaveEnded(validation: Validation) {.asyncyeah.} = var ended: HashSet[SlotId] for slotId in validation.slots: let state = await validation.market.slotState(slotId) @@ -64,7 +65,7 @@ proc removeSlotsThatHaveEnded(validation: Validation) {.async.} = proc markProofAsMissing(validation: Validation, slotId: SlotId, - period: Period) {.async.} = + period: Period) {.asyncyeah.} = logScope: currentPeriod = validation.getCurrentPeriod() @@ -78,12 +79,12 @@ proc markProofAsMissing(validation: Validation, except CatchableError as e: error "Marking proof as missing failed", msg = e.msg -proc markProofsAsMissing(validation: Validation) {.async.} = +proc markProofsAsMissing(validation: Validation) {.asyncyeah.} = for slotId in validation.slots: let previousPeriod = validation.getCurrentPeriod() - 1 await validation.markProofAsMissing(slotId, previousPeriod) -proc run(validation: Validation) {.async.} = +proc run(validation: Validation) {.asyncyeah.} = trace "Validation started" try: while true: @@ -96,13 +97,13 @@ proc run(validation: Validation) {.async.} = except CatchableError as e: error "Validation failed", msg = e.msg -proc start*(validation: Validation) {.async.} = +proc start*(validation: Validation) {.asyncyeah.} = validation.periodicity = await validation.market.periodicity() validation.proofTimeout = await validation.market.proofTimeout() await validation.subscribeSlotFilled() validation.running = validation.run() -proc stop*(validation: Validation) {.async.} = +proc stop*(validation: Validation) {.asyncyeah.} = await validation.running.cancelAndWait() while validation.subscriptions.len > 0: let subscription = validation.subscriptions.pop()