fix(integration): replace polling with started event subscription

This commit is contained in:
Eric 2025-06-06 12:53:07 +10:00
parent c391f03e89
commit 38d8692515
No known key found for this signature in database
3 changed files with 88 additions and 55 deletions

View File

@ -50,6 +50,14 @@ marketplacesuite "Marketplace":
# client requests storage
let cid = (await client.upload(data)).get
var requestStartedFut = Future[void].Raising([CancelledError]).init()
proc onRequestStarted(eventResult: ?!RequestFulfilled) {.raises: [].} =
requestStartedFut.complete()
let startedSubscription =
await marketplace.subscribe(RequestFulfilled, onRequestStarted)
let id = await client.requestStorage(
cid,
duration = 20 * 60.uint64,
@ -61,9 +69,9 @@ marketplacesuite "Marketplace":
tolerance = ecTolerance,
)
check eventuallySafe(
await client.purchaseStateIs(id, "started"), timeout = 10 * 60 * 1000
)
# wait for request to start
await requestStartedFut
let purchase = (await client.getPurchase(id)).get
check purchase.error == none string
let availabilities = (await host.getAvailabilities()).get
@ -75,6 +83,8 @@ marketplacesuite "Marketplace":
check reservations.len == 3
check reservations[0].requestId == purchase.requestId
await startedSubscription.unsubscribe()
test "node slots gets paid out and rest of tokens are returned to client",
marketplaceConfig:
let size = 0xFFFFFF.uint64
@ -226,23 +236,25 @@ marketplacesuite "Marketplace payouts":
NodeConfigs(
# Uncomment to start Hardhat automatically, typically so logs can be inspected locally
hardhat: HardhatConfig.none,
clients: CodexConfigs.init(nodes = 1)
# .debug() # uncomment to enable console log output.debug()
# .withLogFile()
# # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
# .withLogTopics("node", "erasure")
.some,
providers: CodexConfigs.init(nodes = 1)
# .debug() # uncomment to enable console log output
# .withLogFile()
# # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
# .withLogTopics(
# "node", "marketplace", "sales", "reservations", "node", "statemachine"
# )
.some,
clients: CodexConfigs
.init(nodes = 1)
.debug()
.withLogFile()
.withLogTopics(
"codex", "codex slots builder", "codex slots sampler", "marketplace", "sales",
"statemachine", "slotqueue", "reservations", "erasure", "ethers",
).some,
providers: CodexConfigs
.init(nodes = 1)
.debug()
.withLogFile()
.withLogTopics(
"codex", "codex slots builder", "codex slots sampler", "marketplace", "sales",
"statemachine", "slotqueue", "reservations", "erasure", "ethers",
).some,
):
let duration = 20.periods
let expiry = 10.periods
let duration = 6.periods
let expiry = 4.periods
let data = await RandomChunker.example(blocks = blocks)
let client = clients()[0]
let provider = providers()[0]
@ -252,15 +264,15 @@ marketplacesuite "Marketplace payouts":
let startBalanceClient = await token.balanceOf(client.ethAccount)
# provider makes storage available
let datasetSize = datasetSize(blocks, ecNodes, ecTolerance)
let totalAvailabilitySize = (datasetSize div 2).truncate(uint64)
let slotSize = slotSize(blocks, ecNodes, ecTolerance)
echo "slotSize: ", $slotSize.truncate(uint64)
discard await providerApi.postAvailability(
# make availability size small enough that we can't fill all the slots,
# thus causing a cancellation
totalSize = totalAvailabilitySize,
totalSize = slotSize.truncate(uint64),
duration = duration.uint64,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = collateralPerByte * totalAvailabilitySize.u256,
totalCollateral = collateralPerByte * slotSize,
)
let cid = (await clientApi.upload(data)).get
@ -270,7 +282,15 @@ marketplacesuite "Marketplace payouts":
assert not eventResult.isErr
slotIdxFilled = some (!eventResult).slotIndex
let subscription = await marketplace.subscribe(SlotFilled, onSlotFilled)
var requestCancelledFut = Future[void].Raising([CancelledError]).init()
proc onRequestCancelled(eventResult: ?!RequestCancelled) {.raises: [].} =
echo "onRequestCancelled ", $eventResult
trace "onRequestCancelled", eventResult
requestCancelledFut.complete()
let cancelledSubscription =
await marketplace.subscribe(RequestCancelled, onRequestCancelled)
let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled)
# client requests storage but requires multiple slots to host the content
let id = await clientApi.requestStorage(
@ -287,24 +307,25 @@ marketplacesuite "Marketplace payouts":
check eventually(slotIdxFilled.isSome, timeout = expiry.int * 1000)
let slotId = slotId(!(await clientApi.requestId(id)), !slotIdxFilled)
# check eventually(
# await providerApi.saleStateIs(slotId, "SaleCancelled"), timeout = expiry.int * 1000, pollInterval = 100
# )
# wait until sale is cancelled
await ethProvider.advanceTime(expiry.u256)
check eventually(
await providerApi.saleStateIs(slotId, "SaleCancelled"), pollInterval = 100
)
await requestCancelledFut.wait(timeout = (expiry.uint32 + 10'u32).seconds)
await advanceToNextPeriod()
let slotSize = slotSize(blocks, ecNodes, ecTolerance)
let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize
check eventuallySafe (
check eventually (
let endBalanceProvider = (await token.balanceOf(provider.ethAccount))
endBalanceProvider > startBalanceProvider and
endBalanceProvider < startBalanceProvider + expiry.u256 * pricePerSlotPerSecond
)
check eventuallySafe(
check eventually(
(
let endBalanceClient = (await token.balanceOf(client.ethAccount))
let endBalanceProvider = (await token.balanceOf(provider.ethAccount))
@ -314,7 +335,8 @@ marketplacesuite "Marketplace payouts":
timeout = 10 * 1000, # give client a bit of time to withdraw its funds
)
await subscription.unsubscribe()
await filledSubscription.unsubscribe()
await cancelledSubscription.unsubscribe()
test "the collateral is returned after a sale is ignored",
NodeConfigs(
@ -400,5 +422,5 @@ marketplacesuite "Marketplace payouts":
availability.totalRemainingCollateral ==
availableSlots * slotSize * minPricePerBytePerSecond,
timeout = 30 * 1000,
pollInterval = 100
pollInterval = 100,
)

View File

@ -51,6 +51,13 @@ marketplacesuite "Hosts submit regular proofs":
let cid = (await client0.upload(data)).get
var requestStartedFut = Future[void].Raising([CancelledError]).init()
proc onRequestStarted(eventResult: ?!RequestFulfilled) {.raises: [].} =
requestStartedFut.complete()
let startedSubscription =
await marketplace.subscribe(RequestFulfilled, onRequestStarted)
let purchaseId = await client0.requestStorage(
cid,
expiry = expiry,
@ -64,19 +71,18 @@ marketplacesuite "Hosts submit regular proofs":
let slotSize = slotSize(blocks, ecNodes, ecTolerance)
check eventuallySafe(
await client0.purchaseStateIs(purchaseId, "started"), timeout = expiry.int * 1000
)
await requestStartedFut
var proofWasSubmitted = false
proc onProofSubmitted(event: ?!ProofSubmitted) =
proofWasSubmitted = event.isOk
let subscription = await marketplace.subscribe(ProofSubmitted, onProofSubmitted)
let proofSubmittedSubscription = await marketplace.subscribe(ProofSubmitted, onProofSubmitted)
check eventually(proofWasSubmitted, timeout = (duration - expiry).int * 1000)
await subscription.unsubscribe()
await proofSubmittedSubscription.unsubscribe()
await startedSubscription.unsubscribe()
marketplacesuite "Simulate invalid proofs":
# TODO: these are very loose tests in that they are not testing EXACTLY how

View File

@ -8,6 +8,7 @@ import ../contracts/time
import ./codexconfig
import ./codexclient
import ./nodeconfigs
import ./marketplacesuite
proc findItem[T](items: seq[T], item: T): ?!T =
for tmp in items:
@ -16,7 +17,7 @@ proc findItem[T](items: seq[T], item: T): ?!T =
return failure("Not found")
multinodesuite "Sales":
marketplacesuite "Sales":
let salesConfig = NodeConfigs(
clients: CodexConfigs.init(nodes = 1).some,
providers: CodexConfigs.init(nodes = 1)
@ -128,18 +129,16 @@ multinodesuite "Sales":
# Lets create storage request that will utilize some of the availability's space
let cid = (await client.upload(data)).get
let id = (
await client.requestStorage(
cid,
duration = 20 * 60.uint64,
pricePerBytePerSecond = minPricePerBytePerSecond,
proofProbability = 3.u256,
expiry = (10 * 60).uint64,
collateralPerByte = collateralPerByte,
nodes = 3,
tolerance = 1,
)
).get
let id = await client.requestStorage(
cid,
duration = 20 * 60.uint64,
pricePerBytePerSecond = minPricePerBytePerSecond,
proofProbability = 3.u256,
expiry = (10 * 60).uint64,
collateralPerByte = collateralPerByte,
nodes = 3,
tolerance = 1,
)
check eventuallySafe(
await client.purchaseStateIs(id, "started"), timeout = 10 * 60 * 1000
@ -202,6 +201,13 @@ multinodesuite "Sales":
)
).get
var requestStartedFut = Future[void].Raising([CancelledError]).init()
proc onRequestStarted(eventResult: ?!RequestFulfilled) {.raises: [].} =
requestStartedFut.complete()
let startedSubscription =
await marketplace.subscribe(RequestFulfilled, onRequestStarted)
# client requests storage
let cid = (await client.upload(data)).get
let id = (
@ -217,11 +223,8 @@ multinodesuite "Sales":
)
).get
check eventually(
await client.purchaseStateIs(id, "started"),
timeout = 10 * 60 * 1000,
pollInterval = 100,
)
await requestStartedFut
let purchase = (await client.getPurchase(id)).get
check purchase.error == none string
@ -236,3 +239,5 @@ multinodesuite "Sales":
response.status == 422
(await response.body) ==
"Until parameter must be greater or equal to the longest currently hosted slot"
await startedSubscription.unsubscribe()