feat(filter): split waku filter protocol into server and client

Lorenzo Delgado, 2022-11-02 11:59:58 +01:00 (committed by GitHub)
parent 1c46b61402
commit 8fee1b9bed
15 changed files with 741 additions and 960 deletions
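
At a glance, this commit replaces the single mounted WakuFilter (which previously served both the serving and the subscribing role) with a server-side WakuFilter and a client-side WakuFilterClient, and changes the node-level subscribe/unsubscribe calls from FilterRequest objects to plain pubsub-topic and content-topic arguments. Below is a minimal sketch of the new light-client flow, assembled from the example-app and node-setup call sites in this commit; the import paths, the node value and the filternode string are assumptions for illustration, not part of the changeset.

import chronos
import
  ../../waku/v2/node/waku_node,        # WakuNode, mountFilterClient, setFilterPeer, subscribe
  ../../waku/v2/protocol/waku_message, # WakuMessage, ContentTopic
  ../../waku/v2/utils/peers            # parseRemotePeerInfo

proc setupFilterClient(node: WakuNode, filternode: string) {.async.} =
  # Light node: mount only the filter client and point it at a remote filter node.
  # A serving node would call `await node.mountFilter()` instead.
  await node.mountFilterClient()
  node.setFilterPeer(parseRemotePeerInfo(filternode))

  # Push handlers now receive the pubsub topic alongside the message.
  proc filterHandler(pubsubTopic: string, msg: WakuMessage) {.gcsafe.} =
    echo "message pushed on ", pubsubTopic, " (content topic: ", msg.contentTopic, ")"

  await node.subscribe(pubsubTopic="/waku/2/default-waku/proto",
                       contentTopics=ContentTopic("/waku/2/default-content/proto"),
                       filterHandler)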

View File

@@ -326,9 +326,7 @@ proc writeAndPrint(c: Chat) {.async.} =
if not c.node.wakuFilter.isNil():
echo "unsubscribing from content filters..."
await c.node.unsubscribe(
FilterRequest(contentFilters: @[ContentFilter(contentTopic: c.contentTopic)], pubSubTopic: DefaultTopic, subscribe: false)
)
await c.node.unsubscribe(pubsubTopic=DefaultTopic, contentTopics=c.contentTopic)
echo "quitting..."
@@ -499,18 +497,16 @@ proc processInput(rfd: AsyncFD) {.async.} =
if conf.filternode != "":
await node.mountFilter()
await node.mountFilterClient()
node.wakuFilter.setPeer(parseRemotePeerInfo(conf.filternode))
node.setFilterPeer(parseRemotePeerInfo(conf.filternode))
proc filterHandler(msg: WakuMessage) {.gcsafe.} =
proc filterHandler(pubsubTopic: string, msg: WakuMessage) {.gcsafe.} =
trace "Hit filter handler", contentTopic=msg.contentTopic
chat.printReceivedMessage(msg)
await node.subscribe(
FilterRequest(contentFilters: @[ContentFilter(contentTopic: chat.contentTopic)], pubSubTopic: DefaultTopic, subscribe: true),
filterHandler
)
await node.subscribe(pubsubTopic=DefaultTopic, contentTopics=chat.contentTopic, filterHandler)
# Subscribe to a topic, if relay is mounted
if conf.relay:

View File

@@ -418,17 +418,18 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
return err("failed to set node waku lightpush peer: " & getCurrentExceptionMsg())
# Filter setup. NOTE Must be mounted after relay
if (conf.filternode != "") or (conf.filter):
if conf.filter:
try:
await mountFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout))
except:
return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg())
if conf.filternode != "":
try:
setFilterPeer(node, conf.filternode)
except:
return err("failed to set node waku filter peer: " & getCurrentExceptionMsg())
if conf.filternode != "":
try:
await mountFilterClient(node)
setFilterPeer(node, conf.filternode)
except:
return err("failed to set node waku filter peer: " & getCurrentExceptionMsg())
# waku peer exchange setup
if (conf.peerExchangeNode != "") or (conf.peerExchange):
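
Reassembled for readability, the filter portion of this hunk resolves to the sketch below; this assumes the new code is exactly the conf.filter and conf.filternode branches shown above, with the same error strings.

# Filter setup. NOTE Must be mounted after relay
if conf.filter:
  try:
    await mountFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout))
  except:
    return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg())

if conf.filternode != "":
  try:
    await mountFilterClient(node)
    setFilterPeer(node, conf.filternode)
  except:
    return err("failed to set node waku filter peer: " & getCurrentExceptionMsg())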

View File

@@ -5,7 +5,7 @@ import
chronicles,
testutils/unittests, stew/shims/net as stewNet,
json_rpc/[rpcserver, rpcclient],
eth/[keys, rlp], eth/common/eth_types,
eth/keys, eth/common/eth_types,
libp2p/[builders, switch, multiaddress],
libp2p/protobuf/minprotobuf,
libp2p/stream/[bufferstream, connection],
@@ -27,6 +27,7 @@ import
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_swap/waku_swap,
../../waku/v2/protocol/waku_filter,
../../waku/v2/protocol/waku_filter/client,
../../waku/v2/utils/peers,
../../waku/v2/utils/time,
./testlib/common
@@ -51,7 +52,7 @@ procSuite "Waku v2 JSON-RPC API":
# RPC server setup
let
rpcPort = Port(8545)
rpcPort = Port(8546)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
@@ -78,7 +79,7 @@ procSuite "Waku v2 JSON-RPC API":
# RPC server setup
let
rpcPort = Port(8545)
rpcPort = Port(8547)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
@@ -150,7 +151,7 @@ procSuite "Waku v2 JSON-RPC API":
# RPC server setup
let
rpcPort = Port(8545)
rpcPort = Port(8548)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
@@ -164,7 +165,7 @@ procSuite "Waku v2 JSON-RPC API":
# First see if we can retrieve messages published on the default topic (node is already subscribed)
await node2.publish(DefaultPubsubTopic, message1)
await sleepAsync(2000.millis)
await sleepAsync(100.millis)
var messages = await client.get_waku_v2_relay_v1_messages(DefaultPubsubTopic)
@@ -182,7 +183,7 @@ procSuite "Waku v2 JSON-RPC API":
var response = await client.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic])
await sleepAsync(2000.millis)
await sleepAsync(100.millis)
check:
# Node is now subscribed to pubSubTopic
@@ -191,7 +192,7 @@ procSuite "Waku v2 JSON-RPC API":
# Now publish a message on node1 and see if we receive it on node3
await node1.publish(pubSubTopic, message2)
await sleepAsync(2000.millis)
await sleepAsync(100.millis)
messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic)
@@ -219,7 +220,7 @@ procSuite "Waku v2 JSON-RPC API":
# RPC server setup
let
rpcPort = Port(8545)
rpcPort = Port(8549)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
@@ -273,19 +274,26 @@ procSuite "Waku v2 JSON-RPC API":
await node.stop()
asyncTest "Filter API: subscribe/unsubscribe":
await node.start()
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, bindIp, Port(60390))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, bindIp, Port(60392))
await node.mountRelay()
await allFutures(node1.start(), node2.start())
await node.mountFilter()
await node1.mountFilter()
await node2.mountFilterClient()
node2.setFilterPeer(node1.peerInfo.toRemotePeerInfo())
# RPC server setup
let
rpcPort = Port(8545)
rpcPort = Port(8550)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
installFilterApiHandlers(node, server, newTable[ContentTopic, seq[WakuMessage]]())
installFilterApiHandlers(node2, server, newTable[ContentTopic, seq[WakuMessage]]())
server.start()
let client = newRpcHttpClient()
@@ -293,109 +301,31 @@ procSuite "Waku v2 JSON-RPC API":
check:
# Light node has not yet subscribed to any filters
node.filters.len() == 0
node2.wakuFilterClient.getSubscriptionsCount() == 0
let contentFilters = @[ContentFilter(contentTopic: DefaultContentTopic),
ContentFilter(contentTopic: ContentTopic("2")),
ContentFilter(contentTopic: ContentTopic("3")),
ContentFilter(contentTopic: ContentTopic("4")),
]
var response = await client.post_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(DefaultPubsubTopic))
let contentFilters = @[
ContentFilter(contentTopic: DefaultContentTopic),
ContentFilter(contentTopic: ContentTopic("2")),
ContentFilter(contentTopic: ContentTopic("3")),
ContentFilter(contentTopic: ContentTopic("4")),
]
var response = await client.post_waku_v2_filter_v1_subscription(contentFilters=contentFilters, topic=some(DefaultPubsubTopic))
check:
# Light node has successfully subscribed to a single filter
node.filters.len() == 1
response == true
# Light node has successfully subscribed to 4 content topics
node2.wakuFilterClient.getSubscriptionsCount() == 4
response = await client.delete_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(DefaultPubsubTopic))
response = await client.delete_waku_v2_filter_v1_subscription(contentFilters=contentFilters, topic=some(DefaultPubsubTopic))
check:
response == true
# Light node has successfully unsubscribed from all filters
node.filters.len() == 0
response == true
node2.wakuFilterClient.getSubscriptionsCount() == 0
## Cleanup
await server.stop()
await server.closeWait()
await node.stop()
asyncTest "Filter API: get latest messages":
await node.start()
# RPC server setup
let
rpcPort = Port(8545)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
installFilterApiHandlers(node, server, newTable[ContentTopic, seq[WakuMessage]]())
server.start()
await node.mountFilter()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
# First ensure subscription exists
let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(contentTopic: DefaultContentTopic)], topic = some(DefaultPubsubTopic))
check:
sub
# Now prime the node with some messages before tests
var
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))]
let
filters = node.filters
requestId = toSeq(Table(filters).keys)[0]
for wakuMsg in msgList:
filters.notify(wakuMsg, requestId)
var response = await client.get_waku_v2_filter_v1_messages(DefaultContentTopic)
check:
response.len() == 8
response.allIt(it.contentTopic == DefaultContentTopic)
# No new messages
response = await client.get_waku_v2_filter_v1_messages(DefaultContentTopic)
check:
response.len() == 0
# Now ensure that no more than the preset max messages can be cached
let maxSize = filter_api.maxCache
for x in 1..(maxSize + 1):
# Try to cache 1 more than maximum allowed
filters.notify(WakuMessage(payload: @[byte x], contentTopic: DefaultContentTopic), requestId)
await sleepAsync(2000.millis)
response = await client.get_waku_v2_filter_v1_messages(DefaultContentTopic)
check:
# Max messages has not been exceeded
response.len == maxSize
response.allIt(it.contentTopic == DefaultContentTopic)
# Check that oldest item has been removed
response[0].payload == @[byte 2]
response[maxSize - 1].payload == @[byte (maxSize + 1)]
await server.stop()
await server.closeWait()
await node.stop()
await allFutures(node1.stop(), node2.stop())
asyncTest "Admin API: connect to ad-hoc peers":
# Create a couple of nodes
@@ -417,7 +347,7 @@ procSuite "Waku v2 JSON-RPC API":
# RPC server setup
let
rpcPort = Port(8545)
rpcPort = Port(8551)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
@@ -458,10 +388,10 @@ procSuite "Waku v2 JSON-RPC API":
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60220))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60222))
peerInfo2 = node2.switch.peerInfo
peerInfo2 = node2.peerInfo
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60224))
peerInfo3 = node3.switch.peerInfo
peerInfo3 = node3.peerInfo
await allFutures([node1.start(), node2.start(), node3.start()])
@@ -475,7 +405,7 @@ procSuite "Waku v2 JSON-RPC API":
# RPC server setup
let
rpcPort = Port(8545)
rpcPort = Port(8552)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
@@ -510,7 +440,7 @@ procSuite "Waku v2 JSON-RPC API":
# RPC server setup
let
rpcPort = Port(8545)
rpcPort = Port(8553)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
@@ -521,6 +451,7 @@ procSuite "Waku v2 JSON-RPC API":
await client.connect("127.0.0.1", rpcPort, false)
await node.mountFilter()
await node.mountFilterClient()
await node.mountSwap()
let store = StoreQueueRef.new()
await node.mountStore(store=store)
@@ -539,13 +470,13 @@ procSuite "Waku v2 JSON-RPC API":
storeKey = crypto.PrivateKey.random(ECDSA, rng[]).get()
storePeer = PeerInfo.new(storeKey, @[locationAddr])
node.wakuFilter.setPeer(filterPeer.toRemotePeerInfo())
node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo())
node.setStorePeer(storePeer.toRemotePeerInfo())
node.setFilterPeer(filterPeer.toRemotePeerInfo())
let response = await client.get_waku_v2_admin_v1_peers()
## Then
check:
response.len == 3
# Check filter peer
@@ -555,6 +486,7 @@ procSuite "Waku v2 JSON-RPC API":
# Check store peer
(response.filterIt(it.protocol == WakuStoreCodec)[0]).multiaddr == constructMultiaddrStr(storePeer)
## Cleanup
await server.stop()
await server.closeWait()
@@ -588,10 +520,10 @@ procSuite "Waku v2 JSON-RPC API":
# Setup two servers so we can see both sides of encrypted communication
let
rpcPort1 = Port(8545)
rpcPort1 = Port(8554)
ta1 = initTAddress(bindIp, rpcPort1)
server1 = newRpcHttpServer([ta1])
rpcPort3 = Port(8546)
rpcPort3 = Port(8555)
ta3 = initTAddress(bindIp, rpcPort3)
server3 = newRpcHttpServer([ta3])
@@ -616,7 +548,7 @@ procSuite "Waku v2 JSON-RPC API":
let sub = await client3.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic])
await sleepAsync(2000.millis)
await sleepAsync(100.millis)
check:
# node3 is now subscribed to pubSubTopic
@@ -627,7 +559,7 @@ procSuite "Waku v2 JSON-RPC API":
check:
posted
await sleepAsync(2000.millis)
await sleepAsync(100.millis)
# Let's see if we can receive, and decrypt, this message on node3
var messages = await client3.get_waku_v2_private_v1_asymmetric_messages(pubSubTopic, privateKey = (%keypair.seckey).getStr())
@@ -679,10 +611,10 @@ procSuite "Waku v2 JSON-RPC API":
# Setup two servers so we can see both sides of encrypted communication
let
rpcPort1 = Port(8545)
rpcPort1 = Port(8556)
ta1 = initTAddress(bindIp, rpcPort1)
server1 = newRpcHttpServer([ta1])
rpcPort3 = Port(8546)
rpcPort3 = Port(8557)
ta3 = initTAddress(bindIp, rpcPort3)
server3 = newRpcHttpServer([ta3])
@@ -707,7 +639,7 @@ procSuite "Waku v2 JSON-RPC API":
let sub = await client3.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic])
await sleepAsync(2000.millis)
await sleepAsync(100.millis)
check:
# node3 is now subscribed to pubSubTopic
@@ -718,7 +650,7 @@ procSuite "Waku v2 JSON-RPC API":
check:
posted
await sleepAsync(2000.millis)
await sleepAsync(100.millis)
# Let's see if we can receive, and decrypt, this message on node3
var messages = await client3.get_waku_v2_private_v1_symmetric_messages(pubSubTopic, symkey = (%symkey).getStr())

View File

@@ -19,7 +19,6 @@ import
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/node/storage/peer/waku_peer_storage,
../../waku/v2/node/waku_node,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_relay,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_filter,
@@ -30,11 +29,9 @@ procSuite "Peer Manager":
asyncTest "Peer dialing works":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
Port(60000))
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60800))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
Port(60002))
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60802))
peerInfo2 = node2.switch.peerInfo
await allFutures([node1.start(), node2.start()])
@@ -63,11 +60,9 @@ procSuite "Peer Manager":
asyncTest "Dialing fails gracefully":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
Port(60000))
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60810))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
Port(60002))
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60812))
peerInfo2 = node2.switch.peerInfo
await node1.start()
@@ -88,8 +83,7 @@ procSuite "Peer Manager":
asyncTest "Adding, selecting and filtering peers work":
let
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"),
Port(60000))
node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60820))
# Create filter peer
filterLoc = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet()
filterKey = crypto.PrivateKey.random(ECDSA, rng[]).get()
@@ -105,14 +99,14 @@ procSuite "Peer Manager":
await node.start()
await node.mountFilter()
await node.mountFilterClient()
await node.mountSwap()
node.mountStoreClient()
node.wakuFilter.setPeer(filterPeer.toRemotePeerInfo())
node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo())
node.setStorePeer(storePeer.toRemotePeerInfo())
node.setFilterPeer(filterPeer.toRemotePeerInfo())
# Check peers were successfully added to peer manager
check:
@@ -133,11 +127,9 @@ procSuite "Peer Manager":
asyncTest "Peer manager keeps track of connections":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
Port(60000))
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60830))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
Port(60002))
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60832))
peerInfo2 = node2.switch.peerInfo
await node1.start()
@@ -178,11 +170,9 @@ procSuite "Peer Manager":
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
Port(60000), peerStorage = storage)
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60840), peerStorage = storage)
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
Port(60002))
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60842))
peerInfo2 = node2.switch.peerInfo
await node1.start()
@@ -201,8 +191,7 @@ procSuite "Peer Manager":
# Simulate restart by initialising a new node using the same storage
let
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"),
Port(60004), peerStorage = storage)
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60844), peerStorage = storage)
await node3.start()
check:
@@ -226,11 +215,9 @@ procSuite "Peer Manager":
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
Port(60000), peerStorage = storage)
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60850), peerStorage = storage)
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
Port(60002))
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60852))
peerInfo2 = node2.switch.peerInfo
betaCodec = "/vac/waku/relay/2.0.0-beta2"
stableCodec = "/vac/waku/relay/2.0.0"
@@ -254,8 +241,7 @@ procSuite "Peer Manager":
# Simulate restart by initialising a new node using the same storage
let
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"),
Port(60004), peerStorage = storage)
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60854), peerStorage = storage)
await node3.mountRelay()
node3.wakuRelay.codec = stableCodec

View File

@@ -5,193 +5,171 @@ import
testutils/unittests,
chronos,
chronicles,
libp2p/crypto/crypto,
libp2p/multistream
libp2p/crypto/crypto
import
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_filter,
../test_helpers,
../../waku/v2/protocol/waku_filter/client,
./utils,
./testlib/common,
./testlib/switch
const dummyHandler = proc(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} = discard
proc newTestWakuFilterNode(switch: Switch, timeout: Duration = 2.hours): Future[WakuFilter] {.async.} =
let
peerManager = PeerManager.new(switch)
rng = crypto.newRng()
proto = WakuFilter.new(peerManager, rng, timeout)
await proto.start()
switch.mount(proto)
return proto
proc newTestWakuFilterClient(switch: Switch): Future[WakuFilterClient] {.async.} =
let
peerManager = PeerManager.new(switch)
rng = crypto.newRng()
proto = WakuFilterClient.new(peerManager, rng)
await proto.start()
switch.mount(proto)
return proto
# TODO: Extend test coverage
procSuite "Waku Filter":
suite "Waku Filter":
asyncTest "should forward messages to client after subscribed":
## Setup
let rng = crypto.newRng()
let
clientSwitch = newTestSwitch()
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuFilterNode(serverSwitch)
client = await newTestWakuFilterClient(clientSwitch)
## Given
# Server
let
serverPeerManager = PeerManager.new(serverSwitch)
serverProto = WakuFilter.init(serverPeerManager, rng, dummyHandler)
await serverProto.start()
serverSwitch.mount(serverProto)
let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo()
let pushHandlerFuture = newFuture[(string, WakuMessage)]()
proc pushHandler(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.} =
pushHandlerFuture.complete((pubsubTopic, message))
# Client
let handlerFuture = newFuture[(string, MessagePush)]()
proc handler(requestId: string, push: MessagePush) {.async, gcsafe, closure.} =
handlerFuture.complete((requestId, push))
let
clientPeerManager = PeerManager.new(clientSwitch)
clientProto = WakuFilter.init(clientPeerManager, rng, handler)
await clientProto.start()
clientSwitch.mount(clientProto)
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
let
pubsubTopic = DefaultPubsubTopic
contentTopic = "test-content-topic"
msg = fakeWakuMessage(contentTopic=contentTopic)
## When
let resSubscription = await clientProto.subscribe(DefaultPubsubTopic, @[DefaultContentTopic])
require resSubscription.isOk()
require (await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer=serverAddr)).isOk()
# WARN: Sleep necessary to avoid a race condition between the subscription and the handle message proc
await sleepAsync(5.milliseconds)
let message = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic)
await serverProto.handleMessage(DefaultPubsubTopic, message)
await server.handleMessage(pubsubTopic, msg)
require await pushHandlerFuture.withTimeout(5.seconds)
## Then
let subscriptionRequestId = resSubscription.get()
let (requestId, push) = await handlerFuture
let (pushedMsgPubsubTopic, pushedMsg) = pushHandlerFuture.read()
check:
requestId == subscriptionRequestId
push.messages == @[message]
pushedMsgPubsubTopic == pubsubTopic
pushedMsg == msg
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
asyncTest "should not forward messages to client after unsuscribed":
## Setup
let rng = crypto.newRng()
let
clientSwitch = newTestSwitch()
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuFilterNode(serverSwitch)
client = await newTestWakuFilterClient(clientSwitch)
## Given
# Server
let
serverPeerManager = PeerManager.new(serverSwitch)
serverProto = WakuFilter.init(serverPeerManager, rng, dummyHandler)
await serverProto.start()
serverSwitch.mount(serverProto)
let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo()
# Client
var handlerFuture = newFuture[void]()
proc handler(requestId: string, push: MessagePush) {.async, gcsafe, closure.} =
handlerFuture.complete()
var pushHandlerFuture = newFuture[void]()
proc pushHandler(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.} =
pushHandlerFuture.complete()
let
clientPeerManager = PeerManager.new(clientSwitch)
clientProto = WakuFilter.init(clientPeerManager, rng, handler)
await clientProto.start()
clientSwitch.mount(clientProto)
let
pubsubTopic = DefaultPubsubTopic
contentTopic = "test-content-topic"
msg = fakeWakuMessage(contentTopic=contentTopic)
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
## Given
let message = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic)
let resSubscription = await clientProto.subscribe(DefaultPubsubTopic, @[DefaultContentTopic])
require resSubscription.isOk()
## When
require (await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer=serverAddr)).isOk()
# WARN: Sleep necessary to avoid a race condition between the subscription and the handle message proc
await sleepAsync(5.milliseconds)
await serverProto.handleMessage(DefaultPubsubTopic, message)
let handlerWasCalledAfterSubscription = await handlerFuture.withTimeout(1.seconds)
require handlerWasCalledAfterSubscription
await server.handleMessage(pubsubTopic, msg)
require await pushHandlerFuture.withTimeout(1.seconds)
# Reset to test unsubscribe
handlerFuture = newFuture[void]()
pushHandlerFuture = newFuture[void]()
let resUnsubscription = await clientProto.unsubscribe(DefaultPubsubTopic, @[DefaultContentTopic])
require resUnsubscription.isOk()
require (await client.unsubscribe(pubsubTopic, contentTopic, peer=serverAddr)).isOk()
# WARN: Sleep necessary to avoid a race condition between the unsubscription and the handle message proc
await sleepAsync(5.milliseconds)
await serverProto.handleMessage(DefaultPubsubTopic, message)
await server.handleMessage(pubsubTopic, msg)
## Then
let handlerWasCalledAfterUnsubscription = await handlerFuture.withTimeout(1.seconds)
let handlerWasCalledAfterUnsubscription = await pushHandlerFuture.withTimeout(1.seconds)
check:
not handlerWasCalledAfterUnsubscription
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
asyncTest "subscription should fail if no filter peer is provided":
## Setup
let clientSwitch = newTestSwitch()
await clientSwitch.start()
## Given
let clientProto = WakuFilter.init(PeerManager.new(clientSwitch), crypto.newRng(), dummyHandler)
await clientProto.start()
clientSwitch.mount(clientProto)
## When
let resSubscription = await clientProto.subscribe(DefaultPubsubTopic, @[DefaultContentTopic])
## Then
check:
resSubscription.isErr()
resSubscription.error() == "peer_not_found_failure"
asyncTest "peer subscription should be dropped if connection fails for second time after the timeout has elapsed":
## Setup
let rng = crypto.newRng()
let
clientSwitch = newTestSwitch()
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuFilterNode(serverSwitch, timeout=200.milliseconds)
client = await newTestWakuFilterClient(clientSwitch)
## Given
# Server
let
serverPeerManager = PeerManager.new(serverSwitch)
serverProto = WakuFilter.init(serverPeerManager, rng, dummyHandler, timeout=1.seconds)
await serverProto.start()
serverSwitch.mount(serverProto)
let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo()
# Client
var handlerFuture = newFuture[void]()
proc handler(requestId: string, push: MessagePush) {.async, gcsafe, closure.} =
handlerFuture.complete()
var pushHandlerFuture = newFuture[void]()
proc pushHandler(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.} =
pushHandlerFuture.complete()
let
clientPeerManager = PeerManager.new(clientSwitch)
clientProto = WakuFilter.init(clientPeerManager, rng, handler)
await clientProto.start()
clientSwitch.mount(clientProto)
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
let
pubsubTopic = DefaultPubsubTopic
contentTopic = "test-content-topic"
msg = fakeWakuMessage(contentTopic=contentTopic)
## When
let resSubscription = await clientProto.subscribe(DefaultPubsubTopic, @[DefaultContentTopic])
check resSubscription.isOk()
require (await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer=serverAddr)).isOk()
# WARN: Sleep necessary to avoid a race condition between the subscription and the handle message proc
await sleepAsync(5.milliseconds)
let message = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic)
await serverProto.handleMessage(DefaultPubsubTopic, message)
let handlerShouldHaveBeenCalled = await handlerFuture.withTimeout(1.seconds)
require handlerShouldHaveBeenCalled
await server.handleMessage(DefaultPubsubTopic, msg)
# Push handler should be called
require await pushHandlerFuture.withTimeout(1.seconds)
# Stop client node to test timeout unsubscription
await clientSwitch.stop()
@@ -199,19 +177,19 @@ procSuite "Waku Filter":
await sleepAsync(5.milliseconds)
# First failure should not remove the subscription
await serverProto.handleMessage(DefaultPubsubTopic, message)
await server.handleMessage(DefaultPubsubTopic, msg)
let
subscriptionsBeforeTimeout = serverProto.subscriptions.len()
failedPeersBeforeTimeout = serverProto.failedPeers.len()
subscriptionsBeforeTimeout = server.subscriptions.len()
failedPeersBeforeTimeout = server.failedPeers.len()
# Wait for peer connection failure timeout to elapse
await sleepAsync(1.seconds)
# Wait for the configured peer connection timeout to elapse (200ms)
await sleepAsync(200.milliseconds)
#Second failure should remove the subscription
await serverProto.handleMessage(DefaultPubsubTopic, message)
# Second failure should remove the subscription
await server.handleMessage(DefaultPubsubTopic, msg)
let
subscriptionsAfterTimeout = serverProto.subscriptions.len()
failedPeersAfterTimeout = serverProto.failedPeers.len()
subscriptionsAfterTimeout = server.subscriptions.len()
failedPeersAfterTimeout = server.failedPeers.len()
## Then
check:
@@ -222,55 +200,45 @@ procSuite "Waku Filter":
## Cleanup
await serverSwitch.stop()
asyncTest "peer subscription should not be dropped if connection recovers before timeout elapses":
## Setup
let
clientKey = PrivateKey.random(ECDSA, rng[]).get()
clientAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/65000").get()
let rng = crypto.newRng()
let
clientSwitch = newTestSwitch(some(clientKey), some(clientAddress))
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuFilterNode(serverSwitch, timeout=200.milliseconds)
client = await newTestWakuFilterClient(clientSwitch)
## Given
# Server
let
serverPeerManager = PeerManager.new(serverSwitch)
serverProto = WakuFilter.init(serverPeerManager, rng, dummyHandler, timeout=2.seconds)
await serverProto.start()
serverSwitch.mount(serverProto)
let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo()
# Client
var handlerFuture = newFuture[void]()
proc handler(requestId: string, push: MessagePush) {.async, gcsafe, closure.} =
handlerFuture.complete()
var pushHandlerFuture = newFuture[void]()
proc pushHandler(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.} =
pushHandlerFuture.complete()
let
clientPeerManager = PeerManager.new(clientSwitch)
clientProto = WakuFilter.init(clientPeerManager, rng, handler)
await clientProto.start()
clientSwitch.mount(clientProto)
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
let
pubsubTopic = DefaultPubsubTopic
contentTopic = "test-content-topic"
msg = fakeWakuMessage(contentTopic=contentTopic)
## When
let message = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic)
let resSubscription = await clientProto.subscribe(DefaultPubsubTopic, @[DefaultContentTopic])
check resSubscription.isOk()
require (await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer=serverAddr)).isOk()
# WARN: Sleep necessary to avoid a race condition between the subscription and the handle message proc
await sleepAsync(5.milliseconds)
await serverProto.handleMessage(DefaultPubsubTopic, message)
handlerFuture = newFuture[void]()
await server.handleMessage(DefaultPubsubTopic, msg)
# Push handler should be called
require await pushHandlerFuture.withTimeout(1.seconds)
let
subscriptionsBeforeFailure = serverProto.subscriptions.len()
failedPeersBeforeFailure = serverProto.failedPeers.len()
subscriptionsBeforeFailure = server.subscriptions.len()
failedPeersBeforeFailure = server.failedPeers.len()
# Stop switch to test unsubscribe
await clientSwitch.stop()
@@ -278,29 +246,33 @@ procSuite "Waku Filter":
await sleepAsync(5.milliseconds)
# First failure should add to failure list
await serverProto.handleMessage(DefaultPubsubTopic, message)
handlerFuture = newFuture[void]()
await server.handleMessage(DefaultPubsubTopic, msg)
pushHandlerFuture = newFuture[void]()
let
subscriptionsAfterFailure = serverProto.subscriptions.len()
failedPeersAfterFailure = serverProto.failedPeers.len()
subscriptionsAfterFailure = server.subscriptions.len()
failedPeersAfterFailure = server.failedPeers.len()
await sleepAsync(100.milliseconds)
await sleepAsync(250.milliseconds)
# Start switch with same key as before
var clientSwitch2 = newTestSwitch(some(clientKey), some(clientAddress))
let clientSwitch2 = newTestSwitch(
some(clientSwitch.peerInfo.privateKey),
some(clientSwitch.peerInfo.addrs[0])
)
await clientSwitch2.start()
await clientProto.start()
clientSwitch2.mount(clientProto)
await client.start()
clientSwitch2.mount(client)
# If push succeeds after failure, the peer should be removed from the failed peers list
await serverProto.handleMessage(DefaultPubsubTopic, message)
let handlerShouldHaveBeenCalled = await handlerFuture.withTimeout(1.seconds)
let
subscriptionsAfterSuccessfulConnection = serverProto.subscriptions.len()
failedPeersAfterSuccessfulConnection = serverProto.failedPeers.len()
await server.handleMessage(DefaultPubsubTopic, msg)
let handlerShouldHaveBeenCalled = await pushHandlerFuture.withTimeout(1.seconds)
let
subscriptionsAfterSuccessfulConnection = server.subscriptions.len()
failedPeersAfterSuccessfulConnection = server.failedPeers.len()
## Then
check:
handlerShouldHaveBeenCalled
@@ -314,6 +286,6 @@ procSuite "Waku Filter":
failedPeersBeforeFailure == 0
failedPeersAfterFailure == 1
failedPeersAfterSuccessfulConnection == 0
## Cleanup
await allFutures(clientSwitch2.stop(), serverSwitch.stop())
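
Condensed, the test wiring above reduces to the sketch below, as it would appear inside an asyncTest body; newTestWakuFilterNode and newTestWakuFilterClient are the helpers defined at the top of this file, newTestSwitch comes from testlib/switch and fakeWakuMessage from testlib/common.

# Sketch: server and client protocol instances mounted on separate switches.
let
  serverSwitch = newTestSwitch()
  clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())

let
  server = await newTestWakuFilterNode(serverSwitch)
  client = await newTestWakuFilterClient(clientSwitch)
  serverAddr = serverSwitch.peerInfo.toRemotePeerInfo()

# Client-side push handler: receives the pubsub topic and the pushed message.
proc pushHandler(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.} =
  discard

# The client subscribes against an explicit server peer...
require (await client.subscribe(DefaultPubsubTopic, DefaultContentTopic, pushHandler, peer=serverAddr)).isOk()

# ...and the server pushes every matching message it handles.
await server.handleMessage(DefaultPubsubTopic, fakeWakuMessage(contentTopic=DefaultContentTopic))

await allFutures(clientSwitch.stop(), serverSwitch.stop())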

View File

@@ -6,292 +6,60 @@ import
testutils/unittests,
chronicles,
chronos,
libp2p/crypto/crypto,
libp2p/peerid,
libp2p/multiaddress,
libp2p/switch,
libp2p/protocols/pubsub/rpc/messages,
libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/gossipsub,
eth/keys
libp2p/crypto/crypto
import
../../waku/v2/protocol/[waku_relay, waku_message],
../../waku/v2/protocol/waku_filter,
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/node/waku_node,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_filter,
../../waku/v2/utils/peers,
../../waku/v2/node/waku_node
./testlib/common
procSuite "WakuNode - Filter":
let rng = keys.newRng()
suite "WakuNode - Filter":
asyncTest "Message published with content filter is retrievable":
asyncTest "subscriber should receive the message handled by the publisher":
## Setup
let rng = crypto.newRng()
let
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"),
Port(60000))
pubSubTopic = "chat"
contentTopic = ContentTopic("/waku/2/default-content/proto")
filterRequest = FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true)
message = WakuMessage(payload: "hello world".toBytes(),
contentTopic: contentTopic)
serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60110))
clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60111))
# This could/should become a more fixed handler (at least default) that
# would be enforced on WakuNode level.
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data)
if msg.isOk():
check:
topic == "chat"
node.filters.notify(msg.value(), topic)
await allFutures(server.start(), client.start())
var completionFut = newFuture[bool]()
await server.mountFilter()
await client.mountFilterClient()
# This would be the actual application handler
proc contentHandler(msg: WakuMessage) {.gcsafe, closure.} =
let message = string.fromBytes(msg.payload)
check:
message == "hello world"
completionFut.complete(true)
## Given
let serverPeerInfo = server.peerInfo.toRemotePeerInfo()
await node.start()
await node.mountRelay()
# Subscribe our node to the pubSubTopic where all chat data go onto.
node.subscribe(pubSubTopic, relayHandler)
# Subscribe a contentFilter to trigger a specific application handler when
# WakuMessages with that content are received
await node.subscribe(filterRequest, contentHandler)
await sleepAsync(2000.millis)
await node.publish(pubSubTopic, message)
check:
(await completionFut.withTimeout(5.seconds)) == true
await node.stop()
asyncTest "Content filtered publishing over network":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
let
pubSubTopic = "chat"
contentTopic = ContentTopic("/waku/2/default-content/proto")
filterRequest = FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true)
message = WakuMessage(payload: "hello world".toBytes(),
contentTopic: contentTopic)
pubSubTopic = DefaultPubsubTopic
contentTopic = DefaultContentTopic
message = fakeWakuMessage(contentTopic=contentTopic)
var completionFut = newFuture[bool]()
var filterPushHandlerFut = newFuture[(PubsubTopic, WakuMessage)]()
proc filterPushHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe, closure.} =
filterPushHandlerFut.complete((pubsubTopic, msg))
# This could/should become a more fixed handler (at least default) that
# would be enforced on WakuNode level.
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data)
if msg.isOk():
check:
topic == "chat"
node1.filters.notify(msg.value(), topic)
## When
await client.filterSubscribe(pubsubTopic, contentTopic, filterPushHandler, peer=serverPeerInfo)
# This would be the actual application handler
proc contentHandler(msg: WakuMessage) {.gcsafe, closure.} =
let message = string.fromBytes(msg.payload)
check:
message == "hello world"
completionFut.complete(true)
# Wait for subscription to take effect
await sleepAsync(100.millis)
await allFutures([node1.start(), node2.start()])
await server.filterHandleMessage(pubSubTopic, message)
await node1.mountRelay()
await node2.mountRelay()
await node1.mountFilter()
await node2.mountFilter()
# Subscribe our node to the pubSubTopic where all chat data go onto.
node1.subscribe(pubSubTopic, relayHandler)
# Subscribe a contentFilter to trigger a specific application handler when
# WakuMessages with that content are received
node1.wakuFilter.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
await node1.subscribe(filterRequest, contentHandler)
await sleepAsync(2000.millis)
# Connect peers by dialing from node2 to node1
discard await node2.switch.dial(node1.switch.peerInfo.peerId, node1.switch.peerInfo.addrs, WakuRelayCodec)
# We need to sleep to allow the subscription to go through
info "Going to sleep to allow subscribe to go through"
await sleepAsync(2000.millis)
info "Waking up and publishing"
await node2.publish(pubSubTopic, message)
require await filterPushHandlerFut.withTimeout(5.seconds)
## Then
check filterPushHandlerFut.completed()
let (filterPubsubTopic, filterMessage) = filterPushHandlerFut.read()
check:
(await completionFut.withTimeout(5.seconds)) == true
await node1.stop()
await node2.stop()
asyncTest "Can receive filtered messages published on both default and other topics":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
defaultTopic = "/waku/2/default-waku/proto"
otherTopic = "/non/waku/formatted"
defaultContentTopic = "defaultCT"
otherContentTopic = "otherCT"
defaultPayload = @[byte 1]
otherPayload = @[byte 9]
defaultMessage = WakuMessage(payload: defaultPayload, contentTopic: defaultContentTopic)
otherMessage = WakuMessage(payload: otherPayload, contentTopic: otherContentTopic)
defaultFR = FilterRequest(contentFilters: @[ContentFilter(contentTopic: defaultContentTopic)], subscribe: true)
otherFR = FilterRequest(contentFilters: @[ContentFilter(contentTopic: otherContentTopic)], subscribe: true)
filterPubsubTopic == pubsubTopic
filterMessage == message
await node1.start()
await node1.mountRelay()
await node1.mountFilter()
await node2.start()
await node2.mountRelay()
await node2.mountFilter()
node2.wakuFilter.setPeer(node1.switch.peerInfo.toRemotePeerInfo())
var defaultComplete = newFuture[bool]()
var otherComplete = newFuture[bool]()
# Subscribe nodes 1 and 2 to otherTopic
proc emptyHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
# Do not notify filters or subscriptions here. This should be default behaviour for all topics
discard
node1.subscribe(otherTopic, emptyHandler)
node2.subscribe(otherTopic, emptyHandler)
await sleepAsync(2000.millis)
proc defaultHandler(msg: WakuMessage) {.gcsafe, closure.} =
check:
msg.payload == defaultPayload
msg.contentTopic == defaultContentTopic
defaultComplete.complete(true)
proc otherHandler(msg: WakuMessage) {.gcsafe, closure.} =
check:
msg.payload == otherPayload
msg.contentTopic == otherContentTopic
otherComplete.complete(true)
# Subscribe a contentFilter to trigger a specific application handler when
# WakuMessages with that content are received
await node2.subscribe(defaultFR, defaultHandler)
await sleepAsync(2000.millis)
# Let's check that content filtering works on the default topic
await node1.publish(defaultTopic, defaultMessage)
check:
(await defaultComplete.withTimeout(5.seconds)) == true
# Now check that content filtering works on other topics
await node2.subscribe(otherFR, otherHandler)
await sleepAsync(2000.millis)
await node1.publish(otherTopic,otherMessage)
check:
(await otherComplete.withTimeout(5.seconds)) == true
await node1.stop()
await node2.stop()
asyncTest "Filter protocol works on node without relay capability":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
defaultTopic = "/waku/2/default-waku/proto"
contentTopic = "defaultCT"
payload = @[byte 1]
message = WakuMessage(payload: payload, contentTopic: contentTopic)
filterRequest = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true)
await node1.start()
await node1.mountRelay()
await node1.mountFilter()
await node2.start()
await node2.mountFilter()
node2.wakuFilter.setPeer(node1.switch.peerInfo.toRemotePeerInfo())
check:
node1.wakuRelay.isNil == false # Node1 is a full node
node2.wakuRelay.isNil == true # Node 2 is a light node
var completeFut = newFuture[bool]()
proc filterHandler(msg: WakuMessage) {.gcsafe, closure.} =
check:
msg.payload == payload
msg.contentTopic == contentTopic
completeFut.complete(true)
# Subscribe a contentFilter to trigger a specific application handler when
# WakuMessages with that content are received
await node2.subscribe(filterRequest, filterHandler)
await sleepAsync(2000.millis)
# Let's check that content filtering works on the default topic
await node1.publish(defaultTopic, message)
check:
(await completeFut.withTimeout(5.seconds)) == true
await node1.stop()
await node2.stop()
asyncTest "Filter protocol returns expected message":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
Port(60002))
contentTopic = ContentTopic("/waku/2/default-content/proto")
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
var completionFut = newFuture[bool]()
await node1.start()
await node1.mountFilter()
await node2.start()
await node2.mountFilter()
node1.wakuFilter.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
proc handler(msg: WakuMessage) {.gcsafe, closure.} =
check:
msg == message
completionFut.complete(true)
await node1.subscribe(FilterRequest(pubSubTopic: "/waku/2/default-waku/proto", contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true), handler)
await sleepAsync(2000.millis)
await node2.wakuFilter.handleMessage("/waku/2/default-waku/proto", message)
await sleepAsync(2000.millis)
check:
(await completionFut.withTimeout(5.seconds)) == true
await node1.stop()
await node2.stop()
## Cleanup
await allFutures(client.stop(), server.stop())

View File

@@ -39,9 +39,9 @@ procSuite "WakuNode - Store":
## Setup
let
serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002))
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60432))
clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000))
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60430))
await allFutures(client.start(), server.start())
await server.mountStore(store=newTestMessageStore())
@@ -73,34 +73,31 @@ procSuite "WakuNode - Store":
## Setup
let
filterSourceKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
filterSource = WakuNode.new(filterSourceKey, ValidIpAddress.init("0.0.0.0"), Port(60004))
filterSource = WakuNode.new(filterSourceKey, ValidIpAddress.init("0.0.0.0"), Port(60404))
serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002))
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60402))
clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000))
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60400))
await allFutures(client.start(), server.start(), filterSource.start())
await filterSource.mountFilter()
await server.mountStore(store=newTestMessageStore())
await server.mountFilter()
await client.mountStore()
await server.mountFilterClient()
client.mountStoreClient()
server.wakuFilter.setPeer(filterSource.peerInfo.toRemotePeerInfo())
## Given
let message = fakeWakuMessage()
let serverPeer = server.peerInfo.toRemotePeerInfo()
## Then
let filterFut = newFuture[bool]()
proc filterReqHandler(msg: WakuMessage) {.gcsafe, closure.} =
check:
msg == message
filterFut.complete(true)
let
serverPeer = server.peerInfo.toRemotePeerInfo()
filterSourcePeer = filterSource.peerInfo.toRemotePeerInfo()
let filterReq = FilterRequest(pubSubTopic: DefaultPubsubTopic, contentFilters: @[ContentFilter(contentTopic: DefaultContentTopic)], subscribe: true)
await server.subscribe(filterReq, filterReqHandler)
## Then
let filterFut = newFuture[(PubsubTopic, WakuMessage)]()
proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe, closure.} =
filterFut.complete((pubsubTopic, msg))
await server.filterSubscribe(DefaultPubsubTopic, DefaultContentTopic, filterHandler, peer=filterSourcePeer)
await sleepAsync(100.millis)
@@ -108,7 +105,7 @@ procSuite "WakuNode - Store":
await filterSource.wakuFilter.handleMessage(DefaultPubsubTopic, message)
# Wait for the server filter to receive the push message
require (await filterFut.withTimeout(5.seconds))
require await filterFut.withTimeout(5.seconds)
let res = await client.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]), peer=serverPeer)
@@ -120,6 +117,11 @@ procSuite "WakuNode - Store":
response.messages.len == 1
response.messages[0] == message
let (handledPubsubTopic, handledMsg) = filterFut.read()
check:
handledPubsubTopic == DefaultPubsubTopic
handledMsg == message
## Cleanup
await allFutures(client.stop(), server.stop(), filterSource.stop())
@@ -128,9 +130,9 @@ procSuite "WakuNode - Store":
## Setup
let
serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002))
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60412))
clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000))
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60410))
await allFutures(client.start(), server.start())
@@ -161,9 +163,9 @@ procSuite "WakuNode - Store":
## Setup
let
serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002))
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60422))
clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000))
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60420))
await allFutures(server.start(), client.start())
await server.mountStore(store=StoreQueueRef.new())

View File

@@ -1,95 +1,100 @@
{.push raises: [Defect].}
import
std/[tables,sequtils],
std/[tables, sequtils],
chronicles,
json_rpc/rpcserver
import
../../protocol/waku_message,
../../protocol/waku_filter,
../../protocol/waku_filter/client,
../waku_node,
./jsonrpc_types
export jsonrpc_types
logScope:
topics = "filter api"
topics = "wakunode.rpc.filter"
const DefaultPubsubTopic: PubsubTopic = "/waku/2/default-waku/proto"
const futTimeout* = 5.seconds # Max time to wait for futures
const maxCache* = 30 # Max number of messages cached per topic @TODO make this configurable
const maxCache* = 30 # Max number of messages cached per topic TODO: make this configurable
proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer, messageCache: MessageCache) =
proc filterHandler(msg: WakuMessage) {.gcsafe, closure, raises: [Defect].} =
# Add message to current cache
trace "WakuMessage received", msg=msg
# Make a copy of msgs for this topic to modify
var msgs = messageCache.getOrDefault(msg.contentTopic, @[])
if msgs.len >= maxCache:
# Message cache on this topic exceeds maximum. Delete oldest.
# @TODO this may become a bottle neck if called as the norm rather than exception when adding messages. Performance profile needed.
msgs.delete(0,0)
msgs.add(msg)
# Replace indexed entry with copy
# @TODO max number of content topics could be limited in node
messageCache[msg.contentTopic] = msgs
## Filter API version 1 definitions
rpcsrv.rpc("get_waku_v2_filter_v1_messages") do(contentTopic: ContentTopic) -> seq[WakuMessage]:
rpcsrv.rpc("get_waku_v2_filter_v1_messages") do (contentTopic: ContentTopic) -> seq[WakuMessage]:
## Returns all WakuMessages received on a content topic since the
## last time this method was called
## @TODO ability to specify a return message limit
## TODO: ability to specify a return message limit
debug "get_waku_v2_filter_v1_messages", contentTopic=contentTopic
if messageCache.hasKey(contentTopic):
let msgs = messageCache[contentTopic]
# Clear cache before next call
messageCache[contentTopic] = @[]
return msgs
else:
# Not subscribed to this content topic
if not messageCache.hasKey(contentTopic):
raise newException(ValueError, "Not subscribed to content topic: " & $contentTopic)
let msgs = messageCache[contentTopic]
# Clear cache before next call
messageCache[contentTopic] = @[]
return msgs
rpcsrv.rpc("post_waku_v2_filter_v1_subscription") do(contentFilters: seq[ContentFilter], topic: Option[string]) -> bool:
rpcsrv.rpc("post_waku_v2_filter_v1_subscription") do (contentFilters: seq[ContentFilter], topic: Option[string]) -> bool:
## Subscribes a node to a list of content filters
debug "post_waku_v2_filter_v1_subscription"
# Construct a filter request
# @TODO use default PubSub topic if undefined
let fReq = if topic.isSome: FilterRequest(pubSubTopic: topic.get, contentFilters: contentFilters, subscribe: true) else: FilterRequest(contentFilters: contentFilters, subscribe: true)
let
pubsubTopic: string = topic.get(DefaultPubsubTopic)
contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic)
let pushHandler:FilterPushHandler = proc(pubsubTopic: string, msg: WakuMessage) {.gcsafe, closure.} =
# Add message to current cache
trace "WakuMessage received", msg=msg
# Make a copy of msgs for this topic to modify
var msgs = messageCache.getOrDefault(msg.contentTopic, @[])
if msgs.len >= maxCache:
# Message cache on this topic exceeds maximum. Delete oldest.
# TODO: this may become a bottleneck if called as the norm rather than exception when adding messages. Performance profile needed.
msgs.delete(0,0)
msgs.add(msg)
# Replace indexed entry with copy
# TODO: max number of content topics could be limited in node
messageCache[msg.contentTopic] = msgs
let subFut = node.subscribe(pubsubTopic, contentTopics, pushHandler)
if not await subFut.withTimeout(futTimeout):
raise newException(ValueError, "Failed to subscribe to contentFilters")
# Successfully subscribed to all content filters
for cTopic in contentTopics:
# Create message cache for each subscribed content topic
messageCache[cTopic] = @[]
if (await node.subscribe(fReq, filterHandler).withTimeout(futTimeout)):
# Successfully subscribed to all content filters
for cTopic in contentFilters.mapIt(it.contentTopic):
# Create message cache for each subscribed content topic
messageCache[cTopic] = @[]
return true
else:
# Failed to subscribe to one or more content filters
raise newException(ValueError, "Failed to subscribe to contentFilters " & repr(fReq))
return true
rpcsrv.rpc("delete_waku_v2_filter_v1_subscription") do(contentFilters: seq[ContentFilter], topic: Option[string]) -> bool:
## Unsubscribes a node from a list of content filters
debug "delete_waku_v2_filter_v1_subscription"
# Construct a filter request
# @TODO consider using default PubSub topic if undefined
let fReq = if topic.isSome: FilterRequest(pubSubTopic: topic.get, contentFilters: contentFilters, subscribe: false) else: FilterRequest(contentFilters: contentFilters, subscribe: false)
let
pubsubTopic: string = topic.get(DefaultPubsubTopic)
contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic)
if (await node.unsubscribe(fReq).withTimeout(futTimeout)):
# Successfully unsubscribed from all content filters
let unsubFut = node.unsubscribe(pubsubTopic, contentTopics)
for cTopic in contentFilters.mapIt(it.contentTopic):
# Remove message cache for each unsubscribed content topic
messageCache.del(cTopic)
if not await unsubFut.withTimeout(futTimeout):
raise newException(ValueError, "Failed to unsubscribe from contentFilters")
# Successfully unsubscribed from all content filters
for cTopic in contentTopics:
# Remove message cache for each unsubscribed content topic
messageCache.del(cTopic)
return true
else:
# Failed to unsubscribe from one or more content filters
raise newException(ValueError, "Failed to unsubscribe from contentFilters " & repr(fReq))
return true
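
For reference, a JSON-RPC client drives these handlers roughly as in the filter test earlier in this commit; the sketch below assumes the rpcPort value and connection details.

# Sketch: exercising the filter API over JSON-RPC.
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)

let contentFilters = @[ContentFilter(contentTopic: DefaultContentTopic)]

# Subscribe: on success a message cache is created per content topic.
discard await client.post_waku_v2_filter_v1_subscription(contentFilters=contentFilters, topic=some(DefaultPubsubTopic))

# Poll: returns the cached messages for the content topic and clears the cache.
let msgs = await client.get_waku_v2_filter_v1_messages(DefaultContentTopic)

# Unsubscribe: removes the per-content-topic caches.
discard await client.delete_waku_v2_filter_v1_subscription(contentFilters=contentFilters, topic=some(DefaultPubsubTopic))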

View File

@@ -1,14 +1,13 @@
{.push raises: [Defect].}
import
stew/results,
stew/shims/net,
chronicles,
chronos,
metrics,
metrics/chronos_httpserver
import
../protocol/waku_filter,
../protocol/waku_filter/protocol_metrics as filter_metrics,
../protocol/waku_store/protocol_metrics as store_metrics,
../protocol/waku_lightpush/protocol_metrics as lightpush_metrics,
../protocol/waku_swap/waku_swap,

View File

@@ -21,11 +21,13 @@ import
../protocol/waku_store/client,
../protocol/waku_swap/waku_swap,
../protocol/waku_filter,
../protocol/waku_filter/client,
../protocol/waku_lightpush,
../protocol/waku_lightpush/client,
../protocol/waku_rln_relay/waku_rln_relay_types,
../protocol/waku_peer_exchange,
../utils/[peers, requests, wakuenr],
../utils/peers,
../utils/wakuenr,
./peer_manager/peer_manager,
./storage/message/waku_store_queue,
./storage/message/message_retention_policy,
@@ -37,24 +39,26 @@ import
declarePublicGauge waku_version, "Waku version info (in git describe format)", ["version"]
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
declarePublicGauge waku_node_filters, "number of content filter subscriptions"
declarePublicGauge waku_node_errors, "number of wakunode errors", ["type"]
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
declarePublicGauge waku_filter_peers, "number of filter peers"
declarePublicGauge waku_store_peers, "number of store peers"
declarePublicGauge waku_px_peers, "number of peers (in the node's peerManager) supporting the peer exchange protocol"
logScope:
topics = "wakunode"
# Git version in git describe format (defined compile time)
const git_version* {.strdefine.} = "n/a"
# Default clientId
const clientId* = "Nimbus Waku v2 node"
# Default topic
const defaultTopic* = "/waku/2/default-waku/proto"
# TODO: Unify pubsub topic type and default value
type PubsubTopic* = string
const defaultTopic*: PubsubTopic = "/waku/2/default-waku/proto"
# Default Waku Filter Timeout
const WakuFilterTimeout: Duration = 1.days
@@ -63,7 +67,6 @@ const WakuFilterTimeout: Duration = 1.days
# key and crypto modules different
type
# XXX: Weird type, should probably be using pubsub PubsubTopic object name?
PubsubTopic* = string
Message* = seq[byte]
WakuInfo* = object
@@ -80,6 +83,7 @@ type
wakuStore*: WakuStore
wakuStoreClient*: WakuStoreClient
wakuFilter*: WakuFilter
wakuFilterClient*: WakuFilterClient
wakuSwap*: WakuSwap
wakuRlnRelay*: WakuRLNRelay
wakuLightPush*: WakuLightPush
@ -87,7 +91,6 @@ type
wakuPeerExchange*: WakuPeerExchange
enr*: enr.Record
libp2pPing*: Ping
filters*: Filters
rng*: ref rand.HmacDrbgContext
wakuDiscv5*: WakuDiscoveryV5
announcedAddresses* : seq[MultiAddress]
@ -217,7 +220,6 @@ proc new*(T: type WakuNode,
switch: switch,
rng: rng,
enr: enr,
filters: Filters.init(),
announcedAddresses: announcedAddresses
)
@ -410,80 +412,123 @@ proc mountRelay*(node: WakuNode,
## Waku filter
proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.async, raises: [Defect, LPError]} =
info "mounting filter"
proc filterHandler(requestId: string, msg: MessagePush) {.async, gcsafe.} =
info "push received"
for message in msg.messages:
node.filters.notify(message, requestId) # Trigger filter handlers on a light node
info "mounting filter protocol"
node.wakuFilter = WakuFilter.new(node.peerManager, node.rng, filterTimeout)
if not node.wakuStore.isNil and (requestId in node.filters):
let pubSubTopic = node.filters[requestId].pubSubTopic
node.wakuStore.handleMessage(pubSubTopic, message)
waku_node_messages.inc(labelValues = ["filter"])
node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler, filterTimeout)
if node.started:
# Node has started already. Let's start filter too.
await node.wakuFilter.start()
node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterCodec))
proc setFilterPeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError].} =
proc filterHandleMessage*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage) {.async.}=
if node.wakuFilter.isNil():
error "could not set peer, waku filter is nil"
error "cannot handle filter message", error="waku filter is nil"
return
info "Set filter peer", peer=peer
await node.wakuFilter.handleMessage(pubsubTopic, message)
proc mountFilterClient*(node: WakuNode) {.async, raises: [Defect, LPError].} =
info "mounting filter client"
node.wakuFilterClient = WakuFilterClient.new(node.peerManager, node.rng)
if node.started:
# Node has started already. Let's start filter too.
await node.wakuFilterClient.start()
node.switch.mount(node.wakuFilterClient, protocolMatcher(WakuFilterCodec))
proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic],
handler: FilterPushHandler, peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} =
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
if node.wakuFilterClient.isNil():
error "cannot register filter subscription to topic", error="waku filter client is nil"
return
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
else: peer
info "registering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer
# Add handler wrapper to store the message when pushed, when relay is disabled and filter enabled
# TODO: Move this logic to wakunode2 app
let handlerWrapper: FilterPushHandler = proc(pubsubTopic: string, message: WakuMessage) {.raises: [Exception].} =
if node.wakuRelay.isNil() and not node.wakuStore.isNil():
node.wakuStore.handleMessage(pubSubTopic, message)
handler(pubsubTopic, message)
let subRes = await node.wakuFilterClient.subscribe(pubsubTopic, contentTopics, handlerWrapper, peer=remotePeer)
if subRes.isOk():
info "subscribed to topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics
else:
error "failed filter subscription", error=subRes.error
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
proc filterUnsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic],
peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} =
## Unsubscribe from a content filter.
if node.wakuFilterClient.isNil():
error "cannot unregister filter subscription to content", error="waku filter client is nil"
return
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
else: peer
info "deregistering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer
let unsubRes = await node.wakuFilterClient.unsubscribe(pubsubTopic, contentTopics, peer=remotePeer)
if unsubRes.isOk():
info "unsubscribed from topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics
else:
error "failed filter unsubscription", error=unsubRes.error
waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
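# Illustrative usage sketch (not part of this commit): how an application could
# drive the new explicit-peer node API once the filter client is mounted. The
# proc name and the content topic below are hypothetical placeholders.
proc exampleNodeFilterUsage(node: WakuNode, filterPeer: RemotePeerInfo) {.async.} =
  await node.mountFilterClient()
  proc handler(pubsubTopic: string, msg: WakuMessage) {.gcsafe, closure.} =
    info "filtered message", contentTopic=msg.contentTopic
  # Subscribe against an explicit service peer...
  await node.filterSubscribe(defaultTopic, "/example/1/demo/proto", handler, peer=filterPeer)
  # ...and later tear the subscription down against the same peer.
  await node.filterUnsubscribe(defaultTopic, "/example/1/demo/proto", peer=filterPeer)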
# TODO: Move to application module (e.g., wakunode2.nim)
proc setFilterPeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError],
deprecated: "Use the explicit destination peer procedures".} =
if node.wakuFilterClient.isNil():
error "could not set peer, waku filter client is nil"
return
info "seting filter client peer", peer=peer
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
else: peer
node.wakuFilter.setPeer(remotePeer)
node.peerManager.addPeer(remotePeer, WakuFilterCodec)
proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHandler) {.async, gcsafe.} =
waku_filter_peers.inc()
# TODO: Move to application module (e.g., wakunode2.nim)
proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic], handler: FilterPushHandler) {.async, gcsafe,
deprecated: "Use the explicit destination peer procedure. Use 'node.filterSubscribe()' instead.".} =
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
## The handler is a FilterPushHandler that is called with the pubsub topic and each pushed WakuMessage.
if node.wakuFilterClient.isNil():
error "cannot register filter subscription to topic", error="waku filter client is nil"
return
# Sanity check for well-formed subscribe FilterRequest
doAssert(request.subscribe, "invalid subscribe request")
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
if peerOpt.isNone():
error "cannot register filter subscription to topic", error="no suitable remote peers"
return
info "subscribe content", filter=request
await node.filterSubscribe(pubsubTopic, contentTopics, handler, peer=peerOpt.get())
var id = generateRequestId(node.rng)
if not node.wakuFilter.isNil():
let
pubsubTopic = request.pubsubTopic
contentTopics = request.contentFilters.mapIt(it.contentTopic)
let resSubscription = await node.wakuFilter.subscribe(pubsubTopic, contentTopics)
if resSubscription.isOk():
id = resSubscription.get()
else:
# Failed to subscribe
error "remote subscription to filter failed", filter = request
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
# Register handler for filter, whether remote subscription succeeded or not
node.filters.addContentFilters(id, request.pubSubTopic, request.contentFilters, handler)
waku_node_filters.set(node.filters.len.int64)
proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} =
# TODO: Move to application module (e.g., wakunode2.nim)
proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic]) {.async, gcsafe,
deprecated: "Use the explicit destination peer procedure. Use 'node.filterUnsusbscribe()' instead.".} =
## Unsubscribe from a content filter.
if node.wakuFilterClient.isNil():
error "cannot unregister filter subscription to content", error="waku filter client is nil"
return
# Sanity check for well-formed unsubscribe FilterRequest
doAssert(request.subscribe == false, "invalid unsubscribe request")
info "unsubscribe content", filter=request
let
pubsubTopic = request.pubsubTopic
contentTopics = request.contentFilters.mapIt(it.contentTopic)
discard await node.wakuFilter.unsubscribe(pubsubTopic, contentTopics)
node.filters.removeContentFilters(request.contentFilters)
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
if peerOpt.isNone():
error "cannot register filter subscription to topic", error="no suitable remote peers"
return
waku_node_filters.set(node.filters.len.int64)
await node.filterUnsubscribe(pubsubTopic, contentTopics, peer=peerOpt.get())
## Waku swap
@ -503,11 +548,6 @@ proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.as
## Waku store
proc mountStoreClient*(node: WakuNode, store: MessageStore = nil) =
info "mounting store client"
node.wakuStoreClient = WakuStoreClient.new(node.peerManager, node.rng, store)
const MessageStoreDefaultRetentionPolicyInterval* = 30.minutes
proc executeMessageRetentionPolicy*(node: WakuNode) =
@ -557,6 +597,12 @@ proc mountStore*(node: WakuNode, store: MessageStore = nil, retentionPolicy=none
node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec))
proc mountStoreClient*(node: WakuNode, store: MessageStore = nil) =
info "mounting store client"
node.wakuStoreClient = WakuStoreClient.new(node.peerManager, node.rng, store)
proc query*(node: WakuNode, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
## Queries known nodes for historical messages
if node.wakuStoreClient.isNil():
@ -626,12 +672,6 @@ proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[Re
## Waku lightpush
proc mountLightPushClient*(node: WakuNode) =
info "mounting light push client"
node.wakuLightpushClient = WakuLightPushClient.new(node.peerManager, node.rng)
proc mountLightPush*(node: WakuNode) {.async.} =
info "mounting light push"
@ -654,6 +694,12 @@ proc mountLightPush*(node: WakuNode) {.async.} =
node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec))
proc mountLightPushClient*(node: WakuNode) =
info "mounting light push client"
node.wakuLightpushClient = WakuLightPushClient.new(node.peerManager, node.rng)
proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage, peer: RemotePeerInfo): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
## Pushes a `WakuMessage` to a node which relays it further on PubSub topic.
## Returns whether relaying was successful or not.

View File

@ -3,11 +3,9 @@
import
./waku_filter/rpc,
./waku_filter/rpc_codec,
./waku_filter/protocol,
./waku_filter/client
./waku_filter/protocol
export
rpc,
rpc_codec,
protocol,
client
protocol
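# Note (illustrative, not part of this commit): after this split the convenience
# module re-exports only the server side, so code that needs the light-client
# side imports it explicitly, e.g. from node-level code:
#
#   import ../protocol/waku_filter            # server: WakuFilter, WakuFilterCodec
#   import ../protocol/waku_filter/client     # client: WakuFilterClient, FilterPushHandler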

View File

@ -1,69 +1,207 @@
{.push raises: [Defect].}
import
std/[tables, sequtils],
chronicles
std/[options, tables, sequtils],
stew/results,
chronicles,
chronos,
metrics,
bearssl/rand,
libp2p/protocols/protocol as libp2p_protocol
import
../waku_message,
./rpc
type
ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure, raises: [Defect].}
Filter* = object
pubSubTopic*: string
contentFilters*: seq[ContentFilter]
handler*: ContentFilterHandler
Filters* = Table[string, Filter]
../../node/peer_manager/peer_manager,
../../utils/requests,
./rpc,
./rpc_codec,
./protocol,
./protocol_metrics
proc init*(T: type Filters): T =
initTable[string, Filter]()
logScope:
topics = "wakufilter.client"
proc addContentFilters*(filters: var Filters, requestId: string, pubsubTopic: string, contentFilters: seq[ContentFilter], handler: ContentFilterHandler) {.gcsafe.}=
filters[requestId] = Filter(
pubSubTopic: pubsubTopic,
contentFilters: contentFilters,
handler: handler
const DefaultPubsubTopic: string = "/waku/2/default-waku/proto"
### Client, filter subscription manager
type FilterPushHandler* = proc(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.}
## Subscription manager
type SubscriptionManager = object
subscriptions: TableRef[(string, ContentTopic), FilterPushHandler]
proc init(T: type SubscriptionManager): T =
SubscriptionManager(subscriptions: newTable[(string, ContentTopic), FilterPushHandler]())
proc clear(m: var SubscriptionManager) =
m.subscriptions.clear()
proc registerSubscription(m: SubscriptionManager, pubsubTopic: string, contentTopic: ContentTopic, handler: FilterPushHandler) =
try:
m.subscriptions[(pubsubTopic, contentTopic)] = handler
except:
error "failed to register filter subscription", error=getCurrentExceptionMsg()
proc removeSubscription(m: SubscriptionManager, pubsubTopic: string, contentTopic: ContentTopic) =
m.subscriptions.del((pubsubTopic, contentTopic))
proc notifySubscriptionHandler(m: SubscriptionManager, pubsubTopic: string, contentTopic: ContentTopic, message: WakuMessage) =
if not m.subscriptions.hasKey((pubsubTopic, contentTopic)):
return
try:
let handler = m.subscriptions[(pubsubTopic, contentTopic)]
handler(pubsubTopic, message)
except:
discard
proc getSubscriptionsCount(m: SubscriptionManager): int =
m.subscriptions.len()
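# Minimal sketch (illustrative, not part of this commit) of the keyed dispatch
# above: handlers are stored per (pubsub topic, content topic) pair and only a
# matching pair reaches its handler. The topics used here are hypothetical.
when isMainModule:
  let manager = SubscriptionManager.init()
  proc onPush(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.} =
    info "push", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic
  manager.registerSubscription("/waku/2/default-waku/proto", "/example/1/demo/proto", onPush)
  # Delivered: the (pubsub topic, content topic) pair matches the registration.
  manager.notifySubscriptionHandler("/waku/2/default-waku/proto", "/example/1/demo/proto",
                                    WakuMessage(contentTopic: "/example/1/demo/proto"))
  # Dropped silently: no handler is registered for this content topic.
  manager.notifySubscriptionHandler("/waku/2/default-waku/proto", "/other/1/topic/proto",
                                    WakuMessage(contentTopic: "/other/1/topic/proto"))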
## Client
type MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.}
type WakuFilterClient* = ref object of LPProtocol
rng: ref rand.HmacDrbgContext
peerManager: PeerManager
subManager: SubscriptionManager
proc handleMessagePush(wf: WakuFilterClient, peerId: PeerId, requestId: string, rpc: MessagePush) =
for msg in rpc.messages:
let
pubsubTopic = DefaultPubsubTopic # TODO: Extend the filter push RPC to provide the pubsub topic. This is a limitation of the current protocol.
contentTopic = msg.contentTopic
wf.subManager.notifySubscriptionHandler(pubsubTopic, contentTopic, msg)
proc initProtocolHandler(wf: WakuFilterClient) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
let buffer = await conn.readLp(MaxRpcSize.int)
let decodeReqRes = FilterRPC.init(buffer)
if decodeReqRes.isErr():
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
return
let rpc = decodeReqRes.get()
trace "filter message received"
if rpc.push == MessagePush():
waku_filter_errors.inc(labelValues = [emptyMessagePushFailure])
# TODO: Manage the empty push message error. Perform any action?
return
waku_filter_messages.inc(labelValues = ["MessagePush"])
let
peerId = conn.peerId
requestId = rpc.requestId
push = rpc.push
info "received filter message push", peerId=conn.peerId, requestId=requestId
wf.handleMessagePush(peerId, requestId, push)
wf.handler = handle
wf.codec = WakuFilterCodec
proc new*(T: type WakuFilterClient,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext): T =
let wf = WakuFilterClient(
peerManager: peerManager,
rng: rng,
subManager: SubscriptionManager.init()
)
wf.initProtocolHandler()
wf
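# Construction sketch (illustrative, not part of this commit): outside of
# WakuNode the client can be created and mounted on a libp2p switch directly,
# assuming peerManager, rng and switch instances already exist:
#
#   let filterClient = WakuFilterClient.new(peerManager, rng)
#   switch.mount(filterClient, protocolMatcher(WakuFilterCodec))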
proc sendFilterRpc(wf: WakuFilterClient, rpc: FilterRPC, peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}=
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)
if connOpt.isNone():
return err(dialFailure)
let connection = connOpt.get()
await connection.writeLP(rpc.encode().buffer)
return ok()
proc sendFilterRequestRpc(wf: WakuFilterClient,
pubsubTopic: string,
contentTopics: seq[ContentTopic],
subscribe: bool,
peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async.} =
let requestId = generateRequestId(wf.rng)
let contentFilters = contentTopics.mapIt(ContentFilter(contentTopic: it))
let rpc = FilterRPC(
requestId: requestId,
request: FilterRequest(
subscribe: subscribe,
pubSubTopic: pubsubTopic,
contentFilters: contentFilters
)
)
proc removeContentFilters*(filters: var Filters, contentFilters: seq[ContentFilter]) {.gcsafe.} =
# Flatten all unsubscribe topics into single seq
let unsubscribeTopics = contentFilters.mapIt(it.contentTopic)
debug "unsubscribing", unsubscribeTopics=unsubscribeTopics
let sendRes = await wf.sendFilterRpc(rpc, peer)
if sendRes.isErr():
waku_filter_errors.inc(labelValues = [sendRes.error])
return err(sendRes.error)
return ok()
var rIdToRemove: seq[string] = @[]
for rId, f in filters.mpairs:
# Iterate filter entries to remove matching content topics
# make sure we delete the content filter
# if no more topics are left
f.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopic notin unsubscribeTopics)
if f.contentFilters.len == 0:
rIdToRemove.add(rId)
proc subscribe*(wf: WakuFilterClient,
pubsubTopic: string,
contentTopic: ContentTopic|seq[ContentTopic],
handler: FilterPushHandler,
peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async.} =
var topics: seq[ContentTopic]
when contentTopic is seq[ContentTopic]:
topics = contentTopic
else:
topics = @[contentTopic]
# make sure we delete the filter entry
# if no more content filters left
for rId in rIdToRemove:
filters.del(rId)
debug "filters modified", filters=filters
let sendRes = await wf.sendFilterRequestRpc(pubsubTopic, topics, subscribe=true, peer=peer)
if sendRes.isErr():
return err(sendRes.error)
proc notify*(filters: Filters, msg: WakuMessage, requestId: string) =
for key, filter in filters.pairs:
# We do this because the key for the filter is set to the requestId received from the filter protocol.
# This means we do not need to check the content filter explicitly as all MessagePushes already contain
# the requestId of the corresponding filter.
if requestId != "" and requestId == key:
filter.handler(msg)
continue
for topic in topics:
wf.subManager.registerSubscription(pubsubTopic, topic, handler)
# TODO: In case of no topics we should either trigger here for all messages,
# or we should not allow such filter to exist in the first place.
for contentFilter in filter.contentFilters:
if msg.contentTopic == contentFilter.contentTopic:
filter.handler(msg)
break
return ok()
proc unsubscribe*(wf: WakuFilterClient,
pubsubTopic: string,
contentTopic: ContentTopic|seq[ContentTopic],
peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async.} =
var topics: seq[ContentTopic]
when contentTopic is seq[ContentTopic]:
topics = contentTopic
else:
topics = @[contentTopic]
let sendRes = await wf.sendFilterRequestRpc(pubsubTopic, topics, subscribe=false, peer=peer)
if sendRes.isErr():
return err(sendRes.error)
for topic in topics:
wf.subManager.removeSubscription(pubsubTopic, topic)
return ok()
proc clearSubscriptions*(wf: WakuFilterClient) =
wf.subManager.clear()
proc getSubscriptionsCount*(wf: WakuFilterClient): int =
wf.subManager.getSubscriptionsCount()
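# End-to-end client sketch (illustrative, not part of this commit): subscribe
# through WakuFilterClient against an explicit service peer and later remove
# the subscription. The proc name and topics are hypothetical placeholders.
proc exampleClientRoundTrip(client: WakuFilterClient, servicePeer: RemotePeerInfo) {.async.} =
  proc onPush(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.} =
    info "filtered message push", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic
  let subRes = await client.subscribe("/waku/2/default-waku/proto",
                                      "/example/1/demo/proto", onPush, peer=servicePeer)
  if subRes.isErr():
    error "filter subscription failed", error=subRes.error
    return
  # From now on, every MessagePush whose content topic matches triggers onPush.
  # Unsubscribing notifies the service peer and drops the local handler entry.
  discard await client.unsubscribe("/waku/2/default-waku/proto",
                                   "/example/1/demo/proto", peer=servicePeer)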

View File

@ -10,37 +10,26 @@ import
import
../waku_message,
../../node/peer_manager/peer_manager,
../../utils/requests,
./rpc,
./rpc_codec
./rpc_codec,
./protocol_metrics
declarePublicGauge waku_filter_peers, "number of filter peers"
declarePublicGauge waku_filter_subscribers, "number of light node filter subscribers"
declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"]
declarePublicGauge waku_filter_messages, "number of filter messages received", ["type"]
logScope:
topics = "wakufilter"
const
# We add a 64kB safety buffer for protocol overhead.
# 10x-multiplier also for safety: currently we never
# push more than 1 message at a time.
MaxRpcSize* = 10 * MaxWakuMessageSize + 64 * 1024
WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1"
WakuFilterTimeout: Duration = 2.hours
# Error types (metric label values)
const
dialFailure = "dial_failure"
decodeRpcFailure = "decode_rpc_failure"
peerNotFoundFailure = "peer_not_found_failure"
type WakuFilterResult*[T] = Result[T, string]
## Subscription manager
type Subscription = object
requestId: string
peer: PeerID
@ -68,108 +57,86 @@ proc removeSubscription(subscriptions: var seq[Subscription], peer: PeerId, unsu
subscriptions.keepItIf(it.contentTopics.len > 0)
## Protocol
type
MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.}
WakuFilterResult*[T] = Result[T, string]
WakuFilter* = ref object of LPProtocol
rng*: ref rand.HmacDrbgContext
peerManager*: PeerManager
pushHandler*: MessagePushHandler
subscriptions*: seq[Subscription]
failedPeers*: Table[string, chronos.Moment]
timeout*: chronos.Duration
proc init(wf: WakuFilter) =
proc handleFilterRequest(wf: WakuFilter, peerId: PeerId, rpc: FilterRPC) =
let
requestId = rpc.requestId
subscribe = rpc.request.subscribe
pubsubTopic = rpc.request.pubsubTopic
contentTopics = rpc.request.contentFilters.mapIt(it.contentTopic)
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
let message = await conn.readLp(MaxRpcSize.int)
if subscribe:
info "added filter subscritpiton", peerId=peerId, pubsubTopic=pubsubTopic, contentTopics=contentTopics
wf.subscriptions.addSubscription(peerId, requestId, pubsubTopic, contentTopics)
else:
info "removed filter subscritpiton", peerId=peerId, contentTopics=contentTopics
wf.subscriptions.removeSubscription(peerId, contentTopics)
let res = FilterRPC.init(message)
if res.isErr():
waku_filter_subscribers.set(wf.subscriptions.len.int64)
proc initProtocolHandler(wf: WakuFilter) =
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
let buffer = await conn.readLp(MaxRpcSize.int)
let decodeRpcRes = FilterRPC.init(buffer)
if decodeRpcRes.isErr():
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
return
trace "filter message received"
let rpc = res.get()
let rpc = decodeRpcRes.get()
## Filter request
# We are receiving a subscription/unsubscription request
if rpc.request != FilterRequest():
waku_filter_messages.inc(labelValues = ["FilterRequest"])
# Subscription/unsubscription request
if rpc.request == FilterRequest():
waku_filter_errors.inc(labelValues = [emptyFilterRequestFailure])
# TODO: Manage the empty filter request message error. Perform any action?
return
let
requestId = rpc.requestId
subscribe = rpc.request.subscribe
pubsubTopic = rpc.request.pubsubTopic
contentTopics = rpc.request.contentFilters.mapIt(it.contentTopic)
if subscribe:
info "added filter subscritpiton", peerId=conn.peerId, pubsubTopic=pubsubTopic, contentTopics=contentTopics
wf.subscriptions.addSubscription(conn.peerId, requestId, pubsubTopic, contentTopics)
else:
info "removed filter subscritpiton", peerId=conn.peerId, contentTopics=contentTopics
wf.subscriptions.removeSubscription(conn.peerId, contentTopics)
waku_filter_subscribers.set(wf.subscriptions.len.int64)
waku_filter_messages.inc(labelValues = ["FilterRequest"])
wf.handleFilterRequest(conn.peerId, rpc)
## Push message
# We are receiving messages from the peer that we subscribed to
if rpc.push != MessagePush():
waku_filter_messages.inc(labelValues = ["MessagePush"])
let
requestId = rpc.requestId
push = rpc.push
info "received filter message push", peerId=conn.peerId
await wf.pushHandler(requestId, push)
wf.handler = handle
wf.handler = handler
wf.codec = WakuFilterCodec
proc new*(T: type WakuFilter,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
timeout: Duration = WakuFilterTimeout): T =
let wf = WakuFilter(rng: rng,
peerManager: peerManager,
timeout: timeout)
wf.initProtocolHandler()
return wf
proc init*(T: type WakuFilter,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
handler: MessagePushHandler,
timeout: Duration = WakuFilterTimeout): T =
let wf = WakuFilter(rng: rng,
peerManager: peerManager,
pushHandler: handler,
timeout: timeout)
wf.init()
return wf
timeout: Duration = WakuFilterTimeout): T {.
deprecated: "WakuFilter.new()' instead".} =
WakuFilter.new(peerManager, rng, timeout)
proc setPeer*(wf: WakuFilter, peer: RemotePeerInfo) =
wf.peerManager.addPeer(peer, WakuFilterCodec)
waku_filter_peers.inc()
proc sendFilterRpcToPeer(wf: WakuFilter, rpc: FilterRPC, peer: PeerId): Future[WakuFilterResult[void]] {.async, gcsafe.}=
proc sendFilterRpc(wf: WakuFilter, rpc: FilterRPC, peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}=
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)
if connOpt.isNone():
return err(dialFailure)
let connection = connOpt.get()
await connection.writeLP(rpc.encode().buffer)
return ok()
proc sendFilterRpcToRemotePeer(wf: WakuFilter, rpc: FilterRPC, peer: RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}=
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)
if connOpt.isNone():
return err(dialFailure)
let connection = connOpt.get()
await connection.writeLP(rpc.encode().buffer)
return ok()
@ -199,6 +166,9 @@ proc handleClientError(wf: WakuFilter, subs: seq[Subscription]) {.raises: [Defec
proc handleMessage*(wf: WakuFilter, pubsubTopic: string, msg: WakuMessage) {.async.} =
trace "handling message", pubsubTopic, contentTopic=msg.contentTopic, subscriptions=wf.subscriptions.len
if wf.subscriptions.len <= 0:
return
@ -218,70 +188,14 @@ proc handleMessage*(wf: WakuFilter, pubsubTopic: string, msg: WakuMessage) {.asy
push: MessagePush(messages: @[msg])
)
let res = await wf.sendFilterRpcToPeer(rpc, sub.peer)
let res = await wf.sendFilterRpc(rpc, sub.peer)
if res.isErr():
waku_filter_errors.inc(labelValues = [res.error()])
failedSubscriptions.add(sub)
continue
connectedSubscriptions.add(sub)
wf.removePeerFromFailedPeersTable(connectedSubscriptions)
wf.handleClientError(failedSubscriptions)
### Send subscription/unsubscription
proc subscribe(wf: WakuFilter, pubsubTopic: string, contentTopics: seq[ContentTopic], peer: RemotePeerInfo): Future[WakuFilterResult[string]] {.async, gcsafe.} =
let id = generateRequestId(wf.rng)
let rpc = FilterRPC(
requestId: id,
request: FilterRequest(
subscribe: true,
pubSubTopic: pubsubTopic,
contentFilters: contentTopics.mapIt(ContentFilter(contentTopic: it))
)
)
let res = await wf.sendFilterRpcToRemotePeer(rpc, peer)
if res.isErr():
waku_filter_errors.inc(labelValues = [res.error()])
return err(res.error())
return ok(id)
proc subscribe*(wf: WakuFilter, pubsubTopic: string, contentTopics: seq[ContentTopic]): Future[WakuFilterResult[string]] {.async, gcsafe.} =
let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec)
if peerOpt.isNone():
waku_filter_errors.inc(labelValues = [peerNotFoundFailure])
return err(peerNotFoundFailure)
return await wf.subscribe(pubsubTopic, contentTopics, peerOpt.get())
proc unsubscribe(wf: WakuFilter, pubsubTopic: string, contentTopics: seq[ContentTopic], peer: RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.} =
let id = generateRequestId(wf.rng)
let rpc = FilterRPC(
requestId: id,
request: FilterRequest(
subscribe: false,
pubSubTopic: pubsubTopic,
contentFilters: contentTopics.mapIt(ContentFilter(contentTopic: it))
)
)
let res = await wf.sendFilterRpcToRemotePeer(rpc, peer)
if res.isErr():
waku_filter_errors.inc(labelValues = [res.error()])
return err(res.error())
return ok()
proc unsubscribe*(wf: WakuFilter, pubsubTopic: string, contentTopics: seq[ContentTopic]): Future[WakuFilterResult[void]] {.async, gcsafe.} =
let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec)
if peerOpt.isNone():
waku_filter_errors.inc(labelValues = [peerNotFoundFailure])
return err(peerNotFoundFailure)
return await wf.unsubscribe(pubsubTopic, contentTopics, peerOpt.get())
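# Server-side sketch (illustrative, not part of this commit): a node serving
# filter subscriptions hands every message it obtains (e.g. from relay) to the
# protocol, which matches it against the per-peer subscriptions and sends a
# MessagePush FilterRPC to each matching light node. The proc name and topic
# are hypothetical; only handleMessage is defined above.
proc examplePushToSubscribers(wf: WakuFilter, msg: WakuMessage) {.async.} =
  await wf.handleMessage("/waku/2/default-waku/proto", msg)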

View File

@ -0,0 +1,19 @@
{.push raises: [Defect].}
import metrics
declarePublicGauge waku_filter_subscribers, "number of light node filter subscribers"
declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"]
declarePublicGauge waku_filter_messages, "number of filter messages received", ["type"]
declarePublicGauge waku_node_filters, "number of content filter subscriptions"
# Error types (metric label values)
const
dialFailure* = "dial_failure"
decodeRpcFailure* = "decode_rpc_failure"
peerNotFoundFailure* = "peer_not_found_failure"
emptyMessagePushFailure* = "empty_message_push_failure"
emptyFilterRequestFailure* = "empty_filter_request_failure"

View File

@ -9,6 +9,11 @@ import
./rpc
# Multiply by 10 for safety. Currently we never push more than 1 message at a time
# We add a 64kB safety buffer for protocol overhead.
const MaxRpcSize* = 10 * MaxWakuMessageSize + 64 * 1024
proc encode*(filter: ContentFilter): ProtoBuffer =
var output = initProtoBuffer()
output.write3(1, filter.contentTopic)