diff --git a/src/nim_chat_poc.nim b/src/nim_chat_poc.nim index 8a0abf5..e40937d 100644 --- a/src/nim_chat_poc.nim +++ b/src/nim_chat_poc.nim @@ -1,13 +1,19 @@ import + chronicles, chronos import waku_client + +proc handleMessages(pubsubTopic: string, message: seq[byte]): Future[ + void] {.gcsafe, raises: [Defect].} = + info "ClientRecv", pubTopic = pubsubTopic, msg = message + proc main(): Future[void] {.async.} = echo "Starting POC" let cfg = DefaultConfig() - let client = initWakuClient(cfg) + let client = initWakuClient(cfg, @[PayloadHandler(handleMessages)]) await client.start() echo "End of POC" diff --git a/src/waku_client.nim b/src/waku_client.nim index 1c62e71..81c27b7 100644 --- a/src/waku_client.nim +++ b/src/waku_client.nim @@ -27,6 +27,10 @@ const FilterContentTopic = ContentTopic("/chatsdk/test/proto") +type PayloadHandler* = proc(pubsubTopic: string, message: seq[byte]): Future[void] {. + gcsafe, raises: [Defect] + .} + type WakuConfig* = object port*: uint16 clusterId*: uint16 @@ -37,6 +41,7 @@ type WakuClient* = ref object cfg: WakuConfig node*: WakuNode + handlers: seq[PayloadHandler] proc DefaultConfig*(): WakuConfig = @@ -102,8 +107,8 @@ proc buildWakuNode(cfg: WakuConfig): WakuNode = result = node -proc messagePushHandler( - pubsubTopic: PubsubTopic, message: WakuMessage +proc messageHandler(client: WakuClient, pubsubTopic: PubsubTopic, + message: WakuMessage ) {.async, gcsafe.} = let payloadStr = string.fromBytes(message.payload) notice "message received", @@ -113,6 +118,10 @@ proc messagePushHandler( timestamp = message.timestamp + for handler in client.handlers: + discard handler(pubsubTopic, message.payload) + + proc taskKeepAlive(client: WakuClient) {.async.} = let peer = parsePeerInfo(StaticPeer).get() while true: @@ -154,19 +163,20 @@ proc start*(client: WakuClient) {.async.} = quit(1) client.node.peerManager.start() - client.node.wakuFilterClient.registerPushHandler(messagePushHandler) - - let topic = PubsubTopic(client.cfg.pubsubTopic) let subscription: SubscriptionEvent = (kind: PubsubSub, topic: client.cfg.pubsubTopic) - let res = subscribe(client.node, subscription, messagePushHandler) + let msg_handler = proc(pubsubTopic: PubsubTopic, + message: WakuMessage) {.async, gcsafe.} = discard client.messageHandler( + pubsubTopic, message) + + let res = subscribe(client.node, subscription, msg_handler) if res.isErr: error "Subscribe failed", err = res.error await allFutures(taskKeepAlive(client), taskPublishDemo(client)) - -proc initWakuClient*(cfg: WakuConfig): WakuClient = - result = WakuClient(cfg: cfg, node: buildWakuNode(cfg)) +proc initWakuClient*(cfg: WakuConfig, handlers: seq[ + PayloadHandler]): WakuClient = + result = WakuClient(cfg: cfg, node: buildWakuNode(cfg), handlers: handlers)