Move filter.notify to WakuRelay handler and add test (#128)

Kim De Mey 2020-09-02 05:15:25 +02:00 committed by GitHub
parent 7a2bbdff11
commit 2717209f05
5 changed files with 123 additions and 66 deletions
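
In short: content-filter dispatch moves out of WakuRelay (which now only sees raw bytes) and into the TopicHandler that is registered with the relay at the WakuNode/test level. A rough sketch of the resulting flow, not part of the diff, assuming a `node: WakuNode` and a `pubSubTopic` as used in the test below:

# Old: WakuRelay.rpcHandler owned the filters and called w.filters.notify(msg).
# New: the relay handler registered by the node decodes the WakuMessage and
# notifies the filters that now live on WakuNode.
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
  let msg = WakuMessage.init(data)
  if msg.isOk():
    node.filters.notify(msg[])

node.subscribe(pubSubTopic, relayHandler)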

View File

@@ -5,25 +5,101 @@ import
  chronicles, chronos, stew/shims/net as stewNet, stew/byteutils,
  libp2p/crypto/crypto,
  libp2p/crypto/secp,
  libp2p/switch,
  eth/keys,
  ../../waku/protocol/v2/waku_relay,
  ../../waku/node/v2/[wakunode2, waku_types],
  ../test_helpers
procSuite "WakuNode":
let rng = keys.newRng()
asyncTest "Message published with content filter is retrievable":
let
rng = keys.newRng()
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"),
Port(60000))
pubSubTopic = "chat"
contentTopic = "foobar"
contentFilter = ContentFilter(topics: @[contentTopic])
message = WakuMessage(payload: "hello world".toBytes(),
contentTopic: contentTopic)
# This could/should become a more fixed handler (at least default) that
# would be enforced on WakuNode level.
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data)
if msg.isOk():
check:
topic == "chat"
node.filters.notify(msg[])
var completionFut = newFuture[bool]()
# This would be the actual application handler
proc contentHandler(message: seq[byte]) {.gcsafe, closure.} =
let msg = string.fromBytes(message)
check:
msg == "hello world"
completionFut.complete(true)
await node.start()
# Subscribe our node to the pubSubTopic where all chat data go onto.
node.subscribe(pubSubTopic, relayHandler)
# Subscribe a contentFilter to trigger a specific application handler when
# WakuMessages with that content are received
node.subscribe(contentFilter, contentHandler)
node.publish(pubSubTopic, message)
check:
(await completionFut.withTimeout(5.seconds)) == true
await node.stop()
asyncTest "Content filtered publishing over network":
let
topic = "foo"
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
Port(60002))
pubSubTopic = "chat"
contentTopic = "foobar"
wakuMessage = WakuMessage(payload: ("hello world").toBytes,
contentFilter = ContentFilter(topics: @[contentTopic])
message = WakuMessage(payload: "hello world".toBytes(),
contentTopic: contentTopic)
node.publish(topic, wakuMessage)
# TODO: Add actual subscribe part with receival of message
var completionFut = newFuture[bool]()
# This could/should become a more fixed handler (at least default) that
# would be enforced on WakuNode level.
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data)
if msg.isOk():
check:
topic == "chat"
node1.filters.notify(msg[])
# This would be the actual application handler
proc contentHandler1(message: seq[byte]) {.gcsafe, closure.} =
let msg = string.fromBytes(message)
check:
msg == "hello world"
completionFut.complete(true)
await allFutures([node1.start(), node2.start()])
# Subscribe our node to the pubSubTopic where all chat data go onto.
node1.subscribe(pubSubTopic, relayHandler)
# Subscribe a contentFilter to trigger a specific application handler when
# WakuMessages with that content are received
node1.subscribe(contentFilter, contentHandler1)
# Connect peers by dialing from node2 to node1
let conn = await node2.switch.dial(node1.peerInfo, WakuRelayCodec)
node2.publish(pubSubTopic, message)
check:
(await completionFut.withTimeout(5.seconds)) == true
await allFutures([node1.stop(), node2.stop()])
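
Both tests register the same hand-rolled relayHandler, which is what the "more fixed (or at least default) handler" comments point at. A possible sketch of that default, not part of this commit, assuming the current WakuNode.subscribe, TopicHandler and Filters APIs; the name subscribeWithDefault is hypothetical:

# Hypothetical WakuNode-level default: always decode the WakuMessage and notify
# the node's filters, then forward the raw data to an application handler.
proc subscribeWithDefault(node: WakuNode, topic: Topic, appHandler: TopicHandler) =
  proc defaultHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
    let msg = WakuMessage.init(data)
    if msg.isOk():
      node.filters.notify(msg[])
    await appHandler(topic, data)
  node.subscribe(topic, defaultHandler)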

View File

@@ -26,8 +26,8 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) =
    # XXX Why is casting necessary here but not in Nim node API?
    let wakuRelay = cast[WakuRelay](node.switch.pubSub.get())
    # XXX also future return type
    var message = WakuMessage(payload: payload, contentTopic: "foo")
    discard wakuRelay.publish(topic, message)
    # TODO: Shouldn't we really be doing WakuNode publish here?
    discard wakuRelay.publish(topic, payload)
    return true
    #if not result:
    #  raise newException(ValueError, "Message could not be posted")
@@ -40,6 +40,7 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) =
    proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
      info "Hit subscribe handler", topic=topic, data=data
    # TODO: Shouldn't we really be doing WakuNode subscribe here?
    discard wakuRelay.subscribe(topic, handler)
    return true
    #if not result:
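
The two TODOs above suggest routing these RPC handlers through the WakuNode API instead of casting to WakuRelay. A rough sketch under that assumption; the endpoint names and `do` signatures mirror the surrounding file and are not part of the diff:

# Hypothetical follow-up: let the RPC layer call WakuNode directly.
rpcsrv.rpc("waku_publish") do(topic: string, payload: seq[byte]) -> bool:
  node.publish(topic, WakuMessage(payload: payload, contentTopic: "foo"))
  return true

rpcsrv.rpc("waku_subscribe") do(topic: string) -> bool:
  proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
    info "Hit subscribe handler", topic=topic, data=data
  node.subscribe(topic, handler)
  return true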

View File

@@ -3,6 +3,7 @@
## TODO Move more common data types here
import
  std/tables,
  chronos,
  libp2p/[switch, peerinfo, multiaddress, crypto/crypto],
  libp2p/protobuf/minprotobuf
@@ -13,6 +14,20 @@ type
  Topic* = string
  Message* = seq[byte]

  # TODO: These filter structures can be simplified, but are kept like this for
  # now to match the Node API.
  # Also, they should be reused in the filter/wakufilter code, but that causes
  # cyclic imports right now.
  ContentFilter* = object
    topics*: seq[string]

  ContentFilterHandler* = proc(message: seq[byte]) {.gcsafe, closure.}

  Filter* = object
    contentFilter*: ContentFilter
    handler*: ContentFilterHandler

  Filters* = Table[string, Filter]

  # NOTE based on Eth2Node in NBC eth2_network.nim
  WakuNode* = ref object of RootObj
    switch*: Switch
@@ -20,6 +35,7 @@ type
    libp2pTransportLoops*: seq[Future[void]]
    # TODO: Revisit messages field indexing, as well as whether this should be Message or WakuMessage
    messages*: seq[(Topic, WakuMessage)]
    filters*: Filters

  WakuMessage* = object
    payload*: seq[byte]
@@ -41,3 +57,11 @@ proc encode*(message: WakuMessage): ProtoBuffer =
  result.write(1, message.payload)
  result.write(2, message.contentTopic)

proc notify*(filters: Filters, msg: WakuMessage) =
  for filter in filters.values:
    # TODO: If a filter has no topics we should either trigger it here for all
    # messages, or not allow such a filter to exist in the first place.
    if filter.contentFilter.topics.len > 0:
      if msg.contentTopic in filter.contentFilter.topics:
        filter.handler(msg.payload)
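
A minimal usage sketch of the new Filters table and notify proc, not part of the diff, assuming the waku_types definitions above are in scope. Only filters whose topics contain the message's contentTopic fire; per the TODO, a filter with an empty topics seq is currently never triggered.

import std/tables, stew/byteutils

let chatHandler: ContentFilterHandler =
  proc(payload: seq[byte]) {.gcsafe, closure.} =
    echo "filtered payload: ", string.fromBytes(payload)

var filters: Filters = initTable[string, Filter]()
filters.add("chat", Filter(contentFilter: ContentFilter(topics: @["foobar"]),
  handler: chatHandler))

# Only the first message matches the filter's topics, so only it reaches chatHandler.
filters.notify(WakuMessage(payload: "hi".toBytes(), contentTopic: "foobar"))
filters.notify(WakuMessage(payload: "hi".toBytes(), contentTopic: "other"))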

View File

@@ -1,5 +1,5 @@
import
  std/[strutils, options],
  std/[strutils, options, tables],
  chronos, confutils, json_rpc/rpcserver, metrics, stew/shims/net as stewNet,
  # TODO: Why do we need eth keys?
  eth/keys,
@@ -24,8 +24,6 @@ type
  # TODO: Get rid of this and use the waku_types one
  Topic* = waku_types.Topic
  Message* = seq[byte]
  ContentFilter* = object
    contentTopic*: string

  HistoryQuery* = object
    topics*: seq[string]
@@ -142,10 +140,11 @@ proc start*(node: WakuNode) {.async.} =
  ## XXX: this should be /ip4..., / stripped?
  info "Listening on", full = listenStr

# NOTE TopicHandler is defined in pubsub.nim, roughly:
#type TopicHandler* = proc(topic: string, data: seq[byte])

proc stop*(node: WakuNode) {.async.} =
  let wakuRelay = node.switch.pubSub.get()
  await wakuRelay.stop()
type ContentFilterHandler* = proc(contentFilter: ContentFilter, message: Message)
  await node.switch.stop()

proc subscribe*(w: WakuNode, topic: Topic, handler: TopicHandler) =
  ## Subscribes to a PubSub topic. Triggers handler when receiving messages on
@@ -159,14 +158,13 @@ proc subscribe*(w: WakuNode, topic: Topic, handler: TopicHandler) =
  discard wakuRelay.subscribe(topic, handler)

proc subscribe*(w: WakuNode, contentFilter: ContentFilter, handler: ContentFilterHandler) =
  echo "NYI"
  ## Subscribes to a ContentFilter. Triggers the handler when receiving messages on
  ## this content filter. ContentFilter is a method that takes some content
  ## filter, specifically with `ContentTopic`, and a `Message`. The `Message`
  ## has to match the `ContentTopic`.
  ## Status: Not yet implemented.
  ## TODO Implement as a wrapper around `waku_filter` and `subscribe` above.

  # TODO: Get some random id, or use the Filter directly as key
  w.filters.add("some random id", Filter(contentFilter: contentFilter, handler: handler))
proc unsubscribe*(w: WakuNode, topic: Topic) =
  echo "NYI"
@@ -198,8 +196,11 @@ proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) =
  # Commenting out as it is later expected to be Message type, not WakuMessage
  #node.messages.insert((topic, message))
  debug "publish", topic=topic, contentTopic=message.contentTopic
  let data = message.encode().buffer

  # XXX Consider awaiting here
  discard wakuRelay.publish(topic, message)
  discard wakuRelay.publish(topic, data)

proc query*(w: WakuNode, query: HistoryQuery): HistoryResponse =
  ## Queries for historical messages.

View File

@@ -22,7 +22,6 @@ const WakuRelayCodec* = "/vac/waku/relay/2.0.0-alpha2"
type
  WakuRelay* = ref object of GossipSub
    gossipEnabled*: bool
    filters*: Filters

method init*(w: WakuRelay) =
  debug "init"
@@ -37,7 +36,6 @@ method init*(w: WakuRelay) =
  # XXX: Handler hijack GossipSub here?
  w.handler = handler
  w.filters = initTable[string, Filter]()
  w.codec = WakuRelayCodec

method initPubSub*(w: WakuRelay) =
@@ -62,59 +60,16 @@ method subscribe*(w: WakuRelay,
  else:
    await procCall FloodSub(w).subscribe(topic, handler)

method subscribeTopic*(w: WakuRelay,
                       topic: string,
                       subscribe: bool,
                       peerId: string) {.async, gcsafe.} =
  proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
    info "Hit NOOP handler", topic

  # Currently we are using the libp2p topic here.
  # This will need to be changed to a Waku topic.
  debug "subscribeTopic", topic=topic, subscribe=subscribe, peerId=peerId

  if w.gossipEnabled:
    await procCall GossipSub(w).subscribeTopic(topic, subscribe, peerId)
  else:
    await procCall FloodSub(w).subscribeTopic(topic, subscribe, peerId)

  # XXX: This should distinguish light nodes, etc.
  # NOTE: Relay subscription
  # TODO: If peer is a light node
  info "about to call subscribe"
  await w.subscribe(topic, handler)
method rpcHandler*(w: WakuRelay,
                   peer: PubSubPeer,
                   rpcMsgs: seq[RPCMsg]) {.async.} =
  debug "rpcHandler"

  # XXX: Right place?
  total_messages.inc()

  if w.gossipEnabled:
    await procCall GossipSub(w).rpcHandler(peer, rpcMsgs)
  else:
    await procCall FloodSub(w).rpcHandler(peer, rpcMsgs)

  # XXX: here
  for rpcs in rpcMsgs:
    for msg in rpcs.messages:
      w.filters.notify(msg)
method publish*(w: WakuRelay,
                topic: string,
                message: WakuMessage
                message: seq[byte]
               ): Future[int] {.async.} =
  debug "publish", topic=topic, contentTopic=message.contentTopic
  let data = message.encode().buffer
  debug "publish", topic=topic

  if w.gossipEnabled:
    return await procCall GossipSub(w).publish(topic, data)
    return await procCall GossipSub(w).publish(topic, message)
  else:
    return await procCall FloodSub(w).publish(topic, data)
    return await procCall FloodSub(w).publish(topic, message)

method unsubscribe*(w: WakuRelay,
                    topics: seq[TopicPair]) {.async.} =
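
Since WakuRelay.publish now takes raw bytes, the encode/decode happens at the WakuNode boundary. A round-trip sketch, not part of the diff, using only calls that appear above and assuming the waku_types and stew/byteutils imports from the test file:

let msg = WakuMessage(payload: "hello world".toBytes(), contentTopic: "foobar")
let data = msg.encode().buffer          # what WakuNode.publish hands to WakuRelay
let decoded = WakuMessage.init(data)    # what the node-level relay handler does on receipt
assert decoded.isOk() and decoded[].contentTopic == "foobar"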