diff --git a/library/events/json_message_ready_event.nim b/library/events/json_message_ready_event.nim index d64f251..723a534 100644 --- a/library/events/json_message_ready_event.nim +++ b/library/events/json_message_ready_event.nim @@ -3,9 +3,10 @@ import ./json_base_event, ../../src/[message] type JsonMessageReadyEvent* = ref object of JsonEvent messageId*: SdsMessageID + channelId*: SdsChannelID -proc new*(T: type JsonMessageReadyEvent, messageId: SdsMessageID): T = - return JsonMessageReadyEvent(eventType: "message_ready", messageId: messageId) +proc new*(T: type JsonMessageReadyEvent, messageId: SdsMessageID, channelId: SdsChannelID): T = + return JsonMessageReadyEvent(eventType: "message_ready", messageId: messageId, channelId: channelId) method `$`*(jsonMessageReady: JsonMessageReadyEvent): string = $(%*jsonMessageReady) diff --git a/library/events/json_message_sent_event.nim b/library/events/json_message_sent_event.nim index a9c7439..7689f9b 100644 --- a/library/events/json_message_sent_event.nim +++ b/library/events/json_message_sent_event.nim @@ -3,9 +3,10 @@ import ./json_base_event, ../../src/[message] type JsonMessageSentEvent* = ref object of JsonEvent messageId*: SdsMessageID + channelId*: SdsChannelID -proc new*(T: type JsonMessageSentEvent, messageId: SdsMessageID): T = - return JsonMessageSentEvent(eventType: "message_sent", messageId: messageId) +proc new*(T: type JsonMessageSentEvent, messageId: SdsMessageID, channelId: SdsChannelID): T = + return JsonMessageSentEvent(eventType: "message_sent", messageId: messageId, channelId: channelId) method `$`*(jsonMessageSent: JsonMessageSentEvent): string = $(%*jsonMessageSent) diff --git a/library/events/json_missing_dependencies_event.nim b/library/events/json_missing_dependencies_event.nim index b7af04c..ef7ae57 100644 --- a/library/events/json_missing_dependencies_event.nim +++ b/library/events/json_missing_dependencies_event.nim @@ -4,14 +4,16 @@ import ./json_base_event, ../../src/[message] type JsonMissingDependenciesEvent* = ref object of JsonEvent messageId*: SdsMessageID missingDeps: seq[SdsMessageID] + channelId*: SdsChannelID proc new*( T: type JsonMissingDependenciesEvent, messageId: SdsMessageID, missingDeps: seq[SdsMessageID], + channelId: SdsChannelID, ): T = return JsonMissingDependenciesEvent( - eventType: "missing_dependencies", messageId: messageId, missingDeps: missingDeps + eventType: "missing_dependencies", messageId: messageId, missingDeps: missingDeps, channelId: channelId ) method `$`*(jsonMissingDependencies: JsonMissingDependenciesEvent): string = diff --git a/library/libsds.h b/library/libsds.h index aa06b02..886d3cb 100644 --- a/library/libsds.h +++ b/library/libsds.h @@ -24,7 +24,7 @@ typedef void (*SdsCallBack) (int callerRet, const char* msg, size_t len, void* u // --- Core API Functions --- -void* SdsNewReliabilityManager(const char* channelId, SdsCallBack callback, void* userData); +void* SdsNewReliabilityManager(SdsCallBack callback, void* userData); void SdsSetEventCallback(void* ctx, SdsCallBack callback, void* userData); @@ -36,6 +36,7 @@ int SdsWrapOutgoingMessage(void* ctx, void* message, size_t messageLen, const char* messageId, + const char* channelId, SdsCallBack callback, void* userData); @@ -48,6 +49,7 @@ int SdsUnwrapReceivedMessage(void* ctx, int SdsMarkDependenciesMet(void* ctx, char** messageIDs, size_t count, + const char* channelId, SdsCallBack callback, void* userData); diff --git a/library/libsds.nim b/library/libsds.nim index c0c70d2..53d2a4e 100644 --- a/library/libsds.nim +++ b/library/libsds.nim @@ -5,7 +5,7 @@ when defined(linux): {.passl: "-Wl,-soname,libsds.so".} -import std/[locks, typetraits, tables, atomics], chronos, chronicles +import std/[typetraits, tables, atomics], chronos, chronicles import ./sds_thread/sds_thread, ./alloc, @@ -13,7 +13,7 @@ import ./sds_thread/inter_thread_communication/sds_thread_request, ./sds_thread/inter_thread_communication/requests/ [sds_lifecycle_request, sds_message_request, sds_dependencies_request], - ../src/[reliability, reliability_utils, message], + ../src/[reliability_utils, message], ./events/[ json_message_ready_event, json_message_sent_event, json_missing_dependencies_event, json_periodic_sync_event, @@ -72,19 +72,19 @@ proc handleRequest( return RET_OK proc onMessageReady(ctx: ptr SdsContext): MessageReadyCallback = - return proc(messageId: SdsMessageID) {.gcsafe.} = + return proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = callEventCallback(ctx, "onMessageReady"): - $JsonMessageReadyEvent.new(messageId) + $JsonMessageReadyEvent.new(messageId, channelId) proc onMessageSent(ctx: ptr SdsContext): MessageSentCallback = - return proc(messageId: SdsMessageID) {.gcsafe.} = + return proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = callEventCallback(ctx, "onMessageSent"): - $JsonMessageSentEvent.new(messageId) + $JsonMessageSentEvent.new(messageId, channelId) proc onMissingDependencies(ctx: ptr SdsContext): MissingDependenciesCallback = - return proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} = + return proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = callEventCallback(ctx, "onMissingDependencies"): - $JsonMissingDependenciesEvent.new(messageId, missingDeps) + $JsonMissingDependenciesEvent.new(messageId, missingDeps, channelId) proc onPeriodicSync(ctx: ptr SdsContext): PeriodicSyncCallback = return proc() {.gcsafe.} = @@ -131,7 +131,7 @@ proc initializeLibrary() {.exported.} = ### Exported procs proc SdsNewReliabilityManager( - channelId: cstring, callback: SdsCallBack, userData: pointer + callback: SdsCallBack, userData: pointer ): pointer {.dynlib, exportc, cdecl.} = initializeLibrary() @@ -159,7 +159,7 @@ proc SdsNewReliabilityManager( ctx, RequestType.LIFECYCLE, SdsLifecycleRequest.createShared( - SdsLifecycleMsgType.CREATE_RELIABILITY_MANAGER, channelId, appCallbacks + SdsLifecycleMsgType.CREATE_RELIABILITY_MANAGER, nil, appCallbacks ), callback, userData, @@ -211,6 +211,7 @@ proc SdsWrapOutgoingMessage( message: pointer, messageLen: csize_t, messageId: cstring, + channelId: cstring, callback: SdsCallBack, userData: pointer, ): cint {.dynlib, exportc.} = @@ -227,11 +228,21 @@ proc SdsWrapOutgoingMessage( callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_ERR + if channelId == nil: + let msg = "libsds error: " & "channel ID pointer is NULL" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + if channelId != nil and $channelId == "": + let msg = "libsds error: " & "channel ID is empty string" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + handleRequest( ctx, RequestType.MESSAGE, SdsMessageRequest.createShared( - SdsMessageMsgType.WRAP_MESSAGE, message, messageLen, messageId + SdsMessageMsgType.WRAP_MESSAGE, message, messageLen, messageId, channelId ), callback, userData, @@ -266,6 +277,7 @@ proc SdsMarkDependenciesMet( ctx: ptr SdsContext, messageIds: pointer, count: csize_t, + channelId: cstring, callback: SdsCallBack, userData: pointer, ): cint {.dynlib, exportc.} = @@ -277,11 +289,21 @@ proc SdsMarkDependenciesMet( callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_ERR + if channelId == nil: + let msg = "libsds error: " & "channel ID pointer is NULL" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + if channelId != nil and $channelId == "": + let msg = "libsds error: " & "channel ID is empty string" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + handleRequest( ctx, RequestType.DEPENDENCIES, SdsDependenciesRequest.createShared( - SdsDependenciesMsgType.MARK_DEPENDENCIES_MET, messageIds, count + SdsDependenciesMsgType.MARK_DEPENDENCIES_MET, messageIds, count, channelId ), callback, userData, diff --git a/library/sds_thread/inter_thread_communication/requests/sds_dependencies_request.nim b/library/sds_thread/inter_thread_communication/requests/sds_dependencies_request.nim index ebb45c2..4d5a0b7 100644 --- a/library/sds_thread/inter_thread_communication/requests/sds_dependencies_request.nim +++ b/library/sds_thread/inter_thread_communication/requests/sds_dependencies_request.nim @@ -1,8 +1,8 @@ -import std/[options, json, strutils, net, sequtils] -import chronos, chronicles, results, confutils, confutils/std/net +import std/[json, strutils, net, sequtils] +import chronos, chronicles, results import ../../../alloc -import ../../../../src/[reliability_utils, reliability, message] +import ../../../../src/[reliability_utils, reliability] type SdsDependenciesMsgType* = enum MARK_DEPENDENCIES_MET @@ -11,21 +11,25 @@ type SdsDependenciesRequest* = object operation: SdsDependenciesMsgType messageIds: SharedSeq[cstring] count: csize_t + channelId: cstring proc createShared*( T: type SdsDependenciesRequest, op: SdsDependenciesMsgType, messageIds: pointer, count: csize_t = 0, + channelId: cstring = "", ): ptr type T = var ret = createShared(T) ret[].operation = op ret[].count = count + ret[].channelId = channelId.alloc() ret[].messageIds = allocSharedSeqFromCArray(cast[ptr cstring](messageIds), count.int) return ret proc destroyShared(self: ptr SdsDependenciesRequest) = deallocSharedSeq(self[].messageIds) + deallocShared(self[].channelId) deallocShared(self) proc process*( @@ -39,7 +43,7 @@ proc process*( let messageIdsC = self.messageIds.toSeq() let messageIds = messageIdsC.mapIt($it) - markDependenciesMet(rm[], messageIds).isOkOr: + markDependenciesMet(rm[], messageIds, $self.channelId).isOkOr: error "MARK_DEPENDENCIES_MET failed", error = error return err("error processing MARK_DEPENDENCIES_MET request: " & $error) diff --git a/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim b/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim index 2307fce..fd5a615 100644 --- a/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim +++ b/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim @@ -1,8 +1,8 @@ -import std/[options, json, strutils, net] -import chronos, chronicles, results, confutils, confutils/std/net +import std/json +import chronos, chronicles, results import ../../../alloc -import ../../../../src/[reliability_utils, reliability, message] +import ../../../../src/[reliability_utils, reliability] type SdsLifecycleMsgType* = enum CREATE_RELIABILITY_MANAGER @@ -31,14 +31,9 @@ proc destroyShared(self: ptr SdsLifecycleRequest) = deallocShared(self) proc createReliabilityManager( - channelIdCStr: cstring, appCallbacks: AppCallbacks = nil + appCallbacks: AppCallbacks = nil ): Future[Result[ReliabilityManager, string]] {.async.} = - let channelId = $channelIdCStr - if channelId.len == 0: - error "Failed creating ReliabilityManager: Channel ID cannot be empty" - return err("Failed creating ReliabilityManager: Channel ID cannot be empty") - - let rm = newReliabilityManager(some(channelId)).valueOr: + let rm = newReliabilityManager().valueOr: error "Failed creating reliability manager", error = error return err("Failed creating reliability manager: " & $error) @@ -57,7 +52,7 @@ proc process*( case self.operation of CREATE_RELIABILITY_MANAGER: - rm[] = (await createReliabilityManager(self.channelId, self.appCallbacks)).valueOr: + rm[] = (await createReliabilityManager(self.appCallbacks)).valueOr: error "CREATE_RELIABILITY_MANAGER failed", error = error return err("error processing CREATE_RELIABILITY_MANAGER request: " & $error) of RESET_RELIABILITY_MANAGER: diff --git a/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim b/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim index 476a65e..d41c15a 100644 --- a/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim +++ b/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim @@ -1,5 +1,5 @@ -import std/[options, json, strutils, net, sequtils] -import chronos, chronicles, results, confutils, confutils/std/net +import std/[json, strutils, net, sequtils] +import chronos, chronicles, results import ../../../alloc import ../../../../src/[reliability_utils, reliability, message] @@ -13,6 +13,7 @@ type SdsMessageRequest* = object message: SharedSeq[byte] messageLen: csize_t messageId: cstring + channelId: cstring type SdsUnwrapResponse* = object message*: seq[byte] @@ -24,11 +25,13 @@ proc createShared*( message: pointer, messageLen: csize_t = 0, messageId: cstring = "", + channelId: cstring = "", ): ptr type T = var ret = createShared(T) ret[].operation = op ret[].messageLen = messageLen ret[].messageId = messageId.alloc() + ret[].channelId = channelId.alloc() ret[].message = allocSharedSeqFromCArray(cast[ptr byte](message), messageLen.int) return ret @@ -36,6 +39,7 @@ proc createShared*( proc destroyShared(self: ptr SdsMessageRequest) = deallocSharedSeq(self[].message) deallocShared(self[].messageId) + deallocShared(self[].channelId) deallocShared(self) proc process*( @@ -48,7 +52,7 @@ proc process*( of WRAP_MESSAGE: let messageBytes = self.message.toSeq() - let wrappedMessage = wrapOutgoingMessage(rm[], messageBytes, $self.messageId).valueOr: + let wrappedMessage = wrapOutgoingMessage(rm[], messageBytes, $self.messageId, $self.channelId).valueOr: error "WRAP_MESSAGE failed", error = error return err("error processing WRAP_MESSAGE request: " & $error) @@ -57,7 +61,7 @@ proc process*( of UNWRAP_MESSAGE: let messageBytes = self.message.toSeq() - let (unwrappedMessage, missingDeps) = unwrapReceivedMessage(rm[], messageBytes).valueOr: + let (unwrappedMessage, missingDeps, _) = unwrapReceivedMessage(rm[], messageBytes).valueOr: error "UNWRAP_MESSAGE failed", error = error return err("error processing UNWRAP_MESSAGE request: " & $error)