Add external handler

This commit is contained in:
Jazz Turner-Baggs 2025-07-24 16:39:02 -07:00
parent c857cb05b1
commit c19738c0d8
No known key found for this signature in database
2 changed files with 26 additions and 10 deletions

View File

@ -1,13 +1,19 @@
import
chronicles,
chronos
import
waku_client
proc handleMessages(pubsubTopic: string, message: seq[byte]): Future[
void] {.gcsafe, raises: [Defect].} =
info "ClientRecv", pubTopic = pubsubTopic, msg = message
proc main(): Future[void] {.async.} =
echo "Starting POC"
let cfg = DefaultConfig()
let client = initWakuClient(cfg)
let client = initWakuClient(cfg, @[PayloadHandler(handleMessages)])
await client.start()
echo "End of POC"

View File

@ -27,6 +27,10 @@ const
FilterContentTopic = ContentTopic("/chatsdk/test/proto")
type PayloadHandler* = proc(pubsubTopic: string, message: seq[byte]): Future[void] {.
gcsafe, raises: [Defect]
.}
type WakuConfig* = object
port*: uint16
clusterId*: uint16
@ -37,6 +41,7 @@ type
WakuClient* = ref object
cfg: WakuConfig
node*: WakuNode
handlers: seq[PayloadHandler]
proc DefaultConfig*(): WakuConfig =
@ -102,8 +107,8 @@ proc buildWakuNode(cfg: WakuConfig): WakuNode =
result = node
proc messagePushHandler(
pubsubTopic: PubsubTopic, message: WakuMessage
proc messageHandler(client: WakuClient, pubsubTopic: PubsubTopic,
message: WakuMessage
) {.async, gcsafe.} =
let payloadStr = string.fromBytes(message.payload)
notice "message received",
@ -113,6 +118,10 @@ proc messagePushHandler(
timestamp = message.timestamp
for handler in client.handlers:
discard handler(pubsubTopic, message.payload)
proc taskKeepAlive(client: WakuClient) {.async.} =
let peer = parsePeerInfo(StaticPeer).get()
while true:
@ -154,19 +163,20 @@ proc start*(client: WakuClient) {.async.} =
quit(1)
client.node.peerManager.start()
client.node.wakuFilterClient.registerPushHandler(messagePushHandler)
let topic = PubsubTopic(client.cfg.pubsubTopic)
let subscription: SubscriptionEvent = (kind: PubsubSub, topic:
client.cfg.pubsubTopic)
let res = subscribe(client.node, subscription, messagePushHandler)
let msg_handler = proc(pubsubTopic: PubsubTopic,
message: WakuMessage) {.async, gcsafe.} = discard client.messageHandler(
pubsubTopic, message)
let res = subscribe(client.node, subscription, msg_handler)
if res.isErr:
error "Subscribe failed", err = res.error
await allFutures(taskKeepAlive(client), taskPublishDemo(client))
proc initWakuClient*(cfg: WakuConfig): WakuClient =
result = WakuClient(cfg: cfg, node: buildWakuNode(cfg))
proc initWakuClient*(cfg: WakuConfig, handlers: seq[
PayloadHandler]): WakuClient =
result = WakuClient(cfg: cfg, node: buildWakuNode(cfg), handlers: handlers)