Refactor to avoid memory issues and add download local

This commit is contained in:
Arnaud 2025-09-30 06:05:04 +02:00
parent 74f22df253
commit 20dd45e8af
No known key found for this signature in database
GPG Key ID: B8FBC178F10CA7AE
4 changed files with 536 additions and 102 deletions

View File

@ -124,8 +124,20 @@ package main
return codex_upload_file(codexCtx, sessionId, (CodexCallback) callback, resp); return codex_upload_file(codexCtx, sessionId, (CodexCallback) callback, resp);
} }
static int cGoCodexUploadSubscribe(void* codexCtx, char* sessionId, void* resp) { static int cGoCodexDownloadInit(void* codexCtx, char* cid, size_t chunkSize, bool local, void* resp) {
return codex_upload_subscribe(codexCtx, sessionId, (CodexCallback) callback, resp); return codex_download_init(codexCtx, cid, chunkSize, local, (CodexCallback) callback, resp);
}
static int cGoCodexDownloadChunk(void* codexCtx, char* cid, void* resp) {
return codex_download_chunk(codexCtx, cid, (CodexCallback) callback, resp);
}
static int cGoCodexDownloadLocal(void* codexCtx, char* cid, size_t chunkSize, void* resp) {
return codex_download_local(codexCtx, cid, chunkSize, (CodexCallback) callback, resp);
}
static int cGoCodexDownloadCancel(void* codexCtx, char* cid, void* resp) {
return codex_download_cancel(codexCtx, cid, (CodexCallback) callback, resp);
} }
static int cGoCodexStart(void* codexCtx, void* resp) { static int cGoCodexStart(void* codexCtx, void* resp) {
@ -158,7 +170,6 @@ package main
*/ */
import "C" import "C"
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
@ -236,7 +247,7 @@ type RestNode struct {
NodeId string `json:"nodeId"` NodeId string `json:"nodeId"`
PeerId string `json:"peerId"` PeerId string `json:"peerId"`
Record string `json:"record"` Record string `json:"record"`
Address *string `json:"address"` // Use pointer for nullable Address *string `json:"address"`
Seen bool `json:"seen"` Seen bool `json:"seen"`
} }
@ -259,7 +270,7 @@ type CodexNode struct {
const defaultBlockSize = 1024 * 64 const defaultBlockSize = 1024 * 64
type OnProgressFunc func(read, total int, percent float64) type OnProgressFunc func(read, total int, percent float64, err error)
type CodexUploadOptions struct { type CodexUploadOptions struct {
filepath string filepath string
@ -275,16 +286,16 @@ type bridgeCtx struct {
err error err error
// Callback used for upload and download // Callback used for upload and download
onProgress func(read int) onProgress func(read int, chunk []byte)
} }
func newBridgeCtx() *bridgeCtx { func newBridgeCtx() *bridgeCtx {
bridge := &bridgeCtx{} bridge := &bridgeCtx{}
bridge.wg = &sync.WaitGroup{} bridge.wg = &sync.WaitGroup{}
bridge.wg.Add(1) bridge.wg.Add(1)
bridge.h = cgo.NewHandle(bridge) bridge.h = cgo.NewHandle(bridge)
bridge.resp = C.allocResp(C.uintptr_t(uintptr(bridge.h))) bridge.resp = C.allocResp(C.uintptr_t(uintptr(bridge.h)))
return bridge return bridge
} }
@ -307,12 +318,7 @@ func (b *bridgeCtx) CallError(name string) error {
func (b *bridgeCtx) wait() (string, error) { func (b *bridgeCtx) wait() (string, error) {
b.wg.Wait() b.wg.Wait()
result := b.result return b.result, b.err
err := b.err
b.free()
return result, err
} }
func getReaderSize(r io.Reader) int64 { func getReaderSize(r io.Reader) int64 {
@ -354,8 +360,15 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
if v, ok := h.Value().(*bridgeCtx); ok { if v, ok := h.Value().(*bridgeCtx); ok {
switch ret { switch ret {
case C.RET_PROGRESS: case C.RET_PROGRESS:
if v.onProgress != nil { if v.onProgress == nil {
v.onProgress(int(C.int(len))) return
}
if msg != nil {
chunk := C.GoBytes(unsafe.Pointer(msg), C.int(len))
v.onProgress(int(C.int(len)), chunk)
} else {
v.onProgress(int(C.int(len)), nil)
} }
case C.RET_OK: case C.RET_OK:
retMsg := C.GoStringN(msg, C.int(len)) retMsg := C.GoStringN(msg, C.int(len))
@ -378,8 +391,10 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
func CodexNew(config CodexConfig) (*CodexNode, error) { func CodexNew(config CodexConfig) (*CodexNode, error) {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
jsonConfig, err := json.Marshal(config) jsonConfig, err := json.Marshal(config)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -398,6 +413,7 @@ func CodexNew(config CodexConfig) (*CodexNode, error) {
func (self CodexNode) CodexVersion() (string, error) { func (self CodexNode) CodexVersion() (string, error) {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexVersion(self.ctx, bridge.resp) != C.RET_OK { if C.cGoCodexVersion(self.ctx, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexVersion") return "", bridge.CallError("cGoCodexVersion")
@ -408,6 +424,7 @@ func (self CodexNode) CodexVersion() (string, error) {
func (self CodexNode) CodexRevision() (string, error) { func (self CodexNode) CodexRevision() (string, error) {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexRevision(self.ctx, bridge.resp) != C.RET_OK { if C.cGoCodexRevision(self.ctx, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexRevision") return "", bridge.CallError("cGoCodexRevision")
@ -418,6 +435,7 @@ func (self CodexNode) CodexRevision() (string, error) {
func (self CodexNode) CodexRepo() (string, error) { func (self CodexNode) CodexRepo() (string, error) {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexRepo(self.ctx, bridge.resp) != C.RET_OK { if C.cGoCodexRepo(self.ctx, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexRepo") return "", bridge.CallError("cGoCodexRepo")
@ -430,6 +448,7 @@ func (self CodexNode) CodexDebug() (CodexDebugInfo, error) {
var info CodexDebugInfo var info CodexDebugInfo
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexDebug(self.ctx, bridge.resp) != C.RET_OK { if C.cGoCodexDebug(self.ctx, bridge.resp) != C.RET_OK {
return info, bridge.CallError("cGoCodexDebug") return info, bridge.CallError("cGoCodexDebug")
@ -447,6 +466,7 @@ func (self CodexNode) CodexDebug() (CodexDebugInfo, error) {
func (self CodexNode) CodexSpr() (string, error) { func (self CodexNode) CodexSpr() (string, error) {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexSpr(self.ctx, bridge.resp) != C.RET_OK { if C.cGoCodexSpr(self.ctx, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexSpr") return "", bridge.CallError("cGoCodexSpr")
@ -457,6 +477,7 @@ func (self CodexNode) CodexSpr() (string, error) {
func (self CodexNode) CodexPeerId() (string, error) { func (self CodexNode) CodexPeerId() (string, error) {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexPeerId(self.ctx, bridge.resp) != C.RET_OK { if C.cGoCodexPeerId(self.ctx, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexPeerId") return "", bridge.CallError("cGoCodexPeerId")
@ -467,6 +488,7 @@ func (self CodexNode) CodexPeerId() (string, error) {
func (self CodexNode) CodexLogLevel(logLevel LogLevel) error { func (self CodexNode) CodexLogLevel(logLevel LogLevel) error {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
var cLogLevel = C.CString(fmt.Sprintf("%s", logLevel)) var cLogLevel = C.CString(fmt.Sprintf("%s", logLevel))
defer C.free(unsafe.Pointer(cLogLevel)) defer C.free(unsafe.Pointer(cLogLevel))
@ -476,11 +498,13 @@ func (self CodexNode) CodexLogLevel(logLevel LogLevel) error {
} }
_, err := bridge.wait() _, err := bridge.wait()
return err return err
} }
func (self CodexNode) CodexConnect(peerId string, peerAddresses []string) error { func (self CodexNode) CodexConnect(peerId string, peerAddresses []string) error {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
var cPeerId = C.CString(peerId) var cPeerId = C.CString(peerId)
defer C.free(unsafe.Pointer(cPeerId)) defer C.free(unsafe.Pointer(cPeerId))
@ -509,6 +533,7 @@ func (self CodexNode) CodexPeerDebug(peerId string) (RestPeerRecord, error) {
var record RestPeerRecord var record RestPeerRecord
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
var cPeerId = C.CString(peerId) var cPeerId = C.CString(peerId)
defer C.free(unsafe.Pointer(cPeerId)) defer C.free(unsafe.Pointer(cPeerId))
@ -529,6 +554,7 @@ func (self CodexNode) CodexPeerDebug(peerId string) (RestPeerRecord, error) {
func (self CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, error) { func (self CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, error) {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
var cFilename = C.CString(options.filepath) var cFilename = C.CString(options.filepath)
defer C.free(unsafe.Pointer(cFilename)) defer C.free(unsafe.Pointer(cFilename))
@ -536,7 +562,6 @@ func (self CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, erro
if options.chunkSize == 0 { if options.chunkSize == 0 {
options.chunkSize = defaultBlockSize options.chunkSize = defaultBlockSize
} }
var cChunkSize = C.size_t(options.chunkSize) var cChunkSize = C.size_t(options.chunkSize)
if C.cGoCodexUploadInit(self.ctx, cFilename, cChunkSize, bridge.resp) != C.RET_OK { if C.cGoCodexUploadInit(self.ctx, cFilename, cChunkSize, bridge.resp) != C.RET_OK {
@ -548,6 +573,7 @@ func (self CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, erro
func (self CodexNode) CodexUploadChunk(sessionId string, chunk []byte) error { func (self CodexNode) CodexUploadChunk(sessionId string, chunk []byte) error {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId) var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId)) defer C.free(unsafe.Pointer(cSessionId))
@ -562,11 +588,13 @@ func (self CodexNode) CodexUploadChunk(sessionId string, chunk []byte) error {
} }
_, err := bridge.wait() _, err := bridge.wait()
return err return err
} }
func (self CodexNode) CodexUploadFinalize(sessionId string) (string, error) { func (self CodexNode) CodexUploadFinalize(sessionId string) (string, error) {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId) var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId)) defer C.free(unsafe.Pointer(cSessionId))
@ -580,6 +608,7 @@ func (self CodexNode) CodexUploadFinalize(sessionId string) (string, error) {
func (self CodexNode) CodexUploadCancel(sessionId string) error { func (self CodexNode) CodexUploadCancel(sessionId string) error {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId) var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId)) defer C.free(unsafe.Pointer(cSessionId))
@ -589,66 +618,40 @@ func (self CodexNode) CodexUploadCancel(sessionId string) error {
} }
_, err := bridge.wait() _, err := bridge.wait()
return err return err
} }
func (self CodexNode) CodexUploadReader(options CodexUploadOptions, r io.Reader) (string, error) { func (self CodexNode) CodexUploadReader(options CodexUploadOptions, r io.Reader) (string, error) {
sessionId, err := self.CodexUploadInit(&options) sessionId, err := self.CodexUploadInit(&options)
if err != nil { if err != nil {
return "", err return "", err
} }
if options.onProgress != nil {
size := getReaderSize(r)
total := 0
if size > 0 {
onProgress := func(read int) {
if read == 0 {
return
}
total += read
percent := float64(total) / float64(size) * 100.0
// The last block could be a bit over the size due to padding
// on the chunk size.
if percent > 100.0 {
percent = 100.0
}
options.onProgress(read, int(size), percent)
}
if err := self.CodexUploadSubscribe(sessionId, onProgress); err != nil {
if err := self.CodexUploadCancel(sessionId); err != nil {
log.Println("Error cancelling upload after subscribe failure:", err)
}
return "", err
}
}
}
if options.chunkSize == 0 { if options.chunkSize == 0 {
options.chunkSize = defaultBlockSize options.chunkSize = defaultBlockSize
} }
buf := make([]byte, options.chunkSize) buf := make([]byte, options.chunkSize)
total := 0
var size int64
if options.onProgress != nil {
size = getReaderSize(r)
}
for { for {
n, err := r.Read(buf) n, err := r.Read(buf)
if n > 0 {
if err := self.CodexUploadChunk(sessionId, buf[:n]); err != nil {
return "", err
}
}
if err == io.EOF { if err == io.EOF {
break break
} }
if err != nil { if err != nil {
self.CodexUploadCancel(sessionId) if cancelErr := self.CodexUploadCancel(sessionId); cancelErr != nil {
return "", fmt.Errorf("failed to upload chunk %v and failed to cancel upload session %v", err, cancelErr)
}
return "", err return "", err
} }
@ -656,6 +659,25 @@ func (self CodexNode) CodexUploadReader(options CodexUploadOptions, r io.Reader)
if n == 0 { if n == 0 {
break break
} }
if err := self.CodexUploadChunk(sessionId, buf[:n]); err != nil {
if cancelErr := self.CodexUploadCancel(sessionId); cancelErr != nil {
return "", fmt.Errorf("failed to upload chunk %v and failed to cancel upload session %v", err, cancelErr)
}
return "", err
}
total += n
if options.onProgress != nil && size > 0 {
percent := float64(total) / float64(size) * 100.0
// The last block could be a bit over the size due to padding
// on the chunk size.
if percent > 100.0 {
percent = 100.0
}
options.onProgress(n, total, percent, nil)
}
} }
return self.CodexUploadFinalize(sessionId) return self.CodexUploadFinalize(sessionId)
@ -670,9 +692,11 @@ func (self CodexNode) CodexUploadReaderAsync(options CodexUploadOptions, r io.Re
func (self CodexNode) CodexUploadFile(options CodexUploadOptions) (string, error) { func (self CodexNode) CodexUploadFile(options CodexUploadOptions) (string, error) {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
if options.onProgress != nil { if options.onProgress != nil {
stat, err := os.Stat(options.filepath) stat, err := os.Stat(options.filepath)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -681,7 +705,7 @@ func (self CodexNode) CodexUploadFile(options CodexUploadOptions) (string, error
total := 0 total := 0
if size > 0 { if size > 0 {
bridge.onProgress = func(read int) { bridge.onProgress = func(read int, _ []byte) {
if read == 0 { if read == 0 {
return return
} }
@ -695,14 +719,11 @@ func (self CodexNode) CodexUploadFile(options CodexUploadOptions) (string, error
percent = 100.0 percent = 100.0
} }
options.onProgress(read, int(size), percent) options.onProgress(read, int(size), percent, nil)
} }
} }
} }
var cFilePath = C.CString(options.filepath)
defer C.free(unsafe.Pointer(cFilePath))
sessionId, err := self.CodexUploadInit(&options) sessionId, err := self.CodexUploadInit(&options)
if err != nil { if err != nil {
return "", err return "", err
@ -725,27 +746,104 @@ func (self CodexNode) CodexUploadFileAsync(options CodexUploadOptions, onDone fu
}() }()
} }
func (self CodexNode) CodexUploadSubscribe(sessionId string, onProgress func(read int)) error { func (self CodexNode) CodexDownloadLocal(cid string, chunkSize int, w io.Writer) error {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
bridge.onProgress = onProgress bridge.onProgress = func(read int, chunk []byte) {
if read == 0 {
return
}
var cSessionId = C.CString(sessionId) if _, err := w.Write(chunk); err != nil {
defer C.free(unsafe.Pointer(cSessionId)) log.Println(err)
}
log.Println("Subscribing to upload progress...")
if C.cGoCodexUploadSubscribe(self.ctx, cSessionId, bridge.resp) != C.RET_OK {
return bridge.CallError("cGoCodexUploadSubscribe")
} }
go func() { var cCid = C.CString(cid)
if _, err := bridge.wait(); err != nil { defer C.free(unsafe.Pointer(cCid))
log.Println("Error in CodexUploadSubscribe:", err)
}
}()
return nil if chunkSize == 0 {
chunkSize = defaultBlockSize
}
var cChunkSize = C.size_t(chunkSize)
if C.cGoCodexDownloadLocal(self.ctx, cCid, cChunkSize, bridge.resp) != C.RET_OK {
return bridge.CallError("cGoCodexDownloadLocal")
}
_, err := bridge.wait()
return err
}
func (self CodexNode) CodexDownloadLocalAsync(cid string, chunkSize int, w io.Writer, onDone func(error)) {
go func() {
err := self.CodexDownloadLocal(cid, chunkSize, w)
onDone(err)
}()
}
func (self CodexNode) CodexDownloadInit(cid string, chunkSize int, local bool) error {
bridge := newBridgeCtx()
defer bridge.free()
var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid))
if chunkSize == 0 {
chunkSize = defaultBlockSize
}
var cChunkSize = C.size_t(chunkSize)
var cLocal = C.bool(local)
if C.cGoCodexDownloadInit(self.ctx, cCid, cChunkSize, cLocal, bridge.resp) != C.RET_OK {
return bridge.CallError("cGoCodexDownloadInit")
}
_, err := bridge.wait()
return err
}
func (self CodexNode) CodexDownloadChunk(cid string) ([]byte, error) {
bridge := newBridgeCtx()
defer bridge.free()
var bytes []byte
bridge.onProgress = func(read int, chunk []byte) {
bytes = chunk
}
var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid))
if C.cGoCodexDownloadChunk(self.ctx, cCid, bridge.resp) != C.RET_OK {
return nil, bridge.CallError("cGoCodexDownloadChunk")
}
if _, err := bridge.wait(); err != nil {
return nil, err
}
return bytes, nil
}
func (self CodexNode) CodexDownloadCancel(cid string) error {
bridge := newBridgeCtx()
defer bridge.free()
var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid))
if C.cGoCodexDownloadCancel(self.ctx, cCid, bridge.resp) != C.RET_OK {
return bridge.CallError("cGoCodexDownloadCancel")
}
_, err := bridge.wait()
return err
} }
func (self CodexNode) CodexStart() error { func (self CodexNode) CodexStart() error {
@ -757,6 +855,7 @@ func (self CodexNode) CodexStart() error {
} }
_, err := bridge.wait() _, err := bridge.wait()
return err return err
} }
@ -769,7 +868,6 @@ func (self CodexNode) CodexStartAsync(onDone func(error)) {
func (self CodexNode) CodexStop() error { func (self CodexNode) CodexStop() error {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexStop(self.ctx, bridge.resp) != C.RET_OK { if C.cGoCodexStop(self.ctx, bridge.resp) != C.RET_OK {
return bridge.CallError("cGoCodexStop") return bridge.CallError("cGoCodexStop")
@ -781,7 +879,6 @@ func (self CodexNode) CodexStop() error {
func (self CodexNode) CodexDestroy() error { func (self CodexNode) CodexDestroy() error {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexDestroy(self.ctx, bridge.resp) != C.RET_OK { if C.cGoCodexDestroy(self.ctx, bridge.resp) != C.RET_OK {
return bridge.CallError("cGoCodexDestroy") return bridge.CallError("cGoCodexDestroy")
@ -851,6 +948,8 @@ func main() {
log.Println("Codex started...") log.Println("Codex started...")
// for i := 0; i < 150; i++ {
debug, err := node.CodexDebug() debug, err := node.CodexDebug()
if err != nil { if err != nil {
log.Fatal("Error happened:", err.Error()) log.Fatal("Error happened:", err.Error())
@ -908,7 +1007,11 @@ func main() {
log.Println("Codex Upload Finalized, cid:", cid) log.Println("Codex Upload Finalized, cid:", cid)
buf := bytes.NewBuffer([]byte("Hello World!")) buf := bytes.NewBuffer([]byte("Hello World!"))
cid, err = node.CodexUploadReader(CodexUploadOptions{filepath: "hello.txt", onProgress: func(read, total int, percent float64) { cid, err = node.CodexUploadReader(CodexUploadOptions{filepath: "hello.txt", onProgress: func(read, total int, percent float64, err error) {
if err != nil {
log.Fatal("Error happened during upload: %v\n", err)
}
log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent) log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent)
}}, buf) }}, buf)
if err != nil { if err != nil {
@ -921,12 +1024,15 @@ func main() {
if err != nil { if err != nil {
log.Fatal("Error happened:", err.Error()) log.Fatal("Error happened:", err.Error())
} }
// Choose a big file to see the progress logs // Choose a big file to see the progress logs
filepath := path.Join(current, "examples", "golang", "hello.txt") filepath := path.Join(current, "examples", "golang", "hello.txt")
//filepath := path.Join(current, "examples", "golang", "discord-0.0.109.deb") //filepath := path.Join(current, "examples", "golang", "discord-0.0.109.deb")
options := CodexUploadOptions{filepath: filepath, onProgress: func(read, total int, percent float64) { options := CodexUploadOptions{filepath: filepath, onProgress: func(read, total int, percent float64, err error) {
if err != nil {
log.Fatal("Error happened during upload: %v\n", err)
}
log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent) log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent)
}} }}
@ -938,6 +1044,38 @@ func main() {
log.Println("Codex Upload File finalized, cid: .", cid) log.Println("Codex Upload File finalized, cid: .", cid)
f, err := os.Create("hello.loaded.txt")
if err != nil {
log.Fatal(err)
}
defer f.Close()
// log.Println("Codex Download Local starting... attempt", i+1)
if err := node.CodexDownloadLocal(cid, 0, f); err != nil {
log.Fatal("Error happened:", err.Error())
}
log.Println("Codex Download Local finished.")
// log.Println("Codex Download Init starting... attempt", i+1)
if err := node.CodexDownloadInit(cid, 0, true); err != nil {
log.Fatal("Error happened:", err.Error())
}
log.Println("Codex Download Init finished.")
// log.Println("Codex Download Chunk starting... attempt", i+1)
chunk, err := node.CodexDownloadChunk(cid)
if err != nil {
log.Fatal("Error happened:", err.Error())
}
log.Println("Codex Download Chunk finished. Size:", len(chunk))
// }
// err = node.CodexConnect(peerId, []string{}) // err = node.CodexConnect(peerId, []string{})
// if err != nil { // if err != nil {
// log.Fatal("Error happened:", err.Error()) // log.Fatal("Error happened:", err.Error())

View File

@ -11,6 +11,7 @@ import ./requests/node_info_request
import ./requests/node_debug_request import ./requests/node_debug_request
import ./requests/node_p2p_request import ./requests/node_p2p_request
import ./requests/node_upload_request import ./requests/node_upload_request
import ./requests/node_download_request
from ../../codex/codex import CodexServer from ../../codex/codex import CodexServer
@ -20,6 +21,7 @@ type RequestType* {.pure.} = enum
DEBUG DEBUG
P2P P2P
UPLOAD UPLOAD
DOWNLOAD
type CodexThreadRequest* = object type CodexThreadRequest* = object
reqType: RequestType reqType: RequestType
@ -94,11 +96,22 @@ proc process*(
cast[ptr NodeDebugRequest](request[].reqContent).process(codex) cast[ptr NodeDebugRequest](request[].reqContent).process(codex)
of P2P: of P2P:
cast[ptr NodeP2PRequest](request[].reqContent).process(codex) cast[ptr NodeP2PRequest](request[].reqContent).process(codex)
of DOWNLOAD:
let onChunk = proc(bytes: seq[byte]) =
if bytes.len > 0:
request[].callback(
RET_PROGRESS,
cast[ptr cchar](unsafeAddr bytes[0]),
cast[csize_t](bytes.len),
request[].userData,
)
cast[ptr NodeDownloadRequest](request[].reqContent).process(codex, onChunk)
of UPLOAD: of UPLOAD:
let onUploadProgress = proc(bytes: int) = let onBlockReceived = proc(bytes: int) =
request[].callback(RET_PROGRESS, nil, cast[csize_t](bytes), request[].userData) request[].callback(RET_PROGRESS, nil, cast[csize_t](bytes), request[].userData)
cast[ptr NodeUploadRequest](request[].reqContent).process(codex, onUploadProgress) cast[ptr NodeUploadRequest](request[].reqContent).process(codex, onBlockReceived)
handleRes(await retFut, request) handleRes(await retFut, request)

View File

@ -0,0 +1,245 @@
{.push raises: [].}
## This file contains the download request.
import std/[options, streams]
import chronos
import chronicles
import libp2p/stream/[lpstream]
import ../../alloc
import ../../../codex/units
import ../../../codex/codextypes
from ../../../codex/codex import CodexServer, node
from ../../../codex/node import retrieve
from libp2p import Cid, init, `$`
logScope:
topics = "codexlib codexlibdownload"
type NodeDownloadMsgType* = enum
INIT
LOCAL
NETWORK
CHUNK
CANCEL
type OnChunkHandler = proc(bytes: seq[byte]): void {.gcsafe, raises: [].}
type NodeDownloadRequest* = object
operation: NodeDownloadMsgType
cid: cstring
chunkSize: csize_t
local: bool
type
DownloadSessionId* = string
DownloadSessionCount* = int
DownloadSession* = object
stream: LPStream
chunkSize: int
var downloadSessions {.threadvar.}: Table[DownloadSessionId, DownloadSession]
proc createShared*(
T: type NodeDownloadRequest,
op: NodeDownloadMsgType,
cid: cstring = "",
chunkSize: csize_t = 0,
local: bool = false,
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].cid = cid.alloc()
ret[].chunkSize = chunkSize
ret[].local = local
return ret
proc destroyShared(self: ptr NodeDownloadRequest) =
deallocShared(self)
proc init(
codex: ptr CodexServer,
cCid: cstring = "",
chunkSize: csize_t = 0,
local: bool = true,
): Future[Result[string, string]] {.async: (raises: []).} =
if downloadSessions.contains($cCid):
return ok("Download session already exists.")
let cid = Cid.init($cCid)
if cid.isErr:
return err("Failed to download locally: cannot parse cid: " & $cCid)
let node = codex[].node
var stream: LPStream
try:
let res = await node.retrieve(cid.get(), local)
if res.isErr():
return err("Failed to init the download: " & res.error.msg)
stream = res.get()
except CancelledError:
downloadSessions.del($cCid)
return err("Failed to init the download: download cancelled.")
let blockSize = if chunkSize.int > 0: chunkSize.int else: DefaultBlockSize.int
downloadSessions[$cCid] = DownloadSession(stream: stream, chunkSize: blockSize)
return ok("")
proc chunk(
codex: ptr CodexServer, cid: cstring = "", onChunk: OnChunkHandler
): Future[Result[string, string]] {.async: (raises: []).} =
if not downloadSessions.contains($cid):
return err("Failed to download chunk: no session for cid " & $cid)
var session: DownloadSession
try:
session = downloadSessions[$cid]
except KeyError:
return err("Failed to download chunk: no session for cid " & $cid)
let stream = session.stream
let chunkSize = session.chunkSize
if stream.atEof:
return ok("")
var buf = newSeq[byte](chunkSize)
try:
let read = await stream.readOnce(addr buf[0], buf.len)
buf.setLen(read)
except LPStreamError as e:
await stream.close()
downloadSessions.del($cid)
return err("Failed to download chunk: " & $e.msg)
except CancelledError:
await stream.close()
downloadSessions.del($cid)
return err("Failed to download chunk: download cancelled.")
if buf.len <= 0:
return err("Failed to download chunk: no data")
onChunk(buf)
return ok("")
proc streamFile(
codex: ptr CodexServer,
cid: Cid,
local: bool = true,
onChunk: OnChunkHandler,
chunkSize: csize_t,
): Future[Result[string, string]] {.async: (raises: [CancelledError]).} =
let node = codex[].node
let res = await node.retrieve(cid, local = local)
if res.isErr():
return err("Failed to retrieve CID: " & res.error.msg)
let stream = res.get()
if stream.atEof:
return err("Failed to retrieve CID: empty stream.")
let blockSize = if chunkSize.int > 0: chunkSize.int else: DefaultBlockSize.int
var buf = newSeq[byte](blockSize)
var read = 0
try:
while not stream.atEof:
let read = await stream.readOnce(addr buf[0], buf.len)
buf.setLen(read)
if buf.len <= 0:
break
if onChunk != nil:
onChunk(buf)
except LPStreamError as e:
return err("Failed to stream file: " & $e.msg)
finally:
await stream.close()
downloadSessions.del($cid)
return ok("")
proc local(
codex: ptr CodexServer, cCid: cstring, chunkSize: csize_t, onChunk: OnChunkHandler
): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
let node = codex[].node
let cid = Cid.init($cCid)
if cid.isErr:
return err("Failed to download locally: cannot parse cid: " & $cCid)
try:
let local = true
let res = await codex.streamFile(cid.get(), true, onChunk, chunkSize)
if res.isErr:
return err($res.error)
except CancelledError:
return err("Failed to download locally: download cancelled.")
return ok("")
proc cancel(
codex: ptr CodexServer, cCid: cstring
): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
if not downloadSessions.contains($cCid):
return err("Failed to download chunk: no session for cid " & $cCid)
var session: DownloadSession
try:
session = downloadSessions[$cCid]
except KeyError:
return err("Failed to download chunk: no session for cid " & $cCid)
let stream = session.stream
await stream.close()
downloadSessions.del($cCid)
return ok("")
proc process*(
self: ptr NodeDownloadRequest, codex: ptr CodexServer, onChunk: OnChunkHandler
): Future[Result[string, string]] {.async: (raises: []).} =
defer:
destroyShared(self)
case self.operation
of NodeDownloadMsgType.INIT:
let res = (await init(codex, self.cid, self.chunkSize, self.local))
if res.isErr:
error "Failed to INIT.", error = res.error
return err($res.error)
return res
of NodeDownloadMsgType.LOCAL:
let res = (await local(codex, self.cid, self.chunkSize, onChunk))
if res.isErr:
error "Failed to LOCAL.", error = res.error
return err($res.error)
return res
of NodeDownloadMsgType.NETWORK:
return err("NETWORK download not implemented yet.")
# let res = (await local(codex, self.cid, self.onChunk2, self.chunkSize, onChunk))
# if res.isErr:
# error "Failed to NETWORK.", error = res.error
# return err($res.error)
# return res
of NodeDownloadMsgType.CHUNK:
let res = (await chunk(codex, self.cid, onChunk))
if res.isErr:
error "Failed to CHUNK.", error = res.error
return err($res.error)
return res
of NodeDownloadMsgType.CANCEL:
let res = (await cancel(codex, self.cid))
if res.isErr:
error "Failed to CANCEL.", error = res.error
return err($res.error)
return res

View File

@ -34,6 +34,7 @@ import ./codex_thread_requests/requests/node_info_request
import ./codex_thread_requests/requests/node_debug_request import ./codex_thread_requests/requests/node_debug_request
import ./codex_thread_requests/requests/node_p2p_request import ./codex_thread_requests/requests/node_p2p_request
import ./codex_thread_requests/requests/node_upload_request import ./codex_thread_requests/requests/node_upload_request
import ./codex_thread_requests/requests/node_download_request
import ./ffi_types import ./ffi_types
from ../codex/conf import codexVersion, updateLogLevel from ../codex/conf import codexVersion, updateLogLevel
@ -342,15 +343,8 @@ proc codex_upload_file(
initializeLibrary() initializeLibrary()
checkLibcodexParams(ctx, callback, userData) checkLibcodexParams(ctx, callback, userData)
let onProgress = proc( let reqContent =
bytes: int NodeUploadRequest.createShared(NodeUploadMsgType.FILE, sessionId = sessionId)
): Future[void] {.async: (raises: [CancelledError]).} =
if userData != nil:
callback(RET_PROGRESS, nil, cast[csize_t](bytes), userData)
let reqContent = NodeUploadRequest.createShared(
NodeUploadMsgType.FILE, sessionId = sessionId, onProgress = onProgress
)
let res = codex_context.sendRequestToCodexThread( let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData ctx, RequestType.UPLOAD, reqContent, callback, userData
@ -358,30 +352,74 @@ proc codex_upload_file(
return callback.okOrError(res, userData) return callback.okOrError(res, userData)
proc codex_upload_subscribe( proc codex_download_init(
ctx: ptr CodexContext, ctx: ptr CodexContext,
sessionId: cstring, cid: cstring,
chunkSize: csize_t,
local: bool,
callback: CodexCallback, callback: CodexCallback,
userData: pointer, userData: pointer,
): cint {.dynlib, exportc.} = ): cint {.dynlib, exportc.} =
initializeLibrary() initializeLibrary()
checkLibcodexParams(ctx, callback, userData) checkLibcodexParams(ctx, callback, userData)
let onProgress = proc( let req = NodeDownloadRequest.createShared(
bytes: int NodeDownloadMsgType.INIT, cid = cid, chunkSize = chunkSize, local = local
): Future[void] {.async: (raises: [CancelledError]).} =
if userData != nil:
callback(RET_PROGRESS, nil, cast[csize_t](bytes), userData)
let reqContent = NodeUploadRequest.createShared(
NodeUploadMsgType.SUBSCRIBE, sessionId = sessionId, onProgress = onProgress
) )
let res = codex_context.sendRequestToCodexThread( let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData ctx, RequestType.DOWNLOAD, req, callback, userData
) )
return callback.okOrError(res, userData) result = callback.okOrError(res, userData)
proc codex_download_chunk(
ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let req = NodeDownloadRequest.createShared(NodeDownloadMsgType.CHUNK, cid = cid)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.DOWNLOAD, req, callback, userData
)
result = callback.okOrError(res, userData)
proc codex_download_local(
ctx: ptr CodexContext,
cid: cstring,
chunkSize: csize_t,
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let req = NodeDownloadRequest.createShared(
NodeDownloadMsgType.LOCAL, cid = cid, chunkSize = chunkSize
)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.DOWNLOAD, req, callback, userData
)
result = callback.okOrError(res, userData)
proc codex_download_cancel(
ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let req = NodeDownloadRequest.createShared(NodeDownloadMsgType.CANCEL, cid = cid)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.DOWNLOAD, req, callback, userData
)
result = callback.okOrError(res, userData)
proc codex_start( proc codex_start(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ctx: ptr CodexContext, callback: CodexCallback, userData: pointer