diff --git a/.gitignore b/.gitignore index 2b32cdb..dbe5894 100644 --- a/.gitignore +++ b/.gitignore @@ -15,4 +15,8 @@ *.out # Generated dependencies and cache -nimcache \ No newline at end of file +nimcache + +# Test files +codex/testdata/hello.downloaded.txt +codex/testdata/hello.downloaded.writer.txt \ No newline at end of file diff --git a/codex/download.go b/codex/download.go new file mode 100644 index 0000000..b708824 --- /dev/null +++ b/codex/download.go @@ -0,0 +1,272 @@ +package codex + +/* + #include "bridge.h" + #include + + static int cGoCodexDownloadInit(void* codexCtx, char* cid, size_t chunkSize, bool local, void* resp) { + return codex_download_init(codexCtx, cid, chunkSize, local, (CodexCallback) callback, resp); + } + + static int cGoCodexDownloadChunk(void* codexCtx, char* cid, void* resp) { + return codex_download_chunk(codexCtx, cid, (CodexCallback) callback, resp); + } + + static int cGoCodexDownloadStream(void* codexCtx, char* cid, size_t chunkSize, bool local, const char* filepath, void* resp) { + return codex_download_stream(codexCtx, cid, chunkSize, local, filepath, (CodexCallback) callback, resp); + } + + static int cGoCodexDownloadCancel(void* codexCtx, char* cid, void* resp) { + return codex_download_cancel(codexCtx, cid, (CodexCallback) callback, resp); + } + + static int cGoCodexDownloadManifest(void* codexCtx, char* cid, void* resp) { + return codex_download_manifest(codexCtx, cid, (CodexCallback) callback, resp); + } +*/ +import "C" +import ( + "encoding/json" + "io" + "unsafe" +) + +type onDownloadProgressFunc func(read, total int, percent float64, err error) + +// DownloadStreamOptions is used to download a file +// in a streaming manner in Codex. +type DownloadStreamOptions = struct { + // filepath is the path destination used by DownloadStream. + // If it is set, the content will be written into the specified + // path. + filepath string + + // chunkSize is the size of each downloaded chunk. Default is to 64 KB. + chunkSize chunckSize + + // onProgress is a callback function that is called after each chunk is download with: + // - read: the number of bytes downloaded for the last chunk. + // - total: the total number of bytes downloaded so far. + // - percent: the percentage of the total file size that has been downloaded. It is + // determined from `datasetSize`. + // - err: an error, if one occurred. + onProgress onDownloadProgressFunc + + // writer is the path destination used by DownloadStream. + // If it is set, the content will be written into the specified + // writer. + writer io.Writer + + // local defines the way to download the content. + // If true, the content will be downloaded from the + // local node. + // If false (default), the content will be downloaded + // from the network. + local bool + + // datasetSize is the total size of the dataset being downloaded. + datasetSize int + + // datasetSizeAuto if true, will fetch the manifest before starting + // the downloaded to retrive the size of the data. + datasetSizeAuto bool +} + +// DownloadInitOptions is used to create a download session. +type DownloadInitOptions = struct { + // local defines the way to download the content. + // If true, the content will be downloaded from the + // local node. + // If false (default), the content will be downloaded + // from the network. + local bool + + // chunkSize is the size of each downloaded chunk. Default is to 64 KB. + chunkSize chunckSize +} + +// Manifest is the object containing the information of +// a file in Codex. +type Manifest struct { + // Cid is the content identifier over the network + Cid string + + // TreeCid is the root of the merkle tree + TreeCid string `json:"treeCid"` + + // DatasetSize is the total size of all blocks + DatasetSize int `json:"datasetSize"` + + // BlockSize is the size of each contained block + BlockSize int `json:"blockSize"` + + // Filename is the name of the file (optional) + Filename string `json:"filename"` + + // Mimetype is the MIME type of the file (optional) + Mimetype string `json:"mimetype"` + + // Protected datasets have erasure coded info + Protected bool `json:"protected"` +} + +// DownloadManifest retrieves the Codex manifest from its cid. +// The session identifier is the cid, i.e you cannot have multiple +// sessions for a cid. +func (node CodexNode) DownloadManifest(cid string) (Manifest, error) { + bridge := newBridgeCtx() + defer bridge.free() + + var cCid = C.CString(cid) + defer C.free(unsafe.Pointer(cCid)) + + if C.cGoCodexDownloadManifest(node.ctx, cCid, bridge.resp) != C.RET_OK { + return Manifest{}, bridge.callError("cGoCodexDownloadManifest") + } + + val, err := bridge.wait() + if err != nil { + return Manifest{}, err + } + + manifest := Manifest{Cid: cid} + err = json.Unmarshal([]byte(val), &manifest) + if err != nil { + return Manifest{}, err + } + + return manifest, nil +} + +// DownloadStream download the data corresponding to a cid. +// If options.datasetSizeAuto is true, the manifest will be fetched first +// to get the dataset size. +// If options.filepath is set, the data will be written into that path. +// If options.writer is set, the data will be written into that writer. +// The options filepath and writer are not mutually exclusive, i.e you can write +// in different places in a same call. +func (node CodexNode) DownloadStream(cid string, options DownloadStreamOptions) error { + bridge := newBridgeCtx() + defer bridge.free() + + if options.datasetSizeAuto { + manifest, err := node.DownloadManifest(cid) + + if err != nil { + return err + } + + options.datasetSize = manifest.DatasetSize + } + + total := 0 + bridge.onProgress = func(read int, chunk []byte) { + if read == 0 { + return + } + + if options.writer != nil { + w := options.writer + if _, err := w.Write(chunk); err != nil { + if options.onProgress != nil { + options.onProgress(0, 0, 0.0, err) + } + } + } + + total += read + + if options.onProgress != nil { + var percent = 0.0 + if options.datasetSize > 0 { + percent = float64(total) / float64(options.datasetSize) * 100.0 + } + + options.onProgress(read, total, percent, nil) + } + } + + var cCid = C.CString(cid) + defer C.free(unsafe.Pointer(cCid)) + + var cFilepath = C.CString(options.filepath) + defer C.free(unsafe.Pointer(cFilepath)) + + var cLocal = C.bool(options.local) + + if C.cGoCodexDownloadStream(node.ctx, cCid, options.chunkSize.toSizeT(), cLocal, cFilepath, bridge.resp) != C.RET_OK { + return bridge.callError("cGoCodexDownloadLocal") + } + + _, err := bridge.wait() + return err +} + +// DownloadInit initializes the download process for a specific CID. +// This method should be used if you want to manage the download session +// and the chunk downloads manually. +func (node CodexNode) DownloadInit(cid string, options DownloadInitOptions) error { + bridge := newBridgeCtx() + defer bridge.free() + + var cCid = C.CString(cid) + defer C.free(unsafe.Pointer(cCid)) + + var cLocal = C.bool(options.local) + + if C.cGoCodexDownloadInit(node.ctx, cCid, options.chunkSize.toSizeT(), cLocal, bridge.resp) != C.RET_OK { + return bridge.callError("cGoCodexDownloadInit") + } + + _, err := bridge.wait() + return err +} + +// DownloadChunk downloads a chunk from its cid. +// You HAVE TO call `DownloadInit` before using this method. +// When using this method, you are managing at your own +// the total size downloaded (use DownloadManifest to get the +// datasetSize). +// When the download is complete, you need to call `CodexDownloadCancel` +// to free the resources. +func (node CodexNode) DownloadChunk(cid string) ([]byte, error) { + bridge := newBridgeCtx() + defer bridge.free() + + var bytes []byte + + bridge.onProgress = func(read int, chunk []byte) { + bytes = chunk + } + + var cCid = C.CString(cid) + defer C.free(unsafe.Pointer(cCid)) + + if C.cGoCodexDownloadChunk(node.ctx, cCid, bridge.resp) != C.RET_OK { + return nil, bridge.callError("cGoCodexDownloadChunk") + } + + if _, err := bridge.wait(); err != nil { + return nil, err + } + + return bytes, nil +} + +// DownloadCancel cancels a download session. +// It can be only if the download session is managed manually. +// It doesn't work with DownloadStream. +func (node CodexNode) DownloadCancel(cid string) error { + bridge := newBridgeCtx() + defer bridge.free() + + var cCid = C.CString(cid) + defer C.free(unsafe.Pointer(cCid)) + + if C.cGoCodexDownloadCancel(node.ctx, cCid, bridge.resp) != C.RET_OK { + return bridge.callError("cGoCodexDownloadCancel") + } + + _, err := bridge.wait() + return err +} diff --git a/codex/download_test.go b/codex/download_test.go new file mode 100644 index 0000000..0827da3 --- /dev/null +++ b/codex/download_test.go @@ -0,0 +1,128 @@ +package codex + +import ( + "bytes" + "os" + "strings" + "testing" +) + +func uploadHelper(t *testing.T, codex *CodexNode) (string, int) { + t.Helper() + + buf := bytes.NewBuffer([]byte("Hello World!")) + len := buf.Len() + cid, err := codex.UploadReader(UploadOptions{filepath: "hello.txt"}, buf) + if err != nil { + t.Fatalf("Error happened during upload: %v\n", err) + } + + return cid, len +} + +func TestDownloadStream(t *testing.T) { + start := true + codex := newCodexNode(t, start) + cid, len := uploadHelper(t, codex) + + f, err := os.Create("testdata/hello.downloaded.txt") + if err != nil { + t.Fatal(err) + } + defer f.Close() + + totalBytes := 0 + finalPercent := 0.0 + opt := DownloadStreamOptions{ + writer: f, + datasetSize: len, + filepath: "testdata/hello.downloaded.writer.txt", + onProgress: func(read, total int, percent float64, err error) { + if err != nil { + t.Fatalf("Error happening during download: %v\n", err) + } + + totalBytes = total + finalPercent = percent + }, + } + + if err := codex.DownloadStream(cid, opt); err != nil { + t.Fatal("Error happened:", err.Error()) + } + + if finalPercent != 100.0 { + t.Fatalf("UploadReader progress callback final percent %.2f but expected 100.0", finalPercent) + } + + if totalBytes != len { + t.Fatalf("UploadReader progress callback total bytes %d but expected %d", totalBytes, len) + } + + data, err := os.ReadFile("testdata/hello.writer.txt") + if err != nil { + t.Fatal(err) + } + + if string(data) != "Hello World!" { + t.Fatalf("Downloaded content does not match, expected Hello World! got %s", data) + } +} + +func TestDownloadStreamWithAutosize(t *testing.T) { + start := true + codex := newCodexNode(t, start) + cid, len := uploadHelper(t, codex) + + totalBytes := 0 + finalPercent := 0.0 + opt := DownloadStreamOptions{ + datasetSizeAuto: true, + onProgress: func(read, total int, percent float64, err error) { + if err != nil { + t.Fatalf("Error happening during download: %v\n", err) + } + + totalBytes = total + finalPercent = percent + }, + } + + if err := codex.DownloadStream(cid, opt); err != nil { + t.Fatal("Error happened:", err.Error()) + } + + if finalPercent != 100.0 { + t.Fatalf("UploadReader progress callback final percent %.2f but expected 100.0", finalPercent) + } + + if totalBytes != len { + t.Fatalf("UploadReader progress callback total bytes %d but expected %d", totalBytes, len) + } +} + +func TestDownloadManual(t *testing.T) { + start := true + codex := newCodexNode(t, start) + cid, _ := uploadHelper(t, codex) + + if err := codex.DownloadInit(cid, DownloadInitOptions{}); err != nil { + t.Fatal("Error when initializing download:", err) + } + + var b strings.Builder + if chunk, err := codex.DownloadChunk(cid); err != nil { + t.Fatal("Error when downloading chunk:", err) + } else { + b.Write(chunk) + } + + data := b.String() + if data != "Hello World!" { + t.Fatalf("Expected data was \"Hello World!\" got %s", data) + } + + if err := codex.DownloadCancel(cid); err != nil { + t.Fatalf("Error when cancelling the download %s", err) + } +}