mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-05 23:13:09 +00:00
feat(marketplace): indicate that slot is being repaired when trying to download (#1083)
* Indicate that slot is being repaired when trying to download * Fix tests * Apply nph * Calculate the repair collateral when adding the item into the queue * Add slotCollateral calculation with getRequest cache and remove populationItem function * Update with pricePerByte * Simplify StorageAsk parameter * Minor fixes * Move cache request to another PR * Rename SlotQueueItem collateral and required in init * Use override func to optimise calls when the slot state is known * Remove unused code * Cosmetic change * Use raiseMarketError helper * Add exceptions to async pragma * Cosmetic change * Use raiseMarketError helper * Let slotCollateral determines the slot sate * Use configSync to avoid async pragma in onStorageRequested * Add loadConfig function * Add CatchableError to async pragma * Add missing pragma raises errors * Move loadConfig * Avoid swallow CancelledError * Avoid swallowing CancelledError * Avoid swallowing CancelledError * Update error messages * Except MarketError instead of CatchableError * Fix merge issue * Log fatal when configuration cannot be loaded * Propagate MarketError in slotCollateral * Remove useless configSync * Use result with explicit error * Fix syntax --------- Signed-off-by: Arnaud <arnaud@status.im>
This commit is contained in:
parent
fab5e16afd
commit
7065718e09
@ -134,6 +134,10 @@ proc bootstrapInteractions(s: CodexServer): Future[void] {.async.} =
|
|||||||
if config.simulateProofFailures > 0:
|
if config.simulateProofFailures > 0:
|
||||||
warn "Proof failure simulation is not enabled for this build! Configuration ignored"
|
warn "Proof failure simulation is not enabled for this build! Configuration ignored"
|
||||||
|
|
||||||
|
if error =? (await market.loadConfig()).errorOption:
|
||||||
|
fatal "Cannot load market configuration", error = error.msg
|
||||||
|
quit QuitFailure
|
||||||
|
|
||||||
let purchasing = Purchasing.new(market, clock)
|
let purchasing = Purchasing.new(market, clock)
|
||||||
let sales = Sales.new(market, clock, repo, proofFailures)
|
let sales = Sales.new(market, clock, repo, proofFailures)
|
||||||
client = some ClientInteractions.new(clock, purchasing)
|
client = some ClientInteractions.new(clock, purchasing)
|
||||||
|
|||||||
@ -55,11 +55,17 @@ template convertEthersError(body) =
|
|||||||
except EthersError as error:
|
except EthersError as error:
|
||||||
raiseMarketError(error.msgDetail)
|
raiseMarketError(error.msgDetail)
|
||||||
|
|
||||||
proc config(market: OnChainMarket): Future[MarketplaceConfig] {.async.} =
|
proc config(
|
||||||
|
market: OnChainMarket
|
||||||
|
): Future[MarketplaceConfig] {.async: (raises: [CancelledError, MarketError]).} =
|
||||||
without resolvedConfig =? market.configuration:
|
without resolvedConfig =? market.configuration:
|
||||||
let fetchedConfig = await market.contract.configuration()
|
if err =? (await market.loadConfig()).errorOption:
|
||||||
market.configuration = some fetchedConfig
|
raiseMarketError(err.msg)
|
||||||
return fetchedConfig
|
|
||||||
|
without config =? market.configuration:
|
||||||
|
raiseMarketError("Failed to access to config from the Marketplace contract")
|
||||||
|
|
||||||
|
return config
|
||||||
|
|
||||||
return resolvedConfig
|
return resolvedConfig
|
||||||
|
|
||||||
@ -70,7 +76,26 @@ proc approveFunds(market: OnChainMarket, amount: UInt256) {.async.} =
|
|||||||
let token = Erc20Token.new(tokenAddress, market.signer)
|
let token = Erc20Token.new(tokenAddress, market.signer)
|
||||||
discard await token.increaseAllowance(market.contract.address(), amount).confirm(1)
|
discard await token.increaseAllowance(market.contract.address(), amount).confirm(1)
|
||||||
|
|
||||||
method getZkeyHash*(market: OnChainMarket): Future[?string] {.async.} =
|
method loadConfig*(
|
||||||
|
market: OnChainMarket
|
||||||
|
): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
|
try:
|
||||||
|
without config =? market.configuration:
|
||||||
|
let fetchedConfig = await market.contract.configuration()
|
||||||
|
|
||||||
|
market.configuration = some fetchedConfig
|
||||||
|
|
||||||
|
return success()
|
||||||
|
except AsyncLockError, EthersError:
|
||||||
|
let err = getCurrentException()
|
||||||
|
return failure newException(
|
||||||
|
MarketError,
|
||||||
|
"Failed to fetch the config from the Marketplace contract: " & err.msg,
|
||||||
|
)
|
||||||
|
|
||||||
|
method getZkeyHash*(
|
||||||
|
market: OnChainMarket
|
||||||
|
): Future[?string] {.async: (raises: [CancelledError, MarketError]).} =
|
||||||
let config = await market.config()
|
let config = await market.config()
|
||||||
return some config.proofs.zkeyHash
|
return some config.proofs.zkeyHash
|
||||||
|
|
||||||
@ -78,18 +103,24 @@ method getSigner*(market: OnChainMarket): Future[Address] {.async.} =
|
|||||||
convertEthersError:
|
convertEthersError:
|
||||||
return await market.signer.getAddress()
|
return await market.signer.getAddress()
|
||||||
|
|
||||||
method periodicity*(market: OnChainMarket): Future[Periodicity] {.async.} =
|
method periodicity*(
|
||||||
|
market: OnChainMarket
|
||||||
|
): Future[Periodicity] {.async: (raises: [CancelledError, MarketError]).} =
|
||||||
convertEthersError:
|
convertEthersError:
|
||||||
let config = await market.config()
|
let config = await market.config()
|
||||||
let period = config.proofs.period
|
let period = config.proofs.period
|
||||||
return Periodicity(seconds: period)
|
return Periodicity(seconds: period)
|
||||||
|
|
||||||
method proofTimeout*(market: OnChainMarket): Future[uint64] {.async.} =
|
method proofTimeout*(
|
||||||
|
market: OnChainMarket
|
||||||
|
): Future[uint64] {.async: (raises: [CancelledError, MarketError]).} =
|
||||||
convertEthersError:
|
convertEthersError:
|
||||||
let config = await market.config()
|
let config = await market.config()
|
||||||
return config.proofs.timeout
|
return config.proofs.timeout
|
||||||
|
|
||||||
method repairRewardPercentage*(market: OnChainMarket): Future[uint8] {.async.} =
|
method repairRewardPercentage*(
|
||||||
|
market: OnChainMarket
|
||||||
|
): Future[uint8] {.async: (raises: [CancelledError, MarketError]).} =
|
||||||
convertEthersError:
|
convertEthersError:
|
||||||
let config = await market.config()
|
let config = await market.config()
|
||||||
return config.collateral.repairRewardPercentage
|
return config.collateral.repairRewardPercentage
|
||||||
@ -99,7 +130,9 @@ method requestDurationLimit*(market: OnChainMarket): Future[uint64] {.async.} =
|
|||||||
let config = await market.config()
|
let config = await market.config()
|
||||||
return config.requestDurationLimit
|
return config.requestDurationLimit
|
||||||
|
|
||||||
method proofDowntime*(market: OnChainMarket): Future[uint8] {.async.} =
|
method proofDowntime*(
|
||||||
|
market: OnChainMarket
|
||||||
|
): Future[uint8] {.async: (raises: [CancelledError, MarketError]).} =
|
||||||
convertEthersError:
|
convertEthersError:
|
||||||
let config = await market.config()
|
let config = await market.config()
|
||||||
return config.proofs.downtime
|
return config.proofs.downtime
|
||||||
@ -128,19 +161,22 @@ method requestStorage(market: OnChainMarket, request: StorageRequest) {.async.}
|
|||||||
|
|
||||||
method getRequest*(
|
method getRequest*(
|
||||||
market: OnChainMarket, id: RequestId
|
market: OnChainMarket, id: RequestId
|
||||||
): Future[?StorageRequest] {.async.} =
|
): Future[?StorageRequest] {.async: (raises: [CancelledError]).} =
|
||||||
let key = $id
|
try:
|
||||||
|
let key = $id
|
||||||
|
|
||||||
if market.requestCache.contains(key):
|
if key in market.requestCache:
|
||||||
return some market.requestCache[key]
|
return some market.requestCache[key]
|
||||||
|
|
||||||
convertEthersError:
|
let request = await market.contract.getRequest(id)
|
||||||
try:
|
market.requestCache[key] = request
|
||||||
let request = await market.contract.getRequest(id)
|
return some request
|
||||||
market.requestCache[key] = request
|
except Marketplace_UnknownRequest, KeyError:
|
||||||
return some request
|
warn "Cannot retrieve the request", error = getCurrentExceptionMsg()
|
||||||
except Marketplace_UnknownRequest:
|
return none StorageRequest
|
||||||
return none StorageRequest
|
except EthersError, AsyncLockError:
|
||||||
|
error "Cannot retrieve the request", error = getCurrentExceptionMsg()
|
||||||
|
return none StorageRequest
|
||||||
|
|
||||||
method requestState*(
|
method requestState*(
|
||||||
market: OnChainMarket, requestId: RequestId
|
market: OnChainMarket, requestId: RequestId
|
||||||
@ -152,10 +188,17 @@ method requestState*(
|
|||||||
except Marketplace_UnknownRequest:
|
except Marketplace_UnknownRequest:
|
||||||
return none RequestState
|
return none RequestState
|
||||||
|
|
||||||
method slotState*(market: OnChainMarket, slotId: SlotId): Future[SlotState] {.async.} =
|
method slotState*(
|
||||||
|
market: OnChainMarket, slotId: SlotId
|
||||||
|
): Future[SlotState] {.async: (raises: [CancelledError, MarketError]).} =
|
||||||
convertEthersError:
|
convertEthersError:
|
||||||
let overrides = CallOverrides(blockTag: some BlockTag.pending)
|
try:
|
||||||
return await market.contract.slotState(slotId, overrides)
|
let overrides = CallOverrides(blockTag: some BlockTag.pending)
|
||||||
|
return await market.contract.slotState(slotId, overrides)
|
||||||
|
except AsyncLockError as err:
|
||||||
|
raiseMarketError(
|
||||||
|
"Failed to fetch the slot state from the Marketplace contract: " & err.msg
|
||||||
|
)
|
||||||
|
|
||||||
method getRequestEnd*(
|
method getRequestEnd*(
|
||||||
market: OnChainMarket, id: RequestId
|
market: OnChainMarket, id: RequestId
|
||||||
@ -507,3 +550,40 @@ method queryPastStorageRequestedEvents*(
|
|||||||
let fromBlock = await market.contract.provider.pastBlockTag(blocksAgo)
|
let fromBlock = await market.contract.provider.pastBlockTag(blocksAgo)
|
||||||
|
|
||||||
return await market.queryPastStorageRequestedEvents(fromBlock)
|
return await market.queryPastStorageRequestedEvents(fromBlock)
|
||||||
|
|
||||||
|
method slotCollateral*(
|
||||||
|
market: OnChainMarket, requestId: RequestId, slotIndex: uint64
|
||||||
|
): Future[?!UInt256] {.async: (raises: [CancelledError]).} =
|
||||||
|
let slotid = slotId(requestId, slotIndex)
|
||||||
|
|
||||||
|
try:
|
||||||
|
let slotState = await market.slotState(slotid)
|
||||||
|
|
||||||
|
without request =? await market.getRequest(requestId):
|
||||||
|
return failure newException(
|
||||||
|
MarketError, "Failure calculating the slotCollateral, cannot get the request"
|
||||||
|
)
|
||||||
|
|
||||||
|
return market.slotCollateral(request.ask.collateralPerSlot, slotState)
|
||||||
|
except MarketError as error:
|
||||||
|
error "Error when trying to calculate the slotCollateral", error = error.msg
|
||||||
|
return failure error
|
||||||
|
|
||||||
|
method slotCollateral*(
|
||||||
|
market: OnChainMarket, collateralPerSlot: UInt256, slotState: SlotState
|
||||||
|
): ?!UInt256 {.raises: [].} =
|
||||||
|
if slotState == SlotState.Repair:
|
||||||
|
without repairRewardPercentage =?
|
||||||
|
market.configuration .? collateral .? repairRewardPercentage:
|
||||||
|
return failure newException(
|
||||||
|
MarketError,
|
||||||
|
"Failure calculating the slotCollateral, cannot get the reward percentage",
|
||||||
|
)
|
||||||
|
|
||||||
|
return success (
|
||||||
|
collateralPerSlot - (collateralPerSlot * repairRewardPercentage.u256).div(
|
||||||
|
100.u256
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return success(collateralPerSlot)
|
||||||
|
|||||||
@ -62,25 +62,40 @@ type
|
|||||||
ProofSubmitted* = object of MarketplaceEvent
|
ProofSubmitted* = object of MarketplaceEvent
|
||||||
id*: SlotId
|
id*: SlotId
|
||||||
|
|
||||||
method getZkeyHash*(market: Market): Future[?string] {.base, async.} =
|
method loadConfig*(
|
||||||
|
market: Market
|
||||||
|
): Future[?!void] {.base, async: (raises: [CancelledError]).} =
|
||||||
|
raiseAssert("not implemented")
|
||||||
|
|
||||||
|
method getZkeyHash*(
|
||||||
|
market: Market
|
||||||
|
): Future[?string] {.base, async: (raises: [CancelledError, MarketError]).} =
|
||||||
raiseAssert("not implemented")
|
raiseAssert("not implemented")
|
||||||
|
|
||||||
method getSigner*(market: Market): Future[Address] {.base, async.} =
|
method getSigner*(market: Market): Future[Address] {.base, async.} =
|
||||||
raiseAssert("not implemented")
|
raiseAssert("not implemented")
|
||||||
|
|
||||||
method periodicity*(market: Market): Future[Periodicity] {.base, async.} =
|
method periodicity*(
|
||||||
|
market: Market
|
||||||
|
): Future[Periodicity] {.base, async: (raises: [CancelledError, MarketError]).} =
|
||||||
raiseAssert("not implemented")
|
raiseAssert("not implemented")
|
||||||
|
|
||||||
method proofTimeout*(market: Market): Future[uint64] {.base, async.} =
|
method proofTimeout*(
|
||||||
|
market: Market
|
||||||
|
): Future[uint64] {.base, async: (raises: [CancelledError, MarketError]).} =
|
||||||
raiseAssert("not implemented")
|
raiseAssert("not implemented")
|
||||||
|
|
||||||
method repairRewardPercentage*(market: Market): Future[uint8] {.base, async.} =
|
method repairRewardPercentage*(
|
||||||
|
market: Market
|
||||||
|
): Future[uint8] {.base, async: (raises: [CancelledError, MarketError]).} =
|
||||||
raiseAssert("not implemented")
|
raiseAssert("not implemented")
|
||||||
|
|
||||||
method requestDurationLimit*(market: Market): Future[uint64] {.base, async.} =
|
method requestDurationLimit*(market: Market): Future[uint64] {.base, async.} =
|
||||||
raiseAssert("not implemented")
|
raiseAssert("not implemented")
|
||||||
|
|
||||||
method proofDowntime*(market: Market): Future[uint8] {.base, async.} =
|
method proofDowntime*(
|
||||||
|
market: Market
|
||||||
|
): Future[uint8] {.base, async: (raises: [CancelledError, MarketError]).} =
|
||||||
raiseAssert("not implemented")
|
raiseAssert("not implemented")
|
||||||
|
|
||||||
method getPointer*(market: Market, slotId: SlotId): Future[uint8] {.base, async.} =
|
method getPointer*(market: Market, slotId: SlotId): Future[uint8] {.base, async.} =
|
||||||
@ -102,7 +117,7 @@ method mySlots*(market: Market): Future[seq[SlotId]] {.base, async.} =
|
|||||||
|
|
||||||
method getRequest*(
|
method getRequest*(
|
||||||
market: Market, id: RequestId
|
market: Market, id: RequestId
|
||||||
): Future[?StorageRequest] {.base, async.} =
|
): Future[?StorageRequest] {.base, async: (raises: [CancelledError]).} =
|
||||||
raiseAssert("not implemented")
|
raiseAssert("not implemented")
|
||||||
|
|
||||||
method requestState*(
|
method requestState*(
|
||||||
@ -110,7 +125,9 @@ method requestState*(
|
|||||||
): Future[?RequestState] {.base, async.} =
|
): Future[?RequestState] {.base, async.} =
|
||||||
raiseAssert("not implemented")
|
raiseAssert("not implemented")
|
||||||
|
|
||||||
method slotState*(market: Market, slotId: SlotId): Future[SlotState] {.base, async.} =
|
method slotState*(
|
||||||
|
market: Market, slotId: SlotId
|
||||||
|
): Future[SlotState] {.base, async: (raises: [CancelledError, MarketError]).} =
|
||||||
raiseAssert("not implemented")
|
raiseAssert("not implemented")
|
||||||
|
|
||||||
method getRequestEnd*(
|
method getRequestEnd*(
|
||||||
@ -270,3 +287,13 @@ method queryPastStorageRequestedEvents*(
|
|||||||
market: Market, blocksAgo: int
|
market: Market, blocksAgo: int
|
||||||
): Future[seq[StorageRequested]] {.base, async.} =
|
): Future[seq[StorageRequested]] {.base, async.} =
|
||||||
raiseAssert("not implemented")
|
raiseAssert("not implemented")
|
||||||
|
|
||||||
|
method slotCollateral*(
|
||||||
|
market: Market, requestId: RequestId, slotIndex: uint64
|
||||||
|
): Future[?!UInt256] {.base, async: (raises: [CancelledError]).} =
|
||||||
|
raiseAssert("not implemented")
|
||||||
|
|
||||||
|
method slotCollateral*(
|
||||||
|
market: Market, collateralPerSlot: UInt256, slotState: SlotState
|
||||||
|
): ?!UInt256 {.base, gcsafe, raises: [].} =
|
||||||
|
raiseAssert("not implemented")
|
||||||
|
|||||||
@ -591,7 +591,11 @@ proc requestStorage*(
|
|||||||
success purchase.id
|
success purchase.id
|
||||||
|
|
||||||
proc onStore(
|
proc onStore(
|
||||||
self: CodexNodeRef, request: StorageRequest, slotIdx: uint64, blocksCb: BlocksCb
|
self: CodexNodeRef,
|
||||||
|
request: StorageRequest,
|
||||||
|
slotIdx: uint64,
|
||||||
|
blocksCb: BlocksCb,
|
||||||
|
isRepairing: bool = false,
|
||||||
): Future[?!void] {.async.} =
|
): Future[?!void] {.async.} =
|
||||||
## store data in local storage
|
## store data in local storage
|
||||||
##
|
##
|
||||||
@ -604,6 +608,10 @@ proc onStore(
|
|||||||
|
|
||||||
trace "Received a request to store a slot"
|
trace "Received a request to store a slot"
|
||||||
|
|
||||||
|
# TODO: Use the isRepairing to manage the slot download.
|
||||||
|
# If isRepairing is true, the slot has to be repaired before
|
||||||
|
# being downloaded.
|
||||||
|
|
||||||
without manifest =? (await self.fetchManifest(cid)), err:
|
without manifest =? (await self.fetchManifest(cid)), err:
|
||||||
trace "Unable to fetch manifest for cid", cid, err = err.msg
|
trace "Unable to fetch manifest for cid", cid, err = err.msg
|
||||||
return failure(err)
|
return failure(err)
|
||||||
@ -745,9 +753,12 @@ proc start*(self: CodexNodeRef) {.async.} =
|
|||||||
|
|
||||||
if hostContracts =? self.contracts.host:
|
if hostContracts =? self.contracts.host:
|
||||||
hostContracts.sales.onStore = proc(
|
hostContracts.sales.onStore = proc(
|
||||||
request: StorageRequest, slot: uint64, onBatch: BatchProc
|
request: StorageRequest,
|
||||||
|
slot: uint64,
|
||||||
|
onBatch: BatchProc,
|
||||||
|
isRepairing: bool = false,
|
||||||
): Future[?!void] =
|
): Future[?!void] =
|
||||||
self.onStore(request, slot, onBatch)
|
self.onStore(request, slot, onBatch, isRepairing)
|
||||||
|
|
||||||
hostContracts.sales.onExpiryUpdate = proc(
|
hostContracts.sales.onExpiryUpdate = proc(
|
||||||
rootCid: Cid, expiry: SecondsSince1970
|
rootCid: Cid, expiry: SecondsSince1970
|
||||||
|
|||||||
@ -157,13 +157,28 @@ proc cleanUp(
|
|||||||
# Re-add items back into the queue to prevent small availabilities from
|
# Re-add items back into the queue to prevent small availabilities from
|
||||||
# draining the queue. Seen items will be ordered last.
|
# draining the queue. Seen items will be ordered last.
|
||||||
if reprocessSlot and request =? data.request:
|
if reprocessSlot and request =? data.request:
|
||||||
let queue = sales.context.slotQueue
|
try:
|
||||||
var seenItem = SlotQueueItem.init(
|
without collateral =?
|
||||||
data.requestId, data.slotIndex.uint16, data.ask, request.expiry, seen = true
|
await sales.context.market.slotCollateral(data.requestId, data.slotIndex), err:
|
||||||
)
|
error "Failed to re-add item back to the slot queue: unable to calculate collateral",
|
||||||
trace "pushing ignored item to queue, marked as seen"
|
error = err.msg
|
||||||
if err =? queue.push(seenItem).errorOption:
|
return
|
||||||
error "failed to readd slot to queue", errorType = $(type err), error = err.msg
|
|
||||||
|
let queue = sales.context.slotQueue
|
||||||
|
var seenItem = SlotQueueItem.init(
|
||||||
|
data.requestId,
|
||||||
|
data.slotIndex.uint16,
|
||||||
|
data.ask,
|
||||||
|
request.expiry,
|
||||||
|
seen = true,
|
||||||
|
collateral = collateral,
|
||||||
|
)
|
||||||
|
trace "pushing ignored item to queue, marked as seen"
|
||||||
|
if err =? queue.push(seenItem).errorOption:
|
||||||
|
error "failed to readd slot to queue", errorType = $(type err), error = err.msg
|
||||||
|
except MarketError as e:
|
||||||
|
error "Failed to re-add item back to the slot queue.", error = e.msg
|
||||||
|
return
|
||||||
|
|
||||||
await sales.remove(agent)
|
await sales.remove(agent)
|
||||||
|
|
||||||
@ -283,7 +298,7 @@ proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} =
|
|||||||
|
|
||||||
proc onStorageRequested(
|
proc onStorageRequested(
|
||||||
sales: Sales, requestId: RequestId, ask: StorageAsk, expiry: uint64
|
sales: Sales, requestId: RequestId, ask: StorageAsk, expiry: uint64
|
||||||
) =
|
) {.raises: [].} =
|
||||||
logScope:
|
logScope:
|
||||||
topics = "marketplace sales onStorageRequested"
|
topics = "marketplace sales onStorageRequested"
|
||||||
requestId
|
requestId
|
||||||
@ -294,7 +309,14 @@ proc onStorageRequested(
|
|||||||
|
|
||||||
trace "storage requested, adding slots to queue"
|
trace "storage requested, adding slots to queue"
|
||||||
|
|
||||||
without items =? SlotQueueItem.init(requestId, ask, expiry).catch, err:
|
let market = sales.context.market
|
||||||
|
|
||||||
|
without collateral =? market.slotCollateral(ask.collateralPerSlot, SlotState.Free),
|
||||||
|
err:
|
||||||
|
error "Request failure, unable to calculate collateral", error = err.msg
|
||||||
|
return
|
||||||
|
|
||||||
|
without items =? SlotQueueItem.init(requestId, ask, expiry, collateral).catch, err:
|
||||||
if err of SlotsOutOfRangeError:
|
if err of SlotsOutOfRangeError:
|
||||||
warn "Too many slots, cannot add to queue"
|
warn "Too many slots, cannot add to queue"
|
||||||
else:
|
else:
|
||||||
@ -319,35 +341,45 @@ proc onSlotFreed(sales: Sales, requestId: RequestId, slotIndex: uint64) =
|
|||||||
|
|
||||||
trace "slot freed, adding to queue"
|
trace "slot freed, adding to queue"
|
||||||
|
|
||||||
proc addSlotToQueue() {.async: (raises: []).} =
|
proc addSlotToQueue() {.async: (raises: [CancelledError]).} =
|
||||||
let context = sales.context
|
let context = sales.context
|
||||||
let market = context.market
|
let market = context.market
|
||||||
let queue = context.slotQueue
|
let queue = context.slotQueue
|
||||||
|
|
||||||
|
without request =? (await market.getRequest(requestId)), err:
|
||||||
|
error "unknown request in contract", error = err.msgDetail
|
||||||
|
return
|
||||||
|
|
||||||
|
# Take the repairing state into consideration to calculate the collateral.
|
||||||
|
# This is particularly needed because it will affect the priority in the queue
|
||||||
|
# and we want to give the user the ability to tweak the parameters.
|
||||||
|
# Adding the repairing state directly in the queue priority calculation
|
||||||
|
# would not allow this flexibility.
|
||||||
|
without collateral =?
|
||||||
|
market.slotCollateral(request.ask.collateralPerSlot, SlotState.Repair), err:
|
||||||
|
error "Failed to add freed slot to queue: unable to calculate collateral",
|
||||||
|
error = err.msg
|
||||||
|
return
|
||||||
|
|
||||||
if slotIndex > uint16.high.uint64:
|
if slotIndex > uint16.high.uint64:
|
||||||
error "Cannot cast slot index to uint16, value = ", slotIndex
|
error "Cannot cast slot index to uint16, value = ", slotIndex
|
||||||
return
|
return
|
||||||
|
|
||||||
# first attempt to populate request using existing metadata in queue
|
without slotQueueItem =?
|
||||||
without var found =? queue.populateItem(requestId, slotIndex.uint16):
|
SlotQueueItem.init(request, slotIndex.uint16, collateral = collateral).catch, err:
|
||||||
trace "no existing request metadata, getting request info from contract"
|
warn "Too many slots, cannot add to queue", error = err.msgDetail
|
||||||
# if there's no existing slot for that request, retrieve the request
|
return
|
||||||
# from the contract.
|
|
||||||
try:
|
|
||||||
without request =? await market.getRequest(requestId):
|
|
||||||
error "unknown request in contract"
|
|
||||||
return
|
|
||||||
|
|
||||||
found = SlotQueueItem.init(request, slotIndex.uint16)
|
if err =? queue.push(slotQueueItem).errorOption:
|
||||||
except CancelledError:
|
if err of SlotQueueItemExistsError:
|
||||||
discard # do not propagate as addSlotToQueue was asyncSpawned
|
error "Failed to push item to queue becaue it already exists",
|
||||||
except CatchableError as e:
|
error = err.msgDetail
|
||||||
error "failed to get request from contract and add slots to queue",
|
elif err of QueueNotRunningError:
|
||||||
error = e.msgDetail
|
warn "Failed to push item to queue becaue queue is not running",
|
||||||
|
error = err.msgDetail
|
||||||
if err =? queue.push(found).errorOption:
|
|
||||||
error "failed to push slot items to queue", error = err.msgDetail
|
|
||||||
|
|
||||||
|
# We could get rid of this by adding the storage ask in the SlotFreed event,
|
||||||
|
# so we would not need to call getRequest to get the collateralPerSlot.
|
||||||
let fut = addSlotToQueue()
|
let fut = addSlotToQueue()
|
||||||
sales.trackedFutures.track(fut)
|
sales.trackedFutures.track(fut)
|
||||||
asyncSpawn fut
|
asyncSpawn fut
|
||||||
@ -356,7 +388,9 @@ proc subscribeRequested(sales: Sales) {.async.} =
|
|||||||
let context = sales.context
|
let context = sales.context
|
||||||
let market = context.market
|
let market = context.market
|
||||||
|
|
||||||
proc onStorageRequested(requestId: RequestId, ask: StorageAsk, expiry: uint64) =
|
proc onStorageRequested(
|
||||||
|
requestId: RequestId, ask: StorageAsk, expiry: uint64
|
||||||
|
) {.raises: [].} =
|
||||||
sales.onStorageRequested(requestId, ask, expiry)
|
sales.onStorageRequested(requestId, ask, expiry)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|||||||
@ -26,7 +26,7 @@ type
|
|||||||
|
|
||||||
BlocksCb* = proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, raises: [].}
|
BlocksCb* = proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, raises: [].}
|
||||||
OnStore* = proc(
|
OnStore* = proc(
|
||||||
request: StorageRequest, slot: uint64, blocksCb: BlocksCb
|
request: StorageRequest, slot: uint64, blocksCb: BlocksCb, isRepairing: bool
|
||||||
): Future[?!void] {.gcsafe, upraises: [].}
|
): Future[?!void] {.gcsafe, upraises: [].}
|
||||||
OnProve* = proc(slot: Slot, challenge: ProofChallenge): Future[?!Groth16Proof] {.
|
OnProve* = proc(slot: Slot, challenge: ProofChallenge): Future[?!Groth16Proof] {.
|
||||||
gcsafe, upraises: []
|
gcsafe, upraises: []
|
||||||
|
|||||||
@ -34,7 +34,7 @@ type
|
|||||||
slotSize: uint64
|
slotSize: uint64
|
||||||
duration: uint64
|
duration: uint64
|
||||||
pricePerBytePerSecond: UInt256
|
pricePerBytePerSecond: UInt256
|
||||||
collateralPerByte: UInt256
|
collateral: UInt256 # Collateral computed
|
||||||
expiry: uint64
|
expiry: uint64
|
||||||
seen: bool
|
seen: bool
|
||||||
|
|
||||||
@ -76,9 +76,6 @@ proc profitability(item: SlotQueueItem): UInt256 =
|
|||||||
slotSize: item.slotSize,
|
slotSize: item.slotSize,
|
||||||
).pricePerSlot
|
).pricePerSlot
|
||||||
|
|
||||||
proc collateralPerSlot(item: SlotQueueItem): UInt256 =
|
|
||||||
StorageAsk(collateralPerByte: item.collateralPerByte, slotSize: item.slotSize).collateralPerSlot
|
|
||||||
|
|
||||||
proc `<`*(a, b: SlotQueueItem): bool =
|
proc `<`*(a, b: SlotQueueItem): bool =
|
||||||
# for A to have a higher priority than B (in a min queue), A must be less than
|
# for A to have a higher priority than B (in a min queue), A must be less than
|
||||||
# B.
|
# B.
|
||||||
@ -95,8 +92,8 @@ proc `<`*(a, b: SlotQueueItem): bool =
|
|||||||
scoreA.addIf(a.profitability > b.profitability, 3)
|
scoreA.addIf(a.profitability > b.profitability, 3)
|
||||||
scoreB.addIf(a.profitability < b.profitability, 3)
|
scoreB.addIf(a.profitability < b.profitability, 3)
|
||||||
|
|
||||||
scoreA.addIf(a.collateralPerSlot < b.collateralPerSlot, 2)
|
scoreA.addIf(a.collateral < b.collateral, 2)
|
||||||
scoreB.addIf(a.collateralPerSlot > b.collateralPerSlot, 2)
|
scoreB.addIf(a.collateral > b.collateral, 2)
|
||||||
|
|
||||||
scoreA.addIf(a.expiry > b.expiry, 1)
|
scoreA.addIf(a.expiry > b.expiry, 1)
|
||||||
scoreB.addIf(a.expiry < b.expiry, 1)
|
scoreB.addIf(a.expiry < b.expiry, 1)
|
||||||
@ -137,6 +134,7 @@ proc init*(
|
|||||||
slotIndex: uint16,
|
slotIndex: uint16,
|
||||||
ask: StorageAsk,
|
ask: StorageAsk,
|
||||||
expiry: uint64,
|
expiry: uint64,
|
||||||
|
collateral: UInt256,
|
||||||
seen = false,
|
seen = false,
|
||||||
): SlotQueueItem =
|
): SlotQueueItem =
|
||||||
SlotQueueItem(
|
SlotQueueItem(
|
||||||
@ -145,25 +143,32 @@ proc init*(
|
|||||||
slotSize: ask.slotSize,
|
slotSize: ask.slotSize,
|
||||||
duration: ask.duration,
|
duration: ask.duration,
|
||||||
pricePerBytePerSecond: ask.pricePerBytePerSecond,
|
pricePerBytePerSecond: ask.pricePerBytePerSecond,
|
||||||
collateralPerByte: ask.collateralPerByte,
|
collateral: collateral,
|
||||||
expiry: expiry,
|
expiry: expiry,
|
||||||
seen: seen,
|
seen: seen,
|
||||||
)
|
)
|
||||||
|
|
||||||
proc init*(
|
proc init*(
|
||||||
_: type SlotQueueItem, request: StorageRequest, slotIndex: uint16
|
_: type SlotQueueItem,
|
||||||
|
request: StorageRequest,
|
||||||
|
slotIndex: uint16,
|
||||||
|
collateral: UInt256,
|
||||||
): SlotQueueItem =
|
): SlotQueueItem =
|
||||||
SlotQueueItem.init(request.id, slotIndex, request.ask, request.expiry)
|
SlotQueueItem.init(request.id, slotIndex, request.ask, request.expiry, collateral)
|
||||||
|
|
||||||
proc init*(
|
proc init*(
|
||||||
_: type SlotQueueItem, requestId: RequestId, ask: StorageAsk, expiry: uint64
|
_: type SlotQueueItem,
|
||||||
): seq[SlotQueueItem] =
|
requestId: RequestId,
|
||||||
|
ask: StorageAsk,
|
||||||
|
expiry: uint64,
|
||||||
|
collateral: UInt256,
|
||||||
|
): seq[SlotQueueItem] {.raises: [SlotsOutOfRangeError].} =
|
||||||
if not ask.slots.inRange:
|
if not ask.slots.inRange:
|
||||||
raise newException(SlotsOutOfRangeError, "Too many slots")
|
raise newException(SlotsOutOfRangeError, "Too many slots")
|
||||||
|
|
||||||
var i = 0'u16
|
var i = 0'u16
|
||||||
proc initSlotQueueItem(): SlotQueueItem =
|
proc initSlotQueueItem(): SlotQueueItem =
|
||||||
let item = SlotQueueItem.init(requestId, i, ask, expiry)
|
let item = SlotQueueItem.init(requestId, i, ask, expiry, collateral)
|
||||||
inc i
|
inc i
|
||||||
return item
|
return item
|
||||||
|
|
||||||
@ -171,8 +176,10 @@ proc init*(
|
|||||||
Rng.instance.shuffle(items)
|
Rng.instance.shuffle(items)
|
||||||
return items
|
return items
|
||||||
|
|
||||||
proc init*(_: type SlotQueueItem, request: StorageRequest): seq[SlotQueueItem] =
|
proc init*(
|
||||||
return SlotQueueItem.init(request.id, request.ask, request.expiry)
|
_: type SlotQueueItem, request: StorageRequest, collateral: UInt256
|
||||||
|
): seq[SlotQueueItem] =
|
||||||
|
return SlotQueueItem.init(request.id, request.ask, request.expiry, collateral)
|
||||||
|
|
||||||
proc inRange*(val: SomeUnsignedInt): bool =
|
proc inRange*(val: SomeUnsignedInt): bool =
|
||||||
val.uint16 in SlotQueueSize.low .. SlotQueueSize.high
|
val.uint16 in SlotQueueSize.low .. SlotQueueSize.high
|
||||||
@ -234,25 +241,7 @@ proc unpause*(self: SlotQueue) =
|
|||||||
# set unpaused flag to true -- unblocks coroutines waiting on unpaused.wait()
|
# set unpaused flag to true -- unblocks coroutines waiting on unpaused.wait()
|
||||||
self.unpaused.fire()
|
self.unpaused.fire()
|
||||||
|
|
||||||
proc populateItem*(
|
proc push*(self: SlotQueue, item: SlotQueueItem): ?!void {.raises: [].} =
|
||||||
self: SlotQueue, requestId: RequestId, slotIndex: uint16
|
|
||||||
): ?SlotQueueItem =
|
|
||||||
trace "populate item, items in queue", len = self.queue.len
|
|
||||||
for item in self.queue.items:
|
|
||||||
trace "populate item search", itemRequestId = item.requestId, requestId
|
|
||||||
if item.requestId == requestId:
|
|
||||||
return some SlotQueueItem(
|
|
||||||
requestId: requestId,
|
|
||||||
slotIndex: slotIndex,
|
|
||||||
slotSize: item.slotSize,
|
|
||||||
duration: item.duration,
|
|
||||||
pricePerBytePerSecond: item.pricePerBytePerSecond,
|
|
||||||
collateralPerByte: item.collateralPerByte,
|
|
||||||
expiry: item.expiry,
|
|
||||||
)
|
|
||||||
return none SlotQueueItem
|
|
||||||
|
|
||||||
proc push*(self: SlotQueue, item: SlotQueueItem): ?!void =
|
|
||||||
logScope:
|
logScope:
|
||||||
requestId = item.requestId
|
requestId = item.requestId
|
||||||
slotIndex = item.slotIndex
|
slotIndex = item.slotIndex
|
||||||
|
|||||||
@ -67,8 +67,11 @@ method run*(
|
|||||||
return await reservations.release(reservation.id, reservation.availabilityId, bytes)
|
return await reservations.release(reservation.id, reservation.availabilityId, bytes)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
let slotId = slotId(request.id, data.slotIndex)
|
||||||
|
let isRepairing = (await context.market.slotState(slotId)) == SlotState.Repair
|
||||||
|
|
||||||
trace "Starting download"
|
trace "Starting download"
|
||||||
if err =? (await onStore(request, data.slotIndex, onBlocks)).errorOption:
|
if err =? (await onStore(request, data.slotIndex, onBlocks, isRepairing)).errorOption:
|
||||||
return some State(SaleErrored(error: err, reprocessSlot: false))
|
return some State(SaleErrored(error: err, reprocessSlot: false))
|
||||||
|
|
||||||
trace "Download complete"
|
trace "Download complete"
|
||||||
|
|||||||
@ -38,18 +38,11 @@ method run*(
|
|||||||
slotIndex = data.slotIndex
|
slotIndex = data.slotIndex
|
||||||
|
|
||||||
try:
|
try:
|
||||||
let slotState = await market.slotState(slotId(data.requestId, data.slotIndex))
|
without collateral =? await market.slotCollateral(data.requestId, data.slotIndex),
|
||||||
let requestedCollateral = request.ask.collateralPerSlot
|
err:
|
||||||
var collateral: UInt256
|
error "Failure attempting to fill slot: unable to calculate collateral",
|
||||||
|
error = err.msg
|
||||||
if slotState == SlotState.Repair:
|
return
|
||||||
# When repairing the node gets "discount" on the collateral that it needs to
|
|
||||||
let repairRewardPercentage = (await market.repairRewardPercentage).u256
|
|
||||||
collateral =
|
|
||||||
requestedCollateral -
|
|
||||||
((requestedCollateral * repairRewardPercentage)).div(100.u256)
|
|
||||||
else:
|
|
||||||
collateral = requestedCollateral
|
|
||||||
|
|
||||||
debug "Filling slot"
|
debug "Filling slot"
|
||||||
try:
|
try:
|
||||||
|
|||||||
@ -138,22 +138,35 @@ proc new*(_: type MockMarket, clock: ?Clock = Clock.none): MockMarket =
|
|||||||
signer: Address.example, config: config, canReserveSlot: true, clock: clock
|
signer: Address.example, config: config, canReserveSlot: true, clock: clock
|
||||||
)
|
)
|
||||||
|
|
||||||
|
method loadConfig*(
|
||||||
|
market: MockMarket
|
||||||
|
): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
|
discard
|
||||||
|
|
||||||
method getSigner*(market: MockMarket): Future[Address] {.async.} =
|
method getSigner*(market: MockMarket): Future[Address] {.async.} =
|
||||||
return market.signer
|
return market.signer
|
||||||
|
|
||||||
method periodicity*(mock: MockMarket): Future[Periodicity] {.async.} =
|
method periodicity*(
|
||||||
|
mock: MockMarket
|
||||||
|
): Future[Periodicity] {.async: (raises: [CancelledError, MarketError]).} =
|
||||||
return Periodicity(seconds: mock.config.proofs.period)
|
return Periodicity(seconds: mock.config.proofs.period)
|
||||||
|
|
||||||
method proofTimeout*(market: MockMarket): Future[uint64] {.async.} =
|
method proofTimeout*(
|
||||||
|
market: MockMarket
|
||||||
|
): Future[uint64] {.async: (raises: [CancelledError, MarketError]).} =
|
||||||
return market.config.proofs.timeout
|
return market.config.proofs.timeout
|
||||||
|
|
||||||
method requestDurationLimit*(market: MockMarket): Future[uint64] {.async.} =
|
method requestDurationLimit*(market: MockMarket): Future[uint64] {.async.} =
|
||||||
return market.config.requestDurationLimit
|
return market.config.requestDurationLimit
|
||||||
|
|
||||||
method proofDowntime*(market: MockMarket): Future[uint8] {.async.} =
|
method proofDowntime*(
|
||||||
|
market: MockMarket
|
||||||
|
): Future[uint8] {.async: (raises: [CancelledError, MarketError]).} =
|
||||||
return market.config.proofs.downtime
|
return market.config.proofs.downtime
|
||||||
|
|
||||||
method repairRewardPercentage*(market: MockMarket): Future[uint8] {.async.} =
|
method repairRewardPercentage*(
|
||||||
|
market: MockMarket
|
||||||
|
): Future[uint8] {.async: (raises: [CancelledError, MarketError]).} =
|
||||||
return market.config.collateral.repairRewardPercentage
|
return market.config.collateral.repairRewardPercentage
|
||||||
|
|
||||||
method getPointer*(market: MockMarket, slotId: SlotId): Future[uint8] {.async.} =
|
method getPointer*(market: MockMarket, slotId: SlotId): Future[uint8] {.async.} =
|
||||||
@ -173,7 +186,7 @@ method mySlots*(market: MockMarket): Future[seq[SlotId]] {.async.} =
|
|||||||
|
|
||||||
method getRequest*(
|
method getRequest*(
|
||||||
market: MockMarket, id: RequestId
|
market: MockMarket, id: RequestId
|
||||||
): Future[?StorageRequest] {.async.} =
|
): Future[?StorageRequest] {.async: (raises: [CancelledError]).} =
|
||||||
for request in market.requested:
|
for request in market.requested:
|
||||||
if request.id == id:
|
if request.id == id:
|
||||||
return some request
|
return some request
|
||||||
@ -191,10 +204,16 @@ method requestState*(
|
|||||||
): Future[?RequestState] {.async.} =
|
): Future[?RequestState] {.async.} =
|
||||||
return market.requestState .? [requestId]
|
return market.requestState .? [requestId]
|
||||||
|
|
||||||
method slotState*(market: MockMarket, slotId: SlotId): Future[SlotState] {.async.} =
|
method slotState*(
|
||||||
if not market.slotState.hasKey(slotId):
|
market: MockMarket, slotId: SlotId
|
||||||
|
): Future[SlotState] {.async: (raises: [CancelledError, MarketError]).} =
|
||||||
|
if slotId notin market.slotState:
|
||||||
return SlotState.Free
|
return SlotState.Free
|
||||||
return market.slotState[slotId]
|
|
||||||
|
try:
|
||||||
|
return market.slotState[slotId]
|
||||||
|
except KeyError as e:
|
||||||
|
raiseAssert "SlotId not found in known slots (MockMarket.slotState)"
|
||||||
|
|
||||||
method getRequestEnd*(
|
method getRequestEnd*(
|
||||||
market: MockMarket, id: RequestId
|
market: MockMarket, id: RequestId
|
||||||
@ -534,3 +553,33 @@ method unsubscribe*(subscription: ProofSubmittedSubscription) {.async.} =
|
|||||||
|
|
||||||
method unsubscribe*(subscription: SlotReservationsFullSubscription) {.async.} =
|
method unsubscribe*(subscription: SlotReservationsFullSubscription) {.async.} =
|
||||||
subscription.market.subscriptions.onSlotReservationsFull.keepItIf(it != subscription)
|
subscription.market.subscriptions.onSlotReservationsFull.keepItIf(it != subscription)
|
||||||
|
|
||||||
|
method slotCollateral*(
|
||||||
|
market: MockMarket, requestId: RequestId, slotIndex: uint64
|
||||||
|
): Future[?!UInt256] {.async: (raises: [CancelledError]).} =
|
||||||
|
let slotid = slotId(requestId, slotIndex)
|
||||||
|
|
||||||
|
try:
|
||||||
|
let state = await slotState(market, slotid)
|
||||||
|
|
||||||
|
without request =? await market.getRequest(requestId):
|
||||||
|
return failure newException(
|
||||||
|
MarketError, "Failure calculating the slotCollateral, cannot get the request"
|
||||||
|
)
|
||||||
|
|
||||||
|
return market.slotCollateral(request.ask.collateralPerSlot, state)
|
||||||
|
except MarketError as error:
|
||||||
|
error "Error when trying to calculate the slotCollateral", error = error.msg
|
||||||
|
return failure error
|
||||||
|
|
||||||
|
method slotCollateral*(
|
||||||
|
market: MockMarket, collateralPerSlot: UInt256, slotState: SlotState
|
||||||
|
): ?!UInt256 {.raises: [].} =
|
||||||
|
if slotState == SlotState.Repair:
|
||||||
|
let repairRewardPercentage = market.config.collateral.repairRewardPercentage.u256
|
||||||
|
|
||||||
|
return success (
|
||||||
|
collateralPerSlot - (collateralPerSlot * repairRewardPercentage).div(100.u256)
|
||||||
|
)
|
||||||
|
|
||||||
|
return success collateralPerSlot
|
||||||
|
|||||||
@ -7,7 +7,7 @@ type MockSlotQueueItem* = object
|
|||||||
slotSize*: uint64
|
slotSize*: uint64
|
||||||
duration*: uint64
|
duration*: uint64
|
||||||
pricePerBytePerSecond*: UInt256
|
pricePerBytePerSecond*: UInt256
|
||||||
collateralPerByte*: UInt256
|
collateral*: UInt256
|
||||||
expiry*: uint64
|
expiry*: uint64
|
||||||
seen*: bool
|
seen*: bool
|
||||||
|
|
||||||
@ -19,8 +19,8 @@ proc toSlotQueueItem*(item: MockSlotQueueItem): SlotQueueItem =
|
|||||||
slotSize: item.slotSize,
|
slotSize: item.slotSize,
|
||||||
duration: item.duration,
|
duration: item.duration,
|
||||||
pricePerBytePerSecond: item.pricePerBytePerSecond,
|
pricePerBytePerSecond: item.pricePerBytePerSecond,
|
||||||
collateralPerByte: item.collateralPerByte,
|
|
||||||
),
|
),
|
||||||
expiry = item.expiry,
|
expiry = item.expiry,
|
||||||
seen = item.seen,
|
seen = item.seen,
|
||||||
|
collateral = item.collateral,
|
||||||
)
|
)
|
||||||
|
|||||||
@ -125,7 +125,7 @@ asyncchecksuite "Test Node - Host contracts":
|
|||||||
fetchedBytes += blk.data.len.uint
|
fetchedBytes += blk.data.len.uint
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
(await onStore(request, 1.uint64, onBlocks)).tryGet()
|
(await onStore(request, 1.uint64, onBlocks, isRepairing = false)).tryGet()
|
||||||
check fetchedBytes == 12 * DefaultBlockSize.uint
|
check fetchedBytes == 12 * DefaultBlockSize.uint
|
||||||
|
|
||||||
let indexer = verifiable.protectedStrategy.init(
|
let indexer = verifiable.protectedStrategy.init(
|
||||||
|
|||||||
@ -62,7 +62,7 @@ asyncchecksuite "Sales - start":
|
|||||||
sales = Sales.new(market, clock, repo)
|
sales = Sales.new(market, clock, repo)
|
||||||
reservations = sales.context.reservations
|
reservations = sales.context.reservations
|
||||||
sales.onStore = proc(
|
sales.onStore = proc(
|
||||||
request: StorageRequest, slot: uint64, onBatch: BatchProc
|
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
|
||||||
): Future[?!void] {.async.} =
|
): Future[?!void] {.async.} =
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
@ -181,7 +181,7 @@ asyncchecksuite "Sales":
|
|||||||
sales = Sales.new(market, clock, repo)
|
sales = Sales.new(market, clock, repo)
|
||||||
reservations = sales.context.reservations
|
reservations = sales.context.reservations
|
||||||
sales.onStore = proc(
|
sales.onStore = proc(
|
||||||
request: StorageRequest, slot: uint64, onBatch: BatchProc
|
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
|
||||||
): Future[?!void] {.async.} =
|
): Future[?!void] {.async.} =
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
@ -229,7 +229,7 @@ asyncchecksuite "Sales":
|
|||||||
availability = a.get # update id
|
availability = a.get # update id
|
||||||
|
|
||||||
proc notProcessed(itemsProcessed: seq[SlotQueueItem], request: StorageRequest): bool =
|
proc notProcessed(itemsProcessed: seq[SlotQueueItem], request: StorageRequest): bool =
|
||||||
let items = SlotQueueItem.init(request)
|
let items = SlotQueueItem.init(request, collateral = request.ask.collateralPerSlot)
|
||||||
for i in 0 ..< items.len:
|
for i in 0 ..< items.len:
|
||||||
if itemsProcessed.contains(items[i]):
|
if itemsProcessed.contains(items[i]):
|
||||||
return false
|
return false
|
||||||
@ -266,7 +266,7 @@ asyncchecksuite "Sales":
|
|||||||
done.complete()
|
done.complete()
|
||||||
createAvailability()
|
createAvailability()
|
||||||
await market.requestStorage(request)
|
await market.requestStorage(request)
|
||||||
let items = SlotQueueItem.init(request)
|
let items = SlotQueueItem.init(request, collateral = request.ask.collateralPerSlot)
|
||||||
check eventually items.allIt(itemsProcessed.contains(it))
|
check eventually items.allIt(itemsProcessed.contains(it))
|
||||||
|
|
||||||
test "removes slots from slot queue once RequestCancelled emitted":
|
test "removes slots from slot queue once RequestCancelled emitted":
|
||||||
@ -287,13 +287,15 @@ asyncchecksuite "Sales":
|
|||||||
test "removes slot index from slot queue once SlotFilled emitted":
|
test "removes slot index from slot queue once SlotFilled emitted":
|
||||||
let request1 = await addRequestToSaturatedQueue()
|
let request1 = await addRequestToSaturatedQueue()
|
||||||
market.emitSlotFilled(request1.id, 1.uint64)
|
market.emitSlotFilled(request1.id, 1.uint64)
|
||||||
let expected = SlotQueueItem.init(request1, 1'u16)
|
let expected =
|
||||||
|
SlotQueueItem.init(request1, 1'u16, collateral = request1.ask.collateralPerSlot)
|
||||||
check always (not itemsProcessed.contains(expected))
|
check always (not itemsProcessed.contains(expected))
|
||||||
|
|
||||||
test "removes slot index from slot queue once SlotReservationsFull emitted":
|
test "removes slot index from slot queue once SlotReservationsFull emitted":
|
||||||
let request1 = await addRequestToSaturatedQueue()
|
let request1 = await addRequestToSaturatedQueue()
|
||||||
market.emitSlotReservationsFull(request1.id, 1.uint64)
|
market.emitSlotReservationsFull(request1.id, 1.uint64)
|
||||||
let expected = SlotQueueItem.init(request1, 1'u16)
|
let expected =
|
||||||
|
SlotQueueItem.init(request1, 1'u16, collateral = request1.ask.collateralPerSlot)
|
||||||
check always (not itemsProcessed.contains(expected))
|
check always (not itemsProcessed.contains(expected))
|
||||||
|
|
||||||
test "adds slot index to slot queue once SlotFreed emitted":
|
test "adds slot index to slot queue once SlotFreed emitted":
|
||||||
@ -303,14 +305,21 @@ asyncchecksuite "Sales":
|
|||||||
|
|
||||||
createAvailability()
|
createAvailability()
|
||||||
market.requested.add request # "contract" must be able to return request
|
market.requested.add request # "contract" must be able to return request
|
||||||
|
|
||||||
market.emitSlotFreed(request.id, 2.uint64)
|
market.emitSlotFreed(request.id, 2.uint64)
|
||||||
|
|
||||||
let expected = SlotQueueItem.init(request, 2.uint16)
|
without collateralPerSlot =? await market.slotCollateral(request.id, 2.uint64),
|
||||||
|
error:
|
||||||
|
fail()
|
||||||
|
|
||||||
|
let expected =
|
||||||
|
SlotQueueItem.init(request, 2.uint16, collateral = request.ask.collateralPerSlot)
|
||||||
|
|
||||||
check eventually itemsProcessed.contains(expected)
|
check eventually itemsProcessed.contains(expected)
|
||||||
|
|
||||||
test "items in queue are readded (and marked seen) once ignored":
|
test "items in queue are readded (and marked seen) once ignored":
|
||||||
await market.requestStorage(request)
|
await market.requestStorage(request)
|
||||||
let items = SlotQueueItem.init(request)
|
let items = SlotQueueItem.init(request, collateral = request.ask.collateralPerSlot)
|
||||||
check eventually queue.len > 0
|
check eventually queue.len > 0
|
||||||
# queue starts paused, allow items to be added to the queue
|
# queue starts paused, allow items to be added to the queue
|
||||||
check eventually queue.paused
|
check eventually queue.paused
|
||||||
@ -331,7 +340,7 @@ asyncchecksuite "Sales":
|
|||||||
test "queue is paused once availability is insufficient to service slots in queue":
|
test "queue is paused once availability is insufficient to service slots in queue":
|
||||||
createAvailability() # enough to fill a single slot
|
createAvailability() # enough to fill a single slot
|
||||||
await market.requestStorage(request)
|
await market.requestStorage(request)
|
||||||
let items = SlotQueueItem.init(request)
|
let items = SlotQueueItem.init(request, collateral = request.ask.collateralPerSlot)
|
||||||
check eventually queue.len > 0
|
check eventually queue.len > 0
|
||||||
# queue starts paused, allow items to be added to the queue
|
# queue starts paused, allow items to be added to the queue
|
||||||
check eventually queue.paused
|
check eventually queue.paused
|
||||||
@ -348,7 +357,7 @@ asyncchecksuite "Sales":
|
|||||||
|
|
||||||
test "availability size is reduced by request slot size when fully downloaded":
|
test "availability size is reduced by request slot size when fully downloaded":
|
||||||
sales.onStore = proc(
|
sales.onStore = proc(
|
||||||
request: StorageRequest, slot: uint64, onBatch: BatchProc
|
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
|
||||||
): Future[?!void] {.async.} =
|
): Future[?!void] {.async.} =
|
||||||
let blk = bt.Block.new(@[1.byte]).get
|
let blk = bt.Block.new(@[1.byte]).get
|
||||||
await onBatch(blk.repeat(request.ask.slotSize.int))
|
await onBatch(blk.repeat(request.ask.slotSize.int))
|
||||||
@ -361,7 +370,7 @@ asyncchecksuite "Sales":
|
|||||||
test "non-downloaded bytes are returned to availability once finished":
|
test "non-downloaded bytes are returned to availability once finished":
|
||||||
var slotIndex = 0.uint64
|
var slotIndex = 0.uint64
|
||||||
sales.onStore = proc(
|
sales.onStore = proc(
|
||||||
request: StorageRequest, slot: uint64, onBatch: BatchProc
|
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
|
||||||
): Future[?!void] {.async.} =
|
): Future[?!void] {.async.} =
|
||||||
slotIndex = slot
|
slotIndex = slot
|
||||||
let blk = bt.Block.new(@[1.byte]).get
|
let blk = bt.Block.new(@[1.byte]).get
|
||||||
@ -421,7 +430,7 @@ asyncchecksuite "Sales":
|
|||||||
var storingRequest: StorageRequest
|
var storingRequest: StorageRequest
|
||||||
var storingSlot: uint64
|
var storingSlot: uint64
|
||||||
sales.onStore = proc(
|
sales.onStore = proc(
|
||||||
request: StorageRequest, slot: uint64, onBatch: BatchProc
|
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
|
||||||
): Future[?!void] {.async.} =
|
): Future[?!void] {.async.} =
|
||||||
storingRequest = request
|
storingRequest = request
|
||||||
storingSlot = slot
|
storingSlot = slot
|
||||||
@ -434,7 +443,7 @@ asyncchecksuite "Sales":
|
|||||||
test "makes storage available again when data retrieval fails":
|
test "makes storage available again when data retrieval fails":
|
||||||
let error = newException(IOError, "data retrieval failed")
|
let error = newException(IOError, "data retrieval failed")
|
||||||
sales.onStore = proc(
|
sales.onStore = proc(
|
||||||
request: StorageRequest, slot: uint64, onBatch: BatchProc
|
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
|
||||||
): Future[?!void] {.async.} =
|
): Future[?!void] {.async.} =
|
||||||
return failure(error)
|
return failure(error)
|
||||||
createAvailability()
|
createAvailability()
|
||||||
@ -503,7 +512,7 @@ asyncchecksuite "Sales":
|
|||||||
test "makes storage available again when other host fills the slot":
|
test "makes storage available again when other host fills the slot":
|
||||||
let otherHost = Address.example
|
let otherHost = Address.example
|
||||||
sales.onStore = proc(
|
sales.onStore = proc(
|
||||||
request: StorageRequest, slot: uint64, onBatch: BatchProc
|
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
|
||||||
): Future[?!void] {.async.} =
|
): Future[?!void] {.async.} =
|
||||||
await sleepAsync(chronos.hours(1))
|
await sleepAsync(chronos.hours(1))
|
||||||
return success()
|
return success()
|
||||||
@ -519,7 +528,7 @@ asyncchecksuite "Sales":
|
|||||||
|
|
||||||
let origSize = availability.freeSize
|
let origSize = availability.freeSize
|
||||||
sales.onStore = proc(
|
sales.onStore = proc(
|
||||||
request: StorageRequest, slot: uint64, onBatch: BatchProc
|
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
|
||||||
): Future[?!void] {.async.} =
|
): Future[?!void] {.async.} =
|
||||||
await sleepAsync(chronos.hours(1))
|
await sleepAsync(chronos.hours(1))
|
||||||
return success()
|
return success()
|
||||||
@ -544,7 +553,7 @@ asyncchecksuite "Sales":
|
|||||||
|
|
||||||
let origSize = availability.freeSize
|
let origSize = availability.freeSize
|
||||||
sales.onStore = proc(
|
sales.onStore = proc(
|
||||||
request: StorageRequest, slot: uint64, onBatch: BatchProc
|
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
|
||||||
): Future[?!void] {.async.} =
|
): Future[?!void] {.async.} =
|
||||||
await sleepAsync(chronos.hours(1))
|
await sleepAsync(chronos.hours(1))
|
||||||
return success()
|
return success()
|
||||||
|
|||||||
@ -159,8 +159,10 @@ suite "Slot queue":
|
|||||||
requestB.ask.collateralPerByte = 1.u256
|
requestB.ask.collateralPerByte = 1.u256
|
||||||
requestB.expiry = 1000.uint64
|
requestB.expiry = 1000.uint64
|
||||||
|
|
||||||
let itemA = SlotQueueItem.init(requestA, 0)
|
let itemA =
|
||||||
let itemB = SlotQueueItem.init(requestB, 0)
|
SlotQueueItem.init(requestA, 0, collateral = requestA.ask.collateralPerSlot)
|
||||||
|
let itemB =
|
||||||
|
SlotQueueItem.init(requestB, 0, collateral = requestB.ask.collateralPerSlot)
|
||||||
check itemB < itemA # B higher priority than A
|
check itemB < itemA # B higher priority than A
|
||||||
check itemA > itemB
|
check itemA > itemB
|
||||||
|
|
||||||
@ -172,7 +174,7 @@ suite "Slot queue":
|
|||||||
slotSize: 1.uint64,
|
slotSize: 1.uint64,
|
||||||
duration: 1.uint64,
|
duration: 1.uint64,
|
||||||
pricePerBytePerSecond: 2.u256, # profitability is higher (good)
|
pricePerBytePerSecond: 2.u256, # profitability is higher (good)
|
||||||
collateralPerByte: 1.u256,
|
collateral: 1.u256,
|
||||||
expiry: 1.uint64,
|
expiry: 1.uint64,
|
||||||
seen: true, # seen (bad), more weight than profitability
|
seen: true, # seen (bad), more weight than profitability
|
||||||
)
|
)
|
||||||
@ -182,7 +184,7 @@ suite "Slot queue":
|
|||||||
slotSize: 1.uint64,
|
slotSize: 1.uint64,
|
||||||
duration: 1.uint64,
|
duration: 1.uint64,
|
||||||
pricePerBytePerSecond: 1.u256, # profitability is lower (bad)
|
pricePerBytePerSecond: 1.u256, # profitability is lower (bad)
|
||||||
collateralPerByte: 1.u256,
|
collateral: 1.u256,
|
||||||
expiry: 1.uint64,
|
expiry: 1.uint64,
|
||||||
seen: false, # not seen (good)
|
seen: false, # not seen (good)
|
||||||
)
|
)
|
||||||
@ -197,7 +199,7 @@ suite "Slot queue":
|
|||||||
slotSize: 1.uint64,
|
slotSize: 1.uint64,
|
||||||
duration: 1.uint64,
|
duration: 1.uint64,
|
||||||
pricePerBytePerSecond: 1.u256, # reward is lower (bad)
|
pricePerBytePerSecond: 1.u256, # reward is lower (bad)
|
||||||
collateralPerByte: 1.u256, # collateral is lower (good)
|
collateral: 1.u256, # collateral is lower (good)
|
||||||
expiry: 1.uint64,
|
expiry: 1.uint64,
|
||||||
seen: false,
|
seen: false,
|
||||||
)
|
)
|
||||||
@ -208,7 +210,7 @@ suite "Slot queue":
|
|||||||
duration: 1.uint64,
|
duration: 1.uint64,
|
||||||
pricePerBytePerSecond: 2.u256,
|
pricePerBytePerSecond: 2.u256,
|
||||||
# reward is higher (good), more weight than collateral
|
# reward is higher (good), more weight than collateral
|
||||||
collateralPerByte: 2.u256, # collateral is higher (bad)
|
collateral: 2.u256, # collateral is higher (bad)
|
||||||
expiry: 1.uint64,
|
expiry: 1.uint64,
|
||||||
seen: false,
|
seen: false,
|
||||||
)
|
)
|
||||||
@ -223,7 +225,7 @@ suite "Slot queue":
|
|||||||
slotSize: 1.uint64,
|
slotSize: 1.uint64,
|
||||||
duration: 1.uint64,
|
duration: 1.uint64,
|
||||||
pricePerBytePerSecond: 1.u256,
|
pricePerBytePerSecond: 1.u256,
|
||||||
collateralPerByte: 2.u256, # collateral is higher (bad)
|
collateral: 2.u256, # collateral is higher (bad)
|
||||||
expiry: 2.uint64, # expiry is longer (good)
|
expiry: 2.uint64, # expiry is longer (good)
|
||||||
seen: false,
|
seen: false,
|
||||||
)
|
)
|
||||||
@ -233,7 +235,7 @@ suite "Slot queue":
|
|||||||
slotSize: 1.uint64,
|
slotSize: 1.uint64,
|
||||||
duration: 1.uint64,
|
duration: 1.uint64,
|
||||||
pricePerBytePerSecond: 1.u256,
|
pricePerBytePerSecond: 1.u256,
|
||||||
collateralPerByte: 1.u256, # collateral is lower (good), more weight than expiry
|
collateral: 1.u256, # collateral is lower (good), more weight than expiry
|
||||||
expiry: 1.uint64, # expiry is shorter (bad)
|
expiry: 1.uint64, # expiry is shorter (bad)
|
||||||
seen: false,
|
seen: false,
|
||||||
)
|
)
|
||||||
@ -248,7 +250,7 @@ suite "Slot queue":
|
|||||||
slotSize: 1.uint64, # slotSize is smaller (good)
|
slotSize: 1.uint64, # slotSize is smaller (good)
|
||||||
duration: 1.uint64,
|
duration: 1.uint64,
|
||||||
pricePerBytePerSecond: 1.u256,
|
pricePerBytePerSecond: 1.u256,
|
||||||
collateralPerByte: 1.u256,
|
collateral: 1.u256,
|
||||||
expiry: 1.uint64, # expiry is shorter (bad)
|
expiry: 1.uint64, # expiry is shorter (bad)
|
||||||
seen: false,
|
seen: false,
|
||||||
)
|
)
|
||||||
@ -258,7 +260,7 @@ suite "Slot queue":
|
|||||||
slotSize: 2.uint64, # slotSize is larger (bad)
|
slotSize: 2.uint64, # slotSize is larger (bad)
|
||||||
duration: 1.uint64,
|
duration: 1.uint64,
|
||||||
pricePerBytePerSecond: 1.u256,
|
pricePerBytePerSecond: 1.u256,
|
||||||
collateralPerByte: 1.u256,
|
collateral: 1.u256,
|
||||||
expiry: 2.uint64, # expiry is longer (good), more weight than slotSize
|
expiry: 2.uint64, # expiry is longer (good), more weight than slotSize
|
||||||
seen: false,
|
seen: false,
|
||||||
)
|
)
|
||||||
@ -273,7 +275,7 @@ suite "Slot queue":
|
|||||||
slotSize: 2.uint64, # slotSize is larger (bad)
|
slotSize: 2.uint64, # slotSize is larger (bad)
|
||||||
duration: 1.uint64,
|
duration: 1.uint64,
|
||||||
pricePerBytePerSecond: 1.u256,
|
pricePerBytePerSecond: 1.u256,
|
||||||
collateralPerByte: 1.u256,
|
collateral: 1.u256,
|
||||||
expiry: 1.uint64, # expiry is shorter (bad)
|
expiry: 1.uint64, # expiry is shorter (bad)
|
||||||
seen: false,
|
seen: false,
|
||||||
)
|
)
|
||||||
@ -283,7 +285,7 @@ suite "Slot queue":
|
|||||||
slotSize: 1.uint64, # slotSize is smaller (good)
|
slotSize: 1.uint64, # slotSize is smaller (good)
|
||||||
duration: 1.uint64,
|
duration: 1.uint64,
|
||||||
pricePerBytePerSecond: 1.u256,
|
pricePerBytePerSecond: 1.u256,
|
||||||
collateralPerByte: 1.u256,
|
collateral: 1.u256,
|
||||||
expiry: 1.uint64,
|
expiry: 1.uint64,
|
||||||
seen: false,
|
seen: false,
|
||||||
)
|
)
|
||||||
@ -292,11 +294,16 @@ suite "Slot queue":
|
|||||||
|
|
||||||
test "expands available all possible slot indices on init":
|
test "expands available all possible slot indices on init":
|
||||||
let request = StorageRequest.example
|
let request = StorageRequest.example
|
||||||
let items = SlotQueueItem.init(request)
|
let items = SlotQueueItem.init(request, collateral = request.ask.collateralPerSlot)
|
||||||
check items.len.uint64 == request.ask.slots
|
check items.len.uint64 == request.ask.slots
|
||||||
var checked = 0
|
var checked = 0
|
||||||
for slotIndex in 0'u16 ..< request.ask.slots.uint16:
|
for slotIndex in 0'u16 ..< request.ask.slots.uint16:
|
||||||
check items.anyIt(it == SlotQueueItem.init(request, slotIndex))
|
check items.anyIt(
|
||||||
|
it ==
|
||||||
|
SlotQueueItem.init(
|
||||||
|
request, slotIndex, collateral = request.ask.collateralPerSlot
|
||||||
|
)
|
||||||
|
)
|
||||||
inc checked
|
inc checked
|
||||||
check checked == items.len
|
check checked == items.len
|
||||||
|
|
||||||
@ -322,34 +329,17 @@ suite "Slot queue":
|
|||||||
check isOk queue.push(item3)
|
check isOk queue.push(item3)
|
||||||
check isOk queue.push(item4)
|
check isOk queue.push(item4)
|
||||||
|
|
||||||
test "populates item with exisiting request metadata":
|
|
||||||
newSlotQueue(maxSize = 8, maxWorkers = 1, processSlotDelay = 10.millis)
|
|
||||||
let request0 = StorageRequest.example
|
|
||||||
var request1 = StorageRequest.example
|
|
||||||
request1.ask.collateralPerByte += 1.u256
|
|
||||||
let items0 = SlotQueueItem.init(request0)
|
|
||||||
let items1 = SlotQueueItem.init(request1)
|
|
||||||
check queue.push(items0).isOk
|
|
||||||
check queue.push(items1).isOk
|
|
||||||
let populated = !queue.populateItem(request1.id, 12'u16)
|
|
||||||
check populated.requestId == request1.id
|
|
||||||
check populated.slotIndex == 12'u16
|
|
||||||
check populated.slotSize == request1.ask.slotSize
|
|
||||||
check populated.duration == request1.ask.duration
|
|
||||||
check populated.pricePerBytePerSecond == request1.ask.pricePerBytePerSecond
|
|
||||||
check populated.collateralPerByte == request1.ask.collateralPerByte
|
|
||||||
|
|
||||||
test "does not find exisiting request metadata":
|
|
||||||
newSlotQueue(maxSize = 2, maxWorkers = 2)
|
|
||||||
let item = SlotQueueItem.example
|
|
||||||
check queue.populateItem(item.requestId, 12'u16).isNone
|
|
||||||
|
|
||||||
test "can support uint16.high slots":
|
test "can support uint16.high slots":
|
||||||
var request = StorageRequest.example
|
var request = StorageRequest.example
|
||||||
let maxUInt16 = uint16.high
|
let maxUInt16 = uint16.high
|
||||||
let uint64Slots = uint64(maxUInt16)
|
let uint64Slots = uint64(maxUInt16)
|
||||||
request.ask.slots = uint64Slots
|
request.ask.slots = uint64Slots
|
||||||
let items = SlotQueueItem.init(request.id, request.ask, request.expiry)
|
let items = SlotQueueItem.init(
|
||||||
|
request.id,
|
||||||
|
request.ask,
|
||||||
|
request.expiry,
|
||||||
|
collateral = request.ask.collateralPerSlot,
|
||||||
|
)
|
||||||
check items.len.uint16 == maxUInt16
|
check items.len.uint16 == maxUInt16
|
||||||
|
|
||||||
test "cannot support greater than uint16.high slots":
|
test "cannot support greater than uint16.high slots":
|
||||||
@ -358,7 +348,12 @@ suite "Slot queue":
|
|||||||
let uint64Slots = uint64(int32Slots)
|
let uint64Slots = uint64(int32Slots)
|
||||||
request.ask.slots = uint64Slots
|
request.ask.slots = uint64Slots
|
||||||
expect SlotsOutOfRangeError:
|
expect SlotsOutOfRangeError:
|
||||||
discard SlotQueueItem.init(request.id, request.ask, request.expiry)
|
discard SlotQueueItem.init(
|
||||||
|
request.id,
|
||||||
|
request.ask,
|
||||||
|
request.expiry,
|
||||||
|
collateral = request.ask.collateralPerSlot,
|
||||||
|
)
|
||||||
|
|
||||||
test "cannot push duplicate items":
|
test "cannot push duplicate items":
|
||||||
newSlotQueue(maxSize = 6, maxWorkers = 1, processSlotDelay = 15.millis)
|
newSlotQueue(maxSize = 6, maxWorkers = 1, processSlotDelay = 15.millis)
|
||||||
@ -399,8 +394,10 @@ suite "Slot queue":
|
|||||||
let request0 = StorageRequest.example
|
let request0 = StorageRequest.example
|
||||||
var request1 = StorageRequest.example
|
var request1 = StorageRequest.example
|
||||||
request1.ask.collateralPerByte += 1.u256
|
request1.ask.collateralPerByte += 1.u256
|
||||||
let items0 = SlotQueueItem.init(request0)
|
let items0 =
|
||||||
let items1 = SlotQueueItem.init(request1)
|
SlotQueueItem.init(request0, collateral = request0.ask.collateralPerSlot)
|
||||||
|
let items1 =
|
||||||
|
SlotQueueItem.init(request1, collateral = request1.ask.collateralPerSlot)
|
||||||
check queue.push(items0).isOk
|
check queue.push(items0).isOk
|
||||||
check queue.push(items1).isOk
|
check queue.push(items1).isOk
|
||||||
let last = items1[items1.high]
|
let last = items1[items1.high]
|
||||||
@ -413,8 +410,10 @@ suite "Slot queue":
|
|||||||
let request0 = StorageRequest.example
|
let request0 = StorageRequest.example
|
||||||
var request1 = StorageRequest.example
|
var request1 = StorageRequest.example
|
||||||
request1.ask.collateralPerByte += 1.u256
|
request1.ask.collateralPerByte += 1.u256
|
||||||
let items0 = SlotQueueItem.init(request0)
|
let items0 =
|
||||||
let items1 = SlotQueueItem.init(request1)
|
SlotQueueItem.init(request0, collateral = request0.ask.collateralPerSlot)
|
||||||
|
let items1 =
|
||||||
|
SlotQueueItem.init(request1, collateral = request1.ask.collateralPerSlot)
|
||||||
check queue.push(items0).isOk
|
check queue.push(items0).isOk
|
||||||
check queue.push(items1).isOk
|
check queue.push(items1).isOk
|
||||||
queue.delete(request1.id)
|
queue.delete(request1.id)
|
||||||
@ -433,42 +432,56 @@ suite "Slot queue":
|
|||||||
request3.ask.collateralPerByte = request2.ask.collateralPerByte + 1
|
request3.ask.collateralPerByte = request2.ask.collateralPerByte + 1
|
||||||
request4.ask.collateralPerByte = request3.ask.collateralPerByte + 1
|
request4.ask.collateralPerByte = request3.ask.collateralPerByte + 1
|
||||||
request5.ask.collateralPerByte = request4.ask.collateralPerByte + 1
|
request5.ask.collateralPerByte = request4.ask.collateralPerByte + 1
|
||||||
let item0 = SlotQueueItem.init(request0, 0)
|
let item0 =
|
||||||
let item1 = SlotQueueItem.init(request1, 0)
|
SlotQueueItem.init(request0, 0, collateral = request0.ask.collateralPerSlot)
|
||||||
let item2 = SlotQueueItem.init(request2, 0)
|
let item1 =
|
||||||
let item3 = SlotQueueItem.init(request3, 0)
|
SlotQueueItem.init(request1, 0, collateral = request1.ask.collateralPerSlot)
|
||||||
let item4 = SlotQueueItem.init(request4, 0)
|
let item2 =
|
||||||
let item5 = SlotQueueItem.init(request5, 0)
|
SlotQueueItem.init(request2, 0, collateral = request2.ask.collateralPerSlot)
|
||||||
|
let item3 =
|
||||||
|
SlotQueueItem.init(request3, 0, collateral = request3.ask.collateralPerSlot)
|
||||||
|
let item4 =
|
||||||
|
SlotQueueItem.init(request4, 0, collateral = request4.ask.collateralPerSlot)
|
||||||
|
let item5 =
|
||||||
|
SlotQueueItem.init(request5, 0, collateral = request5.ask.collateralPerSlot)
|
||||||
check queue.contains(item5) == false
|
check queue.contains(item5) == false
|
||||||
check queue.push(@[item0, item1, item2, item3, item4, item5]).isOk
|
check queue.push(@[item0, item1, item2, item3, item4, item5]).isOk
|
||||||
check queue.contains(item5)
|
check queue.contains(item5)
|
||||||
|
|
||||||
test "sorts items by profitability descending (higher pricePerBytePerSecond == higher priority == goes first in the list)":
|
test "sorts items by profitability descending (higher pricePerBytePerSecond == higher priority == goes first in the list)":
|
||||||
var request = StorageRequest.example
|
var request = StorageRequest.example
|
||||||
let item0 = SlotQueueItem.init(request, 0)
|
let item0 =
|
||||||
|
SlotQueueItem.init(request, 0, collateral = request.ask.collateralPerSlot)
|
||||||
request.ask.pricePerBytePerSecond += 1.u256
|
request.ask.pricePerBytePerSecond += 1.u256
|
||||||
let item1 = SlotQueueItem.init(request, 1)
|
let item1 =
|
||||||
|
SlotQueueItem.init(request, 1, collateral = request.ask.collateralPerSlot)
|
||||||
check item1 < item0
|
check item1 < item0
|
||||||
|
|
||||||
test "sorts items by collateral ascending (higher required collateralPerByte = lower priority == comes later in the list)":
|
test "sorts items by collateral ascending (higher required collateral = lower priority == comes later in the list)":
|
||||||
var request = StorageRequest.example
|
var request = StorageRequest.example
|
||||||
let item0 = SlotQueueItem.init(request, 0)
|
let item0 =
|
||||||
request.ask.collateralPerByte += 1.u256
|
SlotQueueItem.init(request, 0, collateral = request.ask.collateralPerSlot)
|
||||||
let item1 = SlotQueueItem.init(request, 1)
|
let item1 = SlotQueueItem.init(
|
||||||
|
request, 1, collateral = request.ask.collateralPerSlot + 1.u256
|
||||||
|
)
|
||||||
check item1 > item0
|
check item1 > item0
|
||||||
|
|
||||||
test "sorts items by expiry descending (longer expiry = higher priority)":
|
test "sorts items by expiry descending (longer expiry = higher priority)":
|
||||||
var request = StorageRequest.example
|
var request = StorageRequest.example
|
||||||
let item0 = SlotQueueItem.init(request, 0)
|
let item0 =
|
||||||
|
SlotQueueItem.init(request, 0, collateral = request.ask.collateralPerSlot)
|
||||||
request.expiry += 1
|
request.expiry += 1
|
||||||
let item1 = SlotQueueItem.init(request, 1)
|
let item1 =
|
||||||
|
SlotQueueItem.init(request, 1, collateral = request.ask.collateralPerSlot)
|
||||||
check item1 < item0
|
check item1 < item0
|
||||||
|
|
||||||
test "sorts items by slot size descending (bigger dataset = higher profitability = higher priority)":
|
test "sorts items by slot size descending (bigger dataset = higher profitability = higher priority)":
|
||||||
var request = StorageRequest.example
|
var request = StorageRequest.example
|
||||||
let item0 = SlotQueueItem.init(request, 0)
|
let item0 =
|
||||||
|
SlotQueueItem.init(request, 0, collateral = request.ask.collateralPerSlot)
|
||||||
request.ask.slotSize += 1
|
request.ask.slotSize += 1
|
||||||
let item1 = SlotQueueItem.init(request, 1)
|
let item1 =
|
||||||
|
SlotQueueItem.init(request, 1, collateral = request.ask.collateralPerSlot)
|
||||||
check item1 < item0
|
check item1 < item0
|
||||||
|
|
||||||
test "should call callback once an item is added":
|
test "should call callback once an item is added":
|
||||||
@ -489,13 +502,17 @@ suite "Slot queue":
|
|||||||
# sleeping after push allows the slotqueue loop to iterate,
|
# sleeping after push allows the slotqueue loop to iterate,
|
||||||
# calling the callback for each pushed/updated item
|
# calling the callback for each pushed/updated item
|
||||||
var request = StorageRequest.example
|
var request = StorageRequest.example
|
||||||
let item0 = SlotQueueItem.init(request, 0)
|
let item0 =
|
||||||
|
SlotQueueItem.init(request, 0, collateral = request.ask.collateralPerSlot)
|
||||||
request.ask.pricePerBytePerSecond += 1.u256
|
request.ask.pricePerBytePerSecond += 1.u256
|
||||||
let item1 = SlotQueueItem.init(request, 1)
|
let item1 =
|
||||||
|
SlotQueueItem.init(request, 1, collateral = request.ask.collateralPerSlot)
|
||||||
request.ask.pricePerBytePerSecond += 1.u256
|
request.ask.pricePerBytePerSecond += 1.u256
|
||||||
let item2 = SlotQueueItem.init(request, 2)
|
let item2 =
|
||||||
|
SlotQueueItem.init(request, 2, collateral = request.ask.collateralPerSlot)
|
||||||
request.ask.pricePerBytePerSecond += 1.u256
|
request.ask.pricePerBytePerSecond += 1.u256
|
||||||
let item3 = SlotQueueItem.init(request, 3)
|
let item3 =
|
||||||
|
SlotQueueItem.init(request, 3, collateral = request.ask.collateralPerSlot)
|
||||||
|
|
||||||
check queue.push(item0).isOk
|
check queue.push(item0).isOk
|
||||||
await sleepAsync(1.millis)
|
await sleepAsync(1.millis)
|
||||||
@ -520,13 +537,17 @@ suite "Slot queue":
|
|||||||
# sleeping after push allows the slotqueue loop to iterate,
|
# sleeping after push allows the slotqueue loop to iterate,
|
||||||
# calling the callback for each pushed/updated item
|
# calling the callback for each pushed/updated item
|
||||||
var request = StorageRequest.example
|
var request = StorageRequest.example
|
||||||
let item0 = SlotQueueItem.init(request, 0)
|
let item0 =
|
||||||
|
SlotQueueItem.init(request, 0, collateral = request.ask.collateralPerSlot)
|
||||||
request.ask.pricePerBytePerSecond += 1.u256
|
request.ask.pricePerBytePerSecond += 1.u256
|
||||||
let item1 = SlotQueueItem.init(request, 1)
|
let item1 =
|
||||||
|
SlotQueueItem.init(request, 1, collateral = request.ask.collateralPerSlot)
|
||||||
request.ask.pricePerBytePerSecond += 1.u256
|
request.ask.pricePerBytePerSecond += 1.u256
|
||||||
let item2 = SlotQueueItem.init(request, 2)
|
let item2 =
|
||||||
|
SlotQueueItem.init(request, 2, collateral = request.ask.collateralPerSlot)
|
||||||
request.ask.pricePerBytePerSecond += 1.u256
|
request.ask.pricePerBytePerSecond += 1.u256
|
||||||
let item3 = SlotQueueItem.init(request, 3)
|
let item3 =
|
||||||
|
SlotQueueItem.init(request, 3, collateral = request.ask.collateralPerSlot)
|
||||||
|
|
||||||
check queue.push(item0).isOk
|
check queue.push(item0).isOk
|
||||||
check queue.push(item1).isOk
|
check queue.push(item1).isOk
|
||||||
@ -550,7 +571,7 @@ suite "Slot queue":
|
|||||||
queue.pause
|
queue.pause
|
||||||
|
|
||||||
let request = StorageRequest.example
|
let request = StorageRequest.example
|
||||||
var items = SlotQueueItem.init(request)
|
var items = SlotQueueItem.init(request, collateral = request.ask.collateralPerSlot)
|
||||||
check queue.push(items).isOk
|
check queue.push(items).isOk
|
||||||
# check all items processed
|
# check all items processed
|
||||||
check eventually queue.len == 0
|
check eventually queue.len == 0
|
||||||
@ -558,8 +579,14 @@ suite "Slot queue":
|
|||||||
test "pushing seen item does not unpause queue":
|
test "pushing seen item does not unpause queue":
|
||||||
newSlotQueue(maxSize = 4, maxWorkers = 4)
|
newSlotQueue(maxSize = 4, maxWorkers = 4)
|
||||||
let request = StorageRequest.example
|
let request = StorageRequest.example
|
||||||
let item0 =
|
let item0 = SlotQueueItem.init(
|
||||||
SlotQueueItem.init(request.id, 0'u16, request.ask, request.expiry, seen = true)
|
request.id,
|
||||||
|
0'u16,
|
||||||
|
request.ask,
|
||||||
|
request.expiry,
|
||||||
|
request.ask.collateralPerSlot,
|
||||||
|
seen = true,
|
||||||
|
)
|
||||||
check queue.paused
|
check queue.paused
|
||||||
check queue.push(item0).isOk
|
check queue.push(item0).isOk
|
||||||
check queue.paused
|
check queue.paused
|
||||||
@ -567,8 +594,14 @@ suite "Slot queue":
|
|||||||
test "paused queue waits for unpause before continuing processing":
|
test "paused queue waits for unpause before continuing processing":
|
||||||
newSlotQueue(maxSize = 4, maxWorkers = 4)
|
newSlotQueue(maxSize = 4, maxWorkers = 4)
|
||||||
let request = StorageRequest.example
|
let request = StorageRequest.example
|
||||||
let item =
|
let item = SlotQueueItem.init(
|
||||||
SlotQueueItem.init(request.id, 1'u16, request.ask, request.expiry, seen = false)
|
request.id,
|
||||||
|
1'u16,
|
||||||
|
request.ask,
|
||||||
|
request.expiry,
|
||||||
|
request.ask.collateralPerSlot,
|
||||||
|
seen = false,
|
||||||
|
)
|
||||||
check queue.paused
|
check queue.paused
|
||||||
# push causes unpause
|
# push causes unpause
|
||||||
check queue.push(item).isOk
|
check queue.push(item).isOk
|
||||||
@ -579,10 +612,22 @@ suite "Slot queue":
|
|||||||
test "processing a 'seen' item pauses the queue":
|
test "processing a 'seen' item pauses the queue":
|
||||||
newSlotQueue(maxSize = 4, maxWorkers = 4)
|
newSlotQueue(maxSize = 4, maxWorkers = 4)
|
||||||
let request = StorageRequest.example
|
let request = StorageRequest.example
|
||||||
let unseen =
|
let unseen = SlotQueueItem.init(
|
||||||
SlotQueueItem.init(request.id, 0'u16, request.ask, request.expiry, seen = false)
|
request.id,
|
||||||
let seen =
|
0'u16,
|
||||||
SlotQueueItem.init(request.id, 1'u16, request.ask, request.expiry, seen = true)
|
request.ask,
|
||||||
|
request.expiry,
|
||||||
|
request.ask.collateralPerSlot,
|
||||||
|
seen = false,
|
||||||
|
)
|
||||||
|
let seen = SlotQueueItem.init(
|
||||||
|
request.id,
|
||||||
|
1'u16,
|
||||||
|
request.ask,
|
||||||
|
request.expiry,
|
||||||
|
request.ask.collateralPerSlot,
|
||||||
|
seen = true,
|
||||||
|
)
|
||||||
# push causes unpause
|
# push causes unpause
|
||||||
check queue.push(unseen).isSuccess
|
check queue.push(unseen).isSuccess
|
||||||
# check all items processed
|
# check all items processed
|
||||||
@ -595,10 +640,22 @@ suite "Slot queue":
|
|||||||
test "processing a 'seen' item does not decrease the number of workers":
|
test "processing a 'seen' item does not decrease the number of workers":
|
||||||
newSlotQueue(maxSize = 4, maxWorkers = 4)
|
newSlotQueue(maxSize = 4, maxWorkers = 4)
|
||||||
let request = StorageRequest.example
|
let request = StorageRequest.example
|
||||||
let unseen =
|
let unseen = SlotQueueItem.init(
|
||||||
SlotQueueItem.init(request.id, 0'u16, request.ask, request.expiry, seen = false)
|
request.id,
|
||||||
let seen =
|
0'u16,
|
||||||
SlotQueueItem.init(request.id, 1'u16, request.ask, request.expiry, seen = true)
|
request.ask,
|
||||||
|
request.expiry,
|
||||||
|
request.ask.collateralPerSlot,
|
||||||
|
seen = false,
|
||||||
|
)
|
||||||
|
let seen = SlotQueueItem.init(
|
||||||
|
request.id,
|
||||||
|
1'u16,
|
||||||
|
request.ask,
|
||||||
|
request.expiry,
|
||||||
|
request.ask.collateralPerSlot,
|
||||||
|
seen = true,
|
||||||
|
)
|
||||||
# push seen item to ensure that queue is pausing
|
# push seen item to ensure that queue is pausing
|
||||||
check queue.push(seen).isSuccess
|
check queue.push(seen).isSuccess
|
||||||
# unpause and pause a number of times
|
# unpause and pause a number of times
|
||||||
@ -615,10 +672,22 @@ suite "Slot queue":
|
|||||||
test "item 'seen' flags can be cleared":
|
test "item 'seen' flags can be cleared":
|
||||||
newSlotQueue(maxSize = 4, maxWorkers = 1)
|
newSlotQueue(maxSize = 4, maxWorkers = 1)
|
||||||
let request = StorageRequest.example
|
let request = StorageRequest.example
|
||||||
let item0 =
|
let item0 = SlotQueueItem.init(
|
||||||
SlotQueueItem.init(request.id, 0'u16, request.ask, request.expiry, seen = true)
|
request.id,
|
||||||
let item1 =
|
0'u16,
|
||||||
SlotQueueItem.init(request.id, 1'u16, request.ask, request.expiry, seen = true)
|
request.ask,
|
||||||
|
request.expiry,
|
||||||
|
request.ask.collateralPerSlot,
|
||||||
|
seen = true,
|
||||||
|
)
|
||||||
|
let item1 = SlotQueueItem.init(
|
||||||
|
request.id,
|
||||||
|
1'u16,
|
||||||
|
request.ask,
|
||||||
|
request.expiry,
|
||||||
|
request.ask.collateralPerSlot,
|
||||||
|
seen = true,
|
||||||
|
)
|
||||||
check queue.push(item0).isOk
|
check queue.push(item0).isOk
|
||||||
check queue.push(item1).isOk
|
check queue.push(item1).isOk
|
||||||
check queue[0].seen
|
check queue[0].seen
|
||||||
|
|||||||
@ -598,6 +598,37 @@ ethersuite "On-Chain Market":
|
|||||||
check endBalanceHost == (startBalanceHost + request.ask.collateralPerSlot)
|
check endBalanceHost == (startBalanceHost + request.ask.collateralPerSlot)
|
||||||
check endBalanceReward == (startBalanceReward + expectedPayout)
|
check endBalanceReward == (startBalanceReward + expectedPayout)
|
||||||
|
|
||||||
|
test "returns the collateral when the slot is not being repaired":
|
||||||
|
await market.requestStorage(request)
|
||||||
|
await market.reserveSlot(request.id, 0.uint64)
|
||||||
|
await market.fillSlot(request.id, 0.uint64, proof, request.ask.collateralPerSlot)
|
||||||
|
|
||||||
|
let slotId = request.slotId(0.uint64)
|
||||||
|
without collateral =? await market.slotCollateral(request.id, 0.uint64), error:
|
||||||
|
fail()
|
||||||
|
|
||||||
|
check collateral == request.ask.collateralPerSlot
|
||||||
|
|
||||||
|
test "calculates correctly the collateral when the slot is being repaired":
|
||||||
|
# Ensure that the config is loaded and repairRewardPercentage is available
|
||||||
|
discard await market.repairRewardPercentage()
|
||||||
|
|
||||||
|
await market.requestStorage(request)
|
||||||
|
await market.reserveSlot(request.id, 0.uint64)
|
||||||
|
await market.fillSlot(request.id, 0.uint64, proof, request.ask.collateralPerSlot)
|
||||||
|
await market.freeSlot(slotId(request.id, 0.uint64))
|
||||||
|
|
||||||
|
let slotId = request.slotId(0.uint64)
|
||||||
|
|
||||||
|
without collateral =? await market.slotCollateral(request.id, 0.uint64), error:
|
||||||
|
fail()
|
||||||
|
|
||||||
|
# slotCollateral
|
||||||
|
# repairRewardPercentage = 10
|
||||||
|
# expected collateral = slotCollateral - slotCollateral * 0.1
|
||||||
|
check collateral ==
|
||||||
|
request.ask.collateralPerSlot - (request.ask.collateralPerSlot * 10).div(100.u256)
|
||||||
|
|
||||||
test "the request is added in cache after the fist access":
|
test "the request is added in cache after the fist access":
|
||||||
await market.requestStorage(request)
|
await market.requestStorage(request)
|
||||||
|
|
||||||
|
|||||||
@ -72,7 +72,9 @@ proc example*(_: type Slot): Slot =
|
|||||||
proc example*(_: type SlotQueueItem): SlotQueueItem =
|
proc example*(_: type SlotQueueItem): SlotQueueItem =
|
||||||
let request = StorageRequest.example
|
let request = StorageRequest.example
|
||||||
let slot = Slot.example
|
let slot = Slot.example
|
||||||
SlotQueueItem.init(request, slot.slotIndex.uint16)
|
SlotQueueItem.init(
|
||||||
|
request, slot.slotIndex.uint16, collateral = request.ask.collateralPerSlot
|
||||||
|
)
|
||||||
|
|
||||||
proc example(_: type G1Point): G1Point =
|
proc example(_: type G1Point): G1Point =
|
||||||
G1Point(x: UInt256.example, y: UInt256.example)
|
G1Point(x: UInt256.example, y: UInt256.example)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user