[purchasing] Withdraw funds when request times out

When a request for storage times out (not enough slots filled), the client will initiate a withdraw request to retrieve its funds out of the contract, setting the state of the request to RequestState.Cancelled. The client will also emit a RequestCancelled event for others to listen to (ie hosts will need to listen for this event to withdraw its collateral).

Add unit test that checks for emission of RequestCancelled after request is purchased request expires.

Update dagger-contracts dependency to commit that holds the changes supporting withdrawing of funds.
This commit is contained in:
Eric Mastro 2022-08-18 16:01:47 +10:00 committed by Eric Mastro
parent 9939d85b74
commit 0c3fbad470
7 changed files with 104 additions and 6 deletions

View File

@ -58,6 +58,10 @@ method fillSlot(market: OnChainMarket,
proof: seq[byte]) {.async.} = proof: seq[byte]) {.async.} =
await market.contract.fillSlot(requestId, slotIndex, proof) await market.contract.fillSlot(requestId, slotIndex, proof)
method withdrawFunds(market: OnChainMarket,
requestId: array[32, byte]) {.async.} =
await market.contract.withdrawFunds(requestId)
method subscribeRequests(market: OnChainMarket, method subscribeRequests(market: OnChainMarket,
callback: OnRequest): callback: OnRequest):
Future[MarketSubscription] {.async.} = Future[MarketSubscription] {.async.} =
@ -87,5 +91,15 @@ method subscribeFulfillment(market: OnChainMarket,
let subscription = await market.contract.subscribe(RequestFulfilled, onEvent) let subscription = await market.contract.subscribe(RequestFulfilled, onEvent)
return OnChainMarketSubscription(eventSubscription: subscription) return OnChainMarketSubscription(eventSubscription: subscription)
method subscribeRequestCancelled*(market: OnChainMarket,
requestId: array[32, byte],
callback: OnRequestCancelled):
Future[MarketSubscription] {.async.} =
proc onEvent(event: RequestCancelled) {.upraises:[].} =
if event.requestId == requestId:
callback(event.requestId)
let subscription = await market.contract.subscribe(RequestCancelled, onEvent)
return OnChainMarketSubscription(eventSubscription: subscription)
method unsubscribe*(subscription: OnChainMarketSubscription) {.async.} = method unsubscribe*(subscription: OnChainMarketSubscription) {.async.} =
await subscription.eventSubscription.unsubscribe() await subscription.eventSubscription.unsubscribe()

View File

@ -18,7 +18,8 @@ type
slotId* {.indexed.}: SlotId slotId* {.indexed.}: SlotId
RequestFulfilled* = object of Event RequestFulfilled* = object of Event
requestId* {.indexed.}: RequestId requestId* {.indexed.}: RequestId
RequestCancelled* = object of Event
requestId* {.indexed.}: Id
ProofSubmitted* = object of Event ProofSubmitted* = object of Event
id*: SlotId id*: SlotId
proof*: seq[byte] proof*: seq[byte]
@ -34,6 +35,7 @@ proc balanceOf*(storage: Storage, account: Address): UInt256 {.contract, view.}
proc requestStorage*(storage: Storage, request: StorageRequest) {.contract.} proc requestStorage*(storage: Storage, request: StorageRequest) {.contract.}
proc fillSlot*(storage: Storage, requestId: RequestId, slotIndex: UInt256, proof: seq[byte]) {.contract.} proc fillSlot*(storage: Storage, requestId: RequestId, slotIndex: UInt256, proof: seq[byte]) {.contract.}
proc withdrawFunds*(storage: Storage, requestId: Id) {.contract.}
proc payoutSlot*(storage: Storage, requestId: RequestId, slotIndex: UInt256) {.contract.} proc payoutSlot*(storage: Storage, requestId: RequestId, slotIndex: UInt256) {.contract.}
proc getRequest*(storage: Storage, id: RequestId): StorageRequest {.contract, view.} proc getRequest*(storage: Storage, id: RequestId): StorageRequest {.contract, view.}
proc getHost*(storage: Storage, id: SlotId): Address {.contract, view.} proc getHost*(storage: Storage, id: SlotId): Address {.contract, view.}

View File

@ -10,9 +10,10 @@ export requests
type type
Market* = ref object of RootObj Market* = ref object of RootObj
Subscription* = ref object of RootObj Subscription* = ref object of RootObj
OnRequest* = proc(id: RequestId, ask: StorageAsk) {.gcsafe, upraises:[].} OnRequest* = proc(id: array[32, byte], ask: StorageAsk) {.gcsafe, upraises:[].}
OnFulfillment* = proc(requestId: RequestId) {.gcsafe, upraises: [].} OnFulfillment* = proc(requestId: array[32, byte]) {.gcsafe, upraises: [].}
OnSlotFilled* = proc(requestId: RequestId, slotIndex: UInt256) {.gcsafe, upraises:[].} OnSlotFilled* = proc(requestId: array[32, byte], slotIndex: UInt256) {.gcsafe, upraises:[].}
OnRequestCancelled* = proc(requestId: array[32, byte]) {.gcsafe, upraises:[].}
method getSigner*(market: Market): Future[Address] {.base, async.} = method getSigner*(market: Market): Future[Address] {.base, async.} =
raiseAssert("not implemented") raiseAssert("not implemented")
@ -38,6 +39,10 @@ method fillSlot*(market: Market,
proof: seq[byte]) {.base, async.} = proof: seq[byte]) {.base, async.} =
raiseAssert("not implemented") raiseAssert("not implemented")
method withdrawFunds*(market: Market,
requestId: array[32, byte]) {.base, async.} =
raiseAssert("not implemented")
method subscribeRequests*(market: Market, method subscribeRequests*(market: Market,
callback: OnRequest): callback: OnRequest):
Future[Subscription] {.base, async.} = Future[Subscription] {.base, async.} =
@ -56,5 +61,11 @@ method subscribeSlotFilled*(market: Market,
Future[Subscription] {.base, async.} = Future[Subscription] {.base, async.} =
raiseAssert("not implemented") raiseAssert("not implemented")
method subscribeRequestCancelled*(market: Market,
requestId: array[32, byte],
callback: OnRequestCancelled):
Future[Subscription] {.base, async.} =
raiseAssert("not implemented")
method unsubscribe*(subscription: Subscription) {.base, async, upraises:[].} = method unsubscribe*(subscription: Subscription) {.base, async, upraises:[].} =
raiseAssert("not implemented") raiseAssert("not implemented")

View File

@ -23,6 +23,12 @@ type
clock: Clock clock: Clock
request*: StorageRequest request*: StorageRequest
PurchaseTimeout* = Timeout PurchaseTimeout* = Timeout
RequestState* = enum
New = 1, # [default] waiting to fill slots
Started = 2, # all slots filled, accepting regular proofs
Cancelled = 3, # not enough slots filled before expiry
Finished = 4, # successfully completed
Failed = 5 # too many nodes have failed to provide proofs, data lost
PurchaseId* = distinct array[32, byte] PurchaseId* = distinct array[32, byte]
const DefaultProofProbability = 100.u256 const DefaultProofProbability = 100.u256
@ -75,6 +81,7 @@ func getPurchase*(purchasing: Purchasing, id: PurchaseId): ?Purchase =
proc run(purchase: Purchase) {.async.} = proc run(purchase: Purchase) {.async.} =
let market = purchase.market let market = purchase.market
let clock = purchase.clock let clock = purchase.clock
var state = RequestState.New
proc requestStorage {.async.} = proc requestStorage {.async.} =
purchase.request = await market.requestStorage(purchase.request) purchase.request = await market.requestStorage(purchase.request)
@ -87,13 +94,26 @@ proc run(purchase: Purchase) {.async.} =
let subscription = await market.subscribeFulfillment(request.id, callback) let subscription = await market.subscribeFulfillment(request.id, callback)
await done await done
await subscription.unsubscribe() await subscription.unsubscribe()
state = RequestState.Started
proc withTimeout(future: Future[void]) {.async.} = proc withTimeout(future: Future[void]) {.async.} =
let expiry = purchase.request.expiry.truncate(int64) let expiry = purchase.request.expiry.truncate(int64)
await future.withTimeout(clock, expiry) await future.withTimeout(clock, expiry)
await requestStorage() await requestStorage()
await waitUntilFulfilled().withTimeout() try:
await waitUntilFulfilled().withTimeout()
except PurchaseTimeout as e:
if state != RequestState.Started:
# If contract was fulfilled, the state would be RequestState.Started.
# Otherwise, the request would have timed out and should be considered
# cancelled. However, the request state hasn't been updated to
# RequestState.Cancelled yet so we can't check for that state or listen for
# an event emission. Instead, the state will be updated when the client
# requests to withdraw funds from the storage request.
await market.withdrawFunds(purchase.request.id)
state = RequestState.Cancelled
raise e
proc start(purchase: Purchase) = proc start(purchase: Purchase) =
purchase.future = purchase.run() purchase.future = purchase.run()
@ -104,6 +124,9 @@ proc wait*(purchase: Purchase) {.async.} =
func id*(purchase: Purchase): PurchaseId = func id*(purchase: Purchase): PurchaseId =
PurchaseId(purchase.request.id) PurchaseId(purchase.request.id)
func cancelled*(purchase: Purchase): bool =
purchase.future.cancelled
func finished*(purchase: Purchase): bool = func finished*(purchase: Purchase): bool =
purchase.future.finished purchase.future.finished

View File

@ -23,6 +23,7 @@ type
onRequest: seq[RequestSubscription] onRequest: seq[RequestSubscription]
onFulfillment: seq[FulfillmentSubscription] onFulfillment: seq[FulfillmentSubscription]
onSlotFilled: seq[SlotFilledSubscription] onSlotFilled: seq[SlotFilledSubscription]
onRequestCancelled: seq[RequestCancelledSubscription]
RequestSubscription* = ref object of Subscription RequestSubscription* = ref object of Subscription
market: MockMarket market: MockMarket
callback: OnRequest callback: OnRequest
@ -35,6 +36,10 @@ type
requestId: RequestId requestId: RequestId
slotIndex: UInt256 slotIndex: UInt256
callback: OnSlotFilled callback: OnSlotFilled
RequestCancelledSubscription* = ref object of Subscription
market: MockMarket
requestId: array[32, byte]
callback: OnRequestCancelled
proc new*(_: type MockMarket): MockMarket = proc new*(_: type MockMarket): MockMarket =
MockMarket(signer: Address.example) MockMarket(signer: Address.example)
@ -75,6 +80,13 @@ proc emitSlotFilled*(market: MockMarket,
subscription.slotIndex == slotIndex: subscription.slotIndex == slotIndex:
subscription.callback(requestId, slotIndex) subscription.callback(requestId, slotIndex)
proc emitRequestCancelled*(market: MockMarket,
requestId: array[32, byte]) =
var subscriptions = market.subscriptions.onRequestCancelled
for subscription in subscriptions:
if subscription.requestId == requestId:
subscription.callback(requestId)
proc emitRequestFulfilled*(market: MockMarket, requestId: RequestId) = proc emitRequestFulfilled*(market: MockMarket, requestId: RequestId) =
var subscriptions = market.subscriptions.onFulfillment var subscriptions = market.subscriptions.onFulfillment
for subscription in subscriptions: for subscription in subscriptions:
@ -101,6 +113,10 @@ method fillSlot*(market: MockMarket,
proof: seq[byte]) {.async.} = proof: seq[byte]) {.async.} =
market.fillSlot(requestId, slotIndex, proof, market.signer) market.fillSlot(requestId, slotIndex, proof, market.signer)
method withdrawFunds*(market: MockMarket,
requestId: array[32, byte]) {.async.} =
market.emitRequestCancelled(requestId)
method subscribeRequests*(market: MockMarket, method subscribeRequests*(market: MockMarket,
callback: OnRequest): callback: OnRequest):
Future[Subscription] {.async.} = Future[Subscription] {.async.} =
@ -137,6 +153,18 @@ method subscribeSlotFilled*(market: MockMarket,
market.subscriptions.onSlotFilled.add(subscription) market.subscriptions.onSlotFilled.add(subscription)
return subscription return subscription
method subscribeRequestCancelled*(market: MockMarket,
requestId: array[32, byte],
callback: OnRequestCancelled):
Future[Subscription] {.async.} =
let subscription = RequestCancelledSubscription(
market: market,
requestId: requestId,
callback: callback
)
market.subscriptions.onRequestCancelled.add(subscription)
return subscription
method unsubscribe*(subscription: RequestSubscription) {.async.} = method unsubscribe*(subscription: RequestSubscription) {.async.} =
subscription.market.subscriptions.onRequest.keepItIf(it != subscription) subscription.market.subscriptions.onRequest.keepItIf(it != subscription)
@ -145,3 +173,6 @@ method unsubscribe*(subscription: FulfillmentSubscription) {.async.} =
method unsubscribe*(subscription: SlotFilledSubscription) {.async.} = method unsubscribe*(subscription: SlotFilledSubscription) {.async.} =
subscription.market.subscriptions.onSlotFilled.keepItIf(it != subscription) subscription.market.subscriptions.onSlotFilled.keepItIf(it != subscription)
method unsubscribe*(subscription: RequestCancelledSubscription) {.async.} =
subscription.market.subscriptions.onRequestCancelled.keepItIf(it != subscription)

View File

@ -1,6 +1,7 @@
import std/times import std/times
import pkg/asynctest import pkg/asynctest
import pkg/chronos import pkg/chronos
import pkg/upraises
import pkg/stint import pkg/stint
import pkg/codex/purchasing import pkg/codex/purchasing
import ./helpers/mockmarket import ./helpers/mockmarket
@ -87,3 +88,19 @@ suite "Purchasing":
clock.set(request.expiry.truncate(int64)) clock.set(request.expiry.truncate(int64))
expect PurchaseTimeout: expect PurchaseTimeout:
await purchase.wait() await purchase.wait()
test "supports request cancelled subscription when request times out":
let purchase = purchasing.purchase(request)
let request = market.requested[0]
var receivedIds: seq[array[32, byte]]
clock.set(request.expiry.truncate(int64))
proc onRequestCancelled(id: array[32, byte]) {.gcsafe, upraises:[].} =
receivedIds.add(id)
let subscription = await market.subscribeRequestCancelled(
request.id,
onRequestCancelled)
try:
await purchase.wait()
except PurchaseTimeout:
check receivedIds == @[request.id]
await subscription.unsubscribe()

@ -1 +1 @@
Subproject commit 9ab65ae5a61a09a6849cc4adbd8ef58fb89c037e Subproject commit 503c496fe24b77f9ad180bf9b85dfa55baf4cb0e