323 lines
8.7 KiB
Go
Raw Permalink Normal View History

2025-10-08 09:03:09 +02:00
package codex
/*
#include "bridge.h"
#include <stdlib.h>
static int cGoCodexDownloadInit(void* codexCtx, char* cid, size_t chunkSize, bool local, void* resp) {
return codex_download_init(codexCtx, cid, chunkSize, local, (CodexCallback) callback, resp);
}
static int cGoCodexDownloadChunk(void* codexCtx, char* cid, void* resp) {
return codex_download_chunk(codexCtx, cid, (CodexCallback) callback, resp);
}
static int cGoCodexDownloadStream(void* codexCtx, char* cid, size_t chunkSize, bool local, const char* filepath, void* resp) {
return codex_download_stream(codexCtx, cid, chunkSize, local, filepath, (CodexCallback) callback, resp);
}
static int cGoCodexDownloadCancel(void* codexCtx, char* cid, void* resp) {
return codex_download_cancel(codexCtx, cid, (CodexCallback) callback, resp);
}
static int cGoCodexDownloadManifest(void* codexCtx, char* cid, void* resp) {
return codex_download_manifest(codexCtx, cid, (CodexCallback) callback, resp);
}
*/
import "C"
import (
"context"
2025-10-08 09:03:09 +02:00
"encoding/json"
"fmt"
2025-10-08 09:03:09 +02:00
"io"
2025-10-30 07:56:43 +01:00
"sync/atomic"
2025-10-08 09:03:09 +02:00
"unsafe"
)
type OnDownloadProgressFunc func(read, total int, percent float64, err error)
2025-10-08 09:03:09 +02:00
// DownloadStreamOptions is used to download a file
// in a streaming manner in Codex.
type DownloadStreamOptions = struct {
// Filepath is the path destination used by DownloadStream.
2025-10-08 09:03:09 +02:00
// If it is set, the content will be written into the specified
// path.
Filepath string
2025-10-08 09:03:09 +02:00
// ChunkSize is the size of each downloaded chunk. Default is to 64 KB.
ChunkSize ChunkSize
2025-10-08 09:03:09 +02:00
// OnProgress is a callback function that is called after each chunk is download with:
2025-10-08 09:03:09 +02:00
// - read: the number of bytes downloaded for the last chunk.
// - total: the total number of bytes downloaded so far.
// - percent: the percentage of the total file size that has been downloaded. It is
// determined from `datasetSize`.
// - err: an error, if one occurred.
OnProgress OnDownloadProgressFunc
2025-10-08 09:03:09 +02:00
// Writer is the path destination used by DownloadStream.
2025-10-08 09:03:09 +02:00
// If it is set, the content will be written into the specified
// Writer.
Writer io.Writer
2025-10-08 09:03:09 +02:00
// Local defines the way to download the content.
2025-10-08 09:03:09 +02:00
// If true, the content will be downloaded from the
// Local node.
2025-10-08 09:03:09 +02:00
// If false (default), the content will be downloaded
// from the network.
Local bool
2025-10-08 09:03:09 +02:00
// DatasetSize is the total size of the dataset being downloaded.
DatasetSize int
2025-10-08 09:03:09 +02:00
// DatasetSizeAuto if true, will fetch the manifest before starting
2025-10-08 09:03:09 +02:00
// the downloaded to retrive the size of the data.
DatasetSizeAuto bool
2025-10-08 09:03:09 +02:00
}
// DownloadInitOptions is used to create a download session.
type DownloadInitOptions = struct {
// Local defines the way to download the content.
2025-10-08 09:03:09 +02:00
// If true, the content will be downloaded from the
// local node.
// If false (default), the content will be downloaded
// from the network.
Local bool
2025-10-08 09:03:09 +02:00
// ChunkSize is the size of each downloaded chunk. Default is to 64 KB.
ChunkSize ChunkSize
2025-10-08 09:03:09 +02:00
}
// Manifest is the object containing the information of
// a file in Codex.
type Manifest struct {
// Cid is the content identifier over the network
Cid string
// TreeCid is the root of the merkle tree
TreeCid string `json:"treeCid"`
// DatasetSize is the total size of all blocks
DatasetSize int `json:"datasetSize"`
// BlockSize is the size of each contained block
BlockSize int `json:"blockSize"`
// Filename is the name of the file (optional)
Filename string `json:"filename"`
// Mimetype is the MIME type of the file (optional)
Mimetype string `json:"mimetype"`
// Protected datasets have erasure coded info
Protected bool `json:"protected"`
}
// DownloadManifest retrieves the Codex manifest from its cid.
// The session identifier is the cid, i.e you cannot have multiple
// sessions for a cid.
func (node CodexNode) DownloadManifest(cid string) (Manifest, error) {
bridge := newBridgeCtx()
defer bridge.free()
var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid))
if C.cGoCodexDownloadManifest(node.ctx, cCid, bridge.resp) != C.RET_OK {
return Manifest{}, bridge.callError("cGoCodexDownloadManifest")
}
val, err := bridge.wait()
if err != nil {
return Manifest{}, err
}
manifest := Manifest{Cid: cid}
err = json.Unmarshal([]byte(val), &manifest)
if err != nil {
return Manifest{}, err
}
return manifest, nil
}
// DownloadStream download the data corresponding to a cid.
// If options.datasetSizeAuto is true, the manifest will be fetched first
// to get the dataset size.
// If options.filepath is set, the data will be written into that path.
// If options.writer is set, the data will be written into that writer.
// The options filepath and writer are not mutually exclusive, i.e you can write
// in different places in a same call.
func (node CodexNode) DownloadStream(ctx context.Context, cid string, options DownloadStreamOptions) error {
2025-10-08 09:03:09 +02:00
bridge := newBridgeCtx()
defer bridge.free()
if options.DatasetSizeAuto {
2025-10-08 09:03:09 +02:00
manifest, err := node.DownloadManifest(cid)
if err != nil {
return err
}
options.DatasetSize = manifest.DatasetSize
2025-10-08 09:03:09 +02:00
}
total := 0
bridge.onProgress = func(read int, chunk []byte) {
if read == 0 {
return
}
if options.Writer != nil {
w := options.Writer
2025-10-08 09:03:09 +02:00
if _, err := w.Write(chunk); err != nil {
if options.OnProgress != nil {
options.OnProgress(0, 0, 0.0, err)
2025-10-08 09:03:09 +02:00
}
}
}
total += read
if options.OnProgress != nil {
2025-10-08 09:03:09 +02:00
var percent = 0.0
if options.DatasetSize > 0 {
percent = float64(total) / float64(options.DatasetSize) * 100.0
2025-10-08 09:03:09 +02:00
}
options.OnProgress(read, total, percent, nil)
2025-10-08 09:03:09 +02:00
}
}
var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid))
err := node.DownloadInit(cid, DownloadInitOptions{
ChunkSize: options.ChunkSize,
Local: options.Local,
})
if err != nil {
return err
}
defer node.DownloadCancel(cid)
var cFilepath = C.CString(options.Filepath)
2025-10-08 09:03:09 +02:00
defer C.free(unsafe.Pointer(cFilepath))
var cLocal = C.bool(options.Local)
2025-10-08 09:03:09 +02:00
if C.cGoCodexDownloadStream(node.ctx, cCid, options.ChunkSize.toSizeT(), cLocal, cFilepath, bridge.resp) != C.RET_OK {
2025-10-08 09:03:09 +02:00
return bridge.callError("cGoCodexDownloadLocal")
}
2025-10-23 09:47:39 +02:00
// Create a done channel to signal the goroutine to stop
// when the download is complete and avoid goroutine leaks.
2025-10-20 13:06:27 +02:00
done := make(chan struct{})
2025-10-23 09:47:39 +02:00
defer close(done)
2025-10-20 13:06:27 +02:00
channelError := make(chan error, 1)
2025-10-30 07:56:43 +01:00
var cancelled atomic.Bool
2025-10-20 13:06:27 +02:00
go func() {
select {
case <-ctx.Done():
channelError <- node.DownloadCancel(cid)
2025-10-30 07:56:43 +01:00
cancelled.Store(true)
2025-10-20 13:06:27 +02:00
case <-done:
// Nothing to do, download finished
}
}()
_, err = bridge.wait()
// Extract the potential cancellation error
var cancelError error
select {
2025-10-20 13:06:27 +02:00
case cancelError = <-channelError:
default:
}
if err != nil {
2025-10-20 13:06:27 +02:00
if cancelError != nil {
2025-10-30 07:56:43 +01:00
return fmt.Errorf("context canceled: %v, but failed to cancel download session: %v", ctx.Err(), cancelError)
}
if cancelled.Load() {
return context.Canceled
}
2025-10-20 13:06:27 +02:00
return err
}
2025-10-23 06:11:31 +02:00
return cancelError
2025-10-08 09:03:09 +02:00
}
// DownloadInit initializes the download process for a specific CID.
// This method should be used if you want to manage the download session
// and the chunk downloads manually.
func (node CodexNode) DownloadInit(cid string, options DownloadInitOptions) error {
bridge := newBridgeCtx()
defer bridge.free()
var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid))
var cLocal = C.bool(options.Local)
2025-10-08 09:03:09 +02:00
if C.cGoCodexDownloadInit(node.ctx, cCid, options.ChunkSize.toSizeT(), cLocal, bridge.resp) != C.RET_OK {
2025-10-08 09:03:09 +02:00
return bridge.callError("cGoCodexDownloadInit")
}
_, err := bridge.wait()
return err
}
// DownloadChunk downloads a chunk from its cid.
// You HAVE TO call `DownloadInit` before using this method.
// When using this method, you are managing at your own
// the total size downloaded (use DownloadManifest to get the
// datasetSize).
// When the download is complete, you need to call `CodexDownloadCancel`
// to free the resources.
func (node CodexNode) DownloadChunk(cid string) ([]byte, error) {
bridge := newBridgeCtx()
defer bridge.free()
var bytes []byte
bridge.onProgress = func(read int, chunk []byte) {
bytes = chunk
}
var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid))
if C.cGoCodexDownloadChunk(node.ctx, cCid, bridge.resp) != C.RET_OK {
return nil, bridge.callError("cGoCodexDownloadChunk")
}
if _, err := bridge.wait(); err != nil {
return nil, err
}
return bytes, nil
}
// DownloadCancel cancels a download session.
// It can be only if the download session is managed manually.
// It doesn't work with DownloadStream.
func (node CodexNode) DownloadCancel(cid string) error {
bridge := newBridgeCtx()
defer bridge.free()
var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid))
if C.cGoCodexDownloadCancel(node.ctx, cCid, bridge.resp) != C.RET_OK {
return bridge.callError("cGoCodexDownloadCancel")
}
_, err := bridge.wait()
return err
}