Add support for slot reconstruction on unavailable slot detection (#892)

Includes corresponding tests.

Various small fixes.
This commit is contained in:
Chrysostomos Nanakos 2025-04-11 12:09:54 +03:00
parent 19a5e05c13
commit 0fa4eec52e
No known key found for this signature in database
16 changed files with 1052 additions and 125 deletions

View File

@ -126,16 +126,16 @@ proc start*(b: Advertiser) {.async: (raises: []).} =
trace "Advertiser start"
if b.advertiserRunning:
warn "Starting advertiser twice"
return
proc onBlock(cid: Cid) {.async.} =
await b.advertiseBlock(cid)
doAssert(b.localStore.onBlockStored.isNone())
b.localStore.onBlockStored = onBlock.some
if b.advertiserRunning:
warn "Starting advertiser twice"
return
b.advertiserRunning = true
for i in 0 ..< b.concurrentAdvReqs:
let fut = b.processQueueLoop()

View File

@ -56,6 +56,7 @@ type
codexNode: CodexNodeRef
repoStore: RepoStore
maintenance: BlockMaintainer
taskpool: Taskpool
CodexPrivateKey* = libp2p.PrivateKey # alias
EthWallet = ethers.Wallet
@ -191,6 +192,9 @@ proc stop*(s: CodexServer) {.async.} =
error "Failed to stop codex node", failures = res.failure.len
raiseAssert "Failed to stop codex node"
if not s.taskpool.isNil:
s.taskpool.shutdown()
proc new*(
T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey
): CodexServer =
@ -331,4 +335,5 @@ proc new*(
restServer: restServer,
repoStore: repoStore,
maintenance: maintenance,
taskpool: taskpool,
)

View File

@ -82,6 +82,7 @@ type
rounded: Natural
steps: Natural
blocksCount: Natural
totalGroups: Natural
strategy: StrategyType
ErasureError* = object of CodexError
@ -166,11 +167,14 @@ proc prepareEncodingData(
let
strategy = params.strategy.init(
firstIndex = 0, lastIndex = params.rounded - 1, iterations = params.steps
firstIndex = 0,
lastIndex = params.rounded - 1,
iterations = params.steps,
totalGroups = params.totalGroups,
)
indicies = toSeq(strategy.getIndicies(step))
indices = toSeq(strategy.getIndices(step))
pendingBlocksIter =
self.getPendingBlocks(manifest, indicies.filterIt(it < manifest.blocksCount))
self.getPendingBlocks(manifest, indices.filterIt(it < manifest.blocksCount))
var resolved = 0
for fut in pendingBlocksIter:
@ -185,7 +189,7 @@ proc prepareEncodingData(
resolved.inc()
for idx in indicies.filterIt(it >= manifest.blocksCount):
for idx in indices.filterIt(it >= manifest.blocksCount):
let pos = indexToPos(params.steps, idx, step)
trace "Padding with empty block", idx
shallowCopy(data[pos], emptyBlock)
@ -216,10 +220,13 @@ proc prepareDecodingData(
let
strategy = encoded.protectedStrategy.init(
firstIndex = 0, lastIndex = encoded.blocksCount - 1, iterations = encoded.steps
firstIndex = 0,
lastIndex = encoded.blocksCount - 1,
iterations = encoded.steps,
totalGroups = encoded.numSlots,
)
indicies = toSeq(strategy.getIndicies(step))
pendingBlocksIter = self.getPendingBlocks(encoded, indicies)
indices = toSeq(strategy.getIndices(step))
pendingBlocksIter = self.getPendingBlocks(encoded, indices)
var
dataPieces = 0
@ -281,6 +288,7 @@ proc init*(
rounded = roundUp(manifest.blocksCount, ecK)
steps = divUp(rounded, ecK)
blocksCount = rounded + (steps * ecM)
totalGroups = ecK + ecM
success EncodingParams(
ecK: ecK,
@ -288,6 +296,7 @@ proc init*(
rounded: rounded,
steps: steps,
blocksCount: blocksCount,
totalGroups: totalGroups,
strategy: strategy,
)
@ -384,6 +393,8 @@ proc encodeData(
var
data = seq[seq[byte]].new() # number of blocks to encode
parity = createDoubleArray(params.ecM, manifest.blockSize.int)
defer:
freeDoubleArray(parity, params.ecM)
data[].setLen(params.ecK)
# TODO: this is a tight blocking loop so we sleep here to allow
@ -408,8 +419,6 @@ proc encodeData(
return failure(err)
except CancelledError as exc:
raise exc
finally:
freeDoubleArray(parity, params.ecM)
var idx = params.rounded + step
for j in 0 ..< params.ecM:
@ -582,6 +591,8 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
data = seq[seq[byte]].new()
parityData = seq[seq[byte]].new()
recovered = createDoubleArray(encoded.ecK, encoded.blockSize.int)
defer:
freeDoubleArray(recovered, encoded.ecK)
data[].setLen(encoded.ecK) # set len to K
parityData[].setLen(encoded.ecM) # set len to M
@ -608,8 +619,6 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
return failure(err)
except CancelledError as exc:
raise exc
finally:
freeDoubleArray(recovered, encoded.ecK)
for i in 0 ..< encoded.ecK:
let idx = i * encoded.steps + step
@ -659,6 +668,137 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
return decoded.success
proc repair*(self: Erasure, encoded: Manifest, slotIdx: int): Future[?!void] {.async.} =
## Repair a protected manifest slot
##
## `encoded` - the encoded (protected) manifest to
## be repaired
##
logScope:
steps = encoded.steps
rounded_blocks = encoded.rounded
new_manifest = encoded.blocksCount
var
cids = seq[Cid].new()
decoder = self.decoderProvider(encoded.blockSize.int, encoded.ecK, encoded.ecM)
emptyBlock = newSeq[byte](encoded.blockSize.int)
cids[].setLen(encoded.blocksCount)
try:
for step in 0 ..< encoded.steps:
await sleepAsync(10.millis)
var
data = seq[seq[byte]].new()
parityData = seq[seq[byte]].new()
recovered = createDoubleArray(encoded.ecK, encoded.blockSize.int)
data[].setLen(encoded.ecK)
parityData[].setLen(encoded.ecM)
without (dataPieces, _) =? (
await self.prepareDecodingData(
encoded, step, data, parityData, cids, emptyBlock
)
), err:
trace "Unable to prepare decoding data", error = err.msg
return failure(err)
if dataPieces >= encoded.ecK:
trace "Retrieved all the required data blocks for this step"
continue
trace "Erasure decoding data"
try:
if err =? (
await self.asyncDecode(
encoded.blockSize.int, encoded.ecK, encoded.ecM, data, parityData, recovered
)
).errorOption:
return failure(err)
except CancelledError as exc:
raise exc
finally:
freeDoubleArray(recovered, encoded.ecK)
for i in 0 ..< encoded.ecK:
let idx = i * encoded.steps + step
if data[i].len <= 0 and not cids[idx].isEmpty:
var innerPtr: ptr UncheckedArray[byte] = recovered[][i]
without blk =? bt.Block.new(
innerPtr.toOpenArray(0, encoded.blockSize.int - 1)
), error:
trace "Unable to create data block!", exc = error.msg
return failure(error)
trace "Recovered data block", cid = blk.cid, index = i
if isErr (await self.store.putBlock(blk)):
trace "Unable to store data block!", cid = blk.cid
return failure("Unable to store data block!")
cids[idx] = blk.cid
except CancelledError as exc:
trace "Erasure coding decoding cancelled"
raise exc # cancellation needs to be propagated
except CatchableError as exc:
trace "Erasure coding decoding error", exc = exc.msg
return failure(exc)
finally:
decoder.release()
without tree =? CodexTree.init(cids[0 ..< encoded.originalBlocksCount]), err:
return failure(err)
without treeCid =? tree.rootCid, err:
return failure(err)
if treeCid != encoded.originalTreeCid:
return failure(
"Original tree root differs from the tree root computed out of recovered data"
)
if err =? (await self.store.putAllProofs(tree)).errorOption:
return failure(err)
without repaired =? (
await self.encode(
Manifest.new(encoded), encoded.ecK, encoded.ecM, encoded.protectedStrategy
)
), err:
return failure(err)
if repaired.treeCid != encoded.treeCid:
return failure(
"Original tree root differs from the repaired tree root encoded out of recovered data"
)
let
strategy =
?encoded.protectedStrategy.init(
firstIndex = 0,
lastIndex = encoded.blocksCount - 1,
iterations = encoded.steps,
totalGroups = encoded.numSlots,
).catch
groupIndices = ?toSeq(strategy.getGroupIndices(slotIdx)).catch
for step in 0 ..< encoded.steps:
let indices = strategy.getIndices(step)
for i in indices:
if i notin groupIndices:
if isErr (await self.store.delBlock(encoded.treeCid, i)):
trace "Failed to remove block from tree ",
treeCid = encoded.treeCid, index = i
for i, cid in cids[0 ..< encoded.originalBlocksCount]:
if i notin groupIndices:
if isErr (await self.store.delBlock(treeCid, i)):
trace "Failed to remove original block from tree ", treeCid = treeCid, index = i
return success()
proc start*(self: Erasure) {.async.} =
return

View File

@ -24,13 +24,17 @@ type
IndexingError* = object of CodexError
IndexingWrongIndexError* = object of IndexingError
IndexingWrongIterationsError* = object of IndexingError
IndexingWrongTotalGroupsError* = object of IndexingError
IndexingWrongNumPadGroupBlocksError* = object of IndexingError
IndexingStrategy* = object
strategyType*: StrategyType
strategyType*: StrategyType # Strategy algorithm
firstIndex*: int # Lowest index that can be returned
lastIndex*: int # Highest index that can be returned
iterations*: int # getIndices(iteration) will run from 0 ..< iterations
step*: int
iterations*: int # Number of iterations (0 ..< iterations)
step*: int # Step size between indices
totalGroups*: int # Total number of groups to distribute indices into
numPadGroupBlocks*: int # Optional number of padding blocks per group
func checkIteration(
self: IndexingStrategy, iteration: int
@ -44,7 +48,7 @@ func getIter(first, last, step: int): Iter[int] =
{.cast(noSideEffect).}:
Iter[int].new(first, last, step)
func getLinearIndicies(
func getLinearIndices(
self: IndexingStrategy, iteration: int
): Iter[int] {.raises: [IndexingError].} =
self.checkIteration(iteration)
@ -55,7 +59,7 @@ func getLinearIndicies(
getIter(first, last, 1)
func getSteppedIndicies(
func getSteppedIndices(
self: IndexingStrategy, iteration: int
): Iter[int] {.raises: [IndexingError].} =
self.checkIteration(iteration)
@ -66,17 +70,54 @@ func getSteppedIndicies(
getIter(first, last, self.iterations)
func getIndicies*(
func getIndices*(
self: IndexingStrategy, iteration: int
): Iter[int] {.raises: [IndexingError].} =
## defines the layout of blocks per encoding iteration (data + parity)
##
case self.strategyType
of StrategyType.LinearStrategy:
self.getLinearIndicies(iteration)
self.getLinearIndices(iteration)
of StrategyType.SteppedStrategy:
self.getSteppedIndicies(iteration)
self.getSteppedIndices(iteration)
func getGroupIndices*(
self: IndexingStrategy, groupIndex: int
): Iter[int] {.raises: [IndexingError].} =
## defines failure recovery groups by selecting specific block indices
## from each encoding step (using getIndices)
##
{.cast(noSideEffect).}:
Iter[int].new(
iterator (): int {.raises: [IndexingError], gcsafe.} =
var idx = groupIndex
for step in 0 ..< self.iterations:
var
current = 0
found = false
for value in self.getIndices(step):
if current == idx:
yield value
found = true
break
inc current
if not found:
raise newException(
IndexingError, "groupIndex exceeds indices length in iteration " & $step
)
idx = (idx + 1) mod self.totalGroups
for i in 0 ..< self.numPadGroupBlocks:
yield self.lastIndex + (groupIndex + 1) + i * self.totalGroups
)
func init*(
strategy: StrategyType, firstIndex, lastIndex, iterations: int
strategy: StrategyType,
firstIndex, lastIndex, iterations, totalGroups: int,
numPadGroupBlocks = 0.int,
): IndexingStrategy {.raises: [IndexingError].} =
if firstIndex > lastIndex:
raise newException(
@ -91,10 +132,25 @@ func init*(
"iterations (" & $iterations & ") must be greater than zero.",
)
if totalGroups <= 0:
raise newException(
IndexingWrongTotalGroupsError,
"totalGroups (" & $totalGroups & ") must be greater than zero.",
)
if numPadGroupBlocks < 0:
raise newException(
IndexingWrongNumPadGroupBlocksError,
"numPadGroupBlocks (" & $numPadGroupBlocks &
") must be equal or greater than zero.",
)
IndexingStrategy(
strategyType: strategy,
firstIndex: firstIndex,
lastIndex: lastIndex,
iterations: iterations,
totalGroups: totalGroups,
step: divUp((lastIndex - firstIndex + 1), iterations),
numPadGroupBlocks: numPadGroupBlocks,
)

View File

@ -182,32 +182,35 @@ proc fetchBatched*(
# (i: int) => self.networkStore.getBlock(BlockAddress.init(cid, i))
# )
while not iter.finished:
let blockFutures = collect:
for i in 0 ..< batchSize:
if not iter.finished:
let address = BlockAddress.init(cid, iter.next())
if not (await address in self.networkStore) or fetchLocal:
self.networkStore.getBlock(address)
try:
while not iter.finished:
let blockFutures = collect:
for i in 0 ..< batchSize:
if not iter.finished:
let address = BlockAddress.init(cid, iter.next())
if not (await address in self.networkStore) or fetchLocal:
self.networkStore.getBlock(address)
without blockResults =? await allFinishedValues(blockFutures), err:
trace "Some blocks failed to fetch", err = err.msg
return failure(err)
without blockResults =? await allFinishedValues(blockFutures), err:
trace "Some blocks failed to fetch", err = err.msg
return failure(err)
let blocks = blockResults.filterIt(it.isSuccess()).mapIt(it.value)
let blocks = blockResults.filterIt(it.isSuccess()).mapIt(it.value)
let numOfFailedBlocks = blockResults.len - blocks.len
if numOfFailedBlocks > 0:
return
failure("Some blocks failed (Result) to fetch (" & $numOfFailedBlocks & ")")
let numOfFailedBlocks = blockResults.len - blocks.len
if numOfFailedBlocks > 0:
return
failure("Some blocks failed (Result) to fetch (" & $numOfFailedBlocks & ")")
if not onBatch.isNil and batchErr =? (await onBatch(blocks)).errorOption:
return failure(batchErr)
if not onBatch.isNil and batchErr =? (await onBatch(blocks)).errorOption:
return failure(batchErr)
if not iter.finished:
await sleepAsync(1.millis)
if not iter.finished:
await sleepAsync(1.millis)
success()
success()
except IndexingError as e:
failure(e.msg)
proc fetchBatched*(
self: CodexNodeRef,
@ -629,10 +632,6 @@ proc onStore(
trace "Received a request to store a slot"
# TODO: Use the isRepairing to manage the slot download.
# If isRepairing is true, the slot has to be repaired before
# being downloaded.
without manifest =? (await self.fetchManifest(cid)), err:
trace "Unable to fetch manifest for cid", cid, err = err.msg
return failure(err)
@ -665,32 +664,46 @@ proc onStore(
return success()
without indexer =?
manifest.verifiableStrategy.init(0, manifest.blocksCount - 1, manifest.numSlots).catch,
err:
trace "Unable to create indexing strategy from protected manifest", err = err.msg
return failure(err)
if slotIdx > int.high.uint64:
error "Cannot cast slot index to int", slotIndex = slotIdx
return
without blksIter =? indexer.getIndicies(slotIdx.int).catch, err:
trace "Unable to get indicies from strategy", err = err.msg
return failure(err)
if isRepairing:
trace "start repairing slot", slotIdx
try:
let erasure = Erasure.new(
self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool
)
if err =? (await erasure.repair(manifest, slotIdx.int)).errorOption:
error "Unable to erasure decode repairing manifest",
cid = manifest.treeCid, exc = err.msg
return failure(err)
except CatchableError as exc:
error "Error erasure decoding repairing manifest",
cid = manifest.treeCid, exc = exc.msg
return failure(exc.msg)
else:
without indexer =?
manifest.verifiableStrategy.init(
0, manifest.blocksCount - 1, manifest.steps, manifest.numSlots
).catch, err:
trace "Unable to create indexing strategy from protected manifest", err = err.msg
return failure(err)
if err =? (
await self.fetchBatched(manifest.treeCid, blksIter, onBatch = updateExpiry)
).errorOption:
trace "Unable to fetch blocks", err = err.msg
return failure(err)
without blksIter =? indexer.getGroupIndices(slotIdx.int).catch, err:
trace "Unable to get indices from strategy", err = err.msg
return failure(err)
if err =? (
await self.fetchBatched(manifest.treeCid, blksIter, onBatch = updateExpiry)
).errorOption:
trace "Unable to fetch blocks", err = err.msg
return failure(err)
without slotRoot =? (await builder.buildSlot(slotIdx.int)), err:
trace "Unable to build slot", err = err.msg
return failure(err)
trace "Slot successfully retrieved and reconstructed"
if cid =? slotRoot.toSlotCid() and cid != manifest.slotRoots[slotIdx]:
trace "Slot root mismatch",
manifest = manifest.slotRoots[slotIdx.int], recovered = slotRoot.toSlotCid()
@ -831,9 +844,6 @@ proc start*(self: CodexNodeRef) {.async.} =
proc stop*(self: CodexNodeRef) {.async.} =
trace "Stopping node"
if not self.taskpool.isNil:
self.taskpool.shutdown()
await self.trackedFutures.cancelTracked()
if not self.engine.isNil:

View File

@ -113,18 +113,19 @@ func numSlotCells*[T, H](self: SlotsBuilder[T, H]): Natural =
self.numBlockCells * self.numSlotBlocks
func slotIndiciesIter*[T, H](self: SlotsBuilder[T, H], slot: Natural): ?!Iter[int] =
func slotIndicesIter*[T, H](self: SlotsBuilder[T, H], slot: Natural): Iter[int] =
## Returns the slot indices.
##
self.strategy.getIndicies(slot).catch
self.strategy.getGroupIndicies(slot)
func slotIndicies*[T, H](self: SlotsBuilder[T, H], slot: Natural): seq[int] =
func slotIndices*[T, H](
self: SlotsBuilder[T, H], slot: Natural
): seq[int] {.raises: [IndexingError].} =
## Returns the slot indices.
##
if iter =? self.strategy.getIndicies(slot).catch:
return toSeq(iter)
toSeq(self.strategy.getGroupIndices(slot))
func manifest*[T, H](self: SlotsBuilder[T, H]): Manifest =
## Returns the manifest.
@ -183,22 +184,25 @@ proc getCellHashes*[T, H](
numberOfSlots = numberOfSlots
slotIndex = slotIndex
let hashes = collect(newSeq):
for i, blkIdx in self.strategy.getIndicies(slotIndex):
logScope:
blkIdx = blkIdx
pos = i
try:
let hashes = collect(newSeq):
for i, blkIdx in self.strategy.getGroupIndices(slotIndex):
logScope:
blkIdx = blkIdx
pos = i
trace "Getting block CID for tree at index"
without (_, tree) =? (await self.buildBlockTree(blkIdx, i)) and digest =? tree.root,
err:
error "Failed to get block CID for tree at index", err = err.msg
return failure(err)
trace "Getting block CID for tree at index"
without (_, tree) =? (await self.buildBlockTree(blkIdx, i)) and
digest =? tree.root, err:
error "Failed to get block CID for tree at index", err = err.msg
return failure(err)
trace "Get block digest", digest = digest.toHex
digest
trace "Get block digest", digest = digest.toHex
digest
success hashes
success hashes
except IndexingError as e:
failure e.msg
proc buildSlotTree*[T, H](
self: SlotsBuilder[T, H], slotIndex: Natural
@ -346,7 +350,14 @@ proc new*[T, H](
emptyBlock = newSeq[byte](manifest.blockSize.int)
emptyDigestTree = ?T.digestTree(emptyBlock, cellSize.int)
strategy = ?strategy.init(0, numBlocksTotal - 1, manifest.numSlots).catch
strategy =
?strategy.init(
firstIndex = 0,
lastIndex = manifest.blocksCount - 1,
iterations = manifest.steps,
totalGroups = manifest.numSlots,
numPadGroupBlocks = numPadSlotBlocks,
).catch
logScope:
numSlotBlocks = numSlotBlocks

View File

@ -53,8 +53,8 @@ proc getSample*[T, H](
cellsPerBlock = self.builder.numBlockCells
blkCellIdx = cellIdx.toCellInBlk(cellsPerBlock) # block cell index
blkSlotIdx = cellIdx.toBlkInSlot(cellsPerBlock) # slot tree index
origBlockIdx = self.builder.slotIndicies(self.index)[blkSlotIdx]
# convert to original dataset block index
slotIndices = ?self.builder.slotIndices(self.index).catch
origBlockIdx = slotIndices[blkSlotIdx] # convert to original dataset block index
logScope:
cellIdx = cellIdx

View File

@ -128,6 +128,13 @@ method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] =
trace "Deleting block from network store", cid
return self.localStore.delBlock(cid)
method delBlock*(self: NetworkStore, treeCid: Cid, index: Natural): Future[?!void] =
## Delete a block from the blockstore
##
trace "Deleting block from network store", treeCid, index
return self.localStore.delBlock(treeCid, index)
{.pop.}
method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.async.} =

View File

@ -7,6 +7,7 @@ type
Function*[T, U] = proc(fut: T): U {.raises: [CatchableError], gcsafe, closure.}
IsFinished* = proc(): bool {.raises: [], gcsafe, closure.}
GenNext*[T] = proc(): T {.raises: [CatchableError], gcsafe.}
Iterable[T] = iterator (): T
Iter*[T] = ref object
finished: bool
next*: GenNext[T]
@ -89,6 +90,36 @@ proc new*[T](_: type Iter[T], items: seq[T]): Iter[T] =
Iter[int].new(0 ..< items.len).map((i: int) => items[i])
proc new*[T](_: type Iter[T], iter: Iterable[T]): Iter[T] =
## Creates a new Iter from an iterator
##
var nextOrErr: Option[Result[T, ref CatchableError]]
proc tryNext(): void =
nextOrErr = Result[T, ref CatchableError].none
while not iter.finished:
try:
let t: T = iter()
if not iter.finished:
nextOrErr = some(success(t))
break
except CatchableError as err:
nextOrErr = some(T.failure(err))
except Exception:
assert(false)
proc genNext(): T {.raises: [CatchableError].} =
without u =? nextOrErr.unsafeGet, err:
raise err
tryNext()
return u
proc isFinished(): bool =
nextOrErr.isNone
tryNext()
Iter[T].new(genNext, isFinished)
proc empty*[T](_: type Iter[T]): Iter[T] =
## Creates an empty Iter
##

View File

@ -125,14 +125,14 @@ asyncchecksuite "Test Node - Host contracts":
fetchedBytes += blk.data.len.uint
return success()
(await onStore(request, 1.uint64, onBlocks, isRepairing = false)).tryGet()
(await onStore(request, 0.uint64, onBlocks, isRepairing = false)).tryGet()
check fetchedBytes == 12 * DefaultBlockSize.uint
let indexer = verifiable.protectedStrategy.init(
0, verifiable.numSlotBlocks() - 1, verifiable.numSlots
0, verifiable.blocksCount - 1, verifiable.steps, verifiable.numSlots
)
for index in indexer.getIndicies(1):
for index in indexer.getGroupIndices(0.int):
let
blk = (await localStore.getBlock(verifiable.treeCid, index)).tryGet
key = (createBlockExpirationMetadataKey(blk.cid)).tryGet

View File

@ -0,0 +1,316 @@
import std/options
import std/importutils
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/stint
import pkg/taskpools
import pkg/nitro
import pkg/codexdht/discv5/protocol as discv5
import pkg/codex/logutils
import pkg/codex/stores
import pkg/codex/contracts
import pkg/codex/blockexchange
import pkg/codex/chunker
import pkg/codex/slots
import pkg/codex/systemclock
import pkg/codex/manifest
import pkg/codex/discovery
import pkg/codex/erasure
import pkg/codex/blocktype as bt
import pkg/codex/indexingstrategy
import pkg/codex/nat
import pkg/codex/utils/natutils
import pkg/chronos/transports/stream
import pkg/codex/node {.all.}
import ../../asynctest
import ../../examples
import ../helpers
privateAccess(CodexNodeRef) # enable access to private fields
logScope:
topics = "testSlotRepair"
proc nextFreePort*(startPort: int): Future[int] {.async.} =
proc client(server: StreamServer, transp: StreamTransport) {.async: (raises: []).} =
await transp.closeWait()
var port = startPort
while true:
try:
let host = initTAddress("127.0.0.1", port)
var server = createStreamServer(host, client, {ReuseAddr})
await server.closeWait()
return port
except TransportOsError:
inc port
proc fetchStreamData(stream: LPStream, datasetSize: int): Future[seq[byte]] {.async.} =
var buf = newSeqUninitialized[byte](datasetSize)
while not stream.atEof:
var length = await stream.readOnce(addr buf[0], buf.len)
if length <= 0:
break
assert buf.len == datasetSize
buf
proc flatten[T](s: seq[seq[T]]): seq[T] =
var t = newSeq[T](0)
for ss in s:
t &= ss
return t
asyncchecksuite "Test Node - Slot Repair":
var
manifest: Manifest
builder: Poseidon2Builder
verifiable: Manifest
verifiableBlock: bt.Block
protected: Manifest
tempLevelDbs: seq[TempLevelDb] = newSeq[TempLevelDb]()
localStores: seq[RepoStore] = newSeq[RepoStore]()
nodes: seq[CodexNodeRef] = newSeq[CodexNodeRef]()
taskpool: Taskpool
let numNodes = 12
setup:
taskpool = Taskpool.new()
var bootstrapNodes: seq[SignedPeerRecord] = @[]
for i in 0 ..< numNodes:
let
listenPort = await nextFreePort(8080 + 2 * i)
bindPort = await nextFreePort(listenPort + 1)
listenAddr = MultiAddress.init("/ip4/127.0.0.1/tcp/" & $listenPort).expect(
"invalid multiaddress"
)
switch = newStandardSwitch(
transportFlags = {ServerFlags.ReuseAddr},
sendSignedPeerRecord = true,
addrs = listenAddr,
)
wallet = WalletRef.new(EthPrivateKey.random())
network = BlockExcNetwork.new(switch)
peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new()
bdStore = TempLevelDb.new()
blockDiscoveryStore = bdStore.newDb()
repoStore = TempLevelDb.new()
mdStore = TempLevelDb.new()
localStore =
RepoStore.new(repoStore.newDb(), mdStore.newDb(), clock = SystemClock.new())
blockDiscovery = Discovery.new(
switch.peerInfo.privateKey,
announceAddrs = @[listenAddr],
bindPort = bindPort.Port,
store = blockDiscoveryStore,
bootstrapNodes = bootstrapNodes,
)
discovery = DiscoveryEngine.new(
localStore, peerStore, network, blockDiscovery, pendingBlocks
)
advertiser = Advertiser.new(localStore, blockDiscovery)
engine = BlockExcEngine.new(
localStore, wallet, network, discovery, advertiser, peerStore, pendingBlocks
)
store = NetworkStore.new(engine, localStore)
node = CodexNodeRef.new(
switch = switch,
networkStore = store,
engine = engine,
prover = Prover.none,
discovery = blockDiscovery,
taskpool = taskpool,
)
await localStore.start()
await switch.peerInfo.update()
switch.mount(network)
let (announceAddrs, discoveryAddrs) = nattedAddress(
NatConfig(hasExtIp: false, nat: NatNone), switch.peerInfo.addrs, bindPort.Port
)
node.discovery.updateAnnounceRecord(announceAddrs)
node.discovery.updateDhtRecord(discoveryAddrs)
check node.discovery.dhtRecord.isSome
bootstrapNodes.add !node.discovery.dhtRecord
tempLevelDbs.add bdStore
tempLevelDbs.add repoStore
tempLevelDbs.add mdStore
localStores.add localStore
nodes.add node
for node in nodes:
await node.switch.start()
await node.start()
teardown:
for node in nodes:
await node.switch.stop()
await node.stop()
for s in tempLevelDbs:
await s.destroyDb()
for l in localStores:
await l.stop()
taskpool.shutdown()
localStores = @[]
nodes = @[]
tempLevelDbs = @[]
test "repair slots (2,1)":
let
numBlocks = 5
datasetSize = numBlocks * DefaultBlockSize.int
ecK = 2
ecM = 1
localStore = localStores[0]
store = nodes[0].blockStore
blocks =
await makeRandomBlocks(datasetSize = datasetSize, blockSize = DefaultBlockSize)
data = (
block:
collect(newSeq) do:
for blk in blocks:
blk.data
).flatten()
assert blocks.len == numBlocks
# Populate manifest in local store
manifest = await storeDataGetManifest(localStore, blocks)
let
manifestBlock =
bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool)
(await localStore.putBlock(manifestBlock)).tryGet()
protected = (await erasure.encode(manifest, ecK, ecM)).tryGet()
builder = Poseidon2Builder.new(localStore, protected).tryGet()
verifiable = (await builder.buildManifest()).tryGet()
verifiableBlock =
bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet()
# Populate protected manifest in local store
(await localStore.putBlock(verifiableBlock)).tryGet()
var request = StorageRequest.example
request.content.cid = verifiableBlock.cid
request.ask.slots = protected.numSlots.uint64
request.ask.slotSize = DefaultBlockSize.uint64
for i in 0 ..< protected.numSlots.uint64:
(await nodes[i + 1].onStore(request, i, nil, isRepairing = false)).tryGet()
await nodes[0].switch.stop() # acts as client
await nodes[1].switch.stop() # slot 0 missing now
# repair missing slot
(await nodes[4].onStore(request, 0.uint64, nil, isRepairing = true)).tryGet()
await nodes[2].switch.stop() # slot 1 missing now
(await nodes[5].onStore(request, 1.uint64, nil, isRepairing = true)).tryGet()
await nodes[3].switch.stop() # slot 2 missing now
(await nodes[6].onStore(request, 2.uint64, nil, isRepairing = true)).tryGet()
await nodes[4].switch.stop() # slot 0 missing now
# repair missing slot from repaired slots
(await nodes[7].onStore(request, 0.uint64, nil, isRepairing = true)).tryGet()
await nodes[5].switch.stop() # slot 1 missing now
# repair missing slot from repaired slots
(await nodes[8].onStore(request, 1.uint64, nil, isRepairing = true)).tryGet()
await nodes[6].switch.stop() # slot 2 missing now
# repair missing slot from repaired slots
(await nodes[9].onStore(request, 2.uint64, nil, isRepairing = true)).tryGet()
let
stream = (await nodes[10].retrieve(verifiableBlock.cid, local = false)).tryGet()
expectedData = await fetchStreamData(stream, datasetSize)
assert expectedData.len == data.len
assert expectedData == data
test "repair slots (3,2)":
let
numBlocks = 40
datasetSize = numBlocks * DefaultBlockSize.int
ecK = 3
ecM = 2
localStore = localStores[0]
store = nodes[0].blockStore
blocks =
await makeRandomBlocks(datasetSize = datasetSize, blockSize = DefaultBlockSize)
data = (
block:
collect(newSeq) do:
for blk in blocks:
blk.data
).flatten()
assert blocks.len == numBlocks
# Populate manifest in local store
manifest = await storeDataGetManifest(localStore, blocks)
let
manifestBlock =
bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool)
(await localStore.putBlock(manifestBlock)).tryGet()
protected = (await erasure.encode(manifest, ecK, ecM)).tryGet()
builder = Poseidon2Builder.new(localStore, protected).tryGet()
verifiable = (await builder.buildManifest()).tryGet()
verifiableBlock =
bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet()
# Populate protected manifest in local store
(await localStore.putBlock(verifiableBlock)).tryGet()
var request = StorageRequest.example
request.content.cid = verifiableBlock.cid
request.ask.slots = protected.numSlots.uint64
request.ask.slotSize = DefaultBlockSize.uint64
for i in 0 ..< protected.numSlots.uint64:
(await nodes[i + 1].onStore(request, i, nil, isRepairing = false)).tryGet()
await nodes[0].switch.stop() # acts as client
await nodes[1].switch.stop() # slot 0 missing now
await nodes[3].switch.stop() # slot 2 missing now
# repair missing slots
(await nodes[6].onStore(request, 0.uint64, nil, isRepairing = true)).tryGet()
(await nodes[7].onStore(request, 2.uint64, nil, isRepairing = true)).tryGet()
await nodes[2].switch.stop() # slot 1 missing now
await nodes[4].switch.stop() # slot 3 missing now
# repair missing slots from repaired slots
(await nodes[8].onStore(request, 1.uint64, nil, isRepairing = true)).tryGet()
(await nodes[9].onStore(request, 3.uint64, nil, isRepairing = true)).tryGet()
await nodes[5].switch.stop() # slot 4 missing now
# repair missing slot from repaired slots
(await nodes[10].onStore(request, 4.uint64, nil, isRepairing = true)).tryGet()
let
stream = (await nodes[11].retrieve(verifiableBlock.cid, local = false)).tryGet()
expectedData = await fetchStreamData(stream, datasetSize)
assert expectedData.len == data.len
assert expectedData == data

View File

@ -167,7 +167,13 @@ suite "Slot builder":
test "Should build slot hashes for all slots":
let
steppedStrategy = Strategy.init(0, numBlocksTotal - 1, numSlots)
steppedStrategy = Strategy.init(
0,
protectedManifest.blocksCount - 1,
protectedManifest.steps,
numSlots,
numPadSlotBlocks,
)
builder = Poseidon2Builder
.new(localStore, protectedManifest, cellSize = cellSize)
@ -176,7 +182,7 @@ suite "Slot builder":
for i in 0 ..< numSlots:
let
expectedHashes = collect(newSeq):
for j, idx in steppedStrategy.getIndicies(i):
for j, idx in steppedStrategy.getGroupIndices(i):
if j > (protectedManifest.numSlotBlocks - 1):
emptyDigest
else:
@ -190,7 +196,13 @@ suite "Slot builder":
test "Should build slot trees for all slots":
let
steppedStrategy = Strategy.init(0, numBlocksTotal - 1, numSlots)
steppedStrategy = Strategy.init(
0,
protectedManifest.blocksCount - 1,
protectedManifest.steps,
numSlots,
numPadSlotBlocks,
)
builder = Poseidon2Builder
.new(localStore, protectedManifest, cellSize = cellSize)
@ -199,7 +211,7 @@ suite "Slot builder":
for i in 0 ..< numSlots:
let
expectedHashes = collect(newSeq):
for j, idx in steppedStrategy.getIndicies(i):
for j, idx in steppedStrategy.getGroupIndices(i):
if j > (protectedManifest.numSlotBlocks - 1):
emptyDigest
else:
@ -235,7 +247,13 @@ suite "Slot builder":
test "Should build correct verification root":
let
steppedStrategy = Strategy.init(0, numBlocksTotal - 1, numSlots)
steppedStrategy = Strategy.init(
0,
protectedManifest.blocksCount - 1,
protectedManifest.steps,
numSlots,
numPadSlotBlocks,
)
builder = Poseidon2Builder
.new(localStore, protectedManifest, cellSize = cellSize)
.tryGet()
@ -245,7 +263,7 @@ suite "Slot builder":
slotsHashes = collect(newSeq):
for i in 0 ..< numSlots:
let slotHashes = collect(newSeq):
for j, idx in steppedStrategy.getIndicies(i):
for j, idx in steppedStrategy.getGroupIndices(i):
if j > (protectedManifest.numSlotBlocks - 1):
emptyDigest
else:
@ -261,7 +279,13 @@ suite "Slot builder":
test "Should build correct verification root manifest":
let
steppedStrategy = Strategy.init(0, numBlocksTotal - 1, numSlots)
steppedStrategy = Strategy.init(
0,
protectedManifest.blocksCount - 1,
protectedManifest.steps,
numSlots,
numPadSlotBlocks,
)
builder = Poseidon2Builder
.new(localStore, protectedManifest, cellSize = cellSize)
.tryGet()
@ -269,7 +293,7 @@ suite "Slot builder":
slotsHashes = collect(newSeq):
for i in 0 ..< numSlots:
let slotHashes = collect(newSeq):
for j, idx in steppedStrategy.getIndicies(i):
for j, idx in steppedStrategy.getGroupIndices(i):
if j > (protectedManifest.numSlotBlocks - 1):
emptyDigest
else:

View File

@ -1,4 +1,5 @@
import std/sequtils
import std/algorithm
import pkg/chronos
import pkg/codex/utils/asynciter
@ -14,61 +15,67 @@ for offset in @[0, 1, 2, 100]:
firstIndex = 0 + offset
lastIndex = 12 + offset
nIters = 3
linear = LinearStrategy.init(firstIndex, lastIndex, nIters)
stepped = SteppedStrategy.init(firstIndex, lastIndex, nIters)
totalGroups = 1
linear = LinearStrategy.init(firstIndex, lastIndex, nIters, totalGroups)
stepped = SteppedStrategy.init(firstIndex, lastIndex, nIters, totalGroups)
test "linear":
check:
toSeq(linear.getIndicies(0)) == @[0, 1, 2, 3, 4].mapIt(it + offset)
toSeq(linear.getIndicies(1)) == @[5, 6, 7, 8, 9].mapIt(it + offset)
toSeq(linear.getIndicies(2)) == @[10, 11, 12].mapIt(it + offset)
toSeq(linear.getIndices(0)) == @[0, 1, 2, 3, 4].mapIt(it + offset)
toSeq(linear.getIndices(1)) == @[5, 6, 7, 8, 9].mapIt(it + offset)
toSeq(linear.getIndices(2)) == @[10, 11, 12].mapIt(it + offset)
test "stepped":
check:
toSeq(stepped.getIndicies(0)) == @[0, 3, 6, 9, 12].mapIt(it + offset)
toSeq(stepped.getIndicies(1)) == @[1, 4, 7, 10].mapIt(it + offset)
toSeq(stepped.getIndicies(2)) == @[2, 5, 8, 11].mapIt(it + offset)
toSeq(stepped.getIndices(0)) == @[0, 3, 6, 9, 12].mapIt(it + offset)
toSeq(stepped.getIndices(1)) == @[1, 4, 7, 10].mapIt(it + offset)
toSeq(stepped.getIndices(2)) == @[2, 5, 8, 11].mapIt(it + offset)
suite "Indexing strategies":
let
linear = LinearStrategy.init(0, 10, 3)
stepped = SteppedStrategy.init(0, 10, 3)
totalGroups = 1
linear = LinearStrategy.init(0, 10, 3, totalGroups)
stepped = SteppedStrategy.init(0, 10, 3, totalGroups)
test "smallest range 0":
let
l = LinearStrategy.init(0, 0, 1)
s = SteppedStrategy.init(0, 0, 1)
l = LinearStrategy.init(0, 0, 1, totalGroups)
s = SteppedStrategy.init(0, 0, 1, totalGroups)
check:
toSeq(l.getIndicies(0)) == @[0]
toSeq(s.getIndicies(0)) == @[0]
toSeq(l.getIndices(0)) == @[0]
toSeq(s.getIndices(0)) == @[0]
test "smallest range 1":
let
l = LinearStrategy.init(0, 1, 1)
s = SteppedStrategy.init(0, 1, 1)
l = LinearStrategy.init(0, 1, 1, totalGroups)
s = SteppedStrategy.init(0, 1, 1, totalGroups)
check:
toSeq(l.getIndicies(0)) == @[0, 1]
toSeq(s.getIndicies(0)) == @[0, 1]
toSeq(l.getIndices(0)) == @[0, 1]
toSeq(s.getIndices(0)) == @[0, 1]
test "first index must be smaller than last index":
expect IndexingWrongIndexError:
discard LinearStrategy.init(10, 0, 1)
discard LinearStrategy.init(10, 0, 1, totalGroups)
test "iterations must be greater than zero":
expect IndexingWrongIterationsError:
discard LinearStrategy.init(0, 10, 0)
discard LinearStrategy.init(0, 10, 0, totalGroups)
test "totalGroups must be greater than zero":
expect IndexingWrongTotalGroupsError:
discard LinearStrategy.init(1, 1, 1, 0)
test "should split elements evenly when possible":
let l = LinearStrategy.init(0, 11, 3)
let l = LinearStrategy.init(0, 11, 3, totalGroups)
check:
toSeq(l.getIndicies(0)) == @[0, 1, 2, 3].mapIt(it)
toSeq(l.getIndicies(1)) == @[4, 5, 6, 7].mapIt(it)
toSeq(l.getIndicies(2)) == @[8, 9, 10, 11].mapIt(it)
toSeq(l.getIndices(0)) == @[0, 1, 2, 3].mapIt(it)
toSeq(l.getIndices(1)) == @[4, 5, 6, 7].mapIt(it)
toSeq(l.getIndices(2)) == @[8, 9, 10, 11].mapIt(it)
test "linear - oob":
expect IndexingError:
discard linear.getIndicies(3)
discard linear.getIndices(3)
test "stepped - oob":
expect IndexingError:
discard stepped.getIndicies(3)
discard stepped.getIndices(3)

View File

@ -1,4 +1,5 @@
import ./node/testnode
import ./node/testcontracts
import ./node/testslotrepair
{.warning[UnusedImport]: off.}

View File

@ -0,0 +1,318 @@
import pkg/questionable
import pkg/codex/logutils
import ../contracts/time
import ../contracts/deployment
import ../codex/helpers
import ../examples
import ./marketplacesuite
import ./nodeconfigs
export logutils
logScope:
topics = "integration test slot repair"
marketplacesuite "SP Slot Repair":
const minPricePerBytePerSecond = 1.u256
const collateralPerByte = 1.u256
const blocks = 3
const ecNodes = 5
const ecTolerance = 2
test "repair from local store",
NodeConfigs(
clients: CodexConfigs
.init(nodes = 1)
.debug()
.withLogFile()
.withLogTopics("node", "erasure").some,
providers: CodexConfigs
.init(nodes = 2)
.withSimulateProofFailures(idx = 1, failEveryNProofs = 1)
.debug()
.withLogFile()
.withLogTopics("marketplace", "sales", "reservations", "node", "statemachine").some,
validators: CodexConfigs
.init(nodes = 1)
.debug()
.withLogFile()
.withLogTopics("validator").some,
):
let client0 = clients()[0]
let provider0 = providers()[0]
let provider1 = providers()[1]
let expiry = 10.periods
let duration = expiry + 10.periods
let data = await RandomChunker.example(blocks = blocks)
let slotSize = slotSize(blocks, ecNodes, ecTolerance)
let availability = (
await provider0.client.postAvailability(
totalSize = 4 * slotSize.truncate(uint64),
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = 100 * slotSize * collateralPerByte,
)
).get
discard await provider1.client.postAvailability(
totalSize = slotSize.truncate(uint64),
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = 100 * slotSize * collateralPerByte,
)
var filledSlotIds: seq[SlotId] = @[]
proc onSlotFilled(eventResult: ?!SlotFilled) =
assert not eventResult.isErr
let event = !eventResult
let slotId = slotId(event.requestId, event.slotIndex)
filledSlotIds.add slotId
let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled)
var freedSlotId = none SlotId
proc onSlotFreed(eventResult: ?!SlotFreed) =
assert not eventResult.isErr
let event = !eventResult
let slotId = slotId(event.requestId, event.slotIndex)
assert slotId in filledSlotIds
filledSlotIds.del(filledSlotIds.find(slotId))
freedSlotId = some(slotId)
let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed)
let cid = (await client0.client.upload(data)).get
let purchaseId = await client0.client.requestStorage(
cid,
expiry = expiry,
duration = duration,
nodes = ecNodes,
tolerance = ecTolerance,
proofProbability = 1.u256,
)
check eventually(
await client0.client.purchaseStateIs(purchaseId, "started"),
timeout = expiry.int * 1000,
)
await client0.stop()
await provider0.client.patchAvailability(
availabilityId = availability.id,
totalSize = (5 * slotSize.truncate(uint64)).uint64.some,
duration = duration.uint64.some,
minPricePerBytePerSecond = minPricePerBytePerSecond.some,
totalCollateral = (100 * slotSize * collateralPerByte).some,
)
check eventually(freedSlotId.isSome, timeout = (duration - expiry).int * 1000)
check eventually(
freedSlotId.get in filledSlotIds, timeout = (duration - expiry).int * 1000
)
await filledSubscription.unsubscribe()
await slotFreedsubscription.unsubscribe()
test "repair from remote store",
NodeConfigs(
clients: CodexConfigs
.init(nodes = 1)
.debug()
.withLogFile()
.withLogTopics("node", "erasure").some,
providers: CodexConfigs
.init(nodes = 3)
.withSimulateProofFailures(idx = 1, failEveryNProofs = 1)
.debug()
.withLogFile()
.withLogTopics("marketplace", "sales", "reservations", "node", "statemachine").some,
validators: CodexConfigs
.init(nodes = 1)
.debug()
.withLogFile()
.withLogTopics("validator").some,
):
let client0 = clients()[0]
let provider0 = providers()[0]
let provider1 = providers()[1]
let provider2 = providers()[2]
let expiry = 10.periods
let duration = expiry + 10.periods
let data = await RandomChunker.example(blocks = blocks)
let slotSize = slotSize(blocks, ecNodes, ecTolerance)
discard await provider0.client.postAvailability(
totalSize = 4 * slotSize.truncate(uint64),
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = 100 * slotSize * collateralPerByte,
)
discard await provider1.client.postAvailability(
totalSize = slotSize.truncate(uint64),
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = 100 * slotSize * collateralPerByte,
)
var filledSlotIds: seq[SlotId] = @[]
proc onSlotFilled(eventResult: ?!SlotFilled) =
assert not eventResult.isErr
let event = !eventResult
let slotId = slotId(event.requestId, event.slotIndex)
filledSlotIds.add slotId
let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled)
var freedSlotId = none SlotId
proc onSlotFreed(eventResult: ?!SlotFreed) =
assert not eventResult.isErr
let event = !eventResult
let slotId = slotId(event.requestId, event.slotIndex)
assert slotId in filledSlotIds
filledSlotIds.del(filledSlotIds.find(slotId))
freedSlotId = some(slotId)
let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed)
let cid = (await client0.client.upload(data)).get
let purchaseId = await client0.client.requestStorage(
cid,
expiry = expiry,
duration = duration,
nodes = ecNodes,
tolerance = ecTolerance,
proofProbability = 1.u256,
)
check eventually(
await client0.client.purchaseStateIs(purchaseId, "started"),
timeout = expiry.int * 1000,
)
await client0.stop()
discard await provider2.client.postAvailability(
totalSize = slotSize.truncate(uint64),
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = 100 * slotSize * collateralPerByte,
)
check eventually(freedSlotId.isSome, timeout = (duration - expiry).int * 1000)
await provider1.stop()
check eventually(
freedSlotId.get in filledSlotIds, timeout = (duration - expiry).int * 1000
)
await filledSubscription.unsubscribe()
await slotFreedsubscription.unsubscribe()
test "storage provider slot repair",
NodeConfigs(
clients: CodexConfigs
.init(nodes = 1)
.debug()
.withLogFile()
.withLogTopics("node", "erasure").some,
providers: CodexConfigs
.init(nodes = 4)
.debug()
.withLogFile()
.withLogTopics("marketplace", "sales", "reservations", "node").some,
validators: CodexConfigs
.init(nodes = 1)
.debug()
.withLogFile()
.withLogTopics("validator").some,
):
let client0 = clients()[0]
let expiry = 10.periods
let duration = expiry + 10.periods
let size = 0xFFFFFF.uint64
let data = await RandomChunker.example(blocks = blocks)
let datasetSize =
datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance)
await createAvailabilities(
size, duration, datasetSize * collateralPerByte, minPricePerBytePerSecond
)
let cid = (await client0.client.upload(data)).get
let purchaseId = await client0.client.requestStorage(
cid,
expiry = expiry,
duration = duration,
collateralPerByte = collateralPerByte,
nodes = ecNodes,
tolerance = ecTolerance,
proofProbability = 1.u256,
pricePerBytePerSecond = minPricePerBytePerSecond,
)
let requestId = (await client0.client.requestId(purchaseId)).get
var filledSlotIds: seq[SlotId] = @[]
proc onSlotFilled(eventResult: ?!SlotFilled) =
assert not eventResult.isErr
let event = !eventResult
if event.requestId == requestId:
let slotId = slotId(event.requestId, event.slotIndex)
filledSlotIds.add slotId
let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled)
check eventually(
await client0.client.purchaseStateIs(purchaseId, "started"),
timeout = expiry.int * 1000,
)
await client0.stop()
check eventually(
filledSlotIds.len == blocks, timeout = (duration - expiry).int * 1000
)
trace "all slots have been filled"
var slotWasFreed = false
proc onSlotFreed(event: ?!SlotFreed) =
if event.isOk and event.value.requestId == requestId:
trace "slot was freed", slotIndex = $event.value.slotIndex
slotWasFreed = true
let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed)
block provider_search:
while true:
for slotId in filledSlotIds:
for provider in providers():
if (await provider.client.saleStateIs(slotId, "SaleProving")):
await provider.stop()
break provider_search
await sleepAsync(100.milliseconds)
check eventually(slotWasFreed, timeout = (duration - expiry).int * 1000)
await slotFreedsubscription.unsubscribe()
check eventually(
filledSlotIds.len > blocks, timeout = (duration - expiry).int * 1000
)
trace "freed slot was filled"
await filledSubscription.unsubscribe()

View File

@ -9,5 +9,6 @@ import ./integration/testmarketplace
import ./integration/testproofs
import ./integration/testvalidator
import ./integration/testecbug
import ./integration/testslotrepair
{.warning[UnusedImport]: off.}