diff --git a/waku/node/v2/waku_types.nim b/waku/node/v2/waku_types.nim index fba109045..bf93a8a81 100644 --- a/waku/node/v2/waku_types.nim +++ b/waku/node/v2/waku_types.nim @@ -29,6 +29,18 @@ type Filters* = Table[string, Filter] + WakuMessage* = object + payload*: seq[byte] + contentTopic*: string + + MessageNotificationHandler* = proc(topic: string, msg: WakuMessage) {.gcsafe, closure.} + + MessageNotificationSubscriptions* = Table[string, MessageNotificationSubscription] + + MessageNotificationSubscription* = object + topics*: seq[string] # @TODO TOPIC + handler*: MessageNotificationHandler + # NOTE based on Eth2Node in NBC eth2_network.nim WakuNode* = ref object of RootObj switch*: Switch @@ -38,10 +50,7 @@ type # TODO Revist messages field indexing as well as if this should be Message or WakuMessage messages*: seq[(Topic, WakuMessage)] filters*: Filters - - WakuMessage* = object - payload*: seq[byte] - contentTopic*: string + subscriptions*: MessageNotificationSubscriptions WakuRelay* = ref object of GossipSub gossipEnabled*: bool diff --git a/waku/node/v2/wakunode2.nim b/waku/node/v2/wakunode2.nim index 5eb220ed8..6d3d62752 100644 --- a/waku/node/v2/wakunode2.nim +++ b/waku/node/v2/wakunode2.nim @@ -10,7 +10,7 @@ import libp2p/protocols/pubsub/pubsub, libp2p/peerinfo, libp2p/standard_setup, - ../../protocol/v2/[waku_relay, waku_store, waku_filter], + ../../protocol/v2/[waku_relay, waku_store, waku_filter, message_notifier], ./waku_types logScope: @@ -105,9 +105,18 @@ proc start*(node: WakuNode) {.async.} = # NOTE WakuRelay is being instantiated as part of initing node let storeProto = WakuStore.init() node.switch.mount(storeProto) + node.subscriptions.subscribe(WakuStoreCodec, storeProto.subscription()) let filterProto = WakuFilter.init() node.switch.mount(filterProto) + node.subscriptions.subscribe(WakuFilterCodec, filterProto.subscription()) + + proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + let msg = WakuMessage.init(data) + if msg.isOk(): + node.subscriptions.notify(topic, msg.value()) + + await node.wakuRelay.subscribe("waku", relayHandler) # TODO Get this from WakuNode obj let peerInfo = node.peerInfo diff --git a/waku/protocol/v2/message_notifier.nim b/waku/protocol/v2/message_notifier.nim index c4bec6157..983a08f75 100644 --- a/waku/protocol/v2/message_notifier.nim +++ b/waku/protocol/v2/message_notifier.nim @@ -7,16 +7,6 @@ import # # Protocols can subscribe to messages of specific topics, then when one is received # The notification handler function will be called. - -type - MessageNotificationHandler* = proc(topic: string, msg: WakuMessage) {.gcsafe, closure.} - - MessageNotificationSubscription* = object - topics: seq[string] # @TODO TOPIC - handler: MessageNotificationHandler - - MessageNotificationSubscriptions* = Table[string, MessageNotificationSubscription] - proc subscribe*(subscriptions: var MessageNotificationSubscriptions, name: string, subscription: MessageNotificationSubscription) = subscriptions.add(name, subscription)