diff --git a/library/libsds.h b/library/libsds.h index 044151a..18e6b61 100644 --- a/library/libsds.h +++ b/library/libsds.h @@ -26,7 +26,7 @@ typedef void (*SdsCallBack) (int callerRet, const char* msg, size_t len, void* u void* NewReliabilityManager(const char* channelId, SdsCallBack callback, void* userData); -void SetEventCallback(void* ctx, SdsCallBack callback, void* userData); +void SdsSetEventCallback(void* ctx, SdsCallBack callback, void* userData); int CleanupReliabilityManager(void* ctx, SdsCallBack callback, void* userData); diff --git a/library/libsds.nim b/library/libsds.nim index 7da862d..0e840cb 100644 --- a/library/libsds.nim +++ b/library/libsds.nim @@ -35,32 +35,27 @@ template checkLibsdsParams*( return RET_MISSING_CALLBACK template callEventCallback(ctx: ptr SdsContext, eventName: string, body: untyped) = - echo "------------- callEventCallback 1" if isNil(ctx[].eventCallback): error eventName & " - eventCallback is nil" return - echo "------------- callEventCallback 2" if isNil(ctx[].eventUserData): error eventName & " - eventUserData is nil" return foreignThreadGc: try: - echo "------------- callEventCallback 3" let event = body cast[SdsCallBack](ctx[].eventCallback)( RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData ) except Exception, CatchableError: - echo "------------- callEventCallback 4" let msg = "Exception " & eventName & " when calling 'eventCallBack': " & getCurrentExceptionMsg() cast[SdsCallBack](ctx[].eventCallback)( RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData ) - echo "------------- callEventCallback 5" proc handleRequest( ctx: ptr SdsContext, @@ -69,15 +64,11 @@ proc handleRequest( callback: SdsCallBack, userData: pointer, ): cint = - echo "---------------- handleRequest 1" sds_thread.sendRequestToSdsThread(ctx, requestType, content, callback, userData).isOkOr: - echo "---------------- handleRequest 2" let msg = "libsds error: " & $error callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) - echo "---------------- handleRequest 3" return RET_ERR - echo "---------------- handleRequest 4" return RET_OK proc onMessageReady(ctx: ptr SdsContext): MessageReadyCallback = @@ -142,12 +133,10 @@ proc initializeLibrary() {.exported.} = proc NewReliabilityManager( channelId: cstring, callback: SdsCallBack, userData: pointer ): pointer {.dynlib, exportc, cdecl.} = - echo "------------- NewReliabilityManager 1" initializeLibrary() ## Creates a new instance of the Reliability Manager. if isNil(callback): - echo "error: missing callback in NewReliabilityManager" return nil ## Create the SDS thread that will keep waiting for req from the main thread. @@ -180,10 +169,9 @@ proc NewReliabilityManager( return ctx -proc SetEventCallback( +proc SdsSetEventCallback( ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer ) {.dynlib, exportc.} = - echo "------------- SetEventCallback 1" initializeLibrary() ctx[].eventCallback = cast[pointer](callback) ctx[].eventUserData = userData @@ -191,7 +179,6 @@ proc SetEventCallback( proc CleanupReliabilityManager( ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer ): cint {.dynlib, exportc.} = - echo "------------- CleanupReliabilityManager 1" initializeLibrary() checkLibsdsParams(ctx, callback, userData) @@ -208,7 +195,6 @@ proc CleanupReliabilityManager( proc ResetReliabilityManager( ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer ): cint {.dynlib, exportc.} = - echo "------------- ResetReliabilityManager 1" initializeLibrary() checkLibsdsParams(ctx, callback, userData) handleRequest( @@ -227,26 +213,19 @@ proc WrapOutgoingMessage( callback: SdsCallBack, userData: pointer, ): cint {.dynlib, exportc.} = - echo "------------- WrapOutgoingMessage 1" initializeLibrary() - echo "------------- WrapOutgoingMessage 2" checkLibsdsParams(ctx, callback, userData) - echo "------------- WrapOutgoingMessage 3" if message == nil and messageLen > 0: - echo "------------- WrapOutgoingMessage 4" let msg = "libsds error: " & "message pointer is NULL but length > 0" callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_ERR if messageId == nil: - echo "------------- WrapOutgoingMessage 5" let msg = "libsds error: " & "message ID pointer is NULL" callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_ERR - echo "------------- WrapOutgoingMessage 6" - echo "------------- WrapOutgoingMessage messageId: ", $messageId handleRequest( ctx, RequestType.MESSAGE, @@ -264,19 +243,14 @@ proc UnwrapReceivedMessage( callback: SdsCallBack, userData: pointer, ): cint {.dynlib, exportc.} = - echo "------------- UnwrapReceivedMessage 1" initializeLibrary() - echo "------------- UnwrapReceivedMessage 2" checkLibsdsParams(ctx, callback, userData) - echo "------------- UnwrapReceivedMessage 3" if message == nil and messageLen > 0: - echo "------------- UnwrapReceivedMessage 4" let msg = "libsds error: " & "message pointer is NULL but length > 0" callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_ERR - echo "------------- UnwrapReceivedMessage 5" handleRequest( ctx, RequestType.MESSAGE, @@ -297,14 +271,11 @@ proc MarkDependenciesMet( initializeLibrary() checkLibsdsParams(ctx, callback, userData) - echo "------------- MarkDependenciesMet 1" if messageIds == nil and count > 0: - echo "------------- MarkDependenciesMet 2" let msg = "libsds error: " & "MessageIDs pointer is NULL but count > 0" callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_ERR - echo "------------- MarkDependenciesMet 3" handleRequest( ctx, RequestType.DEPENDENCIES, @@ -318,11 +289,8 @@ proc MarkDependenciesMet( proc StartPeriodicTasks( ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer ): cint {.dynlib, exportc.} = - echo "------------- StartPeriodicTasks 1" initializeLibrary() - echo "------------- StartPeriodicTasks 2" checkLibsdsParams(ctx, callback, userData) - echo "------------- StartPeriodicTasks 3" handleRequest( ctx, RequestType.LIFECYCLE, diff --git a/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim b/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim index 371b0f9..476a65e 100644 --- a/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim +++ b/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim @@ -48,7 +48,6 @@ proc process*( of WRAP_MESSAGE: let messageBytes = self.message.toSeq() - echo "-------------- WRAP_MESSAGE bytes: ", $messageBytes let wrappedMessage = wrapOutgoingMessage(rm[], messageBytes, $self.messageId).valueOr: error "WRAP_MESSAGE failed", error = error return err("error processing WRAP_MESSAGE request: " & $error) diff --git a/library/sds_thread/inter_thread_communication/sds_thread_request.nim b/library/sds_thread/inter_thread_communication/sds_thread_request.nim index 57ae78e..f40bef4 100644 --- a/library/sds_thread/inter_thread_communication/sds_thread_request.nim +++ b/library/sds_thread/inter_thread_communication/sds_thread_request.nim @@ -27,7 +27,6 @@ proc createShared*( callback: SdsCallBack, userData: pointer, ): ptr type T = - echo "---------- createShared 1" var ret = createShared(T) ret[].reqType = reqType ret[].reqContent = reqContent @@ -41,12 +40,10 @@ proc handleRes[T: string | void]( ## Handles the Result responses, which can either be Result[string, string] or ## Result[void, string]. - echo "-------------- handleRes 1" defer: deallocShared(request) if res.isErr(): - echo "-------------- handleRes 2" foreignThreadGc: let msg = "libsds error: handleRes fireSyncRes error: " & $res.error request[].callback( @@ -55,14 +52,12 @@ proc handleRes[T: string | void]( return foreignThreadGc: - echo "-------------- handleRes 3" var msg: cstring = "" when T is string: msg = res.get().cstring() request[].callback( RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData ) - echo "-------------- handleRes 4" return proc process*( diff --git a/library/sds_thread/sds_thread.nim b/library/sds_thread/sds_thread.nim index ab9a4c4..4a2cce5 100644 --- a/library/sds_thread/sds_thread.nim +++ b/library/sds_thread/sds_thread.nim @@ -26,33 +26,25 @@ proc runSds(ctx: ptr SdsContext) {.async.} = ## This is the worker body. This runs the SDS instance ## and attends library user requests (stop, connect_to, etc.) - echo "---------------- runSds 1" var rm: ReliabilityManager while true: - echo "---------------- runSds 2" await ctx.reqSignal.wait() - echo "---------------- runSds 3" if ctx.running.load == false: - echo "---------------- runSds 4" break ## Trying to get a request from the libsds requestor thread var request: ptr SdsThreadRequest - echo "---------------- runSds 4" let recvOk = ctx.reqChannel.tryRecv(request) if not recvOk: - echo "---------------- runSds 5" error "sds thread could not receive a request" continue let fireRes = ctx.reqReceivedSignal.fireSync() - echo "---------------- runSds 6" if fireRes.isErr(): error "could not fireSync back to requester thread", error = fireRes.error - echo "---------------- runSds 7" ## Handle the request asyncSpawn SdsThreadRequest.process(request, addr rm) @@ -107,7 +99,6 @@ proc sendRequestToSdsThread*( ): Result[void, string] = let req = SdsThreadRequest.createShared(reqType, reqContent, callback, userData) - echo "------------- sendRequestToSdsThread 1" # This lock is only necessary while we use a SP Channel and while the signalling # between threads assumes that there aren't concurrent requests. # Rearchitecting the signaling + migrating to a MP Channel will allow us to receive @@ -118,32 +109,24 @@ proc sendRequestToSdsThread*( ## Sending the request let sentOk = ctx.reqChannel.trySend(req) if not sentOk: - echo "------------- sendRequestToSdsThread 2" deallocShared(req) return err("Couldn't send a request to the sds thread: " & $req[]) let fireSyncRes = ctx.reqSignal.fireSync() - echo "------------- sendRequestToSdsThread 3" if fireSyncRes.isErr(): - echo "------------- sendRequestToSdsThread 4" deallocShared(req) return err("failed fireSync: " & $fireSyncRes.error) - echo "------------- sendRequestToSdsThread 5" if fireSyncRes.get() == false: - echo "------------- sendRequestToSdsThread 6" deallocShared(req) return err("Couldn't fireSync in time") ## wait until the SDS Thread properly received the request let res = ctx.reqReceivedSignal.waitSync() - echo "------------- sendRequestToSdsThread 7" if res.isErr(): - echo "------------- sendRequestToSdsThread 8" deallocShared(req) return err("Couldn't receive reqReceivedSignal signal") ## Notice that in case of "ok", the deallocShared(req) is performed by the SDS Thread in the ## process proc. - echo "------------- sendRequestToSdsThread 9" ok() diff --git a/src/reliability.nim b/src/reliability.nim index dc2e501..97a39a9 100644 --- a/src/reliability.nim +++ b/src/reliability.nim @@ -13,17 +13,13 @@ proc newReliabilityManager*( ## ## Returns: ## A Result containing either a new ReliabilityManager instance or an error. - echo "--------------- newReliabilityManager 1" if not channelId.isSome(): - echo "--------------- newReliabilityManager 2" return err(ReliabilityError.reInvalidArgument) try: - echo "--------------- newReliabilityManager 3" let bloomFilter = newRollingBloomFilter(config.bloomFilterCapacity, config.bloomFilterErrorRate) - echo "--------------- newReliabilityManager 4" let rm = ReliabilityManager( lamportTimestamp: 0, messageHistory: @[], @@ -33,9 +29,7 @@ proc newReliabilityManager*( channelId: channelId, config: config, ) - echo "--------------- newReliabilityManager 5" initLock(rm.lock) - echo "--------------- newReliabilityManager 6" return ok(rm) except Exception: error "Failed to create ReliabilityManager", msg = getCurrentExceptionMsg() @@ -275,13 +269,11 @@ proc setCallbacks*( ## - onMessageSent: Callback function called when a message is confirmed as sent. ## - onMissingDependencies: Callback function called when a message has missing dependencies. ## - onPeriodicSync: Callback function called to notify about periodic sync - echo "---------- calling setCallbacks" withLock rm.lock: rm.onMessageReady = onMessageReady rm.onMessageSent = onMessageSent rm.onMissingDependencies = onMissingDependencies rm.onPeriodicSync = onPeriodicSync - echo "-------- after setCallbacks" proc checkUnacknowledgedMessages(rm: ReliabilityManager) {.gcsafe.} = ## Checks and processes unacknowledged messages in the outgoing buffer.