From e81a5517be6d86278755d9301ec3c15f33e674b6 Mon Sep 17 00:00:00 2001 From: gabrielmer <101006718+gabrielmer@users.noreply.github.com> Date: Wed, 8 Jan 2025 18:53:00 +0100 Subject: [PATCH] feat: connection change event (#3225) --- .../events/json_connection_change_event.nim | 20 +++++ library/libwaku.nim | 83 +++++++++---------- waku/factory/app_callbacks.nim | 3 +- waku/factory/waku.nim | 6 ++ waku/node/peer_manager/peer_manager.nim | 11 +++ 5 files changed, 76 insertions(+), 47 deletions(-) create mode 100644 library/events/json_connection_change_event.nim diff --git a/library/events/json_connection_change_event.nim b/library/events/json_connection_change_event.nim new file mode 100644 index 000000000..1a00237b6 --- /dev/null +++ b/library/events/json_connection_change_event.nim @@ -0,0 +1,20 @@ +import system, std/json, libp2p/[connmanager, peerid] + +import ../../waku/common/base64, ./json_base_event + +type JsonConnectionChangeEvent* = ref object of JsonEvent + peerId*: string + peerEvent*: PeerEventKind + +proc new*( + T: type JsonConnectionChangeEvent, peerId: string, peerEvent: PeerEventKind +): T = + # Returns a JsonConnectionChangeEvent event as indicated in + # https://rfc.vac.dev/spec/36/#jsonmessageevent-type + + return JsonConnectionChangeEvent( + eventType: "connection_change", peerId: peerId, peerEvent: peerEvent + ) + +method `$`*(jsonConnectionChangeEvent: JsonConnectionChangeEvent): string = + $(%*jsonConnectionChangeEvent) diff --git a/library/libwaku.nim b/library/libwaku.nim index 1488cebc1..208f20257 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -11,10 +11,12 @@ import waku/common/base64, waku/waku_core/message/message, waku/node/waku_node, + waku/node/peer_manager, waku/waku_core/topics/pubsub_topic, waku/waku_core/subscription/push_handler, waku/waku_relay, - ./events/[json_message_event, json_topic_health_change_event], + ./events/ + [json_message_event, json_topic_health_change_event, json_connection_change_event], ./waku_thread/waku_thread, ./waku_thread/inter_thread_communication/requests/node_lifecycle_request, ./waku_thread/inter_thread_communication/requests/peer_manager_request, @@ -45,6 +47,29 @@ template checkLibwakuParams*( if isNil(callback): return RET_MISSING_CALLBACK +template callEventCallback(ctx: ptr WakuContext, eventName: string, body: untyped) = + if isNil(ctx[].eventCallback): + error eventName & " - eventCallback is nil" + return + + if isNil(ctx[].eventUserData): + error eventName & " - eventUserData is nil" + return + + foreignThreadGc: + try: + let event = body + cast[WakuCallBack](ctx[].eventCallback)( + RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData + ) + except Exception, CatchableError: + let msg = + "Exception " & eventName & " when calling 'eventCallBack': " & + getCurrentExceptionMsg() + cast[WakuCallBack](ctx[].eventCallback)( + RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData + ) + proc handleRequest( ctx: ptr WakuContext, requestType: RequestType, @@ -59,55 +84,20 @@ proc handleRequest( return RET_OK +proc onConnectionChange(ctx: ptr WakuContext): ConnectionChangeHandler = + return proc(peerId: PeerId, peerEvent: PeerEventKind) {.async.} = + callEventCallback(ctx, "onConnectionChange"): + $JsonConnectionChangeEvent.new($peerId, peerEvent) + proc onReceivedMessage(ctx: ptr WakuContext): WakuRelayHandler = - return proc( - pubsubTopic: PubsubTopic, msg: WakuMessage - ): Future[system.void] {.async.} = - # Callback that hadles the Waku Relay events. i.e. messages or errors. - if isNil(ctx[].eventCallback): - error "eventCallback is nil" - return - - if isNil(ctx[].eventUserData): - error "eventUserData is nil" - return - - foreignThreadGc: - try: - let event = $JsonMessageEvent.new(pubsubTopic, msg) - cast[WakuCallBack](ctx[].eventCallback)( - RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData - ) - except Exception, CatchableError: - let msg = "Exception when calling 'eventCallBack': " & getCurrentExceptionMsg() - cast[WakuCallBack](ctx[].eventCallback)( - RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData - ) + return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} = + callEventCallback(ctx, "onReceivedMessage"): + $JsonMessageEvent.new(pubsubTopic, msg) proc onTopicHealthChange(ctx: ptr WakuContext): TopicHealthChangeHandler = return proc(pubsubTopic: PubsubTopic, topicHealth: TopicHealth) {.async.} = - # Callback that hadles the Waku Relay events. i.e. messages or errors. - if isNil(ctx[].eventCallback): - error "onTopicHealthChange - eventCallback is nil" - return - - if isNil(ctx[].eventUserData): - error "onTopicHealthChange - eventUserData is nil" - return - - foreignThreadGc: - try: - let event = $JsonTopicHealthChangeEvent.new(pubsubTopic, topicHealth) - cast[WakuCallBack](ctx[].eventCallback)( - RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData - ) - except Exception, CatchableError: - let msg = - "Exception onTopicHealthChange when calling 'eventCallBack': " & - getCurrentExceptionMsg() - cast[WakuCallBack](ctx[].eventCallback)( - RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData - ) + callEventCallback(ctx, "onTopicHealthChange"): + $JsonTopicHealthChangeEvent.new(pubsubTopic, topicHealth) ### End of not-exported components ################################################################################ @@ -167,6 +157,7 @@ proc waku_new( let appCallbacks = AppCallbacks( relayHandler: onReceivedMessage(ctx), topicHealthChangeHandler: onTopicHealthChange(ctx), + connectionChangeHandler: onConnectionChange(ctx), ) let retCode = handleRequest( diff --git a/waku/factory/app_callbacks.nim b/waku/factory/app_callbacks.nim index 1bcd6cc0e..d28b9f2d1 100644 --- a/waku/factory/app_callbacks.nim +++ b/waku/factory/app_callbacks.nim @@ -1,5 +1,6 @@ -import ../waku_relay +import ../waku_relay, ../node/peer_manager type AppCallbacks* = ref object relayHandler*: WakuRelayHandler topicHealthChangeHandler*: TopicHealthChangeHandler + connectionChangeHandler*: ConnectionChangeHandler diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 4ad094dff..4f7d45448 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -175,6 +175,12 @@ proc setupAppCallbacks( err("Cannot configure topicHealthChangeHandler callback without Relay mounted") node.wakuRelay.onTopicHealthChange = appCallbacks.topicHealthChangeHandler + if not appCallbacks.connectionChangeHandler.isNil(): + if node.peerManager.isNil(): + return + err("Cannot configure connectionChangeHandler callback with empty peer manager") + node.peerManager.onConnectionChange = appCallbacks.connectionChangeHandler + return ok() proc new*( diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index e17e9fd0d..385da471f 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -71,6 +71,10 @@ const # Max peers that we allow from the same IP DefaultColocationLimit* = 5 +type ConnectionChangeHandler* = proc( + peerId: PeerId, peerEvent: PeerEventKind +): Future[void] {.gcsafe, raises: [Defect].} + type PeerManager* = ref object of RootObj switch*: Switch wakuPeerStore*: WakuPeerStore @@ -87,6 +91,7 @@ type PeerManager* = ref object of RootObj colocationLimit*: int started: bool shardedPeerManagement: bool # temp feature flag + onConnectionChange*: ConnectionChangeHandler #~~~~~~~~~~~~~~~~~~~# # Helper Functions # @@ -676,6 +681,9 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip asyncSpawn(pm.switch.disconnect(peerId)) pm.wakuPeerStore.delete(peerId) + if not pm.onConnectionChange.isNil(): + # we don't want to await for the callback to finish + asyncSpawn pm.onConnectionChange(peerId, Joined) of Left: direction = UnknownDirection connectedness = CanConnect @@ -687,6 +695,9 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = if pm.ipTable[ip].len == 0: pm.ipTable.del(ip) break + if not pm.onConnectionChange.isNil(): + # we don't want to await for the callback to finish + asyncSpawn pm.onConnectionChange(peerId, Left) of Identified: debug "event identified", peerId = peerId