diff --git a/codex.nimble b/codex.nimble index 1cc86e40..d57c5b91 100644 --- a/codex.nimble +++ b/codex.nimble @@ -13,7 +13,7 @@ requires "bearssl >= 0.1.4" requires "chronicles >= 0.7.2" requires "chronos >= 2.5.2" requires "confutils" -requires "ethers >= 0.2.4 & < 0.3.0" +requires "ethers >= 0.5.0 & < 0.6.0" requires "libbacktrace" requires "libp2p" requires "metrics" diff --git a/codex/contracts/clock.nim b/codex/contracts/clock.nim index d7136573..7b5187c8 100644 --- a/codex/contracts/clock.nim +++ b/codex/contracts/clock.nim @@ -22,14 +22,14 @@ proc start*(clock: OnChainClock) {.async.} = return clock.started = true - proc onBlock(blck: Block) {.async, upraises:[].} = + proc onBlock(blck: Block) {.upraises:[].} = let blockTime = initTime(blck.timestamp.truncate(int64), 0) let computerTime = getTime() clock.offset = blockTime - computerTime clock.newBlock.fire() if latestBlock =? (await clock.provider.getBlock(BlockTag.latest)): - await onBlock(latestBlock) + onBlock(latestBlock) clock.subscription = await clock.provider.subscribe(onBlock) diff --git a/codex/contracts/market.nim b/codex/contracts/market.nim index b7b7c94f..90f9f5b8 100644 --- a/codex/contracts/market.nim +++ b/codex/contracts/market.nim @@ -1,5 +1,6 @@ +import std/sequtils import std/strutils -import std/strformat +import std/sugar import pkg/chronicles import pkg/ethers import pkg/ethers/testing @@ -36,7 +37,7 @@ proc approveFunds(market: OnChainMarket, amount: UInt256) {.async.} = let tokenAddress = await market.contract.token() let token = Erc20Token.new(tokenAddress, market.signer) - await token.approve(market.contract.address(), amount) + discard await token.approve(market.contract.address(), amount) method getSigner*(market: OnChainMarket): Future[Address] {.async.} = return await market.signer.getAddress() @@ -168,11 +169,13 @@ method canProofBeMarkedAsMissing*( trace "Proof can not be marked as missing", msg = e.msg return false -method subscribeRequests(market: OnChainMarket, +method subscribeRequests*(market: OnChainMarket, callback: OnRequest): Future[MarketSubscription] {.async.} = proc onEvent(event: StorageRequested) {.upraises:[].} = - callback(event.requestId, event.ask) + callback(event.requestId, + event.ask, + event.expiry) let subscription = await market.contract.subscribe(StorageRequested, onEvent) return OnChainMarketSubscription(eventSubscription: subscription) @@ -198,10 +201,18 @@ method subscribeSlotFreed*(market: OnChainMarket, callback: OnSlotFreed): Future[MarketSubscription] {.async.} = proc onEvent(event: SlotFreed) {.upraises:[].} = - callback(event.slotId) + callback(event.requestId, event.slotIndex) let subscription = await market.contract.subscribe(SlotFreed, onEvent) return OnChainMarketSubscription(eventSubscription: subscription) +method subscribeFulfillment(market: OnChainMarket, + callback: OnFulfillment): + Future[MarketSubscription] {.async.} = + proc onEvent(event: RequestFulfilled) {.upraises:[].} = + callback(event.requestId) + let subscription = await market.contract.subscribe(RequestFulfilled, onEvent) + return OnChainMarketSubscription(eventSubscription: subscription) + method subscribeFulfillment(market: OnChainMarket, requestId: RequestId, callback: OnFulfillment): @@ -212,6 +223,14 @@ method subscribeFulfillment(market: OnChainMarket, let subscription = await market.contract.subscribe(RequestFulfilled, onEvent) return OnChainMarketSubscription(eventSubscription: subscription) +method subscribeRequestCancelled*(market: OnChainMarket, + callback: OnRequestCancelled): + Future[MarketSubscription] {.async.} = + proc onEvent(event: RequestCancelled) {.upraises:[].} = + callback(event.requestId) + let subscription = await market.contract.subscribe(RequestCancelled, onEvent) + return OnChainMarketSubscription(eventSubscription: subscription) + method subscribeRequestCancelled*(market: OnChainMarket, requestId: RequestId, callback: OnRequestCancelled): @@ -222,6 +241,14 @@ method subscribeRequestCancelled*(market: OnChainMarket, let subscription = await market.contract.subscribe(RequestCancelled, onEvent) return OnChainMarketSubscription(eventSubscription: subscription) +method subscribeRequestFailed*(market: OnChainMarket, + callback: OnRequestFailed): + Future[MarketSubscription] {.async.} = + proc onEvent(event: RequestFailed) {.upraises:[]} = + callback(event.requestId) + let subscription = await market.contract.subscribe(RequestFailed, onEvent) + return OnChainMarketSubscription(eventSubscription: subscription) + method subscribeRequestFailed*(market: OnChainMarket, requestId: RequestId, callback: OnRequestFailed): @@ -242,3 +269,24 @@ method subscribeProofSubmission*(market: OnChainMarket, method unsubscribe*(subscription: OnChainMarketSubscription) {.async.} = await subscription.eventSubscription.unsubscribe() + +method queryPastStorageRequests*(market: OnChainMarket, + blocksAgo: int): + Future[seq[PastStorageRequest]] {.async.} = + + let contract = market.contract + let provider = contract.provider + + let head = await provider.getBlockNumber() + let fromBlock = BlockTag.init(head - blocksAgo.abs.u256) + + let events = await contract.queryFilter(StorageRequested, + fromBlock, + BlockTag.latest) + return events.map(event => + PastStorageRequest( + requestId: event.requestId, + ask: event.ask, + expiry: event.expiry + ) + ) diff --git a/codex/contracts/marketplace.nim b/codex/contracts/marketplace.nim index 45d92335..f09e8720 100644 --- a/codex/contracts/marketplace.nim +++ b/codex/contracts/marketplace.nim @@ -18,13 +18,13 @@ type StorageRequested* = object of Event requestId*: RequestId ask*: StorageAsk + expiry*: UInt256 SlotFilled* = object of Event requestId* {.indexed.}: RequestId - slotIndex* {.indexed.}: UInt256 - slotId*: SlotId + slotIndex*: UInt256 SlotFreed* = object of Event requestId* {.indexed.}: RequestId - slotId*: SlotId + slotIndex*: UInt256 RequestFulfilled* = object of Event requestId* {.indexed.}: RequestId RequestCancelled* = object of Event diff --git a/codex/contracts/requests.nim b/codex/contracts/requests.nim index 4c6e8b10..7393f278 100644 --- a/codex/contracts/requests.nim +++ b/codex/contracts/requests.nim @@ -4,6 +4,8 @@ import pkg/nimcrypto import pkg/ethers/fields import pkg/questionable/results import pkg/stew/byteutils +import pkg/json_serialization +import pkg/upraises export contractabi @@ -203,3 +205,17 @@ func price*(request: StorageRequest): UInt256 = func size*(ask: StorageAsk): UInt256 = ask.slots.u256 * ask.slotSize + +proc writeValue*( + writer: var JsonWriter, + value: SlotId | RequestId) {.upraises:[IOError].} = + + mixin writeValue + writer.writeValue value.toArray + +proc readValue*[T: SlotId | RequestId]( + reader: var JsonReader, + value: var T) {.upraises: [SerializationError, IOError].} = + + mixin readValue + value = T reader.readValue(T.distinctBase) diff --git a/codex/market.nim b/codex/market.nim index e2a233a6..be0d06fc 100644 --- a/codex/market.nim +++ b/codex/market.nim @@ -15,13 +15,19 @@ export periods type Market* = ref object of RootObj Subscription* = ref object of RootObj - OnRequest* = proc(id: RequestId, ask: StorageAsk) {.gcsafe, upraises:[].} + OnRequest* = proc(id: RequestId, + ask: StorageAsk, + expiry: UInt256) {.gcsafe, upraises:[].} OnFulfillment* = proc(requestId: RequestId) {.gcsafe, upraises: [].} OnSlotFilled* = proc(requestId: RequestId, slotIndex: UInt256) {.gcsafe, upraises:[].} - OnSlotFreed* = proc(slotId: SlotId) {.gcsafe, upraises: [].} + OnSlotFreed* = proc(requestId: RequestId, slotIndex: UInt256) {.gcsafe, upraises: [].} OnRequestCancelled* = proc(requestId: RequestId) {.gcsafe, upraises:[].} OnRequestFailed* = proc(requestId: RequestId) {.gcsafe, upraises:[].} OnProofSubmitted* = proc(id: SlotId, proof: seq[byte]) {.gcsafe, upraises:[].} + PastStorageRequest* = object + requestId*: RequestId + ask*: StorageAsk + expiry*: UInt256 method getSigner*(market: Market): Future[Address] {.base, async.} = raiseAssert("not implemented") @@ -112,6 +118,11 @@ method canProofBeMarkedAsMissing*(market: Market, period: Period): Future[bool] {.base, async.} = raiseAssert("not implemented") +method subscribeFulfillment*(market: Market, + callback: OnFulfillment): + Future[Subscription] {.base, async.} = + raiseAssert("not implemented") + method subscribeFulfillment*(market: Market, requestId: RequestId, callback: OnFulfillment): @@ -135,12 +146,22 @@ method subscribeSlotFreed*(market: Market, Future[Subscription] {.base, async.} = raiseAssert("not implemented") +method subscribeRequestCancelled*(market: Market, + callback: OnRequestCancelled): + Future[Subscription] {.base, async.} = + raiseAssert("not implemented") + method subscribeRequestCancelled*(market: Market, requestId: RequestId, callback: OnRequestCancelled): Future[Subscription] {.base, async.} = raiseAssert("not implemented") +method subscribeRequestFailed*(market: Market, + callback: OnRequestFailed): + Future[Subscription] {.base, async.} = + raiseAssert("not implemented") + method subscribeRequestFailed*(market: Market, requestId: RequestId, callback: OnRequestFailed): @@ -154,3 +175,8 @@ method subscribeProofSubmission*(market: Market, method unsubscribe*(subscription: Subscription) {.base, async, upraises:[].} = raiseAssert("not implemented") + +method queryPastStorageRequests*(market: Market, + blocksAgo: int): + Future[seq[PastStorageRequest]] {.base, async.} = + raiseAssert("not implemented") diff --git a/codex/sales.nim b/codex/sales.nim index 8efeaf61..8a7f0cb5 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -1,20 +1,24 @@ import std/sequtils +import std/sugar +import std/tables import pkg/questionable -import pkg/upraises import pkg/stint import pkg/chronicles import pkg/datastore -import ./rng import ./market import ./clock import ./proving import ./stores import ./contracts/requests +import ./contracts/marketplace import ./sales/salescontext import ./sales/salesagent import ./sales/statemachine -import ./sales/states/downloading +import ./sales/slotqueue +import ./sales/trackedfutures +import ./sales/states/preparing import ./sales/states/unknown +import ./utils/then ## Sales holds a list of available storage that it may sell. ## @@ -43,8 +47,10 @@ logScope: type Sales* = ref object context*: SalesContext - subscription*: ?market.Subscription agents*: seq[SalesAgent] + running: bool + subscriptions: seq[market.Subscription] + trackedFutures: TrackedFutures proc `onStore=`*(sales: Sales, onStore: OnStore) = sales.context.onStore = some onStore @@ -67,37 +73,47 @@ func new*(_: type Sales, proving: Proving, repo: RepoStore): Sales = - Sales(context: SalesContext( - market: market, - clock: clock, - proving: proving, - reservations: Reservations.new(repo) - )) + let reservations = Reservations.new(repo) + Sales( + context: SalesContext( + market: market, + clock: clock, + proving: proving, + reservations: reservations, + slotQueue: SlotQueue.new(reservations) + ), + trackedFutures: TrackedFutures.new(), + subscriptions: @[] + ) -proc randomSlotIndex(numSlots: uint64): UInt256 = - let rng = Rng.instance - let slotIndex = rng.rand(numSlots - 1) - return slotIndex.u256 +proc remove(sales: Sales, agent: SalesAgent) {.async.} = + await agent.stop() + if sales.running: + sales.agents.keepItIf(it != agent) -proc handleRequest(sales: Sales, - requestId: RequestId, - ask: StorageAsk) = +proc cleanUp(sales: Sales, + agent: SalesAgent, + processing: Future[void]) {.async.} = + await sales.remove(agent) + # signal back to the slot queue to cycle a worker + if not processing.isNil and not processing.finished(): + processing.complete() - debug "handling storage requested", - slots = ask.slots, slotSize = ask.slotSize, duration = ask.duration, - reward = ask.reward, maxSlotLoss = ask.maxSlotLoss +proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) = + debug "processing slot from queue", requestId = $item.requestId, + slot = item.slotIndex - # TODO: check if random slot is actually available (not already filled) - let slotIndex = randomSlotIndex(ask.slots) let agent = newSalesAgent( sales.context, - requestId, - slotIndex, + item.requestId, + item.slotIndex.u256, none StorageRequest ) - agent.context.onIgnored = proc {.gcsafe, upraises:[].} = - sales.agents.keepItIf(it != agent) - agent.start(SaleDownloading()) + + agent.context.onCleanUp = proc {.async.} = + await sales.cleanUp(agent, done) + + agent.start(SalePreparing()) sales.agents.add agent proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} = @@ -120,27 +136,272 @@ proc load*(sales: Sales) {.async.} = slot.request.id, slot.slotIndex, some slot.request) + + agent.context.onCleanUp = proc {.async.} = await sales.remove(agent) + agent.start(SaleUnknown()) sales.agents.add agent -proc start*(sales: Sales) {.async.} = - doAssert sales.subscription.isNone, "Sales already started" +proc onReservationAdded(sales: Sales, availability: Availability) {.async.} = + ## Query last 256 blocks for new requests, adding them to the queue. `push` + ## checks for availability before adding to the queue. If processed, the + ## sales agent will check if the slot is free. + let context = sales.context + let market = context.market + let queue = context.slotQueue - proc onRequest(requestId: RequestId, ask: StorageAsk) {.gcsafe, upraises:[].} = - sales.handleRequest(requestId, ask) + logScope: + topics = "sales onReservationAdded callback" + + trace "reservation added, querying past storage requests to add to queue" try: - sales.subscription = some await sales.context.market.subscribeRequests(onRequest) + let events = await market.queryPastStorageRequests(256) + let requests = events.map(event => + SlotQueueItem.init(event.requestId, event.ask, event.expiry) + ) + + trace "found past storage requested events to add to queue", + events = events.len + + for slots in requests: + for slot in slots: + if err =? (await queue.push(slot)).errorOption: + # continue on error + if err of QueueNotRunningError: + warn "cannot push items to queue, queue is not running" + elif err of NoMatchingAvailabilityError: + info "slot in queue had no matching availabilities, ignoring" + elif err of SlotsOutOfRangeError: + warn "Too many slots, cannot add to queue" + elif err of SlotQueueItemExistsError: + trace "item already exists, ignoring" + discard + else: raise err + except CatchableError as e: - error "Unable to start sales", msg = e.msg + warn "Error adding request to SlotQueue", error = e.msg + discard + +proc onStorageRequested(sales: Sales, + requestId: RequestId, + ask: StorageAsk, + expiry: UInt256) = + + logScope: + topics = "sales onStorageRequested" + requestId + slots = ask.slots + expiry + + let slotQueue = sales.context.slotQueue + + trace "storage requested, adding slots to queue" + + without items =? SlotQueueItem.init(requestId, ask, expiry).catch, err: + if err of SlotsOutOfRangeError: + warn "Too many slots, cannot add to queue" + else: + warn "Failed to create slot queue items from request", error = err.msg + + for item in items: + # continue on failure + slotQueue.push(item) + .track(sales) + .catch(proc(err: ref CatchableError) = + if err of NoMatchingAvailabilityError: + info "slot in queue had no matching availabilities, ignoring" + elif err of SlotQueueItemExistsError: + error "Failed to push item to queue becaue it already exists" + elif err of QueueNotRunningError: + warn "Failed to push item to queue becaue queue is not running" + else: + warn "Error adding request to SlotQueue", error = err.msg + ) + +proc onSlotFreed(sales: Sales, + requestId: RequestId, + slotIndex: UInt256) = + + logScope: + topics = "sales onSlotFreed" + requestId + slotIndex + + trace "slot freed, adding to queue" + + proc addSlotToQueue() {.async.} = + let context = sales.context + let market = context.market + let queue = context.slotQueue + + # first attempt to populate request using existing slot metadata in queue + without var found =? queue.populateItem(requestId, + slotIndex.truncate(uint16)): + trace "no existing request metadata, getting request info from contract" + # if there's no existing slot for that request, retrieve the request + # from the contract. + without request =? await market.getRequest(requestId): + error "unknown request in contract" + return + + found = SlotQueueItem.init(request, slotIndex.truncate(uint16)) + + if err =? (await queue.push(found)).errorOption: + raise err + + addSlotToQueue() + .track(sales) + .catch(proc(err: ref CatchableError) = + if err of NoMatchingAvailabilityError: + info "slot in queue had no matching availabilities, ignoring" + elif err of SlotQueueItemExistsError: + error "Failed to push item to queue becaue it already exists" + elif err of QueueNotRunningError: + warn "Failed to push item to queue becaue queue is not running" + else: + warn "Error adding request to SlotQueue", error = err.msg + ) + +proc subscribeRequested(sales: Sales) {.async.} = + let context = sales.context + let market = context.market + + proc onStorageRequested(requestId: RequestId, + ask: StorageAsk, + expiry: UInt256) = + sales.onStorageRequested(requestId, ask, expiry) + + try: + let sub = await market.subscribeRequests(onStorageRequested) + sales.subscriptions.add(sub) + except CatchableError as e: + error "Unable to subscribe to storage request events", msg = e.msg + +proc subscribeCancellation(sales: Sales) {.async.} = + let context = sales.context + let market = context.market + let queue = context.slotQueue + + proc onCancelled(requestId: RequestId) = + trace "request cancelled, removing all request slots from queue" + queue.delete(requestId) + + try: + let sub = await market.subscribeRequestCancelled(onCancelled) + sales.subscriptions.add(sub) + except CatchableError as e: + error "Unable to subscribe to cancellation events", msg = e.msg + +proc subscribeFulfilled*(sales: Sales) {.async.} = + let context = sales.context + let market = context.market + let queue = context.slotQueue + + proc onFulfilled(requestId: RequestId) = + trace "request fulfilled, removing all request slots from queue" + queue.delete(requestId) + + for agent in sales.agents: + agent.onFulfilled(requestId) + + try: + let sub = await market.subscribeFulfillment(onFulfilled) + sales.subscriptions.add(sub) + except CatchableError as e: + error "Unable to subscribe to storage fulfilled events", msg = e.msg + +proc subscribeFailure(sales: Sales) {.async.} = + let context = sales.context + let market = context.market + let queue = context.slotQueue + + proc onFailed(requestId: RequestId) = + trace "request failed, removing all request slots from queue" + queue.delete(requestId) + + for agent in sales.agents: + agent.onFailed(requestId) + + try: + let sub = await market.subscribeRequestFailed(onFailed) + sales.subscriptions.add(sub) + except CatchableError as e: + error "Unable to subscribe to storage failure events", msg = e.msg + +proc subscribeSlotFilled(sales: Sales) {.async.} = + let context = sales.context + let market = context.market + let queue = context.slotQueue + + proc onSlotFilled(requestId: RequestId, slotIndex: UInt256) = + trace "slot filled, removing from slot queue", requestId, slotIndex + queue.delete(requestId, slotIndex.truncate(uint16)) + + for agent in sales.agents: + agent.onSlotFilled(requestId, slotIndex) + + try: + let sub = await market.subscribeSlotFilled(onSlotFilled) + sales.subscriptions.add(sub) + except CatchableError as e: + error "Unable to subscribe to slot filled events", msg = e.msg + +proc subscribeSlotFreed(sales: Sales) {.async.} = + let context = sales.context + let market = context.market + + proc onSlotFreed(requestId: RequestId, slotIndex: UInt256) = + sales.onSlotFreed(requestId, slotIndex) + + try: + let sub = await market.subscribeSlotFreed(onSlotFreed) + sales.subscriptions.add(sub) + except CatchableError as e: + error "Unable to subscribe to slot freed events", msg = e.msg + +proc startSlotQueue(sales: Sales) {.async.} = + let slotQueue = sales.context.slotQueue + let reservations = sales.context.reservations + + slotQueue.onProcessSlot = + proc(item: SlotQueueItem, done: Future[void]) {.async.} = + sales.processSlot(item, done) + + asyncSpawn slotQueue.start() + + reservations.onReservationAdded = + proc(availability: Availability) {.async.} = + await sales.onReservationAdded(availability) + + +proc subscribe(sales: Sales) {.async.} = + await sales.subscribeRequested() + await sales.subscribeFulfilled() + await sales.subscribeFailure() + await sales.subscribeSlotFilled() + await sales.subscribeSlotFreed() + await sales.subscribeCancellation() + +proc unsubscribe(sales: Sales) {.async.} = + for sub in sales.subscriptions: + try: + await sub.unsubscribe() + except CatchableError as e: + error "Unable to unsubscribe from subscription", error = e.msg + +proc start*(sales: Sales) {.async.} = + await sales.startSlotQueue() + await sales.subscribe() proc stop*(sales: Sales) {.async.} = - if subscription =? sales.subscription: - sales.subscription = market.Subscription.none - try: - await subscription.unsubscribe() - except CatchableError as e: - warn "Unsubscribe failed", msg = e.msg + trace "stopping sales" + sales.running = false + await sales.context.slotQueue.stop() + await sales.unsubscribe() + await sales.trackedFutures.cancelTracked() for agent in sales.agents: await agent.stop() + + sales.agents = @[] diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim index 86aacd64..0895307c 100644 --- a/codex/sales/reservations.nim +++ b/codex/sales/reservations.nim @@ -42,7 +42,9 @@ type used*: bool Reservations* = ref object repo: RepoStore + onReservationAdded: ?OnReservationAdded GetNext* = proc(): Future[?Availability] {.upraises: [], gcsafe, closure.} + OnReservationAdded* = proc(availability: Availability): Future[void] {.upraises: [], gcsafe.} AvailabilityIter* = ref object finished*: bool next*: GetNext @@ -96,18 +98,22 @@ proc toErr[E1: ref CatchableError, E2: AvailabilityError]( proc writeValue*( writer: var JsonWriter, - value: SlotId | AvailabilityId) {.upraises:[IOError].} = + value: AvailabilityId) {.upraises:[IOError].} = mixin writeValue writer.writeValue value.toArray -proc readValue*[T: SlotId | AvailabilityId]( +proc readValue*[T: AvailabilityId]( reader: var JsonReader, value: var T) {.upraises: [SerializationError, IOError].} = mixin readValue value = T reader.readValue(T.distinctBase) +proc `onReservationAdded=`*(self: Reservations, + onReservationAdded: OnReservationAdded) = + self.onReservationAdded = some onReservationAdded + func key(id: AvailabilityId): ?!Key = (ReservationsKey / id.toArray.toHex) @@ -210,6 +216,15 @@ proc reserve*( return failure(updateErr) + if onReservationAdded =? self.onReservationAdded: + try: + await onReservationAdded(availability) + except CatchableError as e: + # we don't have any insight into types of errors that `onProcessSlot` can + # throw because it is caller-defined + warn "Unknown error during 'onReservationAdded' callback", + availabilityId = availability.id, error = e.msg + return success() proc release*( @@ -320,7 +335,7 @@ proc unused*(r: Reservations): Future[?!seq[Availability]] {.async.} = proc find*( self: Reservations, - size, duration, minPrice: UInt256, collateral: UInt256, + size, duration, minPrice, collateral: UInt256, used: bool): Future[?Availability] {.async.} = diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim index 7a1f7876..ef7b255b 100644 --- a/codex/sales/salesagent.nim +++ b/codex/sales/salesagent.nim @@ -1,8 +1,11 @@ import pkg/chronos import pkg/chronicles +import pkg/questionable +import pkg/questionable/results import pkg/stint +import pkg/upraises import ../contracts/requests -import ../utils/asyncspawn +import ../errors import ./statemachine import ./salescontext import ./salesdata @@ -13,10 +16,13 @@ export reservations logScope: topics = "marketplace sales" -type SalesAgent* = ref object of Machine - context*: SalesContext - data*: SalesData - subscribed: bool +type + SalesAgent* = ref object of Machine + context*: SalesContext + data*: SalesData + subscribed: bool + SalesAgentError = object of CodexError + AllSlotsFilledError* = object of SalesAgentError func `==`*(a, b: SalesAgent): bool = a.data.requestId == b.data.requestId and @@ -41,7 +47,6 @@ proc retrieveRequest*(agent: SalesAgent) {.async.} = proc subscribeCancellation(agent: SalesAgent) {.async.} = let data = agent.data - let market = agent.context.market let clock = agent.context.clock proc onCancelled() {.async.} = @@ -49,51 +54,34 @@ proc subscribeCancellation(agent: SalesAgent) {.async.} = return await clock.waitUntil(request.expiry.truncate(int64)) - if not data.fulfilled.isNil: - asyncSpawn data.fulfilled.unsubscribe(), ignore = CatchableError agent.schedule(cancelledEvent(request)) data.cancelled = onCancelled() - proc onFulfilled(_: RequestId) = - data.cancelled.cancel() +method onFulfilled*(agent: SalesAgent, requestId: RequestId) {.base, gcsafe, upraises: [].} = + if agent.data.requestId == requestId and + not agent.data.cancelled.isNil: + agent.data.cancelled.cancel() - data.fulfilled = - await market.subscribeFulfillment(data.requestId, onFulfilled) - -proc subscribeFailure(agent: SalesAgent) {.async.} = - let data = agent.data - let market = agent.context.market - - proc onFailed(_: RequestId) = - without request =? data.request: - return - asyncSpawn data.failed.unsubscribe(), ignore = CatchableError +method onFailed*(agent: SalesAgent, requestId: RequestId) {.base, gcsafe, upraises: [].} = + without request =? agent.data.request: + return + if agent.data.requestId == requestId: agent.schedule(failedEvent(request)) - data.failed = - await market.subscribeRequestFailed(data.requestId, onFailed) +method onSlotFilled*(agent: SalesAgent, + requestId: RequestId, + slotIndex: UInt256) {.base, gcsafe, upraises: [].} = -proc subscribeSlotFilled(agent: SalesAgent) {.async.} = - let data = agent.data - let market = agent.context.market - - proc onSlotFilled(requestId: RequestId, slotIndex: UInt256) = - asyncSpawn data.slotFilled.unsubscribe(), ignore = CatchableError - agent.schedule(slotFilledEvent(requestId, data.slotIndex)) - - data.slotFilled = - await market.subscribeSlotFilled(data.requestId, - data.slotIndex, - onSlotFilled) + if agent.data.requestId == requestId and + agent.data.slotIndex == slotIndex: + agent.schedule(slotFilledEvent(requestId, slotIndex)) proc subscribe*(agent: SalesAgent) {.async.} = if agent.subscribed: return await agent.subscribeCancellation() - await agent.subscribeFailure() - await agent.subscribeSlotFilled() agent.subscribed = true proc unsubscribe*(agent: SalesAgent) {.async.} = @@ -101,25 +89,7 @@ proc unsubscribe*(agent: SalesAgent) {.async.} = return let data = agent.data - try: - if not data.fulfilled.isNil: - await data.fulfilled.unsubscribe() - data.fulfilled = nil - except CatchableError: - discard - try: - if not data.failed.isNil: - await data.failed.unsubscribe() - data.failed = nil - except CatchableError: - discard - try: - if not data.slotFilled.isNil: - await data.slotFilled.unsubscribe() - data.slotFilled = nil - except CatchableError: - discard - if not data.cancelled.isNil: + if not data.cancelled.isNil and not data.cancelled.finished: await data.cancelled.cancelAndWait() data.cancelled = nil diff --git a/codex/sales/salescontext.nim b/codex/sales/salescontext.nim index ede2b1a6..9063ba31 100644 --- a/codex/sales/salescontext.nim +++ b/codex/sales/salescontext.nim @@ -5,6 +5,7 @@ import ../node/batch import ../market import ../clock import ../proving +import ./slotqueue import ./reservations type @@ -14,9 +15,10 @@ type onStore*: ?OnStore onClear*: ?OnClear onSale*: ?OnSale - onIgnored*: OnIgnored + onCleanUp*: OnCleanUp proving*: Proving reservations*: Reservations + slotQueue*: SlotQueue OnStore* = proc(request: StorageRequest, slot: UInt256, @@ -27,4 +29,4 @@ type slotIndex: UInt256) {.gcsafe, upraises: [].} OnSale* = proc(request: StorageRequest, slotIndex: UInt256) {.gcsafe, upraises: [].} - OnIgnored* = proc() {.gcsafe, upraises: [].} + OnCleanUp* = proc: Future[void] {.gcsafe, upraises: [].} diff --git a/codex/sales/salesdata.nim b/codex/sales/salesdata.nim index d8226877..0e975ac1 100644 --- a/codex/sales/salesdata.nim +++ b/codex/sales/salesdata.nim @@ -9,7 +9,4 @@ type ask*: StorageAsk request*: ?StorageRequest slotIndex*: UInt256 - failed*: market.Subscription - fulfilled*: market.Subscription - slotFilled*: market.Subscription cancelled*: Future[void] diff --git a/codex/sales/slotqueue.nim b/codex/sales/slotqueue.nim new file mode 100644 index 00000000..b512360d --- /dev/null +++ b/codex/sales/slotqueue.nim @@ -0,0 +1,396 @@ +import std/sequtils +import std/sugar +import std/tables +import pkg/chronicles +import pkg/chronos +import pkg/questionable +import pkg/questionable/results +import pkg/upraises +import ./reservations +import ./trackedfutures +import ../errors +import ../rng +import ../utils +import ../contracts/requests +import ../utils/asyncheapqueue +import ../utils/then + +logScope: + topics = "marketplace slotqueue" + +type + OnProcessSlot* = + proc(item: SlotQueueItem, done: Future[void]): Future[void] {.gcsafe, upraises:[].} + + # Non-ref obj copies value when assigned, preventing accidental modification + # of values which could cause an incorrect order (eg + # ``slotQueue[1].collateral = 1`` would cause ``collateral`` to be updated, + # but the heap invariant would no longer be honoured. When non-ref, the + # compiler can ensure that statement will fail). + SlotQueueWorker = object + doneProcessing*: Future[void] + + SlotQueueItem* = object + requestId: RequestId + slotIndex: uint16 + slotSize: UInt256 + duration: UInt256 + reward: UInt256 + collateral: UInt256 + expiry: UInt256 + + # don't need to -1 to prevent overflow when adding 1 (to always allow push) + # because AsyncHeapQueue size is of type `int`, which is larger than `uint16` + SlotQueueSize = range[1'u16..uint16.high] + + SlotQueue* = ref object + maxWorkers: int + onProcessSlot: ?OnProcessSlot + queue: AsyncHeapQueue[SlotQueueItem] + reservations: Reservations + running: bool + workers: AsyncQueue[SlotQueueWorker] + trackedFutures: TrackedFutures + + SlotQueueError = object of CodexError + SlotQueueItemExistsError* = object of SlotQueueError + SlotQueueItemNotExistsError* = object of SlotQueueError + SlotsOutOfRangeError* = object of SlotQueueError + NoMatchingAvailabilityError* = object of SlotQueueError + QueueNotRunningError* = object of SlotQueueError + +# Number of concurrent workers used for processing SlotQueueItems +const DefaultMaxWorkers = 3 + +# Cap slot queue size to prevent unbounded growth and make sifting more +# efficient. Max size is not equivalent to the number of slots a host can +# service, which is limited by host availabilities and new requests circulating +# the network. Additionally, each new request/slot in the network will be +# included in the queue if it is higher priority than any of the exisiting +# items. Older slots should be unfillable over time as other hosts fill the +# slots. +const DefaultMaxSize = 64'u16 + +proc profitability(item: SlotQueueItem): UInt256 = + StorageAsk(collateral: item.collateral, + duration: item.duration, + reward: item.reward, + slotSize: item.slotSize).pricePerSlot + +proc `<`*(a, b: SlotQueueItem): bool = + # for A to have a higher priority than B (in a min queue), A must be less than + # B. + var scoreA: uint8 = 0 + var scoreB: uint8 = 0 + + proc addIf(score: var uint8, condition: bool, addition: int) = + if condition: + score += 1'u8 shl addition + + scoreA.addIf(a.profitability > b.profitability, 3) + scoreB.addIf(a.profitability < b.profitability, 3) + + scoreA.addIf(a.collateral < b.collateral, 2) + scoreB.addIf(a.collateral > b.collateral, 2) + + scoreA.addIf(a.expiry > b.expiry, 1) + scoreB.addIf(a.expiry < b.expiry, 1) + + scoreA.addIf(a.slotSize < b.slotSize, 0) + scoreB.addIf(a.slotSize > b.slotSize, 0) + + return scoreA > scoreB + +proc `==`*(a, b: SlotQueueItem): bool = + a.requestId == b.requestId and + a.slotIndex == b.slotIndex + +proc new*(_: type SlotQueue, + reservations: Reservations, + maxWorkers = DefaultMaxWorkers, + maxSize: SlotQueueSize = DefaultMaxSize): SlotQueue = + + if maxWorkers <= 0: + raise newException(ValueError, "maxWorkers must be positive") + if maxWorkers.uint16 > maxSize: + raise newException(ValueError, "maxWorkers must be less than maxSize") + + SlotQueue( + maxWorkers: maxWorkers, + # Add 1 to always allow for an extra item to be pushed onto the queue + # temporarily. After push (and sort), the bottom-most item will be deleted + queue: newAsyncHeapQueue[SlotQueueItem](maxSize.int + 1), + reservations: reservations, + running: false, + trackedFutures: TrackedFutures.new() + ) + # avoid instantiating `workers` in constructor to avoid side effects in + # `newAsyncQueue` procedure + +proc init*(_: type SlotQueueWorker): SlotQueueWorker = + SlotQueueWorker( + doneProcessing: newFuture[void]("slotqueue.worker.processing") + ) + +proc init*(_: type SlotQueueItem, + requestId: RequestId, + slotIndex: uint16, + ask: StorageAsk, + expiry: UInt256): SlotQueueItem = + + SlotQueueItem( + requestId: requestId, + slotIndex: slotIndex, + slotSize: ask.slotSize, + duration: ask.duration, + reward: ask.reward, + collateral: ask.collateral, + expiry: expiry + ) + +proc init*(_: type SlotQueueItem, + request: StorageRequest, + slotIndex: uint16): SlotQueueItem = + + SlotQueueItem.init(request.id, + slotIndex, + request.ask, + request.expiry) + +proc init*(_: type SlotQueueItem, + requestId: RequestId, + ask: StorageAsk, + expiry: UInt256): seq[SlotQueueItem] = + + if not ask.slots.inRange: + raise newException(SlotsOutOfRangeError, "Too many slots") + + var i = 0'u16 + proc initSlotQueueItem: SlotQueueItem = + let item = SlotQueueItem.init(requestId, i, ask, expiry) + inc i + return item + + var items = newSeqWith(ask.slots.int, initSlotQueueItem()) + Rng.instance.shuffle(items) + return items + +proc init*(_: type SlotQueueItem, + request: StorageRequest): seq[SlotQueueItem] = + + return SlotQueueItem.init(request.id, request.ask, request.expiry) + +proc inRange*(val: SomeUnsignedInt): bool = + val.uint16 in SlotQueueSize.low..SlotQueueSize.high + +proc requestId*(self: SlotQueueItem): RequestId = self.requestId +proc slotIndex*(self: SlotQueueItem): uint16 = self.slotIndex +proc slotSize*(self: SlotQueueItem): UInt256 = self.slotSize +proc duration*(self: SlotQueueItem): UInt256 = self.duration +proc reward*(self: SlotQueueItem): UInt256 = self.reward +proc collateral*(self: SlotQueueItem): UInt256 = self.collateral + +proc running*(self: SlotQueue): bool = self.running + +proc len*(self: SlotQueue): int = self.queue.len + +proc size*(self: SlotQueue): int = self.queue.size - 1 + +proc `$`*(self: SlotQueue): string = $self.queue + +proc `onProcessSlot=`*(self: SlotQueue, onProcessSlot: OnProcessSlot) = + self.onProcessSlot = some onProcessSlot + +proc activeWorkers*(self: SlotQueue): int = + if not self.running: return 0 + + # active = capacity - available + self.maxWorkers - self.workers.len + +proc contains*(self: SlotQueue, item: SlotQueueItem): bool = + self.queue.contains(item) + +proc populateItem*(self: SlotQueue, + requestId: RequestId, + slotIndex: uint16): ?SlotQueueItem = + + trace "populate item, items in queue", len = self.queue.len + for item in self.queue.items: + trace "populate item search", itemRequestId = item.requestId, requestId + if item.requestId == requestId: + return some SlotQueueItem( + requestId: requestId, + slotIndex: slotIndex, + slotSize: item.slotSize, + duration: item.duration, + reward: item.reward, + collateral: item.collateral, + expiry: item.expiry + ) + return none SlotQueueItem + +proc push*(self: SlotQueue, item: SlotQueueItem): Future[?!void] {.async.} = + + trace "pushing item to queue", + requestId = item.requestId, slotIndex = item.slotIndex + + if not self.running: + let err = newException(QueueNotRunningError, "queue not running") + return failure(err) + + without availability =? await self.reservations.find(item.slotSize, + item.duration, + item.profitability, + item.collateral, + used = false): + let err = newException(NoMatchingAvailabilityError, "no availability") + return failure(err) + + if self.contains(item): + let err = newException(SlotQueueItemExistsError, "item already exists") + return failure(err) + + if err =? self.queue.pushNoWait(item).mapFailure.errorOption: + return failure(err) + + if self.queue.full(): + # delete the last item + self.queue.del(self.queue.size - 1) + + doAssert self.queue.len <= self.queue.size - 1 + return success() + +proc push*(self: SlotQueue, items: seq[SlotQueueItem]): Future[?!void] {.async.} = + for item in items: + if err =? (await self.push(item)).errorOption: + return failure(err) + + return success() + +proc findByRequest(self: SlotQueue, requestId: RequestId): seq[SlotQueueItem] = + var items: seq[SlotQueueItem] = @[] + for item in self.queue.items: + if item.requestId == requestId: + items.add item + return items + +proc delete*(self: SlotQueue, item: SlotQueueItem) = + logScope: + requestId = item.requestId + slotIndex = item.slotIndex + + trace "removing item from queue" + + if not self.running: + trace "cannot delete item from queue, queue not running" + return + + self.queue.delete(item) + +proc delete*(self: SlotQueue, requestId: RequestId, slotIndex: uint16) = + let item = SlotQueueItem(requestId: requestId, slotIndex: slotIndex) + self.delete(item) + +proc delete*(self: SlotQueue, requestId: RequestId) = + let items = self.findByRequest(requestId) + for item in items: + self.delete(item) + +proc `[]`*(self: SlotQueue, i: Natural): SlotQueueItem = + self.queue[i] + +proc addWorker(self: SlotQueue): ?!void = + if not self.running: + let err = newException(QueueNotRunningError, "queue must be running") + return failure(err) + + trace "adding new worker to worker queue" + + let worker = SlotQueueWorker.init() + try: + self.workers.addLastNoWait(worker) + except AsyncQueueFullError: + return failure("failed to add worker, worker queue full") + + return success() + +proc dispatch(self: SlotQueue, + worker: SlotQueueWorker, + item: SlotQueueItem) {.async.} = + logScope: + requestId = item.requestId + slotIndex = item.slotIndex + + if not self.running: + warn "Could not dispatch worker because queue is not running" + return + + if onProcessSlot =? self.onProcessSlot: + try: + await onProcessSlot(item, worker.doneProcessing) + await worker.doneProcessing + + if err =? self.addWorker().errorOption: + raise err # catch below + + except QueueNotRunningError as e: + info "could not re-add worker to worker queue, queue not running", + error = e.msg + except CancelledError: + # do not bubble exception up as it is called with `asyncSpawn` which would + # convert the exception into a `FutureDefect` + discard + except CatchableError as e: + # we don't have any insight into types of errors that `onProcessSlot` can + # throw because it is caller-defined + warn "Unknown error processing slot in worker", error = e.msg + +proc start*(self: SlotQueue) {.async.} = + if self.running: + return + + trace "starting slot queue" + + self.running = true + + # must be called in `start` to avoid sideeffects in `new` + self.workers = newAsyncQueue[SlotQueueWorker](self.maxWorkers) + + # Add initial workers to the `AsyncHeapQueue`. Once a worker has completed its + # task, a new worker will be pushed to the queue + for i in 0.. removeFuture()) + .catch((e: ref CatchableError) => removeFuture()) + + trace "tracking future" + self.futures[fut.id] = FutureBase(fut) + return fut + +proc track*[T, U](future: Future[T], self: U): Future[T] = + ## Convenience method that allows chaining future, eg: + ## `await someFut().track(sales)`, where `sales` has declared a + ## `trackedFutures` property. + self.trackedFutures.track(future) + +proc cancelTracked*(self: TrackedFutures) {.async.} = + self.cancelling = true + + for future in self.futures.values: + if not future.isNil and not future.finished: + trace "cancelling tracked future", id = future.id + await future.cancelAndWait() + + self.cancelling = false diff --git a/codex/utils/asyncheapqueue.nim b/codex/utils/asyncheapqueue.nim index 17ca1f78..e7d7edad 100644 --- a/codex/utils/asyncheapqueue.nim +++ b/codex/utils/asyncheapqueue.nim @@ -283,7 +283,7 @@ proc len*[T](heap: AsyncHeapQueue[T]): int {.inline.} = proc size*[T](heap: AsyncHeapQueue[T]): int {.inline.} = ## Return the maximum number of elements in ``heap``. - len(heap.maxsize) + heap.maxsize proc `[]`*[T](heap: AsyncHeapQueue[T], i: Natural) : T {.inline.} = ## Access the i-th element of ``heap`` by order from first to last. diff --git a/codex/utils/then.nim b/codex/utils/then.nim new file mode 100644 index 00000000..2bb5699e --- /dev/null +++ b/codex/utils/then.nim @@ -0,0 +1,226 @@ +import pkg/chronos +import pkg/questionable +import pkg/questionable/results +import pkg/upraises + +# Similar to JavaScript's Promise API, `.then` and `.catch` can be used to +# handle results and errors of async `Futures` within a synchronous closure. +# They can be used as an alternative to `asyncSpawn` which does not return a +# value and will raise a `FutureDefect` if there are unhandled errors +# encountered. Both `.then` and `.catch` act as callbacks that do not block the +# synchronous closure's flow. + +# `.then` is called when the `Future` is successfully completed and can be +# chained as many times as desired, calling each `.then` callback in order. When +# the `Future` returns `Result[T, ref CatchableError]` (or `?!T`), the value +# called in the `.then` callback will be unpacked from the `Result` as a +# convenience. In other words, for `Future[?!T]`, the `.then` callback will take +# a single parameter `T`. See `tests/utils/testthen.nim` for more examples. To +# allow for chaining, `.then` returns its future. If the future is already +# complete, the `.then` callback will be executed immediately. + +# `.catch` is called when the `Future` fails. In the case when the `Future` +# returns a `Result[T, ref CatchableError` (or `?!T`), `.catch` will be called +# if the `Result` contains an error. If the `Future` is already failed (or +# `Future[?!T]` contains an error), the `.catch` callback will be excuted +# immediately. + +# NOTE: Cancelled `Futures` are discarded as bubbling the `CancelledError` to +# the synchronous closure will likely cause an unintended and unhandled +# exception. + +# More info on JavaScript's Promise API can be found at: +# https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise + +runnableExamples: + proc asyncProc(): Future[int] {.async.} = + await sleepAsync(1.millis) + return 1 + + asyncProc() + .then(proc(i: int) = echo "returned ", i) + .catch(proc(e: ref CatchableError) = doAssert false, "will not be triggered") + + # outputs "returned 1" + + proc asyncProcWithError(): Future[int] {.async.} = + await sleepAsync(1.millis) + raise newException(ValueError, "some error") + + asyncProcWithError() + .then(proc(i: int) = doAssert false, "will not be triggered") + .catch(proc(e: ref CatchableError) = echo "errored: ", e.msg) + + # outputs "errored: some error" + +type + OnSuccess*[T] = proc(val: T) {.gcsafe, upraises: [].} + OnError* = proc(err: ref CatchableError) {.gcsafe, upraises: [].} + +proc ignoreError(err: ref CatchableError) = discard + +template returnOrError(future: FutureBase, onError: OnError) = + if not future.finished: + return + + if future.cancelled: + # do not bubble as closure is synchronous + return + + if future.failed: + onError(future.error) + return + + +proc then*(future: Future[void], + onError: OnError): + Future[void] = + + proc cb(udata: pointer) = + future.returnOrError(onError) + + proc cancellation(udata: pointer) = + if not future.finished(): + future.removeCallback(cb) + + future.addCallback(cb) + future.cancelCallback = cancellation + return future + +proc then*(future: Future[void], + onSuccess: OnSuccess[void], + onError: OnError = ignoreError): + Future[void] = + + proc cb(udata: pointer) = + future.returnOrError(onError) + onSuccess() + + proc cancellation(udata: pointer) = + if not future.finished(): + future.removeCallback(cb) + + future.addCallback(cb) + future.cancelCallback = cancellation + return future + +proc then*[T](future: Future[T], + onSuccess: OnSuccess[T], + onError: OnError = ignoreError): + Future[T] = + + proc cb(udata: pointer) = + future.returnOrError(onError) + + without val =? future.read.catch, err: + onError(err) + return + onSuccess(val) + + proc cancellation(udata: pointer) = + if not future.finished(): + future.removeCallback(cb) + + future.addCallback(cb) + future.cancelCallback = cancellation + return future + +proc then*[T](future: Future[?!T], + onSuccess: OnSuccess[T], + onError: OnError = ignoreError): + Future[?!T] = + + proc cb(udata: pointer) = + future.returnOrError(onError) + + try: + without val =? future.read, err: + onError(err) + return + onSuccess(val) + except CatchableError as e: + onError(e) + + proc cancellation(udata: pointer) = + if not future.finished(): + future.removeCallback(cb) + + future.addCallback(cb) + future.cancelCallback = cancellation + return future + +proc then*(future: Future[?!void], + onError: OnError = ignoreError): + Future[?!void] = + + proc cb(udata: pointer) = + future.returnOrError(onError) + + try: + if err =? future.read.errorOption: + onError(err) + except CatchableError as e: + onError(e) + + proc cancellation(udata: pointer) = + if not future.finished(): + future.removeCallback(cb) + + future.addCallback(cb) + future.cancelCallback = cancellation + return future + +proc then*(future: Future[?!void], + onSuccess: OnSuccess[void], + onError: OnError = ignoreError): + Future[?!void] = + + proc cb(udata: pointer) = + future.returnOrError(onError) + + try: + if err =? future.read.errorOption: + onError(err) + return + except CatchableError as e: + onError(e) + return + onSuccess() + + proc cancellation(udata: pointer) = + if not future.finished(): + future.removeCallback(cb) + + future.addCallback(cb) + future.cancelCallback = cancellation + return future + +proc catch*[T](future: Future[T], onError: OnError) = + + proc cb(udata: pointer) = + future.returnOrError(onError) + + proc cancellation(udata: pointer) = + if not future.finished(): + future.removeCallback(cb) + + future.addCallback(cb) + future.cancelCallback = cancellation + +proc catch*[T](future: Future[?!T], onError: OnError) = + + proc cb(udata: pointer) = + future.returnOrError(onError) + + try: + if err =? future.read.errorOption: + onError(err) + except CatchableError as e: + onError(e) + + proc cancellation(udata: pointer) = + if not future.finished(): + future.removeCallback(cb) + + future.addCallback(cb) + future.cancelCallback = cancellation diff --git a/tests/codex/helpers/eventually.nim b/tests/codex/helpers/eventually.nim index bbeef3be..3d68fc62 100644 --- a/tests/codex/helpers/eventually.nim +++ b/tests/codex/helpers/eventually.nim @@ -11,3 +11,15 @@ template eventually*(condition: untyped, timeout = 5.seconds): bool = else: await sleepAsync(1.millis) await loop() + +template always*(condition: untyped, timeout = 50.millis): bool = + proc loop: Future[bool] {.async.} = + let start = Moment.now() + while true: + if not condition: + return false + if Moment.now() > (start + timeout): + return true + else: + await sleepAsync(1.millis) + await loop() diff --git a/tests/codex/helpers/mockmarket.nim b/tests/codex/helpers/mockmarket.nim index 6616ef57..867a3ef5 100644 --- a/tests/codex/helpers/mockmarket.nim +++ b/tests/codex/helpers/mockmarket.nim @@ -2,6 +2,8 @@ import std/sequtils import std/tables import std/hashes import std/sets +import std/sugar +import pkg/questionable import pkg/codex/market import pkg/codex/contracts/requests import pkg/codex/contracts/config @@ -53,7 +55,7 @@ type callback: OnRequest FulfillmentSubscription* = ref object of Subscription market: MockMarket - requestId: RequestId + requestId: ?RequestId callback: OnFulfillment SlotFilledSubscription* = ref object of Subscription market: MockMarket @@ -65,11 +67,11 @@ type callback: OnSlotFreed RequestCancelledSubscription* = ref object of Subscription market: MockMarket - requestId: RequestId + requestId: ?RequestId callback: OnRequestCancelled RequestFailedSubscription* = ref object of Subscription market: MockMarket - requestId: RequestId + requestId: ?RequestId callback: OnRequestCancelled ProofSubmittedSubscription = ref object of Subscription market: MockMarket @@ -83,7 +85,7 @@ proc hash*(requestId: RequestId): Hash = proc new*(_: type MockMarket): MockMarket = ## Create a new mocked Market instance - ## + ## let config = MarketplaceConfig( collateral: CollateralConfig( repairRewardPercentage: 10, @@ -112,7 +114,9 @@ method requestStorage*(market: MockMarket, request: StorageRequest) {.async.} = market.requested.add(request) var subscriptions = market.subscriptions.onRequest for subscription in subscriptions: - subscription.callback(request.id, request.ask) + subscription.callback(request.id, + request.ask, + request.expiry) method myRequests*(market: MockMarket): Future[seq[RequestId]] {.async.} = return market.activeRequests[market.signer] @@ -173,28 +177,32 @@ proc emitSlotFilled*(market: MockMarket, if requestMatches and slotMatches: subscription.callback(requestId, slotIndex) -proc emitSlotFreed*(market: MockMarket, slotId: SlotId) = +proc emitSlotFreed*(market: MockMarket, + requestId: RequestId, + slotIndex: UInt256) = var subscriptions = market.subscriptions.onSlotFreed for subscription in subscriptions: - subscription.callback(slotId) + subscription.callback(requestId, slotIndex) -proc emitRequestCancelled*(market: MockMarket, - requestId: RequestId) = +proc emitRequestCancelled*(market: MockMarket, requestId: RequestId) = var subscriptions = market.subscriptions.onRequestCancelled for subscription in subscriptions: - if subscription.requestId == requestId: + if subscription.requestId == requestId.some or + subscription.requestId.isNone: subscription.callback(requestId) proc emitRequestFulfilled*(market: MockMarket, requestId: RequestId) = var subscriptions = market.subscriptions.onFulfillment for subscription in subscriptions: - if subscription.requestId == requestId: + if subscription.requestId == requestId.some or + subscription.requestId.isNone: subscription.callback(requestId) proc emitRequestFailed*(market: MockMarket, requestId: RequestId) = var subscriptions = market.subscriptions.onRequestFailed for subscription in subscriptions: - if subscription.requestId == requestId: + if subscription.requestId == requestId.some or + subscription.requestId.isNone: subscription.callback(requestId) proc fillSlot*(market: MockMarket, @@ -221,7 +229,12 @@ method fillSlot*(market: MockMarket, method freeSlot*(market: MockMarket, slotId: SlotId) {.async.} = market.freed.add(slotId) - market.emitSlotFreed(slotId) + for s in market.filled: + if slotId(s.requestId, s.slotIndex) == slotId: + market.emitSlotFreed(s.requestId, s.slotIndex) + break + market.slotState[slotId] = SlotState.Free + method withdrawFunds*(market: MockMarket, requestId: RequestId) {.async.} = @@ -281,13 +294,24 @@ method subscribeRequests*(market: MockMarket, market.subscriptions.onRequest.add(subscription) return subscription +method subscribeFulfillment*(market: MockMarket, + callback: OnFulfillment): + Future[Subscription] {.async.} = + let subscription = FulfillmentSubscription( + market: market, + requestId: none RequestId, + callback: callback + ) + market.subscriptions.onFulfillment.add(subscription) + return subscription + method subscribeFulfillment*(market: MockMarket, requestId: RequestId, callback: OnFulfillment): Future[Subscription] {.async.} = let subscription = FulfillmentSubscription( market: market, - requestId: requestId, + requestId: some requestId, callback: callback ) market.subscriptions.onFulfillment.add(subscription) @@ -321,25 +345,47 @@ method subscribeSlotFreed*(market: MockMarket, market.subscriptions.onSlotFreed.add(subscription) return subscription +method subscribeRequestCancelled*(market: MockMarket, + callback: OnRequestCancelled): + Future[Subscription] {.async.} = + let subscription = RequestCancelledSubscription( + market: market, + requestId: none RequestId, + callback: callback + ) + market.subscriptions.onRequestCancelled.add(subscription) + return subscription + method subscribeRequestCancelled*(market: MockMarket, requestId: RequestId, callback: OnRequestCancelled): Future[Subscription] {.async.} = let subscription = RequestCancelledSubscription( market: market, - requestId: requestId, + requestId: some requestId, callback: callback ) market.subscriptions.onRequestCancelled.add(subscription) return subscription +method subscribeRequestFailed*(market: MockMarket, + callback: OnRequestFailed): + Future[Subscription] {.async.} = + let subscription = RequestFailedSubscription( + market: market, + requestId: none RequestId, + callback: callback + ) + market.subscriptions.onRequestFailed.add(subscription) + return subscription + method subscribeRequestFailed*(market: MockMarket, requestId: RequestId, callback: OnRequestFailed): Future[Subscription] {.async.} = let subscription = RequestFailedSubscription( market: market, - requestId: requestId, + requestId: some requestId, callback: callback ) market.subscriptions.onRequestFailed.add(subscription) @@ -355,6 +401,17 @@ method subscribeProofSubmission*(mock: MockMarket, mock.subscriptions.onProofSubmitted.add(subscription) return subscription +method queryPastStorageRequests*(market: MockMarket, + blocksAgo: int): + Future[seq[PastStorageRequest]] {.async.} = + # MockMarket does not have the concept of blocks, so simply return all + # previous events + return market.requested.map(request => + PastStorageRequest(requestId: request.id, + ask: request.ask, + expiry: request.expiry) + ) + method unsubscribe*(subscription: RequestSubscription) {.async.} = subscription.market.subscriptions.onRequest.keepItIf(it != subscription) diff --git a/tests/codex/helpers/mocksalesagent.nim b/tests/codex/helpers/mocksalesagent.nim new file mode 100644 index 00000000..43b0be87 --- /dev/null +++ b/tests/codex/helpers/mocksalesagent.nim @@ -0,0 +1,16 @@ +import pkg/codex/sales/salesagent + +type + MockSalesAgent = ref object of SalesAgent + fulfilledCalled*: bool + failedCalled*: bool + slotFilledCalled*: bool + +method onFulfilled*(agent: SalesAgent, requestId: RequestId) = + fulfilledCalled = true + +method onFailed*(agent: SalesAgent, requestId: RequestId) = + failedCalled = true + +method onSlotFilled*(agent: SalesAgent, requestId: RequestId, slotIndex: UInt256) {.base.} = + slotFilledCalled = true diff --git a/tests/codex/sales/states/testpreparing.nim b/tests/codex/sales/states/testpreparing.nim new file mode 100644 index 00000000..6f5d8c7f --- /dev/null +++ b/tests/codex/sales/states/testpreparing.nim @@ -0,0 +1,29 @@ +import std/unittest +import pkg/questionable +import pkg/codex/contracts/requests +import pkg/codex/sales/states/downloading +import pkg/codex/sales/states/cancelled +import pkg/codex/sales/states/failed +import pkg/codex/sales/states/filled +import ../../examples + +suite "sales state 'preparing'": + + let request = StorageRequest.example + let slotIndex = (request.ask.slots div 2).u256 + var state: SalePreparing + + setup: + state = SalePreparing.new() + + test "switches to cancelled state when request expires": + let next = state.onCancelled(request) + check !next of SaleCancelled + + test "switches to failed state when request fails": + let next = state.onFailed(request) + check !next of SaleFailed + + test "switches to filled state when slot is filled": + let next = state.onSlotFilled(request.id, slotIndex) + check !next of SaleFilled diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index a595676c..e8d890d3 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -11,6 +11,7 @@ import pkg/codex/sales import pkg/codex/sales/salesdata import pkg/codex/sales/salescontext import pkg/codex/sales/reservations +import pkg/codex/sales/slotqueue import pkg/codex/stores/repostore import pkg/codex/proving import pkg/codex/blocktype as bt @@ -32,6 +33,8 @@ asyncchecksuite "Sales": var proving: Proving var reservations: Reservations var repo: RepoStore + var queue: SlotQueue + var itemsProcessed: seq[SlotQueueItem] setup: availability = Availability.init( @@ -67,22 +70,122 @@ asyncchecksuite "Sales": slot: UInt256, onBatch: BatchProc): Future[?!void] {.async.} = return success() + queue = sales.context.slotQueue proving.onProve = proc(slot: Slot): Future[seq[byte]] {.async.} = return proof await sales.start() request.expiry = (clock.now() + 42).u256 + itemsProcessed = @[] teardown: - await repo.stop() await sales.stop() + await repo.stop() proc getAvailability: ?!Availability = waitFor reservations.get(availability.id) - proc wasIgnored: Future[bool] {.async.} = - return - eventually sales.agents.len == 1 and # agent created at first - eventually sales.agents.len == 0 # then removed once ignored + proc notProcessed(itemsProcessed: seq[SlotQueueItem], + request: StorageRequest): bool = + let items = SlotQueueItem.init(request) + for i in 0.. itemB + + test "expands available all possible slot indices on init": + let request = StorageRequest.example + let items = SlotQueueItem.init(request) + check items.len.uint64 == request.ask.slots + var checked = 0 + for slotIndex in 0'u16..