diff --git a/codex/blockexchange/engine/advertiser.nim b/codex/blockexchange/engine/advertiser.nim index ef1465f9..9e68ebbb 100644 --- a/codex/blockexchange/engine/advertiser.nim +++ b/codex/blockexchange/engine/advertiser.nim @@ -124,6 +124,10 @@ proc start*(b: Advertiser) {.async: (raises: []).} = trace "Advertiser start" + # The advertiser is expected to be started only once. + if b.advertiserRunning: + raiseAssert "Advertiser can only be started once — this should not happen" + proc onBlock(cid: Cid) {.async: (raises: []).} = try: await b.advertiseBlock(cid) @@ -133,10 +137,6 @@ proc start*(b: Advertiser) {.async: (raises: []).} = doAssert(b.localStore.onBlockStored.isNone()) b.localStore.onBlockStored = onBlock.some - if b.advertiserRunning: - warn "Starting advertiser twice" - return - b.advertiserRunning = true for i in 0 ..< b.concurrentAdvReqs: let fut = b.processQueueLoop() diff --git a/codex/codex.nim b/codex/codex.nim index ef5a3b94..81357464 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -56,6 +56,7 @@ type codexNode: CodexNodeRef repoStore: RepoStore maintenance: BlockMaintainer + taskpool: Taskpool CodexPrivateKey* = libp2p.PrivateKey # alias EthWallet = ethers.Wallet @@ -193,6 +194,9 @@ proc stop*(s: CodexServer) {.async.} = error "Failed to stop codex node", failures = res.failure.len raiseAssert "Failed to stop codex node" + if not s.taskpool.isNil: + s.taskpool.shutdown() + proc new*( T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey ): CodexServer = @@ -333,4 +337,5 @@ proc new*( restServer: restServer, repoStore: repoStore, maintenance: maintenance, + taskpool: taskpool, ) diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index a83838c6..a75837b7 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -25,6 +25,7 @@ import ../logutils import ../manifest import ../merkletree import ../stores +import ../clock import ../blocktype as bt import ../utils import ../utils/asynciter @@ -120,19 +121,25 @@ func indexToPos(steps, idx, step: int): int {.inline.} = (idx - step) div steps proc getPendingBlocks( - self: Erasure, manifest: Manifest, indicies: seq[int] -): AsyncIter[(?!bt.Block, int)] = + self: Erasure, manifest: Manifest, indices: seq[int] +): (AsyncIter[(?!bt.Block, int)], seq[Future[?!bt.Block]]) = ## Get pending blocks iterator ## - var + pendingBlockFutures: seq[Future[?!bt.Block]] = @[] + pendingBlocks: seq[Future[(?!bt.Block, int)]] = @[] + + proc attachIndex( + fut: Future[?!bt.Block], i: int + ): Future[(?!bt.Block, int)] {.async.} = + ## avoids closure capture issues + return (await fut, i) + + for blockIndex in indices: # request blocks from the store - pendingBlocks = indicies.map( - (i: int) => - self.store.getBlock(BlockAddress.init(manifest.treeCid, i)).map( - (r: ?!bt.Block) => (r, i) - ) # Get the data blocks (first K) - ) + let fut = self.store.getBlock(BlockAddress.init(manifest.treeCid, blockIndex)) + pendingBlockFutures.add(fut) + pendingBlocks.add(attachIndex(fut, blockIndex)) proc isFinished(): bool = pendingBlocks.len == 0 @@ -150,7 +157,7 @@ proc getPendingBlocks( $index, ) - AsyncIter[(?!bt.Block, int)].new(genNext, isFinished) + (AsyncIter[(?!bt.Block, int)].new(genNext, isFinished), pendingBlockFutures) proc prepareEncodingData( self: Erasure, @@ -168,16 +175,16 @@ proc prepareEncodingData( strategy = params.strategy.init( firstIndex = 0, lastIndex = params.rounded - 1, iterations = params.steps ) - indicies = toSeq(strategy.getIndicies(step)) - pendingBlocksIter = - self.getPendingBlocks(manifest, indicies.filterIt(it < manifest.blocksCount)) + indices = toSeq(strategy.getIndices(step)) + (pendingBlocksIter, _) = + self.getPendingBlocks(manifest, indices.filterIt(it < manifest.blocksCount)) var resolved = 0 for fut in pendingBlocksIter: let (blkOrErr, idx) = await fut without blk =? blkOrErr, err: - warn "Failed retreiving a block", treeCid = manifest.treeCid, idx, msg = err.msg - continue + warn "Failed retrieving a block", treeCid = manifest.treeCid, idx, msg = err.msg + return failure(err) let pos = indexToPos(params.steps, idx, step) shallowCopy(data[pos], if blk.isEmpty: emptyBlock else: blk.data) @@ -185,7 +192,7 @@ proc prepareEncodingData( resolved.inc() - for idx in indicies.filterIt(it >= manifest.blocksCount): + for idx in indices.filterIt(it >= manifest.blocksCount): let pos = indexToPos(params.steps, idx, step) trace "Padding with empty block", idx shallowCopy(data[pos], emptyBlock) @@ -218,8 +225,8 @@ proc prepareDecodingData( strategy = encoded.protectedStrategy.init( firstIndex = 0, lastIndex = encoded.blocksCount - 1, iterations = encoded.steps ) - indicies = toSeq(strategy.getIndicies(step)) - pendingBlocksIter = self.getPendingBlocks(encoded, indicies) + indices = toSeq(strategy.getIndices(step)) + (pendingBlocksIter, pendingBlockFutures) = self.getPendingBlocks(encoded, indices) var dataPieces = 0 @@ -233,7 +240,7 @@ proc prepareDecodingData( let (blkOrErr, idx) = await fut without blk =? blkOrErr, err: - trace "Failed retreiving a block", idx, treeCid = encoded.treeCid, msg = err.msg + trace "Failed retrieving a block", idx, treeCid = encoded.treeCid, msg = err.msg continue let pos = indexToPos(encoded.steps, idx, step) @@ -259,6 +266,11 @@ proc prepareDecodingData( resolved.inc + pendingBlockFutures.apply( + proc(fut: auto) = + fut.cancel() + ) + return success (dataPieces.Natural, parityPieces.Natural) proc init*( @@ -352,7 +364,7 @@ proc asyncEncode*( return failure(joinErr) if not task.success.load(): - return failure("Leopard encoding failed") + return failure("Leopard encoding task failed") success() @@ -382,6 +394,8 @@ proc encodeData( var data = seq[seq[byte]].new() # number of blocks to encode parity = createDoubleArray(params.ecM, manifest.blockSize.int) + defer: + freeDoubleArray(parity, params.ecM) data[].setLen(params.ecK) # TODO: this is a tight blocking loop so we sleep here to allow @@ -406,8 +420,6 @@ proc encodeData( return failure(err) except CancelledError as exc: raise exc - finally: - freeDoubleArray(parity, params.ecM) var idx = params.rounded + step for j in 0 ..< params.ecM: @@ -544,17 +556,13 @@ proc asyncDecode*( return failure(joinErr) if not task.success.load(): - return failure("Leopard encoding failed") + return failure("Leopard decoding task failed") success() -proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = - ## Decode a protected manifest into it's original - ## manifest - ## - ## `encoded` - the encoded (protected) manifest to - ## be recovered - ## +proc decodeInternal( + self: Erasure, encoded: Manifest +): Future[?!(ref seq[Cid], seq[Natural])] {.async.} = logScope: steps = encoded.steps rounded_blocks = encoded.rounded @@ -578,6 +586,8 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = data = seq[seq[byte]].new() parityData = seq[seq[byte]].new() recovered = createDoubleArray(encoded.ecK, encoded.blockSize.int) + defer: + freeDoubleArray(recovered, encoded.ecK) data[].setLen(encoded.ecK) # set len to K parityData[].setLen(encoded.ecM) # set len to M @@ -604,8 +614,6 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = return failure(err) except CancelledError as exc: raise exc - finally: - freeDoubleArray(recovered, encoded.ecK) for i in 0 ..< encoded.ecK: let idx = i * encoded.steps + step @@ -634,6 +642,19 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = finally: decoder.release() + return (cids, recoveredIndices).success + +proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = + ## Decode a protected manifest into it's original + ## manifest + ## + ## `encoded` - the encoded (protected) manifest to + ## be recovered + ## + + without (cids, recoveredIndices) =? (await self.decodeInternal(encoded)), err: + return failure(err) + without tree =? CodexTree.init(cids[0 ..< encoded.originalBlocksCount]), err: return failure(err) @@ -655,6 +676,44 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = return decoded.success +proc repair*(self: Erasure, encoded: Manifest): Future[?!void] {.async.} = + ## Repair a protected manifest by reconstructing the full dataset + ## + ## `encoded` - the encoded (protected) manifest to + ## be repaired + ## + + without (cids, _) =? (await self.decodeInternal(encoded)), err: + return failure(err) + + without tree =? CodexTree.init(cids[0 ..< encoded.originalBlocksCount]), err: + return failure(err) + + without treeCid =? tree.rootCid, err: + return failure(err) + + if treeCid != encoded.originalTreeCid: + return failure( + "Original tree root differs from the tree root computed out of recovered data" + ) + + if err =? (await self.store.putAllProofs(tree)).errorOption: + return failure(err) + + without repaired =? ( + await self.encode( + Manifest.new(encoded), encoded.ecK, encoded.ecM, encoded.protectedStrategy + ) + ), err: + return failure(err) + + if repaired.treeCid != encoded.treeCid: + return failure( + "Original tree root differs from the repaired tree root encoded out of recovered data" + ) + + return success() + proc start*(self: Erasure) {.async.} = return diff --git a/codex/indexingstrategy.nim b/codex/indexingstrategy.nim index 063ecd98..99a5d12d 100644 --- a/codex/indexingstrategy.nim +++ b/codex/indexingstrategy.nim @@ -24,13 +24,17 @@ type IndexingError* = object of CodexError IndexingWrongIndexError* = object of IndexingError IndexingWrongIterationsError* = object of IndexingError + IndexingWrongGroupCountError* = object of IndexingError + IndexingWrongPadBlockCountError* = object of IndexingError IndexingStrategy* = object - strategyType*: StrategyType + strategyType*: StrategyType # Indexing strategy algorithm firstIndex*: int # Lowest index that can be returned lastIndex*: int # Highest index that can be returned - iterations*: int # getIndices(iteration) will run from 0 ..< iterations - step*: int + iterations*: int # Number of iteration steps (0 ..< iterations) + step*: int # Step size between generated indices + groupCount*: int # Number of groups to partition indices into + padBlockCount*: int # Number of padding blocks to append per group func checkIteration( self: IndexingStrategy, iteration: int @@ -44,39 +48,47 @@ func getIter(first, last, step: int): Iter[int] = {.cast(noSideEffect).}: Iter[int].new(first, last, step) -func getLinearIndicies( - self: IndexingStrategy, iteration: int -): Iter[int] {.raises: [IndexingError].} = - self.checkIteration(iteration) - +func getLinearIndices(self: IndexingStrategy, iteration: int): Iter[int] = let first = self.firstIndex + iteration * self.step last = min(first + self.step - 1, self.lastIndex) getIter(first, last, 1) -func getSteppedIndicies( - self: IndexingStrategy, iteration: int -): Iter[int] {.raises: [IndexingError].} = - self.checkIteration(iteration) - +func getSteppedIndices(self: IndexingStrategy, iteration: int): Iter[int] = let first = self.firstIndex + iteration last = self.lastIndex getIter(first, last, self.iterations) -func getIndicies*( - self: IndexingStrategy, iteration: int -): Iter[int] {.raises: [IndexingError].} = +func getStrategyIndices(self: IndexingStrategy, iteration: int): Iter[int] = case self.strategyType of StrategyType.LinearStrategy: - self.getLinearIndicies(iteration) + self.getLinearIndices(iteration) of StrategyType.SteppedStrategy: - self.getSteppedIndicies(iteration) + self.getSteppedIndices(iteration) + +func getIndices*( + self: IndexingStrategy, iteration: int +): Iter[int] {.raises: [IndexingError].} = + self.checkIteration(iteration) + {.cast(noSideEffect).}: + Iter[int].new( + iterator (): int {.gcsafe.} = + for value in self.getStrategyIndices(iteration): + yield value + + for i in 0 ..< self.padBlockCount: + yield self.lastIndex + (iteration + 1) + i * self.groupCount + + ) func init*( - strategy: StrategyType, firstIndex, lastIndex, iterations: int + strategy: StrategyType, + firstIndex, lastIndex, iterations: int, + groupCount = 0, + padBlockCount = 0, ): IndexingStrategy {.raises: [IndexingError].} = if firstIndex > lastIndex: raise newException( @@ -91,10 +103,24 @@ func init*( "iterations (" & $iterations & ") must be greater than zero.", ) + if padBlockCount < 0: + raise newException( + IndexingWrongPadBlockCountError, + "padBlockCount (" & $padBlockCount & ") must be equal or greater than zero.", + ) + + if padBlockCount > 0 and groupCount <= 0: + raise newException( + IndexingWrongGroupCountError, + "groupCount (" & $groupCount & ") must be greater than zero.", + ) + IndexingStrategy( strategyType: strategy, firstIndex: firstIndex, lastIndex: lastIndex, iterations: iterations, step: divUp((lastIndex - firstIndex + 1), iterations), + groupCount: groupCount, + padBlockCount: padBlockCount, ) diff --git a/codex/node.nim b/codex/node.nim index 34d71774..e010b085 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -639,10 +639,6 @@ proc onStore( trace "Received a request to store a slot" - # TODO: Use the isRepairing to manage the slot download. - # If isRepairing is true, the slot has to be repaired before - # being downloaded. - without manifest =? (await self.fetchManifest(cid)), err: trace "Unable to fetch manifest for cid", cid, err = err.msg return failure(err) @@ -675,32 +671,45 @@ proc onStore( return success() - without indexer =? - manifest.verifiableStrategy.init(0, manifest.blocksCount - 1, manifest.numSlots).catch, - err: - trace "Unable to create indexing strategy from protected manifest", err = err.msg - return failure(err) - if slotIdx > int.high.uint64: error "Cannot cast slot index to int", slotIndex = slotIdx return - without blksIter =? indexer.getIndicies(slotIdx.int).catch, err: - trace "Unable to get indicies from strategy", err = err.msg - return failure(err) + if isRepairing: + trace "start repairing slot", slotIdx + try: + let erasure = Erasure.new( + self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool + ) + if err =? (await erasure.repair(manifest)).errorOption: + error "Unable to erasure decode repairing manifest", + cid = manifest.treeCid, exc = err.msg + return failure(err) + except CatchableError as exc: + error "Error erasure decoding repairing manifest", + cid = manifest.treeCid, exc = exc.msg + return failure(exc.msg) + else: + without indexer =? + manifest.verifiableStrategy.init(0, manifest.blocksCount - 1, manifest.numSlots).catch, + err: + trace "Unable to create indexing strategy from protected manifest", err = err.msg + return failure(err) - if err =? ( - await self.fetchBatched(manifest.treeCid, blksIter, onBatch = updateExpiry) - ).errorOption: - trace "Unable to fetch blocks", err = err.msg - return failure(err) + without blksIter =? indexer.getIndices(slotIdx.int).catch, err: + trace "Unable to get indices from strategy", err = err.msg + return failure(err) + + if err =? ( + await self.fetchBatched(manifest.treeCid, blksIter, onBatch = updateExpiry) + ).errorOption: + trace "Unable to fetch blocks", err = err.msg + return failure(err) without slotRoot =? (await builder.buildSlot(slotIdx.int)), err: trace "Unable to build slot", err = err.msg return failure(err) - trace "Slot successfully retrieved and reconstructed" - if cid =? slotRoot.toSlotCid() and cid != manifest.slotRoots[slotIdx]: trace "Slot root mismatch", manifest = manifest.slotRoots[slotIdx.int], recovered = slotRoot.toSlotCid() @@ -842,9 +851,6 @@ proc start*(self: CodexNodeRef) {.async.} = proc stop*(self: CodexNodeRef) {.async.} = trace "Stopping node" - if not self.taskpool.isNil: - self.taskpool.shutdown() - await self.trackedFutures.cancelTracked() if not self.engine.isNil: diff --git a/codex/slots/builder/builder.nim b/codex/slots/builder/builder.nim index a26fc04e..5fbb0fe1 100644 --- a/codex/slots/builder/builder.nim +++ b/codex/slots/builder/builder.nim @@ -113,17 +113,17 @@ func numSlotCells*[T, H](self: SlotsBuilder[T, H]): Natural = self.numBlockCells * self.numSlotBlocks -func slotIndiciesIter*[T, H](self: SlotsBuilder[T, H], slot: Natural): ?!Iter[int] = +func slotIndicesIter*[T, H](self: SlotsBuilder[T, H], slot: Natural): ?!Iter[int] = ## Returns the slot indices. ## - self.strategy.getIndicies(slot).catch + self.strategy.getIndices(slot).catch -func slotIndicies*[T, H](self: SlotsBuilder[T, H], slot: Natural): seq[int] = +func slotIndices*[T, H](self: SlotsBuilder[T, H], slot: Natural): seq[int] = ## Returns the slot indices. ## - if iter =? self.strategy.getIndicies(slot).catch: + if iter =? self.strategy.getIndices(slot).catch: return toSeq(iter) func manifest*[T, H](self: SlotsBuilder[T, H]): Manifest = @@ -184,7 +184,7 @@ proc getCellHashes*[T, H]( slotIndex = slotIndex let hashes = collect(newSeq): - for i, blkIdx in self.strategy.getIndicies(slotIndex): + for i, blkIdx in self.strategy.getIndices(slotIndex): logScope: blkIdx = blkIdx pos = i @@ -310,7 +310,7 @@ proc new*[T, H]( _: type SlotsBuilder[T, H], store: BlockStore, manifest: Manifest, - strategy = SteppedStrategy, + strategy = LinearStrategy, cellSize = DefaultCellSize, ): ?!SlotsBuilder[T, H] = if not manifest.protected: @@ -354,7 +354,14 @@ proc new*[T, H]( emptyBlock = newSeq[byte](manifest.blockSize.int) emptyDigestTree = ?T.digestTree(emptyBlock, cellSize.int) - strategy = ?strategy.init(0, numBlocksTotal - 1, manifest.numSlots).catch + strategy = + ?strategy.init( + 0, + manifest.blocksCount - 1, + manifest.numSlots, + manifest.numSlots, + numPadSlotBlocks, + ).catch logScope: numSlotBlocks = numSlotBlocks diff --git a/codex/slots/sampler/sampler.nim b/codex/slots/sampler/sampler.nim index 6ea41ee3..d7a36cfd 100644 --- a/codex/slots/sampler/sampler.nim +++ b/codex/slots/sampler/sampler.nim @@ -53,7 +53,7 @@ proc getSample*[T, H]( cellsPerBlock = self.builder.numBlockCells blkCellIdx = cellIdx.toCellInBlk(cellsPerBlock) # block cell index blkSlotIdx = cellIdx.toBlkInSlot(cellsPerBlock) # slot tree index - origBlockIdx = self.builder.slotIndicies(self.index)[blkSlotIdx] + origBlockIdx = self.builder.slotIndices(self.index)[blkSlotIdx] # convert to original dataset block index logScope: diff --git a/codex/utils/iter.nim b/codex/utils/iter.nim index 17ba8d8b..9afd6c12 100644 --- a/codex/utils/iter.nim +++ b/codex/utils/iter.nim @@ -7,6 +7,7 @@ type Function*[T, U] = proc(fut: T): U {.raises: [CatchableError], gcsafe, closure.} IsFinished* = proc(): bool {.raises: [], gcsafe, closure.} GenNext*[T] = proc(): T {.raises: [CatchableError], gcsafe.} + Iterator[T] = iterator (): T Iter*[T] = ref object finished: bool next*: GenNext[T] @@ -89,6 +90,37 @@ proc new*[T](_: type Iter[T], items: seq[T]): Iter[T] = Iter[int].new(0 ..< items.len).map((i: int) => items[i]) +proc new*[T](_: type Iter[T], iter: Iterator[T]): Iter[T] = + ## Creates a new Iter from an iterator + ## + var nextOrErr: Option[?!T] + proc tryNext(): void = + nextOrErr = none(?!T) + while not iter.finished: + try: + let t: T = iter() + if not iter.finished: + nextOrErr = some(success(t)) + break + except CatchableError as err: + nextOrErr = some(T.failure(err)) + + proc genNext(): T {.raises: [CatchableError].} = + if nextOrErr.isNone: + raise newException(CatchableError, "Iterator finished but genNext was called") + + without u =? nextOrErr.unsafeGet, err: + raise err + + tryNext() + return u + + proc isFinished(): bool = + nextOrErr.isNone + + tryNext() + Iter[T].new(genNext, isFinished) + proc empty*[T](_: type Iter[T]): Iter[T] = ## Creates an empty Iter ## @@ -105,10 +137,10 @@ proc map*[T, U](iter: Iter[T], fn: Function[T, U]): Iter[U] = Iter[U].new(genNext = () => fn(iter.next()), isFinished = () => iter.finished) proc mapFilter*[T, U](iter: Iter[T], mapPredicate: Function[T, Option[U]]): Iter[U] = - var nextUOrErr: Option[Result[U, ref CatchableError]] + var nextUOrErr: Option[?!U] proc tryFetch(): void = - nextUOrErr = Result[U, ref CatchableError].none + nextUOrErr = none(?!U) while not iter.finished: try: let t = iter.next() @@ -119,6 +151,9 @@ proc mapFilter*[T, U](iter: Iter[T], mapPredicate: Function[T, Option[U]]): Iter nextUOrErr = some(U.failure(err)) proc genNext(): U {.raises: [CatchableError].} = + if nextUOrErr.isNone: + raise newException(CatchableError, "Iterator finished but genNext was called") + # at this point nextUOrErr should always be some(..) without u =? nextUOrErr.unsafeGet, err: raise err diff --git a/tests/codex/blockexchange/engine/testblockexc.nim b/tests/codex/blockexchange/engine/testblockexc.nim index 0c250231..6ab345d1 100644 --- a/tests/codex/blockexchange/engine/testblockexc.nim +++ b/tests/codex/blockexchange/engine/testblockexc.nim @@ -26,8 +26,8 @@ asyncchecksuite "NetworkStore engine - 2 nodes": setup: blocks1 = await makeRandomBlocks(datasetSize = 2048, blockSize = 256'nb) blocks2 = await makeRandomBlocks(datasetSize = 2048, blockSize = 256'nb) - nodeCmps1 = generateNodes(1, blocks1)[0] - nodeCmps2 = generateNodes(1, blocks2)[0] + nodeCmps1 = generateNodes(1, blocks1).components[0] + nodeCmps2 = generateNodes(1, blocks2).components[0] await allFuturesThrowing( nodeCmps1.switch.start(), diff --git a/tests/codex/helpers/nodeutils.nim b/tests/codex/helpers/nodeutils.nim index 057e349d..6d2edd46 100644 --- a/tests/codex/helpers/nodeutils.nim +++ b/tests/codex/helpers/nodeutils.nim @@ -1,6 +1,7 @@ import std/sequtils import pkg/chronos +import pkg/taskpools import pkg/libp2p import pkg/libp2p/errors @@ -8,60 +9,221 @@ import pkg/codex/discovery import pkg/codex/stores import pkg/codex/blocktype as bt import pkg/codex/blockexchange +import pkg/codex/systemclock +import pkg/codex/nat +import pkg/codex/utils/natutils +import pkg/codex/slots + +import pkg/codex/node import ../examples +import ../../helpers -type NodesComponents* = - tuple[ - switch: Switch, - blockDiscovery: Discovery, - wallet: WalletRef, - network: BlockExcNetwork, - localStore: BlockStore, - peerStore: PeerCtxStore, - pendingBlocks: PendingBlocksManager, - discovery: DiscoveryEngine, - engine: BlockExcEngine, - networkStore: NetworkStore, - ] +proc nextFreePort*(startPort: int): Future[int] {.async.} = + proc client(server: StreamServer, transp: StreamTransport) {.async: (raises: []).} = + await transp.closeWait() + + var port = startPort + while true: + try: + let host = initTAddress("127.0.0.1", port) + var server = createStreamServer(host, client, {ReuseAddr}) + await server.closeWait() + return port + except TransportOsError: + inc port + +type + NodesComponents* = object + switch*: Switch + blockDiscovery*: Discovery + wallet*: WalletRef + network*: BlockExcNetwork + localStore*: BlockStore + peerStore*: PeerCtxStore + pendingBlocks*: PendingBlocksManager + discovery*: DiscoveryEngine + engine*: BlockExcEngine + networkStore*: NetworkStore + node*: CodexNodeRef = nil + tempDbs*: seq[TempLevelDb] = @[] + + NodesCluster* = ref object + components*: seq[NodesComponents] + taskpool*: Taskpool + + NodeConfig* = object + useRepoStore*: bool = false + findFreePorts*: bool = false + basePort*: int = 8080 + createFullNode*: bool = false + enableBootstrap*: bool = false + +converter toTuple*( + nc: NodesComponents +): tuple[ + switch: Switch, + blockDiscovery: Discovery, + wallet: WalletRef, + network: BlockExcNetwork, + localStore: BlockStore, + peerStore: PeerCtxStore, + pendingBlocks: PendingBlocksManager, + discovery: DiscoveryEngine, + engine: BlockExcEngine, + networkStore: NetworkStore, +] = + ( + nc.switch, nc.blockDiscovery, nc.wallet, nc.network, nc.localStore, nc.peerStore, + nc.pendingBlocks, nc.discovery, nc.engine, nc.networkStore, + ) + +converter toComponents*(cluster: NodesCluster): seq[NodesComponents] = + cluster.components + +proc nodes*(cluster: NodesCluster): seq[CodexNodeRef] = + cluster.components.filterIt(it.node != nil).mapIt(it.node) + +proc localStores*(cluster: NodesCluster): seq[BlockStore] = + cluster.components.mapIt(it.localStore) + +proc switches*(cluster: NodesCluster): seq[Switch] = + cluster.components.mapIt(it.switch) proc generateNodes*( - num: Natural, blocks: openArray[bt.Block] = [] -): seq[NodesComponents] = + num: Natural, blocks: openArray[bt.Block] = [], config: NodeConfig = NodeConfig() +): NodesCluster = + var + components: seq[NodesComponents] = @[] + taskpool = Taskpool.new() + bootstrapNodes: seq[SignedPeerRecord] = @[] + for i in 0 ..< num: + let basePortForNode = config.basePort + 2 * i.int + let listenPort = + if config.findFreePorts: + waitFor nextFreePort(basePortForNode) + else: + basePortForNode + + let bindPort = + if config.findFreePorts: + waitFor nextFreePort(listenPort + 1) + else: + listenPort + 1 + let - switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr}) - discovery = Discovery.new( - switch.peerInfo.privateKey, - announceAddrs = - @[ - MultiAddress.init("/ip4/127.0.0.1/tcp/0").expect( - "Should return multiaddress" - ) - ], + listenAddr = MultiAddress.init("/ip4/127.0.0.1/tcp/" & $listenPort).expect( + "invalid multiaddress" + ) + + switch = newStandardSwitch( + transportFlags = {ServerFlags.ReuseAddr}, + sendSignedPeerRecord = config.enableBootstrap, + addrs = + if config.findFreePorts: + listenAddr + else: + MultiAddress.init("/ip4/127.0.0.1/tcp/0").expect("invalid multiaddress"), ) - wallet = WalletRef.example + + wallet = + if config.createFullNode: + WalletRef.new(EthPrivateKey.random()) + else: + WalletRef.example network = BlockExcNetwork.new(switch) - localStore = CacheStore.new(blocks.mapIt(it)) peerStore = PeerCtxStore.new() pendingBlocks = PendingBlocksManager.new() - advertiser = Advertiser.new(localStore, discovery) - blockDiscovery = - DiscoveryEngine.new(localStore, peerStore, network, discovery, pendingBlocks) + + let (localStore, tempDbs, blockDiscovery) = + if config.useRepoStore: + let + bdStore = TempLevelDb.new() + repoStore = TempLevelDb.new() + mdStore = TempLevelDb.new() + store = + RepoStore.new(repoStore.newDb(), mdStore.newDb(), clock = SystemClock.new()) + blockDiscoveryStore = bdStore.newDb() + discovery = Discovery.new( + switch.peerInfo.privateKey, + announceAddrs = @[listenAddr], + bindPort = bindPort.Port, + store = blockDiscoveryStore, + bootstrapNodes = bootstrapNodes, + ) + waitFor store.start() + (store.BlockStore, @[bdStore, repoStore, mdStore], discovery) + else: + let + store = CacheStore.new(blocks.mapIt(it)) + discovery = + Discovery.new(switch.peerInfo.privateKey, announceAddrs = @[listenAddr]) + (store.BlockStore, newSeq[TempLevelDb](), discovery) + + let + discovery = DiscoveryEngine.new( + localStore, peerStore, network, blockDiscovery, pendingBlocks + ) + advertiser = Advertiser.new(localStore, blockDiscovery) engine = BlockExcEngine.new( - localStore, wallet, network, blockDiscovery, advertiser, peerStore, - pendingBlocks, + localStore, wallet, network, discovery, advertiser, peerStore, pendingBlocks ) networkStore = NetworkStore.new(engine, localStore) switch.mount(network) - let nc: NodesComponents = ( - switch, discovery, wallet, network, localStore, peerStore, pendingBlocks, - blockDiscovery, engine, networkStore, + let node = + if config.createFullNode: + let fullNode = CodexNodeRef.new( + switch = switch, + networkStore = networkStore, + engine = engine, + prover = Prover.none, + discovery = blockDiscovery, + taskpool = taskpool, + ) + + if config.enableBootstrap: + waitFor switch.peerInfo.update() + let (announceAddrs, discoveryAddrs) = nattedAddress( + NatConfig(hasExtIp: false, nat: NatNone), + switch.peerInfo.addrs, + bindPort.Port, + ) + blockDiscovery.updateAnnounceRecord(announceAddrs) + blockDiscovery.updateDhtRecord(discoveryAddrs) + if blockDiscovery.dhtRecord.isSome: + bootstrapNodes.add !blockDiscovery.dhtRecord + + fullNode + else: + nil + + let nodeComponent = NodesComponents( + switch: switch, + blockDiscovery: blockDiscovery, + wallet: wallet, + network: network, + localStore: localStore, + peerStore: peerStore, + pendingBlocks: pendingBlocks, + discovery: discovery, + engine: engine, + networkStore: networkStore, + node: node, + tempDbs: tempDbs, ) - result.add(nc) + components.add(nodeComponent) + + if config.createFullNode: + for component in components: + if component.node != nil: + waitFor component.node.switch.start() + waitFor component.node.start() + + return NodesCluster(components: components, taskpool: taskpool) proc connectNodes*(nodes: seq[Switch]) {.async.} = for dialer in nodes: @@ -71,3 +233,22 @@ proc connectNodes*(nodes: seq[Switch]) {.async.} = proc connectNodes*(nodes: seq[NodesComponents]) {.async.} = await connectNodes(nodes.mapIt(it.switch)) + +proc connectNodes*(cluster: NodesCluster) {.async.} = + await connectNodes(cluster.components) + +proc cleanup*(cluster: NodesCluster) {.async.} = + for component in cluster.components: + if component.node != nil: + await component.node.switch.stop() + await component.node.stop() + + for component in cluster.components: + for db in component.tempDbs: + await db.destroyDb() + + for component in cluster.components: + if component.tempDbs.len > 0: + await RepoStore(component.localStore).stop() + + cluster.taskpool.shutdown() diff --git a/tests/codex/node/testcontracts.nim b/tests/codex/node/testcontracts.nim index 8469d3e7..e8d9c743 100644 --- a/tests/codex/node/testcontracts.nim +++ b/tests/codex/node/testcontracts.nim @@ -129,11 +129,11 @@ asyncchecksuite "Test Node - Host contracts": (await onStore(request, expiry, 1.uint64, onBlocks, isRepairing = false)).tryGet() check fetchedBytes == 12 * DefaultBlockSize.uint - let indexer = verifiable.protectedStrategy.init( - 0, verifiable.numSlotBlocks() - 1, verifiable.numSlots + let indexer = verifiable.verifiableStrategy.init( + 0, verifiable.blocksCount - 1, verifiable.numSlots ) - for index in indexer.getIndicies(1): + for index in indexer.getIndices(1): let blk = (await localStore.getBlock(verifiable.treeCid, index)).tryGet key = (createBlockExpirationMetadataKey(blk.cid)).tryGet diff --git a/tests/codex/node/testslotrepair.nim b/tests/codex/node/testslotrepair.nim new file mode 100644 index 00000000..d96078d2 --- /dev/null +++ b/tests/codex/node/testslotrepair.nim @@ -0,0 +1,220 @@ +import std/options +import std/importutils +import std/times + +import pkg/chronos +import pkg/questionable +import pkg/questionable/results +import pkg/stint + +import pkg/codex/logutils +import pkg/codex/stores +import pkg/codex/contracts +import pkg/codex/slots +import pkg/codex/manifest +import pkg/codex/erasure +import pkg/codex/blocktype as bt +import pkg/chronos/transports/stream + +import pkg/codex/node {.all.} + +import ../../asynctest +import ../../examples +import ../helpers + +import ./helpers + +privateAccess(CodexNodeRef) # enable access to private fields + +logScope: + topics = "testSlotRepair" + +proc fetchStreamData(stream: LPStream, datasetSize: int): Future[seq[byte]] {.async.} = + var buf = newSeq[byte](datasetSize) + await stream.readExactly(addr buf[0], datasetSize) + buf + +proc flatten[T](s: seq[seq[T]]): seq[T] = + var t = newSeq[T](0) + for ss in s: + t &= ss + return t + +asyncchecksuite "Test Node - Slot Repair": + let + numNodes = 12 + config = NodeConfig( + useRepoStore: true, + findFreePorts: true, + createFullNode: true, + enableBootstrap: true, + ) + var + manifest: Manifest + builder: Poseidon2Builder + verifiable: Manifest + verifiableBlock: bt.Block + protected: Manifest + cluster: NodesCluster + + nodes: seq[CodexNodeRef] + localStores: seq[BlockStore] + + setup: + cluster = generateNodes(numNodes, config = config) + nodes = cluster.nodes + localStores = cluster.localStores + + teardown: + await cluster.cleanup() + localStores = @[] + nodes = @[] + + test "repair slots (2,1)": + let + expiry = (getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix + numBlocks = 5 + datasetSize = numBlocks * DefaultBlockSize.int + ecK = 2 + ecM = 1 + localStore = localStores[0] + store = nodes[0].blockStore + blocks = + await makeRandomBlocks(datasetSize = datasetSize, blockSize = DefaultBlockSize) + data = ( + block: + collect(newSeq): + for blk in blocks: + blk.data + ).flatten() + check blocks.len == numBlocks + + # Populate manifest in local store + manifest = await storeDataGetManifest(localStore, blocks) + let + manifestBlock = + bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() + erasure = + Erasure.new(store, leoEncoderProvider, leoDecoderProvider, cluster.taskpool) + + (await localStore.putBlock(manifestBlock)).tryGet() + + protected = (await erasure.encode(manifest, ecK, ecM)).tryGet() + builder = Poseidon2Builder.new(localStore, protected).tryGet() + verifiable = (await builder.buildManifest()).tryGet() + verifiableBlock = + bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() + + # Populate protected manifest in local store + (await localStore.putBlock(verifiableBlock)).tryGet() + + var request = StorageRequest.example + request.content.cid = verifiableBlock.cid + + for i in 0 ..< protected.numSlots.uint64: + (await nodes[i + 1].onStore(request, expiry, i, nil, isRepairing = false)).tryGet() + + await nodes[0].switch.stop() # acts as client + await nodes[1].switch.stop() # slot 0 missing now + + # repair missing slot + (await nodes[4].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet() + + await nodes[2].switch.stop() # slot 1 missing now + + (await nodes[5].onStore(request, expiry, 1.uint64, nil, isRepairing = true)).tryGet() + + await nodes[3].switch.stop() # slot 2 missing now + + (await nodes[6].onStore(request, expiry, 2.uint64, nil, isRepairing = true)).tryGet() + + await nodes[4].switch.stop() # slot 0 missing now + + # repair missing slot from repaired slots + (await nodes[7].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet() + + await nodes[5].switch.stop() # slot 1 missing now + + # repair missing slot from repaired slots + (await nodes[8].onStore(request, expiry, 1.uint64, nil, isRepairing = true)).tryGet() + + await nodes[6].switch.stop() # slot 2 missing now + + # repair missing slot from repaired slots + (await nodes[9].onStore(request, expiry, 2.uint64, nil, isRepairing = true)).tryGet() + + let + stream = (await nodes[10].retrieve(verifiableBlock.cid, local = false)).tryGet() + expectedData = await fetchStreamData(stream, datasetSize) + check expectedData.len == data.len + check expectedData == data + + test "repair slots (3,2)": + let + expiry = (getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix + numBlocks = 40 + datasetSize = numBlocks * DefaultBlockSize.int + ecK = 3 + ecM = 2 + localStore = localStores[0] + store = nodes[0].blockStore + blocks = + await makeRandomBlocks(datasetSize = datasetSize, blockSize = DefaultBlockSize) + data = ( + block: + collect(newSeq): + for blk in blocks: + blk.data + ).flatten() + check blocks.len == numBlocks + + # Populate manifest in local store + manifest = await storeDataGetManifest(localStore, blocks) + let + manifestBlock = + bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() + erasure = + Erasure.new(store, leoEncoderProvider, leoDecoderProvider, cluster.taskpool) + + (await localStore.putBlock(manifestBlock)).tryGet() + + protected = (await erasure.encode(manifest, ecK, ecM)).tryGet() + builder = Poseidon2Builder.new(localStore, protected).tryGet() + verifiable = (await builder.buildManifest()).tryGet() + verifiableBlock = + bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() + + # Populate protected manifest in local store + (await localStore.putBlock(verifiableBlock)).tryGet() + + var request = StorageRequest.example + request.content.cid = verifiableBlock.cid + + for i in 0 ..< protected.numSlots.uint64: + (await nodes[i + 1].onStore(request, expiry, i, nil, isRepairing = false)).tryGet() + + await nodes[0].switch.stop() # acts as client + await nodes[1].switch.stop() # slot 0 missing now + await nodes[3].switch.stop() # slot 2 missing now + + # repair missing slots + (await nodes[6].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet() + (await nodes[7].onStore(request, expiry, 2.uint64, nil, isRepairing = true)).tryGet() + + await nodes[2].switch.stop() # slot 1 missing now + await nodes[4].switch.stop() # slot 3 missing now + + # repair missing slots from repaired slots + (await nodes[8].onStore(request, expiry, 1.uint64, nil, isRepairing = true)).tryGet() + (await nodes[9].onStore(request, expiry, 3.uint64, nil, isRepairing = true)).tryGet() + + await nodes[5].switch.stop() # slot 4 missing now + + # repair missing slot from repaired slots + (await nodes[10].onStore(request, expiry, 4.uint64, nil, isRepairing = true)).tryGet() + + let + stream = (await nodes[11].retrieve(verifiableBlock.cid, local = false)).tryGet() + expectedData = await fetchStreamData(stream, datasetSize) + check expectedData.len == data.len + check expectedData == data diff --git a/tests/codex/slots/testslotbuilder.nim b/tests/codex/slots/testslotbuilder.nim index 9a2043a8..fc3c7bd5 100644 --- a/tests/codex/slots/testslotbuilder.nim +++ b/tests/codex/slots/testslotbuilder.nim @@ -27,7 +27,7 @@ import pkg/codex/slots {.all.} privateAccess(Poseidon2Builder) # enable access to private fields privateAccess(Manifest) # enable access to private fields -const Strategy = SteppedStrategy +const Strategy = LinearStrategy suite "Slot builder": let @@ -67,7 +67,6 @@ suite "Slot builder": var datasetBlocks: seq[bt.Block] - padBlocks: seq[bt.Block] localStore: BlockStore manifest: Manifest protectedManifest: Manifest @@ -167,7 +166,9 @@ suite "Slot builder": test "Should build slot hashes for all slots": let - steppedStrategy = Strategy.init(0, numBlocksTotal - 1, numSlots) + linearStrategy = Strategy.init( + 0, protectedManifest.blocksCount - 1, numSlots, numSlots, numPadSlotBlocks + ) builder = Poseidon2Builder .new(localStore, protectedManifest, cellSize = cellSize) @@ -176,7 +177,7 @@ suite "Slot builder": for i in 0 ..< numSlots: let expectedHashes = collect(newSeq): - for j, idx in steppedStrategy.getIndicies(i): + for j, idx in linearStrategy.getIndices(i): if j > (protectedManifest.numSlotBlocks - 1): emptyDigest else: @@ -190,7 +191,9 @@ suite "Slot builder": test "Should build slot trees for all slots": let - steppedStrategy = Strategy.init(0, numBlocksTotal - 1, numSlots) + linearStrategy = Strategy.init( + 0, protectedManifest.blocksCount - 1, numSlots, numSlots, numPadSlotBlocks + ) builder = Poseidon2Builder .new(localStore, protectedManifest, cellSize = cellSize) @@ -199,7 +202,7 @@ suite "Slot builder": for i in 0 ..< numSlots: let expectedHashes = collect(newSeq): - for j, idx in steppedStrategy.getIndicies(i): + for j, idx in linearStrategy.getIndices(i): if j > (protectedManifest.numSlotBlocks - 1): emptyDigest else: @@ -235,7 +238,9 @@ suite "Slot builder": test "Should build correct verification root": let - steppedStrategy = Strategy.init(0, numBlocksTotal - 1, numSlots) + linearStrategy = Strategy.init( + 0, protectedManifest.blocksCount - 1, numSlots, numSlots, numPadSlotBlocks + ) builder = Poseidon2Builder .new(localStore, protectedManifest, cellSize = cellSize) .tryGet() @@ -245,7 +250,7 @@ suite "Slot builder": slotsHashes = collect(newSeq): for i in 0 ..< numSlots: let slotHashes = collect(newSeq): - for j, idx in steppedStrategy.getIndicies(i): + for j, idx in linearStrategy.getIndices(i): if j > (protectedManifest.numSlotBlocks - 1): emptyDigest else: @@ -261,7 +266,9 @@ suite "Slot builder": test "Should build correct verification root manifest": let - steppedStrategy = Strategy.init(0, numBlocksTotal - 1, numSlots) + linearStrategy = Strategy.init( + 0, protectedManifest.blocksCount - 1, numSlots, numSlots, numPadSlotBlocks + ) builder = Poseidon2Builder .new(localStore, protectedManifest, cellSize = cellSize) .tryGet() @@ -269,7 +276,7 @@ suite "Slot builder": slotsHashes = collect(newSeq): for i in 0 ..< numSlots: let slotHashes = collect(newSeq): - for j, idx in steppedStrategy.getIndicies(i): + for j, idx in linearStrategy.getIndices(i): if j > (protectedManifest.numSlotBlocks - 1): emptyDigest else: diff --git a/tests/codex/testindexingstrategy.nim b/tests/codex/testindexingstrategy.nim index 322906dc..00486849 100644 --- a/tests/codex/testindexingstrategy.nim +++ b/tests/codex/testindexingstrategy.nim @@ -19,15 +19,15 @@ for offset in @[0, 1, 2, 100]: test "linear": check: - toSeq(linear.getIndicies(0)) == @[0, 1, 2, 3, 4].mapIt(it + offset) - toSeq(linear.getIndicies(1)) == @[5, 6, 7, 8, 9].mapIt(it + offset) - toSeq(linear.getIndicies(2)) == @[10, 11, 12].mapIt(it + offset) + toSeq(linear.getIndices(0)) == @[0, 1, 2, 3, 4].mapIt(it + offset) + toSeq(linear.getIndices(1)) == @[5, 6, 7, 8, 9].mapIt(it + offset) + toSeq(linear.getIndices(2)) == @[10, 11, 12].mapIt(it + offset) test "stepped": check: - toSeq(stepped.getIndicies(0)) == @[0, 3, 6, 9, 12].mapIt(it + offset) - toSeq(stepped.getIndicies(1)) == @[1, 4, 7, 10].mapIt(it + offset) - toSeq(stepped.getIndicies(2)) == @[2, 5, 8, 11].mapIt(it + offset) + toSeq(stepped.getIndices(0)) == @[0, 3, 6, 9, 12].mapIt(it + offset) + toSeq(stepped.getIndices(1)) == @[1, 4, 7, 10].mapIt(it + offset) + toSeq(stepped.getIndices(2)) == @[2, 5, 8, 11].mapIt(it + offset) suite "Indexing strategies": let @@ -39,16 +39,16 @@ suite "Indexing strategies": l = LinearStrategy.init(0, 0, 1) s = SteppedStrategy.init(0, 0, 1) check: - toSeq(l.getIndicies(0)) == @[0] - toSeq(s.getIndicies(0)) == @[0] + toSeq(l.getIndices(0)) == @[0] + toSeq(s.getIndices(0)) == @[0] test "smallest range 1": let l = LinearStrategy.init(0, 1, 1) s = SteppedStrategy.init(0, 1, 1) check: - toSeq(l.getIndicies(0)) == @[0, 1] - toSeq(s.getIndicies(0)) == @[0, 1] + toSeq(l.getIndices(0)) == @[0, 1] + toSeq(s.getIndices(0)) == @[0, 1] test "first index must be smaller than last index": expect IndexingWrongIndexError: @@ -61,14 +61,14 @@ suite "Indexing strategies": test "should split elements evenly when possible": let l = LinearStrategy.init(0, 11, 3) check: - toSeq(l.getIndicies(0)) == @[0, 1, 2, 3].mapIt(it) - toSeq(l.getIndicies(1)) == @[4, 5, 6, 7].mapIt(it) - toSeq(l.getIndicies(2)) == @[8, 9, 10, 11].mapIt(it) + toSeq(l.getIndices(0)) == @[0, 1, 2, 3].mapIt(it) + toSeq(l.getIndices(1)) == @[4, 5, 6, 7].mapIt(it) + toSeq(l.getIndices(2)) == @[8, 9, 10, 11].mapIt(it) test "linear - oob": expect IndexingError: - discard linear.getIndicies(3) + discard linear.getIndices(3) test "stepped - oob": expect IndexingError: - discard stepped.getIndicies(3) + discard stepped.getIndices(3) diff --git a/tests/codex/testnode.nim b/tests/codex/testnode.nim index 0ee4fc50..c6f0154b 100644 --- a/tests/codex/testnode.nim +++ b/tests/codex/testnode.nim @@ -1,4 +1,5 @@ import ./node/testnode import ./node/testcontracts +import ./node/testslotrepair {.warning[UnusedImport]: off.} diff --git a/tests/integration/testslotrepair.nim b/tests/integration/testslotrepair.nim new file mode 100644 index 00000000..267a9c39 --- /dev/null +++ b/tests/integration/testslotrepair.nim @@ -0,0 +1,318 @@ +import pkg/questionable +import pkg/codex/logutils +import ../contracts/time +import ../contracts/deployment +import ../codex/helpers +import ../examples +import ./marketplacesuite +import ./nodeconfigs + +export logutils + +logScope: + topics = "integration test slot repair" + +marketplacesuite "SP Slot Repair": + const minPricePerBytePerSecond = 1.u256 + const collateralPerByte = 1.u256 + const blocks = 3 + const ecNodes = 3 + const ecTolerance = 1 + const size = slotSize(blocks, ecNodes, ecTolerance) + + var filledSlotIds: seq[SlotId] = @[] + var freedSlotId = none SlotId + var requestId: RequestId + + # Here we are keeping track of the slot filled using their ids. + proc onSlotFilled(eventResult: ?!SlotFilled) = + assert not eventResult.isErr + let event = !eventResult + + if event.requestId == requestId: + let slotId = slotId(event.requestId, event.slotIndex) + filledSlotIds.add slotId + + # Here we are retrieving the slot id freed. + # When the event is triggered, the slot id is removed + # from the filled slot id list. + proc onSlotFreed(eventResult: ?!SlotFreed) = + assert not eventResult.isErr + let event = !eventResult + let slotId = slotId(event.requestId, event.slotIndex) + + if event.requestId == requestId: + assert slotId in filledSlotIds + filledSlotIds.del(filledSlotIds.find(slotId)) + freedSlotId = some(slotId) + + proc createPurchase(client: CodexClient): Future[PurchaseId] {.async.} = + let data = await RandomChunker.example(blocks = blocks) + let cid = (await client.upload(data)).get + + let purchaseId = await client.requestStorage( + cid, + expiry = 10.periods, + duration = 20.periods, + nodes = ecNodes, + tolerance = ecTolerance, + collateralPerByte = 1.u256, + pricePerBytePerSecond = minPricePerBytePerSecond, + proofProbability = 1.u256, + ) + requestId = (await client.requestId(purchaseId)).get + + return purchaseId + + proc freeSlot(provider: CodexClient): Future[void] {.async.} = + # Get the second provider signer. + let signer = ethProvider.getSigner(accounts[2]) + let marketplaceWithSecondProviderSigner = marketplace.connect(signer) + + # Call freeSlot to speed up the process. + # It accelerates the test by skipping validator + # proof verification and not waiting for the full period. + # The downside is that this doesn't reflect the real slot freed process. + let slots = (await provider.getSlots()).get() + let slotId = slotId(requestId, slots[0].slotIndex) + discard await marketplaceWithSecondProviderSigner.freeSlot(slotId) + + setup: + filledSlotIds = @[] + freedSlotId = none SlotId + + test "repair from local store", + NodeConfigs( + clients: CodexConfigs.init(nodes = 1).some, + # .debug() + # .withLogFile() + # .withLogTopics("node", "erasure").some, + providers: CodexConfigs + .init(nodes = 2) + .withSimulateProofFailures(idx = 1, failEveryNProofs = 1) + # .debug() + .withLogFile() + .withLogTopics("marketplace", "sales", "reservations", "statemachine").some, + validators: CodexConfigs.init(nodes = 1).some, + # .debug() + # .withLogFile() + # .withLogTopics("validator").some, + ): + let client0 = clients()[0] + let provider0 = providers()[0] + let provider1 = providers()[1] + let expiry = 10.periods + let duration = 20.periods + + # Let's create 2 availabilities + # SP 1 will hosts 2 slots + # SP 2 will hosts 1 slot + let availability0 = ( + await provider0.client.postAvailability( + totalSize = 2 * size.truncate(uint64), + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = 3 * size * collateralPerByte, + ) + ).get + let availability1 = ( + await provider1.client.postAvailability( + totalSize = size.truncate(uint64), + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = size * collateralPerByte, + ) + ).get + + let purchaseId = await createPurchase(client0.client) + + let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) + let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed) + + # Wait for purchase starts, meaning that the slots are filled. + check eventually( + await client0.client.purchaseStateIs(purchaseId, "started"), + timeout = expiry.int * 1000, + ) + + # stop client so it doesn't serve any blocks anymore + await client0.stop() + + # Let's disable the second availability, + # SP 2 will not pick the slot again. + await provider1.client.patchAvailability( + availabilityId = availability1.id, enabled = false.some + ) + + # Update the size of the availability for the SP 1, + # he will repair and host the freed slot + await provider0.client.patchAvailability( + availabilityId = availability0.id, + totalSize = (3 * size.truncate(uint64)).uint64.some, + ) + + # Let's free the slot to speed up the process + await freeSlot(provider1.client) + + # We expect that the freed slot is added in the filled slot id list, + # meaning that the slot was repaired locally by SP 1. + check eventually( + freedSlotId.get in filledSlotIds, timeout = (duration - expiry).int * 1000 + ) + + await filledSubscription.unsubscribe() + await slotFreedsubscription.unsubscribe() + + test "repair from local and remote store", + NodeConfigs( + clients: CodexConfigs.init(nodes = 1) + # .debug() + # .withLogTopics("node", "erasure") + .some, + providers: CodexConfigs.init(nodes = 3) + # .debug() + # .withLogFile() + # .withLogTopics("marketplace", "sales", "statemachine", "reservations") + .some, + ): + let client0 = clients()[0] + let provider0 = providers()[0] + let provider1 = providers()[1] + let provider2 = providers()[2] + let expiry = 10.periods + let duration = 20.periods + + # SP 1, SP 2 and SP 3 will host one slot + let availability0 = ( + await provider0.client.postAvailability( + totalSize = size.truncate(uint64), + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = size * collateralPerByte, + ) + ).get + let availability1 = ( + await provider1.client.postAvailability( + totalSize = size.truncate(uint64), + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = size * collateralPerByte, + ) + ).get + discard await provider2.client.postAvailability( + totalSize = size.truncate(uint64), + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = size * collateralPerByte, + ) + + let purchaseId = await createPurchase(client0.client) + + let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) + let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed) + + # Wait for purchase starts, meaning that the slots are filled. + check eventually( + await client0.client.purchaseStateIs(purchaseId, "started"), + timeout = expiry.int * 1000, + ) + + # stop client so it doesn't serve any blocks anymore + await client0.stop() + + # Let's disable the availability, + # SP 2 will not pick the slot again. + await provider1.client.patchAvailability(availability1.id, enabled = false.some) + + # Update the size of the availability for the SP 1, + # he will repair and host the freed slot + await provider0.client.patchAvailability( + availability0.id, + totalSize = (2 * size.truncate(uint64)).some, + totalCollateral = (2 * size * collateralPerByte).some, + ) + + # Let's free the slot to speed up the process + await freeSlot(provider1.client) + + # We expect that the freed slot is added in the filled slot id list, + # meaning that the slot was repaired locally and remotely (using SP 3) by SP 1. + check eventually(freedSlotId.isSome, timeout = expiry.int * 1000) + check eventually(freedSlotId.get in filledSlotIds, timeout = expiry.int * 1000) + + await filledSubscription.unsubscribe() + await slotFreedsubscription.unsubscribe() + + test "repair from remote store only", + NodeConfigs( + clients: CodexConfigs.init(nodes = 1) + # .debug() + # .withLogFile() + # .withLogTopics("node", "erasure") + .some, + providers: CodexConfigs.init(nodes = 3) + # .debug() + # .withLogFile() + # .withLogTopics("marketplace", "sales", "statemachine", "reservations") + .some, + ): + let client0 = clients()[0] + let provider0 = providers()[0] + let provider1 = providers()[1] + let provider2 = providers()[2] + let expiry = 10.periods + let duration = expiry + 10.periods + + # SP 1 will host 2 slots + # SP 2 will host 1 slot + discard await provider0.client.postAvailability( + totalSize = 2 * size.truncate(uint64), + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = 2 * size * collateralPerByte, + ) + let availability1 = ( + await provider1.client.postAvailability( + totalSize = size.truncate(uint64), + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = size * collateralPerByte, + ) + ).get + + let purchaseId = await createPurchase(client0.client) + + let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) + let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed) + + # Wait for purchase starts, meaning that the slots are filled. + check eventually( + await client0.client.purchaseStateIs(purchaseId, "started"), + timeout = expiry.int * 1000, + ) + + # stop client so it doesn't serve any blocks anymore + await client0.stop() + + # Let's create an availability for SP3, + # he will host the repaired slot. + discard await provider2.client.postAvailability( + totalSize = size.truncate(uint64), + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = size * collateralPerByte, + ) + + # Let's disable the availability, + # SP 2 will not pick the slot again. + await provider1.client.patchAvailability(availability1.id, enabled = false.some) + + # Let's free the slot to speed up the process + await freeSlot(provider1.client) + + # At this point, SP 3 should repair the slot from SP 1 and host it. + check eventually(freedSlotId.isSome, timeout = expiry.int * 1000) + check eventually(freedSlotId.get in filledSlotIds, timeout = expiry.int * 1000) + + await filledSubscription.unsubscribe() + await slotFreedsubscription.unsubscribe() diff --git a/tests/testIntegration.nim b/tests/testIntegration.nim index 152d22dd..07a69749 100644 --- a/tests/testIntegration.nim +++ b/tests/testIntegration.nim @@ -9,5 +9,6 @@ import ./integration/testmarketplace import ./integration/testproofs import ./integration/testvalidator import ./integration/testecbug +import ./integration/testslotrepair {.warning[UnusedImport]: off.}