diff --git a/tests/integration/1_minute/testecbug.nim b/tests/integration/1_minute/testecbug.nim index 4136adbc..d1980c5a 100644 --- a/tests/integration/1_minute/testecbug.nim +++ b/tests/integration/1_minute/testecbug.nim @@ -16,42 +16,29 @@ marketplacesuite(name = "Bug #821 - node crashes during erasure coding"): providers: CodexConfigs.init(nodes = 0).some, ): let - pricePerBytePerSecond = 1.u256 duration = 20.periods - collateralPerByte = 1.u256 expiry = 10.periods - data = await RandomChunker.example(blocks = 8) client = clients()[0] clientApi = client.client + data = await RandomChunker.example(blocks = 8) - let cid = (await clientApi.upload(data)).get - - var requestId = none RequestId - proc onStorageRequested(eventResult: ?!StorageRequested) = - assert not eventResult.isErr - requestId = some (!eventResult).requestId - - let subscription = await marketplace.subscribe(StorageRequested, onStorageRequested) - - # client requests storage but requires multiple slots to host the content - let id = await clientApi.requestStorage( - cid, - duration = duration, - pricePerBytePerSecond = pricePerBytePerSecond, - expiry = expiry, - collateralPerByte = collateralPerByte, - nodes = 3, - tolerance = 1, + let (purchaseId, requestId) = await requestStorage( + clientApi, duration = duration, expiry = expiry, data = data.some ) - check eventually(requestId.isSome, timeout = expiry.int * 1000) + let storageRequestedEvent = newAsyncEvent() + + proc onStorageRequested(eventResult: ?!StorageRequested) = + assert not eventResult.isErr + storageRequestedEvent.fire() + + await marketplaceSubscribe(StorageRequested, onStorageRequested) + await storageRequestedEvent.wait().wait(timeout = chronos.seconds(expiry.int64)) let - request = await marketplace.getRequest(requestId.get) + request = await marketplace.getRequest(requestId) cidFromRequest = request.content.cid downloaded = await clientApi.downloadBytes(cidFromRequest, local = true) check downloaded.isOk check downloaded.get.toHex == data.toHex - - await subscription.unsubscribe() diff --git a/tests/integration/30_minutes/testmarketplace.nim b/tests/integration/30_minutes/testmarketplace.nim index 4250af48..235abcdd 100644 --- a/tests/integration/30_minutes/testmarketplace.nim +++ b/tests/integration/30_minutes/testmarketplace.nim @@ -2,9 +2,10 @@ import std/times import ../../examples import ../../contracts/time import ../../contracts/deployment -import ./../marketplacesuite -import ../twonodes +import ./../marketplacesuite except Subscription +import ../twonodes except Subscription import ../nodeconfigs +from pkg/ethers import Subscription marketplacesuite(name = "Marketplace"): let marketplaceConfig = NodeConfigs( @@ -22,6 +23,11 @@ marketplacesuite(name = "Marketplace"): const blocks = 8 const ecNodes = 3 const ecTolerance = 1 + const size = 0xFFFFFF.uint64 + const slotBytes = slotSize(blocks, ecNodes, ecTolerance) + const duration = 20 * 60.uint64 + const expiry = 10 * 60.uint64 + const pricePerSlotPerSecond = minPricePerBytePerSecond * slotBytes setup: host = providers()[0].client @@ -35,45 +41,33 @@ marketplacesuite(name = "Marketplace"): await ethProvider.advanceTime(1.u256) test "nodes negotiate contracts on the marketplace", marketplaceConfig: - let size = 0xFFFFFF.uint64 - let data = await RandomChunker.example(blocks = blocks) # host makes storage available let availability = ( await host.postAvailability( totalSize = size, - duration = 20 * 60.uint64, + duration = duration, minPricePerBytePerSecond = minPricePerBytePerSecond, totalCollateral = size.u256 * minPricePerBytePerSecond, ) ).get # client requests storage - let cid = (await client.upload(data)).get - let id = await client.requestStorage( - cid, - duration = 20 * 60.uint64, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 3.u256, - expiry = 10 * 60.uint64, - collateralPerByte = collateralPerByte, - nodes = ecNodes, - tolerance = ecTolerance, - ) + let (purchaseId, requestId) = await requestStorage(client) - discard await waitForRequestToStart() + await waitForRequestToStart(requestId, expiry.int64) - let purchase = (await client.getPurchase(id)).get + let purchase = (await client.getPurchase(purchaseId)).get check purchase.error == none string - let state = await marketplace.requestState(purchase.requestId) + let state = await marketplace.requestState(requestId) check state == RequestState.Started let availabilities = (await host.getAvailabilities()).get check availabilities.len == 1 - let datasetSize = datasetSize(blocks, ecNodes, ecTolerance) let newSize = availabilities[0].freeSize - check newSize > 0 and newSize + datasetSize == size + let datasetSize = datasetSize(blocks, ecNodes, ecTolerance) + check newSize > 0 and newSize.u256 + datasetSize == size.u256 let reservations = (await host.getAvailabilityReservations(availability.id)).get check reservations.len == 3 @@ -90,46 +84,25 @@ marketplacesuite(name = "Marketplace"): test "node slots gets paid out and rest of tokens are returned to client", marketplaceConfig: - let size = 0xFFFFFF.uint64 - let data = await RandomChunker.example(blocks = blocks) - let marketplace = Marketplace.new(Marketplace.address, ethProvider.getSigner()) - let tokenAddress = await marketplace.token() - let token = Erc20Token.new(tokenAddress, ethProvider.getSigner()) - let duration = 20 * 60.uint64 + var providerRewardEvent = newAsyncEvent() + var clientFundsEvent = newAsyncEvent() + var transferEvent = newAsyncEvent() + var filledAtPerSlot: seq[UInt256] = @[] + var requestId: RequestId # host makes storage available let startBalanceHost = await token.balanceOf(hostAccount) let startBalanceClient = await token.balanceOf(clientAccount) - discard ( - await host.postAvailability( - totalSize = size, - duration = 20 * 60.uint64, - minPricePerBytePerSecond = minPricePerBytePerSecond, - totalCollateral = size.u256 * minPricePerBytePerSecond, - ) - ).get - - let slotSize = slotSize(blocks, ecNodes, ecTolerance) - - var filledAtPerSlot: seq[UInt256] = @[] proc storeFilledAtTimestamps() {.async.} = let filledAt = await ethProvider.blockTime(BlockTag.latest) filledAtPerSlot.add(filledAt) - proc onSlotFilled(eventResult: ?!SlotFilled) = + proc onSlotFilled(eventResult: ?!SlotFilled) {.raises: [].} = assert not eventResult.isErr let event = !eventResult asyncSpawn storeFilledAtTimestamps() - let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) - - # client requests storage - let cid = (await client.upload(data)).get - - let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize - var providerRewardEvent = newAsyncEvent() - proc checkProviderRewards() {.async.} = let endBalanceHost = await token.balanceOf(hostAccount) let requestEnd = await marketplace.requestEnd(requestId) @@ -140,23 +113,19 @@ marketplacesuite(name = "Marketplace"): if rewards + startBalanceHost == endBalanceHost: providerRewardEvent.fire() - var clientFundsEvent = newAsyncEvent() + proc checkClientFunds() {.async.} = + let requestEnd = await marketplace.requestEnd(requestId) + let hostRewards = filledAtPerSlot + .mapIt((requestEnd.u256 - it) * pricePerSlotPerSecond) + .foldl(a + b, 0.u256) - proc checkHostFunds() {.async.} = - var hostRewards = 0.u256 - - for filledAt in filledAtPerSlot: - hostRewards += (requestEnd.u256 - filledAt) * pricePerSlotPerSecond - - let requestPrice = minPricePerBytePerSecond * slotSize * duration.u256 * 3 + let requestPrice = pricePerSlotPerSecond * duration.u256 * 3 let fundsBackToClient = requestPrice - hostRewards let endBalanceClient = await token.balanceOf(clientAccount) if startBalanceClient + fundsBackToClient - requestPrice == endBalanceClient: clientFundsEvent.fire() - var transferEvent = newAsyncEvent() - proc onTransfer(eventResult: ?!Transfer) = assert not eventResult.isErr @@ -164,36 +133,39 @@ marketplacesuite(name = "Marketplace"): if data.receiver == hostAccount: asyncSpawn checkProviderRewards() if data.receiver == clientAccount: - asyncSpawn checkHostFunds() + asyncSpawn checkClientFunds() - let id = await client.requestStorage( - cid, - duration = duration, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 3.u256, - expiry = 10 * 60.uint64, - collateralPerByte = collateralPerByte, - nodes = ecNodes, - tolerance = ecTolerance, - ) + discard ( + await host.postAvailability( + totalSize = size, + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = size.u256 * minPricePerBytePerSecond, + ) + ).get - let requestId = (await client.requestId(id)).get + # client requests storage + let (_, id) = await requestStorage(client) + requestId = id - discard await waitForRequestToStart() + # Subscribe SlotFilled event to receive the filledAt timestamp + # and calculate the provider reward + await marketplaceSubscribe(SlotFilled, onSlotFilled) - stopOnRequestFailed: - # Proving mechanism uses blockchain clock to do proving/collect/cleanup round - # hence we must use `advanceTime` over `sleepAsync` as Hardhat does mine new blocks - # only with new transaction - await ethProvider.advanceTime(duration.u256) + await waitForRequestToStart(requestId, expiry.int64) - let tokenSubscription = await token.subscribe(Transfer, onTransfer) + # Proving mechanism uses blockchain clock to do proving/collect/cleanup round + # hence we must use `advanceTime` over `sleepAsync` as Hardhat does mine new blocks + # only with new transaction + await ethProvider.advanceTime(duration.u256) - await clientFundsEvent.wait().wait(timeout = chronos.seconds(60)) - await providerRewardEvent.wait().wait(timeout = chronos.seconds(60)) + await tokenSubscribe(onTransfer) - await tokenSubscription.unsubscribe() - await filledSubscription.unsubscribe() + # Wait for the exact expected balances. + # The timeout is 60 seconds because the event should occur quickly, + # thanks to `advanceTime` moving to the end of the request duration. + await clientFundsEvent.wait().wait(timeout = chronos.seconds(60)) + await providerRewardEvent.wait().wait(timeout = chronos.seconds(60)) test "SP are able to process slots after workers were busy with other slots and ignored them", NodeConfigs( @@ -206,76 +178,52 @@ marketplacesuite(name = "Marketplace"): # .withLogTopics("marketplace", "sales", "statemachine","slotqueue", "reservations") .some, ): - let client0 = clients()[0] - let provider0 = providers()[0] - let provider1 = providers()[1] - let duration = 20 * 60.uint64 - - let data = await RandomChunker.example(blocks = blocks) - let slotSize = slotSize(blocks, ecNodes, ecTolerance) + var requestId: RequestId # We create an avavilability allowing the first SP to host the 3 slots. # So the second SP will not have any availability so it will just process # the slots and ignore them. - discard await provider0.client.postAvailability( - totalSize = 3 * slotSize.truncate(uint64), + discard await host.postAvailability( + totalSize = 3 * slotBytes.truncate(uint64), duration = duration, minPricePerBytePerSecond = minPricePerBytePerSecond, - totalCollateral = 3 * slotSize * minPricePerBytePerSecond, + totalCollateral = 3 * slotBytes * minPricePerBytePerSecond, ) - let cid = (await client0.client.upload(data)).get - - let purchaseId = await client0.client.requestStorage( - cid, - duration = duration, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 1.u256, - expiry = 10 * 60.uint64, - collateralPerByte = collateralPerByte, - nodes = ecNodes, - tolerance = ecTolerance, - ) - - let requestId = (await client0.client.requestId(purchaseId)).get + let (_, id) = await requestStorage(client) + requestId = id # We wait that the 3 slots are filled by the first SP - discard await waitForRequestToStart() + await waitForRequestToStart(requestId, expiry.int64) # Here we create the same availability as previously but for the second SP. # Meaning that, after ignoring all the slots for the first request, the second SP will process # and host the slots for the second request. - discard await provider1.client.postAvailability( - totalSize = 3 * slotSize.truncate(uint64), + let host1 = providers()[1].client + + discard await host1.postAvailability( + totalSize = 3 * slotBytes.truncate(uint64), duration = duration, minPricePerBytePerSecond = minPricePerBytePerSecond, - totalCollateral = 3 * slotSize * collateralPerByte, + totalCollateral = 3 * slotBytes * collateralPerByte, ) - let purchaseId2 = await client0.client.requestStorage( - cid, - duration = duration, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 3.u256, - expiry = 10 * 60.uint64, - collateralPerByte = collateralPerByte, - nodes = ecNodes, - tolerance = ecTolerance, - ) - let requestId2 = (await client0.client.requestId(purchaseId2)).get + let (_, id2) = await requestStorage(client) + requestId = id2 # Wait that the slots of the second request are filled - discard await waitForRequestToStart() + await waitForRequestToStart(requestId, expiry.int64) # Double check, verify that our second SP hosts the 3 slots - let signer = ethProvider.getSigner(hostAccount) + let host1Account = providers()[1].ethAccount + let signer = ethProvider.getSigner(host1Account) let marketplaceWithProviderSigner = marketplace.connect(signer) let slots = await marketplaceWithProviderSigner.mySlots() check slots.len == 3 for slotId in slots: let slot = await marketplaceWithProviderSigner.getActiveSlot(slotId) - check slot.request.id == purchase.requestId + check slot.request.id == requestId marketplacesuite(name = "Marketplace payouts"): const minPricePerBytePerSecond = 1.u256 @@ -283,6 +231,10 @@ marketplacesuite(name = "Marketplace payouts"): const blocks = 8 const ecNodes = 3 const ecTolerance = 1 + const slotBytes = slotSize(blocks, ecNodes, ecTolerance) + const duration = 20 * 60.uint64 + const expiry = 10 * 60.uint64 + const pricePerSlotPerSecond = minPricePerBytePerSecond * slotBytes test "expired request partially pays out for stored time", NodeConfigs( @@ -303,85 +255,46 @@ marketplacesuite(name = "Marketplace payouts"): # ) .some, ): - let duration = 20.periods - let expiry = 10.periods - let data = await RandomChunker.example(blocks = blocks) let client = clients()[0] let provider = providers()[0] let clientApi = client.client let providerApi = provider.client - let startBalanceProvider = await token.balanceOf(provider.ethAccount) - let startBalanceClient = await token.balanceOf(client.ethAccount) let hostAccount = providers()[0].ethAccount let clientAccount = clients()[0].ethAccount - # provider makes storage available - let datasetSize = datasetSize(blocks, ecNodes, ecTolerance) - let totalAvailabilitySize = (datasetSize div 2).truncate(uint64) - discard await providerApi.postAvailability( - # make availability size small enough that we can't fill all the slots, - # thus causing a cancellation - totalSize = totalAvailabilitySize, - duration = duration.uint64, - minPricePerBytePerSecond = minPricePerBytePerSecond, - totalCollateral = collateralPerByte * totalAvailabilitySize.u256, - ) - - let cid = (await clientApi.upload(data)).get - - var filledAtPerSlot: seq[UInt256] = @[] var slotIndex = 0.uint64 var slotFilledEvent = newAsyncEvent() + var requestCancelledEvent = newAsyncEvent() + var providerRewardEvent = newAsyncEvent() + var filledAtPerSlot: seq[UInt256] = @[] + var requestId: RequestId + + let startBalanceClient = await token.balanceOf(client.ethAccount) + let startBalanceProvider = await token.balanceOf(hostAccount) proc storeFilledAtTimestamps() {.async.} = let filledAt = await ethProvider.blockTime(BlockTag.latest) filledAtPerSlot.add(filledAt) - slotFilledEvent.fire() - proc onSlotFilled(eventResult: ?!SlotFilled) = + proc onSlotFilled(eventResult: ?!SlotFilled) {.raises: [].} = assert not eventResult.isErr let event = !eventResult slotIndex = event.slotIndex asyncSpawn storeFilledAtTimestamps() + slotFilledEvent.fire() - let slotFilledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) - - var requestCancelledEvent = newAsyncEvent() proc onRequestCancelled(eventResult: ?!RequestCancelled) = assert not eventResult.isErr requestCancelledEvent.fire() - let requestCancelledSubscription = - await marketplace.subscribe(RequestCancelled, onRequestCancelled) - - # client requests storage but requires multiple slots to host the content - let id = await clientApi.requestStorage( - cid, - duration = duration, - pricePerBytePerSecond = minPricePerBytePerSecond, - expiry = expiry, - collateralPerByte = collateralPerByte, - nodes = ecNodes, - tolerance = ecTolerance, - ) - let requestId = (await clientApi.requestId(id)).get - - # wait until one slot is filled - await slotFilledEvent.wait().wait(timeout = chronos.seconds(expiry.int)) - let slotId = slotId(!(await clientApi.requestId(id)), slotIndex) - let slotSize = slotSize(blocks, ecNodes, ecTolerance) - - let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize - var providerRewardEvent = newAsyncEvent() - proc checkProviderRewards() {.async.} = + let endBalanceProvider = await token.balanceOf(hostAccount) let requestEnd = await marketplace.requestEnd(requestId) let rewards = filledAtPerSlot .mapIt((requestEnd.u256 - it) * pricePerSlotPerSecond) .foldl(a + b, 0.u256) - let endBalanceHost = await token.balanceOf(hostAccount) - if rewards + startBalanceProvider == endBalanceHost: + if rewards + startBalanceProvider == endBalanceProvider: providerRewardEvent.fire() proc onTransfer(eventResult: ?!Transfer) = @@ -391,15 +304,37 @@ marketplacesuite(name = "Marketplace payouts"): if data.receiver == hostAccount: asyncSpawn checkProviderRewards() - let tokenAddress = await marketplace.token() - let token = Erc20Token.new(tokenAddress, ethProvider.getSigner()) - let tokenSubscription = await token.subscribe(Transfer, onTransfer) + # provider makes storage available + let datasetSize = datasetSize(blocks, ecNodes, ecTolerance) + let totalAvailabilitySize = (datasetSize div 2).truncate(uint64) + + discard await providerApi.postAvailability( + # make availability size small enough that we can't fill all the slots, + # thus causing a cancellation + totalSize = totalAvailabilitySize, + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = collateralPerByte * totalAvailabilitySize.u256, + ) + + let (_, id) = await requestStorage(clientApi) + requestId = id + + await marketplaceSubscribe(SlotFilled, onSlotFilled) + await marketplaceSubscribe(RequestCancelled, onRequestCancelled) + + # wait until one slot is filled + await slotFilledEvent.wait().wait(timeout = chronos.seconds(expiry.int)) + let slotId = slotId(requestId, slotIndex) + + await tokenSubscribe(onTransfer) # wait until sale is cancelled await ethProvider.advanceTime(expiry.u256) await requestCancelledEvent.wait().wait(timeout = chronos.seconds(5)) await advanceToNextPeriod() + # Wait for the expected balance for the provider await providerRewardEvent.wait().wait(timeout = chronos.seconds(60)) # Ensure that total rewards stay within the payout limit @@ -412,14 +347,11 @@ marketplacesuite(name = "Marketplace payouts"): let endBalanceProvider = (await token.balanceOf(provider.ethAccount)) let endBalanceClient = (await token.balanceOf(client.ethAccount)) + check( startBalanceClient - endBalanceClient == endBalanceProvider - startBalanceProvider ) - await slotFilledSubscription.unsubscribe() - await requestCancelledSubscription.unsubscribe() - await tokenSubscription.unsubscribe() - test "the collateral is returned after a sale is ignored", NodeConfigs( hardhat: HardhatConfig.none, @@ -434,13 +366,10 @@ marketplacesuite(name = "Marketplace payouts"): # ) .some, ): - let data = await RandomChunker.example(blocks = blocks) let client0 = clients()[0] let provider0 = providers()[0] let provider1 = providers()[1] let provider2 = providers()[2] - let duration = 20 * 60.uint64 - let slotSize = slotSize(blocks, ecNodes, ecTolerance) # Here we create 3 SP which can host 3 slot. # While they will process the slot, each SP will @@ -449,59 +378,44 @@ marketplacesuite(name = "Marketplace payouts"): # will be ignored. In that case, the collateral assigned for # the reservation should return to the availability. discard await provider0.client.postAvailability( - totalSize = 3 * slotSize.truncate(uint64), + totalSize = 3 * slotBytes.truncate(uint64), duration = duration, minPricePerBytePerSecond = minPricePerBytePerSecond, - totalCollateral = 3 * slotSize * minPricePerBytePerSecond, + totalCollateral = 3 * slotBytes * minPricePerBytePerSecond, ) discard await provider1.client.postAvailability( - totalSize = 3 * slotSize.truncate(uint64), + totalSize = 3 * slotBytes.truncate(uint64), duration = duration, minPricePerBytePerSecond = minPricePerBytePerSecond, - totalCollateral = 3 * slotSize * minPricePerBytePerSecond, + totalCollateral = 3 * slotBytes * minPricePerBytePerSecond, ) discard await provider2.client.postAvailability( - totalSize = 3 * slotSize.truncate(uint64), + totalSize = 3 * slotBytes.truncate(uint64), duration = duration, minPricePerBytePerSecond = minPricePerBytePerSecond, - totalCollateral = 3 * slotSize * minPricePerBytePerSecond, + totalCollateral = 3 * slotBytes * minPricePerBytePerSecond, ) - let cid = (await client0.client.upload(data)).get + let (_, requestId) = await requestStorage(client0.client) + await waitForRequestToStart(requestId, expiry.int64) - let purchaseId = await client0.client.requestStorage( - cid, - duration = duration, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 1.u256, - expiry = 10 * 60.uint64, - collateralPerByte = collateralPerByte, - nodes = ecNodes, - tolerance = ecTolerance, - ) + # Here we will check that for each provider, the total remaining collateral + # will match the available slots. + # So if a SP hosts 1 slot, it should have enough total remaining collateral + # to host 2 more slots. + for provider in providers(): + let client = provider.client + check eventually( + block: + try: + let availabilities = (await client.getAvailabilities()).get + let availability = availabilities[0] + let slots = (await client.getSlots()).get + let availableSlots = (3 - slots.len).u256 - let requestId = (await client0.client.requestId(purchaseId)).get - - discard await waitForRequestToStart() - - stopOnRequestFailed: - # Here we will check that for each provider, the total remaining collateral - # will match the available slots. - # So if a SP hosts 1 slot, it should have enough total remaining collateral - # to host 2 more slots. - for provider in providers(): - let client = provider.client - check eventually( - block: - try: - let availabilities = (await client.getAvailabilities()).get - let availability = availabilities[0] - let slots = (await client.getSlots()).get - let availableSlots = (3 - slots.len).u256 - - availability.totalRemainingCollateral == - availableSlots * slotSize * minPricePerBytePerSecond - except HttpConnectionError: - return false, - timeout = 30 * 1000, - ) + availability.totalRemainingCollateral == + availableSlots * slotBytes * minPricePerBytePerSecond + except HttpConnectionError: + return false, + timeout = 30 * 1000, + ) diff --git a/tests/integration/30_minutes/testproofs.nim b/tests/integration/30_minutes/testproofs.nim index 46829c96..d7772774 100644 --- a/tests/integration/30_minutes/testproofs.nim +++ b/tests/integration/30_minutes/testproofs.nim @@ -38,8 +38,12 @@ marketplacesuite(name = "Hosts submit regular proofs"): let client0 = clients()[0].client let expiry = 10.periods let duration = expiry + 5.periods + let proofSubmittedEvent = newAsyncEvent() + + proc onProofSubmitted(event: ?!ProofSubmitted) = + if event.isOk: + proofSubmittedEvent.fire() - let data = await RandomChunker.example(blocks = blocks) let datasetSize = datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance) await createAvailabilities( @@ -49,32 +53,20 @@ marketplacesuite(name = "Hosts submit regular proofs"): minPricePerBytePerSecond, ) - let cid = (await client0.upload(data)).get - - let purchaseId = await client0.requestStorage( - cid, - expiry = expiry, - duration = duration, - nodes = ecNodes, - tolerance = ecTolerance, - ) + let (purchaseId, requestId) = + await requestStorage(client0, duration = duration, expiry = expiry) let purchase = (await client0.getPurchase(purchaseId)).get check purchase.error == none string - let slotSize = slotSize(blocks, ecNodes, ecTolerance) + await marketplaceSubscribe(ProofSubmitted, onProofSubmitted) - discard await waitForRequestToStart(expiry.int) + await waitForRequestToStart(requestId, expiry.int64) - var proofWasSubmitted = false - proc onProofSubmitted(event: ?!ProofSubmitted) = - proofWasSubmitted = event.isOk - - let subscription = await marketplace.subscribe(ProofSubmitted, onProofSubmitted) - - check eventually(proofWasSubmitted, timeout = (duration - expiry).int * 1000) - - await subscription.unsubscribe() + let secondsTillRequestEnd = await getSecondsTillRequestEnd(requestId) + await proofSubmittedEvent.wait().wait( + timeout = chronos.seconds(secondsTillRequestEnd) + ) marketplacesuite(name = "Simulate invalid proofs"): # TODO: these are very loose tests in that they are not testing EXACTLY how @@ -88,6 +80,17 @@ marketplacesuite(name = "Simulate invalid proofs"): const ecNodes = 3 const ecTolerance = 1 + var slotWasFreedEvent: AsyncEvent + var requestId: RequestId + + proc onSlotFreed(event: ?!SlotFreed) = + if event.isOk and event.value.requestId == requestId: + slotWasFreedEvent.fire() + + setup: + requestId = RequestId.default() + slotWasFreedEvent = newAsyncEvent() + test "slot is freed after too many invalid proofs submitted", NodeConfigs( # Uncomment to start Hardhat automatically, typically so logs can be inspected locally @@ -120,7 +123,6 @@ marketplacesuite(name = "Simulate invalid proofs"): let expiry = 10.periods let duration = expiry + 10.periods - let data = await RandomChunker.example(blocks = blocks) let datasetSize = datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance) await createAvailabilities( @@ -130,32 +132,14 @@ marketplacesuite(name = "Simulate invalid proofs"): minPricePerBytePerSecond, ) - let cid = (await client0.upload(data)).get - - let purchaseId = ( - await client0.requestStorage( - cid, - expiry = expiry, - duration = duration, - nodes = ecNodes, - tolerance = ecTolerance, - proofProbability = 1.u256, - ) + let (purchaseId, id) = await requestStorage( + client0, duration = duration, proofProbability = 1.u256, expiry = expiry ) - let requestId = (await client0.requestId(purchaseId)).get + requestId = id - discard await waitForRequestToStart(expiry.int) + await marketplaceSubscribe(SlotFreed, onSlotFreed) - var slotWasFreed = false - proc onSlotFreed(event: ?!SlotFreed) = - if event.isOk and event.value.requestId == requestId: - slotWasFreed = true - - let subscription = await marketplace.subscribe(SlotFreed, onSlotFreed) - - check eventually(slotWasFreed, timeout = (duration - expiry).int * 1000) - - await subscription.unsubscribe() + await slotWasFreedEvent.wait().wait(timeout = chronos.seconds(duration.int64)) test "slot is not freed when not enough invalid proofs submitted", NodeConfigs( @@ -182,8 +166,15 @@ marketplacesuite(name = "Simulate invalid proofs"): let client0 = clients()[0].client let expiry = 10.periods let duration = expiry + 10.periods + let slotWasFilledEvent = newAsyncEvent() + + proc onSlotFilled(eventResult: ?!SlotFilled) = + assert not eventResult.isErr + let event = !eventResult + + if event.requestId == requestId: + slotWasFilledEvent.fire() - let data = await RandomChunker.example(blocks = blocks) let datasetSize = datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance) await createAvailabilities( @@ -193,44 +184,24 @@ marketplacesuite(name = "Simulate invalid proofs"): minPricePerBytePerSecond, ) - let cid = (await client0.upload(data)).get + let (purchaseId, id) = + await requestStorage(client0, duration = duration, expiry = expiry) + requestId = id - let purchaseId = await client0.requestStorage( - cid, - expiry = expiry, - duration = duration, - nodes = ecNodes, - tolerance = ecTolerance, - proofProbability = 1.u256, - ) - let requestId = (await client0.requestId(purchaseId)).get - - var slotWasFilled = false - proc onSlotFilled(eventResult: ?!SlotFilled) = - assert not eventResult.isErr - let event = !eventResult - - if event.requestId == requestId: - slotWasFilled = true - - let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) + await marketplaceSubscribe(SlotFilled, onSlotFilled) + await marketplaceSubscribe(SlotFreed, onSlotFreed) # wait for the first slot to be filled - check eventually(slotWasFilled, timeout = expiry.int * 1000) + await slotWasFilledEvent.wait().wait(timeout = chronos.seconds(expiry.int64)) - var slotWasFreed = false - proc onSlotFreed(event: ?!SlotFreed) = - if event.isOk and event.value.requestId == requestId: - slotWasFreed = true - - let freedSubscription = await marketplace.subscribe(SlotFreed, onSlotFreed) - - # In 2 periods you cannot have enough invalid proofs submitted: - await sleepAsync(2.periods.int.seconds) - check not slotWasFreed - - await filledSubscription.unsubscribe() - await freedSubscription.unsubscribe() + try: + # In 2 periods you cannot have enough invalid proofs submitted: + await slotWasFreedEvent.wait().wait( + timeout = chronos.seconds(2.periods.int.seconds) + ) + fail("invalid proofs were not expected in 2 periods") + except AsyncTimeoutError: + discard # TODO: uncomment once fixed # WARNING: in the meantime minPrice has changed to minPricePerBytePerSecond diff --git a/tests/integration/30_minutes/testslotrepair.nim b/tests/integration/30_minutes/testslotrepair.nim index 07084338..32a95959 100644 --- a/tests/integration/30_minutes/testslotrepair.nim +++ b/tests/integration/30_minutes/testslotrepair.nim @@ -20,18 +20,17 @@ marketplacesuite(name = "SP Slot Repair"): const ecTolerance = 1 const size = slotSize(blocks, ecNodes, ecTolerance) - var filledSlotIds: seq[SlotId] = @[] - var freedSlotId = none SlotId + var freedSlotIndex = none uint64 var requestId: RequestId + var slotFilledEvent: AsyncEvent # Here we are keeping track of the slot filled using their ids. proc onSlotFilled(eventResult: ?!SlotFilled) = assert not eventResult.isErr let event = !eventResult - if event.requestId == requestId: - let slotId = slotId(event.requestId, event.slotIndex) - filledSlotIds.add slotId + if event.requestId == requestId and event.slotIndex == freedSlotIndex.get: + slotFilledEvent.fire() # Here we are retrieving the slot id freed. # When the event is triggered, the slot id is removed @@ -42,44 +41,23 @@ marketplacesuite(name = "SP Slot Repair"): let slotId = slotId(event.requestId, event.slotIndex) if event.requestId == requestId: - assert slotId in filledSlotIds - filledSlotIds.del(filledSlotIds.find(slotId)) - freedSlotId = some(slotId) + freedSlotIndex = some event.slotIndex - proc createPurchase(client: CodexClient): Future[PurchaseId] {.async.} = - let data = await RandomChunker.example(blocks = blocks) - let cid = (await client.upload(data)).get - - let purchaseId = await client.requestStorage( - cid, - expiry = 10.periods, - duration = 20.periods, - nodes = ecNodes, - tolerance = ecTolerance, - collateralPerByte = 1.u256, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 1.u256, - ) - requestId = (await client.requestId(purchaseId)).get - - return purchaseId - - proc freeSlot(provider: CodexClient): Future[void] {.async.} = + proc freeSlot(provider: CodexProcess): Future[void] {.async.} = # Get the second provider signer. - let signer = ethProvider.getSigner(accounts[2]) + let signer = ethProvider.getSigner(provider.ethAccount) let marketplaceWithSecondProviderSigner = marketplace.connect(signer) # Call freeSlot to speed up the process. # It accelerates the test by skipping validator # proof verification and not waiting for the full period. # The downside is that this doesn't reflect the real slot freed process. - let slots = (await provider.getSlots()).get() + let slots = (await provider.client.getSlots()).get() let slotId = slotId(requestId, slots[0].slotIndex) discard await marketplaceWithSecondProviderSigner.freeSlot(slotId) setup: - filledSlotIds = @[] - freedSlotId = none SlotId + slotFilledEvent = newAsyncEvent() test "repair from local store", NodeConfigs( @@ -124,13 +102,17 @@ marketplacesuite(name = "SP Slot Repair"): ) ).get - let purchaseId = await createPurchase(client0.client) - - let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) - let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed) + let (purchaseId, id) = await requestStorage( + client0.client, + blocks = blocks, + expiry = expiry, + duration = duration, + proofProbability = 1.u256, + ) + requestId = id # Wait for purchase starts, meaning that the slots are filled. - discard await waitForRequestToStart(expiry.int) + await waitForRequestToStart(requestId, expiry.int64) # stop client so it doesn't serve any blocks anymore await client0.stop() @@ -148,17 +130,16 @@ marketplacesuite(name = "SP Slot Repair"): totalSize = (3 * size.truncate(uint64)).uint64.some, ) + await marketplaceSubscribe(SlotFilled, onSlotFilled) + await marketplaceSubscribe(SlotFreed, onSlotFreed) + # Let's free the slot to speed up the process - await freeSlot(provider1.client) + await freeSlot(provider1) - # We expect that the freed slot is added in the filled slot id list, + # We expect that the freed slot is filled again, # meaning that the slot was repaired locally by SP 1. - # check eventually( - # freedSlotId.get in filledSlotIds, timeout = (duration - expiry).int * 1000 - # ) - - await filledSubscription.unsubscribe() - await slotFreedsubscription.unsubscribe() + let secondsTillRequestEnd = await getSecondsTillRequestEnd(requestId) + await slotFilledEvent.wait().wait(timeout = chronos.seconds(secondsTillRequestEnd)) test "repair from local and remote store", NodeConfigs( @@ -204,13 +185,17 @@ marketplacesuite(name = "SP Slot Repair"): totalCollateral = size * collateralPerByte, ) - let purchaseId = await createPurchase(client0.client) - - let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) - let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed) + let (purchaseId, id) = await requestStorage( + client0.client, + blocks = blocks, + expiry = expiry, + duration = duration, + proofProbability = 1.u256, + ) + requestId = id # Wait for purchase starts, meaning that the slots are filled. - discard await waitForRequestToStart(expiry.int) + await waitForRequestToStart(requestId, expiry.int64) # stop client so it doesn't serve any blocks anymore await client0.stop() @@ -227,16 +212,16 @@ marketplacesuite(name = "SP Slot Repair"): totalCollateral = (2 * size * collateralPerByte).some, ) + await marketplaceSubscribe(SlotFilled, onSlotFilled) + await marketplaceSubscribe(SlotFreed, onSlotFreed) + # Let's free the slot to speed up the process - await freeSlot(provider1.client) + await freeSlot(provider1) - # We expect that the freed slot is added in the filled slot id list, + # We expect that the freed slot is filled again, # meaning that the slot was repaired locally and remotely (using SP 3) by SP 1. - # check eventually(freedSlotId.isSome, timeout = expiry.int * 1000) - # check eventually(freedSlotId.get in filledSlotIds, timeout = expiry.int * 1000) - - await filledSubscription.unsubscribe() - await slotFreedsubscription.unsubscribe() + let secondsTillRequestEnd = await getSecondsTillRequestEnd(requestId) + await slotFilledEvent.wait().wait(timeout = chronos.seconds(secondsTillRequestEnd)) test "repair from remote store only", NodeConfigs( @@ -275,13 +260,17 @@ marketplacesuite(name = "SP Slot Repair"): ) ).get - let purchaseId = await createPurchase(client0.client) - - let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) - let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed) + let (purchaseId, id) = await requestStorage( + client0.client, + blocks = blocks, + expiry = expiry, + duration = duration, + proofProbability = 1.u256, + ) + requestId = id # Wait for purchase starts, meaning that the slots are filled. - discard await waitForRequestToStart(expiry.int) + await waitForRequestToStart(requestId, expiry.int64) # stop client so it doesn't serve any blocks anymore await client0.stop() @@ -299,12 +288,12 @@ marketplacesuite(name = "SP Slot Repair"): # SP 2 will not pick the slot again. await provider1.client.patchAvailability(availability1.id, enabled = false.some) + await marketplaceSubscribe(SlotFilled, onSlotFilled) + await marketplaceSubscribe(SlotFreed, onSlotFreed) + # Let's free the slot to speed up the process - await freeSlot(provider1.client) + await freeSlot(provider1) # At this point, SP 3 should repair the slot from SP 1 and host it. - # check eventually(freedSlotId.isSome, timeout = expiry.int * 1000) - # check eventually(freedSlotId.get in filledSlotIds, timeout = expiry.int * 1000) - - await filledSubscription.unsubscribe() - await slotFreedsubscription.unsubscribe() + let secondsTillRequestEnd = await getSecondsTillRequestEnd(requestId) + await slotFilledEvent.wait().wait(timeout = chronos.seconds(secondsTillRequestEnd)) diff --git a/tests/integration/30_minutes/testvalidator.nim b/tests/integration/30_minutes/testvalidator.nim index 24773398..b60f048b 100644 --- a/tests/integration/30_minutes/testvalidator.nim +++ b/tests/integration/30_minutes/testvalidator.nim @@ -20,7 +20,6 @@ marketplacesuite(name = "Validation"): const ecNodes = 3 const ecTolerance = 1 const proofProbability = 1.u256 - const collateralPerByte = 1.u256 const minPricePerBytePerSecond = 1.u256 @@ -60,9 +59,6 @@ marketplacesuite(name = "Validation"): # let mine a block to sync the blocktime with the current clock discard await ethProvider.send("evm_mine") - var currentTime = await ethProvider.currentTime() - let requestEndTime = currentTime.truncate(uint64) + duration - let data = await RandomChunker.example(blocks = blocks) let datasetSize = datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance) @@ -73,28 +69,19 @@ marketplacesuite(name = "Validation"): minPricePerBytePerSecond, ) - let cid = (await client0.upload(data)).get - let purchaseId = await client0.requestStorage( - cid, - expiry = expiry, - duration = duration, - nodes = ecNodes, - tolerance = ecTolerance, - proofProbability = proofProbability, + let (purchaseId, requestId) = await requestStorage( + client0, duration = duration, expiry = expiry, proofProbability = proofProbability ) - let requestId = (await client0.requestId(purchaseId)).get debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId - discard await waitForRequestToStart(expiry.int + 60) + await waitForRequestToStart(requestId, expiry.int64) discard await ethProvider.send("evm_mine") - currentTime = await ethProvider.currentTime() - let secondsTillRequestEnd = (requestEndTime - currentTime.truncate(uint64)).int - debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd.seconds - - discard await waitForRequestToFail(secondsTillRequestEnd + 60) + let secondsTillRequestEnd = await getSecondsTillRequestEnd(requestId) + debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd + await waitForRequestToFail(requestId, secondsTillRequestEnd.int64) test "validator uses historical state to mark missing proofs", NodeConfigs( @@ -124,7 +111,6 @@ marketplacesuite(name = "Validation"): var currentTime = await ethProvider.currentTime() let requestEndTime = currentTime.truncate(uint64) + duration - let data = await RandomChunker.example(blocks = blocks) let datasetSize = datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance) await createAvailabilities( @@ -134,20 +120,13 @@ marketplacesuite(name = "Validation"): minPricePerBytePerSecond, ) - let cid = (await client0.upload(data)).get - let purchaseId = await client0.requestStorage( - cid, - expiry = expiry, - duration = duration, - nodes = ecNodes, - tolerance = ecTolerance, - proofProbability = proofProbability, + let (purchaseId, requestId) = await requestStorage( + client0, duration = duration, expiry = expiry, proofProbability = proofProbability ) - let requestId = (await client0.requestId(purchaseId)).get debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId - discard await waitForRequestToStart(expiry.int + 60) + await waitForRequestToStart(requestId, expiry.int64) # extra block just to make sure we have one that separates us # from the block containing the last (past) SlotFilled event @@ -168,10 +147,6 @@ marketplacesuite(name = "Validation"): let node = await startValidatorNode(config) running.add RunningNode(role: Role.Validator, node: node) - discard await ethProvider.send("evm_mine") - currentTime = await ethProvider.currentTime() - let secondsTillRequestEnd = (requestEndTime - currentTime.truncate(uint64)).int - - debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd.seconds - - discard await waitForRequestToFail(secondsTillRequestEnd + 60) + let secondsTillRequestEnd = await getSecondsTillRequestEnd(requestId) + debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd + await waitForRequestToFail(requestId, secondsTillRequestEnd.int64) diff --git a/tests/integration/5_minutes/testsales.nim b/tests/integration/5_minutes/testsales.nim index 6de52202..d8d5dcca 100644 --- a/tests/integration/5_minutes/testsales.nim +++ b/tests/integration/5_minutes/testsales.nim @@ -114,10 +114,11 @@ marketplacesuite(name = "Sales"): test "updating availability - updating totalSize does not allow bellow utilized", salesConfig: let originalSize = 0xFFFFFF.uint64 - let data = await RandomChunker.example(blocks = 8) let minPricePerBytePerSecond = 3.u256 let collateralPerByte = 1.u256 let totalCollateral = originalSize.u256 * collateralPerByte + let expiry = 10 * 60.uint64 + let availability = ( await host.postAvailability( totalSize = originalSize, @@ -128,19 +129,11 @@ marketplacesuite(name = "Sales"): ).get # Lets create storage request that will utilize some of the availability's space - let cid = (await client.upload(data)).get - let id = await client.requestStorage( - cid, - duration = 20 * 60.uint64, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 3.u256, - expiry = (10 * 60).uint64, - collateralPerByte = collateralPerByte, - nodes = 3, - tolerance = 1, + let (purchaseId, requestId) = await requestStorage( + client = client, pricePerBytePerSecond = minPricePerBytePerSecond ) - discard await waitForRequestToStart() + await waitForRequestToStart(requestId, expiry.int64) let updatedAvailability = ((await host.getAvailabilities()).get).findItem(availability).get @@ -183,12 +176,9 @@ marketplacesuite(name = "Sales"): test "returns an error when trying to update the until date before an existing a request is finished", salesConfig: let size = 0xFFFFFF.uint64 - let data = await RandomChunker.example(blocks = 8) let duration = 20 * 60.uint64 + let expiry = 10 * 60.uint64 let minPricePerBytePerSecond = 3.u256 - let collateralPerByte = 1.u256 - let ecNodes = 3.uint - let ecTolerance = 1.uint # host makes storage available let availability = ( @@ -200,24 +190,12 @@ marketplacesuite(name = "Sales"): ) ).get - # client requests storage - let cid = (await client.upload(data)).get - let id = ( - await client.requestStorage( - cid, - duration = duration, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 3.u256, - expiry = 10 * 60.uint64, - collateralPerByte = collateralPerByte, - nodes = ecNodes, - tolerance = ecTolerance, - ) - ).get + let (purchaseId, requestId) = + await requestStorage(client, pricePerBytePerSecond = minPricePerBytePerSecond) - discard await waitForRequestToStart() + await waitForRequestToStart(requestId, expiry.int64) - let purchase = (await client.getPurchase(id)).get + let purchase = (await client.getPurchase(purchaseId)).get check purchase.error == none string let unixNow = getTime().toUnix() diff --git a/tests/integration/marketplacesuite.nim b/tests/integration/marketplacesuite.nim index dbb77540..e3b010a0 100644 --- a/tests/integration/marketplacesuite.nim +++ b/tests/integration/marketplacesuite.nim @@ -1,5 +1,4 @@ import macros -import std/strutils import std/unittest import pkg/chronos @@ -9,7 +8,7 @@ import pkg/codex/contracts/marketplace as mp import pkg/codex/periods import pkg/codex/utils/json from pkg/codex/utils import roundUp, divUp -import ./multinodes except Subscription +import ./multinodes except Subscription, Event import ../contracts/time import ../contracts/deployment @@ -22,72 +21,83 @@ template marketplacesuite*(name: string, body: untyped) = var period: uint64 var periodicity: Periodicity var token {.inject, used.}: Erc20Token - var requestStartedEvent: AsyncEvent - var requestStartedSubscription: Subscription - var requestFailedEvent: AsyncEvent - var requestFailedSubscription: Subscription - - template fail(reason: string) = - raise newException(TestFailedError, reason) + var subscriptions: seq[Subscription] = @[] + var tokenSubscription: Subscription proc check(cond: bool, reason = "Check failed"): void = if not cond: fail(reason) - template stopOnRequestFailed(tbody: untyped) = - let completed = newAsyncEvent() + proc marketplaceSubscribe[E: Event]( + event: type E, handler: EventHandler[E] + ) {.async.} = + let sub = await marketplace.subscribe(event, handler) + subscriptions.add(sub) - let mainFut = ( - proc(): Future[void] {.async.} = - tbody - completed.fire() - )() + proc tokenSubscribe( + handler: proc(event: ?!Transfer) {.gcsafe, raises: [].} + ) {.async.} = + let sub = await token.subscribe(Transfer, handler) + tokenSubscription = sub - let fastFailFut = ( - proc(): Future[void] {.async.} = - try: - await requestFailedEvent.wait().wait(timeout = chronos.seconds(60)) - completed.fire() - raise newException(TestFailedError, "storage request has failed") - except AsyncTimeoutError: - discard - )() + proc subscribeOnRequestFulfilled( + requestId: RequestId + ): Future[AsyncEvent] {.async.} = + let event = newAsyncEvent() - await completed.wait().wait(timeout = chronos.seconds(60 * 30)) + proc onRequestFulfilled(eventResult: ?!RequestFulfilled) {.raises: [].} = + assert not eventResult.isErr + let er = !eventResult - if not fastFailFut.completed: - await fastFailFut.cancelAndWait() + if er.requestId == requestId: + event.fire() - if mainFut.failed: - raise mainFut.error + let sub = await marketplace.subscribe(RequestFulfilled, onRequestFulfilled) + subscriptions.add(sub) - if fastFailFut.failed: - raise fastFailFut.error - - proc onRequestStarted(eventResult: ?!RequestFulfilled) {.raises: [].} = - requestStartedEvent.fire() - - proc onRequestFailed(eventResult: ?!RequestFailed) {.raises: [].} = - requestFailedEvent.fire() + return event proc getCurrentPeriod(): Future[Period] {.async.} = return periodicity.periodOf((await ethProvider.currentTime()).truncate(uint64)) proc waitForRequestToStart( - seconds = 10 * 60 + 10 - ): Future[Period] {. - async: (raises: [CancelledError, AsyncTimeoutError, TestFailedError]) - .} = - await requestStartedEvent.wait().wait(timeout = chronos.seconds(seconds)) - # Recreate a new future if we need to wait for another request - requestStartedEvent = newAsyncEvent() + requestId: RequestId, seconds = 10 * 60 + 10 + ): Future[void] {.async.} = + let event = newAsyncEvent() + + proc onRequestFulfilled(eventResult: ?!RequestFulfilled) {.raises: [].} = + assert not eventResult.isErr + let er = !eventResult + + if er.requestId == requestId: + event.fire() + + let sub = await marketplace.subscribe(RequestFulfilled, onRequestFulfilled) + subscriptions.add(sub) + + await event.wait().wait(timeout = chronos.seconds(seconds)) + + proc getSecondsTillRequestEnd(requestId: RequestId): Future[int64] {.async.} = + let currentTime = await ethProvider.currentTime() + let requestEnd = await marketplace.requestEnd(requestId) + return requestEnd.int64 - currentTime.truncate(int64) proc waitForRequestToFail( - seconds = (5 * 60) + 10 - ): Future[Period] {.async: (raises: [CancelledError, AsyncTimeoutError]).} = - await requestFailedEvent.wait().wait(timeout = chronos.seconds(seconds)) - # Recreate a new future if we need to wait for another request - requestFailedEvent = newAsyncEvent() + requestId: RequestId, seconds = (5 * 60) + 10 + ): Future[void] {.async.} = + let event = newAsyncEvent() + + proc onRequestFailed(eventResult: ?!RequestFailed) {.raises: [].} = + assert not eventResult.isErr + let er = !eventResult + + if er.requestId == requestId: + event.fire() + + let sub = await marketplace.subscribe(RequestFailed, onRequestFailed) + subscriptions.add(sub) + + await event.wait().wait(timeout = chronos.seconds(seconds)) proc advanceToNextPeriod() {.async.} = let periodicity = Periodicity(seconds: period) @@ -145,7 +155,7 @@ template marketplacesuite*(name: string, body: untyped) = client: CodexClient, cid: Cid, proofProbability = 1.u256, - duration: uint64 = 12.periods, + duration: uint64 = 20 * 60.uint64, pricePerBytePerSecond = 1.u256, collateralPerByte = 1.u256, expiry: uint64 = 4.periods, @@ -167,6 +177,36 @@ template marketplacesuite*(name: string, body: untyped) = return id + proc requestStorage( + client: CodexClient, + proofProbability = 3.u256, + duration = 20 * 60.uint64, + pricePerBytePerSecond = 1.u256, + collateralPerByte = 1.u256, + expiry = 10 * 60.uint64, + nodes = 3, + tolerance = 1, + blocks = 8, + data = seq[byte].none, + ): Future[(PurchaseId, RequestId)] {.async.} = + let bytes = data |? await RandomChunker.example(blocks = blocks) + let cid = (await client.upload(bytes)).get + + let purchaseId = await client.requestStorage( + cid, + duration = duration, + pricePerBytePerSecond = pricePerBytePerSecond, + proofProbability = proofProbability, + expiry = expiry, + collateralPerByte = collateralPerByte, + nodes = nodes, + tolerance = tolerance, + ) + + let requestId = (await client.requestId(purchaseId)).get + + return (purchaseId, requestId) + setup: marketplace = Marketplace.new(Marketplace.address, ethProvider.getSigner()) let tokenAddress = await marketplace.token() @@ -174,18 +214,12 @@ template marketplacesuite*(name: string, body: untyped) = let config = await marketplace.configuration() period = config.proofs.period periodicity = Periodicity(seconds: period) - - requestStartedEvent = newAsyncEvent() - requestFailedEvent = newAsyncEvent() - - requestStartedSubscription = - await marketplace.subscribe(RequestFulfilled, onRequestStarted) - - requestFailedSubscription = - await marketplace.subscribe(RequestFailed, onRequestFailed) - + subscriptions = @[] teardown: - await requestStartedSubscription.unsubscribe() - await requestFailedSubscription.unsubscribe() + for subscription in subscriptions: + await subscription.unsubscribe() + + if not tokenSubscription.isNil: + await tokenSubscription.unsubscribe() body diff --git a/tests/integration/multinodes.nim b/tests/integration/multinodes.nim index 51c5ccb4..80827551 100644 --- a/tests/integration/multinodes.nim +++ b/tests/integration/multinodes.nim @@ -107,7 +107,10 @@ template multinodesuite*(name: string, body: untyped) = currentTestName = tname nodeConfigs = startNodeConfigs test tname: - failAndTeardownOnError("test failed", tbody) + tbody + + template fail(reason: string) = + raise newException(TestFailedError, reason) proc sanitize(pathSegment: string): string = var sanitized = pathSegment @@ -255,21 +258,34 @@ template multinodesuite*(name: string, body: untyped) = return await newCodexProcess(validatorIdx, config, Role.Validator) - proc teardownImpl() {.async.} = + proc teardownImpl() {.async: (raises: [CancelledError]).} = for nodes in @[validators(), clients(), providers()]: for node in nodes: - await node.stop() # also stops rest client - node.removeDataDir() + try: + await node.stop() # also stops rest client + node.removeDataDir() + except CancelledError as e: + raise e + # Raised by removeDataDir + except Exception as e: + error "error when trying to stop the node", error = e.msg # if hardhat was started in the test, kill the node # otherwise revert the snapshot taken in the test setup let hardhat = hardhat() if not hardhat.isNil: - await hardhat.stop() + try: + await hardhat.stop() + except CancelledError as e: + raise e + except CatchableError as e: + error "error when trying to stop hardhat", error = e.msg else: - discard await send(ethProvider, "evm_revert", @[snapshot]) - - await ethProvider.close() + try: + discard await send(ethProvider, "evm_revert", @[snapshot]) + await ethProvider.close() + except ProviderError as e: + error "error when trying to cleanup the evm", error = e.msg running = @[] @@ -285,7 +301,7 @@ template multinodesuite*(name: string, body: untyped) = await teardownImpl() when declared(teardownAllIMPL): teardownAllIMPL() - raise er + fail(er.msg) proc updateBootstrapNodes( node: CodexProcess