mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-03 22:13:12 +00:00
Uploading directories - first draft
This commit is contained in:
parent
b39d541227
commit
14f85a8f8d
@ -25,6 +25,9 @@ import ../../stores/blockstore
|
||||
import ../../logutils
|
||||
import ../../manifest
|
||||
|
||||
# tarballs
|
||||
import ../../tarballs/[directorymanifest, decoding]
|
||||
|
||||
logScope:
|
||||
topics = "codex discoveryengine advertiser"
|
||||
|
||||
@ -66,7 +69,11 @@ proc advertiseBlock(b: Advertiser, cid: Cid) {.async: (raises: [CancelledError])
|
||||
return
|
||||
|
||||
without manifest =? Manifest.decode(blk), err:
|
||||
error "Unable to decode as manifest", err = err.msg
|
||||
# Try if it not a directory manifest
|
||||
without manifest =? DirectoryManifest.decode(blk), err:
|
||||
error "Unable to decode as manifest", err = err.msg
|
||||
return
|
||||
await b.addCidToQueue(cid)
|
||||
return
|
||||
|
||||
# announce manifest cid and tree cid
|
||||
|
||||
106
codex/node.nim
106
codex/node.nim
@ -13,6 +13,7 @@ import std/options
|
||||
import std/sequtils
|
||||
import std/strformat
|
||||
import std/sugar
|
||||
import std/streams
|
||||
import times
|
||||
|
||||
import pkg/taskpools
|
||||
@ -47,6 +48,9 @@ import ./logutils
|
||||
import ./utils/asynciter
|
||||
import ./utils/trackedfutures
|
||||
|
||||
# tarball
|
||||
import ./tarballs/[tarballs, encoding, decoding, stdstreamwrapper, directorymanifest]
|
||||
|
||||
export logutils
|
||||
|
||||
logScope:
|
||||
@ -132,6 +136,31 @@ proc fetchManifest*(self: CodexNodeRef, cid: Cid): Future[?!Manifest] {.async.}
|
||||
|
||||
return manifest.success
|
||||
|
||||
proc fetchDirectoryManifest*(
|
||||
self: CodexNodeRef, cid: Cid
|
||||
): Future[?!DirectoryManifest] {.async.} =
|
||||
## Fetch and decode a manifest block
|
||||
##
|
||||
|
||||
if err =? cid.isManifest.errorOption:
|
||||
return failure "CID has invalid content type for manifest {$cid}"
|
||||
|
||||
trace "Retrieving manifest for cid", cid
|
||||
|
||||
without blk =? await self.networkStore.getBlock(BlockAddress.init(cid)), err:
|
||||
trace "Error retrieve manifest block", cid, err = err.msg
|
||||
return failure err
|
||||
|
||||
trace "Decoding manifest for cid", cid
|
||||
|
||||
without manifest =? DirectoryManifest.decode(blk), err:
|
||||
trace "Unable to decode as manifest", err = err.msg
|
||||
return failure("Unable to decode as manifest")
|
||||
|
||||
trace "Decoded manifest", cid
|
||||
|
||||
manifest.success
|
||||
|
||||
proc findPeer*(self: CodexNodeRef, peerId: PeerId): Future[?PeerRecord] {.async.} =
|
||||
## Find peer using the discovery service from the given CodexNode
|
||||
##
|
||||
@ -486,6 +515,83 @@ proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} =
|
||||
|
||||
onManifest(cid, manifest)
|
||||
|
||||
proc storeDirectoryManifest*(
|
||||
self: CodexNodeRef, manifest: DirectoryManifest
|
||||
): Future[?!bt.Block] {.async.} =
|
||||
let encodedManifest = manifest.encode()
|
||||
|
||||
without blk =? bt.Block.new(data = encodedManifest, codec = ManifestCodec), error:
|
||||
trace "Unable to create block from manifest"
|
||||
return failure(error)
|
||||
|
||||
if err =? (await self.networkStore.putBlock(blk)).errorOption:
|
||||
trace "Unable to store manifest block", cid = blk.cid, err = err.msg
|
||||
return failure(err)
|
||||
|
||||
success blk
|
||||
|
||||
proc storeTarball*(
|
||||
self: CodexNodeRef, stream: AsyncStreamReader
|
||||
): Future[?!string] {.async.} =
|
||||
info "Storing tarball data"
|
||||
|
||||
# Just as a proof of concept, we process tar bar in memory
|
||||
# Later to see how to do actual streaming to either store
|
||||
# tarball locally in some tmp folder, or to process the
|
||||
# tarball incrementally
|
||||
let tarballBytes = await stream.read()
|
||||
let stream = newStringStream(string.fromBytes(tarballBytes))
|
||||
|
||||
proc onProcessedTarFile(
|
||||
stream: Stream, fileName: string
|
||||
): Future[?!Cid] {.gcsafe, async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
echo "onProcessedTarFile:name: ", fileName
|
||||
let stream = newStdStreamWrapper(stream)
|
||||
await self.store(stream, filename = some fileName)
|
||||
except CancelledError as e:
|
||||
raise e
|
||||
except CatchableError as e:
|
||||
error "Error processing tar file", fileName, exc = e.msg
|
||||
return failure(e.msg)
|
||||
|
||||
proc onProcessedTarDir(
|
||||
name: string, cids: seq[Cid]
|
||||
): Future[?!Cid] {.gcsafe, async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
echo "onProcessedTarDir:name: ", name
|
||||
echo "onProcessedTarDir:cids: ", cids
|
||||
let directoryManifest = newDirectoryManifest(name = name, cids = cids)
|
||||
without manifestBlk =? await self.storeDirectoryManifest(directoryManifest), err:
|
||||
error "Unable to store manifest"
|
||||
return failure(err)
|
||||
manifestBlk.cid.success
|
||||
except CancelledError as e:
|
||||
raise e
|
||||
except CatchableError as e:
|
||||
error "Error processing tar dir", name, exc = e.msg
|
||||
return failure(e.msg)
|
||||
|
||||
let tarball = Tarball()
|
||||
if err =? (await tarball.open(stream, onProcessedTarFile)).errorOption:
|
||||
error "Unable to open tarball", err = err.msg
|
||||
return failure(err)
|
||||
echo "tarball = ", $tarball
|
||||
without root =? tarball.findRootDir(), err:
|
||||
return failure(err.msg)
|
||||
echo "root = ", root
|
||||
let dirs = processDirEntries(tarball)
|
||||
echo "dirs = ", dirs
|
||||
without tree =? (await buildTree(root = root, dirs = dirs, onProcessedTarDir)), err:
|
||||
error "Unable to build tree", err = err.msg
|
||||
return failure(err)
|
||||
echo ""
|
||||
echo "preorderTraversal:"
|
||||
let json = newJArray()
|
||||
preorderTraversal(tree, json)
|
||||
echo "json = ", json
|
||||
success($json)
|
||||
|
||||
proc setupRequest(
|
||||
self: CodexNodeRef,
|
||||
cid: Cid,
|
||||
|
||||
@ -39,6 +39,8 @@ import ../manifest
|
||||
import ../streams/asyncstreamwrapper
|
||||
import ../stores
|
||||
import ../utils/options
|
||||
# tarballs
|
||||
import ../tarballs/directorymanifest
|
||||
|
||||
import ./coders
|
||||
import ./json
|
||||
@ -52,6 +54,11 @@ declareCounter(codex_api_downloads, "codex API downloads")
|
||||
proc validate(pattern: string, value: string): int {.gcsafe, raises: [Defect].} =
|
||||
0
|
||||
|
||||
proc formatDirectoryManifest(
|
||||
cid: Cid, manifest: DirectoryManifest
|
||||
): RestDirectoryContent =
|
||||
return RestDirectoryContent.init(cid, manifest)
|
||||
|
||||
proc formatManifest(cid: Cid, manifest: Manifest): RestContent =
|
||||
return RestContent.init(cid, manifest)
|
||||
|
||||
@ -179,7 +186,76 @@ proc getFilenameFromContentDisposition(contentDisposition: string): ?string =
|
||||
return filename[0 ..^ 2].some
|
||||
|
||||
proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRouter) =
|
||||
let allowedOrigin = router.allowedOrigin # prevents capture inside of api defintion
|
||||
let allowedOrigin = router.allowedOrigin # prevents capture inside of api definition
|
||||
|
||||
router.api(MethodOptions, "/api/codex/v1/tar") do(
|
||||
resp: HttpResponseRef
|
||||
) -> RestApiResponse:
|
||||
if corsOrigin =? allowedOrigin:
|
||||
resp.setCorsHeaders("POST", corsOrigin)
|
||||
resp.setHeader(
|
||||
"Access-Control-Allow-Headers", "content-type, content-disposition"
|
||||
)
|
||||
|
||||
resp.status = Http204
|
||||
await resp.sendBody("")
|
||||
|
||||
router.rawApi(MethodPost, "/api/codex/v1/tar") do() -> RestApiResponse:
|
||||
## Upload a file in a streaming manner
|
||||
##
|
||||
|
||||
trace "Handling upload of a tar file"
|
||||
var bodyReader = request.getBodyReader()
|
||||
if bodyReader.isErr():
|
||||
return RestApiResponse.error(Http500, msg = bodyReader.error())
|
||||
|
||||
# Attempt to handle `Expect` header
|
||||
# some clients (curl), wait 1000ms
|
||||
# before giving up
|
||||
#
|
||||
await request.handleExpect()
|
||||
|
||||
var mimetype = request.headers.getString(ContentTypeHeader).some
|
||||
|
||||
if mimetype.get() != "":
|
||||
let mimetypeVal = mimetype.get()
|
||||
var m = newMimetypes()
|
||||
let extension = m.getExt(mimetypeVal, "")
|
||||
if extension == "":
|
||||
return RestApiResponse.error(
|
||||
Http422, "The MIME type '" & mimetypeVal & "' is not valid."
|
||||
)
|
||||
else:
|
||||
mimetype = string.none
|
||||
|
||||
const ContentDispositionHeader = "Content-Disposition"
|
||||
let contentDisposition = request.headers.getString(ContentDispositionHeader)
|
||||
let filename = getFilenameFromContentDisposition(contentDisposition)
|
||||
|
||||
if filename.isSome and not isValidFilename(filename.get()):
|
||||
return RestApiResponse.error(Http422, "The filename is not valid.")
|
||||
|
||||
# Here we could check if the extension matches the filename if needed
|
||||
|
||||
let reader = bodyReader.get()
|
||||
let stream = AsyncStreamReader(reader)
|
||||
|
||||
try:
|
||||
without json =? (await node.storeTarball(stream = stream)), error:
|
||||
error "Error uploading tarball", exc = error.msg
|
||||
return RestApiResponse.error(Http500, error.msg)
|
||||
|
||||
codex_api_uploads.inc()
|
||||
trace "Uploaded tarball", result = json
|
||||
return RestApiResponse.response(json, contentType = "application/json")
|
||||
except CancelledError:
|
||||
trace "Upload cancelled error"
|
||||
return RestApiResponse.error(Http500)
|
||||
except AsyncStreamError:
|
||||
trace "Async stream error"
|
||||
return RestApiResponse.error(Http500)
|
||||
finally:
|
||||
await reader.closeWait()
|
||||
|
||||
router.api(MethodOptions, "/api/codex/v1/data") do(
|
||||
resp: HttpResponseRef
|
||||
@ -363,6 +439,24 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
|
||||
let json = %formatManifest(cid.get(), manifest)
|
||||
return RestApiResponse.response($json, contentType = "application/json")
|
||||
|
||||
router.api(MethodGet, "/api/codex/v1/data/{cid}/network/dirmanifest") do(
|
||||
cid: Cid, resp: HttpResponseRef
|
||||
) -> RestApiResponse:
|
||||
## Download only the directory manifest.
|
||||
##
|
||||
|
||||
var headers = buildCorsHeaders("GET", allowedOrigin)
|
||||
|
||||
if cid.isErr:
|
||||
return RestApiResponse.error(Http400, $cid.error(), headers = headers)
|
||||
|
||||
without manifest =? (await node.fetchDirectoryManifest(cid.get())), err:
|
||||
error "Failed to fetch directory manifest", err = err.msg
|
||||
return RestApiResponse.error(Http404, err.msg, headers = headers)
|
||||
|
||||
let json = %formatDirectoryManifest(cid.get(), manifest)
|
||||
return RestApiResponse.response($json, contentType = "application/json")
|
||||
|
||||
router.api(MethodGet, "/api/codex/v1/space") do() -> RestApiResponse:
|
||||
let json =
|
||||
%RestRepoStore(
|
||||
|
||||
@ -9,6 +9,8 @@ import ../utils/json
|
||||
import ../manifest
|
||||
import ../units
|
||||
|
||||
import ../tarballs/directorymanifest
|
||||
|
||||
export json
|
||||
|
||||
type
|
||||
@ -47,6 +49,10 @@ type
|
||||
cid* {.serialize.}: Cid
|
||||
manifest* {.serialize.}: Manifest
|
||||
|
||||
RestDirectoryContent* = object
|
||||
cid* {.serialize.}: Cid
|
||||
manifest* {.serialize.}: DirectoryManifest
|
||||
|
||||
RestContentList* = object
|
||||
content* {.serialize.}: seq[RestContent]
|
||||
|
||||
@ -81,6 +87,11 @@ proc init*(_: type RestContentList, content: seq[RestContent]): RestContentList
|
||||
proc init*(_: type RestContent, cid: Cid, manifest: Manifest): RestContent =
|
||||
RestContent(cid: cid, manifest: manifest)
|
||||
|
||||
proc init*(
|
||||
_: type RestDirectoryContent, cid: Cid, manifest: DirectoryManifest
|
||||
): RestDirectoryContent =
|
||||
RestDirectoryContent(cid: cid, manifest: manifest)
|
||||
|
||||
proc init*(_: type RestNode, node: dn.Node): RestNode =
|
||||
RestNode(
|
||||
nodeId: RestNodeId.init(node.id),
|
||||
|
||||
60
codex/tarballs/decoding.nim
Normal file
60
codex/tarballs/decoding.nim
Normal file
@ -0,0 +1,60 @@
|
||||
## Nim-Codex
|
||||
## Copyright (c) 2021 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import pkg/libp2p/cid
|
||||
import pkg/libp2p/multihash
|
||||
import pkg/libp2p/protobuf/minprotobuf
|
||||
|
||||
import pkg/questionable/results
|
||||
|
||||
import ../blocktype
|
||||
import ./directorymanifest
|
||||
|
||||
func decode*(_: type DirectoryManifest, data: openArray[byte]): ?!DirectoryManifest =
|
||||
# ```protobuf
|
||||
# Message DirectoryManifest {
|
||||
# Message Cid {
|
||||
# bytes data = 1;
|
||||
# }
|
||||
# string name = 1;
|
||||
# repeated Cid cids = 2;
|
||||
# ```
|
||||
|
||||
var
|
||||
pbNode = initProtoBuffer(data)
|
||||
pbInfo: ProtoBuffer
|
||||
name: string
|
||||
cids: seq[Cid]
|
||||
cidsBytes: seq[seq[byte]]
|
||||
|
||||
if pbNode.getField(1, name).isErr:
|
||||
return failure("Unable to decode `name` from DirectoryManifest")
|
||||
|
||||
if ?pbNode.getRepeatedField(2, cidsBytes).mapFailure:
|
||||
for cidEntry in cidsBytes:
|
||||
var pbCid = initProtoBuffer(cidEntry)
|
||||
var dataBuf = newSeq[byte]()
|
||||
if pbCid.getField(1, dataBuf).isErr:
|
||||
return failure("Unable to decode piece `data` to Cid")
|
||||
without cid =? Cid.init(dataBuf).mapFailure, err:
|
||||
return failure(err.msg)
|
||||
cids.add(cid)
|
||||
|
||||
DirectoryManifest(name: name, cids: cids).success
|
||||
|
||||
func decode*(_: type DirectoryManifest, blk: Block): ?!DirectoryManifest =
|
||||
## Decode a directory manifest using `decoder`
|
||||
##
|
||||
|
||||
if not ?blk.cid.isManifest:
|
||||
return failure "Cid is not a Directory Manifest Cid"
|
||||
|
||||
DirectoryManifest.decode(blk.data)
|
||||
34
codex/tarballs/directorymanifest.nim
Normal file
34
codex/tarballs/directorymanifest.nim
Normal file
@ -0,0 +1,34 @@
|
||||
## Nim-Codex
|
||||
## Copyright (c) 2021 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import pkg/libp2p/cid
|
||||
# import pkg/stew/byteutils
|
||||
# import pkg/questionable
|
||||
# import pkg/questionable/results
|
||||
|
||||
# import ../merkletree/codex/codex
|
||||
import ../utils/json
|
||||
|
||||
# import ../errors
|
||||
# import ../codextypes
|
||||
|
||||
type DirectoryManifest* = ref object
|
||||
name* {.serialize.}: string
|
||||
cids* {.serialize.}: seq[Cid]
|
||||
|
||||
proc `$`*(self: DirectoryManifest): string =
|
||||
"DirectoryManifest(name: " & self.name & ", cids: " & $self.cids & ")"
|
||||
|
||||
func `==`*(a: DirectoryManifest, b: DirectoryManifest): bool =
|
||||
a.name == b.name and a.cids == b.cids
|
||||
|
||||
proc newDirectoryManifest*(name: string, cids: seq[Cid]): DirectoryManifest =
|
||||
DirectoryManifest(name: name, cids: cids)
|
||||
39
codex/tarballs/encoding.nim
Normal file
39
codex/tarballs/encoding.nim
Normal file
@ -0,0 +1,39 @@
|
||||
## Nim-Codex
|
||||
## Copyright (c) 2021 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import pkg/libp2p/cid
|
||||
import pkg/libp2p/protobuf/minprotobuf
|
||||
|
||||
import ./directorymanifest
|
||||
|
||||
proc write(pb: var ProtoBuffer, field: int, value: Cid) =
|
||||
var ipb = initProtoBuffer()
|
||||
ipb.write(1, value.data.buffer)
|
||||
ipb.finish()
|
||||
pb.write(field, ipb)
|
||||
|
||||
proc encode*(manifest: DirectoryManifest): seq[byte] =
|
||||
# ```protobuf
|
||||
# Message DirectoryManifest {
|
||||
# Message Cid {
|
||||
# bytes data = 1;
|
||||
# }
|
||||
#
|
||||
# string name = 1;
|
||||
# repeated Cid cids = 2;
|
||||
# ```
|
||||
|
||||
var ipb = initProtoBuffer()
|
||||
ipb.write(1, manifest.name)
|
||||
for cid in manifest.cids:
|
||||
ipb.write(2, cid)
|
||||
ipb.finish()
|
||||
ipb.buffer
|
||||
72
codex/tarballs/stdstreamwrapper.nim
Normal file
72
codex/tarballs/stdstreamwrapper.nim
Normal file
@ -0,0 +1,72 @@
|
||||
## Nim-LibP2P
|
||||
## Copyright (c) 2019 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import std/streams
|
||||
import pkg/libp2p
|
||||
import pkg/chronos
|
||||
|
||||
import ../logutils
|
||||
|
||||
logScope:
|
||||
topics = "libp2p stdstreamwrapper"
|
||||
|
||||
const StdStreamWrapperName* = "StdStreamWrapper"
|
||||
|
||||
type StdStreamWrapper* = ref object of LPStream
|
||||
stream*: Stream
|
||||
|
||||
method initStream*(self: StdStreamWrapper) =
|
||||
if self.objName.len == 0:
|
||||
self.objName = StdStreamWrapperName
|
||||
|
||||
procCall LPStream(self).initStream()
|
||||
|
||||
proc newStdStreamWrapper*(stream: Stream = nil): StdStreamWrapper =
|
||||
let stream = StdStreamWrapper(stream: stream)
|
||||
|
||||
stream.initStream()
|
||||
return stream
|
||||
|
||||
template withExceptions(body: untyped) =
|
||||
try:
|
||||
body
|
||||
except CatchableError as exc:
|
||||
raise newException(Defect, "Unexpected error in StdStreamWrapper", exc)
|
||||
|
||||
method readOnce*(
|
||||
self: StdStreamWrapper, pbytes: pointer, nbytes: int
|
||||
): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} =
|
||||
trace "Reading bytes from stream", bytes = nbytes
|
||||
if isNil(self.stream):
|
||||
error "StdStreamWrapper: stream is nil"
|
||||
raiseAssert("StdStreamWrapper: stream is nil")
|
||||
|
||||
if self.atEof:
|
||||
raise newLPStreamEOFError()
|
||||
|
||||
withExceptions:
|
||||
return self.stream.readData(pbytes, nbytes)
|
||||
|
||||
method atEof*(self: StdStreamWrapper): bool =
|
||||
withExceptions:
|
||||
return self.stream.atEnd()
|
||||
|
||||
method closeImpl*(self: StdStreamWrapper) {.async: (raises: []).} =
|
||||
try:
|
||||
trace "Shutting down std stream"
|
||||
|
||||
self.stream.close()
|
||||
|
||||
trace "Shutdown async chronos stream"
|
||||
except CatchableError as exc:
|
||||
trace "Error closing std stream", msg = exc.msg
|
||||
|
||||
await procCall LPStream(self).closeImpl()
|
||||
278
codex/tarballs/tarballs.nim
Normal file
278
codex/tarballs/tarballs.nim
Normal file
@ -0,0 +1,278 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import std/os
|
||||
import std/times
|
||||
import std/strutils
|
||||
import std/strformat
|
||||
import std/sequtils
|
||||
import std/streams
|
||||
import std/tables
|
||||
|
||||
import std/random
|
||||
|
||||
import pkg/chronos
|
||||
import pkg/questionable/results
|
||||
import pkg/libp2p/[cid, multicodec, multihash]
|
||||
import pkg/serde/json
|
||||
|
||||
import ../blocktype
|
||||
|
||||
proc example2*(_: type Block, size: int = 4096): ?!Block =
|
||||
let length = rand(size)
|
||||
let bytes = newSeqWith(length, rand(uint8))
|
||||
Block.new(bytes)
|
||||
|
||||
proc example2*(_: type Cid): ?!Cid =
|
||||
Block.example2 .? cid
|
||||
|
||||
const
|
||||
TUREAD* = 0o00400'u32 # read by owner */
|
||||
TUWRITE* = 0o00200'u32 # write by owner */
|
||||
TUEXEC* = 0o00100'u32 # execute/search by owner */
|
||||
TGREAD* = 0o00040'u32 # read by group */
|
||||
TGWRITE* = 0o00020'u32 # write by group */
|
||||
TGEXEC* = 0o00010'u32 # execute/search by group */
|
||||
TOREAD* = 0o00004'u32 # read by other */
|
||||
TOWRITE* = 0o00002'u32 # write by other */
|
||||
TOEXEC* = 0o00001'u32 # execute/search by other */
|
||||
|
||||
type
|
||||
EntryKind* = enum
|
||||
ekNormalFile = '0'
|
||||
ekDirectory = '5'
|
||||
|
||||
TarballEntry* = object
|
||||
kind*: EntryKind
|
||||
name: string
|
||||
cid: Cid
|
||||
contentLength: int
|
||||
lastModified*: times.Time
|
||||
permissions*: set[FilePermission]
|
||||
|
||||
Tarball* = ref object
|
||||
contents*: OrderedTable[string, TarballEntry]
|
||||
|
||||
TarballError* = object of ValueError
|
||||
|
||||
TarballTree* = ref object
|
||||
name*: string
|
||||
cid*: Cid
|
||||
children*: seq[TarballTree]
|
||||
|
||||
# ToDo: make sure we also record files permissions, modification time, etc...
|
||||
# For now, only fileName so that we do not have to change the Codex manifest
|
||||
# right away
|
||||
OnProcessedTarFile* = proc(stream: Stream, fileName: string): Future[?!Cid] {.
|
||||
gcsafe, async: (raises: [CancelledError])
|
||||
.}
|
||||
|
||||
OnProcessedTarDir* = proc(name: string, cids: seq[Cid]): Future[?!Cid] {.
|
||||
gcsafe, async: (raises: [CancelledError])
|
||||
.}
|
||||
|
||||
proc `$`*(tarball: Tarball): string =
|
||||
result = "Tarball with " & $tarball.contents.len & " entries"
|
||||
for name, entry in tarball.contents.pairs:
|
||||
var lastModified: string = "(unknown)"
|
||||
try:
|
||||
let lastModified = $entry.lastModified
|
||||
except TimeFormatParseError:
|
||||
discard
|
||||
result.add(
|
||||
"\n " &
|
||||
fmt"{name}: name = {entry.name}, {entry.kind} ({entry.contentLength} bytes) @ {lastModified} [{entry.cid}]"
|
||||
)
|
||||
|
||||
proc `$`*(tarballEntry: TarballEntry): string =
|
||||
## Returns a string representation of the tarball entry.
|
||||
result = fmt"({tarballEntry.kind}, {tarballEntry.name})"
|
||||
|
||||
proc parseFilePermissions(permissions: uint32): set[FilePermission] =
|
||||
if defined(windows) or permissions == 0:
|
||||
# Ignore file permissions on Windows. If they are absent (.zip made on
|
||||
# Windows for example), set default permissions.
|
||||
result.incl fpUserRead
|
||||
result.incl fpUserWrite
|
||||
result.incl fpGroupRead
|
||||
result.incl fpOthersRead
|
||||
else:
|
||||
if (permissions and TUREAD) != 0:
|
||||
result.incl(fpUserRead)
|
||||
if (permissions and TUWRITE) != 0:
|
||||
result.incl(fpUserWrite)
|
||||
if (permissions and TUEXEC) != 0:
|
||||
result.incl(fpUserExec)
|
||||
if (permissions and TGREAD) != 0:
|
||||
result.incl(fpGroupRead)
|
||||
if (permissions and TGWRITE) != 0:
|
||||
result.incl(fpGroupWrite)
|
||||
if (permissions and TGEXEC) != 0:
|
||||
result.incl(fpGroupExec)
|
||||
if (permissions and TOREAD) != 0:
|
||||
result.incl(fpOthersRead)
|
||||
if (permissions and TOWRITE) != 0:
|
||||
result.incl(fpOthersWrite)
|
||||
if (permissions and TOEXEC) != 0:
|
||||
result.incl(fpOthersExec)
|
||||
|
||||
proc toUnixPath(path: string): string =
|
||||
path.replace('\\', '/')
|
||||
|
||||
proc clear*(tarball: Tarball) =
|
||||
tarball.contents.clear()
|
||||
|
||||
proc openStreamImpl(
|
||||
tarball: Tarball, stream: Stream, onProcessedTarFile: OnProcessedTarFile = nil
|
||||
): Future[?!void] {.async: (raises: []).} =
|
||||
tarball.clear()
|
||||
|
||||
proc trim(s: string): string =
|
||||
for i in 0 ..< s.len:
|
||||
if s[i] == '\0':
|
||||
return s[0 ..< i]
|
||||
s
|
||||
|
||||
try:
|
||||
var data = stream.readAll() # TODO: actually treat as a stream
|
||||
|
||||
var pos: int
|
||||
while pos < data.len:
|
||||
if pos + 512 > data.len:
|
||||
return failure("Attempted to read past end of file, corrupted tarball?")
|
||||
|
||||
let
|
||||
header = data[pos ..< pos + 512]
|
||||
fileName = header[0 ..< 100].trim()
|
||||
|
||||
pos += 512
|
||||
|
||||
if fileName.len == 0:
|
||||
continue
|
||||
|
||||
let
|
||||
fileSize =
|
||||
try:
|
||||
parseOctInt(header[124 .. 134])
|
||||
except ValueError:
|
||||
raise newException(TarballError, "Unexpected error while opening tarball")
|
||||
lastModified =
|
||||
try:
|
||||
parseOctInt(header[136 .. 146])
|
||||
except ValueError:
|
||||
raise newException(TarballError, "Unexpected error while opening tarball")
|
||||
typeFlag = header[156]
|
||||
fileMode =
|
||||
try:
|
||||
parseOctInt(header[100 ..< 106])
|
||||
except ValueError:
|
||||
raise newException(
|
||||
TarballError, "Unexpected error while opening tarball (mode)"
|
||||
)
|
||||
fileNamePrefix =
|
||||
if header[257 ..< 263] == "ustar\0":
|
||||
header[345 ..< 500].trim()
|
||||
else:
|
||||
""
|
||||
|
||||
if pos + fileSize > data.len:
|
||||
return failure("Attempted to read past end of file, corrupted tarball?")
|
||||
|
||||
let normalizedFileName = normalizePathEnd(fileName)
|
||||
if typeFlag == '0' or typeFlag == '\0':
|
||||
if not onProcessedTarFile.isNil:
|
||||
let stream = newStringStream(data[pos ..< pos + fileSize])
|
||||
without cid =? await onProcessedTarFile(stream, normalizedFileName), err:
|
||||
return failure(err.msg)
|
||||
tarball.contents[(fileNamePrefix / fileName).toUnixPath()] = TarballEntry(
|
||||
kind: ekNormalFile,
|
||||
name: normalizedFileName,
|
||||
contentLength: fileSize,
|
||||
cid: cid,
|
||||
lastModified: initTime(lastModified, 0),
|
||||
permissions: parseFilePermissions(cast[uint32](fileMode)),
|
||||
)
|
||||
elif typeFlag == '5':
|
||||
tarball.contents[normalizePathEnd((fileNamePrefix / fileName).toUnixPath())] = TarballEntry(
|
||||
kind: ekDirectory,
|
||||
name: normalizedFileName,
|
||||
lastModified: initTime(lastModified, 0),
|
||||
permissions: parseFilePermissions(cast[uint32](fileMode)),
|
||||
)
|
||||
|
||||
# Move pos by fileSize, where fileSize is 512 byte aligned
|
||||
pos += (fileSize + 511) and not 511
|
||||
success()
|
||||
except CatchableError as e:
|
||||
return failure(e.msg)
|
||||
|
||||
proc open*(
|
||||
tarball: Tarball, bytes: string, onProcessedTarFile: OnProcessedTarFile = nil
|
||||
): Future[?!void] {.async: (raw: true, raises: []).} =
|
||||
let stream = newStringStream(bytes)
|
||||
tarball.openStreamImpl(stream, onProcessedTarFile)
|
||||
|
||||
proc open*(
|
||||
tarball: Tarball, stream: Stream, onProcessedTarFile: OnProcessedTarFile = nil
|
||||
): Future[?!void] {.async: (raw: true, raises: []).} =
|
||||
tarball.openStreamImpl(stream, onProcessedTarFile)
|
||||
|
||||
proc processDirEntries*(tarball: Tarball): Table[string, seq[TarballEntry]] =
|
||||
result = initTable[string, seq[TarballEntry]]()
|
||||
for name, entry in tarball.contents.pairs:
|
||||
let path = normalizePathEnd(name)
|
||||
if not isRootDir(path):
|
||||
let (head, _) = splitPath(path)
|
||||
result.withValue(head, value):
|
||||
value[].add(entry)
|
||||
do:
|
||||
result[head] = @[entry]
|
||||
|
||||
proc findRootDir*(tarball: Tarball): ?!string =
|
||||
var rootDir = ""
|
||||
for entry in tarball.contents.values:
|
||||
if entry.kind == ekDirectory:
|
||||
if isRootDir(entry.name):
|
||||
return success(entry.name)
|
||||
failure("No root directory found in tarball")
|
||||
|
||||
proc buildTree*(
|
||||
root: string,
|
||||
dirs: Table[string, seq[TarballEntry]],
|
||||
onProcessedTarDir: OnProcessedTarDir = nil,
|
||||
): Future[?!TarballTree] {.async: (raises: [CancelledError]).} =
|
||||
let tree = TarballTree(name: root.lastPathPart, children: @[])
|
||||
let entries = dirs.getOrDefault(root)
|
||||
for entry in entries:
|
||||
if entry.kind == ekDirectory:
|
||||
without subTree =?
|
||||
await buildTree(root = entry.name, dirs = dirs, onProcessedTarDir), err:
|
||||
return failure(err.msg)
|
||||
# compute Cid for the subtree
|
||||
# let cids = subTree.children.mapIt(it.cid)
|
||||
# if not onProcessedTarDir.isNil:
|
||||
# without cid =? await onProcessedTarDir(subTree.name, cids), err:
|
||||
# return failure(err.msg)
|
||||
# subTree.cid = cid
|
||||
tree.children.add(subTree)
|
||||
else:
|
||||
let child =
|
||||
TarballTree(name: entry.name.lastPathPart, children: @[], cid: entry.cid)
|
||||
tree.children.add(child)
|
||||
let cids = tree.children.mapIt(it.cid)
|
||||
if not onProcessedTarDir.isNil:
|
||||
without cid =? await onProcessedTarDir(tree.name, cids), err:
|
||||
return failure(err.msg)
|
||||
tree.cid = cid
|
||||
success(tree)
|
||||
|
||||
proc preorderTraversal*(root: TarballTree, json: JsonNode) =
|
||||
echo root.name
|
||||
let jsonObj = newJObject()
|
||||
jsonObj["name"] = newJString(root.name)
|
||||
jsonObj["cid"] = newJString($root.cid)
|
||||
json.add(jsonObj)
|
||||
if root.children.len > 0:
|
||||
let jsonArray = newJArray()
|
||||
jsonObj["children"] = jsonArray
|
||||
for child in root.children:
|
||||
preorderTraversal(child, jsonArray)
|
||||
1
tests/fixtures/tarballs/dir/dir1/file11.txt
vendored
Normal file
1
tests/fixtures/tarballs/dir/dir1/file11.txt
vendored
Normal file
@ -0,0 +1 @@
|
||||
File 11
|
||||
1
tests/fixtures/tarballs/dir/dir1/file12.txt
vendored
Normal file
1
tests/fixtures/tarballs/dir/dir1/file12.txt
vendored
Normal file
@ -0,0 +1 @@
|
||||
File 12
|
||||
1
tests/fixtures/tarballs/dir/file1.txt
vendored
Normal file
1
tests/fixtures/tarballs/dir/file1.txt
vendored
Normal file
@ -0,0 +1 @@
|
||||
File 1
|
||||
1
tests/fixtures/tarballs/dir/file2.txt
vendored
Normal file
1
tests/fixtures/tarballs/dir/file2.txt
vendored
Normal file
@ -0,0 +1 @@
|
||||
File 2
|
||||
BIN
tests/fixtures/tarballs/testtarbar.tar
vendored
Normal file
BIN
tests/fixtures/tarballs/testtarbar.tar
vendored
Normal file
Binary file not shown.
Loading…
x
Reference in New Issue
Block a user