mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-03 14:33:12 +00:00
chore: Libwaku watchdog that can potentially raise a WakuNotResponding event if Waku is blocked (#3466)
* refactor add waku not responding event to libwaku Co-authored-by: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
This commit is contained in:
parent
5f5e0893e0
commit
d7a3a85db9
9
library/events/json_waku_not_responding_event.nim
Normal file
9
library/events/json_waku_not_responding_event.nim
Normal file
@ -0,0 +1,9 @@
|
||||
import system, std/json, ./json_base_event
|
||||
|
||||
type JsonWakuNotRespondingEvent* = ref object of JsonEvent
|
||||
|
||||
proc new*(T: type JsonWakuNotRespondingEvent): T =
|
||||
return JsonWakuNotRespondingEvent(eventType: "waku_not_responding")
|
||||
|
||||
method `$`*(event: JsonWakuNotRespondingEvent): string =
|
||||
$(%*event)
|
||||
@ -45,6 +45,8 @@ int waku_version(void* ctx,
|
||||
WakuCallBack callback,
|
||||
void* userData);
|
||||
|
||||
// Sets a callback that will be invoked whenever an event occurs.
|
||||
// It is crucial that the passed callback is fast, non-blocking and potentially thread-safe.
|
||||
void waku_set_event_callback(void* ctx,
|
||||
WakuCallBack callback,
|
||||
void* userData);
|
||||
|
||||
@ -15,8 +15,7 @@ import
|
||||
waku/waku_core/topics/pubsub_topic,
|
||||
waku/waku_core/subscription/push_handler,
|
||||
waku/waku_relay,
|
||||
./events/
|
||||
[json_message_event, json_topic_health_change_event, json_connection_change_event],
|
||||
./events/json_message_event,
|
||||
./waku_thread/waku_thread,
|
||||
./waku_thread/inter_thread_communication/requests/node_lifecycle_request,
|
||||
./waku_thread/inter_thread_communication/requests/peer_manager_request,
|
||||
@ -48,25 +47,6 @@ template checkLibwakuParams*(
|
||||
if isNil(callback):
|
||||
return RET_MISSING_CALLBACK
|
||||
|
||||
template callEventCallback(ctx: ptr WakuContext, eventName: string, body: untyped) =
|
||||
if isNil(ctx[].eventCallback):
|
||||
error eventName & " - eventCallback is nil"
|
||||
return
|
||||
|
||||
foreignThreadGc:
|
||||
try:
|
||||
let event = body
|
||||
cast[WakuCallBack](ctx[].eventCallback)(
|
||||
RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData
|
||||
)
|
||||
except Exception, CatchableError:
|
||||
let msg =
|
||||
"Exception " & eventName & " when calling 'eventCallBack': " &
|
||||
getCurrentExceptionMsg()
|
||||
cast[WakuCallBack](ctx[].eventCallback)(
|
||||
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
|
||||
)
|
||||
|
||||
proc handleRequest(
|
||||
ctx: ptr WakuContext,
|
||||
requestType: RequestType,
|
||||
@ -81,21 +61,6 @@ proc handleRequest(
|
||||
|
||||
return RET_OK
|
||||
|
||||
proc onConnectionChange(ctx: ptr WakuContext): ConnectionChangeHandler =
|
||||
return proc(peerId: PeerId, peerEvent: PeerEventKind) {.async.} =
|
||||
callEventCallback(ctx, "onConnectionChange"):
|
||||
$JsonConnectionChangeEvent.new($peerId, peerEvent)
|
||||
|
||||
proc onReceivedMessage(ctx: ptr WakuContext): WakuRelayHandler =
|
||||
return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} =
|
||||
callEventCallback(ctx, "onReceivedMessage"):
|
||||
$JsonMessageEvent.new(pubsubTopic, msg)
|
||||
|
||||
proc onTopicHealthChange(ctx: ptr WakuContext): TopicHealthChangeHandler =
|
||||
return proc(pubsubTopic: PubsubTopic, topicHealth: TopicHealth) {.async.} =
|
||||
callEventCallback(ctx, "onTopicHealthChange"):
|
||||
$JsonTopicHealthChangeEvent.new(pubsubTopic, topicHealth)
|
||||
|
||||
### End of not-exported components
|
||||
################################################################################
|
||||
|
||||
@ -146,8 +111,8 @@ proc waku_new(
|
||||
return nil
|
||||
|
||||
## Create the Waku thread that will keep waiting for req from the main thread.
|
||||
var ctx = waku_thread.createWakuThread().valueOr:
|
||||
let msg = "Error in createWakuThread: " & $error
|
||||
var ctx = waku_thread.createWakuContext().valueOr:
|
||||
let msg = "Error in createWakuContext: " & $error
|
||||
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||
return nil
|
||||
|
||||
@ -180,7 +145,7 @@ proc waku_destroy(
|
||||
initializeLibrary()
|
||||
checkLibwakuParams(ctx, callback, userData)
|
||||
|
||||
waku_thread.destroyWakuThread(ctx).isOkOr:
|
||||
waku_thread.destroyWakuContext(ctx).isOkOr:
|
||||
let msg = "libwaku error: " & $error
|
||||
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||
return RET_ERR
|
||||
|
||||
@ -18,6 +18,7 @@ type DebugNodeMsgType* = enum
|
||||
RETRIEVE_MY_PEER_ID
|
||||
RETRIEVE_METRICS
|
||||
RETRIEVE_ONLINE_STATE
|
||||
CHECK_WAKU_NOT_BLOCKED
|
||||
|
||||
type DebugNodeRequest* = object
|
||||
operation: DebugNodeMsgType
|
||||
@ -55,6 +56,8 @@ proc process*(
|
||||
return ok(getMetrics())
|
||||
of RETRIEVE_ONLINE_STATE:
|
||||
return ok($waku.healthMonitor.onlineMonitor.amIOnline())
|
||||
of CHECK_WAKU_NOT_BLOCKED:
|
||||
return ok("waku thread is not blocked")
|
||||
|
||||
error "unsupported operation in DebugNodeRequest"
|
||||
return err("unsupported operation in DebugNodeRequest")
|
||||
|
||||
@ -4,10 +4,22 @@
|
||||
|
||||
import std/[options, atomics, os, net, locks]
|
||||
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
|
||||
import waku/factory/waku, ./inter_thread_communication/waku_thread_request, ../ffi_types
|
||||
import
|
||||
waku/factory/waku,
|
||||
waku/node/peer_manager,
|
||||
waku/waku_relay/[protocol, topic_health],
|
||||
waku/waku_core/[topics/pubsub_topic, message],
|
||||
./inter_thread_communication/[waku_thread_request, requests/debug_node_request],
|
||||
../ffi_types,
|
||||
../events/[
|
||||
json_message_event, json_topic_health_change_event, json_connection_change_event,
|
||||
json_waku_not_responding_event,
|
||||
]
|
||||
|
||||
type WakuContext* = object
|
||||
thread: Thread[(ptr WakuContext)]
|
||||
wakuThread: Thread[(ptr WakuContext)]
|
||||
watchdogThread: Thread[(ptr WakuContext)]
|
||||
# monitors the Waku thread and notifies the Waku SDK consumer if it hangs
|
||||
lock: Lock
|
||||
reqChannel: ChannelSPSCSingle[ptr WakuThreadRequest]
|
||||
reqSignal: ThreadSignalPtr
|
||||
@ -17,78 +29,48 @@ type WakuContext* = object
|
||||
userData*: pointer
|
||||
eventCallback*: pointer
|
||||
eventUserdata*: pointer
|
||||
running: Atomic[bool] # To control when the thread is running
|
||||
running: Atomic[bool] # To control when the threads are running
|
||||
|
||||
const git_version* {.strdefine.} = "n/a"
|
||||
const versionString = "version / git commit hash: " & waku.git_version
|
||||
|
||||
proc runWaku(ctx: ptr WakuContext) {.async.} =
|
||||
## This is the worker body. This runs the Waku node
|
||||
## and attends library user requests (stop, connect_to, etc.)
|
||||
template callEventCallback(ctx: ptr WakuContext, eventName: string, body: untyped) =
|
||||
if isNil(ctx[].eventCallback):
|
||||
error eventName & " - eventCallback is nil"
|
||||
return
|
||||
|
||||
var waku: Waku
|
||||
foreignThreadGc:
|
||||
try:
|
||||
let event = body
|
||||
cast[WakuCallBack](ctx[].eventCallback)(
|
||||
RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData
|
||||
)
|
||||
except Exception, CatchableError:
|
||||
let msg =
|
||||
"Exception " & eventName & " when calling 'eventCallBack': " &
|
||||
getCurrentExceptionMsg()
|
||||
cast[WakuCallBack](ctx[].eventCallback)(
|
||||
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
|
||||
)
|
||||
|
||||
while true:
|
||||
await ctx.reqSignal.wait()
|
||||
proc onConnectionChange*(ctx: ptr WakuContext): ConnectionChangeHandler =
|
||||
return proc(peerId: PeerId, peerEvent: PeerEventKind) {.async.} =
|
||||
callEventCallback(ctx, "onConnectionChange"):
|
||||
$JsonConnectionChangeEvent.new($peerId, peerEvent)
|
||||
|
||||
if ctx.running.load == false:
|
||||
break
|
||||
proc onReceivedMessage*(ctx: ptr WakuContext): WakuRelayHandler =
|
||||
return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} =
|
||||
callEventCallback(ctx, "onReceivedMessage"):
|
||||
$JsonMessageEvent.new(pubsubTopic, msg)
|
||||
|
||||
## Trying to get a request from the libwaku requestor thread
|
||||
var request: ptr WakuThreadRequest
|
||||
let recvOk = ctx.reqChannel.tryRecv(request)
|
||||
if not recvOk:
|
||||
error "waku thread could not receive a request"
|
||||
continue
|
||||
proc onTopicHealthChange*(ctx: ptr WakuContext): TopicHealthChangeHandler =
|
||||
return proc(pubsubTopic: PubsubTopic, topicHealth: TopicHealth) {.async.} =
|
||||
callEventCallback(ctx, "onTopicHealthChange"):
|
||||
$JsonTopicHealthChangeEvent.new(pubsubTopic, topicHealth)
|
||||
|
||||
let fireRes = ctx.reqReceivedSignal.fireSync()
|
||||
if fireRes.isErr():
|
||||
error "could not fireSync back to requester thread", error = fireRes.error
|
||||
|
||||
## Handle the request
|
||||
asyncSpawn WakuThreadRequest.process(request, addr waku)
|
||||
|
||||
proc run(ctx: ptr WakuContext) {.thread.} =
|
||||
## Launch waku worker
|
||||
waitFor runWaku(ctx)
|
||||
|
||||
proc createWakuThread*(): Result[ptr WakuContext, string] =
|
||||
## This proc is called from the main thread and it creates
|
||||
## the Waku working thread.
|
||||
var ctx = createShared(WakuContext, 1)
|
||||
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
|
||||
return err("couldn't create reqSignal ThreadSignalPtr")
|
||||
ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
|
||||
return err("couldn't create reqReceivedSignal ThreadSignalPtr")
|
||||
ctx.lock.initLock()
|
||||
|
||||
ctx.running.store(true)
|
||||
|
||||
try:
|
||||
createThread(ctx.thread, run, ctx)
|
||||
except ValueError, ResourceExhaustedError:
|
||||
# and freeShared for typed allocations!
|
||||
freeShared(ctx)
|
||||
|
||||
return err("failed to create the Waku thread: " & getCurrentExceptionMsg())
|
||||
|
||||
return ok(ctx)
|
||||
|
||||
proc destroyWakuThread*(ctx: ptr WakuContext): Result[void, string] =
|
||||
ctx.running.store(false)
|
||||
|
||||
let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
|
||||
return err("error in destroyWakuThread: " & $error)
|
||||
if not signaledOnTime:
|
||||
return err("failed to signal reqSignal on time in destroyWakuThread")
|
||||
|
||||
joinThread(ctx.thread)
|
||||
ctx.lock.deinitLock()
|
||||
?ctx.reqSignal.close()
|
||||
?ctx.reqReceivedSignal.close()
|
||||
freeShared(ctx)
|
||||
|
||||
return ok()
|
||||
proc onWakuNotResponding*(ctx: ptr WakuContext) =
|
||||
callEventCallback(ctx, "onWakuNotResponsive"):
|
||||
$JsonWakuNotRespondingEvent.new()
|
||||
|
||||
proc sendRequestToWakuThread*(
|
||||
ctx: ptr WakuContext,
|
||||
@ -96,16 +78,17 @@ proc sendRequestToWakuThread*(
|
||||
reqContent: pointer,
|
||||
callback: WakuCallBack,
|
||||
userData: pointer,
|
||||
timeout = InfiniteDuration,
|
||||
): Result[void, string] =
|
||||
let req = WakuThreadRequest.createShared(reqType, reqContent, callback, userData)
|
||||
|
||||
ctx.lock.acquire()
|
||||
# This lock is only necessary while we use a SP Channel and while the signalling
|
||||
# between threads assumes that there aren't concurrent requests.
|
||||
# Rearchitecting the signaling + migrating to a MP Channel will allow us to receive
|
||||
# requests concurrently and spare us the need of locks
|
||||
ctx.lock.acquire()
|
||||
defer:
|
||||
ctx.lock.release()
|
||||
|
||||
let req = WakuThreadRequest.createShared(reqType, reqContent, callback, userData)
|
||||
## Sending the request
|
||||
let sentOk = ctx.reqChannel.trySend(req)
|
||||
if not sentOk:
|
||||
@ -122,11 +105,115 @@ proc sendRequestToWakuThread*(
|
||||
return err("Couldn't fireSync in time")
|
||||
|
||||
## wait until the Waku Thread properly received the request
|
||||
let res = ctx.reqReceivedSignal.waitSync()
|
||||
let res = ctx.reqReceivedSignal.waitSync(timeout)
|
||||
if res.isErr():
|
||||
deallocShared(req)
|
||||
return err("Couldn't receive reqReceivedSignal signal")
|
||||
|
||||
## Notice that in case of "ok", the deallocShared(req) is performed by the Waku Thread in the
|
||||
## process proc.
|
||||
## process proc. See the 'waku_thread_request.nim' module for more details.
|
||||
ok()
|
||||
|
||||
proc watchdogThreadBody(ctx: ptr WakuContext) {.thread.} =
|
||||
## Watchdog thread that monitors the Waku thread and notifies the library user if it hangs.
|
||||
|
||||
let watchdogRun = proc(ctx: ptr WakuContext) {.async.} =
|
||||
const WatchdogTimeinterval = 1.seconds
|
||||
const WakuNotRespondingTimeout = 3.seconds
|
||||
while true:
|
||||
await sleepAsync(WatchdogTimeinterval)
|
||||
|
||||
if ctx.running.load == false:
|
||||
debug "Watchdog thread exiting because WakuContext is not running"
|
||||
break
|
||||
|
||||
let wakuCallback = proc(
|
||||
callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer
|
||||
) {.cdecl, gcsafe, raises: [].} =
|
||||
discard ## Don't do anything. Just respecting the callback signature.
|
||||
const nilUserData = nil
|
||||
|
||||
trace "Sending watchdog request to Waku thread"
|
||||
|
||||
sendRequestToWakuThread(
|
||||
ctx,
|
||||
RequestType.DEBUG,
|
||||
DebugNodeRequest.createShared(DebugNodeMsgType.CHECK_WAKU_NOT_BLOCKED),
|
||||
wakuCallback,
|
||||
nilUserData,
|
||||
WakuNotRespondingTimeout,
|
||||
).isOkOr:
|
||||
error "Failed to send watchdog request to Waku thread", error = $error
|
||||
onWakuNotResponding(ctx)
|
||||
|
||||
waitFor watchdogRun(ctx)
|
||||
|
||||
proc wakuThreadBody(ctx: ptr WakuContext) {.thread.} =
|
||||
## Waku thread that attends library user requests (stop, connect_to, etc.)
|
||||
|
||||
let wakuRun = proc(ctx: ptr WakuContext) {.async.} =
|
||||
var waku: Waku
|
||||
while true:
|
||||
await ctx.reqSignal.wait()
|
||||
|
||||
if ctx.running.load == false:
|
||||
break
|
||||
|
||||
## Trying to get a request from the libwaku requestor thread
|
||||
var request: ptr WakuThreadRequest
|
||||
let recvOk = ctx.reqChannel.tryRecv(request)
|
||||
if not recvOk:
|
||||
error "waku thread could not receive a request"
|
||||
continue
|
||||
|
||||
let fireRes = ctx.reqReceivedSignal.fireSync()
|
||||
if fireRes.isErr():
|
||||
error "could not fireSync back to requester thread", error = fireRes.error
|
||||
|
||||
## Handle the request
|
||||
asyncSpawn WakuThreadRequest.process(request, addr waku)
|
||||
|
||||
waitFor wakuRun(ctx)
|
||||
|
||||
proc createWakuContext*(): Result[ptr WakuContext, string] =
|
||||
## This proc is called from the main thread and it creates
|
||||
## the Waku working thread.
|
||||
var ctx = createShared(WakuContext, 1)
|
||||
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
|
||||
return err("couldn't create reqSignal ThreadSignalPtr")
|
||||
ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
|
||||
return err("couldn't create reqReceivedSignal ThreadSignalPtr")
|
||||
ctx.lock.initLock()
|
||||
|
||||
ctx.running.store(true)
|
||||
|
||||
try:
|
||||
createThread(ctx.wakuThread, wakuThreadBody, ctx)
|
||||
except ValueError, ResourceExhaustedError:
|
||||
freeShared(ctx)
|
||||
return err("failed to create the Waku thread: " & getCurrentExceptionMsg())
|
||||
|
||||
try:
|
||||
createThread(ctx.watchdogThread, watchdogThreadBody, ctx)
|
||||
except ValueError, ResourceExhaustedError:
|
||||
freeShared(ctx)
|
||||
return err("failed to create the watchdog thread: " & getCurrentExceptionMsg())
|
||||
|
||||
return ok(ctx)
|
||||
|
||||
proc destroyWakuContext*(ctx: ptr WakuContext): Result[void, string] =
|
||||
ctx.running.store(false)
|
||||
|
||||
let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
|
||||
return err("error in destroyWakuContext: " & $error)
|
||||
if not signaledOnTime:
|
||||
return err("failed to signal reqSignal on time in destroyWakuContext")
|
||||
|
||||
joinThread(ctx.wakuThread)
|
||||
joinThread(ctx.watchdogThread)
|
||||
ctx.lock.deinitLock()
|
||||
?ctx.reqSignal.close()
|
||||
?ctx.reqReceivedSignal.close()
|
||||
freeShared(ctx)
|
||||
|
||||
return ok()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user