Refactor to avoid memory issues and add download local

This commit is contained in:
Arnaud 2025-09-30 06:05:04 +02:00 committed by Eric
parent 2c3849f119
commit 5cc4f24dc5
No known key found for this signature in database
4 changed files with 536 additions and 102 deletions

View File

@ -124,8 +124,20 @@ package main
return codex_upload_file(codexCtx, sessionId, (CodexCallback) callback, resp);
}
static int cGoCodexUploadSubscribe(void* codexCtx, char* sessionId, void* resp) {
return codex_upload_subscribe(codexCtx, sessionId, (CodexCallback) callback, resp);
static int cGoCodexDownloadInit(void* codexCtx, char* cid, size_t chunkSize, bool local, void* resp) {
return codex_download_init(codexCtx, cid, chunkSize, local, (CodexCallback) callback, resp);
}
static int cGoCodexDownloadChunk(void* codexCtx, char* cid, void* resp) {
return codex_download_chunk(codexCtx, cid, (CodexCallback) callback, resp);
}
static int cGoCodexDownloadLocal(void* codexCtx, char* cid, size_t chunkSize, void* resp) {
return codex_download_local(codexCtx, cid, chunkSize, (CodexCallback) callback, resp);
}
static int cGoCodexDownloadCancel(void* codexCtx, char* cid, void* resp) {
return codex_download_cancel(codexCtx, cid, (CodexCallback) callback, resp);
}
static int cGoCodexStart(void* codexCtx, void* resp) {
@ -158,7 +170,6 @@ package main
*/
import "C"
import (
"bytes"
"encoding/json"
@ -236,7 +247,7 @@ type RestNode struct {
NodeId string `json:"nodeId"`
PeerId string `json:"peerId"`
Record string `json:"record"`
Address *string `json:"address"` // Use pointer for nullable
Address *string `json:"address"`
Seen bool `json:"seen"`
}
@ -259,7 +270,7 @@ type CodexNode struct {
const defaultBlockSize = 1024 * 64
type OnProgressFunc func(read, total int, percent float64)
type OnProgressFunc func(read, total int, percent float64, err error)
type CodexUploadOptions struct {
filepath string
@ -275,16 +286,16 @@ type bridgeCtx struct {
err error
// Callback used for upload and download
onProgress func(read int)
onProgress func(read int, chunk []byte)
}
func newBridgeCtx() *bridgeCtx {
bridge := &bridgeCtx{}
bridge.wg = &sync.WaitGroup{}
bridge.wg.Add(1)
bridge.h = cgo.NewHandle(bridge)
bridge.resp = C.allocResp(C.uintptr_t(uintptr(bridge.h)))
return bridge
}
@ -307,12 +318,7 @@ func (b *bridgeCtx) CallError(name string) error {
func (b *bridgeCtx) wait() (string, error) {
b.wg.Wait()
result := b.result
err := b.err
b.free()
return result, err
return b.result, b.err
}
func getReaderSize(r io.Reader) int64 {
@ -354,8 +360,15 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
if v, ok := h.Value().(*bridgeCtx); ok {
switch ret {
case C.RET_PROGRESS:
if v.onProgress != nil {
v.onProgress(int(C.int(len)))
if v.onProgress == nil {
return
}
if msg != nil {
chunk := C.GoBytes(unsafe.Pointer(msg), C.int(len))
v.onProgress(int(C.int(len)), chunk)
} else {
v.onProgress(int(C.int(len)), nil)
}
case C.RET_OK:
retMsg := C.GoStringN(msg, C.int(len))
@ -378,8 +391,10 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
func CodexNew(config CodexConfig) (*CodexNode, error) {
bridge := newBridgeCtx()
defer bridge.free()
jsonConfig, err := json.Marshal(config)
if err != nil {
return nil, err
}
@ -398,6 +413,7 @@ func CodexNew(config CodexConfig) (*CodexNode, error) {
func (self CodexNode) CodexVersion() (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexVersion(self.ctx, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexVersion")
@ -408,6 +424,7 @@ func (self CodexNode) CodexVersion() (string, error) {
func (self CodexNode) CodexRevision() (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexRevision(self.ctx, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexRevision")
@ -418,6 +435,7 @@ func (self CodexNode) CodexRevision() (string, error) {
func (self CodexNode) CodexRepo() (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexRepo(self.ctx, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexRepo")
@ -430,6 +448,7 @@ func (self CodexNode) CodexDebug() (CodexDebugInfo, error) {
var info CodexDebugInfo
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexDebug(self.ctx, bridge.resp) != C.RET_OK {
return info, bridge.CallError("cGoCodexDebug")
@ -447,6 +466,7 @@ func (self CodexNode) CodexDebug() (CodexDebugInfo, error) {
func (self CodexNode) CodexSpr() (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexSpr(self.ctx, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexSpr")
@ -457,6 +477,7 @@ func (self CodexNode) CodexSpr() (string, error) {
func (self CodexNode) CodexPeerId() (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexPeerId(self.ctx, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexPeerId")
@ -467,6 +488,7 @@ func (self CodexNode) CodexPeerId() (string, error) {
func (self CodexNode) CodexLogLevel(logLevel LogLevel) error {
bridge := newBridgeCtx()
defer bridge.free()
var cLogLevel = C.CString(fmt.Sprintf("%s", logLevel))
defer C.free(unsafe.Pointer(cLogLevel))
@ -476,11 +498,13 @@ func (self CodexNode) CodexLogLevel(logLevel LogLevel) error {
}
_, err := bridge.wait()
return err
}
func (self CodexNode) CodexConnect(peerId string, peerAddresses []string) error {
bridge := newBridgeCtx()
defer bridge.free()
var cPeerId = C.CString(peerId)
defer C.free(unsafe.Pointer(cPeerId))
@ -509,6 +533,7 @@ func (self CodexNode) CodexPeerDebug(peerId string) (RestPeerRecord, error) {
var record RestPeerRecord
bridge := newBridgeCtx()
defer bridge.free()
var cPeerId = C.CString(peerId)
defer C.free(unsafe.Pointer(cPeerId))
@ -529,6 +554,7 @@ func (self CodexNode) CodexPeerDebug(peerId string) (RestPeerRecord, error) {
func (self CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
var cFilename = C.CString(options.filepath)
defer C.free(unsafe.Pointer(cFilename))
@ -536,7 +562,6 @@ func (self CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, erro
if options.chunkSize == 0 {
options.chunkSize = defaultBlockSize
}
var cChunkSize = C.size_t(options.chunkSize)
if C.cGoCodexUploadInit(self.ctx, cFilename, cChunkSize, bridge.resp) != C.RET_OK {
@ -548,6 +573,7 @@ func (self CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, erro
func (self CodexNode) CodexUploadChunk(sessionId string, chunk []byte) error {
bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
@ -562,11 +588,13 @@ func (self CodexNode) CodexUploadChunk(sessionId string, chunk []byte) error {
}
_, err := bridge.wait()
return err
}
func (self CodexNode) CodexUploadFinalize(sessionId string) (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
@ -580,6 +608,7 @@ func (self CodexNode) CodexUploadFinalize(sessionId string) (string, error) {
func (self CodexNode) CodexUploadCancel(sessionId string) error {
bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
@ -589,66 +618,40 @@ func (self CodexNode) CodexUploadCancel(sessionId string) error {
}
_, err := bridge.wait()
return err
}
func (self CodexNode) CodexUploadReader(options CodexUploadOptions, r io.Reader) (string, error) {
sessionId, err := self.CodexUploadInit(&options)
if err != nil {
return "", err
}
if options.onProgress != nil {
size := getReaderSize(r)
total := 0
if size > 0 {
onProgress := func(read int) {
if read == 0 {
return
}
total += read
percent := float64(total) / float64(size) * 100.0
// The last block could be a bit over the size due to padding
// on the chunk size.
if percent > 100.0 {
percent = 100.0
}
options.onProgress(read, int(size), percent)
}
if err := self.CodexUploadSubscribe(sessionId, onProgress); err != nil {
if err := self.CodexUploadCancel(sessionId); err != nil {
log.Println("Error cancelling upload after subscribe failure:", err)
}
return "", err
}
}
}
if options.chunkSize == 0 {
options.chunkSize = defaultBlockSize
}
buf := make([]byte, options.chunkSize)
total := 0
var size int64
if options.onProgress != nil {
size = getReaderSize(r)
}
for {
n, err := r.Read(buf)
if n > 0 {
if err := self.CodexUploadChunk(sessionId, buf[:n]); err != nil {
return "", err
}
}
if err == io.EOF {
break
}
if err != nil {
self.CodexUploadCancel(sessionId)
if cancelErr := self.CodexUploadCancel(sessionId); cancelErr != nil {
return "", fmt.Errorf("failed to upload chunk %v and failed to cancel upload session %v", err, cancelErr)
}
return "", err
}
@ -656,6 +659,25 @@ func (self CodexNode) CodexUploadReader(options CodexUploadOptions, r io.Reader)
if n == 0 {
break
}
if err := self.CodexUploadChunk(sessionId, buf[:n]); err != nil {
if cancelErr := self.CodexUploadCancel(sessionId); cancelErr != nil {
return "", fmt.Errorf("failed to upload chunk %v and failed to cancel upload session %v", err, cancelErr)
}
return "", err
}
total += n
if options.onProgress != nil && size > 0 {
percent := float64(total) / float64(size) * 100.0
// The last block could be a bit over the size due to padding
// on the chunk size.
if percent > 100.0 {
percent = 100.0
}
options.onProgress(n, total, percent, nil)
}
}
return self.CodexUploadFinalize(sessionId)
@ -670,9 +692,11 @@ func (self CodexNode) CodexUploadReaderAsync(options CodexUploadOptions, r io.Re
func (self CodexNode) CodexUploadFile(options CodexUploadOptions) (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if options.onProgress != nil {
stat, err := os.Stat(options.filepath)
if err != nil {
return "", err
}
@ -681,7 +705,7 @@ func (self CodexNode) CodexUploadFile(options CodexUploadOptions) (string, error
total := 0
if size > 0 {
bridge.onProgress = func(read int) {
bridge.onProgress = func(read int, _ []byte) {
if read == 0 {
return
}
@ -695,14 +719,11 @@ func (self CodexNode) CodexUploadFile(options CodexUploadOptions) (string, error
percent = 100.0
}
options.onProgress(read, int(size), percent)
options.onProgress(read, int(size), percent, nil)
}
}
}
var cFilePath = C.CString(options.filepath)
defer C.free(unsafe.Pointer(cFilePath))
sessionId, err := self.CodexUploadInit(&options)
if err != nil {
return "", err
@ -725,27 +746,104 @@ func (self CodexNode) CodexUploadFileAsync(options CodexUploadOptions, onDone fu
}()
}
func (self CodexNode) CodexUploadSubscribe(sessionId string, onProgress func(read int)) error {
func (self CodexNode) CodexDownloadLocal(cid string, chunkSize int, w io.Writer) error {
bridge := newBridgeCtx()
defer bridge.free()
bridge.onProgress = onProgress
bridge.onProgress = func(read int, chunk []byte) {
if read == 0 {
return
}
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
log.Println("Subscribing to upload progress...")
if C.cGoCodexUploadSubscribe(self.ctx, cSessionId, bridge.resp) != C.RET_OK {
return bridge.CallError("cGoCodexUploadSubscribe")
if _, err := w.Write(chunk); err != nil {
log.Println(err)
}
}
go func() {
if _, err := bridge.wait(); err != nil {
log.Println("Error in CodexUploadSubscribe:", err)
}
}()
var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid))
return nil
if chunkSize == 0 {
chunkSize = defaultBlockSize
}
var cChunkSize = C.size_t(chunkSize)
if C.cGoCodexDownloadLocal(self.ctx, cCid, cChunkSize, bridge.resp) != C.RET_OK {
return bridge.CallError("cGoCodexDownloadLocal")
}
_, err := bridge.wait()
return err
}
func (self CodexNode) CodexDownloadLocalAsync(cid string, chunkSize int, w io.Writer, onDone func(error)) {
go func() {
err := self.CodexDownloadLocal(cid, chunkSize, w)
onDone(err)
}()
}
func (self CodexNode) CodexDownloadInit(cid string, chunkSize int, local bool) error {
bridge := newBridgeCtx()
defer bridge.free()
var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid))
if chunkSize == 0 {
chunkSize = defaultBlockSize
}
var cChunkSize = C.size_t(chunkSize)
var cLocal = C.bool(local)
if C.cGoCodexDownloadInit(self.ctx, cCid, cChunkSize, cLocal, bridge.resp) != C.RET_OK {
return bridge.CallError("cGoCodexDownloadInit")
}
_, err := bridge.wait()
return err
}
func (self CodexNode) CodexDownloadChunk(cid string) ([]byte, error) {
bridge := newBridgeCtx()
defer bridge.free()
var bytes []byte
bridge.onProgress = func(read int, chunk []byte) {
bytes = chunk
}
var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid))
if C.cGoCodexDownloadChunk(self.ctx, cCid, bridge.resp) != C.RET_OK {
return nil, bridge.CallError("cGoCodexDownloadChunk")
}
if _, err := bridge.wait(); err != nil {
return nil, err
}
return bytes, nil
}
func (self CodexNode) CodexDownloadCancel(cid string) error {
bridge := newBridgeCtx()
defer bridge.free()
var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid))
if C.cGoCodexDownloadCancel(self.ctx, cCid, bridge.resp) != C.RET_OK {
return bridge.CallError("cGoCodexDownloadCancel")
}
_, err := bridge.wait()
return err
}
func (self CodexNode) CodexStart() error {
@ -757,6 +855,7 @@ func (self CodexNode) CodexStart() error {
}
_, err := bridge.wait()
return err
}
@ -769,7 +868,6 @@ func (self CodexNode) CodexStartAsync(onDone func(error)) {
func (self CodexNode) CodexStop() error {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexStop(self.ctx, bridge.resp) != C.RET_OK {
return bridge.CallError("cGoCodexStop")
@ -781,7 +879,6 @@ func (self CodexNode) CodexStop() error {
func (self CodexNode) CodexDestroy() error {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexDestroy(self.ctx, bridge.resp) != C.RET_OK {
return bridge.CallError("cGoCodexDestroy")
@ -851,6 +948,8 @@ func main() {
log.Println("Codex started...")
// for i := 0; i < 150; i++ {
debug, err := node.CodexDebug()
if err != nil {
log.Fatal("Error happened:", err.Error())
@ -908,7 +1007,11 @@ func main() {
log.Println("Codex Upload Finalized, cid:", cid)
buf := bytes.NewBuffer([]byte("Hello World!"))
cid, err = node.CodexUploadReader(CodexUploadOptions{filepath: "hello.txt", onProgress: func(read, total int, percent float64) {
cid, err = node.CodexUploadReader(CodexUploadOptions{filepath: "hello.txt", onProgress: func(read, total int, percent float64, err error) {
if err != nil {
log.Fatal("Error happened during upload: %v\n", err)
}
log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent)
}}, buf)
if err != nil {
@ -921,12 +1024,15 @@ func main() {
if err != nil {
log.Fatal("Error happened:", err.Error())
}
// Choose a big file to see the progress logs
filepath := path.Join(current, "examples", "golang", "hello.txt")
//filepath := path.Join(current, "examples", "golang", "discord-0.0.109.deb")
options := CodexUploadOptions{filepath: filepath, onProgress: func(read, total int, percent float64) {
options := CodexUploadOptions{filepath: filepath, onProgress: func(read, total int, percent float64, err error) {
if err != nil {
log.Fatal("Error happened during upload: %v\n", err)
}
log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent)
}}
@ -938,6 +1044,38 @@ func main() {
log.Println("Codex Upload File finalized, cid: .", cid)
f, err := os.Create("hello.loaded.txt")
if err != nil {
log.Fatal(err)
}
defer f.Close()
// log.Println("Codex Download Local starting... attempt", i+1)
if err := node.CodexDownloadLocal(cid, 0, f); err != nil {
log.Fatal("Error happened:", err.Error())
}
log.Println("Codex Download Local finished.")
// log.Println("Codex Download Init starting... attempt", i+1)
if err := node.CodexDownloadInit(cid, 0, true); err != nil {
log.Fatal("Error happened:", err.Error())
}
log.Println("Codex Download Init finished.")
// log.Println("Codex Download Chunk starting... attempt", i+1)
chunk, err := node.CodexDownloadChunk(cid)
if err != nil {
log.Fatal("Error happened:", err.Error())
}
log.Println("Codex Download Chunk finished. Size:", len(chunk))
// }
// err = node.CodexConnect(peerId, []string{})
// if err != nil {
// log.Fatal("Error happened:", err.Error())

View File

@ -11,6 +11,7 @@ import ./requests/node_info_request
import ./requests/node_debug_request
import ./requests/node_p2p_request
import ./requests/node_upload_request
import ./requests/node_download_request
from ../../codex/codex import CodexServer
@ -20,6 +21,7 @@ type RequestType* {.pure.} = enum
DEBUG
P2P
UPLOAD
DOWNLOAD
type CodexThreadRequest* = object
reqType: RequestType
@ -94,11 +96,22 @@ proc process*(
cast[ptr NodeDebugRequest](request[].reqContent).process(codex)
of P2P:
cast[ptr NodeP2PRequest](request[].reqContent).process(codex)
of DOWNLOAD:
let onChunk = proc(bytes: seq[byte]) =
if bytes.len > 0:
request[].callback(
RET_PROGRESS,
cast[ptr cchar](unsafeAddr bytes[0]),
cast[csize_t](bytes.len),
request[].userData,
)
cast[ptr NodeDownloadRequest](request[].reqContent).process(codex, onChunk)
of UPLOAD:
let onUploadProgress = proc(bytes: int) =
let onBlockReceived = proc(bytes: int) =
request[].callback(RET_PROGRESS, nil, cast[csize_t](bytes), request[].userData)
cast[ptr NodeUploadRequest](request[].reqContent).process(codex, onUploadProgress)
cast[ptr NodeUploadRequest](request[].reqContent).process(codex, onBlockReceived)
handleRes(await retFut, request)

View File

@ -0,0 +1,245 @@
{.push raises: [].}
## This file contains the download request.
import std/[options, streams]
import chronos
import chronicles
import libp2p/stream/[lpstream]
import ../../alloc
import ../../../codex/units
import ../../../codex/codextypes
from ../../../codex/codex import CodexServer, node
from ../../../codex/node import retrieve
from libp2p import Cid, init, `$`
logScope:
topics = "codexlib codexlibdownload"
type NodeDownloadMsgType* = enum
INIT
LOCAL
NETWORK
CHUNK
CANCEL
type OnChunkHandler = proc(bytes: seq[byte]): void {.gcsafe, raises: [].}
type NodeDownloadRequest* = object
operation: NodeDownloadMsgType
cid: cstring
chunkSize: csize_t
local: bool
type
DownloadSessionId* = string
DownloadSessionCount* = int
DownloadSession* = object
stream: LPStream
chunkSize: int
var downloadSessions {.threadvar.}: Table[DownloadSessionId, DownloadSession]
proc createShared*(
T: type NodeDownloadRequest,
op: NodeDownloadMsgType,
cid: cstring = "",
chunkSize: csize_t = 0,
local: bool = false,
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].cid = cid.alloc()
ret[].chunkSize = chunkSize
ret[].local = local
return ret
proc destroyShared(self: ptr NodeDownloadRequest) =
deallocShared(self)
proc init(
codex: ptr CodexServer,
cCid: cstring = "",
chunkSize: csize_t = 0,
local: bool = true,
): Future[Result[string, string]] {.async: (raises: []).} =
if downloadSessions.contains($cCid):
return ok("Download session already exists.")
let cid = Cid.init($cCid)
if cid.isErr:
return err("Failed to download locally: cannot parse cid: " & $cCid)
let node = codex[].node
var stream: LPStream
try:
let res = await node.retrieve(cid.get(), local)
if res.isErr():
return err("Failed to init the download: " & res.error.msg)
stream = res.get()
except CancelledError:
downloadSessions.del($cCid)
return err("Failed to init the download: download cancelled.")
let blockSize = if chunkSize.int > 0: chunkSize.int else: DefaultBlockSize.int
downloadSessions[$cCid] = DownloadSession(stream: stream, chunkSize: blockSize)
return ok("")
proc chunk(
codex: ptr CodexServer, cid: cstring = "", onChunk: OnChunkHandler
): Future[Result[string, string]] {.async: (raises: []).} =
if not downloadSessions.contains($cid):
return err("Failed to download chunk: no session for cid " & $cid)
var session: DownloadSession
try:
session = downloadSessions[$cid]
except KeyError:
return err("Failed to download chunk: no session for cid " & $cid)
let stream = session.stream
let chunkSize = session.chunkSize
if stream.atEof:
return ok("")
var buf = newSeq[byte](chunkSize)
try:
let read = await stream.readOnce(addr buf[0], buf.len)
buf.setLen(read)
except LPStreamError as e:
await stream.close()
downloadSessions.del($cid)
return err("Failed to download chunk: " & $e.msg)
except CancelledError:
await stream.close()
downloadSessions.del($cid)
return err("Failed to download chunk: download cancelled.")
if buf.len <= 0:
return err("Failed to download chunk: no data")
onChunk(buf)
return ok("")
proc streamFile(
codex: ptr CodexServer,
cid: Cid,
local: bool = true,
onChunk: OnChunkHandler,
chunkSize: csize_t,
): Future[Result[string, string]] {.async: (raises: [CancelledError]).} =
let node = codex[].node
let res = await node.retrieve(cid, local = local)
if res.isErr():
return err("Failed to retrieve CID: " & res.error.msg)
let stream = res.get()
if stream.atEof:
return err("Failed to retrieve CID: empty stream.")
let blockSize = if chunkSize.int > 0: chunkSize.int else: DefaultBlockSize.int
var buf = newSeq[byte](blockSize)
var read = 0
try:
while not stream.atEof:
let read = await stream.readOnce(addr buf[0], buf.len)
buf.setLen(read)
if buf.len <= 0:
break
if onChunk != nil:
onChunk(buf)
except LPStreamError as e:
return err("Failed to stream file: " & $e.msg)
finally:
await stream.close()
downloadSessions.del($cid)
return ok("")
proc local(
codex: ptr CodexServer, cCid: cstring, chunkSize: csize_t, onChunk: OnChunkHandler
): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
let node = codex[].node
let cid = Cid.init($cCid)
if cid.isErr:
return err("Failed to download locally: cannot parse cid: " & $cCid)
try:
let local = true
let res = await codex.streamFile(cid.get(), true, onChunk, chunkSize)
if res.isErr:
return err($res.error)
except CancelledError:
return err("Failed to download locally: download cancelled.")
return ok("")
proc cancel(
codex: ptr CodexServer, cCid: cstring
): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
if not downloadSessions.contains($cCid):
return err("Failed to download chunk: no session for cid " & $cCid)
var session: DownloadSession
try:
session = downloadSessions[$cCid]
except KeyError:
return err("Failed to download chunk: no session for cid " & $cCid)
let stream = session.stream
await stream.close()
downloadSessions.del($cCid)
return ok("")
proc process*(
self: ptr NodeDownloadRequest, codex: ptr CodexServer, onChunk: OnChunkHandler
): Future[Result[string, string]] {.async: (raises: []).} =
defer:
destroyShared(self)
case self.operation
of NodeDownloadMsgType.INIT:
let res = (await init(codex, self.cid, self.chunkSize, self.local))
if res.isErr:
error "Failed to INIT.", error = res.error
return err($res.error)
return res
of NodeDownloadMsgType.LOCAL:
let res = (await local(codex, self.cid, self.chunkSize, onChunk))
if res.isErr:
error "Failed to LOCAL.", error = res.error
return err($res.error)
return res
of NodeDownloadMsgType.NETWORK:
return err("NETWORK download not implemented yet.")
# let res = (await local(codex, self.cid, self.onChunk2, self.chunkSize, onChunk))
# if res.isErr:
# error "Failed to NETWORK.", error = res.error
# return err($res.error)
# return res
of NodeDownloadMsgType.CHUNK:
let res = (await chunk(codex, self.cid, onChunk))
if res.isErr:
error "Failed to CHUNK.", error = res.error
return err($res.error)
return res
of NodeDownloadMsgType.CANCEL:
let res = (await cancel(codex, self.cid))
if res.isErr:
error "Failed to CANCEL.", error = res.error
return err($res.error)
return res

View File

@ -34,6 +34,7 @@ import ./codex_thread_requests/requests/node_info_request
import ./codex_thread_requests/requests/node_debug_request
import ./codex_thread_requests/requests/node_p2p_request
import ./codex_thread_requests/requests/node_upload_request
import ./codex_thread_requests/requests/node_download_request
import ./ffi_types
from ../codex/conf import codexVersion, updateLogLevel
@ -342,15 +343,8 @@ proc codex_upload_file(
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let onProgress = proc(
bytes: int
): Future[void] {.async: (raises: [CancelledError]).} =
if userData != nil:
callback(RET_PROGRESS, nil, cast[csize_t](bytes), userData)
let reqContent = NodeUploadRequest.createShared(
NodeUploadMsgType.FILE, sessionId = sessionId, onProgress = onProgress
)
let reqContent =
NodeUploadRequest.createShared(NodeUploadMsgType.FILE, sessionId = sessionId)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData
@ -358,30 +352,74 @@ proc codex_upload_file(
return callback.okOrError(res, userData)
proc codex_upload_subscribe(
proc codex_download_init(
ctx: ptr CodexContext,
sessionId: cstring,
cid: cstring,
chunkSize: csize_t,
local: bool,
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let onProgress = proc(
bytes: int
): Future[void] {.async: (raises: [CancelledError]).} =
if userData != nil:
callback(RET_PROGRESS, nil, cast[csize_t](bytes), userData)
let reqContent = NodeUploadRequest.createShared(
NodeUploadMsgType.SUBSCRIBE, sessionId = sessionId, onProgress = onProgress
let req = NodeDownloadRequest.createShared(
NodeDownloadMsgType.INIT, cid = cid, chunkSize = chunkSize, local = local
)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData
ctx, RequestType.DOWNLOAD, req, callback, userData
)
return callback.okOrError(res, userData)
result = callback.okOrError(res, userData)
proc codex_download_chunk(
ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let req = NodeDownloadRequest.createShared(NodeDownloadMsgType.CHUNK, cid = cid)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.DOWNLOAD, req, callback, userData
)
result = callback.okOrError(res, userData)
proc codex_download_local(
ctx: ptr CodexContext,
cid: cstring,
chunkSize: csize_t,
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let req = NodeDownloadRequest.createShared(
NodeDownloadMsgType.LOCAL, cid = cid, chunkSize = chunkSize
)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.DOWNLOAD, req, callback, userData
)
result = callback.okOrError(res, userData)
proc codex_download_cancel(
ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let req = NodeDownloadRequest.createShared(NodeDownloadMsgType.CANCEL, cid = cid)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.DOWNLOAD, req, callback, userData
)
result = callback.okOrError(res, userData)
proc codex_start(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer