diff --git a/codex/codex.nim b/codex/codex.nim index 2f5cf860..1d77ea9f 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -56,8 +56,7 @@ type EthWallet = ethers.Wallet proc bootstrapInteractions( - s: CodexServer -): Future[void] {.async.} = + s: CodexServer): Future[void] {.async.} = ## bootstrap interactions and return contracts ## using clients, hosts, validators pairings ## @@ -190,10 +189,9 @@ proc stop*(s: CodexServer) {.async.} = s.runHandle.complete() proc new*( - T: type CodexServer, - config: CodexConf, - privateKey: CodexPrivateKey -): CodexServer = + T: type CodexServer, + config: CodexConf, + privateKey: CodexPrivateKey): CodexServer = ## create CodexServer including setting up datastore, repostore, etc let switch = SwitchBuilder @@ -279,5 +277,4 @@ proc new*( codexNode: codexNode, restServer: restServer, repoStore: repoStore, - maintenance: maintenance - ) + maintenance: maintenance) diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index b6977145..43eb20be 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -112,7 +112,8 @@ proc getPendingBlocks( return await completedFut else: let (_, index) = await completedFut - raise newException(CatchableError, + raise newException( + CatchableError, "Future for block id not found, tree cid: " & $manifest.treeCid & ", index: " & $index) Iter.new(genNext, isFinished) @@ -134,7 +135,7 @@ proc prepareEncodingData( lastIndex = params.rounded - 1, numberOfIterations = params.steps ) - indicies = strategy.getIndicies(step) + indicies = toSeq(strategy.getIndicies(step)) pendingBlocksIter = self.getPendingBlocks(manifest, indicies.filterIt(it < manifest.blocksCount)) var resolved = 0 @@ -183,7 +184,7 @@ proc prepareDecodingData( lastIndex = encoded.blocksCount - 1, numberOfIterations = encoded.steps ) - indicies = strategy.getIndicies(step) + indicies = toSeq(strategy.getIndicies(step)) pendingBlocksIter = self.getPendingBlocks(encoded, indicies) var diff --git a/codex/indexingstrategy.nim b/codex/indexingstrategy.nim index b7774765..e4006ff1 100644 --- a/codex/indexingstrategy.nim +++ b/codex/indexingstrategy.nim @@ -1,14 +1,20 @@ import std/sequtils -import ./utils -# I'm choosing to use an assert here because: -# 1. These are a programmer errors and *should not* happen during application runtime. -# 2. Users don't have to deal with Result types. +import ./errors +import ./utils +import ./utils/asynciter + +{.push raises: [].} type # Representing a strategy for grouping indices (of blocks usually) # Given an interation-count as input, will produce a seq of # selected indices. + + IndexingError* = object of CodexError + IndexingWrongIndexError* = object of IndexingError + IndexingWrongIterationsError* = object of IndexingError + IndexingStrategy* = ref object of RootObj firstIndex*: int # Lowest index that can be returned lastIndex*: int # Highest index that can be returned @@ -27,18 +33,71 @@ type # 2 => 2, 5, 8 SteppedIndexingStrategy* = ref object of IndexingStrategy -proc assertIteration(self: IndexingStrategy, iteration: int): void = +proc checkIteration( + self: IndexingStrategy, + iteration: int): void {.raises: [IndexingError].} = if iteration >= self.numberOfIterations: - raiseAssert("Indexing iteration can't be greater than or equal to numberOfIterations.") + raise newException( + IndexingError, + "Indexing iteration can't be greater than or equal to numberOfIterations.") -method getIndicies*(self: IndexingStrategy, iteration: int): seq[int] {.base.} = +method getIndicies*( + self: IndexingStrategy, + iteration: int): Iter[int] {.base, raises: [IndexingError].} = raiseAssert("Not implemented") -proc new*(T: type IndexingStrategy, firstIndex, lastIndex, numberOfIterations: int): T = +proc getIter(first, last, step: int): Iter[int] = + var + finish = false + cur = first + proc get(): int = + result = cur + cur += step + + if cur > last: + finish = true + + proc isFinished(): bool = + finish + + Iter.new(get, isFinished) + +method getIndicies*( + self: LinearIndexingStrategy, + iteration: int): Iter[int] {.raises: [IndexingError].} = + + self.checkIteration(iteration) + + let + first = self.firstIndex + iteration * (self.step + 1) + last = min(first + self.step, self.lastIndex) + + getIter(first, last, 1) + +method getIndicies*( + self: SteppedIndexingStrategy, + iteration: int): Iter[int] {.raises: [IndexingError].} = + + self.checkIteration(iteration) + + let + first = self.firstIndex + iteration + last = self.lastIndex + + getIter(first, last, self.numberOfIterations) + +proc new*( + T: type IndexingStrategy, + firstIndex, lastIndex, numberOfIterations: int): T {.raises: [IndexingError].} = if firstIndex > lastIndex: - raiseAssert("firstIndex (" & $firstIndex & ") can't be greater than lastIndex (" & $lastIndex & ")") + raise newException( + IndexingWrongIndexError, + "firstIndex (" & $firstIndex & ") can't be greater than lastIndex (" & $lastIndex & ")") + if numberOfIterations <= 0: - raiseAssert("numberOfIteration (" & $numberOfIterations & ") must be greater than zero.") + raise newException( + IndexingWrongIterationsError, + "numberOfIteration (" & $numberOfIterations & ") must be greater than zero.") T( firstIndex: firstIndex, @@ -46,16 +105,3 @@ proc new*(T: type IndexingStrategy, firstIndex, lastIndex, numberOfIterations: i numberOfIterations: numberOfIterations, step: divUp((lastIndex - firstIndex), numberOfIterations) ) - -method getIndicies*(self: LinearIndexingStrategy, iteration: int): seq[int] = - self.assertIteration(iteration) - - let - first = self.firstIndex + iteration * (self.step + 1) - last = min(first + self.step, self.lastIndex) - - toSeq(countup(first, last, 1)) - -method getIndicies*(self: SteppedIndexingStrategy, iteration: int): seq[int] = - self.assertIteration(iteration) - toSeq(countup(self.firstIndex + iteration, self.lastIndex, self.numberOfIterations)) diff --git a/codex/node.nim b/codex/node.nim index 16d10821..affc4307 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -7,6 +7,8 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +{.push raises: [].} + import std/options import std/sequtils import std/strformat @@ -37,12 +39,8 @@ import ./streams import ./erasure import ./discovery import ./contracts -import ./node/batch import ./utils import ./errors -import ./merkletree - -export batch logScope: topics = "codex node" @@ -51,14 +49,12 @@ const FetchBatch = 200 type - CodexError = object of CatchableError - Contracts* = tuple client: ?ClientInteractions host: ?HostInteractions validator: ?ValidatorInteractions - CodexNodeRef* = ref object + CodexNode* = object switch: Switch networkId: PeerId blockStore: BlockStore @@ -67,8 +63,12 @@ type discovery: Discovery contracts*: Contracts clock*: Clock + storage*: Contracts - OnManifest* = proc(cid: Cid, manifest: Manifest): void {.gcsafe, closure.} + CodexNodeRef* = ref CodexNode + + OnManifest* = proc(cid: Cid, manifest: Manifest): void {.gcsafe, raises: [].} + BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, raises: [].} func switch*(self: CodexNodeRef): Switch = return self.switch @@ -85,7 +85,9 @@ func erasure*(self: CodexNodeRef): Erasure = func discovery*(self: CodexNodeRef): Discovery = return self.discovery -proc storeManifest*(self: CodexNodeRef, manifest: Manifest): Future[?!bt.Block] {.async.} = +proc storeManifest*( + self: CodexNodeRef, + manifest: Manifest): Future[?!bt.Block] {.async.} = without encodedVerifiable =? manifest.encode(), err: trace "Unable to encode manifest" return failure(err) @@ -101,21 +103,21 @@ proc storeManifest*(self: CodexNodeRef, manifest: Manifest): Future[?!bt.Block] success blk proc findPeer*( - node: CodexNodeRef, + self: CodexNodeRef, peerId: PeerId): Future[?PeerRecord] {.async.} = ## Find peer using the discovery service from the given CodexNode ## - return await node.discovery.findPeer(peerId) + return await self.discovery.findPeer(peerId) proc connect*( - node: CodexNodeRef, + self: CodexNodeRef, peerId: PeerId, addrs: seq[MultiAddress] ): Future[void] = - node.switch.connect(peerId, addrs) + self.switch.connect(peerId, addrs) proc fetchManifest*( - node: CodexNodeRef, + self: CodexNodeRef, cid: Cid): Future[?!Manifest] {.async.} = ## Fetch and decode a manifest block ## @@ -125,7 +127,7 @@ proc fetchManifest*( trace "Retrieving manifest for cid", cid - without blk =? await node.blockStore.getBlock(BlockAddress.init(cid)), err: + without blk =? await self.blockStore.getBlock(BlockAddress.init(cid)), err: trace "Error retrieve manifest block", cid, err = err.msg return failure err @@ -139,15 +141,21 @@ proc fetchManifest*( return manifest.success -proc updateExpiry*(node: CodexNodeRef, manifestCid: Cid, expiry: SecondsSince1970): Future[?!void] {.async.} = - without manifest =? await node.fetchManifest(manifestCid), error: +proc updateExpiry*( + self: CodexNodeRef, + manifestCid: Cid, + expiry: SecondsSince1970): Future[?!void] {.async.} = + + without manifest =? await self.fetchManifest(manifestCid), error: trace "Unable to fetch manifest for cid", manifestCid return failure(error) try: - let ensuringFutures = Iter.fromSlice(0.. self.blockStore.getBlock(BlockAddress.init(cid, i)) + ) - trace "Fetching blocks in batches of", size = batchSize - - let iter = Iter.fromSlice(0.. node.blockStore.getBlock(BlockAddress.init(manifest.treeCid, i))) - - for batchNum in 0.. manifest.slotRoots.high: + trace "Slot index not in manifest", slotIdx + return failure(newException(CodexError, "Slot index not in manifest")) + + proc updateExpiry(blocks: seq[bt.Block]): Future[?!void] {.async.} = + trace "Updating expiry for blocks", blocks = blocks.len + + let ensureExpiryFutures = blocks.mapIt(self.blockStore.ensureExpiry(it.cid, expiry)) + if updateExpiryErr =? (await allFutureResult(ensureExpiryFutures)).errorOption: + return failure(updateExpiryErr) + + echo "blocksCb.isNil: ", blocksCb.isNil + if not blocksCb.isNil and err =? (await blocksCb(blocks)).errorOption: + trace "Unable to process blocks", err = err.msg + return failure(err) + + return success() + + if blksIter =? builder.slotIndicies(slotIdx) and + err =? (await self.fetchBatched(manifest.treeCid, blksIter, onBatch = updateExpiry)).errorOption: + trace "Unable to fetch blocks", err = err.msg + return failure(err) + + without slotRoot =? (await builder.buildSlot(slotIdx.Natural)), err: + trace "Unable to build slot", err = err.msg + return failure(err) + + if cid =? slotRoot.toSlotCid() and cid != manifest.slotRoots[slotIdx.int]: + trace "Slot root mismatch", manifest = manifest.slotRoots[slotIdx.int], recovered = slotRoot.toSlotCid() + return failure(newException(CodexError, "Slot root mismatch")) + + return success() + +proc onExpiryUpdate( + self: CodexNodeRef, + rootCid: string, + expiry: SecondsSince1970): Future[?!void] {.async.} = + without cid =? Cid.init(rootCid): + trace "Unable to parse Cid", cid + let error = newException(CodexError, "Unable to parse Cid") + return failure(error) + + return await self.updateExpiry(cid, expiry) + +proc onClear( + self: CodexNodeRef, + request: StorageRequest, + slotIndex: UInt256) = +# TODO: remove data from local storage + discard + +proc onProve( + self: CodexNodeRef, + slot: Slot, + challenge: ProofChallenge): Future[seq[byte]] {.async.} = + # TODO: generate proof + return @[42'u8] + +proc start*(self: CodexNodeRef) {.async.} = + if not self.engine.isNil: + await self.engine.start() + + if not self.erasure.isNil: + await self.erasure.start() + + if not self.discovery.isNil: + await self.discovery.start() + + if not self.clock.isNil: + await self.clock.start() + + if hostContracts =? self.contracts.host: + hostContracts.sales.onStore = + proc( + request: StorageRequest, + slot: UInt256, + onBatch: BatchProc): Future[?!void] = self.onStore(request, slot, onBatch) + + hostContracts.sales.onExpiryUpdate = + proc(rootCid: string, expiry: SecondsSince1970): Future[?!void] = + self.onExpiryUpdate(rootCid, expiry) + + hostContracts.sales.onClear = + proc(request: StorageRequest, slotIndex: UInt256) = + # TODO: remove data from local storage + self.onClear(request, slotIndex) + + hostContracts.sales.onProve = + proc(slot: Slot, challenge: ProofChallenge): Future[seq[byte]] = + # TODO: generate proof + self.onProve(slot, challenge) + + try: + await hostContracts.start() + except CatchableError as error: + error "Unable to start host contract interactions", error=error.msg + self.contracts.host = HostInteractions.none + + if clientContracts =? self.contracts.client: + try: + await clientContracts.start() + except CatchableError as error: + error "Unable to start client contract interactions: ", error=error.msg + self.contracts.client = ClientInteractions.none + + if validatorContracts =? self.contracts.validator: + try: + await validatorContracts.start() + except CatchableError as error: + error "Unable to start validator contract interactions: ", error=error.msg + self.contracts.validator = ValidatorInteractions.none + + self.networkId = self.switch.peerInfo.peerId + notice "Started codex node", id = $self.networkId, addrs = self.switch.peerInfo.addrs + +proc stop*(self: CodexNodeRef) {.async.} = + trace "Stopping node" + + if not self.engine.isNil: + await self.engine.stop() + + if not self.erasure.isNil: + await self.erasure.stop() + + if not self.discovery.isNil: + await self.discovery.stop() + + if not self.clock.isNil: + await self.clock.stop() + + if clientContracts =? self.contracts.client: + await clientContracts.stop() + + if hostContracts =? self.contracts.host: + await hostContracts.stop() + + if validatorContracts =? self.contracts.validator: + await validatorContracts.stop() + + if not self.blockStore.isNil: + await self.blockStore.close + proc new*( T: type CodexNodeRef, switch: Switch, @@ -466,8 +660,9 @@ proc new*( erasure: Erasure, discovery: Discovery, contracts = Contracts.default): CodexNodeRef = - ## Create new instance of a Codex node, call `start` to run it + ## Create new instance of a Codex self, call `start` to run it ## + CodexNodeRef( switch: switch, blockStore: store, @@ -475,119 +670,3 @@ proc new*( erasure: erasure, discovery: discovery, contracts: contracts) - -proc start*(node: CodexNodeRef) {.async.} = - if not node.engine.isNil: - await node.engine.start() - - if not node.erasure.isNil: - await node.erasure.start() - - if not node.discovery.isNil: - await node.discovery.start() - - if not node.clock.isNil: - await node.clock.start() - - if hostContracts =? node.contracts.host: - # TODO: remove Sales callbacks, pass BlockStore and StorageProofs instead - hostContracts.sales.onStore = proc(request: StorageRequest, - slot: UInt256, - onBatch: BatchProc): Future[?!void] {.async.} = - ## store data in local storage - ## - - without cid =? Cid.init(request.content.cid): - trace "Unable to parse Cid", cid - let error = newException(CodexError, "Unable to parse Cid") - return failure(error) - - without manifest =? await node.fetchManifest(cid), error: - trace "Unable to fetch manifest for cid", cid - return failure(error) - - trace "Fetching block for manifest", cid - let expiry = request.expiry.toSecondsSince1970 - proc expiryUpdateOnBatch(blocks: seq[bt.Block]): Future[?!void] {.async.} = - let ensureExpiryFutures = blocks.mapIt(node.blockStore.ensureExpiry(it.cid, expiry)) - if updateExpiryErr =? (await allFutureResult(ensureExpiryFutures)).errorOption: - return failure(updateExpiryErr) - - if not onBatch.isNil and onBatchErr =? (await onBatch(blocks)).errorOption: - return failure(onBatchErr) - - return success() - - if fetchErr =? (await node.fetchBatched(manifest, onBatch = expiryUpdateOnBatch)).errorOption: - let error = newException(CodexError, "Unable to retrieve blocks") - error.parent = fetchErr - return failure(error) - - return success() - - hostContracts.sales.onExpiryUpdate = proc(rootCid: string, expiry: SecondsSince1970): Future[?!void] {.async.} = - without cid =? Cid.init(rootCid): - trace "Unable to parse Cid", cid - let error = newException(CodexError, "Unable to parse Cid") - return failure(error) - - return await node.updateExpiry(cid, expiry) - - hostContracts.sales.onClear = proc(request: StorageRequest, - slotIndex: UInt256) = - # TODO: remove data from local storage - discard - - hostContracts.sales.onProve = proc(slot: Slot, challenge: ProofChallenge): Future[seq[byte]] {.async.} = - # TODO: generate proof - return @[42'u8] - - try: - await hostContracts.start() - except CatchableError as error: - error "Unable to start host contract interactions", error=error.msg - node.contracts.host = HostInteractions.none - - if clientContracts =? node.contracts.client: - try: - await clientContracts.start() - except CatchableError as error: - error "Unable to start client contract interactions: ", error=error.msg - node.contracts.client = ClientInteractions.none - - if validatorContracts =? node.contracts.validator: - try: - await validatorContracts.start() - except CatchableError as error: - error "Unable to start validator contract interactions: ", error=error.msg - node.contracts.validator = ValidatorInteractions.none - - node.networkId = node.switch.peerInfo.peerId - notice "Started codex node", id = $node.networkId, addrs = node.switch.peerInfo.addrs - -proc stop*(node: CodexNodeRef) {.async.} = - trace "Stopping node" - - if not node.engine.isNil: - await node.engine.stop() - - if not node.erasure.isNil: - await node.erasure.stop() - - if not node.discovery.isNil: - await node.discovery.stop() - - if not node.clock.isNil: - await node.clock.stop() - - if clientContracts =? node.contracts.client: - await clientContracts.stop() - - if hostContracts =? node.contracts.host: - await hostContracts.stop() - - if validatorContracts =? node.contracts.validator: - await validatorContracts.stop() - - if not node.blockStore.isNil: - await node.blockStore.close diff --git a/codex/node/batch.nim b/codex/node/batch.nim deleted file mode 100644 index bbe5f0dd..00000000 --- a/codex/node/batch.nim +++ /dev/null @@ -1,7 +0,0 @@ -import pkg/chronos -import pkg/questionable/results -import pkg/upraises -import ../blocktype as bt - -type - BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, upraises:[].} diff --git a/codex/sales.nim b/codex/sales.nim index 17a97f77..d7a265cb 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -40,6 +40,7 @@ import ./utils/trackedfutures export stint export reservations export salesagent +export salescontext logScope: topics = "sales marketplace" diff --git a/codex/sales/salescontext.nim b/codex/sales/salescontext.nim index 4afa5f23..5d0b8308 100644 --- a/codex/sales/salescontext.nim +++ b/codex/sales/salescontext.nim @@ -1,11 +1,12 @@ import pkg/questionable import pkg/questionable/results import pkg/upraises -import ../node/batch + import ../market import ../clock import ./slotqueue import ./reservations +import ../blocktype as bt type SalesContext* = ref object @@ -22,9 +23,10 @@ type slotQueue*: SlotQueue simulateProofFailures*: int + BlocksCb* = proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, raises: [].} OnStore* = proc(request: StorageRequest, slot: UInt256, - onBatch: BatchProc): Future[?!void] {.gcsafe, upraises: [].} + blocksCb: BlocksCb): Future[?!void] {.gcsafe, upraises: [].} OnProve* = proc(slot: Slot, challenge: ProofChallenge): Future[seq[byte]] {.gcsafe, upraises: [].} OnExpiryUpdate* = proc(rootCid: string, expiry: SecondsSince1970): Future[?!void] {.gcsafe, upraises: [].} OnClear* = proc(request: StorageRequest, diff --git a/codex/sales/states/downloading.nim b/codex/sales/states/downloading.nim index f79f0d68..741cdaee 100644 --- a/codex/sales/states/downloading.nim +++ b/codex/sales/states/downloading.nim @@ -51,7 +51,7 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} reservationId = reservation.id availabilityId = reservation.availabilityId - proc onBatch(blocks: seq[bt.Block]): Future[?!void] {.async.} = + proc onBlocks(blocks: seq[bt.Block]): Future[?!void] {.async.} = # release batches of blocks as they are written to disk and # update availability size var bytes: uint = 0 @@ -66,7 +66,7 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} trace "Starting download" if err =? (await onStore(request, data.slotIndex, - onBatch)).errorOption: + onBlocks)).errorOption: return some State(SaleErrored(error: err)) trace "Download complete" diff --git a/codex/slots/builder.nim b/codex/slots/builder.nim index 177c17c4..9ee63f05 100644 --- a/codex/slots/builder.nim +++ b/codex/slots/builder.nim @@ -26,9 +26,14 @@ import ../merkletree import ../stores import ../manifest import ../utils +import ../utils/asynciter import ../utils/digest import ../utils/poseidon2digest +import ./converters + +export converters + const # TODO: Unified with the CellSize specified in branch "data-sampler" # in the proving circuit. @@ -116,54 +121,12 @@ func numBlockCells*(self: SlotsBuilder): Natural = self.manifest.blockSize.int div self.cellSize -func toCellCid*(cell: Poseidon2Hash): ?!Cid = - let - cellMhash = ? MultiHash.init(Pos2Bn128MrklCodec, cell.toBytes).mapFailure - cellCid = ? Cid.init(CIDv1, CodexSlotCellCodec, cellMhash).mapFailure +func slotIndicies*(self: SlotsBuilder, slot: Natural): ?!Iter[int] = + ## Returns the slot indices. + ## TODO: should return an iterator + ## - success cellCid - -func toSlotCid*(root: Poseidon2Hash): ?!Cid = - let - mhash = ? MultiHash.init($multiCodec("identity"), root.toBytes).mapFailure - treeCid = ? Cid.init(CIDv1, SlotRootCodec, mhash).mapFailure - - success treeCid - -func toSlotCids*(slotRoots: openArray[Poseidon2Hash]): ?!seq[Cid] = - success slotRoots.mapIt( ? it.toSlotCid ) - -func toSlotsRootsCid*(root: Poseidon2Hash): ?!Cid = - let - mhash = ? MultiHash.init($multiCodec("identity"), root.toBytes).mapFailure - treeCid = ? Cid.init(CIDv1, SlotProvingRootCodec, mhash).mapFailure - - success treeCid - -func toEncodableProof*( - proof: Poseidon2Proof): ?!CodexProof = - - let - encodableProof = CodexProof( - mcodec: multiCodec("identity"), # copy bytes as is - index: proof.index, - nleaves: proof.nleaves, - path: proof.path.mapIt( @(it.toBytes) )) - - success encodableProof - -func toVerifiableProof*( - proof: CodexProof): ?!Poseidon2Proof = - - let - verifiableProof = Poseidon2Proof( - index: proof.index, - nleaves: proof.nleaves, - path: proof.path.mapIt( - ? Poseidon2Hash.fromBytes(it.toArray32).toFailure - )) - - success verifiableProof + self.strategy.getIndicies(slot).catch proc getCellHashes*( self: SlotsBuilder, @@ -212,7 +175,7 @@ proc buildSlotTree*( proc buildSlot*( self: SlotsBuilder, - slotIndex: int): Future[?!Poseidon2Hash] {.async.} = + slotIndex: Natural): Future[?!Poseidon2Hash] {.async.} = ## Build a slot tree and store it in the block store. ## @@ -313,8 +276,8 @@ proc new*( let strategy = if strategy.isNone: - SteppedIndexingStrategy.new( - 0, manifest.blocksCount - 1, manifest.numSlots) + ? SteppedIndexingStrategy.new( + 0, manifest.blocksCount - 1, manifest.numSlots).catch else: strategy.get @@ -338,7 +301,7 @@ proc new*( if manifest.verifiable: if manifest.slotRoots.len == 0 or manifest.slotRoots.len != manifest.numSlots: - return failure "Manifest is verifiable but slot r" + return failure "Manifest is verifiable but slot roots are missing or invalid." let slotRoot = ? Poseidon2Hash.fromBytes( diff --git a/codex/slots/converters.nim b/codex/slots/converters.nim new file mode 100644 index 00000000..b2927be3 --- /dev/null +++ b/codex/slots/converters.nim @@ -0,0 +1,69 @@ +## Nim-Codex +## Copyright (c) 2024 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/sequtils + +import pkg/libp2p +import pkg/questionable +import pkg/questionable/results +import pkg/poseidon2 +import pkg/poseidon2/io + +import ../codextypes +import ../merkletree +import ../errors + +func toCellCid*(cell: Poseidon2Hash): ?!Cid = + let + cellMhash = ? MultiHash.init(Pos2Bn128MrklCodec, cell.toBytes).mapFailure + cellCid = ? Cid.init(CIDv1, CodexSlotCellCodec, cellMhash).mapFailure + + success cellCid + +func toSlotCid*(root: Poseidon2Hash): ?!Cid = + let + mhash = ? MultiHash.init($multiCodec("identity"), root.toBytes).mapFailure + treeCid = ? Cid.init(CIDv1, SlotRootCodec, mhash).mapFailure + + success treeCid + +func toSlotCids*(slotRoots: openArray[Poseidon2Hash]): ?!seq[Cid] = + success slotRoots.mapIt( ? it.toSlotCid ) + +func toSlotsRootsCid*(root: Poseidon2Hash): ?!Cid = + let + mhash = ? MultiHash.init($multiCodec("identity"), root.toBytes).mapFailure + treeCid = ? Cid.init(CIDv1, SlotProvingRootCodec, mhash).mapFailure + + success treeCid + +func toEncodableProof*( + proof: Poseidon2Proof): ?!CodexProof = + + let + encodableProof = CodexProof( + mcodec: multiCodec("identity"), # copy bytes as is + index: proof.index, + nleaves: proof.nleaves, + path: proof.path.mapIt( @( it.toBytes ) )) + + success encodableProof + +func toVerifiableProof*( + proof: CodexProof): ?!Poseidon2Proof = + + let + verifiableProof = Poseidon2Proof( + index: proof.index, + nleaves: proof.nleaves, + path: proof.path.mapIt( + ? Poseidon2Hash.fromBytes(it.toArray32).toFailure + )) + + success verifiableProof diff --git a/codex/storageproofs.nim b/codex/storageproofs.nim deleted file mode 100644 index aabca183..00000000 --- a/codex/storageproofs.nim +++ /dev/null @@ -1,6 +0,0 @@ -import ./storageproofs/por -import ./storageproofs/stpstore -import ./storageproofs/stpnetwork -import ./storageproofs/stpproto - -export por, stpstore, stpnetwork, stpproto diff --git a/codex/stores/repostore.nim b/codex/stores/repostore.nim index 5664da33..1d95b245 100644 --- a/codex/stores/repostore.nim +++ b/codex/stores/repostore.nim @@ -540,8 +540,13 @@ method close*(self: RepoStore): Future[void] {.async.} = ## For some implementations this may be a no-op ## - (await self.metaDs.close()).expect("Should meta datastore") - (await self.repoDs.close()).expect("Should repo datastore") + trace "Closing repostore" + + if not self.metaDs.isNil: + (await self.metaDs.close()).expect("Should meta datastore") + + if not self.repoDs.isNil: + (await self.repoDs.close()).expect("Should repo datastore") proc reserve*(self: RepoStore, bytes: uint): Future[?!void] {.async.} = ## Reserve bytes @@ -650,8 +655,7 @@ proc stop*(self: RepoStore): Future[void] {.async.} = return trace "Stopping repo" - (await self.repoDs.close()).expect("Should close repo store!") - (await self.metaDs.close()).expect("Should close meta store!") + await self.close() self.started = false diff --git a/tests/codex/node/helpers.nim b/tests/codex/node/helpers.nim new file mode 100644 index 00000000..0501de18 --- /dev/null +++ b/tests/codex/node/helpers.nim @@ -0,0 +1,101 @@ + +import std/times + +import pkg/libp2p +import pkg/chronos +import pkg/asynctest + +import pkg/codex/codextypes +import pkg/codex/chunker + +proc toTimesDuration*(d: chronos.Duration): times.Duration = + initDuration(seconds = d.seconds) + +proc drain*( + stream: LPStream | Result[lpstream.LPStream, ref CatchableError]): + Future[seq[byte]] {.async.} = + + let + stream = + when typeof(stream) is Result[lpstream.LPStream, ref CatchableError]: + stream.tryGet() + else: + stream + + defer: + await stream.close() + + var data: seq[byte] + while not stream.atEof: + var + buf = newSeq[byte](DefaultBlockSize.int) + res = await stream.readOnce(addr buf[0], DefaultBlockSize.int) + check res <= DefaultBlockSize.int + buf.setLen(res) + data &= buf + + data + +proc pipeChunker*(stream: BufferStream, chunker: Chunker) {.async.} = + try: + while ( + let chunk = await chunker.getBytes(); + chunk.len > 0): + await stream.pushData(chunk) + finally: + await stream.pushEof() + await stream.close() + +template setupAndTearDown*() {.dirty.} = + var + file: File + chunker: Chunker + switch: Switch + wallet: WalletRef + network: BlockExcNetwork + clock: Clock + localStore: RepoStore + localStoreRepoDs: DataStore + localStoreMetaDs: DataStore + engine: BlockExcEngine + store: NetworkStore + node: CodexNodeRef + blockDiscovery: Discovery + peerStore: PeerCtxStore + pendingBlocks: PendingBlocksManager + discovery: DiscoveryEngine + erasure: Erasure + + let + path = currentSourcePath().parentDir + + setup: + file = open(path /../ "" /../ "fixtures" / "test.jpg") + chunker = FileChunker.new(file = file, chunkSize = DefaultBlockSize) + switch = newStandardSwitch() + wallet = WalletRef.new(EthPrivateKey.random()) + network = BlockExcNetwork.new(switch) + + clock = SystemClock.new() + localStoreMetaDs = SQLiteDatastore.new(Memory).tryGet() + localStoreRepoDs = SQLiteDatastore.new(Memory).tryGet() + localStore = RepoStore.new(localStoreRepoDs, localStoreMetaDs, clock = clock) + await localStore.start() + + blockDiscovery = Discovery.new( + switch.peerInfo.privateKey, + announceAddrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/0") + .expect("Should return multiaddress")]) + peerStore = PeerCtxStore.new() + pendingBlocks = PendingBlocksManager.new() + discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks) + engine = BlockExcEngine.new(localStore, wallet, network, discovery, peerStore, pendingBlocks) + store = NetworkStore.new(engine, localStore) + erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider) + node = CodexNodeRef.new(switch, store, engine, erasure, blockDiscovery) + + await node.start() + + teardown: + close(file) + await node.stop() diff --git a/tests/codex/node/testcontracts.nim b/tests/codex/node/testcontracts.nim new file mode 100644 index 00000000..460eef62 --- /dev/null +++ b/tests/codex/node/testcontracts.nim @@ -0,0 +1,138 @@ +import std/os +import std/options +import std/math +import std/times +import std/sequtils +import std/importutils + +import pkg/asynctest +import pkg/chronos +import pkg/chronicles +import pkg/stew/byteutils +import pkg/datastore +import pkg/questionable +import pkg/questionable/results +import pkg/stint +import pkg/poseidon2 +import pkg/poseidon2/io + +import pkg/nitro +import pkg/codexdht/discv5/protocol as discv5 + +import pkg/codex/stores +import pkg/codex/clock +import pkg/codex/contracts +import pkg/codex/systemclock +import pkg/codex/blockexchange +import pkg/codex/chunker +import pkg/codex/slots +import pkg/codex/manifest +import pkg/codex/discovery +import pkg/codex/erasure +import pkg/codex/merkletree +import pkg/codex/blocktype as bt +import pkg/codex/utils/asynciter + +import pkg/codex/node {.all.} + +import ../../examples +import ../helpers +import ../helpers/mockmarket +import ../helpers/mockclock + +import ./helpers + +privateAccess(CodexNodeRef) # enable access to private fields + +asyncchecksuite "Test Node - Host contracts": + setupAndTearDown() + + var + sales: Sales + purchasing: Purchasing + manifest: Manifest + manifestCidStr: string + manifestCid: Cid + market: MockMarket + builder: SlotsBuilder + verifiable: Manifest + verifiableBlock: bt.Block + protected: Manifest + + setup: + # Setup Host Contracts and dependencies + market = MockMarket.new() + sales = Sales.new(market, clock, localStore) + + node.contracts = ( + none ClientInteractions, + some HostInteractions.new(clock, sales), + none ValidatorInteractions) + + await node.start() + + # Populate manifest in local store + manifest = await storeDataGetManifest(localStore, chunker) + let + manifestBlock = bt.Block.new( + manifest.encode().tryGet(), + codec = ManifestCodec).tryGet() + + manifestCid = manifestBlock.cid + manifestCidStr = $(manifestCid) + + (await localStore.putBlock(manifestBlock)).tryGet() + + protected = (await erasure.encode(manifest, 3, 2)).tryGet() + builder = SlotsBuilder.new(localStore, protected).tryGet() + verifiable = (await builder.buildManifest()).tryGet() + verifiableBlock = bt.Block.new( + verifiable.encode().tryGet(), + codec = ManifestCodec).tryGet() + + (await localStore.putBlock(verifiableBlock)).tryGet() + + test "onExpiryUpdate callback is set": + check sales.onExpiryUpdate.isSome + + test "onExpiryUpdate callback": + let + # The blocks have set default TTL, so in order to update it we have to have larger TTL + expectedExpiry: SecondsSince1970 = clock.now + DefaultBlockTtl.seconds + 11123 + expiryUpdateCallback = !sales.onExpiryUpdate + + (await expiryUpdateCallback(manifestCidStr, expectedExpiry)).tryGet() + + for index in 0.. 0 and blocks.len <= batchSize + return success() + )).tryGet() + + test "Store and retrieve Data Stream": + let + stream = BufferStream.new() + storeFut = node.store(stream) + oddChunkSize = math.trunc(DefaultBlockSize.float / 3.14).NBytes # Let's check that node.store can correctly rechunk these odd chunks + oddChunker = FileChunker.new(file = file, chunkSize = oddChunkSize, pad = false) # TODO: doesn't work with pad=tue + + var + original: seq[byte] + + try: + while ( + let chunk = await oddChunker.getBytes(); + chunk.len > 0): + original &= chunk + await stream.pushData(chunk) + finally: + await stream.pushEof() + await stream.close() + + let + manifestCid = (await storeFut).tryGet() + manifestBlock = (await localStore.getBlock(manifestCid)).tryGet() + localManifest = Manifest.decode(manifestBlock).tryGet() + data = await (await node.retrieve(manifestCid)).drain() + + check: + data.len == localManifest.datasetSize.int + data.len == original.len + sha256.digest(data) == sha256.digest(original) + + test "Retrieve One Block": + let + testString = "Block 1" + blk = bt.Block.new(testString.toBytes).tryGet() + + (await localStore.putBlock(blk)).tryGet() + let stream = (await node.retrieve(blk.cid)).tryGet() + defer: await stream.close() + + var data = newSeq[byte](testString.len) + await stream.readExactly(addr data[0], data.len) + check string.fromBytes(data) == testString + + test "Setup purchase request": + let + manifest = await storeDataGetManifest(localStore, chunker) + manifestBlock = bt.Block.new( + manifest.encode().tryGet(), + codec = ManifestCodec).tryGet() + + protected = (await erasure.encode(manifest, 3, 2)).tryGet() + builder = SlotsBuilder.new(localStore, protected).tryGet() + verifiable = (await builder.buildManifest()).tryGet() + verifiableBlock = bt.Block.new( + verifiable.encode().tryGet(), + codec = ManifestCodec).tryGet() + + (await localStore.putBlock(manifestBlock)).tryGet() + + let + request = (await node.setupRequest( + cid = manifestBlock.cid, + nodes = 5, + tolerance = 2, + duration = 100.u256, + reward = 2.u256, + proofProbability = 3.u256, + expiry = 200.u256, + collateral = 200.u256)).tryGet + + check: + (await verifiableBlock.cid in localStore) == true + request.content.cid == $verifiableBlock.cid + request.content.merkleRoot == builder.verifyRoot.get.toBytes diff --git a/tests/codex/testindexingstrategy.nim b/tests/codex/testindexingstrategy.nim index 7a10c6b1..676ede1f 100644 --- a/tests/codex/testindexingstrategy.nim +++ b/tests/codex/testindexingstrategy.nim @@ -2,6 +2,8 @@ import std/sequtils import pkg/chronos import pkg/asynctest +import pkg/codex/utils/asynciter + import ./helpers import pkg/codex/indexingstrategy @@ -17,15 +19,15 @@ for offset in @[0, 1, 2, 100]: test "linear": check: - linear.getIndicies(0) == @[0, 1, 2, 3, 4].mapIt(it + offset) - linear.getIndicies(1) == @[5, 6, 7, 8, 9].mapIt(it + offset) - linear.getIndicies(2) == @[10, 11, 12].mapIt(it + offset) + toSeq(linear.getIndicies(0)) == @[0, 1, 2, 3, 4].mapIt(it + offset) + toSeq(linear.getIndicies(1)) == @[5, 6, 7, 8, 9].mapIt(it + offset) + toSeq(linear.getIndicies(2)) == @[10, 11, 12].mapIt(it + offset) test "stepped": check: - stepped.getIndicies(0) == @[0, 3, 6, 9, 12].mapIt(it + offset) - stepped.getIndicies(1) == @[1, 4, 7, 10].mapIt(it + offset) - stepped.getIndicies(2) == @[2, 5, 8, 11].mapIt(it + offset) + toSeq(stepped.getIndicies(0)) == @[0, 3, 6, 9, 12].mapIt(it + offset) + toSeq(stepped.getIndicies(1)) == @[1, 4, 7, 10].mapIt(it + offset) + toSeq(stepped.getIndicies(2)) == @[2, 5, 8, 11].mapIt(it + offset) suite "Indexing strategies": let @@ -37,29 +39,29 @@ suite "Indexing strategies": l = LinearIndexingStrategy.new(0, 0, 1) s = SteppedIndexingStrategy.new(0, 0, 1) check: - l.getIndicies(0) == @[0] - s.getIndicies(0) == @[0] + toSeq(l.getIndicies(0)) == @[0] + toSeq(s.getIndicies(0)) == @[0] test "smallest range 1": let l = LinearIndexingStrategy.new(0, 1, 1) s = SteppedIndexingStrategy.new(0, 1, 1) check: - l.getIndicies(0) == @[0, 1] - s.getIndicies(0) == @[0, 1] + toSeq(l.getIndicies(0)) == @[0, 1] + toSeq(s.getIndicies(0)) == @[0, 1] test "first index must be smaller than last index": - expect AssertionDefect: + expect IndexingWrongIndexError: discard LinearIndexingStrategy.new(10, 0, 1) test "numberOfIterations must be greater than zero": - expect AssertionDefect: + expect IndexingWrongIterationsError: discard LinearIndexingStrategy.new(0, 10, 0) test "linear - oob": - expect AssertionDefect: + expect IndexingError: discard linear.getIndicies(3) test "stepped - oob": - expect AssertionDefect: + expect IndexingError: discard stepped.getIndicies(3) diff --git a/tests/codex/testnode.nim b/tests/codex/testnode.nim index 1ac99864..0ee4fc50 100644 --- a/tests/codex/testnode.nim +++ b/tests/codex/testnode.nim @@ -1,321 +1,4 @@ -import std/os -import std/options -import std/math -import std/times -import std/sequtils -import std/importutils +import ./node/testnode +import ./node/testcontracts -import pkg/asynctest -import pkg/chronos -import pkg/chronicles -import pkg/stew/byteutils -import pkg/datastore -import pkg/questionable -import pkg/questionable/results -import pkg/stint -import pkg/poseidon2 -import pkg/poseidon2/io - -import pkg/nitro -import pkg/codexdht/discv5/protocol as discv5 - -import pkg/codex/stores -import pkg/codex/clock -import pkg/codex/contracts -import pkg/codex/systemclock -import pkg/codex/blockexchange -import pkg/codex/chunker -import pkg/codex/slots -import pkg/codex/manifest -import pkg/codex/discovery -import pkg/codex/erasure -import pkg/codex/merkletree -import pkg/codex/blocktype as bt - -import pkg/codex/node {.all.} - -import ../examples -import ./helpers -import ./helpers/mockmarket -import ./helpers/mockclock - -privateAccess(CodexNodeRef) # enable access to private fields - -proc toTimesDuration(d: chronos.Duration): times.Duration = - initDuration(seconds = d.seconds) - -proc drain( - stream: LPStream | Result[lpstream.LPStream, ref CatchableError]): - Future[seq[byte]] {.async.} = - - let - stream = - when typeof(stream) is Result[lpstream.LPStream, ref CatchableError]: - stream.tryGet() - else: - stream - - defer: - await stream.close() - - var data: seq[byte] - while not stream.atEof: - var - buf = newSeq[byte](DefaultBlockSize.int) - res = await stream.readOnce(addr buf[0], DefaultBlockSize.int) - check res <= DefaultBlockSize.int - buf.setLen(res) - data &= buf - - data - -proc pipeChunker(stream: BufferStream, chunker: Chunker) {.async.} = - try: - while ( - let chunk = await chunker.getBytes(); - chunk.len > 0): - await stream.pushData(chunk) - finally: - await stream.pushEof() - await stream.close() - -template setupAndTearDown() {.dirty.} = - var - file: File - chunker: Chunker - switch: Switch - wallet: WalletRef - network: BlockExcNetwork - clock: Clock - localStore: RepoStore - localStoreRepoDs: DataStore - localStoreMetaDs: DataStore - engine: BlockExcEngine - store: NetworkStore - node: CodexNodeRef - blockDiscovery: Discovery - peerStore: PeerCtxStore - pendingBlocks: PendingBlocksManager - discovery: DiscoveryEngine - erasure: Erasure - - let - path = currentSourcePath().parentDir - - setup: - file = open(path /../ "fixtures" / "test.jpg") - chunker = FileChunker.new(file = file, chunkSize = DefaultBlockSize) - switch = newStandardSwitch() - wallet = WalletRef.new(EthPrivateKey.random()) - network = BlockExcNetwork.new(switch) - - clock = SystemClock.new() - localStoreMetaDs = SQLiteDatastore.new(Memory).tryGet() - localStoreRepoDs = SQLiteDatastore.new(Memory).tryGet() - localStore = RepoStore.new(localStoreRepoDs, localStoreMetaDs, clock = clock) - await localStore.start() - - blockDiscovery = Discovery.new( - switch.peerInfo.privateKey, - announceAddrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/0") - .expect("Should return multiaddress")]) - peerStore = PeerCtxStore.new() - pendingBlocks = PendingBlocksManager.new() - discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks) - engine = BlockExcEngine.new(localStore, wallet, network, discovery, peerStore, pendingBlocks) - store = NetworkStore.new(engine, localStore) - erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider) - node = CodexNodeRef.new(switch, store, engine, erasure, blockDiscovery) - - await node.start() - - teardown: - close(file) - await node.stop() - -asyncchecksuite "Test Node - Basic": - setupAndTearDown() - - test "Fetch Manifest": - let - manifest = await storeDataGetManifest(localStore, chunker) - - manifestBlock = bt.Block.new( - manifest.encode().tryGet(), - codec = ManifestCodec).tryGet() - - (await localStore.putBlock(manifestBlock)).tryGet() - - let - fetched = (await node.fetchManifest(manifestBlock.cid)).tryGet() - - check: - fetched == manifest - - test "Block Batching": - let manifest = await storeDataGetManifest(localStore, chunker) - - for batchSize in 1..12: - (await node.fetchBatched( - manifest, - batchSize = batchSize, - proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, async.} = - check blocks.len > 0 and blocks.len <= batchSize - return success() - )).tryGet() - - test "Store and retrieve Data Stream": - let - stream = BufferStream.new() - storeFut = node.store(stream) - oddChunkSize = math.trunc(DefaultBlockSize.float / 3.14).NBytes # Let's check that node.store can correctly rechunk these odd chunks - oddChunker = FileChunker.new(file = file, chunkSize = oddChunkSize, pad = false) # TODO: doesn't work with pad=tue - - var - original: seq[byte] - - try: - while ( - let chunk = await oddChunker.getBytes(); - chunk.len > 0): - original &= chunk - await stream.pushData(chunk) - finally: - await stream.pushEof() - await stream.close() - - let - manifestCid = (await storeFut).tryGet() - manifestBlock = (await localStore.getBlock(manifestCid)).tryGet() - localManifest = Manifest.decode(manifestBlock).tryGet() - data = await (await node.retrieve(manifestCid)).drain() - - check: - data.len == localManifest.datasetSize.int - data.len == original.len - sha256.digest(data) == sha256.digest(original) - - test "Retrieve One Block": - let - testString = "Block 1" - blk = bt.Block.new(testString.toBytes).tryGet() - - (await localStore.putBlock(blk)).tryGet() - let stream = (await node.retrieve(blk.cid)).tryGet() - defer: await stream.close() - - var data = newSeq[byte](testString.len) - await stream.readExactly(addr data[0], data.len) - check string.fromBytes(data) == testString - - test "Setup purchase request": - let - manifest = await storeDataGetManifest(localStore, chunker) - manifestBlock = bt.Block.new( - manifest.encode().tryGet(), - codec = ManifestCodec).tryGet() - - protected = (await erasure.encode(manifest, 3, 2)).tryGet() - builder = SlotsBuilder.new(localStore, protected).tryGet() - verifiable = (await builder.buildManifest()).tryGet() - verifiableBlock = bt.Block.new( - verifiable.encode().tryGet(), - codec = ManifestCodec).tryGet() - - (await localStore.putBlock(manifestBlock)).tryGet() - - let - request = (await node.setupRequest( - cid = manifestBlock.cid, - nodes = 5, - tolerance = 2, - duration = 100.u256, - reward = 2.u256, - proofProbability = 3.u256, - expiry = 200.u256, - collateral = 200.u256)).tryGet - - check: - (await verifiableBlock.cid in localStore) == true - request.content.cid == $verifiableBlock.cid - request.content.merkleRoot == builder.verifyRoot.get.toBytes - -asyncchecksuite "Test Node - Host contracts": - setupAndTearDown() - - var - sales: Sales - purchasing: Purchasing - manifest: Manifest - manifestCidStr: string - manifestCid: Cid - market: MockMarket - - setup: - # Setup Host Contracts and dependencies - market = MockMarket.new() - sales = Sales.new(market, clock, localStore) - - node.contracts = ( - none ClientInteractions, - some HostInteractions.new(clock, sales), - none ValidatorInteractions) - - await node.start() - - # Populate manifest in local store - manifest = await storeDataGetManifest(localStore, chunker) - let - manifestBlock = bt.Block.new( - manifest.encode().tryGet(), - codec = ManifestCodec).tryGet() - - manifestCid = manifestBlock.cid - manifestCidStr = $(manifestCid) - - (await localStore.putBlock(manifestBlock)).tryGet() - - test "onExpiryUpdate callback is set": - check sales.onExpiryUpdate.isSome - - test "onExpiryUpdate callback": - let - # The blocks have set default TTL, so in order to update it we have to have larger TTL - expectedExpiry: SecondsSince1970 = clock.now + DefaultBlockTtl.seconds + 11123 - expiryUpdateCallback = !sales.onExpiryUpdate - - (await expiryUpdateCallback(manifestCidStr, expectedExpiry)).tryGet() - - for index in 0..