applies asyncyeah
This commit is contained in:
parent
ed4fd8104b
commit
6becf37c00
|
@ -10,6 +10,7 @@
|
|||
import std/sequtils
|
||||
|
||||
import pkg/chronos
|
||||
import ../../asyncyeah
|
||||
import pkg/chronicles
|
||||
import pkg/libp2p
|
||||
import pkg/metrics
|
||||
|
@ -64,7 +65,7 @@ type
|
|||
inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests
|
||||
advertiseType*: BlockType # Advertice blocks, manifests or both
|
||||
|
||||
proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
|
||||
proc discoveryQueueLoop(b: DiscoveryEngine) {.asyncyeah.} =
|
||||
while b.discEngineRunning:
|
||||
for cid in toSeq(b.pendingBlocks.wantList):
|
||||
try:
|
||||
|
@ -79,11 +80,11 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
|
|||
trace "About to sleep discovery loop"
|
||||
await sleepAsync(b.discoveryLoopSleep)
|
||||
|
||||
proc heartbeatLoop(b: DiscoveryEngine) {.async.} =
|
||||
proc heartbeatLoop(b: DiscoveryEngine) {.asyncyeah.} =
|
||||
while b.discEngineRunning:
|
||||
await sleepAsync(1.seconds)
|
||||
|
||||
proc advertiseQueueLoop*(b: DiscoveryEngine) {.async.} =
|
||||
proc advertiseQueueLoop*(b: DiscoveryEngine) {.asyncyeah.} =
|
||||
while b.discEngineRunning:
|
||||
if cids =? await b.localStore.listBlocks(blockType = b.advertiseType):
|
||||
for c in cids:
|
||||
|
@ -96,7 +97,7 @@ proc advertiseQueueLoop*(b: DiscoveryEngine) {.async.} =
|
|||
|
||||
trace "Exiting advertise task loop"
|
||||
|
||||
proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} =
|
||||
proc advertiseTaskLoop(b: DiscoveryEngine) {.asyncyeah.} =
|
||||
## Run advertise tasks
|
||||
##
|
||||
|
||||
|
@ -127,7 +128,7 @@ proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} =
|
|||
|
||||
trace "Exiting advertise task runner"
|
||||
|
||||
proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
|
||||
proc discoveryTaskLoop(b: DiscoveryEngine) {.asyncyeah.} =
|
||||
## Run discovery tasks
|
||||
##
|
||||
|
||||
|
@ -192,7 +193,7 @@ proc queueProvideBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
|
|||
except CatchableError as exc:
|
||||
trace "Exception queueing discovery request", exc = exc.msg
|
||||
|
||||
proc start*(b: DiscoveryEngine) {.async.} =
|
||||
proc start*(b: DiscoveryEngine) {.asyncyeah.} =
|
||||
## Start the discengine task
|
||||
##
|
||||
|
||||
|
@ -213,7 +214,7 @@ proc start*(b: DiscoveryEngine) {.async.} =
|
|||
b.discoveryLoop = discoveryQueueLoop(b)
|
||||
b.heartbeatLoop = heartbeatLoop(b)
|
||||
|
||||
proc stop*(b: DiscoveryEngine) {.async.} =
|
||||
proc stop*(b: DiscoveryEngine) {.asyncyeah.} =
|
||||
## Stop the discovery engine
|
||||
##
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@ import std/options
|
|||
import std/algorithm
|
||||
|
||||
import pkg/chronos
|
||||
import ../../asyncyeah
|
||||
import pkg/chronicles
|
||||
import pkg/libp2p
|
||||
import pkg/stint
|
||||
|
@ -83,7 +84,7 @@ proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe} =
|
|||
|
||||
proc blockexcTaskRunner(b: BlockExcEngine): Future[void] {.gcsafe.}
|
||||
|
||||
proc start*(b: BlockExcEngine) {.async.} =
|
||||
proc start*(b: BlockExcEngine) {.asyncyeah.} =
|
||||
## Start the blockexc task
|
||||
##
|
||||
|
||||
|
@ -98,7 +99,7 @@ proc start*(b: BlockExcEngine) {.async.} =
|
|||
for i in 0..<b.concurrentTasks:
|
||||
b.blockexcTasks.add(blockexcTaskRunner(b))
|
||||
|
||||
proc stop*(b: BlockExcEngine) {.async.} =
|
||||
proc stop*(b: BlockExcEngine) {.asyncyeah.} =
|
||||
## Stop the blockexc blockexc
|
||||
##
|
||||
|
||||
|
@ -121,7 +122,7 @@ proc stop*(b: BlockExcEngine) {.async.} =
|
|||
proc requestBlock*(
|
||||
b: BlockExcEngine,
|
||||
cid: Cid,
|
||||
timeout = DefaultBlockTimeout): Future[bt.Block] {.async.} =
|
||||
timeout = DefaultBlockTimeout): Future[bt.Block] {.asyncyeah.} =
|
||||
## Request a block from remotes
|
||||
##
|
||||
|
||||
|
@ -148,7 +149,7 @@ proc requestBlock*(
|
|||
let
|
||||
blockPeer = peers[0] # get cheapest
|
||||
|
||||
proc blockHandleMonitor() {.async.} =
|
||||
proc blockHandleMonitor() {.asyncyeah.} =
|
||||
try:
|
||||
trace "Monigoring block handle", cid
|
||||
b.pendingBlocks.setInFlight(cid, true)
|
||||
|
@ -199,7 +200,7 @@ proc requestBlock*(
|
|||
proc blockPresenceHandler*(
|
||||
b: BlockExcEngine,
|
||||
peer: PeerId,
|
||||
blocks: seq[BlockPresence]) {.async.} =
|
||||
blocks: seq[BlockPresence]) {.asyncyeah.} =
|
||||
## Handle block presence
|
||||
##
|
||||
|
||||
|
@ -250,7 +251,7 @@ proc blockPresenceHandler*(
|
|||
.filter do(cid: Cid) -> bool:
|
||||
not b.peers.anyIt( cid in it.peerHave ))
|
||||
|
||||
proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} =
|
||||
proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) {.asyncyeah.} =
|
||||
trace "Schedule a task for new blocks", items = blocks.len
|
||||
|
||||
let
|
||||
|
@ -270,7 +271,7 @@ proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} =
|
|||
|
||||
break # do next peer
|
||||
|
||||
proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} =
|
||||
proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) {.asyncyeah.} =
|
||||
## Resolve pending blocks from the pending blocks manager
|
||||
## and schedule any new task to be ran
|
||||
##
|
||||
|
@ -283,7 +284,7 @@ proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} =
|
|||
|
||||
proc payForBlocks(engine: BlockExcEngine,
|
||||
peer: BlockExcPeerCtx,
|
||||
blocks: seq[bt.Block]) {.async.} =
|
||||
blocks: seq[bt.Block]) {.asyncyeah.} =
|
||||
trace "Paying for blocks", blocks = blocks.len
|
||||
|
||||
let
|
||||
|
@ -297,7 +298,7 @@ proc payForBlocks(engine: BlockExcEngine,
|
|||
proc blocksHandler*(
|
||||
b: BlockExcEngine,
|
||||
peer: PeerId,
|
||||
blocks: seq[bt.Block]) {.async.} =
|
||||
blocks: seq[bt.Block]) {.asyncyeah.} =
|
||||
## handle incoming blocks
|
||||
##
|
||||
|
||||
|
@ -318,7 +319,7 @@ proc blocksHandler*(
|
|||
proc wantListHandler*(
|
||||
b: BlockExcEngine,
|
||||
peer: PeerId,
|
||||
wantList: Wantlist) {.async.} =
|
||||
wantList: Wantlist) {.asyncyeah.} =
|
||||
## Handle incoming want lists
|
||||
##
|
||||
|
||||
|
@ -386,7 +387,7 @@ proc wantListHandler*(
|
|||
proc accountHandler*(
|
||||
engine: BlockExcEngine,
|
||||
peer: PeerId,
|
||||
account: Account) {.async.} =
|
||||
account: Account) {.asyncyeah.} =
|
||||
let context = engine.peers.get(peer)
|
||||
if context.isNil:
|
||||
return
|
||||
|
@ -396,7 +397,7 @@ proc accountHandler*(
|
|||
proc paymentHandler*(
|
||||
engine: BlockExcEngine,
|
||||
peer: PeerId,
|
||||
payment: SignedState) {.async.} =
|
||||
payment: SignedState) {.asyncyeah.} =
|
||||
trace "Handling payments", peer
|
||||
|
||||
without context =? engine.peers.get(peer).option and
|
||||
|
@ -410,7 +411,7 @@ proc paymentHandler*(
|
|||
else:
|
||||
context.paymentChannel = engine.wallet.acceptChannel(payment).option
|
||||
|
||||
proc setupPeer*(b: BlockExcEngine, peer: PeerId) {.async.} =
|
||||
proc setupPeer*(b: BlockExcEngine, peer: PeerId) {.asyncyeah.} =
|
||||
## Perform initial setup, such as want
|
||||
## list exchange
|
||||
##
|
||||
|
@ -439,7 +440,7 @@ proc dropPeer*(b: BlockExcEngine, peer: PeerId) =
|
|||
# drop the peer from the peers table
|
||||
b.peers.remove(peer)
|
||||
|
||||
proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
|
||||
proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, asyncyeah.} =
|
||||
trace "Handling task for peer", peer = task.id
|
||||
|
||||
# Send to the peer blocks he wants to get,
|
||||
|
@ -483,7 +484,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
|
|||
)
|
||||
trace "Removed entries from peerWants", items = task.peerWants.len
|
||||
|
||||
proc blockexcTaskRunner(b: BlockExcEngine) {.async.} =
|
||||
proc blockexcTaskRunner(b: BlockExcEngine) {.asyncyeah.} =
|
||||
## process tasks
|
||||
##
|
||||
|
||||
|
@ -512,7 +513,7 @@ proc new*(
|
|||
peersPerRequest = DefaultMaxPeersPerRequest
|
||||
): BlockExcEngine =
|
||||
## Create new block exchange engine instance
|
||||
##
|
||||
##
|
||||
|
||||
let
|
||||
engine = BlockExcEngine(
|
||||
|
|
|
@ -15,6 +15,7 @@ push: {.upraises: [].}
|
|||
|
||||
import pkg/chronicles
|
||||
import pkg/chronos
|
||||
import ../../asyncyeah
|
||||
import pkg/libp2p
|
||||
|
||||
import ../../blocktype
|
||||
|
@ -38,7 +39,7 @@ proc getWantHandle*(
|
|||
cid: Cid,
|
||||
timeout = DefaultBlockTimeout,
|
||||
inFlight = false
|
||||
): Future[Block] {.async.} =
|
||||
): Future[Block] {.asyncyeah.} =
|
||||
## Add an event for a block
|
||||
##
|
||||
|
||||
|
|
|
@ -12,6 +12,7 @@ import std/sequtils
|
|||
|
||||
import pkg/chronicles
|
||||
import pkg/chronos
|
||||
import ../../asyncyeah
|
||||
|
||||
import pkg/libp2p
|
||||
import pkg/libp2p/utils/semaphore
|
||||
|
@ -75,7 +76,7 @@ type
|
|||
getConn: ConnProvider
|
||||
inflightSema: AsyncSemaphore
|
||||
|
||||
proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} =
|
||||
proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.asyncyeah.} =
|
||||
## Send message to peer
|
||||
##
|
||||
|
||||
|
@ -92,7 +93,7 @@ proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} =
|
|||
proc handleWantList(
|
||||
b: BlockExcNetwork,
|
||||
peer: NetworkPeer,
|
||||
list: Wantlist) {.async.} =
|
||||
list: Wantlist) {.asyncyeah.} =
|
||||
## Handle incoming want list
|
||||
##
|
||||
|
||||
|
@ -110,8 +111,8 @@ proc makeWantList*(
|
|||
sendDontHave: bool = false
|
||||
): Wantlist =
|
||||
## make list of wanted entries
|
||||
##
|
||||
|
||||
##
|
||||
|
||||
Wantlist(
|
||||
entries: cids.mapIt(
|
||||
Entry(
|
||||
|
@ -150,7 +151,7 @@ proc handleBlocks(
|
|||
b: BlockExcNetwork,
|
||||
peer: NetworkPeer,
|
||||
blocks: seq[pb.Block]
|
||||
) {.async.} =
|
||||
) {.asyncyeah.} =
|
||||
## Handle incoming blocks
|
||||
##
|
||||
|
||||
|
@ -191,7 +192,7 @@ proc sendBlocks*(
|
|||
proc handleBlockPresence(
|
||||
b: BlockExcNetwork,
|
||||
peer: NetworkPeer,
|
||||
presence: seq[BlockPresence]) {.async.} =
|
||||
presence: seq[BlockPresence]) {.asyncyeah.} =
|
||||
## Handle block presence
|
||||
##
|
||||
|
||||
|
@ -212,7 +213,7 @@ proc handleAccount(
|
|||
network: BlockExcNetwork,
|
||||
peer: NetworkPeer,
|
||||
account: Account
|
||||
) {.async.} =
|
||||
) {.asyncyeah.} =
|
||||
## Handle account info
|
||||
##
|
||||
|
||||
|
@ -243,7 +244,7 @@ proc handlePayment(
|
|||
network: BlockExcNetwork,
|
||||
peer: NetworkPeer,
|
||||
payment: SignedState
|
||||
) {.async.} =
|
||||
) {.asyncyeah.} =
|
||||
## Handle payment
|
||||
##
|
||||
|
||||
|
@ -254,9 +255,9 @@ proc rpcHandler(
|
|||
b: BlockExcNetwork,
|
||||
peer: NetworkPeer,
|
||||
msg: Message
|
||||
) {.async.} =
|
||||
) {.asyncyeah.} =
|
||||
## handle rpc messages
|
||||
##
|
||||
##
|
||||
try:
|
||||
if msg.wantlist.entries.len > 0:
|
||||
asyncSpawn b.handleWantList(peer, msg.wantlist)
|
||||
|
@ -310,7 +311,7 @@ proc setupPeer*(b: BlockExcNetwork, peer: PeerId) =
|
|||
|
||||
discard b.getOrCreatePeer(peer)
|
||||
|
||||
proc dialPeer*(b: BlockExcNetwork, peer: PeerRecord) {.async.} =
|
||||
proc dialPeer*(b: BlockExcNetwork, peer: PeerRecord) {.asyncyeah.} =
|
||||
await b.switch.connect(peer.peerId, peer.addresses.mapIt(it.address))
|
||||
|
||||
proc dropPeer*(b: BlockExcNetwork, peer: PeerId) =
|
||||
|
@ -323,7 +324,7 @@ method init*(b: BlockExcNetwork) =
|
|||
## Perform protocol initialization
|
||||
##
|
||||
|
||||
proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} =
|
||||
proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.asyncyeah.} =
|
||||
if event.kind == PeerEventKind.Joined:
|
||||
b.setupPeer(peerId)
|
||||
else:
|
||||
|
@ -332,7 +333,7 @@ method init*(b: BlockExcNetwork) =
|
|||
b.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined)
|
||||
b.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left)
|
||||
|
||||
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||
proc handle(conn: Connection, proto: string) {.asyncyeah, gcsafe, closure.} =
|
||||
let peerId = conn.peerId
|
||||
let blockexcPeer = b.getOrCreatePeer(peerId)
|
||||
await blockexcPeer.readLoop(conn) # attach read loop
|
||||
|
|
|
@ -11,6 +11,7 @@ import pkg/upraises
|
|||
push: {.upraises: [].}
|
||||
|
||||
import pkg/chronos
|
||||
import ../../asyncyeah
|
||||
import pkg/chronicles
|
||||
import pkg/libp2p
|
||||
|
||||
|
@ -38,7 +39,7 @@ proc connected*(b: NetworkPeer): bool =
|
|||
not(isNil(b.sendConn)) and
|
||||
not(b.sendConn.closed or b.sendConn.atEof)
|
||||
|
||||
proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} =
|
||||
proc readLoop*(b: NetworkPeer, conn: Connection) {.asyncyeah.} =
|
||||
if isNil(conn):
|
||||
return
|
||||
|
||||
|
@ -54,7 +55,7 @@ proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} =
|
|||
finally:
|
||||
await conn.close()
|
||||
|
||||
proc connect*(b: NetworkPeer): Future[Connection] {.async.} =
|
||||
proc connect*(b: NetworkPeer): Future[Connection] {.asyncyeah.} =
|
||||
if b.connected:
|
||||
return b.sendConn
|
||||
|
||||
|
@ -62,7 +63,7 @@ proc connect*(b: NetworkPeer): Future[Connection] {.async.} =
|
|||
asyncSpawn b.readLoop(b.sendConn)
|
||||
return b.sendConn
|
||||
|
||||
proc send*(b: NetworkPeer, msg: Message) {.async.} =
|
||||
proc send*(b: NetworkPeer, msg: Message) {.asyncyeah.} =
|
||||
let conn = await b.connect()
|
||||
|
||||
if isNil(conn):
|
||||
|
@ -73,7 +74,7 @@ proc send*(b: NetworkPeer, msg: Message) {.async.} =
|
|||
await conn.writeLp(protobufEncode(msg))
|
||||
|
||||
proc broadcast*(b: NetworkPeer, msg: Message) =
|
||||
proc sendAwaiter() {.async.} =
|
||||
proc sendAwaiter() {.asyncyeah.} =
|
||||
try:
|
||||
await b.send(msg)
|
||||
except CatchableError as exc:
|
||||
|
|
|
@ -13,6 +13,7 @@ import std/tables
|
|||
import pkg/chronicles
|
||||
import pkg/libp2p
|
||||
import pkg/chronos
|
||||
import ../../asyncyeah
|
||||
import pkg/nitro
|
||||
import pkg/questionable
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@ import pkg/upraises
|
|||
push: {.upraises: [].}
|
||||
|
||||
import pkg/chronos
|
||||
import ../../asyncyeah
|
||||
import pkg/chronicles
|
||||
import pkg/libp2p
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ import pkg/chronicles
|
|||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/chronos
|
||||
import ./asyncyeah
|
||||
import pkg/libp2p except shuffle
|
||||
|
||||
import ./blocktype
|
||||
|
@ -41,7 +42,7 @@ type
|
|||
FileChunker* = Chunker
|
||||
LPStreamChunker* = Chunker
|
||||
|
||||
proc getBytes*(c: Chunker): Future[seq[byte]] {.async.} =
|
||||
proc getBytes*(c: Chunker): Future[seq[byte]] {.asyncyeah.} =
|
||||
## returns a chunk of bytes from
|
||||
## the instantiated chunker
|
||||
##
|
||||
|
@ -83,7 +84,7 @@ proc new*(
|
|||
##
|
||||
|
||||
proc reader(data: ChunkBuffer, len: int): Future[int]
|
||||
{.gcsafe, async, raises: [Defect].} =
|
||||
{.gcsafe, asyncyeah, raises: [Defect].} =
|
||||
var res = 0
|
||||
try:
|
||||
while res < len:
|
||||
|
@ -111,7 +112,7 @@ proc new*(
|
|||
##
|
||||
|
||||
proc reader(data: ChunkBuffer, len: int): Future[int]
|
||||
{.gcsafe, async, raises: [Defect].} =
|
||||
{.gcsafe, asyncyeah, raises: [Defect].} =
|
||||
var total = 0
|
||||
try:
|
||||
while total < len:
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import pkg/chronos
|
||||
import ./asyncyeah
|
||||
import pkg/stew/endians2
|
||||
import pkg/upraises
|
||||
|
||||
|
@ -10,12 +11,12 @@ type
|
|||
method now*(clock: Clock): SecondsSince1970 {.base, upraises: [].} =
|
||||
raiseAssert "not implemented"
|
||||
|
||||
method waitUntil*(clock: Clock, time: SecondsSince1970) {.base, async.} =
|
||||
method waitUntil*(clock: Clock, time: SecondsSince1970) {.base, asyncyeah.} =
|
||||
raiseAssert "not implemented"
|
||||
|
||||
proc withTimeout*(future: Future[void],
|
||||
clock: Clock,
|
||||
expiry: SecondsSince1970) {.async.} =
|
||||
expiry: SecondsSince1970) {.asyncyeah.} =
|
||||
let timeout = clock.waitUntil(expiry)
|
||||
try:
|
||||
await future or timeout
|
||||
|
|
|
@ -13,6 +13,7 @@ import std/tables
|
|||
|
||||
import pkg/chronicles
|
||||
import pkg/chronos
|
||||
import ./asyncyeah
|
||||
import pkg/presto
|
||||
import pkg/libp2p
|
||||
import pkg/confutils
|
||||
|
@ -55,7 +56,7 @@ type
|
|||
proc bootstrapInteractions(
|
||||
config: CodexConf,
|
||||
repo: RepoStore
|
||||
): Future[Contracts] {.async.} =
|
||||
): Future[Contracts] {.asyncyeah.} =
|
||||
## bootstrap interactions and return contracts
|
||||
## using clients, hosts, validators pairings
|
||||
##
|
||||
|
@ -105,7 +106,7 @@ proc bootstrapInteractions(
|
|||
|
||||
return (client, host, validator)
|
||||
|
||||
proc start*(s: CodexServer) {.async.} =
|
||||
proc start*(s: CodexServer) {.asyncyeah.} =
|
||||
notice "Starting codex node"
|
||||
|
||||
await s.repoStore.start()
|
||||
|
@ -143,7 +144,7 @@ proc start*(s: CodexServer) {.async.} =
|
|||
s.runHandle = newFuture[void]("codex.runHandle")
|
||||
await s.runHandle
|
||||
|
||||
proc stop*(s: CodexServer) {.async.} =
|
||||
proc stop*(s: CodexServer) {.asyncyeah.} =
|
||||
notice "Stopping codex node"
|
||||
|
||||
await allFuturesThrowing(
|
||||
|
|
|
@ -18,6 +18,7 @@ import std/strutils
|
|||
import std/typetraits
|
||||
|
||||
import pkg/chronos
|
||||
import ./asyncyeah
|
||||
import pkg/chronicles
|
||||
import pkg/chronicles/helpers
|
||||
import pkg/chronicles/topics_registry
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import std/times
|
||||
import pkg/ethers
|
||||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import pkg/stint
|
||||
import ../clock
|
||||
|
||||
|
@ -17,12 +18,12 @@ type
|
|||
proc new*(_: type OnChainClock, provider: Provider): OnChainClock =
|
||||
OnChainClock(provider: provider, newBlock: newAsyncEvent())
|
||||
|
||||
proc start*(clock: OnChainClock) {.async.} =
|
||||
proc start*(clock: OnChainClock) {.asyncyeah.} =
|
||||
if clock.started:
|
||||
return
|
||||
clock.started = true
|
||||
|
||||
proc onBlock(blck: Block) {.async, upraises:[].} =
|
||||
proc onBlock(blck: Block) {.asyncyeah, upraises:[].} =
|
||||
let blockTime = initTime(blck.timestamp.truncate(int64), 0)
|
||||
let computerTime = getTime()
|
||||
clock.offset = blockTime - computerTime
|
||||
|
@ -33,7 +34,7 @@ proc start*(clock: OnChainClock) {.async.} =
|
|||
|
||||
clock.subscription = await clock.provider.subscribe(onBlock)
|
||||
|
||||
proc stop*(clock: OnChainClock) {.async.} =
|
||||
proc stop*(clock: OnChainClock) {.asyncyeah.} =
|
||||
if not clock.started:
|
||||
return
|
||||
clock.started = false
|
||||
|
@ -44,7 +45,7 @@ method now*(clock: OnChainClock): SecondsSince1970 =
|
|||
doAssert clock.started, "clock should be started before calling now()"
|
||||
toUnix(getTime() + clock.offset)
|
||||
|
||||
method waitUntil*(clock: OnChainClock, time: SecondsSince1970) {.async.} =
|
||||
method waitUntil*(clock: OnChainClock, time: SecondsSince1970) {.asyncyeah.} =
|
||||
while (let difference = time - clock.now(); difference > 0):
|
||||
clock.newBlock.clear()
|
||||
discard await clock.newBlock.wait().withTimeout(chronos.seconds(difference))
|
||||
|
|
|
@ -7,6 +7,7 @@ import pkg/chronicles
|
|||
|
||||
import ../conf
|
||||
import ./marketplace
|
||||
import ../asyncyeah
|
||||
|
||||
type Deployment* = ref object
|
||||
provider: Provider
|
||||
|
@ -31,7 +32,7 @@ proc getKnownAddress(T: type, chainId: UInt256): ?Address =
|
|||
proc new*(_: type Deployment, provider: Provider, config: CodexConf): Deployment =
|
||||
Deployment(provider: provider, config: config)
|
||||
|
||||
proc address*(deployment: Deployment, contract: type): Future[?Address] {.async.} =
|
||||
proc address*(deployment: Deployment, contract: type): Future[?Address] {.asyncyeah.} =
|
||||
when contract is Marketplace:
|
||||
if address =? deployment.config.marketplaceAddress:
|
||||
return some address
|
||||
|
|
|
@ -5,6 +5,7 @@ import ../../purchasing
|
|||
import ../market
|
||||
import ../clock
|
||||
import ./interactions
|
||||
import ../../asyncyeah
|
||||
|
||||
export purchasing
|
||||
export chronicles
|
||||
|
@ -18,10 +19,10 @@ proc new*(_: type ClientInteractions,
|
|||
purchasing: Purchasing): ClientInteractions =
|
||||
ClientInteractions(clock: clock, purchasing: purchasing)
|
||||
|
||||
proc start*(self: ClientInteractions) {.async.} =
|
||||
proc start*(self: ClientInteractions) {.asyncyeah.} =
|
||||
await procCall ContractInteractions(self).start()
|
||||
await self.purchasing.start()
|
||||
|
||||
proc stop*(self: ClientInteractions) {.async.} =
|
||||
proc stop*(self: ClientInteractions) {.asyncyeah.} =
|
||||
await self.purchasing.stop()
|
||||
await procCall ContractInteractions(self).stop()
|
||||
|
|
|
@ -4,6 +4,7 @@ import pkg/chronicles
|
|||
import ../../sales
|
||||
import ../../proving
|
||||
import ./interactions
|
||||
import ../../asyncyeah
|
||||
|
||||
export sales
|
||||
export proving
|
||||
|
@ -21,15 +22,15 @@ proc new*(
|
|||
proving: Proving
|
||||
): HostInteractions =
|
||||
## Create a new HostInteractions instance
|
||||
##
|
||||
##
|
||||
HostInteractions(clock: clock, sales: sales, proving: proving)
|
||||
|
||||
method start*(self: HostInteractions) {.async.} =
|
||||
method start*(self: HostInteractions) {.asyncyeah.} =
|
||||
await procCall ContractInteractions(self).start()
|
||||
await self.sales.start()
|
||||
await self.proving.start()
|
||||
|
||||
method stop*(self: HostInteractions) {.async.} =
|
||||
method stop*(self: HostInteractions) {.asyncyeah.} =
|
||||
await self.sales.stop()
|
||||
await self.proving.stop()
|
||||
await procCall ContractInteractions(self).start()
|
||||
|
|
|
@ -2,6 +2,7 @@ import pkg/ethers
|
|||
import ../clock
|
||||
import ../marketplace
|
||||
import ../market
|
||||
import ../../asyncyeah
|
||||
|
||||
export clock
|
||||
|
||||
|
@ -9,8 +10,8 @@ type
|
|||
ContractInteractions* = ref object of RootObj
|
||||
clock*: OnChainClock
|
||||
|
||||
method start*(self: ContractInteractions) {.async, base.} =
|
||||
method start*(self: ContractInteractions) {.asyncyeah, base.} =
|
||||
await self.clock.start()
|
||||
|
||||
method stop*(self: ContractInteractions) {.async, base.} =
|
||||
method stop*(self: ContractInteractions) {.asyncyeah, base.} =
|
||||
await self.clock.stop()
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import ./interactions
|
||||
import ../../validation
|
||||
import ../../asyncyeah
|
||||
|
||||
export validation
|
||||
|
||||
|
@ -12,10 +13,10 @@ proc new*(_: type ValidatorInteractions,
|
|||
validation: Validation): ValidatorInteractions =
|
||||
ValidatorInteractions(clock: clock, validation: validation)
|
||||
|
||||
proc start*(self: ValidatorInteractions) {.async.} =
|
||||
proc start*(self: ValidatorInteractions) {.asyncyeah.} =
|
||||
await procCall ContractInteractions(self).start()
|
||||
await self.validation.start()
|
||||
|
||||
proc stop*(self: ValidatorInteractions) {.async.} =
|
||||
proc stop*(self: ValidatorInteractions) {.asyncyeah.} =
|
||||
await self.validation.stop()
|
||||
await procCall ContractInteractions(self).stop()
|
||||
|
|
|
@ -8,6 +8,7 @@ import pkg/questionable
|
|||
import pkg/chronicles
|
||||
import ../market
|
||||
import ./marketplace
|
||||
import ../asyncyeah
|
||||
|
||||
export market
|
||||
|
||||
|
@ -31,41 +32,41 @@ func new*(_: type OnChainMarket, contract: Marketplace): OnChainMarket =
|
|||
signer: signer,
|
||||
)
|
||||
|
||||
proc approveFunds(market: OnChainMarket, amount: UInt256) {.async.} =
|
||||
proc approveFunds(market: OnChainMarket, amount: UInt256) {.asyncyeah.} =
|
||||
debug "Approving tokens", amount
|
||||
let tokenAddress = await market.contract.token()
|
||||
let token = Erc20Token.new(tokenAddress, market.signer)
|
||||
|
||||
await token.approve(market.contract.address(), amount)
|
||||
|
||||
method getSigner*(market: OnChainMarket): Future[Address] {.async.} =
|
||||
method getSigner*(market: OnChainMarket): Future[Address] {.asyncyeah.} =
|
||||
return await market.signer.getAddress()
|
||||
|
||||
method periodicity*(market: OnChainMarket): Future[Periodicity] {.async.} =
|
||||
method periodicity*(market: OnChainMarket): Future[Periodicity] {.asyncyeah.} =
|
||||
let config = await market.contract.config()
|
||||
let period = config.proofs.period
|
||||
return Periodicity(seconds: period)
|
||||
|
||||
method proofTimeout*(market: OnChainMarket): Future[UInt256] {.async.} =
|
||||
method proofTimeout*(market: OnChainMarket): Future[UInt256] {.asyncyeah.} =
|
||||
let config = await market.contract.config()
|
||||
return config.proofs.timeout
|
||||
|
||||
method myRequests*(market: OnChainMarket): Future[seq[RequestId]] {.async.} =
|
||||
method myRequests*(market: OnChainMarket): Future[seq[RequestId]] {.asyncyeah.} =
|
||||
return await market.contract.myRequests
|
||||
|
||||
method mySlots*(market: OnChainMarket): Future[seq[SlotId]] {.async.} =
|
||||
method mySlots*(market: OnChainMarket): Future[seq[SlotId]] {.asyncyeah.} =
|
||||
let slots = await market.contract.mySlots()
|
||||
debug "Fetched my slots", numSlots=len(slots)
|
||||
|
||||
return slots
|
||||
|
||||
method requestStorage(market: OnChainMarket, request: StorageRequest){.async.} =
|
||||
method requestStorage(market: OnChainMarket, request: StorageRequest){.asyncyeah.} =
|
||||
debug "Requesting storage"
|
||||
await market.approveFunds(request.price())
|
||||
await market.contract.requestStorage(request)
|
||||
|
||||
method getRequest(market: OnChainMarket,
|
||||
id: RequestId): Future[?StorageRequest] {.async.} =
|
||||
id: RequestId): Future[?StorageRequest] {.asyncyeah.} =
|
||||
try:
|
||||
return some await market.contract.getRequest(id)
|
||||
except ProviderError as e:
|
||||
|
@ -74,7 +75,7 @@ method getRequest(market: OnChainMarket,
|
|||
raise e
|
||||
|
||||
method requestState*(market: OnChainMarket,
|
||||
requestId: RequestId): Future[?RequestState] {.async.} =
|
||||
requestId: RequestId): Future[?RequestState] {.asyncyeah.} =
|
||||
try:
|
||||
return some await market.contract.requestState(requestId)
|
||||
except ProviderError as e:
|
||||
|
@ -83,16 +84,16 @@ method requestState*(market: OnChainMarket,
|
|||
raise e
|
||||
|
||||
method slotState*(market: OnChainMarket,
|
||||
slotId: SlotId): Future[SlotState] {.async.} =
|
||||
slotId: SlotId): Future[SlotState] {.asyncyeah.} =
|
||||
return await market.contract.slotState(slotId)
|
||||
|
||||
method getRequestEnd*(market: OnChainMarket,
|
||||
id: RequestId): Future[SecondsSince1970] {.async.} =
|
||||
id: RequestId): Future[SecondsSince1970] {.asyncyeah.} =
|
||||
return await market.contract.requestEnd(id)
|
||||
|
||||
method getHost(market: OnChainMarket,
|
||||
requestId: RequestId,
|
||||
slotIndex: UInt256): Future[?Address] {.async.} =
|
||||
slotIndex: UInt256): Future[?Address] {.asyncyeah.} =
|
||||
let slotId = slotId(requestId, slotIndex)
|
||||
let address = await market.contract.getHost(slotId)
|
||||
if address != Address.default:
|
||||
|
@ -101,7 +102,7 @@ method getHost(market: OnChainMarket,
|
|||
return none Address
|
||||
|
||||
method getActiveSlot*(market: OnChainMarket,
|
||||
slotId: SlotId): Future[?Slot] {.async.} =
|
||||
slotId: SlotId): Future[?Slot] {.asyncyeah.} =
|
||||
|
||||
try:
|
||||
return some await market.contract.getActiveSlot(slotId)
|
||||
|
@ -114,19 +115,19 @@ method fillSlot(market: OnChainMarket,
|
|||
requestId: RequestId,
|
||||
slotIndex: UInt256,
|
||||
proof: seq[byte],
|
||||
collateral: UInt256) {.async.} =
|
||||
collateral: UInt256) {.asyncyeah.} =
|
||||
await market.approveFunds(collateral)
|
||||
await market.contract.fillSlot(requestId, slotIndex, proof)
|
||||
|
||||
method freeSlot*(market: OnChainMarket, slotId: SlotId) {.async.} =
|
||||
method freeSlot*(market: OnChainMarket, slotId: SlotId) {.asyncyeah.} =
|
||||
await market.contract.freeSlot(slotId)
|
||||
|
||||
method withdrawFunds(market: OnChainMarket,
|
||||
requestId: RequestId) {.async.} =
|
||||
requestId: RequestId) {.asyncyeah.} =
|
||||
await market.contract.withdrawFunds(requestId)
|
||||
|
||||
method isProofRequired*(market: OnChainMarket,
|
||||
id: SlotId): Future[bool] {.async.} =
|
||||
id: SlotId): Future[bool] {.asyncyeah.} =
|
||||
try:
|
||||
return await market.contract.isProofRequired(id)
|
||||
except ProviderError as e:
|
||||
|
@ -135,7 +136,7 @@ method isProofRequired*(market: OnChainMarket,
|
|||
raise e
|
||||
|
||||
method willProofBeRequired*(market: OnChainMarket,
|
||||
id: SlotId): Future[bool] {.async.} =
|
||||
id: SlotId): Future[bool] {.asyncyeah.} =
|
||||
try:
|
||||
return await market.contract.willProofBeRequired(id)
|
||||
except ProviderError as e:
|
||||
|
@ -145,19 +146,19 @@ method willProofBeRequired*(market: OnChainMarket,
|
|||
|
||||
method submitProof*(market: OnChainMarket,
|
||||
id: SlotId,
|
||||
proof: seq[byte]) {.async.} =
|
||||
proof: seq[byte]) {.asyncyeah.} =
|
||||
await market.contract.submitProof(id, proof)
|
||||
|
||||
method markProofAsMissing*(market: OnChainMarket,
|
||||
id: SlotId,
|
||||
period: Period) {.async.} =
|
||||
period: Period) {.asyncyeah.} =
|
||||
await market.contract.markProofAsMissing(id, period)
|
||||
|
||||
method canProofBeMarkedAsMissing*(
|
||||
market: OnChainMarket,
|
||||
id: SlotId,
|
||||
period: Period
|
||||
): Future[bool] {.async.} =
|
||||
): Future[bool] {.asyncyeah.} =
|
||||
let provider = market.contract.provider
|
||||
let contractWithoutSigner = market.contract.connect(provider)
|
||||
let overrides = CallOverrides(blockTag: some BlockTag.pending)
|
||||
|
@ -170,7 +171,7 @@ method canProofBeMarkedAsMissing*(
|
|||
|
||||
method subscribeRequests(market: OnChainMarket,
|
||||
callback: OnRequest):
|
||||
Future[MarketSubscription] {.async.} =
|
||||
Future[MarketSubscription] {.asyncyeah.} =
|
||||
proc onEvent(event: StorageRequested) {.upraises:[].} =
|
||||
callback(event.requestId, event.ask)
|
||||
let subscription = await market.contract.subscribe(StorageRequested, onEvent)
|
||||
|
@ -178,7 +179,7 @@ method subscribeRequests(market: OnChainMarket,
|
|||
|
||||
method subscribeSlotFilled*(market: OnChainMarket,
|
||||
callback: OnSlotFilled):
|
||||
Future[MarketSubscription] {.async.} =
|
||||
Future[MarketSubscription] {.asyncyeah.} =
|
||||
proc onEvent(event: SlotFilled) {.upraises:[].} =
|
||||
callback(event.requestId, event.slotIndex)
|
||||
let subscription = await market.contract.subscribe(SlotFilled, onEvent)
|
||||
|
@ -188,7 +189,7 @@ method subscribeSlotFilled*(market: OnChainMarket,
|
|||
requestId: RequestId,
|
||||
slotIndex: UInt256,
|
||||
callback: OnSlotFilled):
|
||||
Future[MarketSubscription] {.async.} =
|
||||
Future[MarketSubscription] {.asyncyeah.} =
|
||||
proc onSlotFilled(eventRequestId: RequestId, eventSlotIndex: UInt256) =
|
||||
if eventRequestId == requestId and eventSlotIndex == slotIndex:
|
||||
callback(requestId, slotIndex)
|
||||
|
@ -196,7 +197,7 @@ method subscribeSlotFilled*(market: OnChainMarket,
|
|||
|
||||
method subscribeSlotFreed*(market: OnChainMarket,
|
||||
callback: OnSlotFreed):
|
||||
Future[MarketSubscription] {.async.} =
|
||||
Future[MarketSubscription] {.asyncyeah.} =
|
||||
proc onEvent(event: SlotFreed) {.upraises:[].} =
|
||||
callback(event.slotId)
|
||||
let subscription = await market.contract.subscribe(SlotFreed, onEvent)
|
||||
|
@ -205,7 +206,7 @@ method subscribeSlotFreed*(market: OnChainMarket,
|
|||
method subscribeFulfillment(market: OnChainMarket,
|
||||
requestId: RequestId,
|
||||
callback: OnFulfillment):
|
||||
Future[MarketSubscription] {.async.} =
|
||||
Future[MarketSubscription] {.asyncyeah.} =
|
||||
proc onEvent(event: RequestFulfilled) {.upraises:[].} =
|
||||
if event.requestId == requestId:
|
||||
callback(event.requestId)
|
||||
|
@ -215,7 +216,7 @@ method subscribeFulfillment(market: OnChainMarket,
|
|||
method subscribeRequestCancelled*(market: OnChainMarket,
|
||||
requestId: RequestId,
|
||||
callback: OnRequestCancelled):
|
||||
Future[MarketSubscription] {.async.} =
|
||||
Future[MarketSubscription] {.asyncyeah.} =
|
||||
proc onEvent(event: RequestCancelled) {.upraises:[].} =
|
||||
if event.requestId == requestId:
|
||||
callback(event.requestId)
|
||||
|
@ -225,7 +226,7 @@ method subscribeRequestCancelled*(market: OnChainMarket,
|
|||
method subscribeRequestFailed*(market: OnChainMarket,
|
||||
requestId: RequestId,
|
||||
callback: OnRequestFailed):
|
||||
Future[MarketSubscription] {.async.} =
|
||||
Future[MarketSubscription] {.asyncyeah.} =
|
||||
proc onEvent(event: RequestFailed) {.upraises:[]} =
|
||||
if event.requestId == requestId:
|
||||
callback(event.requestId)
|
||||
|
@ -234,11 +235,11 @@ method subscribeRequestFailed*(market: OnChainMarket,
|
|||
|
||||
method subscribeProofSubmission*(market: OnChainMarket,
|
||||
callback: OnProofSubmitted):
|
||||
Future[MarketSubscription] {.async.} =
|
||||
Future[MarketSubscription] {.asyncyeah.} =
|
||||
proc onEvent(event: ProofSubmitted) {.upraises: [].} =
|
||||
callback(event.id, event.proof)
|
||||
let subscription = await market.contract.subscribe(ProofSubmitted, onEvent)
|
||||
return OnChainMarketSubscription(eventSubscription: subscription)
|
||||
|
||||
method unsubscribe*(subscription: OnChainMarketSubscription) {.async.} =
|
||||
method unsubscribe*(subscription: OnChainMarketSubscription) {.asyncyeah.} =
|
||||
await subscription.eventSubscription.unsubscribe()
|
||||
|
|
|
@ -3,6 +3,7 @@ import pkg/ethers/erc20
|
|||
import pkg/json_rpc/rpcclient
|
||||
import pkg/stint
|
||||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import ../clock
|
||||
import ./requests
|
||||
import ./config
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
import std/algorithm
|
||||
|
||||
import pkg/chronos
|
||||
import ./asyncyeah
|
||||
import pkg/chronicles
|
||||
import pkg/libp2p
|
||||
import pkg/libp2p/routing_record
|
||||
|
@ -58,7 +59,7 @@ proc toNodeId*(host: ca.Address): NodeId =
|
|||
proc findPeer*(
|
||||
d: Discovery,
|
||||
peerId: PeerId
|
||||
): Future[?PeerRecord] {.async.} =
|
||||
): Future[?PeerRecord] {.asyncyeah.} =
|
||||
trace "protocol.resolve..."
|
||||
## Find peer using the given Discovery object
|
||||
##
|
||||
|
@ -74,9 +75,9 @@ proc findPeer*(
|
|||
method find*(
|
||||
d: Discovery,
|
||||
cid: Cid
|
||||
): Future[seq[SignedPeerRecord]] {.async, base.} =
|
||||
): Future[seq[SignedPeerRecord]] {.asyncyeah, base.} =
|
||||
## Find block providers
|
||||
##
|
||||
##
|
||||
|
||||
trace "Finding providers for block", cid
|
||||
without providers =?
|
||||
|
@ -85,7 +86,7 @@ method find*(
|
|||
|
||||
return providers
|
||||
|
||||
method provide*(d: Discovery, cid: Cid) {.async, base.} =
|
||||
method provide*(d: Discovery, cid: Cid) {.asyncyeah, base.} =
|
||||
## Provide a bock Cid
|
||||
##
|
||||
|
||||
|
@ -102,7 +103,7 @@ method provide*(d: Discovery, cid: Cid) {.async, base.} =
|
|||
method find*(
|
||||
d: Discovery,
|
||||
host: ca.Address
|
||||
): Future[seq[SignedPeerRecord]] {.async, base.} =
|
||||
): Future[seq[SignedPeerRecord]] {.asyncyeah, base.} =
|
||||
## Find host providers
|
||||
##
|
||||
|
||||
|
@ -121,7 +122,7 @@ method find*(
|
|||
|
||||
return providers
|
||||
|
||||
method provide*(d: Discovery, host: ca.Address) {.async, base.} =
|
||||
method provide*(d: Discovery, host: ca.Address) {.asyncyeah, base.} =
|
||||
## Provide hosts
|
||||
##
|
||||
|
||||
|
@ -169,11 +170,11 @@ proc updateDhtRecord*(d: Discovery, ip: ValidIpAddress, port: Port) =
|
|||
IpTransportProtocol.udpProtocol,
|
||||
port)])).expect("Should construct signed record").some
|
||||
|
||||
proc start*(d: Discovery) {.async.} =
|
||||
proc start*(d: Discovery) {.asyncyeah.} =
|
||||
d.protocol.open()
|
||||
await d.protocol.start()
|
||||
|
||||
proc stop*(d: Discovery) {.async.} =
|
||||
proc stop*(d: Discovery) {.asyncyeah.} =
|
||||
await d.protocol.closeWait()
|
||||
|
||||
proc new*(
|
||||
|
@ -185,8 +186,8 @@ proc new*(
|
|||
bootstrapNodes: openArray[SignedPeerRecord] = [],
|
||||
store: Datastore = SQLiteDatastore.new(Memory).expect("Should not fail!")
|
||||
): Discovery =
|
||||
## Create a new Discovery node instance for the given key and datastore
|
||||
##
|
||||
## Create a new Discovery node instance for the given key and datastore
|
||||
##
|
||||
|
||||
var
|
||||
self = Discovery(
|
||||
|
|
|
@ -14,6 +14,7 @@ push: {.upraises: [].}
|
|||
import std/sequtils
|
||||
|
||||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import pkg/chronicles
|
||||
|
||||
import ../manifest
|
||||
|
@ -67,7 +68,7 @@ proc encode*(
|
|||
manifest: Manifest,
|
||||
blocks: int,
|
||||
parity: int
|
||||
): Future[?!Manifest] {.async.} =
|
||||
): Future[?!Manifest] {.asyncyeah.} =
|
||||
## Encode a manifest into one that is erasure protected.
|
||||
##
|
||||
## `manifest` - the original manifest to be encoded
|
||||
|
@ -156,7 +157,7 @@ proc encode*(
|
|||
proc decode*(
|
||||
self: Erasure,
|
||||
encoded: Manifest
|
||||
): Future[?!Manifest] {.async.} =
|
||||
): Future[?!Manifest] {.asyncyeah.} =
|
||||
## Decode a protected manifest into it's original
|
||||
## manifest
|
||||
##
|
||||
|
@ -260,10 +261,10 @@ proc decode*(
|
|||
|
||||
return decoded.success
|
||||
|
||||
proc start*(self: Erasure) {.async.} =
|
||||
proc start*(self: Erasure) {.asyncyeah.} =
|
||||
return
|
||||
|
||||
proc stop*(self: Erasure) {.async.} =
|
||||
proc stop*(self: Erasure) {.asyncyeah.} =
|
||||
return
|
||||
|
||||
proc new*(
|
||||
|
|
|
@ -20,6 +20,7 @@ import pkg/questionable
|
|||
import pkg/questionable/results
|
||||
import pkg/chronicles
|
||||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
|
||||
import ./manifest
|
||||
import ../errors
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import pkg/chronos
|
||||
import ./asyncyeah
|
||||
import pkg/upraises
|
||||
import pkg/questionable
|
||||
import pkg/ethers/erc20
|
||||
|
@ -23,50 +24,50 @@ type
|
|||
OnRequestFailed* = proc(requestId: RequestId) {.gcsafe, upraises:[].}
|
||||
OnProofSubmitted* = proc(id: SlotId, proof: seq[byte]) {.gcsafe, upraises:[].}
|
||||
|
||||
method getSigner*(market: Market): Future[Address] {.base, async.} =
|
||||
method getSigner*(market: Market): Future[Address] {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method periodicity*(market: Market): Future[Periodicity] {.base, async.} =
|
||||
method periodicity*(market: Market): Future[Periodicity] {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method proofTimeout*(market: Market): Future[UInt256] {.base, async.} =
|
||||
method proofTimeout*(market: Market): Future[UInt256] {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method requestStorage*(market: Market,
|
||||
request: StorageRequest) {.base, async.} =
|
||||
request: StorageRequest) {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method myRequests*(market: Market): Future[seq[RequestId]] {.base, async.} =
|
||||
method myRequests*(market: Market): Future[seq[RequestId]] {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method mySlots*(market: Market): Future[seq[SlotId]] {.base, async.} =
|
||||
method mySlots*(market: Market): Future[seq[SlotId]] {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method getRequest*(market: Market,
|
||||
id: RequestId):
|
||||
Future[?StorageRequest] {.base, async.} =
|
||||
Future[?StorageRequest] {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method requestState*(market: Market,
|
||||
requestId: RequestId): Future[?RequestState] {.base, async.} =
|
||||
requestId: RequestId): Future[?RequestState] {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method slotState*(market: Market,
|
||||
slotId: SlotId): Future[SlotState] {.base, async.} =
|
||||
slotId: SlotId): Future[SlotState] {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method getRequestEnd*(market: Market,
|
||||
id: RequestId): Future[SecondsSince1970] {.base, async.} =
|
||||
id: RequestId): Future[SecondsSince1970] {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method getHost*(market: Market,
|
||||
requestId: RequestId,
|
||||
slotIndex: UInt256): Future[?Address] {.base, async.} =
|
||||
slotIndex: UInt256): Future[?Address] {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method getActiveSlot*(
|
||||
market: Market,
|
||||
slotId: SlotId): Future[?Slot] {.base, async.} =
|
||||
slotId: SlotId): Future[?Slot] {.base, asyncyeah.} =
|
||||
|
||||
raiseAssert("not implemented")
|
||||
|
||||
|
@ -74,83 +75,83 @@ method fillSlot*(market: Market,
|
|||
requestId: RequestId,
|
||||
slotIndex: UInt256,
|
||||
proof: seq[byte],
|
||||
collateral: UInt256) {.base, async.} =
|
||||
collateral: UInt256) {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method freeSlot*(market: Market, slotId: SlotId) {.base, async.} =
|
||||
method freeSlot*(market: Market, slotId: SlotId) {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method withdrawFunds*(market: Market,
|
||||
requestId: RequestId) {.base, async.} =
|
||||
requestId: RequestId) {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method subscribeRequests*(market: Market,
|
||||
callback: OnRequest):
|
||||
Future[Subscription] {.base, async.} =
|
||||
Future[Subscription] {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method isProofRequired*(market: Market,
|
||||
id: SlotId): Future[bool] {.base, async.} =
|
||||
id: SlotId): Future[bool] {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method willProofBeRequired*(market: Market,
|
||||
id: SlotId): Future[bool] {.base, async.} =
|
||||
id: SlotId): Future[bool] {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method submitProof*(market: Market,
|
||||
id: SlotId,
|
||||
proof: seq[byte]) {.base, async.} =
|
||||
proof: seq[byte]) {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method markProofAsMissing*(market: Market,
|
||||
id: SlotId,
|
||||
period: Period) {.base, async.} =
|
||||
period: Period) {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method canProofBeMarkedAsMissing*(market: Market,
|
||||
id: SlotId,
|
||||
period: Period): Future[bool] {.base, async.} =
|
||||
period: Period): Future[bool] {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method subscribeFulfillment*(market: Market,
|
||||
requestId: RequestId,
|
||||
callback: OnFulfillment):
|
||||
Future[Subscription] {.base, async.} =
|
||||
Future[Subscription] {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method subscribeSlotFilled*(market: Market,
|
||||
callback: OnSlotFilled):
|
||||
Future[Subscription] {.base, async.} =
|
||||
Future[Subscription] {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method subscribeSlotFilled*(market: Market,
|
||||
requestId: RequestId,
|
||||
slotIndex: UInt256,
|
||||
callback: OnSlotFilled):
|
||||
Future[Subscription] {.base, async.} =
|
||||
Future[Subscription] {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method subscribeSlotFreed*(market: Market,
|
||||
callback: OnSlotFreed):
|
||||
Future[Subscription] {.base, async.} =
|
||||
Future[Subscription] {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method subscribeRequestCancelled*(market: Market,
|
||||
requestId: RequestId,
|
||||
callback: OnRequestCancelled):
|
||||
Future[Subscription] {.base, async.} =
|
||||
Future[Subscription] {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method subscribeRequestFailed*(market: Market,
|
||||
requestId: RequestId,
|
||||
callback: OnRequestFailed):
|
||||
Future[Subscription] {.base, async.} =
|
||||
Future[Subscription] {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method subscribeProofSubmission*(market: Market,
|
||||
callback: OnProofSubmitted):
|
||||
Future[Subscription] {.base, async.} =
|
||||
Future[Subscription] {.base, asyncyeah.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method unsubscribe*(subscription: Subscription) {.base, async, upraises:[].} =
|
||||
method unsubscribe*(subscription: Subscription) {.base, asyncyeah, upraises:[].} =
|
||||
raiseAssert("not implemented")
|
||||
|
|
|
@ -16,6 +16,7 @@ import pkg/questionable
|
|||
import pkg/questionable/results
|
||||
import pkg/chronicles
|
||||
import pkg/chronos
|
||||
import ./asyncyeah
|
||||
import pkg/libp2p
|
||||
|
||||
# TODO: remove once exported by libp2p
|
||||
|
@ -61,9 +62,9 @@ type
|
|||
proc findPeer*(
|
||||
node: CodexNodeRef,
|
||||
peerId: PeerId
|
||||
): Future[?PeerRecord] {.async.} =
|
||||
): Future[?PeerRecord] {.asyncyeah.} =
|
||||
## Find peer using the discovery service from the given CodexNode
|
||||
##
|
||||
##
|
||||
return await node.discovery.findPeer(peerId)
|
||||
|
||||
proc connect*(
|
||||
|
@ -76,7 +77,7 @@ proc connect*(
|
|||
proc fetchManifest*(
|
||||
node: CodexNodeRef,
|
||||
cid: Cid
|
||||
): Future[?!Manifest] {.async.} =
|
||||
): Future[?!Manifest] {.asyncyeah.} =
|
||||
## Fetch and decode a manifest block
|
||||
##
|
||||
|
||||
|
@ -102,7 +103,7 @@ proc fetchBatched*(
|
|||
manifest: Manifest,
|
||||
batchSize = FetchBatch,
|
||||
onBatch: BatchProc = nil
|
||||
): Future[?!void] {.async, gcsafe.} =
|
||||
): Future[?!void] {.asyncyeah, gcsafe.} =
|
||||
## Fetch manifest in batches of `batchSize`
|
||||
##
|
||||
|
||||
|
@ -130,7 +131,7 @@ proc fetchBatched*(
|
|||
proc retrieve*(
|
||||
node: CodexNodeRef,
|
||||
cid: Cid
|
||||
): Future[?!LPStream] {.async.} =
|
||||
): Future[?!LPStream] {.asyncyeah.} =
|
||||
## Retrieve by Cid a single block or an entire dataset described by manifest
|
||||
##
|
||||
|
||||
|
@ -138,7 +139,7 @@ proc retrieve*(
|
|||
trace "Retrieving blocks from manifest", cid
|
||||
if manifest.protected:
|
||||
# Retrieve, decode and save to the local store all EС groups
|
||||
proc erasureJob(): Future[void] {.async.} =
|
||||
proc erasureJob(): Future[void] {.asyncyeah.} =
|
||||
try:
|
||||
# Spawn an erasure decoding job
|
||||
without res =? (await node.erasure.decode(manifest)), error:
|
||||
|
@ -149,7 +150,7 @@ proc retrieve*(
|
|||
asyncSpawn erasureJob()
|
||||
# else:
|
||||
# # Prefetch the entire dataset into the local store
|
||||
# proc prefetchBlocks() {.async, raises: [Defect].} =
|
||||
# proc prefetchBlocks() {.asyncyeah, raises: [Defect].} =
|
||||
# try:
|
||||
# discard await node.fetchBatched(manifest)
|
||||
# except CatchableError as exc:
|
||||
|
@ -167,7 +168,7 @@ proc retrieve*(
|
|||
without blk =? (await node.blockStore.getBlock(cid)), err:
|
||||
return failure(err)
|
||||
|
||||
proc streamOneBlock(): Future[void] {.async.} =
|
||||
proc streamOneBlock(): Future[void] {.asyncyeah.} =
|
||||
try:
|
||||
await stream.pushData(blk.data)
|
||||
except CatchableError as exc:
|
||||
|
@ -185,7 +186,7 @@ proc store*(
|
|||
self: CodexNodeRef,
|
||||
stream: LPStream,
|
||||
blockSize = DefaultBlockSize
|
||||
): Future[?!Cid] {.async.} =
|
||||
): Future[?!Cid] {.asyncyeah.} =
|
||||
## Save stream contents as dataset with given blockSize
|
||||
## to nodes's BlockStore, and return Cid of its manifest
|
||||
##
|
||||
|
@ -256,7 +257,7 @@ proc requestStorage*(
|
|||
reward: UInt256,
|
||||
collateral: UInt256,
|
||||
expiry = UInt256.none
|
||||
): Future[?!PurchaseId] {.async.} =
|
||||
): Future[?!PurchaseId] {.asyncyeah.} =
|
||||
## Initiate a request for storage sequence, this might
|
||||
## be a multistep procedure.
|
||||
##
|
||||
|
@ -330,7 +331,7 @@ proc new*(
|
|||
contracts = Contracts.default
|
||||
): CodexNodeRef =
|
||||
## Create new instance of a Codex node, call `start` to run it
|
||||
##
|
||||
##
|
||||
CodexNodeRef(
|
||||
switch: switch,
|
||||
blockStore: store,
|
||||
|
@ -339,7 +340,7 @@ proc new*(
|
|||
discovery: discovery,
|
||||
contracts: contracts)
|
||||
|
||||
proc start*(node: CodexNodeRef) {.async.} =
|
||||
proc start*(node: CodexNodeRef) {.asyncyeah.} =
|
||||
if not node.switch.isNil:
|
||||
await node.switch.start()
|
||||
|
||||
|
@ -413,7 +414,7 @@ proc start*(node: CodexNodeRef) {.async.} =
|
|||
node.networkId = node.switch.peerInfo.peerId
|
||||
notice "Started codex node", id = $node.networkId, addrs = node.switch.peerInfo.addrs
|
||||
|
||||
proc stop*(node: CodexNodeRef) {.async.} =
|
||||
proc stop*(node: CodexNodeRef) {.asyncyeah.} =
|
||||
trace "Stopping node"
|
||||
|
||||
if not node.engine.isNil:
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import pkg/upraises
|
||||
import ../blocktype as bt
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ import pkg/questionable
|
|||
import pkg/chronicles
|
||||
import ../market
|
||||
import ../clock
|
||||
import ../asyncyeah
|
||||
|
||||
export sets
|
||||
|
||||
|
@ -31,15 +32,15 @@ proc `onProve=`*(proving: Proving, callback: OnProve) =
|
|||
func add*(proving: Proving, slot: Slot) =
|
||||
proving.slots.incl(slot)
|
||||
|
||||
proc getCurrentPeriod*(proving: Proving): Future[Period] {.async.} =
|
||||
proc getCurrentPeriod*(proving: Proving): Future[Period] {.asyncyeah.} =
|
||||
let periodicity = await proving.market.periodicity()
|
||||
return periodicity.periodOf(proving.clock.now().u256)
|
||||
|
||||
proc waitUntilPeriod(proving: Proving, period: Period) {.async.} =
|
||||
proc waitUntilPeriod(proving: Proving, period: Period) {.asyncyeah.} =
|
||||
let periodicity = await proving.market.periodicity()
|
||||
await proving.clock.waitUntil(periodicity.periodStart(period).truncate(int64))
|
||||
|
||||
proc removeEndedContracts(proving: Proving) {.async.} =
|
||||
proc removeEndedContracts(proving: Proving) {.asyncyeah.} =
|
||||
var ended: HashSet[Slot]
|
||||
for slot in proving.slots:
|
||||
let state = await proving.market.slotState(slot.id)
|
||||
|
@ -52,7 +53,7 @@ proc removeEndedContracts(proving: Proving) {.async.} =
|
|||
ended.incl(slot)
|
||||
proving.slots.excl(ended)
|
||||
|
||||
method prove*(proving: Proving, slot: Slot) {.base, async.} =
|
||||
method prove*(proving: Proving, slot: Slot) {.base, asyncyeah.} =
|
||||
logScope:
|
||||
currentPeriod = await proving.getCurrentPeriod()
|
||||
|
||||
|
@ -66,7 +67,7 @@ method prove*(proving: Proving, slot: Slot) {.base, async.} =
|
|||
except CatchableError as e:
|
||||
error "Submitting proof failed", msg = e.msg
|
||||
|
||||
proc run(proving: Proving) {.async.} =
|
||||
proc run(proving: Proving) {.asyncyeah.} =
|
||||
try:
|
||||
while true:
|
||||
let currentPeriod = await proving.getCurrentPeriod()
|
||||
|
@ -83,13 +84,13 @@ proc run(proving: Proving) {.async.} =
|
|||
except CatchableError as e:
|
||||
error "Proving failed", msg = e.msg
|
||||
|
||||
proc start*(proving: Proving) {.async.} =
|
||||
proc start*(proving: Proving) {.asyncyeah.} =
|
||||
if proving.loop.isSome:
|
||||
return
|
||||
|
||||
proving.loop = some proving.run()
|
||||
|
||||
proc stop*(proving: Proving) {.async.} =
|
||||
proc stop*(proving: Proving) {.asyncyeah.} =
|
||||
if loop =? proving.loop:
|
||||
proving.loop = Future[void].none
|
||||
if not loop.finished:
|
||||
|
|
|
@ -29,7 +29,7 @@ when codex_enable_proof_failures:
|
|||
proc onSubmitProofError(error: ref CatchableError, period: UInt256) =
|
||||
error "Submitting invalid proof failed", period, msg = error.msg
|
||||
|
||||
method prove(proving: SimulatedProving, slot: Slot) {.async.} =
|
||||
method prove(proving: SimulatedProving, slot: Slot) {.asyncyeah.} =
|
||||
let period = await proving.getCurrentPeriod()
|
||||
proving.proofCount += 1
|
||||
if proving.failEveryNProofs > 0'u and
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import std/tables
|
||||
import pkg/stint
|
||||
import pkg/chronos
|
||||
import ./asyncyeah
|
||||
import pkg/questionable
|
||||
import pkg/nimcrypto
|
||||
import ./market
|
||||
|
@ -32,7 +33,7 @@ proc new*(_: type Purchasing, market: Market, clock: Clock): Purchasing =
|
|||
requestExpiryInterval: DefaultRequestExpiryInterval,
|
||||
)
|
||||
|
||||
proc load*(purchasing: Purchasing) {.async.} =
|
||||
proc load*(purchasing: Purchasing) {.asyncyeah.} =
|
||||
let market = purchasing.market
|
||||
let requestIds = await market.myRequests()
|
||||
for requestId in requestIds:
|
||||
|
@ -40,15 +41,15 @@ proc load*(purchasing: Purchasing) {.async.} =
|
|||
purchase.load()
|
||||
purchasing.purchases[purchase.id] = purchase
|
||||
|
||||
proc start*(purchasing: Purchasing) {.async.} =
|
||||
proc start*(purchasing: Purchasing) {.asyncyeah.} =
|
||||
await purchasing.load()
|
||||
|
||||
proc stop*(purchasing: Purchasing) {.async.} =
|
||||
proc stop*(purchasing: Purchasing) {.asyncyeah.} =
|
||||
discard
|
||||
|
||||
proc populate*(purchasing: Purchasing,
|
||||
request: StorageRequest
|
||||
): Future[StorageRequest] {.async.} =
|
||||
): Future[StorageRequest] {.asyncyeah.} =
|
||||
result = request
|
||||
if result.ask.proofProbability == 0.u256:
|
||||
result.ask.proofProbability = purchasing.proofProbability
|
||||
|
@ -62,7 +63,7 @@ proc populate*(purchasing: Purchasing,
|
|||
|
||||
proc purchase*(purchasing: Purchasing,
|
||||
request: StorageRequest
|
||||
): Future[Purchase] {.async.} =
|
||||
): Future[Purchase] {.asyncyeah.} =
|
||||
let request = await purchasing.populate(request)
|
||||
let purchase = Purchase.new(request, purchasing.market, purchasing.clock)
|
||||
purchase.start()
|
||||
|
|
|
@ -2,6 +2,7 @@ import ./statemachine
|
|||
import ./states/pending
|
||||
import ./states/unknown
|
||||
import ./purchaseid
|
||||
import ../asyncyeah
|
||||
|
||||
# Purchase is implemented as a state machine.
|
||||
#
|
||||
|
@ -31,7 +32,7 @@ func new*(
|
|||
clock: Clock
|
||||
): Purchase =
|
||||
## create a new instance of a Purchase
|
||||
##
|
||||
##
|
||||
Purchase(
|
||||
future: Future[void].new(),
|
||||
requestId: requestId,
|
||||
|
@ -56,7 +57,7 @@ proc start*(purchase: Purchase) =
|
|||
proc load*(purchase: Purchase) =
|
||||
purchase.start(PurchaseUnknown())
|
||||
|
||||
proc wait*(purchase: Purchase) {.async.} =
|
||||
proc wait*(purchase: Purchase) {.asyncyeah.} =
|
||||
await purchase.future
|
||||
|
||||
func id*(purchase: Purchase): PurchaseId =
|
||||
|
|
|
@ -1,13 +1,14 @@
|
|||
import ../statemachine
|
||||
import ./errorhandling
|
||||
import ./error
|
||||
import ../../asyncyeah
|
||||
|
||||
type PurchaseCancelled* = ref object of ErrorHandlingState
|
||||
|
||||
method `$`*(state: PurchaseCancelled): string =
|
||||
"cancelled"
|
||||
|
||||
method run*(state: PurchaseCancelled, machine: Machine): Future[?State] {.async.} =
|
||||
method run*(state: PurchaseCancelled, machine: Machine): Future[?State] {.asyncyeah.} =
|
||||
let purchase = Purchase(machine)
|
||||
await purchase.market.withdrawFunds(purchase.requestId)
|
||||
let error = newException(Timeout, "Purchase cancelled due to timeout")
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import ../statemachine
|
||||
import ../../asyncyeah
|
||||
|
||||
type PurchaseErrored* = ref object of PurchaseState
|
||||
error*: ref CatchableError
|
||||
|
@ -6,6 +7,6 @@ type PurchaseErrored* = ref object of PurchaseState
|
|||
method `$`*(state: PurchaseErrored): string =
|
||||
"errored"
|
||||
|
||||
method run*(state: PurchaseErrored, machine: Machine): Future[?State] {.async.} =
|
||||
method run*(state: PurchaseErrored, machine: Machine): Future[?State] {.asyncyeah.} =
|
||||
let purchase = Purchase(machine)
|
||||
purchase.future.fail(state.error)
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import ../statemachine
|
||||
import ./error
|
||||
import ../../asyncyeah
|
||||
|
||||
type
|
||||
PurchaseFailed* = ref object of PurchaseState
|
||||
|
@ -7,6 +8,6 @@ type
|
|||
method `$`*(state: PurchaseFailed): string =
|
||||
"failed"
|
||||
|
||||
method run*(state: PurchaseFailed, machine: Machine): Future[?State] {.async.} =
|
||||
method run*(state: PurchaseFailed, machine: Machine): Future[?State] {.asyncyeah.} =
|
||||
let error = newException(PurchaseError, "Purchase failed")
|
||||
return some State(PurchaseErrored(error: error))
|
||||
|
|
|
@ -1,10 +1,11 @@
|
|||
import ../statemachine
|
||||
import ../../asyncyeah
|
||||
|
||||
type PurchaseFinished* = ref object of PurchaseState
|
||||
|
||||
method `$`*(state: PurchaseFinished): string =
|
||||
"finished"
|
||||
|
||||
method run*(state: PurchaseFinished, machine: Machine): Future[?State] {.async.} =
|
||||
method run*(state: PurchaseFinished, machine: Machine): Future[?State] {.asyncyeah.} =
|
||||
let purchase = Purchase(machine)
|
||||
purchase.future.complete()
|
||||
|
|
|
@ -1,13 +1,14 @@
|
|||
import ../statemachine
|
||||
import ./errorhandling
|
||||
import ./submitted
|
||||
import ../../asyncyeah
|
||||
|
||||
type PurchasePending* = ref object of ErrorHandlingState
|
||||
|
||||
method `$`*(state: PurchasePending): string =
|
||||
"pending"
|
||||
|
||||
method run*(state: PurchasePending, machine: Machine): Future[?State] {.async.} =
|
||||
method run*(state: PurchasePending, machine: Machine): Future[?State] {.asyncyeah.} =
|
||||
let purchase = Purchase(machine)
|
||||
let request = !purchase.request
|
||||
await purchase.market.requestStorage(request)
|
||||
|
|
|
@ -2,13 +2,14 @@ import ../statemachine
|
|||
import ./errorhandling
|
||||
import ./finished
|
||||
import ./failed
|
||||
import ../../asyncyeah
|
||||
|
||||
type PurchaseStarted* = ref object of ErrorHandlingState
|
||||
|
||||
method `$`*(state: PurchaseStarted): string =
|
||||
"started"
|
||||
|
||||
method run*(state: PurchaseStarted, machine: Machine): Future[?State] {.async.} =
|
||||
method run*(state: PurchaseStarted, machine: Machine): Future[?State] {.asyncyeah.} =
|
||||
let purchase = Purchase(machine)
|
||||
|
||||
let clock = purchase.clock
|
||||
|
|
|
@ -2,19 +2,20 @@ import ../statemachine
|
|||
import ./errorhandling
|
||||
import ./started
|
||||
import ./cancelled
|
||||
import ../../asyncyeah
|
||||
|
||||
type PurchaseSubmitted* = ref object of ErrorHandlingState
|
||||
|
||||
method `$`*(state: PurchaseSubmitted): string =
|
||||
"submitted"
|
||||
|
||||
method run*(state: PurchaseSubmitted, machine: Machine): Future[?State] {.async.} =
|
||||
method run*(state: PurchaseSubmitted, machine: Machine): Future[?State] {.asyncyeah.} =
|
||||
let purchase = Purchase(machine)
|
||||
let request = !purchase.request
|
||||
let market = purchase.market
|
||||
let clock = purchase.clock
|
||||
|
||||
proc wait {.async.} =
|
||||
proc wait {.asyncyeah.} =
|
||||
let done = newFuture[void]()
|
||||
proc callback(_: RequestId) =
|
||||
done.complete()
|
||||
|
@ -22,7 +23,7 @@ method run*(state: PurchaseSubmitted, machine: Machine): Future[?State] {.async.
|
|||
await done
|
||||
await subscription.unsubscribe()
|
||||
|
||||
proc withTimeout(future: Future[void]) {.async.} =
|
||||
proc withTimeout(future: Future[void]) {.asyncyeah.} =
|
||||
let expiry = request.expiry.truncate(int64)
|
||||
await future.withTimeout(clock, expiry)
|
||||
|
||||
|
|
|
@ -5,13 +5,14 @@ import ./started
|
|||
import ./cancelled
|
||||
import ./finished
|
||||
import ./failed
|
||||
import ../../asyncyeah
|
||||
|
||||
type PurchaseUnknown* = ref object of ErrorHandlingState
|
||||
|
||||
method `$`*(state: PurchaseUnknown): string =
|
||||
"unknown"
|
||||
|
||||
method run*(state: PurchaseUnknown, machine: Machine): Future[?State] {.async.} =
|
||||
method run*(state: PurchaseUnknown, machine: Machine): Future[?State] {.asyncyeah.} =
|
||||
let purchase = Purchase(machine)
|
||||
if (request =? await purchase.market.getRequest(purchase.requestId)) and
|
||||
(requestState =? await purchase.market.requestState(purchase.requestId)):
|
||||
|
|
|
@ -18,6 +18,7 @@ import pkg/questionable
|
|||
import pkg/questionable/results
|
||||
import pkg/chronicles
|
||||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import pkg/presto
|
||||
import pkg/libp2p
|
||||
import pkg/stew/base10
|
||||
|
|
|
@ -11,6 +11,7 @@ import std/sugar
|
|||
|
||||
import pkg/presto
|
||||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import pkg/libp2p
|
||||
import pkg/stew/base10
|
||||
import pkg/stew/byteutils
|
||||
|
|
|
@ -15,6 +15,8 @@ import ./sales/salesagent
|
|||
import ./sales/statemachine
|
||||
import ./sales/states/downloading
|
||||
import ./sales/states/unknown
|
||||
import ./asyncyeah
|
||||
|
||||
|
||||
## Sales holds a list of available storage that it may sell.
|
||||
##
|
||||
|
@ -100,7 +102,7 @@ proc handleRequest(sales: Sales,
|
|||
agent.start(SaleDownloading())
|
||||
sales.agents.add agent
|
||||
|
||||
proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} =
|
||||
proc mySlots*(sales: Sales): Future[seq[Slot]] {.asyncyeah.} =
|
||||
let market = sales.context.market
|
||||
let slotIds = await market.mySlots()
|
||||
var slots: seq[Slot] = @[]
|
||||
|
@ -111,7 +113,7 @@ proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} =
|
|||
|
||||
return slots
|
||||
|
||||
proc load*(sales: Sales) {.async.} =
|
||||
proc load*(sales: Sales) {.asyncyeah.} =
|
||||
let slots = await sales.mySlots()
|
||||
|
||||
for slot in slots:
|
||||
|
@ -123,7 +125,7 @@ proc load*(sales: Sales) {.async.} =
|
|||
agent.start(SaleUnknown())
|
||||
sales.agents.add agent
|
||||
|
||||
proc start*(sales: Sales) {.async.} =
|
||||
proc start*(sales: Sales) {.asyncyeah.} =
|
||||
doAssert sales.subscription.isNone, "Sales already started"
|
||||
|
||||
proc onRequest(requestId: RequestId, ask: StorageAsk) {.gcsafe, upraises:[].} =
|
||||
|
@ -134,7 +136,7 @@ proc start*(sales: Sales) {.async.} =
|
|||
except CatchableError as e:
|
||||
error "Unable to start sales", msg = e.msg
|
||||
|
||||
proc stop*(sales: Sales) {.async.} =
|
||||
proc stop*(sales: Sales) {.asyncyeah.} =
|
||||
if subscription =? sales.subscription:
|
||||
sales.subscription = market.Subscription.none
|
||||
try:
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
import std/typetraits
|
||||
|
||||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import pkg/chronicles
|
||||
import pkg/upraises
|
||||
import pkg/json_serialization
|
||||
|
@ -121,7 +122,7 @@ func hasAvailable*(self: Reservations, bytes: uint): bool =
|
|||
|
||||
proc exists*(
|
||||
self: Reservations,
|
||||
id: AvailabilityId): Future[?!bool] {.async.} =
|
||||
id: AvailabilityId): Future[?!bool] {.asyncyeah.} =
|
||||
|
||||
without key =? id.key, err:
|
||||
return failure(err)
|
||||
|
@ -131,7 +132,7 @@ proc exists*(
|
|||
|
||||
proc get*(
|
||||
self: Reservations,
|
||||
id: AvailabilityId): Future[?!Availability] {.async.} =
|
||||
id: AvailabilityId): Future[?!Availability] {.asyncyeah.} =
|
||||
|
||||
if exists =? (await self.exists(id)) and not exists:
|
||||
let err = newException(AvailabilityGetFailedError,
|
||||
|
@ -151,7 +152,7 @@ proc get*(
|
|||
|
||||
proc update(
|
||||
self: Reservations,
|
||||
availability: Availability): Future[?!void] {.async.} =
|
||||
availability: Availability): Future[?!void] {.asyncyeah.} =
|
||||
|
||||
trace "updating availability", id = availability.id, size = availability.size,
|
||||
used = availability.used
|
||||
|
@ -168,7 +169,7 @@ proc update(
|
|||
|
||||
proc delete(
|
||||
self: Reservations,
|
||||
id: AvailabilityId): Future[?!void] {.async.} =
|
||||
id: AvailabilityId): Future[?!void] {.asyncyeah.} =
|
||||
|
||||
trace "deleting availability", id
|
||||
|
||||
|
@ -185,7 +186,7 @@ proc delete(
|
|||
|
||||
proc reserve*(
|
||||
self: Reservations,
|
||||
availability: Availability): Future[?!void] {.async.} =
|
||||
availability: Availability): Future[?!void] {.asyncyeah.} =
|
||||
|
||||
if exists =? (await self.exists(availability.id)) and exists:
|
||||
let err = newException(AvailabilityAlreadyExistsError,
|
||||
|
@ -215,7 +216,7 @@ proc reserve*(
|
|||
proc release*(
|
||||
self: Reservations,
|
||||
id: AvailabilityId,
|
||||
bytes: uint): Future[?!void] {.async.} =
|
||||
bytes: uint): Future[?!void] {.asyncyeah.} =
|
||||
|
||||
trace "releasing bytes and updating availability", bytes, id
|
||||
|
||||
|
@ -254,7 +255,7 @@ proc release*(
|
|||
|
||||
proc markUsed*(
|
||||
self: Reservations,
|
||||
id: AvailabilityId): Future[?!void] {.async.} =
|
||||
id: AvailabilityId): Future[?!void] {.asyncyeah.} =
|
||||
|
||||
without var availability =? (await self.get(id)), err:
|
||||
return failure(err)
|
||||
|
@ -267,7 +268,7 @@ proc markUsed*(
|
|||
|
||||
proc markUnused*(
|
||||
self: Reservations,
|
||||
id: AvailabilityId): Future[?!void] {.async.} =
|
||||
id: AvailabilityId): Future[?!void] {.asyncyeah.} =
|
||||
|
||||
without var availability =? (await self.get(id)), err:
|
||||
return failure(err)
|
||||
|
@ -283,7 +284,7 @@ iterator items*(self: AvailabilityIter): Future[?Availability] =
|
|||
yield self.next()
|
||||
|
||||
proc availabilities*(
|
||||
self: Reservations): Future[?!AvailabilityIter] {.async.} =
|
||||
self: Reservations): Future[?!AvailabilityIter] {.asyncyeah.} =
|
||||
|
||||
var iter = AvailabilityIter()
|
||||
let query = Query.init(ReservationsKey)
|
||||
|
@ -291,7 +292,7 @@ proc availabilities*(
|
|||
without results =? await self.repo.metaDs.query(query), err:
|
||||
return failure(err)
|
||||
|
||||
proc next(): Future[?Availability] {.async.} =
|
||||
proc next(): Future[?Availability] {.asyncyeah.} =
|
||||
await idleAsync()
|
||||
iter.finished = results.finished
|
||||
if not results.finished and
|
||||
|
@ -306,7 +307,7 @@ proc availabilities*(
|
|||
iter.next = next
|
||||
return success iter
|
||||
|
||||
proc unused*(r: Reservations): Future[?!seq[Availability]] {.async.} =
|
||||
proc unused*(r: Reservations): Future[?!seq[Availability]] {.asyncyeah.} =
|
||||
var ret: seq[Availability] = @[]
|
||||
|
||||
without availabilities =? (await r.availabilities), err:
|
||||
|
@ -321,7 +322,7 @@ proc unused*(r: Reservations): Future[?!seq[Availability]] {.async.} =
|
|||
proc find*(
|
||||
self: Reservations,
|
||||
size, duration, minPrice: UInt256, collateral: UInt256,
|
||||
used: bool): Future[?Availability] {.async.} =
|
||||
used: bool): Future[?Availability] {.asyncyeah.} =
|
||||
|
||||
|
||||
without availabilities =? (await self.availabilities), err:
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import pkg/chronicles
|
||||
import pkg/stint
|
||||
import ../contracts/requests
|
||||
|
@ -33,18 +34,18 @@ proc newSalesAgent*(context: SalesContext,
|
|||
slotIndex: slotIndex,
|
||||
request: request))
|
||||
|
||||
proc retrieveRequest*(agent: SalesAgent) {.async.} =
|
||||
proc retrieveRequest*(agent: SalesAgent) {.asyncyeah.} =
|
||||
let data = agent.data
|
||||
let market = agent.context.market
|
||||
if data.request.isNone:
|
||||
data.request = await market.getRequest(data.requestId)
|
||||
|
||||
proc subscribeCancellation(agent: SalesAgent) {.async.} =
|
||||
proc subscribeCancellation(agent: SalesAgent) {.asyncyeah.} =
|
||||
let data = agent.data
|
||||
let market = agent.context.market
|
||||
let clock = agent.context.clock
|
||||
|
||||
proc onCancelled() {.async.} =
|
||||
proc onCancelled() {.asyncyeah.} =
|
||||
without request =? data.request:
|
||||
return
|
||||
|
||||
|
@ -61,7 +62,7 @@ proc subscribeCancellation(agent: SalesAgent) {.async.} =
|
|||
data.fulfilled =
|
||||
await market.subscribeFulfillment(data.requestId, onFulfilled)
|
||||
|
||||
proc subscribeFailure(agent: SalesAgent) {.async.} =
|
||||
proc subscribeFailure(agent: SalesAgent) {.asyncyeah.} =
|
||||
let data = agent.data
|
||||
let market = agent.context.market
|
||||
|
||||
|
@ -74,7 +75,7 @@ proc subscribeFailure(agent: SalesAgent) {.async.} =
|
|||
data.failed =
|
||||
await market.subscribeRequestFailed(data.requestId, onFailed)
|
||||
|
||||
proc subscribeSlotFilled(agent: SalesAgent) {.async.} =
|
||||
proc subscribeSlotFilled(agent: SalesAgent) {.asyncyeah.} =
|
||||
let data = agent.data
|
||||
let market = agent.context.market
|
||||
|
||||
|
@ -87,7 +88,7 @@ proc subscribeSlotFilled(agent: SalesAgent) {.async.} =
|
|||
data.slotIndex,
|
||||
onSlotFilled)
|
||||
|
||||
proc subscribe*(agent: SalesAgent) {.async.} =
|
||||
proc subscribe*(agent: SalesAgent) {.asyncyeah.} =
|
||||
if agent.subscribed:
|
||||
return
|
||||
|
||||
|
@ -96,7 +97,7 @@ proc subscribe*(agent: SalesAgent) {.async.} =
|
|||
await agent.subscribeSlotFilled()
|
||||
agent.subscribed = true
|
||||
|
||||
proc unsubscribe*(agent: SalesAgent) {.async.} =
|
||||
proc unsubscribe*(agent: SalesAgent) {.asyncyeah.} =
|
||||
if not agent.subscribed:
|
||||
return
|
||||
|
||||
|
@ -125,6 +126,6 @@ proc unsubscribe*(agent: SalesAgent) {.async.} =
|
|||
|
||||
agent.subscribed = false
|
||||
|
||||
proc stop*(agent: SalesAgent) {.async.} =
|
||||
proc stop*(agent: SalesAgent) {.asyncyeah.} =
|
||||
procCall Machine(agent).stop()
|
||||
await agent.unsubscribe()
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import ../contracts/requests
|
||||
import ../market
|
||||
import ./reservations
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import ../statemachine
|
||||
import ./errorhandling
|
||||
import ./errored
|
||||
import ../../asyncyeah
|
||||
|
||||
type
|
||||
SaleCancelled* = ref object of ErrorHandlingState
|
||||
|
@ -9,6 +10,6 @@ type
|
|||
|
||||
method `$`*(state: SaleCancelled): string = "SaleCancelled"
|
||||
|
||||
method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} =
|
||||
method run*(state: SaleCancelled, machine: Machine): Future[?State] {.asyncyeah.} =
|
||||
let error = newException(SaleTimeoutError, "Sale cancelled due to timeout")
|
||||
return some State(SaleErrored(error: error))
|
||||
|
|
|
@ -12,6 +12,7 @@ import ./filled
|
|||
import ./ignored
|
||||
import ./proving
|
||||
import ./errored
|
||||
import ../../asyncyeah
|
||||
|
||||
type
|
||||
SaleDownloading* = ref object of ErrorHandlingState
|
||||
|
@ -31,7 +32,7 @@ method onSlotFilled*(state: SaleDownloading, requestId: RequestId,
|
|||
slotIndex: UInt256): ?State =
|
||||
return some State(SaleFilled())
|
||||
|
||||
method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} =
|
||||
method run*(state: SaleDownloading, machine: Machine): Future[?State] {.asyncyeah.} =
|
||||
let agent = SalesAgent(machine)
|
||||
let data = agent.data
|
||||
let context = agent.context
|
||||
|
@ -66,7 +67,7 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
|
|||
if markUsedErr =? (await reservations.markUsed(availability.id)).errorOption:
|
||||
return some State(SaleErrored(error: markUsedErr))
|
||||
|
||||
proc onBatch(blocks: seq[bt.Block]) {.async.} =
|
||||
proc onBatch(blocks: seq[bt.Block]) {.asyncyeah.} =
|
||||
# release batches of blocks as they are written to disk and
|
||||
# update availability size
|
||||
var bytes: uint = 0
|
||||
|
|
|
@ -4,6 +4,7 @@ import pkg/upraises
|
|||
import pkg/chronicles
|
||||
import ../statemachine
|
||||
import ../salesagent
|
||||
import ../../asyncyeah
|
||||
|
||||
logScope:
|
||||
topics = "marketplace sales errored"
|
||||
|
@ -16,7 +17,7 @@ method `$`*(state: SaleErrored): string = "SaleErrored"
|
|||
method onError*(state: SaleState, err: ref CatchableError): ?State {.upraises:[].} =
|
||||
error "error during SaleErrored run", error = err.msg
|
||||
|
||||
method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} =
|
||||
method run*(state: SaleErrored, machine: Machine): Future[?State] {.asyncyeah.} =
|
||||
let agent = SalesAgent(machine)
|
||||
let data = agent.data
|
||||
let context = agent.context
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import pkg/questionable
|
||||
import ../statemachine
|
||||
import ./errored
|
||||
import ../../asyncyeah
|
||||
|
||||
type
|
||||
ErrorHandlingState* = ref object of SaleState
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import ../statemachine
|
||||
import ./errorhandling
|
||||
import ./errored
|
||||
import ../../asyncyeah
|
||||
|
||||
type
|
||||
SaleFailed* = ref object of ErrorHandlingState
|
||||
|
@ -8,6 +9,6 @@ type
|
|||
|
||||
method `$`*(state: SaleFailed): string = "SaleFailed"
|
||||
|
||||
method run*(state: SaleFailed, machine: Machine): Future[?State] {.async.} =
|
||||
method run*(state: SaleFailed, machine: Machine): Future[?State] {.asyncyeah.} =
|
||||
let error = newException(SaleFailedError, "Sale failed")
|
||||
return some State(SaleErrored(error: error))
|
||||
|
|
|
@ -6,6 +6,7 @@ import ./errored
|
|||
import ./finished
|
||||
import ./cancelled
|
||||
import ./failed
|
||||
import ../../asyncyeah
|
||||
|
||||
type
|
||||
SaleFilled* = ref object of ErrorHandlingState
|
||||
|
@ -19,7 +20,7 @@ method onFailed*(state: SaleFilled, request: StorageRequest): ?State =
|
|||
|
||||
method `$`*(state: SaleFilled): string = "SaleFilled"
|
||||
|
||||
method run*(state: SaleFilled, machine: Machine): Future[?State] {.async.} =
|
||||
method run*(state: SaleFilled, machine: Machine): Future[?State] {.asyncyeah.} =
|
||||
let data = SalesAgent(machine).data
|
||||
let market = SalesAgent(machine).context.market
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import ./errorhandling
|
|||
import ./filled
|
||||
import ./cancelled
|
||||
import ./failed
|
||||
import ../../asyncyeah
|
||||
|
||||
logScope:
|
||||
topics = "marketplace sales filling"
|
||||
|
@ -26,7 +27,7 @@ method onSlotFilled*(state: SaleFilling, requestId: RequestId,
|
|||
slotIndex: UInt256): ?State =
|
||||
return some State(SaleFilled())
|
||||
|
||||
method run(state: SaleFilling, machine: Machine): Future[?State] {.async.} =
|
||||
method run(state: SaleFilling, machine: Machine): Future[?State] {.asyncyeah.} =
|
||||
let data = SalesAgent(machine).data
|
||||
let market = SalesAgent(machine).context.market
|
||||
without (collateral =? data.request.?ask.?collateral):
|
||||
|
|
|
@ -5,6 +5,7 @@ import ../salesagent
|
|||
import ./errorhandling
|
||||
import ./cancelled
|
||||
import ./failed
|
||||
import ../../asyncyeah
|
||||
|
||||
logScope:
|
||||
topics = "marketplace sales finished"
|
||||
|
@ -20,7 +21,7 @@ method onCancelled*(state: SaleFinished, request: StorageRequest): ?State =
|
|||
method onFailed*(state: SaleFinished, request: StorageRequest): ?State =
|
||||
return some State(SaleFailed())
|
||||
|
||||
method run*(state: SaleFinished, machine: Machine): Future[?State] {.async.} =
|
||||
method run*(state: SaleFinished, machine: Machine): Future[?State] {.asyncyeah.} =
|
||||
let agent = SalesAgent(machine)
|
||||
let data = agent.data
|
||||
let context = agent.context
|
||||
|
|
|
@ -2,13 +2,14 @@ import pkg/chronos
|
|||
import ../statemachine
|
||||
import ../salesagent
|
||||
import ./errorhandling
|
||||
import ../../asyncyeah
|
||||
|
||||
type
|
||||
SaleIgnored* = ref object of ErrorHandlingState
|
||||
|
||||
method `$`*(state: SaleIgnored): string = "SaleIgnored"
|
||||
|
||||
method run*(state: SaleIgnored, machine: Machine): Future[?State] {.async.} =
|
||||
method run*(state: SaleIgnored, machine: Machine): Future[?State] {.asyncyeah.} =
|
||||
let agent = SalesAgent(machine)
|
||||
let context = agent.context
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import ./filling
|
|||
import ./cancelled
|
||||
import ./failed
|
||||
import ./filled
|
||||
import ../../asyncyeah
|
||||
|
||||
logScope:
|
||||
topics = "marketplace sales proving"
|
||||
|
@ -25,7 +26,7 @@ method onSlotFilled*(state: SaleProving, requestId: RequestId,
|
|||
slotIndex: UInt256): ?State =
|
||||
return some State(SaleFilled())
|
||||
|
||||
method run*(state: SaleProving, machine: Machine): Future[?State] {.async.} =
|
||||
method run*(state: SaleProving, machine: Machine): Future[?State] {.asyncyeah.} =
|
||||
let data = SalesAgent(machine).data
|
||||
let context = SalesAgent(machine).context
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@ import ./finished
|
|||
import ./failed
|
||||
import ./errored
|
||||
import ./cancelled
|
||||
import ../../asyncyeah
|
||||
|
||||
type
|
||||
SaleUnknown* = ref object of SaleState
|
||||
|
@ -19,7 +20,7 @@ method onCancelled*(state: SaleUnknown, request: StorageRequest): ?State =
|
|||
method onFailed*(state: SaleUnknown, request: StorageRequest): ?State =
|
||||
return some State(SaleFailed())
|
||||
|
||||
method run*(state: SaleUnknown, machine: Machine): Future[?State] {.async.} =
|
||||
method run*(state: SaleUnknown, machine: Machine): Future[?State] {.asyncyeah.} =
|
||||
let agent = SalesAgent(machine)
|
||||
let data = agent.data
|
||||
let market = agent.context.market
|
||||
|
|
|
@ -80,6 +80,7 @@
|
|||
import std/endians
|
||||
|
||||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import pkg/blscurve
|
||||
import pkg/blscurve/blst/blst_abi
|
||||
|
||||
|
@ -152,7 +153,7 @@ proc getSector(
|
|||
stream: SeekableStream,
|
||||
blockId: int64,
|
||||
sectorId: int64,
|
||||
spb: int64): Future[ZChar] {.async.} =
|
||||
spb: int64): Future[ZChar] {.asyncyeah.} =
|
||||
## Read file sector at given <blockid, sectorid> postion
|
||||
##
|
||||
|
||||
|
@ -268,7 +269,7 @@ proc generateAuthenticatorOpt(
|
|||
i: int64,
|
||||
s: int64,
|
||||
t: TauZero,
|
||||
ubase: seq[blst_scalar]): Future[blst_p1] {.async.} =
|
||||
ubase: seq[blst_scalar]): Future[blst_p1] {.asyncyeah.} =
|
||||
## Optimized implementation of authenticator generation
|
||||
## This implementation is reduces the number of scalar multiplications
|
||||
## from s+1 to 1+1 , using knowledge about the scalars (r_j)
|
||||
|
@ -325,7 +326,7 @@ proc generateProof*(
|
|||
q: seq[QElement],
|
||||
authenticators: seq[blst_p1],
|
||||
s: int64
|
||||
): Future[Proof] {.async.} =
|
||||
): Future[Proof] {.asyncyeah.} =
|
||||
## Generata BLS proofs for a given query
|
||||
##
|
||||
|
||||
|
@ -438,7 +439,7 @@ proc init*(
|
|||
ssk: SecretKey,
|
||||
spk: PublicKey,
|
||||
blockSize: int64
|
||||
): Future[PoR] {.async.} =
|
||||
): Future[PoR] {.asyncyeah.} =
|
||||
## Set up the POR scheme by generating tags and metadata
|
||||
##
|
||||
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
## those terms.
|
||||
|
||||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import pkg/chronicles
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
|
@ -36,7 +37,7 @@ proc upload*(
|
|||
cid: Cid,
|
||||
indexes: seq[int],
|
||||
host: ca.Address
|
||||
): Future[?!void] {.async.} =
|
||||
): Future[?!void] {.asyncyeah.} =
|
||||
## Upload authenticators
|
||||
##
|
||||
|
||||
|
@ -59,7 +60,7 @@ proc upload*(
|
|||
proc setupProofs*(
|
||||
self: StorageProofs,
|
||||
manifest: Manifest
|
||||
): Future[?!void] {.async.} =
|
||||
): Future[?!void] {.asyncyeah.} =
|
||||
## Setup storage authentication
|
||||
##
|
||||
|
||||
|
@ -89,7 +90,7 @@ proc init*(
|
|||
stpStore: stpStore,
|
||||
network: network)
|
||||
|
||||
proc tagsHandler(msg: TagsMessage) {.async, gcsafe.} =
|
||||
proc tagsHandler(msg: TagsMessage) {.asyncyeah, gcsafe.} =
|
||||
try:
|
||||
await self.stpStore.store(msg.cid, msg.tags).tryGet()
|
||||
trace "Stored tags", cid = $msg.cid, tags = msg.tags.len
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
import std/sequtils
|
||||
|
||||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import pkg/libp2p
|
||||
import pkg/chronicles
|
||||
import pkg/questionable/results
|
||||
|
@ -41,7 +42,7 @@ proc uploadTags*(
|
|||
indexes: seq[int],
|
||||
tags: seq[seq[byte]],
|
||||
host: ca.Address
|
||||
): Future[?!void] {.async.} =
|
||||
): Future[?!void] {.asyncyeah.} =
|
||||
# Upload tags to `host`
|
||||
#
|
||||
|
||||
|
@ -74,7 +75,7 @@ method init*(self: StpNetwork) =
|
|||
## Perform protocol initialization
|
||||
##
|
||||
|
||||
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
|
||||
proc handle(conn: Connection, proto: string) {.asyncyeah, gcsafe.} =
|
||||
try:
|
||||
let
|
||||
msg = await conn.readLp(MaxMessageSize)
|
||||
|
|
|
@ -12,6 +12,7 @@ import std/strformat
|
|||
|
||||
import pkg/libp2p
|
||||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import pkg/chronicles
|
||||
import pkg/stew/io2
|
||||
import pkg/questionable
|
||||
|
@ -34,7 +35,7 @@ template stpPath*(self: StpStore, cid: Cid): string =
|
|||
proc retrieve*(
|
||||
self: StpStore,
|
||||
cid: Cid
|
||||
): Future[?!PorMessage] {.async.} =
|
||||
): Future[?!PorMessage] {.asyncyeah.} =
|
||||
## Retrieve authenticators from data store
|
||||
##
|
||||
|
||||
|
@ -53,7 +54,7 @@ proc store*(
|
|||
self: StpStore,
|
||||
por: PorMessage,
|
||||
cid: Cid
|
||||
): Future[?!void] {.async.} =
|
||||
): Future[?!void] {.asyncyeah.} =
|
||||
## Persist storage proofs
|
||||
##
|
||||
|
||||
|
@ -79,7 +80,7 @@ proc retrieve*(
|
|||
self: StpStore,
|
||||
cid: Cid,
|
||||
blocks: seq[int]
|
||||
): Future[?!seq[Tag]] {.async.} =
|
||||
): Future[?!seq[Tag]] {.asyncyeah.} =
|
||||
var tags: seq[Tag]
|
||||
for b in blocks:
|
||||
var tag = Tag(idx: b)
|
||||
|
@ -98,7 +99,7 @@ proc store*(
|
|||
self: StpStore,
|
||||
tags: seq[Tag],
|
||||
cid: Cid
|
||||
): Future[?!void] {.async.} =
|
||||
): Future[?!void] {.asyncyeah.} =
|
||||
let
|
||||
dir = self.stpPath(cid)
|
||||
|
||||
|
@ -124,7 +125,7 @@ proc init*(
|
|||
postfixLen: int = 2
|
||||
): StpStore =
|
||||
## Init StpStore
|
||||
##
|
||||
##
|
||||
StpStore(
|
||||
authDir: authDir,
|
||||
postfixLen: postfixLen)
|
||||
|
|
|
@ -12,6 +12,7 @@ import pkg/upraises
|
|||
push: {.upraises: [].}
|
||||
|
||||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import pkg/libp2p
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
|
@ -81,7 +82,7 @@ method close*(self: BlockStore): Future[void] {.base.} =
|
|||
|
||||
raiseAssert("Not implemented!")
|
||||
|
||||
proc contains*(self: BlockStore, blk: Cid): Future[bool] {.async.} =
|
||||
proc contains*(self: BlockStore, blk: Cid): Future[bool] {.asyncyeah.} =
|
||||
## Check if the block exists in the blockstore.
|
||||
## Return false if error encountered
|
||||
##
|
||||
|
|
|
@ -15,6 +15,7 @@ import std/options
|
|||
|
||||
import pkg/chronicles
|
||||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import pkg/libp2p
|
||||
import pkg/lrucache
|
||||
import pkg/questionable
|
||||
|
@ -42,7 +43,7 @@ type
|
|||
const
|
||||
DefaultCacheSize*: NBytes = 5.MiBs
|
||||
|
||||
method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} =
|
||||
method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.asyncyeah.} =
|
||||
## Get a block from the stores
|
||||
##
|
||||
|
||||
|
@ -61,7 +62,7 @@ method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} =
|
|||
trace "Error requesting block from cache", cid, error = exc.msg
|
||||
return failure exc
|
||||
|
||||
method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} =
|
||||
method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.asyncyeah.} =
|
||||
## Check if the block exists in the blockstore
|
||||
##
|
||||
|
||||
|
@ -80,7 +81,7 @@ func cids(self: CacheStore): (iterator: Cid {.gcsafe.}) =
|
|||
method listBlocks*(
|
||||
self: CacheStore,
|
||||
blockType = BlockType.Manifest
|
||||
): Future[?!BlocksIter] {.async.} =
|
||||
): Future[?!BlocksIter] {.asyncyeah.} =
|
||||
## Get the list of blocks in the BlockStore. This is an intensive operation
|
||||
##
|
||||
|
||||
|
@ -90,7 +91,7 @@ method listBlocks*(
|
|||
let
|
||||
cids = self.cids()
|
||||
|
||||
proc next(): Future[?Cid] {.async.} =
|
||||
proc next(): Future[?Cid] {.asyncyeah.} =
|
||||
await idleAsync()
|
||||
|
||||
var cid: Cid
|
||||
|
@ -155,7 +156,7 @@ func putBlockSync(self: CacheStore, blk: Block): bool =
|
|||
method putBlock*(
|
||||
self: CacheStore,
|
||||
blk: Block,
|
||||
ttl = Duration.none): Future[?!void] {.async.} =
|
||||
ttl = Duration.none): Future[?!void] {.asyncyeah.} =
|
||||
## Put a block to the blockstore
|
||||
##
|
||||
|
||||
|
@ -167,7 +168,7 @@ method putBlock*(
|
|||
discard self.putBlockSync(blk)
|
||||
return success()
|
||||
|
||||
method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} =
|
||||
method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.asyncyeah.} =
|
||||
## Delete a block from the blockstore
|
||||
##
|
||||
|
||||
|
@ -182,7 +183,7 @@ method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} =
|
|||
|
||||
return success()
|
||||
|
||||
method close*(self: CacheStore): Future[void] {.async.} =
|
||||
method close*(self: CacheStore): Future[void] {.asyncyeah.} =
|
||||
## Close the blockstore, a no-op for this implementation
|
||||
##
|
||||
|
||||
|
@ -195,9 +196,9 @@ proc new*(
|
|||
chunkSize: NBytes = DefaultChunkSize
|
||||
): CacheStore {.raises: [Defect, ValueError].} =
|
||||
## Create a new CacheStore instance
|
||||
##
|
||||
##
|
||||
## `cacheSize` and `chunkSize` are both in bytes
|
||||
##
|
||||
##
|
||||
|
||||
if cacheSize < chunkSize:
|
||||
raise newException(ValueError, "cacheSize cannot be less than chunkSize")
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
## Looks for and removes expired blocks from blockstores.
|
||||
|
||||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import pkg/chronicles
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
|
@ -54,7 +55,7 @@ proc new*(
|
|||
clock: clock,
|
||||
offset: 0)
|
||||
|
||||
proc deleteExpiredBlock(self: BlockMaintainer, cid: Cid): Future[void] {.async.} =
|
||||
proc deleteExpiredBlock(self: BlockMaintainer, cid: Cid): Future[void] {.asyncyeah.} =
|
||||
if isErr (await self.repoStore.delBlock(cid)):
|
||||
trace "Unable to delete block from repoStore"
|
||||
|
||||
|
@ -86,7 +87,7 @@ proc runBlockCheck(self: BlockMaintainer): Future[void] {.asyncyeah.} =
|
|||
self.offset = 0
|
||||
|
||||
proc start*(self: BlockMaintainer) =
|
||||
proc onTimer(): Future[void] {.async.} =
|
||||
proc onTimer(): Future[void] {.asyncyeah.} =
|
||||
try:
|
||||
await self.runBlockCheck()
|
||||
except CatchableError as exc:
|
||||
|
@ -94,5 +95,5 @@ proc start*(self: BlockMaintainer) =
|
|||
|
||||
self.timer.start(onTimer, self.interval)
|
||||
|
||||
proc stop*(self: BlockMaintainer): Future[void] {.async.} =
|
||||
proc stop*(self: BlockMaintainer): Future[void] {.asyncyeah.} =
|
||||
await self.timer.stop()
|
||||
|
|
|
@ -13,6 +13,7 @@ push: {.upraises: [].}
|
|||
|
||||
import pkg/chronicles
|
||||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import pkg/libp2p
|
||||
|
||||
import ../blocktype as bt
|
||||
|
@ -31,7 +32,7 @@ type
|
|||
engine*: BlockExcEngine # blockexc decision engine
|
||||
localStore*: BlockStore # local block store
|
||||
|
||||
method getBlock*(self: NetworkStore, cid: Cid): Future[?!bt.Block] {.async.} =
|
||||
method getBlock*(self: NetworkStore, cid: Cid): Future[?!bt.Block] {.asyncyeah.} =
|
||||
## Get a block from a remote peer
|
||||
##
|
||||
|
||||
|
@ -50,7 +51,7 @@ method putBlock*(
|
|||
self: NetworkStore,
|
||||
blk: bt.Block,
|
||||
ttl = Duration.none
|
||||
): Future[?!void] {.async.} =
|
||||
): Future[?!void] {.asyncyeah.} =
|
||||
## Store block locally and notify the network
|
||||
##
|
||||
|
||||
|
@ -72,14 +73,14 @@ method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] =
|
|||
|
||||
{.pop.}
|
||||
|
||||
method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.async.} =
|
||||
method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.asyncyeah.} =
|
||||
## Check if the block exists in the blockstore
|
||||
##
|
||||
|
||||
trace "Checking network store for block existence", cid
|
||||
return await self.localStore.hasBlock(cid)
|
||||
|
||||
method close*(self: NetworkStore): Future[void] {.async.} =
|
||||
method close*(self: NetworkStore): Future[void] {.asyncyeah.} =
|
||||
## Close the underlying local blockstore
|
||||
##
|
||||
|
||||
|
@ -91,8 +92,8 @@ proc new*(
|
|||
engine: BlockExcEngine,
|
||||
localStore: BlockStore
|
||||
): NetworkStore =
|
||||
## Create new instance of a NetworkStore
|
||||
##
|
||||
## Create new instance of a NetworkStore
|
||||
##
|
||||
NetworkStore(
|
||||
localStore: localStore,
|
||||
engine: engine)
|
||||
|
|
|
@ -12,6 +12,7 @@ import pkg/upraises
|
|||
push: {.upraises: [].}
|
||||
|
||||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import pkg/chronicles
|
||||
import pkg/libp2p
|
||||
import pkg/questionable
|
||||
|
@ -70,7 +71,7 @@ func available*(self: RepoStore): uint =
|
|||
func available*(self: RepoStore, bytes: uint): bool =
|
||||
return bytes < self.available()
|
||||
|
||||
method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
|
||||
method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.asyncyeah.} =
|
||||
## Get a block from the blockstore
|
||||
##
|
||||
|
||||
|
@ -109,7 +110,7 @@ method putBlock*(
|
|||
self: RepoStore,
|
||||
blk: Block,
|
||||
ttl = Duration.none
|
||||
): Future[?!void] {.async.} =
|
||||
): Future[?!void] {.asyncyeah.} =
|
||||
## Put a block to the blockstore
|
||||
##
|
||||
|
||||
|
@ -158,7 +159,7 @@ method putBlock*(
|
|||
self.quotaUsedBytes = used
|
||||
return success()
|
||||
|
||||
proc updateQuotaBytesUsed(self: RepoStore, blk: Block): Future[?!void] {.async.} =
|
||||
proc updateQuotaBytesUsed(self: RepoStore, blk: Block): Future[?!void] {.asyncyeah.} =
|
||||
let used = self.quotaUsedBytes - blk.data.len.uint
|
||||
if err =? (await self.metaDs.put(
|
||||
QuotaUsedKey,
|
||||
|
@ -168,12 +169,12 @@ proc updateQuotaBytesUsed(self: RepoStore, blk: Block): Future[?!void] {.async.}
|
|||
self.quotaUsedBytes = used
|
||||
return success()
|
||||
|
||||
proc removeBlockExpirationEntry(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
|
||||
proc removeBlockExpirationEntry(self: RepoStore, cid: Cid): Future[?!void] {.asyncyeah.} =
|
||||
without key =? createBlockExpirationMetadataKey(cid), err:
|
||||
return failure(err)
|
||||
return await self.metaDs.delete(key)
|
||||
|
||||
method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
|
||||
method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.asyncyeah.} =
|
||||
## Delete a block from the blockstore
|
||||
##
|
||||
|
||||
|
@ -197,7 +198,7 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
|
|||
|
||||
return success()
|
||||
|
||||
method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
|
||||
method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.asyncyeah.} =
|
||||
## Check if the block exists in the blockstore
|
||||
##
|
||||
|
||||
|
@ -210,7 +211,7 @@ method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
|
|||
method listBlocks*(
|
||||
self: RepoStore,
|
||||
blockType = BlockType.Manifest
|
||||
): Future[?!BlocksIter] {.async.} =
|
||||
): Future[?!BlocksIter] {.asyncyeah.} =
|
||||
## Get the list of blocks in the RepoStore.
|
||||
## This is an intensive operation
|
||||
##
|
||||
|
@ -228,7 +229,7 @@ method listBlocks*(
|
|||
trace "Error querying cids in repo", blockType, err = err.msg
|
||||
return failure(err)
|
||||
|
||||
proc next(): Future[?Cid] {.async.} =
|
||||
proc next(): Future[?Cid] {.asyncyeah.} =
|
||||
await idleAsync()
|
||||
iter.finished = queryIter.finished
|
||||
if not queryIter.finished:
|
||||
|
@ -249,9 +250,9 @@ method getBlockExpirations*(
|
|||
self: RepoStore,
|
||||
maxNumber: int,
|
||||
offset: int
|
||||
): Future[?!BlockExpirationIter] {.async, base.} =
|
||||
): Future[?!BlockExpirationIter] {.asyncyeah, base.} =
|
||||
## Get block experiartions from the given RepoStore
|
||||
##
|
||||
##
|
||||
without query =? createBlockExpirationQuery(maxNumber, offset), err:
|
||||
trace "Unable to format block expirations query"
|
||||
return failure(err)
|
||||
|
@ -262,7 +263,7 @@ method getBlockExpirations*(
|
|||
|
||||
var iter = BlockExpirationIter()
|
||||
|
||||
proc next(): Future[?BlockExpiration] {.async.} =
|
||||
proc next(): Future[?BlockExpiration] {.asyncyeah.} =
|
||||
if not queryIter.finished:
|
||||
if pair =? (await queryIter.next()) and blockKey =? pair.key:
|
||||
let expirationTimestamp = pair.data
|
||||
|
@ -281,14 +282,14 @@ method getBlockExpirations*(
|
|||
iter.next = next
|
||||
return success iter
|
||||
|
||||
method close*(self: RepoStore): Future[void] {.async.} =
|
||||
method close*(self: RepoStore): Future[void] {.asyncyeah.} =
|
||||
## Close the blockstore, cleaning up resources managed by it.
|
||||
## For some implementations this may be a no-op
|
||||
##
|
||||
|
||||
(await self.repoDs.close()).expect("Should close datastore")
|
||||
|
||||
proc hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
|
||||
proc hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.asyncyeah.} =
|
||||
## Check if the block exists in the blockstore.
|
||||
## Return false if error encountered
|
||||
##
|
||||
|
@ -299,7 +300,7 @@ proc hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
|
|||
|
||||
return await self.repoDs.has(key)
|
||||
|
||||
proc reserve*(self: RepoStore, bytes: uint): Future[?!void] {.async.} =
|
||||
proc reserve*(self: RepoStore, bytes: uint): Future[?!void] {.asyncyeah.} =
|
||||
## Reserve bytes
|
||||
##
|
||||
|
||||
|
@ -322,7 +323,7 @@ proc reserve*(self: RepoStore, bytes: uint): Future[?!void] {.async.} =
|
|||
|
||||
return success()
|
||||
|
||||
proc release*(self: RepoStore, bytes: uint): Future[?!void] {.async.} =
|
||||
proc release*(self: RepoStore, bytes: uint): Future[?!void] {.asyncyeah.} =
|
||||
## Release bytes
|
||||
##
|
||||
|
||||
|
@ -348,7 +349,7 @@ proc release*(self: RepoStore, bytes: uint): Future[?!void] {.async.} =
|
|||
trace "Released bytes", bytes
|
||||
return success()
|
||||
|
||||
proc start*(self: RepoStore): Future[void] {.async.} =
|
||||
proc start*(self: RepoStore): Future[void] {.asyncyeah.} =
|
||||
## Start repo
|
||||
##
|
||||
|
||||
|
@ -388,7 +389,7 @@ proc start*(self: RepoStore): Future[void] {.async.} =
|
|||
|
||||
self.started = true
|
||||
|
||||
proc stop*(self: RepoStore): Future[void] {.async.} =
|
||||
proc stop*(self: RepoStore): Future[void] {.asyncyeah.} =
|
||||
## Stop repo
|
||||
##
|
||||
if not self.started:
|
||||
|
@ -410,8 +411,8 @@ func new*(
|
|||
quotaMaxBytes = DefaultQuotaBytes,
|
||||
blockTtl = DefaultBlockTtl
|
||||
): RepoStore =
|
||||
## Create new instance of a RepoStore
|
||||
##
|
||||
## Create new instance of a RepoStore
|
||||
##
|
||||
RepoStore(
|
||||
repoDs: repoDs,
|
||||
metaDs: metaDs,
|
||||
|
|
|
@ -11,6 +11,7 @@ import pkg/upraises
|
|||
push: {.upraises: [].}
|
||||
|
||||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import pkg/chronicles
|
||||
import pkg/libp2p
|
||||
|
||||
|
@ -37,7 +38,7 @@ proc new*(
|
|||
writer: AsyncStreamWriter = nil
|
||||
): AsyncStreamWrapper =
|
||||
## Create new instance of an asynchronous stream wrapper
|
||||
##
|
||||
##
|
||||
let
|
||||
stream = C(reader: reader, writer: writer)
|
||||
|
||||
|
@ -63,7 +64,7 @@ method readOnce*(
|
|||
self: AsyncStreamWrapper,
|
||||
pbytes: pointer,
|
||||
nbytes: int
|
||||
): Future[int] {.async.} =
|
||||
): Future[int] {.asyncyeah.} =
|
||||
|
||||
trace "Reading bytes from reader", bytes = nbytes
|
||||
if isNil(self.reader):
|
||||
|
@ -80,13 +81,13 @@ proc completeWrite(
|
|||
self: AsyncStreamWrapper,
|
||||
fut: Future[void],
|
||||
msgLen: int
|
||||
): Future[void] {.async.} =
|
||||
): Future[void] {.asyncyeah.} =
|
||||
|
||||
withExceptions:
|
||||
await fut
|
||||
|
||||
method write*(self: AsyncStreamWrapper, msg: seq[byte]): Future[void] =
|
||||
# Avoid a copy of msg being kept in the closure created by `{.async.}` as this
|
||||
# Avoid a copy of msg being kept in the closure created by `{.asyncyeah.}` as this
|
||||
# drives up memory usage
|
||||
|
||||
trace "Writing bytes to writer", bytes = msg.len
|
||||
|
@ -117,7 +118,7 @@ method closed*(self: AsyncStreamWrapper): bool =
|
|||
method atEof*(self: AsyncStreamWrapper): bool =
|
||||
self.reader.atEof()
|
||||
|
||||
method closeImpl*(self: AsyncStreamWrapper) {.async.} =
|
||||
method closeImpl*(self: AsyncStreamWrapper) {.asyncyeah.} =
|
||||
try:
|
||||
trace "Shutting down async chronos stream"
|
||||
if not self.closed():
|
||||
|
|
|
@ -9,6 +9,7 @@
|
|||
|
||||
import pkg/libp2p
|
||||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import pkg/chronicles
|
||||
|
||||
export libp2p, chronos, chronicles
|
||||
|
|
|
@ -15,6 +15,7 @@ push: {.upraises: [].}
|
|||
|
||||
import pkg/libp2p
|
||||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import pkg/chronicles
|
||||
import pkg/stew/ptrops
|
||||
|
||||
|
@ -53,7 +54,7 @@ proc new*(
|
|||
pad = true
|
||||
): StoreStream =
|
||||
## Create a new StoreStream instance for a given store and manifest
|
||||
##
|
||||
##
|
||||
result = StoreStream(
|
||||
store: store,
|
||||
manifest: manifest,
|
||||
|
@ -76,11 +77,11 @@ method readOnce*(
|
|||
self: StoreStream,
|
||||
pbytes: pointer,
|
||||
nbytes: int
|
||||
): Future[int] {.async.} =
|
||||
): Future[int] {.asyncyeah.} =
|
||||
## Read `nbytes` from current position in the StoreStream into output buffer pointed by `pbytes`.
|
||||
## Return how many bytes were actually read before EOF was encountered.
|
||||
## Raise exception if we are already at EOF.
|
||||
##
|
||||
##
|
||||
|
||||
trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.len
|
||||
if self.atEof:
|
||||
|
@ -117,7 +118,7 @@ method readOnce*(
|
|||
|
||||
return read
|
||||
|
||||
method closeImpl*(self: StoreStream) {.async.} =
|
||||
method closeImpl*(self: StoreStream) {.asyncyeah.} =
|
||||
trace "Closing StoreStream"
|
||||
self.offset = self.size # set Eof
|
||||
await procCall LPStream(self).closeImpl()
|
||||
|
|
|
@ -6,11 +6,12 @@
|
|||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
##
|
||||
##
|
||||
|
||||
import std/parseutils
|
||||
|
||||
import pkg/chronos
|
||||
import ./asyncyeah
|
||||
|
||||
import ./utils/asyncheapqueue
|
||||
import ./utils/fileutils
|
||||
|
|
|
@ -9,6 +9,7 @@
|
|||
|
||||
import std/sequtils
|
||||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import pkg/stew/results
|
||||
|
||||
# Based on chronos AsyncHeapQueue and std/heapqueue
|
||||
|
@ -136,7 +137,7 @@ proc pushNoWait*[T](heap: AsyncHeapQueue[T], item: T): Result[void, AsyncHQError
|
|||
|
||||
return ok()
|
||||
|
||||
proc push*[T](heap: AsyncHeapQueue[T], item: T) {.async, gcsafe.} =
|
||||
proc push*[T](heap: AsyncHeapQueue[T], item: T) {.asyncyeah, gcsafe.} =
|
||||
## Push item into the queue, awaiting for an available slot
|
||||
## when it's full
|
||||
##
|
||||
|
@ -171,7 +172,7 @@ proc popNoWait*[T](heap: AsyncHeapQueue[T]): Result[T, AsyncHQErrors] =
|
|||
|
||||
heap.putters.wakeupNext()
|
||||
|
||||
proc pop*[T](heap: AsyncHeapQueue[T]): Future[T] {.async.} =
|
||||
proc pop*[T](heap: AsyncHeapQueue[T]): Future[T] {.asyncyeah.} =
|
||||
## Remove and return an ``item`` from the beginning of the queue ``heap``.
|
||||
## If the queue is empty, wait until an item is available.
|
||||
while heap.empty():
|
||||
|
@ -234,7 +235,7 @@ proc pushOrUpdateNoWait*[T](heap: AsyncHeapQueue[T], item: T): Result[void, Asyn
|
|||
|
||||
return heap.pushNoWait(item)
|
||||
|
||||
proc pushOrUpdate*[T](heap: AsyncHeapQueue[T], item: T) {.async.} =
|
||||
proc pushOrUpdate*[T](heap: AsyncHeapQueue[T], item: T) {.asyncyeah.} =
|
||||
## Update an item if it exists or push a new one
|
||||
## awaiting until a slot becomes available
|
||||
##
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
|
||||
proc asyncSpawn*(future: Future[void], ignore: type CatchableError) =
|
||||
proc ignoringError {.async.} =
|
||||
proc ignoringError {.asyncyeah.} =
|
||||
try:
|
||||
await future
|
||||
except ignore:
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import pkg/questionable
|
||||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import pkg/chronicles
|
||||
import pkg/upraises
|
||||
|
||||
|
@ -42,7 +43,7 @@ proc schedule*(machine: Machine, event: Event) =
|
|||
except AsyncQueueFullError:
|
||||
raiseAssert "unlimited queue is full?!"
|
||||
|
||||
method run*(state: State, machine: Machine): Future[?State] {.base, async.} =
|
||||
method run*(state: State, machine: Machine): Future[?State] {.base, asyncyeah.} =
|
||||
discard
|
||||
|
||||
method onError*(state: State, error: ref CatchableError): ?State {.base.} =
|
||||
|
@ -52,14 +53,14 @@ proc onError(machine: Machine, error: ref CatchableError): Event =
|
|||
return proc (state: State): ?State =
|
||||
state.onError(error)
|
||||
|
||||
proc run(machine: Machine, state: State) {.async.} =
|
||||
proc run(machine: Machine, state: State) {.asyncyeah.} =
|
||||
try:
|
||||
if next =? await state.run(machine):
|
||||
machine.schedule(Event.transition(state, next))
|
||||
except CancelledError:
|
||||
discard
|
||||
|
||||
proc scheduler(machine: Machine) {.async.} =
|
||||
proc scheduler(machine: Machine) {.asyncyeah.} =
|
||||
proc onRunComplete(udata: pointer) {.gcsafe.} =
|
||||
var fut = cast[FutureBase](udata)
|
||||
if fut.failed():
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
## Used to execute a callback in a loop
|
||||
|
||||
import pkg/chronos
|
||||
import ../asyncyeah
|
||||
import pkg/chronicles
|
||||
import pkg/upraises
|
||||
|
||||
|
@ -26,7 +27,7 @@ proc new*(T: type Timer, timerName = "Unnamed Timer"): Timer =
|
|||
## Create a new Timer intance with the given name
|
||||
Timer(name: timerName)
|
||||
|
||||
proc timerLoop(timer: Timer) {.async.} =
|
||||
proc timerLoop(timer: Timer) {.asyncyeah.} =
|
||||
try:
|
||||
while true:
|
||||
await timer.callback()
|
||||
|
@ -44,7 +45,7 @@ method start*(timer: Timer, callback: TimerCallback, interval: Duration) {.base.
|
|||
timer.interval = interval
|
||||
timer.loopFuture = timerLoop(timer)
|
||||
|
||||
method stop*(timer: Timer) {.async, base.} =
|
||||
method stop*(timer: Timer) {.asyncyeah, base.} =
|
||||
if timer.loopFuture != nil:
|
||||
trace "Timer stopping: ", name=timer.name
|
||||
await timer.loopFuture.cancelAndWait()
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import std/sets
|
||||
import std/sequtils
|
||||
import pkg/chronos
|
||||
import ./asyncyeah
|
||||
import pkg/chronicles
|
||||
import ./market
|
||||
import ./clock
|
||||
|
@ -37,13 +38,13 @@ proc slots*(validation: Validation): seq[SlotId] =
|
|||
proc getCurrentPeriod(validation: Validation): UInt256 =
|
||||
return validation.periodicity.periodOf(validation.clock.now().u256)
|
||||
|
||||
proc waitUntilNextPeriod(validation: Validation) {.async.} =
|
||||
proc waitUntilNextPeriod(validation: Validation) {.asyncyeah.} =
|
||||
let period = validation.getCurrentPeriod()
|
||||
let periodEnd = validation.periodicity.periodEnd(period)
|
||||
trace "Waiting until next period", currentPeriod = period
|
||||
await validation.clock.waitUntil(periodEnd.truncate(int64) + 1)
|
||||
|
||||
proc subscribeSlotFilled(validation: Validation) {.async.} =
|
||||
proc subscribeSlotFilled(validation: Validation) {.asyncyeah.} =
|
||||
proc onSlotFilled(requestId: RequestId, slotIndex: UInt256) =
|
||||
let slotId = slotId(requestId, slotIndex)
|
||||
if slotId notin validation.slots:
|
||||
|
@ -53,7 +54,7 @@ proc subscribeSlotFilled(validation: Validation) {.async.} =
|
|||
let subscription = await validation.market.subscribeSlotFilled(onSlotFilled)
|
||||
validation.subscriptions.add(subscription)
|
||||
|
||||
proc removeSlotsThatHaveEnded(validation: Validation) {.async.} =
|
||||
proc removeSlotsThatHaveEnded(validation: Validation) {.asyncyeah.} =
|
||||
var ended: HashSet[SlotId]
|
||||
for slotId in validation.slots:
|
||||
let state = await validation.market.slotState(slotId)
|
||||
|
@ -64,7 +65,7 @@ proc removeSlotsThatHaveEnded(validation: Validation) {.async.} =
|
|||
|
||||
proc markProofAsMissing(validation: Validation,
|
||||
slotId: SlotId,
|
||||
period: Period) {.async.} =
|
||||
period: Period) {.asyncyeah.} =
|
||||
logScope:
|
||||
currentPeriod = validation.getCurrentPeriod()
|
||||
|
||||
|
@ -78,12 +79,12 @@ proc markProofAsMissing(validation: Validation,
|
|||
except CatchableError as e:
|
||||
error "Marking proof as missing failed", msg = e.msg
|
||||
|
||||
proc markProofsAsMissing(validation: Validation) {.async.} =
|
||||
proc markProofsAsMissing(validation: Validation) {.asyncyeah.} =
|
||||
for slotId in validation.slots:
|
||||
let previousPeriod = validation.getCurrentPeriod() - 1
|
||||
await validation.markProofAsMissing(slotId, previousPeriod)
|
||||
|
||||
proc run(validation: Validation) {.async.} =
|
||||
proc run(validation: Validation) {.asyncyeah.} =
|
||||
trace "Validation started"
|
||||
try:
|
||||
while true:
|
||||
|
@ -96,13 +97,13 @@ proc run(validation: Validation) {.async.} =
|
|||
except CatchableError as e:
|
||||
error "Validation failed", msg = e.msg
|
||||
|
||||
proc start*(validation: Validation) {.async.} =
|
||||
proc start*(validation: Validation) {.asyncyeah.} =
|
||||
validation.periodicity = await validation.market.periodicity()
|
||||
validation.proofTimeout = await validation.market.proofTimeout()
|
||||
await validation.subscribeSlotFilled()
|
||||
validation.running = validation.run()
|
||||
|
||||
proc stop*(validation: Validation) {.async.} =
|
||||
proc stop*(validation: Validation) {.asyncyeah.} =
|
||||
await validation.running.cancelAndWait()
|
||||
while validation.subscriptions.len > 0:
|
||||
let subscription = validation.subscriptions.pop()
|
||||
|
|
Loading…
Reference in New Issue