mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-02 21:43:11 +00:00
Add manifest
This commit is contained in:
parent
cc662e30e9
commit
6ee348cefd
@ -140,6 +140,10 @@ package main
|
||||
return codex_download_cancel(codexCtx, cid, (CodexCallback) callback, resp);
|
||||
}
|
||||
|
||||
static int cGoCodexDownloadManifest(void* codexCtx, char* cid, void* resp) {
|
||||
return codex_download_manifest(codexCtx, cid, (CodexCallback) callback, resp);
|
||||
}
|
||||
|
||||
static int cGoCodexStart(void* codexCtx, void* resp) {
|
||||
return codex_start(codexCtx, (CodexCallback) callback, resp);
|
||||
}
|
||||
@ -293,11 +297,13 @@ func (c ChunckSize) toSizeT() C.size_t {
|
||||
}
|
||||
|
||||
type CodexDownloadStreamOptions = struct {
|
||||
filepath string
|
||||
chunkSize ChunckSize
|
||||
onProgress OnUploadProgressFunc
|
||||
writer io.Writer
|
||||
local bool
|
||||
filepath string
|
||||
chunkSize ChunckSize
|
||||
onProgress OnUploadProgressFunc
|
||||
writer io.Writer
|
||||
local bool
|
||||
datasetSize int
|
||||
datasetSizeAuto bool
|
||||
}
|
||||
|
||||
type CodexDownloadInitOptions = struct {
|
||||
@ -324,6 +330,15 @@ type bridgeCtx struct {
|
||||
onProgress func(bytes int, chunk []byte)
|
||||
}
|
||||
|
||||
type CodexManifest struct {
|
||||
TreeCid string `json:"treeCid"`
|
||||
DatasetSize int `json:"datasetSize"`
|
||||
BlockSize int `json:"blockSize"`
|
||||
Filename string `json:"filename"`
|
||||
Mimetype string `json:"mimetype"`
|
||||
Protected bool `json:"protected"`
|
||||
}
|
||||
|
||||
func newBridgeCtx() *bridgeCtx {
|
||||
bridge := &bridgeCtx{}
|
||||
bridge.wg = &sync.WaitGroup{}
|
||||
@ -772,10 +787,45 @@ func (self CodexNode) CodexUploadFileAsync(options CodexUploadOptions, onDone fu
|
||||
}()
|
||||
}
|
||||
|
||||
func (self CodexNode) CodexDownloadManifest(cid string) (CodexManifest, error) {
|
||||
bridge := newBridgeCtx()
|
||||
defer bridge.free()
|
||||
|
||||
var cCid = C.CString(cid)
|
||||
defer C.free(unsafe.Pointer(cCid))
|
||||
|
||||
if C.cGoCodexDownloadManifest(self.ctx, cCid, bridge.resp) != C.RET_OK {
|
||||
return CodexManifest{}, bridge.CallError("cGoCodexDownloadManifest")
|
||||
}
|
||||
|
||||
val, err := bridge.wait()
|
||||
if err != nil {
|
||||
return CodexManifest{}, err
|
||||
}
|
||||
|
||||
var manifest CodexManifest
|
||||
err = json.Unmarshal([]byte(val), &manifest)
|
||||
if err != nil {
|
||||
return CodexManifest{}, err
|
||||
}
|
||||
|
||||
return manifest, nil
|
||||
}
|
||||
|
||||
func (self CodexNode) CodexDownloadStream(cid string, options CodexDownloadStreamOptions) error {
|
||||
bridge := newBridgeCtx()
|
||||
defer bridge.free()
|
||||
|
||||
if options.datasetSizeAuto {
|
||||
manifest, err := self.CodexDownloadManifest(cid)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
options.datasetSize = manifest.DatasetSize
|
||||
}
|
||||
|
||||
total := 0
|
||||
bridge.onProgress = func(read int, chunk []byte) {
|
||||
if read == 0 {
|
||||
@ -794,8 +844,10 @@ func (self CodexNode) CodexDownloadStream(cid string, options CodexDownloadStrea
|
||||
total += read
|
||||
|
||||
if options.onProgress != nil {
|
||||
// TODO: retrieve the total size from the manifest of from the options struct
|
||||
percent := 0.0
|
||||
var percent = 0.0
|
||||
if options.datasetSize > 0 {
|
||||
percent = float64(total) / float64(options.datasetSize) * 100.0
|
||||
}
|
||||
|
||||
options.onProgress(read, total, percent, nil)
|
||||
}
|
||||
@ -1107,6 +1159,13 @@ func main() {
|
||||
}
|
||||
|
||||
log.Println("Codex Download Chunk finished. Size:", len(chunk))
|
||||
|
||||
manifest, err := node.CodexDownloadManifest(cid)
|
||||
if err != nil {
|
||||
log.Fatal("Error happened:", err.Error())
|
||||
}
|
||||
|
||||
log.Println("Manifest content:", manifest)
|
||||
// }
|
||||
|
||||
// err = node.CodexConnect(peerId, []string{})
|
||||
|
||||
@ -19,12 +19,14 @@ import std/[options, streams]
|
||||
import chronos
|
||||
import chronicles
|
||||
import libp2p/stream/[lpstream]
|
||||
import serde/json as serde
|
||||
import ../../alloc
|
||||
import ../../../codex/units
|
||||
import ../../../codex/codextypes
|
||||
|
||||
from ../../../codex/codex import CodexServer, node
|
||||
from ../../../codex/node import retrieve
|
||||
from ../../../codex/node import retrieve, fetchManifest
|
||||
from ../../../codex/rest/json import `%`, RestContent
|
||||
from libp2p import Cid, init, `$`
|
||||
|
||||
logScope:
|
||||
@ -35,6 +37,7 @@ type NodeDownloadMsgType* = enum
|
||||
CHUNK
|
||||
STREAM
|
||||
CANCEL
|
||||
MANIFEST
|
||||
|
||||
type OnChunkHandler = proc(bytes: seq[byte]): void {.gcsafe, raises: [].}
|
||||
|
||||
@ -72,6 +75,8 @@ proc createShared*(
|
||||
return ret
|
||||
|
||||
proc destroyShared(self: ptr NodeDownloadRequest) =
|
||||
deallocShared(self[].cid)
|
||||
deallocShared(self[].filepath)
|
||||
deallocShared(self)
|
||||
|
||||
proc init(
|
||||
@ -257,6 +262,23 @@ proc cancel(
|
||||
|
||||
return ok("")
|
||||
|
||||
proc manifest(
|
||||
codex: ptr CodexServer, cCid: cstring
|
||||
): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
|
||||
let cid = Cid.init($cCid)
|
||||
if cid.isErr:
|
||||
return err("Failed to fetch manifest: cannot parse cid: " & $cCid)
|
||||
|
||||
try:
|
||||
let node = codex[].node
|
||||
let manifest = await node.fetchManifest(cid.get())
|
||||
if manifest.isErr:
|
||||
return err("Failed to fetch manifest: " & manifest.error.msg)
|
||||
|
||||
return ok(serde.toJson(manifest.get()))
|
||||
except CancelledError:
|
||||
return err("Failed to fetch manifest: download cancelled.")
|
||||
|
||||
proc process*(
|
||||
self: ptr NodeDownloadRequest, codex: ptr CodexServer, onChunk: OnChunkHandler
|
||||
): Future[Result[string, string]] {.async: (raises: []).} =
|
||||
@ -290,3 +312,9 @@ proc process*(
|
||||
error "Failed to CANCEL.", error = res.error
|
||||
return err($res.error)
|
||||
return res
|
||||
of NodeDownloadMsgType.MANIFEST:
|
||||
let res = (await manifest(codex, self.cid))
|
||||
if res.isErr:
|
||||
error "Failed to MANIFEST.", error = res.error
|
||||
return err($res.error)
|
||||
return res
|
||||
|
||||
@ -144,6 +144,12 @@ int codex_download_cancel(
|
||||
CodexCallback callback,
|
||||
void* userData);
|
||||
|
||||
int codex_download_manifest(
|
||||
void* ctx,
|
||||
const char* cid,
|
||||
CodexCallback callback,
|
||||
void* userData);
|
||||
|
||||
int codex_start(void* ctx,
|
||||
CodexCallback callback,
|
||||
void* userData);
|
||||
|
||||
@ -427,6 +427,20 @@ proc codex_download_cancel(
|
||||
|
||||
result = callback.okOrError(res, userData)
|
||||
|
||||
proc codex_download_manifest(
|
||||
ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer
|
||||
): cint {.dynlib, exportc.} =
|
||||
initializeLibrary()
|
||||
checkLibcodexParams(ctx, callback, userData)
|
||||
|
||||
let req = NodeDownloadRequest.createShared(NodeDownloadMsgType.MANIFEST, cid = cid)
|
||||
|
||||
let res = codex_context.sendRequestToCodexThread(
|
||||
ctx, RequestType.DOWNLOAD, req, callback, userData
|
||||
)
|
||||
|
||||
result = callback.okOrError(res, userData)
|
||||
|
||||
proc codex_start(
|
||||
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
|
||||
): cint {.dynlib, exportc.} =
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user