From 58c9e2001bbadd4a6d79f8e67c78ced0d3eda290 Mon Sep 17 00:00:00 2001
From: Marcin Czenko
Date: Sun, 27 Apr 2025 01:30:37 +0200
Subject: [PATCH] moves tarball-related stuff out of node + adds explicit
 padding argument to "store" + adds "onBatch" callback to "fetchDatasetAsync"

---
Reviewer notes (ignored by git am):
 - Line structure, indentation and blank context lines were reconstructed
   from a whitespace-collapsed copy; run `git apply --check` before use.
 - The author e-mail was missing in that copy and has not been invented.
 - The new file differs from the original commit: the manifest failure
   message now really contains the CID, a comment typo is fixed and two
   docstrings were added, so the blob id on its index line is stale.

 codex/node.nim                           | 116 ++--------------------
 codex/tarballs/tarballnodeextensions.nim | 127 +++++++++++++++++++++++
 2 files changed, 134 insertions(+), 109 deletions(-)
 create mode 100644 codex/tarballs/tarballnodeextensions.nim

diff --git a/codex/node.nim b/codex/node.nim
index 32e43db7..ca562b9f 100644
--- a/codex/node.nim
+++ b/codex/node.nim
@@ -13,7 +13,6 @@ import std/options
 import std/sequtils
 import std/strformat
 import std/sugar
-import std/streams
 import times
 
 import pkg/taskpools
@@ -48,9 +47,6 @@ import ./logutils
 import ./utils/asynciter
 import ./utils/trackedfutures
 
-# tarball
-import ./tarballs/[tarballs, encoding, decoding, stdstreamwrapper, directorymanifest]
-
 export logutils
 
 logScope:
@@ -136,31 +132,6 @@ proc fetchManifest*(self: CodexNodeRef, cid: Cid): Future[?!Manifest] {.async.} =
 
   return manifest.success
 
-proc fetchDirectoryManifest*(
-    self: CodexNodeRef, cid: Cid
-): Future[?!DirectoryManifest] {.async.} =
-  ## Fetch and decode a manifest block
-  ##
-
-  if err =? cid.isManifest.errorOption:
-    return failure "CID has invalid content type for manifest {$cid}"
-
-  trace "Retrieving manifest for cid", cid
-
-  without blk =? await self.networkStore.getBlock(BlockAddress.init(cid)), err:
-    trace "Error retrieve manifest block", cid, err = err.msg
-    return failure err
-
-  trace "Decoding manifest for cid", cid
-
-  without manifest =? DirectoryManifest.decode(blk), err:
-    trace "Unable to decode as manifest", err = err.msg
-    return failure("Unable to decode as manifest")
-
-  trace "Decoded manifest", cid
-
-  manifest.success
-
 proc findPeer*(self: CodexNodeRef, peerId: PeerId): Future[?PeerRecord] {.async.} =
   ## Find peer using the discovery service from the given CodexNode
   ##
@@ -255,7 +226,7 @@ proc fetchBatched*(
   self.fetchBatched(manifest.treeCid, iter, batchSize, onBatch, fetchLocal)
 
 proc fetchDatasetAsync*(
-    self: CodexNodeRef, manifest: Manifest, fetchLocal = true
+    self: CodexNodeRef, manifest: Manifest, fetchLocal = true, onBatch: BatchProc = nil
 ): Future[void] {.async: (raises: []).} =
   ## Asynchronously fetch a dataset in the background.
   ## This task will be tracked and cleaned up on node shutdown.
@@ -263,7 +234,10 @@ proc fetchDatasetAsync*(
   try:
     if err =? (
       await self.fetchBatched(
-        manifest = manifest, batchSize = DefaultFetchBatch, fetchLocal = fetchLocal
+        manifest = manifest,
+        batchSize = DefaultFetchBatch,
+        fetchLocal = fetchLocal,
+        onBatch = onBatch,
       )
     ).errorOption:
       error "Unable to fetch blocks", err = err.msg
@@ -423,6 +397,7 @@ proc store*(
     filename: ?string = string.none,
     mimetype: ?string = string.none,
     blockSize = DefaultBlockSize,
+    pad = true,
 ): Future[?!Cid] {.async.} =
   ## Save stream contents as dataset with given blockSize
   ## to nodes's BlockStore, and return Cid of its manifest
@@ -432,7 +407,7 @@ proc store*(
   let
     hcodec = Sha256HashCodec
     dataCodec = BlockCodec
-    chunker = LPStreamChunker.new(stream, chunkSize = blockSize)
+    chunker = LPStreamChunker.new(stream, chunkSize = blockSize, pad)
 
   var cids: seq[Cid]
 
@@ -515,83 +490,6 @@ proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} =
 
       onManifest(cid, manifest)
 
-proc storeDirectoryManifest*(
-    self: CodexNodeRef, manifest: DirectoryManifest
-): Future[?!bt.Block] {.async.} =
-  let encodedManifest = manifest.encode()
-
-  without blk =? bt.Block.new(data = encodedManifest, codec = ManifestCodec), error:
-    trace "Unable to create block from manifest"
-    return failure(error)
-
-  if err =? (await self.networkStore.putBlock(blk)).errorOption:
-    trace "Unable to store manifest block", cid = blk.cid, err = err.msg
-    return failure(err)
-
-  success blk
-
-proc storeTarball*(
-    self: CodexNodeRef, stream: AsyncStreamReader
-): Future[?!string] {.async.} =
-  info "Storing tarball data"
-
-  # Just as a proof of concept, we process tar bar in memory
-  # Later to see how to do actual streaming to either store
-  # tarball locally in some tmp folder, or to process the
-  # tarball incrementally
-  let tarballBytes = await stream.read()
-  let stream = newStringStream(string.fromBytes(tarballBytes))
-
-  proc onProcessedTarFile(
-      stream: Stream, fileName: string
-  ): Future[?!Cid] {.gcsafe, async: (raises: [CancelledError]).} =
-    try:
-      echo "onProcessedTarFile:name: ", fileName
-      let stream = newStdStreamWrapper(stream)
-      await self.store(stream, filename = some fileName)
-    except CancelledError as e:
-      raise e
-    except CatchableError as e:
-      error "Error processing tar file", fileName, exc = e.msg
-      return failure(e.msg)
-
-  proc onProcessedTarDir(
-      name: string, cids: seq[Cid]
-  ): Future[?!Cid] {.gcsafe, async: (raises: [CancelledError]).} =
-    try:
-      echo "onProcessedTarDir:name: ", name
-      echo "onProcessedTarDir:cids: ", cids
-      let directoryManifest = newDirectoryManifest(name = name, cids = cids)
-      without manifestBlk =? await self.storeDirectoryManifest(directoryManifest), err:
-        error "Unable to store manifest"
-        return failure(err)
-      manifestBlk.cid.success
-    except CancelledError as e:
-      raise e
-    except CatchableError as e:
-      error "Error processing tar dir", name, exc = e.msg
-      return failure(e.msg)
-
-  let tarball = Tarball()
-  if err =? (await tarball.open(stream, onProcessedTarFile)).errorOption:
-    error "Unable to open tarball", err = err.msg
-    return failure(err)
-  echo "tarball = ", $tarball
-  without root =? tarball.findRootDir(), err:
-    return failure(err.msg)
-  echo "root = ", root
-  let dirs = processDirEntries(tarball)
-  echo "dirs = ", dirs
-  without tree =? (await buildTree(root = root, dirs = dirs, onProcessedTarDir)), err:
-    error "Unable to build tree", err = err.msg
-    return failure(err)
-  echo ""
-  echo "preorderTraversal:"
-  let json = newJArray()
-  preorderTraversal(tree, json)
-  echo "json = ", json
-  success($json)
-
 proc setupRequest(
     self: CodexNodeRef,
     cid: Cid,
diff --git a/codex/tarballs/tarballnodeextensions.nim b/codex/tarballs/tarballnodeextensions.nim
new file mode 100644
index 00000000..9aa36fbd
--- /dev/null
+++ b/codex/tarballs/tarballnodeextensions.nim
@@ -0,0 +1,127 @@
+import std/streams
+
+import pkg/chronos
+import pkg/libp2p/cid
+import pkg/questionable/results
+
+import ../node
+import ../blocktype
+import ../manifest
+import ../stores/blockstore
+
+import ./tarballs
+import ./stdstreamwrapper
+import ./directorymanifest
+import ./encoding
+import ./decoding
+
+proc fetchDirectoryManifest*(
+    self: CodexNodeRef, cid: Cid
+): Future[?!DirectoryManifest] {.async: (raises: [CancelledError]).} =
+  ## Fetch and decode a directory manifest block
+  ##
+
+  if err =? cid.isManifest.errorOption:
+    return failure("CID has invalid content type for manifest " & $cid)
+
+  trace "Retrieving directory manifest for cid", cid
+
+  without blk =? await self.blockStore.getBlock(BlockAddress.init(cid)), err:
+    trace "Error retrieving directory manifest block", cid, err = err.msg
+    return failure err
+
+  trace "Decoding directory manifest for cid", cid
+
+  without manifest =? DirectoryManifest.decode(blk), err:
+    trace "Unable to decode as directory manifest", err = err.msg
+    return failure("Unable to decode as directory manifest")
+
+  trace "Decoded directory manifest", cid
+
+  manifest.success
+
+proc storeDirectoryManifest*(
+    self: CodexNodeRef, manifest: DirectoryManifest
+): Future[?!Block] {.async.} =
+  ## Encode `manifest`, wrap it in a block created with `ManifestCodec`
+  ## and put that block into the node's block store; returns the block
+  ##
+
+  let encodedManifest = manifest.encode()
+
+  without blk =? Block.new(data = encodedManifest, codec = ManifestCodec), error:
+    trace "Unable to create block from manifest"
+    return failure(error)
+
+  if err =? (await self.blockStore.putBlock(blk)).errorOption:
+    trace "Unable to store manifest block", cid = blk.cid, err = err.msg
+    return failure(err)
+
+  success blk
+
+proc storeTarball*(
+    self: CodexNodeRef, stream: AsyncStreamReader
+): Future[?!string] {.async.} =
+  ## Read the whole tarball from `stream` into memory and process it with
+  ## callbacks that store files (unpadded) and directory manifests;
+  ## returns the pre-order traversal of the resulting tree as JSON text
+  ##
+
+  info "Storing tarball data"
+
+  # Just as a proof of concept, we process tarball in memory
+  # Later to see how to do actual streaming to either store
+  # tarball locally in some tmp folder, or to process the
+  # tarball incrementally
+  let tarballBytes = await stream.read()
+  let stream = newStringStream(string.fromBytes(tarballBytes))
+
+  proc onProcessedTarFile(
+      stream: Stream, fileName: string
+  ): Future[?!Cid] {.gcsafe, async: (raises: [CancelledError]).} =
+    try:
+      echo "onProcessedTarFile:name: ", fileName
+      let stream = newStdStreamWrapper(stream)
+      await self.store(stream, filename = some fileName, pad = false)
+    except CancelledError as e:
+      raise e
+    except CatchableError as e:
+      error "Error processing tar file", fileName, exc = e.msg
+      return failure(e.msg)
+
+  proc onProcessedTarDir(
+      name: string, cids: seq[Cid]
+  ): Future[?!Cid] {.gcsafe, async: (raises: [CancelledError]).} =
+    try:
+      echo "onProcessedTarDir:name: ", name
+      echo "onProcessedTarDir:cids: ", cids
+      let directoryManifest = newDirectoryManifest(name = name, cids = cids)
+      without manifestBlk =? await self.storeDirectoryManifest(directoryManifest), err:
+        error "Unable to store manifest"
+        return failure(err)
+      manifestBlk.cid.success
+    except CancelledError as e:
+      raise e
+    except CatchableError as e:
+      error "Error processing tar dir", name, exc = e.msg
+      return failure(e.msg)
+
+  let tarball = Tarball()
+  if err =? (await tarball.open(stream, onProcessedTarFile)).errorOption:
+    error "Unable to open tarball", err = err.msg
+    return failure(err)
+  echo "tarball = ", $tarball
+  without root =? tarball.findRootDir(), err:
+    return failure(err.msg)
+  echo "root = ", root
+  let dirs = processDirEntries(tarball)
+  echo "dirs = ", dirs
+  without tree =? (await buildTree(root = root, dirs = dirs, onProcessedTarDir)), err:
+    error "Unable to build tree", err = err.msg
+    return failure(err)
+  echo ""
+  echo "preorderTraversal:"
+  let json = newJArray()
+  preorderTraversal(tree, json)
+  echo "json = ", json
+  success($json)