feat: connection change event (#3225)

This commit is contained in:
gabrielmer 2025-01-08 18:53:00 +01:00 committed by GitHub
parent d932dd10cc
commit e81a5517be
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 76 additions and 47 deletions

View File

@ -0,0 +1,20 @@
import system, std/json, libp2p/[connmanager, peerid]
import ../../waku/common/base64, ./json_base_event
type JsonConnectionChangeEvent* = ref object of JsonEvent
peerId*: string
peerEvent*: PeerEventKind
proc new*(
T: type JsonConnectionChangeEvent, peerId: string, peerEvent: PeerEventKind
): T =
# Returns a JsonConnectionChangeEvent event as indicated in
# https://rfc.vac.dev/spec/36/#jsonmessageevent-type
return JsonConnectionChangeEvent(
eventType: "connection_change", peerId: peerId, peerEvent: peerEvent
)
method `$`*(jsonConnectionChangeEvent: JsonConnectionChangeEvent): string =
$(%*jsonConnectionChangeEvent)

View File

@ -11,10 +11,12 @@ import
waku/common/base64,
waku/waku_core/message/message,
waku/node/waku_node,
waku/node/peer_manager,
waku/waku_core/topics/pubsub_topic,
waku/waku_core/subscription/push_handler,
waku/waku_relay,
./events/[json_message_event, json_topic_health_change_event],
./events/
[json_message_event, json_topic_health_change_event, json_connection_change_event],
./waku_thread/waku_thread,
./waku_thread/inter_thread_communication/requests/node_lifecycle_request,
./waku_thread/inter_thread_communication/requests/peer_manager_request,
@ -45,6 +47,29 @@ template checkLibwakuParams*(
if isNil(callback):
return RET_MISSING_CALLBACK
template callEventCallback(ctx: ptr WakuContext, eventName: string, body: untyped) =
if isNil(ctx[].eventCallback):
error eventName & " - eventCallback is nil"
return
if isNil(ctx[].eventUserData):
error eventName & " - eventUserData is nil"
return
foreignThreadGc:
try:
let event = body
cast[WakuCallBack](ctx[].eventCallback)(
RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData
)
except Exception, CatchableError:
let msg =
"Exception " & eventName & " when calling 'eventCallBack': " &
getCurrentExceptionMsg()
cast[WakuCallBack](ctx[].eventCallback)(
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
)
proc handleRequest(
ctx: ptr WakuContext,
requestType: RequestType,
@ -59,55 +84,20 @@ proc handleRequest(
return RET_OK
proc onConnectionChange(ctx: ptr WakuContext): ConnectionChangeHandler =
return proc(peerId: PeerId, peerEvent: PeerEventKind) {.async.} =
callEventCallback(ctx, "onConnectionChange"):
$JsonConnectionChangeEvent.new($peerId, peerEvent)
proc onReceivedMessage(ctx: ptr WakuContext): WakuRelayHandler =
return proc(
pubsubTopic: PubsubTopic, msg: WakuMessage
): Future[system.void] {.async.} =
# Callback that hadles the Waku Relay events. i.e. messages or errors.
if isNil(ctx[].eventCallback):
error "eventCallback is nil"
return
if isNil(ctx[].eventUserData):
error "eventUserData is nil"
return
foreignThreadGc:
try:
let event = $JsonMessageEvent.new(pubsubTopic, msg)
cast[WakuCallBack](ctx[].eventCallback)(
RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData
)
except Exception, CatchableError:
let msg = "Exception when calling 'eventCallBack': " & getCurrentExceptionMsg()
cast[WakuCallBack](ctx[].eventCallback)(
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
)
return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} =
callEventCallback(ctx, "onReceivedMessage"):
$JsonMessageEvent.new(pubsubTopic, msg)
proc onTopicHealthChange(ctx: ptr WakuContext): TopicHealthChangeHandler =
return proc(pubsubTopic: PubsubTopic, topicHealth: TopicHealth) {.async.} =
# Callback that hadles the Waku Relay events. i.e. messages or errors.
if isNil(ctx[].eventCallback):
error "onTopicHealthChange - eventCallback is nil"
return
if isNil(ctx[].eventUserData):
error "onTopicHealthChange - eventUserData is nil"
return
foreignThreadGc:
try:
let event = $JsonTopicHealthChangeEvent.new(pubsubTopic, topicHealth)
cast[WakuCallBack](ctx[].eventCallback)(
RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData
)
except Exception, CatchableError:
let msg =
"Exception onTopicHealthChange when calling 'eventCallBack': " &
getCurrentExceptionMsg()
cast[WakuCallBack](ctx[].eventCallback)(
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
)
callEventCallback(ctx, "onTopicHealthChange"):
$JsonTopicHealthChangeEvent.new(pubsubTopic, topicHealth)
### End of not-exported components
################################################################################
@ -167,6 +157,7 @@ proc waku_new(
let appCallbacks = AppCallbacks(
relayHandler: onReceivedMessage(ctx),
topicHealthChangeHandler: onTopicHealthChange(ctx),
connectionChangeHandler: onConnectionChange(ctx),
)
let retCode = handleRequest(

View File

@ -1,5 +1,6 @@
import ../waku_relay
import ../waku_relay, ../node/peer_manager
type AppCallbacks* = ref object
relayHandler*: WakuRelayHandler
topicHealthChangeHandler*: TopicHealthChangeHandler
connectionChangeHandler*: ConnectionChangeHandler

View File

@ -175,6 +175,12 @@ proc setupAppCallbacks(
err("Cannot configure topicHealthChangeHandler callback without Relay mounted")
node.wakuRelay.onTopicHealthChange = appCallbacks.topicHealthChangeHandler
if not appCallbacks.connectionChangeHandler.isNil():
if node.peerManager.isNil():
return
err("Cannot configure connectionChangeHandler callback with empty peer manager")
node.peerManager.onConnectionChange = appCallbacks.connectionChangeHandler
return ok()
proc new*(

View File

@ -71,6 +71,10 @@ const
# Max peers that we allow from the same IP
DefaultColocationLimit* = 5
type ConnectionChangeHandler* = proc(
peerId: PeerId, peerEvent: PeerEventKind
): Future[void] {.gcsafe, raises: [Defect].}
type PeerManager* = ref object of RootObj
switch*: Switch
wakuPeerStore*: WakuPeerStore
@ -87,6 +91,7 @@ type PeerManager* = ref object of RootObj
colocationLimit*: int
started: bool
shardedPeerManagement: bool # temp feature flag
onConnectionChange*: ConnectionChangeHandler
#~~~~~~~~~~~~~~~~~~~#
# Helper Functions #
@ -676,6 +681,9 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip
asyncSpawn(pm.switch.disconnect(peerId))
pm.wakuPeerStore.delete(peerId)
if not pm.onConnectionChange.isNil():
# we don't want to await for the callback to finish
asyncSpawn pm.onConnectionChange(peerId, Joined)
of Left:
direction = UnknownDirection
connectedness = CanConnect
@ -687,6 +695,9 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
if pm.ipTable[ip].len == 0:
pm.ipTable.del(ip)
break
if not pm.onConnectionChange.isNil():
# we don't want to await for the callback to finish
asyncSpawn pm.onConnectionChange(peerId, Left)
of Identified:
debug "event identified", peerId = peerId