moves tarball-related stuff out of node + adds explicit padding argument to "store" + adds "onBatch" callback to "fetchDatasetAsync"

This commit is contained in:
Marcin Czenko 2025-04-27 01:30:37 +02:00
parent 14f85a8f8d
commit 58c9e2001b
No known key found for this signature in database
GPG Key ID: 33DEA0C8E30937C0
2 changed files with 125 additions and 109 deletions

View File

@ -13,7 +13,6 @@ import std/options
import std/sequtils
import std/strformat
import std/sugar
import std/streams
import times
import pkg/taskpools
@ -48,9 +47,6 @@ import ./logutils
import ./utils/asynciter
import ./utils/trackedfutures
# tarball
import ./tarballs/[tarballs, encoding, decoding, stdstreamwrapper, directorymanifest]
export logutils
logScope:
@ -136,31 +132,6 @@ proc fetchManifest*(self: CodexNodeRef, cid: Cid): Future[?!Manifest] {.async.}
return manifest.success
proc fetchDirectoryManifest*(
    self: CodexNodeRef, cid: Cid
): Future[?!DirectoryManifest] {.async.} =
  ## Fetch the block addressed by `cid` through the node's network store
  ## and decode it as a `DirectoryManifest`.
  ##
  ## Yields a failure when the manifest check on `cid` reports an error,
  ## when the block cannot be retrieved, or when decoding fails.
  ##

  if err =? cid.isManifest.errorOption:
    # A plain string literal is not interpolated, so the CID is appended
    # explicitly instead of relying on a "{$cid}" placeholder.
    return failure("CID has invalid content type for manifest " & $cid)

  trace "Retrieving manifest for cid", cid

  without blk =? await self.networkStore.getBlock(BlockAddress.init(cid)), err:
    trace "Error retrieve manifest block", cid, err = err.msg
    return failure err

  trace "Decoding manifest for cid", cid

  without manifest =? DirectoryManifest.decode(blk), err:
    trace "Unable to decode as manifest", err = err.msg
    return failure("Unable to decode as manifest")

  trace "Decoded manifest", cid

  manifest.success
proc findPeer*(self: CodexNodeRef, peerId: PeerId): Future[?PeerRecord] {.async.} =
## Find peer using the discovery service from the given CodexNode
##
@ -255,7 +226,7 @@ proc fetchBatched*(
self.fetchBatched(manifest.treeCid, iter, batchSize, onBatch, fetchLocal)
proc fetchDatasetAsync*(
self: CodexNodeRef, manifest: Manifest, fetchLocal = true
self: CodexNodeRef, manifest: Manifest, fetchLocal = true, onBatch: BatchProc = nil
): Future[void] {.async: (raises: []).} =
## Asynchronously fetch a dataset in the background.
## This task will be tracked and cleaned up on node shutdown.
@ -263,7 +234,10 @@ proc fetchDatasetAsync*(
try:
if err =? (
await self.fetchBatched(
manifest = manifest, batchSize = DefaultFetchBatch, fetchLocal = fetchLocal
manifest = manifest,
batchSize = DefaultFetchBatch,
fetchLocal = fetchLocal,
onBatch = onBatch,
)
).errorOption:
error "Unable to fetch blocks", err = err.msg
@ -423,6 +397,7 @@ proc store*(
filename: ?string = string.none,
mimetype: ?string = string.none,
blockSize = DefaultBlockSize,
pad = true,
): Future[?!Cid] {.async.} =
## Save stream contents as dataset with given blockSize
## to nodes's BlockStore, and return Cid of its manifest
@ -432,7 +407,7 @@ proc store*(
let
hcodec = Sha256HashCodec
dataCodec = BlockCodec
chunker = LPStreamChunker.new(stream, chunkSize = blockSize)
chunker = LPStreamChunker.new(stream, chunkSize = blockSize, pad)
var cids: seq[Cid]
@ -515,83 +490,6 @@ proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} =
onManifest(cid, manifest)
proc storeDirectoryManifest*(
    self: CodexNodeRef, manifest: DirectoryManifest
): Future[?!bt.Block] {.async.} =
  ## Encode `manifest`, wrap the encoded data in a block tagged with
  ## `ManifestCodec`, and put that block into the node's network store.
  ##
  ## Yields the stored block on success, or the failure reported by block
  ## creation or by the store.
  ##

  let encoded = manifest.encode()

  without manifestBlk =? bt.Block.new(data = encoded, codec = ManifestCodec), blockErr:
    trace "Unable to create block from manifest"
    return failure(blockErr)

  let putResult = await self.networkStore.putBlock(manifestBlk)
  if putErr =? putResult.errorOption:
    trace "Unable to store manifest block", cid = manifestBlk.cid, err = putErr.msg
    return failure(putErr)

  manifestBlk.success
proc storeTarball*(
    self: CodexNodeRef, stream: AsyncStreamReader
): Future[?!string] {.async.} =
  ## Read a whole tarball from `stream` and store its contents on the node.
  ##
  ## File streams reported by `Tarball.open` are stored through `self.store`;
  ## the directory callback handed to `buildTree` stores a
  ## `DirectoryManifest` block per call. On success the result is the string
  ## form of the JSON array filled in by `preorderTraversal`.
  ##
  info "Storing tarball data"

  # Just as a proof of concept, we process the tarball in memory.
  # Later to see how to do actual streaming to either store the
  # tarball locally in some tmp folder, or to process the
  # tarball incrementally.
  let tarballBytes = await stream.read()
  let tarStream = newStringStream(string.fromBytes(tarballBytes))

  proc onProcessedTarFile(
      stream: Stream, fileName: string
  ): Future[?!Cid] {.gcsafe, async: (raises: [CancelledError]).} =
    ## Stores a single file stream and yields the Cid returned by `store`.
    try:
      trace "Processed tar file", fileName
      let fileStream = newStdStreamWrapper(stream)
      await self.store(fileStream, filename = some fileName)
    except CancelledError as e:
      raise e
    except CatchableError as e:
      error "Error processing tar file", fileName, exc = e.msg
      return failure(e.msg)

  proc onProcessedTarDir(
      name: string, cids: seq[Cid]
  ): Future[?!Cid] {.gcsafe, async: (raises: [CancelledError]).} =
    ## Stores a directory manifest built from `name` and `cids` and yields
    ## the Cid of the stored manifest block.
    try:
      trace "Processed tar dir", name, cids = $cids
      let directoryManifest = newDirectoryManifest(name = name, cids = cids)
      without manifestBlk =? await self.storeDirectoryManifest(directoryManifest), err:
        error "Unable to store manifest"
        return failure(err)
      manifestBlk.cid.success
    except CancelledError as e:
      raise e
    except CatchableError as e:
      error "Error processing tar dir", name, exc = e.msg
      return failure(e.msg)

  let tarball = Tarball()
  if err =? (await tarball.open(tarStream, onProcessedTarFile)).errorOption:
    error "Unable to open tarball", err = err.msg
    return failure(err)

  trace "Opened tarball", tarball = $tarball

  without root =? tarball.findRootDir(), err:
    return failure(err.msg)

  trace "Found tarball root dir", root = $root

  let dirs = processDirEntries(tarball)
  trace "Processed tarball dir entries", dirs = $dirs

  without tree =? (await buildTree(root = root, dirs = dirs, onProcessedTarDir)), err:
    error "Unable to build tree", err = err.msg
    return failure(err)

  let json = newJArray()
  preorderTraversal(tree, json)
  trace "Completed preorder traversal", tree = $json

  success($json)
proc setupRequest(
self: CodexNodeRef,
cid: Cid,

View File

@ -0,0 +1,118 @@
import std/streams
import pkg/chronos
import pkg/libp2p/cid
import pkg/questionable/results
import ../node
import ../blocktype
import ../manifest
import ../stores/blockstore
import ./tarballs
import ./stdstreamwrapper
import ./directorymanifest
import ./encoding
import ./decoding
proc fetchDirectoryManifest*(
    self: CodexNodeRef, cid: Cid
): Future[?!DirectoryManifest] {.async: (raises: [CancelledError]).} =
  ## Fetch the block addressed by `cid` from the node's block store and
  ## decode it as a `DirectoryManifest`.
  ##
  ## Yields a failure when the manifest check on `cid` reports an error,
  ## when the block cannot be retrieved, or when decoding fails.
  ##

  if err =? cid.isManifest.errorOption:
    # A plain string literal is not interpolated, so the CID is appended
    # explicitly instead of relying on a "{$cid}" placeholder.
    return failure("CID has invalid content type for manifest " & $cid)

  trace "Retrieving directory manifest for cid", cid

  without blk =? await self.blockStore.getBlock(BlockAddress.init(cid)), err:
    trace "Error retrieving directory manifest block", cid, err = err.msg
    return failure err

  trace "Decoding directory manifest for cid", cid

  without manifest =? DirectoryManifest.decode(blk), err:
    trace "Unable to decode as directory manifest", err = err.msg
    return failure("Unable to decode as directory manifest")

  trace "Decoded directory manifest", cid

  manifest.success
proc storeDirectoryManifest*(
    self: CodexNodeRef, manifest: DirectoryManifest
): Future[?!Block] {.async.} =
  ## Encode `manifest`, wrap the encoded data in a block tagged with
  ## `ManifestCodec`, and put that block into the node's block store.
  ##
  ## Yields the stored block on success, or the failure reported by block
  ## creation or by the store.
  ##

  let encoded = manifest.encode()

  without manifestBlk =? Block.new(data = encoded, codec = ManifestCodec), blockErr:
    trace "Unable to create block from manifest"
    return failure(blockErr)

  let putResult = await self.blockStore.putBlock(manifestBlk)
  if putErr =? putResult.errorOption:
    trace "Unable to store manifest block", cid = manifestBlk.cid, err = putErr.msg
    return failure(putErr)

  manifestBlk.success
proc storeTarball*(
    self: CodexNodeRef, stream: AsyncStreamReader
): Future[?!string] {.async.} =
  ## Read a whole tarball from `stream` and store its contents on the node.
  ##
  ## File streams reported by `Tarball.open` are stored through `self.store`
  ## with `pad = false`; the directory callback handed to `buildTree` stores
  ## a `DirectoryManifest` block per call. On success the result is the
  ## string form of the JSON array filled in by `preorderTraversal`.
  ##
  info "Storing tarball data"

  # Just as a proof of concept, we process the tarball in memory.
  # Later to see how to do actual streaming to either store the
  # tarball locally in some tmp folder, or to process the
  # tarball incrementally.
  let tarballBytes = await stream.read()
  let tarStream = newStringStream(string.fromBytes(tarballBytes))

  proc onProcessedTarFile(
      stream: Stream, fileName: string
  ): Future[?!Cid] {.gcsafe, async: (raises: [CancelledError]).} =
    ## Stores a single file stream unpadded and yields the Cid returned by
    ## `store`.
    try:
      trace "Processed tar file", fileName
      let fileStream = newStdStreamWrapper(stream)
      await self.store(fileStream, filename = some fileName, pad = false)
    except CancelledError as e:
      raise e
    except CatchableError as e:
      error "Error processing tar file", fileName, exc = e.msg
      return failure(e.msg)

  proc onProcessedTarDir(
      name: string, cids: seq[Cid]
  ): Future[?!Cid] {.gcsafe, async: (raises: [CancelledError]).} =
    ## Stores a directory manifest built from `name` and `cids` and yields
    ## the Cid of the stored manifest block.
    try:
      trace "Processed tar dir", name, cids = $cids
      let directoryManifest = newDirectoryManifest(name = name, cids = cids)
      without manifestBlk =? await self.storeDirectoryManifest(directoryManifest), err:
        error "Unable to store manifest"
        return failure(err)
      manifestBlk.cid.success
    except CancelledError as e:
      raise e
    except CatchableError as e:
      error "Error processing tar dir", name, exc = e.msg
      return failure(e.msg)

  let tarball = Tarball()
  if err =? (await tarball.open(tarStream, onProcessedTarFile)).errorOption:
    error "Unable to open tarball", err = err.msg
    return failure(err)

  trace "Opened tarball", tarball = $tarball

  without root =? tarball.findRootDir(), err:
    return failure(err.msg)

  trace "Found tarball root dir", root = $root

  let dirs = processDirEntries(tarball)
  trace "Processed tarball dir entries", dirs = $dirs

  without tree =? (await buildTree(root = root, dirs = dirs, onProcessedTarDir)), err:
    error "Unable to build tree", err = err.msg
    return failure(err)

  let json = newJArray()
  preorderTraversal(tree, json)
  trace "Completed preorder traversal", tree = $json

  success($json)