mirror of
https://github.com/logos-messaging/nim-sds.git
synced 2026-01-02 14:13:07 +00:00
fixes and removing logs
This commit is contained in:
parent
619a4ec689
commit
f4bcb68266
@ -26,7 +26,7 @@ typedef void (*SdsCallBack) (int callerRet, const char* msg, size_t len, void* u
|
||||
|
||||
void* NewReliabilityManager(const char* channelId, SdsCallBack callback, void* userData);
|
||||
|
||||
void SetEventCallback(void* ctx, SdsCallBack callback, void* userData);
|
||||
void SdsSetEventCallback(void* ctx, SdsCallBack callback, void* userData);
|
||||
|
||||
int CleanupReliabilityManager(void* ctx, SdsCallBack callback, void* userData);
|
||||
|
||||
|
||||
@ -35,32 +35,27 @@ template checkLibsdsParams*(
|
||||
return RET_MISSING_CALLBACK
|
||||
|
||||
template callEventCallback(ctx: ptr SdsContext, eventName: string, body: untyped) =
|
||||
echo "------------- callEventCallback 1"
|
||||
if isNil(ctx[].eventCallback):
|
||||
error eventName & " - eventCallback is nil"
|
||||
return
|
||||
|
||||
echo "------------- callEventCallback 2"
|
||||
if isNil(ctx[].eventUserData):
|
||||
error eventName & " - eventUserData is nil"
|
||||
return
|
||||
|
||||
foreignThreadGc:
|
||||
try:
|
||||
echo "------------- callEventCallback 3"
|
||||
let event = body
|
||||
cast[SdsCallBack](ctx[].eventCallback)(
|
||||
RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData
|
||||
)
|
||||
except Exception, CatchableError:
|
||||
echo "------------- callEventCallback 4"
|
||||
let msg =
|
||||
"Exception " & eventName & " when calling 'eventCallBack': " &
|
||||
getCurrentExceptionMsg()
|
||||
cast[SdsCallBack](ctx[].eventCallback)(
|
||||
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
|
||||
)
|
||||
echo "------------- callEventCallback 5"
|
||||
|
||||
proc handleRequest(
|
||||
ctx: ptr SdsContext,
|
||||
@ -69,15 +64,11 @@ proc handleRequest(
|
||||
callback: SdsCallBack,
|
||||
userData: pointer,
|
||||
): cint =
|
||||
echo "---------------- handleRequest 1"
|
||||
sds_thread.sendRequestToSdsThread(ctx, requestType, content, callback, userData).isOkOr:
|
||||
echo "---------------- handleRequest 2"
|
||||
let msg = "libsds error: " & $error
|
||||
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||
echo "---------------- handleRequest 3"
|
||||
return RET_ERR
|
||||
|
||||
echo "---------------- handleRequest 4"
|
||||
return RET_OK
|
||||
|
||||
proc onMessageReady(ctx: ptr SdsContext): MessageReadyCallback =
|
||||
@ -142,12 +133,10 @@ proc initializeLibrary() {.exported.} =
|
||||
proc NewReliabilityManager(
|
||||
channelId: cstring, callback: SdsCallBack, userData: pointer
|
||||
): pointer {.dynlib, exportc, cdecl.} =
|
||||
echo "------------- NewReliabilityManager 1"
|
||||
initializeLibrary()
|
||||
|
||||
## Creates a new instance of the Reliability Manager.
|
||||
if isNil(callback):
|
||||
echo "error: missing callback in NewReliabilityManager"
|
||||
return nil
|
||||
|
||||
## Create the SDS thread that will keep waiting for req from the main thread.
|
||||
@ -180,10 +169,9 @@ proc NewReliabilityManager(
|
||||
|
||||
return ctx
|
||||
|
||||
proc SetEventCallback(
|
||||
proc SdsSetEventCallback(
|
||||
ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer
|
||||
) {.dynlib, exportc.} =
|
||||
echo "------------- SetEventCallback 1"
|
||||
initializeLibrary()
|
||||
ctx[].eventCallback = cast[pointer](callback)
|
||||
ctx[].eventUserData = userData
|
||||
@ -191,7 +179,6 @@ proc SetEventCallback(
|
||||
proc CleanupReliabilityManager(
|
||||
ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer
|
||||
): cint {.dynlib, exportc.} =
|
||||
echo "------------- CleanupReliabilityManager 1"
|
||||
initializeLibrary()
|
||||
checkLibsdsParams(ctx, callback, userData)
|
||||
|
||||
@ -208,7 +195,6 @@ proc CleanupReliabilityManager(
|
||||
proc ResetReliabilityManager(
|
||||
ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer
|
||||
): cint {.dynlib, exportc.} =
|
||||
echo "------------- ResetReliabilityManager 1"
|
||||
initializeLibrary()
|
||||
checkLibsdsParams(ctx, callback, userData)
|
||||
handleRequest(
|
||||
@ -227,26 +213,19 @@ proc WrapOutgoingMessage(
|
||||
callback: SdsCallBack,
|
||||
userData: pointer,
|
||||
): cint {.dynlib, exportc.} =
|
||||
echo "------------- WrapOutgoingMessage 1"
|
||||
initializeLibrary()
|
||||
echo "------------- WrapOutgoingMessage 2"
|
||||
checkLibsdsParams(ctx, callback, userData)
|
||||
echo "------------- WrapOutgoingMessage 3"
|
||||
|
||||
if message == nil and messageLen > 0:
|
||||
echo "------------- WrapOutgoingMessage 4"
|
||||
let msg = "libsds error: " & "message pointer is NULL but length > 0"
|
||||
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||
return RET_ERR
|
||||
|
||||
if messageId == nil:
|
||||
echo "------------- WrapOutgoingMessage 5"
|
||||
let msg = "libsds error: " & "message ID pointer is NULL"
|
||||
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||
return RET_ERR
|
||||
|
||||
echo "------------- WrapOutgoingMessage 6"
|
||||
echo "------------- WrapOutgoingMessage messageId: ", $messageId
|
||||
handleRequest(
|
||||
ctx,
|
||||
RequestType.MESSAGE,
|
||||
@ -264,19 +243,14 @@ proc UnwrapReceivedMessage(
|
||||
callback: SdsCallBack,
|
||||
userData: pointer,
|
||||
): cint {.dynlib, exportc.} =
|
||||
echo "------------- UnwrapReceivedMessage 1"
|
||||
initializeLibrary()
|
||||
echo "------------- UnwrapReceivedMessage 2"
|
||||
checkLibsdsParams(ctx, callback, userData)
|
||||
echo "------------- UnwrapReceivedMessage 3"
|
||||
|
||||
if message == nil and messageLen > 0:
|
||||
echo "------------- UnwrapReceivedMessage 4"
|
||||
let msg = "libsds error: " & "message pointer is NULL but length > 0"
|
||||
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||
return RET_ERR
|
||||
|
||||
echo "------------- UnwrapReceivedMessage 5"
|
||||
handleRequest(
|
||||
ctx,
|
||||
RequestType.MESSAGE,
|
||||
@ -297,14 +271,11 @@ proc MarkDependenciesMet(
|
||||
initializeLibrary()
|
||||
checkLibsdsParams(ctx, callback, userData)
|
||||
|
||||
echo "------------- MarkDependenciesMet 1"
|
||||
if messageIds == nil and count > 0:
|
||||
echo "------------- MarkDependenciesMet 2"
|
||||
let msg = "libsds error: " & "MessageIDs pointer is NULL but count > 0"
|
||||
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||
return RET_ERR
|
||||
|
||||
echo "------------- MarkDependenciesMet 3"
|
||||
handleRequest(
|
||||
ctx,
|
||||
RequestType.DEPENDENCIES,
|
||||
@ -318,11 +289,8 @@ proc MarkDependenciesMet(
|
||||
proc StartPeriodicTasks(
|
||||
ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer
|
||||
): cint {.dynlib, exportc.} =
|
||||
echo "------------- StartPeriodicTasks 1"
|
||||
initializeLibrary()
|
||||
echo "------------- StartPeriodicTasks 2"
|
||||
checkLibsdsParams(ctx, callback, userData)
|
||||
echo "------------- StartPeriodicTasks 3"
|
||||
handleRequest(
|
||||
ctx,
|
||||
RequestType.LIFECYCLE,
|
||||
|
||||
@ -48,7 +48,6 @@ proc process*(
|
||||
of WRAP_MESSAGE:
|
||||
let messageBytes = self.message.toSeq()
|
||||
|
||||
echo "-------------- WRAP_MESSAGE bytes: ", $messageBytes
|
||||
let wrappedMessage = wrapOutgoingMessage(rm[], messageBytes, $self.messageId).valueOr:
|
||||
error "WRAP_MESSAGE failed", error = error
|
||||
return err("error processing WRAP_MESSAGE request: " & $error)
|
||||
|
||||
@ -27,7 +27,6 @@ proc createShared*(
|
||||
callback: SdsCallBack,
|
||||
userData: pointer,
|
||||
): ptr type T =
|
||||
echo "---------- createShared 1"
|
||||
var ret = createShared(T)
|
||||
ret[].reqType = reqType
|
||||
ret[].reqContent = reqContent
|
||||
@ -41,12 +40,10 @@ proc handleRes[T: string | void](
|
||||
## Handles the Result responses, which can either be Result[string, string] or
|
||||
## Result[void, string].
|
||||
|
||||
echo "-------------- handleRes 1"
|
||||
defer:
|
||||
deallocShared(request)
|
||||
|
||||
if res.isErr():
|
||||
echo "-------------- handleRes 2"
|
||||
foreignThreadGc:
|
||||
let msg = "libsds error: handleRes fireSyncRes error: " & $res.error
|
||||
request[].callback(
|
||||
@ -55,14 +52,12 @@ proc handleRes[T: string | void](
|
||||
return
|
||||
|
||||
foreignThreadGc:
|
||||
echo "-------------- handleRes 3"
|
||||
var msg: cstring = ""
|
||||
when T is string:
|
||||
msg = res.get().cstring()
|
||||
request[].callback(
|
||||
RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData
|
||||
)
|
||||
echo "-------------- handleRes 4"
|
||||
return
|
||||
|
||||
proc process*(
|
||||
|
||||
@ -26,33 +26,25 @@ proc runSds(ctx: ptr SdsContext) {.async.} =
|
||||
## This is the worker body. This runs the SDS instance
|
||||
## and attends library user requests (stop, connect_to, etc.)
|
||||
|
||||
echo "---------------- runSds 1"
|
||||
var rm: ReliabilityManager
|
||||
|
||||
while true:
|
||||
echo "---------------- runSds 2"
|
||||
await ctx.reqSignal.wait()
|
||||
echo "---------------- runSds 3"
|
||||
|
||||
if ctx.running.load == false:
|
||||
echo "---------------- runSds 4"
|
||||
break
|
||||
|
||||
## Trying to get a request from the libsds requestor thread
|
||||
var request: ptr SdsThreadRequest
|
||||
echo "---------------- runSds 4"
|
||||
let recvOk = ctx.reqChannel.tryRecv(request)
|
||||
if not recvOk:
|
||||
echo "---------------- runSds 5"
|
||||
error "sds thread could not receive a request"
|
||||
continue
|
||||
|
||||
let fireRes = ctx.reqReceivedSignal.fireSync()
|
||||
echo "---------------- runSds 6"
|
||||
if fireRes.isErr():
|
||||
error "could not fireSync back to requester thread", error = fireRes.error
|
||||
|
||||
echo "---------------- runSds 7"
|
||||
## Handle the request
|
||||
asyncSpawn SdsThreadRequest.process(request, addr rm)
|
||||
|
||||
@ -107,7 +99,6 @@ proc sendRequestToSdsThread*(
|
||||
): Result[void, string] =
|
||||
let req = SdsThreadRequest.createShared(reqType, reqContent, callback, userData)
|
||||
|
||||
echo "------------- sendRequestToSdsThread 1"
|
||||
# This lock is only necessary while we use a SP Channel and while the signalling
|
||||
# between threads assumes that there aren't concurrent requests.
|
||||
# Rearchitecting the signaling + migrating to a MP Channel will allow us to receive
|
||||
@ -118,32 +109,24 @@ proc sendRequestToSdsThread*(
|
||||
## Sending the request
|
||||
let sentOk = ctx.reqChannel.trySend(req)
|
||||
if not sentOk:
|
||||
echo "------------- sendRequestToSdsThread 2"
|
||||
deallocShared(req)
|
||||
return err("Couldn't send a request to the sds thread: " & $req[])
|
||||
|
||||
let fireSyncRes = ctx.reqSignal.fireSync()
|
||||
echo "------------- sendRequestToSdsThread 3"
|
||||
if fireSyncRes.isErr():
|
||||
echo "------------- sendRequestToSdsThread 4"
|
||||
deallocShared(req)
|
||||
return err("failed fireSync: " & $fireSyncRes.error)
|
||||
|
||||
echo "------------- sendRequestToSdsThread 5"
|
||||
if fireSyncRes.get() == false:
|
||||
echo "------------- sendRequestToSdsThread 6"
|
||||
deallocShared(req)
|
||||
return err("Couldn't fireSync in time")
|
||||
|
||||
## wait until the SDS Thread properly received the request
|
||||
let res = ctx.reqReceivedSignal.waitSync()
|
||||
echo "------------- sendRequestToSdsThread 7"
|
||||
if res.isErr():
|
||||
echo "------------- sendRequestToSdsThread 8"
|
||||
deallocShared(req)
|
||||
return err("Couldn't receive reqReceivedSignal signal")
|
||||
|
||||
## Notice that in case of "ok", the deallocShared(req) is performed by the SDS Thread in the
|
||||
## process proc.
|
||||
echo "------------- sendRequestToSdsThread 9"
|
||||
ok()
|
||||
|
||||
@ -13,17 +13,13 @@ proc newReliabilityManager*(
|
||||
##
|
||||
## Returns:
|
||||
## A Result containing either a new ReliabilityManager instance or an error.
|
||||
echo "--------------- newReliabilityManager 1"
|
||||
if not channelId.isSome():
|
||||
echo "--------------- newReliabilityManager 2"
|
||||
return err(ReliabilityError.reInvalidArgument)
|
||||
|
||||
try:
|
||||
echo "--------------- newReliabilityManager 3"
|
||||
let bloomFilter =
|
||||
newRollingBloomFilter(config.bloomFilterCapacity, config.bloomFilterErrorRate)
|
||||
|
||||
echo "--------------- newReliabilityManager 4"
|
||||
let rm = ReliabilityManager(
|
||||
lamportTimestamp: 0,
|
||||
messageHistory: @[],
|
||||
@ -33,9 +29,7 @@ proc newReliabilityManager*(
|
||||
channelId: channelId,
|
||||
config: config,
|
||||
)
|
||||
echo "--------------- newReliabilityManager 5"
|
||||
initLock(rm.lock)
|
||||
echo "--------------- newReliabilityManager 6"
|
||||
return ok(rm)
|
||||
except Exception:
|
||||
error "Failed to create ReliabilityManager", msg = getCurrentExceptionMsg()
|
||||
@ -275,13 +269,11 @@ proc setCallbacks*(
|
||||
## - onMessageSent: Callback function called when a message is confirmed as sent.
|
||||
## - onMissingDependencies: Callback function called when a message has missing dependencies.
|
||||
## - onPeriodicSync: Callback function called to notify about periodic sync
|
||||
echo "---------- calling setCallbacks"
|
||||
withLock rm.lock:
|
||||
rm.onMessageReady = onMessageReady
|
||||
rm.onMessageSent = onMessageSent
|
||||
rm.onMissingDependencies = onMissingDependencies
|
||||
rm.onPeriodicSync = onPeriodicSync
|
||||
echo "-------- after setCallbacks"
|
||||
|
||||
proc checkUnacknowledgedMessages(rm: ReliabilityManager) {.gcsafe.} =
|
||||
## Checks and processes unacknowledged messages in the outgoing buffer.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user