[WIP sales] Handle other host fulfilling request

This commit is contained in:
Mark Spanbroek 2022-07-05 14:19:01 +02:00 committed by markspanbroek
parent 7f864570bd
commit 9438aba5d2
3 changed files with 42 additions and 58 deletions

View File

@ -34,6 +34,7 @@ type
request: ?StorageRequest
offer: ?StorageOffer
subscription: ?Subscription
running: ?Future[void]
waiting: ?Future[void]
finished: bool
Retrieve = proc(cid: string): Future[void] {.gcsafe, upraises: [].}
@ -77,18 +78,6 @@ func findAvailability(sales: Sales, ask: StorageAsk): ?Availability =
ask.maxPrice >= availability.minPrice:
return some availability
proc createOffer(negotiation: Negotiation): StorageOffer =
let sales = negotiation.sales
StorageOffer(
requestId: negotiation.requestId,
price: negotiation.ask.maxPrice,
expiry: sales.clock.now().u256 + sales.offerExpiryInterval
)
proc sendOffer(negotiation: Negotiation) {.async.} =
let offer = negotiation.createOffer()
# negotiation.offer = some await negotiation.sales.market.offerStorage(offer)
proc finish(negotiation: Negotiation, success: bool) =
if negotiation.finished:
return
@ -98,6 +87,9 @@ proc finish(negotiation: Negotiation, success: bool) =
if subscription =? negotiation.subscription:
asyncSpawn subscription.unsubscribe()
if running =? negotiation.running:
running.cancel()
if waiting =? negotiation.waiting:
waiting.cancel()
@ -107,20 +99,21 @@ proc finish(negotiation: Negotiation, success: bool) =
else:
negotiation.sales.add(negotiation.availability)
proc onSelect(negotiation: Negotiation, offerId: array[32, byte]) =
if offer =? negotiation.offer and offer.id == offerId:
negotiation.finish(success = true)
else:
proc onFulfill(negotiation: Negotiation, requestId: array[32, byte]) {.async.} =
try:
let market = negotiation.sales.market
let host = await market.getHost(requestId)
let me = await market.getSigner()
negotiation.finish(success = (host == me.some))
except CatchableError:
negotiation.finish(success = false)
proc subscribeSelect(negotiation: Negotiation) {.async.} =
without offer =? negotiation.offer:
return
proc onSelect(offerId: array[32, byte]) {.gcsafe, upraises:[].} =
negotiation.onSelect(offerId)
proc subscribeFulfill(negotiation: Negotiation) {.async.} =
proc onFulfill(requestId: array[32, byte]) {.gcsafe, upraises:[].} =
asyncSpawn negotiation.onFulfill(requestId)
let market = negotiation.sales.market
# let subscription = await market.subscribeSelection(offer.requestId, onSelect)
# negotiation.subscription = some subscription
let subscription = await market.subscribeFulfillment(negotiation.requestId, onFulfill)
negotiation.subscription = some subscription
proc waitForExpiry(negotiation: Negotiation) {.async.} =
without offer =? negotiation.offer:
@ -129,6 +122,7 @@ proc waitForExpiry(negotiation: Negotiation) {.async.} =
negotiation.finish(success = false)
proc start(negotiation: Negotiation) {.async.} =
try:
let sales = negotiation.sales
let market = sales.market
let availability = negotiation.availability
@ -139,9 +133,10 @@ proc start(negotiation: Negotiation) {.async.} =
without prove =? sales.prove:
raiseAssert "prove proc not set"
try:
sales.remove(availability)
await negotiation.subscribeFulfill()
negotiation.request = await market.getRequest(negotiation.requestId)
without request =? negotiation.request:
negotiation.finish(success = false)
@ -150,11 +145,10 @@ proc start(negotiation: Negotiation) {.async.} =
await retrieve(request.content.cid)
let proof = await prove(request.content.cid)
await market.fulfillRequest(request.id, proof)
negotiation.finish(success = true)
await negotiation.sendOffer()
await negotiation.subscribeSelect()
negotiation.waiting = some negotiation.waitForExpiry()
except CancelledError:
raise
except CatchableError as e:
error "Negotiation failed", msg = e.msg
negotiation.finish(success = false)
@ -170,7 +164,7 @@ proc handleRequest(sales: Sales, requestId: array[32, byte], ask: StorageAsk) =
availability: availability
)
asyncSpawn negotiation.start()
negotiation.running = some negotiation.start()
proc start*(sales: Sales) {.async.} =
doAssert sales.subscription.isNone, "Sales already started"

View File

@ -34,7 +34,8 @@ method requestStorage*(market: MockMarket,
request: StorageRequest):
Future[StorageRequest] {.async.} =
market.requested.add(request)
for subscription in market.subscriptions.onRequest:
var subscriptions = market.subscriptions.onRequest
for subscription in subscriptions:
subscription.callback(request.id, request.ask)
return request
@ -58,7 +59,8 @@ proc fulfillRequest*(market: MockMarket,
host: Address) =
let fulfillment = Fulfillment(requestId: requestId, proof: proof, host: host)
market.fulfilled.add(fulfillment)
for subscription in market.subscriptions.onFulfillment:
var subscriptions = market.subscriptions.onFulfillment
for subscription in subscriptions:
if subscription.requestId == requestId:
subscription.callback(requestId)

View File

@ -1,4 +1,3 @@
import std/times
import pkg/asynctest
import pkg/chronos
import pkg/codex/sales
@ -114,24 +113,13 @@ suite "Sales":
check soldAvailability == availability
check soldRequest == request
# test "does not call onSale when a different offer is selected":
# var didSell: bool
# sales.onSale = proc(offer: StorageOffer) =
# didSell = true
# sales.add(availability)
# let request = await market.requestStorage(request)
# var otherOffer = StorageOffer(requestId: request.id, price: 1.u256)
# otherOffer = await market.offerStorage(otherOffer)
# await market.selectOffer(otherOffer.id)
# check not didSell
# test "makes storage available again when different offer is selected":
# sales.add(availability)
# let request = await market.requestStorage(request)
# var otherOffer = StorageOffer(requestId: request.id, price: 1.u256)
# otherOffer = await market.offerStorage(otherOffer)
# await market.selectOffer(otherOffer.id)
# check sales.available.contains(availability)
test "makes storage available again when other host fulfills request":
let otherHost = Address.example
sales.retrieve = proc(_: string) {.async.} = await sleepAsync(1.hours)
sales.add(availability)
discard await market.requestStorage(request)
market.fulfillRequest(request.id, proof, otherHost)
check sales.available == @[availability]
# test "makes storage available again when offer expires":
# sales.add(availability)