chore: remove deprecated legacy filter protocol (#2507)

* chore: remove deprecated legacy filter protocol

* fix: do not use legacy import in test

* fix: remove legacy test references

* fix: more test fixes, starting filter client

* fix: sigh. more references to remove.

* fix: fix dereferencing error

* fix: fix merge mess up

* fix: sigh. merge tool used tabs.

* fix: more peer manager tests needed fixing

---------

Co-authored-by: Hanno Cornelius <hanno@status.im>
Co-authored-by: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com>
Authored by Alvaro Revuelta on 2024-03-25 19:07:56 +01:00, committed by GitHub
parent b5e2edb724
commit e861317209
27 changed files with 76 additions and 1649 deletions
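
For orientation, a minimal migration sketch of what this removal means for API users; it is not part of the diff below. mountFilterClient, registerPushHandler, filterSubscribe, addServicePeer and WakuFilterSubscribeCodec are the calls that survive this change, while the module paths, the servicePeer value and the error handling shown here are illustrative assumptions.

import std/options
import chronos, chronicles
import ../../waku/waku_core, ../../waku/waku_node, ../../waku/node/peer_manager, ../../waku/waku_filter_v2

proc useFilterV2(node: WakuNode, servicePeer: RemotePeerInfo) {.async.} =
  # Before this commit: mountLegacyFilter()/legacyFilterSubscribe() with WakuLegacyFilterCodec.
  await node.mountFilterClient()
  node.peerManager.addServicePeer(servicePeer, WakuFilterSubscribeCodec)

  proc pushHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} =
    info "filter push received", contentTopic = msg.contentTopic

  # The v2 client takes the push handler once, separately from the subscribe request.
  node.wakuFilterClient.registerPushHandler(pushHandler)
  let res = await node.filterSubscribe(
    some(DefaultPubsubTopic), DefaultContentTopic, peer = servicePeer
  )
  if res.isErr():
    error "filter v2 subscribe failed", error = $res.error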

@@ -37,7 +37,6 @@ import
  ../../waku/waku_core,
  ../../waku/waku_lightpush/common,
  ../../waku/waku_lightpush/rpc,
- ../../waku/waku_filter,
  ../../waku/waku_enr,
  ../../waku/waku_store,
  ../../waku/waku_dnsdisc,
@@ -283,17 +282,6 @@ proc writeAndPrint(c: Chat) {.async.} =
  c.nick = await readNick(c.transp)
  echo "You are now known as " & c.nick
  elif line.startsWith("/exit"):
- if not c.node.wakuFilterLegacy.isNil():
- echo "unsubscribing from content filters..."
- let peerOpt = c.node.peerManager.selectPeer(WakuLegacyFilterCodec)
- if peerOpt.isSome():
- await c.node.legacyFilterUnsubscribe(
- pubsubTopic = some(DefaultPubsubTopic),
- contentTopics = c.contentTopic,
- peer = peerOpt.get(),
- )
  echo "quitting..."
  try:
@@ -514,9 +502,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
  let peerInfo = parsePeerInfo(conf.filternode)
  if peerInfo.isOk():
  await node.mountFilter()
- await node.mountLegacyFilter()
  await node.mountFilterClient()
- node.peerManager.addServicePeer(peerInfo.value, WakuLegacyFilterCodec)
  proc filterHandler(
  pubsubTopic: PubsubTopic, msg: WakuMessage
@@ -524,14 +510,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
  trace "Hit filter handler", contentTopic = msg.contentTopic
  chat.printReceivedMessage(msg)
- await node.legacyFilterSubscribe(
- pubsubTopic = some(DefaultPubsubTopic),
- contentTopics = chat.contentTopic,
- filterHandler,
- peerInfo.value,
- )
- # TODO: Here to support FilterV2 relevant subscription, but still
- # Legacy Filter is concurrent to V2 untill legacy filter will be removed
+ # TODO: Here to support FilterV2 relevant subscription.
  else:
  error "Filter not mounted. Couldn't parse conf.filternode", error = peerInfo.error

@@ -23,7 +23,6 @@ import
  ../../../waku/waku_core,
  ../../../waku/waku_node,
  ../../../waku/node/peer_manager,
- ../../waku/waku_filter,
  ../../waku/waku_filter_v2,
  ../../waku/waku_store,
  ../../waku/factory/builder,
@@ -294,7 +293,6 @@ when isMainModule:
  if conf.filter:
  waitFor mountFilter(bridge.nodev2)
- waitFor mountLegacyFilter(bridge.nodev2)
  if conf.staticnodes.len > 0:
  waitFor connectToNodes(bridge.nodev2, conf.staticnodes)
@@ -309,7 +307,6 @@ when isMainModule:
  if conf.filternode != "":
  let filterPeer = parsePeerInfo(conf.filternode)
  if filterPeer.isOk():
- bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuLegacyFilterCodec)
  bridge.nodev2.peerManager.addServicePeer(
  filterPeer.value, WakuFilterSubscribeCodec
  )

@@ -34,7 +34,6 @@ import
  ../../waku/waku_api/rest/server,
  ../../waku/waku_api/rest/debug/handlers as rest_debug_api,
  ../../waku/waku_api/rest/relay/handlers as rest_relay_api,
- ../../waku/waku_api/rest/filter/legacy_handlers as rest_legacy_filter_api,
  ../../waku/waku_api/rest/filter/handlers as rest_filter_api,
  ../../waku/waku_api/rest/lightpush/handlers as rest_lightpush_api,
  ../../waku/waku_api/rest/store/handlers as rest_store_api,
@@ -48,7 +47,6 @@ import
  ../../waku/waku_rln_relay,
  ../../waku/waku_store,
  ../../waku/waku_lightpush/common,
- ../../waku/waku_filter,
  ../../waku/waku_filter_v2,
  ../../waku/factory/node_factory,
  ../../waku/factory/internal_config,
@@ -368,12 +366,8 @@ proc startRestServer(
  "/relay endpoints are not available. Please check your configuration: --relay"
  ## Filter REST API
- if conf.filternode != "" and app.node.wakuFilterClient != nil and
- app.node.wakuFilterClientLegacy != nil:
- let legacyFilterCache = MessageCache.init()
- rest_legacy_filter_api.installLegacyFilterRestApiHandlers(
- server.router, app.node, legacyFilterCache
- )
+ if conf.filternode != "" and
+ app.node.wakuFilterClient != nil:
  let filterCache = MessageCache.init()

@@ -48,9 +48,6 @@ import
  # Waku v2 tests
  ./test_wakunode,
  ./test_wakunode_lightpush,
- # Waku Filter
- ./test_waku_filter_legacy,
- ./test_wakunode_filter_legacy,
  ./test_peer_store_extended,
  ./test_message_cache,
  ./test_peer_manager,
@@ -77,7 +74,6 @@ import
  ./wakunode_rest/test_rest_serdes,
  ./wakunode_rest/test_rest_store,
  ./wakunode_rest/test_rest_filter,
- ./wakunode_rest/test_rest_legacy_filter,
  ./wakunode_rest/test_rest_lightpush,
  ./wakunode_rest/test_rest_admin,
  ./wakunode_rest/test_rest_cors

@@ -55,7 +55,6 @@ suite "Waku Filter - End to End":
  await allFutures(server.start(), client.start())
  await server.mountFilter()
- await server.mountLegacyFilter()
  await client.mountFilterClient()
  client.wakuFilterClient.registerPushHandler(messagePushHandler)

@@ -24,8 +24,8 @@ import
  ../../waku/waku_core,
  ../../waku/waku_enr/capabilities,
  ../../waku/waku_relay/protocol,
+ ../../waku/waku_filter_v2/common,
  ../../waku/waku_store/common,
- ../../waku/waku_filter/protocol,
  ../../waku/waku_lightpush/common,
  ../../waku/waku_peer_exchange,
  ../../waku/waku_metadata,
@@ -63,11 +63,10 @@ procSuite "Peer Manager":
  await allFutures(nodes.mapIt(it.start()))
  await allFutures(nodes.mapIt(it.mountRelay()))
  await allFutures(nodes.mapIt(it.mountFilter()))
- await allFutures(nodes.mapIt(it.mountLegacyFilter()))
  # Dial node2 from node1
  let conn = await nodes[0].peerManager.dialPeer(
- nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec
+ nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterSubscribeCodec
  )
  await sleepAsync(chronos.milliseconds(500))
@@ -107,13 +106,13 @@ procSuite "Peer Manager":
  # Dial non-existent peer from node1
  let conn1 =
- await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuLegacyFilterCodec)
+ await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuStoreCodec)
  check:
  conn1.isNone()
  # Dial peer not supporting given protocol
  let conn2 = await nodes[0].peerManager.dialPeer(
- nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec
+ nodes[1].peerInfo.toRemotePeerInfo(), WakuStoreCodec
  )
  check:
  conn2.isNone()
@@ -134,20 +133,17 @@ procSuite "Peer Manager":
  await node.start()
- await node.mountFilterClient()
- node.mountStoreClient()
  node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec)
  node.peerManager.addServicePeer(
- filterPeer.toRemotePeerInfo(), WakuLegacyFilterCodec
+ filterPeer.toRemotePeerInfo(), WakuFilterSubscribeCodec
  )
  # Check peers were successfully added to peer manager
  check:
  node.peerManager.peerStore.peers().len == 2
- node.peerManager.peerStore.peers(WakuLegacyFilterCodec).allIt(
+ node.peerManager.peerStore.peers(WakuFilterSubscribeCodec).allIt(
  it.peerId == filterPeer.peerId and it.addrs.contains(filterLoc) and
- it.protocols.contains(WakuLegacyFilterCodec)
+ it.protocols.contains(WakuFilterSubscribeCodec)
  )
  node.peerManager.peerStore.peers(WakuStoreCodec).allIt(
  it.peerId == storePeer.peerId and it.addrs.contains(storeLoc) and
@@ -762,39 +758,36 @@ procSuite "Peer Manager":
  let
  node =
  newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
- peers = toSeq(1 .. 5)
+ peers = toSeq(1 .. 4)
  .mapIt(parsePeerInfo("/ip4/0.0.0.0/tcp/30300/p2p/" & basePeerId & $it))
  .filterIt(it.isOk())
  .mapIt(it.value)
  require:
- peers.len == 5
+ peers.len == 4
  # service peers
  node.peerManager.addServicePeer(peers[0], WakuStoreCodec)
- node.peerManager.addServicePeer(peers[1], WakuLegacyFilterCodec)
- node.peerManager.addServicePeer(peers[2], WakuLightPushCodec)
- node.peerManager.addServicePeer(peers[3], WakuPeerExchangeCodec)
+ node.peerManager.addServicePeer(peers[1], WakuLightPushCodec)
+ node.peerManager.addServicePeer(peers[2], WakuPeerExchangeCodec)
  # relay peers (should not be added)
- node.peerManager.addServicePeer(peers[4], WakuRelayCodec)
+ node.peerManager.addServicePeer(peers[3], WakuRelayCodec)
  # all peers are stored in the peerstore
  check:
  node.peerManager.peerStore.peers().anyIt(it.peerId == peers[0].peerId)
  node.peerManager.peerStore.peers().anyIt(it.peerId == peers[1].peerId)
  node.peerManager.peerStore.peers().anyIt(it.peerId == peers[2].peerId)
- node.peerManager.peerStore.peers().anyIt(it.peerId == peers[3].peerId)
  # but the relay peer is not
- node.peerManager.peerStore.peers().anyIt(it.peerId == peers[4].peerId) == false
+ node.peerManager.peerStore.peers().anyIt(it.peerId == peers[3].peerId) == false
  # all service peers are added to its service slot
  check:
  node.peerManager.serviceSlots[WakuStoreCodec].peerId == peers[0].peerId
- node.peerManager.serviceSlots[WakuLegacyFilterCodec].peerId == peers[1].peerId
- node.peerManager.serviceSlots[WakuLightPushCodec].peerId == peers[2].peerId
- node.peerManager.serviceSlots[WakuPeerExchangeCodec].peerId == peers[3].peerId
+ node.peerManager.serviceSlots[WakuLightPushCodec].peerId == peers[1].peerId
+ node.peerManager.serviceSlots[WakuPeerExchangeCodec].peerId == peers[2].peerId
  # but the relay peer is not
  node.peerManager.serviceSlots.hasKey(WakuRelayCodec) == false
@@ -809,24 +802,23 @@ procSuite "Peer Manager":
  await allFutures(nodes.mapIt(it.start()))
  await allFutures(nodes.mapIt(it.mountRelay()))
  await allFutures(nodes.mapIt(it.mountFilter()))
- await allFutures(nodes.mapIt(it.mountLegacyFilter()))
  let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
  # create some connections/streams
- require:
+ check:
  # some relay connections
  (await nodes[0].peerManager.connectRelay(pInfos[1])) == true
  (await nodes[0].peerManager.connectRelay(pInfos[2])) == true
  (await nodes[1].peerManager.connectRelay(pInfos[2])) == true
- (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() ==
+ (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterSubscribeCodec)).isSome() ==
  true
- (await nodes[0].peerManager.dialPeer(pInfos[2], WakuLegacyFilterCodec)).isSome() ==
+ (await nodes[0].peerManager.dialPeer(pInfos[2], WakuFilterSubscribeCodec)).isSome() ==
  true
  # isolated dial creates a relay conn under the hood (libp2p behaviour)
- (await nodes[2].peerManager.dialPeer(pInfos[3], WakuLegacyFilterCodec)).isSome() ==
+ (await nodes[2].peerManager.dialPeer(pInfos[3], WakuFilterSubscribeCodec)).isSome() ==
  true
  # assert physical connections
@@ -834,26 +826,26 @@ procSuite "Peer Manager":
  nodes[0].peerManager.connectedPeers(WakuRelayCodec)[0].len == 0
  nodes[0].peerManager.connectedPeers(WakuRelayCodec)[1].len == 2
- nodes[0].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 0
- nodes[0].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 2
+ nodes[0].peerManager.connectedPeers(WakuFilterSubscribeCodec)[0].len == 0
+ nodes[0].peerManager.connectedPeers(WakuFilterSubscribeCodec)[1].len == 2
  nodes[1].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1
  nodes[1].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1
- nodes[1].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1
- nodes[1].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 0
+ nodes[1].peerManager.connectedPeers(WakuFilterSubscribeCodec)[0].len == 1
+ nodes[1].peerManager.connectedPeers(WakuFilterSubscribeCodec)[1].len == 0
  nodes[2].peerManager.connectedPeers(WakuRelayCodec)[0].len == 2
  nodes[2].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1
- nodes[2].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1
- nodes[2].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 1
+ nodes[2].peerManager.connectedPeers(WakuFilterSubscribeCodec)[0].len == 1
+ nodes[2].peerManager.connectedPeers(WakuFilterSubscribeCodec)[1].len == 1
  nodes[3].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1
  nodes[3].peerManager.connectedPeers(WakuRelayCodec)[1].len == 0
- nodes[3].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1
- nodes[3].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 0
+ nodes[3].peerManager.connectedPeers(WakuFilterSubscribeCodec)[0].len == 1
+ nodes[3].peerManager.connectedPeers(WakuFilterSubscribeCodec)[1].len == 0
  asyncTest "getNumStreams() returns expected number of connections per protocol":
  # Create 2 nodes
@@ -865,28 +857,27 @@ procSuite "Peer Manager":
  await allFutures(nodes.mapIt(it.start()))
  await allFutures(nodes.mapIt(it.mountRelay()))
  await allFutures(nodes.mapIt(it.mountFilter()))
- await allFutures(nodes.mapIt(it.mountLegacyFilter()))
  let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
  require:
  # multiple streams are multiplexed over a single connection.
  # note that a relay connection is created under the hood when dialing a peer (libp2p behaviour)
- (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() ==
+ (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterSubscribeCodec)).isSome() ==
  true
- (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() ==
+ (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterSubscribeCodec)).isSome() ==
  true
- (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() ==
+ (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterSubscribeCodec)).isSome() ==
  true
- (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() ==
+ (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterSubscribeCodec)).isSome() ==
  true
  check:
  nodes[0].peerManager.getNumStreams(WakuRelayCodec) == (1, 1)
- nodes[0].peerManager.getNumStreams(WakuLegacyFilterCodec) == (0, 4)
+ nodes[0].peerManager.getNumStreams(WakuFilterSubscribeCodec) == (0, 4)
  nodes[1].peerManager.getNumStreams(WakuRelayCodec) == (1, 1)
- nodes[1].peerManager.getNumStreams(WakuLegacyFilterCodec) == (4, 0)
+ nodes[1].peerManager.getNumStreams(WakuFilterSubscribeCodec) == (4, 0)
  test "selectPeer() returns the correct peer":
  # Valid peer id missing the last digit
@@ -909,7 +900,7 @@ procSuite "Peer Manager":
  # Add a peer[0] to the peerstore
  pm.peerStore[AddressBook][peers[0].peerId] = peers[0].addrs
  pm.peerStore[ProtoBook][peers[0].peerId] =
- @[WakuRelayCodec, WakuStoreCodec, WakuLegacyFilterCodec]
+ @[WakuRelayCodec, WakuStoreCodec, WakuFilterSubscribeCodec]
  # When no service peers, we get one from the peerstore
  let selectedPeer1 = pm.selectPeer(WakuStoreCodec)
@@ -918,7 +909,7 @@ procSuite "Peer Manager":
  selectedPeer1.get().peerId == peers[0].peerId
  # Same for other protocol
- let selectedPeer2 = pm.selectPeer(WakuLegacyFilterCodec)
+ let selectedPeer2 = pm.selectPeer(WakuFilterSubscribeCodec)
  check:
  selectedPeer2.isSome() == true
  selectedPeer2.get().peerId == peers[0].peerId
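
For reference, a condensed sketch of the service-peer flow these tests now exercise with the v2 codec; only addServicePeer, selectPeer, dialPeer and WakuFilterSubscribeCodec come from the diff, while node, filterPeer and the enclosing async context are placeholders.

# Assumed to run inside an async proc; node and filterPeer are placeholders.
node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuFilterSubscribeCodec)

let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)
if peerOpt.isSome():
  # dialPeer yields none() when the peer does not support the requested protocol
  let conn = await node.peerManager.dialPeer(peerOpt.get(), WakuFilterSubscribeCodec)
  if conn.isNone():
    echo "filter service peer could not be dialed"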

@@ -1,300 +0,0 @@
{.used.}
import
std/[options, tables], testutils/unittests, chronicles, chronos, libp2p/crypto/crypto
import
../../waku/node/peer_manager,
../../waku/waku_core,
../../waku/waku_filter,
../../waku/waku_filter/client,
./testlib/common,
./testlib/wakucore
proc newTestWakuFilterNode(
switch: Switch, timeout: Duration = 2.hours
): Future[WakuFilterLegacy] {.async.} =
let
peerManager = PeerManager.new(switch)
proto = WakuFilterLegacy.new(peerManager, rng, timeout)
await proto.start()
switch.mount(proto)
return proto
proc newTestWakuFilterClient(switch: Switch): Future[WakuFilterClientLegacy] {.async.} =
let
peerManager = PeerManager.new(switch)
proto = WakuFilterClientLegacy.new(peerManager, rng)
await proto.start()
switch.mount(proto)
return proto
# TODO: Extend test coverage
suite "Waku Filter":
asyncTest "should forward messages to client after subscribed":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuFilterNode(serverSwitch)
client = await newTestWakuFilterClient(clientSwitch)
## Given
let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo()
let pushHandlerFuture = newFuture[(string, WakuMessage)]()
proc pushHandler(
pubsubTopic: PubsubTopic, message: WakuMessage
) {.async, gcsafe, closure.} =
pushHandlerFuture.complete((pubsubTopic, message))
let
pubsubTopic = DefaultPubsubTopic
contentTopic = "test-content-topic"
msg = fakeWakuMessage(contentTopic = contentTopic)
## When
require (
await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer = serverAddr)
).isOk()
# WARN: Sleep necessary to avoid a race condition between the subscription and the handle message proc
await sleepAsync(500.milliseconds)
await server.handleMessage(pubsubTopic, msg)
require await pushHandlerFuture.withTimeout(3.seconds)
## Then
let (pushedMsgPubsubTopic, pushedMsg) = pushHandlerFuture.read()
check:
pushedMsgPubsubTopic == pubsubTopic
pushedMsg == msg
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
asyncTest "should not forward messages to client after unsuscribed":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuFilterNode(serverSwitch)
client = await newTestWakuFilterClient(clientSwitch)
## Given
let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo()
var pushHandlerFuture = newFuture[void]()
proc pushHandler(
pubsubTopic: PubsubTopic, message: WakuMessage
) {.async, gcsafe, closure.} =
pushHandlerFuture.complete()
let
pubsubTopic = DefaultPubsubTopic
contentTopic = "test-content-topic"
msg = fakeWakuMessage(contentTopic = contentTopic)
## When
require (
await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer = serverAddr)
).isOk()
# WARN: Sleep necessary to avoid a race condition between the subscription and the handle message proc
await sleepAsync(500.milliseconds)
await server.handleMessage(pubsubTopic, msg)
require await pushHandlerFuture.withTimeout(1.seconds)
# Reset to test unsubscribe
pushHandlerFuture = newFuture[void]()
require (await client.unsubscribe(pubsubTopic, contentTopic, peer = serverAddr)).isOk()
# WARN: Sleep necessary to avoid a race condition between the unsubscription and the handle message proc
await sleepAsync(500.milliseconds)
await server.handleMessage(pubsubTopic, msg)
## Then
let handlerWasCalledAfterUnsubscription =
await pushHandlerFuture.withTimeout(1.seconds)
check:
not handlerWasCalledAfterUnsubscription
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
asyncTest "peer subscription should be dropped if connection fails for second time after the timeout has elapsed":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuFilterNode(serverSwitch, timeout = 200.milliseconds)
client = await newTestWakuFilterClient(clientSwitch)
## Given
let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo()
var pushHandlerFuture = newFuture[void]()
proc pushHandler(
pubsubTopic: PubsubTopic, message: WakuMessage
) {.async, gcsafe, closure.} =
pushHandlerFuture.complete()
let
pubsubTopic = DefaultPubsubTopic
contentTopic = "test-content-topic"
msg = fakeWakuMessage(contentTopic = contentTopic)
## When
require (
await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer = serverAddr)
).isOk()
# WARN: Sleep necessary to avoid a race condition between the unsubscription and the handle message proc
await sleepAsync(500.milliseconds)
await server.handleMessage(DefaultPubsubTopic, msg)
# Push handler should be called
require await pushHandlerFuture.withTimeout(1.seconds)
# Stop client node to test timeout unsubscription
await clientSwitch.stop()
await sleepAsync(500.milliseconds)
# First failure should not remove the subscription
await server.handleMessage(DefaultPubsubTopic, msg)
let
subscriptionsBeforeTimeout = server.subscriptions.len()
failedPeersBeforeTimeout = server.failedPeers.len()
# Wait for the configured peer connection timeout to elapse (200ms)
await sleepAsync(200.milliseconds)
# Second failure should remove the subscription
await server.handleMessage(DefaultPubsubTopic, msg)
let
subscriptionsAfterTimeout = server.subscriptions.len()
failedPeersAfterTimeout = server.failedPeers.len()
## Then
check:
subscriptionsBeforeTimeout == 1
failedPeersBeforeTimeout == 1
subscriptionsAfterTimeout == 0
failedPeersAfterTimeout == 0
## Cleanup
await serverSwitch.stop()
asyncTest "peer subscription should not be dropped if connection recovers before timeout elapses":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuFilterNode(serverSwitch, timeout = 200.milliseconds)
client = await newTestWakuFilterClient(clientSwitch)
## Given
let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo()
var pushHandlerFuture = newFuture[void]()
proc pushHandler(
pubsubTopic: PubsubTopic, message: WakuMessage
) {.async, gcsafe, closure.} =
pushHandlerFuture.complete()
let
pubsubTopic = DefaultPubsubTopic
contentTopic = "test-content-topic"
msg = fakeWakuMessage(contentTopic = contentTopic)
## When
require (
await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer = serverAddr)
).isOk()
# WARN: Sleep necessary to avoid a race condition between the unsubscription and the handle message proc
await sleepAsync(500.milliseconds)
await server.handleMessage(DefaultPubsubTopic, msg)
# Push handler should be called
require await pushHandlerFuture.withTimeout(1.seconds)
let
subscriptionsBeforeFailure = server.subscriptions.len()
failedPeersBeforeFailure = server.failedPeers.len()
# Stop switch to test unsubscribe
await clientSwitch.stop()
await sleepAsync(500.milliseconds)
# First failure should add to failure list
await server.handleMessage(DefaultPubsubTopic, msg)
pushHandlerFuture = newFuture[void]()
let
subscriptionsAfterFailure = server.subscriptions.len()
failedPeersAfterFailure = server.failedPeers.len()
await sleepAsync(100.milliseconds)
# Start switch with same key as before
let clientSwitch2 = newTestSwitch(
some(clientSwitch.peerInfo.privateKey), some(clientSwitch.peerInfo.addrs[0])
)
await clientSwitch2.start()
await client.start()
clientSwitch2.mount(client)
# If push succeeds after failure, the peer should removed from failed peers list
await server.handleMessage(DefaultPubsubTopic, msg)
let handlerShouldHaveBeenCalled = await pushHandlerFuture.withTimeout(1.seconds)
let
subscriptionsAfterSuccessfulConnection = server.subscriptions.len()
failedPeersAfterSuccessfulConnection = server.failedPeers.len()
## Then
check:
handlerShouldHaveBeenCalled
check:
subscriptionsBeforeFailure == 1
subscriptionsAfterFailure == 1
subscriptionsAfterSuccessfulConnection == 1
check:
failedPeersBeforeFailure == 0
failedPeersAfterFailure == 1
failedPeersAfterSuccessfulConnection == 0
## Cleanup
await allFutures(clientSwitch2.stop(), serverSwitch.stop())

@@ -1,67 +0,0 @@
{.used.}
import
std/options,
stew/shims/net as stewNet,
testutils/unittests,
chronicles,
chronos,
libp2p/crypto/crypto
import
../../waku/waku_core,
../../waku/node/peer_manager,
../../waku/waku_node,
./testlib/common,
./testlib/wakucore,
./testlib/wakunode
suite "WakuNode - Filter":
asyncTest "subscriber should receive the message handled by the publisher":
## Setup
let
serverKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
clientKey = generateSecp256k1Key()
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0))
waitFor allFutures(server.start(), client.start())
waitFor server.mountFilter()
waitFor server.mountLegacyFilter()
waitFor client.mountFilterClient()
## Given
let serverPeerInfo = server.peerInfo.toRemotePeerInfo()
let
pubSubTopic = DefaultPubsubTopic
contentTopic = DefaultContentTopic
message = fakeWakuMessage(contentTopic = contentTopic)
var filterPushHandlerFut = newFuture[(PubsubTopic, WakuMessage)]()
proc filterPushHandler(
pubsubTopic: PubsubTopic, msg: WakuMessage
) {.async, gcsafe, closure.} =
filterPushHandlerFut.complete((pubsubTopic, msg))
## When
await client.legacyFilterSubscribe(
some(pubsubTopic), contentTopic, filterPushHandler, peer = serverPeerInfo
)
# Wait for subscription to take effect
waitFor sleepAsync(100.millis)
waitFor server.filterHandleMessage(pubSubTopic, message)
require waitFor filterPushHandlerFut.withTimeout(5.seconds)
## Then
check filterPushHandlerFut.completed()
let (filterPubsubTopic, filterMessage) = filterPushHandlerFut.read()
check:
filterPubsubTopic == pubsubTopic
filterMessage == message
## Cleanup
waitFor allFutures(client.stop(), server.stop())

@@ -10,8 +10,8 @@ import
  libp2p/peerstore
  import
- ../../../waku/[node/peer_manager, waku_core, waku_filter/rpc_codec],
- ../../../waku/waku_filter_v2/[common, client, subscriptions, protocol],
+ ../../../waku/[node/peer_manager, waku_core],
+ ../../../waku/waku_filter_v2/[common, client, subscriptions, protocol, rpc_codec],
  ../testlib/[wakucore, testasync, testutils, futures, sequtils],
  ./waku_filter_utils,
  ../resources/payloads
@@ -127,7 +127,7 @@ suite "Waku Filter - End to End":
  asyncTest "Subscribing to an empty content topic":
  # When subscribing to an empty content topic
  let subscribeResponse =
- await wakuFilterClient.subscribe(serverRemotePeerInfo, pubsubTopic, @[])
+ await wakuFilterClient.subscribe(serverRemotePeerInfo, pubsubTopic, newSeq[ContentTopic]())
  # Then the subscription is not successful
  check:
@@ -1839,7 +1839,7 @@ suite "Waku Filter - End to End":
  # When unsubscribing from an empty content topic
  let unsubscribeResponse =
- await wakuFilterClient.unsubscribe(serverRemotePeerInfo, pubsubTopic, @[])
+ await wakuFilterClient.unsubscribe(serverRemotePeerInfo, pubsubTopic, newSeq[ContentTopic]())
  # Then the unsubscription is not successful
  check:
@@ -2076,10 +2076,10 @@ suite "Waku Filter - End to End":
  contentTopic = contentTopic, payload = getByteSequence(100 * 1024)
  ) # 100KiB
  msg4 = fakeWakuMessage(
- contentTopic = contentTopic, payload = getByteSequence(MaxRpcSize - 1024)
+ contentTopic = contentTopic, payload = getByteSequence(MaxPushSize - 1024)
  ) # Max Size (Inclusive Limit)
  msg5 = fakeWakuMessage(
- contentTopic = contentTopic, payload = getByteSequence(MaxRpcSize)
+ contentTopic = contentTopic, payload = getByteSequence(MaxPushSize)
  ) # Max Size (Exclusive Limit)
  # When sending the 1KiB message
@@ -2114,7 +2114,7 @@ suite "Waku Filter - End to End":
  pushedMsgPubsubTopic3 == pubsubTopic
  pushedMsg3 == msg3
- # When sending the MaxRpcSize - 1024B message
+ # When sending the MaxPushSize - 1024B message
  pushHandlerFuture = newPushHandlerFuture() # Clear previous future
  await wakuFilter.handleMessage(pubsubTopic, msg4)
@@ -2125,7 +2125,7 @@ suite "Waku Filter - End to End":
  pushedMsgPubsubTopic4 == pubsubTopic
  pushedMsg4 == msg4
- # When sending the MaxRpcSize message
+ # When sending the MaxPushSize message
  pushHandlerFuture = newPushHandlerFuture() # Clear previous future
  await wakuFilter.handleMessage(pubsubTopic, msg5)

@@ -17,11 +17,13 @@ import
  ../../../waku/common/paging,
  ../../../waku/waku_core,
  ../../../waku/waku_core/message/digest,
+ ../../../waku/waku_core/subscription,
  ../../../waku/node/peer_manager,
  ../../../waku/waku_archive,
  ../../../waku/waku_archive/driver/sqlite_driver,
+ ../../../waku/waku_filter_v2,
+ ../../../waku/waku_filter_v2/client,
  ../../../waku/waku_store,
- ../../../waku/waku_filter,
  ../../../waku/waku_node,
  ../waku_store/store_utils,
  ../waku_archive/archive_utils,
@@ -217,7 +219,6 @@ procSuite "WakuNode - Store":
  waitFor allFutures(client.start(), server.start(), filterSource.start())
  waitFor filterSource.mountFilter()
- waitFor filterSource.mountLegacyFilter()
  let driver = newSqliteArchiveDriver()
  let mountArchiveRes = server.mountArchive(driver)
@@ -238,19 +239,19 @@ procSuite "WakuNode - Store":
  proc filterHandler(
  pubsubTopic: PubsubTopic, msg: WakuMessage
  ) {.async, gcsafe, closure.} =
+ await server.wakuArchive.handleMessage(pubsubTopic, msg)
  filterFut.complete((pubsubTopic, msg))
- waitFor server.legacyFilterSubscribe(
+ server.wakuFilterClient.registerPushHandler(filterHandler)
+ let resp = waitFor server.filterSubscribe(
  some(DefaultPubsubTopic),
  DefaultContentTopic,
- filterHandler,
  peer = filterSourcePeer,
  )
  waitFor sleepAsync(100.millis)
- # Send filter push message to server from source node
- waitFor filterSource.wakuFilterLegacy.handleMessage(DefaultPubsubTopic, message)
+ waitFor filterSource.wakuFilter.handleMessage(DefaultPubsubTopic, message)
  # Wait for the server filter to receive the push message
  require waitFor filterFut.withTimeout(5.seconds)
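
Read without diff markers, the updated test wiring looks roughly as follows; archiving now happens explicitly inside the application-provided handler, since the handler-wrapping done by the removed legacyFilterSubscribe is gone. All names are taken from the hunk above.

proc filterHandler(
    pubsubTopic: PubsubTopic, msg: WakuMessage
) {.async, gcsafe, closure.} =
  # archiving is now the caller's responsibility; no wrapper does it implicitly
  await server.wakuArchive.handleMessage(pubsubTopic, msg)
  filterFut.complete((pubsubTopic, msg))

server.wakuFilterClient.registerPushHandler(filterHandler)
let resp = waitFor server.filterSubscribe(
  some(DefaultPubsubTopic), DefaultContentTopic, peer = filterSourcePeer
)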

@@ -5,7 +5,6 @@ import
  ./test_rest_debug,
  ./test_rest_filter,
  ./test_rest_health,
- ./test_rest_legacy_filter,
  ./test_rest_relay_serdes,
  ./test_rest_relay,
  ./test_rest_serdes,

@@ -1,194 +0,0 @@
{.used.}
import
std/sequtils,
stew/byteutils,
stew/shims/net,
testutils/unittests,
presto,
presto/client as presto_client,
libp2p/crypto/crypto
import
../../waku/waku_api/message_cache,
../../waku/common/base64,
../../waku/waku_core,
../../waku/waku_node,
../../waku/node/peer_manager,
../../waku/waku_filter,
../../waku/waku_api/rest/server,
../../waku/waku_api/rest/client,
../../waku/waku_api/rest/responses,
../../waku/waku_api/rest/filter/types,
../../waku/waku_api/rest/filter/legacy_handlers as filter_api,
../../waku/waku_api/rest/filter/legacy_client as filter_api_client,
../../waku/waku_relay,
../testlib/wakucore,
../testlib/wakunode
proc testWakuNode(): WakuNode =
let
privkey = generateSecp256k1Key()
bindIp = parseIpAddress("0.0.0.0")
extIp = parseIpAddress("127.0.0.1")
port = Port(0)
return newTestWakuNode(privkey, bindIp, port, some(extIp), some(port))
type RestFilterTest = object
filterNode: WakuNode
clientNode: WakuNode
restServer: WakuRestServerRef
messageCache: MessageCache
client: RestClientRef
proc setupRestFilter(): Future[RestFilterTest] {.async.} =
result.filterNode = testWakuNode()
result.clientNode = testWakuNode()
await allFutures(result.filterNode.start(), result.clientNode.start())
await result.filterNode.mountFilter()
await result.filterNode.mountLegacyFilter()
await result.clientNode.mountFilterClient()
result.clientNode.peerManager.addServicePeer(
result.filterNode.peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec
)
let restPort = Port(58011)
let restAddress = parseIpAddress("0.0.0.0")
result.restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
result.messageCache = MessageCache.init()
installLegacyFilterRestApiHandlers(
result.restServer.router, result.clientNode, result.messageCache
)
result.restServer.start()
result.client = newRestHttpClient(initTAddress(restAddress, restPort))
return result
proc shutdown(self: RestFilterTest) {.async.} =
await self.restServer.stop()
await self.restServer.closeWait()
await allFutures(self.filterNode.stop(), self.clientNode.stop())
suite "Waku v2 Rest API - Filter":
asyncTest "Subscribe a node to an array of topics - POST /filter/v1/subscriptions":
# Given
let restFilterTest: RestFilterTest = await setupRestFilter()
# When
let contentFilters =
@[DefaultContentTopic, ContentTopic("2"), ContentTopic("3"), ContentTopic("4")]
let requestBody = FilterLegacySubscribeRequest(
contentFilters: contentFilters, pubsubTopic: some(DefaultPubsubTopic)
)
let response = await restFilterTest.client.filterPostSubscriptionsV1(requestBody)
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"
check:
restFilterTest.messageCache.isContentSubscribed(DefaultContentTopic)
restFilterTest.messageCache.isContentSubscribed("2")
restFilterTest.messageCache.isContentSubscribed("3")
restFilterTest.messageCache.isContentSubscribed("4")
# When - error case
let badRequestBody =
FilterLegacySubscribeRequest(contentFilters: @[], pubsubTopic: none(string))
let badResponse =
await restFilterTest.client.filterPostSubscriptionsV1(badRequestBody)
check:
badResponse.status == 400
$badResponse.contentType == $MIMETYPE_TEXT
badResponse.data ==
"Invalid content body, could not decode. Unable to deserialize data"
await restFilterTest.shutdown()
asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v1/subscriptions":
# Given
let restFilterTest: RestFilterTest = await setupRestFilter()
# When
restFilterTest.messageCache.contentSubscribe("1")
restFilterTest.messageCache.contentSubscribe("2")
restFilterTest.messageCache.contentSubscribe("3")
restFilterTest.messageCache.contentSubscribe("4")
let contentFilters =
@[
ContentTopic("1"),
ContentTopic("2"),
ContentTopic("3"), # ,ContentTopic("4") # Keep this subscription for check
]
# When
let requestBody = FilterLegacySubscribeRequest(
contentFilters: contentFilters, pubsubTopic: some(DefaultPubsubTopic)
)
let response = await restFilterTest.client.filterDeleteSubscriptionsV1(requestBody)
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"
check:
not restFilterTest.messageCache.isContentSubscribed("1")
not restFilterTest.messageCache.isContentSubscribed("2")
not restFilterTest.messageCache.isContentSubscribed("3")
restFilterTest.messageCache.isContentSubscribed("4")
await restFilterTest.shutdown()
asyncTest "Get the latest messages for topic - GET /filter/v1/messages/{contentTopic}":
# Given
let restFilterTest = await setupRestFilter()
let pubSubTopic = "/waku/2/default-waku/proto"
let contentTopic = ContentTopic("content-topic-x")
var messages =
@[fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1"))]
# Prevent duplicate messages
for i in 0 ..< 2:
var msg =
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1"))
while msg == messages[i]:
msg =
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1"))
messages.add(msg)
restFilterTest.messageCache.contentSubscribe(contentTopic)
for msg in messages:
restFilterTest.messageCache.addMessage(pubSubTopic, msg)
# When
let response = await restFilterTest.client.filterGetMessagesV1(contentTopic)
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.len == 3
response.data.all do(msg: FilterWakuMessage) -> bool:
msg.payload == base64.encode("TEST-1") and
msg.contentTopic.get().string == "content-topic-x" and msg.version.get() == 2 and
msg.timestamp.get() != Timestamp(0)
await restFilterTest.shutdown()

@@ -19,7 +19,6 @@ import
  ../waku_dnsdisc,
  ../waku_archive,
  ../waku_store,
- ../waku_filter,
  ../waku_filter_v2,
  ../waku_peer_exchange,
  ../node/peer_manager,
@@ -277,12 +276,6 @@ proc setupProtocols(
  # Filter setup. NOTE Must be mounted after relay
  if conf.filter:
- try:
- await mountLegacyFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout))
- except CatchableError:
- return
- err("failed to mount waku legacy filter protocol: " & getCurrentExceptionMsg())
  try:
  await mountFilter(
  node,
@@ -298,7 +291,6 @@ proc setupProtocols(
  if filterNode.isOk():
  try:
  await node.mountFilterClient()
- node.peerManager.addServicePeer(filterNode.value, WakuLegacyFilterCodec)
  node.peerManager.addServicePeer(filterNode.value, WakuFilterSubscribeCodec)
  except CatchableError:
  return err(

@@ -5,7 +5,6 @@ else:
  import chronicles, chronos, metrics, metrics/chronos_httpserver
  import
- ../waku_filter/protocol_metrics as filter_metrics,
  ../waku_rln_relay/protocol_metrics as rln_metrics,
  ../utils/collector,
  ./peer_manager,
@@ -39,7 +38,6 @@ proc startMetricsLog*() =
  let pxPeers = collectorAsF64(waku_px_peers)
  let lightpushPeers = collectorAsF64(waku_lightpush_peers)
  let filterPeers = collectorAsF64(waku_filter_peers)
- let filterSubscribers = collectorAsF64(waku_legacy_filter_subscribers)
  info "Total connections initiated", count = $freshConnCount
  info "Total messages", count = totalMessages
@@ -47,7 +45,6 @@ proc startMetricsLog*() =
  info "Total peer exchange peers", count = pxPeers
  info "Total lightpush peers", count = lightpushPeers
  info "Total filter peers", count = filterPeers
- info "Total active filter subscriptions", count = filterSubscribers
  info "Total errors", count = $freshErrorCount
  # Start protocol specific metrics logging

@@ -33,10 +33,6 @@ import
  ../waku_archive,
  ../waku_store,
  ../waku_store/client as store_client,
- ../waku_filter as legacy_filter,
- #TODO: support for legacy filter protocol will be removed
- ../waku_filter/client as legacy_filter_client,
- #TODO: support for legacy filter protocol will be removed
  ../waku_filter_v2,
  ../waku_filter_v2/client as filter_client,
  ../waku_filter_v2/subscriptions as filter_subscriptions,
@@ -94,10 +90,6 @@ type
  wakuStoreClient*: WakuStoreClient
  wakuFilter*: waku_filter_v2.WakuFilter
  wakuFilterClient*: filter_client.WakuFilterClient
- wakuFilterLegacy*: legacy_filter.WakuFilterLegacy
- #TODO: support for legacy filter protocol will be removed
- wakuFilterClientLegacy*: legacy_filter_client.WakuFilterClientLegacy
- #TODO: support for legacy filter protocol will be removed
  wakuRlnRelay*: WakuRLNRelay
  wakuLightPush*: WakuLightPush
  wakuLightpushClient*: WakuLightPushClient
@@ -245,12 +237,6 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
  await node.wakuFilter.handleMessage(topic, msg)
- ##TODO: Support for legacy filter will be removed
- if node.wakuFilterLegacy.isNil():
- return
- await node.wakuFilterLegacy.handleMessage(topic, msg)
  proc archiveHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
  if node.wakuArchive.isNil():
  return
@@ -436,21 +422,6 @@ proc mountRelay*(
  ## Waku filter
- proc mountLegacyFilter*(
- node: WakuNode, filterTimeout: Duration = WakuLegacyFilterTimeout
- ) {.async, raises: [Defect, LPError].} =
- ## Mounting legacy filter protocol with separation from new v2 filter protocol for easier removal later
- ## TODO: remove legacy filter protocol
- info "mounting legacy filter protocol"
- node.wakuFilterLegacy =
- WakuFilterLegacy.new(node.peerManager, node.rng, filterTimeout)
- if node.started:
- await node.wakuFilterLegacy.start() #TODO: remove legacy
- node.switch.mount(node.wakuFilterLegacy, protocolMatcher(WakuLegacyFilterCodec))
  proc mountFilter*(
  node: WakuNode,
  subscriptionTimeout: Duration =
@@ -473,112 +444,25 @@ proc mountFilter*(
  proc filterHandleMessage*(
  node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage
  ) {.async.} =
- if node.wakuFilter.isNil() or node.wakuFilterLegacy.isNil():
+ if node.wakuFilter.isNil():
  error "cannot handle filter message",
- error = "waku filter and waku filter legacy are both required"
+ error = "waku filter is required"
  return
- await allFutures(
- node.wakuFilter.handleMessage(pubsubTopic, message),
- node.wakuFilterLegacy.handleMessage(pubsubTopic, message), #TODO: remove legacy
- )
+ await node.wakuFilter.handleMessage(pubsubTopic, message)
  proc mountFilterClient*(node: WakuNode) {.async, raises: [Defect, LPError].} =
- ## Mounting both filter clients v1 - legacy and v2.
+ ## Mounting both filter
  ## Giving option for application level to choose btw own push message handling or
  ## rely on node provided cache. - This only applies for v2 filter client
  info "mounting filter client"
- node.wakuFilterClientLegacy = WakuFilterClientLegacy.new(node.peerManager, node.rng)
  node.wakuFilterClient = WakuFilterClient.new(node.peerManager, node.rng)
  if node.started:
- await allFutures(node.wakuFilterClientLegacy.start(), node.wakuFilterClient.start())
+ await node.wakuFilterClient.start()
  node.switch.mount(node.wakuFilterClient, protocolMatcher(WakuFilterSubscribeCodec))
- node.switch.mount(node.wakuFilterClientLegacy, protocolMatcher(WakuLegacyFilterCodec))
- proc legacyFilterSubscribe*(
- node: WakuNode,
- pubsubTopic: Option[PubsubTopic],
- contentTopics: ContentTopic | seq[ContentTopic],
- handler: FilterPushHandler,
- peer: RemotePeerInfo | string,
- ) {.async, gcsafe, raises: [Defect, ValueError].} =
- ## Registers for messages that match a specific filter.
- ## Triggers the handler whenever a message is received.
- if node.wakuFilterClientLegacy.isNil():
- error "cannot register filter subscription to topic",
- error = "waku legacy filter client is not set up"
- return
- let remotePeerRes = parsePeerInfo(peer)
- if remotePeerRes.isErr():
- error "Couldn't parse the peer info properly", error = remotePeerRes.error
- return
- let remotePeer = remotePeerRes.value
- # Add handler wrapper to store the message when pushed, when relay is disabled and filter enabled
- # TODO: Move this logic to wakunode2 app
- # FIXME: This part needs refactoring. It seems possible that in special cases archiver will store same message multiple times.
- let handlerWrapper: FilterPushHandler =
- if node.wakuRelay.isNil() and not node.wakuStore.isNil():
- proc(pubsubTopic: string, message: WakuMessage) {.async, gcsafe, closure.} =
- await allFutures(
- node.wakuArchive.handleMessage(pubSubTopic, message),
- handler(pubsubTopic, message),
- )
- else:
- handler
- if pubsubTopic.isSome():
- info "registering legacy filter subscription to content",
- pubsubTopic = pubsubTopic.get(),
- contentTopics = contentTopics,
- peer = remotePeer.peerId
- let res = await node.wakuFilterClientLegacy.subscribe(
- pubsubTopic.get(), contentTopics, handlerWrapper, peer = remotePeer
- )
- if res.isOk():
- info "subscribed to topic",
- pubsubTopic = pubsubTopic.get(), contentTopics = contentTopics
- else:
- error "failed legacy filter subscription", error = res.error
- waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
- else:
- let topicMapRes = node.wakuSharding.parseSharding(pubsubTopic, contentTopics)
- let topicMap =
- if topicMapRes.isErr():
- error "can't get shard", error = topicMapRes.error
- return
- else:
- topicMapRes.get()
- var futures = collect(newSeq):
- for pubsub, topics in topicMap.pairs:
- info "registering legacy filter subscription to content",
- pubsubTopic = pubsub, contentTopics = topics, peer = remotePeer.peerId
- let content = topics.mapIt($it)
- node.wakuFilterClientLegacy.subscribe(
- $pubsub, content, handlerWrapper, peer = remotePeer
- )
- let finished = await allFinished(futures)
- for fut in finished:
- let res = fut.read()
- if res.isErr():
- error "failed legacy filter subscription", error = res.error
- waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
- for pubsub, topics in topicMap.pairs:
- info "subscribed to topic", pubsubTopic = pubsub, contentTopics = topics
  proc filterSubscribe*(
  node: WakuNode,
@@ -658,81 +542,13 @@ proc filterSubscribe*(
  # return the last error or ok
  return subRes
- proc legacyFilterUnsubscribe*(
- node: WakuNode,
- pubsubTopic: Option[PubsubTopic],
- contentTopics: ContentTopic | seq[ContentTopic],
- peer: RemotePeerInfo | string,
- ) {.async, gcsafe, raises: [Defect, ValueError].} =
- ## Unsubscribe from a content legacy filter.
- if node.wakuFilterClientLegacy.isNil():
- error "cannot unregister filter subscription to content",
- error = "waku filter client is nil"
- return
- let remotePeerRes = parsePeerInfo(peer)
- if remotePeerRes.isErr():
- error "couldn't parse remotePeerInfo", error = remotePeerRes.error
- return
- let remotePeer = remotePeerRes.value
- if pubsubTopic.isSome():
- info "deregistering legacy filter subscription to content",
- pubsubTopic = pubsubTopic.get(),
- contentTopics = contentTopics,
- peer = remotePeer.peerId
- let res = await node.wakuFilterClientLegacy.unsubscribe(
- pubsubTopic.get(), contentTopics, peer = remotePeer
- )
- if res.isOk():
- info "unsubscribed from topic",
- pubsubTopic = pubsubTopic.get(), contentTopics = contentTopics
- else:
- error "failed filter unsubscription", error = res.error
- waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
- else:
- let topicMapRes = node.wakuSharding.parseSharding(pubsubTopic, contentTopics)
- let topicMap =
- if topicMapRes.isErr():
- error "can't get shard", error = topicMapRes.error
- return
- else:
- topicMapRes.get()
- var futures = collect(newSeq):
- for pubsub, topics in topicMap.pairs:
- info "deregistering filter subscription to content",
- pubsubTopic = pubsub, contentTopics = topics, peer = remotePeer.peerId
- let content = topics.mapIt($it)
- node.wakuFilterClientLegacy.unsubscribe($pubsub, content, peer = remotePeer)
- let finished = await allFinished(futures)
- for fut in finished:
- let res = fut.read()
- if res.isErr():
- error "failed filter unsubscription", error = res.error
- waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
- for pubsub, topics in topicMap.pairs:
- info "unsubscribed from topic", pubsubTopic = pubsub, contentTopics = topics
  proc filterUnsubscribe*(
  node: WakuNode,
  pubsubTopic: Option[PubsubTopic],
- contentTopics: seq[ContentTopic],
+ contentTopics: ContentTopic|seq[ContentTopic],
  peer: RemotePeerInfo | string,
  ): Future[FilterSubscribeResult] {.async, gcsafe, raises: [Defect, ValueError].} =
  ## Unsubscribe from a content filter V2".
- if node.wakuFilterClientLegacy.isNil():
- error "cannot unregister filter subscription to content",
- error = "waku filter client is nil"
- return err(FilterSubscribeError.serviceUnavailable())
  let remotePeerRes = parsePeerInfo(peer)
  if remotePeerRes.isErr():
@@ -802,10 +618,6 @@ proc filterUnsubscribeAll*(
  node: WakuNode, peer: RemotePeerInfo | string
  ): Future[FilterSubscribeResult] {.async, gcsafe, raises: [Defect, ValueError].} =
  ## Unsubscribe from a content filter V2".
- if node.wakuFilterClientLegacy.isNil():
- error "cannot unregister filter subscription to content",
- error = "waku filter client is nil"
- return err(FilterSubscribeError.serviceUnavailable())
  let remotePeerRes = parsePeerInfo(peer)
  if remotePeerRes.isErr():
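
A short usage sketch of the surviving v2 unsubscribe API shown above; note that filterUnsubscribe now also accepts a single ContentTopic. The servicePeer value, the enclosing async context and the error handling are assumptions.

# Illustrative fragment, assumed to run inside an async proc with a mounted filter client.
let unsubRes = await node.filterUnsubscribe(
  some(DefaultPubsubTopic), DefaultContentTopic, peer = servicePeer
)
if unsubRes.isErr():
  error "filter v2 unsubscribe failed", error = $unsubRes.error

# or drop every subscription held with that service peer
discard await node.filterUnsubscribeAll(peer = servicePeer)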

@@ -14,7 +14,6 @@ import
  import
  ../../../waku_core,
  ../../../waku_store,
- ../../../waku_filter,
  ../../../waku_filter_v2,
  ../../../waku_lightpush/common,
  ../../../waku_relay,
@@ -54,17 +53,6 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
  )
  tuplesToWakuPeers(peers, relayPeers)
- if not node.wakuFilterLegacy.isNil():
- # Map WakuFilter peers to WakuPeers and add to return list
- let filterPeers = node.peerManager.peerStore.peers(WakuLegacyFilterCodec).mapIt(
- (
- multiaddr: constructMultiaddrStr(it),
- protocol: WakuLegacyFilterCodec,
- connected: it.connectedness == Connectedness.Connected,
- )
- )
- tuplesToWakuPeers(peers, filterPeers)
  if not node.wakuFilter.isNil():
  # Map WakuFilter peers to WakuPeers and add to return list
  let filterV2Peers = node.peerManager.peerStore

View File

@ -16,7 +16,6 @@ import
   ../../../waku_core,
   ../../../waku_node,
   ../../../node/peer_manager,
-  ../../../waku_filter,
   ../../../waku_filter_v2,
   ../../../waku_filter_v2/client as filter_protocol_client,
   ../../../waku_filter_v2/common as filter_protocol_type,

View File

@ -1,44 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/sets,
stew/byteutils,
chronicles,
json_serialization,
json_serialization/std/options,
presto/[route, client, common]
import ../../../waku_core, ../serdes, ../responses, ../rest_serdes, ./types
export types
logScope:
topics = "waku node rest client v1"
proc encodeBytes*(
value: FilterLegacySubscribeRequest, contentType: string
): RestResult[seq[byte]] =
return encodeBytesOf(value, contentType)
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc filterPostSubscriptionsV1*(
body: FilterLegacySubscribeRequest
): RestResponse[string] {.
rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodPost
.}
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc filterDeleteSubscriptionsV1*(
body: FilterLegacySubscribeRequest
): RestResponse[string] {.
rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodDelete
.}
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc filterGetMessagesV1*(
contentTopic: string
): RestResponse[FilterGetMessagesResponse] {.
rest, endpoint: "/filter/v1/messages/{contentTopic}", meth: HttpMethod.MethodGet
.}

View File

@ -1,172 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/sequtils,
stew/byteutils,
chronicles,
json_serialization,
json_serialization/std/options,
presto/route,
presto/common
import
../../../waku_core,
../../../waku_filter,
../../../waku_filter/client,
../../../node/peer_manager,
../../../waku_node,
../../message_cache,
../serdes,
../responses,
../rest_serdes,
./types
export types
logScope:
topics = "waku node rest filter_api v1"
const futTimeoutForSubscriptionProcessing* = 5.seconds
#### Request handlers
const ROUTE_FILTER_SUBSCRIPTIONSV1* = "/filter/v1/subscriptions"
func decodeRequestBody[T](
contentBody: Option[ContentBody]
): Result[T, RestApiResponse] =
if contentBody.isNone():
return err(RestApiResponse.badRequest("Missing content body"))
let reqBodyContentType = MediaType.init($contentBody.get().contentType)
if reqBodyContentType != MIMETYPE_JSON:
return
err(RestApiResponse.badRequest("Wrong Content-Type, expected application/json"))
let reqBodyData = contentBody.get().data
let requestResult = decodeFromJsonBytes(T, reqBodyData)
if requestResult.isErr():
return err(
RestApiResponse.badRequest(
"Invalid content body, could not decode. " & $requestResult.error
)
)
return ok(requestResult.get())
proc installFilterV1PostSubscriptionsV1Handler*(
router: var RestRouter, node: WakuNode, cache: MessageCache
) =
let pushHandler: FilterPushHandler = proc(
pubsubTopic: PubsubTopic, msg: WakuMessage
) {.async, gcsafe, closure.} =
cache.addMessage(pubsubTopic, msg)
router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONSV1) do(
contentBody: Option[ContentBody]
) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a pubsubTopic
debug "post", ROUTE_FILTER_SUBSCRIPTIONSV1, contentBody
let decodedBody = decodeRequestBody[FilterLegacySubscribeRequest](contentBody)
if decodedBody.isErr():
return decodedBody.error
let req: FilterLegacySubscribeRequest = decodedBody.value()
let peerOpt = node.peerManager.selectPeer(WakuLegacyFilterCodec)
if peerOpt.isNone():
return RestApiResponse.internalServerError("No suitable remote filter peers")
let subFut = node.legacyFilterSubscribe(
req.pubsubTopic, req.contentFilters, pushHandler, peerOpt.get()
)
if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to subscribe to contentFilters due to timeout!"
return
RestApiResponse.internalServerError("Failed to subscribe to contentFilters")
# Successfully subscribed to all content filters
for cTopic in req.contentFilters:
cache.contentSubscribe(cTopic)
return RestApiResponse.ok()
proc installFilterV1DeleteSubscriptionsV1Handler*(
router: var RestRouter, node: WakuNode, cache: MessageCache
) =
router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONSV1) do(
contentBody: Option[ContentBody]
) -> RestApiResponse:
## Unsubscribes a node from a list of contentTopics of a PubSub topic
debug "delete", ROUTE_FILTER_SUBSCRIPTIONSV1, contentBody
let decodedBody = decodeRequestBody[FilterLegacySubscribeRequest](contentBody)
if decodedBody.isErr():
return decodedBody.error
let req: FilterLegacySubscribeRequest = decodedBody.value()
let peerOpt = node.peerManager.selectPeer(WakuLegacyFilterCodec)
if peerOpt.isNone():
return RestApiResponse.internalServerError("No suitable remote filter peers")
let unsubFut =
node.legacyFilterUnsubscribe(req.pubsubTopic, req.contentFilters, peerOpt.get())
if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to unsubscribe from contentFilters due to timeout!"
return
RestApiResponse.internalServerError("Failed to unsubscribe from contentFilters")
for cTopic in req.contentFilters:
cache.contentUnsubscribe(cTopic)
# Successfully unsubscribed from all requested contentTopics
return RestApiResponse.ok()
const ROUTE_FILTER_MESSAGESV1* = "/filter/v1/messages/{contentTopic}"
proc installFilterV1GetMessagesV1Handler*(
router: var RestRouter, node: WakuNode, cache: MessageCache
) =
router.api(MethodGet, ROUTE_FILTER_MESSAGESV1) do(
contentTopic: string
) -> RestApiResponse:
## Returns all WakuMessages received on a specified content topic since the
## last time this method was called
## TODO: ability to specify a return message limit
debug "get", ROUTE_FILTER_MESSAGESV1, contentTopic = contentTopic
if contentTopic.isErr():
return RestApiResponse.badRequest("Missing contentTopic")
let contentTopic = contentTopic.get()
let msgRes = cache.getAutoMessages(contentTopic, clear = true)
if msgRes.isErr():
return RestApiResponse.badRequest("Not subscribed to topic: " & contentTopic)
let data = FilterGetMessagesResponse(msgRes.get().map(toFilterWakuMessage))
let resp = RestApiResponse.jsonResponse(data, status = Http200)
if resp.isErr():
error "An error occurred while building the json response: ", error = resp.error
return RestApiResponse.internalServerError(
"An error occurred while building the json response"
)
return resp.get()
proc installLegacyFilterRestApiHandlers*(
router: var RestRouter, node: WakuNode, cache: MessageCache
) =
installFilterV1PostSubscriptionsV1Handler(router, node, cache)
installFilterV1DeleteSubscriptionsV1Handler(router, node, cache)
installFilterV1GetMessagesV1Handler(router, node, cache)

View File

@ -1,8 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import ./waku_filter/protocol
export protocol

View File

@ -1,3 +0,0 @@
# Waku Filter protocol
The filter protocol implements bandwidth-preserving filtering for light nodes. See https://rfc.vac.dev/spec/12/ for more information.
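For readers tracing what is being removed: the wire objects this protocol exchanged are defined in the rpc module deleted further down in this commit. A minimal, illustrative subscribe request could be built as below; the topic strings and requestId are made up, and std/options is assumed to be imported.

# Illustration only, using the (removed) legacy rpc types defined later in this commit.
let subscribeRpc = FilterRPC(
  requestId: "1",
  request: some(
    FilterRequest(
      subscribe: true,
      pubsubTopic: "/waku/2/default-waku/proto",
      contentFilters: @[ContentFilter(contentTopic: "/my-app/1/chat/proto")],
    )
  ),
)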

View File

@ -1,174 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[options, tables, sequtils],
stew/results,
chronicles,
chronos,
metrics,
bearssl/rand,
libp2p/protocols/protocol as libp2p_protocol
import
../waku_core,
../node/peer_manager,
../utils/requests,
./rpc,
./rpc_codec,
./protocol,
./protocol_metrics
logScope:
topics = "waku filter client"
const Defaultstring = "/waku/2/default-waku/proto"
## Client
type WakuFilterClientLegacy* = ref object of LPProtocol
rng: ref rand.HmacDrbgContext
peerManager: PeerManager
subManager: SubscriptionManager
proc handleMessagePush(
wf: WakuFilterClientLegacy, peerId: PeerId, requestId: string, rpc: MessagePush
) =
for msg in rpc.messages:
let
pubsubTopic = Defaultstring
# TODO: Extend the filter push rpc to provide the pubsub topic. This is a limitation of the legacy protocol.
contentTopic = msg.contentTopic
wf.subManager.notifySubscriptionHandler(pubsubTopic, contentTopic, msg)
proc initProtocolHandler(wf: WakuFilterClientLegacy) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
let buffer = await conn.readLp(MaxRpcSize.int)
let decodeReqRes = FilterRPC.decode(buffer)
if decodeReqRes.isErr():
waku_legacy_filter_errors.inc(labelValues = [decodeRpcFailure])
return
let rpc = decodeReqRes.get()
trace "filter message received"
if rpc.push.isNone():
waku_legacy_filter_errors.inc(labelValues = [emptyMessagePushFailure])
# TODO: Manage the empty push message error. Perform any action?
return
waku_legacy_filter_messages.inc(labelValues = ["MessagePush"])
let
peerId = conn.peerId
requestId = rpc.requestId
push = rpc.push.get()
info "received filter message push", peerId = conn.peerId, requestId = requestId
wf.handleMessagePush(peerId, requestId, push)
wf.handler = handle
wf.codec = WakuLegacyFilterCodec
proc new*(
T: type WakuFilterClientLegacy,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
): T =
let wf = WakuFilterClientLegacy(
peerManager: peerManager, rng: rng, subManager: SubscriptionManager.init()
)
wf.initProtocolHandler()
wf
proc sendFilterRpc(
wf: WakuFilterClientLegacy, rpc: FilterRPC, peer: PeerId | RemotePeerInfo
): Future[WakuFilterResult[void]] {.async, gcsafe.} =
let connOpt = await wf.peerManager.dialPeer(peer, WakuLegacyFilterCodec)
if connOpt.isNone():
return err(dialFailure)
let connection = connOpt.get()
await connection.writeLP(rpc.encode().buffer)
return ok()
proc sendFilterRequestRpc(
wf: WakuFilterClientLegacy,
pubsubTopic: PubsubTopic,
contentTopics: seq[ContentTopic],
subscribe: bool,
peer: PeerId | RemotePeerInfo,
): Future[WakuFilterResult[void]] {.async.} =
let requestId = generateRequestId(wf.rng)
let contentFilters = contentTopics.mapIt(ContentFilter(contentTopic: it))
let rpc = FilterRpc(
requestId: requestId,
request: some(
FilterRequest(
subscribe: subscribe, pubSubTopic: pubsubTopic, contentFilters: contentFilters
)
),
)
let sendRes = await wf.sendFilterRpc(rpc, peer)
if sendRes.isErr():
waku_legacy_filter_errors.inc(labelValues = [sendRes.error])
return err(sendRes.error)
return ok()
proc subscribe*(
wf: WakuFilterClientLegacy,
pubsubTopic: PubsubTopic,
contentTopic: ContentTopic | seq[ContentTopic],
handler: FilterPushHandler,
peer: PeerId | RemotePeerInfo,
): Future[WakuFilterResult[void]] {.async.} =
var topics: seq[ContentTopic]
when contentTopic is seq[ContentTopic]:
topics = contentTopic
else:
topics = @[contentTopic]
let sendRes =
await wf.sendFilterRequestRpc(pubsubTopic, topics, subscribe = true, peer = peer)
if sendRes.isErr():
return err(sendRes.error)
for topic in topics:
wf.subManager.registerSubscription(pubsubTopic, topic, handler)
return ok()
proc unsubscribe*(
wf: WakuFilterClientLegacy,
pubsubTopic: PubsubTopic,
contentTopic: ContentTopic | seq[ContentTopic],
peer: PeerId | RemotePeerInfo,
): Future[WakuFilterResult[void]] {.async.} =
var topics: seq[ContentTopic]
when contentTopic is seq[ContentTopic]:
topics = contentTopic
else:
topics = @[contentTopic]
let sendRes =
await wf.sendFilterRequestRpc(pubsubTopic, topics, subscribe = false, peer = peer)
if sendRes.isErr():
return err(sendRes.error)
# FIXME: This approach prevents the filter client from properly managing its
# subscriptions across different peers and from being notified correctly!
for topic in topics:
wf.subManager.removeSubscription(pubsubTopic, topic)
return ok()
proc clearSubscriptions*(wf: WakuFilterClientLegacy) =
wf.subManager.clear()
proc getSubscriptionsCount*(wf: WakuFilterClientLegacy): int =
wf.subManager.getSubscriptionsCount()
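For callers migrating off this client: the legacy subscribe took (pubsubTopic, contentTopic, handler, peer), whereas the v2 client (see the hunks at the end of this commit) takes (servicePeer, pubsubTopic, contentTopics) and delivers messages through separately registered push handlers. A rough, hedged sketch of the mapping; the v2 handler-registration proc name is assumed here and is not shown in this diff.

# Before (removed in this commit):
#   discard await legacyClient.subscribe(DefaultPubsubTopic, contentTopic, handler, peer)
# After (filter v2 client; registerPushHandler name assumed):
#   filterClient.registerPushHandler(handler)
#   discard await filterClient.subscribe(peer, DefaultPubsubTopic, contentTopic)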

View File

@ -1,191 +0,0 @@
import
std/[options, sets, tables, sequtils],
stew/results,
chronicles,
chronos,
metrics,
bearssl/rand,
libp2p/protocols/protocol,
libp2p/crypto/crypto
import ../waku_core, ../node/peer_manager, ./rpc, ./rpc_codec, ./protocol_metrics
logScope:
topics = "waku filter"
const
WakuLegacyFilterCodec* = "/vac/waku/filter/2.0.0-beta1"
WakuLegacyFilterTimeout*: Duration = 2.hours
type WakuFilterResult*[T] = Result[T, string]
## Subscription manager
type Subscription = object
requestId: string
peer: PeerID
pubsubTopic: PubsubTopic
contentTopics: HashSet[ContentTopic]
proc addSubscription(
subscriptions: var seq[Subscription],
peer: PeerID,
requestId: string,
pubsubTopic: PubsubTopic,
contentTopics: seq[ContentTopic],
) =
let subscription = Subscription(
requestId: requestId,
peer: peer,
pubsubTopic: pubsubTopic,
contentTopics: toHashSet(contentTopics),
)
subscriptions.add(subscription)
proc removeSubscription(
subscriptions: var seq[Subscription],
peer: PeerId,
unsubscribeTopics: seq[ContentTopic],
) =
for sub in subscriptions.mitems:
if sub.peer != peer:
continue
sub.contentTopics.excl(toHashSet(unsubscribeTopics))
# Delete the subscriber if no more content filters left
subscriptions.keepItIf(it.contentTopics.len > 0)
## Protocol
type WakuFilterLegacy* = ref object of LPProtocol
rng*: ref rand.HmacDrbgContext
peerManager*: PeerManager
subscriptions*: seq[Subscription]
failedPeers*: Table[string, chronos.Moment]
timeout*: chronos.Duration
proc handleFilterRequest(wf: WakuFilterLegacy, peerId: PeerId, rpc: FilterRPC) =
let
requestId = rpc.requestId
subscribe = rpc.request.get().subscribe
pubsubTopic = rpc.request.get().pubsubTopic
contentTopics = rpc.request.get().contentFilters.mapIt(it.contentTopic)
if subscribe:
info "added filter subscription",
peerId = peerId, pubsubTopic = pubsubTopic, contentTopics = contentTopics
wf.subscriptions.addSubscription(peerId, requestId, pubsubTopic, contentTopics)
else:
info "removed filter subscription", peerId = peerId, contentTopics = contentTopics
wf.subscriptions.removeSubscription(peerId, contentTopics)
waku_legacy_filter_subscribers.set(wf.subscriptions.len.int64)
proc initProtocolHandler(wf: WakuFilterLegacy) =
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
let buffer = await conn.readLp(MaxRpcSize.int)
let decodeRpcRes = FilterRPC.decode(buffer)
if decodeRpcRes.isErr():
waku_legacy_filter_errors.inc(labelValues = [decodeRpcFailure])
return
trace "filter message received"
let rpc = decodeRpcRes.get()
## Filter request
# Subscription/unsubscription request
if rpc.request.isNone():
waku_legacy_filter_errors.inc(labelValues = [emptyFilterRequestFailure])
# TODO: Manage the empty filter request message error. Perform any action?
return
waku_legacy_filter_messages.inc(labelValues = ["FilterRequest"])
wf.handleFilterRequest(conn.peerId, rpc)
wf.handler = handler
wf.codec = WakuLegacyFilterCodec
proc new*(
T: type WakuFilterLegacy,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
timeout: Duration = WakuLegacyFilterTimeout,
): T =
let wf = WakuFilterLegacy(rng: rng, peerManager: peerManager, timeout: timeout)
wf.initProtocolHandler()
return wf
proc sendFilterRpc(
wf: WakuFilterLegacy, rpc: FilterRPC, peer: PeerId | RemotePeerInfo
): Future[WakuFilterResult[void]] {.async, gcsafe.} =
let connOpt = await wf.peerManager.dialPeer(peer, WakuLegacyFilterCodec)
if connOpt.isNone():
return err(dialFailure)
let connection = connOpt.get()
await connection.writeLP(rpc.encode().buffer)
return ok()
### Send message to subscriptors
proc removePeerFromFailedPeersTable(wf: WakuFilterLegacy, subs: seq[Subscription]) =
## Clear the failed peer table if the subscriber was able to connect
for sub in subs:
wf.failedPeers.del($sub)
proc handleClientError(
wf: WakuFilterLegacy, subs: seq[Subscription]
) {.raises: [Defect, KeyError].} =
## If we have already failed to send a message to this peer,
## check for elapsed time and if it's been too long, remove the peer.
for sub in subs:
let subKey: string = $(sub)
if not wf.failedPeers.hasKey(subKey):
# add the peer to the failed peers table.
wf.failedPeers[subKey] = Moment.now()
return
let elapsedTime = Moment.now() - wf.failedPeers[subKey]
if elapsedTime > wf.timeout:
wf.failedPeers.del(subKey)
let index = wf.subscriptions.find(sub)
wf.subscriptions.delete(index)
proc handleMessage*(
wf: WakuFilterLegacy, pubsubTopic: PubsubTopic, msg: WakuMessage
) {.async.} =
trace "handling message",
pubsubTopic, contentTopic = msg.contentTopic, subscriptions = wf.subscriptions.len
if wf.subscriptions.len <= 0:
return
var failedSubscriptions: seq[Subscription]
var connectedSubscriptions: seq[Subscription]
for sub in wf.subscriptions:
# TODO: Review when pubsubTopic can be empty and if it is a valid case
if sub.pubSubTopic != "" and sub.pubSubTopic != pubsubTopic:
continue
if msg.contentTopic notin sub.contentTopics:
continue
let rpc =
FilterRPC(requestId: sub.requestId, push: some(MessagePush(messages: @[msg])))
let res = await wf.sendFilterRpc(rpc, sub.peer)
if res.isErr():
waku_legacy_filter_errors.inc(labelValues = [res.error()])
failedSubscriptions.add(sub)
continue
connectedSubscriptions.add(sub)
wf.removePeerFromFailedPeersTable(connectedSubscriptions)
wf.handleClientError(failedSubscriptions)
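For context, service nodes fed relay traffic into the handler above. A minimal sketch of that wiring follows; the wakuFilterLegacy field name matches its other occurrences in this commit, while `node`, `pubsubTopic` and `msg` are assumed to be in scope.

# Sketch: push a relayed message to legacy filter subscribers, if mounted.
if not node.wakuFilterLegacy.isNil():
  await node.wakuFilterLegacy.handleMessage(pubsubTopic, msg)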

View File

@ -1,22 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import metrics
declarePublicGauge waku_legacy_filter_subscribers,
"number of light node filter subscribers"
declarePublicGauge waku_legacy_filter_errors,
"number of filter protocol errors", ["type"]
declarePublicGauge waku_legacy_filter_messages,
"number of filter messages received", ["type"]
declarePublicGauge waku_node_filters, "number of content filter subscriptions"
# Error types (metric label values)
const
dialFailure* = "dial_failure"
decodeRpcFailure* = "decode_rpc_failure"
peerNotFoundFailure* = "peer_not_found_failure"
emptyMessagePushFailure* = "empty_message_push_failure"
emptyFilterRequestFailure* = "empty_filter_request_failure"

View File

@ -1,24 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import std/options
import ../waku_core
type
ContentFilter* = object
contentTopic*: string
FilterRequest* = object
contentFilters*: seq[ContentFilter]
pubsubTopic*: string
subscribe*: bool
MessagePush* = object
messages*: seq[WakuMessage]
FilterRPC* = object
requestId*: string
request*: Option[FilterRequest]
push*: Option[MessagePush]

View File

@ -1,130 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import std/options
import ../common/protobuf, ../waku_core, ./rpc
# Multiply by 10 for safety. Currently we never push more than 1 message at a time
# We add a 64kB safety buffer for protocol overhead.
const MaxRpcSize* = 10 * MaxWakuMessageSize + 64 * 1024
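To make the constant concrete: assuming, purely for illustration, a MaxWakuMessageSize of 150 KiB, MaxRpcSize works out to 10 * 153_600 + 65_536 = 1_601_536 bytes, i.e. roughly 1.5 MiB per RPC.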
proc encode*(filter: ContentFilter): ProtoBuffer =
var pb = initProtoBuffer()
pb.write3(1, filter.contentTopic)
pb.finish3()
pb
proc decode*(T: type ContentFilter, buffer: seq[byte]): ProtobufResult[T] =
let pb = initProtoBuffer(buffer)
var rpc = ContentFilter()
var topic: string
if not ?pb.getField(1, topic):
return err(ProtobufError.missingRequiredField("content_topic"))
else:
rpc.contentTopic = topic
ok(rpc)
proc encode*(rpc: FilterRequest): ProtoBuffer =
var pb = initProtoBuffer()
pb.write3(1, rpc.subscribe)
pb.write3(2, rpc.pubSubTopic)
for filter in rpc.contentFilters:
pb.write3(3, filter.encode())
pb.finish3()
pb
proc decode*(T: type FilterRequest, buffer: seq[byte]): ProtobufResult[T] =
let pb = initProtoBuffer(buffer)
var rpc = FilterRequest()
var subflag: uint64
if not ?pb.getField(1, subflag):
return err(ProtobufError.missingRequiredField("subscribe"))
else:
rpc.subscribe = bool(subflag)
var topic: string
if not ?pb.getField(2, topic):
return err(ProtobufError.missingRequiredField("topic"))
else:
rpc.pubsubTopic = topic
var buffs: seq[seq[byte]]
if not ?pb.getRepeatedField(3, buffs):
return err(ProtobufError.missingRequiredField("content_filters"))
else:
for buf in buffs:
let filter = ?ContentFilter.decode(buf)
rpc.contentFilters.add(filter)
ok(rpc)
proc encode*(push: MessagePush): ProtoBuffer =
var pb = initProtoBuffer()
for push in push.messages:
pb.write3(1, push.encode())
pb.finish3()
pb
proc decode*(T: type MessagePush, buffer: seq[byte]): ProtobufResult[T] =
let pb = initProtoBuffer(buffer)
var rpc = MessagePush()
var messages: seq[seq[byte]]
if not ?pb.getRepeatedField(1, messages):
return err(ProtobufError.missingRequiredField("messages"))
else:
for buf in messages:
let msg = ?WakuMessage.decode(buf)
rpc.messages.add(msg)
ok(rpc)
proc encode*(rpc: FilterRPC): ProtoBuffer =
var pb = initProtoBuffer()
pb.write3(1, rpc.requestId)
pb.write3(2, rpc.request.map(encode))
pb.write3(3, rpc.push.map(encode))
pb.finish3()
pb
proc decode*(T: type FilterRPC, buffer: seq[byte]): ProtobufResult[T] =
let pb = initProtoBuffer(buffer)
var rpc = FilterRPC()
var requestId: string
if not ?pb.getField(1, requestId):
return err(ProtobufError.missingRequiredField("request_id"))
else:
rpc.requestId = requestId
var requestBuffer: seq[byte]
if not ?pb.getField(2, requestBuffer):
rpc.request = none(FilterRequest)
else:
let request = ?FilterRequest.decode(requestBuffer)
rpc.request = some(request)
var pushBuffer: seq[byte]
if not ?pb.getField(3, pushBuffer):
rpc.push = none(MessagePush)
else:
let push = ?MessagePush.decode(pushBuffer)
rpc.push = some(push)
ok(rpc)
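A quick round-trip sketch with the codecs above; it assumes a FilterRPC value `rpc` already exists, such as the subscribe request illustrated earlier.

let wire = rpc.encode().buffer        # ProtoBuffer -> seq[byte]
let decoded = FilterRPC.decode(wire)  # ProtobufResult[FilterRPC]
doAssert decoded.isOk() and decoded.get().requestId == rpc.requestId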

View File

@ -79,11 +79,17 @@ proc subscribe*(
     wfc: WakuFilterClient,
     servicePeer: RemotePeerInfo,
     pubsubTopic: PubsubTopic,
-    contentTopics: seq[ContentTopic],
+    contentTopics: ContentTopic | seq[ContentTopic],
 ): Future[FilterSubscribeResult] {.async.} =
+  var contentTopicSeq: seq[ContentTopic]
+  when contentTopics is seq[ContentTopic]:
+    contentTopicSeq = contentTopics
+  else:
+    contentTopicSeq = @[contentTopics]
+
   let requestId = generateRequestId(wfc.rng)
   let filterSubscribeRequest = FilterSubscribeRequest.subscribe(
-    requestId = requestId, pubsubTopic = pubsubTopic, contentTopics = contentTopics
+    requestId = requestId, pubsubTopic = pubsubTopic, contentTopics = contentTopicSeq
   )

   return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
@ -92,11 +98,17 @@ proc unsubscribe*(
     wfc: WakuFilterClient,
     servicePeer: RemotePeerInfo,
     pubsubTopic: PubsubTopic,
-    contentTopics: seq[ContentTopic],
+    contentTopics: ContentTopic | seq[ContentTopic],
 ): Future[FilterSubscribeResult] {.async.} =
+  var contentTopicSeq: seq[ContentTopic]
+  when contentTopics is seq[ContentTopic]:
+    contentTopicSeq = contentTopics
+  else:
+    contentTopicSeq = @[contentTopics]
+
   let requestId = generateRequestId(wfc.rng)
   let filterSubscribeRequest = FilterSubscribeRequest.unsubscribe(
-    requestId = requestId, pubsubTopic = pubsubTopic, contentTopics = contentTopics
+    requestId = requestId, pubsubTopic = pubsubTopic, contentTopics = contentTopicSeq
   )

   return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
@ -140,4 +152,4 @@ proc new*(
 ): T =
   let wfc = WakuFilterClient(rng: rng, peerManager: peerManager, pushHandlers: @[])
   wfc.initProtocolHandler()
   wfc
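Finally, a usage sketch for the widened client API above. It assumes `wfc` and `servicePeer` already exist and uses made-up topic strings; DefaultPubsubTopic comes from waku_core.

# A single content topic can now be passed directly...
let single = await wfc.subscribe(servicePeer, DefaultPubsubTopic, ContentTopic("/app/1/alerts/proto"))
# ...or several at once, exactly as before.
let many = await wfc.subscribe(
  servicePeer,
  DefaultPubsubTopic,
  @[ContentTopic("/app/1/alerts/proto"), ContentTopic("/app/1/chat/proto")],
)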