refactor/rename-filter-subscription (#138)

* fixes

* added

* added comment
This commit is contained in:
Dean Eigenmann 2020-09-07 13:26:32 +02:00 committed by GitHub
parent 5b4e429a19
commit a4c6caa62f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 60 additions and 55 deletions

View File

@ -11,7 +11,7 @@ import
libp2p/multistream,
libp2p/transports/transport,
libp2p/transports/tcptransport,
../../waku/protocol/v2/[waku_filter, filter],
../../waku/protocol/v2/[waku_filter, message_notifier],
../test_helpers, ./utils
procSuite "Waku Filter":
@ -35,10 +35,10 @@ procSuite "Waku Filter":
asyncTest "handle filter":
let
proto = WakuFilter.init()
filter = proto.filter()
subscription = proto.subscription()
var filters = initTable[string, Filter]()
filters["test"] = filter
var subscriptions = initTable[string, MessageNotificationSubscription]()
subscriptions["test"] = subscription
let
peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
@ -73,8 +73,8 @@ procSuite "Waku Filter":
await sleepAsync(2.seconds)
filters.notify(msg)
filters.notify(msg2)
subscriptions.notify(msg)
subscriptions.notify(msg2)
var message = await conn.readLp(64*1024)

View File

@ -11,7 +11,7 @@ import
libp2p/multistream,
libp2p/transports/transport,
libp2p/transports/tcptransport,
../../waku/protocol/v2/[waku_store, filter],
../../waku/protocol/v2/[waku_store, message_notifier],
../test_helpers, ./utils
procSuite "Waku Store":
@ -34,18 +34,18 @@ procSuite "Waku Store":
asyncTest "handle query":
let
proto = WakuStore.init()
filter = proto.filter()
subscription = proto.subscription()
var filters = initTable[string, Filter]()
filters["test"] = filter
var subscriptions = initTable[string, MessageNotificationSubscription]()
subscriptions["test"] = subscription
let
peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
msg = Message.init(peer, @[byte 1, 2, 3], "topic", 3, false)
msg2 = Message.init(peer, @[byte 1, 2, 3], "topic2", 4, false)
filters.notify(msg)
filters.notify(msg2)
subscriptions.notify(msg)
subscriptions.notify(msg2)
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let remoteSecKey = PrivateKey.random(ECDSA, rng[]).get()

View File

@ -1,37 +0,0 @@
import
std/tables,
libp2p/protocols/pubsub/rpc/messages
type
FilterMessageHandler* = proc(msg: Message) {.gcsafe, closure.}
Filter* = object
topics: seq[string] # @TODO TOPIC
handler: FilterMessageHandler
Filters* = Table[string, Filter]
proc subscribe*(filters: var Filters, name: string, filter: Filter) =
filters.add(name, filter)
proc init*(T: type Filter, topics: seq[string], handler: FilterMessageHandler): T =
result = T(
topics: topics,
handler: handler
)
proc containsMatch(lhs: seq[string], rhs: seq[string]): bool =
for leftItem in lhs:
if leftItem in rhs:
return true
return false
proc notify*(filters: var Filters, msg: Message) {.gcsafe.} =
for filter in filters.mvalues:
# @TODO WILL NEED TO CHECK SUBTOPICS IN FUTURE FOR WAKU TOPICS NOT LIBP2P ONES
if filter.topics.len > 0 and not filter.topics.containsMatch(msg.topicIDs):
continue
filter.handler(msg)

View File

@ -0,0 +1,42 @@
import
std/tables,
libp2p/protocols/pubsub/rpc/messages
# The Message Notification system is a method to notify various protocols
# running on a node when a new message was received.
#
# Protocols can subscribe to messages of specific topics, then when one is received
# The notification handler function will be called.
type
MessageNotificationHandler* = proc(msg: Message) {.gcsafe, closure.}
MessageNotificationSubscription* = object
topics: seq[string] # @TODO TOPIC
handler: MessageNotificationHandler
MessageNotificationSubscriptions* = Table[string, MessageNotificationSubscription]
proc subscribe*(subscriptions: var MessageNotificationSubscriptions, name: string, subscription: MessageNotificationSubscription) =
subscriptions.add(name, subscription)
proc init*(T: type MessageNotificationSubscription, topics: seq[string], handler: MessageNotificationHandler): T =
result = T(
topics: topics,
handler: handler
)
proc containsMatch(lhs: seq[string], rhs: seq[string]): bool =
for leftItem in lhs:
if leftItem in rhs:
return true
return false
proc notify*(subscriptions: var MessageNotificationSubscriptions, msg: Message) {.gcsafe.} =
for subscription in subscriptions.mvalues:
# @TODO WILL NEED TO CHECK SUBTOPICS IN FUTURE FOR WAKU TOPICS NOT LIBP2P ONES
if subscription.topics.len > 0 and not subscription.topics.containsMatch(msg.topicIDs):
continue
subscription.handler(msg)

View File

@ -8,7 +8,7 @@ import
libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf,
libp2p/stream/connection,
./filter
./message_notifier
# NOTE This is just a start, the design of this protocol isn't done yet. It
# should be direct payload exchange (a la req-resp), not be coupled with the
@ -140,7 +140,7 @@ proc init*(T: type WakuFilter): T =
ws.codec = WakuFilterCodec
result = ws
proc filter*(proto: WakuFilter): Filter =
proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
## Returns a Filter for the specific protocol
## This filter can then be used to send messages to subscribers that match conditions.
proc handle(msg: Message) =
@ -151,4 +151,4 @@ proc filter*(proto: WakuFilter): Filter =
discard subscriber.connection.writeLp(FilterRPC(messagePush: @[MessagePush(message: @[msg])]).encode().buffer)
break
Filter.init(@[], handle)
MessageNotificationSubscription.init(@[], handle)

View File

@ -5,7 +5,7 @@ import
libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf,
libp2p/stream/connection,
./filter
./message_notifier
const
WakuStoreCodec* = "/vac/waku/store/2.0.0-alpha2"
@ -126,7 +126,7 @@ proc init*(T: type WakuStore): T =
ws.codec = WakuStoreCodec
result = ws
proc filter*(proto: WakuStore): Filter =
proc subscription*(proto: WakuStore): MessageNotificationSubscription =
## The filter function returns the pubsub filter for the node.
## This is used to pipe messages into the storage, therefore
## the filter should be used by the component that receives
@ -134,4 +134,4 @@ proc filter*(proto: WakuStore): Filter =
proc handle(msg: Message) =
proto.messages.add(msg)
Filter.init(@[], handle)
MessageNotificationSubscription.init(@[], handle)