feat(slot-reservations): support SlotReservationsFull event (#926)
This commit is contained in:
parent
91e4e368de
commit
566db2fa30
|
@ -307,6 +307,17 @@ method subscribeSlotFreed*(market: OnChainMarket,
|
||||||
let subscription = await market.contract.subscribe(SlotFreed, onEvent)
|
let subscription = await market.contract.subscribe(SlotFreed, onEvent)
|
||||||
return OnChainMarketSubscription(eventSubscription: subscription)
|
return OnChainMarketSubscription(eventSubscription: subscription)
|
||||||
|
|
||||||
|
method subscribeSlotReservationsFull*(
|
||||||
|
market: OnChainMarket,
|
||||||
|
callback: OnSlotReservationsFull): Future[MarketSubscription] {.async.} =
|
||||||
|
|
||||||
|
proc onEvent(event: SlotReservationsFull) {.upraises:[].} =
|
||||||
|
callback(event.requestId, event.slotIndex)
|
||||||
|
|
||||||
|
convertEthersError:
|
||||||
|
let subscription = await market.contract.subscribe(SlotReservationsFull, onEvent)
|
||||||
|
return OnChainMarketSubscription(eventSubscription: subscription)
|
||||||
|
|
||||||
method subscribeFulfillment(market: OnChainMarket,
|
method subscribeFulfillment(market: OnChainMarket,
|
||||||
callback: OnFulfillment):
|
callback: OnFulfillment):
|
||||||
Future[MarketSubscription] {.async.} =
|
Future[MarketSubscription] {.async.} =
|
||||||
|
|
|
@ -25,6 +25,7 @@ type
|
||||||
OnFulfillment* = proc(requestId: RequestId) {.gcsafe, upraises: [].}
|
OnFulfillment* = proc(requestId: RequestId) {.gcsafe, upraises: [].}
|
||||||
OnSlotFilled* = proc(requestId: RequestId, slotIndex: UInt256) {.gcsafe, upraises:[].}
|
OnSlotFilled* = proc(requestId: RequestId, slotIndex: UInt256) {.gcsafe, upraises:[].}
|
||||||
OnSlotFreed* = proc(requestId: RequestId, slotIndex: UInt256) {.gcsafe, upraises: [].}
|
OnSlotFreed* = proc(requestId: RequestId, slotIndex: UInt256) {.gcsafe, upraises: [].}
|
||||||
|
OnSlotReservationsFull* = proc(requestId: RequestId, slotIndex: UInt256) {.gcsafe, upraises: [].}
|
||||||
OnRequestCancelled* = proc(requestId: RequestId) {.gcsafe, upraises:[].}
|
OnRequestCancelled* = proc(requestId: RequestId) {.gcsafe, upraises:[].}
|
||||||
OnRequestFailed* = proc(requestId: RequestId) {.gcsafe, upraises:[].}
|
OnRequestFailed* = proc(requestId: RequestId) {.gcsafe, upraises:[].}
|
||||||
OnProofSubmitted* = proc(id: SlotId) {.gcsafe, upraises:[].}
|
OnProofSubmitted* = proc(id: SlotId) {.gcsafe, upraises:[].}
|
||||||
|
@ -42,6 +43,9 @@ type
|
||||||
SlotFreed* = object of MarketplaceEvent
|
SlotFreed* = object of MarketplaceEvent
|
||||||
requestId* {.indexed.}: RequestId
|
requestId* {.indexed.}: RequestId
|
||||||
slotIndex*: UInt256
|
slotIndex*: UInt256
|
||||||
|
SlotReservationsFull* = object of MarketplaceEvent
|
||||||
|
requestId* {.indexed.}: RequestId
|
||||||
|
slotIndex*: UInt256
|
||||||
RequestFulfilled* = object of MarketplaceEvent
|
RequestFulfilled* = object of MarketplaceEvent
|
||||||
requestId* {.indexed.}: RequestId
|
requestId* {.indexed.}: RequestId
|
||||||
RequestCancelled* = object of MarketplaceEvent
|
RequestCancelled* = object of MarketplaceEvent
|
||||||
|
@ -203,6 +207,12 @@ method subscribeSlotFreed*(market: Market,
|
||||||
Future[Subscription] {.base, async.} =
|
Future[Subscription] {.base, async.} =
|
||||||
raiseAssert("not implemented")
|
raiseAssert("not implemented")
|
||||||
|
|
||||||
|
method subscribeSlotReservationsFull*(
|
||||||
|
market: Market,
|
||||||
|
callback: OnSlotReservationsFull): Future[Subscription] {.base, async.} =
|
||||||
|
|
||||||
|
raiseAssert("not implemented")
|
||||||
|
|
||||||
method subscribeRequestCancelled*(market: Market,
|
method subscribeRequestCancelled*(market: Market,
|
||||||
callback: OnRequestCancelled):
|
callback: OnRequestCancelled):
|
||||||
Future[Subscription] {.base, async.} =
|
Future[Subscription] {.base, async.} =
|
||||||
|
|
|
@ -465,6 +465,23 @@ proc subscribeSlotFreed(sales: Sales) {.async.} =
|
||||||
except CatchableError as e:
|
except CatchableError as e:
|
||||||
error "Unable to subscribe to slot freed events", msg = e.msg
|
error "Unable to subscribe to slot freed events", msg = e.msg
|
||||||
|
|
||||||
|
proc subscribeSlotReservationsFull(sales: Sales) {.async.} =
|
||||||
|
let context = sales.context
|
||||||
|
let market = context.market
|
||||||
|
let queue = context.slotQueue
|
||||||
|
|
||||||
|
proc onSlotReservationsFull(requestId: RequestId, slotIndex: UInt256) =
|
||||||
|
trace "reservations for slot full, removing from slot queue", requestId, slotIndex
|
||||||
|
queue.delete(requestId, slotIndex.truncate(uint16))
|
||||||
|
|
||||||
|
try:
|
||||||
|
let sub = await market.subscribeSlotReservationsFull(onSlotReservationsFull)
|
||||||
|
sales.subscriptions.add(sub)
|
||||||
|
except CancelledError as error:
|
||||||
|
raise error
|
||||||
|
except CatchableError as e:
|
||||||
|
error "Unable to subscribe to slot filled events", msg = e.msg
|
||||||
|
|
||||||
proc startSlotQueue(sales: Sales) {.async.} =
|
proc startSlotQueue(sales: Sales) {.async.} =
|
||||||
let slotQueue = sales.context.slotQueue
|
let slotQueue = sales.context.slotQueue
|
||||||
let reservations = sales.context.reservations
|
let reservations = sales.context.reservations
|
||||||
|
@ -488,6 +505,7 @@ proc subscribe(sales: Sales) {.async.} =
|
||||||
await sales.subscribeSlotFilled()
|
await sales.subscribeSlotFilled()
|
||||||
await sales.subscribeSlotFreed()
|
await sales.subscribeSlotFreed()
|
||||||
await sales.subscribeCancellation()
|
await sales.subscribeCancellation()
|
||||||
|
await sales.subscribeSlotReservationsFull()
|
||||||
|
|
||||||
proc unsubscribe(sales: Sales) {.async.} =
|
proc unsubscribe(sales: Sales) {.async.} =
|
||||||
for sub in sales.subscriptions:
|
for sub in sales.subscriptions:
|
||||||
|
|
|
@ -54,6 +54,7 @@ type
|
||||||
onFulfillment: seq[FulfillmentSubscription]
|
onFulfillment: seq[FulfillmentSubscription]
|
||||||
onSlotFilled: seq[SlotFilledSubscription]
|
onSlotFilled: seq[SlotFilledSubscription]
|
||||||
onSlotFreed: seq[SlotFreedSubscription]
|
onSlotFreed: seq[SlotFreedSubscription]
|
||||||
|
onSlotReservationsFull: seq[SlotReservationsFullSubscription]
|
||||||
onRequestCancelled: seq[RequestCancelledSubscription]
|
onRequestCancelled: seq[RequestCancelledSubscription]
|
||||||
onRequestFailed: seq[RequestFailedSubscription]
|
onRequestFailed: seq[RequestFailedSubscription]
|
||||||
onProofSubmitted: seq[ProofSubmittedSubscription]
|
onProofSubmitted: seq[ProofSubmittedSubscription]
|
||||||
|
@ -72,6 +73,9 @@ type
|
||||||
SlotFreedSubscription* = ref object of Subscription
|
SlotFreedSubscription* = ref object of Subscription
|
||||||
market: MockMarket
|
market: MockMarket
|
||||||
callback: OnSlotFreed
|
callback: OnSlotFreed
|
||||||
|
SlotReservationsFullSubscription* = ref object of Subscription
|
||||||
|
market: MockMarket
|
||||||
|
callback: OnSlotReservationsFull
|
||||||
RequestCancelledSubscription* = ref object of Subscription
|
RequestCancelledSubscription* = ref object of Subscription
|
||||||
market: MockMarket
|
market: MockMarket
|
||||||
requestId: ?RequestId
|
requestId: ?RequestId
|
||||||
|
@ -202,6 +206,15 @@ proc emitSlotFreed*(market: MockMarket,
|
||||||
for subscription in subscriptions:
|
for subscription in subscriptions:
|
||||||
subscription.callback(requestId, slotIndex)
|
subscription.callback(requestId, slotIndex)
|
||||||
|
|
||||||
|
proc emitSlotReservationsFull*(
|
||||||
|
market: MockMarket,
|
||||||
|
requestId: RequestId,
|
||||||
|
slotIndex: UInt256) =
|
||||||
|
|
||||||
|
var subscriptions = market.subscriptions.onSlotReservationsFull
|
||||||
|
for subscription in subscriptions:
|
||||||
|
subscription.callback(requestId, slotIndex)
|
||||||
|
|
||||||
proc emitRequestCancelled*(market: MockMarket, requestId: RequestId) =
|
proc emitRequestCancelled*(market: MockMarket, requestId: RequestId) =
|
||||||
var subscriptions = market.subscriptions.onRequestCancelled
|
var subscriptions = market.subscriptions.onRequestCancelled
|
||||||
for subscription in subscriptions:
|
for subscription in subscriptions:
|
||||||
|
@ -389,6 +402,15 @@ method subscribeSlotFreed*(market: MockMarket,
|
||||||
market.subscriptions.onSlotFreed.add(subscription)
|
market.subscriptions.onSlotFreed.add(subscription)
|
||||||
return subscription
|
return subscription
|
||||||
|
|
||||||
|
method subscribeSlotReservationsFull*(
|
||||||
|
market: MockMarket,
|
||||||
|
callback: OnSlotReservationsFull): Future[Subscription] {.async.} =
|
||||||
|
|
||||||
|
let subscription =
|
||||||
|
SlotReservationsFullSubscription(market: market, callback: callback)
|
||||||
|
market.subscriptions.onSlotReservationsFull.add(subscription)
|
||||||
|
return subscription
|
||||||
|
|
||||||
method subscribeRequestCancelled*(market: MockMarket,
|
method subscribeRequestCancelled*(market: MockMarket,
|
||||||
callback: OnRequestCancelled):
|
callback: OnRequestCancelled):
|
||||||
Future[Subscription] {.async.} =
|
Future[Subscription] {.async.} =
|
||||||
|
@ -481,3 +503,6 @@ method unsubscribe*(subscription: RequestFailedSubscription) {.async.} =
|
||||||
|
|
||||||
method unsubscribe*(subscription: ProofSubmittedSubscription) {.async.} =
|
method unsubscribe*(subscription: ProofSubmittedSubscription) {.async.} =
|
||||||
subscription.market.subscriptions.onProofSubmitted.keepItIf(it != subscription)
|
subscription.market.subscriptions.onProofSubmitted.keepItIf(it != subscription)
|
||||||
|
|
||||||
|
method unsubscribe*(subscription: SlotReservationsFullSubscription) {.async.} =
|
||||||
|
subscription.market.subscriptions.onSlotReservationsFull.keepItIf(it != subscription)
|
||||||
|
|
|
@ -270,6 +270,12 @@ asyncchecksuite "Sales":
|
||||||
let expected = SlotQueueItem.init(request1, 1'u16)
|
let expected = SlotQueueItem.init(request1, 1'u16)
|
||||||
check always (not itemsProcessed.contains(expected))
|
check always (not itemsProcessed.contains(expected))
|
||||||
|
|
||||||
|
test "removes slot index from slot queue once SlotReservationsFull emitted":
|
||||||
|
let request1 = await addRequestToSaturatedQueue()
|
||||||
|
market.emitSlotReservationsFull(request1.id, 1.u256)
|
||||||
|
let expected = SlotQueueItem.init(request1, 1'u16)
|
||||||
|
check always (not itemsProcessed.contains(expected))
|
||||||
|
|
||||||
test "adds slot index to slot queue once SlotFreed emitted":
|
test "adds slot index to slot queue once SlotFreed emitted":
|
||||||
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
|
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
|
||||||
itemsProcessed.add item
|
itemsProcessed.add item
|
||||||
|
|
|
@ -200,6 +200,30 @@ ethersuite "On-Chain Market":
|
||||||
check receivedIdxs == @[slotIndex]
|
check receivedIdxs == @[slotIndex]
|
||||||
await subscription.unsubscribe()
|
await subscription.unsubscribe()
|
||||||
|
|
||||||
|
test "supports slot reservations full subscriptions":
|
||||||
|
let account2 = ethProvider.getSigner(accounts[2])
|
||||||
|
let account3 = ethProvider.getSigner(accounts[3])
|
||||||
|
|
||||||
|
await market.requestStorage(request)
|
||||||
|
|
||||||
|
var receivedRequestIds: seq[RequestId] = @[]
|
||||||
|
var receivedIdxs: seq[UInt256] = @[]
|
||||||
|
proc onSlotReservationsFull(requestId: RequestId, idx: UInt256) =
|
||||||
|
receivedRequestIds.add(requestId)
|
||||||
|
receivedIdxs.add(idx)
|
||||||
|
let subscription =
|
||||||
|
await market.subscribeSlotReservationsFull(onSlotReservationsFull)
|
||||||
|
|
||||||
|
await market.reserveSlot(request.id, slotIndex)
|
||||||
|
switchAccount(account2)
|
||||||
|
await market.reserveSlot(request.id, slotIndex)
|
||||||
|
switchAccount(account3)
|
||||||
|
await market.reserveSlot(request.id, slotIndex)
|
||||||
|
|
||||||
|
check receivedRequestIds == @[request.id]
|
||||||
|
check receivedIdxs == @[slotIndex]
|
||||||
|
await subscription.unsubscribe()
|
||||||
|
|
||||||
test "support fulfillment subscriptions":
|
test "support fulfillment subscriptions":
|
||||||
await market.requestStorage(request)
|
await market.requestStorage(request)
|
||||||
var receivedIds: seq[RequestId]
|
var receivedIds: seq[RequestId]
|
||||||
|
|
Loading…
Reference in New Issue