mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-05 23:43:07 +00:00
feat: topic health tracking (#3212)
This commit is contained in:
parent
90cac35c64
commit
e4a07a99ce
23
library/events/json_topic_health_change_event.nim
Normal file
23
library/events/json_topic_health_change_event.nim
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
import system, results, std/json
|
||||||
|
import stew/byteutils
|
||||||
|
import ../../waku/common/base64, ./json_base_event
|
||||||
|
import ../../waku/waku_relay
|
||||||
|
|
||||||
|
type JsonTopicHealthChangeEvent* = ref object of JsonEvent
  ## Event emitted when the health of a pubsub topic changes.
  ## Extends `JsonEvent` with the affected topic and its new health value.
  pubsubTopic*: string ## pubsub topic whose health changed
  topicHealth*: TopicHealth ## health value reported for `pubsubTopic`
|
||||||
|
|
||||||
|
proc new*(
    T: type JsonTopicHealthChangeEvent, pubsubTopic: string, topicHealth: TopicHealth
): T =
  ## Builds a topic-health-change event for `pubsubTopic` carrying
  ## `topicHealth`. The event type is always "relay_topic_health_change".
  ##
  ## The original comment points to
  ## https://rfc.vac.dev/spec/36/#jsonmessageevent-type as the reference for
  ## the event layout.
  JsonTopicHealthChangeEvent(
    eventType: "relay_topic_health_change",
    topicHealth: topicHealth,
    pubsubTopic: pubsubTopic,
  )
|
||||||
|
|
||||||
|
method `$`*(jsonTopicHealthChange: JsonTopicHealthChangeEvent): string =
  ## Serializes the event to its JSON text representation.
  let jsonNode = %*jsonTopicHealthChange
  return $jsonNode
|
||||||
@ -13,8 +13,8 @@ import
|
|||||||
waku/node/waku_node,
|
waku/node/waku_node,
|
||||||
waku/waku_core/topics/pubsub_topic,
|
waku/waku_core/topics/pubsub_topic,
|
||||||
waku/waku_core/subscription/push_handler,
|
waku/waku_core/subscription/push_handler,
|
||||||
waku/waku_relay/protocol,
|
waku/waku_relay,
|
||||||
./events/json_message_event,
|
./events/[json_message_event, json_topic_health_change_event],
|
||||||
./waku_thread/waku_thread,
|
./waku_thread/waku_thread,
|
||||||
./waku_thread/inter_thread_communication/requests/node_lifecycle_request,
|
./waku_thread/inter_thread_communication/requests/node_lifecycle_request,
|
||||||
./waku_thread/inter_thread_communication/requests/peer_manager_request,
|
./waku_thread/inter_thread_communication/requests/peer_manager_request,
|
||||||
@ -84,6 +84,31 @@ proc onReceivedMessage(ctx: ptr WakuContext): WakuRelayHandler =
|
|||||||
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
|
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
|
||||||
)
|
)
|
||||||
|
|
||||||
|
proc onTopicHealthChange(ctx: ptr WakuContext): TopicHealthChangeHandler =
  ## Creates the handler that forwards topic health changes to the event
  ## callback stored in `ctx`.
  ##
  ## The returned handler serializes a `JsonTopicHealthChangeEvent` and passes
  ## it to `ctx.eventCallback` with `RET_OK`. If anything raises while doing
  ## so, the callback is invoked again with `RET_ERR` and the exception
  ## message. Nothing is forwarded when either `eventCallback` or
  ## `eventUserData` is nil; an error is logged instead.
  return proc(pubsubTopic: PubsubTopic, topicHealth: TopicHealth) {.async.} =
    # Without a callback there is nobody to notify.
    if ctx[].eventCallback.isNil():
      error "onTopicHealthChange - eventCallback is nil"
      return

    if ctx[].eventUserData.isNil():
      error "onTopicHealthChange - eventUserData is nil"
      return

    foreignThreadGc:
      try:
        let payload = $JsonTopicHealthChangeEvent.new(pubsubTopic, topicHealth)
        cast[WakuCallBack](ctx[].eventCallback)(
          RET_OK,
          unsafeAddr payload[0],
          cast[csize_t](payload.len),
          ctx[].eventUserData,
        )
      except Exception, CatchableError:
        # Report the failure through the same callback, flagged as an error.
        let errMsg =
          "Exception onTopicHealthChange when calling 'eventCallBack': " &
          getCurrentExceptionMsg()
        cast[WakuCallBack](ctx[].eventCallback)(
          RET_ERR,
          unsafeAddr errMsg[0],
          cast[csize_t](errMsg.len),
          ctx[].eventUserData,
        )
|
||||||
|
|
||||||
### End of not-exported components
|
### End of not-exported components
|
||||||
################################################################################
|
################################################################################
|
||||||
|
|
||||||
@ -139,7 +164,10 @@ proc waku_new(
|
|||||||
|
|
||||||
ctx.userData = userData
|
ctx.userData = userData
|
||||||
|
|
||||||
let appCallbacks = AppCallbacks(relayHandler: onReceivedMessage(ctx))
|
let appCallbacks = AppCallbacks(
|
||||||
|
relayHandler: onReceivedMessage(ctx),
|
||||||
|
topicHealthChangeHandler: onTopicHealthChange(ctx),
|
||||||
|
)
|
||||||
|
|
||||||
let retCode = handleRequest(
|
let retCode = handleRequest(
|
||||||
ctx,
|
ctx,
|
||||||
|
|||||||
@ -1,4 +1,5 @@
|
|||||||
import ../waku_relay/protocol
|
import ../waku_relay
|
||||||
|
|
||||||
type AppCallbacks* = ref object
|
type AppCallbacks* = ref object
|
||||||
relayHandler*: WakuRelayHandler
|
relayHandler*: WakuRelayHandler
|
||||||
|
topicHealthChangeHandler*: TopicHealthChangeHandler
|
||||||
|
|||||||
@ -169,7 +169,13 @@ proc setupAppCallbacks(
|
|||||||
for shard in shards:
|
for shard in shards:
|
||||||
discard node.wakuRelay.subscribe($shard, appCallbacks.relayHandler)
|
discard node.wakuRelay.subscribe($shard, appCallbacks.relayHandler)
|
||||||
|
|
||||||
return ok()
|
if not appCallbacks.topicHealthChangeHandler.isNil():
|
||||||
|
if node.wakuRelay.isNil():
|
||||||
|
return
|
||||||
|
err("Cannot configure topicHealthChangeHandler callback without Relay mounted")
|
||||||
|
node.wakuRelay.onTopicHealthChange = appCallbacks.topicHealthChangeHandler
|
||||||
|
|
||||||
|
return ok()
|
||||||
|
|
||||||
proc new*(
|
proc new*(
|
||||||
T: type Waku, confCopy: var WakuNodeConf, appCallbacks: AppCallbacks = nil
|
T: type Waku, confCopy: var WakuNodeConf, appCallbacks: AppCallbacks = nil
|
||||||
|
|||||||
@ -1,3 +1,3 @@
|
|||||||
import ./waku_relay/protocol
|
import ./waku_relay/[protocol, topic_health]
|
||||||
|
|
||||||
export protocol
|
export protocol, topic_health
|
||||||
|
|||||||
@ -18,7 +18,8 @@ import
|
|||||||
libp2p/protocols/pubsub/rpc/messages,
|
libp2p/protocols/pubsub/rpc/messages,
|
||||||
libp2p/stream/connection,
|
libp2p/stream/connection,
|
||||||
libp2p/switch
|
libp2p/switch
|
||||||
import ../waku_core, ./message_id, ../node/delivery_monitor/publish_observer
|
import
|
||||||
|
../waku_core, ./message_id, ./topic_health, ../node/delivery_monitor/publish_observer
|
||||||
|
|
||||||
from ../waku_core/codecs import WakuRelayCodec
|
from ../waku_core/codecs import WakuRelayCodec
|
||||||
export WakuRelayCodec
|
export WakuRelayCodec
|
||||||
@ -131,6 +132,9 @@ type
|
|||||||
# a map of validators to error messages to return when validation fails
|
# a map of validators to error messages to return when validation fails
|
||||||
validatorInserted: Table[PubsubTopic, bool]
|
validatorInserted: Table[PubsubTopic, bool]
|
||||||
publishObservers: seq[PublishObserver]
|
publishObservers: seq[PublishObserver]
|
||||||
|
topicsHealth*: Table[string, TopicHealth]
|
||||||
|
onTopicHealthChange*: TopicHealthChangeHandler
|
||||||
|
topicHealthLoopHandle*: Future[void]
|
||||||
|
|
||||||
proc initProtocolHandler(w: WakuRelay) =
|
proc initProtocolHandler(w: WakuRelay) =
|
||||||
proc handler(conn: Connection, proto: string) {.async.} =
|
proc handler(conn: Connection, proto: string) {.async.} =
|
||||||
@ -289,6 +293,7 @@ proc new*(
|
|||||||
procCall GossipSub(w).initPubSub()
|
procCall GossipSub(w).initPubSub()
|
||||||
w.initProtocolHandler()
|
w.initProtocolHandler()
|
||||||
w.initRelayObservers()
|
w.initRelayObservers()
|
||||||
|
w.topicsHealth = initTable[string, TopicHealth]()
|
||||||
except InitializationError:
|
except InitializationError:
|
||||||
return err("initialization error: " & getCurrentExceptionMsg())
|
return err("initialization error: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
@ -309,13 +314,78 @@ proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
|
|||||||
## Observes when a message is sent/received from the GossipSub PoV
|
## Observes when a message is sent/received from the GossipSub PoV
|
||||||
procCall GossipSub(w).addObserver(observer)
|
procCall GossipSub(w).addObserver(observer)
|
||||||
|
|
||||||
|
proc getNumPeersInMesh*(w: WakuRelay, pubsubTopic: PubsubTopic): Result[int, string] =
  ## Returns the number of peers in the mesh of the given pubsub topic.
  ## The `mesh` attribute is defined in the GossipSub ref object.
  ##
  ## A topic without a mesh entry yields `ok(0)`; an error is only returned
  ## when reading the mesh entry raises.

  if not w.mesh.hasKey(pubsubTopic):
    debug "getNumPeersInMesh - there is no mesh peer for the given pubsub topic",
      pubsubTopic = pubsubTopic
    return ok(0)

  # The lookup may raise, so it is wrapped into a Result first.
  let lookupRes = catch:
    w.mesh[pubsubTopic]

  let meshPeers: HashSet[PubSubPeer] = lookupRes.valueOr:
    return
      err("getNumPeersInMesh - exception accessing " & pubsubTopic & ": " & error.msg)

  ok(meshPeers.len)
|
||||||
|
|
||||||
|
proc calculateTopicHealth(wakuRelay: WakuRelay, topic: string): TopicHealth =
  ## Derives the health of `topic` from the number of peers in its mesh:
  ## - no peers, or the peer count could not be obtained: UNHEALTHY
  ## - fewer peers than `wakuRelay.parameters.dLow`: MINIMALLY_HEALTHY
  ## - otherwise: SUFFICIENTLY_HEALTHY
  let meshSize = wakuRelay.getNumPeersInMesh(topic).valueOr:
    error "Could not calculate topic health", topic = topic, error = error
    return TopicHealth.UNHEALTHY

  if meshSize < 1:
    TopicHealth.UNHEALTHY
  elif meshSize < wakuRelay.parameters.dLow:
    TopicHealth.MINIMALLY_HEALTHY
  else:
    TopicHealth.SUFFICIENTLY_HEALTHY
|
||||||
|
|
||||||
|
proc updateTopicsHealth(wakuRelay: WakuRelay) {.async.} =
  ## Recomputes the health of every topic in `wakuRelay.topics`. Each value
  ## that differs from the one stored in `wakuRelay.topicsHealth` is saved
  ## there and, when `wakuRelay.onTopicHealthChange` is set, reported to it.
  ##
  ## A handler that fails is logged with a warning and does not abort the
  ## update. Cancellation is not swallowed: a `CancelledError` raised while
  ## waiting for the handlers propagates to the caller.
  var pendingFuts = newSeq[Future[void]]()
  for topic in toSeq(wakuRelay.topics.keys):
    ## loop over all the topics I'm subscribed to
    let
      # A topic seen for the first time compares against the enum's default
      # value, i.e. its first member.
      oldHealth = wakuRelay.topicsHealth.getOrDefault(topic)
      currentHealth = wakuRelay.calculateTopicHealth(topic)

    if oldHealth == currentHealth:
      continue

    wakuRelay.topicsHealth[topic] = currentHealth
    if not wakuRelay.onTopicHealthChange.isNil():
      let fut = wakuRelay.onTopicHealthChange(topic, currentHealth)
      if not fut.completed(): # Fast path for successful sync handlers
        pendingFuts.add(fut)

  if pendingFuts.len() == 0:
    return

  # slow path - we have to wait for the handlers to complete.
  # Deliberately not wrapped in try/except: the previous code caught
  # CancelledError here, which both hid the cancellation from the caller and
  # was the only path on which handler failures were ever inspected.
  pendingFuts = await allFinished(pendingFuts)

  # check for errors in futures
  for fut in pendingFuts:
    if fut.failed:
      let handlerErr = fut.readError()
      warn "Error in health change handler", description = handlerErr.msg
|
||||||
|
|
||||||
|
proc topicsHealthLoop(wakuRelay: WakuRelay) {.async.} =
  ## Endless loop that refreshes the topics health and then sleeps for a
  ## fixed interval before the next refresh. It contains no exit condition
  ## of its own.
  const refreshInterval = 10.seconds
  while true:
    await wakuRelay.updateTopicsHealth()
    await sleepAsync(refreshInterval)
|
||||||
|
|
||||||
method start*(w: WakuRelay) {.async, base.} =
|
method start*(w: WakuRelay) {.async, base.} =
|
||||||
debug "start"
|
debug "start"
|
||||||
await procCall GossipSub(w).start()
|
await procCall GossipSub(w).start()
|
||||||
|
w.topicHealthLoopHandle = w.topicsHealthLoop()
|
||||||
|
|
||||||
method stop*(w: WakuRelay) {.async, base.} =
|
method stop*(w: WakuRelay) {.async, base.} =
|
||||||
debug "stop"
|
debug "stop"
|
||||||
await procCall GossipSub(w).stop()
|
await procCall GossipSub(w).stop()
|
||||||
|
if not w.topicHealthLoopHandle.isNil():
|
||||||
|
await w.topicHealthLoopHandle.cancelAndWait()
|
||||||
|
|
||||||
proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool =
|
proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool =
|
||||||
GossipSub(w).topics.hasKey(topic)
|
GossipSub(w).topics.hasKey(topic)
|
||||||
@ -455,25 +525,6 @@ proc publish*(
|
|||||||
|
|
||||||
return relayedPeerCount
|
return relayedPeerCount
|
||||||
|
|
||||||
proc getNumPeersInMesh*(w: WakuRelay, pubsubTopic: PubsubTopic): Result[int, string] =
|
|
||||||
## Returns the number of peers in a mesh defined by the passed pubsub topic.
|
|
||||||
## The 'mesh' atribute is defined in the GossipSub ref object.
|
|
||||||
|
|
||||||
if not w.mesh.hasKey(pubsubTopic):
|
|
||||||
return err(
|
|
||||||
"getNumPeersInMesh - there is no mesh peer for the given pubsub topic: " &
|
|
||||||
pubsubTopic
|
|
||||||
)
|
|
||||||
|
|
||||||
let peersRes = catch:
|
|
||||||
w.mesh[pubsubTopic]
|
|
||||||
|
|
||||||
let peers: HashSet[PubSubPeer] = peersRes.valueOr:
|
|
||||||
return
|
|
||||||
err("getNumPeersInMesh - exception accessing " & pubsubTopic & ": " & error.msg)
|
|
||||||
|
|
||||||
return ok(peers.len)
|
|
||||||
|
|
||||||
proc getNumConnectedPeers*(
|
proc getNumConnectedPeers*(
|
||||||
w: WakuRelay, pubsubTopic: PubsubTopic
|
w: WakuRelay, pubsubTopic: PubsubTopic
|
||||||
): Result[int, string] =
|
): Result[int, string] =
|
||||||
|
|||||||
19
waku/waku_relay/topic_health.nim
Normal file
19
waku/waku_relay/topic_health.nim
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
import chronos
|
||||||
|
|
||||||
|
import ../waku_core
|
||||||
|
|
||||||
|
type TopicHealth* = enum
  ## Health levels a pubsub topic can be in. The first member is the value
  ## a default-initialized `TopicHealth` takes.
  UNHEALTHY
  MINIMALLY_HEALTHY
  SUFFICIENTLY_HEALTHY

proc `$`*(t: TopicHealth): string =
  ## Human-readable name of a health level.
  case t
  of UNHEALTHY: "UnHealthy"
  of MINIMALLY_HEALTHY: "MinimallyHealthy"
  of SUFFICIENTLY_HEALTHY: "SufficientlyHealthy"
|
||||||
|
|
||||||
|
type TopicHealthChangeHandler* = proc(
  pubsubTopic: PubsubTopic, topicHealth: TopicHealth
): Future[void] {.gcsafe, raises: [Defect].}
  ## Asynchronous callback that receives a pubsub topic together with a
  ## health value for it.
|
||||||
Loading…
x
Reference in New Issue
Block a user