diff --git a/codex/blockexchange/engine/advertiser.nim b/codex/blockexchange/engine/advertiser.nim index d094c454..cb8fd921 100644 --- a/codex/blockexchange/engine/advertiser.nim +++ b/codex/blockexchange/engine/advertiser.nim @@ -126,16 +126,16 @@ proc start*(b: Advertiser) {.async: (raises: []).} = trace "Advertiser start" + if b.advertiserRunning: + warn "Starting advertiser twice" + return + proc onBlock(cid: Cid) {.async.} = await b.advertiseBlock(cid) doAssert(b.localStore.onBlockStored.isNone()) b.localStore.onBlockStored = onBlock.some - if b.advertiserRunning: - warn "Starting advertiser twice" - return - b.advertiserRunning = true for i in 0 ..< b.concurrentAdvReqs: let fut = b.processQueueLoop() diff --git a/codex/codex.nim b/codex/codex.nim index 391a94fc..2b52f492 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -56,6 +56,7 @@ type codexNode: CodexNodeRef repoStore: RepoStore maintenance: BlockMaintainer + taskpool: Taskpool CodexPrivateKey* = libp2p.PrivateKey # alias EthWallet = ethers.Wallet @@ -191,6 +192,9 @@ proc stop*(s: CodexServer) {.async.} = error "Failed to stop codex node", failures = res.failure.len raiseAssert "Failed to stop codex node" + if not s.taskpool.isNil: + s.taskpool.shutdown() + proc new*( T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey ): CodexServer = @@ -331,4 +335,5 @@ proc new*( restServer: restServer, repoStore: repoStore, maintenance: maintenance, + taskpool: taskpool, ) diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 884969d0..2ca8b113 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -82,6 +82,7 @@ type rounded: Natural steps: Natural blocksCount: Natural + totalGroups: Natural strategy: StrategyType ErasureError* = object of CodexError @@ -166,11 +167,14 @@ proc prepareEncodingData( let strategy = params.strategy.init( - firstIndex = 0, lastIndex = params.rounded - 1, iterations = params.steps + firstIndex = 0, + lastIndex = params.rounded - 1, + iterations = params.steps, + totalGroups = params.totalGroups, ) - indicies = toSeq(strategy.getIndicies(step)) + indices = toSeq(strategy.getIndices(step)) pendingBlocksIter = - self.getPendingBlocks(manifest, indicies.filterIt(it < manifest.blocksCount)) + self.getPendingBlocks(manifest, indices.filterIt(it < manifest.blocksCount)) var resolved = 0 for fut in pendingBlocksIter: @@ -185,7 +189,7 @@ proc prepareEncodingData( resolved.inc() - for idx in indicies.filterIt(it >= manifest.blocksCount): + for idx in indices.filterIt(it >= manifest.blocksCount): let pos = indexToPos(params.steps, idx, step) trace "Padding with empty block", idx shallowCopy(data[pos], emptyBlock) @@ -216,10 +220,13 @@ proc prepareDecodingData( let strategy = encoded.protectedStrategy.init( - firstIndex = 0, lastIndex = encoded.blocksCount - 1, iterations = encoded.steps + firstIndex = 0, + lastIndex = encoded.blocksCount - 1, + iterations = encoded.steps, + totalGroups = encoded.numSlots, ) - indicies = toSeq(strategy.getIndicies(step)) - pendingBlocksIter = self.getPendingBlocks(encoded, indicies) + indices = toSeq(strategy.getIndices(step)) + pendingBlocksIter = self.getPendingBlocks(encoded, indices) var dataPieces = 0 @@ -281,6 +288,7 @@ proc init*( rounded = roundUp(manifest.blocksCount, ecK) steps = divUp(rounded, ecK) blocksCount = rounded + (steps * ecM) + totalGroups = ecK + ecM success EncodingParams( ecK: ecK, @@ -288,6 +296,7 @@ proc init*( rounded: rounded, steps: steps, blocksCount: blocksCount, + totalGroups: totalGroups, strategy: 
strategy, ) @@ -384,6 +393,8 @@ proc encodeData( var data = seq[seq[byte]].new() # number of blocks to encode parity = createDoubleArray(params.ecM, manifest.blockSize.int) + defer: + freeDoubleArray(parity, params.ecM) data[].setLen(params.ecK) # TODO: this is a tight blocking loop so we sleep here to allow @@ -408,8 +419,6 @@ return failure(err) except CancelledError as exc: raise exc - finally: - freeDoubleArray(parity, params.ecM) var idx = params.rounded + step for j in 0 ..< params.ecM: @@ -582,6 +591,8 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = data = seq[seq[byte]].new() parityData = seq[seq[byte]].new() recovered = createDoubleArray(encoded.ecK, encoded.blockSize.int) + defer: + freeDoubleArray(recovered, encoded.ecK) data[].setLen(encoded.ecK) # set len to K parityData[].setLen(encoded.ecM) # set len to M @@ -608,8 +619,6 @@ return failure(err) except CancelledError as exc: raise exc - finally: - freeDoubleArray(recovered, encoded.ecK) for i in 0 ..< encoded.ecK: let idx = i * encoded.steps + step @@ -659,6 +668,137 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = return decoded.success +proc repair*(self: Erasure, encoded: Manifest, slotIdx: int): Future[?!void] {.async.} = + ## Repair a protected manifest slot + ## + ## `encoded` - the encoded (protected) manifest to + ## be repaired + ## + logScope: + steps = encoded.steps + rounded_blocks = encoded.rounded + new_manifest = encoded.blocksCount + + var + cids = seq[Cid].new() + decoder = self.decoderProvider(encoded.blockSize.int, encoded.ecK, encoded.ecM) + emptyBlock = newSeq[byte](encoded.blockSize.int) + + cids[].setLen(encoded.blocksCount) + try: + for step in 0 ..< encoded.steps: + await sleepAsync(10.millis) + + var + data = seq[seq[byte]].new() + parityData = seq[seq[byte]].new() + recovered = createDoubleArray(encoded.ecK, encoded.blockSize.int) + defer: + freeDoubleArray(recovered, encoded.ecK) + + data[].setLen(encoded.ecK) + parityData[].setLen(encoded.ecM) + + without (dataPieces, _) =? ( + await self.prepareDecodingData( + encoded, step, data, parityData, cids, emptyBlock + ) + ), err: + trace "Unable to prepare decoding data", error = err.msg + return failure(err) + + if dataPieces >= encoded.ecK: + trace "Retrieved all the required data blocks for this step" + continue + + trace "Erasure decoding data" + try: + if err =? ( + await self.asyncDecode( + encoded.blockSize.int, encoded.ecK, encoded.ecM, data, parityData, recovered + ) + ).errorOption: + return failure(err) + except CancelledError as exc: + raise exc + + for i in 0 ..< encoded.ecK: + let idx = i * encoded.steps + step + if data[i].len <= 0 and not cids[idx].isEmpty: + var innerPtr: ptr UncheckedArray[byte] = recovered[][i] + + without blk =?
bt.Block.new( + innerPtr.toOpenArray(0, encoded.blockSize.int - 1) + ), error: + trace "Unable to create data block!", exc = error.msg + return failure(error) + + trace "Recovered data block", cid = blk.cid, index = i + if isErr (await self.store.putBlock(blk)): + trace "Unable to store data block!", cid = blk.cid + return failure("Unable to store data block!") + + cids[idx] = blk.cid + except CancelledError as exc: + trace "Erasure coding decoding cancelled" + raise exc # cancellation needs to be propagated + except CatchableError as exc: + trace "Erasure coding decoding error", exc = exc.msg + return failure(exc) + finally: + decoder.release() + + without tree =? CodexTree.init(cids[0 ..< encoded.originalBlocksCount]), err: + return failure(err) + + without treeCid =? tree.rootCid, err: + return failure(err) + + if treeCid != encoded.originalTreeCid: + return failure( + "Original tree root differs from the tree root computed out of recovered data" + ) + + if err =? (await self.store.putAllProofs(tree)).errorOption: + return failure(err) + + without repaired =? ( + await self.encode( + Manifest.new(encoded), encoded.ecK, encoded.ecM, encoded.protectedStrategy + ) + ), err: + return failure(err) + + if repaired.treeCid != encoded.treeCid: + return failure( + "Original tree root differs from the repaired tree root encoded out of recovered data" + ) + + let + strategy = + ?encoded.protectedStrategy.init( + firstIndex = 0, + lastIndex = encoded.blocksCount - 1, + iterations = encoded.steps, + totalGroups = encoded.numSlots, + ).catch + groupIndices = ?toSeq(strategy.getGroupIndices(slotIdx)).catch + + for step in 0 ..< encoded.steps: + let indices = strategy.getIndices(step) + for i in indices: + if i notin groupIndices: + if isErr (await self.store.delBlock(encoded.treeCid, i)): + trace "Failed to remove block from tree ", + treeCid = encoded.treeCid, index = i + + for i, cid in cids[0 ..< encoded.originalBlocksCount]: + if i notin groupIndices: + if isErr (await self.store.delBlock(treeCid, i)): + trace "Failed to remove original block from tree ", treeCid = treeCid, index = i + + return success() + proc start*(self: Erasure) {.async.} = return diff --git a/codex/indexingstrategy.nim b/codex/indexingstrategy.nim index 063ecd98..9d64936c 100644 --- a/codex/indexingstrategy.nim +++ b/codex/indexingstrategy.nim @@ -24,13 +24,17 @@ type IndexingError* = object of CodexError IndexingWrongIndexError* = object of IndexingError IndexingWrongIterationsError* = object of IndexingError + IndexingWrongTotalGroupsError* = object of IndexingError + IndexingWrongNumPadGroupBlocksError* = object of IndexingError IndexingStrategy* = object - strategyType*: StrategyType + strategyType*: StrategyType # Strategy algorithm firstIndex*: int # Lowest index that can be returned lastIndex*: int # Highest index that can be returned - iterations*: int # getIndices(iteration) will run from 0 ..< iterations - step*: int + iterations*: int # Number of iterations (0 ..< iterations) + step*: int # Step size between indices + totalGroups*: int # Total number of groups to distribute indices into + numPadGroupBlocks*: int # Optional number of padding blocks per group func checkIteration( self: IndexingStrategy, iteration: int @@ -44,7 +48,7 @@ func getIter(first, last, step: int): Iter[int] = {.cast(noSideEffect).}: Iter[int].new(first, last, step) -func getLinearIndicies( +func getLinearIndices( self: IndexingStrategy, iteration: int ): Iter[int] {.raises: [IndexingError].} = self.checkIteration(iteration) @@ -55,7 +59,7 @@ 
func getLinearIndicies( getIter(first, last, 1) -func getSteppedIndicies( +func getSteppedIndices( self: IndexingStrategy, iteration: int ): Iter[int] {.raises: [IndexingError].} = self.checkIteration(iteration) @@ -66,17 +70,54 @@ func getSteppedIndicies( getIter(first, last, self.iterations) -func getIndicies*( +func getIndices*( self: IndexingStrategy, iteration: int ): Iter[int] {.raises: [IndexingError].} = + ## defines the layout of blocks per encoding iteration (data + parity) + ## + case self.strategyType of StrategyType.LinearStrategy: - self.getLinearIndicies(iteration) + self.getLinearIndices(iteration) of StrategyType.SteppedStrategy: - self.getSteppedIndicies(iteration) + self.getSteppedIndices(iteration) + +func getGroupIndices*( + self: IndexingStrategy, groupIndex: int +): Iter[int] {.raises: [IndexingError].} = + ## defines failure recovery groups by selecting specific block indices + ## from each encoding step (using getIndices) + ## + + {.cast(noSideEffect).}: + Iter[int].new( + iterator (): int {.raises: [IndexingError], gcsafe.} = + var idx = groupIndex + for step in 0 ..< self.iterations: + var + current = 0 + found = false + for value in self.getIndices(step): + if current == idx: + yield value + found = true + break + inc current + if not found: + raise newException( + IndexingError, "groupIndex exceeds indices length in iteration " & $step + ) + idx = (idx + 1) mod self.totalGroups + + for i in 0 ..< self.numPadGroupBlocks: + yield self.lastIndex + (groupIndex + 1) + i * self.totalGroups + + ) func init*( - strategy: StrategyType, firstIndex, lastIndex, iterations: int + strategy: StrategyType, + firstIndex, lastIndex, iterations, totalGroups: int, + numPadGroupBlocks = 0.int, ): IndexingStrategy {.raises: [IndexingError].} = if firstIndex > lastIndex: raise newException( @@ -91,10 +132,25 @@ func init*( "iterations (" & $iterations & ") must be greater than zero.", ) + if totalGroups <= 0: + raise newException( + IndexingWrongTotalGroupsError, + "totalGroups (" & $totalGroups & ") must be greater than zero.", + ) + + if numPadGroupBlocks < 0: + raise newException( + IndexingWrongNumPadGroupBlocksError, + "numPadGroupBlocks (" & $numPadGroupBlocks & + ") must be equal or greater than zero.", + ) + IndexingStrategy( strategyType: strategy, firstIndex: firstIndex, lastIndex: lastIndex, iterations: iterations, + totalGroups: totalGroups, step: divUp((lastIndex - firstIndex + 1), iterations), + numPadGroupBlocks: numPadGroupBlocks, ) diff --git a/codex/node.nim b/codex/node.nim index fb653c0d..5737b372 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -182,32 +182,35 @@ proc fetchBatched*( # (i: int) => self.networkStore.getBlock(BlockAddress.init(cid, i)) # ) - while not iter.finished: - let blockFutures = collect: - for i in 0 ..< batchSize: - if not iter.finished: - let address = BlockAddress.init(cid, iter.next()) - if not (await address in self.networkStore) or fetchLocal: - self.networkStore.getBlock(address) + try: + while not iter.finished: + let blockFutures = collect: + for i in 0 ..< batchSize: + if not iter.finished: + let address = BlockAddress.init(cid, iter.next()) + if not (await address in self.networkStore) or fetchLocal: + self.networkStore.getBlock(address) - without blockResults =? await allFinishedValues(blockFutures), err: - trace "Some blocks failed to fetch", err = err.msg - return failure(err) + without blockResults =? 
await allFinishedValues(blockFutures), err: + trace "Some blocks failed to fetch", err = err.msg + return failure(err) - let blocks = blockResults.filterIt(it.isSuccess()).mapIt(it.value) + let blocks = blockResults.filterIt(it.isSuccess()).mapIt(it.value) - let numOfFailedBlocks = blockResults.len - blocks.len - if numOfFailedBlocks > 0: - return - failure("Some blocks failed (Result) to fetch (" & $numOfFailedBlocks & ")") + let numOfFailedBlocks = blockResults.len - blocks.len + if numOfFailedBlocks > 0: + return + failure("Some blocks failed (Result) to fetch (" & $numOfFailedBlocks & ")") - if not onBatch.isNil and batchErr =? (await onBatch(blocks)).errorOption: - return failure(batchErr) + if not onBatch.isNil and batchErr =? (await onBatch(blocks)).errorOption: + return failure(batchErr) - if not iter.finished: - await sleepAsync(1.millis) + if not iter.finished: + await sleepAsync(1.millis) - success() + success() + except IndexingError as e: + failure(e.msg) proc fetchBatched*( self: CodexNodeRef, @@ -629,10 +632,6 @@ proc onStore( trace "Received a request to store a slot" - # TODO: Use the isRepairing to manage the slot download. - # If isRepairing is true, the slot has to be repaired before - # being downloaded. - without manifest =? (await self.fetchManifest(cid)), err: trace "Unable to fetch manifest for cid", cid, err = err.msg return failure(err) @@ -665,32 +664,46 @@ proc onStore( return success() - without indexer =? - manifest.verifiableStrategy.init(0, manifest.blocksCount - 1, manifest.numSlots).catch, - err: - trace "Unable to create indexing strategy from protected manifest", err = err.msg - return failure(err) - if slotIdx > int.high.uint64: error "Cannot cast slot index to int", slotIndex = slotIdx return - without blksIter =? indexer.getIndicies(slotIdx.int).catch, err: - trace "Unable to get indicies from strategy", err = err.msg - return failure(err) + if isRepairing: + trace "start repairing slot", slotIdx + try: + let erasure = Erasure.new( + self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool + ) + if err =? (await erasure.repair(manifest, slotIdx.int)).errorOption: + error "Unable to erasure decode repairing manifest", + cid = manifest.treeCid, exc = err.msg + return failure(err) + except CatchableError as exc: + error "Error erasure decoding repairing manifest", + cid = manifest.treeCid, exc = exc.msg + return failure(exc.msg) + else: + without indexer =? + manifest.verifiableStrategy.init( + 0, manifest.blocksCount - 1, manifest.steps, manifest.numSlots + ).catch, err: + trace "Unable to create indexing strategy from protected manifest", err = err.msg + return failure(err) - if err =? ( - await self.fetchBatched(manifest.treeCid, blksIter, onBatch = updateExpiry) - ).errorOption: - trace "Unable to fetch blocks", err = err.msg - return failure(err) + without blksIter =? indexer.getGroupIndices(slotIdx.int).catch, err: + trace "Unable to get indices from strategy", err = err.msg + return failure(err) + + if err =? ( + await self.fetchBatched(manifest.treeCid, blksIter, onBatch = updateExpiry) + ).errorOption: + trace "Unable to fetch blocks", err = err.msg + return failure(err) without slotRoot =? (await builder.buildSlot(slotIdx.int)), err: trace "Unable to build slot", err = err.msg return failure(err) - trace "Slot successfully retrieved and reconstructed" - if cid =? 
slotRoot.toSlotCid() and cid != manifest.slotRoots[slotIdx]: trace "Slot root mismatch", manifest = manifest.slotRoots[slotIdx.int], recovered = slotRoot.toSlotCid() @@ -831,9 +844,6 @@ proc start*(self: CodexNodeRef) {.async.} = proc stop*(self: CodexNodeRef) {.async.} = trace "Stopping node" - if not self.taskpool.isNil: - self.taskpool.shutdown() - await self.trackedFutures.cancelTracked() if not self.engine.isNil: diff --git a/codex/slots/builder/builder.nim b/codex/slots/builder/builder.nim index 1ea57a0f..54a5d70d 100644 --- a/codex/slots/builder/builder.nim +++ b/codex/slots/builder/builder.nim @@ -113,18 +113,19 @@ func numSlotCells*[T, H](self: SlotsBuilder[T, H]): Natural = self.numBlockCells * self.numSlotBlocks -func slotIndiciesIter*[T, H](self: SlotsBuilder[T, H], slot: Natural): ?!Iter[int] = +func slotIndicesIter*[T, H](self: SlotsBuilder[T, H], slot: Natural): Iter[int] = ## Returns the slot indices. ## - self.strategy.getIndicies(slot).catch + self.strategy.getGroupIndices(slot) -func slotIndicies*[T, H](self: SlotsBuilder[T, H], slot: Natural): seq[int] = +func slotIndices*[T, H]( + self: SlotsBuilder[T, H], slot: Natural +): seq[int] {.raises: [IndexingError].} = ## Returns the slot indices. ## - if iter =? self.strategy.getIndicies(slot).catch: - return toSeq(iter) + toSeq(self.strategy.getGroupIndices(slot)) func manifest*[T, H](self: SlotsBuilder[T, H]): Manifest = ## Returns the manifest. @@ -183,22 +184,25 @@ proc getCellHashes*[T, H]( numberOfSlots = numberOfSlots slotIndex = slotIndex - let hashes = collect(newSeq): - for i, blkIdx in self.strategy.getIndicies(slotIndex): - logScope: - blkIdx = blkIdx - pos = i + try: + let hashes = collect(newSeq): + for i, blkIdx in self.strategy.getGroupIndices(slotIndex): + logScope: + blkIdx = blkIdx + pos = i - trace "Getting block CID for tree at index" - without (_, tree) =? (await self.buildBlockTree(blkIdx, i)) and digest =? tree.root, - err: - error "Failed to get block CID for tree at index", err = err.msg - return failure(err) + trace "Getting block CID for tree at index" + without (_, tree) =? (await self.buildBlockTree(blkIdx, i)) and + digest =?
tree.root, err: + error "Failed to get block CID for tree at index", err = err.msg + return failure(err) - trace "Get block digest", digest = digest.toHex - digest + trace "Get block digest", digest = digest.toHex + digest - success hashes + success hashes + except IndexingError as e: + failure e.msg proc buildSlotTree*[T, H]( self: SlotsBuilder[T, H], slotIndex: Natural @@ -346,7 +350,14 @@ proc new*[T, H]( emptyBlock = newSeq[byte](manifest.blockSize.int) emptyDigestTree = ?T.digestTree(emptyBlock, cellSize.int) - strategy = ?strategy.init(0, numBlocksTotal - 1, manifest.numSlots).catch + strategy = + ?strategy.init( + firstIndex = 0, + lastIndex = manifest.blocksCount - 1, + iterations = manifest.steps, + totalGroups = manifest.numSlots, + numPadGroupBlocks = numPadSlotBlocks, + ).catch logScope: numSlotBlocks = numSlotBlocks diff --git a/codex/slots/sampler/sampler.nim b/codex/slots/sampler/sampler.nim index bccdaff2..6107f4f0 100644 --- a/codex/slots/sampler/sampler.nim +++ b/codex/slots/sampler/sampler.nim @@ -53,8 +53,8 @@ proc getSample*[T, H]( cellsPerBlock = self.builder.numBlockCells blkCellIdx = cellIdx.toCellInBlk(cellsPerBlock) # block cell index blkSlotIdx = cellIdx.toBlkInSlot(cellsPerBlock) # slot tree index - origBlockIdx = self.builder.slotIndicies(self.index)[blkSlotIdx] - # convert to original dataset block index + slotIndices = ?self.builder.slotIndices(self.index).catch + origBlockIdx = slotIndices[blkSlotIdx] # convert to original dataset block index logScope: cellIdx = cellIdx diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index f94bca33..88297831 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -128,6 +128,13 @@ method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] = trace "Deleting block from network store", cid return self.localStore.delBlock(cid) +method delBlock*(self: NetworkStore, treeCid: Cid, index: Natural): Future[?!void] = + ## Delete a block from the blockstore + ## + + trace "Deleting block from network store", treeCid, index + return self.localStore.delBlock(treeCid, index) + {.pop.} method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.async.} = diff --git a/codex/utils/iter.nim b/codex/utils/iter.nim index 17ba8d8b..6a2f5459 100644 --- a/codex/utils/iter.nim +++ b/codex/utils/iter.nim @@ -7,6 +7,7 @@ type Function*[T, U] = proc(fut: T): U {.raises: [CatchableError], gcsafe, closure.} IsFinished* = proc(): bool {.raises: [], gcsafe, closure.} GenNext*[T] = proc(): T {.raises: [CatchableError], gcsafe.} + Iterable[T] = iterator (): T Iter*[T] = ref object finished: bool next*: GenNext[T] @@ -89,6 +90,36 @@ proc new*[T](_: type Iter[T], items: seq[T]): Iter[T] = Iter[int].new(0 ..< items.len).map((i: int) => items[i]) +proc new*[T](_: type Iter[T], iter: Iterable[T]): Iter[T] = + ## Creates a new Iter from an iterator + ## + var nextOrErr: Option[Result[T, ref CatchableError]] + proc tryNext(): void = + nextOrErr = Result[T, ref CatchableError].none + while not iter.finished: + try: + let t: T = iter() + if not iter.finished: + nextOrErr = some(success(t)) + break + except CatchableError as err: + nextOrErr = some(T.failure(err)) + except Exception: + assert(false) + + proc genNext(): T {.raises: [CatchableError].} = + without u =? 
nextOrErr.unsafeGet, err: + raise err + + tryNext() + return u + + proc isFinished(): bool = + nextOrErr.isNone + + tryNext() + Iter[T].new(genNext, isFinished) + proc empty*[T](_: type Iter[T]): Iter[T] = ## Creates an empty Iter ## diff --git a/tests/codex/node/testcontracts.nim b/tests/codex/node/testcontracts.nim index 73dd8daf..1332625a 100644 --- a/tests/codex/node/testcontracts.nim +++ b/tests/codex/node/testcontracts.nim @@ -125,14 +125,14 @@ asyncchecksuite "Test Node - Host contracts": fetchedBytes += blk.data.len.uint return success() - (await onStore(request, 1.uint64, onBlocks, isRepairing = false)).tryGet() + (await onStore(request, 0.uint64, onBlocks, isRepairing = false)).tryGet() check fetchedBytes == 12 * DefaultBlockSize.uint let indexer = verifiable.protectedStrategy.init( - 0, verifiable.numSlotBlocks() - 1, verifiable.numSlots + 0, verifiable.blocksCount - 1, verifiable.steps, verifiable.numSlots ) - for index in indexer.getIndicies(1): + for index in indexer.getGroupIndices(0.int): let blk = (await localStore.getBlock(verifiable.treeCid, index)).tryGet key = (createBlockExpirationMetadataKey(blk.cid)).tryGet diff --git a/tests/codex/node/testslotrepair.nim b/tests/codex/node/testslotrepair.nim new file mode 100644 index 00000000..7dfe56ad --- /dev/null +++ b/tests/codex/node/testslotrepair.nim @@ -0,0 +1,316 @@ +import std/options +import std/importutils + +import pkg/chronos +import pkg/questionable +import pkg/questionable/results +import pkg/stint +import pkg/taskpools + +import pkg/nitro +import pkg/codexdht/discv5/protocol as discv5 + +import pkg/codex/logutils +import pkg/codex/stores +import pkg/codex/contracts +import pkg/codex/blockexchange +import pkg/codex/chunker +import pkg/codex/slots +import pkg/codex/systemclock +import pkg/codex/manifest +import pkg/codex/discovery +import pkg/codex/erasure +import pkg/codex/blocktype as bt +import pkg/codex/indexingstrategy +import pkg/codex/nat +import pkg/codex/utils/natutils +import pkg/chronos/transports/stream + +import pkg/codex/node {.all.} + +import ../../asynctest +import ../../examples +import ../helpers + +privateAccess(CodexNodeRef) # enable access to private fields + +logScope: + topics = "testSlotRepair" + +proc nextFreePort*(startPort: int): Future[int] {.async.} = + proc client(server: StreamServer, transp: StreamTransport) {.async: (raises: []).} = + await transp.closeWait() + + var port = startPort + while true: + try: + let host = initTAddress("127.0.0.1", port) + var server = createStreamServer(host, client, {ReuseAddr}) + await server.closeWait() + return port + except TransportOsError: + inc port + +proc fetchStreamData(stream: LPStream, datasetSize: int): Future[seq[byte]] {.async.} = + var buf = newSeqUninitialized[byte](datasetSize) + while not stream.atEof: + var length = await stream.readOnce(addr buf[0], buf.len) + if length <= 0: + break + assert buf.len == datasetSize + buf + +proc flatten[T](s: seq[seq[T]]): seq[T] = + var t = newSeq[T](0) + for ss in s: + t &= ss + return t + +asyncchecksuite "Test Node - Slot Repair": + var + manifest: Manifest + builder: Poseidon2Builder + verifiable: Manifest + verifiableBlock: bt.Block + protected: Manifest + + tempLevelDbs: seq[TempLevelDb] = newSeq[TempLevelDb]() + localStores: seq[RepoStore] = newSeq[RepoStore]() + nodes: seq[CodexNodeRef] = newSeq[CodexNodeRef]() + taskpool: Taskpool + + let numNodes = 12 + + setup: + taskpool = Taskpool.new() + var bootstrapNodes: seq[SignedPeerRecord] = @[] + for i in 0 ..< numNodes: + let + listenPort = 
await nextFreePort(8080 + 2 * i) + bindPort = await nextFreePort(listenPort + 1) + listenAddr = MultiAddress.init("/ip4/127.0.0.1/tcp/" & $listenPort).expect( + "invalid multiaddress" + ) + switch = newStandardSwitch( + transportFlags = {ServerFlags.ReuseAddr}, + sendSignedPeerRecord = true, + addrs = listenAddr, + ) + wallet = WalletRef.new(EthPrivateKey.random()) + network = BlockExcNetwork.new(switch) + peerStore = PeerCtxStore.new() + pendingBlocks = PendingBlocksManager.new() + bdStore = TempLevelDb.new() + blockDiscoveryStore = bdStore.newDb() + repoStore = TempLevelDb.new() + mdStore = TempLevelDb.new() + localStore = + RepoStore.new(repoStore.newDb(), mdStore.newDb(), clock = SystemClock.new()) + blockDiscovery = Discovery.new( + switch.peerInfo.privateKey, + announceAddrs = @[listenAddr], + bindPort = bindPort.Port, + store = blockDiscoveryStore, + bootstrapNodes = bootstrapNodes, + ) + discovery = DiscoveryEngine.new( + localStore, peerStore, network, blockDiscovery, pendingBlocks + ) + advertiser = Advertiser.new(localStore, blockDiscovery) + engine = BlockExcEngine.new( + localStore, wallet, network, discovery, advertiser, peerStore, pendingBlocks + ) + store = NetworkStore.new(engine, localStore) + node = CodexNodeRef.new( + switch = switch, + networkStore = store, + engine = engine, + prover = Prover.none, + discovery = blockDiscovery, + taskpool = taskpool, + ) + + await localStore.start() + await switch.peerInfo.update() + switch.mount(network) + + let (announceAddrs, discoveryAddrs) = nattedAddress( + NatConfig(hasExtIp: false, nat: NatNone), switch.peerInfo.addrs, bindPort.Port + ) + node.discovery.updateAnnounceRecord(announceAddrs) + node.discovery.updateDhtRecord(discoveryAddrs) + + check node.discovery.dhtRecord.isSome + bootstrapNodes.add !node.discovery.dhtRecord + + tempLevelDbs.add bdStore + tempLevelDbs.add repoStore + tempLevelDbs.add mdStore + localStores.add localStore + nodes.add node + + for node in nodes: + await node.switch.start() + await node.start() + + teardown: + for node in nodes: + await node.switch.stop() + await node.stop() + for s in tempLevelDbs: + await s.destroyDb() + for l in localStores: + await l.stop() + taskpool.shutdown() + localStores = @[] + nodes = @[] + tempLevelDbs = @[] + + test "repair slots (2,1)": + let + numBlocks = 5 + datasetSize = numBlocks * DefaultBlockSize.int + ecK = 2 + ecM = 1 + localStore = localStores[0] + store = nodes[0].blockStore + blocks = + await makeRandomBlocks(datasetSize = datasetSize, blockSize = DefaultBlockSize) + data = ( + block: + collect(newSeq) do: + for blk in blocks: + blk.data + ).flatten() + assert blocks.len == numBlocks + + # Populate manifest in local store + manifest = await storeDataGetManifest(localStore, blocks) + let + manifestBlock = + bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() + erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool) + + (await localStore.putBlock(manifestBlock)).tryGet() + + protected = (await erasure.encode(manifest, ecK, ecM)).tryGet() + builder = Poseidon2Builder.new(localStore, protected).tryGet() + verifiable = (await builder.buildManifest()).tryGet() + verifiableBlock = + bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() + + # Populate protected manifest in local store + (await localStore.putBlock(verifiableBlock)).tryGet() + + var request = StorageRequest.example + request.content.cid = verifiableBlock.cid + request.ask.slots = protected.numSlots.uint64 + request.ask.slotSize = 
DefaultBlockSize.uint64 + + for i in 0 ..< protected.numSlots.uint64: + (await nodes[i + 1].onStore(request, i, nil, isRepairing = false)).tryGet() + + await nodes[0].switch.stop() # acts as client + await nodes[1].switch.stop() # slot 0 missing now + + # repair missing slot + (await nodes[4].onStore(request, 0.uint64, nil, isRepairing = true)).tryGet() + + await nodes[2].switch.stop() # slot 1 missing now + + (await nodes[5].onStore(request, 1.uint64, nil, isRepairing = true)).tryGet() + + await nodes[3].switch.stop() # slot 2 missing now + + (await nodes[6].onStore(request, 2.uint64, nil, isRepairing = true)).tryGet() + + await nodes[4].switch.stop() # slot 0 missing now + + # repair missing slot from repaired slots + (await nodes[7].onStore(request, 0.uint64, nil, isRepairing = true)).tryGet() + + await nodes[5].switch.stop() # slot 1 missing now + + # repair missing slot from repaired slots + (await nodes[8].onStore(request, 1.uint64, nil, isRepairing = true)).tryGet() + + await nodes[6].switch.stop() # slot 2 missing now + + # repair missing slot from repaired slots + (await nodes[9].onStore(request, 2.uint64, nil, isRepairing = true)).tryGet() + + let + stream = (await nodes[10].retrieve(verifiableBlock.cid, local = false)).tryGet() + expectedData = await fetchStreamData(stream, datasetSize) + assert expectedData.len == data.len + assert expectedData == data + + test "repair slots (3,2)": + let + numBlocks = 40 + datasetSize = numBlocks * DefaultBlockSize.int + ecK = 3 + ecM = 2 + localStore = localStores[0] + store = nodes[0].blockStore + blocks = + await makeRandomBlocks(datasetSize = datasetSize, blockSize = DefaultBlockSize) + data = ( + block: + collect(newSeq) do: + for blk in blocks: + blk.data + ).flatten() + assert blocks.len == numBlocks + + # Populate manifest in local store + manifest = await storeDataGetManifest(localStore, blocks) + let + manifestBlock = + bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() + erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool) + + (await localStore.putBlock(manifestBlock)).tryGet() + + protected = (await erasure.encode(manifest, ecK, ecM)).tryGet() + builder = Poseidon2Builder.new(localStore, protected).tryGet() + verifiable = (await builder.buildManifest()).tryGet() + verifiableBlock = + bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() + + # Populate protected manifest in local store + (await localStore.putBlock(verifiableBlock)).tryGet() + + var request = StorageRequest.example + request.content.cid = verifiableBlock.cid + request.ask.slots = protected.numSlots.uint64 + request.ask.slotSize = DefaultBlockSize.uint64 + + for i in 0 ..< protected.numSlots.uint64: + (await nodes[i + 1].onStore(request, i, nil, isRepairing = false)).tryGet() + + await nodes[0].switch.stop() # acts as client + await nodes[1].switch.stop() # slot 0 missing now + await nodes[3].switch.stop() # slot 2 missing now + + # repair missing slots + (await nodes[6].onStore(request, 0.uint64, nil, isRepairing = true)).tryGet() + (await nodes[7].onStore(request, 2.uint64, nil, isRepairing = true)).tryGet() + + await nodes[2].switch.stop() # slot 1 missing now + await nodes[4].switch.stop() # slot 3 missing now + + # repair missing slots from repaired slots + (await nodes[8].onStore(request, 1.uint64, nil, isRepairing = true)).tryGet() + (await nodes[9].onStore(request, 3.uint64, nil, isRepairing = true)).tryGet() + + await nodes[5].switch.stop() # slot 4 missing now + + # repair missing slot from 
repaired slots + (await nodes[10].onStore(request, 4.uint64, nil, isRepairing = true)).tryGet() + + let + stream = (await nodes[11].retrieve(verifiableBlock.cid, local = false)).tryGet() + expectedData = await fetchStreamData(stream, datasetSize) + assert expectedData.len == data.len + assert expectedData == data diff --git a/tests/codex/slots/testslotbuilder.nim b/tests/codex/slots/testslotbuilder.nim index 9a2043a8..31a12b9f 100644 --- a/tests/codex/slots/testslotbuilder.nim +++ b/tests/codex/slots/testslotbuilder.nim @@ -167,7 +167,13 @@ suite "Slot builder": test "Should build slot hashes for all slots": let - steppedStrategy = Strategy.init(0, numBlocksTotal - 1, numSlots) + steppedStrategy = Strategy.init( + 0, + protectedManifest.blocksCount - 1, + protectedManifest.steps, + numSlots, + numPadSlotBlocks, + ) builder = Poseidon2Builder .new(localStore, protectedManifest, cellSize = cellSize) @@ -176,7 +182,7 @@ suite "Slot builder": for i in 0 ..< numSlots: let expectedHashes = collect(newSeq): - for j, idx in steppedStrategy.getIndicies(i): + for j, idx in steppedStrategy.getGroupIndices(i): if j > (protectedManifest.numSlotBlocks - 1): emptyDigest else: @@ -190,7 +196,13 @@ suite "Slot builder": test "Should build slot trees for all slots": let - steppedStrategy = Strategy.init(0, numBlocksTotal - 1, numSlots) + steppedStrategy = Strategy.init( + 0, + protectedManifest.blocksCount - 1, + protectedManifest.steps, + numSlots, + numPadSlotBlocks, + ) builder = Poseidon2Builder .new(localStore, protectedManifest, cellSize = cellSize) @@ -199,7 +211,7 @@ suite "Slot builder": for i in 0 ..< numSlots: let expectedHashes = collect(newSeq): - for j, idx in steppedStrategy.getIndicies(i): + for j, idx in steppedStrategy.getGroupIndices(i): if j > (protectedManifest.numSlotBlocks - 1): emptyDigest else: @@ -235,7 +247,13 @@ suite "Slot builder": test "Should build correct verification root": let - steppedStrategy = Strategy.init(0, numBlocksTotal - 1, numSlots) + steppedStrategy = Strategy.init( + 0, + protectedManifest.blocksCount - 1, + protectedManifest.steps, + numSlots, + numPadSlotBlocks, + ) builder = Poseidon2Builder .new(localStore, protectedManifest, cellSize = cellSize) .tryGet() @@ -245,7 +263,7 @@ suite "Slot builder": slotsHashes = collect(newSeq): for i in 0 ..< numSlots: let slotHashes = collect(newSeq): - for j, idx in steppedStrategy.getIndicies(i): + for j, idx in steppedStrategy.getGroupIndices(i): if j > (protectedManifest.numSlotBlocks - 1): emptyDigest else: @@ -261,7 +279,13 @@ suite "Slot builder": test "Should build correct verification root manifest": let - steppedStrategy = Strategy.init(0, numBlocksTotal - 1, numSlots) + steppedStrategy = Strategy.init( + 0, + protectedManifest.blocksCount - 1, + protectedManifest.steps, + numSlots, + numPadSlotBlocks, + ) builder = Poseidon2Builder .new(localStore, protectedManifest, cellSize = cellSize) .tryGet() @@ -269,7 +293,7 @@ suite "Slot builder": slotsHashes = collect(newSeq): for i in 0 ..< numSlots: let slotHashes = collect(newSeq): - for j, idx in steppedStrategy.getIndicies(i): + for j, idx in steppedStrategy.getGroupIndices(i): if j > (protectedManifest.numSlotBlocks - 1): emptyDigest else: diff --git a/tests/codex/testindexingstrategy.nim b/tests/codex/testindexingstrategy.nim index 322906dc..5f88a8be 100644 --- a/tests/codex/testindexingstrategy.nim +++ b/tests/codex/testindexingstrategy.nim @@ -1,4 +1,5 @@ import std/sequtils +import std/algorithm import pkg/chronos import pkg/codex/utils/asynciter @@ -14,61 
+15,67 @@ for offset in @[0, 1, 2, 100]: firstIndex = 0 + offset lastIndex = 12 + offset nIters = 3 - linear = LinearStrategy.init(firstIndex, lastIndex, nIters) - stepped = SteppedStrategy.init(firstIndex, lastIndex, nIters) + totalGroups = 1 + linear = LinearStrategy.init(firstIndex, lastIndex, nIters, totalGroups) + stepped = SteppedStrategy.init(firstIndex, lastIndex, nIters, totalGroups) test "linear": check: - toSeq(linear.getIndicies(0)) == @[0, 1, 2, 3, 4].mapIt(it + offset) - toSeq(linear.getIndicies(1)) == @[5, 6, 7, 8, 9].mapIt(it + offset) - toSeq(linear.getIndicies(2)) == @[10, 11, 12].mapIt(it + offset) + toSeq(linear.getIndices(0)) == @[0, 1, 2, 3, 4].mapIt(it + offset) + toSeq(linear.getIndices(1)) == @[5, 6, 7, 8, 9].mapIt(it + offset) + toSeq(linear.getIndices(2)) == @[10, 11, 12].mapIt(it + offset) test "stepped": check: - toSeq(stepped.getIndicies(0)) == @[0, 3, 6, 9, 12].mapIt(it + offset) - toSeq(stepped.getIndicies(1)) == @[1, 4, 7, 10].mapIt(it + offset) - toSeq(stepped.getIndicies(2)) == @[2, 5, 8, 11].mapIt(it + offset) + toSeq(stepped.getIndices(0)) == @[0, 3, 6, 9, 12].mapIt(it + offset) + toSeq(stepped.getIndices(1)) == @[1, 4, 7, 10].mapIt(it + offset) + toSeq(stepped.getIndices(2)) == @[2, 5, 8, 11].mapIt(it + offset) suite "Indexing strategies": let - linear = LinearStrategy.init(0, 10, 3) - stepped = SteppedStrategy.init(0, 10, 3) + totalGroups = 1 + linear = LinearStrategy.init(0, 10, 3, totalGroups) + stepped = SteppedStrategy.init(0, 10, 3, totalGroups) test "smallest range 0": let - l = LinearStrategy.init(0, 0, 1) - s = SteppedStrategy.init(0, 0, 1) + l = LinearStrategy.init(0, 0, 1, totalGroups) + s = SteppedStrategy.init(0, 0, 1, totalGroups) check: - toSeq(l.getIndicies(0)) == @[0] - toSeq(s.getIndicies(0)) == @[0] + toSeq(l.getIndices(0)) == @[0] + toSeq(s.getIndices(0)) == @[0] test "smallest range 1": let - l = LinearStrategy.init(0, 1, 1) - s = SteppedStrategy.init(0, 1, 1) + l = LinearStrategy.init(0, 1, 1, totalGroups) + s = SteppedStrategy.init(0, 1, 1, totalGroups) check: - toSeq(l.getIndicies(0)) == @[0, 1] - toSeq(s.getIndicies(0)) == @[0, 1] + toSeq(l.getIndices(0)) == @[0, 1] + toSeq(s.getIndices(0)) == @[0, 1] test "first index must be smaller than last index": expect IndexingWrongIndexError: - discard LinearStrategy.init(10, 0, 1) + discard LinearStrategy.init(10, 0, 1, totalGroups) test "iterations must be greater than zero": expect IndexingWrongIterationsError: - discard LinearStrategy.init(0, 10, 0) + discard LinearStrategy.init(0, 10, 0, totalGroups) + + test "totalGroups must be greater than zero": + expect IndexingWrongTotalGroupsError: + discard LinearStrategy.init(1, 1, 1, 0) test "should split elements evenly when possible": - let l = LinearStrategy.init(0, 11, 3) + let l = LinearStrategy.init(0, 11, 3, totalGroups) check: - toSeq(l.getIndicies(0)) == @[0, 1, 2, 3].mapIt(it) - toSeq(l.getIndicies(1)) == @[4, 5, 6, 7].mapIt(it) - toSeq(l.getIndicies(2)) == @[8, 9, 10, 11].mapIt(it) + toSeq(l.getIndices(0)) == @[0, 1, 2, 3].mapIt(it) + toSeq(l.getIndices(1)) == @[4, 5, 6, 7].mapIt(it) + toSeq(l.getIndices(2)) == @[8, 9, 10, 11].mapIt(it) test "linear - oob": expect IndexingError: - discard linear.getIndicies(3) + discard linear.getIndices(3) test "stepped - oob": expect IndexingError: - discard stepped.getIndicies(3) + discard stepped.getIndices(3) diff --git a/tests/codex/testnode.nim b/tests/codex/testnode.nim index 0ee4fc50..c6f0154b 100644 --- a/tests/codex/testnode.nim +++ b/tests/codex/testnode.nim @@ -1,4 +1,5 @@ import 
./node/testnode import ./node/testcontracts +import ./node/testslotrepair {.warning[UnusedImport]: off.} diff --git a/tests/integration/testslotrepair.nim b/tests/integration/testslotrepair.nim new file mode 100644 index 00000000..3c065267 --- /dev/null +++ b/tests/integration/testslotrepair.nim @@ -0,0 +1,318 @@ +import pkg/questionable +import pkg/codex/logutils +import ../contracts/time +import ../contracts/deployment +import ../codex/helpers +import ../examples +import ./marketplacesuite +import ./nodeconfigs + +export logutils + +logScope: + topics = "integration test slot repair" + +marketplacesuite "SP Slot Repair": + const minPricePerBytePerSecond = 1.u256 + const collateralPerByte = 1.u256 + const blocks = 3 + const ecNodes = 5 + const ecTolerance = 2 + + test "repair from local store", + NodeConfigs( + clients: CodexConfigs + .init(nodes = 1) + .debug() + .withLogFile() + .withLogTopics("node", "erasure").some, + providers: CodexConfigs + .init(nodes = 2) + .withSimulateProofFailures(idx = 1, failEveryNProofs = 1) + .debug() + .withLogFile() + .withLogTopics("marketplace", "sales", "reservations", "node", "statemachine").some, + validators: CodexConfigs + .init(nodes = 1) + .debug() + .withLogFile() + .withLogTopics("validator").some, + ): + let client0 = clients()[0] + let provider0 = providers()[0] + let provider1 = providers()[1] + let expiry = 10.periods + let duration = expiry + 10.periods + + let data = await RandomChunker.example(blocks = blocks) + let slotSize = slotSize(blocks, ecNodes, ecTolerance) + + let availability = ( + await provider0.client.postAvailability( + totalSize = 4 * slotSize.truncate(uint64), + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = 100 * slotSize * collateralPerByte, + ) + ).get + + discard await provider1.client.postAvailability( + totalSize = slotSize.truncate(uint64), + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = 100 * slotSize * collateralPerByte, + ) + + var filledSlotIds: seq[SlotId] = @[] + proc onSlotFilled(eventResult: ?!SlotFilled) = + assert not eventResult.isErr + let event = !eventResult + let slotId = slotId(event.requestId, event.slotIndex) + filledSlotIds.add slotId + + let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) + + var freedSlotId = none SlotId + proc onSlotFreed(eventResult: ?!SlotFreed) = + assert not eventResult.isErr + let event = !eventResult + let slotId = slotId(event.requestId, event.slotIndex) + + assert slotId in filledSlotIds + + filledSlotIds.del(filledSlotIds.find(slotId)) + freedSlotId = some(slotId) + + let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed) + + let cid = (await client0.client.upload(data)).get + + let purchaseId = await client0.client.requestStorage( + cid, + expiry = expiry, + duration = duration, + nodes = ecNodes, + tolerance = ecTolerance, + proofProbability = 1.u256, + ) + + check eventually( + await client0.client.purchaseStateIs(purchaseId, "started"), + timeout = expiry.int * 1000, + ) + + await client0.stop() + + await provider0.client.patchAvailability( + availabilityId = availability.id, + totalSize = (5 * slotSize.truncate(uint64)).uint64.some, + duration = duration.uint64.some, + minPricePerBytePerSecond = minPricePerBytePerSecond.some, + totalCollateral = (100 * slotSize * collateralPerByte).some, + ) + + check eventually(freedSlotId.isSome, timeout = (duration - expiry).int * 1000) + + check eventually( + freedSlotId.get 
in filledSlotIds, timeout = (duration - expiry).int * 1000 + ) + + await filledSubscription.unsubscribe() + await slotFreedsubscription.unsubscribe() + + test "repair from remote store", + NodeConfigs( + clients: CodexConfigs + .init(nodes = 1) + .debug() + .withLogFile() + .withLogTopics("node", "erasure").some, + providers: CodexConfigs + .init(nodes = 3) + .withSimulateProofFailures(idx = 1, failEveryNProofs = 1) + .debug() + .withLogFile() + .withLogTopics("marketplace", "sales", "reservations", "node", "statemachine").some, + validators: CodexConfigs + .init(nodes = 1) + .debug() + .withLogFile() + .withLogTopics("validator").some, + ): + let client0 = clients()[0] + let provider0 = providers()[0] + let provider1 = providers()[1] + let provider2 = providers()[2] + let expiry = 10.periods + let duration = expiry + 10.periods + + let data = await RandomChunker.example(blocks = blocks) + let slotSize = slotSize(blocks, ecNodes, ecTolerance) + + discard await provider0.client.postAvailability( + totalSize = 4 * slotSize.truncate(uint64), + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = 100 * slotSize * collateralPerByte, + ) + + discard await provider1.client.postAvailability( + totalSize = slotSize.truncate(uint64), + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = 100 * slotSize * collateralPerByte, + ) + + var filledSlotIds: seq[SlotId] = @[] + proc onSlotFilled(eventResult: ?!SlotFilled) = + assert not eventResult.isErr + let event = !eventResult + let slotId = slotId(event.requestId, event.slotIndex) + filledSlotIds.add slotId + + let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) + + var freedSlotId = none SlotId + proc onSlotFreed(eventResult: ?!SlotFreed) = + assert not eventResult.isErr + let event = !eventResult + let slotId = slotId(event.requestId, event.slotIndex) + + assert slotId in filledSlotIds + + filledSlotIds.del(filledSlotIds.find(slotId)) + freedSlotId = some(slotId) + + let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed) + + let cid = (await client0.client.upload(data)).get + + let purchaseId = await client0.client.requestStorage( + cid, + expiry = expiry, + duration = duration, + nodes = ecNodes, + tolerance = ecTolerance, + proofProbability = 1.u256, + ) + + check eventually( + await client0.client.purchaseStateIs(purchaseId, "started"), + timeout = expiry.int * 1000, + ) + + await client0.stop() + + discard await provider2.client.postAvailability( + totalSize = slotSize.truncate(uint64), + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = 100 * slotSize * collateralPerByte, + ) + + check eventually(freedSlotId.isSome, timeout = (duration - expiry).int * 1000) + + await provider1.stop() + + check eventually( + freedSlotId.get in filledSlotIds, timeout = (duration - expiry).int * 1000 + ) + + await filledSubscription.unsubscribe() + await slotFreedsubscription.unsubscribe() + + test "storage provider slot repair", + NodeConfigs( + clients: CodexConfigs + .init(nodes = 1) + .debug() + .withLogFile() + .withLogTopics("node", "erasure").some, + providers: CodexConfigs + .init(nodes = 4) + .debug() + .withLogFile() + .withLogTopics("marketplace", "sales", "reservations", "node").some, + validators: CodexConfigs + .init(nodes = 1) + .debug() + .withLogFile() + .withLogTopics("validator").some, + ): + let client0 = clients()[0] + let expiry = 10.periods + let duration = 
expiry + 10.periods + let size = 0xFFFFFF.uint64 + + let data = await RandomChunker.example(blocks = blocks) + let datasetSize = + datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance) + + await createAvailabilities( + size, duration, datasetSize * collateralPerByte, minPricePerBytePerSecond + ) + + let cid = (await client0.client.upload(data)).get + + let purchaseId = await client0.client.requestStorage( + cid, + expiry = expiry, + duration = duration, + collateralPerByte = collateralPerByte, + nodes = ecNodes, + tolerance = ecTolerance, + proofProbability = 1.u256, + pricePerBytePerSecond = minPricePerBytePerSecond, + ) + + let requestId = (await client0.client.requestId(purchaseId)).get + + var filledSlotIds: seq[SlotId] = @[] + proc onSlotFilled(eventResult: ?!SlotFilled) = + assert not eventResult.isErr + let event = !eventResult + if event.requestId == requestId: + let slotId = slotId(event.requestId, event.slotIndex) + filledSlotIds.add slotId + + let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) + + check eventually( + await client0.client.purchaseStateIs(purchaseId, "started"), + timeout = expiry.int * 1000, + ) + + await client0.stop() + + check eventually( + filledSlotIds.len == blocks, timeout = (duration - expiry).int * 1000 + ) + trace "all slots have been filled" + + var slotWasFreed = false + proc onSlotFreed(event: ?!SlotFreed) = + if event.isOk and event.value.requestId == requestId: + trace "slot was freed", slotIndex = $event.value.slotIndex + slotWasFreed = true + + let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed) + + block provider_search: + while true: + for slotId in filledSlotIds: + for provider in providers(): + if (await provider.client.saleStateIs(slotId, "SaleProving")): + await provider.stop() + break provider_search + await sleepAsync(100.milliseconds) + + check eventually(slotWasFreed, timeout = (duration - expiry).int * 1000) + + await slotFreedsubscription.unsubscribe() + + check eventually( + filledSlotIds.len > blocks, timeout = (duration - expiry).int * 1000 + ) + trace "freed slot was filled" + + await filledSubscription.unsubscribe() diff --git a/tests/testIntegration.nim b/tests/testIntegration.nim index 152d22dd..07a69749 100644 --- a/tests/testIntegration.nim +++ b/tests/testIntegration.nim @@ -9,5 +9,6 @@ import ./integration/testmarketplace import ./integration/testproofs import ./integration/testvalidator import ./integration/testecbug +import ./integration/testslotrepair {.warning[UnusedImport]: off.}
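Illustrative usage note (reviewer sketch, not part of the patch): the grouped indexing introduced above gives every slot exactly one block index per encoding step, walking each step's indices diagonally. Below is a minimal Nim sketch of the expected behaviour, assuming a stepped strategy over 15 protected blocks (ecK = 3, ecM = 2, hence totalGroups = 5 and steps = 3) and no pad blocks; the listed outputs are derived by hand from the getGroupIndices implementation in this diff and should be cross-checked against the unit tests.

import std/sequtils
import pkg/codex/indexingstrategy
import pkg/codex/utils/asynciter

# 15 blocks, 3 encoding steps, 5 groups (one group per slot), no padding.
let strategy = SteppedStrategy.init(
  firstIndex = 0, lastIndex = 14, iterations = 3, totalGroups = 5
)

# getIndices(step) yields the blocks of one encoding step:
#   step 0 -> 0, 3, 6, 9, 12   step 1 -> 1, 4, 7, 10, 13   step 2 -> 2, 5, 8, 11, 14
# getGroupIndices(group) takes position (group + step) mod totalGroups from each step:
for group in 0 ..< 5:
  echo group, ": ", toSeq(strategy.getGroupIndices(group))
  # expected: 0: @[0, 4, 8]  1: @[3, 7, 11]  2: @[6, 10, 14]  3: @[9, 13, 2]  4: @[12, 1, 5]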