Add progress callback for upload
parent fa91e2efcc
commit ea06e3df3b
@@ -81,6 +81,8 @@ type
 BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {.
 gcsafe, async: (raises: [CancelledError])
 .}
+OnBlockStoreProc =
+proc(chunk: seq[byte]): Future[void] {.gcsafe, async: (raises: [CancelledError]).}

 func switch*(self: CodexNodeRef): Switch =
 return self.switch
@@ -403,6 +405,7 @@ proc store*(
 filename: ?string = string.none,
 mimetype: ?string = string.none,
 blockSize = DefaultBlockSize,
+onBlockStoreProc: OnBlockStoreProc = nil,
 ): Future[?!Cid] {.async.} =
 ## Save stream contents as dataset with given blockSize
 ## to nodes's BlockStore, and return Cid of its manifest
@@ -432,6 +435,9 @@ proc store*(
 if err =? (await self.networkStore.putBlock(blk)).errorOption:
 error "Unable to store block", cid = blk.cid, err = err.msg
 return failure(&"Unable to store block {blk.cid}")
+
+if not onBlockStoreProc.isNil:
+discard onBlockStoreProc(chunk)
 except CancelledError as exc:
 raise exc
 except CatchableError as exc:

@@ -104,8 +104,8 @@ package main
 return codex_peer_debug(codexCtx, peerId, (CodexCallback) callback, resp);
 }

-static int cGoCodexUploadInit(void* codexCtx, char* filepath, void* resp) {
-return codex_upload_init(codexCtx, filepath, (CodexCallback) callback, resp);
+static int cGoCodexUploadInit(void* codexCtx, char* filepath, size_t chunkSize, void* resp) {
+return codex_upload_init(codexCtx, filepath, chunkSize, (CodexCallback) callback, resp);
 }

 static int cGoCodexUploadChunk(void* codexCtx, char* sessionId, const uint8_t* chunk, size_t len, void* resp) {
@@ -120,8 +120,8 @@ package main
 return codex_upload_cancel(codexCtx, sessionId, (CodexCallback) callback, resp);
 }

-static int cGoCodexUploadFile(void* codexCtx, char* sessionId, size_t chunkSize, void* resp) {
-return codex_upload_file(codexCtx, sessionId, chunkSize, (CodexCallback) callback, resp);
+static int cGoCodexUploadFile(void* codexCtx, char* sessionId, void* resp) {
+return codex_upload_file(codexCtx, sessionId, (CodexCallback) callback, resp);
 }

 static int cGoCodexStart(void* codexCtx, void* resp) {
@@ -253,12 +253,23 @@ type CodexNode struct {
 ctx unsafe.Pointer
 }

+const defaultBlockSize = 1024 * 64
+
+type CodexUploadOptions struct {
+filepath string
+chunkSize int
+onProgress func(read, total int, percent float64)
+}
+
 type bridgeCtx struct {
 wg *sync.WaitGroup
 h cgo.Handle
 resp unsafe.Pointer
 result string
 err error
+
+// Callback used for upload and download
+onProgress func(read int)
 }

 func newBridgeCtx() *bridgeCtx {
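
For orientation, the sketch below shows how a caller might fill the new CodexUploadOptions; the newUploadOptions helper is hypothetical, but the field names, the 64 KiB default and the onProgress signature follow the struct added above.

package main

import "log"

// Illustrative copy of the options struct added in this commit.
type CodexUploadOptions struct {
    filepath   string
    chunkSize  int
    onProgress func(read, total int, percent float64)
}

const defaultBlockSize = 1024 * 64

// newUploadOptions is a hypothetical helper: it applies the same chunkSize
// defaulting the bindings use and attaches a logging progress hook.
func newUploadOptions(path string, chunkSize int) CodexUploadOptions {
    if chunkSize == 0 {
        chunkSize = defaultBlockSize // 64 KiB default block size
    }
    return CodexUploadOptions{
        filepath:  path,
        chunkSize: chunkSize,
        onProgress: func(read, total int, percent float64) {
            // read = bytes stored for the last chunk, total = cumulative bytes,
            // percent = 0 while the overall size is unknown.
            log.Printf("chunk %d B, total %d B (%.1f%%)", read, total, percent)
        },
    }
}

func main() {
    opts := newUploadOptions("hello.txt", 0)
    log.Printf("uploading %s in %d-byte chunks", opts.filepath, opts.chunkSize)
}
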
@@ -273,18 +284,13 @@ func newBridgeCtx() *bridgeCtx {
 }

 func (b *bridgeCtx) free() {
+b.h.Delete()
+b.h = 0
+
 C.freeResp(b.resp)
 b.resp = nil
 }

-func (b *bridgeCtx) isOK() bool {
-return C.getRet(b.resp) == C.RET_OK
-}
-
-func (b *bridgeCtx) isError() bool {
-return C.getRet(b.resp) == C.RET_ERR
-}
-
 func (b *bridgeCtx) CallError(name string) error {
 return fmt.Errorf("Failed the call to %s. Returned code: %d.", name, C.getRet(b.resp))
 }
@@ -295,6 +301,21 @@ func (b *bridgeCtx) wait() (string, error) {
 return b.result, b.err
 }

+func getReaderSize(r io.Reader) int64 {
+switch v := r.(type) {
+case *os.File:
+stat, err := v.Stat()
+if err != nil {
+return 0
+}
+return stat.Size()
+case *bytes.Buffer:
+return int64(v.Len())
+default:
+return 0
+}
+}
+
 //export callback
 func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
 if resp == nil {
@@ -306,6 +327,16 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
 m.msg = msg
 m.len = len

+if ret == C.RET_PROGRESS {
+if m.h != 0 {
+h := cgo.Handle(m.h)
+if v, ok := h.Value().(*bridgeCtx); ok && v.onProgress != nil {
+v.onProgress(int(len))
+}
+}
+return
+}
+
 if m.h == 0 {
 return
 }
@@ -320,20 +351,15 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
 if ret == C.RET_OK || ret == C.RET_ERR {
 retMsg := C.GoStringN(msg, C.int(len))

-// log.Println("Callback called with ret:", ret, " msg:", retMsg, " len:", len)
-
 if ret == C.RET_OK {
 v.result = retMsg
+v.err = nil
 } else {
 v.err = errors.New(retMsg)
 }

-h.Delete()
-m.h = 0
-
 if v.wg != nil {
 v.wg.Done()
-v = nil
 }
 }
 }
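
The callback above multiplexes progress events and terminal results over the single C callback: RET_PROGRESS carries a byte count in the length argument and never completes the call, while RET_OK / RET_ERR record the result and release the waiter. A simplified pure-Go model of that routing (no cgo; the bridge type and constant names are stand-ins, with values taken from the RET_* constants later in this diff):

package main

import (
    "errors"
    "fmt"
    "sync"
)

// Values mirror the RET_* constants added later in this diff.
const (
    retOK       = 0
    retErr      = 1
    retProgress = 3
)

// bridge is a simplified stand-in for bridgeCtx: progress events are forwarded
// to onProgress, terminal events record the result and release the waiter.
type bridge struct {
    wg         sync.WaitGroup
    result     string
    err        error
    onProgress func(read int)
}

func (b *bridge) callback(ret int, msg string, length int) {
    if ret == retProgress {
        // For progress events the length argument carries the byte count
        // and the call is not completed.
        if b.onProgress != nil {
            b.onProgress(length)
        }
        return
    }
    if ret == retOK {
        b.result = msg
    } else {
        b.err = errors.New(msg)
    }
    b.wg.Done()
}

func main() {
    b := &bridge{onProgress: func(read int) { fmt.Println("stored", read, "bytes") }}
    b.wg.Add(1)
    b.callback(retProgress, "", 65536) // progress: forwarded, does not complete
    b.callback(retOK, "cid123", 0)     // terminal: records result, releases waiter
    b.wg.Wait()
    fmt.Println("result:", b.result)
}
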
@@ -500,14 +526,32 @@ func (self *CodexNode) CodexPeerDebug(peerId string) (RestPeerRecord, error) {
 return record, err
 }

-func (self *CodexNode) CodexUploadInit(filename string) (string, error) {
+func (self *CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, error) {
 bridge := newBridgeCtx()
-defer bridge.free()
+totalRead := 0

-var cFilename = C.CString(filename)
+bridge.onProgress = func(bytes int) {
+if bytes == -1 {
+bridge.free()
+} else {
+totalRead += bytes
+
+if options.onProgress != nil {
+options.onProgress(bytes, totalRead, 0)
+}
+}
+}
+
+var cFilename = C.CString(options.filepath)
 defer C.free(unsafe.Pointer(cFilename))

-if C.cGoCodexUploadInit(self.ctx, cFilename, bridge.resp) != C.RET_OK {
+if options.chunkSize == 0 {
+options.chunkSize = defaultBlockSize
+}
+
+var cChunkSize = C.size_t(options.chunkSize)
+
+if C.cGoCodexUploadInit(self.ctx, cFilename, cChunkSize, bridge.resp) != C.RET_OK {
 return "", bridge.CallError("cGoCodexUploadInit")
 }

@@ -563,13 +607,26 @@ func (self *CodexNode) CodexUploadCancel(sessionId string) error {
 return err
 }

-func (self *CodexNode) CodexUploadReader(filename string, r io.Reader, chunkSize int) (string, error) {
-sessionId, err := self.CodexUploadInit(filename)
+func (self *CodexNode) CodexUploadReader(options CodexUploadOptions, r io.Reader) (string, error) {
+if options.onProgress != nil {
+size := getReaderSize(r)
+
+if size > 0 {
+fn := options.onProgress
+
+options.onProgress = func(read, total int, _ float64) {
+percent := float64(total) / float64(size) * 100.0
+fn(read, total, percent)
+}
+}
+}
+
+sessionId, err := self.CodexUploadInit(&options)
 if err != nil {
 return "", err
 }

-buf := make([]byte, chunkSize)
+buf := make([]byte, options.chunkSize)
 for {
 n, err := r.Read(buf)
 if n > 0 {
@@ -592,23 +649,38 @@ func (self *CodexNode) CodexUploadReader(filename string, r io.Reader, chunkSize
 return self.CodexUploadFinalize(sessionId)
 }

-// TODO provide an async version of CodexUploadReader
-// that starts a gorountine to upload the chunks
-// and take:
-// a callback to be called when done
-// another callback to cancel the upload
-func (self *CodexNode) CodexUploadReaderAsync(filename string, r io.Reader, chunkSize int) (string, error) {
-return "", nil
+func (self *CodexNode) CodexUploadReaderAsync(options CodexUploadOptions, r io.Reader, onDone func(cid string, err error)) {
+go func() {
+cid, err := self.CodexUploadReader(options, r)
+onDone(cid, err)
+}()
 }

-func (self *CodexNode) CodexUploadFile(filepath string, chunkSize int) (string, error) {
+func (self *CodexNode) CodexUploadFile(options CodexUploadOptions) (string, error) {
+if options.onProgress != nil {
+stat, err := os.Stat(options.filepath)
+if err != nil {
+return "", err
+}
+
+size := stat.Size()
+if size > 0 {
+fn := options.onProgress
+
+options.onProgress = func(read, total int, _ float64) {
+percent := float64(total) / float64(size) * 100.0
+fn(read, total, percent)
+}
+}
+}
+
 bridge := newBridgeCtx()
 defer bridge.free()

-var cFilePath = C.CString(filepath)
+var cFilePath = C.CString(options.filepath)
 defer C.free(unsafe.Pointer(cFilePath))

-sessionId, err := self.CodexUploadInit(filepath)
+sessionId, err := self.CodexUploadInit(&options)
 if err != nil {
 return "", err
 }
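
Both CodexUploadReader and CodexUploadFile wrap the user's callback so that the percentage is derived from a total size when one is known (via getReaderSize or os.Stat). A self-contained sketch of that wrapping pattern; withPercent is a hypothetical helper, not part of the bindings:

package main

import "fmt"

// progressFn matches the onProgress signature introduced in CodexUploadOptions.
type progressFn func(read, total int, percent float64)

// withPercent wraps fn so that percent is computed against a known size,
// mirroring the wrapping done in CodexUploadReader/CodexUploadFile.
// When fn is nil or size is unknown (<= 0) the callback is returned unchanged.
func withPercent(fn progressFn, size int64) progressFn {
    if fn == nil || size <= 0 {
        return fn
    }
    return func(read, total int, _ float64) {
        fn(read, total, float64(total)/float64(size)*100.0)
    }
}

func main() {
    report := withPercent(func(read, total int, percent float64) {
        fmt.Printf("read %d, total %d (%.2f%%)\n", read, total, percent)
    }, 1024)
    report(256, 256, 0)
    report(256, 512, 0)
}
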
@@ -616,12 +688,7 @@ func (self *CodexNode) CodexUploadFile(filepath string, chunkSize int) (string,
 var cSessionId = C.CString(sessionId)
 defer C.free(unsafe.Pointer(cSessionId))

-var cChunkSize = C.size_t(0)
-if chunkSize > 0 {
-cChunkSize = C.size_t(chunkSize)
-}
-
-if C.cGoCodexUploadFile(self.ctx, cSessionId, cChunkSize, bridge.resp) != C.RET_OK {
+if C.cGoCodexUploadFile(self.ctx, cSessionId, bridge.resp) != C.RET_OK {
 return "", bridge.CallError("cGoCodexUploadFile")
 }

@@ -634,7 +701,7 @@ func (self *CodexNode) CodexUploadFile(filepath string, chunkSize int) (string,
 // and take:
 // a callback to be called when done
 // another callback to cancel the upload
-func (self *CodexNode) CodexUploadFileAsync(filepath string, chunkSize int) (string, error) {
+func (self *CodexNode) CodexUploadFileAsync(filepath string, chunkSize int, cb func(error)) (string, error) {
 return "", nil
 }

@@ -650,20 +717,11 @@ func (self *CodexNode) CodexStart() error {
 return err
 }

-func (self *CodexNode) CodexStartAsync(cb func(error)) error {
-bridge := newBridgeCtx()
-defer bridge.free()
-
-if C.cGoCodexStart(self.ctx, bridge.resp) != C.RET_OK {
-return bridge.CallError("cGoCodexStart")
-}
-
+func (self *CodexNode) CodexStartAsync(onDone func(error)) {
 go func() {
-_, err := bridge.wait()
-cb(err)
+err := self.CodexStart()
+onDone(err)
 }()
-
-return nil
 }

 func (self *CodexNode) CodexStop() error {
@@ -717,7 +775,7 @@ func main() {

 log.Println("Codex created.")

-// node.CodexSetEventCallback()
+node.CodexSetEventCallback()

 version, err := node.CodexVersion()
 if err != nil {
@@ -783,7 +841,7 @@ func main() {

 log.Println("Codex Log Level set to TRACE")

-sessionId, err := node.CodexUploadInit("hello.txt")
+sessionId, err := node.CodexUploadInit(&CodexUploadOptions{filepath: "hello.txt"})
 if err != nil {
 log.Fatal("Error happened:", err.Error())
 }
@@ -808,7 +866,7 @@ func main() {
 log.Println("Codex Upload Finalized, cid:", cid)

 buf := bytes.NewBuffer([]byte("Hello World!"))
-cid, err = node.CodexUploadReader("hello.txt", buf, 16*1024)
+cid, err = node.CodexUploadReader(CodexUploadOptions{filepath: "hello.txt"}, buf)
 if err != nil {
 log.Fatal("Error happened:", err.Error())
 }
@@ -820,9 +878,14 @@ func main() {
 log.Fatal("Error happened:", err.Error())
 }

+// Choose a big file to see the progress logs
 filepath := path.Join(current, "examples", "golang", "hello.txt")
-log.Println("Uploading file:", filepath)
-cid, err = node.CodexUploadFile(filepath, 1024)
+// filepath := path.Join(current, "examples", "golang", "bigfile.zip")
+cid, err = node.CodexUploadFile(CodexUploadOptions{filepath: filepath, onProgress: func(read, total int, percent float64) {
+log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent)
+}})
+
 if err != nil {
 log.Fatal("Error happened:", err.Error())
 }

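Taken together, the updated bindings can be driven roughly as in the sketch below. It is assumed to sit in the same package as the Go example above (so it reuses its imports), uses only the CodexNode methods and CodexUploadOptions fields as changed in this diff, and trims error handling.

// Assumed to live in the same package as the Go example above.
func uploadWithProgress(node *CodexNode) {
    // Synchronous file upload reporting progress through the new options struct.
    opts := CodexUploadOptions{
        filepath: "hello.txt",
        onProgress: func(read, total int, percent float64) {
            log.Printf("uploaded %d bytes, total %d (%.2f%%)", read, total, percent)
        },
    }
    cid, err := node.CodexUploadFile(opts)
    if err != nil {
        log.Fatal(err)
    }
    log.Println("cid:", cid)

    // Asynchronous reader upload: onDone fires when the background goroutine finishes.
    buf := bytes.NewBuffer([]byte("Hello World!"))
    done := make(chan struct{})
    node.CodexUploadReaderAsync(CodexUploadOptions{filepath: "hello.txt"}, buf,
        func(cid string, err error) {
            if err != nil {
                log.Println("upload failed:", err)
            } else {
                log.Println("async cid:", cid)
            }
            close(done)
        })
    <-done
}
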
@@ -18,12 +18,16 @@ type NodeUploadMsgType* = enum
 CANCEL
 FILE

+type OnProgressHandler =
+proc(bytes: int): Future[void] {.gcsafe, async: (raises: [CancelledError]).}
+
 type NodeUploadRequest* = object
 operation: NodeUploadMsgType
 sessionId: cstring
 filepath: cstring
 chunk: seq[byte]
 chunkSize: csize_t
+onProgress: OnProgressHandler

 type
 UploadSessionId* = string
@@ -32,6 +36,7 @@ type
 stream: BufferStream
 fut: Future[?!Cid]
 filepath: string
+chunkSize: int

 var uploadSessions {.threadvar.}: Table[UploadSessionId, UploadSession]
 var nexUploadSessionCount {.threadvar.}: UploadSessionCount
@@ -42,6 +47,7 @@ proc createShared*(
 sessionId: cstring = "",
 filepath: cstring = "",
 chunk: seq[byte] = @[],
+onProgress: OnProgressHandler = nil,
 chunkSize: csize_t = 0,
 ): ptr type T =
 var ret = createShared(T)
@@ -50,6 +56,7 @@ proc createShared*(
 ret[].filepath = filepath.alloc()
 ret[].chunk = chunk
 ret[].chunkSize = chunkSize
+ret[].onProgress = onProgress
 return ret

 proc destroyShared(self: ptr NodeUploadRequest) =
@@ -64,7 +71,10 @@ proc destroyShared(self: ptr NodeUploadRequest) =
 ## or it can be the filename when the file will be uploaded via chunks.
 ## The mimetype is deduced from the filename extension.
 proc init(
-codex: ptr CodexServer, filepath: cstring = ""
+codex: ptr CodexServer,
+filepath: cstring = "",
+chunkSize: csize_t = 0,
+onProgress: OnProgressHandler,
 ): Future[Result[string, string]] {.async: (raises: []).} =
 var filenameOpt, mimetypeOpt = string.none

@@ -94,9 +104,25 @@ proc init(
 let stream = BufferStream.new()
 let lpStream = LPStream(stream)
 let node = codex[].node
-let fut = node.store(lpStream, filenameOpt, mimetypeOpt)
-uploadSessions[sessionId] =
-UploadSession(stream: stream, fut: fut, filepath: $filepath)
+let onBlockStore = proc(
+chunk: seq[byte]
+): Future[void] {.gcsafe, async: (raises: [CancelledError]).} =
+discard onProgress(chunk.len)
+
+let blockSize =
+if chunkSize.NBytes > 0.NBytes: chunkSize.NBytes else: DefaultBlockSize
+let fut = node.store(lpStream, filenameOpt, mimetypeOpt, blockSize, onBlockStore)
+
+proc cb(_: pointer) {.raises: [].} =
+# Signal end of upload
+discard onProgress(-1)
+
+fut.addCallback(cb)
+
+uploadSessions[sessionId] = UploadSession(
+stream: stream, fut: fut, filepath: $filepath, chunkSize: blockSize.int
+)

 return ok(sessionId)

@@ -163,20 +189,21 @@ proc cancel(
 return ok("")

 proc file(
-codex: ptr CodexServer, sessionId: cstring, chunkSize: csize_t = 1024
+codex: ptr CodexServer, sessionId: cstring
 ): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
 if not uploadSessions.contains($sessionId):
 return err("Invalid session ID")

-let size = if chunkSize > 0: chunkSize else: 1024
-var buffer = newSeq[byte](size)
 var session: UploadSession

 ## Here we certainly need to spawn a new thread to avoid blocking
 ## the worker thread while reading the file.
 try:
 session = uploadSessions[$sessionId]
+var buffer = newSeq[byte](session.chunkSize)
 let fs = openFileStream(session.filepath)
+defer:
+fs.close()

 while true:
 let bytesRead = fs.readData(addr buffer[0], buffer.len)
@@ -185,13 +212,7 @@ proc file(
 break
 await session.stream.pushData(buffer[0 ..< bytesRead])

-await session.stream.pushEof()
-
-let res = await session.fut
-if res.isErr:
-return err("Upload failed: " & res.error().msg)
-
-return ok($res.get())
+return await codex.finalize(sessionId)
 except KeyError as e:
 return err("Invalid session ID")
 except LPStreamError, IOError:
@@ -202,6 +223,7 @@ proc file(
 except CatchableError as e:
 return err("Upload failed: " & $e.msg)
 finally:
+session.fut.cancel()
 uploadSessions.del($sessionId)

 proc process*(
@@ -212,7 +234,7 @@ proc process*(

 case self.operation
 of NodeUploadMsgType.INIT:
-let res = (await init(codex, self.filepath))
+let res = (await init(codex, self.filepath, self.chunkSize, self.onProgress))
 if res.isErr:
 error "INIT failed", error = res.error
 return err($res.error)
@@ -236,7 +258,7 @@ proc process*(
 return err($res.error)
 return res
 of NodeUploadMsgType.FILE:
-let res = (await file(codex, self.sessionId, self.chunkSize))
+let res = (await file(codex, self.sessionId))
 if res.isErr:
 error "FILE failed", error = res.error
 return err($res.error)

@@ -14,6 +14,7 @@ type CodexCallback* = proc(
 const RET_OK*: cint = 0
 const RET_ERR*: cint = 1
 const RET_MISSING_CALLBACK*: cint = 2
+const RET_PROGRESS*: cint = 3

 ## Returns RET_OK as acknowledgment and call the callback
 ## with RET_OK code and the provided message.

@@ -18,6 +18,7 @@
 #define RET_OK 0
 #define RET_ERR 1
 #define RET_MISSING_CALLBACK 2
+#define RET_PROGRESS 3

 #ifdef __cplusplus
 extern "C" {
@@ -84,6 +85,7 @@ int codex_peer_debug(
 int codex_upload_init(
 void* ctx,
 const char* filepath,
+size_t chunkSize,
 CodexCallback callback,
 void* userData);

@@ -110,7 +112,6 @@ int codex_upload_cancel(
 int codex_upload_file(
 void* ctx,
 const char* sessionId,
-size_t chunkSize,
 CodexCallback callback,
 void* userData);

@@ -247,12 +247,25 @@ proc codex_destroy(
 return callback.success("", userData)

 proc codex_upload_init(
-ctx: ptr CodexContext, filepath: cstring, callback: CodexCallback, userData: pointer
+ctx: ptr CodexContext,
+filepath: cstring,
+chunkSize: csize_t,
+callback: CodexCallback,
+userData: pointer,
 ): cint {.dynlib, exportc.} =
 init(ctx, callback, userData)

-let reqContent =
-NodeUploadRequest.createShared(NodeUploadMsgType.INIT, filepath = filepath)
+let onProgress = proc(
+bytes: int
+): Future[void] {.gcsafe, async: (raises: [CancelledError]).} =
+callback(RET_PROGRESS, nil, bytes.csize_t, userData)
+
+let reqContent = NodeUploadRequest.createShared(
+NodeUploadMsgType.INIT,
+filepath = filepath,
+chunkSize = chunkSize,
+onProgress = onProgress,
+)
 let res = codex_context.sendRequestToCodexThread(
 ctx, RequestType.UPLOAD, reqContent, callback, userData
 )
@@ -317,15 +330,13 @@ proc codex_upload_cancel(
 proc codex_upload_file(
 ctx: ptr CodexContext,
 sessionId: cstring,
-chunkSize: csize_t,
 callback: CodexCallback,
 userData: pointer,
 ): cint {.dynlib, exportc.} =
 init(ctx, callback, userData)

-let reqContent = NodeUploadRequest.createShared(
-NodeUploadMsgType.FILE, sessionId = sessionId, chunkSize = chunkSize
-)
+let reqContent =
+NodeUploadRequest.createShared(NodeUploadMsgType.FILE, sessionId = sessionId)

 let res = codex_context.sendRequestToCodexThread(
 ctx, RequestType.UPLOAD, reqContent, callback, userData