From 9438aba5d2965868df77fe9e580649a7dc2ae537 Mon Sep 17 00:00:00 2001 From: Mark Spanbroek Date: Tue, 5 Jul 2022 14:19:01 +0200 Subject: [PATCH] [WIP sales] Handle other host fulfilling request --- codex/sales.nim | 68 ++++++++++++++---------------- tests/codex/helpers/mockmarket.nim | 6 ++- tests/codex/testsales.nim | 26 +++--------- 3 files changed, 42 insertions(+), 58 deletions(-) diff --git a/codex/sales.nim b/codex/sales.nim index d47cb965..74e763c8 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -34,6 +34,7 @@ type request: ?StorageRequest offer: ?StorageOffer subscription: ?Subscription + running: ?Future[void] waiting: ?Future[void] finished: bool Retrieve = proc(cid: string): Future[void] {.gcsafe, upraises: [].} @@ -77,18 +78,6 @@ func findAvailability(sales: Sales, ask: StorageAsk): ?Availability = ask.maxPrice >= availability.minPrice: return some availability -proc createOffer(negotiation: Negotiation): StorageOffer = - let sales = negotiation.sales - StorageOffer( - requestId: negotiation.requestId, - price: negotiation.ask.maxPrice, - expiry: sales.clock.now().u256 + sales.offerExpiryInterval - ) - -proc sendOffer(negotiation: Negotiation) {.async.} = - let offer = negotiation.createOffer() - # negotiation.offer = some await negotiation.sales.market.offerStorage(offer) - proc finish(negotiation: Negotiation, success: bool) = if negotiation.finished: return @@ -98,6 +87,9 @@ proc finish(negotiation: Negotiation, success: bool) = if subscription =? negotiation.subscription: asyncSpawn subscription.unsubscribe() + if running =? negotiation.running: + running.cancel() + if waiting =? negotiation.waiting: waiting.cancel() @@ -107,20 +99,21 @@ proc finish(negotiation: Negotiation, success: bool) = else: negotiation.sales.add(negotiation.availability) -proc onSelect(negotiation: Negotiation, offerId: array[32, byte]) = - if offer =? negotiation.offer and offer.id == offerId: - negotiation.finish(success = true) - else: +proc onFulfill(negotiation: Negotiation, requestId: array[32, byte]) {.async.} = + try: + let market = negotiation.sales.market + let host = await market.getHost(requestId) + let me = await market.getSigner() + negotiation.finish(success = (host == me.some)) + except CatchableError: negotiation.finish(success = false) -proc subscribeSelect(negotiation: Negotiation) {.async.} = - without offer =? negotiation.offer: - return - proc onSelect(offerId: array[32, byte]) {.gcsafe, upraises:[].} = - negotiation.onSelect(offerId) +proc subscribeFulfill(negotiation: Negotiation) {.async.} = + proc onFulfill(requestId: array[32, byte]) {.gcsafe, upraises:[].} = + asyncSpawn negotiation.onFulfill(requestId) let market = negotiation.sales.market - # let subscription = await market.subscribeSelection(offer.requestId, onSelect) - # negotiation.subscription = some subscription + let subscription = await market.subscribeFulfillment(negotiation.requestId, onFulfill) + negotiation.subscription = some subscription proc waitForExpiry(negotiation: Negotiation) {.async.} = without offer =? negotiation.offer: @@ -129,19 +122,21 @@ proc waitForExpiry(negotiation: Negotiation) {.async.} = negotiation.finish(success = false) proc start(negotiation: Negotiation) {.async.} = - let sales = negotiation.sales - let market = sales.market - let availability = negotiation.availability - - without retrieve =? sales.retrieve: - raiseAssert "retrieve proc not set" - - without prove =? sales.prove: - raiseAssert "prove proc not set" - try: + let sales = negotiation.sales + let market = sales.market + let availability = negotiation.availability + + without retrieve =? sales.retrieve: + raiseAssert "retrieve proc not set" + + without prove =? sales.prove: + raiseAssert "prove proc not set" + sales.remove(availability) + await negotiation.subscribeFulfill() + negotiation.request = await market.getRequest(negotiation.requestId) without request =? negotiation.request: negotiation.finish(success = false) @@ -150,11 +145,10 @@ proc start(negotiation: Negotiation) {.async.} = await retrieve(request.content.cid) let proof = await prove(request.content.cid) await market.fulfillRequest(request.id, proof) - negotiation.finish(success = true) - await negotiation.sendOffer() - await negotiation.subscribeSelect() negotiation.waiting = some negotiation.waitForExpiry() + except CancelledError: + raise except CatchableError as e: error "Negotiation failed", msg = e.msg negotiation.finish(success = false) @@ -170,7 +164,7 @@ proc handleRequest(sales: Sales, requestId: array[32, byte], ask: StorageAsk) = availability: availability ) - asyncSpawn negotiation.start() + negotiation.running = some negotiation.start() proc start*(sales: Sales) {.async.} = doAssert sales.subscription.isNone, "Sales already started" diff --git a/tests/codex/helpers/mockmarket.nim b/tests/codex/helpers/mockmarket.nim index c257d4c3..b3895ec7 100644 --- a/tests/codex/helpers/mockmarket.nim +++ b/tests/codex/helpers/mockmarket.nim @@ -34,7 +34,8 @@ method requestStorage*(market: MockMarket, request: StorageRequest): Future[StorageRequest] {.async.} = market.requested.add(request) - for subscription in market.subscriptions.onRequest: + var subscriptions = market.subscriptions.onRequest + for subscription in subscriptions: subscription.callback(request.id, request.ask) return request @@ -58,7 +59,8 @@ proc fulfillRequest*(market: MockMarket, host: Address) = let fulfillment = Fulfillment(requestId: requestId, proof: proof, host: host) market.fulfilled.add(fulfillment) - for subscription in market.subscriptions.onFulfillment: + var subscriptions = market.subscriptions.onFulfillment + for subscription in subscriptions: if subscription.requestId == requestId: subscription.callback(requestId) diff --git a/tests/codex/testsales.nim b/tests/codex/testsales.nim index 09808b2d..997c7d97 100644 --- a/tests/codex/testsales.nim +++ b/tests/codex/testsales.nim @@ -1,4 +1,3 @@ -import std/times import pkg/asynctest import pkg/chronos import pkg/codex/sales @@ -114,24 +113,13 @@ suite "Sales": check soldAvailability == availability check soldRequest == request - # test "does not call onSale when a different offer is selected": - # var didSell: bool - # sales.onSale = proc(offer: StorageOffer) = - # didSell = true - # sales.add(availability) - # let request = await market.requestStorage(request) - # var otherOffer = StorageOffer(requestId: request.id, price: 1.u256) - # otherOffer = await market.offerStorage(otherOffer) - # await market.selectOffer(otherOffer.id) - # check not didSell - - # test "makes storage available again when different offer is selected": - # sales.add(availability) - # let request = await market.requestStorage(request) - # var otherOffer = StorageOffer(requestId: request.id, price: 1.u256) - # otherOffer = await market.offerStorage(otherOffer) - # await market.selectOffer(otherOffer.id) - # check sales.available.contains(availability) + test "makes storage available again when other host fulfills request": + let otherHost = Address.example + sales.retrieve = proc(_: string) {.async.} = await sleepAsync(1.hours) + sales.add(availability) + discard await market.requestStorage(request) + market.fulfillRequest(request.id, proof, otherHost) + check sales.available == @[availability] # test "makes storage available again when offer expires": # sales.add(availability)