Apply markplace test changes and use ERC20 subscriptions

This commit is contained in:
Arnaud 2025-06-18 16:56:05 +02:00
parent b8ebaba0d3
commit 86641a6a28
No known key found for this signature in database
GPG Key ID: B8FBC178F10CA7AE
6 changed files with 126 additions and 131 deletions

View File

@ -5,23 +5,49 @@ import pkg/codex/contracts/marketplace as mp
import pkg/codex/periods
import pkg/codex/utils/json
from pkg/codex/utils import roundUp, divUp
import ./multinodes
import ./multinodes except Subscription
import ../contracts/time
import ../contracts/deployment
export mp
export multinodes
template marketplacesuite*(name: string, body: untyped) =
template marketplacesuite*(name: string, stopOnRequestFail: bool, body: untyped) =
multinodesuite name:
var marketplace {.inject, used.}: Marketplace
var period: uint64
var periodicity: Periodicity
var token {.inject, used.}: Erc20Token
var requestStartedEvent: AsyncEvent
var requestStartedSubscription: Subscription
var requestFailedEvent: AsyncEvent
var requestFailedSubscription: Subscription
proc onRequestStarted(eventResult: ?!RequestFulfilled) {.raises: [].} =
requestStartedEvent.fire()
proc onRequestFailed(eventResult: ?!RequestFailed) {.raises: [].} =
requestFailedEvent.fire()
if stopOnRequestFail:
fail()
proc getCurrentPeriod(): Future[Period] {.async.} =
return periodicity.periodOf((await ethProvider.currentTime()).truncate(uint64))
proc waitForRequestToStart(
seconds = 10 * 60 + 10
): Future[Period] {.async: (raises: [CancelledError, AsyncTimeoutError]).} =
await requestStartedEvent.wait().wait(timeout = chronos.seconds(seconds))
# Recreate a new future if we need to wait for another request
requestStartedEvent = newAsyncEvent()
proc waitForRequestToFail(
seconds = (5 * 60) + 10
): Future[Period] {.async: (raises: [CancelledError, AsyncTimeoutError]).} =
await requestFailedEvent.wait().wait(timeout = chronos.seconds(seconds))
# Recreate a new future if we need to wait for another request
requestFailedEvent = newAsyncEvent()
proc advanceToNextPeriod() {.async.} =
let periodicity = Periodicity(seconds: period)
let currentTime = (await ethProvider.currentTime()).truncate(uint64)
@ -30,7 +56,7 @@ template marketplacesuite*(name: string, body: untyped) =
await ethProvider.advanceTimeTo(endOfPeriod.u256 + 1)
template eventuallyP(condition: untyped, finalPeriod: Period): bool =
proc eventuallyP(): Future[bool] {.async.} =
proc eventuallyP(): Future[bool] {.async: (raises: [CancelledError]).} =
while (
let currentPeriod = await getCurrentPeriod()
currentPeriod <= finalPeriod
@ -107,4 +133,17 @@ template marketplacesuite*(name: string, body: untyped) =
period = config.proofs.period
periodicity = Periodicity(seconds: period)
requestStartedEvent = newAsyncEvent()
requestFailedEvent = newAsyncEvent()
requestStartedSubscription =
await marketplace.subscribe(RequestFulfilled, onRequestStarted)
requestFailedSubscription =
await marketplace.subscribe(RequestFailed, onRequestFailed)
teardown:
await requestStartedSubscription.unsubscribe()
await requestFailedSubscription.unsubscribe()
body

View File

@ -4,7 +4,9 @@ import ./marketplacesuite
import ./nodeconfigs
import ./hardhatconfig
marketplacesuite "Bug #821 - node crashes during erasure coding":
marketplacesuite(
name = "Bug #821 - node crashes during erasure coding", stopOnRequestFail = true
):
test "should be able to create storage request and download dataset",
NodeConfigs(
clients: CodexConfigs

View File

@ -7,7 +7,7 @@ import ./marketplacesuite
import ./twonodes
import ./nodeconfigs
marketplacesuite "Marketplace":
marketplacesuite(name = "Marketplace", stopOnRequestFail = true):
let marketplaceConfig = NodeConfigs(
clients: CodexConfigs.init(nodes = 1).some,
providers: CodexConfigs.init(nodes = 1).some,
@ -61,9 +61,8 @@ marketplacesuite "Marketplace":
tolerance = ecTolerance,
)
check eventually(
await client.purchaseStateIs(id, "started"), timeout = 10 * 60 * 1000
)
discard await waitForRequestToStart()
let purchase = (await client.getPurchase(id)).get
check purchase.error == none string
let availabilities = (await host.getAvailabilities()).get
@ -108,9 +107,8 @@ marketplacesuite "Marketplace":
tolerance = ecTolerance,
)
check eventually(
await client.purchaseStateIs(id, "started"), timeout = 10 * 60 * 1000
)
discard await waitForRequestToStart()
let purchase = (await client.getPurchase(id)).get
check purchase.error == none string
@ -178,10 +176,7 @@ marketplacesuite "Marketplace":
let requestId = (await client0.client.requestId(purchaseId)).get
# We wait that the 3 slots are filled by the first SP
check eventually(
await client0.client.purchaseStateIs(purchaseId, "started"),
timeout = 10 * 60.int * 1000,
)
discard await waitForRequestToStart()
# Here we create the same availability as previously but for the second SP.
# Meaning that, after ignoring all the slots for the first request, the second SP will process
@ -206,15 +201,12 @@ marketplacesuite "Marketplace":
let requestId2 = (await client0.client.requestId(purchaseId2)).get
# Wait that the slots of the second request are filled
check eventually(
await client0.client.purchaseStateIs(purchaseId2, "started"),
timeout = 10 * 60.int * 1000,
)
discard await waitForRequestToStart()
# Double check, verify that our second SP hosts the 3 slots
check eventually ((await provider1.client.getSlots()).get).len == 3
check ((await provider1.client.getSlots()).get).len == 3
marketplacesuite "Marketplace payouts":
marketplacesuite(name = "Marketplace payouts", stopOnRequestFail = true):
const minPricePerBytePerSecond = 1.u256
const collateralPerByte = 1.u256
const blocks = 8
@ -269,7 +261,15 @@ marketplacesuite "Marketplace payouts":
assert not eventResult.isErr
slotIdxFilled = some (!eventResult).slotIndex
let subscription = await marketplace.subscribe(SlotFilled, onSlotFilled)
let slotFilledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled)
var requestCancelledEvent = newAsyncEvent()
proc onRequestCancelled(eventResult: ?!RequestCancelled) =
assert not eventResult.isErr
requestCancelledEvent.fire()
let requestCancelledSubscription =
await marketplace.subscribe(RequestCancelled, onRequestCancelled)
# client requests storage but requires multiple slots to host the content
let id = await clientApi.requestStorage(
@ -286,36 +286,47 @@ marketplacesuite "Marketplace payouts":
check eventually(slotIdxFilled.isSome, timeout = expiry.int * 1000)
let slotId = slotId(!(await clientApi.requestId(id)), !slotIdxFilled)
var counter = 0
var transferEvent = newAsyncEvent()
proc onTransfer(eventResult: ?!Transfer) =
assert not eventResult.isErr
counter += 1
if counter == 3:
transferEvent.fire()
let tokenAddress = await marketplace.token()
let token = Erc20Token.new(tokenAddress, ethProvider.getSigner())
let tokenSubscription = await token.subscribe(Transfer, onTransfer)
# wait until sale is cancelled
await ethProvider.advanceTime(expiry.u256)
check eventually(
await providerApi.saleStateIs(slotId, "SaleCancelled"),
timeout = 5 * 1000,
pollInterval = 200,
)
await requestCancelledEvent.wait().wait(timeout = chronos.seconds(5))
await advanceToNextPeriod()
await transferEvent.wait().wait(timeout = chronos.seconds(60))
let slotSize = slotSize(blocks, ecNodes, ecTolerance)
let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize
check eventually (
check (
let endBalanceProvider = (await token.balanceOf(provider.ethAccount))
endBalanceProvider > startBalanceProvider and
endBalanceProvider < startBalanceProvider + expiry.u256 * pricePerSlotPerSecond
)
check eventually(
check (
(
let endBalanceClient = (await token.balanceOf(client.ethAccount))
let endBalanceProvider = (await token.balanceOf(provider.ethAccount))
(startBalanceClient - endBalanceClient) ==
(endBalanceProvider - startBalanceProvider)
),
timeout = 10 * 1000, # give client a bit of time to withdraw its funds
)
)
await subscription.unsubscribe()
await slotFilledSubscription.unsubscribe()
await requestCancelledSubscription.unsubscribe()
await tokenSubscription.unsubscribe()
test "the collateral is returned after a sale is ignored",
NodeConfigs(
@ -326,9 +337,7 @@ marketplacesuite "Marketplace payouts":
# uncomment to enable console log output
# .withLogFile()
# uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
# .withLogTopics(
# "node", "marketplace", "sales", "reservations", "statemachine"
# )
# .withLogTopics("node", "marketplace", "sales", "reservations", "statemachine")
.some,
):
let data = await RandomChunker.example(blocks = blocks)
@ -336,9 +345,11 @@ marketplacesuite "Marketplace payouts":
let provider0 = providers()[0]
let provider1 = providers()[1]
let provider2 = providers()[2]
let duration = 20 * 60.uint64
let duration = 30.uint64
let slotSize = slotSize(blocks, ecNodes, ecTolerance)
await marketplace.subscribe(RequestFulfilled, onRequestStarted)
# Here we create 3 SP which can host 3 slot.
# While they will process the slot, each SP will
# create a reservation for each slot.
@ -379,10 +390,7 @@ marketplacesuite "Marketplace payouts":
let requestId = (await client0.client.requestId(purchaseId)).get
check eventually(
await client0.client.purchaseStateIs(purchaseId, "started"),
timeout = 10 * 60.int * 1000,
)
discard await waitForRequestToStart()
# Here we will check that for each provider, the total remaining collateral
# will match the available slots.
@ -390,7 +398,7 @@ marketplacesuite "Marketplace payouts":
# to host 2 more slots.
for provider in providers():
let client = provider.client
check eventually(
check(
block:
let availabilities = (await client.getAvailabilities()).get
let availability = availabilities[0]

View File

@ -13,7 +13,7 @@ export logutils
logScope:
topics = "integration test proofs"
marketplacesuite "Hosts submit regular proofs":
marketplacesuite(name = "Hosts submit regular proofs", stopOnRequestFail = false):
const minPricePerBytePerSecond = 1.u256
const collateralPerByte = 1.u256
const blocks = 8
@ -64,9 +64,7 @@ marketplacesuite "Hosts submit regular proofs":
let slotSize = slotSize(blocks, ecNodes, ecTolerance)
check eventually(
await client0.purchaseStateIs(purchaseId, "started"), timeout = expiry.int * 1000
)
discard await waitForRequestToStart(expiry.int)
var proofWasSubmitted = false
proc onProofSubmitted(event: ?!ProofSubmitted) =
@ -78,7 +76,7 @@ marketplacesuite "Hosts submit regular proofs":
await subscription.unsubscribe()
marketplacesuite "Simulate invalid proofs":
marketplacesuite(name = "Simulate invalid proofs", stopOnRequestFail = false):
# TODO: these are very loose tests in that they are not testing EXACTLY how
# proofs were marked as missed by the validator. These tests should be
# tightened so that they are showing, as an integration test, that specific
@ -102,13 +100,19 @@ marketplacesuite "Simulate invalid proofs":
providers: CodexConfigs
.init(nodes = 1)
.withSimulateProofFailures(idx = 0, failEveryNProofs = 1)
# .debug() # uncomment to enable console log output
# .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
# .withLogTopics("marketplace", "sales", "reservations", "node", "clock", "slotsbuilder")
# .debug()
# uncomment to enable console log output
# .withLogFile()
# uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
# .withLogTopics(
# "marketplace", "sales", "reservations", "node", "clock", "slotsbuilder"
# )
.some,
validators: CodexConfigs.init(nodes = 1)
# .debug() # uncomment to enable console log output
# .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
# .debug()
# uncomment to enable console log output
# .withLogFile()
# uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
# .withLogTopics("validator", "onchain", "ethers", "clock")
.some,
):
@ -140,9 +144,7 @@ marketplacesuite "Simulate invalid proofs":
)
let requestId = (await client0.requestId(purchaseId)).get
check eventually(
await client0.purchaseStateIs(purchaseId, "started"), timeout = expiry.int * 1000
)
discard await waitForRequestToStart(expiry.int)
var slotWasFreed = false
proc onSlotFreed(event: ?!SlotFreed) =
@ -350,5 +352,3 @@ marketplacesuite "Simulate invalid proofs":
# (await token.balanceOf(provider1.ethAccount)) >
# (await token.balanceOf(provider0.ethAccount))
# )
# await subscription.unsubscribe()

View File

@ -8,6 +8,7 @@ import ../contracts/time
import ./codexconfig
import ./codexclient
import ./nodeconfigs
import ./marketplacesuite
proc findItem[T](items: seq[T], item: T): ?!T =
for tmp in items:
@ -16,7 +17,7 @@ proc findItem[T](items: seq[T], item: T): ?!T =
return failure("Not found")
multinodesuite "Sales":
marketplacesuite(name = "Sales", stopOnRequestFail = true):
let salesConfig = NodeConfigs(
clients: CodexConfigs.init(nodes = 1).some,
providers: CodexConfigs.init(nodes = 1)
@ -128,22 +129,19 @@ multinodesuite "Sales":
# Lets create storage request that will utilize some of the availability's space
let cid = (await client.upload(data)).get
let id = (
await client.requestStorage(
cid,
duration = 20 * 60.uint64,
pricePerBytePerSecond = minPricePerBytePerSecond,
proofProbability = 3.u256,
expiry = (10 * 60).uint64,
collateralPerByte = collateralPerByte,
nodes = 3,
tolerance = 1,
)
).get
check eventually(
await client.purchaseStateIs(id, "started"), timeout = 10 * 60 * 1000
let id = await client.requestStorage(
cid,
duration = 20 * 60.uint64,
pricePerBytePerSecond = minPricePerBytePerSecond,
proofProbability = 3.u256,
expiry = (10 * 60).uint64,
collateralPerByte = collateralPerByte,
nodes = 3,
tolerance = 1,
)
discard await waitForRequestToStart()
let updatedAvailability =
((await host.getAvailabilities()).get).findItem(availability).get
check updatedAvailability.totalSize != updatedAvailability.freeSize
@ -217,9 +215,8 @@ multinodesuite "Sales":
)
).get
check eventually(
await client.purchaseStateIs(id, "started"), timeout = 10 * 60 * 1000
)
discard await waitForRequestToStart()
let purchase = (await client.getPurchase(id)).get
check purchase.error == none string

View File

@ -15,25 +15,7 @@ export logutils
logScope:
topics = "integration test validation"
template eventuallyS(
expression: untyped, timeout = 10, step = 5, cancelExpression: untyped = false
): bool =
bind Moment, now, seconds
proc eventuallyS(): Future[bool] {.async.} =
let endTime = Moment.now() + timeout.seconds
var secondsElapsed = 0
while not expression:
if endTime < Moment.now():
return false
if cancelExpression:
return false
await sleepAsync(step.seconds)
return true
await eventuallyS()
marketplacesuite "Validation":
marketplacesuite(name = "Validation", stopOnRequestFail = false):
const blocks = 8
const ecNodes = 3
const ecTolerance = 1
@ -42,21 +24,6 @@ marketplacesuite "Validation":
const collateralPerByte = 1.u256
const minPricePerBytePerSecond = 1.u256
proc waitForRequestToFail(
marketplace: Marketplace, requestId: RequestId, timeout = 10, step = 5
): Future[bool] {.async.} =
let endTime = Moment.now() + timeout.seconds
var requestState = await marketplace.requestState(requestId)
while requestState != RequestState.Failed:
if endTime < Moment.now():
return false
if requestState != RequestState.Started:
return false
await sleepAsync(step.seconds)
requestState = await marketplace.requestState(requestId)
return true
test "validator marks proofs as missing when using validation groups",
NodeConfigs(
# Uncomment to start Hardhat automatically, typically so logs can be inspected locally
@ -119,14 +86,7 @@ marketplacesuite "Validation":
debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId
if not eventuallyS(
await client0.purchaseStateIs(purchaseId, "started"),
timeout = (expiry + 60).int,
step = 5,
):
debug "validation suite: timed out waiting for the purchase to start"
fail()
return
discard await waitForRequestToStart(expiry.int + 60)
discard await ethProvider.send("evm_mine")
currentTime = await ethProvider.currentTime()
@ -134,9 +94,7 @@ marketplacesuite "Validation":
debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd.seconds
check await marketplace.waitForRequestToFail(
requestId, timeout = secondsTillRequestEnd + 60, step = 5
)
discard await waitForRequestToFail(secondsTillRequestEnd + 60)
test "validator uses historical state to mark missing proofs",
NodeConfigs(
@ -189,14 +147,7 @@ marketplacesuite "Validation":
debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId
if not eventuallyS(
await client0.purchaseStateIs(purchaseId, "started"),
timeout = (expiry + 60).int,
step = 5,
):
debug "validation suite: timed out waiting for the purchase to start"
fail()
return
discard await waitForRequestToStart(expiry.int + 60)
# extra block just to make sure we have one that separates us
# from the block containing the last (past) SlotFilled event
@ -223,6 +174,4 @@ marketplacesuite "Validation":
debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd.seconds
check await marketplace.waitForRequestToFail(
requestId, timeout = secondsTillRequestEnd + 60, step = 5
)
discard await waitForRequestToFail(secondsTillRequestEnd + 60)