fix/message-notifier-message-to-waku-message (#159)

* update message type

* cleanup
This commit is contained in:
Dean Eigenmann 2020-09-16 04:59:10 +02:00 committed by GitHub
parent 8ba3db1a7e
commit dfee0359af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 40 additions and 38 deletions

View File

@ -7,11 +7,11 @@ import
libp2p/protobuf/minprotobuf, libp2p/protobuf/minprotobuf,
libp2p/stream/[bufferstream, connection], libp2p/stream/[bufferstream, connection],
libp2p/crypto/crypto, libp2p/crypto/crypto,
libp2p/protocols/pubsub/rpc/[message, messages, protobuf],
libp2p/multistream, libp2p/multistream,
libp2p/transports/transport, libp2p/transports/transport,
libp2p/transports/tcptransport, libp2p/transports/tcptransport,
../../waku/protocol/v2/[waku_relay, waku_filter, message_notifier], ../../waku/protocol/v2/[waku_relay, waku_filter, message_notifier],
../../waku/node/v2/waku_types,
../test_helpers, ./utils ../test_helpers, ./utils
procSuite "Waku Filter": procSuite "Waku Filter":
@ -26,8 +26,8 @@ procSuite "Waku Filter":
let let
peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
msg = Message.init(peer, @[byte 1, 2, 3], "topic", 3, false) msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "pew")
msg2 = Message.init(peer, @[byte 1, 2, 3], "topic2", 4, false) msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "pew2")
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let remoteSecKey = PrivateKey.random(ECDSA, rng[]).get() let remoteSecKey = PrivateKey.random(ECDSA, rng[]).get()
@ -51,14 +51,14 @@ procSuite "Waku Filter":
let transport2: TcpTransport = TcpTransport.init() let transport2: TcpTransport = TcpTransport.init()
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
var rpc = FilterRequest(contentFilter: @[ContentFilter(topics: @[])], topic: "topic") var rpc = FilterRequest(contentFilter: @[waku_filter.ContentFilter(topics: @["pew", "pew2"])], topic: "topic")
discard await msDial.select(conn, WakuFilterCodec) discard await msDial.select(conn, WakuFilterCodec)
await conn.writeLP(rpc.encode().buffer) await conn.writeLP(rpc.encode().buffer)
await sleepAsync(2.seconds) await sleepAsync(2.seconds)
subscriptions.notify(msg) subscriptions.notify("topic", msg)
subscriptions.notify(msg2) subscriptions.notify("topic", msg2)
var message = await conn.readLp(64*1024) var message = await conn.readLp(64*1024)

View File

@ -12,6 +12,7 @@ import
libp2p/transports/transport, libp2p/transports/transport,
libp2p/transports/tcptransport, libp2p/transports/tcptransport,
../../waku/protocol/v2/[waku_store, message_notifier], ../../waku/protocol/v2/[waku_store, message_notifier],
../../waku/node/v2/waku_types,
../test_helpers, ./utils ../test_helpers, ./utils
procSuite "Waku Store": procSuite "Waku Store":
@ -25,11 +26,11 @@ procSuite "Waku Store":
let let
peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
msg = Message.init(peer, @[byte 1, 2, 3], "topic", 3, false) msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "topic")
msg2 = Message.init(peer, @[byte 1, 2, 3], "topic2", 4, false) msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "topic2")
subscriptions.notify(msg) subscriptions.notify("foo", msg)
subscriptions.notify(msg2) subscriptions.notify("foo", msg2)
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let remoteSecKey = PrivateKey.random(ECDSA, rng[]).get() let remoteSecKey = PrivateKey.random(ECDSA, rng[]).get()

View File

@ -1,6 +1,6 @@
import import
std/tables, std/tables,
libp2p/protocols/pubsub/rpc/messages ./../../node/v2/waku_types
# The Message Notification system is a method to notify various protocols # The Message Notification system is a method to notify various protocols
# running on a node when a new message was received. # running on a node when a new message was received.
@ -9,7 +9,7 @@ import
# The notification handler function will be called. # The notification handler function will be called.
type type
MessageNotificationHandler* = proc(msg: Message) {.gcsafe, closure.} MessageNotificationHandler* = proc(topic: string, msg: WakuMessage) {.gcsafe, closure.}
MessageNotificationSubscription* = object MessageNotificationSubscription* = object
topics: seq[string] # @TODO TOPIC topics: seq[string] # @TODO TOPIC
@ -33,10 +33,10 @@ proc containsMatch(lhs: seq[string], rhs: seq[string]): bool =
return false return false
proc notify*(subscriptions: var MessageNotificationSubscriptions, msg: Message) {.gcsafe.} = proc notify*(subscriptions: var MessageNotificationSubscriptions, topic: string, msg: WakuMessage) {.gcsafe.} =
for subscription in subscriptions.mvalues: for subscription in subscriptions.mvalues:
# @TODO WILL NEED TO CHECK SUBTOPICS IN FUTURE FOR WAKU TOPICS NOT LIBP2P ONES # @TODO WILL NEED TO CHECK SUBTOPICS IN FUTURE FOR WAKU TOPICS NOT LIBP2P ONES
if subscription.topics.len > 0 and not subscription.topics.containsMatch(msg.topicIDs): if subscription.topics.len > 0 and topic notin subscription.topics:
continue continue
subscription.handler(msg) subscription.handler(topic, msg)

View File

@ -4,18 +4,18 @@ import
libp2p/protocols/pubsub/pubsubpeer, libp2p/protocols/pubsub/pubsubpeer,
libp2p/protocols/pubsub/floodsub, libp2p/protocols/pubsub/floodsub,
libp2p/protocols/pubsub/gossipsub, libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/pubsub/rpc/[messages, protobuf],
libp2p/protocols/protocol, libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf, libp2p/protobuf/minprotobuf,
libp2p/stream/connection, libp2p/stream/connection,
./message_notifier ./message_notifier,
./../../node/v2/waku_types
# NOTE This is just a start, the design of this protocol isn't done yet. It # NOTE This is just a start, the design of this protocol isn't done yet. It
# should be direct payload exchange (a la req-resp), not be coupled with the # should be direct payload exchange (a la req-resp), not be coupled with the
# relay protocol. # relay protocol.
const const
WakuFilterCodec* = "/vac/waku/filter/2.0.0-alpha4" WakuFilterCodec* = "/vac/waku/filter/2.0.0-alpha5"
type type
ContentFilter* = object ContentFilter* = object
@ -26,7 +26,7 @@ type
topic*: string topic*: string
MessagePush* = object MessagePush* = object
messages*: seq[Message] messages*: seq[WakuMessage]
Subscriber = object Subscriber = object
connection: Connection connection: Connection
@ -75,7 +75,7 @@ proc encode*(push: MessagePush): ProtoBuffer =
result = initProtoBuffer() result = initProtoBuffer()
for push in push.messages: for push in push.messages:
result.write(1, push.encodeMessage()) result.write(1, push.encode())
proc init*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] = proc init*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] =
var push = MessagePush() var push = MessagePush()
@ -85,7 +85,7 @@ proc init*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] =
discard ? pb.getRepeatedField(1, messages) discard ? pb.getRepeatedField(1, messages)
for buf in messages: for buf in messages:
push.messages.add(? protobuf.decodeMessage(initProtoBuffer(buf))) push.messages.add(? WakuMessage.init(buf))
ok(push) ok(push)
@ -111,11 +111,14 @@ proc init*(T: type WakuFilter): T =
proc subscription*(proto: WakuFilter): MessageNotificationSubscription = proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
## Returns a Filter for the specific protocol ## Returns a Filter for the specific protocol
## This filter can then be used to send messages to subscribers that match conditions. ## This filter can then be used to send messages to subscribers that match conditions.
proc handle(msg: Message) = proc handle(topic: string, msg: WakuMessage) =
for subscriber in proto.subscribers: for subscriber in proto.subscribers:
if subscriber.filter.topic in msg.topicIDs: if subscriber.filter.topic != topic:
# @TODO PROBABLY WANT TO BATCH MESSAGES continue
for filter in subscriber.filter.contentFilter:
if msg.contentTopic in filter.topics:
discard subscriber.connection.writeLp(MessagePush(messages: @[msg]).encode().buffer) discard subscriber.connection.writeLp(MessagePush(messages: @[msg]).encode().buffer)
break break

View File

@ -1,14 +1,14 @@
import import
std/tables, std/tables,
chronos, chronicles, metrics, stew/results, chronos, chronicles, metrics, stew/results,
libp2p/protocols/pubsub/rpc/[messages, protobuf],
libp2p/protocols/protocol, libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf, libp2p/protobuf/minprotobuf,
libp2p/stream/connection, libp2p/stream/connection,
./message_notifier ./message_notifier,
./../../node/v2/waku_types
const const
WakuStoreCodec* = "/vac/waku/store/2.0.0-alpha4" WakuStoreCodec* = "/vac/waku/store/2.0.0-alpha5"
type type
HistoryQuery* = object HistoryQuery* = object
@ -17,10 +17,10 @@ type
HistoryResponse* = object HistoryResponse* = object
uuid*: string uuid*: string
messages*: seq[Message] messages*: seq[WakuMessage]
WakuStore* = ref object of LPProtocol WakuStore* = ref object of LPProtocol
messages*: seq[Message] messages*: seq[WakuMessage]
proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] = proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
var msg = HistoryQuery() var msg = HistoryQuery()
@ -44,7 +44,7 @@ proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] =
discard ? pb.getRepeatedField(2, messages) discard ? pb.getRepeatedField(2, messages)
for buf in messages: for buf in messages:
msg.messages.add(? protobuf.decodeMessage(initProtoBuffer(buf))) msg.messages.add(? WakuMessage.init(buf))
ok(msg) ok(msg)
@ -62,15 +62,13 @@ proc encode*(response: HistoryResponse): ProtoBuffer =
result.write(1, response.uuid) result.write(1, response.uuid)
for msg in response.messages: for msg in response.messages:
result.write(2, msg.encodeMessage()) result.write(2, msg.encode())
proc query(w: WakuStore, query: HistoryQuery): HistoryResponse = proc query(w: WakuStore, query: HistoryQuery): HistoryResponse =
result = HistoryResponse(uuid: query.uuid, messages: newSeq[Message]()) result = HistoryResponse(uuid: query.uuid, messages: newSeq[WakuMessage]())
for msg in w.messages: for msg in w.messages:
for topic in query.topics: if msg.contentTopic in query.topics:
if topic in msg.topicIDs: result.messages.insert(msg)
result.messages.insert(msg)
break
proc init*(T: type WakuStore): T = proc init*(T: type WakuStore): T =
var ws = WakuStore() var ws = WakuStore()
@ -96,7 +94,7 @@ proc subscription*(proto: WakuStore): MessageNotificationSubscription =
## This is used to pipe messages into the storage, therefore ## This is used to pipe messages into the storage, therefore
## the filter should be used by the component that receives ## the filter should be used by the component that receives
## new messages. ## new messages.
proc handle(msg: Message) = proc handle(topic: string, msg: WakuMessage) =
proto.messages.add(msg) proto.messages.add(msg)
MessageNotificationSubscription.init(@[], handle) MessageNotificationSubscription.init(@[], handle)