mirror of https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-10 09:23:12 +00:00
Refactor codebase: enhance error handling, improve type definitions, and streamline async procedures
This commit is contained in:
parent 180a748167
commit 087cc4ce7b
@@ -208,22 +208,13 @@ proc new*(
.withTcpTransport({ServerFlags.ReuseAddr})
.build()

var
cache: CacheStore = nil
taskpool: Taskpool
var tp =
try:
Taskpool.new(numThreads = int(config.numThreads))
except CatchableError as exc:
raiseAssert("Failure in tp initialization:" & exc.msg)

try:
if config.numThreads == ThreadCount(0):
taskpool = Taskpool.new(numThreads = min(countProcessors(), 16))
else:
taskpool = Taskpool.new(numThreads = int(config.numThreads))
info "Threadpool started", numThreads = taskpool.numThreads
except CatchableError as exc:
raiseAssert("Failure in taskpool initialization:" & exc.msg)

if config.cacheSize > 0'nb:
cache = CacheStore.new(cacheSize = config.cacheSize)
## Is unused?
info "Threadpool started", numThreads = tp.numThreads

let discoveryDir = config.dataDir / CodexDhtNamespace
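For context, a minimal sketch of the thread-pool startup pattern this hunk reworks, assuming pkg/taskpools and std/cpuinfo; the proc name and the cap of 16 threads mirror the branch shown above and are otherwise illustrative.

import std/cpuinfo
import pkg/taskpools

proc startTaskpool(requested: int): Taskpool =
  # 0 means "pick for me": use the CPU count, capped to avoid oversubscription.
  let n = if requested == 0: min(countProcessors(), 16) else: requested
  try:
    Taskpool.new(numThreads = n)
  except CatchableError as exc:
    # A taskpool is required for the node to run, so fail loudly on error.
    raiseAssert("Failure in taskpool initialization: " & exc.msg)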
@@ -299,9 +290,8 @@ proc new*(
store = NetworkStore.new(engine, repoStore)
prover =
if config.prover:
let backend =
config.initializeBackend().expect("Unable to create prover backend.")
some Prover.new(store, backend, config.numProofSamples)
let prover = config.initializeProver(tp).expect("Unable to create prover.")
some prover
else:
none Prover

@@ -311,7 +301,7 @@ proc new*(
engine = engine,
discovery = discovery,
prover = prover,
taskPool = taskpool,
taskPool = tp,
)

restServer = RestServerRef
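As an aside, the optional prover above follows the usual option-wrapping pattern. A small sketch with std/options, where this Prover type is a stand-in for the real one and not the codebase's API.

import std/options

type Prover = object
  samples: int

proc makeProver(enabled: bool, samples: int): Option[Prover] =
  # Construct the component only when its feature flag is set; callers then
  # branch on the option instead of handling a nil reference.
  if enabled:
    some(Prover(samples: samples))
  else:
    none Prover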
codex/node.nim (141 changed lines)
@@ -46,6 +46,7 @@ import ./errors
import ./logutils
import ./utils/asynciter
import ./utils/trackedfutures
import ./utils/poseidon2digest

export logutils
@@ -63,17 +64,17 @@ type
]

CodexNode* = object
switch: Switch
networkId: PeerId
networkStore: NetworkStore
engine: BlockExcEngine
prover: ?Prover
discovery: Discovery
contracts*: Contracts
clock*: Clock
storage*: Contracts
taskpool: Taskpool
trackedFutures: TrackedFutures
switch: Switch # the libp2p network switch
networkId: PeerId # the peer id of the node
networkStore: NetworkStore # the network store
engine: BlockExcEngine # the block exchange engine
prover: ?Prover # the prover
discovery: Discovery # the discovery service
contracts*: Contracts # the contracts
clock*: Clock # the clock
storage*: Contracts # the storage
taskpool: Taskpool # the taskpool
trackedFutures: TrackedFutures # the tracked futures

CodexNodeRef* = ref CodexNode
@@ -96,18 +97,12 @@ func discovery*(self: CodexNodeRef): Discovery =

proc storeManifest*(
self: CodexNodeRef, manifest: Manifest
): Future[?!bt.Block] {.async.} =
without encodedVerifiable =? manifest.encode(), err:
trace "Unable to encode manifest"
return failure(err)
): Future[?!bt.Block] {.async: (raises: [CancelledError]).} =
let
encodedVerifiable = ?manifest.encode()
blk = ?bt.Block.new(data = encodedVerifiable, codec = ManifestCodec)

without blk =? bt.Block.new(data = encodedVerifiable, codec = ManifestCodec), error:
trace "Unable to create block from manifest"
return failure(error)

if err =? (await self.networkStore.putBlock(blk)).errorOption:
trace "Unable to store manifest block", cid = blk.cid, err = err.msg
return failure(err)
?await self.networkStore.putBlock(blk)

success blk
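The rewrite above swaps chained `without ... =? expr, err: return failure(err)` blocks for the `?` early-return operator. A standalone sketch of that style, assuming pkg/questionable/results as used in this codebase; the parsing procs are purely illustrative.

import std/strutils
import pkg/questionable/results

proc parsePort(s: string): ?!int =
  # ?!int carries either an int or an error value.
  try:
    return success(parseInt(s))
  except ValueError as e:
    return failure(e.msg)

proc doublePort(s: string): ?!int =
  # `?` unwraps the success value or returns the failure to the caller at once,
  # doing what the longer `without ... =? expr, err: return failure(err)` form did.
  let port = ?parsePort(s)
  return success(port * 2)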
@@ -338,7 +333,9 @@ proc retrieve*(

await self.streamEntireDataset(manifest, cid)

proc deleteSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.} =
proc deleteSingleBlock(
self: CodexNodeRef, cid: Cid
): Future[?!void] {.async: (raises: [CancelledError]).} =
if err =? (await self.networkStore.delBlock(cid)).errorOption:
error "Error deleting block", cid, err = err.msg
return failure(err)
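Most signature changes in this file add a checked raises list to the async pragma. A minimal sketch of what the annotation enforces, assuming chronos v4; the proc itself is illustrative.

import pkg/chronos

proc waitABit(): Future[void] {.async: (raises: [CancelledError]).} =
  # With an explicit raises list, the compiler rejects any exception that could
  # escape other than CancelledError, so errors must be handled inside the proc
  # or returned as values (e.g. ?!T) instead of leaking as exceptions.
  await sleepAsync(10.milliseconds)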
@@ -346,7 +343,9 @@ proc deleteSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.} =
trace "Deleted block", cid
return success()

proc deleteEntireDataset(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.} =
proc deleteEntireDataset(
self: CodexNodeRef, cid: Cid
): Future[?!void] {.async: (raises: [CancelledError]).} =
# Deletion is a strictly local operation
var store = self.networkStore.localStore
@@ -403,7 +402,7 @@ proc store*(
filename: ?string = string.none,
mimetype: ?string = string.none,
blockSize = DefaultBlockSize,
): Future[?!Cid] {.async.} =
): Future[?!Cid] {.async: (raises: [CancelledError]).} =
## Save stream contents as dataset with given blockSize
## to nodes's BlockStore, and return Cid of its manifest
##
@@ -478,7 +477,9 @@ proc store*(

return manifestBlk.cid.success

proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} =
proc iterateManifests*(
self: CodexNodeRef, onManifest: OnManifest
) {.async: (raises: [CancelledError]).} =
without cidsIter =? await self.networkStore.listBlocks(BlockType.Manifest):
warn "Failed to listBlocks"
return
@@ -505,7 +506,7 @@ proc setupRequest(
pricePerBytePerSecond: UInt256,
collateralPerByte: UInt256,
expiry: uint64,
): Future[?!StorageRequest] {.async.} =
): Future[?!StorageRequest] {.async: (raises: [CancelledError]).} =
## Setup slots for a given dataset
##
@@ -527,32 +528,20 @@ proc setupRequest(

trace "Setting up slots"

without manifest =? await self.fetchManifest(cid), error:
trace "Unable to fetch manifest for cid"
return failure error

# Erasure code the dataset according to provided parameters
let erasure = Erasure.new(
self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider, self.taskpool
)

without encoded =? (await erasure.encode(manifest, ecK, ecM)), error:
trace "Unable to erasure code dataset"
return failure(error)

without builder =? Poseidon2Builder.new(self.networkStore.localStore, encoded), err:
trace "Unable to create slot builder"
return failure(err)

without verifiable =? (await builder.buildManifest()), err:
trace "Unable to build verifiable manifest"
return failure(err)

without manifestBlk =? await self.storeManifest(verifiable), err:
trace "Unable to store verifiable manifest"
return failure(err)

let
manifest = ?await self.fetchManifest(cid)

# Erasure code the dataset according to provided parameters
erasure = Erasure.new(
self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider,
self.taskpool,
)

encoded = ?await erasure.encode(manifest, ecK, ecM)
builder = ?Poseidon2Builder.new(self.networkStore.localStore, encoded)
verifiable = ?await builder.buildManifest()
manifestBlk = ?await self.storeManifest(verifiable)

verifyRoot =
if builder.verifyRoot.isNone:
return failure("No slots root")
@@ -586,7 +575,7 @@ proc requestStorage*(
pricePerBytePerSecond: UInt256,
collateralPerByte: UInt256,
expiry: uint64,
): Future[?!PurchaseId] {.async.} =
): Future[?!PurchaseId] {.async: (raises: [CancelledError]).} =
## Initiate a request for storage sequence, this might
## be a multistep procedure.
##
@@ -617,7 +606,17 @@ proc requestStorage*(
trace "Unable to setup request"
return failure err

let purchase = await contracts.purchasing.purchase(request)
# TODO: remove try/except once state machine has checked exceptions
let purchase =
try:
await contracts.purchasing.purchase(request)
except CancelledError as err:
trace "Purchase cancelled", err = err.msg
raise err
except CatchableError as err:
trace "Unable to purchase storage", err = err.msg
return failure(err)

success purchase.id

proc onStore(
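The purchase call is now wrapped so cancellation keeps propagating while other errors become a failure value. A small sketch of that pattern under the same assumptions (chronos plus questionable/results); the awaited proc is a stand-in.

import pkg/chronos
import pkg/questionable/results

proc flakyWork(): Future[int] {.async.} =
  return 42

proc runWork(): Future[?!int] {.async: (raises: [CancelledError]).} =
  try:
    let value = await flakyWork()
    return success(value)
  except CancelledError as err:
    # Cancellation keeps propagating so the caller can unwind promptly.
    raise err
  except CatchableError as err:
    # Any other error becomes a failure value instead of an escaping exception.
    return failure(err.msg)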
@@ -731,38 +730,28 @@ proc onProve(
if prover =? self.prover:
trace "Prover enabled"

without cid =? Cid.init(cidStr).mapFailure, err:
error "Unable to parse Cid", cid, err = err.msg
return failure(err)

without manifest =? await self.fetchManifest(cid), err:
error "Unable to fetch manifest for cid", err = err.msg
return failure(err)
let
cid = ?Cid.init(cidStr).mapFailure
manifest = ?await self.fetchManifest(cid)
builder =
?Poseidon2Builder.new(self.networkStore, manifest, manifest.verifiableStrategy)
sampler = ?Poseidon2Sampler.new(slotIdx, self.networkStore, builder)

when defined(verify_circuit):
without (inputs, proof) =? await prover.prove(slotIdx.int, manifest, challenge),
err:
error "Unable to generate proof", err = err.msg
return failure(err)
let (proof, checked) =
?await prover.prove(sampler, manifest, challenge, verify = true)

without checked =? await prover.verify(proof, inputs), err:
error "Unable to verify proof", err = err.msg
return failure(err)

if not checked:
if checked.isSome and not checked.get:
error "Proof verification failed"
return failure("Proof verification failed")

trace "Proof verified successfully"
else:
without (_, proof) =? await prover.prove(slotIdx.int, manifest, challenge), err:
error "Unable to generate proof", err = err.msg
return failure(err)
let (proof, _) = ?await prover.prove(sampler, manifest, challenge, verify = false)

let groth16Proof = proof.toGroth16Proof()
trace "Proof generated successfully", groth16Proof
trace "Proof generated successfully", proof

success groth16Proof
success proof
else:
warn "Prover not enabled"
failure "Prover not enabled"
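The verification flag returned by prove is optional in the new code, hence the isSome/get guard above. A tiny illustration with std/options; it is not the real prover API.

import std/options

proc verificationFailed(checked: Option[bool]): bool =
  # Treat the proof as bad only when verification actually ran and returned false;
  # an absent value means verification was skipped.
  checked.isSome and not checked.get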
@@ -3,6 +3,6 @@ import ./converters

import ../merkletree

export builder, converters
export builder, converters, merkletree

type Poseidon2Builder* = SlotsBuilder[Poseidon2Tree, Poseidon2Hash]
@@ -1,5 +1,5 @@
import ./proofs/backends
import ./proofs/prover
import ./proofs/backendfactory
import ./proofs/proverfactory

export circomcompat, prover, backendfactory
export backends, prover, proverfactory
@@ -8,23 +8,23 @@
## those terms.

type
Sample*[H] = object
cellData*: seq[H]
merklePaths*: seq[H]
Sample*[SomeHash] = object
cellData*: seq[SomeHash]
merklePaths*: seq[SomeHash]

PublicInputs*[H] = object
PublicInputs*[SomeHash] = object
slotIndex*: int
datasetRoot*: H
entropy*: H
datasetRoot*: SomeHash
entropy*: SomeHash

ProofInputs*[H] = object
entropy*: H
datasetRoot*: H
ProofInputs*[SomeHash] = object
entropy*: SomeHash
datasetRoot*: SomeHash
slotIndex*: Natural
slotRoot*: H
slotRoot*: SomeHash
nCellsPerSlot*: Natural
nSlotsPerDataSet*: Natural
slotProof*: seq[H]
slotProof*: seq[SomeHash]
# inclusion proof that shows that the slot root (leaf) is part of the dataset (root)
samples*: seq[Sample[H]]
samples*: seq[Sample[SomeHash]]
# inclusion proofs which show that the selected cells (leafs) are part of the slot (roots)
@@ -25,7 +25,7 @@ import ../merkletree

proc putSomeProofs*(
store: BlockStore, tree: CodexTree, iter: Iter[int]
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
without treeCid =? tree.rootCid, err:
return failure(err)

@@ -51,8 +51,10 @@ proc putSomeProofs*(

proc putSomeProofs*(
store: BlockStore, tree: CodexTree, iter: Iter[Natural]
): Future[?!void] =
): Future[?!void] {.async: (raises: [CancelledError], raw: true).} =
store.putSomeProofs(tree, iter.map((i: Natural) => i.ord))

proc putAllProofs*(store: BlockStore, tree: CodexTree): Future[?!void] =
proc putAllProofs*(
store: BlockStore, tree: CodexTree
): Future[?!void] {.async: (raises: [CancelledError], raw: true).} =
store.putSomeProofs(tree, Iter[int].new(0 ..< tree.leavesCount))
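The thin forwarding procs above use the raw form of the async pragma. A minimal sketch of that form, assuming chronos v4; the procs are illustrative.

import pkg/chronos

proc delay(ms: int): Future[void] {.async: (raises: [CancelledError]).} =
  await sleepAsync(ms.milliseconds)

proc delayDefault(): Future[void] {.async: (raises: [CancelledError], raw: true).} =
  # raw: true skips generating a new async state machine; the body must itself
  # evaluate to the Future the caller awaits, which suits thin forwarders.
  delay(100)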
@@ -1,5 +1,3 @@
import std/sequtils

proc createDoubleArray*(
outerLen, innerLen: int
): ptr UncheckedArray[ptr UncheckedArray[byte]] =