From dfee0359af8f7ae57ae3fc618564c6ab35dadd77 Mon Sep 17 00:00:00 2001 From: Dean Eigenmann <7621705+decanus@users.noreply.github.com> Date: Wed, 16 Sep 2020 04:59:10 +0200 Subject: [PATCH] fix/message-notifier-message-to-waku-message (#159) * update message type * cleanup --- tests/v2/test_waku_filter.nim | 12 ++++++------ tests/v2/test_waku_store.nim | 9 +++++---- waku/protocol/v2/message_notifier.nim | 10 +++++----- waku/protocol/v2/waku_filter.nim | 23 +++++++++++++---------- waku/protocol/v2/waku_store.nim | 24 +++++++++++------------- 5 files changed, 40 insertions(+), 38 deletions(-) diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index 5702f40bb..648ae11cd 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -7,11 +7,11 @@ import libp2p/protobuf/minprotobuf, libp2p/stream/[bufferstream, connection], libp2p/crypto/crypto, - libp2p/protocols/pubsub/rpc/[message, messages, protobuf], libp2p/multistream, libp2p/transports/transport, libp2p/transports/tcptransport, ../../waku/protocol/v2/[waku_relay, waku_filter, message_notifier], + ../../waku/node/v2/waku_types, ../test_helpers, ./utils procSuite "Waku Filter": @@ -26,8 +26,8 @@ procSuite "Waku Filter": let peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) - msg = Message.init(peer, @[byte 1, 2, 3], "topic", 3, false) - msg2 = Message.init(peer, @[byte 1, 2, 3], "topic2", 4, false) + msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "pew") + msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "pew2") let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let remoteSecKey = PrivateKey.random(ECDSA, rng[]).get() @@ -51,14 +51,14 @@ procSuite "Waku Filter": let transport2: TcpTransport = TcpTransport.init() let conn = await transport2.dial(transport1.ma) - var rpc = FilterRequest(contentFilter: @[ContentFilter(topics: @[])], topic: "topic") + var rpc = FilterRequest(contentFilter: @[waku_filter.ContentFilter(topics: @["pew", "pew2"])], topic: "topic") discard await msDial.select(conn, WakuFilterCodec) await conn.writeLP(rpc.encode().buffer) await sleepAsync(2.seconds) - subscriptions.notify(msg) - subscriptions.notify(msg2) + subscriptions.notify("topic", msg) + subscriptions.notify("topic", msg2) var message = await conn.readLp(64*1024) diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 375baeb83..a7c30ebb2 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -12,6 +12,7 @@ import libp2p/transports/transport, libp2p/transports/tcptransport, ../../waku/protocol/v2/[waku_store, message_notifier], + ../../waku/node/v2/waku_types, ../test_helpers, ./utils procSuite "Waku Store": @@ -25,11 +26,11 @@ procSuite "Waku Store": let peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) - msg = Message.init(peer, @[byte 1, 2, 3], "topic", 3, false) - msg2 = Message.init(peer, @[byte 1, 2, 3], "topic2", 4, false) + msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "topic") + msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "topic2") - subscriptions.notify(msg) - subscriptions.notify(msg2) + subscriptions.notify("foo", msg) + subscriptions.notify("foo", msg2) let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let remoteSecKey = PrivateKey.random(ECDSA, rng[]).get() diff --git a/waku/protocol/v2/message_notifier.nim b/waku/protocol/v2/message_notifier.nim index 9df7345c1..c4bec6157 100644 --- a/waku/protocol/v2/message_notifier.nim +++ b/waku/protocol/v2/message_notifier.nim @@ -1,6 +1,6 @@ import std/tables, - libp2p/protocols/pubsub/rpc/messages + ./../../node/v2/waku_types # The Message Notification system is a method to notify various protocols # running on a node when a new message was received. @@ -9,7 +9,7 @@ import # The notification handler function will be called. type - MessageNotificationHandler* = proc(msg: Message) {.gcsafe, closure.} + MessageNotificationHandler* = proc(topic: string, msg: WakuMessage) {.gcsafe, closure.} MessageNotificationSubscription* = object topics: seq[string] # @TODO TOPIC @@ -33,10 +33,10 @@ proc containsMatch(lhs: seq[string], rhs: seq[string]): bool = return false -proc notify*(subscriptions: var MessageNotificationSubscriptions, msg: Message) {.gcsafe.} = +proc notify*(subscriptions: var MessageNotificationSubscriptions, topic: string, msg: WakuMessage) {.gcsafe.} = for subscription in subscriptions.mvalues: # @TODO WILL NEED TO CHECK SUBTOPICS IN FUTURE FOR WAKU TOPICS NOT LIBP2P ONES - if subscription.topics.len > 0 and not subscription.topics.containsMatch(msg.topicIDs): + if subscription.topics.len > 0 and topic notin subscription.topics: continue - subscription.handler(msg) + subscription.handler(topic, msg) diff --git a/waku/protocol/v2/waku_filter.nim b/waku/protocol/v2/waku_filter.nim index 5cfc818ab..781e07078 100644 --- a/waku/protocol/v2/waku_filter.nim +++ b/waku/protocol/v2/waku_filter.nim @@ -4,18 +4,18 @@ import libp2p/protocols/pubsub/pubsubpeer, libp2p/protocols/pubsub/floodsub, libp2p/protocols/pubsub/gossipsub, - libp2p/protocols/pubsub/rpc/[messages, protobuf], libp2p/protocols/protocol, libp2p/protobuf/minprotobuf, libp2p/stream/connection, - ./message_notifier + ./message_notifier, + ./../../node/v2/waku_types # NOTE This is just a start, the design of this protocol isn't done yet. It # should be direct payload exchange (a la req-resp), not be coupled with the # relay protocol. const - WakuFilterCodec* = "/vac/waku/filter/2.0.0-alpha4" + WakuFilterCodec* = "/vac/waku/filter/2.0.0-alpha5" type ContentFilter* = object @@ -26,7 +26,7 @@ type topic*: string MessagePush* = object - messages*: seq[Message] + messages*: seq[WakuMessage] Subscriber = object connection: Connection @@ -75,7 +75,7 @@ proc encode*(push: MessagePush): ProtoBuffer = result = initProtoBuffer() for push in push.messages: - result.write(1, push.encodeMessage()) + result.write(1, push.encode()) proc init*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] = var push = MessagePush() @@ -85,7 +85,7 @@ proc init*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] = discard ? pb.getRepeatedField(1, messages) for buf in messages: - push.messages.add(? protobuf.decodeMessage(initProtoBuffer(buf))) + push.messages.add(? WakuMessage.init(buf)) ok(push) @@ -111,11 +111,14 @@ proc init*(T: type WakuFilter): T = proc subscription*(proto: WakuFilter): MessageNotificationSubscription = ## Returns a Filter for the specific protocol - ## This filter can then be used to send messages to subscribers that match conditions. - proc handle(msg: Message) = + ## This filter can then be used to send messages to subscribers that match conditions. + proc handle(topic: string, msg: WakuMessage) = for subscriber in proto.subscribers: - if subscriber.filter.topic in msg.topicIDs: - # @TODO PROBABLY WANT TO BATCH MESSAGES + if subscriber.filter.topic != topic: + continue + + for filter in subscriber.filter.contentFilter: + if msg.contentTopic in filter.topics: discard subscriber.connection.writeLp(MessagePush(messages: @[msg]).encode().buffer) break diff --git a/waku/protocol/v2/waku_store.nim b/waku/protocol/v2/waku_store.nim index 2845f832d..a57f71481 100644 --- a/waku/protocol/v2/waku_store.nim +++ b/waku/protocol/v2/waku_store.nim @@ -1,14 +1,14 @@ import std/tables, chronos, chronicles, metrics, stew/results, - libp2p/protocols/pubsub/rpc/[messages, protobuf], libp2p/protocols/protocol, libp2p/protobuf/minprotobuf, libp2p/stream/connection, - ./message_notifier + ./message_notifier, + ./../../node/v2/waku_types const - WakuStoreCodec* = "/vac/waku/store/2.0.0-alpha4" + WakuStoreCodec* = "/vac/waku/store/2.0.0-alpha5" type HistoryQuery* = object @@ -17,10 +17,10 @@ type HistoryResponse* = object uuid*: string - messages*: seq[Message] + messages*: seq[WakuMessage] WakuStore* = ref object of LPProtocol - messages*: seq[Message] + messages*: seq[WakuMessage] proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] = var msg = HistoryQuery() @@ -44,7 +44,7 @@ proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] = discard ? pb.getRepeatedField(2, messages) for buf in messages: - msg.messages.add(? protobuf.decodeMessage(initProtoBuffer(buf))) + msg.messages.add(? WakuMessage.init(buf)) ok(msg) @@ -62,15 +62,13 @@ proc encode*(response: HistoryResponse): ProtoBuffer = result.write(1, response.uuid) for msg in response.messages: - result.write(2, msg.encodeMessage()) + result.write(2, msg.encode()) proc query(w: WakuStore, query: HistoryQuery): HistoryResponse = - result = HistoryResponse(uuid: query.uuid, messages: newSeq[Message]()) + result = HistoryResponse(uuid: query.uuid, messages: newSeq[WakuMessage]()) for msg in w.messages: - for topic in query.topics: - if topic in msg.topicIDs: - result.messages.insert(msg) - break + if msg.contentTopic in query.topics: + result.messages.insert(msg) proc init*(T: type WakuStore): T = var ws = WakuStore() @@ -96,7 +94,7 @@ proc subscription*(proto: WakuStore): MessageNotificationSubscription = ## This is used to pipe messages into the storage, therefore ## the filter should be used by the component that receives ## new messages. - proc handle(msg: Message) = + proc handle(topic: string, msg: WakuMessage) = proto.messages.add(msg) MessageNotificationSubscription.init(@[], handle)