diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index 54cb7316f..f52e06002 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -19,7 +19,6 @@ import filter_api, admin_api, private_api], - ../../waku/v2/protocol/message_notifier, ../../waku/v2/protocol/waku_relay, ../../waku/v2/protocol/waku_store/[waku_store, waku_store_types], ../../waku/v2/protocol/waku_swap/waku_swap, @@ -226,8 +225,6 @@ procSuite "Waku v2 JSON-RPC API": peer = PeerInfo.init(key) node.mountStore(persistMessages = true) - let - subscription = node.wakuStore.subscription() var listenSwitch = newStandardSwitch(some(key)) discard waitFor listenSwitch.start() @@ -237,9 +234,6 @@ procSuite "Waku v2 JSON-RPC API": listenSwitch.mount(node.wakuRelay) listenSwitch.mount(node.wakuStore) - var subscriptions = newTable[string, MessageNotificationSubscription]() - subscriptions[testCodec] = subscription - # Now prime it with some history before tests var msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: 0), @@ -254,7 +248,7 @@ procSuite "Waku v2 JSON-RPC API": WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"), timestamp: 9)] for wakuMsg in msgList: - waitFor subscriptions.notify(defaultTopic, wakuMsg) + waitFor node.wakuStore.handleMessage(defaultTopic, wakuMsg) let client = newRpcHttpClient() await client.connect("127.0.0.1", rpcPort) diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index 80f7cd8b7..ae7466d7e 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -9,7 +9,6 @@ import libp2p/crypto/crypto, libp2p/multistream, ../../waku/v2/node/peer_manager/peer_manager, - ../../waku/v2/protocol/message_notifier, ../../waku/v2/protocol/waku_filter/waku_filter, ../test_helpers, ./utils @@ -47,19 +46,15 @@ procSuite "Waku Filter": proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = discard - let - proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle) - subscription = proto2.subscription() + let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle) - var subscriptions = newTable[string, MessageNotificationSubscription]() - subscriptions["test"] = subscription listenSwitch.mount(proto2) let id = (await proto.subscribe(rpc)).get() await sleepAsync(2.seconds) - await subscriptions.notify(defaultTopic, post) + await proto2.handleMessage(defaultTopic, post) check: (await responseRequestIdFuture) == id @@ -96,19 +91,15 @@ procSuite "Waku Filter": proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = discard - let - proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle) - subscription = proto2.subscription() + let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle) - var subscriptions = newTable[string, MessageNotificationSubscription]() - subscriptions["test"] = subscription listenSwitch.mount(proto2) let id = (await proto.subscribe(rpc)).get() await sleepAsync(2.seconds) - await subscriptions.notify(defaultTopic, post) + await proto2.handleMessage(defaultTopic, post) check: # Check that subscription works as expected @@ -124,7 +115,7 @@ procSuite "Waku Filter": await sleepAsync(2.seconds) - await subscriptions.notify(defaultTopic, post) + await proto2.handleMessage(defaultTopic, post) check: # Check that unsubscribe works as expected diff --git a/tests/v2/test_waku_lightpush.nim b/tests/v2/test_waku_lightpush.nim index 2316b8bcf..e13f0c3b3 100644 --- a/tests/v2/test_waku_lightpush.nim +++ b/tests/v2/test_waku_lightpush.nim @@ -9,7 +9,6 @@ import libp2p/crypto/crypto, libp2p/multistream, ../../waku/v2/node/peer_manager/peer_manager, - ../../waku/v2/protocol/message_notifier, ../../waku/v2/protocol/waku_lightpush/waku_lightpush, ../test_helpers, ./utils diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 600933849..c3c8d0119 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -8,7 +8,7 @@ import libp2p/stream/[bufferstream, connection], libp2p/crypto/crypto, libp2p/protocols/pubsub/rpc/message, - ../../waku/v2/protocol/[waku_message, message_notifier], + ../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_store/waku_store, ../../waku/v2/node/storage/message/waku_message_store, ../../waku/v2/node/peer_manager/peer_manager, @@ -33,18 +33,14 @@ procSuite "Waku Store": let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) - subscription = proto.subscription() rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)]) proto.setPeer(listenSwitch.peerInfo) - var subscriptions = newTable[string, MessageNotificationSubscription]() - subscriptions["test"] = subscription - listenSwitch.mount(proto) - await subscriptions.notify("foo", msg) - await subscriptions.notify("foo", msg2) + await proto.handleMessage("foo", msg) + await proto.handleMessage("foo", msg2) var completionFut = newFuture[bool]() @@ -77,19 +73,15 @@ procSuite "Waku Store": let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) - subscription = proto.subscription() rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic1), HistoryContentFilter(contentTopic: topic3)]) proto.setPeer(listenSwitch.peerInfo) - var subscriptions = newTable[string, MessageNotificationSubscription]() - subscriptions["test"] = subscription - listenSwitch.mount(proto) - await subscriptions.notify("foo", msg1) - await subscriptions.notify("foo", msg2) - await subscriptions.notify("foo", msg3) + await proto.handleMessage("foo", msg1) + await proto.handleMessage("foo", msg2) + await proto.handleMessage("foo", msg3) var completionFut = newFuture[bool]() @@ -126,21 +118,17 @@ procSuite "Waku Store": proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) pubsubtopic1 = "queried topic" pubsubtopic2 = "non queried topic" - subscription: MessageNotificationSubscription = proto.subscription() # this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3) rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic1), HistoryContentFilter(contentTopic: contentTopic3)], pubsubTopic: pubsubTopic1) proto.setPeer(listenSwitch.peerInfo) - var subscriptions = newTable[string, MessageNotificationSubscription]() - subscriptions["test"] = subscription - listenSwitch.mount(proto) # publish messages - await subscriptions.notify(pubsubtopic1, msg1) - await subscriptions.notify(pubsubtopic2, msg2) - await subscriptions.notify(pubsubtopic2, msg3) + await proto.handleMessage(pubsubtopic1, msg1) + await proto.handleMessage(pubsubtopic2, msg2) + await proto.handleMessage(pubsubtopic2, msg3) var completionFut = newFuture[bool]() @@ -174,21 +162,17 @@ procSuite "Waku Store": proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) pubsubtopic1 = "queried topic" pubsubtopic2 = "non queried topic" - subscription: MessageNotificationSubscription = proto.subscription() # this query targets: pubsubtopic1 rpc = HistoryQuery(pubsubTopic: pubsubTopic1) proto.setPeer(listenSwitch.peerInfo) - var subscriptions = newTable[string, MessageNotificationSubscription]() - subscriptions["test"] = subscription - listenSwitch.mount(proto) # publish messages - await subscriptions.notify(pubsubtopic2, msg1) - await subscriptions.notify(pubsubtopic2, msg2) - await subscriptions.notify(pubsubtopic2, msg3) + await proto.handleMessage(pubsubtopic2, msg1) + await proto.handleMessage(pubsubtopic2, msg2) + await proto.handleMessage(pubsubtopic2, msg3) var completionFut = newFuture[bool]() @@ -218,21 +202,17 @@ procSuite "Waku Store": let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) pubsubtopic = "queried topic" - subscription: MessageNotificationSubscription = proto.subscription() # this query targets: pubsubtopic rpc = HistoryQuery(pubsubTopic: pubsubtopic) proto.setPeer(listenSwitch.peerInfo) - var subscriptions = newTable[string, MessageNotificationSubscription]() - subscriptions["test"] = subscription - listenSwitch.mount(proto) # publish messages - await subscriptions.notify(pubsubtopic, msg1) - await subscriptions.notify(pubsubtopic, msg2) - await subscriptions.notify(pubsubtopic, msg3) + await proto.handleMessage(pubsubtopic, msg1) + await proto.handleMessage(pubsubtopic, msg2) + await proto.handleMessage(pubsubtopic, msg3) var completionFut = newFuture[bool]() @@ -267,19 +247,15 @@ procSuite "Waku Store": let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) - subscription = proto.subscription() rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)]) proto.setPeer(listenSwitch.peerInfo) - var subscriptions = newTable[string, MessageNotificationSubscription]() - subscriptions["test"] = subscription - listenSwitch.mount(proto) - await subscriptions.notify("foo", msg) + await proto.handleMessage("foo", msg) await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically - await subscriptions.notify("foo", msg2) + await proto.handleMessage("foo", msg2) var completionFut = newFuture[bool]() @@ -341,18 +317,14 @@ procSuite "Waku Store": let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) - subscription = proto.subscription() rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) ) proto.setPeer(listenSwitch.peerInfo) - var subscriptions = newTable[string, MessageNotificationSubscription]() - subscriptions["test"] = subscription - listenSwitch.mount(proto) for wakuMsg in msgList: - await subscriptions.notify("foo", wakuMsg) + await proto.handleMessage("foo", wakuMsg) await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically var completionFut = newFuture[bool]() @@ -392,18 +364,14 @@ procSuite "Waku Store": var listenSwitch = newStandardSwitch(some(key)) discard await listenSwitch.start() - let - proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) - subscription = proto.subscription() - proto.setPeer(listenSwitch.peerInfo) + let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) - var subscriptions = newTable[string, MessageNotificationSubscription]() - subscriptions["test"] = subscription + proto.setPeer(listenSwitch.peerInfo) listenSwitch.mount(proto) for wakuMsg in msgList: - await subscriptions.notify("foo", wakuMsg) + await proto.handleMessage("foo", wakuMsg) await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically var completionFut = newFuture[bool]() @@ -443,18 +411,14 @@ procSuite "Waku Store": var listenSwitch = newStandardSwitch(some(key)) discard await listenSwitch.start() - let - proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) - subscription = proto.subscription() - proto.setPeer(listenSwitch.peerInfo) + let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) - var subscriptions = newTable[string, MessageNotificationSubscription]() - subscriptions["test"] = subscription + proto.setPeer(listenSwitch.peerInfo) listenSwitch.mount(proto) for wakuMsg in msgList: - await subscriptions.notify("foo", wakuMsg) + await proto.handleMessage("foo", wakuMsg) await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically var completionFut = newFuture[bool]() @@ -585,19 +549,15 @@ procSuite "Waku Store": var listenSwitch = newStandardSwitch(some(key)) discard await listenSwitch.start() - let - proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) - subscription = proto.subscription() - proto.setPeer(listenSwitch.peerInfo) + let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) - var subscriptions = newTable[string, MessageNotificationSubscription]() - subscriptions["test"] = subscription + proto.setPeer(listenSwitch.peerInfo) listenSwitch.mount(proto) for wakuMsg in msgList: # the pubsub topic should be DefaultTopic - await subscriptions.notify(DefaultTopic, wakuMsg) + await proto.handleMessage(DefaultTopic, wakuMsg) asyncTest "handle temporal history query with a valid time window": var completionFut = newFuture[bool]() diff --git a/tests/v2/test_waku_swap.nim b/tests/v2/test_waku_swap.nim index ad322394d..9bf8dca42 100644 --- a/tests/v2/test_waku_swap.nim +++ b/tests/v2/test_waku_swap.nim @@ -10,7 +10,7 @@ import libp2p/crypto/[crypto, secp], libp2p/switch, eth/keys, - ../../waku/v2/protocol/[waku_message, message_notifier], + ../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_store/waku_store, ../../waku/v2/protocol/waku_swap/waku_swap, ../../waku/v2/node/wakunode2, @@ -67,7 +67,7 @@ procSuite "Waku SWAP Accounting": node2.mountSwap() node2.mountStore(persistMessages = true) - await node2.subscriptions.notify("/waku/2/default-waku/proto", message) + await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) await sleepAsync(2000.millis) @@ -117,7 +117,7 @@ procSuite "Waku SWAP Accounting": node2.mountSwap(swapConfig) node2.mountStore(persistMessages = true) - await node2.subscriptions.notify("/waku/2/default-waku/proto", message) + await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) await sleepAsync(2000.millis) diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 1c9d3d600..1b41fe8d9 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -14,7 +14,7 @@ import eth/keys, ../../waku/v2/node/storage/sqlite, ../../waku/v2/node/storage/message/waku_message_store, - ../../waku/v2/protocol/[waku_relay, waku_message, message_notifier], + ../../waku/v2/protocol/[waku_relay, waku_message], ../../waku/v2/protocol/waku_store/waku_store, ../../waku/v2/protocol/waku_filter/waku_filter, ../../waku/v2/protocol/waku_lightpush/waku_lightpush, @@ -279,7 +279,7 @@ procSuite "WakuNode": await node2.start() node2.mountStore(persistMessages = true) - await node2.subscriptions.notify("/waku/2/default-waku/proto", message) + await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) await sleepAsync(2000.millis) @@ -290,10 +290,8 @@ procSuite "WakuNode": response.messages[0] == message completionFut.complete(true) - await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), storeHandler) - check: (await completionFut.withTimeout(5.seconds)) == true await node1.stop() @@ -328,7 +326,7 @@ procSuite "WakuNode": await sleepAsync(2000.millis) - await node2.subscriptions.notify("/waku/2/default-waku/proto", message) + await node2.wakuFilter.handleMessage("/waku/2/default-waku/proto", message) await sleepAsync(2000.millis) @@ -762,7 +760,7 @@ procSuite "WakuNode": await node2.start() node2.mountStore(persistMessages = true) - await node2.subscriptions.notify("/waku/2/default-waku/proto", message) + await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) await sleepAsync(2000.millis) @@ -802,8 +800,8 @@ procSuite "WakuNode": await node2.start() node2.mountStore(persistMessages = true) - await node2.subscriptions.notify(DefaultTopic, msg1) - await node2.subscriptions.notify(DefaultTopic, msg2) + await node2.wakuStore.handleMessage(DefaultTopic, msg1) + await node2.wakuStore.handleMessage(DefaultTopic, msg2) await sleepAsync(2000.millis) diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 9ab8f1ea8..8a407c480 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -15,7 +15,7 @@ import libp2p/protocols/pubsub/pubsub, libp2p/protocols/pubsub/gossipsub, libp2p/builders, - ../protocol/[waku_relay, waku_message, message_notifier], + ../protocol/[waku_relay, waku_message], ../protocol/waku_store/waku_store, ../protocol/waku_swap/waku_swap, ../protocol/waku_filter/waku_filter, @@ -75,7 +75,6 @@ type # TODO Revist messages field indexing as well as if this should be Message or WakuMessage messages*: seq[(Topic, WakuMessage)] filters*: Filters - subscriptions*: MessageNotificationSubscriptions rng*: ref BrHmacDrbgContext started*: bool # Indicates that node has started listening @@ -161,7 +160,6 @@ proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey, switch: switch, rng: rng, peerInfo: peerInfo, - subscriptions: newTable[string, MessageNotificationSubscription](), filters: initTable[string, Filter]() ) @@ -206,7 +204,13 @@ proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) = let msg = WakuMessage.init(data) if msg.isOk(): - await node.subscriptions.notify(topic, msg.value()) # Trigger subscription handlers on a store/filter node + # Notify mounted protocols of new message + if (not node.wakuFilter.isNil): + await node.wakuFilter.handleMessage(topic, msg.value()) + + if (not node.wakuStore.isNil): + await node.wakuStore.handleMessage(topic, msg.value()) + waku_node_messages.inc(labelValues = ["relay"]) let wakuRelay = node.wakuRelay @@ -399,7 +403,6 @@ proc mountFilter*(node: WakuNode) = node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler) node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterCodec)) - node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription()) # NOTE: If using the swap protocol, it must be mounted before store. This is # because store is using a reference to the swap protocol. @@ -415,14 +418,12 @@ proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: boo if node.wakuSwap.isNil: debug "mounting store without swap" - node.wakuStore = WakuStore.init(node.peerManager, node.rng, store) + node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, persistMessages=persistMessages) else: debug "mounting store with swap" - node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, node.wakuSwap) + node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, node.wakuSwap, persistMessages=persistMessages) node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec)) - if persistMessages: - node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription()) when defined(rln): proc mountRlnRelay*(node: WakuNode, ethClientAddress: Option[string] = none(string), ethAccountAddress: Option[Address] = none(Address), membershipContractAddress: Option[Address] = none(Address)) {.async.} = diff --git a/waku/v2/protocol/message_notifier.nim b/waku/v2/protocol/message_notifier.nim deleted file mode 100644 index 14b9f8fb9..000000000 --- a/waku/v2/protocol/message_notifier.nim +++ /dev/null @@ -1,65 +0,0 @@ -import - std/tables, - chronos, - waku_message - -## The Message Notification system is a method to notify various protocols -## running on a node when a new message was received. -# -## Protocols can subscribe to messages of specific topics, then when one is received -## The notification handler function will be called. -## -## This works as follows: -## -## .. code-block:: -## var topic = "foo" -## -## proc handle(topic: string, msg: WakuMessage) {.async.} = -## info "new message", msg = msg -## -## MessageNotificationSubscription.init(@[topic], handle) -## -## var subscriptions = newTable[string, MessageNotificationSubscription]() -## subscriptions["identifier"] = subscription -## -## await subscriptions.notify(topic, WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic(1))) - -type - MessageNotificationHandler* = proc(topic: string, msg: WakuMessage): Future[ - void] {.gcsafe, closure.} - - MessageNotificationSubscriptionIdentifier* = string - - MessageNotificationSubscription* = object - topics*: seq[string] # @TODO TOPIC - handler*: MessageNotificationHandler - - MessageNotificationSubscriptions* = TableRef[MessageNotificationSubscriptionIdentifier, MessageNotificationSubscription] - -proc subscribe*(subscriptions: MessageNotificationSubscriptions, name: string, subscription: MessageNotificationSubscription) = - subscriptions.add(name, subscription) - -proc init*(T: type MessageNotificationSubscription, topics: seq[string], handler: MessageNotificationHandler): T = - result = T( - topics: topics, - handler: handler - ) - -proc containsMatch(lhs: seq[string], rhs: seq[string]): bool = - for leftItem in lhs: - if leftItem in rhs: - return true - - return false - -proc notify*(subscriptions: MessageNotificationSubscriptions, topic: string, msg: WakuMessage) {.async, gcsafe.} = - var futures = newSeq[Future[void]]() - - for subscription in subscriptions.mvalues: - # @TODO WILL NEED TO CHECK SUBTOPICS IN FUTURE FOR WAKU TOPICS NOT LIBP2P ONES - if subscription.topics.len > 0 and topic notin subscription.topics: - continue - - futures.add(subscription.handler(topic, msg)) - - await allFutures(futures) diff --git a/waku/v2/protocol/waku_filter/waku_filter.nim b/waku/v2/protocol/waku_filter/waku_filter.nim index 9cad753fc..122e6c314 100644 --- a/waku/v2/protocol/waku_filter/waku_filter.nim +++ b/waku/v2/protocol/waku_filter/waku_filter.nim @@ -9,7 +9,6 @@ import libp2p/protobuf/minprotobuf, libp2p/stream/connection, libp2p/crypto/crypto, - ../message_notifier, waku_filter_types, ../../utils/requests, ../../node/peer_manager/peer_manager @@ -191,33 +190,30 @@ proc setPeer*(wf: WakuFilter, peer: PeerInfo) = wf.peerManager.addPeer(peer, WakuFilterCodec) waku_filter_peers.inc() -proc subscription*(proto: WakuFilter): MessageNotificationSubscription = - ## Returns a Filter for the specific protocol - ## This filter can then be used to send messages to subscribers that match conditions. - proc handle(topic: string, msg: WakuMessage) {.async.} = - trace "handle WakuFilter subscription", topic=topic, msg=msg - for subscriber in proto.subscribers: - if subscriber.filter.pubSubTopic != "" and subscriber.filter.pubSubTopic != topic: - trace "Subscriber's filter pubsubTopic does not match message topic", filter=subscriber.filter.pubSubTopic, topic=topic - continue +proc handleMessage*(wf: WakuFilter, topic: string, msg: WakuMessage) {.async.} = + # Handle WakuMessage according to filter protocol + trace "handle message in WakuFilter", topic=topic, msg=msg - for filter in subscriber.filter.contentFilters: - if msg.contentTopic == filter.contentTopic: - trace "Found matching contentTopic", filter=filter, msg=msg - let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg])) - - let connOpt = await proto.peerManager.dialPeer(subscriber.peer, WakuFilterCodec) + for subscriber in wf.subscribers: + if subscriber.filter.pubSubTopic != "" and subscriber.filter.pubSubTopic != topic: + trace "Subscriber's filter pubsubTopic does not match message topic", filter=subscriber.filter.pubSubTopic, topic=topic + continue - if connOpt.isSome: - await connOpt.get().writeLP(push.encode().buffer) - else: - # @TODO more sophisticated error handling here - error "failed to push messages to remote peer" - waku_filter_errors.inc(labelValues = [dialFailure]) - break + for filter in subscriber.filter.contentFilters: + if msg.contentTopic == filter.contentTopic: + trace "Found matching contentTopic", filter=filter, msg=msg + let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg])) + + let connOpt = await wf.peerManager.dialPeer(subscriber.peer, WakuFilterCodec) - MessageNotificationSubscription.init(@[], handle) + if connOpt.isSome: + await connOpt.get().writeLP(push.encode().buffer) + else: + # @TODO more sophisticated error handling here + error "failed to push messages to remote peer" + waku_filter_errors.inc(labelValues = [dialFailure]) + break proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[Option[string]] {.async, gcsafe.} = let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec) diff --git a/waku/v2/protocol/waku_lightpush/waku_lightpush.nim b/waku/v2/protocol/waku_lightpush/waku_lightpush.nim index 3cb5716b3..8837f8987 100644 --- a/waku/v2/protocol/waku_lightpush/waku_lightpush.nim +++ b/waku/v2/protocol/waku_lightpush/waku_lightpush.nim @@ -9,7 +9,6 @@ import libp2p/protobuf/minprotobuf, libp2p/stream/connection, libp2p/crypto/crypto, - ../message_notifier, waku_lightpush_types, ../../utils/requests, ../../node/peer_manager/peer_manager, diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index 68afaad95..28e705fb7 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -12,7 +12,6 @@ import libp2p/protocols/protocol, libp2p/protobuf/minprotobuf, libp2p/stream/connection, - ../message_notifier, ../../node/storage/message/message_store, ../waku_swap/waku_swap, ./waku_store_types, @@ -397,13 +396,14 @@ proc init*(ws: WakuStore) {.raises: [Defect, Exception]} = proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext, - store: MessageStore = nil, wakuSwap: WakuSwap = nil): T {.raises: [Defect, Exception]} = + store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true): T {.raises: [Defect, Exception]} = debug "init" new result result.rng = rng result.peerManager = peerManager result.store = store result.wakuSwap = wakuSwap + result.persistMessages = persistMessages result.init() # @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY @@ -411,25 +411,24 @@ proc setPeer*(ws: WakuStore, peer: PeerInfo) {.raises: [Defect, Exception]} = ws.peerManager.addPeer(peer, WakuStoreCodec) waku_store_peers.inc() -proc subscription*(proto: WakuStore): MessageNotificationSubscription = - ## The filter function returns the pubsub filter for the node. - ## This is used to pipe messages into the storage, therefore - ## the filter should be used by the component that receives - ## new messages. - proc handle(topic: string, msg: WakuMessage) {.async.} = - debug "subscription handle", topic=topic - let index = msg.computeIndex() - proto.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic)) - waku_store_messages.inc(labelValues = ["stored"]) - if proto.store.isNil: - return - - let res = proto.store.put(index, msg, topic) - if res.isErr: - warn "failed to store messages", err = res.error - waku_store_errors.inc(labelValues = ["store_failure"]) +proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} = + if (not w.persistMessages): + # Store is mounted but new messages should not be stored + return - result = MessageNotificationSubscription.init(@[], handle) + # Handle WakuMessage according to store protocol + trace "handle message in WakuStore", topic=topic, msg=msg + + let index = msg.computeIndex() + w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic)) + waku_store_messages.inc(labelValues = ["stored"]) + if w.store.isNil: + return + + let res = w.store.put(index, msg, topic) + if res.isErr: + warn "failed to store messages", err = res.error + waku_store_errors.inc(labelValues = ["store_failure"]) proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} = # @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service. diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/protocol/waku_store/waku_store_types.nim index 743ff4965..f73005a41 100644 --- a/waku/v2/protocol/waku_store/waku_store_types.nim +++ b/waku/v2/protocol/waku_store/waku_store_types.nim @@ -70,3 +70,4 @@ type messages*: seq[IndexedWakuMessage] store*: MessageStore wakuSwap*: WakuSwap + persistMessages*: bool diff --git a/waku/v2/protocol/waku_swap/waku_swap.nim b/waku/v2/protocol/waku_swap/waku_swap.nim index e1fec7db9..8f20222e7 100644 --- a/waku/v2/protocol/waku_swap/waku_swap.nim +++ b/waku/v2/protocol/waku_swap/waku_swap.nim @@ -30,7 +30,6 @@ import libp2p/protobuf/minprotobuf, libp2p/stream/connection, ../../node/peer_manager/peer_manager, - ../message_notifier, ./waku_swap_types, ../../waku/v2/protocol/waku_swap/waku_swap_contracts