Fix slot queue push (#542)

* [sales] remove availability check before adding to slot queue

* [sales] add missing return statement

* [tests] remove 'eventuallyCheck' helper

* [sales] remove reservations from slot queue

* [tests] rename module `eventually` -> `always`

* [sales] increase slot queue size

Because it will now also hold items for which we haven't
checked availability yet.
This commit is contained in:
markspanbroek 2023-09-04 16:42:09 +02:00 committed by GitHub
parent d279eebd69
commit d3a22a7b7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 73 additions and 190 deletions

View File

@ -89,7 +89,7 @@ func new*(_: type Sales,
market: market,
clock: clock,
reservations: reservations,
slotQueue: SlotQueue.new(reservations),
slotQueue: SlotQueue.new(),
simulateProofFailures: simulateProofFailures
),
trackedFutures: TrackedFutures.new(),
@ -182,7 +182,7 @@ proc onReservationAdded(sales: Sales, availability: Availability) {.async.} =
for slots in requests:
for slot in slots:
if err =? (await queue.push(slot)).errorOption:
if err =? queue.push(slot).errorOption:
# continue on error
if err of QueueNotRunningError:
warn "cannot push items to queue, queue is not running"
@ -219,12 +219,11 @@ proc onStorageRequested(sales: Sales,
warn "Too many slots, cannot add to queue"
else:
warn "Failed to create slot queue items from request", error = err.msg
return
for item in items:
# continue on failure
slotQueue.push(item)
.track(sales)
.catch(proc(err: ref CatchableError) =
if err =? slotQueue.push(item).errorOption:
if err of NoMatchingAvailabilityError:
info "slot in queue had no matching availabilities, ignoring"
elif err of SlotQueueItemExistsError:
@ -233,7 +232,6 @@ proc onStorageRequested(sales: Sales,
warn "Failed to push item to queue becaue queue is not running"
else:
warn "Error adding request to SlotQueue", error = err.msg
)
proc onSlotFreed(sales: Sales,
requestId: RequestId,
@ -263,7 +261,7 @@ proc onSlotFreed(sales: Sales,
found = SlotQueueItem.init(request, slotIndex.truncate(uint16))
if err =? (await queue.push(found)).errorOption:
if err =? queue.push(found).errorOption:
raise err
addSlotToQueue()

View File

@ -5,7 +5,6 @@ import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/upraises
import ./reservations
import ../errors
import ../rng
import ../utils
@ -46,7 +45,6 @@ type
maxWorkers: int
onProcessSlot: ?OnProcessSlot
queue: AsyncHeapQueue[SlotQueueItem]
reservations: Reservations
running: bool
workers: AsyncQueue[SlotQueueWorker]
trackedFutures: TrackedFutures
@ -68,7 +66,7 @@ const DefaultMaxWorkers = 3
# included in the queue if it is higher priority than any of the exisiting
# items. Older slots should be unfillable over time as other hosts fill the
# slots.
const DefaultMaxSize = 64'u16
const DefaultMaxSize = 128'u16
proc profitability(item: SlotQueueItem): UInt256 =
StorageAsk(collateral: item.collateral,
@ -105,7 +103,6 @@ proc `==`*(a, b: SlotQueueItem): bool =
a.slotIndex == b.slotIndex
proc new*(_: type SlotQueue,
reservations: Reservations,
maxWorkers = DefaultMaxWorkers,
maxSize: SlotQueueSize = DefaultMaxSize): SlotQueue =
@ -119,7 +116,6 @@ proc new*(_: type SlotQueue,
# Add 1 to always allow for an extra item to be pushed onto the queue
# temporarily. After push (and sort), the bottom-most item will be deleted
queue: newAsyncHeapQueue[SlotQueueItem](maxSize.int + 1),
reservations: reservations,
running: false,
trackedFutures: TrackedFutures.new()
)
@ -228,7 +224,7 @@ proc populateItem*(self: SlotQueue,
)
return none SlotQueueItem
proc push*(self: SlotQueue, item: SlotQueueItem): Future[?!void] {.async.} =
proc push*(self: SlotQueue, item: SlotQueueItem): ?!void =
trace "pushing item to queue",
requestId = item.requestId, slotIndex = item.slotIndex
@ -237,14 +233,6 @@ proc push*(self: SlotQueue, item: SlotQueueItem): Future[?!void] {.async.} =
let err = newException(QueueNotRunningError, "queue not running")
return failure(err)
without availability =? await self.reservations.find(item.slotSize,
item.duration,
item.profitability,
item.collateral,
used = false):
let err = newException(NoMatchingAvailabilityError, "no availability")
return failure(err)
if self.contains(item):
let err = newException(SlotQueueItemExistsError, "item already exists")
return failure(err)
@ -259,9 +247,9 @@ proc push*(self: SlotQueue, item: SlotQueueItem): Future[?!void] {.async.} =
doAssert self.queue.len <= self.queue.size - 1
return success()
proc push*(self: SlotQueue, items: seq[SlotQueueItem]): Future[?!void] {.async.} =
proc push*(self: SlotQueue, items: seq[SlotQueueItem]): ?!void =
for item in items:
if err =? (await self.push(item)).errorOption:
if err =? self.push(item).errorOption:
return failure(err)
return success()

View File

@ -9,10 +9,10 @@ import pkg/codex/rng
import ./helpers/nodeutils
import ./helpers/randomchunker
import ./helpers/mockdiscovery
import ./helpers/eventually
import ./helpers/always
import ../checktest
export randomchunker, nodeutils, mockdiscovery, eventually, checktest, manifest
export randomchunker, nodeutils, mockdiscovery, always, checktest, manifest
export libp2p except setup, eventually

View File

@ -1,17 +1,5 @@
import pkg/chronos
template eventuallyCheck*(condition: untyped, timeout = 5.seconds): bool =
proc loop: Future[bool] {.async.} =
let start = Moment.now()
while true:
if condition:
return true
if Moment.now() > (start + timeout):
return false
else:
await sleepAsync(1.millis)
await loop()
template always*(condition: untyped, timeout = 50.millis): bool =
proc loop: Future[bool] {.async.} =
let start = Moment.now()

View File

@ -64,7 +64,7 @@ asyncchecksuite "sales state 'proving'":
market.setProofRequired(slot.id, true)
await market.advanceToNextPeriod()
check eventuallyCheck receivedIds == @[slot.id] and receivedProofs == @[proof]
check eventually receivedIds == @[slot.id] and receivedProofs == @[proof]
await future.cancelAndWait()
await subscription.unsubscribe()
@ -77,6 +77,6 @@ asyncchecksuite "sales state 'proving'":
market.slotState[slot.id] = SlotState.Finished
await market.advanceToNextPeriod()
check eventuallyCheck future.finished
check eventually future.finished
check !(future.read()) of SalePayout

View File

@ -92,5 +92,5 @@ asyncchecksuite "sales state 'simulated-proving'":
market.slotState[slot.id] = SlotState.Finished
await market.advanceToNextPeriod()
check eventuallyCheck future.finished
check eventually future.finished
check !(future.read()) of SalePayout

View File

@ -17,7 +17,7 @@ import pkg/codex/blocktype as bt
import pkg/codex/node
import ../helpers/mockmarket
import ../helpers/mockclock
import ../helpers/eventually
import ../helpers/always
import ../examples
import ./helpers
@ -189,7 +189,10 @@ asyncchecksuite "Sales":
var request1 = StorageRequest.example
request1.ask.collateral = request.ask.collateral + 1
discard await reservations.reserve(availability)
await market.requestStorage(request)
# saturate queue
while queue.len < queue.size - 1:
await market.requestStorage(StorageRequest.example)
# send request
await market.requestStorage(request1)
await sleepAsync(5.millis) # wait for request slots to be added to queue
return request1
@ -236,33 +239,6 @@ asyncchecksuite "Sales":
let expected = SlotQueueItem.init(request, 2.uint16)
check eventually itemsProcessed.contains(expected)
test "request slots are not added to the slot queue when no availabilities exist":
var itemsProcessed: seq[SlotQueueItem] = @[]
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
itemsProcessed.add item
done.complete()
await market.requestStorage(request)
# check that request was ignored due to no matching availability
check always itemsProcessed.len == 0
test "non-matching availabilities/requests are not added to the slot queue":
var itemsProcessed: seq[SlotQueueItem] = @[]
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
itemsProcessed.add item
done.complete()
let nonMatchingAvailability = Availability.init(
size=100.u256,
duration=60.u256,
minPrice=601.u256, # too high
maxCollateral=400.u256
)
check isOk await reservations.reserve(nonMatchingAvailability)
await market.requestStorage(request)
# check that request was ignored due to no matching availability
check always itemsProcessed.len == 0
test "adds past requests to queue once availability added":
var itemsProcessed: seq[SlotQueueItem] = @[]
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
@ -273,7 +249,7 @@ asyncchecksuite "Sales":
# now add matching availability
check isOk await reservations.reserve(availability)
check eventuallyCheck itemsProcessed.len == request.ask.slots.int
check eventually itemsProcessed.len == request.ask.slots.int
test "makes storage unavailable when downloading a matched request":
var used = false
@ -395,7 +371,7 @@ asyncchecksuite "Sales":
test "fills a slot":
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check eventuallyCheck market.filled.len == 1
check eventually market.filled.len > 0
check market.filled[0].requestId == request.id
check market.filled[0].slotIndex < request.ask.slots.u256
check market.filled[0].proof == proof

View File

@ -9,7 +9,6 @@ import pkg/codex/sales/states/errorhandling
import pkg/codex/proving
import ../helpers/mockmarket
import ../helpers/mockclock
import ../helpers/eventually
import ../helpers
import ../examples

View File

@ -6,29 +6,18 @@ import pkg/datastore
import pkg/questionable
import pkg/questionable/results
import pkg/codex/sales/reservations
import pkg/codex/sales/slotqueue
import pkg/codex/stores
import ../helpers
import ../helpers/mockmarket
import ../helpers/eventually
import ../examples
suite "Slot queue start/stop":
var repo: RepoStore
var repoDs: Datastore
var metaDs: SQLiteDatastore
var reservations: Reservations
var queue: SlotQueue
setup:
repoDs = SQLiteDatastore.new(Memory).tryGet()
metaDs = SQLiteDatastore.new(Memory).tryGet()
repo = RepoStore.new(repoDs, metaDs)
reservations = Reservations.new(repo)
queue = SlotQueue.new(reservations)
queue = SlotQueue.new()
teardown:
await queue.stop()
@ -58,11 +47,6 @@ suite "Slot queue start/stop":
suite "Slot queue workers":
var repo: RepoStore
var repoDs: Datastore
var metaDs: SQLiteDatastore
var availability: Availability
var reservations: Reservations
var queue: SlotQueue
proc onProcessSlot(item: SlotQueueItem, doneProcessing: Future[void]) {.async.} =
@ -74,21 +58,8 @@ suite "Slot queue workers":
setup:
let request = StorageRequest.example
repoDs = SQLiteDatastore.new(Memory).tryGet()
metaDs = SQLiteDatastore.new(Memory).tryGet()
let quota = request.ask.slotSize.truncate(uint) * 100 + 1
repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = quota)
reservations = Reservations.new(repo)
# create an availability that should always match
availability = Availability.init(
size = request.ask.slotSize * 100,
duration = request.ask.duration * 100,
minPrice = request.ask.pricePerSlot div 100,
maxCollateral = request.ask.collateral * 100
)
queue = SlotQueue.new(reservations, maxSize = 5, maxWorkers = 3)
queue = SlotQueue.new(maxSize = 5, maxWorkers = 3)
queue.onProcessSlot = onProcessSlot
discard await reservations.reserve(availability)
proc startQueue = asyncSpawn queue.start()
@ -100,11 +71,11 @@ suite "Slot queue workers":
test "maxWorkers cannot be 0":
expect ValueError:
discard SlotQueue.new(reservations, maxSize = 1, maxWorkers = 0)
discard SlotQueue.new(maxSize = 1, maxWorkers = 0)
test "maxWorkers cannot surpass maxSize":
expect ValueError:
discard SlotQueue.new(reservations, maxSize = 1, maxWorkers = 2)
discard SlotQueue.new(maxSize = 1, maxWorkers = 2)
test "does not surpass max workers":
startQueue()
@ -112,10 +83,10 @@ suite "Slot queue workers":
let item2 = SlotQueueItem.example
let item3 = SlotQueueItem.example
let item4 = SlotQueueItem.example
check (await queue.push(item1)).isOk
check (await queue.push(item2)).isOk
check (await queue.push(item3)).isOk
check (await queue.push(item4)).isOk
check queue.push(item1).isOk
check queue.push(item2).isOk
check queue.push(item3).isOk
check queue.push(item4).isOk
check eventually queue.activeWorkers == 3
test "discards workers once processing completed":
@ -130,28 +101,21 @@ suite "Slot queue workers":
let item2 = SlotQueueItem.example
let item3 = SlotQueueItem.example
let item4 = SlotQueueItem.example
check (await queue.push(item1)).isOk # finishes after 1.millis
check (await queue.push(item2)).isOk # finishes after 1.millis
check (await queue.push(item3)).isOk # finishes after 1.millis
check (await queue.push(item4)).isOk
check queue.push(item1).isOk # finishes after 1.millis
check queue.push(item2).isOk # finishes after 1.millis
check queue.push(item3).isOk # finishes after 1.millis
check queue.push(item4).isOk
check eventually queue.activeWorkers == 1
suite "Slot queue":
var onProcessSlotCalled = false
var onProcessSlotCalledWith: seq[(RequestId, uint16)]
var repo: RepoStore
var repoDs: Datastore
var metaDs: SQLiteDatastore
var availability: Availability
var reservations: Reservations
var queue: SlotQueue
let maxWorkers = 2
var unpauseQueue: Future[void]
var paused: bool
proc newSlotQueue(maxSize, maxWorkers: int, processSlotDelay = 1.millis) =
queue = SlotQueue.new(reservations, maxWorkers, maxSize.uint16)
queue = SlotQueue.new(maxWorkers, maxSize.uint16)
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
await sleepAsync(processSlotDelay)
trace "processing item", requestId = item.requestId, slotIndex = item.slotIndex
@ -163,20 +127,6 @@ suite "Slot queue":
setup:
onProcessSlotCalled = false
onProcessSlotCalledWith = @[]
let request = StorageRequest.example
repoDs = SQLiteDatastore.new(Memory).tryGet()
metaDs = SQLiteDatastore.new(Memory).tryGet()
let quota = request.ask.slotSize.truncate(uint) * 100 + 1
repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = quota)
reservations = Reservations.new(repo)
# create an availability that should always match
availability = Availability.init(
size = request.ask.slotSize * 100,
duration = request.ask.duration * 100,
minPrice = request.ask.pricePerSlot div 100,
maxCollateral = request.ask.collateral * 100
)
discard await reservations.reserve(availability)
teardown:
paused = false
@ -226,8 +176,8 @@ suite "Slot queue":
newSlotQueue(maxSize = 2, maxWorkers = 2)
let item1 = SlotQueueItem.example
let item2 = SlotQueueItem.example
check (await queue.push(item1)).isOk
check (await queue.push(item2)).isOk
check queue.push(item1).isOk
check queue.push(item2).isOk
check eventually onProcessSlotCalledWith == @[
(item1.requestId, item1.slotIndex),
(item2.requestId, item2.slotIndex)
@ -240,11 +190,11 @@ suite "Slot queue":
let item2 = SlotQueueItem.example
let item3 = SlotQueueItem.example
let item4 = SlotQueueItem.example
check isOk (await queue.push(item0))
check isOk (await queue.push(item1))
check isOk (await queue.push(item2))
check isOk (await queue.push(item3))
check isOk (await queue.push(item4))
check isOk queue.push(item0)
check isOk queue.push(item1)
check isOk queue.push(item2)
check isOk queue.push(item3)
check isOk queue.push(item4)
test "populates item with exisiting request metadata":
newSlotQueue(maxSize = 8, maxWorkers = 1, processSlotDelay = 10.millis)
@ -253,8 +203,8 @@ suite "Slot queue":
request1.ask.collateral += 1.u256
let items0 = SlotQueueItem.init(request0)
let items1 = SlotQueueItem.init(request1)
check (await queue.push(items0)).isOk
check (await queue.push(items1)).isOk
check queue.push(items0).isOk
check queue.push(items1).isOk
let populated = !queue.populateItem(request1.id, 12'u16)
check populated.requestId == request1.id
check populated.slotIndex == 12'u16
@ -289,9 +239,9 @@ suite "Slot queue":
let item0 = SlotQueueItem.example
let item1 = SlotQueueItem.example
let item2 = SlotQueueItem.example
check isOk (await queue.push(item0))
check isOk (await queue.push(item1))
check (await queue.push(@[item2, item2, item2, item2])).error of SlotQueueItemExistsError
check isOk queue.push(item0)
check isOk queue.push(item1)
check queue.push(@[item2, item2, item2, item2]).error of SlotQueueItemExistsError
test "can add items past max maxSize":
newSlotQueue(maxSize = 4, maxWorkers = 2, processSlotDelay = 10.millis)
@ -299,10 +249,10 @@ suite "Slot queue":
let item2 = SlotQueueItem.example
let item3 = SlotQueueItem.example
let item4 = SlotQueueItem.example
check (await queue.push(item1)).isOk
check (await queue.push(item2)).isOk
check (await queue.push(item3)).isOk
check (await queue.push(item4)).isOk
check queue.push(item1).isOk
check queue.push(item2).isOk
check queue.push(item3).isOk
check queue.push(item4).isOk
check eventually onProcessSlotCalledWith.len == 4
test "can delete items":
@ -311,10 +261,10 @@ suite "Slot queue":
let item1 = SlotQueueItem.example
let item2 = SlotQueueItem.example
let item3 = SlotQueueItem.example
check (await queue.push(item0)).isOk
check (await queue.push(item1)).isOk
check (await queue.push(item2)).isOk
check (await queue.push(item3)).isOk
check queue.push(item0).isOk
check queue.push(item1).isOk
check queue.push(item2).isOk
check queue.push(item3).isOk
queue.delete(item3)
check not queue.contains(item3)
@ -325,8 +275,8 @@ suite "Slot queue":
request1.ask.collateral += 1.u256
let items0 = SlotQueueItem.init(request0)
let items1 = SlotQueueItem.init(request1)
check (await queue.push(items0)).isOk
check (await queue.push(items1)).isOk
check queue.push(items0).isOk
check queue.push(items1).isOk
let last = items1[items1.high]
check eventually queue.contains(last)
queue.delete(last.requestId, last.slotIndex)
@ -341,8 +291,8 @@ suite "Slot queue":
request1.ask.collateral += 1.u256
let items0 = SlotQueueItem.init(request0)
let items1 = SlotQueueItem.init(request1)
check (await queue.push(items0)).isOk
check (await queue.push(items1)).isOk
check queue.push(items0).isOk
check queue.push(items1).isOk
queue.delete(request1.id)
check not onProcessSlotCalledWith.anyIt(it[0] == request1.id)
@ -366,7 +316,7 @@ suite "Slot queue":
let item4 = SlotQueueItem.init(request4, 0)
let item5 = SlotQueueItem.init(request5, 0)
check queue.contains(item5) == false
check (await queue.push(@[item0, item1, item2, item3, item4, item5])).isOk
check queue.push(@[item0, item1, item2, item3, item4, item5]).isOk
check queue.contains(item5)
test "sorts items by profitability ascending (higher pricePerSlot = higher priority)":
@ -401,13 +351,13 @@ suite "Slot queue":
newSlotQueue(maxSize = 2, maxWorkers = 2)
let item = SlotQueueItem.example
check not onProcessSlotCalled
check (await queue.push(item)).isOk
check queue.push(item).isOk
check eventually onProcessSlotCalled
test "should only process item once":
newSlotQueue(maxSize = 2, maxWorkers = 2)
let item = SlotQueueItem.example
check (await queue.push(item)).isOk
check queue.push(item).isOk
check eventually onProcessSlotCalledWith == @[
(item.requestId, item.slotIndex)
]
@ -425,13 +375,13 @@ suite "Slot queue":
request.ask.reward += 1.u256
let item3 = SlotQueueItem.init(request, 3)
check (await queue.push(item0)).isOk
check queue.push(item0).isOk
await sleepAsync(1.millis)
check (await queue.push(item1)).isOk
check queue.push(item1).isOk
await sleepAsync(1.millis)
check (await queue.push(item2)).isOk
check queue.push(item2).isOk
await sleepAsync(1.millis)
check (await queue.push(item3)).isOk
check queue.push(item3).isOk
check eventually (
onProcessSlotCalledWith == @[
@ -441,11 +391,3 @@ suite "Slot queue":
(item3.requestId, item3.slotIndex),
]
)
test "fails to push when there's no matching availability":
newSlotQueue(maxSize = 2, maxWorkers = 2)
discard await reservations.release(availability.id,
availability.size.truncate(uint))
let item = SlotQueueItem.example
check (await queue.push(item)).error of NoMatchingAvailabilityError

View File

@ -11,7 +11,6 @@ import pkg/codex/purchasing/states/cancelled
import pkg/codex/purchasing/states/failed
import ./helpers/mockmarket
import ./helpers/mockclock
import ./helpers/eventually
import ./examples
import ./helpers

View File

@ -5,7 +5,6 @@ import codex/validation
import codex/periods
import ./helpers/mockmarket
import ./helpers/mockclock
import ./helpers/eventually
import ./examples
import ./helpers

View File

@ -3,7 +3,6 @@ import pkg/questionable
import pkg/chronos
import pkg/upraises
import codex/utils/asyncstatemachine
import ../helpers/eventually
import ../helpers
type

View File

@ -11,7 +11,6 @@ import pkg/chronos
import pkg/asynctest
import codex/utils/timer
import ../helpers/eventually
import ../helpers
asyncchecksuite "Timer":

View File

@ -1,7 +1,6 @@
import pkg/asynctest
import pkg/chronos
import codex/utils/trackedfutures
import ../helpers/eventually
import ../helpers
type Module = object

View File

@ -2,7 +2,6 @@ import std/options
import pkg/chronos
import pkg/stew/byteutils
import codex/contracts
import ../codex/helpers/eventually
import ../ethertest
import ./examples
import ./time

View File

@ -7,7 +7,6 @@ import pkg/codex/contracts
import pkg/codex/utils/stintutils
import ../contracts/time
import ../contracts/deployment
import ../codex/helpers/eventually
import ./twonodes

View File

@ -6,7 +6,6 @@ import codex/contracts
import codex/periods
import ../contracts/time
import ../contracts/deployment
import ../codex/helpers/eventually
import ./twonodes
import ./multinodes