initial addition of dependencies request

This commit is contained in:
Gabriel mermelstein 2025-04-15 18:03:49 +03:00
parent 79f42fb461
commit c0431d53a6
No known key found for this signature in database
GPG Key ID: 82B8134785FEAE0D
5 changed files with 114 additions and 5 deletions

View File

@ -30,6 +30,14 @@ proc allocSharedSeq*[T](s: seq[T]): SharedSeq[T] =
return (cast[ptr UncheckedArray[T]](data), s.len)
proc deallocSharedSeq*[T](s: var SharedSeq[T]) =
if not s.data.isNil:
when T is cstring:
# For array of cstrings, deallocate each string first
for i in 0 ..< s.len:
if not s.data[i].isNil:
# Deallocate each cstring
deallocShared(s.data[i])
deallocShared(s.data)
s.len = 0
@ -48,6 +56,18 @@ proc allocSharedSeqFromCArray*[T](arr: ptr T, len: int): SharedSeq[T] =
if arr.isNil or len <= 0:
return (nil, 0)
let data = allocShared(sizeof(T) * len)
copyMem(data, arr, sizeof(T) * len)
return (cast[ptr UncheckedArray[T]](data), len)
when T is cstring:
# Special handling for arrays of cstrings
let data = cast[ptr UncheckedArray[cstring]](allocShared(sizeof(cstring) * len))
let cstrArr = cast[ptr UncheckedArray[cstring]](arr)
for i in 0 ..< len:
# Use the existing alloc proc to properly allocate each cstring
data[i] = cstrArr[i].alloc()
return (data, len)
else:
# Original handling for non-cstring types
let data = allocShared(sizeof(T) * len)
copyMem(data, arr, sizeof(T) * len)
return (cast[ptr UncheckedArray[T]](data), len)

View File

@ -45,6 +45,13 @@ int UnwrapReceivedMessage(void* ctx,
SdsCallBack callback,
void* userData);
int MarkDependenciesMet(void* ctx,
char** messageIDs,
size_t count,
SdsCallBack callback,
void* userData);
#ifdef __cplusplus
}

View File

@ -14,7 +14,7 @@ import
./ffi_types,
./sds_thread/inter_thread_communication/sds_thread_request,
./sds_thread/inter_thread_communication/requests/
[sds_lifecycle_request, sds_message_request],
[sds_lifecycle_request, sds_message_request, sds_dependencies_request],
../src/[reliability, reliability_utils, message]
################################################################################
@ -242,5 +242,35 @@ proc UnwrapReceivedMessage(
userData,
)
proc MarkDependenciesMet(
ctx: ptr SdsContext,
messageIds: pointer,
count: csize_t,
callback: SdsCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibsdsParams(ctx, callback, userData)
if messageIds == nil and count > 0:
let msg = "libsds error: " & "MessageIDs pointer is NULL but count > 0"
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
var messageIds = allocSharedSeqFromCArray(cast[ptr cstring](messageIds), count.int)
defer:
deallocSharedSeq(messageIds)
handleRequest(
ctx,
RequestType.DEPENDENCIES,
SdsDependenciesRequest.createShared(
SdsDependenciesMsgType.MARK_DEPENDENCIES_MET, messageIds, count
),
callback,
userData,
)
### End of exported procs
################################################################################

View File

@ -0,0 +1,49 @@
import std/[options, json, strutils, net, sequtils]
import chronos, chronicles, results, confutils, confutils/std/net
import ../../../alloc
import ../../../../src/[reliability_utils, reliability, message]
type SdsDependenciesMsgType* = enum
MARK_DEPENDENCIES_MET
type SdsDependenciesRequest* = object
operation: SdsDependenciesMsgType
messageIds: SharedSeq[cstring]
count: csize_t
proc createShared*(
T: type SdsDependenciesRequest,
op: SdsDependenciesMsgType,
messageIds: SharedSeq[cstring],
count: csize_t = 0,
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].messageIds = messageIds # check if alloc is needed
ret[].count = count
return ret
proc destroyShared(self: ptr SdsDependenciesRequest) =
#deallocShared(self[].message)
#deallocShared(self[].messageId)
deallocShared(self)
proc process*(
self: ptr SdsDependenciesRequest, rm: ptr ReliabilityManager
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)
case self.operation
of MARK_DEPENDENCIES_MET:
let messageIdsC = self.messageIds.toSeq()
let messageIds = messageIdsC.mapIt($it)
markDependenciesMet(rm[], messageIds).isOkOr:
error "MARK_DEPENDENCIES_MET failed", error = error
return err("error processing MARK_DEPENDENCIES_MET request: " & $error)
return ok("")
return ok("")

View File

@ -6,12 +6,13 @@ import std/json, results
import chronos, chronos/threadsync
import
../../ffi_types,
./requests/[sds_lifecycle_request, sds_message_request],
./requests/[sds_lifecycle_request, sds_message_request, sds_dependencies_request],
../../../src/[reliability_utils]
type RequestType* {.pure.} = enum
LIFECYCLE
MESSAGE
DEPENDENCIES
type SdsThreadRequest* = object
reqType: RequestType
@ -68,6 +69,8 @@ proc process*(
cast[ptr SdsLifecycleRequest](request[].reqContent).process(rm)
of MESSAGE:
cast[ptr SdsMessageRequest](request[].reqContent).process(rm)
of DEPENDENCIES:
cast[ptr SdsDependenciesRequest](request[].reqContent).process(rm)
handleRes(await retFut, request)