Add support for slot reconstruction on unavailable slot detection (#892)

Includes corresponding tests.

Various small fixes.
This commit is contained in:
Chrysostomos Nanakos 2025-04-11 12:09:54 +03:00
parent 19a5e05c13
commit acc50aa4ba
No known key found for this signature in database
8 changed files with 680 additions and 24 deletions

View File

@ -126,16 +126,16 @@ proc start*(b: Advertiser) {.async: (raises: []).} =
trace "Advertiser start"
if b.advertiserRunning:
warn "Starting advertiser twice"
return
proc onBlock(cid: Cid) {.async.} =
await b.advertiseBlock(cid)
doAssert(b.localStore.onBlockStored.isNone())
b.localStore.onBlockStored = onBlock.some
if b.advertiserRunning:
warn "Starting advertiser twice"
return
b.advertiserRunning = true
for i in 0 ..< b.concurrentAdvReqs:
let fut = b.processQueueLoop()

View File

@ -659,6 +659,139 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
return decoded.success
proc repair*(self: Erasure, encoded: Manifest, slotIdx: int): Future[?!void] {.async.} =
## Repair a protected manifest slot
##
## `encoded` - the encoded (protected) manifest to
## be repaired
##
logScope:
steps = encoded.steps
rounded_blocks = encoded.rounded
new_manifest = encoded.blocksCount
var
cids = seq[Cid].new()
decoder = self.decoderProvider(encoded.blockSize.int, encoded.ecK, encoded.ecM)
emptyBlock = newSeq[byte](encoded.blockSize.int)
cids[].setLen(encoded.blocksCount)
try:
for step in 0 ..< encoded.steps:
await sleepAsync(10.millis)
var
data = seq[seq[byte]].new()
parityData = seq[seq[byte]].new()
recovered = createDoubleArray(encoded.ecK, encoded.blockSize.int)
data[].setLen(encoded.ecK)
parityData[].setLen(encoded.ecM)
without (dataPieces, _) =? (
await self.prepareDecodingData(
encoded, step, data, parityData, cids, emptyBlock
)
), err:
trace "Unable to prepare decoding data", error = err.msg
return failure(err)
if dataPieces >= encoded.ecK:
trace "Retrieved all the required data blocks for this step"
continue
trace "Erasure decoding data"
try:
if err =? (
await self.asyncDecode(
encoded.blockSize.int, encoded.ecK, encoded.ecM, data, parityData, recovered
)
).errorOption:
return failure(err)
except CancelledError as exc:
raise exc
finally:
freeDoubleArray(recovered, encoded.ecK)
for i in 0 ..< encoded.ecK:
let idx = i * encoded.steps + step
if data[i].len <= 0 and not cids[idx].isEmpty:
var innerPtr: ptr UncheckedArray[byte] = recovered[][i]
without blk =? bt.Block.new(
innerPtr.toOpenArray(0, encoded.blockSize.int - 1)
), error:
trace "Unable to create data block!", exc = error.msg
return failure(error)
trace "Recovered data block", cid = blk.cid, index = i
if isErr (await self.store.putBlock(blk)):
trace "Unable to store data block!", cid = blk.cid
return failure("Unable to store data block!")
cids[idx] = blk.cid
except CancelledError as exc:
trace "Erasure coding decoding cancelled"
raise exc # cancellation needs to be propagated
except CatchableError as exc:
trace "Erasure coding decoding error", exc = exc.msg
return failure(exc)
finally:
decoder.release()
without tree =? CodexTree.init(cids[0 ..< encoded.originalBlocksCount]), err:
return failure(err)
without treeCid =? tree.rootCid, err:
return failure(err)
if treeCid != encoded.originalTreeCid:
return failure(
"Original tree root differs from the tree root computed out of recovered data"
)
if err =? (await self.store.putAllProofs(tree)).errorOption:
return failure(err)
without repaired =? (
await self.encode(
Manifest.new(encoded), encoded.ecK, encoded.ecM, encoded.protectedStrategy
)
), err:
return failure(err)
if repaired.treeCid != encoded.treeCid:
return failure(
"Original tree root differs from the repaired tree root encoded out of recovered data"
)
let
groupStrategy = ?encoded.protectedStrategy.init(
firstIndex = 0,
lastIndex = encoded.blocksCount - 1,
iterations = encoded.numSlots
).catch
groupIndices = toSeq(groupStrategy.getIndicies(slotIdx))
strategy = ?encoded.protectedStrategy.init(
firstIndex = 0,
lastIndex = encoded.blocksCount - 1,
iterations = encoded.steps
).catch
for step in 0 ..< encoded.steps:
let indices = strategy.getIndicies(step)
for i in indices:
if i notin groupIndices:
if isErr (await self.store.delBlock(encoded.treeCid, i)):
trace "Failed to remove block from tree ", treeCid = encoded.treeCid, index = i
for i, cid in cids[0 ..< encoded.originalBlocksCount]:
if i notin groupIndices:
if isErr (await self.store.delBlock(treeCid, i)):
trace "Failed to remove original block from tree ", treeCid = treeCid, index = i
return success()
proc start*(self: Erasure) {.async.} =
return

View File

@ -629,10 +629,6 @@ proc onStore(
trace "Received a request to store a slot"
# TODO: Use the isRepairing to manage the slot download.
# If isRepairing is true, the slot has to be repaired before
# being downloaded.
without manifest =? (await self.fetchManifest(cid)), err:
trace "Unable to fetch manifest for cid", cid, err = err.msg
return failure(err)
@ -665,32 +661,45 @@ proc onStore(
return success()
without indexer =?
manifest.verifiableStrategy.init(0, manifest.blocksCount - 1, manifest.numSlots).catch,
err:
trace "Unable to create indexing strategy from protected manifest", err = err.msg
return failure(err)
if slotIdx > int.high.uint64:
error "Cannot cast slot index to int", slotIndex = slotIdx
return
without blksIter =? indexer.getIndicies(slotIdx.int).catch, err:
trace "Unable to get indicies from strategy", err = err.msg
return failure(err)
if isRepairing:
trace "start repairing slot", slotIdx
try:
let erasure = Erasure.new(
self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool
)
if err =? (await erasure.repair(manifest, slotIdx.int)).errorOption:
error "Unable to erasure decode repairing manifest",
cid = manifest.treeCid, exc = err.msg
return failure(err)
except CatchableError as exc:
error "Error erasure decoding repairing manifest",
cid = manifest.treeCid, exc = exc.msg
return failure(exc.msg)
else:
without indexer =?
manifest.verifiableStrategy.init(0, manifest.blocksCount - 1, manifest.numSlots).catch,
err:
trace "Unable to create indexing strategy from protected manifest", err = err.msg
return failure(err)
if err =? (
await self.fetchBatched(manifest.treeCid, blksIter, onBatch = updateExpiry)
).errorOption:
trace "Unable to fetch blocks", err = err.msg
return failure(err)
without blksIter =? indexer.getIndicies(slotIdx.int).catch, err:
trace "Unable to get indicies from strategy", err = err.msg
return failure(err)
if err =? (
await self.fetchBatched(manifest.treeCid, blksIter, onBatch = updateExpiry)
).errorOption:
trace "Unable to fetch blocks", err = err.msg
return failure(err)
without slotRoot =? (await builder.buildSlot(slotIdx.int)), err:
trace "Unable to build slot", err = err.msg
return failure(err)
trace "Slot successfully retrieved and reconstructed"
if cid =? slotRoot.toSlotCid() and cid != manifest.slotRoots[slotIdx]:
trace "Slot root mismatch",
manifest = manifest.slotRoots[slotIdx.int], recovered = slotRoot.toSlotCid()

View File

@ -128,6 +128,13 @@ method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] =
trace "Deleting block from network store", cid
return self.localStore.delBlock(cid)
method delBlock*(self: NetworkStore, treeCid: Cid, index: Natural): Future[?!void] =
## Delete a block from the blockstore
##
trace "Deleting block from network store", treeCid, index
return self.localStore.delBlock(treeCid, index)
{.pop.}
method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.async.} =

View File

@ -0,0 +1,190 @@
import std/options
import std/importutils
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/stint
import pkg/taskpools
import pkg/nitro
import pkg/codexdht/discv5/protocol as discv5
import pkg/codex/logutils
import pkg/codex/stores
import pkg/codex/contracts
import pkg/codex/blockexchange
import pkg/codex/chunker
import pkg/codex/slots
import pkg/codex/manifest
import pkg/codex/discovery
import pkg/codex/erasure
import pkg/codex/blocktype as bt
import pkg/codex/indexingstrategy
import pkg/codex/nat
import pkg/codex/utils/natutils
import pkg/chronos/transports/stream
import pkg/codex/node {.all.}
import ../../asynctest
import ../../examples
import ../helpers
privateAccess(CodexNodeRef) # enable access to private fields
logScope:
topics = "testSlotRepair"
proc nextFreePort*(startPort: int): Future[int] {.async.} =
proc client(server: StreamServer, transp: StreamTransport) {.async: (raises: []).} =
await transp.closeWait()
var port = startPort
while true:
try:
let host = initTAddress("127.0.0.1", port)
var server = createStreamServer(host, client, {ReuseAddr})
await server.closeWait()
return port
except TransportOsError:
inc port
asyncchecksuite "Test Node - Slot Repair":
var
manifest: Manifest
builder: Poseidon2Builder
verifiable: Manifest
verifiableBlock: bt.Block
protected: Manifest
localStores: seq[CacheStore] = newSeq[CacheStore]()
nodes: seq[CodexNodeRef] = newSeq[CodexNodeRef]()
let
numNodes = 11
numBlocks = 24
ecK = 3
ecM = 2
setup:
var bootstrapNodes: seq[SignedPeerRecord] = @[]
for i in 0 ..< numNodes:
let
listenPort = await nextFreePort(8080 + 2 * i)
bindPort = await nextFreePort(listenPort + 1)
listenAddr = MultiAddress.init("/ip4/127.0.0.1/tcp/" & $listenPort).expect(
"invalid multiaddress"
)
switch = newStandardSwitch(
transportFlags = {ServerFlags.ReuseAddr},
sendSignedPeerRecord = true,
addrs = listenAddr,
)
wallet = WalletRef.new(EthPrivateKey.random())
network = BlockExcNetwork.new(switch)
peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new()
blockDiscoveryStore = TempLevelDb.new().newDb()
localStore = CacheStore.new()
blockDiscovery = Discovery.new(
switch.peerInfo.privateKey,
announceAddrs = @[listenAddr],
bindPort = bindPort.Port,
store = blockDiscoveryStore,
bootstrapNodes = bootstrapNodes,
)
discovery = DiscoveryEngine.new(
localStore, peerStore, network, blockDiscovery, pendingBlocks
)
advertiser = Advertiser.new(localStore, blockDiscovery)
engine = BlockExcEngine.new(
localStore, wallet, network, discovery, advertiser, peerStore, pendingBlocks
)
store = NetworkStore.new(engine, localStore)
node = CodexNodeRef.new(
switch = switch,
networkStore = store,
engine = engine,
prover = Prover.none,
discovery = blockDiscovery,
taskpool = Taskpool.new(),
)
await switch.peerInfo.update()
switch.mount(network)
let (announceAddrs, discoveryAddrs) = nattedAddress(
NatConfig(hasExtIp: false, nat: NatNone), switch.peerInfo.addrs, bindPort.Port
)
node.discovery.updateAnnounceRecord(announceAddrs)
node.discovery.updateDhtRecord(discoveryAddrs)
check node.discovery.dhtRecord.isSome
bootstrapNodes.add !node.discovery.dhtRecord
localStores.add localStore
nodes.add node
for node in nodes:
await node.switch.start()
await node.start()
let
localStore = localStores[0]
store = nodes[0].blockStore
let blocks =
await makeRandomBlocks(datasetSize = numBlocks * 64.KiBs.int, blockSize = 64.KiBs)
assert blocks.len == numBlocks
# Populate manifest in local store
manifest = await storeDataGetManifest(localStore, blocks)
let
manifestBlock =
bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new)
(await localStore.putBlock(manifestBlock)).tryGet()
protected = (await erasure.encode(manifest, ecK, ecM)).tryGet()
builder = Poseidon2Builder.new(localStore, protected).tryGet()
verifiable = (await builder.buildManifest()).tryGet()
verifiableBlock =
bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet()
# Populate protected manifest in local store
(await localStore.putBlock(verifiableBlock)).tryGet()
teardown:
for node in nodes:
await node.switch.stop()
localStores = @[]
nodes = @[]
test "repair slot":
var request = StorageRequest.example
request.content.cid = verifiableBlock.cid
request.ask.slots = protected.numSlots.uint64
request.ask.slotSize = DefaultBlockSize.uint64
for i in 0 ..< protected.numSlots.uint64:
(await nodes[i + 1].onStore(request, i, nil, isRepairing = false)).tryGet()
await nodes[0].switch.stop() # acts as client
await nodes[1].switch.stop() # slot 0 missing now
await nodes[3].switch.stop() # slot 2 missing now
# repair missing slot
(await nodes[6].onStore(request, 0.uint64, nil, isRepairing = true)).tryGet()
(await nodes[7].onStore(request, 2.uint64, nil, isRepairing = true)).tryGet()
await nodes[2].switch.stop() # slot 1 missing now
await nodes[4].switch.stop() # slot 3 missing now
(await nodes[8].onStore(request, 1.uint64, nil, isRepairing = true)).tryGet()
(await nodes[9].onStore(request, 3.uint64, nil, isRepairing = true)).tryGet()
await nodes[5].switch.stop() # slot 4 missing now
(await nodes[10].onStore(request, 4.uint64, nil, isRepairing = true)).tryGet()

View File

@ -1,4 +1,5 @@
import ./node/testnode
import ./node/testcontracts
import ./node/testslotrepair
{.warning[UnusedImport]: off.}

View File

@ -0,0 +1,315 @@
import pkg/questionable
import pkg/codex/logutils
import ../contracts/time
import ../contracts/deployment
import ../codex/helpers
import ../examples
import ./marketplacesuite
import ./nodeconfigs
export logutils
logScope:
topics = "integration test slot repair"
marketplacesuite "SP Slot Repair":
const minPricePerBytePerSecond = 1.u256
const collateralPerByte = 1.u256
const blocks = 3
const ecNodes = 5
const ecTolerance = 2
test "repair from local store",
NodeConfigs(
clients: CodexConfigs
.init(nodes = 1)
.debug()
.withLogFile()
.withLogTopics("node", "erasure").some,
providers: CodexConfigs
.init(nodes = 2)
.withSimulateProofFailures(idx = 1, failEveryNProofs = 1)
.debug()
.withLogFile()
.withLogTopics("marketplace", "sales", "reservations", "node", "statemachine").some,
validators: CodexConfigs
.init(nodes = 1)
.debug()
.withLogFile()
.withLogTopics("validator").some,
):
let client0 = clients()[0]
let provider0 = providers()[0]
let provider1 = providers()[1]
let expiry = 10.periods
let duration = expiry + 10.periods
let data = await RandomChunker.example(blocks = blocks)
let slotSize = slotSize(blocks, ecNodes, ecTolerance)
let availability = (
await provider0.client.postAvailability(
totalSize = 4 * slotSize.truncate(uint64),
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = 100 * slotSize * collateralPerByte,
)
).get
discard await provider1.client.postAvailability(
totalSize = slotSize.truncate(uint64),
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = 100 * slotSize * collateralPerByte,
)
var filledSlotIds: seq[SlotId] = @[]
proc onSlotFilled(eventResult: ?!SlotFilled) =
assert not eventResult.isErr
let event = !eventResult
let slotId = slotId(event.requestId, event.slotIndex)
filledSlotIds.add slotId
let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled)
var freedSlotId = none SlotId
proc onSlotFreed(eventResult: ?!SlotFreed) =
assert not eventResult.isErr
let event = !eventResult
let slotId = slotId(event.requestId, event.slotIndex)
assert slotId in filledSlotIds
filledSlotIds.del(filledSlotIds.find(slotId))
freedSlotId = some(slotId)
let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed)
let cid = (await client0.client.upload(data)).get
let purchaseId = await client0.client.requestStorage(
cid,
expiry = expiry,
duration = duration,
nodes = ecNodes,
tolerance = ecTolerance,
proofProbability = 1.u256,
)
check eventually(
await client0.client.purchaseStateIs(purchaseId, "started"), timeout = expiry.int * 1000
)
await client0.stop()
await provider0.client.patchAvailability(
availabilityId = availability.id,
totalSize = (5 * slotSize.truncate(uint64)).uint64.some,
duration = duration.uint64.some,
minPricePerBytePerSecond = minPricePerBytePerSecond.some,
totalCollateral = (100 * slotSize * collateralPerByte).some,
)
check eventually(freedSlotId.isSome, timeout = (duration - expiry).int * 1000)
check eventually(
freedSlotId.get in filledSlotIds, timeout = (duration - expiry).int * 1000
)
await filledSubscription.unsubscribe()
await slotFreedsubscription.unsubscribe()
test "repair from remote store",
NodeConfigs(
clients: CodexConfigs
.init(nodes = 1)
.debug()
.withLogFile()
.withLogTopics("node", "erasure").some,
providers: CodexConfigs
.init(nodes = 3)
.withSimulateProofFailures(idx = 1, failEveryNProofs = 1)
.debug()
.withLogFile()
.withLogTopics("marketplace", "sales", "reservations", "node", "statemachine").some,
validators: CodexConfigs
.init(nodes = 1)
.debug()
.withLogFile()
.withLogTopics("validator").some,
):
let client0 = clients()[0]
let provider0 = providers()[0]
let provider1 = providers()[1]
let provider2 = providers()[2]
let expiry = 10.periods
let duration = expiry + 10.periods
let data = await RandomChunker.example(blocks = blocks)
let slotSize = slotSize(blocks, ecNodes, ecTolerance)
discard await provider0.client.postAvailability(
totalSize = 4 * slotSize.truncate(uint64),
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = 100 * slotSize * collateralPerByte,
)
discard await provider1.client.postAvailability(
totalSize = slotSize.truncate(uint64),
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = 100 * slotSize * collateralPerByte,
)
var filledSlotIds: seq[SlotId] = @[]
proc onSlotFilled(eventResult: ?!SlotFilled) =
assert not eventResult.isErr
let event = !eventResult
let slotId = slotId(event.requestId, event.slotIndex)
filledSlotIds.add slotId
let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled)
var freedSlotId = none SlotId
proc onSlotFreed(eventResult: ?!SlotFreed) =
assert not eventResult.isErr
let event = !eventResult
let slotId = slotId(event.requestId, event.slotIndex)
assert slotId in filledSlotIds
filledSlotIds.del(filledSlotIds.find(slotId))
freedSlotId = some(slotId)
let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed)
let cid = (await client0.client.upload(data)).get
let purchaseId = await client0.client.requestStorage(
cid,
expiry = expiry,
duration = duration,
nodes = ecNodes,
tolerance = ecTolerance,
proofProbability = 1.u256,
)
check eventually(
await client0.client.purchaseStateIs(purchaseId, "started"), timeout = expiry.int * 1000
)
await client0.stop()
discard await provider2.client.postAvailability(
totalSize = slotSize.truncate(uint64),
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = 100 * slotSize * collateralPerByte,
)
check eventually(freedSlotId.isSome, timeout = (duration - expiry).int * 1000)
await provider1.stop()
check eventually(
freedSlotId.get in filledSlotIds, timeout = (duration - expiry).int * 1000
)
await filledSubscription.unsubscribe()
await slotFreedsubscription.unsubscribe()
test "storage provider slot repair",
NodeConfigs(
clients: CodexConfigs
.init(nodes = 1)
.debug()
.withLogFile()
.withLogTopics("node", "erasure").some,
providers: CodexConfigs
.init(nodes = 4)
.debug()
.withLogFile()
.withLogTopics("marketplace", "sales", "reservations", "node").some,
validators: CodexConfigs
.init(nodes = 1)
.debug()
.withLogFile()
.withLogTopics("validator").some,
):
let client0 = clients()[0]
let expiry = 10.periods
let duration = expiry + 10.periods
let size = 0xFFFFFF.uint64
let data = await RandomChunker.example(blocks = blocks)
let datasetSize =
datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance)
await createAvailabilities(
size, duration, datasetSize * collateralPerByte, minPricePerBytePerSecond
)
let cid = (await client0.client.upload(data)).get
let purchaseId = await client0.client.requestStorage(
cid,
expiry = expiry,
duration = duration,
collateralPerByte = collateralPerByte,
nodes = ecNodes,
tolerance = ecTolerance,
proofProbability = 1.u256,
pricePerBytePerSecond = minPricePerBytePerSecond,
)
let requestId = (await client0.client.requestId(purchaseId)).get
var filledSlotIds: seq[SlotId] = @[]
proc onSlotFilled(eventResult: ?!SlotFilled) =
assert not eventResult.isErr
let event = !eventResult
if event.requestId == requestId:
let slotId = slotId(event.requestId, event.slotIndex)
filledSlotIds.add slotId
let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled)
check eventually(
await client0.client.purchaseStateIs(purchaseId, "started"), timeout = expiry.int * 1000
)
await client0.stop()
check eventually(
filledSlotIds.len == blocks, timeout = (duration - expiry).int * 1000
)
trace "all slots have been filled"
var slotWasFreed = false
proc onSlotFreed(event: ?!SlotFreed) =
if event.isOk and event.value.requestId == requestId:
trace "slot was freed", slotIndex = $event.value.slotIndex
slotWasFreed = true
let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed)
block provider_search:
while true:
for slotId in filledSlotIds:
for provider in providers():
if (await provider.client.saleStateIs(slotId, "SaleProving")):
await provider.stop()
break provider_search
await sleepAsync(100.milliseconds)
check eventually(slotWasFreed, timeout = (duration - expiry).int * 1000)
await slotFreedsubscription.unsubscribe()
check eventually(
filledSlotIds.len > blocks, timeout = (duration - expiry).int * 1000
)
trace "freed slot was filled"
await filledSubscription.unsubscribe()

View File

@ -9,5 +9,6 @@ import ./integration/testmarketplace
import ./integration/testproofs
import ./integration/testvalidator
import ./integration/testecbug
import ./integration/testslotrepair
{.warning[UnusedImport]: off.}