From acc50aa4bae553b641087a513a47e891daa2a3a3 Mon Sep 17 00:00:00 2001 From: Chrysostomos Nanakos Date: Fri, 11 Apr 2025 12:09:54 +0300 Subject: [PATCH] Add support for slot reconstruction on unavailable slot detection (#892) Includes corresponding tests. Various small fixes. --- codex/blockexchange/engine/advertiser.nim | 8 +- codex/erasure/erasure.nim | 133 +++++++++ codex/node.nim | 49 ++-- codex/stores/networkstore.nim | 7 + tests/codex/node/testslotrepair.nim | 190 +++++++++++++ tests/codex/testnode.nim | 1 + tests/integration/testslotrepair.nim | 315 ++++++++++++++++++++++ tests/testIntegration.nim | 1 + 8 files changed, 680 insertions(+), 24 deletions(-) create mode 100644 tests/codex/node/testslotrepair.nim create mode 100644 tests/integration/testslotrepair.nim diff --git a/codex/blockexchange/engine/advertiser.nim b/codex/blockexchange/engine/advertiser.nim index d094c454..cb8fd921 100644 --- a/codex/blockexchange/engine/advertiser.nim +++ b/codex/blockexchange/engine/advertiser.nim @@ -126,16 +126,16 @@ proc start*(b: Advertiser) {.async: (raises: []).} = trace "Advertiser start" + if b.advertiserRunning: + warn "Starting advertiser twice" + return + proc onBlock(cid: Cid) {.async.} = await b.advertiseBlock(cid) doAssert(b.localStore.onBlockStored.isNone()) b.localStore.onBlockStored = onBlock.some - if b.advertiserRunning: - warn "Starting advertiser twice" - return - b.advertiserRunning = true for i in 0 ..< b.concurrentAdvReqs: let fut = b.processQueueLoop() diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 884969d0..c9189ca5 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -659,6 +659,139 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = return decoded.success +proc repair*(self: Erasure, encoded: Manifest, slotIdx: int): Future[?!void] {.async.} = + ## Repair a protected manifest slot + ## + ## `encoded` - the encoded (protected) manifest to + ## be repaired + ## + logScope: + steps = encoded.steps + rounded_blocks = encoded.rounded + new_manifest = encoded.blocksCount + + var + cids = seq[Cid].new() + decoder = self.decoderProvider(encoded.blockSize.int, encoded.ecK, encoded.ecM) + emptyBlock = newSeq[byte](encoded.blockSize.int) + + cids[].setLen(encoded.blocksCount) + try: + for step in 0 ..< encoded.steps: + await sleepAsync(10.millis) + + var + data = seq[seq[byte]].new() + parityData = seq[seq[byte]].new() + recovered = createDoubleArray(encoded.ecK, encoded.blockSize.int) + + data[].setLen(encoded.ecK) + parityData[].setLen(encoded.ecM) + + without (dataPieces, _) =? ( + await self.prepareDecodingData( + encoded, step, data, parityData, cids, emptyBlock + ) + ), err: + trace "Unable to prepare decoding data", error = err.msg + return failure(err) + + if dataPieces >= encoded.ecK: + trace "Retrieved all the required data blocks for this step" + continue + + trace "Erasure decoding data" + try: + if err =? ( + await self.asyncDecode( + encoded.blockSize.int, encoded.ecK, encoded.ecM, data, parityData, recovered + ) + ).errorOption: + return failure(err) + except CancelledError as exc: + raise exc + finally: + freeDoubleArray(recovered, encoded.ecK) + + for i in 0 ..< encoded.ecK: + let idx = i * encoded.steps + step + if data[i].len <= 0 and not cids[idx].isEmpty: + var innerPtr: ptr UncheckedArray[byte] = recovered[][i] + + without blk =? 
bt.Block.new( + innerPtr.toOpenArray(0, encoded.blockSize.int - 1) + ), error: + trace "Unable to create data block!", exc = error.msg + return failure(error) + + trace "Recovered data block", cid = blk.cid, index = i + if isErr (await self.store.putBlock(blk)): + trace "Unable to store data block!", cid = blk.cid + return failure("Unable to store data block!") + + cids[idx] = blk.cid + except CancelledError as exc: + trace "Erasure coding decoding cancelled" + raise exc # cancellation needs to be propagated + except CatchableError as exc: + trace "Erasure coding decoding error", exc = exc.msg + return failure(exc) + finally: + decoder.release() + + without tree =? CodexTree.init(cids[0 ..< encoded.originalBlocksCount]), err: + return failure(err) + + without treeCid =? tree.rootCid, err: + return failure(err) + + if treeCid != encoded.originalTreeCid: + return failure( + "Original tree root differs from the tree root computed out of recovered data" + ) + + if err =? (await self.store.putAllProofs(tree)).errorOption: + return failure(err) + + without repaired =? ( + await self.encode( + Manifest.new(encoded), encoded.ecK, encoded.ecM, encoded.protectedStrategy + ) + ), err: + return failure(err) + + if repaired.treeCid != encoded.treeCid: + return failure( + "Original tree root differs from the repaired tree root encoded out of recovered data" + ) + + let + groupStrategy = ?encoded.protectedStrategy.init( + firstIndex = 0, + lastIndex = encoded.blocksCount - 1, + iterations = encoded.numSlots + ).catch + groupIndices = toSeq(groupStrategy.getIndicies(slotIdx)) + strategy = ?encoded.protectedStrategy.init( + firstIndex = 0, + lastIndex = encoded.blocksCount - 1, + iterations = encoded.steps + ).catch + + for step in 0 ..< encoded.steps: + let indices = strategy.getIndicies(step) + for i in indices: + if i notin groupIndices: + if isErr (await self.store.delBlock(encoded.treeCid, i)): + trace "Failed to remove block from tree ", treeCid = encoded.treeCid, index = i + + for i, cid in cids[0 ..< encoded.originalBlocksCount]: + if i notin groupIndices: + if isErr (await self.store.delBlock(treeCid, i)): + trace "Failed to remove original block from tree ", treeCid = treeCid, index = i + + return success() + proc start*(self: Erasure) {.async.} = return diff --git a/codex/node.nim b/codex/node.nim index fb653c0d..ba128aa5 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -629,10 +629,6 @@ proc onStore( trace "Received a request to store a slot" - # TODO: Use the isRepairing to manage the slot download. - # If isRepairing is true, the slot has to be repaired before - # being downloaded. - without manifest =? (await self.fetchManifest(cid)), err: trace "Unable to fetch manifest for cid", cid, err = err.msg return failure(err) @@ -665,32 +661,45 @@ proc onStore( return success() - without indexer =? - manifest.verifiableStrategy.init(0, manifest.blocksCount - 1, manifest.numSlots).catch, - err: - trace "Unable to create indexing strategy from protected manifest", err = err.msg - return failure(err) - if slotIdx > int.high.uint64: error "Cannot cast slot index to int", slotIndex = slotIdx return - without blksIter =? indexer.getIndicies(slotIdx.int).catch, err: - trace "Unable to get indicies from strategy", err = err.msg - return failure(err) + if isRepairing: + trace "start repairing slot", slotIdx + try: + let erasure = Erasure.new( + self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool + ) + if err =? 
(await erasure.repair(manifest, slotIdx.int)).errorOption: + error "Unable to erasure decode repairing manifest", + cid = manifest.treeCid, exc = err.msg + return failure(err) + except CatchableError as exc: + error "Error erasure decoding repairing manifest", + cid = manifest.treeCid, exc = exc.msg + return failure(exc.msg) + else: + without indexer =? + manifest.verifiableStrategy.init(0, manifest.blocksCount - 1, manifest.numSlots).catch, + err: + trace "Unable to create indexing strategy from protected manifest", err = err.msg + return failure(err) - if err =? ( - await self.fetchBatched(manifest.treeCid, blksIter, onBatch = updateExpiry) - ).errorOption: - trace "Unable to fetch blocks", err = err.msg - return failure(err) + without blksIter =? indexer.getIndicies(slotIdx.int).catch, err: + trace "Unable to get indicies from strategy", err = err.msg + return failure(err) + + if err =? ( + await self.fetchBatched(manifest.treeCid, blksIter, onBatch = updateExpiry) + ).errorOption: + trace "Unable to fetch blocks", err = err.msg + return failure(err) without slotRoot =? (await builder.buildSlot(slotIdx.int)), err: trace "Unable to build slot", err = err.msg return failure(err) - trace "Slot successfully retrieved and reconstructed" - if cid =? slotRoot.toSlotCid() and cid != manifest.slotRoots[slotIdx]: trace "Slot root mismatch", manifest = manifest.slotRoots[slotIdx.int], recovered = slotRoot.toSlotCid() diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index f94bca33..88297831 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -128,6 +128,13 @@ method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] = trace "Deleting block from network store", cid return self.localStore.delBlock(cid) +method delBlock*(self: NetworkStore, treeCid: Cid, index: Natural): Future[?!void] = + ## Delete a block from the blockstore + ## + + trace "Deleting block from network store", treeCid, index + return self.localStore.delBlock(treeCid, index) + {.pop.} method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.async.} = diff --git a/tests/codex/node/testslotrepair.nim b/tests/codex/node/testslotrepair.nim new file mode 100644 index 00000000..6825c75d --- /dev/null +++ b/tests/codex/node/testslotrepair.nim @@ -0,0 +1,190 @@ +import std/options +import std/importutils + +import pkg/chronos +import pkg/questionable +import pkg/questionable/results +import pkg/stint +import pkg/taskpools + +import pkg/nitro +import pkg/codexdht/discv5/protocol as discv5 + +import pkg/codex/logutils +import pkg/codex/stores +import pkg/codex/contracts +import pkg/codex/blockexchange +import pkg/codex/chunker +import pkg/codex/slots +import pkg/codex/manifest +import pkg/codex/discovery +import pkg/codex/erasure +import pkg/codex/blocktype as bt +import pkg/codex/indexingstrategy +import pkg/codex/nat +import pkg/codex/utils/natutils +import pkg/chronos/transports/stream + +import pkg/codex/node {.all.} + +import ../../asynctest +import ../../examples +import ../helpers + +privateAccess(CodexNodeRef) # enable access to private fields + +logScope: + topics = "testSlotRepair" + +proc nextFreePort*(startPort: int): Future[int] {.async.} = + proc client(server: StreamServer, transp: StreamTransport) {.async: (raises: []).} = + await transp.closeWait() + + var port = startPort + while true: + try: + let host = initTAddress("127.0.0.1", port) + var server = createStreamServer(host, client, {ReuseAddr}) + await server.closeWait() + return port + except 
TransportOsError: + inc port + +asyncchecksuite "Test Node - Slot Repair": + var + manifest: Manifest + builder: Poseidon2Builder + verifiable: Manifest + verifiableBlock: bt.Block + protected: Manifest + + localStores: seq[CacheStore] = newSeq[CacheStore]() + nodes: seq[CodexNodeRef] = newSeq[CodexNodeRef]() + + let + numNodes = 11 + numBlocks = 24 + ecK = 3 + ecM = 2 + + setup: + var bootstrapNodes: seq[SignedPeerRecord] = @[] + for i in 0 ..< numNodes: + let + listenPort = await nextFreePort(8080 + 2 * i) + bindPort = await nextFreePort(listenPort + 1) + listenAddr = MultiAddress.init("/ip4/127.0.0.1/tcp/" & $listenPort).expect( + "invalid multiaddress" + ) + switch = newStandardSwitch( + transportFlags = {ServerFlags.ReuseAddr}, + sendSignedPeerRecord = true, + addrs = listenAddr, + ) + wallet = WalletRef.new(EthPrivateKey.random()) + network = BlockExcNetwork.new(switch) + peerStore = PeerCtxStore.new() + pendingBlocks = PendingBlocksManager.new() + blockDiscoveryStore = TempLevelDb.new().newDb() + localStore = CacheStore.new() + blockDiscovery = Discovery.new( + switch.peerInfo.privateKey, + announceAddrs = @[listenAddr], + bindPort = bindPort.Port, + store = blockDiscoveryStore, + bootstrapNodes = bootstrapNodes, + ) + discovery = DiscoveryEngine.new( + localStore, peerStore, network, blockDiscovery, pendingBlocks + ) + advertiser = Advertiser.new(localStore, blockDiscovery) + engine = BlockExcEngine.new( + localStore, wallet, network, discovery, advertiser, peerStore, pendingBlocks + ) + store = NetworkStore.new(engine, localStore) + node = CodexNodeRef.new( + switch = switch, + networkStore = store, + engine = engine, + prover = Prover.none, + discovery = blockDiscovery, + taskpool = Taskpool.new(), + ) + + await switch.peerInfo.update() + switch.mount(network) + + let (announceAddrs, discoveryAddrs) = nattedAddress( + NatConfig(hasExtIp: false, nat: NatNone), switch.peerInfo.addrs, bindPort.Port + ) + node.discovery.updateAnnounceRecord(announceAddrs) + node.discovery.updateDhtRecord(discoveryAddrs) + + check node.discovery.dhtRecord.isSome + bootstrapNodes.add !node.discovery.dhtRecord + + localStores.add localStore + nodes.add node + + for node in nodes: + await node.switch.start() + await node.start() + + let + localStore = localStores[0] + store = nodes[0].blockStore + + let blocks = + await makeRandomBlocks(datasetSize = numBlocks * 64.KiBs.int, blockSize = 64.KiBs) + assert blocks.len == numBlocks + + # Populate manifest in local store + manifest = await storeDataGetManifest(localStore, blocks) + let + manifestBlock = + bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() + erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new) + + (await localStore.putBlock(manifestBlock)).tryGet() + + protected = (await erasure.encode(manifest, ecK, ecM)).tryGet() + builder = Poseidon2Builder.new(localStore, protected).tryGet() + verifiable = (await builder.buildManifest()).tryGet() + verifiableBlock = + bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() + + # Populate protected manifest in local store + (await localStore.putBlock(verifiableBlock)).tryGet() + + teardown: + for node in nodes: + await node.switch.stop() + localStores = @[] + nodes = @[] + + test "repair slot": + var request = StorageRequest.example + request.content.cid = verifiableBlock.cid + request.ask.slots = protected.numSlots.uint64 + request.ask.slotSize = DefaultBlockSize.uint64 + + for i in 0 ..< protected.numSlots.uint64: + (await nodes[i + 
1].onStore(request, i, nil, isRepairing = false)).tryGet() + + await nodes[0].switch.stop() # acts as client + await nodes[1].switch.stop() # slot 0 missing now + await nodes[3].switch.stop() # slot 2 missing now + + # repair missing slot + (await nodes[6].onStore(request, 0.uint64, nil, isRepairing = true)).tryGet() + (await nodes[7].onStore(request, 2.uint64, nil, isRepairing = true)).tryGet() + + await nodes[2].switch.stop() # slot 1 missing now + await nodes[4].switch.stop() # slot 3 missing now + + (await nodes[8].onStore(request, 1.uint64, nil, isRepairing = true)).tryGet() + (await nodes[9].onStore(request, 3.uint64, nil, isRepairing = true)).tryGet() + + await nodes[5].switch.stop() # slot 4 missing now + + (await nodes[10].onStore(request, 4.uint64, nil, isRepairing = true)).tryGet() diff --git a/tests/codex/testnode.nim b/tests/codex/testnode.nim index 0ee4fc50..c6f0154b 100644 --- a/tests/codex/testnode.nim +++ b/tests/codex/testnode.nim @@ -1,4 +1,5 @@ import ./node/testnode import ./node/testcontracts +import ./node/testslotrepair {.warning[UnusedImport]: off.} diff --git a/tests/integration/testslotrepair.nim b/tests/integration/testslotrepair.nim new file mode 100644 index 00000000..9e30afe2 --- /dev/null +++ b/tests/integration/testslotrepair.nim @@ -0,0 +1,315 @@ +import pkg/questionable +import pkg/codex/logutils +import ../contracts/time +import ../contracts/deployment +import ../codex/helpers +import ../examples +import ./marketplacesuite +import ./nodeconfigs + +export logutils + +logScope: + topics = "integration test slot repair" + +marketplacesuite "SP Slot Repair": + const minPricePerBytePerSecond = 1.u256 + const collateralPerByte = 1.u256 + const blocks = 3 + const ecNodes = 5 + const ecTolerance = 2 + + test "repair from local store", + NodeConfigs( + clients: CodexConfigs + .init(nodes = 1) + .debug() + .withLogFile() + .withLogTopics("node", "erasure").some, + providers: CodexConfigs + .init(nodes = 2) + .withSimulateProofFailures(idx = 1, failEveryNProofs = 1) + .debug() + .withLogFile() + .withLogTopics("marketplace", "sales", "reservations", "node", "statemachine").some, + validators: CodexConfigs + .init(nodes = 1) + .debug() + .withLogFile() + .withLogTopics("validator").some, + ): + let client0 = clients()[0] + let provider0 = providers()[0] + let provider1 = providers()[1] + let expiry = 10.periods + let duration = expiry + 10.periods + + let data = await RandomChunker.example(blocks = blocks) + let slotSize = slotSize(blocks, ecNodes, ecTolerance) + + let availability = ( + await provider0.client.postAvailability( + totalSize = 4 * slotSize.truncate(uint64), + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = 100 * slotSize * collateralPerByte, + ) + ).get + + discard await provider1.client.postAvailability( + totalSize = slotSize.truncate(uint64), + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = 100 * slotSize * collateralPerByte, + ) + + var filledSlotIds: seq[SlotId] = @[] + proc onSlotFilled(eventResult: ?!SlotFilled) = + assert not eventResult.isErr + let event = !eventResult + let slotId = slotId(event.requestId, event.slotIndex) + filledSlotIds.add slotId + + let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) + + var freedSlotId = none SlotId + proc onSlotFreed(eventResult: ?!SlotFreed) = + assert not eventResult.isErr + let event = !eventResult + let slotId = slotId(event.requestId, event.slotIndex) + + assert slotId in 
filledSlotIds + + filledSlotIds.del(filledSlotIds.find(slotId)) + freedSlotId = some(slotId) + + let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed) + + let cid = (await client0.client.upload(data)).get + + let purchaseId = await client0.client.requestStorage( + cid, + expiry = expiry, + duration = duration, + nodes = ecNodes, + tolerance = ecTolerance, + proofProbability = 1.u256, + ) + + check eventually( + await client0.client.purchaseStateIs(purchaseId, "started"), timeout = expiry.int * 1000 + ) + + await client0.stop() + + await provider0.client.patchAvailability( + availabilityId = availability.id, + totalSize = (5 * slotSize.truncate(uint64)).uint64.some, + duration = duration.uint64.some, + minPricePerBytePerSecond = minPricePerBytePerSecond.some, + totalCollateral = (100 * slotSize * collateralPerByte).some, + ) + + check eventually(freedSlotId.isSome, timeout = (duration - expiry).int * 1000) + + check eventually( + freedSlotId.get in filledSlotIds, timeout = (duration - expiry).int * 1000 + ) + + await filledSubscription.unsubscribe() + await slotFreedsubscription.unsubscribe() + + test "repair from remote store", + NodeConfigs( + clients: CodexConfigs + .init(nodes = 1) + .debug() + .withLogFile() + .withLogTopics("node", "erasure").some, + providers: CodexConfigs + .init(nodes = 3) + .withSimulateProofFailures(idx = 1, failEveryNProofs = 1) + .debug() + .withLogFile() + .withLogTopics("marketplace", "sales", "reservations", "node", "statemachine").some, + validators: CodexConfigs + .init(nodes = 1) + .debug() + .withLogFile() + .withLogTopics("validator").some, + ): + let client0 = clients()[0] + let provider0 = providers()[0] + let provider1 = providers()[1] + let provider2 = providers()[2] + let expiry = 10.periods + let duration = expiry + 10.periods + + let data = await RandomChunker.example(blocks = blocks) + let slotSize = slotSize(blocks, ecNodes, ecTolerance) + + discard await provider0.client.postAvailability( + totalSize = 4 * slotSize.truncate(uint64), + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = 100 * slotSize * collateralPerByte, + ) + + discard await provider1.client.postAvailability( + totalSize = slotSize.truncate(uint64), + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = 100 * slotSize * collateralPerByte, + ) + + var filledSlotIds: seq[SlotId] = @[] + proc onSlotFilled(eventResult: ?!SlotFilled) = + assert not eventResult.isErr + let event = !eventResult + let slotId = slotId(event.requestId, event.slotIndex) + filledSlotIds.add slotId + + let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) + + var freedSlotId = none SlotId + proc onSlotFreed(eventResult: ?!SlotFreed) = + assert not eventResult.isErr + let event = !eventResult + let slotId = slotId(event.requestId, event.slotIndex) + + assert slotId in filledSlotIds + + filledSlotIds.del(filledSlotIds.find(slotId)) + freedSlotId = some(slotId) + + let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed) + + let cid = (await client0.client.upload(data)).get + + let purchaseId = await client0.client.requestStorage( + cid, + expiry = expiry, + duration = duration, + nodes = ecNodes, + tolerance = ecTolerance, + proofProbability = 1.u256, + ) + + check eventually( + await client0.client.purchaseStateIs(purchaseId, "started"), timeout = expiry.int * 1000 + ) + + await client0.stop() + + discard await provider2.client.postAvailability( + 
totalSize = slotSize.truncate(uint64), + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = 100 * slotSize * collateralPerByte, + ) + + check eventually(freedSlotId.isSome, timeout = (duration - expiry).int * 1000) + + await provider1.stop() + + check eventually( + freedSlotId.get in filledSlotIds, timeout = (duration - expiry).int * 1000 + ) + + await filledSubscription.unsubscribe() + await slotFreedsubscription.unsubscribe() + + test "storage provider slot repair", + NodeConfigs( + clients: CodexConfigs + .init(nodes = 1) + .debug() + .withLogFile() + .withLogTopics("node", "erasure").some, + providers: CodexConfigs + .init(nodes = 4) + .debug() + .withLogFile() + .withLogTopics("marketplace", "sales", "reservations", "node").some, + validators: CodexConfigs + .init(nodes = 1) + .debug() + .withLogFile() + .withLogTopics("validator").some, + ): + let client0 = clients()[0] + let expiry = 10.periods + let duration = expiry + 10.periods + let size = 0xFFFFFF.uint64 + + let data = await RandomChunker.example(blocks = blocks) + let datasetSize = + datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance) + + await createAvailabilities( + size, duration, datasetSize * collateralPerByte, minPricePerBytePerSecond + ) + + let cid = (await client0.client.upload(data)).get + + let purchaseId = await client0.client.requestStorage( + cid, + expiry = expiry, + duration = duration, + collateralPerByte = collateralPerByte, + nodes = ecNodes, + tolerance = ecTolerance, + proofProbability = 1.u256, + pricePerBytePerSecond = minPricePerBytePerSecond, + ) + + let requestId = (await client0.client.requestId(purchaseId)).get + + var filledSlotIds: seq[SlotId] = @[] + proc onSlotFilled(eventResult: ?!SlotFilled) = + assert not eventResult.isErr + let event = !eventResult + if event.requestId == requestId: + let slotId = slotId(event.requestId, event.slotIndex) + filledSlotIds.add slotId + + let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) + + check eventually( + await client0.client.purchaseStateIs(purchaseId, "started"), timeout = expiry.int * 1000 + ) + + await client0.stop() + + check eventually( + filledSlotIds.len == blocks, timeout = (duration - expiry).int * 1000 + ) + trace "all slots have been filled" + + var slotWasFreed = false + proc onSlotFreed(event: ?!SlotFreed) = + if event.isOk and event.value.requestId == requestId: + trace "slot was freed", slotIndex = $event.value.slotIndex + slotWasFreed = true + + let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed) + + block provider_search: + while true: + for slotId in filledSlotIds: + for provider in providers(): + if (await provider.client.saleStateIs(slotId, "SaleProving")): + await provider.stop() + break provider_search + await sleepAsync(100.milliseconds) + + check eventually(slotWasFreed, timeout = (duration - expiry).int * 1000) + + await slotFreedsubscription.unsubscribe() + + check eventually( + filledSlotIds.len > blocks, timeout = (duration - expiry).int * 1000 + ) + trace "freed slot was filled" + + await filledSubscription.unsubscribe() diff --git a/tests/testIntegration.nim b/tests/testIntegration.nim index 152d22dd..07a69749 100644 --- a/tests/testIntegration.nim +++ b/tests/testIntegration.nim @@ -9,5 +9,6 @@ import ./integration/testmarketplace import ./integration/testproofs import ./integration/testvalidator import ./integration/testecbug +import ./integration/testslotrepair {.warning[UnusedImport]: off.}
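
A minimal usage sketch of the new repair API, assuming the imports below and a protected manifest that has already been fetched; identifiers mirror the diff above, and `repairSlot` itself is an illustrative helper, not part of this change. As the patch shows, `Erasure.repair` reconstructs the missing blocks of one slot from any ecK of the ecK + ecM coded blocks per step, verifies both the recovered original tree root and the re-encoded protected tree root, and then deletes every recovered block that does not belong to the requested slot (selected via `protectedStrategy.getIndicies(slotIdx)`).

import pkg/chronos
import pkg/taskpools
import pkg/questionable/results
import pkg/codex/erasure
import pkg/codex/manifest
import pkg/codex/stores

proc repairSlot(
    store: NetworkStore, taskpool: Taskpool, encoded: Manifest, slotIdx: int
): Future[?!void] {.async.} =
  ## Sketch: rebuild slot `slotIdx` of a protected (erasure-coded) manifest,
  ## as the isRepairing branch of onStore does above; after this returns,
  ## only the blocks belonging to `slotIdx` remain in the local store.
  let erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool)
  return await erasure.repair(encoded, slotIdx)

# e.g. on a provider assigned to a freed slot (cf. the unit test above,
# where onStore is called with isRepairing = true):
#   (await repairSlot(networkStore, taskpool, protectedManifest, 2)).tryGet()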