Add support for slot reconstruction on unavailable slot detection (#1235)

Co-authored-by: Arnaud <arnaud@status.im>
This commit is contained in:
Chrysostomos Nanakos 2025-06-13 01:19:42 +03:00 committed by GitHub
parent 3d2d8273e6
commit b305e00160
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 1019 additions and 153 deletions

View File

@ -124,6 +124,10 @@ proc start*(b: Advertiser) {.async: (raises: []).} =
trace "Advertiser start"
# The advertiser is expected to be started only once.
if b.advertiserRunning:
raiseAssert "Advertiser can only be started once — this should not happen"
proc onBlock(cid: Cid) {.async: (raises: []).} =
try:
await b.advertiseBlock(cid)
@ -133,10 +137,6 @@ proc start*(b: Advertiser) {.async: (raises: []).} =
doAssert(b.localStore.onBlockStored.isNone())
b.localStore.onBlockStored = onBlock.some
if b.advertiserRunning:
warn "Starting advertiser twice"
return
b.advertiserRunning = true
for i in 0 ..< b.concurrentAdvReqs:
let fut = b.processQueueLoop()

View File

@ -56,6 +56,7 @@ type
codexNode: CodexNodeRef
repoStore: RepoStore
maintenance: BlockMaintainer
taskpool: Taskpool
CodexPrivateKey* = libp2p.PrivateKey # alias
EthWallet = ethers.Wallet
@ -193,6 +194,9 @@ proc stop*(s: CodexServer) {.async.} =
error "Failed to stop codex node", failures = res.failure.len
raiseAssert "Failed to stop codex node"
if not s.taskpool.isNil:
s.taskpool.shutdown()
proc new*(
T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey
): CodexServer =
@ -333,4 +337,5 @@ proc new*(
restServer: restServer,
repoStore: repoStore,
maintenance: maintenance,
taskpool: taskpool,
)

View File

@ -25,6 +25,7 @@ import ../logutils
import ../manifest
import ../merkletree
import ../stores
import ../clock
import ../blocktype as bt
import ../utils
import ../utils/asynciter
@ -120,19 +121,25 @@ func indexToPos(steps, idx, step: int): int {.inline.} =
(idx - step) div steps
proc getPendingBlocks(
self: Erasure, manifest: Manifest, indicies: seq[int]
): AsyncIter[(?!bt.Block, int)] =
self: Erasure, manifest: Manifest, indices: seq[int]
): (AsyncIter[(?!bt.Block, int)], seq[Future[?!bt.Block]]) =
## Get pending blocks iterator
##
var
pendingBlockFutures: seq[Future[?!bt.Block]] = @[]
pendingBlocks: seq[Future[(?!bt.Block, int)]] = @[]
proc attachIndex(
fut: Future[?!bt.Block], i: int
): Future[(?!bt.Block, int)] {.async.} =
## avoids closure capture issues
return (await fut, i)
for blockIndex in indices:
# request blocks from the store
pendingBlocks = indicies.map(
(i: int) =>
self.store.getBlock(BlockAddress.init(manifest.treeCid, i)).map(
(r: ?!bt.Block) => (r, i)
) # Get the data blocks (first K)
)
let fut = self.store.getBlock(BlockAddress.init(manifest.treeCid, blockIndex))
pendingBlockFutures.add(fut)
pendingBlocks.add(attachIndex(fut, blockIndex))
proc isFinished(): bool =
pendingBlocks.len == 0
@ -150,7 +157,7 @@ proc getPendingBlocks(
$index,
)
AsyncIter[(?!bt.Block, int)].new(genNext, isFinished)
(AsyncIter[(?!bt.Block, int)].new(genNext, isFinished), pendingBlockFutures)
proc prepareEncodingData(
self: Erasure,
@ -168,16 +175,16 @@ proc prepareEncodingData(
strategy = params.strategy.init(
firstIndex = 0, lastIndex = params.rounded - 1, iterations = params.steps
)
indicies = toSeq(strategy.getIndicies(step))
pendingBlocksIter =
self.getPendingBlocks(manifest, indicies.filterIt(it < manifest.blocksCount))
indices = toSeq(strategy.getIndices(step))
(pendingBlocksIter, _) =
self.getPendingBlocks(manifest, indices.filterIt(it < manifest.blocksCount))
var resolved = 0
for fut in pendingBlocksIter:
let (blkOrErr, idx) = await fut
without blk =? blkOrErr, err:
warn "Failed retreiving a block", treeCid = manifest.treeCid, idx, msg = err.msg
continue
warn "Failed retrieving a block", treeCid = manifest.treeCid, idx, msg = err.msg
return failure(err)
let pos = indexToPos(params.steps, idx, step)
shallowCopy(data[pos], if blk.isEmpty: emptyBlock else: blk.data)
@ -185,7 +192,7 @@ proc prepareEncodingData(
resolved.inc()
for idx in indicies.filterIt(it >= manifest.blocksCount):
for idx in indices.filterIt(it >= manifest.blocksCount):
let pos = indexToPos(params.steps, idx, step)
trace "Padding with empty block", idx
shallowCopy(data[pos], emptyBlock)
@ -218,8 +225,8 @@ proc prepareDecodingData(
strategy = encoded.protectedStrategy.init(
firstIndex = 0, lastIndex = encoded.blocksCount - 1, iterations = encoded.steps
)
indicies = toSeq(strategy.getIndicies(step))
pendingBlocksIter = self.getPendingBlocks(encoded, indicies)
indices = toSeq(strategy.getIndices(step))
(pendingBlocksIter, pendingBlockFutures) = self.getPendingBlocks(encoded, indices)
var
dataPieces = 0
@ -233,7 +240,7 @@ proc prepareDecodingData(
let (blkOrErr, idx) = await fut
without blk =? blkOrErr, err:
trace "Failed retreiving a block", idx, treeCid = encoded.treeCid, msg = err.msg
trace "Failed retrieving a block", idx, treeCid = encoded.treeCid, msg = err.msg
continue
let pos = indexToPos(encoded.steps, idx, step)
@ -259,6 +266,11 @@ proc prepareDecodingData(
resolved.inc
pendingBlockFutures.apply(
proc(fut: auto) =
fut.cancel()
)
return success (dataPieces.Natural, parityPieces.Natural)
proc init*(
@ -352,7 +364,7 @@ proc asyncEncode*(
return failure(joinErr)
if not task.success.load():
return failure("Leopard encoding failed")
return failure("Leopard encoding task failed")
success()
@ -382,6 +394,8 @@ proc encodeData(
var
data = seq[seq[byte]].new() # number of blocks to encode
parity = createDoubleArray(params.ecM, manifest.blockSize.int)
defer:
freeDoubleArray(parity, params.ecM)
data[].setLen(params.ecK)
# TODO: this is a tight blocking loop so we sleep here to allow
@ -406,8 +420,6 @@ proc encodeData(
return failure(err)
except CancelledError as exc:
raise exc
finally:
freeDoubleArray(parity, params.ecM)
var idx = params.rounded + step
for j in 0 ..< params.ecM:
@ -544,17 +556,13 @@ proc asyncDecode*(
return failure(joinErr)
if not task.success.load():
return failure("Leopard encoding failed")
return failure("Leopard decoding task failed")
success()
proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
## Decode a protected manifest into it's original
## manifest
##
## `encoded` - the encoded (protected) manifest to
## be recovered
##
proc decodeInternal(
self: Erasure, encoded: Manifest
): Future[?!(ref seq[Cid], seq[Natural])] {.async.} =
logScope:
steps = encoded.steps
rounded_blocks = encoded.rounded
@ -578,6 +586,8 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
data = seq[seq[byte]].new()
parityData = seq[seq[byte]].new()
recovered = createDoubleArray(encoded.ecK, encoded.blockSize.int)
defer:
freeDoubleArray(recovered, encoded.ecK)
data[].setLen(encoded.ecK) # set len to K
parityData[].setLen(encoded.ecM) # set len to M
@ -604,8 +614,6 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
return failure(err)
except CancelledError as exc:
raise exc
finally:
freeDoubleArray(recovered, encoded.ecK)
for i in 0 ..< encoded.ecK:
let idx = i * encoded.steps + step
@ -634,6 +642,19 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
finally:
decoder.release()
return (cids, recoveredIndices).success
proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
## Decode a protected manifest into it's original
## manifest
##
## `encoded` - the encoded (protected) manifest to
## be recovered
##
without (cids, recoveredIndices) =? (await self.decodeInternal(encoded)), err:
return failure(err)
without tree =? CodexTree.init(cids[0 ..< encoded.originalBlocksCount]), err:
return failure(err)
@ -655,6 +676,44 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
return decoded.success
proc repair*(self: Erasure, encoded: Manifest): Future[?!void] {.async.} =
## Repair a protected manifest by reconstructing the full dataset
##
## `encoded` - the encoded (protected) manifest to
## be repaired
##
without (cids, _) =? (await self.decodeInternal(encoded)), err:
return failure(err)
without tree =? CodexTree.init(cids[0 ..< encoded.originalBlocksCount]), err:
return failure(err)
without treeCid =? tree.rootCid, err:
return failure(err)
if treeCid != encoded.originalTreeCid:
return failure(
"Original tree root differs from the tree root computed out of recovered data"
)
if err =? (await self.store.putAllProofs(tree)).errorOption:
return failure(err)
without repaired =? (
await self.encode(
Manifest.new(encoded), encoded.ecK, encoded.ecM, encoded.protectedStrategy
)
), err:
return failure(err)
if repaired.treeCid != encoded.treeCid:
return failure(
"Original tree root differs from the repaired tree root encoded out of recovered data"
)
return success()
proc start*(self: Erasure) {.async.} =
return

View File

@ -24,13 +24,17 @@ type
IndexingError* = object of CodexError
IndexingWrongIndexError* = object of IndexingError
IndexingWrongIterationsError* = object of IndexingError
IndexingWrongGroupCountError* = object of IndexingError
IndexingWrongPadBlockCountError* = object of IndexingError
IndexingStrategy* = object
strategyType*: StrategyType
strategyType*: StrategyType # Indexing strategy algorithm
firstIndex*: int # Lowest index that can be returned
lastIndex*: int # Highest index that can be returned
iterations*: int # getIndices(iteration) will run from 0 ..< iterations
step*: int
iterations*: int # Number of iteration steps (0 ..< iterations)
step*: int # Step size between generated indices
groupCount*: int # Number of groups to partition indices into
padBlockCount*: int # Number of padding blocks to append per group
func checkIteration(
self: IndexingStrategy, iteration: int
@ -44,39 +48,47 @@ func getIter(first, last, step: int): Iter[int] =
{.cast(noSideEffect).}:
Iter[int].new(first, last, step)
func getLinearIndicies(
self: IndexingStrategy, iteration: int
): Iter[int] {.raises: [IndexingError].} =
self.checkIteration(iteration)
func getLinearIndices(self: IndexingStrategy, iteration: int): Iter[int] =
let
first = self.firstIndex + iteration * self.step
last = min(first + self.step - 1, self.lastIndex)
getIter(first, last, 1)
func getSteppedIndicies(
self: IndexingStrategy, iteration: int
): Iter[int] {.raises: [IndexingError].} =
self.checkIteration(iteration)
func getSteppedIndices(self: IndexingStrategy, iteration: int): Iter[int] =
let
first = self.firstIndex + iteration
last = self.lastIndex
getIter(first, last, self.iterations)
func getIndicies*(
self: IndexingStrategy, iteration: int
): Iter[int] {.raises: [IndexingError].} =
func getStrategyIndices(self: IndexingStrategy, iteration: int): Iter[int] =
case self.strategyType
of StrategyType.LinearStrategy:
self.getLinearIndicies(iteration)
self.getLinearIndices(iteration)
of StrategyType.SteppedStrategy:
self.getSteppedIndicies(iteration)
self.getSteppedIndices(iteration)
func getIndices*(
self: IndexingStrategy, iteration: int
): Iter[int] {.raises: [IndexingError].} =
self.checkIteration(iteration)
{.cast(noSideEffect).}:
Iter[int].new(
iterator (): int {.gcsafe.} =
for value in self.getStrategyIndices(iteration):
yield value
for i in 0 ..< self.padBlockCount:
yield self.lastIndex + (iteration + 1) + i * self.groupCount
)
func init*(
strategy: StrategyType, firstIndex, lastIndex, iterations: int
strategy: StrategyType,
firstIndex, lastIndex, iterations: int,
groupCount = 0,
padBlockCount = 0,
): IndexingStrategy {.raises: [IndexingError].} =
if firstIndex > lastIndex:
raise newException(
@ -91,10 +103,24 @@ func init*(
"iterations (" & $iterations & ") must be greater than zero.",
)
if padBlockCount < 0:
raise newException(
IndexingWrongPadBlockCountError,
"padBlockCount (" & $padBlockCount & ") must be equal or greater than zero.",
)
if padBlockCount > 0 and groupCount <= 0:
raise newException(
IndexingWrongGroupCountError,
"groupCount (" & $groupCount & ") must be greater than zero.",
)
IndexingStrategy(
strategyType: strategy,
firstIndex: firstIndex,
lastIndex: lastIndex,
iterations: iterations,
step: divUp((lastIndex - firstIndex + 1), iterations),
groupCount: groupCount,
padBlockCount: padBlockCount,
)

View File

@ -639,10 +639,6 @@ proc onStore(
trace "Received a request to store a slot"
# TODO: Use the isRepairing to manage the slot download.
# If isRepairing is true, the slot has to be repaired before
# being downloaded.
without manifest =? (await self.fetchManifest(cid)), err:
trace "Unable to fetch manifest for cid", cid, err = err.msg
return failure(err)
@ -675,18 +671,33 @@ proc onStore(
return success()
if slotIdx > int.high.uint64:
error "Cannot cast slot index to int", slotIndex = slotIdx
return
if isRepairing:
trace "start repairing slot", slotIdx
try:
let erasure = Erasure.new(
self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool
)
if err =? (await erasure.repair(manifest)).errorOption:
error "Unable to erasure decode repairing manifest",
cid = manifest.treeCid, exc = err.msg
return failure(err)
except CatchableError as exc:
error "Error erasure decoding repairing manifest",
cid = manifest.treeCid, exc = exc.msg
return failure(exc.msg)
else:
without indexer =?
manifest.verifiableStrategy.init(0, manifest.blocksCount - 1, manifest.numSlots).catch,
err:
trace "Unable to create indexing strategy from protected manifest", err = err.msg
return failure(err)
if slotIdx > int.high.uint64:
error "Cannot cast slot index to int", slotIndex = slotIdx
return
without blksIter =? indexer.getIndicies(slotIdx.int).catch, err:
trace "Unable to get indicies from strategy", err = err.msg
without blksIter =? indexer.getIndices(slotIdx.int).catch, err:
trace "Unable to get indices from strategy", err = err.msg
return failure(err)
if err =? (
@ -699,8 +710,6 @@ proc onStore(
trace "Unable to build slot", err = err.msg
return failure(err)
trace "Slot successfully retrieved and reconstructed"
if cid =? slotRoot.toSlotCid() and cid != manifest.slotRoots[slotIdx]:
trace "Slot root mismatch",
manifest = manifest.slotRoots[slotIdx.int], recovered = slotRoot.toSlotCid()
@ -842,9 +851,6 @@ proc start*(self: CodexNodeRef) {.async.} =
proc stop*(self: CodexNodeRef) {.async.} =
trace "Stopping node"
if not self.taskpool.isNil:
self.taskpool.shutdown()
await self.trackedFutures.cancelTracked()
if not self.engine.isNil:

View File

@ -113,17 +113,17 @@ func numSlotCells*[T, H](self: SlotsBuilder[T, H]): Natural =
self.numBlockCells * self.numSlotBlocks
func slotIndiciesIter*[T, H](self: SlotsBuilder[T, H], slot: Natural): ?!Iter[int] =
func slotIndicesIter*[T, H](self: SlotsBuilder[T, H], slot: Natural): ?!Iter[int] =
## Returns the slot indices.
##
self.strategy.getIndicies(slot).catch
self.strategy.getIndices(slot).catch
func slotIndicies*[T, H](self: SlotsBuilder[T, H], slot: Natural): seq[int] =
func slotIndices*[T, H](self: SlotsBuilder[T, H], slot: Natural): seq[int] =
## Returns the slot indices.
##
if iter =? self.strategy.getIndicies(slot).catch:
if iter =? self.strategy.getIndices(slot).catch:
return toSeq(iter)
func manifest*[T, H](self: SlotsBuilder[T, H]): Manifest =
@ -184,7 +184,7 @@ proc getCellHashes*[T, H](
slotIndex = slotIndex
let hashes = collect(newSeq):
for i, blkIdx in self.strategy.getIndicies(slotIndex):
for i, blkIdx in self.strategy.getIndices(slotIndex):
logScope:
blkIdx = blkIdx
pos = i
@ -310,7 +310,7 @@ proc new*[T, H](
_: type SlotsBuilder[T, H],
store: BlockStore,
manifest: Manifest,
strategy = SteppedStrategy,
strategy = LinearStrategy,
cellSize = DefaultCellSize,
): ?!SlotsBuilder[T, H] =
if not manifest.protected:
@ -354,7 +354,14 @@ proc new*[T, H](
emptyBlock = newSeq[byte](manifest.blockSize.int)
emptyDigestTree = ?T.digestTree(emptyBlock, cellSize.int)
strategy = ?strategy.init(0, numBlocksTotal - 1, manifest.numSlots).catch
strategy =
?strategy.init(
0,
manifest.blocksCount - 1,
manifest.numSlots,
manifest.numSlots,
numPadSlotBlocks,
).catch
logScope:
numSlotBlocks = numSlotBlocks

View File

@ -53,7 +53,7 @@ proc getSample*[T, H](
cellsPerBlock = self.builder.numBlockCells
blkCellIdx = cellIdx.toCellInBlk(cellsPerBlock) # block cell index
blkSlotIdx = cellIdx.toBlkInSlot(cellsPerBlock) # slot tree index
origBlockIdx = self.builder.slotIndicies(self.index)[blkSlotIdx]
origBlockIdx = self.builder.slotIndices(self.index)[blkSlotIdx]
# convert to original dataset block index
logScope:

View File

@ -7,6 +7,7 @@ type
Function*[T, U] = proc(fut: T): U {.raises: [CatchableError], gcsafe, closure.}
IsFinished* = proc(): bool {.raises: [], gcsafe, closure.}
GenNext*[T] = proc(): T {.raises: [CatchableError], gcsafe.}
Iterator[T] = iterator (): T
Iter*[T] = ref object
finished: bool
next*: GenNext[T]
@ -89,6 +90,37 @@ proc new*[T](_: type Iter[T], items: seq[T]): Iter[T] =
Iter[int].new(0 ..< items.len).map((i: int) => items[i])
proc new*[T](_: type Iter[T], iter: Iterator[T]): Iter[T] =
## Creates a new Iter from an iterator
##
var nextOrErr: Option[?!T]
proc tryNext(): void =
nextOrErr = none(?!T)
while not iter.finished:
try:
let t: T = iter()
if not iter.finished:
nextOrErr = some(success(t))
break
except CatchableError as err:
nextOrErr = some(T.failure(err))
proc genNext(): T {.raises: [CatchableError].} =
if nextOrErr.isNone:
raise newException(CatchableError, "Iterator finished but genNext was called")
without u =? nextOrErr.unsafeGet, err:
raise err
tryNext()
return u
proc isFinished(): bool =
nextOrErr.isNone
tryNext()
Iter[T].new(genNext, isFinished)
proc empty*[T](_: type Iter[T]): Iter[T] =
## Creates an empty Iter
##
@ -105,10 +137,10 @@ proc map*[T, U](iter: Iter[T], fn: Function[T, U]): Iter[U] =
Iter[U].new(genNext = () => fn(iter.next()), isFinished = () => iter.finished)
proc mapFilter*[T, U](iter: Iter[T], mapPredicate: Function[T, Option[U]]): Iter[U] =
var nextUOrErr: Option[Result[U, ref CatchableError]]
var nextUOrErr: Option[?!U]
proc tryFetch(): void =
nextUOrErr = Result[U, ref CatchableError].none
nextUOrErr = none(?!U)
while not iter.finished:
try:
let t = iter.next()
@ -119,6 +151,9 @@ proc mapFilter*[T, U](iter: Iter[T], mapPredicate: Function[T, Option[U]]): Iter
nextUOrErr = some(U.failure(err))
proc genNext(): U {.raises: [CatchableError].} =
if nextUOrErr.isNone:
raise newException(CatchableError, "Iterator finished but genNext was called")
# at this point nextUOrErr should always be some(..)
without u =? nextUOrErr.unsafeGet, err:
raise err

View File

@ -26,8 +26,8 @@ asyncchecksuite "NetworkStore engine - 2 nodes":
setup:
blocks1 = await makeRandomBlocks(datasetSize = 2048, blockSize = 256'nb)
blocks2 = await makeRandomBlocks(datasetSize = 2048, blockSize = 256'nb)
nodeCmps1 = generateNodes(1, blocks1)[0]
nodeCmps2 = generateNodes(1, blocks2)[0]
nodeCmps1 = generateNodes(1, blocks1).components[0]
nodeCmps2 = generateNodes(1, blocks2).components[0]
await allFuturesThrowing(
nodeCmps1.switch.start(),

View File

@ -1,6 +1,7 @@
import std/sequtils
import pkg/chronos
import pkg/taskpools
import pkg/libp2p
import pkg/libp2p/errors
@ -8,11 +9,59 @@ import pkg/codex/discovery
import pkg/codex/stores
import pkg/codex/blocktype as bt
import pkg/codex/blockexchange
import pkg/codex/systemclock
import pkg/codex/nat
import pkg/codex/utils/natutils
import pkg/codex/slots
import pkg/codex/node
import ../examples
import ../../helpers
type NodesComponents* =
tuple[
proc nextFreePort*(startPort: int): Future[int] {.async.} =
proc client(server: StreamServer, transp: StreamTransport) {.async: (raises: []).} =
await transp.closeWait()
var port = startPort
while true:
try:
let host = initTAddress("127.0.0.1", port)
var server = createStreamServer(host, client, {ReuseAddr})
await server.closeWait()
return port
except TransportOsError:
inc port
type
NodesComponents* = object
switch*: Switch
blockDiscovery*: Discovery
wallet*: WalletRef
network*: BlockExcNetwork
localStore*: BlockStore
peerStore*: PeerCtxStore
pendingBlocks*: PendingBlocksManager
discovery*: DiscoveryEngine
engine*: BlockExcEngine
networkStore*: NetworkStore
node*: CodexNodeRef = nil
tempDbs*: seq[TempLevelDb] = @[]
NodesCluster* = ref object
components*: seq[NodesComponents]
taskpool*: Taskpool
NodeConfig* = object
useRepoStore*: bool = false
findFreePorts*: bool = false
basePort*: int = 8080
createFullNode*: bool = false
enableBootstrap*: bool = false
converter toTuple*(
nc: NodesComponents
): tuple[
switch: Switch,
blockDiscovery: Discovery,
wallet: WalletRef,
@ -23,45 +72,158 @@ type NodesComponents* =
discovery: DiscoveryEngine,
engine: BlockExcEngine,
networkStore: NetworkStore,
]
] =
(
nc.switch, nc.blockDiscovery, nc.wallet, nc.network, nc.localStore, nc.peerStore,
nc.pendingBlocks, nc.discovery, nc.engine, nc.networkStore,
)
converter toComponents*(cluster: NodesCluster): seq[NodesComponents] =
cluster.components
proc nodes*(cluster: NodesCluster): seq[CodexNodeRef] =
cluster.components.filterIt(it.node != nil).mapIt(it.node)
proc localStores*(cluster: NodesCluster): seq[BlockStore] =
cluster.components.mapIt(it.localStore)
proc switches*(cluster: NodesCluster): seq[Switch] =
cluster.components.mapIt(it.switch)
proc generateNodes*(
num: Natural, blocks: openArray[bt.Block] = []
): seq[NodesComponents] =
num: Natural, blocks: openArray[bt.Block] = [], config: NodeConfig = NodeConfig()
): NodesCluster =
var
components: seq[NodesComponents] = @[]
taskpool = Taskpool.new()
bootstrapNodes: seq[SignedPeerRecord] = @[]
for i in 0 ..< num:
let basePortForNode = config.basePort + 2 * i.int
let listenPort =
if config.findFreePorts:
waitFor nextFreePort(basePortForNode)
else:
basePortForNode
let bindPort =
if config.findFreePorts:
waitFor nextFreePort(listenPort + 1)
else:
listenPort + 1
let
switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
discovery = Discovery.new(
switch.peerInfo.privateKey,
announceAddrs =
@[
MultiAddress.init("/ip4/127.0.0.1/tcp/0").expect(
"Should return multiaddress"
listenAddr = MultiAddress.init("/ip4/127.0.0.1/tcp/" & $listenPort).expect(
"invalid multiaddress"
)
],
switch = newStandardSwitch(
transportFlags = {ServerFlags.ReuseAddr},
sendSignedPeerRecord = config.enableBootstrap,
addrs =
if config.findFreePorts:
listenAddr
else:
MultiAddress.init("/ip4/127.0.0.1/tcp/0").expect("invalid multiaddress"),
)
wallet = WalletRef.example
wallet =
if config.createFullNode:
WalletRef.new(EthPrivateKey.random())
else:
WalletRef.example
network = BlockExcNetwork.new(switch)
localStore = CacheStore.new(blocks.mapIt(it))
peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new()
advertiser = Advertiser.new(localStore, discovery)
blockDiscovery =
DiscoveryEngine.new(localStore, peerStore, network, discovery, pendingBlocks)
let (localStore, tempDbs, blockDiscovery) =
if config.useRepoStore:
let
bdStore = TempLevelDb.new()
repoStore = TempLevelDb.new()
mdStore = TempLevelDb.new()
store =
RepoStore.new(repoStore.newDb(), mdStore.newDb(), clock = SystemClock.new())
blockDiscoveryStore = bdStore.newDb()
discovery = Discovery.new(
switch.peerInfo.privateKey,
announceAddrs = @[listenAddr],
bindPort = bindPort.Port,
store = blockDiscoveryStore,
bootstrapNodes = bootstrapNodes,
)
waitFor store.start()
(store.BlockStore, @[bdStore, repoStore, mdStore], discovery)
else:
let
store = CacheStore.new(blocks.mapIt(it))
discovery =
Discovery.new(switch.peerInfo.privateKey, announceAddrs = @[listenAddr])
(store.BlockStore, newSeq[TempLevelDb](), discovery)
let
discovery = DiscoveryEngine.new(
localStore, peerStore, network, blockDiscovery, pendingBlocks
)
advertiser = Advertiser.new(localStore, blockDiscovery)
engine = BlockExcEngine.new(
localStore, wallet, network, blockDiscovery, advertiser, peerStore,
pendingBlocks,
localStore, wallet, network, discovery, advertiser, peerStore, pendingBlocks
)
networkStore = NetworkStore.new(engine, localStore)
switch.mount(network)
let nc: NodesComponents = (
switch, discovery, wallet, network, localStore, peerStore, pendingBlocks,
blockDiscovery, engine, networkStore,
let node =
if config.createFullNode:
let fullNode = CodexNodeRef.new(
switch = switch,
networkStore = networkStore,
engine = engine,
prover = Prover.none,
discovery = blockDiscovery,
taskpool = taskpool,
)
result.add(nc)
if config.enableBootstrap:
waitFor switch.peerInfo.update()
let (announceAddrs, discoveryAddrs) = nattedAddress(
NatConfig(hasExtIp: false, nat: NatNone),
switch.peerInfo.addrs,
bindPort.Port,
)
blockDiscovery.updateAnnounceRecord(announceAddrs)
blockDiscovery.updateDhtRecord(discoveryAddrs)
if blockDiscovery.dhtRecord.isSome:
bootstrapNodes.add !blockDiscovery.dhtRecord
fullNode
else:
nil
let nodeComponent = NodesComponents(
switch: switch,
blockDiscovery: blockDiscovery,
wallet: wallet,
network: network,
localStore: localStore,
peerStore: peerStore,
pendingBlocks: pendingBlocks,
discovery: discovery,
engine: engine,
networkStore: networkStore,
node: node,
tempDbs: tempDbs,
)
components.add(nodeComponent)
if config.createFullNode:
for component in components:
if component.node != nil:
waitFor component.node.switch.start()
waitFor component.node.start()
return NodesCluster(components: components, taskpool: taskpool)
proc connectNodes*(nodes: seq[Switch]) {.async.} =
for dialer in nodes:
@ -71,3 +233,22 @@ proc connectNodes*(nodes: seq[Switch]) {.async.} =
proc connectNodes*(nodes: seq[NodesComponents]) {.async.} =
await connectNodes(nodes.mapIt(it.switch))
proc connectNodes*(cluster: NodesCluster) {.async.} =
await connectNodes(cluster.components)
proc cleanup*(cluster: NodesCluster) {.async.} =
for component in cluster.components:
if component.node != nil:
await component.node.switch.stop()
await component.node.stop()
for component in cluster.components:
for db in component.tempDbs:
await db.destroyDb()
for component in cluster.components:
if component.tempDbs.len > 0:
await RepoStore(component.localStore).stop()
cluster.taskpool.shutdown()

View File

@ -129,11 +129,11 @@ asyncchecksuite "Test Node - Host contracts":
(await onStore(request, expiry, 1.uint64, onBlocks, isRepairing = false)).tryGet()
check fetchedBytes == 12 * DefaultBlockSize.uint
let indexer = verifiable.protectedStrategy.init(
0, verifiable.numSlotBlocks() - 1, verifiable.numSlots
let indexer = verifiable.verifiableStrategy.init(
0, verifiable.blocksCount - 1, verifiable.numSlots
)
for index in indexer.getIndicies(1):
for index in indexer.getIndices(1):
let
blk = (await localStore.getBlock(verifiable.treeCid, index)).tryGet
key = (createBlockExpirationMetadataKey(blk.cid)).tryGet

View File

@ -0,0 +1,220 @@
import std/options
import std/importutils
import std/times
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/stint
import pkg/codex/logutils
import pkg/codex/stores
import pkg/codex/contracts
import pkg/codex/slots
import pkg/codex/manifest
import pkg/codex/erasure
import pkg/codex/blocktype as bt
import pkg/chronos/transports/stream
import pkg/codex/node {.all.}
import ../../asynctest
import ../../examples
import ../helpers
import ./helpers
privateAccess(CodexNodeRef) # enable access to private fields
logScope:
topics = "testSlotRepair"
proc fetchStreamData(stream: LPStream, datasetSize: int): Future[seq[byte]] {.async.} =
var buf = newSeq[byte](datasetSize)
await stream.readExactly(addr buf[0], datasetSize)
buf
proc flatten[T](s: seq[seq[T]]): seq[T] =
var t = newSeq[T](0)
for ss in s:
t &= ss
return t
asyncchecksuite "Test Node - Slot Repair":
let
numNodes = 12
config = NodeConfig(
useRepoStore: true,
findFreePorts: true,
createFullNode: true,
enableBootstrap: true,
)
var
manifest: Manifest
builder: Poseidon2Builder
verifiable: Manifest
verifiableBlock: bt.Block
protected: Manifest
cluster: NodesCluster
nodes: seq[CodexNodeRef]
localStores: seq[BlockStore]
setup:
cluster = generateNodes(numNodes, config = config)
nodes = cluster.nodes
localStores = cluster.localStores
teardown:
await cluster.cleanup()
localStores = @[]
nodes = @[]
test "repair slots (2,1)":
let
expiry = (getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix
numBlocks = 5
datasetSize = numBlocks * DefaultBlockSize.int
ecK = 2
ecM = 1
localStore = localStores[0]
store = nodes[0].blockStore
blocks =
await makeRandomBlocks(datasetSize = datasetSize, blockSize = DefaultBlockSize)
data = (
block:
collect(newSeq):
for blk in blocks:
blk.data
).flatten()
check blocks.len == numBlocks
# Populate manifest in local store
manifest = await storeDataGetManifest(localStore, blocks)
let
manifestBlock =
bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
erasure =
Erasure.new(store, leoEncoderProvider, leoDecoderProvider, cluster.taskpool)
(await localStore.putBlock(manifestBlock)).tryGet()
protected = (await erasure.encode(manifest, ecK, ecM)).tryGet()
builder = Poseidon2Builder.new(localStore, protected).tryGet()
verifiable = (await builder.buildManifest()).tryGet()
verifiableBlock =
bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet()
# Populate protected manifest in local store
(await localStore.putBlock(verifiableBlock)).tryGet()
var request = StorageRequest.example
request.content.cid = verifiableBlock.cid
for i in 0 ..< protected.numSlots.uint64:
(await nodes[i + 1].onStore(request, expiry, i, nil, isRepairing = false)).tryGet()
await nodes[0].switch.stop() # acts as client
await nodes[1].switch.stop() # slot 0 missing now
# repair missing slot
(await nodes[4].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet()
await nodes[2].switch.stop() # slot 1 missing now
(await nodes[5].onStore(request, expiry, 1.uint64, nil, isRepairing = true)).tryGet()
await nodes[3].switch.stop() # slot 2 missing now
(await nodes[6].onStore(request, expiry, 2.uint64, nil, isRepairing = true)).tryGet()
await nodes[4].switch.stop() # slot 0 missing now
# repair missing slot from repaired slots
(await nodes[7].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet()
await nodes[5].switch.stop() # slot 1 missing now
# repair missing slot from repaired slots
(await nodes[8].onStore(request, expiry, 1.uint64, nil, isRepairing = true)).tryGet()
await nodes[6].switch.stop() # slot 2 missing now
# repair missing slot from repaired slots
(await nodes[9].onStore(request, expiry, 2.uint64, nil, isRepairing = true)).tryGet()
let
stream = (await nodes[10].retrieve(verifiableBlock.cid, local = false)).tryGet()
expectedData = await fetchStreamData(stream, datasetSize)
check expectedData.len == data.len
check expectedData == data
test "repair slots (3,2)":
let
expiry = (getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix
numBlocks = 40
datasetSize = numBlocks * DefaultBlockSize.int
ecK = 3
ecM = 2
localStore = localStores[0]
store = nodes[0].blockStore
blocks =
await makeRandomBlocks(datasetSize = datasetSize, blockSize = DefaultBlockSize)
data = (
block:
collect(newSeq):
for blk in blocks:
blk.data
).flatten()
check blocks.len == numBlocks
# Populate manifest in local store
manifest = await storeDataGetManifest(localStore, blocks)
let
manifestBlock =
bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
erasure =
Erasure.new(store, leoEncoderProvider, leoDecoderProvider, cluster.taskpool)
(await localStore.putBlock(manifestBlock)).tryGet()
protected = (await erasure.encode(manifest, ecK, ecM)).tryGet()
builder = Poseidon2Builder.new(localStore, protected).tryGet()
verifiable = (await builder.buildManifest()).tryGet()
verifiableBlock =
bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet()
# Populate protected manifest in local store
(await localStore.putBlock(verifiableBlock)).tryGet()
var request = StorageRequest.example
request.content.cid = verifiableBlock.cid
for i in 0 ..< protected.numSlots.uint64:
(await nodes[i + 1].onStore(request, expiry, i, nil, isRepairing = false)).tryGet()
await nodes[0].switch.stop() # acts as client
await nodes[1].switch.stop() # slot 0 missing now
await nodes[3].switch.stop() # slot 2 missing now
# repair missing slots
(await nodes[6].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet()
(await nodes[7].onStore(request, expiry, 2.uint64, nil, isRepairing = true)).tryGet()
await nodes[2].switch.stop() # slot 1 missing now
await nodes[4].switch.stop() # slot 3 missing now
# repair missing slots from repaired slots
(await nodes[8].onStore(request, expiry, 1.uint64, nil, isRepairing = true)).tryGet()
(await nodes[9].onStore(request, expiry, 3.uint64, nil, isRepairing = true)).tryGet()
await nodes[5].switch.stop() # slot 4 missing now
# repair missing slot from repaired slots
(await nodes[10].onStore(request, expiry, 4.uint64, nil, isRepairing = true)).tryGet()
let
stream = (await nodes[11].retrieve(verifiableBlock.cid, local = false)).tryGet()
expectedData = await fetchStreamData(stream, datasetSize)
check expectedData.len == data.len
check expectedData == data

View File

@ -27,7 +27,7 @@ import pkg/codex/slots {.all.}
privateAccess(Poseidon2Builder) # enable access to private fields
privateAccess(Manifest) # enable access to private fields
const Strategy = SteppedStrategy
const Strategy = LinearStrategy
suite "Slot builder":
let
@ -67,7 +67,6 @@ suite "Slot builder":
var
datasetBlocks: seq[bt.Block]
padBlocks: seq[bt.Block]
localStore: BlockStore
manifest: Manifest
protectedManifest: Manifest
@ -167,7 +166,9 @@ suite "Slot builder":
test "Should build slot hashes for all slots":
let
steppedStrategy = Strategy.init(0, numBlocksTotal - 1, numSlots)
linearStrategy = Strategy.init(
0, protectedManifest.blocksCount - 1, numSlots, numSlots, numPadSlotBlocks
)
builder = Poseidon2Builder
.new(localStore, protectedManifest, cellSize = cellSize)
@ -176,7 +177,7 @@ suite "Slot builder":
for i in 0 ..< numSlots:
let
expectedHashes = collect(newSeq):
for j, idx in steppedStrategy.getIndicies(i):
for j, idx in linearStrategy.getIndices(i):
if j > (protectedManifest.numSlotBlocks - 1):
emptyDigest
else:
@ -190,7 +191,9 @@ suite "Slot builder":
test "Should build slot trees for all slots":
let
steppedStrategy = Strategy.init(0, numBlocksTotal - 1, numSlots)
linearStrategy = Strategy.init(
0, protectedManifest.blocksCount - 1, numSlots, numSlots, numPadSlotBlocks
)
builder = Poseidon2Builder
.new(localStore, protectedManifest, cellSize = cellSize)
@ -199,7 +202,7 @@ suite "Slot builder":
for i in 0 ..< numSlots:
let
expectedHashes = collect(newSeq):
for j, idx in steppedStrategy.getIndicies(i):
for j, idx in linearStrategy.getIndices(i):
if j > (protectedManifest.numSlotBlocks - 1):
emptyDigest
else:
@ -235,7 +238,9 @@ suite "Slot builder":
test "Should build correct verification root":
let
steppedStrategy = Strategy.init(0, numBlocksTotal - 1, numSlots)
linearStrategy = Strategy.init(
0, protectedManifest.blocksCount - 1, numSlots, numSlots, numPadSlotBlocks
)
builder = Poseidon2Builder
.new(localStore, protectedManifest, cellSize = cellSize)
.tryGet()
@ -245,7 +250,7 @@ suite "Slot builder":
slotsHashes = collect(newSeq):
for i in 0 ..< numSlots:
let slotHashes = collect(newSeq):
for j, idx in steppedStrategy.getIndicies(i):
for j, idx in linearStrategy.getIndices(i):
if j > (protectedManifest.numSlotBlocks - 1):
emptyDigest
else:
@ -261,7 +266,9 @@ suite "Slot builder":
test "Should build correct verification root manifest":
let
steppedStrategy = Strategy.init(0, numBlocksTotal - 1, numSlots)
linearStrategy = Strategy.init(
0, protectedManifest.blocksCount - 1, numSlots, numSlots, numPadSlotBlocks
)
builder = Poseidon2Builder
.new(localStore, protectedManifest, cellSize = cellSize)
.tryGet()
@ -269,7 +276,7 @@ suite "Slot builder":
slotsHashes = collect(newSeq):
for i in 0 ..< numSlots:
let slotHashes = collect(newSeq):
for j, idx in steppedStrategy.getIndicies(i):
for j, idx in linearStrategy.getIndices(i):
if j > (protectedManifest.numSlotBlocks - 1):
emptyDigest
else:

View File

@ -19,15 +19,15 @@ for offset in @[0, 1, 2, 100]:
test "linear":
check:
toSeq(linear.getIndicies(0)) == @[0, 1, 2, 3, 4].mapIt(it + offset)
toSeq(linear.getIndicies(1)) == @[5, 6, 7, 8, 9].mapIt(it + offset)
toSeq(linear.getIndicies(2)) == @[10, 11, 12].mapIt(it + offset)
toSeq(linear.getIndices(0)) == @[0, 1, 2, 3, 4].mapIt(it + offset)
toSeq(linear.getIndices(1)) == @[5, 6, 7, 8, 9].mapIt(it + offset)
toSeq(linear.getIndices(2)) == @[10, 11, 12].mapIt(it + offset)
test "stepped":
check:
toSeq(stepped.getIndicies(0)) == @[0, 3, 6, 9, 12].mapIt(it + offset)
toSeq(stepped.getIndicies(1)) == @[1, 4, 7, 10].mapIt(it + offset)
toSeq(stepped.getIndicies(2)) == @[2, 5, 8, 11].mapIt(it + offset)
toSeq(stepped.getIndices(0)) == @[0, 3, 6, 9, 12].mapIt(it + offset)
toSeq(stepped.getIndices(1)) == @[1, 4, 7, 10].mapIt(it + offset)
toSeq(stepped.getIndices(2)) == @[2, 5, 8, 11].mapIt(it + offset)
suite "Indexing strategies":
let
@ -39,16 +39,16 @@ suite "Indexing strategies":
l = LinearStrategy.init(0, 0, 1)
s = SteppedStrategy.init(0, 0, 1)
check:
toSeq(l.getIndicies(0)) == @[0]
toSeq(s.getIndicies(0)) == @[0]
toSeq(l.getIndices(0)) == @[0]
toSeq(s.getIndices(0)) == @[0]
test "smallest range 1":
let
l = LinearStrategy.init(0, 1, 1)
s = SteppedStrategy.init(0, 1, 1)
check:
toSeq(l.getIndicies(0)) == @[0, 1]
toSeq(s.getIndicies(0)) == @[0, 1]
toSeq(l.getIndices(0)) == @[0, 1]
toSeq(s.getIndices(0)) == @[0, 1]
test "first index must be smaller than last index":
expect IndexingWrongIndexError:
@ -61,14 +61,14 @@ suite "Indexing strategies":
test "should split elements evenly when possible":
let l = LinearStrategy.init(0, 11, 3)
check:
toSeq(l.getIndicies(0)) == @[0, 1, 2, 3].mapIt(it)
toSeq(l.getIndicies(1)) == @[4, 5, 6, 7].mapIt(it)
toSeq(l.getIndicies(2)) == @[8, 9, 10, 11].mapIt(it)
toSeq(l.getIndices(0)) == @[0, 1, 2, 3].mapIt(it)
toSeq(l.getIndices(1)) == @[4, 5, 6, 7].mapIt(it)
toSeq(l.getIndices(2)) == @[8, 9, 10, 11].mapIt(it)
test "linear - oob":
expect IndexingError:
discard linear.getIndicies(3)
discard linear.getIndices(3)
test "stepped - oob":
expect IndexingError:
discard stepped.getIndicies(3)
discard stepped.getIndices(3)

View File

@ -1,4 +1,5 @@
import ./node/testnode
import ./node/testcontracts
import ./node/testslotrepair
{.warning[UnusedImport]: off.}

View File

@ -0,0 +1,318 @@
import pkg/questionable
import pkg/codex/logutils
import ../contracts/time
import ../contracts/deployment
import ../codex/helpers
import ../examples
import ./marketplacesuite
import ./nodeconfigs
export logutils
logScope:
topics = "integration test slot repair"
marketplacesuite "SP Slot Repair":
const minPricePerBytePerSecond = 1.u256
const collateralPerByte = 1.u256
const blocks = 3
const ecNodes = 3
const ecTolerance = 1
const size = slotSize(blocks, ecNodes, ecTolerance)
var filledSlotIds: seq[SlotId] = @[]
var freedSlotId = none SlotId
var requestId: RequestId
# Here we are keeping track of the slot filled using their ids.
proc onSlotFilled(eventResult: ?!SlotFilled) =
assert not eventResult.isErr
let event = !eventResult
if event.requestId == requestId:
let slotId = slotId(event.requestId, event.slotIndex)
filledSlotIds.add slotId
# Here we are retrieving the slot id freed.
# When the event is triggered, the slot id is removed
# from the filled slot id list.
proc onSlotFreed(eventResult: ?!SlotFreed) =
assert not eventResult.isErr
let event = !eventResult
let slotId = slotId(event.requestId, event.slotIndex)
if event.requestId == requestId:
assert slotId in filledSlotIds
filledSlotIds.del(filledSlotIds.find(slotId))
freedSlotId = some(slotId)
proc createPurchase(client: CodexClient): Future[PurchaseId] {.async.} =
let data = await RandomChunker.example(blocks = blocks)
let cid = (await client.upload(data)).get
let purchaseId = await client.requestStorage(
cid,
expiry = 10.periods,
duration = 20.periods,
nodes = ecNodes,
tolerance = ecTolerance,
collateralPerByte = 1.u256,
pricePerBytePerSecond = minPricePerBytePerSecond,
proofProbability = 1.u256,
)
requestId = (await client.requestId(purchaseId)).get
return purchaseId
proc freeSlot(provider: CodexClient): Future[void] {.async.} =
# Get the second provider signer.
let signer = ethProvider.getSigner(accounts[2])
let marketplaceWithSecondProviderSigner = marketplace.connect(signer)
# Call freeSlot to speed up the process.
# It accelerates the test by skipping validator
# proof verification and not waiting for the full period.
# The downside is that this doesn't reflect the real slot freed process.
let slots = (await provider.getSlots()).get()
let slotId = slotId(requestId, slots[0].slotIndex)
discard await marketplaceWithSecondProviderSigner.freeSlot(slotId)
setup:
filledSlotIds = @[]
freedSlotId = none SlotId
test "repair from local store",
NodeConfigs(
clients: CodexConfigs.init(nodes = 1).some,
# .debug()
# .withLogFile()
# .withLogTopics("node", "erasure").some,
providers: CodexConfigs
.init(nodes = 2)
.withSimulateProofFailures(idx = 1, failEveryNProofs = 1)
# .debug()
.withLogFile()
.withLogTopics("marketplace", "sales", "reservations", "statemachine").some,
validators: CodexConfigs.init(nodes = 1).some,
# .debug()
# .withLogFile()
# .withLogTopics("validator").some,
):
let client0 = clients()[0]
let provider0 = providers()[0]
let provider1 = providers()[1]
let expiry = 10.periods
let duration = 20.periods
# Let's create 2 availabilities
# SP 1 will hosts 2 slots
# SP 2 will hosts 1 slot
let availability0 = (
await provider0.client.postAvailability(
totalSize = 2 * size.truncate(uint64),
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = 3 * size * collateralPerByte,
)
).get
let availability1 = (
await provider1.client.postAvailability(
totalSize = size.truncate(uint64),
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = size * collateralPerByte,
)
).get
let purchaseId = await createPurchase(client0.client)
let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled)
let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed)
# Wait for purchase starts, meaning that the slots are filled.
check eventually(
await client0.client.purchaseStateIs(purchaseId, "started"),
timeout = expiry.int * 1000,
)
# stop client so it doesn't serve any blocks anymore
await client0.stop()
# Let's disable the second availability,
# SP 2 will not pick the slot again.
await provider1.client.patchAvailability(
availabilityId = availability1.id, enabled = false.some
)
# Update the size of the availability for the SP 1,
# he will repair and host the freed slot
await provider0.client.patchAvailability(
availabilityId = availability0.id,
totalSize = (3 * size.truncate(uint64)).uint64.some,
)
# Let's free the slot to speed up the process
await freeSlot(provider1.client)
# We expect that the freed slot is added in the filled slot id list,
# meaning that the slot was repaired locally by SP 1.
check eventually(
freedSlotId.get in filledSlotIds, timeout = (duration - expiry).int * 1000
)
await filledSubscription.unsubscribe()
await slotFreedsubscription.unsubscribe()
test "repair from local and remote store",
NodeConfigs(
clients: CodexConfigs.init(nodes = 1)
# .debug()
# .withLogTopics("node", "erasure")
.some,
providers: CodexConfigs.init(nodes = 3)
# .debug()
# .withLogFile()
# .withLogTopics("marketplace", "sales", "statemachine", "reservations")
.some,
):
let client0 = clients()[0]
let provider0 = providers()[0]
let provider1 = providers()[1]
let provider2 = providers()[2]
let expiry = 10.periods
let duration = 20.periods
# SP 1, SP 2 and SP 3 will host one slot
let availability0 = (
await provider0.client.postAvailability(
totalSize = size.truncate(uint64),
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = size * collateralPerByte,
)
).get
let availability1 = (
await provider1.client.postAvailability(
totalSize = size.truncate(uint64),
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = size * collateralPerByte,
)
).get
discard await provider2.client.postAvailability(
totalSize = size.truncate(uint64),
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = size * collateralPerByte,
)
let purchaseId = await createPurchase(client0.client)
let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled)
let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed)
# Wait for purchase starts, meaning that the slots are filled.
check eventually(
await client0.client.purchaseStateIs(purchaseId, "started"),
timeout = expiry.int * 1000,
)
# stop client so it doesn't serve any blocks anymore
await client0.stop()
# Let's disable the availability,
# SP 2 will not pick the slot again.
await provider1.client.patchAvailability(availability1.id, enabled = false.some)
# Update the size of the availability for the SP 1,
# he will repair and host the freed slot
await provider0.client.patchAvailability(
availability0.id,
totalSize = (2 * size.truncate(uint64)).some,
totalCollateral = (2 * size * collateralPerByte).some,
)
# Let's free the slot to speed up the process
await freeSlot(provider1.client)
# We expect that the freed slot is added in the filled slot id list,
# meaning that the slot was repaired locally and remotely (using SP 3) by SP 1.
check eventually(freedSlotId.isSome, timeout = expiry.int * 1000)
check eventually(freedSlotId.get in filledSlotIds, timeout = expiry.int * 1000)
await filledSubscription.unsubscribe()
await slotFreedsubscription.unsubscribe()
test "repair from remote store only",
NodeConfigs(
clients: CodexConfigs.init(nodes = 1)
# .debug()
# .withLogFile()
# .withLogTopics("node", "erasure")
.some,
providers: CodexConfigs.init(nodes = 3)
# .debug()
# .withLogFile()
# .withLogTopics("marketplace", "sales", "statemachine", "reservations")
.some,
):
let client0 = clients()[0]
let provider0 = providers()[0]
let provider1 = providers()[1]
let provider2 = providers()[2]
let expiry = 10.periods
let duration = expiry + 10.periods
# SP 1 will host 2 slots
# SP 2 will host 1 slot
discard await provider0.client.postAvailability(
totalSize = 2 * size.truncate(uint64),
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = 2 * size * collateralPerByte,
)
let availability1 = (
await provider1.client.postAvailability(
totalSize = size.truncate(uint64),
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = size * collateralPerByte,
)
).get
let purchaseId = await createPurchase(client0.client)
let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled)
let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed)
# Wait for purchase starts, meaning that the slots are filled.
check eventually(
await client0.client.purchaseStateIs(purchaseId, "started"),
timeout = expiry.int * 1000,
)
# stop client so it doesn't serve any blocks anymore
await client0.stop()
# Let's create an availability for SP3,
# he will host the repaired slot.
discard await provider2.client.postAvailability(
totalSize = size.truncate(uint64),
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = size * collateralPerByte,
)
# Let's disable the availability,
# SP 2 will not pick the slot again.
await provider1.client.patchAvailability(availability1.id, enabled = false.some)
# Let's free the slot to speed up the process
await freeSlot(provider1.client)
# At this point, SP 3 should repair the slot from SP 1 and host it.
check eventually(freedSlotId.isSome, timeout = expiry.int * 1000)
check eventually(freedSlotId.get in filledSlotIds, timeout = expiry.int * 1000)
await filledSubscription.unsubscribe()
await slotFreedsubscription.unsubscribe()

View File

@ -9,5 +9,6 @@ import ./integration/testmarketplace
import ./integration/testproofs
import ./integration/testvalidator
import ./integration/testecbug
import ./integration/testslotrepair
{.warning[UnusedImport]: off.}