From b88f767f08a476fadd78b3f6f84c2a0e494cb4ee Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 15 Jan 2024 21:47:40 -0600 Subject: [PATCH] wip sampler integration --- codex/node.nim | 83 +++++++++++++++++++++++++++++++++++--------------- 1 file changed, 58 insertions(+), 25 deletions(-) diff --git a/codex/node.nim b/codex/node.nim index affc4307..14dc6a59 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -102,20 +102,6 @@ proc storeManifest*( success blk -proc findPeer*( - self: CodexNodeRef, - peerId: PeerId): Future[?PeerRecord] {.async.} = - ## Find peer using the discovery service from the given CodexNode - ## - return await self.discovery.findPeer(peerId) - -proc connect*( - self: CodexNodeRef, - peerId: PeerId, - addrs: seq[MultiAddress] -): Future[void] = - self.switch.connect(peerId, addrs) - proc fetchManifest*( self: CodexNodeRef, cid: Cid): Future[?!Manifest] {.async.} = @@ -141,6 +127,20 @@ proc fetchManifest*( return manifest.success +proc findPeer*( + self: CodexNodeRef, + peerId: PeerId): Future[?PeerRecord] {.async.} = + ## Find peer using the discovery service from the given CodexNode + ## + return await self.discovery.findPeer(peerId) + +proc connect*( + self: CodexNodeRef, + peerId: PeerId, + addrs: seq[MultiAddress] +): Future[void] = + self.switch.connect(peerId, addrs) + proc updateExpiry*( self: CodexNodeRef, manifestCid: Cid, @@ -493,9 +493,8 @@ proc onStore( trace "Received a request to store a slot!" - without cid =? Cid.init(request.content.cid): + without cid =? Cid.init(request.content.cid).mapFailure, err: trace "Unable to parse Cid", cid - let err = newException(CodexError, "Unable to parse Cid") return failure(err) without manifest =? (await self.fetchManifest(cid)), err: @@ -528,8 +527,11 @@ proc onStore( return success() - if blksIter =? builder.slotIndicies(slotIdx) and - err =? (await self.fetchBatched(manifest.treeCid, blksIter, onBatch = updateExpiry)).errorOption: + if blksIter =? builder.slotIndiciesIter(slotIdx) and + err =? (await self.fetchBatched( + manifest.treeCid, + blksIter, + onBatch = updateExpiry)).errorOption: trace "Unable to fetch blocks", err = err.msg return failure(err) @@ -543,6 +545,44 @@ proc onStore( return success() +proc onProve( + self: CodexNodeRef, + slot: Slot, + challenge: ProofChallenge): Future[seq[byte]] {.async.} = + ## Generats a proof for a given slot and challenge + ## + + let + cidStr = slot.request.content.cid + slotIdx = slot.slotIndex.truncate(Natural) + + logScope: + cid = cidStr + slot = slot + challenge = challenge + + trace "Received proof challenge" + + without cid =? Cid.init(cidStr).mapFailure, err: + trace "Unable to parse Cid", cid, err = err.msg + return + + without manifest =? await self.fetchManifest(cid), err: + trace "Unable to fetch manifest for cid", err = err.msg + return + + without builder =? SlotsBuilder.new(self.blockStore, manifest), err: + trace "Unable to create slots builder", err = err.msg + return + + without sampler =? DataSampler.new(slotIdx, self.blockStore, builder), err: + trace "Unable to create data sampler", err = err.msg + return + + without proof =? await sampler.getProofs(challenge, slotIdx), err: + trace "Unable to get proofs for slot", err = err.msg + return + proc onExpiryUpdate( self: CodexNodeRef, rootCid: string, @@ -561,13 +601,6 @@ proc onClear( # TODO: remove data from local storage discard -proc onProve( - self: CodexNodeRef, - slot: Slot, - challenge: ProofChallenge): Future[seq[byte]] {.async.} = - # TODO: generate proof - return @[42'u8] - proc start*(self: CodexNodeRef) {.async.} = if not self.engine.isNil: await self.engine.start()