mirror of
https://github.com/codex-storage/nim-codex.git
synced 2025-02-20 08:38:29 +00:00
[marketplace] add reservations tests, and get/exists to api
This commit is contained in:
parent
ffe3b4511b
commit
1a0012e80a
@ -1,7 +1,6 @@
|
|||||||
import pkg/questionable
|
import pkg/questionable
|
||||||
import pkg/upraises
|
import pkg/upraises
|
||||||
import pkg/stint
|
import pkg/stint
|
||||||
import pkg/nimcrypto
|
|
||||||
import pkg/chronicles
|
import pkg/chronicles
|
||||||
import pkg/datastore
|
import pkg/datastore
|
||||||
import ./rng
|
import ./rng
|
||||||
@ -48,7 +47,7 @@ func new*(_: type Sales,
|
|||||||
market: market,
|
market: market,
|
||||||
clock: clock,
|
clock: clock,
|
||||||
proving: proving,
|
proving: proving,
|
||||||
reservations: Reservations.new(repo: repo, data: data)
|
reservations: Reservations.new(repo, data)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -104,13 +103,10 @@ proc load*(sales: Sales) {.async.} =
|
|||||||
for slotId in slotIds:
|
for slotId in slotIds:
|
||||||
# TODO: this needs to be optimised
|
# TODO: this needs to be optimised
|
||||||
if request =? await market.getRequestFromSlotId(slotId):
|
if request =? await market.getRequestFromSlotId(slotId):
|
||||||
without availability =? await sales.reservations.find(request.ask.slotSize,
|
let availability = await sales.reservations.find(request.ask.slotSize,
|
||||||
request.ask.duration,
|
request.ask.duration,
|
||||||
request.ask.pricePerSlot,
|
request.ask.pricePerSlot,
|
||||||
used = true):
|
used = true)
|
||||||
# TODO: when slot is filled on chain, but no local availability, how
|
|
||||||
# should this be handled?
|
|
||||||
raiseAssert "failed to find availability"
|
|
||||||
|
|
||||||
without slotIndex =? findSlotIndex(request.ask.slots,
|
without slotIndex =? findSlotIndex(request.ask.slots,
|
||||||
request.id,
|
request.id,
|
||||||
@ -122,7 +118,7 @@ proc load*(sales: Sales) {.async.} =
|
|||||||
request.id,
|
request.id,
|
||||||
# TODO: change availability to be non-optional? It doesn't make sense to move
|
# TODO: change availability to be non-optional? It doesn't make sense to move
|
||||||
# forward with the sales process at this point if there is no availability
|
# forward with the sales process at this point if there is no availability
|
||||||
some availability,
|
availability,
|
||||||
some slotIndex,
|
some slotIndex,
|
||||||
some request)
|
some request)
|
||||||
|
|
||||||
@ -152,3 +148,5 @@ proc stop*(sales: Sales) {.async.} =
|
|||||||
for agent in sales.agents:
|
for agent in sales.agents:
|
||||||
await agent.stop()
|
await agent.stop()
|
||||||
|
|
||||||
|
await sales.reservations.stop
|
||||||
|
|
||||||
|
@ -14,6 +14,7 @@ import pkg/upraises
|
|||||||
import pkg/json_serialization
|
import pkg/json_serialization
|
||||||
import pkg/json_serialization/std/options
|
import pkg/json_serialization/std/options
|
||||||
import pkg/stint
|
import pkg/stint
|
||||||
|
import pkg/nimcrypto
|
||||||
|
|
||||||
push: {.upraises: [].}
|
push: {.upraises: [].}
|
||||||
|
|
||||||
@ -24,109 +25,154 @@ import ../namespaces
|
|||||||
import ../contracts/requests
|
import ../contracts/requests
|
||||||
|
|
||||||
type
|
type
|
||||||
|
AvailabilityId* = distinct array[32, byte]
|
||||||
Availability* = object
|
Availability* = object
|
||||||
id*: array[32, byte]
|
id*: AvailabilityId
|
||||||
size*: UInt256
|
size*: UInt256
|
||||||
duration*: UInt256
|
duration*: UInt256
|
||||||
minPrice*: UInt256
|
minPrice*: UInt256
|
||||||
slotId*: ?SlotId
|
slotId*: ?SlotId
|
||||||
Reservations* = object
|
Reservations* = ref object
|
||||||
|
started*: bool
|
||||||
repo: RepoStore
|
repo: RepoStore
|
||||||
persist: Datastore
|
persist: Datastore
|
||||||
# AvailabilityNotExistsError* = object of CodexError
|
GetNext* = proc(): Future[?Availability] {.upraises: [], gcsafe, closure.}
|
||||||
|
AvailabilityIter* = ref object
|
||||||
|
finished*: bool
|
||||||
|
next*: GetNext
|
||||||
|
AvailabilityError* = object of CodexError
|
||||||
|
innerException*: ref CatchableError
|
||||||
|
AvailabilityNotExistsError* = object of AvailabilityError
|
||||||
|
AvailabilityAlreadyExistsError* = object of AvailabilityError
|
||||||
|
AvailabilityReserveFailedError* = object of AvailabilityError
|
||||||
|
AvailabilityReleaseFailedError* = object of AvailabilityError
|
||||||
|
AvailabilityDeleteFailedError* = object of AvailabilityError
|
||||||
|
AvailabilityPutFailedError* = object of AvailabilityError
|
||||||
|
AvailabilityGetFailedError* = object of AvailabilityError
|
||||||
|
|
||||||
const
|
const
|
||||||
SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module
|
SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module
|
||||||
ReservationsKey = (SalesKey / "reservations").tryGet
|
ReservationsKey = (SalesKey / "reservations").tryGet
|
||||||
|
|
||||||
proc new*(T: type Reservations,
|
proc new*(
|
||||||
|
T: type Reservations,
|
||||||
repo: RepoStore,
|
repo: RepoStore,
|
||||||
data: Datastore): Reservations =
|
data: Datastore): Reservations =
|
||||||
|
|
||||||
T(repo: repo, persist: data)
|
T(repo: repo, persist: data)
|
||||||
|
|
||||||
proc init*(_: type Availability,
|
proc start*(self: Reservations) {.async.} =
|
||||||
|
if self.started:
|
||||||
|
return
|
||||||
|
|
||||||
|
await self.repo.start()
|
||||||
|
self.started = true
|
||||||
|
|
||||||
|
proc stop*(self: Reservations) {.async.} =
|
||||||
|
if not self.started:
|
||||||
|
return
|
||||||
|
|
||||||
|
await self.repo.stop()
|
||||||
|
(await self.persist.close()).expect("Should close meta store!")
|
||||||
|
|
||||||
|
self.started = false
|
||||||
|
|
||||||
|
proc init*(
|
||||||
|
_: type Availability,
|
||||||
size: UInt256,
|
size: UInt256,
|
||||||
duration: UInt256,
|
duration: UInt256,
|
||||||
minPrice: UInt256): Availability =
|
minPrice: UInt256): Availability =
|
||||||
|
|
||||||
var id: array[32, byte]
|
var id: array[32, byte]
|
||||||
doAssert randomBytes(id) == 32
|
doAssert randomBytes(id) == 32
|
||||||
Availability(id: id, size: size, duration: duration, minPrice: minPrice)
|
Availability(id: AvailabilityId(id), size: size, duration: duration, minPrice: minPrice)
|
||||||
|
|
||||||
proc key(availability: Availability): ?!Key =
|
func toArray*(id: AvailabilityId): array[32, byte] =
|
||||||
(ReservationsKey / $availability.id)
|
array[32, byte](id)
|
||||||
|
|
||||||
|
proc `==`*(x, y: AvailabilityId): bool {.borrow.}
|
||||||
|
|
||||||
|
proc toErr[E1: ref CatchableError, E2: AvailabilityError](
|
||||||
|
e1: E1,
|
||||||
|
_: type E2,
|
||||||
|
msg: string = "see inner exception"): ref E2 =
|
||||||
|
|
||||||
|
let e2 = newException(E2, msg)
|
||||||
|
e2.innerException = e1
|
||||||
|
return e2
|
||||||
|
|
||||||
|
proc writeValue*(
|
||||||
|
writer: var JsonWriter,
|
||||||
|
value: SlotId | AvailabilityId) {.raises:[IOError].} =
|
||||||
|
|
||||||
proc writeValue*(writer: var JsonWriter, value: SlotId) {.raises:[IOError].} =
|
|
||||||
mixin writeValue
|
mixin writeValue
|
||||||
writer.writeValue value.toArray
|
writer.writeValue value.toArray
|
||||||
|
|
||||||
proc readValue*(reader: var JsonReader, value: var SlotId)
|
proc readValue*[T: SlotId | AvailabilityId](
|
||||||
{.raises: [SerializationError, IOError].} =
|
reader: var JsonReader,
|
||||||
|
value: var T) {.raises: [SerializationError, IOError].} =
|
||||||
|
|
||||||
mixin readValue
|
mixin readValue
|
||||||
value = SlotId reader.readValue(SlotId.distinctBase)
|
value = T reader.readValue(T.distinctBase)
|
||||||
|
|
||||||
proc available*(self: Reservations): uint =
|
func used*(availability: Availability): bool =
|
||||||
|
availability.slotId.isSome
|
||||||
|
|
||||||
|
func key(id: AvailabilityId): ?!Key =
|
||||||
|
(ReservationsKey / id.toArray.toHex)
|
||||||
|
|
||||||
|
func key*(availability: Availability): ?!Key =
|
||||||
|
return availability.id.key
|
||||||
|
|
||||||
|
func available*(self: Reservations): uint =
|
||||||
return self.repo.quotaMaxBytes - self.repo.totalUsed
|
return self.repo.quotaMaxBytes - self.repo.totalUsed
|
||||||
|
|
||||||
proc available*(self: Reservations, bytes: uint): bool =
|
func available*(self: Reservations, bytes: uint): bool =
|
||||||
return bytes < self.available()
|
return bytes < self.available()
|
||||||
|
|
||||||
proc reserve*(self: Reservations,
|
proc exists*(
|
||||||
availability: Availability): Future[?!void] {.async.} =
|
self: Reservations,
|
||||||
|
id: AvailabilityId): Future[?!bool] {.async.} =
|
||||||
|
|
||||||
# TODO: reconcile data sizes -- availability uses UInt256 and RepoStore
|
without key =? id.key, err:
|
||||||
# uses uint, thus the need to truncate
|
|
||||||
if err =? (await self.repo.reserve(
|
|
||||||
availability.size.truncate(uint))).errorOption:
|
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
without key =? availability.key, err:
|
let exists = await self.persist.contains(key)
|
||||||
|
return success(exists)
|
||||||
|
|
||||||
|
proc get*(
|
||||||
|
self: Reservations,
|
||||||
|
id: AvailabilityId): Future[?!Availability] {.async.} =
|
||||||
|
|
||||||
|
if exists =? (await self.exists(id)) and not exists:
|
||||||
|
let err = newException(AvailabilityNotExistsError,
|
||||||
|
"Availability does not exist")
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
if err =? (await self.persist.put(
|
without key =? id.key, err:
|
||||||
key,
|
|
||||||
@(availability.toJson.toBytes))).errorOption:
|
|
||||||
return failure(err)
|
|
||||||
|
|
||||||
return success()
|
|
||||||
|
|
||||||
# TODO: call site not yet determined. Perhaps reuse of Availabilty should be set
|
|
||||||
# on creation (from the REST endpoint). Reusable availability wouldn't get
|
|
||||||
# released after contract completion. Non-reusable availability would.
|
|
||||||
proc release*(self: Reservations,
|
|
||||||
availability: Availability): Future[?!void] {.async.} =
|
|
||||||
|
|
||||||
# TODO: reconcile data sizes -- availability uses UInt256 and RepoStore
|
|
||||||
# uses uint, thus the need to truncate
|
|
||||||
if err =? (await self.repo.release(
|
|
||||||
availability.size.truncate(uint))).errorOption:
|
|
||||||
return failure(err)
|
|
||||||
|
|
||||||
without key =? availability.key, err:
|
|
||||||
return failure(err)
|
|
||||||
|
|
||||||
if err =? (await self.persist.delete(key)).errorOption:
|
|
||||||
return failure(err)
|
|
||||||
|
|
||||||
return success()
|
|
||||||
|
|
||||||
proc update(self: Reservations,
|
|
||||||
availability: Availability,
|
|
||||||
slotId: ?SlotId): Future[?!void] {.async.} =
|
|
||||||
|
|
||||||
without key =? availability.key, err:
|
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
without serialized =? await self.persist.get(key), err:
|
without serialized =? await self.persist.get(key), err:
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
without var updated =? Json.decode(serialized, Availability).catch, err:
|
without availability =? Json.decode(serialized, Availability).catch, err:
|
||||||
|
return failure(err)
|
||||||
|
|
||||||
|
return success availability
|
||||||
|
|
||||||
|
proc update(
|
||||||
|
self: Reservations,
|
||||||
|
availability: Availability,
|
||||||
|
slotId: ?SlotId): Future[?!void] {.async.} =
|
||||||
|
|
||||||
|
without var updated =? await self.get(availability.id), err:
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
updated.slotId = slotId
|
updated.slotId = slotId
|
||||||
|
|
||||||
|
without key =? availability.key, err:
|
||||||
|
return failure(err)
|
||||||
|
|
||||||
if err =? (await self.persist.put(
|
if err =? (await self.persist.put(
|
||||||
key,
|
key,
|
||||||
@(updated.toJson.toBytes))).errorOption:
|
@(updated.toJson.toBytes))).errorOption:
|
||||||
@ -134,36 +180,120 @@ proc update(self: Reservations,
|
|||||||
|
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
proc markUsed*(self: Reservations,
|
proc reserve*(
|
||||||
|
self: Reservations,
|
||||||
|
availability: Availability): Future[Result[void, ref AvailabilityError]] {.async.} =
|
||||||
|
|
||||||
|
if exists =? (await self.exists(availability.id)) and exists:
|
||||||
|
let err = newException(AvailabilityAlreadyExistsError,
|
||||||
|
"Availability already exists")
|
||||||
|
return failure(err)
|
||||||
|
|
||||||
|
without key =? availability.key, err:
|
||||||
|
return failure(err.toErr(AvailabilityError))
|
||||||
|
|
||||||
|
if err =? (await self.persist.put(
|
||||||
|
key,
|
||||||
|
@(availability.toJson.toBytes))).errorOption:
|
||||||
|
return failure(err.toErr(AvailabilityError))
|
||||||
|
|
||||||
|
# TODO: reconcile data sizes -- availability uses UInt256 and RepoStore
|
||||||
|
# uses uint, thus the need to truncate
|
||||||
|
if reserveInnerErr =? (await self.repo.reserve(
|
||||||
|
availability.size.truncate(uint))).errorOption:
|
||||||
|
|
||||||
|
var reserveErr = reserveInnerErr.toErr(AvailabilityReserveFailedError)
|
||||||
|
|
||||||
|
# rollback persisted availability
|
||||||
|
if rollbackInnerErr =? (await self.persist.delete(key)).errorOption:
|
||||||
|
let rollbackErr = rollbackInnerErr.toErr(AvailabilityDeleteFailedError,
|
||||||
|
"Failed to delete persisted availability during rollback")
|
||||||
|
reserveErr.innerException = rollbackErr
|
||||||
|
|
||||||
|
return failure(reserveErr)
|
||||||
|
|
||||||
|
return ok()
|
||||||
|
|
||||||
|
# TODO: call site not yet determined. Perhaps reuse of Availabilty should be set
|
||||||
|
# on creation (from the REST endpoint). Reusable availability wouldn't get
|
||||||
|
# released after contract completion. Non-reusable availability would.
|
||||||
|
proc release*(
|
||||||
|
self: Reservations,
|
||||||
|
id: AvailabilityId): Future[Result[void, ref AvailabilityError]] {.async.} =
|
||||||
|
|
||||||
|
without availability =? (await self.get(id)), err:
|
||||||
|
return failure(err.toErr(AvailabilityGetFailedError))
|
||||||
|
|
||||||
|
without key =? id.key, err:
|
||||||
|
return failure(err.toErr(AvailabilityError))
|
||||||
|
|
||||||
|
if err =? (await self.persist.delete(key)).errorOption:
|
||||||
|
return failure(err.toErr(AvailabilityDeleteFailedError))
|
||||||
|
|
||||||
|
# TODO: reconcile data sizes -- availability uses UInt256 and RepoStore
|
||||||
|
# uses uint, thus the need to truncate
|
||||||
|
if releaseInnerErr =? (await self.repo.release(
|
||||||
|
availability.size.truncate(uint))).errorOption:
|
||||||
|
|
||||||
|
var releaseErr = releaseInnerErr.toErr(AvailabilityReleaseFailedError)
|
||||||
|
|
||||||
|
# rollback delete
|
||||||
|
if rollbackInnerErr =? (await self.persist.put(
|
||||||
|
key,
|
||||||
|
@(availability.toJson.toBytes))).errorOption:
|
||||||
|
|
||||||
|
var rollbackErr = rollbackInnerErr.toErr(AvailabilityPutFailedError,
|
||||||
|
"Failed to restore persisted availability during rollback")
|
||||||
|
releaseErr.innerException = rollbackErr
|
||||||
|
|
||||||
|
return failure(releaseErr)
|
||||||
|
|
||||||
|
return ok()
|
||||||
|
|
||||||
|
|
||||||
|
proc markUsed*(
|
||||||
|
self: Reservations,
|
||||||
availability: Availability,
|
availability: Availability,
|
||||||
slotId: SlotId): Future[?!void] {.async.} =
|
slotId: SlotId): Future[?!void] {.async.} =
|
||||||
|
|
||||||
return await self.update(availability, some slotId)
|
return await self.update(availability, some slotId)
|
||||||
|
|
||||||
proc markUnused*(self: Reservations,
|
proc markUnused*(
|
||||||
|
self: Reservations,
|
||||||
availability: Availability): Future[?!void] {.async.} =
|
availability: Availability): Future[?!void] {.async.} =
|
||||||
|
|
||||||
return await self.update(availability, none SlotId)
|
return await self.update(availability, none SlotId)
|
||||||
|
|
||||||
proc availabilities*(self: Reservations): Future[?!seq[Availability]] {.async.} =
|
iterator items*(self: AvailabilityIter): Future[?Availability] =
|
||||||
var availabilities: seq[Availability] = @[]
|
while not self.finished:
|
||||||
|
yield self.next()
|
||||||
|
|
||||||
|
proc availabilities*(
|
||||||
|
self: Reservations): Future[?!AvailabilityIter] {.async.} =
|
||||||
|
|
||||||
|
var iter = AvailabilityIter()
|
||||||
let query = Query.init(ReservationsKey)
|
let query = Query.init(ReservationsKey)
|
||||||
|
|
||||||
without results =? await self.persist.query(query), err:
|
without results =? await self.persist.query(query), err:
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
for fItem in results.items:
|
proc next(): Future[?Availability] {.async.} =
|
||||||
without item =? (await fItem), err:
|
await idleAsync()
|
||||||
return failure(err)
|
iter.finished = results.finished
|
||||||
|
if not results.finished and
|
||||||
|
r =? (await results.next()) and
|
||||||
|
serialized =? r.data and
|
||||||
|
serialized.len > 0:
|
||||||
|
|
||||||
let serialized = $ item.data
|
return some Json.decode(string.fromBytes(serialized), Availability)
|
||||||
without availability =? Json.decode(serialized, Availability).catch, err:
|
|
||||||
return failure(err)
|
|
||||||
availabilities.add availability
|
|
||||||
|
|
||||||
return success(availabilities)
|
return none Availability
|
||||||
|
|
||||||
proc find*(self: Reservations,
|
iter.next = next
|
||||||
|
return success iter
|
||||||
|
|
||||||
|
proc find*(
|
||||||
|
self: Reservations,
|
||||||
size, duration, minPrice: UInt256,
|
size, duration, minPrice: UInt256,
|
||||||
used: bool): Future[?Availability] {.async.} =
|
used: bool): Future[?Availability] {.async.} =
|
||||||
|
|
||||||
@ -171,9 +301,10 @@ proc find*(self: Reservations,
|
|||||||
error "failed to get all availabilities", error = err.msg
|
error "failed to get all availabilities", error = err.msg
|
||||||
return none Availability
|
return none Availability
|
||||||
|
|
||||||
for availability in availabilities:
|
for a in availabilities:
|
||||||
let satisfiesUsed = (used and availability.slotId.isSome) or
|
if availability =? (await a):
|
||||||
(not used and availability.slotId.isNone)
|
let satisfiesUsed = (used and availability.used) or
|
||||||
|
(not used and not availability.used)
|
||||||
if satisfiesUsed and
|
if satisfiesUsed and
|
||||||
size <= availability.size and
|
size <= availability.size and
|
||||||
duration <= availability.duration and
|
duration <= availability.duration and
|
||||||
|
@ -40,9 +40,10 @@ method enterAsync(state: SaleDownloading) {.async.} =
|
|||||||
raiseAssert "no sale request"
|
raiseAssert "no sale request"
|
||||||
|
|
||||||
if availability =? agent.availability:
|
if availability =? agent.availability:
|
||||||
if err =? (await agent.sales.reservations.markUsed(availability,
|
if err =? (await agent.sales.reservations.markUsed(
|
||||||
request.slotId(slotIndex))).errorOption:
|
availability,
|
||||||
raiseAssert "failed to mark availability as used"
|
request.slotId(agent.slotIndex))).errorOption:
|
||||||
|
raiseAssert "failed to mark availability as used, error: " & err.msg
|
||||||
|
|
||||||
await onStore(request, agent.slotIndex, agent.availability)
|
await onStore(request, agent.slotIndex, agent.availability)
|
||||||
await state.switchAsync(SaleProving())
|
await state.switchAsync(SaleProving())
|
||||||
|
@ -25,7 +25,10 @@ method enterAsync*(state: SaleErrored) {.async.} =
|
|||||||
# TODO: if future updates `availability.reusable == true` then
|
# TODO: if future updates `availability.reusable == true` then
|
||||||
# agent.sales.reservations.markUnused, else
|
# agent.sales.reservations.markUnused, else
|
||||||
# agent.sales.reservations.release
|
# agent.sales.reservations.release
|
||||||
|
if (exists =? await agent.sales.reservations.exists(availability.id)) and
|
||||||
|
exists == true:
|
||||||
|
|
||||||
if err =? (await agent.sales.reservations.markUnused(availability)).errorOption:
|
if err =? (await agent.sales.reservations.markUnused(availability)).errorOption:
|
||||||
raiseAssert "Failed to mark availability unused"
|
raiseAssert "Failed to mark availability unused, error: " & err.msg
|
||||||
|
|
||||||
error "Sale error", error=state.error.msg
|
error "Sale error", error=state.error.msg
|
||||||
|
16
tests/codex/sales/helpers.nim
Normal file
16
tests/codex/sales/helpers.nim
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
import std/algorithm
|
||||||
|
|
||||||
|
import pkg/chronos
|
||||||
|
import pkg/questionable
|
||||||
|
import pkg/questionable/results
|
||||||
|
|
||||||
|
import pkg/codex/sales/reservations
|
||||||
|
|
||||||
|
proc allAvailabilities*(r: Reservations): Future[seq[Availability]] {.async.} =
|
||||||
|
var ret: seq[Availability] = @[]
|
||||||
|
without availabilities =? (await r.availabilities), err:
|
||||||
|
raiseAssert "failed to get availabilities, error: " & err.msg
|
||||||
|
for a in availabilities:
|
||||||
|
if availability =? (await a):
|
||||||
|
ret.add availability
|
||||||
|
return ret.reversed()
|
198
tests/codex/sales/testreservations.nim
Normal file
198
tests/codex/sales/testreservations.nim
Normal file
@ -0,0 +1,198 @@
|
|||||||
|
import std/sequtils
|
||||||
|
import std/sugar
|
||||||
|
|
||||||
|
import pkg/questionable
|
||||||
|
import pkg/questionable/results
|
||||||
|
|
||||||
|
import pkg/chronos
|
||||||
|
import pkg/asynctest
|
||||||
|
import pkg/datastore
|
||||||
|
import pkg/json_serialization
|
||||||
|
import pkg/json_serialization/std/options
|
||||||
|
import pkg/stew/byteutils
|
||||||
|
|
||||||
|
import pkg/codex/stores
|
||||||
|
import pkg/codex/sales
|
||||||
|
|
||||||
|
import ../examples
|
||||||
|
import ./helpers
|
||||||
|
|
||||||
|
suite "Reservations module":
|
||||||
|
|
||||||
|
var
|
||||||
|
repo: RepoStore
|
||||||
|
repoDs: Datastore
|
||||||
|
metaDs: Datastore
|
||||||
|
availability: Availability
|
||||||
|
reservations: Reservations
|
||||||
|
|
||||||
|
setup:
|
||||||
|
repoDs = SQLiteDatastore.new(Memory).tryGet()
|
||||||
|
metaDs = SQLiteDatastore.new(Memory).tryGet()
|
||||||
|
repo = RepoStore.new(repoDs, metaDs)
|
||||||
|
reservations = Reservations.new(repo, metaDs)
|
||||||
|
availability = Availability.example
|
||||||
|
|
||||||
|
teardown:
|
||||||
|
await reservations.stop()
|
||||||
|
|
||||||
|
test "has no availability initially":
|
||||||
|
check (await reservations.allAvailabilities()).len == 0
|
||||||
|
|
||||||
|
test "generates unique ids for storage availability":
|
||||||
|
let availability1 = Availability.init(1.u256, 2.u256, 3.u256)
|
||||||
|
let availability2 = Availability.init(1.u256, 2.u256, 3.u256)
|
||||||
|
check availability1.id != availability2.id
|
||||||
|
|
||||||
|
test "can reserve available storage":
|
||||||
|
let availability1 = Availability.example
|
||||||
|
let availability2 = Availability.example
|
||||||
|
check isOk await reservations.reserve(availability1)
|
||||||
|
check isOk await reservations.reserve(availability2)
|
||||||
|
|
||||||
|
let availabilities = await reservations.allAvailabilities()
|
||||||
|
check:
|
||||||
|
availabilities.len == 2
|
||||||
|
availabilities.contains(availability1)
|
||||||
|
availabilities.contains(availability2)
|
||||||
|
|
||||||
|
test "reserved availability exists":
|
||||||
|
check isOk await reservations.reserve(availability)
|
||||||
|
|
||||||
|
without exists =? await reservations.exists(availability.id):
|
||||||
|
fail()
|
||||||
|
|
||||||
|
check exists
|
||||||
|
|
||||||
|
test "reserved availability can be released":
|
||||||
|
check isOk await reservations.reserve(availability)
|
||||||
|
check isOk await reservations.release(availability.id)
|
||||||
|
|
||||||
|
without exists =? await reservations.exists(availability.id):
|
||||||
|
fail()
|
||||||
|
|
||||||
|
check not exists
|
||||||
|
|
||||||
|
test "non-existant availability cannot be released":
|
||||||
|
let r = await reservations.release(availability.id)
|
||||||
|
check r.error of AvailabilityGetFailedError
|
||||||
|
check r.error.innerException of AvailabilityNotExistsError
|
||||||
|
|
||||||
|
test "added availability is not used initially":
|
||||||
|
check isOk await reservations.reserve(availability)
|
||||||
|
|
||||||
|
without available =? await reservations.get(availability.id):
|
||||||
|
fail()
|
||||||
|
|
||||||
|
check not available.used
|
||||||
|
|
||||||
|
test "availability can be marked used":
|
||||||
|
check isOk await reservations.reserve(availability)
|
||||||
|
|
||||||
|
check isOk await reservations.markUsed(availability, SlotId.example)
|
||||||
|
|
||||||
|
without available =? await reservations.get(availability.id):
|
||||||
|
fail()
|
||||||
|
|
||||||
|
check available.used
|
||||||
|
|
||||||
|
test "availability can be marked unused":
|
||||||
|
check isOk await reservations.reserve(availability)
|
||||||
|
|
||||||
|
check isOk await reservations.markUsed(availability, SlotId.example)
|
||||||
|
check isOk await reservations.markUnused(availability)
|
||||||
|
|
||||||
|
without available =? await reservations.get(availability.id):
|
||||||
|
fail()
|
||||||
|
|
||||||
|
check not available.used
|
||||||
|
|
||||||
|
test "used availability can be found":
|
||||||
|
check isOk await reservations.reserve(availability)
|
||||||
|
|
||||||
|
check isOk await reservations.markUsed(availability, SlotId.example)
|
||||||
|
|
||||||
|
without available =? await reservations.find(availability.size,
|
||||||
|
availability.duration, availability.minPrice, used = true):
|
||||||
|
|
||||||
|
fail()
|
||||||
|
|
||||||
|
test "unused availability can be found":
|
||||||
|
check isOk await reservations.reserve(availability)
|
||||||
|
|
||||||
|
without available =? await reservations.find(availability.size,
|
||||||
|
availability.duration, availability.minPrice, used = false):
|
||||||
|
|
||||||
|
fail()
|
||||||
|
|
||||||
|
test "non-existant availability cannot be found":
|
||||||
|
check isNone (await reservations.find(availability.size,
|
||||||
|
availability.duration, availability.minPrice, used = false))
|
||||||
|
|
||||||
|
test "non-existant availability cannot be retrieved":
|
||||||
|
let r = await reservations.get(availability.id)
|
||||||
|
check r.error of AvailabilityNotExistsError
|
||||||
|
|
||||||
|
test "same availability cannot be reserved twice":
|
||||||
|
check isOk await reservations.reserve(availability)
|
||||||
|
let r = await reservations.reserve(availability)
|
||||||
|
check r.error of AvailabilityAlreadyExistsError
|
||||||
|
|
||||||
|
test "can get available bytes in repo":
|
||||||
|
check reservations.available == DefaultQuotaBytes
|
||||||
|
|
||||||
|
test "reserving availability reduces available bytes":
|
||||||
|
check isOk await reservations.reserve(availability)
|
||||||
|
check reservations.available ==
|
||||||
|
DefaultQuotaBytes - availability.size.truncate(uint)
|
||||||
|
|
||||||
|
test "reports quota available to be reserved":
|
||||||
|
check reservations.available(availability.size.truncate(uint))
|
||||||
|
|
||||||
|
test "reports quota not available to be reserved":
|
||||||
|
repo = RepoStore.new(repoDs, metaDs,
|
||||||
|
quotaMaxBytes = availability.size.truncate(uint) - 1)
|
||||||
|
reservations = Reservations.new(repo, metaDs)
|
||||||
|
check not reservations.available(availability.size.truncate(uint))
|
||||||
|
|
||||||
|
test "fails to reserve availability size that is larger than available quota":
|
||||||
|
repo = RepoStore.new(repoDs, metaDs,
|
||||||
|
quotaMaxBytes = availability.size.truncate(uint) - 1)
|
||||||
|
reservations = Reservations.new(repo, metaDs)
|
||||||
|
let r = await reservations.reserve(availability)
|
||||||
|
check r.error of AvailabilityReserveFailedError
|
||||||
|
check r.error.innerException of QuotaNotEnoughError
|
||||||
|
|
||||||
|
test "rolls back persisted availability if repo reservation fails":
|
||||||
|
repo = RepoStore.new(repoDs, metaDs,
|
||||||
|
quotaMaxBytes = availability.size.truncate(uint) - 1)
|
||||||
|
reservations = Reservations.new(repo, metaDs)
|
||||||
|
discard await reservations.reserve(availability)
|
||||||
|
check exists =? (await reservations.exists(availability.id)) and not exists
|
||||||
|
|
||||||
|
test "fails to release availability size that is larger than available quota":
|
||||||
|
repo = RepoStore.new(repoDs, metaDs,
|
||||||
|
quotaMaxBytes = availability.size.truncate(uint))
|
||||||
|
reservations = Reservations.new(repo, metaDs)
|
||||||
|
discard await reservations.reserve(availability)
|
||||||
|
# increase size of availability past repo quota, so that the next release
|
||||||
|
# will fail
|
||||||
|
availability.size += 1.u256
|
||||||
|
let key = !(availability.key)
|
||||||
|
check isOk await metaDs.put(key, @(availability.toJson.toBytes))
|
||||||
|
let r = await reservations.release(availability.id)
|
||||||
|
check r.error of AvailabilityReleaseFailedError
|
||||||
|
check r.error.innerException.msg == "Cannot release this many bytes"
|
||||||
|
|
||||||
|
test "rolls back persisted availability if repo release fails":
|
||||||
|
repo = RepoStore.new(repoDs, metaDs,
|
||||||
|
quotaMaxBytes = availability.size.truncate(uint))
|
||||||
|
reservations = Reservations.new(repo, metaDs)
|
||||||
|
discard await reservations.reserve(availability)
|
||||||
|
# increase size of availability past repo quota, so that the next release
|
||||||
|
# will fail
|
||||||
|
availability.size += 1.u256
|
||||||
|
let key = !(availability.key)
|
||||||
|
check isOk await metaDs.put(key, @(availability.toJson.toBytes))
|
||||||
|
discard await reservations.release(availability.id)
|
||||||
|
check exists =? (await reservations.exists(availability.id)) and exists
|
385
tests/codex/sales/teststatemachine.nim
Normal file
385
tests/codex/sales/teststatemachine.nim
Normal file
@ -0,0 +1,385 @@
|
|||||||
|
import std/times
|
||||||
|
import std/sequtils
|
||||||
|
import std/sugar
|
||||||
|
|
||||||
|
import pkg/asynctest
|
||||||
|
import pkg/datastore
|
||||||
|
import pkg/questionable
|
||||||
|
import pkg/questionable/results
|
||||||
|
|
||||||
|
import pkg/codex/sales
|
||||||
|
import pkg/codex/sales/states/[downloading, cancelled, errored, filled, filling,
|
||||||
|
failed, proving, finished, unknown]
|
||||||
|
import pkg/codex/sales/reservations
|
||||||
|
import pkg/codex/sales/statemachine
|
||||||
|
import pkg/codex/stores/repostore
|
||||||
|
|
||||||
|
import ../helpers/mockmarket
|
||||||
|
import ../helpers/mockclock
|
||||||
|
import ../helpers/eventually
|
||||||
|
import ../examples
|
||||||
|
|
||||||
|
suite "Sales state machine":
|
||||||
|
|
||||||
|
let availability = Availability.init(
|
||||||
|
size=100.u256,
|
||||||
|
duration=60.u256,
|
||||||
|
minPrice=600.u256
|
||||||
|
)
|
||||||
|
var request = StorageRequest(
|
||||||
|
ask: StorageAsk(
|
||||||
|
slots: 4,
|
||||||
|
slotSize: 100.u256,
|
||||||
|
duration: 60.u256,
|
||||||
|
reward: 10.u256,
|
||||||
|
),
|
||||||
|
content: StorageContent(
|
||||||
|
cid: "some cid"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
let proof = exampleProof()
|
||||||
|
|
||||||
|
var sales: Sales
|
||||||
|
var market: MockMarket
|
||||||
|
var clock: MockClock
|
||||||
|
var proving: Proving
|
||||||
|
var slotIdx: UInt256
|
||||||
|
var slotId: SlotId
|
||||||
|
|
||||||
|
setup:
|
||||||
|
market = MockMarket.new()
|
||||||
|
clock = MockClock.new()
|
||||||
|
proving = Proving.new()
|
||||||
|
let repoDs = SQLiteDatastore.new(Memory).tryGet()
|
||||||
|
let metaDs = SQLiteDatastore.new(Memory).tryGet()
|
||||||
|
let repo = RepoStore.new(repoDs, metaDs)
|
||||||
|
sales = Sales.new(market, clock, proving, repo, metaDs)
|
||||||
|
sales.onStore = proc(request: StorageRequest,
|
||||||
|
slot: UInt256,
|
||||||
|
availability: ?Availability) {.async.} =
|
||||||
|
discard
|
||||||
|
sales.onProve = proc(request: StorageRequest,
|
||||||
|
slot: UInt256): Future[seq[byte]] {.async.} =
|
||||||
|
return proof
|
||||||
|
await sales.start()
|
||||||
|
request.expiry = (clock.now() + 42).u256
|
||||||
|
discard await sales.reservations.reserve(availability)
|
||||||
|
slotIdx = 0.u256
|
||||||
|
slotId = request.slotId(slotIdx)
|
||||||
|
|
||||||
|
teardown:
|
||||||
|
await sales.stop()
|
||||||
|
|
||||||
|
proc newSalesAgent(slotIdx: UInt256 = 0.u256): SalesAgent =
|
||||||
|
let agent = sales.newSalesAgent(request.id,
|
||||||
|
some availability,
|
||||||
|
some slotIdx,
|
||||||
|
some request)
|
||||||
|
return agent
|
||||||
|
|
||||||
|
proc fillSlot(slotIdx: UInt256 = 0.u256) {.async.} =
|
||||||
|
let address = await market.getSigner()
|
||||||
|
let slot = MockSlot(requestId: request.id,
|
||||||
|
slotIndex: slotIdx,
|
||||||
|
proof: proof,
|
||||||
|
host: address)
|
||||||
|
market.filled.add slot
|
||||||
|
market.slotState[slotId(request.id, slotIdx)] = SlotState.Filled
|
||||||
|
|
||||||
|
test "moves to SaleErrored when SaleFilled errors":
|
||||||
|
let agent = newSalesAgent()
|
||||||
|
market.slotState[slotId] = SlotState.Free
|
||||||
|
await agent.switchAsync(SaleUnknown())
|
||||||
|
without state =? (agent.state as SaleErrored):
|
||||||
|
fail()
|
||||||
|
check state.error of UnexpectedSlotError
|
||||||
|
check state.error.msg == "slot state on chain should not be 'free'"
|
||||||
|
|
||||||
|
test "moves to SaleFilled>SaleFinished when slot state is Filled":
|
||||||
|
let agent = newSalesAgent()
|
||||||
|
await fillSlot()
|
||||||
|
await agent.switchAsync(SaleUnknown())
|
||||||
|
check (agent.state as SaleFinished).isSome
|
||||||
|
|
||||||
|
test "moves to SaleFinished when slot state is Finished":
|
||||||
|
let agent = newSalesAgent()
|
||||||
|
await fillSlot()
|
||||||
|
market.slotState[slotId] = SlotState.Finished
|
||||||
|
agent.switch(SaleUnknown())
|
||||||
|
check (agent.state as SaleFinished).isSome
|
||||||
|
|
||||||
|
test "moves to SaleFinished when slot state is Paid":
|
||||||
|
let agent = newSalesAgent()
|
||||||
|
market.slotState[slotId] = SlotState.Paid
|
||||||
|
agent.switch(SaleUnknown())
|
||||||
|
check (agent.state as SaleFinished).isSome
|
||||||
|
|
||||||
|
test "moves to SaleErrored when slot state is Failed":
|
||||||
|
let agent = newSalesAgent()
|
||||||
|
market.slotState[slotId] = SlotState.Failed
|
||||||
|
agent.switch(SaleUnknown())
|
||||||
|
without state =? (agent.state as SaleErrored):
|
||||||
|
fail()
|
||||||
|
check state.error of SaleFailedError
|
||||||
|
check state.error.msg == "Sale failed"
|
||||||
|
|
||||||
|
test "moves to SaleErrored when Downloading and request expires":
|
||||||
|
sales.onStore = proc(request: StorageRequest,
|
||||||
|
slot: UInt256,
|
||||||
|
availability: ?Availability) {.async.} =
|
||||||
|
await sleepAsync(chronos.minutes(1)) # "far" in the future
|
||||||
|
request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256
|
||||||
|
let agent = newSalesAgent()
|
||||||
|
await agent.start(request.ask.slots)
|
||||||
|
market.requested.add request
|
||||||
|
market.requestState[request.id] = RequestState.New
|
||||||
|
await agent.switchAsync(SaleDownloading())
|
||||||
|
clock.set(request.expiry.truncate(int64))
|
||||||
|
await sleepAsync chronos.seconds(2)
|
||||||
|
|
||||||
|
without state =? (agent.state as SaleErrored):
|
||||||
|
fail()
|
||||||
|
check state.error of SaleTimeoutError
|
||||||
|
check state.error.msg == "Sale cancelled due to timeout"
|
||||||
|
|
||||||
|
test "moves to SaleErrored when Downloading and request fails":
|
||||||
|
sales.onStore = proc(request: StorageRequest,
|
||||||
|
slot: UInt256,
|
||||||
|
availability: ?Availability) {.async.} =
|
||||||
|
await sleepAsync(chronos.minutes(1)) # "far" in the future
|
||||||
|
let agent = newSalesAgent()
|
||||||
|
await agent.start(request.ask.slots)
|
||||||
|
market.requested.add request
|
||||||
|
market.requestState[request.id] = RequestState.New
|
||||||
|
await agent.switchAsync(SaleDownloading())
|
||||||
|
market.emitRequestFailed(request.id)
|
||||||
|
await sleepAsync chronos.seconds(2)
|
||||||
|
|
||||||
|
without state =? (agent.state as SaleErrored):
|
||||||
|
fail()
|
||||||
|
check state.error of SaleFailedError
|
||||||
|
check state.error.msg == "Sale failed"
|
||||||
|
|
||||||
|
test "moves to SaleErrored when Filling and request expires":
|
||||||
|
request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256
|
||||||
|
let agent = newSalesAgent()
|
||||||
|
await agent.start(request.ask.slots)
|
||||||
|
market.requested.add request
|
||||||
|
market.requestState[request.id] = RequestState.New
|
||||||
|
await agent.switchAsync(SaleFilling())
|
||||||
|
clock.set(request.expiry.truncate(int64))
|
||||||
|
await sleepAsync chronos.seconds(2)
|
||||||
|
|
||||||
|
without state =? (agent.state as SaleErrored):
|
||||||
|
fail()
|
||||||
|
check state.error of SaleTimeoutError
|
||||||
|
check state.error.msg == "Sale cancelled due to timeout"
|
||||||
|
|
||||||
|
test "moves to SaleErrored when Filling and request fails":
|
||||||
|
let agent = newSalesAgent()
|
||||||
|
await agent.start(request.ask.slots)
|
||||||
|
market.requested.add request
|
||||||
|
market.requestState[request.id] = RequestState.New
|
||||||
|
await agent.switchAsync(SaleFilling())
|
||||||
|
market.emitRequestFailed(request.id)
|
||||||
|
await sleepAsync chronos.seconds(2)
|
||||||
|
|
||||||
|
without state =? (agent.state as SaleErrored):
|
||||||
|
fail()
|
||||||
|
check state.error of SaleFailedError
|
||||||
|
check state.error.msg == "Sale failed"
|
||||||
|
|
||||||
|
test "moves to SaleErrored when Finished and request expires":
|
||||||
|
request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256
|
||||||
|
let agent = newSalesAgent()
|
||||||
|
await agent.start(request.ask.slots)
|
||||||
|
market.requested.add request
|
||||||
|
market.requestState[request.id] = RequestState.Finished
|
||||||
|
await agent.switchAsync(SaleFinished())
|
||||||
|
clock.set(request.expiry.truncate(int64))
|
||||||
|
await sleepAsync chronos.seconds(2)
|
||||||
|
|
||||||
|
without state =? (agent.state as SaleErrored):
|
||||||
|
fail()
|
||||||
|
check state.error of SaleTimeoutError
|
||||||
|
check state.error.msg == "Sale cancelled due to timeout"
|
||||||
|
|
||||||
|
test "moves to SaleErrored when Finished and request fails":
|
||||||
|
let agent = newSalesAgent()
|
||||||
|
await agent.start(request.ask.slots)
|
||||||
|
market.requested.add request
|
||||||
|
market.requestState[request.id] = RequestState.Finished
|
||||||
|
await agent.switchAsync(SaleFinished())
|
||||||
|
market.emitRequestFailed(request.id)
|
||||||
|
await sleepAsync chronos.seconds(2)
|
||||||
|
|
||||||
|
without state =? (agent.state as SaleErrored):
|
||||||
|
fail()
|
||||||
|
check state.error of SaleFailedError
|
||||||
|
check state.error.msg == "Sale failed"
|
||||||
|
|
||||||
|
test "moves to SaleErrored when Proving and request expires":
|
||||||
|
sales.onProve = proc(request: StorageRequest,
|
||||||
|
slot: UInt256): Future[seq[byte]] {.async.} =
|
||||||
|
await sleepAsync(chronos.minutes(1)) # "far" in the future
|
||||||
|
return @[]
|
||||||
|
request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256
|
||||||
|
let agent = newSalesAgent()
|
||||||
|
await agent.start(request.ask.slots)
|
||||||
|
market.requested.add request
|
||||||
|
market.requestState[request.id] = RequestState.New
|
||||||
|
await agent.switchAsync(SaleProving())
|
||||||
|
clock.set(request.expiry.truncate(int64))
|
||||||
|
await sleepAsync chronos.seconds(2)
|
||||||
|
|
||||||
|
without state =? (agent.state as SaleErrored):
|
||||||
|
fail()
|
||||||
|
check state.error of SaleTimeoutError
|
||||||
|
check state.error.msg == "Sale cancelled due to timeout"
|
||||||
|
|
||||||
|
test "moves to SaleErrored when Proving and request fails":
|
||||||
|
sales.onProve = proc(request: StorageRequest,
|
||||||
|
slot: UInt256): Future[seq[byte]] {.async.} =
|
||||||
|
await sleepAsync(chronos.minutes(1)) # "far" in the future
|
||||||
|
return @[]
|
||||||
|
let agent = newSalesAgent()
|
||||||
|
await agent.start(request.ask.slots)
|
||||||
|
market.requested.add request
|
||||||
|
market.requestState[request.id] = RequestState.New
|
||||||
|
await agent.switchAsync(SaleProving())
|
||||||
|
market.emitRequestFailed(request.id)
|
||||||
|
await sleepAsync chronos.seconds(2)
|
||||||
|
|
||||||
|
without state =? (agent.state as SaleErrored):
|
||||||
|
fail()
|
||||||
|
check state.error of SaleFailedError
|
||||||
|
check state.error.msg == "Sale failed"
|
||||||
|
|
||||||
|
test "moves to SaleErrored when Downloading and slot is filled by another host":
|
||||||
|
sales.onStore = proc(request: StorageRequest,
|
||||||
|
slot: UInt256,
|
||||||
|
availability: ?Availability) {.async.} =
|
||||||
|
await sleepAsync(chronos.minutes(1)) # "far" in the future
|
||||||
|
let agent = newSalesAgent()
|
||||||
|
await agent.start(request.ask.slots)
|
||||||
|
market.requested.add request
|
||||||
|
market.requestState[request.id] = RequestState.New
|
||||||
|
await agent.switchAsync(SaleDownloading())
|
||||||
|
market.fillSlot(request.id, agent.slotIndex, proof, Address.example)
|
||||||
|
await sleepAsync chronos.seconds(2)
|
||||||
|
|
||||||
|
let state = (agent.state as SaleErrored)
|
||||||
|
check state.isSome
|
||||||
|
check (!state).error.msg == "Slot filled by other host"
|
||||||
|
|
||||||
|
test "moves to SaleErrored when Proving and slot is filled by another host":
|
||||||
|
sales.onProve = proc(request: StorageRequest,
|
||||||
|
slot: UInt256): Future[seq[byte]] {.async.} =
|
||||||
|
await sleepAsync(chronos.minutes(1)) # "far" in the future
|
||||||
|
return @[]
|
||||||
|
let agent = newSalesAgent()
|
||||||
|
await agent.start(request.ask.slots)
|
||||||
|
market.requested.add request
|
||||||
|
market.requestState[request.id] = RequestState.New
|
||||||
|
await agent.switchAsync(SaleProving())
|
||||||
|
market.fillSlot(request.id, agent.slotIndex, proof, Address.example)
|
||||||
|
await sleepAsync chronos.seconds(2)
|
||||||
|
|
||||||
|
without state =? (agent.state as SaleErrored):
|
||||||
|
fail()
|
||||||
|
check state.error of HostMismatchError
|
||||||
|
check state.error.msg == "Slot filled by other host"
|
||||||
|
|
||||||
|
test "moves to SaleErrored when Filling and slot is filled by another host":
|
||||||
|
sales.onProve = proc(request: StorageRequest,
|
||||||
|
slot: UInt256): Future[seq[byte]] {.async.} =
|
||||||
|
await sleepAsync(chronos.minutes(1)) # "far" in the future
|
||||||
|
return @[]
|
||||||
|
let agent = newSalesAgent()
|
||||||
|
await agent.start(request.ask.slots)
|
||||||
|
market.requested.add request
|
||||||
|
market.requestState[request.id] = RequestState.New
|
||||||
|
market.fillSlot(request.id, agent.slotIndex, proof, Address.example)
|
||||||
|
await agent.switchAsync(SaleFilling())
|
||||||
|
await sleepAsync chronos.seconds(2)
|
||||||
|
|
||||||
|
without state =? (agent.state as SaleErrored):
|
||||||
|
fail()
|
||||||
|
check state.error of HostMismatchError
|
||||||
|
check state.error.msg == "Slot filled by other host"
|
||||||
|
|
||||||
|
test "moves from SaleDownloading to SaleFinished, calling necessary callbacks":
|
||||||
|
var onProveCalled, onStoreCalled, onClearCalled, onSaleCalled: bool
|
||||||
|
sales.onProve = proc(request: StorageRequest,
|
||||||
|
slot: UInt256): Future[seq[byte]] {.async.} =
|
||||||
|
onProveCalled = true
|
||||||
|
return @[]
|
||||||
|
sales.onStore = proc(request: StorageRequest,
|
||||||
|
slot: UInt256,
|
||||||
|
availability: ?Availability) {.async.} =
|
||||||
|
onStoreCalled = true
|
||||||
|
sales.onClear = proc(availability: ?Availability,
|
||||||
|
request: StorageRequest,
|
||||||
|
slotIndex: UInt256) =
|
||||||
|
onClearCalled = true
|
||||||
|
sales.onSale = proc(availability: ?Availability,
|
||||||
|
request: StorageRequest,
|
||||||
|
slotIndex: UInt256) =
|
||||||
|
onSaleCalled = true
|
||||||
|
|
||||||
|
let agent = newSalesAgent()
|
||||||
|
await agent.start(request.ask.slots)
|
||||||
|
market.requested.add request
|
||||||
|
market.requestState[request.id] = RequestState.New
|
||||||
|
await fillSlot(agent.slotIndex)
|
||||||
|
await agent.switchAsync(SaleDownloading())
|
||||||
|
market.emitRequestFulfilled(request.id)
|
||||||
|
await sleepAsync chronos.seconds(2)
|
||||||
|
|
||||||
|
without state =? (agent.state as SaleFinished):
|
||||||
|
fail()
|
||||||
|
check onProveCalled
|
||||||
|
check onStoreCalled
|
||||||
|
check not onClearCalled
|
||||||
|
check onSaleCalled
|
||||||
|
|
||||||
|
test "loads active slots from market":
|
||||||
|
let me = await market.getSigner()
|
||||||
|
|
||||||
|
request.ask.slots = 2
|
||||||
|
market.requested = @[request]
|
||||||
|
market.requestState[request.id] = RequestState.New
|
||||||
|
|
||||||
|
let slot0 = MockSlot(requestId: request.id,
|
||||||
|
slotIndex: 0.u256,
|
||||||
|
proof: proof,
|
||||||
|
host: me)
|
||||||
|
await fillSlot(slot0.slotIndex)
|
||||||
|
|
||||||
|
let slot1 = MockSlot(requestId: request.id,
|
||||||
|
slotIndex: 1.u256,
|
||||||
|
proof: proof,
|
||||||
|
host: me)
|
||||||
|
await fillSlot(slot1.slotIndex)
|
||||||
|
market.activeSlots[me] = @[request.slotId(0.u256), request.slotId(1.u256)]
|
||||||
|
market.requested = @[request]
|
||||||
|
market.activeRequests[me] = @[request.id]
|
||||||
|
|
||||||
|
await sales.load()
|
||||||
|
let expected = SalesAgent(sales: sales,
|
||||||
|
requestId: request.id,
|
||||||
|
availability: none Availability,
|
||||||
|
request: some request)
|
||||||
|
# because sales.load() calls agent.start, we won't know the slotIndex
|
||||||
|
# randomly selected for the agent, and we also won't know the value of
|
||||||
|
# `failed`/`fulfilled`/`cancelled` futures, so we need to compare
|
||||||
|
# the properties we know
|
||||||
|
# TODO: when calling sales.load(), slot index should be restored and not
|
||||||
|
# randomly re-assigned, so this may no longer be needed
|
||||||
|
proc `==` (agent0, agent1: SalesAgent): bool =
|
||||||
|
return agent0.sales == agent1.sales and
|
||||||
|
agent0.requestId == agent1.requestId and
|
||||||
|
agent0.availability == agent1.availability and
|
||||||
|
agent0.request == agent1.request
|
||||||
|
|
||||||
|
check sales.agents.all(agent => agent == expected)
|
@ -1,17 +1,23 @@
|
|||||||
import std/sets
|
|
||||||
import std/sequtils
|
|
||||||
import std/sugar
|
|
||||||
import std/times
|
import std/times
|
||||||
|
|
||||||
import pkg/asynctest
|
import pkg/asynctest
|
||||||
import pkg/chronos
|
import pkg/datastore
|
||||||
|
import pkg/questionable
|
||||||
|
import pkg/questionable/results
|
||||||
|
|
||||||
import pkg/codex/sales
|
import pkg/codex/sales
|
||||||
import pkg/codex/sales/states/[downloading, cancelled, errored, filled, filling,
|
import pkg/codex/sales/reservations
|
||||||
failed, proving, finished, unknown]
|
import pkg/codex/stores/repostore
|
||||||
|
|
||||||
import ./helpers/mockmarket
|
import ./helpers/mockmarket
|
||||||
import ./helpers/mockclock
|
import ./helpers/mockclock
|
||||||
import ./helpers/eventually
|
import ./helpers/eventually
|
||||||
import ./examples
|
import ./examples
|
||||||
|
|
||||||
|
import ./sales/teststatemachine
|
||||||
|
import ./sales/testreservations
|
||||||
|
import ./sales/helpers
|
||||||
|
|
||||||
suite "Sales":
|
suite "Sales":
|
||||||
|
|
||||||
let availability = Availability.init(
|
let availability = Availability.init(
|
||||||
@ -33,16 +39,20 @@ suite "Sales":
|
|||||||
)
|
)
|
||||||
let proof = exampleProof()
|
let proof = exampleProof()
|
||||||
|
|
||||||
var sales: Sales
|
var
|
||||||
var market: MockMarket
|
sales: Sales
|
||||||
var clock: MockClock
|
market: MockMarket
|
||||||
var proving: Proving
|
clock: MockClock
|
||||||
|
proving: Proving
|
||||||
|
|
||||||
setup:
|
setup:
|
||||||
market = MockMarket.new()
|
market = MockMarket.new()
|
||||||
clock = MockClock.new()
|
clock = MockClock.new()
|
||||||
proving = Proving.new()
|
proving = Proving.new()
|
||||||
sales = Sales.new(market, clock, proving)
|
let repoDs = SQLiteDatastore.new(Memory).tryGet()
|
||||||
|
let metaDs = SQLiteDatastore.new(Memory).tryGet()
|
||||||
|
let repo = RepoStore.new(repoDs, metaDs)
|
||||||
|
sales = Sales.new(market, clock, proving, repo, metaDs)
|
||||||
sales.onStore = proc(request: StorageRequest,
|
sales.onStore = proc(request: StorageRequest,
|
||||||
slot: UInt256,
|
slot: UInt256,
|
||||||
availability: ?Availability) {.async.} =
|
availability: ?Availability) {.async.} =
|
||||||
@ -56,46 +66,30 @@ suite "Sales":
|
|||||||
teardown:
|
teardown:
|
||||||
await sales.stop()
|
await sales.stop()
|
||||||
|
|
||||||
test "has no availability initially":
|
|
||||||
check sales.available.len == 0
|
|
||||||
|
|
||||||
test "can add available storage":
|
|
||||||
let availability1 = Availability.example
|
|
||||||
let availability2 = Availability.example
|
|
||||||
sales.add(availability1)
|
|
||||||
check sales.available.contains(availability1)
|
|
||||||
sales.add(availability2)
|
|
||||||
check sales.available.contains(availability1)
|
|
||||||
check sales.available.contains(availability2)
|
|
||||||
|
|
||||||
test "can remove available storage":
|
|
||||||
sales.add(availability)
|
|
||||||
sales.remove(availability)
|
|
||||||
check sales.available.len == 0
|
|
||||||
|
|
||||||
test "generates unique ids for storage availability":
|
|
||||||
let availability1 = Availability.init(1.u256, 2.u256, 3.u256)
|
|
||||||
let availability2 = Availability.init(1.u256, 2.u256, 3.u256)
|
|
||||||
check availability1.id != availability2.id
|
|
||||||
|
|
||||||
test "makes storage unavailable when matching request comes in":
|
test "makes storage unavailable when matching request comes in":
|
||||||
sales.add(availability)
|
check isOk await sales.reservations.reserve(availability)
|
||||||
await market.requestStorage(request)
|
await market.requestStorage(request)
|
||||||
check sales.available.len == 0
|
without availability =? await sales.reservations.get(availability.id):
|
||||||
|
fail()
|
||||||
|
check availability.used
|
||||||
|
|
||||||
test "ignores request when no matching storage is available":
|
test "ignores request when no matching storage is available":
|
||||||
sales.add(availability)
|
check isOk await sales.reservations.reserve(availability)
|
||||||
var tooBig = request
|
var tooBig = request
|
||||||
tooBig.ask.slotSize = request.ask.slotSize + 1
|
tooBig.ask.slotSize = request.ask.slotSize + 1
|
||||||
await market.requestStorage(tooBig)
|
await market.requestStorage(tooBig)
|
||||||
check sales.available == @[availability]
|
without availability =? await sales.reservations.get(availability.id):
|
||||||
|
fail()
|
||||||
|
check not availability.used
|
||||||
|
|
||||||
test "ignores request when reward is too low":
|
test "ignores request when reward is too low":
|
||||||
sales.add(availability)
|
check isOk await sales.reservations.reserve(availability)
|
||||||
var tooCheap = request
|
var tooCheap = request
|
||||||
tooCheap.ask.reward = request.ask.reward - 1
|
tooCheap.ask.reward = request.ask.reward - 1
|
||||||
await market.requestStorage(tooCheap)
|
await market.requestStorage(tooCheap)
|
||||||
check sales.available == @[availability]
|
without availability =? await sales.reservations.get(availability.id):
|
||||||
|
fail()
|
||||||
|
check not availability.used
|
||||||
|
|
||||||
test "retrieves and stores data locally":
|
test "retrieves and stores data locally":
|
||||||
var storingRequest: StorageRequest
|
var storingRequest: StorageRequest
|
||||||
@ -108,7 +102,7 @@ suite "Sales":
|
|||||||
storingSlot = slot
|
storingSlot = slot
|
||||||
check availability.isSome
|
check availability.isSome
|
||||||
storingAvailability = !availability
|
storingAvailability = !availability
|
||||||
sales.add(availability)
|
check isOk await sales.reservations.reserve(availability)
|
||||||
await market.requestStorage(request)
|
await market.requestStorage(request)
|
||||||
check storingRequest == request
|
check storingRequest == request
|
||||||
check storingSlot < request.ask.slots.u256
|
check storingSlot < request.ask.slots.u256
|
||||||
@ -120,9 +114,11 @@ suite "Sales":
|
|||||||
slot: UInt256,
|
slot: UInt256,
|
||||||
availability: ?Availability) {.async.} =
|
availability: ?Availability) {.async.} =
|
||||||
raise error
|
raise error
|
||||||
sales.add(availability)
|
check isOk await sales.reservations.reserve(availability)
|
||||||
await market.requestStorage(request)
|
await market.requestStorage(request)
|
||||||
check sales.available == @[availability]
|
without availability =? await sales.reservations.get(availability.id):
|
||||||
|
fail()
|
||||||
|
check not availability.used
|
||||||
|
|
||||||
test "generates proof of storage":
|
test "generates proof of storage":
|
||||||
var provingRequest: StorageRequest
|
var provingRequest: StorageRequest
|
||||||
@ -131,13 +127,13 @@ suite "Sales":
|
|||||||
slot: UInt256): Future[seq[byte]] {.async.} =
|
slot: UInt256): Future[seq[byte]] {.async.} =
|
||||||
provingRequest = request
|
provingRequest = request
|
||||||
provingSlot = slot
|
provingSlot = slot
|
||||||
sales.add(availability)
|
check isOk await sales.reservations.reserve(availability)
|
||||||
await market.requestStorage(request)
|
await market.requestStorage(request)
|
||||||
check provingRequest == request
|
check provingRequest == request
|
||||||
check provingSlot < request.ask.slots.u256
|
check provingSlot < request.ask.slots.u256
|
||||||
|
|
||||||
test "fills a slot":
|
test "fills a slot":
|
||||||
sales.add(availability)
|
check isOk await sales.reservations.reserve(availability)
|
||||||
await market.requestStorage(request)
|
await market.requestStorage(request)
|
||||||
check market.filled.len == 1
|
check market.filled.len == 1
|
||||||
check market.filled[0].requestId == request.id
|
check market.filled[0].requestId == request.id
|
||||||
@ -156,7 +152,7 @@ suite "Sales":
|
|||||||
soldAvailability = a
|
soldAvailability = a
|
||||||
soldRequest = request
|
soldRequest = request
|
||||||
soldSlotIndex = slotIndex
|
soldSlotIndex = slotIndex
|
||||||
sales.add(availability)
|
check isOk await sales.reservations.reserve(availability)
|
||||||
await market.requestStorage(request)
|
await market.requestStorage(request)
|
||||||
check soldAvailability == availability
|
check soldAvailability == availability
|
||||||
check soldRequest == request
|
check soldRequest == request
|
||||||
@ -178,7 +174,7 @@ suite "Sales":
|
|||||||
clearedAvailability = a
|
clearedAvailability = a
|
||||||
clearedRequest = request
|
clearedRequest = request
|
||||||
clearedSlotIndex = slotIndex
|
clearedSlotIndex = slotIndex
|
||||||
sales.add(availability)
|
check isOk await sales.reservations.reserve(availability)
|
||||||
await market.requestStorage(request)
|
await market.requestStorage(request)
|
||||||
check clearedAvailability == availability
|
check clearedAvailability == availability
|
||||||
check clearedRequest == request
|
check clearedRequest == request
|
||||||
@ -190,22 +186,24 @@ suite "Sales":
|
|||||||
slot: UInt256,
|
slot: UInt256,
|
||||||
availability: ?Availability) {.async.} =
|
availability: ?Availability) {.async.} =
|
||||||
await sleepAsync(chronos.hours(1))
|
await sleepAsync(chronos.hours(1))
|
||||||
sales.add(availability)
|
check isOk await sales.reservations.reserve(availability)
|
||||||
await market.requestStorage(request)
|
await market.requestStorage(request)
|
||||||
for slotIndex in 0..<request.ask.slots:
|
for slotIndex in 0..<request.ask.slots:
|
||||||
market.fillSlot(request.id, slotIndex.u256, proof, otherHost)
|
market.fillSlot(request.id, slotIndex.u256, proof, otherHost)
|
||||||
await sleepAsync(chronos.seconds(2))
|
await sleepAsync(chronos.seconds(2))
|
||||||
check sales.available == @[availability]
|
without availabilities =? (await sales.reservations.allAvailabilities):
|
||||||
|
fail()
|
||||||
|
check availabilities == @[availability]
|
||||||
|
|
||||||
test "makes storage available again when request expires":
|
test "makes storage available again when request expires":
|
||||||
sales.onStore = proc(request: StorageRequest,
|
sales.onStore = proc(request: StorageRequest,
|
||||||
slot: UInt256,
|
slot: UInt256,
|
||||||
availability: ?Availability) {.async.} =
|
availability: ?Availability) {.async.} =
|
||||||
await sleepAsync(chronos.hours(1))
|
await sleepAsync(chronos.hours(1))
|
||||||
sales.add(availability)
|
check isOk await sales.reservations.reserve(availability)
|
||||||
await market.requestStorage(request)
|
await market.requestStorage(request)
|
||||||
clock.set(request.expiry.truncate(int64))
|
clock.set(request.expiry.truncate(int64))
|
||||||
check eventually (sales.available == @[availability])
|
check eventually ((await sales.reservations.allAvailabilities) == @[availability])
|
||||||
|
|
||||||
test "adds proving for slot when slot is filled":
|
test "adds proving for slot when slot is filled":
|
||||||
var soldSlotIndex: UInt256
|
var soldSlotIndex: UInt256
|
||||||
@ -214,368 +212,7 @@ suite "Sales":
|
|||||||
slotIndex: UInt256) =
|
slotIndex: UInt256) =
|
||||||
soldSlotIndex = slotIndex
|
soldSlotIndex = slotIndex
|
||||||
check proving.slots.len == 0
|
check proving.slots.len == 0
|
||||||
sales.add(availability)
|
check isOk await sales.reservations.reserve(availability)
|
||||||
await market.requestStorage(request)
|
await market.requestStorage(request)
|
||||||
check proving.slots.len == 1
|
check proving.slots.len == 1
|
||||||
check proving.slots.contains(request.slotId(soldSlotIndex))
|
check proving.slots.contains(request.slotId(soldSlotIndex))
|
||||||
|
|
||||||
suite "Sales state machine":
|
|
||||||
|
|
||||||
let availability = Availability.init(
|
|
||||||
size=100.u256,
|
|
||||||
duration=60.u256,
|
|
||||||
minPrice=600.u256
|
|
||||||
)
|
|
||||||
var request = StorageRequest(
|
|
||||||
ask: StorageAsk(
|
|
||||||
slots: 4,
|
|
||||||
slotSize: 100.u256,
|
|
||||||
duration: 60.u256,
|
|
||||||
reward: 10.u256,
|
|
||||||
),
|
|
||||||
content: StorageContent(
|
|
||||||
cid: "some cid"
|
|
||||||
)
|
|
||||||
)
|
|
||||||
let proof = exampleProof()
|
|
||||||
|
|
||||||
var sales: Sales
|
|
||||||
var market: MockMarket
|
|
||||||
var clock: MockClock
|
|
||||||
var proving: Proving
|
|
||||||
var slotIdx: UInt256
|
|
||||||
var slotId: SlotId
|
|
||||||
|
|
||||||
setup:
|
|
||||||
market = MockMarket.new()
|
|
||||||
clock = MockClock.new()
|
|
||||||
proving = Proving.new()
|
|
||||||
sales = Sales.new(market, clock, proving)
|
|
||||||
sales.onStore = proc(request: StorageRequest,
|
|
||||||
slot: UInt256,
|
|
||||||
availability: ?Availability) {.async.} =
|
|
||||||
discard
|
|
||||||
sales.onProve = proc(request: StorageRequest,
|
|
||||||
slot: UInt256): Future[seq[byte]] {.async.} =
|
|
||||||
return proof
|
|
||||||
await sales.start()
|
|
||||||
request.expiry = (clock.now() + 42).u256
|
|
||||||
slotIdx = 0.u256
|
|
||||||
slotId = slotId(request.id, slotIdx)
|
|
||||||
|
|
||||||
teardown:
|
|
||||||
await sales.stop()
|
|
||||||
|
|
||||||
proc newSalesAgent(slotIdx: UInt256 = 0.u256): SalesAgent =
|
|
||||||
let agent = sales.newSalesAgent(request.id,
|
|
||||||
slotIdx,
|
|
||||||
some availability,
|
|
||||||
some request)
|
|
||||||
return agent
|
|
||||||
|
|
||||||
proc fillSlot(slotIdx: UInt256 = 0.u256) {.async.} =
|
|
||||||
let address = await market.getSigner()
|
|
||||||
let slot = MockSlot(requestId: request.id,
|
|
||||||
slotIndex: slotIdx,
|
|
||||||
proof: proof,
|
|
||||||
host: address)
|
|
||||||
market.filled.add slot
|
|
||||||
market.slotState[slotId(request.id, slotIdx)] = SlotState.Filled
|
|
||||||
|
|
||||||
test "moves to SaleErrored when SaleFilled errors":
|
|
||||||
let agent = newSalesAgent()
|
|
||||||
market.slotState[slotId] = SlotState.Free
|
|
||||||
await agent.switchAsync(SaleUnknown())
|
|
||||||
without state =? (agent.state as SaleErrored):
|
|
||||||
fail()
|
|
||||||
check state.error of UnexpectedSlotError
|
|
||||||
check state.error.msg == "slot state on chain should not be 'free'"
|
|
||||||
|
|
||||||
test "moves to SaleFilled>SaleFinished when slot state is Filled":
|
|
||||||
let agent = newSalesAgent()
|
|
||||||
await fillSlot()
|
|
||||||
await agent.switchAsync(SaleUnknown())
|
|
||||||
check (agent.state as SaleFinished).isSome
|
|
||||||
|
|
||||||
test "moves to SaleFinished when slot state is Finished":
|
|
||||||
let agent = newSalesAgent()
|
|
||||||
await fillSlot()
|
|
||||||
market.slotState[slotId] = SlotState.Finished
|
|
||||||
agent.switch(SaleUnknown())
|
|
||||||
check (agent.state as SaleFinished).isSome
|
|
||||||
|
|
||||||
test "moves to SaleFinished when slot state is Paid":
|
|
||||||
let agent = newSalesAgent()
|
|
||||||
market.slotState[slotId] = SlotState.Paid
|
|
||||||
agent.switch(SaleUnknown())
|
|
||||||
check (agent.state as SaleFinished).isSome
|
|
||||||
|
|
||||||
test "moves to SaleErrored when slot state is Failed":
|
|
||||||
let agent = newSalesAgent()
|
|
||||||
market.slotState[slotId] = SlotState.Failed
|
|
||||||
agent.switch(SaleUnknown())
|
|
||||||
without state =? (agent.state as SaleErrored):
|
|
||||||
fail()
|
|
||||||
check state.error of SaleFailedError
|
|
||||||
check state.error.msg == "Sale failed"
|
|
||||||
|
|
||||||
test "moves to SaleErrored when Downloading and request expires":
|
|
||||||
sales.onStore = proc(request: StorageRequest,
|
|
||||||
slot: UInt256,
|
|
||||||
availability: ?Availability) {.async.} =
|
|
||||||
await sleepAsync(chronos.minutes(1)) # "far" in the future
|
|
||||||
request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256
|
|
||||||
let agent = newSalesAgent()
|
|
||||||
await agent.start(request.ask.slots)
|
|
||||||
market.requested.add request
|
|
||||||
market.requestState[request.id] = RequestState.New
|
|
||||||
await agent.switchAsync(SaleDownloading())
|
|
||||||
clock.set(request.expiry.truncate(int64))
|
|
||||||
await sleepAsync chronos.seconds(2)
|
|
||||||
|
|
||||||
without state =? (agent.state as SaleErrored):
|
|
||||||
fail()
|
|
||||||
check state.error of SaleTimeoutError
|
|
||||||
check state.error.msg == "Sale cancelled due to timeout"
|
|
||||||
|
|
||||||
test "moves to SaleErrored when Downloading and request fails":
|
|
||||||
sales.onStore = proc(request: StorageRequest,
|
|
||||||
slot: UInt256,
|
|
||||||
availability: ?Availability) {.async.} =
|
|
||||||
await sleepAsync(chronos.minutes(1)) # "far" in the future
|
|
||||||
let agent = newSalesAgent()
|
|
||||||
await agent.start(request.ask.slots)
|
|
||||||
market.requested.add request
|
|
||||||
market.requestState[request.id] = RequestState.New
|
|
||||||
await agent.switchAsync(SaleDownloading())
|
|
||||||
market.emitRequestFailed(request.id)
|
|
||||||
await sleepAsync chronos.seconds(2)
|
|
||||||
|
|
||||||
without state =? (agent.state as SaleErrored):
|
|
||||||
fail()
|
|
||||||
check state.error of SaleFailedError
|
|
||||||
check state.error.msg == "Sale failed"
|
|
||||||
|
|
||||||
test "moves to SaleErrored when Filling and request expires":
|
|
||||||
request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256
|
|
||||||
let agent = newSalesAgent()
|
|
||||||
await agent.start(request.ask.slots)
|
|
||||||
market.requested.add request
|
|
||||||
market.requestState[request.id] = RequestState.New
|
|
||||||
await agent.switchAsync(SaleFilling())
|
|
||||||
clock.set(request.expiry.truncate(int64))
|
|
||||||
await sleepAsync chronos.seconds(2)
|
|
||||||
|
|
||||||
without state =? (agent.state as SaleErrored):
|
|
||||||
fail()
|
|
||||||
check state.error of SaleTimeoutError
|
|
||||||
check state.error.msg == "Sale cancelled due to timeout"
|
|
||||||
|
|
||||||
test "moves to SaleErrored when Filling and request fails":
|
|
||||||
let agent = newSalesAgent()
|
|
||||||
await agent.start(request.ask.slots)
|
|
||||||
market.requested.add request
|
|
||||||
market.requestState[request.id] = RequestState.New
|
|
||||||
await agent.switchAsync(SaleFilling())
|
|
||||||
market.emitRequestFailed(request.id)
|
|
||||||
await sleepAsync chronos.seconds(2)
|
|
||||||
|
|
||||||
without state =? (agent.state as SaleErrored):
|
|
||||||
fail()
|
|
||||||
check state.error of SaleFailedError
|
|
||||||
check state.error.msg == "Sale failed"
|
|
||||||
|
|
||||||
test "moves to SaleErrored when Finished and request expires":
|
|
||||||
request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256
|
|
||||||
let agent = newSalesAgent()
|
|
||||||
await agent.start(request.ask.slots)
|
|
||||||
market.requested.add request
|
|
||||||
market.requestState[request.id] = RequestState.Finished
|
|
||||||
await agent.switchAsync(SaleFinished())
|
|
||||||
clock.set(request.expiry.truncate(int64))
|
|
||||||
await sleepAsync chronos.seconds(2)
|
|
||||||
|
|
||||||
without state =? (agent.state as SaleErrored):
|
|
||||||
fail()
|
|
||||||
check state.error of SaleTimeoutError
|
|
||||||
check state.error.msg == "Sale cancelled due to timeout"
|
|
||||||
|
|
||||||
test "moves to SaleErrored when Finished and request fails":
|
|
||||||
let agent = newSalesAgent()
|
|
||||||
await agent.start(request.ask.slots)
|
|
||||||
market.requested.add request
|
|
||||||
market.requestState[request.id] = RequestState.Finished
|
|
||||||
await agent.switchAsync(SaleFinished())
|
|
||||||
market.emitRequestFailed(request.id)
|
|
||||||
await sleepAsync chronos.seconds(2)
|
|
||||||
|
|
||||||
without state =? (agent.state as SaleErrored):
|
|
||||||
fail()
|
|
||||||
check state.error of SaleFailedError
|
|
||||||
check state.error.msg == "Sale failed"
|
|
||||||
|
|
||||||
test "moves to SaleErrored when Proving and request expires":
|
|
||||||
sales.onProve = proc(request: StorageRequest,
|
|
||||||
slot: UInt256): Future[seq[byte]] {.async.} =
|
|
||||||
await sleepAsync(chronos.minutes(1)) # "far" in the future
|
|
||||||
return @[]
|
|
||||||
request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256
|
|
||||||
let agent = newSalesAgent()
|
|
||||||
await agent.start(request.ask.slots)
|
|
||||||
market.requested.add request
|
|
||||||
market.requestState[request.id] = RequestState.New
|
|
||||||
await agent.switchAsync(SaleProving())
|
|
||||||
clock.set(request.expiry.truncate(int64))
|
|
||||||
await sleepAsync chronos.seconds(2)
|
|
||||||
|
|
||||||
without state =? (agent.state as SaleErrored):
|
|
||||||
fail()
|
|
||||||
check state.error of SaleTimeoutError
|
|
||||||
check state.error.msg == "Sale cancelled due to timeout"
|
|
||||||
|
|
||||||
test "moves to SaleErrored when Proving and request fails":
|
|
||||||
sales.onProve = proc(request: StorageRequest,
|
|
||||||
slot: UInt256): Future[seq[byte]] {.async.} =
|
|
||||||
await sleepAsync(chronos.minutes(1)) # "far" in the future
|
|
||||||
return @[]
|
|
||||||
let agent = newSalesAgent()
|
|
||||||
await agent.start(request.ask.slots)
|
|
||||||
market.requested.add request
|
|
||||||
market.requestState[request.id] = RequestState.New
|
|
||||||
await agent.switchAsync(SaleProving())
|
|
||||||
market.emitRequestFailed(request.id)
|
|
||||||
await sleepAsync chronos.seconds(2)
|
|
||||||
|
|
||||||
without state =? (agent.state as SaleErrored):
|
|
||||||
fail()
|
|
||||||
check state.error of SaleFailedError
|
|
||||||
check state.error.msg == "Sale failed"
|
|
||||||
|
|
||||||
test "moves to SaleErrored when Downloading and slot is filled by another host":
|
|
||||||
sales.onStore = proc(request: StorageRequest,
|
|
||||||
slot: UInt256,
|
|
||||||
availability: ?Availability) {.async.} =
|
|
||||||
await sleepAsync(chronos.minutes(1)) # "far" in the future
|
|
||||||
let agent = newSalesAgent()
|
|
||||||
await agent.start(request.ask.slots)
|
|
||||||
market.requested.add request
|
|
||||||
market.requestState[request.id] = RequestState.New
|
|
||||||
await agent.switchAsync(SaleDownloading())
|
|
||||||
market.fillSlot(request.id, agent.slotIndex, proof, Address.example)
|
|
||||||
await sleepAsync chronos.seconds(2)
|
|
||||||
|
|
||||||
let state = (agent.state as SaleErrored)
|
|
||||||
check state.isSome
|
|
||||||
check (!state).error.msg == "Slot filled by other host"
|
|
||||||
|
|
||||||
test "moves to SaleErrored when Proving and slot is filled by another host":
|
|
||||||
sales.onProve = proc(request: StorageRequest,
|
|
||||||
slot: UInt256): Future[seq[byte]] {.async.} =
|
|
||||||
await sleepAsync(chronos.minutes(1)) # "far" in the future
|
|
||||||
return @[]
|
|
||||||
let agent = newSalesAgent()
|
|
||||||
await agent.start(request.ask.slots)
|
|
||||||
market.requested.add request
|
|
||||||
market.requestState[request.id] = RequestState.New
|
|
||||||
await agent.switchAsync(SaleProving())
|
|
||||||
market.fillSlot(request.id, agent.slotIndex, proof, Address.example)
|
|
||||||
await sleepAsync chronos.seconds(2)
|
|
||||||
|
|
||||||
without state =? (agent.state as SaleErrored):
|
|
||||||
fail()
|
|
||||||
check state.error of HostMismatchError
|
|
||||||
check state.error.msg == "Slot filled by other host"
|
|
||||||
|
|
||||||
test "moves to SaleErrored when Filling and slot is filled by another host":
|
|
||||||
sales.onProve = proc(request: StorageRequest,
|
|
||||||
slot: UInt256): Future[seq[byte]] {.async.} =
|
|
||||||
await sleepAsync(chronos.minutes(1)) # "far" in the future
|
|
||||||
return @[]
|
|
||||||
let agent = newSalesAgent()
|
|
||||||
await agent.start(request.ask.slots)
|
|
||||||
market.requested.add request
|
|
||||||
market.requestState[request.id] = RequestState.New
|
|
||||||
market.fillSlot(request.id, agent.slotIndex, proof, Address.example)
|
|
||||||
await agent.switchAsync(SaleFilling())
|
|
||||||
await sleepAsync chronos.seconds(2)
|
|
||||||
|
|
||||||
without state =? (agent.state as SaleErrored):
|
|
||||||
fail()
|
|
||||||
check state.error of HostMismatchError
|
|
||||||
check state.error.msg == "Slot filled by other host"
|
|
||||||
|
|
||||||
test "moves from SaleDownloading to SaleFinished, calling necessary callbacks":
|
|
||||||
var onProveCalled, onStoreCalled, onClearCalled, onSaleCalled: bool
|
|
||||||
sales.onProve = proc(request: StorageRequest,
|
|
||||||
slot: UInt256): Future[seq[byte]] {.async.} =
|
|
||||||
onProveCalled = true
|
|
||||||
return @[]
|
|
||||||
sales.onStore = proc(request: StorageRequest,
|
|
||||||
slot: UInt256,
|
|
||||||
availability: ?Availability) {.async.} =
|
|
||||||
onStoreCalled = true
|
|
||||||
sales.onClear = proc(availability: ?Availability,
|
|
||||||
request: StorageRequest,
|
|
||||||
slotIndex: UInt256) =
|
|
||||||
onClearCalled = true
|
|
||||||
sales.onSale = proc(availability: ?Availability,
|
|
||||||
request: StorageRequest,
|
|
||||||
slotIndex: UInt256) =
|
|
||||||
onSaleCalled = true
|
|
||||||
|
|
||||||
let agent = newSalesAgent()
|
|
||||||
await agent.start(request.ask.slots)
|
|
||||||
market.requested.add request
|
|
||||||
market.requestState[request.id] = RequestState.New
|
|
||||||
await fillSlot(agent.slotIndex)
|
|
||||||
await agent.switchAsync(SaleDownloading())
|
|
||||||
market.emitRequestFulfilled(request.id)
|
|
||||||
await sleepAsync chronos.seconds(2)
|
|
||||||
|
|
||||||
without state =? (agent.state as SaleFinished):
|
|
||||||
fail()
|
|
||||||
check onProveCalled
|
|
||||||
check onStoreCalled
|
|
||||||
check not onClearCalled
|
|
||||||
check onSaleCalled
|
|
||||||
|
|
||||||
test "loads active slots from market":
|
|
||||||
let me = await market.getSigner()
|
|
||||||
|
|
||||||
request.ask.slots = 2
|
|
||||||
market.requested = @[request]
|
|
||||||
market.requestState[request.id] = RequestState.New
|
|
||||||
|
|
||||||
let slot0 = MockSlot(requestId: request.id,
|
|
||||||
slotIndex: 0.u256,
|
|
||||||
proof: proof,
|
|
||||||
host: me)
|
|
||||||
await fillSlot(slot0.slotIndex)
|
|
||||||
|
|
||||||
let slot1 = MockSlot(requestId: request.id,
|
|
||||||
slotIndex: 1.u256,
|
|
||||||
proof: proof,
|
|
||||||
host: me)
|
|
||||||
await fillSlot(slot1.slotIndex)
|
|
||||||
market.activeSlots[me] = @[request.slotId(0.u256), request.slotId(1.u256)]
|
|
||||||
market.requested = @[request]
|
|
||||||
market.activeRequests[me] = @[request.id]
|
|
||||||
|
|
||||||
await sales.load()
|
|
||||||
let expected = SalesAgent(sales: sales,
|
|
||||||
requestId: request.id,
|
|
||||||
availability: none Availability,
|
|
||||||
request: some request)
|
|
||||||
# because sales.load() calls agent.start, we won't know the slotIndex
|
|
||||||
# randomly selected for the agent, and we also won't know the value of
|
|
||||||
# `failed`/`fulfilled`/`cancelled` futures, so we need to compare
|
|
||||||
# the properties we know
|
|
||||||
# TODO: when calling sales.load(), slot index should be restored and not
|
|
||||||
# randomly re-assigned, so this may no longer be needed
|
|
||||||
proc `==` (agent0, agent1: SalesAgent): bool =
|
|
||||||
return agent0.sales == agent1.sales and
|
|
||||||
agent0.requestId == agent1.requestId and
|
|
||||||
agent0.availability == agent1.availability and
|
|
||||||
agent0.request == agent1.request
|
|
||||||
|
|
||||||
check sales.agents.all(agent => agent == expected)
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user