[sales] replace fulfillRequest() by fillSlot()
This commit is contained in:
parent
156cd5ba73
commit
dc95c863d2
|
@ -12,6 +12,7 @@ type
|
||||||
Subscription* = ref object of RootObj
|
Subscription* = ref object of RootObj
|
||||||
OnRequest* = proc(id: array[32, byte], ask: StorageAsk) {.gcsafe, upraises:[].}
|
OnRequest* = proc(id: array[32, byte], ask: StorageAsk) {.gcsafe, upraises:[].}
|
||||||
OnFulfillment* = proc(requestId: array[32, byte]) {.gcsafe, upraises: [].}
|
OnFulfillment* = proc(requestId: array[32, byte]) {.gcsafe, upraises: [].}
|
||||||
|
OnSlotFilled* = proc(requestId: array[32, byte], slotIndex: UInt256) {.gcsafe, upraises:[].}
|
||||||
|
|
||||||
method getSigner*(market: Market): Future[Address] {.base, async.} =
|
method getSigner*(market: Market): Future[Address] {.base, async.} =
|
||||||
raiseAssert("not implemented")
|
raiseAssert("not implemented")
|
||||||
|
@ -35,6 +36,11 @@ method fulfillRequest*(market: Market,
|
||||||
proof: seq[byte]) {.base, async.} =
|
proof: seq[byte]) {.base, async.} =
|
||||||
raiseAssert("not implemented")
|
raiseAssert("not implemented")
|
||||||
|
|
||||||
|
method getHost*(market: Market,
|
||||||
|
requestId: array[32, byte],
|
||||||
|
slotIndex: UInt256): Future[?Address] {.base, async.} =
|
||||||
|
raiseAssert("not implemented")
|
||||||
|
|
||||||
method fillSlot*(market: Market,
|
method fillSlot*(market: Market,
|
||||||
requestId: array[32, byte],
|
requestId: array[32, byte],
|
||||||
slotIndex: UInt256,
|
slotIndex: UInt256,
|
||||||
|
@ -52,5 +58,12 @@ method subscribeFulfillment*(market: Market,
|
||||||
Future[Subscription] {.base, async.} =
|
Future[Subscription] {.base, async.} =
|
||||||
raiseAssert("not implemented")
|
raiseAssert("not implemented")
|
||||||
|
|
||||||
|
method subscribeSlotFilled*(market: Market,
|
||||||
|
requestId: array[32, byte],
|
||||||
|
slotIndex: UInt256,
|
||||||
|
callback: OnSlotFilled):
|
||||||
|
Future[Subscription] {.base, async.} =
|
||||||
|
raiseAssert("not implemented")
|
||||||
|
|
||||||
method unsubscribe*(subscription: Subscription) {.base, async, upraises:[].} =
|
method unsubscribe*(subscription: Subscription) {.base, async, upraises:[].} =
|
||||||
raiseAssert("not implemented")
|
raiseAssert("not implemented")
|
||||||
|
|
|
@ -55,7 +55,9 @@ type
|
||||||
OnStore = proc(cid: string, availability: Availability): Future[void] {.gcsafe, upraises: [].}
|
OnStore = proc(cid: string, availability: Availability): Future[void] {.gcsafe, upraises: [].}
|
||||||
OnProve = proc(cid: string): Future[seq[byte]] {.gcsafe, upraises: [].}
|
OnProve = proc(cid: string): Future[seq[byte]] {.gcsafe, upraises: [].}
|
||||||
OnClear = proc(availability: Availability, request: StorageRequest) {.gcsafe, upraises: [].}
|
OnClear = proc(availability: Availability, request: StorageRequest) {.gcsafe, upraises: [].}
|
||||||
OnSale = proc(availability: Availability, request: StorageRequest) {.gcsafe, upraises: [].}
|
OnSale = proc(availability: Availability,
|
||||||
|
request: StorageRequest,
|
||||||
|
slotIndex: UInt256) {.gcsafe, upraises: [].}
|
||||||
|
|
||||||
func new*(_: type Sales, market: Market, clock: Clock): Sales =
|
func new*(_: type Sales, market: Market, clock: Clock): Sales =
|
||||||
Sales(
|
Sales(
|
||||||
|
@ -113,26 +115,31 @@ proc finish(agent: SalesAgent, success: bool) =
|
||||||
|
|
||||||
if success:
|
if success:
|
||||||
if onSale =? agent.sales.onSale and request =? agent.request:
|
if onSale =? agent.sales.onSale and request =? agent.request:
|
||||||
onSale(agent.availability, request)
|
onSale(agent.availability, request, 0.u256) # TODO: slot index
|
||||||
else:
|
else:
|
||||||
if onClear =? agent.sales.onClear and request =? agent.request:
|
if onClear =? agent.sales.onClear and request =? agent.request:
|
||||||
onClear(agent.availability, request)
|
onClear(agent.availability, request)
|
||||||
agent.sales.add(agent.availability)
|
agent.sales.add(agent.availability)
|
||||||
|
|
||||||
proc onFulfill(agent: SalesAgent, requestId: array[32, byte]) {.async.} =
|
proc onSlotFilled(agent: SalesAgent,
|
||||||
|
requestId: array[32, byte],
|
||||||
|
slotIndex: UInt256) {.async.} =
|
||||||
try:
|
try:
|
||||||
let market = agent.sales.market
|
let market = agent.sales.market
|
||||||
let host = await market.getHost(requestId)
|
let host = await market.getHost(requestId, slotIndex)
|
||||||
let me = await market.getSigner()
|
let me = await market.getSigner()
|
||||||
agent.finish(success = (host == me.some))
|
agent.finish(success = (host == me.some))
|
||||||
except CatchableError:
|
except CatchableError:
|
||||||
agent.finish(success = false)
|
agent.finish(success = false)
|
||||||
|
|
||||||
proc subscribeFulfill(agent: SalesAgent) {.async.} =
|
proc subscribeSlotFilled(agent: SalesAgent) {.async.} =
|
||||||
proc onFulfill(requestId: array[32, byte]) {.gcsafe, upraises:[].} =
|
proc onSlotFilled(requestId: array[32, byte],
|
||||||
asyncSpawn agent.onFulfill(requestId)
|
slotIndex: UInt256) {.gcsafe, upraises:[].} =
|
||||||
|
asyncSpawn agent.onSlotFilled(requestId, slotIndex)
|
||||||
let market = agent.sales.market
|
let market = agent.sales.market
|
||||||
let subscription = await market.subscribeFulfillment(agent.requestId, onFulfill)
|
let subscription = await market.subscribeSlotFilled(agent.requestId,
|
||||||
|
0.u256,
|
||||||
|
onSlotFilled) # TODO: slot index
|
||||||
agent.subscription = some subscription
|
agent.subscription = some subscription
|
||||||
|
|
||||||
proc waitForExpiry(agent: SalesAgent) {.async.} =
|
proc waitForExpiry(agent: SalesAgent) {.async.} =
|
||||||
|
@ -155,7 +162,7 @@ proc start(agent: SalesAgent) {.async.} =
|
||||||
|
|
||||||
sales.remove(availability)
|
sales.remove(availability)
|
||||||
|
|
||||||
await agent.subscribeFulfill()
|
await agent.subscribeSlotFilled()
|
||||||
|
|
||||||
agent.request = await market.getRequest(agent.requestId)
|
agent.request = await market.getRequest(agent.requestId)
|
||||||
without request =? agent.request:
|
without request =? agent.request:
|
||||||
|
@ -166,7 +173,7 @@ proc start(agent: SalesAgent) {.async.} =
|
||||||
|
|
||||||
await onStore(request.content.cid, availability)
|
await onStore(request.content.cid, availability)
|
||||||
let proof = await onProve(request.content.cid)
|
let proof = await onProve(request.content.cid)
|
||||||
await market.fulfillRequest(request.id, proof)
|
await market.fillSlot(request.id, 0.u256, proof) # TODO: slot index
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
raise
|
raise
|
||||||
except CatchableError as e:
|
except CatchableError as e:
|
||||||
|
|
|
@ -7,15 +7,22 @@ type
|
||||||
MockMarket* = ref object of Market
|
MockMarket* = ref object of Market
|
||||||
requested*: seq[StorageRequest]
|
requested*: seq[StorageRequest]
|
||||||
fulfilled*: seq[Fulfillment]
|
fulfilled*: seq[Fulfillment]
|
||||||
|
filled*: seq[Slot]
|
||||||
signer: Address
|
signer: Address
|
||||||
subscriptions: Subscriptions
|
subscriptions: Subscriptions
|
||||||
Fulfillment* = object
|
Fulfillment* = object
|
||||||
requestId*: array[32, byte]
|
requestId*: array[32, byte]
|
||||||
proof*: seq[byte]
|
proof*: seq[byte]
|
||||||
host*: Address
|
host*: Address
|
||||||
|
Slot* = object
|
||||||
|
requestId*: array[32, byte]
|
||||||
|
slotIndex*: UInt256
|
||||||
|
proof*: seq[byte]
|
||||||
|
host*: Address
|
||||||
Subscriptions = object
|
Subscriptions = object
|
||||||
onRequest: seq[RequestSubscription]
|
onRequest: seq[RequestSubscription]
|
||||||
onFulfillment: seq[FulfillmentSubscription]
|
onFulfillment: seq[FulfillmentSubscription]
|
||||||
|
onSlotFilled: seq[SlotFilledSubscription]
|
||||||
RequestSubscription* = ref object of Subscription
|
RequestSubscription* = ref object of Subscription
|
||||||
market: MockMarket
|
market: MockMarket
|
||||||
callback: OnRequest
|
callback: OnRequest
|
||||||
|
@ -23,6 +30,11 @@ type
|
||||||
market: MockMarket
|
market: MockMarket
|
||||||
requestId: array[32, byte]
|
requestId: array[32, byte]
|
||||||
callback: OnFulfillment
|
callback: OnFulfillment
|
||||||
|
SlotFilledSubscription* = ref object of Subscription
|
||||||
|
market: MockMarket
|
||||||
|
requestId: array[32, byte]
|
||||||
|
slotIndex: UInt256
|
||||||
|
callback: OnSlotFilled
|
||||||
|
|
||||||
proc new*(_: type MockMarket): MockMarket =
|
proc new*(_: type MockMarket): MockMarket =
|
||||||
MockMarket(signer: Address.example)
|
MockMarket(signer: Address.example)
|
||||||
|
@ -69,6 +81,38 @@ method fulfillRequest*(market: MockMarket,
|
||||||
proof: seq[byte]) {.async.} =
|
proof: seq[byte]) {.async.} =
|
||||||
market.fulfillRequest(requestid, proof, market.signer)
|
market.fulfillRequest(requestid, proof, market.signer)
|
||||||
|
|
||||||
|
method getHost(market: MockMarket,
|
||||||
|
requestId: array[32, byte],
|
||||||
|
slotIndex: UInt256): Future[?Address] {.async.} =
|
||||||
|
for slot in market.filled:
|
||||||
|
if slot.requestId == requestId and slot.slotIndex == slotIndex:
|
||||||
|
return some slot.host
|
||||||
|
return none Address
|
||||||
|
|
||||||
|
proc fillSlot*(market: MockMarket,
|
||||||
|
requestId: array[32, byte],
|
||||||
|
slotIndex: UInt256,
|
||||||
|
proof: seq[byte],
|
||||||
|
host: Address) =
|
||||||
|
let slot = Slot(
|
||||||
|
requestId: requestId,
|
||||||
|
slotIndex: slotIndex,
|
||||||
|
proof: proof,
|
||||||
|
host: host
|
||||||
|
)
|
||||||
|
market.filled.add(slot)
|
||||||
|
var subscriptions = market.subscriptions.onSlotFilled
|
||||||
|
for subscription in subscriptions:
|
||||||
|
if subscription.requestId == requestId and
|
||||||
|
subscription.slotIndex == slotIndex:
|
||||||
|
subscription.callback(requestId, slotIndex)
|
||||||
|
|
||||||
|
method fillSlot*(market: MockMarket,
|
||||||
|
requestId: array[32, byte],
|
||||||
|
slotIndex: UInt256,
|
||||||
|
proof: seq[byte]) {.async.} =
|
||||||
|
market.fillSlot(requestId, slotIndex, proof, market.signer)
|
||||||
|
|
||||||
method subscribeRequests*(market: MockMarket,
|
method subscribeRequests*(market: MockMarket,
|
||||||
callback: OnRequest):
|
callback: OnRequest):
|
||||||
Future[Subscription] {.async.} =
|
Future[Subscription] {.async.} =
|
||||||
|
@ -91,8 +135,25 @@ method subscribeFulfillment*(market: MockMarket,
|
||||||
market.subscriptions.onFulfillment.add(subscription)
|
market.subscriptions.onFulfillment.add(subscription)
|
||||||
return subscription
|
return subscription
|
||||||
|
|
||||||
|
method subscribeSlotFilled*(market: MockMarket,
|
||||||
|
requestId: array[32, byte],
|
||||||
|
slotIndex: UInt256,
|
||||||
|
callback: OnSlotFilled):
|
||||||
|
Future[Subscription] {.async.} =
|
||||||
|
let subscription = SlotFilledSubscription(
|
||||||
|
market: market,
|
||||||
|
requestId: requestId,
|
||||||
|
slotIndex: slotIndex,
|
||||||
|
callback: callback
|
||||||
|
)
|
||||||
|
market.subscriptions.onSlotFilled.add(subscription)
|
||||||
|
return subscription
|
||||||
|
|
||||||
method unsubscribe*(subscription: RequestSubscription) {.async.} =
|
method unsubscribe*(subscription: RequestSubscription) {.async.} =
|
||||||
subscription.market.subscriptions.onRequest.keepItIf(it != subscription)
|
subscription.market.subscriptions.onRequest.keepItIf(it != subscription)
|
||||||
|
|
||||||
method unsubscribe*(subscription: FulfillmentSubscription) {.async.} =
|
method unsubscribe*(subscription: FulfillmentSubscription) {.async.} =
|
||||||
subscription.market.subscriptions.onFulfillment.keepItIf(it != subscription)
|
subscription.market.subscriptions.onFulfillment.keepItIf(it != subscription)
|
||||||
|
|
||||||
|
method unsubscribe*(subscription: SlotFilledSubscription) {.async.} =
|
||||||
|
subscription.market.subscriptions.onSlotFilled.keepItIf(it != subscription)
|
||||||
|
|
|
@ -16,7 +16,8 @@ suite "Sales":
|
||||||
ask: StorageAsk(
|
ask: StorageAsk(
|
||||||
duration: 60.u256,
|
duration: 60.u256,
|
||||||
size: 100.u256,
|
size: 100.u256,
|
||||||
reward:42.u256
|
reward:42.u256,
|
||||||
|
slots: 4
|
||||||
),
|
),
|
||||||
content: StorageContent(
|
content: StorageContent(
|
||||||
cid: "some cid"
|
cid: "some cid"
|
||||||
|
@ -101,24 +102,30 @@ suite "Sales":
|
||||||
discard await market.requestStorage(request)
|
discard await market.requestStorage(request)
|
||||||
check provingCid == request.content.cid
|
check provingCid == request.content.cid
|
||||||
|
|
||||||
test "fulfills request":
|
test "fills a slot":
|
||||||
sales.add(availability)
|
sales.add(availability)
|
||||||
discard await market.requestStorage(request)
|
discard await market.requestStorage(request)
|
||||||
check market.fulfilled.len == 1
|
check market.filled.len == 1
|
||||||
check market.fulfilled[0].requestId == request.id
|
check market.filled[0].requestId == request.id
|
||||||
check market.fulfilled[0].proof == proof
|
check market.filled[0].slotIndex < request.ask.slots.u256
|
||||||
check market.fulfilled[0].host == await market.getSigner()
|
check market.filled[0].proof == proof
|
||||||
|
check market.filled[0].host == await market.getSigner()
|
||||||
|
|
||||||
test "calls onSale when request is fulfilled":
|
test "calls onSale when slot is filled":
|
||||||
var soldAvailability: Availability
|
var soldAvailability: Availability
|
||||||
var soldRequest: StorageRequest
|
var soldRequest: StorageRequest
|
||||||
sales.onSale = proc(availability: Availability, request: StorageRequest) =
|
var soldSlotIndex: UInt256
|
||||||
|
sales.onSale = proc(availability: Availability,
|
||||||
|
request: StorageRequest,
|
||||||
|
slotIndex: UInt256) =
|
||||||
soldAvailability = availability
|
soldAvailability = availability
|
||||||
soldRequest = request
|
soldRequest = request
|
||||||
|
soldSlotIndex = slotIndex
|
||||||
sales.add(availability)
|
sales.add(availability)
|
||||||
discard await market.requestStorage(request)
|
discard await market.requestStorage(request)
|
||||||
check soldAvailability == availability
|
check soldAvailability == availability
|
||||||
check soldRequest == request
|
check soldRequest == request
|
||||||
|
check soldSlotIndex < request.ask.slots.u256
|
||||||
|
|
||||||
test "calls onClear when storage becomes available again":
|
test "calls onClear when storage becomes available again":
|
||||||
sales.onProve = proc(cid: string): Future[seq[byte]] {.async.} =
|
sales.onProve = proc(cid: string): Future[seq[byte]] {.async.} =
|
||||||
|
@ -133,13 +140,14 @@ suite "Sales":
|
||||||
check clearedAvailability == availability
|
check clearedAvailability == availability
|
||||||
check clearedRequest == request
|
check clearedRequest == request
|
||||||
|
|
||||||
test "makes storage available again when other host fulfills request":
|
test "makes storage available again when other host fills the slot":
|
||||||
let otherHost = Address.example
|
let otherHost = Address.example
|
||||||
sales.onStore = proc(cid: string, availability: Availability) {.async.} =
|
sales.onStore = proc(cid: string, availability: Availability) {.async.} =
|
||||||
await sleepAsync(1.hours)
|
await sleepAsync(1.hours)
|
||||||
sales.add(availability)
|
sales.add(availability)
|
||||||
discard await market.requestStorage(request)
|
discard await market.requestStorage(request)
|
||||||
market.fulfillRequest(request.id, proof, otherHost)
|
for slotIndex in 0..<request.ask.slots:
|
||||||
|
market.fillSlot(request.id, slotIndex.u256, proof, otherHost)
|
||||||
check sales.available == @[availability]
|
check sales.available == @[availability]
|
||||||
|
|
||||||
test "makes storage available again when request expires":
|
test "makes storage available again when request expires":
|
||||||
|
|
Loading…
Reference in New Issue