From a4c6caa62fc5bd5c9eb6d73facb6945062b56277 Mon Sep 17 00:00:00 2001 From: Dean Eigenmann <7621705+decanus@users.noreply.github.com> Date: Mon, 7 Sep 2020 13:26:32 +0200 Subject: [PATCH] refactor/rename-filter-subscription (#138) * fixes * added * added comment --- tests/v2/test_waku_filter.nim | 12 ++++---- tests/v2/test_waku_store.nim | 12 ++++---- waku/protocol/v2/filter.nim | 37 ----------------------- waku/protocol/v2/message_notifier.nim | 42 +++++++++++++++++++++++++++ waku/protocol/v2/waku_filter.nim | 6 ++-- waku/protocol/v2/waku_store.nim | 6 ++-- 6 files changed, 60 insertions(+), 55 deletions(-) delete mode 100644 waku/protocol/v2/filter.nim create mode 100644 waku/protocol/v2/message_notifier.nim diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index 3a786e6dc..9c2d475aa 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -11,7 +11,7 @@ import libp2p/multistream, libp2p/transports/transport, libp2p/transports/tcptransport, - ../../waku/protocol/v2/[waku_filter, filter], + ../../waku/protocol/v2/[waku_filter, message_notifier], ../test_helpers, ./utils procSuite "Waku Filter": @@ -35,10 +35,10 @@ procSuite "Waku Filter": asyncTest "handle filter": let proto = WakuFilter.init() - filter = proto.filter() + subscription = proto.subscription() - var filters = initTable[string, Filter]() - filters["test"] = filter + var subscriptions = initTable[string, MessageNotificationSubscription]() + subscriptions["test"] = subscription let peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) @@ -73,8 +73,8 @@ procSuite "Waku Filter": await sleepAsync(2.seconds) - filters.notify(msg) - filters.notify(msg2) + subscriptions.notify(msg) + subscriptions.notify(msg2) var message = await conn.readLp(64*1024) diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 37c02901d..ba7a0b1aa 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -11,7 +11,7 @@ import libp2p/multistream, libp2p/transports/transport, libp2p/transports/tcptransport, - ../../waku/protocol/v2/[waku_store, filter], + ../../waku/protocol/v2/[waku_store, message_notifier], ../test_helpers, ./utils procSuite "Waku Store": @@ -34,18 +34,18 @@ procSuite "Waku Store": asyncTest "handle query": let proto = WakuStore.init() - filter = proto.filter() + subscription = proto.subscription() - var filters = initTable[string, Filter]() - filters["test"] = filter + var subscriptions = initTable[string, MessageNotificationSubscription]() + subscriptions["test"] = subscription let peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) msg = Message.init(peer, @[byte 1, 2, 3], "topic", 3, false) msg2 = Message.init(peer, @[byte 1, 2, 3], "topic2", 4, false) - filters.notify(msg) - filters.notify(msg2) + subscriptions.notify(msg) + subscriptions.notify(msg2) let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let remoteSecKey = PrivateKey.random(ECDSA, rng[]).get() diff --git a/waku/protocol/v2/filter.nim b/waku/protocol/v2/filter.nim deleted file mode 100644 index 867afb948..000000000 --- a/waku/protocol/v2/filter.nim +++ /dev/null @@ -1,37 +0,0 @@ -import - std/tables, - libp2p/protocols/pubsub/rpc/messages - -type - - FilterMessageHandler* = proc(msg: Message) {.gcsafe, closure.} - - Filter* = object - topics: seq[string] # @TODO TOPIC - handler: FilterMessageHandler - - Filters* = Table[string, Filter] - -proc subscribe*(filters: var Filters, name: string, filter: Filter) = - filters.add(name, filter) - -proc init*(T: type Filter, topics: seq[string], handler: FilterMessageHandler): T = - result = T( - topics: topics, - handler: handler - ) - -proc containsMatch(lhs: seq[string], rhs: seq[string]): bool = - for leftItem in lhs: - if leftItem in rhs: - return true - - return false - -proc notify*(filters: var Filters, msg: Message) {.gcsafe.} = - for filter in filters.mvalues: - # @TODO WILL NEED TO CHECK SUBTOPICS IN FUTURE FOR WAKU TOPICS NOT LIBP2P ONES - if filter.topics.len > 0 and not filter.topics.containsMatch(msg.topicIDs): - continue - - filter.handler(msg) diff --git a/waku/protocol/v2/message_notifier.nim b/waku/protocol/v2/message_notifier.nim new file mode 100644 index 000000000..9df7345c1 --- /dev/null +++ b/waku/protocol/v2/message_notifier.nim @@ -0,0 +1,42 @@ +import + std/tables, + libp2p/protocols/pubsub/rpc/messages + +# The Message Notification system is a method to notify various protocols +# running on a node when a new message was received. +# +# Protocols can subscribe to messages of specific topics, then when one is received +# The notification handler function will be called. + +type + MessageNotificationHandler* = proc(msg: Message) {.gcsafe, closure.} + + MessageNotificationSubscription* = object + topics: seq[string] # @TODO TOPIC + handler: MessageNotificationHandler + + MessageNotificationSubscriptions* = Table[string, MessageNotificationSubscription] + +proc subscribe*(subscriptions: var MessageNotificationSubscriptions, name: string, subscription: MessageNotificationSubscription) = + subscriptions.add(name, subscription) + +proc init*(T: type MessageNotificationSubscription, topics: seq[string], handler: MessageNotificationHandler): T = + result = T( + topics: topics, + handler: handler + ) + +proc containsMatch(lhs: seq[string], rhs: seq[string]): bool = + for leftItem in lhs: + if leftItem in rhs: + return true + + return false + +proc notify*(subscriptions: var MessageNotificationSubscriptions, msg: Message) {.gcsafe.} = + for subscription in subscriptions.mvalues: + # @TODO WILL NEED TO CHECK SUBTOPICS IN FUTURE FOR WAKU TOPICS NOT LIBP2P ONES + if subscription.topics.len > 0 and not subscription.topics.containsMatch(msg.topicIDs): + continue + + subscription.handler(msg) diff --git a/waku/protocol/v2/waku_filter.nim b/waku/protocol/v2/waku_filter.nim index 632e8c463..a4af5c0ce 100644 --- a/waku/protocol/v2/waku_filter.nim +++ b/waku/protocol/v2/waku_filter.nim @@ -8,7 +8,7 @@ import libp2p/protocols/protocol, libp2p/protobuf/minprotobuf, libp2p/stream/connection, - ./filter + ./message_notifier # NOTE This is just a start, the design of this protocol isn't done yet. It # should be direct payload exchange (a la req-resp), not be coupled with the @@ -140,7 +140,7 @@ proc init*(T: type WakuFilter): T = ws.codec = WakuFilterCodec result = ws -proc filter*(proto: WakuFilter): Filter = +proc subscription*(proto: WakuFilter): MessageNotificationSubscription = ## Returns a Filter for the specific protocol ## This filter can then be used to send messages to subscribers that match conditions. proc handle(msg: Message) = @@ -151,4 +151,4 @@ proc filter*(proto: WakuFilter): Filter = discard subscriber.connection.writeLp(FilterRPC(messagePush: @[MessagePush(message: @[msg])]).encode().buffer) break - Filter.init(@[], handle) + MessageNotificationSubscription.init(@[], handle) diff --git a/waku/protocol/v2/waku_store.nim b/waku/protocol/v2/waku_store.nim index 66c3bff28..19287a29e 100644 --- a/waku/protocol/v2/waku_store.nim +++ b/waku/protocol/v2/waku_store.nim @@ -5,7 +5,7 @@ import libp2p/protocols/protocol, libp2p/protobuf/minprotobuf, libp2p/stream/connection, - ./filter + ./message_notifier const WakuStoreCodec* = "/vac/waku/store/2.0.0-alpha2" @@ -126,7 +126,7 @@ proc init*(T: type WakuStore): T = ws.codec = WakuStoreCodec result = ws -proc filter*(proto: WakuStore): Filter = +proc subscription*(proto: WakuStore): MessageNotificationSubscription = ## The filter function returns the pubsub filter for the node. ## This is used to pipe messages into the storage, therefore ## the filter should be used by the component that receives @@ -134,4 +134,4 @@ proc filter*(proto: WakuStore): Filter = proc handle(msg: Message) = proto.messages.add(msg) - Filter.init(@[], handle) + MessageNotificationSubscription.init(@[], handle)