mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-02 13:33:10 +00:00
fix(sales): fix marketplace block expiry (#1258)
This commit is contained in:
parent
85823342e9
commit
27d807a841
@ -623,6 +623,7 @@ proc requestStorage*(
|
||||
proc onStore(
|
||||
self: CodexNodeRef,
|
||||
request: StorageRequest,
|
||||
expiry: SecondsSince1970,
|
||||
slotIdx: uint64,
|
||||
blocksCb: BlocksCb,
|
||||
isRepairing: bool = false,
|
||||
@ -651,8 +652,6 @@ proc onStore(
|
||||
trace "Unable to create slots builder", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
let expiry = request.expiry
|
||||
|
||||
if slotIdx > manifest.slotRoots.high.uint64:
|
||||
trace "Slot index not in manifest", slotIdx
|
||||
return failure(newException(CodexError, "Slot index not in manifest"))
|
||||
@ -663,7 +662,7 @@ proc onStore(
|
||||
trace "Updating expiry for blocks", blocks = blocks.len
|
||||
|
||||
let ensureExpiryFutures =
|
||||
blocks.mapIt(self.networkStore.ensureExpiry(it.cid, expiry.toSecondsSince1970))
|
||||
blocks.mapIt(self.networkStore.ensureExpiry(it.cid, expiry))
|
||||
|
||||
let res = await allFinishedFailed[?!void](ensureExpiryFutures)
|
||||
if res.failure.len > 0:
|
||||
@ -789,11 +788,12 @@ proc start*(self: CodexNodeRef) {.async.} =
|
||||
if hostContracts =? self.contracts.host:
|
||||
hostContracts.sales.onStore = proc(
|
||||
request: StorageRequest,
|
||||
expiry: SecondsSince1970,
|
||||
slot: uint64,
|
||||
onBatch: BatchProc,
|
||||
isRepairing: bool = false,
|
||||
): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} =
|
||||
self.onStore(request, slot, onBatch, isRepairing)
|
||||
self.onStore(request, expiry, slot, onBatch, isRepairing)
|
||||
|
||||
hostContracts.sales.onExpiryUpdate = proc(
|
||||
rootCid: Cid, expiry: SecondsSince1970
|
||||
|
||||
@ -148,26 +148,12 @@ proc cleanUp(
|
||||
|
||||
# Re-add items back into the queue to prevent small availabilities from
|
||||
# draining the queue. Seen items will be ordered last.
|
||||
if data.slotIndex <= uint16.high.uint64 and reprocessSlot and request =? data.request:
|
||||
let res =
|
||||
await noCancel sales.context.market.slotCollateral(data.requestId, data.slotIndex)
|
||||
if res.isErr:
|
||||
error "Failed to re-add item back to the slot queue: unable to calculate collateral",
|
||||
error = res.error.msg
|
||||
else:
|
||||
let collateral = res.get()
|
||||
let queue = sales.context.slotQueue
|
||||
var seenItem = SlotQueueItem.init(
|
||||
data.requestId,
|
||||
data.slotIndex.uint16,
|
||||
data.ask,
|
||||
request.expiry,
|
||||
seen = true,
|
||||
collateral = collateral,
|
||||
)
|
||||
trace "pushing ignored item to queue, marked as seen"
|
||||
if err =? queue.push(seenItem).errorOption:
|
||||
error "failed to readd slot to queue", errorType = $(type err), error = err.msg
|
||||
if reprocessSlot and request =? data.request and var item =? agent.data.slotQueueItem:
|
||||
let queue = sales.context.slotQueue
|
||||
item.seen = true
|
||||
trace "pushing ignored item to queue, marked as seen"
|
||||
if err =? queue.push(item).errorOption:
|
||||
error "failed to readd slot to queue", errorType = $(type err), error = err.msg
|
||||
|
||||
let fut = sales.remove(agent)
|
||||
sales.trackedFutures.track(fut)
|
||||
@ -181,8 +167,9 @@ proc processSlot(
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
debug "Processing slot from queue", requestId = item.requestId, slot = item.slotIndex
|
||||
|
||||
let agent =
|
||||
newSalesAgent(sales.context, item.requestId, item.slotIndex, none StorageRequest)
|
||||
let agent = newSalesAgent(
|
||||
sales.context, item.requestId, item.slotIndex, none StorageRequest, some item
|
||||
)
|
||||
|
||||
let completed = newAsyncEvent()
|
||||
|
||||
|
||||
@ -11,6 +11,7 @@ import ./statemachine
|
||||
import ./salescontext
|
||||
import ./salesdata
|
||||
import ./reservations
|
||||
import ./slotqueue
|
||||
|
||||
export reservations
|
||||
|
||||
@ -42,10 +43,16 @@ proc newSalesAgent*(
|
||||
requestId: RequestId,
|
||||
slotIndex: uint64,
|
||||
request: ?StorageRequest,
|
||||
slotQueueItem = SlotQueueItem.none,
|
||||
): SalesAgent =
|
||||
var agent = SalesAgent.new()
|
||||
agent.context = context
|
||||
agent.data = SalesData(requestId: requestId, slotIndex: slotIndex, request: request)
|
||||
agent.data = SalesData(
|
||||
requestId: requestId,
|
||||
slotIndex: slotIndex,
|
||||
request: request,
|
||||
slotQueueItem: slotQueueItem,
|
||||
)
|
||||
return agent
|
||||
|
||||
proc retrieveRequest*(agent: SalesAgent) {.async.} =
|
||||
|
||||
@ -28,7 +28,11 @@ type
|
||||
gcsafe, async: (raises: [CancelledError])
|
||||
.}
|
||||
OnStore* = proc(
|
||||
request: StorageRequest, slot: uint64, blocksCb: BlocksCb, isRepairing: bool
|
||||
request: StorageRequest,
|
||||
expiry: SecondsSince1970,
|
||||
slot: uint64,
|
||||
blocksCb: BlocksCb,
|
||||
isRepairing: bool,
|
||||
): Future[?!void] {.gcsafe, async: (raises: [CancelledError]).}
|
||||
OnProve* = proc(slot: Slot, challenge: ProofChallenge): Future[?!Groth16Proof] {.
|
||||
gcsafe, async: (raises: [CancelledError])
|
||||
|
||||
@ -2,6 +2,7 @@ import pkg/chronos
|
||||
import ../contracts/requests
|
||||
import ../market
|
||||
import ./reservations
|
||||
import ./slotqueue
|
||||
|
||||
type SalesData* = ref object
|
||||
requestId*: RequestId
|
||||
@ -10,3 +11,4 @@ type SalesData* = ref object
|
||||
slotIndex*: uint64
|
||||
cancelled*: Future[void]
|
||||
reservation*: ?Reservation
|
||||
slotQueueItem*: ?SlotQueueItem
|
||||
|
||||
@ -30,7 +30,7 @@ type
|
||||
duration: uint64
|
||||
pricePerBytePerSecond: UInt256
|
||||
collateral: UInt256 # Collateral computed
|
||||
expiry: uint64
|
||||
expiry: ?uint64
|
||||
seen: bool
|
||||
|
||||
# don't need to -1 to prevent overflow when adding 1 (to always allow push)
|
||||
@ -89,8 +89,9 @@ proc `<`*(a, b: SlotQueueItem): bool =
|
||||
scoreA.addIf(a.collateral < b.collateral, 2)
|
||||
scoreB.addIf(a.collateral > b.collateral, 2)
|
||||
|
||||
scoreA.addIf(a.expiry > b.expiry, 1)
|
||||
scoreB.addIf(a.expiry < b.expiry, 1)
|
||||
if expiryA =? a.expiry and expiryB =? b.expiry:
|
||||
scoreA.addIf(expiryA > expiryB, 1)
|
||||
scoreB.addIf(expiryA < expiryB, 1)
|
||||
|
||||
return scoreA > scoreB
|
||||
|
||||
@ -124,7 +125,7 @@ proc init*(
|
||||
requestId: RequestId,
|
||||
slotIndex: uint16,
|
||||
ask: StorageAsk,
|
||||
expiry: uint64,
|
||||
expiry: ?uint64,
|
||||
collateral: UInt256,
|
||||
seen = false,
|
||||
): SlotQueueItem =
|
||||
@ -139,6 +140,17 @@ proc init*(
|
||||
seen: seen,
|
||||
)
|
||||
|
||||
proc init*(
|
||||
_: type SlotQueueItem,
|
||||
requestId: RequestId,
|
||||
slotIndex: uint16,
|
||||
ask: StorageAsk,
|
||||
expiry: uint64,
|
||||
collateral: UInt256,
|
||||
seen = false,
|
||||
): SlotQueueItem =
|
||||
SlotQueueItem.init(requestId, slotIndex, ask, some expiry, collateral, seen)
|
||||
|
||||
proc init*(
|
||||
_: type SlotQueueItem,
|
||||
request: StorageRequest,
|
||||
@ -151,7 +163,7 @@ proc init*(
|
||||
_: type SlotQueueItem,
|
||||
requestId: RequestId,
|
||||
ask: StorageAsk,
|
||||
expiry: uint64,
|
||||
expiry: ?uint64,
|
||||
collateral: UInt256,
|
||||
): seq[SlotQueueItem] {.raises: [SlotsOutOfRangeError].} =
|
||||
if not ask.slots.inRange:
|
||||
@ -167,10 +179,19 @@ proc init*(
|
||||
Rng.instance.shuffle(items)
|
||||
return items
|
||||
|
||||
proc init*(
|
||||
_: type SlotQueueItem,
|
||||
requestId: RequestId,
|
||||
ask: StorageAsk,
|
||||
expiry: uint64,
|
||||
collateral: UInt256,
|
||||
): seq[SlotQueueItem] {.raises: [SlotsOutOfRangeError].} =
|
||||
SlotQueueItem.init(requestId, ask, some expiry, collateral)
|
||||
|
||||
proc init*(
|
||||
_: type SlotQueueItem, request: StorageRequest, collateral: UInt256
|
||||
): seq[SlotQueueItem] =
|
||||
return SlotQueueItem.init(request.id, request.ask, request.expiry, collateral)
|
||||
return SlotQueueItem.init(request.id, request.ask, uint64.none, collateral)
|
||||
|
||||
proc inRange*(val: SomeUnsignedInt): bool =
|
||||
val.uint16 in SlotQueueSize.low .. SlotQueueSize.high
|
||||
@ -196,6 +217,9 @@ proc collateralPerByte*(self: SlotQueueItem): UInt256 =
|
||||
proc seen*(self: SlotQueueItem): bool =
|
||||
self.seen
|
||||
|
||||
proc `seen=`*(self: var SlotQueueItem, seen: bool) =
|
||||
self.seen = seen
|
||||
|
||||
proc running*(self: SlotQueue): bool =
|
||||
self.running
|
||||
|
||||
|
||||
@ -38,6 +38,7 @@ method run*(
|
||||
let agent = SalesAgent(machine)
|
||||
let data = agent.data
|
||||
let context = agent.context
|
||||
let market = context.market
|
||||
let reservations = context.reservations
|
||||
|
||||
without onStore =? context.onStore:
|
||||
@ -69,11 +70,21 @@ method run*(
|
||||
return await reservations.release(reservation.id, reservation.availabilityId, bytes)
|
||||
|
||||
try:
|
||||
let slotId = slotId(request.id, data.slotIndex)
|
||||
let isRepairing = (await context.market.slotState(slotId)) == SlotState.Repair
|
||||
let requestId = request.id
|
||||
let slotId = slotId(requestId, data.slotIndex)
|
||||
let requestState = await market.requestState(requestId)
|
||||
let isRepairing = (await market.slotState(slotId)) == SlotState.Repair
|
||||
|
||||
trace "Retrieving expiry"
|
||||
var expiry: SecondsSince1970
|
||||
if state =? requestState and state == RequestState.Started:
|
||||
expiry = await market.getRequestEnd(requestId)
|
||||
else:
|
||||
expiry = await market.requestExpiresAt(requestId)
|
||||
|
||||
trace "Starting download"
|
||||
if err =? (await onStore(request, data.slotIndex, onBlocks, isRepairing)).errorOption:
|
||||
if err =?
|
||||
(await onStore(request, expiry, data.slotIndex, onBlocks, isRepairing)).errorOption:
|
||||
return some State(SaleErrored(error: err, reprocessSlot: false))
|
||||
|
||||
trace "Download complete"
|
||||
|
||||
@ -14,6 +14,7 @@ from pkg/ethers import BlockTag
|
||||
import codex/clock
|
||||
|
||||
import ../examples
|
||||
import ./mockclock
|
||||
|
||||
export market
|
||||
export tables
|
||||
@ -51,7 +52,7 @@ type
|
||||
errorOnFillSlot*: ?(ref MarketError)
|
||||
errorOnFreeSlot*: ?(ref MarketError)
|
||||
errorOnGetHost*: ?(ref MarketError)
|
||||
clock: ?Clock
|
||||
clock: Clock
|
||||
|
||||
Fulfillment* = object
|
||||
requestId*: RequestId
|
||||
@ -63,7 +64,7 @@ type
|
||||
host*: Address
|
||||
slotIndex*: uint64
|
||||
proof*: Groth16Proof
|
||||
timestamp: ?SecondsSince1970
|
||||
timestamp: SecondsSince1970
|
||||
collateral*: UInt256
|
||||
|
||||
Subscriptions = object
|
||||
@ -119,7 +120,7 @@ proc hash*(address: Address): Hash =
|
||||
proc hash*(requestId: RequestId): Hash =
|
||||
hash(requestId.toArray)
|
||||
|
||||
proc new*(_: type MockMarket, clock: ?Clock = Clock.none): MockMarket =
|
||||
proc new*(_: type MockMarket, clock: Clock = MockClock.new()): MockMarket =
|
||||
## Create a new mocked Market instance
|
||||
##
|
||||
let config = MarketplaceConfig(
|
||||
@ -181,10 +182,15 @@ method getPointer*(market: MockMarket, slotId: SlotId): Future[uint8] {.async.}
|
||||
method requestStorage*(
|
||||
market: MockMarket, request: StorageRequest
|
||||
) {.async: (raises: [CancelledError, MarketError]).} =
|
||||
let now = market.clock.now()
|
||||
let requestExpiresAt = now + request.expiry.toSecondsSince1970
|
||||
let requestEndsAt = now + request.ask.duration.toSecondsSince1970
|
||||
market.requested.add(request)
|
||||
market.requestExpiry[request.id] = requestExpiresAt
|
||||
market.requestEnds[request.id] = requestEndsAt
|
||||
var subscriptions = market.subscriptions.onRequest
|
||||
for subscription in subscriptions:
|
||||
subscription.callback(request.id, request.ask, request.expiry)
|
||||
subscription.callback(request.id, request.ask, requestExpiresAt.uint64)
|
||||
|
||||
method myRequests*(market: MockMarket): Future[seq[RequestId]] {.async.} =
|
||||
return market.activeRequests[market.signer]
|
||||
@ -308,7 +314,7 @@ proc fillSlot*(
|
||||
slotIndex: slotIndex,
|
||||
proof: proof,
|
||||
host: host,
|
||||
timestamp: market.clock .? now,
|
||||
timestamp: market.clock.now,
|
||||
collateral: collateral,
|
||||
)
|
||||
market.filled.add(slot)
|
||||
@ -541,7 +547,11 @@ method queryPastStorageRequestedEvents*(
|
||||
): Future[seq[StorageRequested]] {.async.} =
|
||||
return market.requested.map(
|
||||
request =>
|
||||
StorageRequested(requestId: request.id, ask: request.ask, expiry: request.expiry)
|
||||
StorageRequested(
|
||||
requestId: request.id,
|
||||
ask: request.ask,
|
||||
expiry: market.requestExpiry[request.id].uint64,
|
||||
)
|
||||
)
|
||||
|
||||
method queryPastStorageRequestedEvents*(
|
||||
@ -549,7 +559,11 @@ method queryPastStorageRequestedEvents*(
|
||||
): Future[seq[StorageRequested]] {.async.} =
|
||||
return market.requested.map(
|
||||
request =>
|
||||
StorageRequested(requestId: request.id, ask: request.ask, expiry: request.expiry)
|
||||
StorageRequested(
|
||||
requestId: request.id,
|
||||
ask: request.ask,
|
||||
expiry: market.requestExpiry[request.id].uint64,
|
||||
)
|
||||
)
|
||||
|
||||
method queryPastSlotFilledEvents*(
|
||||
@ -571,10 +585,7 @@ method queryPastSlotFilledEvents*(
|
||||
): Future[seq[SlotFilled]] {.async.} =
|
||||
let filtered = market.filled.filter(
|
||||
proc(slot: MockSlot): bool =
|
||||
if timestamp =? slot.timestamp:
|
||||
return timestamp >= fromTime
|
||||
else:
|
||||
true
|
||||
return slot.timestamp >= fromTime
|
||||
)
|
||||
return filtered.map(
|
||||
slot => SlotFilled(requestId: slot.requestId, slotIndex: slot.slotIndex)
|
||||
|
||||
@ -116,8 +116,7 @@ asyncchecksuite "Test Node - Host contracts":
|
||||
let onStore = !sales.onStore
|
||||
var request = StorageRequest.example
|
||||
request.content.cid = verifiableBlock.cid
|
||||
request.expiry =
|
||||
(getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix.uint64
|
||||
let expiry = (getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix
|
||||
var fetchedBytes: uint = 0
|
||||
|
||||
let onBlocks = proc(
|
||||
@ -127,7 +126,7 @@ asyncchecksuite "Test Node - Host contracts":
|
||||
fetchedBytes += blk.data.len.uint
|
||||
return success()
|
||||
|
||||
(await onStore(request, 1.uint64, onBlocks, isRepairing = false)).tryGet()
|
||||
(await onStore(request, expiry, 1.uint64, onBlocks, isRepairing = false)).tryGet()
|
||||
check fetchedBytes == 12 * DefaultBlockSize.uint
|
||||
|
||||
let indexer = verifiable.protectedStrategy.init(
|
||||
@ -141,4 +140,4 @@ asyncchecksuite "Test Node - Host contracts":
|
||||
bytes = (await localStoreMetaDs.get(key)).tryGet
|
||||
blkMd = BlockMetadata.decode(bytes).tryGet
|
||||
|
||||
check blkMd.expiry == request.expiry.toSecondsSince1970
|
||||
check blkMd.expiry == expiry
|
||||
|
||||
@ -37,7 +37,6 @@ asyncchecksuite "Sales - start":
|
||||
var repo: RepoStore
|
||||
var queue: SlotQueue
|
||||
var itemsProcessed: seq[SlotQueueItem]
|
||||
var expiry: SecondsSince1970
|
||||
|
||||
setup:
|
||||
request = StorageRequest(
|
||||
@ -51,7 +50,7 @@ asyncchecksuite "Sales - start":
|
||||
content: StorageContent(
|
||||
cid: Cid.init("zb2rhheVmk3bLks5MgzTqyznLu1zqGH5jrfTA1eAZXrjx7Vob").tryGet
|
||||
),
|
||||
expiry: (getTime() + initDuration(hours = 1)).toUnix.uint64,
|
||||
expiry: 60,
|
||||
)
|
||||
|
||||
market = MockMarket.new()
|
||||
@ -63,7 +62,11 @@ asyncchecksuite "Sales - start":
|
||||
sales = Sales.new(market, clock, repo)
|
||||
reservations = sales.context.reservations
|
||||
sales.onStore = proc(
|
||||
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
|
||||
request: StorageRequest,
|
||||
expiry: SecondsSince1970,
|
||||
slot: uint64,
|
||||
onBatch: BatchProc,
|
||||
isRepairing = false,
|
||||
): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||
return success()
|
||||
|
||||
@ -78,8 +81,6 @@ asyncchecksuite "Sales - start":
|
||||
): Future[?!Groth16Proof] {.async: (raises: [CancelledError]).} =
|
||||
return success(proof)
|
||||
itemsProcessed = @[]
|
||||
expiry = (clock.now() + 42)
|
||||
request.expiry = expiry.uint64
|
||||
|
||||
teardown:
|
||||
await sales.stop()
|
||||
@ -100,7 +101,6 @@ asyncchecksuite "Sales - start":
|
||||
request.ask.slots = 2
|
||||
market.requested = @[request]
|
||||
market.requestState[request.id] = RequestState.New
|
||||
market.requestExpiry[request.id] = expiry
|
||||
|
||||
let slot0 = MockSlot(requestId: request.id, slotIndex: 0, proof: proof, host: me)
|
||||
await fillSlot(slot0.slotIndex)
|
||||
@ -167,14 +167,13 @@ asyncchecksuite "Sales":
|
||||
content: StorageContent(
|
||||
cid: Cid.init("zb2rhheVmk3bLks5MgzTqyznLu1zqGH5jrfTA1eAZXrjx7Vob").tryGet
|
||||
),
|
||||
expiry: (getTime() + initDuration(hours = 1)).toUnix.uint64,
|
||||
expiry: 60,
|
||||
)
|
||||
|
||||
market = MockMarket.new()
|
||||
|
||||
let me = await market.getSigner()
|
||||
market.activeSlots[me] = @[]
|
||||
market.requestEnds[request.id] = request.expiry.toSecondsSince1970
|
||||
|
||||
clock = MockClock.new()
|
||||
let repoDs = repoTmp.newDb()
|
||||
@ -184,7 +183,11 @@ asyncchecksuite "Sales":
|
||||
sales = Sales.new(market, clock, repo)
|
||||
reservations = sales.context.reservations
|
||||
sales.onStore = proc(
|
||||
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
|
||||
request: StorageRequest,
|
||||
expiry: SecondsSince1970,
|
||||
slot: uint64,
|
||||
onBatch: BatchProc,
|
||||
isRepairing = false,
|
||||
): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||
return success()
|
||||
|
||||
@ -361,7 +364,11 @@ asyncchecksuite "Sales":
|
||||
|
||||
test "availability size is reduced by request slot size when fully downloaded":
|
||||
sales.onStore = proc(
|
||||
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
|
||||
request: StorageRequest,
|
||||
expiry: SecondsSince1970,
|
||||
slot: uint64,
|
||||
onBatch: BatchProc,
|
||||
isRepairing = false,
|
||||
): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||
let blk = bt.Block.new(@[1.byte]).get
|
||||
await onBatch(blk.repeat(request.ask.slotSize.int))
|
||||
@ -374,7 +381,11 @@ asyncchecksuite "Sales":
|
||||
test "bytes are returned to availability once finished":
|
||||
var slotIndex = 0.uint64
|
||||
sales.onStore = proc(
|
||||
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
|
||||
request: StorageRequest,
|
||||
expiry: SecondsSince1970,
|
||||
slot: uint64,
|
||||
onBatch: BatchProc,
|
||||
isRepairing = false,
|
||||
): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||
slotIndex = slot
|
||||
let blk = bt.Block.new(@[1.byte]).get
|
||||
@ -456,7 +467,11 @@ asyncchecksuite "Sales":
|
||||
|
||||
var storingRequest: StorageRequest
|
||||
sales.onStore = proc(
|
||||
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
|
||||
request: StorageRequest,
|
||||
expiry: SecondsSince1970,
|
||||
slot: uint64,
|
||||
onBatch: BatchProc,
|
||||
isRepairing = false,
|
||||
): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||
storingRequest = request
|
||||
return success()
|
||||
@ -469,7 +484,11 @@ asyncchecksuite "Sales":
|
||||
var storingRequest: StorageRequest
|
||||
var storingSlot: uint64
|
||||
sales.onStore = proc(
|
||||
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
|
||||
request: StorageRequest,
|
||||
expiry: SecondsSince1970,
|
||||
slot: uint64,
|
||||
onBatch: BatchProc,
|
||||
isRepairing = false,
|
||||
): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||
storingRequest = request
|
||||
storingSlot = slot
|
||||
@ -482,7 +501,11 @@ asyncchecksuite "Sales":
|
||||
test "makes storage available again when data retrieval fails":
|
||||
let error = newException(IOError, "data retrieval failed")
|
||||
sales.onStore = proc(
|
||||
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
|
||||
request: StorageRequest,
|
||||
expiry: SecondsSince1970,
|
||||
slot: uint64,
|
||||
onBatch: BatchProc,
|
||||
isRepairing = false,
|
||||
): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||
return failure(error)
|
||||
createAvailability()
|
||||
@ -551,7 +574,11 @@ asyncchecksuite "Sales":
|
||||
test "makes storage available again when other host fills the slot":
|
||||
let otherHost = Address.example
|
||||
sales.onStore = proc(
|
||||
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
|
||||
request: StorageRequest,
|
||||
expiry: SecondsSince1970,
|
||||
slot: uint64,
|
||||
onBatch: BatchProc,
|
||||
isRepairing = false,
|
||||
): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||
await sleepAsync(chronos.hours(1))
|
||||
return success()
|
||||
@ -562,12 +589,13 @@ asyncchecksuite "Sales":
|
||||
check eventually (await reservations.all(Availability)).get == @[availability]
|
||||
|
||||
test "makes storage available again when request expires":
|
||||
let expiry = getTime().toUnix() + 10
|
||||
market.requestExpiry[request.id] = expiry
|
||||
|
||||
let origSize = availability.freeSize
|
||||
sales.onStore = proc(
|
||||
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
|
||||
request: StorageRequest,
|
||||
expiry: SecondsSince1970,
|
||||
slot: uint64,
|
||||
onBatch: BatchProc,
|
||||
isRepairing = false,
|
||||
): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||
await sleepAsync(chronos.hours(1))
|
||||
return success()
|
||||
@ -578,23 +606,25 @@ asyncchecksuite "Sales":
|
||||
# would otherwise not set the timeout early enough as it uses `clock.now` in the deadline calculation.
|
||||
await sleepAsync(chronos.milliseconds(100))
|
||||
market.requestState[request.id] = RequestState.Cancelled
|
||||
clock.set(expiry + 1)
|
||||
clock.set(market.requestExpiry[request.id] + 1)
|
||||
check eventually (await reservations.all(Availability)).get == @[availability]
|
||||
check getAvailability().freeSize == origSize
|
||||
|
||||
test "verifies that request is indeed expired from onchain before firing onCancelled":
|
||||
let expiry = getTime().toUnix() + 10
|
||||
# ensure only one slot, otherwise once bytes are returned to the
|
||||
# availability, the queue will be unpaused and availability will be consumed
|
||||
# by other slots
|
||||
request.ask.slots = 1
|
||||
market.requestExpiry[request.id] = expiry
|
||||
market.requestEnds[request.id] =
|
||||
getTime().toUnix() + cast[int64](request.ask.duration)
|
||||
|
||||
let origSize = availability.freeSize
|
||||
sales.onStore = proc(
|
||||
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
|
||||
request: StorageRequest,
|
||||
expiry: SecondsSince1970,
|
||||
slot: uint64,
|
||||
onBatch: BatchProc,
|
||||
isRepairing = false,
|
||||
): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||
await sleepAsync(chronos.hours(1))
|
||||
return success()
|
||||
@ -606,7 +636,7 @@ asyncchecksuite "Sales":
|
||||
# If we would not await, then the `clock.set` would run "too fast" as the `subscribeCancellation()`
|
||||
# would otherwise not set the timeout early enough as it uses `clock.now` in the deadline calculation.
|
||||
await sleepAsync(chronos.milliseconds(100))
|
||||
clock.set(expiry + 1)
|
||||
clock.set(market.requestExpiry[request.id] + 1)
|
||||
check getAvailability().freeSize == 0
|
||||
|
||||
market.requestState[request.id] = RequestState.Cancelled
|
||||
|
||||
@ -300,10 +300,7 @@ suite "Slot queue":
|
||||
let uint64Slots = uint64(maxUInt16)
|
||||
request.ask.slots = uint64Slots
|
||||
let items = SlotQueueItem.init(
|
||||
request.id,
|
||||
request.ask,
|
||||
request.expiry,
|
||||
collateral = request.ask.collateralPerSlot,
|
||||
request.id, request.ask, 0, collateral = request.ask.collateralPerSlot
|
||||
)
|
||||
check items.len.uint16 == maxUInt16
|
||||
|
||||
@ -314,10 +311,7 @@ suite "Slot queue":
|
||||
request.ask.slots = uint64Slots
|
||||
expect SlotsOutOfRangeError:
|
||||
discard SlotQueueItem.init(
|
||||
request.id,
|
||||
request.ask,
|
||||
request.expiry,
|
||||
collateral = request.ask.collateralPerSlot,
|
||||
request.id, request.ask, 0, collateral = request.ask.collateralPerSlot
|
||||
)
|
||||
|
||||
test "cannot push duplicate items":
|
||||
@ -433,11 +427,12 @@ suite "Slot queue":
|
||||
|
||||
test "sorts items by expiry descending (longer expiry = higher priority)":
|
||||
var request = StorageRequest.example
|
||||
let item0 =
|
||||
SlotQueueItem.init(request, 0, collateral = request.ask.collateralPerSlot)
|
||||
request.expiry += 1
|
||||
let item1 =
|
||||
SlotQueueItem.init(request, 1, collateral = request.ask.collateralPerSlot)
|
||||
let item0 = SlotQueueItem.init(
|
||||
request.id, 0, request.ask, expiry = 3, collateral = request.ask.collateralPerSlot
|
||||
)
|
||||
let item1 = SlotQueueItem.init(
|
||||
request.id, 1, request.ask, expiry = 7, collateral = request.ask.collateralPerSlot
|
||||
)
|
||||
check item1 < item0
|
||||
|
||||
test "sorts items by slot size descending (bigger dataset = higher profitability = higher priority)":
|
||||
@ -545,12 +540,7 @@ suite "Slot queue":
|
||||
newSlotQueue(maxSize = 4, maxWorkers = 4)
|
||||
let request = StorageRequest.example
|
||||
let item0 = SlotQueueItem.init(
|
||||
request.id,
|
||||
0'u16,
|
||||
request.ask,
|
||||
request.expiry,
|
||||
request.ask.collateralPerSlot,
|
||||
seen = true,
|
||||
request.id, 0'u16, request.ask, 0, request.ask.collateralPerSlot, seen = true
|
||||
)
|
||||
check queue.paused
|
||||
check queue.push(item0).isOk
|
||||
@ -560,12 +550,7 @@ suite "Slot queue":
|
||||
newSlotQueue(maxSize = 4, maxWorkers = 4)
|
||||
let request = StorageRequest.example
|
||||
let item = SlotQueueItem.init(
|
||||
request.id,
|
||||
1'u16,
|
||||
request.ask,
|
||||
request.expiry,
|
||||
request.ask.collateralPerSlot,
|
||||
seen = false,
|
||||
request.id, 1'u16, request.ask, 0, request.ask.collateralPerSlot, seen = false
|
||||
)
|
||||
check queue.paused
|
||||
# push causes unpause
|
||||
@ -578,20 +563,10 @@ suite "Slot queue":
|
||||
newSlotQueue(maxSize = 4, maxWorkers = 4)
|
||||
let request = StorageRequest.example
|
||||
let unseen = SlotQueueItem.init(
|
||||
request.id,
|
||||
0'u16,
|
||||
request.ask,
|
||||
request.expiry,
|
||||
request.ask.collateralPerSlot,
|
||||
seen = false,
|
||||
request.id, 0'u16, request.ask, 0, request.ask.collateralPerSlot, seen = false
|
||||
)
|
||||
let seen = SlotQueueItem.init(
|
||||
request.id,
|
||||
1'u16,
|
||||
request.ask,
|
||||
request.expiry,
|
||||
request.ask.collateralPerSlot,
|
||||
seen = true,
|
||||
request.id, 1'u16, request.ask, 0, request.ask.collateralPerSlot, seen = true
|
||||
)
|
||||
# push causes unpause
|
||||
check queue.push(unseen).isSuccess
|
||||
@ -606,20 +581,10 @@ suite "Slot queue":
|
||||
newSlotQueue(maxSize = 4, maxWorkers = 1)
|
||||
let request = StorageRequest.example
|
||||
let item0 = SlotQueueItem.init(
|
||||
request.id,
|
||||
0'u16,
|
||||
request.ask,
|
||||
request.expiry,
|
||||
request.ask.collateralPerSlot,
|
||||
seen = true,
|
||||
request.id, 0'u16, request.ask, 0, request.ask.collateralPerSlot, seen = true
|
||||
)
|
||||
let item1 = SlotQueueItem.init(
|
||||
request.id,
|
||||
1'u16,
|
||||
request.ask,
|
||||
request.expiry,
|
||||
request.ask.collateralPerSlot,
|
||||
seen = true,
|
||||
request.id, 1'u16, request.ask, 0, request.ask.collateralPerSlot, seen = true
|
||||
)
|
||||
check queue.push(item0).isOk
|
||||
check queue.push(item1).isOk
|
||||
|
||||
@ -96,21 +96,19 @@ asyncchecksuite "Purchasing":
|
||||
check purchase.error.isNone
|
||||
|
||||
test "fails when request times out":
|
||||
let expiry = getTime().toUnix() + 10
|
||||
market.requestExpiry[populatedRequest.id] = expiry
|
||||
let purchase = await purchasing.purchase(populatedRequest)
|
||||
check eventually market.requested.len > 0
|
||||
|
||||
let expiry = market.requestExpiry[populatedRequest.id]
|
||||
clock.set(expiry + 1)
|
||||
expect PurchaseTimeout:
|
||||
await purchase.wait()
|
||||
|
||||
test "checks that funds were withdrawn when purchase times out":
|
||||
let expiry = getTime().toUnix() + 10
|
||||
market.requestExpiry[populatedRequest.id] = expiry
|
||||
let purchase = await purchasing.purchase(populatedRequest)
|
||||
check eventually market.requested.len > 0
|
||||
let request = market.requested[0]
|
||||
let expiry = market.requestExpiry[populatedRequest.id]
|
||||
clock.set(expiry + 1)
|
||||
expect PurchaseTimeout:
|
||||
await purchase.wait()
|
||||
|
||||
@ -50,7 +50,7 @@ asyncchecksuite "validation":
|
||||
setup:
|
||||
groupIndex = groupIndexForSlotId(slot.id, !validationGroups)
|
||||
clock = MockClock.new()
|
||||
market = MockMarket.new(clock = Clock(clock).some)
|
||||
market = MockMarket.new(clock)
|
||||
market.config.proofs.period = period
|
||||
market.config.proofs.timeout = timeout
|
||||
validation = newValidation(clock, market, maxSlots, validationGroups, groupIndex)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user