Merge bba3941c723e347b7cd0cfa6c0a08adb972e4bc3 into a9324e973a5f059b160e40842db0447d814ea758

This commit is contained in:
Arnaud 2026-06-23 13:00:14 +02:00 committed by GitHub
commit 05d6a2db96
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 89 additions and 29 deletions

View File

@ -119,6 +119,7 @@ proc storage_new(
).isOkOr:
let msg = $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
discard storage_context.destroyStorageContext(ctx)
return nil
return ctx
@ -310,11 +311,8 @@ proc storage_upload_chunk(
initializeLibrary()
checkLibstorageParams(ctx, callback, userData)
let chunk = newSeq[byte](len)
copyMem(addr chunk[0], data, len)
let reqContent = NodeUploadRequest.createShared(
NodeUploadMsgType.CHUNK, sessionId = sessionId, chunk = chunk
NodeUploadMsgType.CHUNK, sessionId = sessionId, chunkData = data, chunkLen = len.int
)
let res = storage_context.sendRequestToStorageThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData

View File

@ -37,7 +37,7 @@ type StorageContext* = object
# To notify the Logos Storage thread that a request is ready
reqSignal: ThreadSignalPtr
# To notify the client thread that the request was received.
# To notify the client thread that the request was received.
# It is acknowledgment signal (handshake).
reqReceivedSignal: ThreadSignalPtr
@ -48,7 +48,7 @@ type StorageContext* = object
# Function called by the library to notify the client of global events
eventCallback*: pointer
# Custom state attached by the client to the context,
# Custom state attached by the client to the context,
# returned with every event callback
eventUserData*: pointer
@ -56,8 +56,8 @@ type StorageContext* = object
running: Atomic[bool]
template callEventCallback(ctx: ptr StorageContext, eventName: string, body: untyped) =
## Template used to notify the client of global events
## Example: onConnectionChanged, onProofMissing, etc.
## Template used to notify the client of global events
## Example: onConnectionChanged, onProofMissing, etc.
if isNil(ctx[].eventCallback):
error eventName & " - eventCallback is nil"
return
@ -94,27 +94,28 @@ proc sendRequestToStorageThread*(
# Send the request to the Logos Storage thread
let sentOk = ctx.reqChannel.trySend(req)
if not sentOk:
deallocShared(req)
return err("Failed to send request to the Logos Storage thread: " & $req[])
let reqDesc = $req[]
req.destroy()
return err("Failed to send request to the Logos Storage thread: " & reqDesc)
# trySend has succeeded: req is published in the channel and
# owned by the Logos Storage thread, which frees it in handleRes.
# Notify the Logos Storage thread that a request is available
let fireSyncRes = ctx.reqSignal.fireSync()
if fireSyncRes.isErr():
deallocShared(req)
return err(
"Failed to send request to the Logos Storage thread: unable to fireSync: " &
$fireSyncRes.error
)
if fireSyncRes.get() == false:
deallocShared(req)
return
err("Failed to send request to the Logos Storage thread: fireSync timed out.")
# Wait until the Logos Storage thread properly received the request
let res = ctx.reqReceivedSignal.waitSync(timeout)
if res.isErr():
deallocShared(req)
return err(
"Failed to send request to the Logos Storage thread: unable to receive reqReceivedSignal signal."
)
@ -172,13 +173,13 @@ proc createStorageContext*(): Result[ptr StorageContext, string] =
# Allocates a StorageContext in shared memory (for the main thread)
var ctx = createShared(StorageContext, 1)
# This signal is used by the main side to wake the Logos Storage thread
# This signal is used by the main side to wake the Logos Storage thread
# when a new request is enqueued.
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
return
err("Failed to create a context: unable to create reqSignal ThreadSignalPtr.")
# Used to let the caller know that the Logos Storage thread has
# Used to let the caller know that the Logos Storage thread has
# acknowledged / picked up a request (like a handshake).
ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
return err(

View File

@ -41,7 +41,7 @@ proc createShared*(
ret[].logLevel = logLevel.alloc()
return ret
proc destroyShared(self: ptr NodeDebugRequest) =
proc destroyShared*(self: ptr NodeDebugRequest) =
deallocShared(self[].peerId)
deallocShared(self[].logLevel)
deallocShared(self)

View File

@ -72,7 +72,7 @@ proc createShared*(
return ret
proc destroyShared(self: ptr NodeDownloadRequest) =
proc destroyShared*(self: ptr NodeDownloadRequest) =
deallocShared(self[].cid)
deallocShared(self[].filepath)
deallocShared(self)

View File

@ -31,7 +31,7 @@ proc createShared*(T: type NodeInfoRequest, op: NodeInfoMsgType): ptr type T =
ret[].operation = op
return ret
proc destroyShared(self: ptr NodeInfoRequest) =
proc destroyShared*(self: ptr NodeInfoRequest) =
deallocShared(self)
proc getRepo(

View File

@ -90,7 +90,7 @@ proc createShared*(
ret[].configJson = configJson.alloc()
return ret
proc destroyShared(self: ptr NodeLifecycleRequest) =
proc destroyShared*(self: ptr NodeLifecycleRequest) =
deallocShared(self[].configJson)
deallocShared(self)

View File

@ -21,7 +21,9 @@ type NodeP2PMsgType* = enum
type NodeP2PRequest* = object
operation: NodeP2PMsgType
peerId: cstring
peerAddresses: seq[cstring]
# Use a pointer to free the memory in destroyShared
peerAddresses: ptr UncheckedArray[cstring]
peerAddressesLen: int
proc createShared*(
T: type NodeP2PRequest,
@ -32,11 +34,26 @@ proc createShared*(
var ret = createShared(T)
ret[].operation = op
ret[].peerId = peerId.alloc()
ret[].peerAddresses = peerAddresses
ret[].peerAddressesLen = peerAddresses.len
if peerAddresses.len > 0:
ret[].peerAddresses = cast[ptr UncheckedArray[cstring]](allocShared(
sizeof(cstring) * peerAddresses.len
))
for i in 0 ..< peerAddresses.len:
ret[].peerAddresses[i] = peerAddresses[i]
return ret
proc destroyShared(self: ptr NodeP2PRequest) =
proc destroyShared*(self: ptr NodeP2PRequest) =
if self == nil:
return
deallocShared(self[].peerId)
if self[].peerAddresses != nil:
deallocShared(self[].peerAddresses)
deallocShared(self)
proc connect(
@ -88,7 +105,13 @@ proc process*(
case self.operation
of NodeP2PMsgType.CONNECT:
let res = (await connect(storage, self.peerId, self.peerAddresses))
# connect() takes seq[cstring]; self.peerAddresses is a raw shared array,
# so copy the pointers into a seq.
var peerAddresses = newSeq[cstring](self.peerAddressesLen)
for i in 0 ..< self.peerAddressesLen:
peerAddresses[i] = self.peerAddresses[i]
let res = (await connect(storage, self.peerId, peerAddresses))
if res.isErr:
error "Failed to CONNECT.", error = res.error
return err($res.error)

View File

@ -50,7 +50,7 @@ proc createShared*(
return ret
proc destroyShared(self: ptr NodeStorageRequest) =
proc destroyShared*(self: ptr NodeStorageRequest) =
deallocShared(self[].cid)
deallocShared(self)

View File

@ -47,7 +47,9 @@ type NodeUploadRequest* = object
operation: NodeUploadMsgType
sessionId: cstring
filepath: cstring
chunk: seq[byte]
# Use a pointer to free the memory in destroyShared
chunk: pointer
chunkLen: int
chunkSize: csize_t
type
@ -68,21 +70,30 @@ proc createShared*(
op: NodeUploadMsgType,
sessionId: cstring = "",
filepath: cstring = "",
chunk: seq[byte] = @[],
chunkData: ptr byte = nil,
chunkLen: int = 0,
chunkSize: csize_t = 0,
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].sessionId = sessionId.alloc()
ret[].filepath = filepath.alloc()
ret[].chunk = chunk
ret[].chunkLen = chunkLen
ret[].chunkSize = chunkSize
if chunkLen > 0:
ret[].chunk = allocShared(chunkLen)
copyMem(ret[].chunk, chunkData, chunkLen)
return ret
proc destroyShared(self: ptr NodeUploadRequest) =
proc destroyShared*(self: ptr NodeUploadRequest) =
deallocShared(self[].filepath)
deallocShared(self[].sessionId)
if self[].chunk != nil:
deallocShared(self[].chunk)
deallocShared(self)
proc init(
@ -347,7 +358,13 @@ proc process*(
return err($res.error)
return res
of NodeUploadMsgType.CHUNK:
let res = (await chunk(storage, self.sessionId, self.chunk))
# self.chunk is a raw memory pointer, but chunk() takes a seq[byte],
# so we copy the bytes into a new seq.
var data = newSeq[byte](self.chunkLen)
if self.chunkLen > 0:
copyMem(addr data[0], self.chunk, self.chunkLen)
let res = (await chunk(storage, self.sessionId, data))
if res.isErr:
error "Failed to CHUNK.", error = res.error
return err($res.error)

View File

@ -54,6 +54,27 @@ proc createShared*(
ret[].userData = userData
return ret
proc destroy*(request: ptr StorageThreadRequest) =
## Frees the payload (reqContent) and the wrapper on error paths,
## when the request is not handed to the storage thread.
## On success paths, the storage thread frees the payload and the wrapper.
case request[].reqType
of LIFECYCLE:
cast[ptr NodeLifecycleRequest](request[].reqContent).destroyShared()
of INFO:
cast[ptr NodeInfoRequest](request[].reqContent).destroyShared()
of RequestType.DEBUG:
cast[ptr NodeDebugRequest](request[].reqContent).destroyShared()
of P2P:
cast[ptr NodeP2PRequest](request[].reqContent).destroyShared()
of UPLOAD:
cast[ptr NodeUploadRequest](request[].reqContent).destroyShared()
of DOWNLOAD:
cast[ptr NodeDownloadRequest](request[].reqContent).destroyShared()
of STORAGE:
cast[ptr NodeStorageRequest](request[].reqContent).destroyShared()
deallocShared(request)
# NOTE: User callbacks are executed on the working thread.
# They must be fast and non-blocking; otherwise this thread will be blocked
# and no further requests can be processed.