feat: HTTP REST API: Filter support v2 (#1890)

Filter v2 rest api support implemented 
Filter rest api documentation updated with v1 and v2 interface support.
Separated legacy filter rest interface
Fix code and tests of v2 Filter rest api
Filter v2 message push test added
Applied autoshard to Filter V2
Redesigned FilterPushHandling, code style, catch up apps and tests with filter v2 interface changes
Rename of FilterV1SubscriptionsRequest to FilterLegacySubscribeRequest, fix broken chat2 app, fix tests
Changed Filter v2 push handler subscription to simple register
Separate node's filterUnsubscribe and filterUnsubscribeAll
This commit is contained in:
NagyZoltanPeter 2023-09-14 21:28:57 +02:00 committed by GitHub
parent 9085b1b3ba
commit dac072f843
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 1928 additions and 399 deletions

View File

@ -262,10 +262,12 @@ proc writeAndPrint(c: Chat) {.async.} =
echo "You are now known as " & c.nick
elif line.startsWith("/exit"):
if not c.node.wakuFilter.isNil():
if not c.node.wakuFilterLegacy.isNil():
echo "unsubscribing from content filters..."
await c.node.unsubscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=c.contentTopic)
let peerOpt = c.node.peerManager.selectPeer(WakuLegacyFilterCodec)
if peerOpt.isSome():
await c.node.legacyFilterUnsubscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=c.contentTopic, peer=peerOpt.get())
echo "quitting..."
@ -464,14 +466,18 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
if peerInfo.isOk():
await node.mountFilter()
await node.mountFilterClient()
node.peerManager.addServicePeer(peerInfo.value, WakuFilterCodec)
node.peerManager.addServicePeer(peerInfo.value, WakuLegacyFilterCodec)
proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} =
trace "Hit filter handler", contentTopic=msg.contentTopic
chat.printReceivedMessage(msg)
await node.subscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=chat.contentTopic, filterHandler)
await node.legacyFilterSubscribe(pubsubTopic=some(DefaultPubsubTopic),
contentTopics=chat.contentTopic,
filterHandler,
peerInfo.value)
# TODO: Here to support FilterV2 relevant subscription, but still
# Legacy Filter is concurrent to V2 untill legacy filter will be removed
else:
error "Filter not mounted. Couldn't parse conf.filternode",
error = peerInfo.error
@ -485,7 +491,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
chat.printReceivedMessage(msg)
let topic = DefaultPubsubTopic
await node.subscribe(some(topic), @[ContentTopic("")], handler)
node.subscribe(topic, handler)
if conf.rlnRelay:
info "WakuRLNRelay is enabled"

View File

@ -18,6 +18,7 @@ import
../../../waku/waku_node,
../../../waku/node/peer_manager,
../../waku/waku_filter,
../../waku/waku_filter_v2,
../../waku/waku_store,
# Chat 2 imports
../chat2/chat2,
@ -297,7 +298,8 @@ when isMainModule:
if conf.filternode != "":
let filterPeer = parsePeerInfo(conf.filternode)
if filterPeer.isOk():
bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuFilterCodec)
bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuLegacyFilterCodec)
bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuFilterSubscribeCodec)
else:
error "Error parsing conf.filternode", error = filterPeer.error

View File

@ -37,6 +37,8 @@ import
../../waku/waku_store,
../../waku/waku_lightpush,
../../waku/waku_filter,
../../waku/waku_filter_v2,
../../waku/waku_filter_v2/client as waku_filter_client,
./wakunode2_validator_signed,
./internal_config,
./external_config
@ -46,6 +48,7 @@ import
../../waku/node/rest/debug/handlers as rest_debug_api,
../../waku/node/rest/relay/handlers as rest_relay_api,
../../waku/node/rest/relay/topic_cache,
../../waku/node/rest/filter/legacy_handlers as rest_legacy_filter_api,
../../waku/node/rest/filter/handlers as rest_filter_api,
../../waku/node/rest/store/handlers as rest_store_api,
../../waku/node/rest/health/handlers as rest_health_api,
@ -470,8 +473,9 @@ proc setupProtocols(node: WakuNode,
if conf.filternode != "":
let filterNode = parsePeerInfo(conf.filternode)
if filterNode.isOk():
await mountFilterClient(node)
node.peerManager.addServicePeer(filterNode.value, WakuFilterCodec)
await node.mountFilterClient()
node.peerManager.addServicePeer(filterNode.value, WakuLegacyFilterCodec)
node.peerManager.addServicePeer(filterNode.value, WakuFilterSubscribeCodec)
else:
return err("failed to set node waku filter peer: " & filterNode.error)
@ -577,8 +581,11 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo
## Filter REST API
if conf.filter:
let filterCache = rest_filter_api.MessageCache.init(capacity=rest_filter_api.filterMessageCacheDefaultCapacity)
installFilterApiHandlers(server.router, app.node, filterCache)
let legacyFilterCache = rest_legacy_filter_api.MessageCache.init()
rest_legacy_filter_api.installLegacyFilterRestApiHandlers(server.router, app.node, legacyFilterCache)
let filterCache = rest_filter_api.MessageCache.init()
rest_filter_api.installFilterRestApiHandlers(server.router, app.node, filterCache)
## Store REST API
installStoreApiHandlers(server.router, app.node)

View File

@ -28,13 +28,15 @@ proc unsubscribe(wfc: WakuFilterClient,
else:
notice "unsubscribe request successful"
proc messagePushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) =
proc messagePushHandler(pubsubTopic: PubsubTopic, message: WakuMessage)
{.async, gcsafe.} =
let payloadStr = string.fromBytes(message.payload)
notice "message received", payload=payloadStr,
pubsubTopic=pubsubTopic,
contentTopic=message.contentTopic,
timestamp=message.timestamp
proc maintainSubscription(wfc: WakuFilterClient,
filterPeer: RemotePeerInfo,
filterPubsubTopic: PubsubTopic,
@ -68,11 +70,13 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) =
var
switch = newStandardSwitch()
pm = PeerManager.new(switch)
wfc = WakuFilterClient.new(rng, messagePushHandler, pm)
wfc = WakuFilterClient.new(pm, rng)
# Mount filter client protocol
switch.mount(wfc)
wfc.registerPushHandler(messagePushHandler)
# Start maintaining subscription
asyncSpawn maintainSubscription(wfc, filterPeer, FilterPubsubTopic, FilterContentTopic)

View File

@ -58,8 +58,8 @@ import
./test_waku_lightpush,
./test_wakunode_lightpush,
# Waku Filter
./test_waku_filter,
./test_wakunode_filter,
./test_waku_filter_legacy,
./test_wakunode_filter_legacy,
./test_waku_peer_exchange,
./test_peer_store_extended,
./test_message_cache,
@ -95,7 +95,9 @@ import
./wakunode_rest/test_rest_relay,
./wakunode_rest/test_rest_relay_serdes,
./wakunode_rest/test_rest_serdes,
./wakunode_rest/test_rest_store
./wakunode_rest/test_rest_store,
./wakunode_rest/test_rest_filter,
./wakunode_rest/test_rest_legacy_filter
import
./waku_rln_relay/test_waku_rln_relay,

View File

@ -52,7 +52,7 @@ procSuite "Peer Manager":
await allFutures(nodes.mapIt(it.mountFilter()))
# Dial node2 from node1
let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterCodec)
let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec)
# Check connection
check:
conn.isSome()
@ -81,12 +81,12 @@ procSuite "Peer Manager":
let nonExistentPeer = nonExistentPeerRes.value
# Dial non-existent peer from node1
let conn1 = await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuFilterCodec)
let conn1 = await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuLegacyFilterCodec)
check:
conn1.isNone()
# Dial peer not supporting given protocol
let conn2 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterCodec)
let conn2 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec)
check:
conn2.isNone()
@ -109,14 +109,14 @@ procSuite "Peer Manager":
node.mountStoreClient()
node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec)
node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuFilterCodec)
node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuLegacyFilterCodec)
# Check peers were successfully added to peer manager
check:
node.peerManager.peerStore.peers().len == 2
node.peerManager.peerStore.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and
node.peerManager.peerStore.peers(WakuLegacyFilterCodec).allIt(it.peerId == filterPeer.peerId and
it.addrs.contains(filterLoc) and
it.protocols.contains(WakuFilterCodec))
it.protocols.contains(WakuLegacyFilterCodec))
node.peerManager.peerStore.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and
it.addrs.contains(storeLoc) and
it.protocols.contains(WakuStoreCodec))
@ -429,7 +429,7 @@ procSuite "Peer Manager":
# service peers
node.peerManager.addServicePeer(peers[0], WakuStoreCodec)
node.peerManager.addServicePeer(peers[1], WakuFilterCodec)
node.peerManager.addServicePeer(peers[1], WakuLegacyFilterCodec)
node.peerManager.addServicePeer(peers[2], WakuLightPushCodec)
node.peerManager.addServicePeer(peers[3], WakuPeerExchangeCodec)
@ -449,7 +449,7 @@ procSuite "Peer Manager":
# all service peers are added to its service slot
check:
node.peerManager.serviceSlots[WakuStoreCodec].peerId == peers[0].peerId
node.peerManager.serviceSlots[WakuFilterCodec].peerId == peers[1].peerId
node.peerManager.serviceSlots[WakuLegacyFilterCodec].peerId == peers[1].peerId
node.peerManager.serviceSlots[WakuLightPushCodec].peerId == peers[2].peerId
node.peerManager.serviceSlots[WakuPeerExchangeCodec].peerId == peers[3].peerId
@ -474,11 +474,11 @@ procSuite "Peer Manager":
(await nodes[0].peerManager.connectRelay(pInfos[2])) == true
(await nodes[1].peerManager.connectRelay(pInfos[2])) == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[2], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[2], WakuLegacyFilterCodec)).isSome() == true
# isolated dial creates a relay conn under the hood (libp2p behaviour)
(await nodes[2].peerManager.dialPeer(pInfos[3], WakuFilterCodec)).isSome() == true
(await nodes[2].peerManager.dialPeer(pInfos[3], WakuLegacyFilterCodec)).isSome() == true
# assert physical connections
@ -486,26 +486,26 @@ procSuite "Peer Manager":
nodes[0].peerManager.connectedPeers(WakuRelayCodec)[0].len == 0
nodes[0].peerManager.connectedPeers(WakuRelayCodec)[1].len == 2
nodes[0].peerManager.connectedPeers(WakuFilterCodec)[0].len == 0
nodes[0].peerManager.connectedPeers(WakuFilterCodec)[1].len == 2
nodes[0].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 0
nodes[0].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 2
nodes[1].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1
nodes[1].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1
nodes[1].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1
nodes[1].peerManager.connectedPeers(WakuFilterCodec)[1].len == 0
nodes[1].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1
nodes[1].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 0
nodes[2].peerManager.connectedPeers(WakuRelayCodec)[0].len == 2
nodes[2].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1
nodes[2].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1
nodes[2].peerManager.connectedPeers(WakuFilterCodec)[1].len == 1
nodes[2].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1
nodes[2].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 1
nodes[3].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1
nodes[3].peerManager.connectedPeers(WakuRelayCodec)[1].len == 0
nodes[3].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1
nodes[3].peerManager.connectedPeers(WakuFilterCodec)[1].len == 0
nodes[3].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1
nodes[3].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 0
asyncTest "getNumStreams() returns expected number of connections per protocol":
# Create 2 nodes
@ -521,17 +521,17 @@ procSuite "Peer Manager":
require:
# multiple streams are multiplexed over a single connection.
# note that a relay connection is created under the hood when dialing a peer (libp2p behaviour)
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true
check:
nodes[0].peerManager.getNumStreams(WakuRelayCodec) == (1, 1)
nodes[0].peerManager.getNumStreams(WakuFilterCodec) == (0, 4)
nodes[0].peerManager.getNumStreams(WakuLegacyFilterCodec) == (0, 4)
nodes[1].peerManager.getNumStreams(WakuRelayCodec) == (1, 1)
nodes[1].peerManager.getNumStreams(WakuFilterCodec) == (4, 0)
nodes[1].peerManager.getNumStreams(WakuLegacyFilterCodec) == (4, 0)
test "selectPeer() returns the correct peer":
# Valid peer id missing the last digit
@ -552,7 +552,7 @@ procSuite "Peer Manager":
# Add a peer[0] to the peerstore
pm.peerStore[AddressBook][peers[0].peerId] = peers[0].addrs
pm.peerStore[ProtoBook][peers[0].peerId] = @[WakuRelayCodec, WakuStoreCodec, WakuFilterCodec]
pm.peerStore[ProtoBook][peers[0].peerId] = @[WakuRelayCodec, WakuStoreCodec, WakuLegacyFilterCodec]
# When no service peers, we get one from the peerstore
let selectedPeer1 = pm.selectPeer(WakuStoreCodec)
@ -561,7 +561,7 @@ procSuite "Peer Manager":
selectedPeer1.get().peerId == peers[0].peerId
# Same for other protocol
let selectedPeer2 = pm.selectPeer(WakuFilterCodec)
let selectedPeer2 = pm.selectPeer(WakuLegacyFilterCodec)
check:
selectedPeer2.isSome() == true
selectedPeer2.get().peerId == peers[0].peerId
@ -757,7 +757,7 @@ procSuite "Peer Manager":
discard await nodes[0].peerManager.connectRelay(pInfos[3])
discard await nodes[0].peerManager.connectRelay(pInfos[4])
# they are also prunned 
# they are also prunned
check nodes[0].peerManager.switch.connManager.getConnections().len == 1
# we should have 4 peers (2in/2out) but due to collocation limit

View File

@ -44,7 +44,7 @@ suite "WakuNode - Filter":
filterPushHandlerFut.complete((pubsubTopic, msg))
## When
await client.filterSubscribe(some(pubsubTopic), contentTopic, filterPushHandler, peer=serverPeerInfo)
await client.legacyFilterSubscribe(some(pubsubTopic), contentTopic, filterPushHandler, peer=serverPeerInfo)
# Wait for subscription to take effect
waitFor sleepAsync(100.millis)

View File

@ -1,6 +1,5 @@
import
std/[options,tables],
testutils/unittests,
chronos,
chronicles
@ -22,10 +21,10 @@ proc newTestWakuFilter*(switch: Switch): Future[WakuFilter] {.async.} =
return proto
proc newTestWakuFilterClient*(switch: Switch, messagePushHandler: MessagePushHandler): Future[WakuFilterClient] {.async.} =
proc newTestWakuFilterClient*(switch: Switch): Future[WakuFilterClient] {.async.} =
let
peerManager = PeerManager.new(switch)
proto = WakuFilterClient.new(rng, messagePushHandler, peerManager)
proto = WakuFilterClient.new(peerManager, rng)
await proto.start()
switch.mount(proto)

View File

@ -3,16 +3,13 @@
import
std/[options,tables],
testutils/unittests,
chronos,
chronicles,
libp2p/peerstore
chronos
import
../../../waku/node/peer_manager,
../../../waku/waku_filter_v2,
../../../waku/waku_filter_v2/client,
../../../waku/waku_core,
../testlib/common,
../../waku/node/peer_manager,
../../waku/waku_filter_v2,
../../waku/waku_filter_v2/client,
../../waku/waku_core,
../testlib/wakucore,
../testlib/testasync,
./client_utils.nim
@ -28,26 +25,23 @@ suite "Waku Filter":
var contentTopics {.threadvar.}: seq[ContentTopic]
asyncSetup:
let
voidHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) =
discard
pubsubTopic = DefaultPubsubTopic
contentTopics = @[DefaultContentTopic]
serverSwitch = newStandardSwitch()
clientSwitch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(serverSwitch)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch, voidHandler)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch)
await allFutures(serverSwitch.start(), clientSwitch.start())
serverRemotePeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
asyncTeardown:
await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop())
asyncTest "Active Subscription Identification":
# Given
let
let
clientPeerId = clientSwitch.peerInfo.toRemotePeerInfo().peerId
subscribeResponse = await wakuFilterClient.subscribe(
serverRemotePeerInfo, pubsubTopic, contentTopics
@ -75,12 +69,12 @@ suite "Waku Filter":
asyncTest "After Unsubscription":
# Given
let
let
clientPeerId = clientSwitch.peerInfo.toRemotePeerInfo().peerId
subscribeResponse = await wakuFilterClient.subscribe(
serverRemotePeerInfo, pubsubTopic, contentTopics
)
require:
subscribeResponse.isOk()
wakuFilter.subscriptions.hasKey(clientPeerId)

View File

@ -3,16 +3,13 @@
import
std/[options,tables],
testutils/unittests,
chronos,
chronicles,
libp2p/peerstore
chronos
import
../../../waku/node/peer_manager,
../../../waku/waku_filter_v2,
../../../waku/waku_filter_v2/client,
../../../waku/waku_core,
../testlib/common,
../../waku/node/peer_manager,
../../waku/waku_filter_v2,
../../waku/waku_filter_v2/client,
../../waku/waku_core,
../testlib/wakucore,
./client_utils.nim
@ -22,21 +19,29 @@ suite "Waku Filter - end to end":
# Given
var
pushHandlerFuture = newFuture[(string, WakuMessage)]()
messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) =
messagePushHandler: FilterPushHandler = proc(pubsubTopic: PubsubTopic,
message: WakuMessage):
Future[void]
{.async, closure, gcsafe.} =
pushHandlerFuture.complete((pubsubTopic, message))
let
serverSwitch = newStandardSwitch()
clientSwitch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(serverSwitch)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch)
clientPeerId = clientSwitch.peerInfo.peerId
pubsubTopic = DefaultPubsubTopic
contentTopics = @[DefaultContentTopic]
# When
await allFutures(serverSwitch.start(), clientSwitch.start())
let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics)
wakuFilterClient.registerPushHandler(messagePushHandler)
let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(),
pubsubTopic,
contentTopics)
# Then
check:
@ -57,7 +62,9 @@ suite "Waku Filter - end to end":
pushedMsg == msg1
# When
let response2 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics)
let response2 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(),
pubsubTopic,
contentTopics)
# Then
check:
@ -74,20 +81,24 @@ suite "Waku Filter - end to end":
not (await pushHandlerFuture.withTimeout(2.seconds)) # No message should be pushed
# Teardown
await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop())
await allFutures(wakuFilter.stop(), wakuFilterClient.stop(),
serverSwitch.stop(), clientSwitch.stop())
asyncTest "subscribe, unsubscribe multiple content topics":
# Given
var
pushHandlerFuture = newFuture[(string, WakuMessage)]()
messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) =
messagePushHandler: FilterPushHandler = proc(pubsubTopic: PubsubTopic,
message: WakuMessage):
Future[void]
{.async, closure, gcsafe.} =
pushHandlerFuture.complete((pubsubTopic, message))
let
serverSwitch = newStandardSwitch()
clientSwitch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(serverSwitch)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch)
clientPeerId = clientSwitch.peerInfo.peerId
pubsubTopic = DefaultPubsubTopic
contentTopic2 = ContentTopic("/waku/2/non-default-content/proto")
@ -95,7 +106,12 @@ suite "Waku Filter - end to end":
# When
await allFutures(serverSwitch.start(), clientSwitch.start())
let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics)
wakuFilterClient.registerPushHandler(messagePushHandler)
let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(),
pubsubTopic,
contentTopics)
# Then
check:
@ -129,7 +145,9 @@ suite "Waku Filter - end to end":
pushedMsg2 == msg2
# When
let response2 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, @[contentTopic2]) # Unsubscribe only one content topic
let response2 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(),
pubsubTopic,
@[contentTopic2]) # Unsubscribe only one content topic
# Then
check:
@ -159,20 +177,24 @@ suite "Waku Filter - end to end":
not (await pushHandlerFuture.withTimeout(2.seconds)) # No message should be pushed
# Teardown
await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop())
await allFutures(wakuFilter.stop(), wakuFilterClient.stop(),
serverSwitch.stop(), clientSwitch.stop())
asyncTest "subscribe to multiple content topics and unsubscribe all":
# Given
var
pushHandlerFuture = newFuture[(string, WakuMessage)]()
messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) =
messagePushHandler: FilterPushHandler = proc(pubsubTopic: PubsubTopic,
message: WakuMessage):
Future[void]
{.async, closure, gcsafe.} =
pushHandlerFuture.complete((pubsubTopic, message))
let
serverSwitch = newStandardSwitch()
clientSwitch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(serverSwitch)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch)
clientPeerId = clientSwitch.peerInfo.peerId
pubsubTopic = DefaultPubsubTopic
contentTopic2 = ContentTopic("/waku/2/non-default-content/proto")
@ -180,7 +202,12 @@ suite "Waku Filter - end to end":
# When
await allFutures(serverSwitch.start(), clientSwitch.start())
let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics)
wakuFilterClient.registerPushHandler(messagePushHandler)
let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(),
pubsubTopic,
contentTopics)
# Then
check:
@ -234,20 +261,24 @@ suite "Waku Filter - end to end":
not (await pushHandlerFuture.withTimeout(2.seconds)) # Neither message should be pushed
# Teardown
await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop())
await allFutures(wakuFilter.stop(), wakuFilterClient.stop(),
serverSwitch.stop(), clientSwitch.stop())
asyncTest "subscribe, unsubscribe multiple pubsub topics and content topics":
# Given
var
pushHandlerFuture = newFuture[(string, WakuMessage)]()
messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) =
messagePushHandler: FilterPushHandler = proc(pubsubTopic: PubsubTopic,
message: WakuMessage):
Future[void]
{.async, closure, gcsafe.} =
pushHandlerFuture.complete((pubsubTopic, message))
let
serverSwitch = newStandardSwitch()
clientSwitch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(serverSwitch)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch)
clientPeerId = clientSwitch.peerInfo.peerId
pubsubTopic = DefaultPubsubTopic
pubsubTopic2 = PubsubTopic("/waku/2/non-default-pubsub/proto")
@ -258,9 +289,17 @@ suite "Waku Filter - end to end":
# When
await allFutures(serverSwitch.start(), clientSwitch.start())
wakuFilterClient.registerPushHandler(messagePushHandler)
let
response1 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics)
response2 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic2, contentTopics)
response1 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(),
pubsubTopic,
contentTopics)
response2 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(),
pubsubTopic2,
contentTopics)
# Then
check:
@ -299,7 +338,9 @@ suite "Waku Filter - end to end":
## Step 3: We can selectively unsubscribe from pubsub topics and content topic(s)
# When
let response3 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic2, @[contentTopic2])
let response3 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(),
pubsubTopic2,
@[contentTopic2])
require response3.isOk()
let msg3 = fakeWakuMessage(contentTopic=contentTopic2)
@ -325,4 +366,5 @@ suite "Waku Filter - end to end":
pushedMsg3 == msg3
# Teardown
await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop())
await allFutures(wakuFilter.stop(), wakuFilterClient.stop(),
serverSwitch.stop(), clientSwitch.stop())

View File

@ -216,7 +216,7 @@ procSuite "WakuNode - Store":
let mountArchiveRes = server.mountArchive(driver)
assert mountArchiveRes.isOk(), mountArchiveRes.error
waitFor server.mountStore()
waitFor server.mountFilterClient()
client.mountStoreClient()
@ -232,7 +232,7 @@ procSuite "WakuNode - Store":
proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} =
filterFut.complete((pubsubTopic, msg))
waitFor server.filterSubscribe(some(DefaultPubsubTopic), DefaultContentTopic, filterHandler, peer=filterSourcePeer)
waitFor server.legacyFilterSubscribe(some(DefaultPubsubTopic), DefaultContentTopic, filterHandler, peer=filterSourcePeer)
waitFor sleepAsync(100.millis)

View File

@ -169,11 +169,11 @@ procSuite "Waku v2 JSON-RPC API - Admin":
filterPeer = PeerInfo.new(generateEcdsaKey(), @[locationAddr])
storePeer = PeerInfo.new(generateEcdsaKey(), @[locationAddr])
node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuFilterCodec)
node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuLegacyFilterCodec)
node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec)
# Mock that we connected in the past so Identify populated this
node.peerManager.peerStore[ProtoBook][filterPeer.peerId] = @[WakuFilterCodec]
node.peerManager.peerStore[ProtoBook][filterPeer.peerId] = @[WakuLegacyFilterCodec]
node.peerManager.peerStore[ProtoBook][storePeer.peerId] = @[WakuStoreCodec]
let response = await client.get_waku_v2_admin_v1_peers()
@ -182,7 +182,7 @@ procSuite "Waku v2 JSON-RPC API - Admin":
check:
response.len == 2
# Check filter peer
(response.filterIt(it.protocol == WakuFilterCodec)[0]).multiaddr == constructMultiaddrStr(filterPeer)
(response.filterIt(it.protocol == WakuLegacyFilterCodec)[0]).multiaddr == constructMultiaddrStr(filterPeer)
# Check store peer
(response.filterIt(it.protocol == WakuStoreCodec)[0]).multiaddr == constructMultiaddrStr(storePeer)

View File

@ -41,7 +41,7 @@ procSuite "Waku v2 JSON-RPC API - Filter":
await node1.mountFilter()
await node2.mountFilterClient()
node2.peerManager.addServicePeer(node1.peerInfo.toRemotePeerInfo(), WakuFilterCodec)
node2.peerManager.addServicePeer(node1.peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec)
# RPC server setup
let

View File

@ -21,6 +21,11 @@ import
../../waku/node/rest/filter/handlers as filter_api,
../../waku/node/rest/filter/client as filter_api_client,
../../waku/waku_relay,
../../waku/waku_filter_v2/subscriptions,
../../waku/waku_filter_v2/common,
../../waku/node/rest/relay/topic_cache,
../../waku/node/rest/relay/handlers as relay_api,
../../waku/node/rest/relay/client as relay_api_client,
../testlib/wakucore,
../testlib/wakunode
@ -36,51 +41,64 @@ proc testWakuNode(): WakuNode =
type RestFilterTest = object
node1: WakuNode
node2: WakuNode
serviceNode: WakuNode
subscriberNode: WakuNode
restServer: RestServerRef
restServerForService: RestServerRef
messageCache: filter_api.MessageCache
client: RestClientRef
clientTwdServiceNode: RestClientRef
proc setupRestFilter(): Future[RestFilterTest] {.async.} =
result.node1 = testWakuNode()
result.node2 = testWakuNode()
proc init(T: type RestFilterTest): Future[T] {.async.} =
var testSetup = RestFilterTest()
testSetup.serviceNode = testWakuNode()
testSetup.subscriberNode = testWakuNode()
await allFutures(result.node1.start(), result.node2.start())
await allFutures(testSetup.serviceNode.start(), testSetup.subscriberNode.start())
await result.node1.mountFilter()
await result.node2.mountFilterClient()
await testSetup.serviceNode.mountRelay()
await testSetup.serviceNode.mountFilter()
await testSetup.subscriberNode.mountFilterClient()
result.node2.peerManager.addServicePeer(result.node1.peerInfo.toRemotePeerInfo(), WakuFilterCodec)
testSetup.subscriberNode.peerManager.addServicePeer(testSetup.serviceNode.peerInfo.toRemotePeerInfo(), WakuFilterSubscribeCodec)
let restPort = Port(58011)
let restAddress = ValidIpAddress.init("0.0.0.0")
result.restServer = RestServerRef.init(restAddress, restPort).tryGet()
let restAddress = ValidIpAddress.init("127.0.0.1")
testSetup.restServer = RestServerRef.init(restAddress, restPort).tryGet()
result.messageCache = filter_api.MessageCache.init(capacity=filter_api.filterMessageCacheDefaultCapacity)
let restPort2 = Port(58012)
testSetup.restServerForService = RestServerRef.init(restAddress, restPort2).tryGet()
installFilterPostSubscriptionsV1Handler(result.restServer.router, result.node2, result.messageCache)
installFilterDeleteSubscriptionsV1Handler(result.restServer.router, result.node2, result.messageCache)
installFilterGetMessagesV1Handler(result.restServer.router, result.node2, result.messageCache)
# through this one we will see if messages are pushed according to our content topic sub
testSetup.messageCache = filter_api.MessageCache.init()
installFilterRestApiHandlers(testSetup.restServer.router, testSetup.subscriberNode, testSetup.messageCache)
result.restServer.start()
let topicCache = TopicCache.init()
installRelayApiHandlers(testSetup.restServerForService.router, testSetup.serviceNode, topicCache)
result.client = newRestHttpClient(initTAddress(restAddress, restPort))
testSetup.restServer.start()
testSetup.restServerForService.start()
return result
testSetup.client = newRestHttpClient(initTAddress(restAddress, restPort))
testSetup.clientTwdServiceNode = newRestHttpClient(initTAddress(restAddress, restPort2))
return testSetup
proc shutdown(self: RestFilterTest) {.async.} =
await self.restServer.stop()
await self.restServer.closeWait()
await allFutures(self.node1.stop(), self.node2.stop())
await self.restServerForService.stop()
await self.restServerForService.closeWait()
await allFutures(self.serviceNode.stop(), self.subscriberNode.stop())
suite "Waku v2 Rest API - Filter":
asyncTest "Subscribe a node to an array of topics - POST /filter/v1/subscriptions":
suite "Waku v2 Rest API - Filter V2":
asyncTest "Subscribe a node to an array of topics - POST /filter/v2/subscriptions":
# Given
let restFilterTest: RestFilterTest = await setupRestFilter()
let restFilterTest = await RestFilterTest.init()
let subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId
# When
let contentFilters = @[DefaultContentTopic
@ -89,103 +107,178 @@ suite "Waku v2 Rest API - Filter":
,ContentTopic("4")
]
let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters,
let requestBody = FilterSubscribeRequest(requestId: "1234",
contentFilters: contentFilters,
pubsubTopic: some(DefaultPubsubTopic))
let response = await restFilterTest.client.filterPostSubscriptionsV1(requestBody)
let response = await restFilterTest.client.filterPostSubscriptions(requestBody)
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"
echo "response", $response
check:
restFilterTest.messageCache.isSubscribed(DefaultContentTopic)
restFilterTest.messageCache.isSubscribed("2")
restFilterTest.messageCache.isSubscribed("3")
restFilterTest.messageCache.isSubscribed("4")
# When - error case
let badRequestBody = FilterSubscriptionsRequest(contentFilters: @[], pubsubTopic: none(string))
let badResponse = await restFilterTest.client.filterPostSubscriptionsV1(badRequestBody)
check:
badResponse.status == 400
$badResponse.contentType == $MIMETYPE_TEXT
badResponse.data == "Invalid content body, could not decode. Unable to deserialize data"
await restFilterTest.shutdown()
asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v1/subscriptions":
# Given
let
restFilterTest: RestFilterTest = await setupRestFilter()
# When
restFilterTest.messageCache.subscribe("1")
restFilterTest.messageCache.subscribe("2")
restFilterTest.messageCache.subscribe("3")
restFilterTest.messageCache.subscribe("4")
let contentFilters = @[ContentTopic("1")
,ContentTopic("2")
,ContentTopic("3")
# ,ContentTopic("4") # Keep this subscription for check
]
# When
let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters,
pubsubTopic: some(DefaultPubsubTopic))
let response = await restFilterTest.client.filterDeleteSubscriptionsV1(requestBody)
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"
check:
not restFilterTest.messageCache.isSubscribed("1")
not restFilterTest.messageCache.isSubscribed("2")
not restFilterTest.messageCache.isSubscribed("3")
restFilterTest.messageCache.isSubscribed("4")
await restFilterTest.shutdown()
asyncTest "Get the latest messages for topic - GET /filter/v1/messages/{contentTopic}":
# Given
let
restFilterTest = await setupRestFilter()
let pubSubTopic = "/waku/2/default-waku/proto"
let contentTopic = ContentTopic( "content-topic-x" )
let messages = @[
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
]
restFilterTest.messageCache.subscribe(contentTopic)
for msg in messages:
restFilterTest.messageCache.addMessage(contentTopic, msg)
# When
let response = await restFilterTest.client.filterGetMessagesV1(contentTopic)
let subscribedPeer1 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, DefaultContentTopic)
let subscribedPeer2 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "2")
let subscribedPeer3 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "3")
let subscribedPeer4 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "4")
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.len == 3
response.data.all do (msg: FilterWakuMessage) -> bool:
msg.payload == base64.encode("TEST-1") and
msg.contentTopic.get().string == "content-topic-x" and
msg.version.get() == 2 and
msg.timestamp.get() != Timestamp(0)
response.data.requestId == "1234"
subscribedPeer1.len() == 1
subPeerId in subscribedPeer1
subPeerId in subscribedPeer2
subPeerId in subscribedPeer3
subPeerId in subscribedPeer4
# When - error case
let badRequestBody = FilterSubscribeRequest(requestId: "4567", contentFilters: @[], pubsubTopic: none(string))
let badRequestResp = await restFilterTest.client.filterPostSubscriptions(badRequestBody)
check:
badRequestResp.status == 400
$badRequestResp.contentType == $MIMETYPE_JSON
badRequestResp.data.requestId == "unknown"
# badRequestResp.data.statusDesc == "*********"
badRequestResp.data.statusDesc.startsWith("BAD_REQUEST: Failed to decode request")
await restFilterTest.shutdown()
asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v2/subscriptions":
# Given
let
restFilterTest = await RestFilterTest.init()
subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId
# When
var requestBody = FilterSubscribeRequest(requestId: "1234",
contentFilters: @[ContentTopic("1")
,ContentTopic("2")
,ContentTopic("3")
,ContentTopic("4")
],
pubsubTopic: some(DefaultPubsubTopic))
discard await restFilterTest.client.filterPostSubscriptions(requestBody)
let contentFilters = @[ContentTopic("1")
,ContentTopic("2")
,ContentTopic("3")
# ,ContentTopic("4") # Keep this subscription for check
]
let requestBodyUnsub = FilterUnsubscribeRequest(requestId: "4321",
contentFilters: contentFilters,
pubsubTopic: some(DefaultPubsubTopic))
let response = await restFilterTest.client.filterDeleteSubscriptions(requestBodyUnsub)
let subscribedPeer1 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, DefaultContentTopic)
let subscribedPeer2 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "2")
let subscribedPeer3 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "3")
let subscribedPeer4 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "4")
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.requestId == "4321"
subscribedPeer1.len() == 0
subPeerId notin subscribedPeer1
subPeerId notin subscribedPeer2
subPeerId notin subscribedPeer3
subscribedPeer4.len() == 1
subPeerId in subscribedPeer4
# When - error case
let requestBodyUnsubAll = FilterUnsubscribeAllRequest(requestId: "2143")
let responseUnsubAll = await restFilterTest.client.filterDeleteAllSubscriptions(requestBodyUnsubAll)
let subscribedPeer = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "4")
check:
responseUnsubAll.status == 200
$responseUnsubAll.contentType == $MIMETYPE_JSON
responseUnsubAll.data.requestId == "2143"
subscribedPeer.len() == 0
await restFilterTest.shutdown()
asyncTest "ping subscribed node - GET /filter/v2/subscriptions/{requestId}":
# Given
let
restFilterTest = await RestFilterTest.init()
subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId
# When
var requestBody = FilterSubscribeRequest(requestId: "1234",
contentFilters: @[ContentTopic("1")],
pubsubTopic: some(DefaultPubsubTopic))
discard await restFilterTest.client.filterPostSubscriptions(requestBody)
let pingResponse = await restFilterTest.client.filterSubscriberPing("9999")
# Then
check:
pingResponse.status == 200
$pingResponse.contentType == $MIMETYPE_JSON
pingResponse.data.requestId == "9999"
pingResponse.data.statusDesc.len() == 0
# When - error case
let requestBodyUnsubAll = FilterUnsubscribeAllRequest(requestId: "9988")
discard await restFilterTest.client.filterDeleteAllSubscriptions(requestBodyUnsubAll)
let pingResponseFail = await restFilterTest.client.filterSubscriberPing("9977")
# Then
check:
pingResponseFail.status == 404 # NOT_FOUND
$pingResponseFail.contentType == $MIMETYPE_JSON
pingResponseFail.data.requestId == "9977"
pingResponseFail.data.statusDesc == "NOT_FOUND: peer has no subscriptions"
await restFilterTest.shutdown()
asyncTest "push filtered message":
# Given
let
restFilterTest = await RestFilterTest.init()
subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId
restFilterTest.messageCache.subscribe(DefaultPubsubTopic)
restFilterTest.serviceNode.subscribe(DefaultPubsubTopic)
# When
var requestBody = FilterSubscribeRequest(requestId: "1234",
contentFilters: @[ContentTopic("1")],
pubsubTopic: some(DefaultPubsubTopic))
discard await restFilterTest.client.filterPostSubscriptions(requestBody)
let pingResponse = await restFilterTest.client.filterSubscriberPing("9999")
# Then
check:
pingResponse.status == 200
$pingResponse.contentType == $MIMETYPE_JSON
pingResponse.data.requestId == "9999"
pingResponse.data.statusDesc.len() == 0
# When - message push
let testMessage = WakuMessage(
payload: "TEST-PAYLOAD-MUST-RECEIVE".toBytes(),
contentTopic: "1",
timestamp: int64(2022)
)
let postMsgResponse = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1(
DefaultPubsubTopic,
toRelayWakuMessage(testMessage)
)
# Then
let messages = restFilterTest.messageCache.getMessages("1").tryGet()
check:
postMsgResponse.status == 200
$postMsgResponse.contentType == $MIMETYPE_TEXT
postMsgResponse.data == "OK"
messages == @[testMessage]
await restFilterTest.shutdown()

View File

@ -0,0 +1,191 @@
{.used.}
import
std/sequtils,
stew/byteutils,
stew/shims/net,
testutils/unittests,
presto, presto/client as presto_client,
libp2p/crypto/crypto
import
../../waku/node/message_cache,
../../waku/common/base64,
../../waku/waku_core,
../../waku/waku_node,
../../waku/node/peer_manager,
../../waku/waku_filter,
../../waku/node/rest/server,
../../waku/node/rest/client,
../../waku/node/rest/responses,
../../waku/node/rest/filter/types,
../../waku/node/rest/filter/legacy_handlers as filter_api,
../../waku/node/rest/filter/legacy_client as filter_api_client,
../../waku/waku_relay,
../testlib/wakucore,
../testlib/wakunode
proc testWakuNode(): WakuNode =
let
privkey = generateSecp256k1Key()
bindIp = ValidIpAddress.init("0.0.0.0")
extIp = ValidIpAddress.init("127.0.0.1")
port = Port(0)
return newTestWakuNode(privkey, bindIp, port, some(extIp), some(port))
type RestFilterTest = object
filterNode: WakuNode
clientNode: WakuNode
restServer: RestServerRef
messageCache: filter_api.MessageCache
client: RestClientRef
proc setupRestFilter(): Future[RestFilterTest] {.async.} =
result.filterNode = testWakuNode()
result.clientNode = testWakuNode()
await allFutures(result.filterNode.start(), result.clientNode.start())
await result.filterNode.mountFilter()
await result.clientNode.mountFilterClient()
result.clientNode.peerManager.addServicePeer(result.filterNode.peerInfo.toRemotePeerInfo()
,WakuLegacyFilterCodec)
let restPort = Port(58011)
let restAddress = ValidIpAddress.init("0.0.0.0")
result.restServer = RestServerRef.init(restAddress, restPort).tryGet()
result.messageCache = filter_api.MessageCache.init()
installLegacyFilterRestApiHandlers(result.restServer.router
,result.clientNode
,result.messageCache)
result.restServer.start()
result.client = newRestHttpClient(initTAddress(restAddress, restPort))
return result
proc shutdown(self: RestFilterTest) {.async.} =
await self.restServer.stop()
await self.restServer.closeWait()
await allFutures(self.filterNode.stop(), self.clientNode.stop())
suite "Waku v2 Rest API - Filter":
asyncTest "Subscribe a node to an array of topics - POST /filter/v1/subscriptions":
# Given
let restFilterTest: RestFilterTest = await setupRestFilter()
# When
let contentFilters = @[DefaultContentTopic
,ContentTopic("2")
,ContentTopic("3")
,ContentTopic("4")
]
let requestBody = FilterLegacySubscribeRequest(contentFilters: contentFilters,
pubsubTopic: some(DefaultPubsubTopic))
let response = await restFilterTest.client.filterPostSubscriptionsV1(requestBody)
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"
check:
restFilterTest.messageCache.isSubscribed(DefaultContentTopic)
restFilterTest.messageCache.isSubscribed("2")
restFilterTest.messageCache.isSubscribed("3")
restFilterTest.messageCache.isSubscribed("4")
# When - error case
let badRequestBody = FilterLegacySubscribeRequest(contentFilters: @[]
,pubsubTopic: none(string))
let badResponse = await restFilterTest.client.filterPostSubscriptionsV1(badRequestBody)
check:
badResponse.status == 400
$badResponse.contentType == $MIMETYPE_TEXT
badResponse.data == "Invalid content body, could not decode. Unable to deserialize data"
await restFilterTest.shutdown()
asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v1/subscriptions":
# Given
let
restFilterTest: RestFilterTest = await setupRestFilter()
# When
restFilterTest.messageCache.subscribe("1")
restFilterTest.messageCache.subscribe("2")
restFilterTest.messageCache.subscribe("3")
restFilterTest.messageCache.subscribe("4")
let contentFilters = @[ContentTopic("1")
,ContentTopic("2")
,ContentTopic("3")
# ,ContentTopic("4") # Keep this subscription for check
]
# When
let requestBody = FilterLegacySubscribeRequest(contentFilters: contentFilters,
pubsubTopic: some(DefaultPubsubTopic))
let response = await restFilterTest.client.filterDeleteSubscriptionsV1(requestBody)
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"
check:
not restFilterTest.messageCache.isSubscribed("1")
not restFilterTest.messageCache.isSubscribed("2")
not restFilterTest.messageCache.isSubscribed("3")
restFilterTest.messageCache.isSubscribed("4")
await restFilterTest.shutdown()
asyncTest "Get the latest messages for topic - GET /filter/v1/messages/{contentTopic}":
# Given
let
restFilterTest = await setupRestFilter()
let pubSubTopic = "/waku/2/default-waku/proto"
let contentTopic = ContentTopic( "content-topic-x" )
let messages = @[
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
]
restFilterTest.messageCache.subscribe(contentTopic)
for msg in messages:
restFilterTest.messageCache.addMessage(contentTopic, msg)
# When
let response = await restFilterTest.client.filterGetMessagesV1(contentTopic)
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.len == 3
response.data.all do (msg: FilterWakuMessage) -> bool:
msg.payload == base64.encode("TEST-1") and
msg.contentTopic.get().string == "content-topic-x" and
msg.version.get() == 2 and
msg.timestamp.get() != Timestamp(0)
await restFilterTest.shutdown()

View File

@ -50,18 +50,18 @@ To run a specific test.
# Get a shell with the right environment variables set
./env.sh bash
# Run a specific test
nim c -r ./tests/test_waku_filter.nim
nim c -r ./tests/test_waku_filter_legacy.nim
```
You can also alter compile options. For example, if you want a less verbose output you can do the following. For more, refer to the [compiler flags](https://nim-lang.org/docs/nimc.html#compiler-usage) and [chronicles documentation](https://github.com/status-im/nim-chronicles#compile-time-configuration).
```bash
nim c -r -d:chronicles_log_level=WARN --verbosity=0 --hints=off ./tests/test_waku_filter.nim
nim c -r -d:chronicles_log_level=WARN --verbosity=0 --hints=off ./tests/waku_filter_v2/test_waku_filter.nim
```
You may also want to change the `outdir` to a folder ignored by git.
```bash
nim c -r -d:chronicles_log_level=WARN --verbosity=0 --hints=off --outdir=build ./tests/test_waku_filter.nim
nim c -r -d:chronicles_log_level=WARN --verbosity=0 --hints=off --outdir=build ./tests/waku_filter_v2/test_waku_filter.nim
```
### Waku Protocol Example

View File

@ -71,9 +71,9 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
if not node.wakuFilterLegacy.isNil():
# Map WakuFilter peers to WakuPeers and add to return list
let filterPeers = node.peerManager.peerStore.peers(WakuFilterCodec)
let filterPeers = node.peerManager.peerStore.peers(WakuLegacyFilterCodec)
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it),
protocol: WakuFilterCodec,
protocol: WakuLegacyFilterCodec,
connected: it.connectedness == Connectedness.Connected))
peers.add(filterPeers)

View File

@ -34,7 +34,7 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message
## Subscribes a node to a list of content filters
debug "post_waku_v2_filter_v1_subscription"
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
let peerOpt = node.peerManager.selectPeer(WakuLegacyFilterCodec)
if peerOpt.isNone():
raise newException(ValueError, "no suitable remote filter peers")
@ -43,7 +43,7 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message
let handler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} =
cache.addMessage(msg.contentTopic, msg)
let subFut = node.filterSubscribe(pubsubTopic, contentTopics, handler, peerOpt.get())
let subFut = node.legacyFilterSubscribe(pubsubTopic, contentTopics, handler, peerOpt.get())
if not await subFut.withTimeout(futTimeout):
raise newException(ValueError, "Failed to subscribe to contentFilters")
@ -59,7 +59,11 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message
let contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic)
let unsubFut = node.unsubscribe(pubsubTopic, contentTopics)
let peerOpt = node.peerManager.selectPeer(WakuLegacyFilterCodec)
if peerOpt.isNone():
raise newException(ValueError, "no suitable remote filter peers")
let unsubFut = node.legacyFilterUnsubscribe(pubsubTopic, contentTopics, peerOpt.get())
if not await unsubFut.withTimeout(futTimeout):
raise newException(ValueError, "Failed to unsubscribe from contentFilters")

View File

@ -44,6 +44,8 @@ proc unsubscribe*[K](t: MessageCache[K], topic: K) =
return
t.table.del(topic)
proc unsubscribeAll*[K](t: MessageCache[K]) =
t.table.clear()
proc addMessage*[K](t: MessageCache, topic: K, msg: WakuMessage) =
if not t.isSubscribed(topic):

View File

@ -4,8 +4,10 @@ else:
{.push raises: [].}
import
json,
std/sets,
stew/byteutils,
strformat,
chronicles,
json_serialization,
json_serialization/std/options,
@ -19,9 +21,9 @@ import
export types
logScope:
topics = "waku node rest client"
topics = "waku node rest client v2"
proc encodeBytes*(value: FilterSubscriptionsRequest,
proc encodeBytes*(value: FilterSubscribeRequest,
contentType: string): RestResult[seq[byte]] =
if MediaType.init(contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType
@ -30,30 +32,68 @@ proc encodeBytes*(value: FilterSubscriptionsRequest,
let encoded = ?encodeIntoJsonBytes(value)
return ok(encoded)
proc decodeBytes*(t: typedesc[string], value: openarray[byte],
contentType: Opt[ContentTypeData]): RestResult[string] =
if MediaType.init($contentType) != MIMETYPE_TEXT:
proc encodeBytes*(value: FilterSubscriberPing,
contentType: string): RestResult[seq[byte]] =
if MediaType.init(contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")
var res: string
if len(value) > 0:
res = newString(len(value))
copyMem(addr res[0], unsafeAddr value[0], len(value))
return ok(res)
let encoded = ?encodeIntoJsonBytes(value)
return ok(encoded)
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc filterPostSubscriptionsV1*(body: FilterSubscriptionsRequest):
RestResponse[string]
{.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodPost.}
proc encodeBytes*(value: FilterUnsubscribeRequest,
contentType: string): RestResult[seq[byte]] =
if MediaType.init(contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc filterDeleteSubscriptionsV1*(body: FilterSubscriptionsRequest):
RestResponse[string]
{.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodDelete.}
let encoded = ?encodeIntoJsonBytes(value)
return ok(encoded)
proc decodeBytes*(t: typedesc[FilterGetMessagesResponse],
data: openArray[byte],
proc encodeBytes*(value: FilterUnsubscribeAllRequest,
contentType: string): RestResult[seq[byte]] =
if MediaType.init(contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")
let encoded = ?encodeIntoJsonBytes(value)
return ok(encoded)
proc decodeBytes*(t: typedesc[FilterSubscriptionResponse],
value: openarray[byte],
contentType: Opt[ContentTypeData]):
RestResult[FilterSubscriptionResponse] =
if MediaType.init($contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")
let decoded = ?decodeFromJsonBytes(FilterSubscriptionResponse, value)
return ok(decoded)
proc filterSubscriberPing*(requestId: string):
RestResponse[FilterSubscriptionResponse]
{.rest, endpoint: "/filter/v2/subscriptions/{requestId}", meth: HttpMethod.MethodGet.}
proc filterPostSubscriptions*(body: FilterSubscribeRequest):
RestResponse[FilterSubscriptionResponse]
{.rest, endpoint: "/filter/v2/subscriptions", meth: HttpMethod.MethodPost.}
proc filterPutSubscriptions*(body: FilterSubscribeRequest):
RestResponse[FilterSubscriptionResponse]
{.rest, endpoint: "/filter/v2/subscriptions", meth: HttpMethod.MethodPut.}
proc filterDeleteSubscriptions*(body: FilterUnsubscribeRequest):
RestResponse[FilterSubscriptionResponse]
{.rest, endpoint: "/filter/v2/subscriptions", meth: HttpMethod.MethodDelete.}
proc filterDeleteAllSubscriptions*(body: FilterUnsubscribeAllRequest):
RestResponse[FilterSubscriptionResponse]
{.rest, endpoint: "/filter/v2/subscriptions/all", meth: HttpMethod.MethodDelete.}
proc decodeBytes*(t: typedesc[FilterGetMessagesResponse],
data: openArray[byte],
contentType: Opt[ContentTypeData]): RestResult[FilterGetMessagesResponse] =
if MediaType.init($contentType) != MIMETYPE_JSON:
error "Unsupported response contentType value", contentType = contentType
@ -62,7 +102,6 @@ proc decodeBytes*(t: typedesc[FilterGetMessagesResponse],
let decoded = ?decodeFromJsonBytes(FilterGetMessagesResponse, data)
return ok(decoded)
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc filterGetMessagesV1*(contentTopic: string):
RestResponse[FilterGetMessagesResponse]
{.rest, endpoint: "/filter/v1/messages/{contentTopic}", meth: HttpMethod.MethodGet.}
proc filterGetMessagesV1*(contentTopic: string):
RestResponse[FilterGetMessagesResponse]
{.rest, endpoint: "/filter/v2/messages/{contentTopic}", meth: HttpMethod.MethodGet.}

View File

@ -4,6 +4,7 @@ else:
{.push raises: [].}
import
std/strformat,
std/sequtils,
stew/byteutils,
chronicles,
@ -14,24 +15,28 @@ import
import
../../../waku_core,
../../../waku_filter,
../../../waku_filter/client,
../../../waku_filter_v2,
../../../waku_filter_v2/client as filter_protocol_client,
../../../waku_filter_v2/common as filter_protocol_type,
../../message_cache,
../../peer_manager,
../../waku_node,
../serdes,
../responses,
./types
export types
logScope:
topics = "waku node rest filter_api"
topics = "waku node rest filter_api_v2"
const futTimeoutForSubscriptionProcessing* = 5.seconds
const futTimeoutForSubscriptionProcessing* = 5.seconds
#### Request handlers
const ROUTE_FILTER_SUBSCRIPTIONSV1* = "/filter/v1/subscriptions"
const ROUTE_FILTER_SUBSCRIPTIONS* = "/filter/v2/subscriptions"
const ROUTE_FILTER_ALL_SUBSCRIPTIONS* = "/filter/v2/subscriptions/all"
const filterMessageCacheDefaultCapacity* = 30
@ -50,85 +55,258 @@ func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiR
let requestResult = decodeFromJsonBytes(T, reqBodyData)
if requestResult.isErr():
return err(RestApiResponse.badRequest("Invalid content body, could not decode. " &
return err(RestApiResponse.badRequest("Invalid content body, could not decode. " &
$requestResult.error))
return ok(requestResult.get())
proc installFilterPostSubscriptionsV1Handler*(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
let pushHandler: FilterPushHandler =
proc(pubsubTopic: PubsubTopic,
msg: WakuMessage) {.async, gcsafe, closure.} =
cache.addMessage(msg.contentTopic, msg)
proc getErrorCause(err: filter_protocol_type.FilterSubscribeError): string =
## Retrieve proper error cause of FilterSubscribeError - due stringify make some parts of text double
router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a pubsubTopic
# debug "post_waku_v2_filter_v1_subscriptions"
case err.kind:
of FilterSubscribeErrorKind.PEER_DIAL_FAILURE:
err.address
of FilterSubscribeErrorKind.BAD_RESPONSE, FilterSubscribeErrorKind.BAD_REQUEST,
FilterSubscribeErrorKind.NOT_FOUND, FilterSubscribeErrorKind.SERVICE_UNAVAILABLE:
err.cause
of FilterSubscribeErrorKind.UNKNOWN:
"UNKNOWN"
let decodedBody = decodeRequestBody[FilterSubscriptionsRequest](contentBody)
proc convertResponse(T: type FilterSubscriptionResponse, requestId: string, protocolClientRes: filter_protocol_type.FilterSubscribeResult): T =
## Properly convert filter protocol's response to rest response
if decodedBody.isErr():
return decodedBody.error
if protocolClientRes.isErr():
return FilterSubscriptionResponse(
requestId: requestId,
statusCode: uint32(protocolClientRes.error().kind),
statusDesc: getErrorCause(protocolClientRes.error())
)
else:
return FilterSubscriptionResponse(
requestId: requestId,
statusCode: 0,
statusDesc: ""
)
let req: FilterSubscriptionsRequest = decodedBody.value()
proc convertResponse(T: type FilterSubscriptionResponse, requestId: string, protocolClientRes: filter_protocol_type.FilterSubscribeError): T =
## Properly convert filter protocol's response to rest response in case of error
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
return FilterSubscriptionResponse(
requestId: requestId,
statusCode: uint32(protocolClientRes.kind),
statusDesc: $protocolClientRes
)
if peerOpt.isNone():
return RestApiResponse.internalServerError("No suitable remote filter peers")
proc convertErrorKindToHttpStatus(kind: filter_protocol_type.FilterSubscribeErrorKind): HttpCode =
## Filter protocol's error code is not directly convertible to HttpCodes hence this converter
let subFut = node.filterSubscribe(req.pubsubTopic,
req.contentFilters,
pushHandler,
case kind:
of filter_protocol_type.FilterSubscribeErrorKind.UNKNOWN:
return Http200
of filter_protocol_type.FilterSubscribeErrorKind.PEER_DIAL_FAILURE:
return Http504 #gateway timout
of filter_protocol_type.FilterSubscribeErrorKind.BAD_RESPONSE:
return Http500 # internal server error
of filter_protocol_type.FilterSubscribeErrorKind.BAD_REQUEST:
return Http400
of filter_protocol_type.FilterSubscribeErrorKind.NOT_FOUND:
return Http404
of filter_protocol_type.FilterSubscribeErrorKind.SERVICE_UNAVAILABLE:
return Http503
else:
return Http500
proc makeRestResponse(requestId: string, protocolClientRes: filter_protocol_type.FilterSubscribeResult): RestApiResponse =
let filterSubscriptionResponse = FilterSubscriptionResponse.convertResponse(requestId, protocolClientRes)
var httpStatus : HttpCode = Http200
if protocolClientRes.isErr():
httpStatus = convertErrorKindToHttpStatus(protocolClientRes.error().kind) # TODO: convert status codes!
let resp = RestApiResponse.jsonResponse(filterSubscriptionResponse, status=httpStatus)
if resp.isErr():
error "An error ocurred while building the json respose: ", error=resp.error
return RestApiResponse.internalServerError(fmt("An error ocurred while building the json respose: {resp.error}"))
return resp.get()
proc makeRestResponse(requestId: string, protocolClientRes: filter_protocol_type.FilterSubscribeError): RestApiResponse =
let filterSubscriptionResponse = FilterSubscriptionResponse.convertResponse(requestId, protocolClientRes)
let httpStatus = convertErrorKindToHttpStatus(protocolClientRes.kind) # TODO: convert status codes!
let resp = RestApiResponse.jsonResponse(filterSubscriptionResponse, status=httpStatus)
if resp.isErr():
error "An error ocurred while building the json respose: ", error=resp.error
return RestApiResponse.internalServerError(fmt("An error ocurred while building the json respose: {resp.error}"))
return resp.get()
proc filterPostPutSubscriptionRequestHandler(node: WakuNode,
contentBody: Option[ContentBody],
cache: MessageCache):
Future[RestApiResponse]
{.async.} =
## handles any filter subscription requests, adds or modifies.
let decodedBody = decodeRequestBody[FilterSubscribeRequest](contentBody)
if decodedBody.isErr():
return makeRestResponse("unknown", FilterSubscribeError.badRequest(fmt("Failed to decode request: {decodedBody.error}")))
let req: FilterSubscribeRequest = decodedBody.value()
let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)
if peerOpt.isNone():
return makeRestResponse(req.requestId, FilterSubscribeError.serviceUnavailable("No suitable peers"))
let subFut = node.filterSubscribe(req.pubsubTopic,
req.contentFilters,
peerOpt.get())
if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to subscribe to contentFilters do to timeout!"
return RestApiResponse.internalServerError("Failed to subscribe to contentFilters")
if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to subscribe to contentFilters do to timeout!"
return makeRestResponse(req.requestId, FilterSubscribeError.serviceUnavailable("Subscription request timed out"))
# Successfully subscribed to all content filters
for cTopic in req.contentFilters:
cache.subscribe(cTopic)
# Successfully subscribed to all content filters
for cTopic in req.contentFilters:
cache.subscribe(cTopic)
return RestApiResponse.ok()
return makeRestResponse(req.requestId, subFut.read())
proc installFilterDeleteSubscriptionsV1Handler*(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
proc installFilterPostSubscriptionsHandler(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a pubsubTopic
debug "post", ROUTE_FILTER_SUBSCRIPTIONS, contentBody
let response = await filterPostPutSubscriptionRequestHandler(node, contentBody, cache)
return response
proc installFilterPutSubscriptionsHandler(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
router.api(MethodPut, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Modifies a subscribtion of a node to a list of contentTopics of a pubsubTopic
debug "put", ROUTE_FILTER_SUBSCRIPTIONS, contentBody
let response = await filterPostPutSubscriptionRequestHandler(node, contentBody, cache)
return response
proc installFilterDeleteSubscriptionsHandler(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a PubSub topic
# debug "delete_waku_v2_filter_v1_subscriptions"
debug "delete", ROUTE_FILTER_SUBSCRIPTIONS, contentBody
let decodedBody = decodeRequestBody[FilterSubscriptionsRequest](contentBody)
let decodedBody = decodeRequestBody[FilterUnsubscribeRequest](contentBody)
if decodedBody.isErr():
return decodedBody.error
return makeRestResponse("unknown",
FilterSubscribeError.badRequest(fmt("Failed to decode request: {decodedBody.error}")))
let req: FilterSubscriptionsRequest = decodedBody.value()
let req: FilterUnsubscribeRequest = decodedBody.value()
let unsubFut = node.unsubscribe(req.pubsubTopic, req.contentFilters)
let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)
if peerOpt.isNone():
return makeRestResponse(req.requestId,
FilterSubscribeError.serviceUnavailable("No suitable peers"))
let unsubFut = node.filterUnsubscribe(req.pubsubTopic, req.contentFilters, peerOpt.get())
if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to unsubscribe from contentFilters due to timeout!"
return RestApiResponse.internalServerError("Failed to unsubscribe from contentFilters")
return makeRestResponse(req.requestId,
FilterSubscribeError.serviceUnavailable(
"Failed to unsubscribe from contentFilters due to timeout!"))
# Successfully subscribed to all content filters
for cTopic in req.contentFilters:
cache.unsubscribe(cTopic)
# Successfully unsubscribed from all requested contentTopics
return RestApiResponse.ok()
return makeRestResponse(req.requestId, unsubFut.read())
const ROUTE_RELAY_MESSAGESV1* = "/filter/v1/messages/{contentTopic}"
proc installFilterDeleteAllSubscriptionsHandler(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
router.api(MethodDelete, ROUTE_FILTER_ALL_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a PubSub topic
debug "delete", ROUTE_FILTER_ALL_SUBSCRIPTIONS, contentBody
proc installFilterGetMessagesV1Handler*(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
router.api(MethodGet, ROUTE_RELAY_MESSAGESV1) do (contentTopic: string) -> RestApiResponse:
let decodedBody = decodeRequestBody[FilterUnsubscribeAllRequest](contentBody)
if decodedBody.isErr():
return makeRestResponse("unknown",
FilterSubscribeError.badRequest(fmt("Failed to decode request: {decodedBody.error}")))
let req: FilterUnsubscribeAllRequest = decodedBody.value()
let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)
if peerOpt.isNone():
return makeRestResponse(req.requestId,
FilterSubscribeError.serviceUnavailable("No suitable peers"))
let unsubFut = node.filterUnsubscribeAll(peerOpt.get())
if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to unsubscribe from contentFilters due to timeout!"
return makeRestResponse(req.requestId,
FilterSubscribeError.serviceUnavailable(
"Failed to unsubscribe from all contentFilters due to timeout!"))
cache.unsubscribeAll()
# Successfully unsubscribed from all requested contentTopics
return makeRestResponse(req.requestId, unsubFut.read())
const ROUTE_FILTER_SUBSCRIBER_PING* = "/filter/v2/subscriptions/{requestId}"
proc installFilterPingSubscriberHandler(router: var RestRouter,
node: WakuNode) =
router.api(MethodGet, ROUTE_FILTER_SUBSCRIBER_PING) do (requestId: string) -> RestApiResponse:
## Checks if a node has valid subscription or not.
debug "get", ROUTE_FILTER_SUBSCRIBER_PING, requestId
let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)
if peerOpt.isNone():
return makeRestResponse(requestId.get(),
FilterSubscribeError.serviceUnavailable("No suitable remote filter peers"))
let pingFutRes = node.wakuFilterClient.ping(peerOpt.get())
if not await pingFutRes.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to ping filter service peer due to timeout!"
return makeRestResponse(requestId.get(),
FilterSubscribeError.serviceUnavailable("Ping timed out"))
return makeRestResponse(requestId.get(), pingFutRes.read())
const ROUTE_FILTER_MESSAGES* = "/filter/v2/messages/{contentTopic}"
proc installFilterGetMessagesHandler(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
let pushHandler : FilterPushHandler = proc (pubsubTopic: PubsubTopic,
msg: WakuMessage)
{.async, gcsafe, closure.} =
cache.addMessage(msg.contentTopic, msg)
node.wakuFilterClient.registerPushHandler(pushHandler)
router.api(MethodGet, ROUTE_FILTER_MESSAGES) do (contentTopic: string) -> RestApiResponse:
## Returns all WakuMessages received on a specified content topic since the
## last time this method was called
## TODO: ability to specify a return message limit
# debug "get_waku_v2_filter_v1_messages", contentTopic=contentTopic
## TODO: ability to specify a return message limit, maybe use cursor to control paging response.
debug "get", ROUTE_FILTER_MESSAGES, contentTopic=contentTopic
if contentTopic.isErr():
return RestApiResponse.badRequest("Missing contentTopic")
@ -147,9 +325,12 @@ proc installFilterGetMessagesV1Handler*(router: var RestRouter,
return resp.get()
proc installFilterApiHandlers*(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
installFilterPostSubscriptionsV1Handler(router, node, cache)
installFilterDeleteSubscriptionsV1Handler(router, node, cache)
installFilterGetMessagesV1Handler(router, node, cache)
proc installFilterRestApiHandlers*(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
installFilterPingSubscriberHandler(router, node)
installFilterPostSubscriptionsHandler(router, node, cache)
installFilterPutSubscriptionsHandler(router, node, cache)
installFilterDeleteSubscriptionsHandler(router, node, cache)
installFilterDeleteAllSubscriptionsHandler(router, node, cache)
installFilterGetMessagesHandler(router, node, cache)

View File

@ -0,0 +1,68 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/sets,
stew/byteutils,
chronicles,
json_serialization,
json_serialization/std/options,
presto/[route, client, common]
import
../../../waku_core,
../serdes,
../responses,
./types
export types
logScope:
topics = "waku node rest client v1"
proc encodeBytes*(value: FilterLegacySubscribeRequest,
contentType: string): RestResult[seq[byte]] =
if MediaType.init(contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")
let encoded = ?encodeIntoJsonBytes(value)
return ok(encoded)
proc decodeBytes*(t: typedesc[string], value: openarray[byte],
contentType: Opt[ContentTypeData]): RestResult[string] =
if MediaType.init($contentType) != MIMETYPE_TEXT:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")
var res: string
if len(value) > 0:
res = newString(len(value))
copyMem(addr res[0], unsafeAddr value[0], len(value))
return ok(res)
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc filterPostSubscriptionsV1*(body: FilterLegacySubscribeRequest):
RestResponse[string]
{.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodPost.}
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc filterDeleteSubscriptionsV1*(body: FilterLegacySubscribeRequest):
RestResponse[string]
{.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodDelete.}
proc decodeBytes*(t: typedesc[FilterGetMessagesResponse],
data: openArray[byte],
contentType: Opt[ContentTypeData]): RestResult[FilterGetMessagesResponse] =
if MediaType.init($contentType) != MIMETYPE_JSON:
error "Unsupported response contentType value", contentType = contentType
return err("Unsupported response contentType")
let decoded = ?decodeFromJsonBytes(FilterGetMessagesResponse, data)
return ok(decoded)
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc filterGetMessagesV1*(contentTopic: string):
RestResponse[FilterGetMessagesResponse]
{.rest, endpoint: "/filter/v1/messages/{contentTopic}", meth: HttpMethod.MethodGet.}

View File

@ -0,0 +1,160 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/sequtils,
stew/byteutils,
chronicles,
json_serialization,
json_serialization/std/options,
presto/route,
presto/common
import
../../../waku_core,
../../../waku_filter,
../../../waku_filter/client,
../../message_cache,
../../peer_manager,
../../waku_node,
../serdes,
../responses,
./types
export types
logScope:
topics = "waku node rest filter_api v1"
const futTimeoutForSubscriptionProcessing* = 5.seconds
#### Request handlers
const ROUTE_FILTER_SUBSCRIPTIONSV1* = "/filter/v1/subscriptions"
const filterMessageCacheDefaultCapacity* = 30
type
MessageCache* = message_cache.MessageCache[ContentTopic]
func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiResponse] =
if contentBody.isNone():
return err(RestApiResponse.badRequest("Missing content body"))
let reqBodyContentType = MediaType.init($contentBody.get().contentType)
if reqBodyContentType != MIMETYPE_JSON:
return err(RestApiResponse.badRequest("Wrong Content-Type, expected application/json"))
let reqBodyData = contentBody.get().data
let requestResult = decodeFromJsonBytes(T, reqBodyData)
if requestResult.isErr():
return err(RestApiResponse.badRequest("Invalid content body, could not decode. " &
$requestResult.error))
return ok(requestResult.get())
proc installFilterV1PostSubscriptionsV1Handler*(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
let pushHandler: FilterPushHandler =
proc(pubsubTopic: PubsubTopic,
msg: WakuMessage) {.async, gcsafe, closure.} =
cache.addMessage(msg.contentTopic, msg)
router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a pubsubTopic
debug "post", ROUTE_FILTER_SUBSCRIPTIONSV1, contentBody
let decodedBody = decodeRequestBody[FilterLegacySubscribeRequest](contentBody)
if decodedBody.isErr():
return decodedBody.error
let req: FilterLegacySubscribeRequest = decodedBody.value()
let peerOpt = node.peerManager.selectPeer(WakuLegacyFilterCodec)
if peerOpt.isNone():
return RestApiResponse.internalServerError("No suitable remote filter peers")
let subFut = node.legacyFilterSubscribe(req.pubsubTopic,
req.contentFilters,
pushHandler,
peerOpt.get())
if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to subscribe to contentFilters do to timeout!"
return RestApiResponse.internalServerError("Failed to subscribe to contentFilters")
# Successfully subscribed to all content filters
for cTopic in req.contentFilters:
cache.subscribe(cTopic)
return RestApiResponse.ok()
proc installFilterV1DeleteSubscriptionsV1Handler*(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a PubSub topic
debug "delete", ROUTE_FILTER_SUBSCRIPTIONSV1, contentBody
let decodedBody = decodeRequestBody[FilterLegacySubscribeRequest](contentBody)
if decodedBody.isErr():
return decodedBody.error
let req: FilterLegacySubscribeRequest = decodedBody.value()
let peerOpt = node.peerManager.selectPeer(WakuLegacyFilterCodec)
if peerOpt.isNone():
return RestApiResponse.internalServerError("No suitable remote filter peers")
let unsubFut = node.legacyFilterUnsubscribe(req.pubsubTopic, req.contentFilters, peerOpt.get())
if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to unsubscribe from contentFilters due to timeout!"
return RestApiResponse.internalServerError("Failed to unsubscribe from contentFilters")
for cTopic in req.contentFilters:
cache.unsubscribe(cTopic)
# Successfully unsubscribed from all requested contentTopics
return RestApiResponse.ok()
const ROUTE_FILTER_MESSAGESV1* = "/filter/v1/messages/{contentTopic}"
proc installFilterV1GetMessagesV1Handler*(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
router.api(MethodGet, ROUTE_FILTER_MESSAGESV1) do (contentTopic: string) -> RestApiResponse:
## Returns all WakuMessages received on a specified content topic since the
## last time this method was called
## TODO: ability to specify a return message limit
debug "get", ROUTE_FILTER_MESSAGESV1, contentTopic=contentTopic
if contentTopic.isErr():
return RestApiResponse.badRequest("Missing contentTopic")
let contentTopic = contentTopic.get()
let msgRes = cache.getMessages(contentTopic, clear=true)
if msgRes.isErr():
return RestApiResponse.badRequest("Not subscribed to topic: " & contentTopic)
let data = FilterGetMessagesResponse(msgRes.get().map(toFilterWakuMessage))
let resp = RestApiResponse.jsonResponse(data, status=Http200)
if resp.isErr():
error "An error ocurred while building the json respose: ", error=resp.error
return RestApiResponse.internalServerError("An error ocurred while building the json respose")
return resp.get()
proc installLegacyFilterRestApiHandlers*(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
installFilterV1PostSubscriptionsV1Handler(router, node, cache)
installFilterV1DeleteSubscriptionsV1Handler(router, node, cache)
installFilterV1GetMessagesV1Handler(router, node, cache)

View File

@ -1,6 +1,6 @@
openapi: 3.0.3
info:
title: Waku V2 node REST API
title: Waku V2 node REST API
version: 1.0.0
contact:
name: VAC Team
@ -10,6 +10,8 @@ tags:
description: Filter REST API for WakuV2 node
paths:
# Legacy support for v1 waku filter
# TODO: legacy endpoint, remove in the future
/filter/v1/subscriptions:
post: # post_waku_v2_filter_v1_subscription
summary: Subscribe a node to an array of topics
@ -21,7 +23,7 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionsRequest'
$ref: '#/components/schemas/FilterLegacySubscribeRequest'
responses:
'200':
description: OK
@ -32,8 +34,16 @@ paths:
# TODO: Review the possible errors of this endpoint
'400':
description: Bad request.
content:
text/plain:
schema:
type: string
'5XX':
description: Unexpected error.
content:
text/plain:
schema:
type: string
delete: # delete_waku_v2_filter_v1_subscription
summary: Unsubscribe a node from an array of topics
@ -45,7 +55,7 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionsRequest'
$ref: '#/components/schemas/FilterLegacySubscribeRequest'
responses:
'200':
description: OK
@ -56,12 +66,24 @@ paths:
# TODO: Review the possible errors of this endpoint
'400':
description: Bad request.
content:
text/plain:
schema:
type: string
'404':
description: Not found.
content:
text/plain:
schema:
type: string
'5XX':
description: Unexpected error.
content:
text/plain:
schema:
type: string
# TODO: Review the path of this endpoint due maybe query for list of contentTopics matching
# TODO: legacy endpoint, remove in the future
/filter/v1/messages/{contentTopic}:
get: # get_waku_v2_filter_v1_messages
summary: Get the latest messages on the polled content topic
@ -86,10 +108,270 @@ paths:
# TODO: Review the possible errors of this endpoint
'400':
description: Bad request.
content:
text/plain:
schema:
type: string
'404':
description: Not found.
content:
text/plain:
schema:
type: string
'5XX':
description: Unexpected error.
content:
text/plain:
schema:
type: string
/filter/v2/subscriptions/{requestId}:
get: # get_waku_v2_filter_v2_subscription - ping
summary: Subscriber-ping - a peer can query if there is a registered subscription for it
description: |
Subscriber peer can query its subscription existence on service node.
Returns HTTP200 if exists and HTTP404 if not.
Client must not fill anything but requestId in the request body.
operationId: subscriberPing
tags:
- filter
parameters:
- in: path
name: requestId
required: true
schema:
type: string
description: Id of ping request
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'400':
description: Bad request.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'404':
description: Not found.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'5XX':
description: Unexpected error.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
/filter/v2/subscriptions:
post: # post_waku_v2_filter_v2_subscription
summary: Subscribe a peer to an array of content topics under a pubsubTopic
description: |
Subscribe a peer to an array of content topics under a pubsubTopic.
It is allowed to refresh or add new content topic to an existing subscription.
Fields pubsubTopic and contentFilters must be filled.
operationId: postSubscriptions
tags:
- filter
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscribeRequest'
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
# TODO: Review the possible errors of this endpoint
'400':
description: Bad request.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'404':
description: Not found.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'5XX':
description: Unexpected error.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
put: # put_waku_v2_filter_v2_subscription
summary: Modify existing subscription of a peer under a pubsubTopic
description: |
Modify existing subscription of a peer under a pubsubTopic.
It is allowed to refresh or add new content topic to an existing subscription.
Fields pubsubTopic and contentFilters must be filled.
operationId: putSubscriptions
tags:
- filter
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscribeRequest'
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
# TODO: Review the possible errors of this endpoint
'400':
description: Bad request.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'404':
description: Not found.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'5XX':
description: Unexpected error.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
delete: # delete_waku_v2_filter_v2_subscription
summary: Unsubscribe a peer from content topics
description: |
Unsubscribe a peer from content topics
Only that subscription will be removed which matches existing.
operationId: deleteSubscriptions
tags:
- filter
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/FilterUnsubscribeRequest'
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'400':
description: Bad request.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'404':
description: Not found.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'5XX':
description: Unexpected error.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
/filter/v2/subscriptions/all:
delete: # delete_waku_v2_filter_v2_subscription
summary: Unsubscribe a peer from all content topics
description: |
Unsubscribe a peer from all content topics
operationId: deleteAllSubscriptions
tags:
- filter
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/FilterUnsubscribeAllRequest'
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'400':
description: Bad request.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'404':
description: Not found.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'5XX':
description: Unexpected error.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
/filter/v2/messages/{contentTopic}:
get: # get_waku_v2_filter_v2_messages
summary: Get the latest messages on the polled content topic
description: Get a list of messages that were received on a subscribed content topic after the last time this method was called.
operationId: getMessagesByTopic
tags:
- filter
parameters:
- in: path
name: contentTopic # Note the name is the same as in the path
required: true
schema:
type: string
description: Content topic of message
responses:
'200':
description: The latest messages on the polled topic.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterGetMessagesResponse'
# TODO: Review the possible errors of this endpoint
'400':
description: Bad request.
content:
text/plain:
schema:
type: string
'404':
description: Not found.
content:
text/plain:
schema:
type: string
'5XX':
description: Unexpected error.
content:
text/plain:
schema:
type: string
components:
schemas:
@ -97,7 +379,7 @@ components:
type: string
ContentTopic:
type: string
FilterWakuMessage:
type: object
properties:
@ -113,19 +395,68 @@ components:
required:
- payload
FilterSubscriptionsRequest:
FilterLegacySubscribeRequest:
type: object
properties:
properties:
contentFilters:
type: array
items:
$ref: '#/components/schemas/ContentTopic'
pubsubTopic:
$ref: "#/components/schemas/PubSubTopic"
required:
required:
- contentFilters
FilterGetMessagesResponse:
type: array
items:
$ref: '#/components/schemas/FilterWakuMessage'
$ref: '#/components/schemas/FilterWakuMessage'
FilterSubscribeRequest:
type: object
properties:
requestId:
type: string
contentFilters:
type: array
items:
$ref: '#/components/schemas/ContentTopic'
pubsubTopic:
$ref: "#/components/schemas/PubSubTopic"
required:
- requestId
- contentFilters
- pubsubTopic
FilterUnsubscribeRequest:
type: object
properties:
requestId:
type: string
contentFilters:
type: array
items:
$ref: '#/components/schemas/ContentTopic'
pubsubTopic:
$ref: "#/components/schemas/PubSubTopic"
required:
- requestId
- contentFilters
FilterUnsubscribeAllRequest:
type: object
properties:
requestId:
type: string
required:
- requestId
FilterSubscriptionResponse:
type: object
properties:
requestId:
type: string
statusDesc:
type: string
required:
- requestId

View File

@ -8,7 +8,8 @@ import
chronicles,
json_serialization,
json_serialization/std/options,
presto/[route, client, common]
presto/[route, client, common],
libp2p/peerid
import
../../../common/base64,
../../../waku_core,
@ -24,10 +25,32 @@ type FilterWakuMessage* = object
type FilterGetMessagesResponse* = seq[FilterWakuMessage]
type FilterSubscriptionsRequest* = object
type FilterLegacySubscribeRequest* = object
# Subscription request for legacy filter support
pubsubTopic*: Option[PubSubTopic]
contentFilters*: seq[ContentTopic]
type FilterSubscriberPing* = object
requestId*: string
type FilterSubscribeRequest* = object
requestId*: string
pubsubTopic*: Option[PubSubTopic]
contentFilters*: seq[ContentTopic]
type FilterUnsubscribeRequest* = object
requestId*: string
pubsubTopic*: Option[PubSubTopic]
contentFilters*: seq[ContentTopic]
type FilterUnsubscribeAllRequest* = object
requestId*: string
type FilterSubscriptionResponse* = object
requestId*: string
statusCode*: uint32
statusDesc*: string
#### Type conversion
proc toFilterWakuMessage*(msg: WakuMessage): FilterWakuMessage =
@ -65,7 +88,7 @@ proc writeValue*(writer: var JsonWriter[RestJson], value: FilterWakuMessage)
writer.writeField("timestamp", value.timestamp)
writer.endRecord()
proc writeValue*(writer: var JsonWriter[RestJson], value: FilterSubscriptionsRequest)
proc writeValue*(writer: var JsonWriter[RestJson], value: FilterLegacySubscribeRequest)
{.raises: [IOError].} =
writer.beginRecord()
writer.writeField("pubsubTopic", value.pubsubTopic)
@ -114,8 +137,8 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterWakuMessage)
timestamp: timestamp
)
proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptionsRequest)
{.raises: [SerializationError, IOError].} =
proc readValue*(reader: var JsonReader[RestJson], value: var FilterLegacySubscribeRequest)
{.raises: [SerializationError, IOError].} =
var
pubsubTopic = none(PubsubTopic)
contentFilters = none(seq[ContentTopic])
@ -126,7 +149,7 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptions
if keys.containsOrIncl(fieldName):
let err = try: fmt"Multiple `{fieldName}` fields found"
except CatchableError: "Multiple fields with the same name found"
reader.raiseUnexpectedField(err, "FilterSubscriptionsRequest")
reader.raiseUnexpectedField(err, "FilterLegacySubscribeRequest")
case fieldName
of "pubsubTopic":
@ -136,8 +159,70 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptions
else:
unrecognizedFieldWarning()
if pubsubTopic.isNone():
reader.raiseUnexpectedValue("Field `pubsubTopic` is missing")
if contentFilters.isNone():
reader.raiseUnexpectedValue("Field `contentFilters` is missing")
if contentFilters.get().len() == 0:
reader.raiseUnexpectedValue("Field `contentFilters` is empty")
value = FilterLegacySubscribeRequest(
pubsubTopic: if pubsubTopic.isNone() or pubsubTopic.get() == "": none(string) else: some(pubsubTopic.get()),
contentFilters: contentFilters.get()
)
proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriberPing)
{.raises: [SerializationError, IOError].} =
var
requestId = none(string)
var keys = initHashSet[string]()
for fieldName in readObjectFields(reader):
# Check for reapeated keys
if keys.containsOrIncl(fieldName):
let err = try: fmt"Multiple `{fieldName}` fields found"
except CatchableError: "Multiple fields with the same name found"
reader.raiseUnexpectedField(err, "FilterSubscriberPing")
case fieldName
of "requestId":
requestId = some(reader.readValue(string))
else:
unrecognizedFieldWarning()
if requestId.isNone():
reader.raiseUnexpectedValue("Field `requestId` is missing")
value = FilterSubscriberPing(
requestId: requestId.get()
)
proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscribeRequest)
{.raises: [SerializationError, IOError].} =
var
requestId = none(string)
pubsubTopic = none(PubsubTopic)
contentFilters = none(seq[ContentTopic])
var keys = initHashSet[string]()
for fieldName in readObjectFields(reader):
# Check for reapeated keys
if keys.containsOrIncl(fieldName):
let err = try: fmt"Multiple `{fieldName}` fields found"
except CatchableError: "Multiple fields with the same name found"
reader.raiseUnexpectedField(err, "FilterSubscribeRequest")
case fieldName
of "requestId":
requestId = some(reader.readValue(string))
of "pubsubTopic":
pubsubTopic = some(reader.readValue(PubsubTopic))
of "contentFilters":
contentFilters = some(reader.readValue(seq[ContentTopic]))
else:
unrecognizedFieldWarning()
if requestId.isNone():
reader.raiseUnexpectedValue("Field `requestId` is missing")
if contentFilters.isNone():
reader.raiseUnexpectedValue("Field `contentFilters` is missing")
@ -145,7 +230,108 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptions
if contentFilters.get().len() == 0:
reader.raiseUnexpectedValue("Field `contentFilters` is empty")
value = FilterSubscriptionsRequest(
pubsubTopic: if pubsubTopic.get() == "": none(string) else: some(pubsubTopic.get()),
value = FilterSubscribeRequest(
requestId: requestId.get(),
pubsubTopic: if pubsubTopic.isNone() or pubsubTopic.get() == "": none(string) else: some(pubsubTopic.get()),
contentFilters: contentFilters.get()
)
)
proc readValue*(reader: var JsonReader[RestJson], value: var FilterUnsubscribeRequest)
{.raises: [SerializationError, IOError].} =
var
requestId = none(string)
pubsubTopic = none(PubsubTopic)
contentFilters = none(seq[ContentTopic])
var keys = initHashSet[string]()
for fieldName in readObjectFields(reader):
# Check for reapeated keys
if keys.containsOrIncl(fieldName):
let err = try: fmt"Multiple `{fieldName}` fields found"
except CatchableError: "Multiple fields with the same name found"
reader.raiseUnexpectedField(err, "FilterUnsubscribeRequest")
case fieldName
of "requestId":
requestId = some(reader.readValue(string))
of "pubsubTopic":
pubsubTopic = some(reader.readValue(PubsubTopic))
of "contentFilters":
contentFilters = some(reader.readValue(seq[ContentTopic]))
else:
unrecognizedFieldWarning()
if requestId.isNone():
reader.raiseUnexpectedValue("Field `requestId` is missing")
if contentFilters.isNone():
reader.raiseUnexpectedValue("Field `contentFilters` is missing")
if contentFilters.get().len() == 0:
reader.raiseUnexpectedValue("Field `contentFilters` is empty")
value = FilterUnsubscribeRequest(
requestId: requestId.get(),
pubsubTopic: if pubsubTopic.isNone() or pubsubTopic.get() == "": none(string) else: some(pubsubTopic.get()),
contentFilters: contentFilters.get()
)
proc readValue*(reader: var JsonReader[RestJson], value: var FilterUnsubscribeAllRequest)
{.raises: [SerializationError, IOError].} =
var
requestId = none(string)
var keys = initHashSet[string]()
for fieldName in readObjectFields(reader):
# Check for reapeated keys
if keys.containsOrIncl(fieldName):
let err = try: fmt"Multiple `{fieldName}` fields found"
except CatchableError: "Multiple fields with the same name found"
reader.raiseUnexpectedField(err, "FilterUnsubscribeAllRequest")
case fieldName
of "requestId":
requestId = some(reader.readValue(string))
else:
unrecognizedFieldWarning()
if requestId.isNone():
reader.raiseUnexpectedValue("Field `requestId` is missing")
value = FilterUnsubscribeAllRequest(
requestId: requestId.get(),
)
proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptionResponse)
{.raises: [SerializationError, IOError].} =
var
requestId = none(string)
statusCode = none(uint32)
statusDesc = none(string)
var keys = initHashSet[string]()
for fieldName in readObjectFields(reader):
# Check for reapeated keys
if keys.containsOrIncl(fieldName):
let err = try: fmt"Multiple `{fieldName}` fields found"
except CatchableError: "Multiple fields with the same name found"
reader.raiseUnexpectedField(err, "FilterSubscriptionResponse")
case fieldName
of "requestId":
requestId = some(reader.readValue(string))
of "statusCode":
statusCode = some(reader.readValue(uint32))
of "statusDesc":
statusDesc = some(reader.readValue(string))
else:
unrecognizedFieldWarning()
if requestId.isNone():
reader.raiseUnexpectedValue("Field `requestId` is missing")
value = FilterSubscriptionResponse(
requestId: requestId.get(),
statusCode: statusCode.get(),
statusDesc: statusDesc.get("")
)

View File

@ -31,8 +31,9 @@ import
../waku_store,
../waku_store/client as store_client,
../waku_filter as legacy_filter, #TODO: support for legacy filter protocol will be removed
../waku_filter/client as filter_client, #TODO: support for legacy filter protocol will be removed
../waku_filter/client as legacy_filter_client, #TODO: support for legacy filter protocol will be removed
../waku_filter_v2,
../waku_filter_v2/client as filter_client,
../waku_lightpush,
../waku_lightpush/client as lightpush_client,
../waku_enr,
@ -41,7 +42,8 @@ import
../waku_rln_relay,
./config,
./peer_manager,
./waku_switch
./waku_switch,
./rest/relay/topic_cache
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
@ -88,8 +90,9 @@ type
wakuStore*: WakuStore
wakuStoreClient*: WakuStoreClient
wakuFilter*: waku_filter_v2.WakuFilter
wakuFilterClient*: filter_client.WakuFilterClient
wakuFilterLegacy*: legacy_filter.WakuFilterLegacy #TODO: support for legacy filter protocol will be removed
wakuFilterClientLegacy*: WakuFilterClientLegacy #TODO: support for legacy filter protocol will be removed
wakuFilterClientLegacy*: legacy_filter_client.WakuFilterClientLegacy #TODO: support for legacy filter protocol will be removed
wakuRlnRelay*: WakuRLNRelay
wakuLightPush*: WakuLightPush
wakuLightpushClient*: WakuLightPushClient
@ -335,10 +338,10 @@ proc mountRelay*(node: WakuNode,
for topic in topics:
node.subscribe(topic)
## Waku filter
proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.async, raises: [Defect, LPError]} =
proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout)
{.async, raises: [Defect, LPError]} =
info "mounting filter protocol"
node.wakuFilter = WakuFilter.new(node.peerManager)
node.wakuFilterLegacy = WakuFilterLegacy.new(node.peerManager, node.rng, filterTimeout) #TODO: remove legacy
@ -348,32 +351,47 @@ proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {
await node.wakuFilterLegacy.start() #TODO: remove legacy
node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterSubscribeCodec))
node.switch.mount(node.wakuFilterLegacy, protocolMatcher(WakuFilterCodec)) #TODO: remove legacy
node.switch.mount(node.wakuFilterLegacy, protocolMatcher(WakuLegacyFilterCodec)) #TODO: remove legacy
proc filterHandleMessage*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage) {.async.}=
if node.wakuFilter.isNil():
proc filterHandleMessage*(node: WakuNode,
pubsubTopic: PubsubTopic,
message: WakuMessage)
{.async.}=
if node.wakuFilter.isNil() or node.wakuFilterLegacy.isNil():
error "cannot handle filter message", error="waku filter is nil"
return
await node.wakuFilter.handleMessage(pubsubTopic, message)
await node.wakuFilterLegacy.handleMessage(pubsubTopic, message) #TODO: remove legacy
await allFutures(node.wakuFilter.handleMessage(pubsubTopic, message),
node.wakuFilterLegacy.handleMessage(pubsubTopic, message) #TODO: remove legacy
)
proc mountFilterClient*(node: WakuNode) {.async, raises: [Defect, LPError].} =
## Mounting both filter clients v1 - legacy and v2.
## Giving option for application level to chose btw own push message handling or
## rely on node provided cache. - This only applies for v2 filter client
info "mounting filter client"
node.wakuFilterClientLegacy = WakuFilterClientLegacy.new(node.peerManager, node.rng)
node.wakuFilterClient = WakuFilterClient.new(node.peerManager, node.rng)
if node.started:
# Node has started already. Let's start filter too.
await node.wakuFilterClientLegacy.start()
await allFutures(node.wakuFilterClientLegacy.start(), node.wakuFilterClient.start())
node.switch.mount(node.wakuFilterClientLegacy, protocolMatcher(WakuFilterCodec))
node.switch.mount(node.wakuFilterClient, protocolMatcher(WakuFilterSubscribeCodec))
node.switch.mount(node.wakuFilterClientLegacy, protocolMatcher(WakuLegacyFilterCodec))
proc filterSubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic],
handler: FilterPushHandler, peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} =
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
proc legacyFilterSubscribe*(node: WakuNode,
pubsubTopic: Option[PubsubTopic],
contentTopics: ContentTopic|seq[ContentTopic],
handler: FilterPushHandler,
peer: RemotePeerInfo|string)
{.async, gcsafe, raises: [Defect, ValueError].} =
## Registers for messages that match a specific filter.
## Triggers the handler whenever a message is received.
if node.wakuFilterClientLegacy.isNil():
error "cannot register filter subscription to topic", error="waku filter client is nil"
error "cannot register filter subscription to topic", error="waku legacy filter client is not set up"
return
let remotePeerRes = parsePeerInfo(peer)
@ -385,21 +403,31 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentT
# Add handler wrapper to store the message when pushed, when relay is disabled and filter enabled
# TODO: Move this logic to wakunode2 app
let handlerWrapper: FilterPushHandler = proc(pubsubTopic: string, message: WakuMessage) {.async, gcsafe, closure.} =
if node.wakuRelay.isNil() and not node.wakuStore.isNil():
await node.wakuArchive.handleMessage(pubSubTopic, message)
await handler(pubsubTopic, message)
# FIXME: This part needs refactoring. It seems possible that in special cases archiver will store same message multiple times.
let handlerWrapper: FilterPushHandler =
if node.wakuRelay.isNil() and not node.wakuStore.isNil():
proc(pubsubTopic: string, message: WakuMessage) {.async, gcsafe, closure.} =
await allFutures(node.wakuArchive.handleMessage(pubSubTopic, message),
handler(pubsubTopic, message))
else:
handler
if pubsubTopic.isSome():
info "registering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId
info "registering legacy filter subscription to content",
pubsubTopic=pubsubTopic.get(),
contentTopics=contentTopics,
peer=remotePeer.peerId
let res = await node.wakuFilterClientLegacy.subscribe(pubsubTopic.get(), contentTopics, handlerWrapper, peer=remotePeer)
let res = await node.wakuFilterClientLegacy.subscribe(pubsubTopic.get(),
contentTopics,
handlerWrapper,
peer=remotePeer)
if res.isOk():
info "subscribed to topic", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics
info "subscribed to topic", pubsubTopic=pubsubTopic.get(),
contentTopics=contentTopics
else:
error "failed filter subscription", error=res.error
error "failed legacy filter subscription", error=res.error
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
else:
let topicMapRes = parseSharding(pubsubTopic, contentTopics)
@ -412,7 +440,11 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentT
var futures = collect(newSeq):
for pubsub, topics in topicMap.pairs:
info "registering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer=remotePeer.peerId
info "registering legacy filter subscription to content",
pubsubTopic=pubsub,
contentTopics=topics,
peer=remotePeer.peerId
let content = topics.mapIt($it)
node.wakuFilterClientLegacy.subscribe($pubsub, content, handlerWrapper, peer=remotePeer)
@ -422,15 +454,82 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentT
let res = fut.read()
if res.isErr():
error "failed filter subscription", error=res.error
error "failed legacy filter subscription", error=res.error
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
for pubsub, topics in topicMap.pairs:
info "subscribed to topic", pubsubTopic=pubsub, contentTopics=topics
proc filterUnsubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic],
peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} =
## Unsubscribe from a content filter.
proc filterSubscribe*(node: WakuNode,
pubsubTopic: Option[PubsubTopic],
contentTopics: ContentTopic|seq[ContentTopic],
peer: RemotePeerInfo|string):
Future[FilterSubscribeResult]
{.async, gcsafe, raises: [Defect, ValueError].} =
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
if node.wakuFilterClient.isNil():
error "cannot register filter subscription to topic", error="waku filter client is not set up"
return err(FilterSubscribeError.serviceUnavailable())
let remotePeerRes = parsePeerInfo(peer)
if remotePeerRes.isErr():
error "Couldn't parse the peer info properly", error = remotePeerRes.error
return err(FilterSubscribeError.serviceUnavailable("No peers available"))
let remotePeer = remotePeerRes.value
if pubsubTopic.isSome():
info "registering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId
let subRes = await node.wakuFilterClient.subscribe(remotePeer, pubsubTopic.get(), contentTopics)
if subRes.isOk():
info "v2 subscribed to topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics
else:
error "failed filter v2 subscription", error=subRes.error
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
return subRes
else:
let topicMapRes = parseSharding(pubsubTopic, contentTopics)
let topicMap =
if topicMapRes.isErr():
error "can't get shard", error=topicMapRes.error
return err(FilterSubscribeError.badResponse("can't get shard"))
else: topicMapRes.get()
var futures = collect(newSeq):
for pubsub, topics in topicMap.pairs:
info "registering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer=remotePeer.peerId
let content = topics.mapIt($it)
node.wakuFilterClient.subscribe(remotePeer, $pubsub, content)
let finished = await allFinished(futures)
var subRes: FilterSubscribeResult = FilterSubscribeResult.ok()
for fut in finished:
let res = fut.read()
if res.isErr():
error "failed filter subscription", error=res.error
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
subRes = FilterSubscribeResult.err(res.error)
for pubsub, topics in topicMap.pairs:
info "subscribed to topic", pubsubTopic=pubsub, contentTopics=topics
# return the last error or ok
return subRes
proc legacyFilterUnsubscribe*(node: WakuNode,
pubsubTopic: Option[PubsubTopic],
contentTopics: ContentTopic|seq[ContentTopic],
peer: RemotePeerInfo|string)
{.async, gcsafe, raises: [Defect, ValueError].} =
## Unsubscribe from a content legacy filter.
if node.wakuFilterClientLegacy.isNil():
error "cannot unregister filter subscription to content", error="waku filter client is nil"
return
@ -443,7 +542,7 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], conten
let remotePeer = remotePeerRes.value
if pubsubTopic.isSome():
info "deregistering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId
info "deregistering legacy filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId
let res = await node.wakuFilterClientLegacy.unsubscribe(pubsubTopic.get(), contentTopics, peer=remotePeer)
@ -479,35 +578,103 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], conten
for pubsub, topics in topicMap.pairs:
info "unsubscribed from topic", pubsubTopic=pubsub, contentTopics=topics
# TODO: Move to application module (e.g., wakunode2.nim)
proc subscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic], handler: FilterPushHandler) {.async, gcsafe,
deprecated: "Use the explicit destination peer procedure. Use 'node.filterSubscribe()' instead.".} =
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
if node.wakuFilterClientLegacy.isNil():
error "cannot register filter subscription to topic", error="waku filter client is nil"
return
proc filterUnsubscribe*(node: WakuNode,
pubsubTopic: Option[PubsubTopic],
contentTopics: seq[ContentTopic],
peer: RemotePeerInfo|string):
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
if peerOpt.isNone():
error "cannot register filter subscription to topic", error="no suitable remote peers"
return
Future[FilterSubscribeResult]
await node.filterSubscribe(pubsubTopic, contentTopics, handler, peer=peerOpt.get())
{.async, gcsafe, raises: [Defect, ValueError].} =
# TODO: Move to application module (e.g., wakunode2.nim)
proc unsubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic]) {.async, gcsafe,
deprecated: "Use the explicit destination peer procedure. Use 'node.filterUnsusbscribe()' instead.".} =
## Unsubscribe from a content filter.
## Unsubscribe from a content filter V2".
if node.wakuFilterClientLegacy.isNil():
error "cannot unregister filter subscription to content", error="waku filter client is nil"
return
return err(FilterSubscribeError.serviceUnavailable())
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
if peerOpt.isNone():
error "cannot register filter subscription to topic", error="no suitable remote peers"
return
let remotePeerRes = parsePeerInfo(peer)
if remotePeerRes.isErr():
error "couldn't parse remotePeerInfo", error = remotePeerRes.error
return err(FilterSubscribeError.serviceUnavailable("No peers available"))
await node.filterUnsubscribe(pubsubTopic, contentTopics, peer=peerOpt.get())
let remotePeer = remotePeerRes.value
if pubsubTopic.isSome():
info "deregistering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId
let unsubRes = await node.wakuFilterClient.unsubscribe(remotePeer, pubsubTopic.get(), contentTopics)
if unsubRes.isOk():
info "unsubscribed from topic", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics
else:
error "failed filter unsubscription", error=unsubRes.error
waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
return unsubRes
else: # pubsubTopic.isNone
let topicMapRes = parseSharding(pubsubTopic, contentTopics)
let topicMap =
if topicMapRes.isErr():
error "can't get shard", error = topicMapRes.error
return err(FilterSubscribeError.badResponse("can't get shard"))
else: topicMapRes.get()
var futures = collect(newSeq):
for pubsub, topics in topicMap.pairs:
info "deregistering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer=remotePeer.peerId
let content = topics.mapIt($it)
node.wakuFilterClient.unsubscribe(remotePeer, $pubsub, content)
let finished = await allFinished(futures)
var unsubRes: FilterSubscribeResult = FilterSubscribeResult.ok()
for fut in finished:
let res = fut.read()
if res.isErr():
error "failed filter unsubscription", error=res.error
waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
unsubRes = FilterSubscribeResult.err(res.error)
for pubsub, topics in topicMap.pairs:
info "unsubscribed from topic", pubsubTopic=pubsub, contentTopics=topics
# return the last error or ok
return unsubRes
proc filterUnsubscribeAll*(node: WakuNode,
peer: RemotePeerInfo|string):
Future[FilterSubscribeResult]
{.async, gcsafe, raises: [Defect, ValueError].} =
## Unsubscribe from a content filter V2".
if node.wakuFilterClientLegacy.isNil():
error "cannot unregister filter subscription to content", error="waku filter client is nil"
return err(FilterSubscribeError.serviceUnavailable())
let remotePeerRes = parsePeerInfo(peer)
if remotePeerRes.isErr():
error "couldn't parse remotePeerInfo", error = remotePeerRes.error
return err(FilterSubscribeError.serviceUnavailable("No peers available"))
let remotePeer = remotePeerRes.value
info "deregistering all filter subscription to content", peer=remotePeer.peerId
let unsubRes = await node.wakuFilterClient.unsubscribeAll(remotePeer)
if unsubRes.isOk():
info "unsubscribed from all content-topic", peerId=remotePeer.peerId
else:
error "failed filter unsubscription from all content-topic", error=unsubRes.error
waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
return unsubRes
# NOTICE: subscribe / unsubscribe methods are removed - they were already depricated
# yet incompatible to handle both type of filters - use specific filter registration instead
## Waku archive
const WakuArchiveDefaultRetentionPolicyInterval* = 30.minutes

View File

@ -17,8 +17,8 @@ import
libp2p/multicodec,
libp2p/peerid,
libp2p/peerinfo,
libp2p/routing_record
libp2p/routing_record,
json_serialization
type
Connectedness* = enum
@ -62,6 +62,9 @@ type RemotePeerInfo* = ref object
func `$`*(remotePeerInfo: RemotePeerInfo): string =
$remotePeerInfo.peerId
proc writeValue*(w: var JsonWriter, value: RemotePeerInfo) {.inline, raises: [IOError].} =
w.writeValue $value
proc init*(
T: typedesc[RemotePeerInfo],
peerId: PeerID,

View File

@ -20,7 +20,6 @@ import
./protocol,
./protocol_metrics
logScope:
topics = "waku filter client"
@ -71,7 +70,7 @@ proc initProtocolHandler(wf: WakuFilterClientLegacy) =
wf.handleMessagePush(peerId, requestId, push)
wf.handler = handle
wf.codec = WakuFilterCodec
wf.codec = WakuLegacyFilterCodec
proc new*(T: type WakuFilterClientLegacy,
peerManager: PeerManager,
@ -87,7 +86,7 @@ proc new*(T: type WakuFilterClientLegacy,
proc sendFilterRpc(wf: WakuFilterClientLegacy, rpc: FilterRPC, peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}=
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)
let connOpt = await wf.peerManager.dialPeer(peer, WakuLegacyFilterCodec)
if connOpt.isNone():
return err(dialFailure)
let connection = connOpt.get()
@ -155,6 +154,8 @@ proc unsubscribe*(wf: WakuFilterClientLegacy,
if sendRes.isErr():
return err(sendRes.error)
# FIXME: I see an issue here that such solution prevents filtering client to properly manage its
# subscriptions on different peers and get notified correctly!
for topic in topics:
wf.subManager.removeSubscription(pubsubTopic, topic)

View File

@ -20,7 +20,7 @@ logScope:
const
WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1"
WakuLegacyFilterCodec* = "/vac/waku/filter/2.0.0-beta1"
WakuFilterTimeout: Duration = 2.hours
@ -60,8 +60,6 @@ proc removeSubscription(subscriptions: var seq[Subscription], peer: PeerId, unsu
## Protocol
type
MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.}
WakuFilterLegacy* = ref object of LPProtocol
rng*: ref rand.HmacDrbgContext
peerManager*: PeerManager
@ -110,7 +108,7 @@ proc initProtocolHandler(wf: WakuFilterLegacy) =
wf.handleFilterRequest(conn.peerId, rpc)
wf.handler = handler
wf.codec = WakuFilterCodec
wf.codec = WakuLegacyFilterCodec
proc new*(T: type WakuFilterLegacy,
peerManager: PeerManager,
@ -131,7 +129,7 @@ proc init*(T: type WakuFilterLegacy,
proc sendFilterRpc(wf: WakuFilterLegacy, rpc: FilterRPC, peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}=
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)
let connOpt = await wf.peerManager.dialPeer(peer, WakuLegacyFilterCodec)
if connOpt.isNone():
return err(dialFailure)
let connection = connOpt.get()

View File

@ -23,19 +23,21 @@ logScope:
topics = "waku filter client"
type
MessagePushHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.}
WakuFilterClient* = ref object of LPProtocol
rng: ref HmacDrbgContext
messagePushHandler: MessagePushHandler
peerManager: PeerManager
pushHandlers: seq[FilterPushHandler]
func generateRequestId(rng: ref HmacDrbgContext): string =
var bytes: array[10, byte]
hmacDrbgGenerate(rng[], bytes)
return toHex(bytes)
proc sendSubscribeRequest(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, filterSubscribeRequest: FilterSubscribeRequest): Future[FilterSubscribeResult] {.async.} =
trace "Sending filter subscribe request", servicePeer, filterSubscribeRequest
proc sendSubscribeRequest(wfc: WakuFilterClient, servicePeer: RemotePeerInfo,
filterSubscribeRequest: FilterSubscribeRequest):
Future[FilterSubscribeResult]
{.async.} =
trace "Sending filter subscribe request", peerId=servicePeer.peerId, filterSubscribeRequest
let connOpt = await wfc.peerManager.dialPeer(servicePeer, WakuFilterSubscribeCodec)
if connOpt.isNone():
@ -77,7 +79,13 @@ proc ping*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo): Future[FilterSub
return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
proc subscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]): Future[FilterSubscribeResult] {.async.} =
proc subscribe*(wfc: WakuFilterClient,
servicePeer: RemotePeerInfo,
pubsubTopic: PubsubTopic,
contentTopics: seq[ContentTopic]):
Future[FilterSubscribeResult]
{.async.} =
let requestId = generateRequestId(wfc.rng)
let filterSubscribeRequest = FilterSubscribeRequest.subscribe(
requestId = requestId,
@ -87,7 +95,13 @@ proc subscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic:
return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
proc unsubscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]): Future[FilterSubscribeResult] {.async.} =
proc unsubscribe*(wfc: WakuFilterClient,
servicePeer: RemotePeerInfo,
pubsubTopic: PubsubTopic,
contentTopics: seq[ContentTopic]):
Future[FilterSubscribeResult]
{.async.} =
let requestId = generateRequestId(wfc.rng)
let filterSubscribeRequest = FilterSubscribeRequest.unsubscribe(
requestId = requestId,
@ -97,7 +111,10 @@ proc unsubscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopi
return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
proc unsubscribeAll*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo): Future[FilterSubscribeResult] {.async.} =
proc unsubscribeAll*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo):
Future[FilterSubscribeResult]
{.async.} =
let requestId = generateRequestId(wfc.rng)
let filterSubscribeRequest = FilterSubscribeRequest.unsubscribeAll(
requestId = requestId
@ -105,6 +122,9 @@ proc unsubscribeAll*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo): Future
return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
proc registerPushHandler*(wfc: WakuFilterClient, handler: FilterPushHandler) =
wfc.pushHandlers.add(handler)
proc initProtocolHandler(wfc: WakuFilterClient) =
proc handler(conn: Connection, proto: string) {.async.} =
@ -119,7 +139,9 @@ proc initProtocolHandler(wfc: WakuFilterClient) =
let messagePush = decodeRes.value #TODO: toAPI() split here
trace "Received message push", peerId=conn.peerId, messagePush
wfc.messagePushHandler(messagePush.pubsubTopic, messagePush.wakuMessage)
for handler in wfc.pushHandlers:
asyncSpawn handler(messagePush.pubsubTopic,
messagePush.wakuMessage)
# Protocol specifies no response for now
return
@ -128,14 +150,14 @@ proc initProtocolHandler(wfc: WakuFilterClient) =
wfc.codec = WakuFilterPushCodec
proc new*(T: type WakuFilterClient,
rng: ref HmacDrbgContext,
messagePushHandler: MessagePushHandler,
peerManager: PeerManager): T =
peerManager: PeerManager,
rng: ref HmacDrbgContext
): T =
let wfc = WakuFilterClient(
rng: rng,
messagePushHandler: messagePushHandler,
peerManager: peerManager
peerManager: peerManager,
pushHandlers: @[]
)
wfc.initProtocolHandler()
wfc

View File

@ -4,6 +4,7 @@ else:
{.push raises: [].}
import
json_serialization,
std/options
import
../waku_core
@ -70,3 +71,29 @@ proc ok*(T: type FilterSubscribeResponse, requestId: string, desc = "OK"): T =
statusCode: 200,
statusDesc: some(desc)
)
proc `$`*(err: FilterSubscribeResponse): string =
"FilterSubscribeResponse of req:" & err.requestId & " [" & $err.statusCode & "] " & $err.statusDesc
proc `$`*(req: FilterSubscribeRequest): string =
"FilterSubscribeRequest of req:" & req.requestId & " [" & $req.filterSubscribeType & "] " & $req.pubsubTopic
proc `$`*(t: FilterSubscribeType): string =
result = case t:
of SUBSCRIBER_PING: "SUBSCRIBER_PING"
of SUBSCRIBE: "SUBSCRIBE"
of UNSUBSCRIBE: "UNSUBSCRIBE"
of UNSUBSCRIBE_ALL: "UNSUBSCRIBE_ALL"
proc writeValue*(writer: var JsonWriter, value: FilterSubscribeRequest) {.inline, raises: [IOError].} =
writer.beginRecord()
writer.writeField("requestId", value.requestId)
writer.writeField("type", value.filterSubscribeType)
if value.pubsubTopic.isSome:
writer.writeField("pubsubTopic", value.pubsubTopic)
if value.contentTopics.len>0:
writer.writeField("contentTopics", value.contentTopics)
writer.endRecord()