diff --git a/codex/market.nim b/codex/market.nim index f4cb7ede..1a3d5c91 100644 --- a/codex/market.nim +++ b/codex/market.nim @@ -12,6 +12,7 @@ type Subscription* = ref object of RootObj OnRequest* = proc(id: array[32, byte], ask: StorageAsk) {.gcsafe, upraises:[].} OnFulfillment* = proc(requestId: array[32, byte]) {.gcsafe, upraises: [].} + OnSlotFilled* = proc(requestId: array[32, byte], slotIndex: UInt256) {.gcsafe, upraises:[].} method getSigner*(market: Market): Future[Address] {.base, async.} = raiseAssert("not implemented") @@ -35,6 +36,11 @@ method fulfillRequest*(market: Market, proof: seq[byte]) {.base, async.} = raiseAssert("not implemented") +method getHost*(market: Market, + requestId: array[32, byte], + slotIndex: UInt256): Future[?Address] {.base, async.} = + raiseAssert("not implemented") + method fillSlot*(market: Market, requestId: array[32, byte], slotIndex: UInt256, @@ -52,5 +58,12 @@ method subscribeFulfillment*(market: Market, Future[Subscription] {.base, async.} = raiseAssert("not implemented") +method subscribeSlotFilled*(market: Market, + requestId: array[32, byte], + slotIndex: UInt256, + callback: OnSlotFilled): + Future[Subscription] {.base, async.} = + raiseAssert("not implemented") + method unsubscribe*(subscription: Subscription) {.base, async, upraises:[].} = raiseAssert("not implemented") diff --git a/codex/sales.nim b/codex/sales.nim index acf83a75..9baef8b1 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -55,7 +55,9 @@ type OnStore = proc(cid: string, availability: Availability): Future[void] {.gcsafe, upraises: [].} OnProve = proc(cid: string): Future[seq[byte]] {.gcsafe, upraises: [].} OnClear = proc(availability: Availability, request: StorageRequest) {.gcsafe, upraises: [].} - OnSale = proc(availability: Availability, request: StorageRequest) {.gcsafe, upraises: [].} + OnSale = proc(availability: Availability, + request: StorageRequest, + slotIndex: UInt256) {.gcsafe, upraises: [].} func new*(_: type Sales, market: Market, clock: Clock): Sales = Sales( @@ -113,26 +115,31 @@ proc finish(agent: SalesAgent, success: bool) = if success: if onSale =? agent.sales.onSale and request =? agent.request: - onSale(agent.availability, request) + onSale(agent.availability, request, 0.u256) # TODO: slot index else: if onClear =? agent.sales.onClear and request =? agent.request: onClear(agent.availability, request) agent.sales.add(agent.availability) -proc onFulfill(agent: SalesAgent, requestId: array[32, byte]) {.async.} = +proc onSlotFilled(agent: SalesAgent, + requestId: array[32, byte], + slotIndex: UInt256) {.async.} = try: let market = agent.sales.market - let host = await market.getHost(requestId) + let host = await market.getHost(requestId, slotIndex) let me = await market.getSigner() agent.finish(success = (host == me.some)) except CatchableError: agent.finish(success = false) -proc subscribeFulfill(agent: SalesAgent) {.async.} = - proc onFulfill(requestId: array[32, byte]) {.gcsafe, upraises:[].} = - asyncSpawn agent.onFulfill(requestId) +proc subscribeSlotFilled(agent: SalesAgent) {.async.} = + proc onSlotFilled(requestId: array[32, byte], + slotIndex: UInt256) {.gcsafe, upraises:[].} = + asyncSpawn agent.onSlotFilled(requestId, slotIndex) let market = agent.sales.market - let subscription = await market.subscribeFulfillment(agent.requestId, onFulfill) + let subscription = await market.subscribeSlotFilled(agent.requestId, + 0.u256, + onSlotFilled) # TODO: slot index agent.subscription = some subscription proc waitForExpiry(agent: SalesAgent) {.async.} = @@ -155,7 +162,7 @@ proc start(agent: SalesAgent) {.async.} = sales.remove(availability) - await agent.subscribeFulfill() + await agent.subscribeSlotFilled() agent.request = await market.getRequest(agent.requestId) without request =? agent.request: @@ -166,7 +173,7 @@ proc start(agent: SalesAgent) {.async.} = await onStore(request.content.cid, availability) let proof = await onProve(request.content.cid) - await market.fulfillRequest(request.id, proof) + await market.fillSlot(request.id, 0.u256, proof) # TODO: slot index except CancelledError: raise except CatchableError as e: diff --git a/tests/codex/helpers/mockmarket.nim b/tests/codex/helpers/mockmarket.nim index b3895ec7..3fe55173 100644 --- a/tests/codex/helpers/mockmarket.nim +++ b/tests/codex/helpers/mockmarket.nim @@ -7,15 +7,22 @@ type MockMarket* = ref object of Market requested*: seq[StorageRequest] fulfilled*: seq[Fulfillment] + filled*: seq[Slot] signer: Address subscriptions: Subscriptions Fulfillment* = object requestId*: array[32, byte] proof*: seq[byte] host*: Address + Slot* = object + requestId*: array[32, byte] + slotIndex*: UInt256 + proof*: seq[byte] + host*: Address Subscriptions = object onRequest: seq[RequestSubscription] onFulfillment: seq[FulfillmentSubscription] + onSlotFilled: seq[SlotFilledSubscription] RequestSubscription* = ref object of Subscription market: MockMarket callback: OnRequest @@ -23,6 +30,11 @@ type market: MockMarket requestId: array[32, byte] callback: OnFulfillment + SlotFilledSubscription* = ref object of Subscription + market: MockMarket + requestId: array[32, byte] + slotIndex: UInt256 + callback: OnSlotFilled proc new*(_: type MockMarket): MockMarket = MockMarket(signer: Address.example) @@ -69,6 +81,38 @@ method fulfillRequest*(market: MockMarket, proof: seq[byte]) {.async.} = market.fulfillRequest(requestid, proof, market.signer) +method getHost(market: MockMarket, + requestId: array[32, byte], + slotIndex: UInt256): Future[?Address] {.async.} = + for slot in market.filled: + if slot.requestId == requestId and slot.slotIndex == slotIndex: + return some slot.host + return none Address + +proc fillSlot*(market: MockMarket, + requestId: array[32, byte], + slotIndex: UInt256, + proof: seq[byte], + host: Address) = + let slot = Slot( + requestId: requestId, + slotIndex: slotIndex, + proof: proof, + host: host + ) + market.filled.add(slot) + var subscriptions = market.subscriptions.onSlotFilled + for subscription in subscriptions: + if subscription.requestId == requestId and + subscription.slotIndex == slotIndex: + subscription.callback(requestId, slotIndex) + +method fillSlot*(market: MockMarket, + requestId: array[32, byte], + slotIndex: UInt256, + proof: seq[byte]) {.async.} = + market.fillSlot(requestId, slotIndex, proof, market.signer) + method subscribeRequests*(market: MockMarket, callback: OnRequest): Future[Subscription] {.async.} = @@ -91,8 +135,25 @@ method subscribeFulfillment*(market: MockMarket, market.subscriptions.onFulfillment.add(subscription) return subscription +method subscribeSlotFilled*(market: MockMarket, + requestId: array[32, byte], + slotIndex: UInt256, + callback: OnSlotFilled): + Future[Subscription] {.async.} = + let subscription = SlotFilledSubscription( + market: market, + requestId: requestId, + slotIndex: slotIndex, + callback: callback + ) + market.subscriptions.onSlotFilled.add(subscription) + return subscription + method unsubscribe*(subscription: RequestSubscription) {.async.} = subscription.market.subscriptions.onRequest.keepItIf(it != subscription) method unsubscribe*(subscription: FulfillmentSubscription) {.async.} = subscription.market.subscriptions.onFulfillment.keepItIf(it != subscription) + +method unsubscribe*(subscription: SlotFilledSubscription) {.async.} = + subscription.market.subscriptions.onSlotFilled.keepItIf(it != subscription) diff --git a/tests/codex/testsales.nim b/tests/codex/testsales.nim index b2cac7d3..c4bac488 100644 --- a/tests/codex/testsales.nim +++ b/tests/codex/testsales.nim @@ -16,7 +16,8 @@ suite "Sales": ask: StorageAsk( duration: 60.u256, size: 100.u256, - reward:42.u256 + reward:42.u256, + slots: 4 ), content: StorageContent( cid: "some cid" @@ -101,24 +102,30 @@ suite "Sales": discard await market.requestStorage(request) check provingCid == request.content.cid - test "fulfills request": + test "fills a slot": sales.add(availability) discard await market.requestStorage(request) - check market.fulfilled.len == 1 - check market.fulfilled[0].requestId == request.id - check market.fulfilled[0].proof == proof - check market.fulfilled[0].host == await market.getSigner() + check market.filled.len == 1 + check market.filled[0].requestId == request.id + check market.filled[0].slotIndex < request.ask.slots.u256 + check market.filled[0].proof == proof + check market.filled[0].host == await market.getSigner() - test "calls onSale when request is fulfilled": + test "calls onSale when slot is filled": var soldAvailability: Availability var soldRequest: StorageRequest - sales.onSale = proc(availability: Availability, request: StorageRequest) = + var soldSlotIndex: UInt256 + sales.onSale = proc(availability: Availability, + request: StorageRequest, + slotIndex: UInt256) = soldAvailability = availability soldRequest = request + soldSlotIndex = slotIndex sales.add(availability) discard await market.requestStorage(request) check soldAvailability == availability check soldRequest == request + check soldSlotIndex < request.ask.slots.u256 test "calls onClear when storage becomes available again": sales.onProve = proc(cid: string): Future[seq[byte]] {.async.} = @@ -133,13 +140,14 @@ suite "Sales": check clearedAvailability == availability check clearedRequest == request - test "makes storage available again when other host fulfills request": + test "makes storage available again when other host fills the slot": let otherHost = Address.example sales.onStore = proc(cid: string, availability: Availability) {.async.} = await sleepAsync(1.hours) sales.add(availability) discard await market.requestStorage(request) - market.fulfillRequest(request.id, proof, otherHost) + for slotIndex in 0..