Contracts handler (#672)

* get rid of unneeded files

* don't reuse batch callback

* move out storage contract handlers

* wip

* add storage handler, to handle storage contracts interactions

* split out node tests

* a bit more cleanup

* remove storage handler, move back into node

* add missing raises:

* wip: add support for rebuilding slot

* split out tests

* wip

* rework indexing strategy to return an iterator

* convert to seq

* minor api change (meh, might revert)

* steping strategy should not die

* allow fetching batches for manifests and indicies

* restored expiry update

* restored expiry update functionality

* avoid closing datastores twice

* correct wrong rename

* fixes sigsegv
This commit is contained in:
Dmitriy Ryajov 2024-01-15 10:45:04 -06:00 committed by GitHub
parent 9b9cf9c450
commit 2fc7c75fd2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 823 additions and 599 deletions

View File

@ -56,8 +56,7 @@ type
EthWallet = ethers.Wallet EthWallet = ethers.Wallet
proc bootstrapInteractions( proc bootstrapInteractions(
s: CodexServer s: CodexServer): Future[void] {.async.} =
): Future[void] {.async.} =
## bootstrap interactions and return contracts ## bootstrap interactions and return contracts
## using clients, hosts, validators pairings ## using clients, hosts, validators pairings
## ##
@ -190,10 +189,9 @@ proc stop*(s: CodexServer) {.async.} =
s.runHandle.complete() s.runHandle.complete()
proc new*( proc new*(
T: type CodexServer, T: type CodexServer,
config: CodexConf, config: CodexConf,
privateKey: CodexPrivateKey privateKey: CodexPrivateKey): CodexServer =
): CodexServer =
## create CodexServer including setting up datastore, repostore, etc ## create CodexServer including setting up datastore, repostore, etc
let let
switch = SwitchBuilder switch = SwitchBuilder
@ -279,5 +277,4 @@ proc new*(
codexNode: codexNode, codexNode: codexNode,
restServer: restServer, restServer: restServer,
repoStore: repoStore, repoStore: repoStore,
maintenance: maintenance maintenance: maintenance)
)

View File

@ -112,7 +112,8 @@ proc getPendingBlocks(
return await completedFut return await completedFut
else: else:
let (_, index) = await completedFut let (_, index) = await completedFut
raise newException(CatchableError, raise newException(
CatchableError,
"Future for block id not found, tree cid: " & $manifest.treeCid & ", index: " & $index) "Future for block id not found, tree cid: " & $manifest.treeCid & ", index: " & $index)
Iter.new(genNext, isFinished) Iter.new(genNext, isFinished)
@ -134,7 +135,7 @@ proc prepareEncodingData(
lastIndex = params.rounded - 1, lastIndex = params.rounded - 1,
numberOfIterations = params.steps numberOfIterations = params.steps
) )
indicies = strategy.getIndicies(step) indicies = toSeq(strategy.getIndicies(step))
pendingBlocksIter = self.getPendingBlocks(manifest, indicies.filterIt(it < manifest.blocksCount)) pendingBlocksIter = self.getPendingBlocks(manifest, indicies.filterIt(it < manifest.blocksCount))
var resolved = 0 var resolved = 0
@ -183,7 +184,7 @@ proc prepareDecodingData(
lastIndex = encoded.blocksCount - 1, lastIndex = encoded.blocksCount - 1,
numberOfIterations = encoded.steps numberOfIterations = encoded.steps
) )
indicies = strategy.getIndicies(step) indicies = toSeq(strategy.getIndicies(step))
pendingBlocksIter = self.getPendingBlocks(encoded, indicies) pendingBlocksIter = self.getPendingBlocks(encoded, indicies)
var var

View File

@ -1,14 +1,20 @@
import std/sequtils import std/sequtils
import ./utils
# I'm choosing to use an assert here because: import ./errors
# 1. These are a programmer errors and *should not* happen during application runtime. import ./utils
# 2. Users don't have to deal with Result types. import ./utils/asynciter
{.push raises: [].}
type type
# Representing a strategy for grouping indices (of blocks usually) # Representing a strategy for grouping indices (of blocks usually)
# Given an interation-count as input, will produce a seq of # Given an interation-count as input, will produce a seq of
# selected indices. # selected indices.
IndexingError* = object of CodexError
IndexingWrongIndexError* = object of IndexingError
IndexingWrongIterationsError* = object of IndexingError
IndexingStrategy* = ref object of RootObj IndexingStrategy* = ref object of RootObj
firstIndex*: int # Lowest index that can be returned firstIndex*: int # Lowest index that can be returned
lastIndex*: int # Highest index that can be returned lastIndex*: int # Highest index that can be returned
@ -27,18 +33,71 @@ type
# 2 => 2, 5, 8 # 2 => 2, 5, 8
SteppedIndexingStrategy* = ref object of IndexingStrategy SteppedIndexingStrategy* = ref object of IndexingStrategy
proc assertIteration(self: IndexingStrategy, iteration: int): void = proc checkIteration(
self: IndexingStrategy,
iteration: int): void {.raises: [IndexingError].} =
if iteration >= self.numberOfIterations: if iteration >= self.numberOfIterations:
raiseAssert("Indexing iteration can't be greater than or equal to numberOfIterations.") raise newException(
IndexingError,
"Indexing iteration can't be greater than or equal to numberOfIterations.")
method getIndicies*(self: IndexingStrategy, iteration: int): seq[int] {.base.} = method getIndicies*(
self: IndexingStrategy,
iteration: int): Iter[int] {.base, raises: [IndexingError].} =
raiseAssert("Not implemented") raiseAssert("Not implemented")
proc new*(T: type IndexingStrategy, firstIndex, lastIndex, numberOfIterations: int): T = proc getIter(first, last, step: int): Iter[int] =
var
finish = false
cur = first
proc get(): int =
result = cur
cur += step
if cur > last:
finish = true
proc isFinished(): bool =
finish
Iter.new(get, isFinished)
method getIndicies*(
self: LinearIndexingStrategy,
iteration: int): Iter[int] {.raises: [IndexingError].} =
self.checkIteration(iteration)
let
first = self.firstIndex + iteration * (self.step + 1)
last = min(first + self.step, self.lastIndex)
getIter(first, last, 1)
method getIndicies*(
self: SteppedIndexingStrategy,
iteration: int): Iter[int] {.raises: [IndexingError].} =
self.checkIteration(iteration)
let
first = self.firstIndex + iteration
last = self.lastIndex
getIter(first, last, self.numberOfIterations)
proc new*(
T: type IndexingStrategy,
firstIndex, lastIndex, numberOfIterations: int): T {.raises: [IndexingError].} =
if firstIndex > lastIndex: if firstIndex > lastIndex:
raiseAssert("firstIndex (" & $firstIndex & ") can't be greater than lastIndex (" & $lastIndex & ")") raise newException(
IndexingWrongIndexError,
"firstIndex (" & $firstIndex & ") can't be greater than lastIndex (" & $lastIndex & ")")
if numberOfIterations <= 0: if numberOfIterations <= 0:
raiseAssert("numberOfIteration (" & $numberOfIterations & ") must be greater than zero.") raise newException(
IndexingWrongIterationsError,
"numberOfIteration (" & $numberOfIterations & ") must be greater than zero.")
T( T(
firstIndex: firstIndex, firstIndex: firstIndex,
@ -46,16 +105,3 @@ proc new*(T: type IndexingStrategy, firstIndex, lastIndex, numberOfIterations: i
numberOfIterations: numberOfIterations, numberOfIterations: numberOfIterations,
step: divUp((lastIndex - firstIndex), numberOfIterations) step: divUp((lastIndex - firstIndex), numberOfIterations)
) )
method getIndicies*(self: LinearIndexingStrategy, iteration: int): seq[int] =
self.assertIteration(iteration)
let
first = self.firstIndex + iteration * (self.step + 1)
last = min(first + self.step, self.lastIndex)
toSeq(countup(first, last, 1))
method getIndicies*(self: SteppedIndexingStrategy, iteration: int): seq[int] =
self.assertIteration(iteration)
toSeq(countup(self.firstIndex + iteration, self.lastIndex, self.numberOfIterations))

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
{.push raises: [].}
import std/options import std/options
import std/sequtils import std/sequtils
import std/strformat import std/strformat
@ -37,12 +39,8 @@ import ./streams
import ./erasure import ./erasure
import ./discovery import ./discovery
import ./contracts import ./contracts
import ./node/batch
import ./utils import ./utils
import ./errors import ./errors
import ./merkletree
export batch
logScope: logScope:
topics = "codex node" topics = "codex node"
@ -51,14 +49,12 @@ const
FetchBatch = 200 FetchBatch = 200
type type
CodexError = object of CatchableError
Contracts* = tuple Contracts* = tuple
client: ?ClientInteractions client: ?ClientInteractions
host: ?HostInteractions host: ?HostInteractions
validator: ?ValidatorInteractions validator: ?ValidatorInteractions
CodexNodeRef* = ref object CodexNode* = object
switch: Switch switch: Switch
networkId: PeerId networkId: PeerId
blockStore: BlockStore blockStore: BlockStore
@ -67,8 +63,12 @@ type
discovery: Discovery discovery: Discovery
contracts*: Contracts contracts*: Contracts
clock*: Clock clock*: Clock
storage*: Contracts
OnManifest* = proc(cid: Cid, manifest: Manifest): void {.gcsafe, closure.} CodexNodeRef* = ref CodexNode
OnManifest* = proc(cid: Cid, manifest: Manifest): void {.gcsafe, raises: [].}
BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, raises: [].}
func switch*(self: CodexNodeRef): Switch = func switch*(self: CodexNodeRef): Switch =
return self.switch return self.switch
@ -85,7 +85,9 @@ func erasure*(self: CodexNodeRef): Erasure =
func discovery*(self: CodexNodeRef): Discovery = func discovery*(self: CodexNodeRef): Discovery =
return self.discovery return self.discovery
proc storeManifest*(self: CodexNodeRef, manifest: Manifest): Future[?!bt.Block] {.async.} = proc storeManifest*(
self: CodexNodeRef,
manifest: Manifest): Future[?!bt.Block] {.async.} =
without encodedVerifiable =? manifest.encode(), err: without encodedVerifiable =? manifest.encode(), err:
trace "Unable to encode manifest" trace "Unable to encode manifest"
return failure(err) return failure(err)
@ -101,21 +103,21 @@ proc storeManifest*(self: CodexNodeRef, manifest: Manifest): Future[?!bt.Block]
success blk success blk
proc findPeer*( proc findPeer*(
node: CodexNodeRef, self: CodexNodeRef,
peerId: PeerId): Future[?PeerRecord] {.async.} = peerId: PeerId): Future[?PeerRecord] {.async.} =
## Find peer using the discovery service from the given CodexNode ## Find peer using the discovery service from the given CodexNode
## ##
return await node.discovery.findPeer(peerId) return await self.discovery.findPeer(peerId)
proc connect*( proc connect*(
node: CodexNodeRef, self: CodexNodeRef,
peerId: PeerId, peerId: PeerId,
addrs: seq[MultiAddress] addrs: seq[MultiAddress]
): Future[void] = ): Future[void] =
node.switch.connect(peerId, addrs) self.switch.connect(peerId, addrs)
proc fetchManifest*( proc fetchManifest*(
node: CodexNodeRef, self: CodexNodeRef,
cid: Cid): Future[?!Manifest] {.async.} = cid: Cid): Future[?!Manifest] {.async.} =
## Fetch and decode a manifest block ## Fetch and decode a manifest block
## ##
@ -125,7 +127,7 @@ proc fetchManifest*(
trace "Retrieving manifest for cid", cid trace "Retrieving manifest for cid", cid
without blk =? await node.blockStore.getBlock(BlockAddress.init(cid)), err: without blk =? await self.blockStore.getBlock(BlockAddress.init(cid)), err:
trace "Error retrieve manifest block", cid, err = err.msg trace "Error retrieve manifest block", cid, err = err.msg
return failure err return failure err
@ -139,15 +141,21 @@ proc fetchManifest*(
return manifest.success return manifest.success
proc updateExpiry*(node: CodexNodeRef, manifestCid: Cid, expiry: SecondsSince1970): Future[?!void] {.async.} = proc updateExpiry*(
without manifest =? await node.fetchManifest(manifestCid), error: self: CodexNodeRef,
manifestCid: Cid,
expiry: SecondsSince1970): Future[?!void] {.async.} =
without manifest =? await self.fetchManifest(manifestCid), error:
trace "Unable to fetch manifest for cid", manifestCid trace "Unable to fetch manifest for cid", manifestCid
return failure(error) return failure(error)
try: try:
let ensuringFutures = Iter.fromSlice(0..<manifest.blocksCount) let
.mapIt(node.blockStore.ensureExpiry( manifest.treeCid, it, expiry )) ensuringFutures = Iter
await allFuturesThrowing(ensuringFutures) .fromSlice(0..<manifest.blocksCount)
.mapIt(self.blockStore.ensureExpiry( manifest.treeCid, it, expiry ))
await allFuturesThrowing(ensuringFutures)
except CancelledError as exc: except CancelledError as exc:
raise exc raise exc
except CatchableError as exc: except CatchableError as exc:
@ -156,21 +164,20 @@ proc updateExpiry*(node: CodexNodeRef, manifestCid: Cid, expiry: SecondsSince197
return success() return success()
proc fetchBatched*( proc fetchBatched*(
node: CodexNodeRef, self: CodexNodeRef,
manifest: Manifest, cid: Cid,
iter: Iter[int],
batchSize = FetchBatch, batchSize = FetchBatch,
onBatch: BatchProc = nil): Future[?!void] {.async, gcsafe.} = onBatch: BatchProc = nil): Future[?!void] {.async, gcsafe.} =
## Fetch manifest in batches of `batchSize` ## Fetch blocks in batches of `batchSize`
## ##
let batchCount = divUp(manifest.blocksCount, batchSize) let
iter = iter.map(
(i: int) => self.blockStore.getBlock(BlockAddress.init(cid, i))
)
trace "Fetching blocks in batches of", size = batchSize while not iter.finished:
let iter = Iter.fromSlice(0..<manifest.blocksCount)
.map((i: int) => node.blockStore.getBlock(BlockAddress.init(manifest.treeCid, i)))
for batchNum in 0..<batchCount:
let blocks = collect: let blocks = collect:
for i in 0..<batchSize: for i in 0..<batchSize:
if not iter.finished: if not iter.finished:
@ -179,29 +186,43 @@ proc fetchBatched*(
if blocksErr =? (await allFutureResult(blocks)).errorOption: if blocksErr =? (await allFutureResult(blocks)).errorOption:
return failure(blocksErr) return failure(blocksErr)
if not onBatch.isNil and batchErr =? (await onBatch(blocks.mapIt( it.read.get ))).errorOption: if not onBatch.isNil and
batchErr =? (await onBatch(blocks.mapIt( it.read.get ))).errorOption:
return failure(batchErr) return failure(batchErr)
return success() success()
proc fetchBatched*(
self: CodexNodeRef,
manifest: Manifest,
batchSize = FetchBatch,
onBatch: BatchProc = nil): Future[?!void] =
## Fetch manifest in batches of `batchSize`
##
trace "Fetching blocks in batches of", size = batchSize
let iter = Iter.fromSlice(0..<manifest.blocksCount)
self.fetchBatched(manifest.treeCid, iter, batchSize, onBatch)
proc retrieve*( proc retrieve*(
node: CodexNodeRef, self: CodexNodeRef,
cid: Cid, cid: Cid,
local: bool = true): Future[?!LPStream] {.async.} = local: bool = true): Future[?!LPStream] {.async.} =
## Retrieve by Cid a single block or an entire dataset described by manifest ## Retrieve by Cid a single block or an entire dataset described by manifest
## ##
if local and not await (cid in node.blockStore): if local and not await (cid in self.blockStore):
return failure((ref BlockNotFoundError)(msg: "Block not found in local store")) return failure((ref BlockNotFoundError)(msg: "Block not found in local store"))
if manifest =? (await node.fetchManifest(cid)): if manifest =? (await self.fetchManifest(cid)):
trace "Retrieving blocks from manifest", cid trace "Retrieving blocks from manifest", cid
if manifest.protected: if manifest.protected:
# Retrieve, decode and save to the local store all EС groups # Retrieve, decode and save to the local store all EС groups
proc erasureJob(): Future[void] {.async.} = proc erasureJob(): Future[void] {.async.} =
try: try:
# Spawn an erasure decoding job # Spawn an erasure decoding job
without res =? (await node.erasure.decode(manifest)), error: without res =? (await self.erasure.decode(manifest)), error:
trace "Unable to erasure decode manifest", cid, exc = error.msg trace "Unable to erasure decode manifest", cid, exc = error.msg
except CatchableError as exc: except CatchableError as exc:
trace "Exception decoding manifest", cid, exc = exc.msg trace "Exception decoding manifest", cid, exc = exc.msg
@ -210,12 +231,12 @@ proc retrieve*(
# Retrieve all blocks of the dataset sequentially from the local store or network # Retrieve all blocks of the dataset sequentially from the local store or network
trace "Creating store stream for manifest", cid trace "Creating store stream for manifest", cid
LPStream(StoreStream.new(node.blockStore, manifest, pad = false)).success LPStream(StoreStream.new(self.blockStore, manifest, pad = false)).success
else: else:
let let
stream = BufferStream.new() stream = BufferStream.new()
without blk =? (await node.blockStore.getBlock(BlockAddress.init(cid))), err: without blk =? (await self.blockStore.getBlock(BlockAddress.init(cid))), err:
return failure(err) return failure(err)
proc streamOneBlock(): Future[void] {.async.} = proc streamOneBlock(): Future[void] {.async.} =
@ -310,14 +331,14 @@ proc store*(
return manifestBlk.cid.success return manifestBlk.cid.success
proc iterateManifests*(node: CodexNodeRef, onManifest: OnManifest) {.async.} = proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} =
without cids =? await node.blockStore.listBlocks(BlockType.Manifest): without cids =? await self.blockStore.listBlocks(BlockType.Manifest):
warn "Failed to listBlocks" warn "Failed to listBlocks"
return return
for c in cids: for c in cids:
if cid =? await c: if cid =? await c:
without blk =? await node.blockStore.getBlock(cid): without blk =? await self.blockStore.getBlock(cid):
warn "Failed to get manifest block by cid", cid warn "Failed to get manifest block by cid", cid
return return
@ -458,6 +479,179 @@ proc requestStorage*(
let purchase = await contracts.purchasing.purchase(request) let purchase = await contracts.purchasing.purchase(request)
success purchase.id success purchase.id
proc onStore(
self: CodexNodeRef,
request: StorageRequest,
slotIdx: UInt256,
blocksCb: BlocksCb): Future[?!void] {.async.} =
## store data in local storage
##
logScope:
cid = request.content.cid
slotIdx = slotIdx
trace "Received a request to store a slot!"
without cid =? Cid.init(request.content.cid):
trace "Unable to parse Cid", cid
let err = newException(CodexError, "Unable to parse Cid")
return failure(err)
without manifest =? (await self.fetchManifest(cid)), err:
trace "Unable to fetch manifest for cid", cid, err = err.msg
return failure(err)
without builder =? SlotsBuilder.new(self.blockStore, manifest), err:
trace "Unable to create slots builder", err = err.msg
return failure(err)
let
slotIdx = slotIdx.truncate(int)
expiry = request.expiry.toSecondsSince1970
if slotIdx > manifest.slotRoots.high:
trace "Slot index not in manifest", slotIdx
return failure(newException(CodexError, "Slot index not in manifest"))
proc updateExpiry(blocks: seq[bt.Block]): Future[?!void] {.async.} =
trace "Updating expiry for blocks", blocks = blocks.len
let ensureExpiryFutures = blocks.mapIt(self.blockStore.ensureExpiry(it.cid, expiry))
if updateExpiryErr =? (await allFutureResult(ensureExpiryFutures)).errorOption:
return failure(updateExpiryErr)
echo "blocksCb.isNil: ", blocksCb.isNil
if not blocksCb.isNil and err =? (await blocksCb(blocks)).errorOption:
trace "Unable to process blocks", err = err.msg
return failure(err)
return success()
if blksIter =? builder.slotIndicies(slotIdx) and
err =? (await self.fetchBatched(manifest.treeCid, blksIter, onBatch = updateExpiry)).errorOption:
trace "Unable to fetch blocks", err = err.msg
return failure(err)
without slotRoot =? (await builder.buildSlot(slotIdx.Natural)), err:
trace "Unable to build slot", err = err.msg
return failure(err)
if cid =? slotRoot.toSlotCid() and cid != manifest.slotRoots[slotIdx.int]:
trace "Slot root mismatch", manifest = manifest.slotRoots[slotIdx.int], recovered = slotRoot.toSlotCid()
return failure(newException(CodexError, "Slot root mismatch"))
return success()
proc onExpiryUpdate(
self: CodexNodeRef,
rootCid: string,
expiry: SecondsSince1970): Future[?!void] {.async.} =
without cid =? Cid.init(rootCid):
trace "Unable to parse Cid", cid
let error = newException(CodexError, "Unable to parse Cid")
return failure(error)
return await self.updateExpiry(cid, expiry)
proc onClear(
self: CodexNodeRef,
request: StorageRequest,
slotIndex: UInt256) =
# TODO: remove data from local storage
discard
proc onProve(
self: CodexNodeRef,
slot: Slot,
challenge: ProofChallenge): Future[seq[byte]] {.async.} =
# TODO: generate proof
return @[42'u8]
proc start*(self: CodexNodeRef) {.async.} =
if not self.engine.isNil:
await self.engine.start()
if not self.erasure.isNil:
await self.erasure.start()
if not self.discovery.isNil:
await self.discovery.start()
if not self.clock.isNil:
await self.clock.start()
if hostContracts =? self.contracts.host:
hostContracts.sales.onStore =
proc(
request: StorageRequest,
slot: UInt256,
onBatch: BatchProc): Future[?!void] = self.onStore(request, slot, onBatch)
hostContracts.sales.onExpiryUpdate =
proc(rootCid: string, expiry: SecondsSince1970): Future[?!void] =
self.onExpiryUpdate(rootCid, expiry)
hostContracts.sales.onClear =
proc(request: StorageRequest, slotIndex: UInt256) =
# TODO: remove data from local storage
self.onClear(request, slotIndex)
hostContracts.sales.onProve =
proc(slot: Slot, challenge: ProofChallenge): Future[seq[byte]] =
# TODO: generate proof
self.onProve(slot, challenge)
try:
await hostContracts.start()
except CatchableError as error:
error "Unable to start host contract interactions", error=error.msg
self.contracts.host = HostInteractions.none
if clientContracts =? self.contracts.client:
try:
await clientContracts.start()
except CatchableError as error:
error "Unable to start client contract interactions: ", error=error.msg
self.contracts.client = ClientInteractions.none
if validatorContracts =? self.contracts.validator:
try:
await validatorContracts.start()
except CatchableError as error:
error "Unable to start validator contract interactions: ", error=error.msg
self.contracts.validator = ValidatorInteractions.none
self.networkId = self.switch.peerInfo.peerId
notice "Started codex node", id = $self.networkId, addrs = self.switch.peerInfo.addrs
proc stop*(self: CodexNodeRef) {.async.} =
trace "Stopping node"
if not self.engine.isNil:
await self.engine.stop()
if not self.erasure.isNil:
await self.erasure.stop()
if not self.discovery.isNil:
await self.discovery.stop()
if not self.clock.isNil:
await self.clock.stop()
if clientContracts =? self.contracts.client:
await clientContracts.stop()
if hostContracts =? self.contracts.host:
await hostContracts.stop()
if validatorContracts =? self.contracts.validator:
await validatorContracts.stop()
if not self.blockStore.isNil:
await self.blockStore.close
proc new*( proc new*(
T: type CodexNodeRef, T: type CodexNodeRef,
switch: Switch, switch: Switch,
@ -466,8 +660,9 @@ proc new*(
erasure: Erasure, erasure: Erasure,
discovery: Discovery, discovery: Discovery,
contracts = Contracts.default): CodexNodeRef = contracts = Contracts.default): CodexNodeRef =
## Create new instance of a Codex node, call `start` to run it ## Create new instance of a Codex self, call `start` to run it
## ##
CodexNodeRef( CodexNodeRef(
switch: switch, switch: switch,
blockStore: store, blockStore: store,
@ -475,119 +670,3 @@ proc new*(
erasure: erasure, erasure: erasure,
discovery: discovery, discovery: discovery,
contracts: contracts) contracts: contracts)
proc start*(node: CodexNodeRef) {.async.} =
if not node.engine.isNil:
await node.engine.start()
if not node.erasure.isNil:
await node.erasure.start()
if not node.discovery.isNil:
await node.discovery.start()
if not node.clock.isNil:
await node.clock.start()
if hostContracts =? node.contracts.host:
# TODO: remove Sales callbacks, pass BlockStore and StorageProofs instead
hostContracts.sales.onStore = proc(request: StorageRequest,
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
## store data in local storage
##
without cid =? Cid.init(request.content.cid):
trace "Unable to parse Cid", cid
let error = newException(CodexError, "Unable to parse Cid")
return failure(error)
without manifest =? await node.fetchManifest(cid), error:
trace "Unable to fetch manifest for cid", cid
return failure(error)
trace "Fetching block for manifest", cid
let expiry = request.expiry.toSecondsSince1970
proc expiryUpdateOnBatch(blocks: seq[bt.Block]): Future[?!void] {.async.} =
let ensureExpiryFutures = blocks.mapIt(node.blockStore.ensureExpiry(it.cid, expiry))
if updateExpiryErr =? (await allFutureResult(ensureExpiryFutures)).errorOption:
return failure(updateExpiryErr)
if not onBatch.isNil and onBatchErr =? (await onBatch(blocks)).errorOption:
return failure(onBatchErr)
return success()
if fetchErr =? (await node.fetchBatched(manifest, onBatch = expiryUpdateOnBatch)).errorOption:
let error = newException(CodexError, "Unable to retrieve blocks")
error.parent = fetchErr
return failure(error)
return success()
hostContracts.sales.onExpiryUpdate = proc(rootCid: string, expiry: SecondsSince1970): Future[?!void] {.async.} =
without cid =? Cid.init(rootCid):
trace "Unable to parse Cid", cid
let error = newException(CodexError, "Unable to parse Cid")
return failure(error)
return await node.updateExpiry(cid, expiry)
hostContracts.sales.onClear = proc(request: StorageRequest,
slotIndex: UInt256) =
# TODO: remove data from local storage
discard
hostContracts.sales.onProve = proc(slot: Slot, challenge: ProofChallenge): Future[seq[byte]] {.async.} =
# TODO: generate proof
return @[42'u8]
try:
await hostContracts.start()
except CatchableError as error:
error "Unable to start host contract interactions", error=error.msg
node.contracts.host = HostInteractions.none
if clientContracts =? node.contracts.client:
try:
await clientContracts.start()
except CatchableError as error:
error "Unable to start client contract interactions: ", error=error.msg
node.contracts.client = ClientInteractions.none
if validatorContracts =? node.contracts.validator:
try:
await validatorContracts.start()
except CatchableError as error:
error "Unable to start validator contract interactions: ", error=error.msg
node.contracts.validator = ValidatorInteractions.none
node.networkId = node.switch.peerInfo.peerId
notice "Started codex node", id = $node.networkId, addrs = node.switch.peerInfo.addrs
proc stop*(node: CodexNodeRef) {.async.} =
trace "Stopping node"
if not node.engine.isNil:
await node.engine.stop()
if not node.erasure.isNil:
await node.erasure.stop()
if not node.discovery.isNil:
await node.discovery.stop()
if not node.clock.isNil:
await node.clock.stop()
if clientContracts =? node.contracts.client:
await clientContracts.stop()
if hostContracts =? node.contracts.host:
await hostContracts.stop()
if validatorContracts =? node.contracts.validator:
await validatorContracts.stop()
if not node.blockStore.isNil:
await node.blockStore.close

View File

@ -1,7 +0,0 @@
import pkg/chronos
import pkg/questionable/results
import pkg/upraises
import ../blocktype as bt
type
BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, upraises:[].}

View File

@ -40,6 +40,7 @@ import ./utils/trackedfutures
export stint export stint
export reservations export reservations
export salesagent export salesagent
export salescontext
logScope: logScope:
topics = "sales marketplace" topics = "sales marketplace"

View File

@ -1,11 +1,12 @@
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import pkg/upraises import pkg/upraises
import ../node/batch
import ../market import ../market
import ../clock import ../clock
import ./slotqueue import ./slotqueue
import ./reservations import ./reservations
import ../blocktype as bt
type type
SalesContext* = ref object SalesContext* = ref object
@ -22,9 +23,10 @@ type
slotQueue*: SlotQueue slotQueue*: SlotQueue
simulateProofFailures*: int simulateProofFailures*: int
BlocksCb* = proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, raises: [].}
OnStore* = proc(request: StorageRequest, OnStore* = proc(request: StorageRequest,
slot: UInt256, slot: UInt256,
onBatch: BatchProc): Future[?!void] {.gcsafe, upraises: [].} blocksCb: BlocksCb): Future[?!void] {.gcsafe, upraises: [].}
OnProve* = proc(slot: Slot, challenge: ProofChallenge): Future[seq[byte]] {.gcsafe, upraises: [].} OnProve* = proc(slot: Slot, challenge: ProofChallenge): Future[seq[byte]] {.gcsafe, upraises: [].}
OnExpiryUpdate* = proc(rootCid: string, expiry: SecondsSince1970): Future[?!void] {.gcsafe, upraises: [].} OnExpiryUpdate* = proc(rootCid: string, expiry: SecondsSince1970): Future[?!void] {.gcsafe, upraises: [].}
OnClear* = proc(request: StorageRequest, OnClear* = proc(request: StorageRequest,

View File

@ -51,7 +51,7 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
reservationId = reservation.id reservationId = reservation.id
availabilityId = reservation.availabilityId availabilityId = reservation.availabilityId
proc onBatch(blocks: seq[bt.Block]): Future[?!void] {.async.} = proc onBlocks(blocks: seq[bt.Block]): Future[?!void] {.async.} =
# release batches of blocks as they are written to disk and # release batches of blocks as they are written to disk and
# update availability size # update availability size
var bytes: uint = 0 var bytes: uint = 0
@ -66,7 +66,7 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
trace "Starting download" trace "Starting download"
if err =? (await onStore(request, if err =? (await onStore(request,
data.slotIndex, data.slotIndex,
onBatch)).errorOption: onBlocks)).errorOption:
return some State(SaleErrored(error: err)) return some State(SaleErrored(error: err))
trace "Download complete" trace "Download complete"

View File

@ -26,9 +26,14 @@ import ../merkletree
import ../stores import ../stores
import ../manifest import ../manifest
import ../utils import ../utils
import ../utils/asynciter
import ../utils/digest import ../utils/digest
import ../utils/poseidon2digest import ../utils/poseidon2digest
import ./converters
export converters
const const
# TODO: Unified with the CellSize specified in branch "data-sampler" # TODO: Unified with the CellSize specified in branch "data-sampler"
# in the proving circuit. # in the proving circuit.
@ -116,54 +121,12 @@ func numBlockCells*(self: SlotsBuilder): Natural =
self.manifest.blockSize.int div self.cellSize self.manifest.blockSize.int div self.cellSize
func toCellCid*(cell: Poseidon2Hash): ?!Cid = func slotIndicies*(self: SlotsBuilder, slot: Natural): ?!Iter[int] =
let ## Returns the slot indices.
cellMhash = ? MultiHash.init(Pos2Bn128MrklCodec, cell.toBytes).mapFailure ## TODO: should return an iterator
cellCid = ? Cid.init(CIDv1, CodexSlotCellCodec, cellMhash).mapFailure ##
success cellCid self.strategy.getIndicies(slot).catch
func toSlotCid*(root: Poseidon2Hash): ?!Cid =
let
mhash = ? MultiHash.init($multiCodec("identity"), root.toBytes).mapFailure
treeCid = ? Cid.init(CIDv1, SlotRootCodec, mhash).mapFailure
success treeCid
func toSlotCids*(slotRoots: openArray[Poseidon2Hash]): ?!seq[Cid] =
success slotRoots.mapIt( ? it.toSlotCid )
func toSlotsRootsCid*(root: Poseidon2Hash): ?!Cid =
let
mhash = ? MultiHash.init($multiCodec("identity"), root.toBytes).mapFailure
treeCid = ? Cid.init(CIDv1, SlotProvingRootCodec, mhash).mapFailure
success treeCid
func toEncodableProof*(
proof: Poseidon2Proof): ?!CodexProof =
let
encodableProof = CodexProof(
mcodec: multiCodec("identity"), # copy bytes as is
index: proof.index,
nleaves: proof.nleaves,
path: proof.path.mapIt( @(it.toBytes) ))
success encodableProof
func toVerifiableProof*(
proof: CodexProof): ?!Poseidon2Proof =
let
verifiableProof = Poseidon2Proof(
index: proof.index,
nleaves: proof.nleaves,
path: proof.path.mapIt(
? Poseidon2Hash.fromBytes(it.toArray32).toFailure
))
success verifiableProof
proc getCellHashes*( proc getCellHashes*(
self: SlotsBuilder, self: SlotsBuilder,
@ -212,7 +175,7 @@ proc buildSlotTree*(
proc buildSlot*( proc buildSlot*(
self: SlotsBuilder, self: SlotsBuilder,
slotIndex: int): Future[?!Poseidon2Hash] {.async.} = slotIndex: Natural): Future[?!Poseidon2Hash] {.async.} =
## Build a slot tree and store it in the block store. ## Build a slot tree and store it in the block store.
## ##
@ -313,8 +276,8 @@ proc new*(
let let
strategy = if strategy.isNone: strategy = if strategy.isNone:
SteppedIndexingStrategy.new( ? SteppedIndexingStrategy.new(
0, manifest.blocksCount - 1, manifest.numSlots) 0, manifest.blocksCount - 1, manifest.numSlots).catch
else: else:
strategy.get strategy.get
@ -338,7 +301,7 @@ proc new*(
if manifest.verifiable: if manifest.verifiable:
if manifest.slotRoots.len == 0 or manifest.slotRoots.len != manifest.numSlots: if manifest.slotRoots.len == 0 or manifest.slotRoots.len != manifest.numSlots:
return failure "Manifest is verifiable but slot r" return failure "Manifest is verifiable but slot roots are missing or invalid."
let let
slotRoot = ? Poseidon2Hash.fromBytes( slotRoot = ? Poseidon2Hash.fromBytes(

View File

@ -0,0 +1,69 @@
## Nim-Codex
## Copyright (c) 2024 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/sequtils
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import pkg/poseidon2
import pkg/poseidon2/io
import ../codextypes
import ../merkletree
import ../errors
func toCellCid*(cell: Poseidon2Hash): ?!Cid =
let
cellMhash = ? MultiHash.init(Pos2Bn128MrklCodec, cell.toBytes).mapFailure
cellCid = ? Cid.init(CIDv1, CodexSlotCellCodec, cellMhash).mapFailure
success cellCid
func toSlotCid*(root: Poseidon2Hash): ?!Cid =
let
mhash = ? MultiHash.init($multiCodec("identity"), root.toBytes).mapFailure
treeCid = ? Cid.init(CIDv1, SlotRootCodec, mhash).mapFailure
success treeCid
func toSlotCids*(slotRoots: openArray[Poseidon2Hash]): ?!seq[Cid] =
success slotRoots.mapIt( ? it.toSlotCid )
func toSlotsRootsCid*(root: Poseidon2Hash): ?!Cid =
let
mhash = ? MultiHash.init($multiCodec("identity"), root.toBytes).mapFailure
treeCid = ? Cid.init(CIDv1, SlotProvingRootCodec, mhash).mapFailure
success treeCid
func toEncodableProof*(
proof: Poseidon2Proof): ?!CodexProof =
let
encodableProof = CodexProof(
mcodec: multiCodec("identity"), # copy bytes as is
index: proof.index,
nleaves: proof.nleaves,
path: proof.path.mapIt( @( it.toBytes ) ))
success encodableProof
func toVerifiableProof*(
proof: CodexProof): ?!Poseidon2Proof =
let
verifiableProof = Poseidon2Proof(
index: proof.index,
nleaves: proof.nleaves,
path: proof.path.mapIt(
? Poseidon2Hash.fromBytes(it.toArray32).toFailure
))
success verifiableProof

View File

@ -1,6 +0,0 @@
import ./storageproofs/por
import ./storageproofs/stpstore
import ./storageproofs/stpnetwork
import ./storageproofs/stpproto
export por, stpstore, stpnetwork, stpproto

View File

@ -540,8 +540,13 @@ method close*(self: RepoStore): Future[void] {.async.} =
## For some implementations this may be a no-op ## For some implementations this may be a no-op
## ##
(await self.metaDs.close()).expect("Should meta datastore") trace "Closing repostore"
(await self.repoDs.close()).expect("Should repo datastore")
if not self.metaDs.isNil:
(await self.metaDs.close()).expect("Should meta datastore")
if not self.repoDs.isNil:
(await self.repoDs.close()).expect("Should repo datastore")
proc reserve*(self: RepoStore, bytes: uint): Future[?!void] {.async.} = proc reserve*(self: RepoStore, bytes: uint): Future[?!void] {.async.} =
## Reserve bytes ## Reserve bytes
@ -650,8 +655,7 @@ proc stop*(self: RepoStore): Future[void] {.async.} =
return return
trace "Stopping repo" trace "Stopping repo"
(await self.repoDs.close()).expect("Should close repo store!") await self.close()
(await self.metaDs.close()).expect("Should close meta store!")
self.started = false self.started = false

View File

@ -0,0 +1,101 @@
import std/times
import pkg/libp2p
import pkg/chronos
import pkg/asynctest
import pkg/codex/codextypes
import pkg/codex/chunker
proc toTimesDuration*(d: chronos.Duration): times.Duration =
initDuration(seconds = d.seconds)
proc drain*(
stream: LPStream | Result[lpstream.LPStream, ref CatchableError]):
Future[seq[byte]] {.async.} =
let
stream =
when typeof(stream) is Result[lpstream.LPStream, ref CatchableError]:
stream.tryGet()
else:
stream
defer:
await stream.close()
var data: seq[byte]
while not stream.atEof:
var
buf = newSeq[byte](DefaultBlockSize.int)
res = await stream.readOnce(addr buf[0], DefaultBlockSize.int)
check res <= DefaultBlockSize.int
buf.setLen(res)
data &= buf
data
proc pipeChunker*(stream: BufferStream, chunker: Chunker) {.async.} =
try:
while (
let chunk = await chunker.getBytes();
chunk.len > 0):
await stream.pushData(chunk)
finally:
await stream.pushEof()
await stream.close()
template setupAndTearDown*() {.dirty.} =
var
file: File
chunker: Chunker
switch: Switch
wallet: WalletRef
network: BlockExcNetwork
clock: Clock
localStore: RepoStore
localStoreRepoDs: DataStore
localStoreMetaDs: DataStore
engine: BlockExcEngine
store: NetworkStore
node: CodexNodeRef
blockDiscovery: Discovery
peerStore: PeerCtxStore
pendingBlocks: PendingBlocksManager
discovery: DiscoveryEngine
erasure: Erasure
let
path = currentSourcePath().parentDir
setup:
file = open(path /../ "" /../ "fixtures" / "test.jpg")
chunker = FileChunker.new(file = file, chunkSize = DefaultBlockSize)
switch = newStandardSwitch()
wallet = WalletRef.new(EthPrivateKey.random())
network = BlockExcNetwork.new(switch)
clock = SystemClock.new()
localStoreMetaDs = SQLiteDatastore.new(Memory).tryGet()
localStoreRepoDs = SQLiteDatastore.new(Memory).tryGet()
localStore = RepoStore.new(localStoreRepoDs, localStoreMetaDs, clock = clock)
await localStore.start()
blockDiscovery = Discovery.new(
switch.peerInfo.privateKey,
announceAddrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/0")
.expect("Should return multiaddress")])
peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new()
discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks)
engine = BlockExcEngine.new(localStore, wallet, network, discovery, peerStore, pendingBlocks)
store = NetworkStore.new(engine, localStore)
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
node = CodexNodeRef.new(switch, store, engine, erasure, blockDiscovery)
await node.start()
teardown:
close(file)
await node.stop()

View File

@ -0,0 +1,138 @@
import std/os
import std/options
import std/math
import std/times
import std/sequtils
import std/importutils
import pkg/asynctest
import pkg/chronos
import pkg/chronicles
import pkg/stew/byteutils
import pkg/datastore
import pkg/questionable
import pkg/questionable/results
import pkg/stint
import pkg/poseidon2
import pkg/poseidon2/io
import pkg/nitro
import pkg/codexdht/discv5/protocol as discv5
import pkg/codex/stores
import pkg/codex/clock
import pkg/codex/contracts
import pkg/codex/systemclock
import pkg/codex/blockexchange
import pkg/codex/chunker
import pkg/codex/slots
import pkg/codex/manifest
import pkg/codex/discovery
import pkg/codex/erasure
import pkg/codex/merkletree
import pkg/codex/blocktype as bt
import pkg/codex/utils/asynciter
import pkg/codex/node {.all.}
import ../../examples
import ../helpers
import ../helpers/mockmarket
import ../helpers/mockclock
import ./helpers
privateAccess(CodexNodeRef) # enable access to private fields
asyncchecksuite "Test Node - Host contracts":
setupAndTearDown()
var
sales: Sales
purchasing: Purchasing
manifest: Manifest
manifestCidStr: string
manifestCid: Cid
market: MockMarket
builder: SlotsBuilder
verifiable: Manifest
verifiableBlock: bt.Block
protected: Manifest
setup:
# Setup Host Contracts and dependencies
market = MockMarket.new()
sales = Sales.new(market, clock, localStore)
node.contracts = (
none ClientInteractions,
some HostInteractions.new(clock, sales),
none ValidatorInteractions)
await node.start()
# Populate manifest in local store
manifest = await storeDataGetManifest(localStore, chunker)
let
manifestBlock = bt.Block.new(
manifest.encode().tryGet(),
codec = ManifestCodec).tryGet()
manifestCid = manifestBlock.cid
manifestCidStr = $(manifestCid)
(await localStore.putBlock(manifestBlock)).tryGet()
protected = (await erasure.encode(manifest, 3, 2)).tryGet()
builder = SlotsBuilder.new(localStore, protected).tryGet()
verifiable = (await builder.buildManifest()).tryGet()
verifiableBlock = bt.Block.new(
verifiable.encode().tryGet(),
codec = ManifestCodec).tryGet()
(await localStore.putBlock(verifiableBlock)).tryGet()
test "onExpiryUpdate callback is set":
check sales.onExpiryUpdate.isSome
test "onExpiryUpdate callback":
let
# The blocks have set default TTL, so in order to update it we have to have larger TTL
expectedExpiry: SecondsSince1970 = clock.now + DefaultBlockTtl.seconds + 11123
expiryUpdateCallback = !sales.onExpiryUpdate
(await expiryUpdateCallback(manifestCidStr, expectedExpiry)).tryGet()
for index in 0..<manifest.blocksCount:
let
blk = (await localStore.getBlock(manifest.treeCid, index)).tryGet
expiryKey = (createBlockExpirationMetadataKey(blk.cid)).tryGet
expiry = await localStoreMetaDs.get(expiryKey)
check (expiry.tryGet).toSecondsSince1970 == expectedExpiry
test "onStore callback is set":
check sales.onStore.isSome
test "onStore callback":
let onStore = !sales.onStore
var request = StorageRequest.example
request.content.cid = $verifiableBlock.cid
request.expiry = (getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix.u256
var fetchedBytes: uint = 0
let onBlocks = proc(blocks: seq[bt.Block]): Future[?!void] {.async.} =
for blk in blocks:
fetchedBytes += blk.data.len.uint
return success()
(await onStore(request, 1.u256, onBlocks)).tryGet()
check fetchedBytes == 786432
for index in !builder.slotIndicies(1):
let
blk = (await localStore.getBlock(verifiable.treeCid, index)).tryGet
expiryKey = (createBlockExpirationMetadataKey(blk.cid)).tryGet
expiry = await localStoreMetaDs.get(expiryKey)
check (expiry.tryGet).toSecondsSince1970 == request.expiry.toSecondsSince1970

View File

@ -0,0 +1,151 @@
import std/os
import std/options
import std/math
import std/times
import std/sequtils
import std/importutils
import pkg/asynctest
import pkg/chronos
import pkg/chronicles
import pkg/stew/byteutils
import pkg/datastore
import pkg/questionable
import pkg/questionable/results
import pkg/stint
import pkg/poseidon2
import pkg/poseidon2/io
import pkg/nitro
import pkg/codexdht/discv5/protocol as discv5
import pkg/codex/stores
import pkg/codex/clock
import pkg/codex/contracts
import pkg/codex/systemclock
import pkg/codex/blockexchange
import pkg/codex/chunker
import pkg/codex/slots
import pkg/codex/manifest
import pkg/codex/discovery
import pkg/codex/erasure
import pkg/codex/merkletree
import pkg/codex/blocktype as bt
import pkg/codex/node {.all.}
import ../examples
import ../helpers
import ../helpers/mockmarket
import ../helpers/mockclock
import ./helpers
privateAccess(CodexNodeRef) # enable access to private fields
asyncchecksuite "Test Node - Basic":
setupAndTearDown()
test "Fetch Manifest":
let
manifest = await storeDataGetManifest(localStore, chunker)
manifestBlock = bt.Block.new(
manifest.encode().tryGet(),
codec = ManifestCodec).tryGet()
(await localStore.putBlock(manifestBlock)).tryGet()
let
fetched = (await node.fetchManifest(manifestBlock.cid)).tryGet()
check:
fetched == manifest
test "Block Batching":
let manifest = await storeDataGetManifest(localStore, chunker)
for batchSize in 1..12:
(await node.fetchBatched(
manifest,
batchSize = batchSize,
proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, async.} =
check blocks.len > 0 and blocks.len <= batchSize
return success()
)).tryGet()
test "Store and retrieve Data Stream":
let
stream = BufferStream.new()
storeFut = node.store(stream)
oddChunkSize = math.trunc(DefaultBlockSize.float / 3.14).NBytes # Let's check that node.store can correctly rechunk these odd chunks
oddChunker = FileChunker.new(file = file, chunkSize = oddChunkSize, pad = false) # TODO: doesn't work with pad=tue
var
original: seq[byte]
try:
while (
let chunk = await oddChunker.getBytes();
chunk.len > 0):
original &= chunk
await stream.pushData(chunk)
finally:
await stream.pushEof()
await stream.close()
let
manifestCid = (await storeFut).tryGet()
manifestBlock = (await localStore.getBlock(manifestCid)).tryGet()
localManifest = Manifest.decode(manifestBlock).tryGet()
data = await (await node.retrieve(manifestCid)).drain()
check:
data.len == localManifest.datasetSize.int
data.len == original.len
sha256.digest(data) == sha256.digest(original)
test "Retrieve One Block":
let
testString = "Block 1"
blk = bt.Block.new(testString.toBytes).tryGet()
(await localStore.putBlock(blk)).tryGet()
let stream = (await node.retrieve(blk.cid)).tryGet()
defer: await stream.close()
var data = newSeq[byte](testString.len)
await stream.readExactly(addr data[0], data.len)
check string.fromBytes(data) == testString
test "Setup purchase request":
let
manifest = await storeDataGetManifest(localStore, chunker)
manifestBlock = bt.Block.new(
manifest.encode().tryGet(),
codec = ManifestCodec).tryGet()
protected = (await erasure.encode(manifest, 3, 2)).tryGet()
builder = SlotsBuilder.new(localStore, protected).tryGet()
verifiable = (await builder.buildManifest()).tryGet()
verifiableBlock = bt.Block.new(
verifiable.encode().tryGet(),
codec = ManifestCodec).tryGet()
(await localStore.putBlock(manifestBlock)).tryGet()
let
request = (await node.setupRequest(
cid = manifestBlock.cid,
nodes = 5,
tolerance = 2,
duration = 100.u256,
reward = 2.u256,
proofProbability = 3.u256,
expiry = 200.u256,
collateral = 200.u256)).tryGet
check:
(await verifiableBlock.cid in localStore) == true
request.content.cid == $verifiableBlock.cid
request.content.merkleRoot == builder.verifyRoot.get.toBytes

View File

@ -2,6 +2,8 @@ import std/sequtils
import pkg/chronos import pkg/chronos
import pkg/asynctest import pkg/asynctest
import pkg/codex/utils/asynciter
import ./helpers import ./helpers
import pkg/codex/indexingstrategy import pkg/codex/indexingstrategy
@ -17,15 +19,15 @@ for offset in @[0, 1, 2, 100]:
test "linear": test "linear":
check: check:
linear.getIndicies(0) == @[0, 1, 2, 3, 4].mapIt(it + offset) toSeq(linear.getIndicies(0)) == @[0, 1, 2, 3, 4].mapIt(it + offset)
linear.getIndicies(1) == @[5, 6, 7, 8, 9].mapIt(it + offset) toSeq(linear.getIndicies(1)) == @[5, 6, 7, 8, 9].mapIt(it + offset)
linear.getIndicies(2) == @[10, 11, 12].mapIt(it + offset) toSeq(linear.getIndicies(2)) == @[10, 11, 12].mapIt(it + offset)
test "stepped": test "stepped":
check: check:
stepped.getIndicies(0) == @[0, 3, 6, 9, 12].mapIt(it + offset) toSeq(stepped.getIndicies(0)) == @[0, 3, 6, 9, 12].mapIt(it + offset)
stepped.getIndicies(1) == @[1, 4, 7, 10].mapIt(it + offset) toSeq(stepped.getIndicies(1)) == @[1, 4, 7, 10].mapIt(it + offset)
stepped.getIndicies(2) == @[2, 5, 8, 11].mapIt(it + offset) toSeq(stepped.getIndicies(2)) == @[2, 5, 8, 11].mapIt(it + offset)
suite "Indexing strategies": suite "Indexing strategies":
let let
@ -37,29 +39,29 @@ suite "Indexing strategies":
l = LinearIndexingStrategy.new(0, 0, 1) l = LinearIndexingStrategy.new(0, 0, 1)
s = SteppedIndexingStrategy.new(0, 0, 1) s = SteppedIndexingStrategy.new(0, 0, 1)
check: check:
l.getIndicies(0) == @[0] toSeq(l.getIndicies(0)) == @[0]
s.getIndicies(0) == @[0] toSeq(s.getIndicies(0)) == @[0]
test "smallest range 1": test "smallest range 1":
let let
l = LinearIndexingStrategy.new(0, 1, 1) l = LinearIndexingStrategy.new(0, 1, 1)
s = SteppedIndexingStrategy.new(0, 1, 1) s = SteppedIndexingStrategy.new(0, 1, 1)
check: check:
l.getIndicies(0) == @[0, 1] toSeq(l.getIndicies(0)) == @[0, 1]
s.getIndicies(0) == @[0, 1] toSeq(s.getIndicies(0)) == @[0, 1]
test "first index must be smaller than last index": test "first index must be smaller than last index":
expect AssertionDefect: expect IndexingWrongIndexError:
discard LinearIndexingStrategy.new(10, 0, 1) discard LinearIndexingStrategy.new(10, 0, 1)
test "numberOfIterations must be greater than zero": test "numberOfIterations must be greater than zero":
expect AssertionDefect: expect IndexingWrongIterationsError:
discard LinearIndexingStrategy.new(0, 10, 0) discard LinearIndexingStrategy.new(0, 10, 0)
test "linear - oob": test "linear - oob":
expect AssertionDefect: expect IndexingError:
discard linear.getIndicies(3) discard linear.getIndicies(3)
test "stepped - oob": test "stepped - oob":
expect AssertionDefect: expect IndexingError:
discard stepped.getIndicies(3) discard stepped.getIndicies(3)

View File

@ -1,321 +1,4 @@
import std/os import ./node/testnode
import std/options import ./node/testcontracts
import std/math
import std/times
import std/sequtils
import std/importutils
import pkg/asynctest {.warning[UnusedImport]: off.}
import pkg/chronos
import pkg/chronicles
import pkg/stew/byteutils
import pkg/datastore
import pkg/questionable
import pkg/questionable/results
import pkg/stint
import pkg/poseidon2
import pkg/poseidon2/io
import pkg/nitro
import pkg/codexdht/discv5/protocol as discv5
import pkg/codex/stores
import pkg/codex/clock
import pkg/codex/contracts
import pkg/codex/systemclock
import pkg/codex/blockexchange
import pkg/codex/chunker
import pkg/codex/slots
import pkg/codex/manifest
import pkg/codex/discovery
import pkg/codex/erasure
import pkg/codex/merkletree
import pkg/codex/blocktype as bt
import pkg/codex/node {.all.}
import ../examples
import ./helpers
import ./helpers/mockmarket
import ./helpers/mockclock
privateAccess(CodexNodeRef) # enable access to private fields
proc toTimesDuration(d: chronos.Duration): times.Duration =
initDuration(seconds = d.seconds)
proc drain(
stream: LPStream | Result[lpstream.LPStream, ref CatchableError]):
Future[seq[byte]] {.async.} =
let
stream =
when typeof(stream) is Result[lpstream.LPStream, ref CatchableError]:
stream.tryGet()
else:
stream
defer:
await stream.close()
var data: seq[byte]
while not stream.atEof:
var
buf = newSeq[byte](DefaultBlockSize.int)
res = await stream.readOnce(addr buf[0], DefaultBlockSize.int)
check res <= DefaultBlockSize.int
buf.setLen(res)
data &= buf
data
proc pipeChunker(stream: BufferStream, chunker: Chunker) {.async.} =
try:
while (
let chunk = await chunker.getBytes();
chunk.len > 0):
await stream.pushData(chunk)
finally:
await stream.pushEof()
await stream.close()
template setupAndTearDown() {.dirty.} =
var
file: File
chunker: Chunker
switch: Switch
wallet: WalletRef
network: BlockExcNetwork
clock: Clock
localStore: RepoStore
localStoreRepoDs: DataStore
localStoreMetaDs: DataStore
engine: BlockExcEngine
store: NetworkStore
node: CodexNodeRef
blockDiscovery: Discovery
peerStore: PeerCtxStore
pendingBlocks: PendingBlocksManager
discovery: DiscoveryEngine
erasure: Erasure
let
path = currentSourcePath().parentDir
setup:
file = open(path /../ "fixtures" / "test.jpg")
chunker = FileChunker.new(file = file, chunkSize = DefaultBlockSize)
switch = newStandardSwitch()
wallet = WalletRef.new(EthPrivateKey.random())
network = BlockExcNetwork.new(switch)
clock = SystemClock.new()
localStoreMetaDs = SQLiteDatastore.new(Memory).tryGet()
localStoreRepoDs = SQLiteDatastore.new(Memory).tryGet()
localStore = RepoStore.new(localStoreRepoDs, localStoreMetaDs, clock = clock)
await localStore.start()
blockDiscovery = Discovery.new(
switch.peerInfo.privateKey,
announceAddrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/0")
.expect("Should return multiaddress")])
peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new()
discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks)
engine = BlockExcEngine.new(localStore, wallet, network, discovery, peerStore, pendingBlocks)
store = NetworkStore.new(engine, localStore)
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
node = CodexNodeRef.new(switch, store, engine, erasure, blockDiscovery)
await node.start()
teardown:
close(file)
await node.stop()
asyncchecksuite "Test Node - Basic":
setupAndTearDown()
test "Fetch Manifest":
let
manifest = await storeDataGetManifest(localStore, chunker)
manifestBlock = bt.Block.new(
manifest.encode().tryGet(),
codec = ManifestCodec).tryGet()
(await localStore.putBlock(manifestBlock)).tryGet()
let
fetched = (await node.fetchManifest(manifestBlock.cid)).tryGet()
check:
fetched == manifest
test "Block Batching":
let manifest = await storeDataGetManifest(localStore, chunker)
for batchSize in 1..12:
(await node.fetchBatched(
manifest,
batchSize = batchSize,
proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, async.} =
check blocks.len > 0 and blocks.len <= batchSize
return success()
)).tryGet()
test "Store and retrieve Data Stream":
let
stream = BufferStream.new()
storeFut = node.store(stream)
oddChunkSize = math.trunc(DefaultBlockSize.float / 3.14).NBytes # Let's check that node.store can correctly rechunk these odd chunks
oddChunker = FileChunker.new(file = file, chunkSize = oddChunkSize, pad = false) # TODO: doesn't work with pad=tue
var
original: seq[byte]
try:
while (
let chunk = await oddChunker.getBytes();
chunk.len > 0):
original &= chunk
await stream.pushData(chunk)
finally:
await stream.pushEof()
await stream.close()
let
manifestCid = (await storeFut).tryGet()
manifestBlock = (await localStore.getBlock(manifestCid)).tryGet()
localManifest = Manifest.decode(manifestBlock).tryGet()
data = await (await node.retrieve(manifestCid)).drain()
check:
data.len == localManifest.datasetSize.int
data.len == original.len
sha256.digest(data) == sha256.digest(original)
test "Retrieve One Block":
let
testString = "Block 1"
blk = bt.Block.new(testString.toBytes).tryGet()
(await localStore.putBlock(blk)).tryGet()
let stream = (await node.retrieve(blk.cid)).tryGet()
defer: await stream.close()
var data = newSeq[byte](testString.len)
await stream.readExactly(addr data[0], data.len)
check string.fromBytes(data) == testString
test "Setup purchase request":
let
manifest = await storeDataGetManifest(localStore, chunker)
manifestBlock = bt.Block.new(
manifest.encode().tryGet(),
codec = ManifestCodec).tryGet()
protected = (await erasure.encode(manifest, 3, 2)).tryGet()
builder = SlotsBuilder.new(localStore, protected).tryGet()
verifiable = (await builder.buildManifest()).tryGet()
verifiableBlock = bt.Block.new(
verifiable.encode().tryGet(),
codec = ManifestCodec).tryGet()
(await localStore.putBlock(manifestBlock)).tryGet()
let
request = (await node.setupRequest(
cid = manifestBlock.cid,
nodes = 5,
tolerance = 2,
duration = 100.u256,
reward = 2.u256,
proofProbability = 3.u256,
expiry = 200.u256,
collateral = 200.u256)).tryGet
check:
(await verifiableBlock.cid in localStore) == true
request.content.cid == $verifiableBlock.cid
request.content.merkleRoot == builder.verifyRoot.get.toBytes
asyncchecksuite "Test Node - Host contracts":
setupAndTearDown()
var
sales: Sales
purchasing: Purchasing
manifest: Manifest
manifestCidStr: string
manifestCid: Cid
market: MockMarket
setup:
# Setup Host Contracts and dependencies
market = MockMarket.new()
sales = Sales.new(market, clock, localStore)
node.contracts = (
none ClientInteractions,
some HostInteractions.new(clock, sales),
none ValidatorInteractions)
await node.start()
# Populate manifest in local store
manifest = await storeDataGetManifest(localStore, chunker)
let
manifestBlock = bt.Block.new(
manifest.encode().tryGet(),
codec = ManifestCodec).tryGet()
manifestCid = manifestBlock.cid
manifestCidStr = $(manifestCid)
(await localStore.putBlock(manifestBlock)).tryGet()
test "onExpiryUpdate callback is set":
check sales.onExpiryUpdate.isSome
test "onExpiryUpdate callback":
let
# The blocks have set default TTL, so in order to update it we have to have larger TTL
expectedExpiry: SecondsSince1970 = clock.now + DefaultBlockTtl.seconds + 11123
expiryUpdateCallback = !sales.onExpiryUpdate
(await expiryUpdateCallback(manifestCidStr, expectedExpiry)).tryGet()
for index in 0..<manifest.blocksCount:
let
blk = (await localStore.getBlock(manifest.treeCid, index)).tryGet
expiryKey = (createBlockExpirationMetadataKey(blk.cid)).tryGet
expiry = await localStoreMetaDs.get(expiryKey)
check (expiry.tryGet).toSecondsSince1970 == expectedExpiry
test "onStore callback is set":
check sales.onStore.isSome
test "onStore callback":
let onStore = !sales.onStore
var request = StorageRequest.example
request.content.cid = manifestCidStr
request.expiry = (getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix.u256
var fetchedBytes: uint = 0
let onBatch = proc(blocks: seq[bt.Block]): Future[?!void] {.async.} =
for blk in blocks:
fetchedBytes += blk.data.len.uint
return success()
(await onStore(request, 0.u256, onBatch)).tryGet()
check fetchedBytes == 2293760
for index in 0..<manifest.blocksCount:
let
blk = (await localStore.getBlock(manifest.treeCid, index)).tryGet
expiryKey = (createBlockExpirationMetadataKey(blk.cid)).tryGet
expiry = await localStoreMetaDs.get(expiryKey)
check (expiry.tryGet).toSecondsSince1970 == request.expiry.toSecondsSince1970

@ -1 +1 @@
Subproject commit 0cde8aeb67c59fd0ac95496dc6b5e1168d6632aa Subproject commit 8a95ed9c90a9ea31fc1341b92c8a9c0935368cd9