feat: libsds - support for multiple channels per RM (#15)

This commit is contained in:
Akhil 2025-07-17 14:13:21 +05:30 committed by GitHub
parent 8312df7b53
commit 4e10d77218
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 68 additions and 37 deletions

View File

@ -3,9 +3,10 @@ import ./json_base_event, ../../src/[message]
type JsonMessageReadyEvent* = ref object of JsonEvent
messageId*: SdsMessageID
channelId*: SdsChannelID
proc new*(T: type JsonMessageReadyEvent, messageId: SdsMessageID): T =
return JsonMessageReadyEvent(eventType: "message_ready", messageId: messageId)
proc new*(T: type JsonMessageReadyEvent, messageId: SdsMessageID, channelId: SdsChannelID): T =
return JsonMessageReadyEvent(eventType: "message_ready", messageId: messageId, channelId: channelId)
method `$`*(jsonMessageReady: JsonMessageReadyEvent): string =
$(%*jsonMessageReady)

View File

@ -3,9 +3,10 @@ import ./json_base_event, ../../src/[message]
type JsonMessageSentEvent* = ref object of JsonEvent
messageId*: SdsMessageID
channelId*: SdsChannelID
proc new*(T: type JsonMessageSentEvent, messageId: SdsMessageID): T =
return JsonMessageSentEvent(eventType: "message_sent", messageId: messageId)
proc new*(T: type JsonMessageSentEvent, messageId: SdsMessageID, channelId: SdsChannelID): T =
return JsonMessageSentEvent(eventType: "message_sent", messageId: messageId, channelId: channelId)
method `$`*(jsonMessageSent: JsonMessageSentEvent): string =
$(%*jsonMessageSent)

View File

@ -4,14 +4,16 @@ import ./json_base_event, ../../src/[message]
type JsonMissingDependenciesEvent* = ref object of JsonEvent
messageId*: SdsMessageID
missingDeps: seq[SdsMessageID]
channelId*: SdsChannelID
proc new*(
T: type JsonMissingDependenciesEvent,
messageId: SdsMessageID,
missingDeps: seq[SdsMessageID],
channelId: SdsChannelID,
): T =
return JsonMissingDependenciesEvent(
eventType: "missing_dependencies", messageId: messageId, missingDeps: missingDeps
eventType: "missing_dependencies", messageId: messageId, missingDeps: missingDeps, channelId: channelId
)
method `$`*(jsonMissingDependencies: JsonMissingDependenciesEvent): string =

View File

@ -24,7 +24,7 @@ typedef void (*SdsCallBack) (int callerRet, const char* msg, size_t len, void* u
// --- Core API Functions ---
void* SdsNewReliabilityManager(const char* channelId, SdsCallBack callback, void* userData);
void* SdsNewReliabilityManager(SdsCallBack callback, void* userData);
void SdsSetEventCallback(void* ctx, SdsCallBack callback, void* userData);
@ -36,6 +36,7 @@ int SdsWrapOutgoingMessage(void* ctx,
void* message,
size_t messageLen,
const char* messageId,
const char* channelId,
SdsCallBack callback,
void* userData);
@ -48,6 +49,7 @@ int SdsUnwrapReceivedMessage(void* ctx,
int SdsMarkDependenciesMet(void* ctx,
char** messageIDs,
size_t count,
const char* channelId,
SdsCallBack callback,
void* userData);

View File

@ -5,7 +5,7 @@
when defined(linux):
{.passl: "-Wl,-soname,libsds.so".}
import std/[locks, typetraits, tables, atomics], chronos, chronicles
import std/[typetraits, tables, atomics], chronos, chronicles
import
./sds_thread/sds_thread,
./alloc,
@ -13,7 +13,7 @@ import
./sds_thread/inter_thread_communication/sds_thread_request,
./sds_thread/inter_thread_communication/requests/
[sds_lifecycle_request, sds_message_request, sds_dependencies_request],
../src/[reliability, reliability_utils, message],
../src/[reliability_utils, message],
./events/[
json_message_ready_event, json_message_sent_event, json_missing_dependencies_event,
json_periodic_sync_event,
@ -72,19 +72,19 @@ proc handleRequest(
return RET_OK
proc onMessageReady(ctx: ptr SdsContext): MessageReadyCallback =
return proc(messageId: SdsMessageID) {.gcsafe.} =
return proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
callEventCallback(ctx, "onMessageReady"):
$JsonMessageReadyEvent.new(messageId)
$JsonMessageReadyEvent.new(messageId, channelId)
proc onMessageSent(ctx: ptr SdsContext): MessageSentCallback =
return proc(messageId: SdsMessageID) {.gcsafe.} =
return proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
callEventCallback(ctx, "onMessageSent"):
$JsonMessageSentEvent.new(messageId)
$JsonMessageSentEvent.new(messageId, channelId)
proc onMissingDependencies(ctx: ptr SdsContext): MissingDependenciesCallback =
return proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
return proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} =
callEventCallback(ctx, "onMissingDependencies"):
$JsonMissingDependenciesEvent.new(messageId, missingDeps)
$JsonMissingDependenciesEvent.new(messageId, missingDeps, channelId)
proc onPeriodicSync(ctx: ptr SdsContext): PeriodicSyncCallback =
return proc() {.gcsafe.} =
@ -131,7 +131,7 @@ proc initializeLibrary() {.exported.} =
### Exported procs
proc SdsNewReliabilityManager(
channelId: cstring, callback: SdsCallBack, userData: pointer
callback: SdsCallBack, userData: pointer
): pointer {.dynlib, exportc, cdecl.} =
initializeLibrary()
@ -159,7 +159,7 @@ proc SdsNewReliabilityManager(
ctx,
RequestType.LIFECYCLE,
SdsLifecycleRequest.createShared(
SdsLifecycleMsgType.CREATE_RELIABILITY_MANAGER, channelId, appCallbacks
SdsLifecycleMsgType.CREATE_RELIABILITY_MANAGER, nil, appCallbacks
),
callback,
userData,
@ -211,6 +211,7 @@ proc SdsWrapOutgoingMessage(
message: pointer,
messageLen: csize_t,
messageId: cstring,
channelId: cstring,
callback: SdsCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
@ -227,11 +228,21 @@ proc SdsWrapOutgoingMessage(
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
if channelId == nil:
let msg = "libsds error: " & "channel ID pointer is NULL"
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
if channelId != nil and $channelId == "":
let msg = "libsds error: " & "channel ID is empty string"
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
handleRequest(
ctx,
RequestType.MESSAGE,
SdsMessageRequest.createShared(
SdsMessageMsgType.WRAP_MESSAGE, message, messageLen, messageId
SdsMessageMsgType.WRAP_MESSAGE, message, messageLen, messageId, channelId
),
callback,
userData,
@ -266,6 +277,7 @@ proc SdsMarkDependenciesMet(
ctx: ptr SdsContext,
messageIds: pointer,
count: csize_t,
channelId: cstring,
callback: SdsCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
@ -277,11 +289,21 @@ proc SdsMarkDependenciesMet(
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
if channelId == nil:
let msg = "libsds error: " & "channel ID pointer is NULL"
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
if channelId != nil and $channelId == "":
let msg = "libsds error: " & "channel ID is empty string"
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
handleRequest(
ctx,
RequestType.DEPENDENCIES,
SdsDependenciesRequest.createShared(
SdsDependenciesMsgType.MARK_DEPENDENCIES_MET, messageIds, count
SdsDependenciesMsgType.MARK_DEPENDENCIES_MET, messageIds, count, channelId
),
callback,
userData,

View File

@ -1,8 +1,8 @@
import std/[options, json, strutils, net, sequtils]
import chronos, chronicles, results, confutils, confutils/std/net
import std/[json, strutils, net, sequtils]
import chronos, chronicles, results
import ../../../alloc
import ../../../../src/[reliability_utils, reliability, message]
import ../../../../src/[reliability_utils, reliability]
type SdsDependenciesMsgType* = enum
MARK_DEPENDENCIES_MET
@ -11,21 +11,25 @@ type SdsDependenciesRequest* = object
operation: SdsDependenciesMsgType
messageIds: SharedSeq[cstring]
count: csize_t
channelId: cstring
proc createShared*(
T: type SdsDependenciesRequest,
op: SdsDependenciesMsgType,
messageIds: pointer,
count: csize_t = 0,
channelId: cstring = "",
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].count = count
ret[].channelId = channelId.alloc()
ret[].messageIds = allocSharedSeqFromCArray(cast[ptr cstring](messageIds), count.int)
return ret
proc destroyShared(self: ptr SdsDependenciesRequest) =
deallocSharedSeq(self[].messageIds)
deallocShared(self[].channelId)
deallocShared(self)
proc process*(
@ -39,7 +43,7 @@ proc process*(
let messageIdsC = self.messageIds.toSeq()
let messageIds = messageIdsC.mapIt($it)
markDependenciesMet(rm[], messageIds).isOkOr:
markDependenciesMet(rm[], messageIds, $self.channelId).isOkOr:
error "MARK_DEPENDENCIES_MET failed", error = error
return err("error processing MARK_DEPENDENCIES_MET request: " & $error)

View File

@ -1,8 +1,8 @@
import std/[options, json, strutils, net]
import chronos, chronicles, results, confutils, confutils/std/net
import std/json
import chronos, chronicles, results
import ../../../alloc
import ../../../../src/[reliability_utils, reliability, message]
import ../../../../src/[reliability_utils, reliability]
type SdsLifecycleMsgType* = enum
CREATE_RELIABILITY_MANAGER
@ -31,14 +31,9 @@ proc destroyShared(self: ptr SdsLifecycleRequest) =
deallocShared(self)
proc createReliabilityManager(
channelIdCStr: cstring, appCallbacks: AppCallbacks = nil
appCallbacks: AppCallbacks = nil
): Future[Result[ReliabilityManager, string]] {.async.} =
let channelId = $channelIdCStr
if channelId.len == 0:
error "Failed creating ReliabilityManager: Channel ID cannot be empty"
return err("Failed creating ReliabilityManager: Channel ID cannot be empty")
let rm = newReliabilityManager(some(channelId)).valueOr:
let rm = newReliabilityManager().valueOr:
error "Failed creating reliability manager", error = error
return err("Failed creating reliability manager: " & $error)
@ -57,7 +52,7 @@ proc process*(
case self.operation
of CREATE_RELIABILITY_MANAGER:
rm[] = (await createReliabilityManager(self.channelId, self.appCallbacks)).valueOr:
rm[] = (await createReliabilityManager(self.appCallbacks)).valueOr:
error "CREATE_RELIABILITY_MANAGER failed", error = error
return err("error processing CREATE_RELIABILITY_MANAGER request: " & $error)
of RESET_RELIABILITY_MANAGER:

View File

@ -1,5 +1,5 @@
import std/[options, json, strutils, net, sequtils]
import chronos, chronicles, results, confutils, confutils/std/net
import std/[json, strutils, net, sequtils]
import chronos, chronicles, results
import ../../../alloc
import ../../../../src/[reliability_utils, reliability, message]
@ -13,6 +13,7 @@ type SdsMessageRequest* = object
message: SharedSeq[byte]
messageLen: csize_t
messageId: cstring
channelId: cstring
type SdsUnwrapResponse* = object
message*: seq[byte]
@ -24,11 +25,13 @@ proc createShared*(
message: pointer,
messageLen: csize_t = 0,
messageId: cstring = "",
channelId: cstring = "",
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].messageLen = messageLen
ret[].messageId = messageId.alloc()
ret[].channelId = channelId.alloc()
ret[].message = allocSharedSeqFromCArray(cast[ptr byte](message), messageLen.int)
return ret
@ -36,6 +39,7 @@ proc createShared*(
proc destroyShared(self: ptr SdsMessageRequest) =
deallocSharedSeq(self[].message)
deallocShared(self[].messageId)
deallocShared(self[].channelId)
deallocShared(self)
proc process*(
@ -48,7 +52,7 @@ proc process*(
of WRAP_MESSAGE:
let messageBytes = self.message.toSeq()
let wrappedMessage = wrapOutgoingMessage(rm[], messageBytes, $self.messageId).valueOr:
let wrappedMessage = wrapOutgoingMessage(rm[], messageBytes, $self.messageId, $self.channelId).valueOr:
error "WRAP_MESSAGE failed", error = error
return err("error processing WRAP_MESSAGE request: " & $error)
@ -57,7 +61,7 @@ proc process*(
of UNWRAP_MESSAGE:
let messageBytes = self.message.toSeq()
let (unwrappedMessage, missingDeps) = unwrapReceivedMessage(rm[], messageBytes).valueOr:
let (unwrappedMessage, missingDeps, _) = unwrapReceivedMessage(rm[], messageBytes).valueOr:
error "UNWRAP_MESSAGE failed", error = error
return err("error processing UNWRAP_MESSAGE request: " & $error)