fix: load slots on sales module start (#510)

This commit is contained in:
Adam Uhlíř 2023-08-15 11:39:49 +02:00 committed by GitHub
parent 7efa9177df
commit 39efac1a97
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 111 additions and 25 deletions

View File

@ -42,7 +42,7 @@ export stint
export reservations export reservations
logScope: logScope:
topics = "sales" topics = "sales marketplace"
type type
Sales* = ref object Sales* = ref object
@ -121,6 +121,7 @@ proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} =
let slotIds = await market.mySlots() let slotIds = await market.mySlots()
var slots: seq[Slot] = @[] var slots: seq[Slot] = @[]
info "Loading active slots", slotsCount = len(slots)
for slotId in slotIds: for slotId in slotIds:
if slot =? (await market.getActiveSlot(slotId)): if slot =? (await market.getActiveSlot(slotId)):
slots.add slot slots.add slot
@ -393,6 +394,7 @@ proc unsubscribe(sales: Sales) {.async.} =
proc start*(sales: Sales) {.async.} = proc start*(sales: Sales) {.async.} =
await sales.startSlotQueue() await sales.startSlotQueue()
await sales.subscribe() await sales.subscribe()
await sales.load()
proc stop*(sales: Sales) {.async.} = proc stop*(sales: Sales) {.async.} =
trace "stopping sales" trace "stopping sales"

View File

@ -22,6 +22,95 @@ import ../helpers/eventually
import ../examples import ../examples
import ./helpers import ./helpers
asyncchecksuite "Sales - start":
let proof = exampleProof()
var request: StorageRequest
var sales: Sales
var market: MockMarket
var clock: MockClock
var proving: Proving
var reservations: Reservations
var repo: RepoStore
var queue: SlotQueue
var itemsProcessed: seq[SlotQueueItem]
setup:
request = StorageRequest(
ask: StorageAsk(
slots: 4,
slotSize: 100.u256,
duration: 60.u256,
reward: 10.u256,
collateral: 200.u256,
),
content: StorageContent(
cid: "some cid"
),
expiry: (getTime() + initDuration(hours=1)).toUnix.u256
)
market = MockMarket.new()
clock = MockClock.new()
proving = Proving.new()
let repoDs = SQLiteDatastore.new(Memory).tryGet()
let metaDs = SQLiteDatastore.new(Memory).tryGet()
repo = RepoStore.new(repoDs, metaDs)
await repo.start()
sales = Sales.new(market, clock, proving, repo)
reservations = sales.context.reservations
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
return success()
queue = sales.context.slotQueue
proving.onProve = proc(slot: Slot): Future[seq[byte]] {.async.} =
return proof
itemsProcessed = @[]
request.expiry = (clock.now() + 42).u256
teardown:
await sales.stop()
await repo.stop()
proc fillSlot(slotIdx: UInt256 = 0.u256) {.async.} =
let address = await market.getSigner()
let slot = MockSlot(requestId: request.id,
slotIndex: slotIdx,
proof: proof,
host: address)
market.filled.add slot
market.slotState[slotId(request.id, slotIdx)] = SlotState.Filled
test "load slots when Sales module starts":
let me = await market.getSigner()
request.ask.slots = 2
market.requested = @[request]
market.requestState[request.id] = RequestState.New
let slot0 = MockSlot(requestId: request.id,
slotIndex: 0.u256,
proof: proof,
host: me)
await fillSlot(slot0.slotIndex)
let slot1 = MockSlot(requestId: request.id,
slotIndex: 1.u256,
proof: proof,
host: me)
await fillSlot(slot1.slotIndex)
market.activeSlots[me] = @[request.slotId(0.u256), request.slotId(1.u256)]
market.requested = @[request]
market.activeRequests[me] = @[request.id]
await sales.start()
check eventually sales.agents.len == 2
check sales.agents.any(agent => agent.data.requestId == request.id and agent.data.slotIndex == 0.u256)
check sales.agents.any(agent => agent.data.requestId == request.id and agent.data.slotIndex == 1.u256)
asyncchecksuite "Sales": asyncchecksuite "Sales":
let proof = exampleProof() let proof = exampleProof()
@ -58,6 +147,10 @@ asyncchecksuite "Sales":
) )
market = MockMarket.new() market = MockMarket.new()
let me = await market.getSigner()
market.activeSlots[me] = @[]
clock = MockClock.new() clock = MockClock.new()
proving = Proving.new() proving = Proving.new()
let repoDs = SQLiteDatastore.new(Memory).tryGet() let repoDs = SQLiteDatastore.new(Memory).tryGet()
@ -113,7 +206,7 @@ asyncchecksuite "Sales":
check isOk await reservations.reserve(availability) check isOk await reservations.reserve(availability)
await market.requestStorage(request) await market.requestStorage(request)
let items = SlotQueueItem.init(request) let items = SlotQueueItem.init(request)
check eventuallyCheck items.allIt(itemsProcessed.contains(it)) check eventually items.allIt(itemsProcessed.contains(it))
test "removes slots from slot queue once RequestCancelled emitted": test "removes slots from slot queue once RequestCancelled emitted":
let request1 = await addRequestToSaturatedQueue() let request1 = await addRequestToSaturatedQueue()
@ -146,7 +239,7 @@ asyncchecksuite "Sales":
market.emitSlotFreed(request.id, 2.u256) market.emitSlotFreed(request.id, 2.u256)
let expected = SlotQueueItem.init(request, 2.uint16) let expected = SlotQueueItem.init(request, 2.uint16)
check eventuallyCheck itemsProcessed.contains(expected) check eventually itemsProcessed.contains(expected)
test "request slots are not added to the slot queue when no availabilities exist": test "request slots are not added to the slot queue when no availabilities exist":
var itemsProcessed: seq[SlotQueueItem] = @[] var itemsProcessed: seq[SlotQueueItem] = @[]
@ -199,7 +292,7 @@ asyncchecksuite "Sales":
check isOk await reservations.reserve(availability) check isOk await reservations.reserve(availability)
await market.requestStorage(request) await market.requestStorage(request)
check eventuallyCheck used check eventually used
test "reduces remaining availability size after download": test "reduces remaining availability size after download":
let blk = bt.Block.example let blk = bt.Block.example
@ -212,7 +305,7 @@ asyncchecksuite "Sales":
return success() return success()
check isOk await reservations.reserve(availability) check isOk await reservations.reserve(availability)
await market.requestStorage(request) await market.requestStorage(request)
check eventuallyCheck getAvailability().?size == success 1.u256 check eventually getAvailability().?size == success 1.u256
test "ignores download when duration not long enough": test "ignores download when duration not long enough":
availability.duration = request.ask.duration - 1 availability.duration = request.ask.duration - 1
@ -265,7 +358,7 @@ asyncchecksuite "Sales":
return success() return success()
check isOk await reservations.reserve(availability) check isOk await reservations.reserve(availability)
await market.requestStorage(request) await market.requestStorage(request)
check eventuallyCheck storingRequest == request check eventually storingRequest == request
check storingSlot < request.ask.slots.u256 check storingSlot < request.ask.slots.u256
test "handles errors during state run": test "handles errors during state run":
@ -280,7 +373,7 @@ asyncchecksuite "Sales":
saleFailed = true saleFailed = true
check isOk await reservations.reserve(availability) check isOk await reservations.reserve(availability)
await market.requestStorage(request) await market.requestStorage(request)
check eventuallyCheck saleFailed check eventually saleFailed
test "makes storage available again when data retrieval fails": test "makes storage available again when data retrieval fails":
let error = newException(IOError, "data retrieval failed") let error = newException(IOError, "data retrieval failed")
@ -290,7 +383,7 @@ asyncchecksuite "Sales":
return failure(error) return failure(error)
check isOk await reservations.reserve(availability) check isOk await reservations.reserve(availability)
await market.requestStorage(request) await market.requestStorage(request)
check eventuallyCheck getAvailability().?used == success false check eventually getAvailability().?used == success false
check getAvailability().?size == success availability.size check getAvailability().?size == success availability.size
test "generates proof of storage": test "generates proof of storage":
@ -301,7 +394,7 @@ asyncchecksuite "Sales":
provingSlot = slot.slotIndex provingSlot = slot.slotIndex
check isOk await reservations.reserve(availability) check isOk await reservations.reserve(availability)
await market.requestStorage(request) await market.requestStorage(request)
check eventuallyCheck provingRequest == request check eventually provingRequest == request
check provingSlot < request.ask.slots.u256 check provingSlot < request.ask.slots.u256
test "fills a slot": test "fills a slot":
@ -325,7 +418,7 @@ asyncchecksuite "Sales":
soldSlotIndex = slotIndex soldSlotIndex = slotIndex
check isOk await reservations.reserve(availability) check isOk await reservations.reserve(availability)
await market.requestStorage(request) await market.requestStorage(request)
check eventuallyCheck soldAvailability == availability check eventually soldAvailability == availability
check soldRequest == request check soldRequest == request
check soldSlotIndex < request.ask.slots.u256 check soldSlotIndex < request.ask.slots.u256
@ -342,7 +435,7 @@ asyncchecksuite "Sales":
clearedSlotIndex = slotIndex clearedSlotIndex = slotIndex
check isOk await reservations.reserve(availability) check isOk await reservations.reserve(availability)
await market.requestStorage(request) await market.requestStorage(request)
check eventuallyCheck clearedRequest == request check eventually clearedRequest == request
check clearedSlotIndex < request.ask.slots.u256 check clearedSlotIndex < request.ask.slots.u256
test "makes storage available again when other host fills the slot": test "makes storage available again when other host fills the slot":
@ -356,7 +449,7 @@ asyncchecksuite "Sales":
await market.requestStorage(request) await market.requestStorage(request)
for slotIndex in 0..<request.ask.slots: for slotIndex in 0..<request.ask.slots:
market.fillSlot(request.id, slotIndex.u256, proof, otherHost) market.fillSlot(request.id, slotIndex.u256, proof, otherHost)
check eventuallyCheck (await reservations.allAvailabilities) == @[availability] check eventually (await reservations.allAvailabilities) == @[availability]
test "makes storage available again when request expires": test "makes storage available again when request expires":
sales.onStore = proc(request: StorageRequest, sales.onStore = proc(request: StorageRequest,
@ -367,7 +460,7 @@ asyncchecksuite "Sales":
check isOk await reservations.reserve(availability) check isOk await reservations.reserve(availability)
await market.requestStorage(request) await market.requestStorage(request)
clock.set(request.expiry.truncate(int64)) clock.set(request.expiry.truncate(int64))
check eventuallyCheck (await reservations.allAvailabilities) == @[availability] check eventually (await reservations.allAvailabilities) == @[availability]
test "adds proving for slot when slot is filled": test "adds proving for slot when slot is filled":
var soldSlotIndex: UInt256 var soldSlotIndex: UInt256
@ -412,16 +505,7 @@ asyncchecksuite "Sales":
market.activeRequests[me] = @[request.id] market.activeRequests[me] = @[request.id]
await sales.load() await sales.load()
let expected = SalesData(requestId: request.id, request: some request)
# because sales.load() calls agent.start, we won't know the slotIndex
# randomly selected for the agent, and we also won't know the value of
# `failed`/`fulfilled`/`cancelled` futures, so we need to compare
# the properties we know
# TODO: when calling sales.load(), slot index should be restored and not
# randomly re-assigned, so this may no longer be needed
proc `==` (data0, data1: SalesData): bool =
return data0.requestId == data1.requestId and
data0.request == data1.request
check eventuallyCheck sales.agents.len == 2 check eventually sales.agents.len == 2
check sales.agents.all(agent => agent.data == expected) check sales.agents.any(agent => agent.data.requestId == request.id and agent.data.slotIndex == 0.u256)
check sales.agents.any(agent => agent.data.requestId == request.id and agent.data.slotIndex == 1.u256)