mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-05-18 09:19:43 +00:00
Process slot queue on reservation callback onMarkUnused
This commit is contained in:
parent
c87715046e
commit
a24efbdf56
@ -153,7 +153,7 @@ proc load*(sales: Sales) {.async.} =
|
|||||||
agent.start(SaleUnknown())
|
agent.start(SaleUnknown())
|
||||||
sales.agents.add agent
|
sales.agents.add agent
|
||||||
|
|
||||||
proc onReservationAdded(sales: Sales, availability: Availability) {.async.} =
|
proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} =
|
||||||
## Query last 256 blocks for new requests, adding them to the queue. `push`
|
## Query last 256 blocks for new requests, adding them to the queue. `push`
|
||||||
## checks for availability before adding to the queue. If processed, the
|
## checks for availability before adding to the queue. If processed, the
|
||||||
## sales agent will check if the slot is free.
|
## sales agent will check if the slot is free.
|
||||||
@ -384,9 +384,11 @@ proc startSlotQueue(sales: Sales) {.async.} =
|
|||||||
|
|
||||||
asyncSpawn slotQueue.start()
|
asyncSpawn slotQueue.start()
|
||||||
|
|
||||||
reservations.onReservationAdded =
|
proc onAvailabilityAdded(availability: Availability) {.async.} =
|
||||||
proc(availability: Availability) {.async.} =
|
await sales.onAvailabilityAdded(availability)
|
||||||
await sales.onReservationAdded(availability)
|
|
||||||
|
reservations.onAdded = onAvailabilityAdded
|
||||||
|
reservations.onMarkUnused = onAvailabilityAdded
|
||||||
|
|
||||||
|
|
||||||
proc subscribe(sales: Sales) {.async.} =
|
proc subscribe(sales: Sales) {.async.} =
|
||||||
|
|||||||
@ -43,9 +43,10 @@ type
|
|||||||
used*: bool
|
used*: bool
|
||||||
Reservations* = ref object
|
Reservations* = ref object
|
||||||
repo: RepoStore
|
repo: RepoStore
|
||||||
onReservationAdded: ?OnReservationAdded
|
onAdded: ?OnAvailabilityAdded
|
||||||
|
onMarkUnused: ?OnAvailabilityAdded
|
||||||
GetNext* = proc(): Future[?Availability] {.upraises: [], gcsafe, closure.}
|
GetNext* = proc(): Future[?Availability] {.upraises: [], gcsafe, closure.}
|
||||||
OnReservationAdded* = proc(availability: Availability): Future[void] {.upraises: [], gcsafe.}
|
OnAvailabilityAdded* = proc(availability: Availability): Future[void] {.upraises: [], gcsafe.}
|
||||||
AvailabilityIter* = ref object
|
AvailabilityIter* = ref object
|
||||||
finished*: bool
|
finished*: bool
|
||||||
next*: GetNext
|
next*: GetNext
|
||||||
@ -111,9 +112,15 @@ proc readValue*[T: AvailabilityId](
|
|||||||
mixin readValue
|
mixin readValue
|
||||||
value = T reader.readValue(T.distinctBase)
|
value = T reader.readValue(T.distinctBase)
|
||||||
|
|
||||||
proc `onReservationAdded=`*(self: Reservations,
|
proc `onAdded=`*(self: Reservations,
|
||||||
onReservationAdded: OnReservationAdded) =
|
onAdded: OnAvailabilityAdded) =
|
||||||
self.onReservationAdded = some onReservationAdded
|
self.onAdded = some onAdded
|
||||||
|
|
||||||
|
proc `onMarkUnused=`*(
|
||||||
|
self: Reservations,
|
||||||
|
onMarkUnused: OnAvailabilityAdded
|
||||||
|
) =
|
||||||
|
self.onMarkUnused = some onMarkUnused
|
||||||
|
|
||||||
func key(id: AvailabilityId): ?!Key =
|
func key(id: AvailabilityId): ?!Key =
|
||||||
(ReservationsKey / id.toArray.toHex)
|
(ReservationsKey / id.toArray.toHex)
|
||||||
@ -217,13 +224,13 @@ proc reserve*(
|
|||||||
|
|
||||||
return failure(updateErr)
|
return failure(updateErr)
|
||||||
|
|
||||||
if onReservationAdded =? self.onReservationAdded:
|
if onAdded =? self.onAdded:
|
||||||
try:
|
try:
|
||||||
await onReservationAdded(availability)
|
await onAdded(availability)
|
||||||
except CatchableError as e:
|
except CatchableError as e:
|
||||||
# we don't have any insight into types of errors that `onProcessSlot` can
|
# we don't have any insight into types of errors that `onProcessSlot` can
|
||||||
# throw because it is caller-defined
|
# throw because it is caller-defined
|
||||||
warn "Unknown error during 'onReservationAdded' callback",
|
warn "Unknown error during 'onAdded' callback",
|
||||||
availabilityId = availability.id, error = e.msg
|
availabilityId = availability.id, error = e.msg
|
||||||
|
|
||||||
return success()
|
return success()
|
||||||
@ -292,6 +299,13 @@ proc markUnused*(
|
|||||||
let r = await self.update(availability)
|
let r = await self.update(availability)
|
||||||
if r.isOk:
|
if r.isOk:
|
||||||
trace "availability marked unused", id = id.toArray.toHex
|
trace "availability marked unused", id = id.toArray.toHex
|
||||||
|
|
||||||
|
if onMarkedUnused =? self.onMarkUnused:
|
||||||
|
try:
|
||||||
|
await onMarkedUnused(availability)
|
||||||
|
except CatchableError as e:
|
||||||
|
warn "Unknown error during 'onMarkedUnused' callback",
|
||||||
|
availabilityId = availability.id, error = e.msg
|
||||||
return r
|
return r
|
||||||
|
|
||||||
iterator items*(self: AvailabilityIter): Future[?Availability] =
|
iterator items*(self: AvailabilityIter): Future[?Availability] =
|
||||||
|
|||||||
@ -117,6 +117,25 @@ asyncchecksuite "Reservations module":
|
|||||||
|
|
||||||
check not available.used
|
check not available.used
|
||||||
|
|
||||||
|
test "onMarkedUnused called when availability marked unused":
|
||||||
|
var markedUnused: Availability
|
||||||
|
reservations.onMarkUnused = proc(a: Availability) {.async.} =
|
||||||
|
markedUnused = a
|
||||||
|
|
||||||
|
check isOk await reservations.reserve(availability)
|
||||||
|
check isOk await reservations.markUnused(availability.id)
|
||||||
|
|
||||||
|
check markedUnused == availability
|
||||||
|
|
||||||
|
test "onAdded called when availability is reserved":
|
||||||
|
var added: Availability
|
||||||
|
reservations.onAdded = proc(a: Availability) {.async.} =
|
||||||
|
added = a
|
||||||
|
|
||||||
|
check isOk await reservations.reserve(availability)
|
||||||
|
|
||||||
|
check added == availability
|
||||||
|
|
||||||
test "used availability can be found":
|
test "used availability can be found":
|
||||||
check isOk await reservations.reserve(availability)
|
check isOk await reservations.reserve(availability)
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user