diff --git a/codex/blockexchange/engine/advertiser.nim b/codex/blockexchange/engine/advertiser.nim index d094c454..162ba554 100644 --- a/codex/blockexchange/engine/advertiser.nim +++ b/codex/blockexchange/engine/advertiser.nim @@ -25,6 +25,9 @@ import ../../stores/blockstore import ../../logutils import ../../manifest +# tarballs +import ../../tarballs/[directorymanifest, decoding] + logScope: topics = "codex discoveryengine advertiser" @@ -66,7 +69,11 @@ proc advertiseBlock(b: Advertiser, cid: Cid) {.async: (raises: [CancelledError]) return without manifest =? Manifest.decode(blk), err: - error "Unable to decode as manifest", err = err.msg + # Try if it not a directory manifest + without manifest =? DirectoryManifest.decode(blk), err: + error "Unable to decode as manifest", err = err.msg + return + await b.addCidToQueue(cid) return # announce manifest cid and tree cid diff --git a/codex/node.nim b/codex/node.nim index fb653c0d..32e43db7 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -13,6 +13,7 @@ import std/options import std/sequtils import std/strformat import std/sugar +import std/streams import times import pkg/taskpools @@ -47,6 +48,9 @@ import ./logutils import ./utils/asynciter import ./utils/trackedfutures +# tarball +import ./tarballs/[tarballs, encoding, decoding, stdstreamwrapper, directorymanifest] + export logutils logScope: @@ -132,6 +136,31 @@ proc fetchManifest*(self: CodexNodeRef, cid: Cid): Future[?!Manifest] {.async.} return manifest.success +proc fetchDirectoryManifest*( + self: CodexNodeRef, cid: Cid +): Future[?!DirectoryManifest] {.async.} = + ## Fetch and decode a manifest block + ## + + if err =? cid.isManifest.errorOption: + return failure "CID has invalid content type for manifest {$cid}" + + trace "Retrieving manifest for cid", cid + + without blk =? await self.networkStore.getBlock(BlockAddress.init(cid)), err: + trace "Error retrieve manifest block", cid, err = err.msg + return failure err + + trace "Decoding manifest for cid", cid + + without manifest =? DirectoryManifest.decode(blk), err: + trace "Unable to decode as manifest", err = err.msg + return failure("Unable to decode as manifest") + + trace "Decoded manifest", cid + + manifest.success + proc findPeer*(self: CodexNodeRef, peerId: PeerId): Future[?PeerRecord] {.async.} = ## Find peer using the discovery service from the given CodexNode ## @@ -486,6 +515,83 @@ proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} = onManifest(cid, manifest) +proc storeDirectoryManifest*( + self: CodexNodeRef, manifest: DirectoryManifest +): Future[?!bt.Block] {.async.} = + let encodedManifest = manifest.encode() + + without blk =? bt.Block.new(data = encodedManifest, codec = ManifestCodec), error: + trace "Unable to create block from manifest" + return failure(error) + + if err =? (await self.networkStore.putBlock(blk)).errorOption: + trace "Unable to store manifest block", cid = blk.cid, err = err.msg + return failure(err) + + success blk + +proc storeTarball*( + self: CodexNodeRef, stream: AsyncStreamReader +): Future[?!string] {.async.} = + info "Storing tarball data" + + # Just as a proof of concept, we process tar bar in memory + # Later to see how to do actual streaming to either store + # tarball locally in some tmp folder, or to process the + # tarball incrementally + let tarballBytes = await stream.read() + let stream = newStringStream(string.fromBytes(tarballBytes)) + + proc onProcessedTarFile( + stream: Stream, fileName: string + ): Future[?!Cid] {.gcsafe, async: (raises: [CancelledError]).} = + try: + echo "onProcessedTarFile:name: ", fileName + let stream = newStdStreamWrapper(stream) + await self.store(stream, filename = some fileName) + except CancelledError as e: + raise e + except CatchableError as e: + error "Error processing tar file", fileName, exc = e.msg + return failure(e.msg) + + proc onProcessedTarDir( + name: string, cids: seq[Cid] + ): Future[?!Cid] {.gcsafe, async: (raises: [CancelledError]).} = + try: + echo "onProcessedTarDir:name: ", name + echo "onProcessedTarDir:cids: ", cids + let directoryManifest = newDirectoryManifest(name = name, cids = cids) + without manifestBlk =? await self.storeDirectoryManifest(directoryManifest), err: + error "Unable to store manifest" + return failure(err) + manifestBlk.cid.success + except CancelledError as e: + raise e + except CatchableError as e: + error "Error processing tar dir", name, exc = e.msg + return failure(e.msg) + + let tarball = Tarball() + if err =? (await tarball.open(stream, onProcessedTarFile)).errorOption: + error "Unable to open tarball", err = err.msg + return failure(err) + echo "tarball = ", $tarball + without root =? tarball.findRootDir(), err: + return failure(err.msg) + echo "root = ", root + let dirs = processDirEntries(tarball) + echo "dirs = ", dirs + without tree =? (await buildTree(root = root, dirs = dirs, onProcessedTarDir)), err: + error "Unable to build tree", err = err.msg + return failure(err) + echo "" + echo "preorderTraversal:" + let json = newJArray() + preorderTraversal(tree, json) + echo "json = ", json + success($json) + proc setupRequest( self: CodexNodeRef, cid: Cid, diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 7c7dcd34..de0bdbdd 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -39,6 +39,8 @@ import ../manifest import ../streams/asyncstreamwrapper import ../stores import ../utils/options +# tarballs +import ../tarballs/directorymanifest import ./coders import ./json @@ -52,6 +54,11 @@ declareCounter(codex_api_downloads, "codex API downloads") proc validate(pattern: string, value: string): int {.gcsafe, raises: [Defect].} = 0 +proc formatDirectoryManifest( + cid: Cid, manifest: DirectoryManifest +): RestDirectoryContent = + return RestDirectoryContent.init(cid, manifest) + proc formatManifest(cid: Cid, manifest: Manifest): RestContent = return RestContent.init(cid, manifest) @@ -179,7 +186,76 @@ proc getFilenameFromContentDisposition(contentDisposition: string): ?string = return filename[0 ..^ 2].some proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRouter) = - let allowedOrigin = router.allowedOrigin # prevents capture inside of api defintion + let allowedOrigin = router.allowedOrigin # prevents capture inside of api definition + + router.api(MethodOptions, "/api/codex/v1/tar") do( + resp: HttpResponseRef + ) -> RestApiResponse: + if corsOrigin =? allowedOrigin: + resp.setCorsHeaders("POST", corsOrigin) + resp.setHeader( + "Access-Control-Allow-Headers", "content-type, content-disposition" + ) + + resp.status = Http204 + await resp.sendBody("") + + router.rawApi(MethodPost, "/api/codex/v1/tar") do() -> RestApiResponse: + ## Upload a file in a streaming manner + ## + + trace "Handling upload of a tar file" + var bodyReader = request.getBodyReader() + if bodyReader.isErr(): + return RestApiResponse.error(Http500, msg = bodyReader.error()) + + # Attempt to handle `Expect` header + # some clients (curl), wait 1000ms + # before giving up + # + await request.handleExpect() + + var mimetype = request.headers.getString(ContentTypeHeader).some + + if mimetype.get() != "": + let mimetypeVal = mimetype.get() + var m = newMimetypes() + let extension = m.getExt(mimetypeVal, "") + if extension == "": + return RestApiResponse.error( + Http422, "The MIME type '" & mimetypeVal & "' is not valid." + ) + else: + mimetype = string.none + + const ContentDispositionHeader = "Content-Disposition" + let contentDisposition = request.headers.getString(ContentDispositionHeader) + let filename = getFilenameFromContentDisposition(contentDisposition) + + if filename.isSome and not isValidFilename(filename.get()): + return RestApiResponse.error(Http422, "The filename is not valid.") + + # Here we could check if the extension matches the filename if needed + + let reader = bodyReader.get() + let stream = AsyncStreamReader(reader) + + try: + without json =? (await node.storeTarball(stream = stream)), error: + error "Error uploading tarball", exc = error.msg + return RestApiResponse.error(Http500, error.msg) + + codex_api_uploads.inc() + trace "Uploaded tarball", result = json + return RestApiResponse.response(json, contentType = "application/json") + except CancelledError: + trace "Upload cancelled error" + return RestApiResponse.error(Http500) + except AsyncStreamError: + trace "Async stream error" + return RestApiResponse.error(Http500) + finally: + await reader.closeWait() router.api(MethodOptions, "/api/codex/v1/data") do( resp: HttpResponseRef @@ -363,6 +439,24 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute let json = %formatManifest(cid.get(), manifest) return RestApiResponse.response($json, contentType = "application/json") + router.api(MethodGet, "/api/codex/v1/data/{cid}/network/dirmanifest") do( + cid: Cid, resp: HttpResponseRef + ) -> RestApiResponse: + ## Download only the directory manifest. + ## + + var headers = buildCorsHeaders("GET", allowedOrigin) + + if cid.isErr: + return RestApiResponse.error(Http400, $cid.error(), headers = headers) + + without manifest =? (await node.fetchDirectoryManifest(cid.get())), err: + error "Failed to fetch directory manifest", err = err.msg + return RestApiResponse.error(Http404, err.msg, headers = headers) + + let json = %formatDirectoryManifest(cid.get(), manifest) + return RestApiResponse.response($json, contentType = "application/json") + router.api(MethodGet, "/api/codex/v1/space") do() -> RestApiResponse: let json = %RestRepoStore( diff --git a/codex/rest/json.nim b/codex/rest/json.nim index 1b9459c1..ec64c311 100644 --- a/codex/rest/json.nim +++ b/codex/rest/json.nim @@ -9,6 +9,8 @@ import ../utils/json import ../manifest import ../units +import ../tarballs/directorymanifest + export json type @@ -47,6 +49,10 @@ type cid* {.serialize.}: Cid manifest* {.serialize.}: Manifest + RestDirectoryContent* = object + cid* {.serialize.}: Cid + manifest* {.serialize.}: DirectoryManifest + RestContentList* = object content* {.serialize.}: seq[RestContent] @@ -81,6 +87,11 @@ proc init*(_: type RestContentList, content: seq[RestContent]): RestContentList proc init*(_: type RestContent, cid: Cid, manifest: Manifest): RestContent = RestContent(cid: cid, manifest: manifest) +proc init*( + _: type RestDirectoryContent, cid: Cid, manifest: DirectoryManifest +): RestDirectoryContent = + RestDirectoryContent(cid: cid, manifest: manifest) + proc init*(_: type RestNode, node: dn.Node): RestNode = RestNode( nodeId: RestNodeId.init(node.id), diff --git a/codex/tarballs/decoding.nim b/codex/tarballs/decoding.nim new file mode 100644 index 00000000..d1963d27 --- /dev/null +++ b/codex/tarballs/decoding.nim @@ -0,0 +1,60 @@ +## Nim-Codex +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + +import pkg/libp2p/cid +import pkg/libp2p/multihash +import pkg/libp2p/protobuf/minprotobuf + +import pkg/questionable/results + +import ../blocktype +import ./directorymanifest + +func decode*(_: type DirectoryManifest, data: openArray[byte]): ?!DirectoryManifest = + # ```protobuf + # Message DirectoryManifest { + # Message Cid { + # bytes data = 1; + # } + # string name = 1; + # repeated Cid cids = 2; + # ``` + + var + pbNode = initProtoBuffer(data) + pbInfo: ProtoBuffer + name: string + cids: seq[Cid] + cidsBytes: seq[seq[byte]] + + if pbNode.getField(1, name).isErr: + return failure("Unable to decode `name` from DirectoryManifest") + + if ?pbNode.getRepeatedField(2, cidsBytes).mapFailure: + for cidEntry in cidsBytes: + var pbCid = initProtoBuffer(cidEntry) + var dataBuf = newSeq[byte]() + if pbCid.getField(1, dataBuf).isErr: + return failure("Unable to decode piece `data` to Cid") + without cid =? Cid.init(dataBuf).mapFailure, err: + return failure(err.msg) + cids.add(cid) + + DirectoryManifest(name: name, cids: cids).success + +func decode*(_: type DirectoryManifest, blk: Block): ?!DirectoryManifest = + ## Decode a directory manifest using `decoder` + ## + + if not ?blk.cid.isManifest: + return failure "Cid is not a Directory Manifest Cid" + + DirectoryManifest.decode(blk.data) diff --git a/codex/tarballs/directorymanifest.nim b/codex/tarballs/directorymanifest.nim new file mode 100644 index 00000000..c81bb80e --- /dev/null +++ b/codex/tarballs/directorymanifest.nim @@ -0,0 +1,34 @@ +## Nim-Codex +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + +import pkg/libp2p/cid +# import pkg/stew/byteutils +# import pkg/questionable +# import pkg/questionable/results + +# import ../merkletree/codex/codex +import ../utils/json + +# import ../errors +# import ../codextypes + +type DirectoryManifest* = ref object + name* {.serialize.}: string + cids* {.serialize.}: seq[Cid] + +proc `$`*(self: DirectoryManifest): string = + "DirectoryManifest(name: " & self.name & ", cids: " & $self.cids & ")" + +func `==`*(a: DirectoryManifest, b: DirectoryManifest): bool = + a.name == b.name and a.cids == b.cids + +proc newDirectoryManifest*(name: string, cids: seq[Cid]): DirectoryManifest = + DirectoryManifest(name: name, cids: cids) diff --git a/codex/tarballs/encoding.nim b/codex/tarballs/encoding.nim new file mode 100644 index 00000000..8e25853c --- /dev/null +++ b/codex/tarballs/encoding.nim @@ -0,0 +1,39 @@ +## Nim-Codex +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + +import pkg/libp2p/cid +import pkg/libp2p/protobuf/minprotobuf + +import ./directorymanifest + +proc write(pb: var ProtoBuffer, field: int, value: Cid) = + var ipb = initProtoBuffer() + ipb.write(1, value.data.buffer) + ipb.finish() + pb.write(field, ipb) + +proc encode*(manifest: DirectoryManifest): seq[byte] = + # ```protobuf + # Message DirectoryManifest { + # Message Cid { + # bytes data = 1; + # } + # + # string name = 1; + # repeated Cid cids = 2; + # ``` + + var ipb = initProtoBuffer() + ipb.write(1, manifest.name) + for cid in manifest.cids: + ipb.write(2, cid) + ipb.finish() + ipb.buffer diff --git a/codex/tarballs/stdstreamwrapper.nim b/codex/tarballs/stdstreamwrapper.nim new file mode 100644 index 00000000..10e3aeb0 --- /dev/null +++ b/codex/tarballs/stdstreamwrapper.nim @@ -0,0 +1,72 @@ +## Nim-LibP2P +## Copyright (c) 2019 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + +import std/streams +import pkg/libp2p +import pkg/chronos + +import ../logutils + +logScope: + topics = "libp2p stdstreamwrapper" + +const StdStreamWrapperName* = "StdStreamWrapper" + +type StdStreamWrapper* = ref object of LPStream + stream*: Stream + +method initStream*(self: StdStreamWrapper) = + if self.objName.len == 0: + self.objName = StdStreamWrapperName + + procCall LPStream(self).initStream() + +proc newStdStreamWrapper*(stream: Stream = nil): StdStreamWrapper = + let stream = StdStreamWrapper(stream: stream) + + stream.initStream() + return stream + +template withExceptions(body: untyped) = + try: + body + except CatchableError as exc: + raise newException(Defect, "Unexpected error in StdStreamWrapper", exc) + +method readOnce*( + self: StdStreamWrapper, pbytes: pointer, nbytes: int +): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} = + trace "Reading bytes from stream", bytes = nbytes + if isNil(self.stream): + error "StdStreamWrapper: stream is nil" + raiseAssert("StdStreamWrapper: stream is nil") + + if self.atEof: + raise newLPStreamEOFError() + + withExceptions: + return self.stream.readData(pbytes, nbytes) + +method atEof*(self: StdStreamWrapper): bool = + withExceptions: + return self.stream.atEnd() + +method closeImpl*(self: StdStreamWrapper) {.async: (raises: []).} = + try: + trace "Shutting down std stream" + + self.stream.close() + + trace "Shutdown async chronos stream" + except CatchableError as exc: + trace "Error closing std stream", msg = exc.msg + + await procCall LPStream(self).closeImpl() diff --git a/codex/tarballs/tarballs.nim b/codex/tarballs/tarballs.nim new file mode 100644 index 00000000..04db1c29 --- /dev/null +++ b/codex/tarballs/tarballs.nim @@ -0,0 +1,278 @@ +{.push raises: [].} + +import std/os +import std/times +import std/strutils +import std/strformat +import std/sequtils +import std/streams +import std/tables + +import std/random + +import pkg/chronos +import pkg/questionable/results +import pkg/libp2p/[cid, multicodec, multihash] +import pkg/serde/json + +import ../blocktype + +proc example2*(_: type Block, size: int = 4096): ?!Block = + let length = rand(size) + let bytes = newSeqWith(length, rand(uint8)) + Block.new(bytes) + +proc example2*(_: type Cid): ?!Cid = + Block.example2 .? cid + +const + TUREAD* = 0o00400'u32 # read by owner */ + TUWRITE* = 0o00200'u32 # write by owner */ + TUEXEC* = 0o00100'u32 # execute/search by owner */ + TGREAD* = 0o00040'u32 # read by group */ + TGWRITE* = 0o00020'u32 # write by group */ + TGEXEC* = 0o00010'u32 # execute/search by group */ + TOREAD* = 0o00004'u32 # read by other */ + TOWRITE* = 0o00002'u32 # write by other */ + TOEXEC* = 0o00001'u32 # execute/search by other */ + +type + EntryKind* = enum + ekNormalFile = '0' + ekDirectory = '5' + + TarballEntry* = object + kind*: EntryKind + name: string + cid: Cid + contentLength: int + lastModified*: times.Time + permissions*: set[FilePermission] + + Tarball* = ref object + contents*: OrderedTable[string, TarballEntry] + + TarballError* = object of ValueError + + TarballTree* = ref object + name*: string + cid*: Cid + children*: seq[TarballTree] + + # ToDo: make sure we also record files permissions, modification time, etc... + # For now, only fileName so that we do not have to change the Codex manifest + # right away + OnProcessedTarFile* = proc(stream: Stream, fileName: string): Future[?!Cid] {. + gcsafe, async: (raises: [CancelledError]) + .} + + OnProcessedTarDir* = proc(name: string, cids: seq[Cid]): Future[?!Cid] {. + gcsafe, async: (raises: [CancelledError]) + .} + +proc `$`*(tarball: Tarball): string = + result = "Tarball with " & $tarball.contents.len & " entries" + for name, entry in tarball.contents.pairs: + var lastModified: string = "(unknown)" + try: + let lastModified = $entry.lastModified + except TimeFormatParseError: + discard + result.add( + "\n " & + fmt"{name}: name = {entry.name}, {entry.kind} ({entry.contentLength} bytes) @ {lastModified} [{entry.cid}]" + ) + +proc `$`*(tarballEntry: TarballEntry): string = + ## Returns a string representation of the tarball entry. + result = fmt"({tarballEntry.kind}, {tarballEntry.name})" + +proc parseFilePermissions(permissions: uint32): set[FilePermission] = + if defined(windows) or permissions == 0: + # Ignore file permissions on Windows. If they are absent (.zip made on + # Windows for example), set default permissions. + result.incl fpUserRead + result.incl fpUserWrite + result.incl fpGroupRead + result.incl fpOthersRead + else: + if (permissions and TUREAD) != 0: + result.incl(fpUserRead) + if (permissions and TUWRITE) != 0: + result.incl(fpUserWrite) + if (permissions and TUEXEC) != 0: + result.incl(fpUserExec) + if (permissions and TGREAD) != 0: + result.incl(fpGroupRead) + if (permissions and TGWRITE) != 0: + result.incl(fpGroupWrite) + if (permissions and TGEXEC) != 0: + result.incl(fpGroupExec) + if (permissions and TOREAD) != 0: + result.incl(fpOthersRead) + if (permissions and TOWRITE) != 0: + result.incl(fpOthersWrite) + if (permissions and TOEXEC) != 0: + result.incl(fpOthersExec) + +proc toUnixPath(path: string): string = + path.replace('\\', '/') + +proc clear*(tarball: Tarball) = + tarball.contents.clear() + +proc openStreamImpl( + tarball: Tarball, stream: Stream, onProcessedTarFile: OnProcessedTarFile = nil +): Future[?!void] {.async: (raises: []).} = + tarball.clear() + + proc trim(s: string): string = + for i in 0 ..< s.len: + if s[i] == '\0': + return s[0 ..< i] + s + + try: + var data = stream.readAll() # TODO: actually treat as a stream + + var pos: int + while pos < data.len: + if pos + 512 > data.len: + return failure("Attempted to read past end of file, corrupted tarball?") + + let + header = data[pos ..< pos + 512] + fileName = header[0 ..< 100].trim() + + pos += 512 + + if fileName.len == 0: + continue + + let + fileSize = + try: + parseOctInt(header[124 .. 134]) + except ValueError: + raise newException(TarballError, "Unexpected error while opening tarball") + lastModified = + try: + parseOctInt(header[136 .. 146]) + except ValueError: + raise newException(TarballError, "Unexpected error while opening tarball") + typeFlag = header[156] + fileMode = + try: + parseOctInt(header[100 ..< 106]) + except ValueError: + raise newException( + TarballError, "Unexpected error while opening tarball (mode)" + ) + fileNamePrefix = + if header[257 ..< 263] == "ustar\0": + header[345 ..< 500].trim() + else: + "" + + if pos + fileSize > data.len: + return failure("Attempted to read past end of file, corrupted tarball?") + + let normalizedFileName = normalizePathEnd(fileName) + if typeFlag == '0' or typeFlag == '\0': + if not onProcessedTarFile.isNil: + let stream = newStringStream(data[pos ..< pos + fileSize]) + without cid =? await onProcessedTarFile(stream, normalizedFileName), err: + return failure(err.msg) + tarball.contents[(fileNamePrefix / fileName).toUnixPath()] = TarballEntry( + kind: ekNormalFile, + name: normalizedFileName, + contentLength: fileSize, + cid: cid, + lastModified: initTime(lastModified, 0), + permissions: parseFilePermissions(cast[uint32](fileMode)), + ) + elif typeFlag == '5': + tarball.contents[normalizePathEnd((fileNamePrefix / fileName).toUnixPath())] = TarballEntry( + kind: ekDirectory, + name: normalizedFileName, + lastModified: initTime(lastModified, 0), + permissions: parseFilePermissions(cast[uint32](fileMode)), + ) + + # Move pos by fileSize, where fileSize is 512 byte aligned + pos += (fileSize + 511) and not 511 + success() + except CatchableError as e: + return failure(e.msg) + +proc open*( + tarball: Tarball, bytes: string, onProcessedTarFile: OnProcessedTarFile = nil +): Future[?!void] {.async: (raw: true, raises: []).} = + let stream = newStringStream(bytes) + tarball.openStreamImpl(stream, onProcessedTarFile) + +proc open*( + tarball: Tarball, stream: Stream, onProcessedTarFile: OnProcessedTarFile = nil +): Future[?!void] {.async: (raw: true, raises: []).} = + tarball.openStreamImpl(stream, onProcessedTarFile) + +proc processDirEntries*(tarball: Tarball): Table[string, seq[TarballEntry]] = + result = initTable[string, seq[TarballEntry]]() + for name, entry in tarball.contents.pairs: + let path = normalizePathEnd(name) + if not isRootDir(path): + let (head, _) = splitPath(path) + result.withValue(head, value): + value[].add(entry) + do: + result[head] = @[entry] + +proc findRootDir*(tarball: Tarball): ?!string = + var rootDir = "" + for entry in tarball.contents.values: + if entry.kind == ekDirectory: + if isRootDir(entry.name): + return success(entry.name) + failure("No root directory found in tarball") + +proc buildTree*( + root: string, + dirs: Table[string, seq[TarballEntry]], + onProcessedTarDir: OnProcessedTarDir = nil, +): Future[?!TarballTree] {.async: (raises: [CancelledError]).} = + let tree = TarballTree(name: root.lastPathPart, children: @[]) + let entries = dirs.getOrDefault(root) + for entry in entries: + if entry.kind == ekDirectory: + without subTree =? + await buildTree(root = entry.name, dirs = dirs, onProcessedTarDir), err: + return failure(err.msg) + # compute Cid for the subtree + # let cids = subTree.children.mapIt(it.cid) + # if not onProcessedTarDir.isNil: + # without cid =? await onProcessedTarDir(subTree.name, cids), err: + # return failure(err.msg) + # subTree.cid = cid + tree.children.add(subTree) + else: + let child = + TarballTree(name: entry.name.lastPathPart, children: @[], cid: entry.cid) + tree.children.add(child) + let cids = tree.children.mapIt(it.cid) + if not onProcessedTarDir.isNil: + without cid =? await onProcessedTarDir(tree.name, cids), err: + return failure(err.msg) + tree.cid = cid + success(tree) + +proc preorderTraversal*(root: TarballTree, json: JsonNode) = + echo root.name + let jsonObj = newJObject() + jsonObj["name"] = newJString(root.name) + jsonObj["cid"] = newJString($root.cid) + json.add(jsonObj) + if root.children.len > 0: + let jsonArray = newJArray() + jsonObj["children"] = jsonArray + for child in root.children: + preorderTraversal(child, jsonArray) diff --git a/tests/fixtures/tarballs/dir/dir1/file11.txt b/tests/fixtures/tarballs/dir/dir1/file11.txt new file mode 100644 index 00000000..f6854cfe --- /dev/null +++ b/tests/fixtures/tarballs/dir/dir1/file11.txt @@ -0,0 +1 @@ +File 11 diff --git a/tests/fixtures/tarballs/dir/dir1/file12.txt b/tests/fixtures/tarballs/dir/dir1/file12.txt new file mode 100644 index 00000000..e64f6a28 --- /dev/null +++ b/tests/fixtures/tarballs/dir/dir1/file12.txt @@ -0,0 +1 @@ +File 12 diff --git a/tests/fixtures/tarballs/dir/file1.txt b/tests/fixtures/tarballs/dir/file1.txt new file mode 100644 index 00000000..50fcd26d --- /dev/null +++ b/tests/fixtures/tarballs/dir/file1.txt @@ -0,0 +1 @@ +File 1 diff --git a/tests/fixtures/tarballs/dir/file2.txt b/tests/fixtures/tarballs/dir/file2.txt new file mode 100644 index 00000000..4475433e --- /dev/null +++ b/tests/fixtures/tarballs/dir/file2.txt @@ -0,0 +1 @@ +File 2 diff --git a/tests/fixtures/tarballs/testtarbar.tar b/tests/fixtures/tarballs/testtarbar.tar new file mode 100644 index 00000000..3fe06031 Binary files /dev/null and b/tests/fixtures/tarballs/testtarbar.tar differ