logos-storage-nim/tests/codex/node/testslotrepair.nim
Chrysostomos Nanakos be759baf4d
feat: Block exchange optimizations (#1325)
Signed-off-by: Giuliano Mega <giuliano.mega@gmail.com>
Signed-off-by: Chrysostomos Nanakos <chris@include.gr>
Co-authored-by: gmega <giuliano.mega@gmail.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com>
2025-11-13 05:47:02 +00:00

222 lines
6.9 KiB
Nim

import std/options
import std/importutils
import std/times
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/stint
import pkg/codex/logutils
import pkg/codex/stores
import pkg/codex/contracts
import pkg/codex/slots
import pkg/codex/manifest
import pkg/codex/erasure
import pkg/codex/blocktype as bt
import pkg/chronos/transports/stream
import pkg/codex/node {.all.}
import ../../asynctest
import ../../examples
import ../helpers
import ./helpers
privateAccess(CodexNodeRef) # enable access to private fields

# Log scope for this test module; the property must be nested under `logScope`.
logScope:
  topics = "testSlotRepair"
proc fetchStreamData(stream: LPStream, datasetSize: int): Future[seq[byte]] {.async.} =
  ## Reads exactly `datasetSize` bytes from `stream` and returns them.
  ##
  ## Nothing is caught here, so any error raised by `readExactly` reaches the
  ## caller. A `datasetSize` of zero returns an empty sequence without touching
  ## the stream.
  var buf = newSeq[byte](datasetSize)
  # `buf[0]` does not exist for an empty buffer, so only take its address (and
  # only issue the read) when there is at least one byte to fetch.
  if datasetSize > 0:
    await stream.readExactly(addr buf[0], datasetSize)
  buf
proc flatten[T](s: seq[seq[T]]): seq[T] =
  ## Concatenates the inner sequences of `s`, in order, into one flat sequence.
  ##
  ## An empty `s`, or one holding only empty sequences, yields an empty result.
  var total = 0
  for ss in s:
    total += ss.len
  # Reserve the final capacity once instead of regrowing on every append.
  result = newSeqOfCap[T](total)
  for ss in s:
    result.add ss
# Slot repair tests. Each test erasure codes a random dataset, stores every
# slot on its own node, stops the switch of slot-holding nodes, rebuilds the
# lost slots on fresh nodes via `onStore(..., isRepairing = true)`, and finally
# retrieves the dataset from a node that never stored a slot and compares it
# with the original bytes.
asyncchecksuite "Test Node - Slot Repair":
  let
    # The tests below index nodes 0 through 11.
    numNodes = 12
    config = NodeConfig(
      useRepoStore: true,
      findFreePorts: true,
      createFullNode: true,
      enableBootstrap: true,
      enableDiscovery: true,
    )

  var
    manifest: Manifest
    builder: Poseidon2Builder
    verifiable: Manifest
    verifiableBlock: bt.Block
    protected: Manifest
    cluster: NodesCluster
    nodes: seq[CodexNodeRef]
    localStores: seq[BlockStore]

  setup:
    # A fresh cluster is generated for every test.
    cluster = generateNodes(numNodes, config = config)
    nodes = cluster.nodes
    localStores = cluster.localStores

  teardown:
    await cluster.cleanup()
    localStores = @[]
    nodes = @[]

  test "repair slots (2,1)":
    let
      expiry = (getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix
      numBlocks = 5
      datasetSize = numBlocks * DefaultBlockSize.int
      ecK = 2
      ecM = 1
      localStore = localStores[0]
      store = nodes[0].blockStore
      blocks =
        await makeRandomBlocks(datasetSize = datasetSize, blockSize = DefaultBlockSize)
      # Original payload, block by block, used as the expected value at the end.
      data = (
        block:
          collect(newSeq):
            for blk in blocks:
              blk.data
      ).flatten()

    check blocks.len == numBlocks

    # Populate manifest in local store
    manifest = await storeDataGetManifest(localStore, blocks)
    let
      manifestBlock =
        bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
      erasure =
        Erasure.new(store, leoEncoderProvider, leoDecoderProvider, cluster.taskpool)

    (await localStore.putBlock(manifestBlock)).tryGet()

    protected = (await erasure.encode(manifest, ecK, ecM)).tryGet()
    builder = Poseidon2Builder.new(localStore, protected).tryGet()
    verifiable = (await builder.buildManifest()).tryGet()
    verifiableBlock =
      bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet()

    # Store the verifiable manifest block in the local store
    (await localStore.putBlock(verifiableBlock)).tryGet()

    var request = StorageRequest.example
    request.content.cid = verifiableBlock.cid

    # Initial placement: slot `i` is stored by node `i + 1`.
    for i in 0 ..< protected.numSlots.uint64:
      (await nodes[i + 1].onStore(request, expiry, i, nil, isRepairing = false)).tryGet()

    await nodes[0].switch.stop() # acts as client
    await nodes[1].switch.stop() # slot 0 missing now

    # repair missing slot
    (await nodes[4].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet()

    await nodes[2].switch.stop() # slot 1 missing now

    (await nodes[5].onStore(request, expiry, 1.uint64, nil, isRepairing = true)).tryGet()

    await nodes[3].switch.stop() # slot 2 missing now

    (await nodes[6].onStore(request, expiry, 2.uint64, nil, isRepairing = true)).tryGet()

    await nodes[4].switch.stop() # slot 0 missing now

    # repair missing slot from repaired slots
    (await nodes[7].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet()

    await nodes[5].switch.stop() # slot 1 missing now

    # repair missing slot from repaired slots
    (await nodes[8].onStore(request, expiry, 1.uint64, nil, isRepairing = true)).tryGet()

    await nodes[6].switch.stop() # slot 2 missing now

    # repair missing slot from repaired slots
    (await nodes[9].onStore(request, expiry, 2.uint64, nil, isRepairing = true)).tryGet()

    # Node 10 never stored a slot, so it has to fetch the dataset remotely.
    let stream = (await nodes[10].retrieve(verifiableBlock.cid, local = false)).tryGet()
    var retrieved: seq[byte]
    try:
      retrieved = await fetchStreamData(stream, datasetSize)
    finally:
      # Release the retrieval stream even when reading fails.
      await stream.close()

    check retrieved.len == data.len
    check retrieved == data

  test "repair slots (3,2)":
    let
      expiry = (getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix
      numBlocks = 40
      datasetSize = numBlocks * DefaultBlockSize.int
      ecK = 3
      ecM = 2
      localStore = localStores[0]
      store = nodes[0].blockStore
      blocks =
        await makeRandomBlocks(datasetSize = datasetSize, blockSize = DefaultBlockSize)
      # Original payload, block by block, used as the expected value at the end.
      data = (
        block:
          collect(newSeq):
            for blk in blocks:
              blk.data
      ).flatten()

    check blocks.len == numBlocks

    # Populate manifest in local store
    manifest = await storeDataGetManifest(localStore, blocks)
    let
      manifestBlock =
        bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
      erasure =
        Erasure.new(store, leoEncoderProvider, leoDecoderProvider, cluster.taskpool)

    (await localStore.putBlock(manifestBlock)).tryGet()

    protected = (await erasure.encode(manifest, ecK, ecM)).tryGet()
    builder = Poseidon2Builder.new(localStore, protected).tryGet()
    verifiable = (await builder.buildManifest()).tryGet()
    verifiableBlock =
      bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet()

    # Store the verifiable manifest block in the local store
    (await localStore.putBlock(verifiableBlock)).tryGet()

    var request = StorageRequest.example
    request.content.cid = verifiableBlock.cid

    # Initial placement: slot `i` is stored by node `i + 1`.
    for i in 0 ..< protected.numSlots.uint64:
      (await nodes[i + 1].onStore(request, expiry, i, nil, isRepairing = false)).tryGet()

    await nodes[0].switch.stop() # acts as client
    await nodes[1].switch.stop() # slot 0 missing now
    await nodes[3].switch.stop() # slot 2 missing now

    # repair missing slots
    (await nodes[6].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet()
    (await nodes[7].onStore(request, expiry, 2.uint64, nil, isRepairing = true)).tryGet()

    await nodes[2].switch.stop() # slot 1 missing now
    await nodes[4].switch.stop() # slot 3 missing now

    # repair missing slots from repaired slots
    (await nodes[8].onStore(request, expiry, 1.uint64, nil, isRepairing = true)).tryGet()
    (await nodes[9].onStore(request, expiry, 3.uint64, nil, isRepairing = true)).tryGet()

    await nodes[5].switch.stop() # slot 4 missing now

    # repair missing slot from repaired slots
    (await nodes[10].onStore(request, expiry, 4.uint64, nil, isRepairing = true)).tryGet()

    # Node 11 never stored a slot, so it has to fetch the dataset remotely.
    let stream = (await nodes[11].retrieve(verifiableBlock.cid, local = false)).tryGet()
    var retrieved: seq[byte]
    try:
      retrieved = await fetchStreamData(stream, datasetSize)
    finally:
      # Release the retrieval stream even when reading fails.
      await stream.close()

    check retrieved.len == data.len
    check retrieved == data