feat: HTTP REST API: Filter support v2 (#1890)

Filter v2 rest api support implemented 
Filter rest api documentation updated with v1 and v2 interface support.
Separated legacy filter rest interface
Fix code and tests of v2 Filter rest api
Filter v2 message push test added
Applied autoshard to Filter V2
Redesigned FilterPushHandling, code style, catch up apps and tests with filter v2 interface changes
Rename of FilterV1SubscriptionsRequest to FilterLegacySubscribeRequest, fix broken chat2 app, fix tests
Changed Filter v2 push handler subscription to simple register
Separate node's filterUnsubscribe and filterUnsubscribeAll
This commit is contained in:
NagyZoltanPeter 2023-09-14 21:28:57 +02:00 committed by GitHub
parent 099ff9e4a6
commit dd86da3247
32 changed files with 1928 additions and 399 deletions

View File

@ -262,10 +262,12 @@ proc writeAndPrint(c: Chat) {.async.} =
echo "You are now known as " & c.nick echo "You are now known as " & c.nick
elif line.startsWith("/exit"): elif line.startsWith("/exit"):
if not c.node.wakuFilter.isNil(): if not c.node.wakuFilterLegacy.isNil():
echo "unsubscribing from content filters..." echo "unsubscribing from content filters..."
await c.node.unsubscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=c.contentTopic) let peerOpt = c.node.peerManager.selectPeer(WakuLegacyFilterCodec)
if peerOpt.isSome():
await c.node.legacyFilterUnsubscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=c.contentTopic, peer=peerOpt.get())
echo "quitting..." echo "quitting..."
@ -464,14 +466,18 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
if peerInfo.isOk(): if peerInfo.isOk():
await node.mountFilter() await node.mountFilter()
await node.mountFilterClient() await node.mountFilterClient()
node.peerManager.addServicePeer(peerInfo.value, WakuFilterCodec) node.peerManager.addServicePeer(peerInfo.value, WakuLegacyFilterCodec)
proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} = proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} =
trace "Hit filter handler", contentTopic=msg.contentTopic trace "Hit filter handler", contentTopic=msg.contentTopic
chat.printReceivedMessage(msg) chat.printReceivedMessage(msg)
await node.subscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=chat.contentTopic, filterHandler) await node.legacyFilterSubscribe(pubsubTopic=some(DefaultPubsubTopic),
contentTopics=chat.contentTopic,
filterHandler,
peerInfo.value)
# TODO: Here to support FilterV2 relevant subscription, but still
# Legacy Filter is concurrent to V2 untill legacy filter will be removed
else: else:
error "Filter not mounted. Couldn't parse conf.filternode", error "Filter not mounted. Couldn't parse conf.filternode",
error = peerInfo.error error = peerInfo.error
@ -485,7 +491,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
chat.printReceivedMessage(msg) chat.printReceivedMessage(msg)
let topic = DefaultPubsubTopic let topic = DefaultPubsubTopic
await node.subscribe(some(topic), @[ContentTopic("")], handler) node.subscribe(topic, handler)
if conf.rlnRelay: if conf.rlnRelay:
info "WakuRLNRelay is enabled" info "WakuRLNRelay is enabled"

View File

@ -18,6 +18,7 @@ import
../../../waku/waku_node, ../../../waku/waku_node,
../../../waku/node/peer_manager, ../../../waku/node/peer_manager,
../../waku/waku_filter, ../../waku/waku_filter,
../../waku/waku_filter_v2,
../../waku/waku_store, ../../waku/waku_store,
# Chat 2 imports # Chat 2 imports
../chat2/chat2, ../chat2/chat2,
@ -297,7 +298,8 @@ when isMainModule:
if conf.filternode != "": if conf.filternode != "":
let filterPeer = parsePeerInfo(conf.filternode) let filterPeer = parsePeerInfo(conf.filternode)
if filterPeer.isOk(): if filterPeer.isOk():
bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuFilterCodec) bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuLegacyFilterCodec)
bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuFilterSubscribeCodec)
else: else:
error "Error parsing conf.filternode", error = filterPeer.error error "Error parsing conf.filternode", error = filterPeer.error

View File

@ -37,6 +37,8 @@ import
../../waku/waku_store, ../../waku/waku_store,
../../waku/waku_lightpush, ../../waku/waku_lightpush,
../../waku/waku_filter, ../../waku/waku_filter,
../../waku/waku_filter_v2,
../../waku/waku_filter_v2/client as waku_filter_client,
./wakunode2_validator_signed, ./wakunode2_validator_signed,
./internal_config, ./internal_config,
./external_config ./external_config
@ -46,6 +48,7 @@ import
../../waku/node/rest/debug/handlers as rest_debug_api, ../../waku/node/rest/debug/handlers as rest_debug_api,
../../waku/node/rest/relay/handlers as rest_relay_api, ../../waku/node/rest/relay/handlers as rest_relay_api,
../../waku/node/rest/relay/topic_cache, ../../waku/node/rest/relay/topic_cache,
../../waku/node/rest/filter/legacy_handlers as rest_legacy_filter_api,
../../waku/node/rest/filter/handlers as rest_filter_api, ../../waku/node/rest/filter/handlers as rest_filter_api,
../../waku/node/rest/store/handlers as rest_store_api, ../../waku/node/rest/store/handlers as rest_store_api,
../../waku/node/rest/health/handlers as rest_health_api, ../../waku/node/rest/health/handlers as rest_health_api,
@ -470,8 +473,9 @@ proc setupProtocols(node: WakuNode,
if conf.filternode != "": if conf.filternode != "":
let filterNode = parsePeerInfo(conf.filternode) let filterNode = parsePeerInfo(conf.filternode)
if filterNode.isOk(): if filterNode.isOk():
await mountFilterClient(node) await node.mountFilterClient()
node.peerManager.addServicePeer(filterNode.value, WakuFilterCodec) node.peerManager.addServicePeer(filterNode.value, WakuLegacyFilterCodec)
node.peerManager.addServicePeer(filterNode.value, WakuFilterSubscribeCodec)
else: else:
return err("failed to set node waku filter peer: " & filterNode.error) return err("failed to set node waku filter peer: " & filterNode.error)
@ -577,8 +581,11 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo
## Filter REST API ## Filter REST API
if conf.filter: if conf.filter:
let filterCache = rest_filter_api.MessageCache.init(capacity=rest_filter_api.filterMessageCacheDefaultCapacity) let legacyFilterCache = rest_legacy_filter_api.MessageCache.init()
installFilterApiHandlers(server.router, app.node, filterCache) rest_legacy_filter_api.installLegacyFilterRestApiHandlers(server.router, app.node, legacyFilterCache)
let filterCache = rest_filter_api.MessageCache.init()
rest_filter_api.installFilterRestApiHandlers(server.router, app.node, filterCache)
## Store REST API ## Store REST API
installStoreApiHandlers(server.router, app.node) installStoreApiHandlers(server.router, app.node)

View File

@ -28,13 +28,15 @@ proc unsubscribe(wfc: WakuFilterClient,
else: else:
notice "unsubscribe request successful" notice "unsubscribe request successful"
proc messagePushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) = proc messagePushHandler(pubsubTopic: PubsubTopic, message: WakuMessage)
{.async, gcsafe.} =
let payloadStr = string.fromBytes(message.payload) let payloadStr = string.fromBytes(message.payload)
notice "message received", payload=payloadStr, notice "message received", payload=payloadStr,
pubsubTopic=pubsubTopic, pubsubTopic=pubsubTopic,
contentTopic=message.contentTopic, contentTopic=message.contentTopic,
timestamp=message.timestamp timestamp=message.timestamp
proc maintainSubscription(wfc: WakuFilterClient, proc maintainSubscription(wfc: WakuFilterClient,
filterPeer: RemotePeerInfo, filterPeer: RemotePeerInfo,
filterPubsubTopic: PubsubTopic, filterPubsubTopic: PubsubTopic,
@ -68,11 +70,13 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) =
var var
switch = newStandardSwitch() switch = newStandardSwitch()
pm = PeerManager.new(switch) pm = PeerManager.new(switch)
wfc = WakuFilterClient.new(rng, messagePushHandler, pm) wfc = WakuFilterClient.new(pm, rng)
# Mount filter client protocol # Mount filter client protocol
switch.mount(wfc) switch.mount(wfc)
wfc.registerPushHandler(messagePushHandler)
# Start maintaining subscription # Start maintaining subscription
asyncSpawn maintainSubscription(wfc, filterPeer, FilterPubsubTopic, FilterContentTopic) asyncSpawn maintainSubscription(wfc, filterPeer, FilterPubsubTopic, FilterContentTopic)

View File

@ -58,8 +58,8 @@ import
./test_waku_lightpush, ./test_waku_lightpush,
./test_wakunode_lightpush, ./test_wakunode_lightpush,
# Waku Filter # Waku Filter
./test_waku_filter, ./test_waku_filter_legacy,
./test_wakunode_filter, ./test_wakunode_filter_legacy,
./test_waku_peer_exchange, ./test_waku_peer_exchange,
./test_peer_store_extended, ./test_peer_store_extended,
./test_message_cache, ./test_message_cache,
@ -95,7 +95,9 @@ import
./wakunode_rest/test_rest_relay, ./wakunode_rest/test_rest_relay,
./wakunode_rest/test_rest_relay_serdes, ./wakunode_rest/test_rest_relay_serdes,
./wakunode_rest/test_rest_serdes, ./wakunode_rest/test_rest_serdes,
./wakunode_rest/test_rest_store ./wakunode_rest/test_rest_store,
./wakunode_rest/test_rest_filter,
./wakunode_rest/test_rest_legacy_filter
import import
./waku_rln_relay/test_waku_rln_relay, ./waku_rln_relay/test_waku_rln_relay,

View File

@ -52,7 +52,7 @@ procSuite "Peer Manager":
await allFutures(nodes.mapIt(it.mountFilter())) await allFutures(nodes.mapIt(it.mountFilter()))
# Dial node2 from node1 # Dial node2 from node1
let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterCodec) let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec)
# Check connection # Check connection
check: check:
conn.isSome() conn.isSome()
@ -81,12 +81,12 @@ procSuite "Peer Manager":
let nonExistentPeer = nonExistentPeerRes.value let nonExistentPeer = nonExistentPeerRes.value
# Dial non-existent peer from node1 # Dial non-existent peer from node1
let conn1 = await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuFilterCodec) let conn1 = await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuLegacyFilterCodec)
check: check:
conn1.isNone() conn1.isNone()
# Dial peer not supporting given protocol # Dial peer not supporting given protocol
let conn2 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterCodec) let conn2 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec)
check: check:
conn2.isNone() conn2.isNone()
@ -109,14 +109,14 @@ procSuite "Peer Manager":
node.mountStoreClient() node.mountStoreClient()
node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec) node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec)
node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuFilterCodec) node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuLegacyFilterCodec)
# Check peers were successfully added to peer manager # Check peers were successfully added to peer manager
check: check:
node.peerManager.peerStore.peers().len == 2 node.peerManager.peerStore.peers().len == 2
node.peerManager.peerStore.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and node.peerManager.peerStore.peers(WakuLegacyFilterCodec).allIt(it.peerId == filterPeer.peerId and
it.addrs.contains(filterLoc) and it.addrs.contains(filterLoc) and
it.protocols.contains(WakuFilterCodec)) it.protocols.contains(WakuLegacyFilterCodec))
node.peerManager.peerStore.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and node.peerManager.peerStore.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and
it.addrs.contains(storeLoc) and it.addrs.contains(storeLoc) and
it.protocols.contains(WakuStoreCodec)) it.protocols.contains(WakuStoreCodec))
@ -429,7 +429,7 @@ procSuite "Peer Manager":
# service peers # service peers
node.peerManager.addServicePeer(peers[0], WakuStoreCodec) node.peerManager.addServicePeer(peers[0], WakuStoreCodec)
node.peerManager.addServicePeer(peers[1], WakuFilterCodec) node.peerManager.addServicePeer(peers[1], WakuLegacyFilterCodec)
node.peerManager.addServicePeer(peers[2], WakuLightPushCodec) node.peerManager.addServicePeer(peers[2], WakuLightPushCodec)
node.peerManager.addServicePeer(peers[3], WakuPeerExchangeCodec) node.peerManager.addServicePeer(peers[3], WakuPeerExchangeCodec)
@ -449,7 +449,7 @@ procSuite "Peer Manager":
# all service peers are added to its service slot # all service peers are added to its service slot
check: check:
node.peerManager.serviceSlots[WakuStoreCodec].peerId == peers[0].peerId node.peerManager.serviceSlots[WakuStoreCodec].peerId == peers[0].peerId
node.peerManager.serviceSlots[WakuFilterCodec].peerId == peers[1].peerId node.peerManager.serviceSlots[WakuLegacyFilterCodec].peerId == peers[1].peerId
node.peerManager.serviceSlots[WakuLightPushCodec].peerId == peers[2].peerId node.peerManager.serviceSlots[WakuLightPushCodec].peerId == peers[2].peerId
node.peerManager.serviceSlots[WakuPeerExchangeCodec].peerId == peers[3].peerId node.peerManager.serviceSlots[WakuPeerExchangeCodec].peerId == peers[3].peerId
@ -474,11 +474,11 @@ procSuite "Peer Manager":
(await nodes[0].peerManager.connectRelay(pInfos[2])) == true (await nodes[0].peerManager.connectRelay(pInfos[2])) == true
(await nodes[1].peerManager.connectRelay(pInfos[2])) == true (await nodes[1].peerManager.connectRelay(pInfos[2])) == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[2], WakuFilterCodec)).isSome() == true (await nodes[0].peerManager.dialPeer(pInfos[2], WakuLegacyFilterCodec)).isSome() == true
# isolated dial creates a relay conn under the hood (libp2p behaviour) # isolated dial creates a relay conn under the hood (libp2p behaviour)
(await nodes[2].peerManager.dialPeer(pInfos[3], WakuFilterCodec)).isSome() == true (await nodes[2].peerManager.dialPeer(pInfos[3], WakuLegacyFilterCodec)).isSome() == true
# assert physical connections # assert physical connections
@ -486,26 +486,26 @@ procSuite "Peer Manager":
nodes[0].peerManager.connectedPeers(WakuRelayCodec)[0].len == 0 nodes[0].peerManager.connectedPeers(WakuRelayCodec)[0].len == 0
nodes[0].peerManager.connectedPeers(WakuRelayCodec)[1].len == 2 nodes[0].peerManager.connectedPeers(WakuRelayCodec)[1].len == 2
nodes[0].peerManager.connectedPeers(WakuFilterCodec)[0].len == 0 nodes[0].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 0
nodes[0].peerManager.connectedPeers(WakuFilterCodec)[1].len == 2 nodes[0].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 2
nodes[1].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1 nodes[1].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1
nodes[1].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1 nodes[1].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1
nodes[1].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1 nodes[1].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1
nodes[1].peerManager.connectedPeers(WakuFilterCodec)[1].len == 0 nodes[1].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 0
nodes[2].peerManager.connectedPeers(WakuRelayCodec)[0].len == 2 nodes[2].peerManager.connectedPeers(WakuRelayCodec)[0].len == 2
nodes[2].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1 nodes[2].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1
nodes[2].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1 nodes[2].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1
nodes[2].peerManager.connectedPeers(WakuFilterCodec)[1].len == 1 nodes[2].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 1
nodes[3].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1 nodes[3].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1
nodes[3].peerManager.connectedPeers(WakuRelayCodec)[1].len == 0 nodes[3].peerManager.connectedPeers(WakuRelayCodec)[1].len == 0
nodes[3].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1 nodes[3].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1
nodes[3].peerManager.connectedPeers(WakuFilterCodec)[1].len == 0 nodes[3].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 0
asyncTest "getNumStreams() returns expected number of connections per protocol": asyncTest "getNumStreams() returns expected number of connections per protocol":
# Create 2 nodes # Create 2 nodes
@ -521,17 +521,17 @@ procSuite "Peer Manager":
require: require:
# multiple streams are multiplexed over a single connection. # multiple streams are multiplexed over a single connection.
# note that a relay connection is created under the hood when dialing a peer (libp2p behaviour) # note that a relay connection is created under the hood when dialing a peer (libp2p behaviour)
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true
check: check:
nodes[0].peerManager.getNumStreams(WakuRelayCodec) == (1, 1) nodes[0].peerManager.getNumStreams(WakuRelayCodec) == (1, 1)
nodes[0].peerManager.getNumStreams(WakuFilterCodec) == (0, 4) nodes[0].peerManager.getNumStreams(WakuLegacyFilterCodec) == (0, 4)
nodes[1].peerManager.getNumStreams(WakuRelayCodec) == (1, 1) nodes[1].peerManager.getNumStreams(WakuRelayCodec) == (1, 1)
nodes[1].peerManager.getNumStreams(WakuFilterCodec) == (4, 0) nodes[1].peerManager.getNumStreams(WakuLegacyFilterCodec) == (4, 0)
test "selectPeer() returns the correct peer": test "selectPeer() returns the correct peer":
# Valid peer id missing the last digit # Valid peer id missing the last digit
@ -552,7 +552,7 @@ procSuite "Peer Manager":
# Add a peer[0] to the peerstore # Add a peer[0] to the peerstore
pm.peerStore[AddressBook][peers[0].peerId] = peers[0].addrs pm.peerStore[AddressBook][peers[0].peerId] = peers[0].addrs
pm.peerStore[ProtoBook][peers[0].peerId] = @[WakuRelayCodec, WakuStoreCodec, WakuFilterCodec] pm.peerStore[ProtoBook][peers[0].peerId] = @[WakuRelayCodec, WakuStoreCodec, WakuLegacyFilterCodec]
# When no service peers, we get one from the peerstore # When no service peers, we get one from the peerstore
let selectedPeer1 = pm.selectPeer(WakuStoreCodec) let selectedPeer1 = pm.selectPeer(WakuStoreCodec)
@ -561,7 +561,7 @@ procSuite "Peer Manager":
selectedPeer1.get().peerId == peers[0].peerId selectedPeer1.get().peerId == peers[0].peerId
# Same for other protocol # Same for other protocol
let selectedPeer2 = pm.selectPeer(WakuFilterCodec) let selectedPeer2 = pm.selectPeer(WakuLegacyFilterCodec)
check: check:
selectedPeer2.isSome() == true selectedPeer2.isSome() == true
selectedPeer2.get().peerId == peers[0].peerId selectedPeer2.get().peerId == peers[0].peerId
@ -757,7 +757,7 @@ procSuite "Peer Manager":
discard await nodes[0].peerManager.connectRelay(pInfos[3]) discard await nodes[0].peerManager.connectRelay(pInfos[3])
discard await nodes[0].peerManager.connectRelay(pInfos[4]) discard await nodes[0].peerManager.connectRelay(pInfos[4])
# they are also prunned  # they are also prunned
check nodes[0].peerManager.switch.connManager.getConnections().len == 1 check nodes[0].peerManager.switch.connManager.getConnections().len == 1
# we should have 4 peers (2in/2out) but due to collocation limit # we should have 4 peers (2in/2out) but due to collocation limit

View File

@ -44,7 +44,7 @@ suite "WakuNode - Filter":
filterPushHandlerFut.complete((pubsubTopic, msg)) filterPushHandlerFut.complete((pubsubTopic, msg))
## When ## When
await client.filterSubscribe(some(pubsubTopic), contentTopic, filterPushHandler, peer=serverPeerInfo) await client.legacyFilterSubscribe(some(pubsubTopic), contentTopic, filterPushHandler, peer=serverPeerInfo)
# Wait for subscription to take effect # Wait for subscription to take effect
waitFor sleepAsync(100.millis) waitFor sleepAsync(100.millis)

View File

@ -1,6 +1,5 @@
import import
std/[options,tables], std/[options,tables],
testutils/unittests,
chronos, chronos,
chronicles chronicles
@ -22,10 +21,10 @@ proc newTestWakuFilter*(switch: Switch): Future[WakuFilter] {.async.} =
return proto return proto
proc newTestWakuFilterClient*(switch: Switch, messagePushHandler: MessagePushHandler): Future[WakuFilterClient] {.async.} = proc newTestWakuFilterClient*(switch: Switch): Future[WakuFilterClient] {.async.} =
let let
peerManager = PeerManager.new(switch) peerManager = PeerManager.new(switch)
proto = WakuFilterClient.new(rng, messagePushHandler, peerManager) proto = WakuFilterClient.new(peerManager, rng)
await proto.start() await proto.start()
switch.mount(proto) switch.mount(proto)

View File

@ -3,16 +3,13 @@
import import
std/[options,tables], std/[options,tables],
testutils/unittests, testutils/unittests,
chronos, chronos
chronicles,
libp2p/peerstore
import import
../../../waku/node/peer_manager, ../../waku/node/peer_manager,
../../../waku/waku_filter_v2, ../../waku/waku_filter_v2,
../../../waku/waku_filter_v2/client, ../../waku/waku_filter_v2/client,
../../../waku/waku_core, ../../waku/waku_core,
../testlib/common,
../testlib/wakucore, ../testlib/wakucore,
../testlib/testasync, ../testlib/testasync,
./client_utils.nim ./client_utils.nim
@ -28,26 +25,23 @@ suite "Waku Filter":
var contentTopics {.threadvar.}: seq[ContentTopic] var contentTopics {.threadvar.}: seq[ContentTopic]
asyncSetup: asyncSetup:
let
voidHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) =
discard
pubsubTopic = DefaultPubsubTopic pubsubTopic = DefaultPubsubTopic
contentTopics = @[DefaultContentTopic] contentTopics = @[DefaultContentTopic]
serverSwitch = newStandardSwitch() serverSwitch = newStandardSwitch()
clientSwitch = newStandardSwitch() clientSwitch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(serverSwitch) wakuFilter = await newTestWakuFilter(serverSwitch)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch, voidHandler) wakuFilterClient = await newTestWakuFilterClient(clientSwitch)
await allFutures(serverSwitch.start(), clientSwitch.start()) await allFutures(serverSwitch.start(), clientSwitch.start())
serverRemotePeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() serverRemotePeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
asyncTeardown: asyncTeardown:
await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop()) await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop())
asyncTest "Active Subscription Identification": asyncTest "Active Subscription Identification":
# Given # Given
let let
clientPeerId = clientSwitch.peerInfo.toRemotePeerInfo().peerId clientPeerId = clientSwitch.peerInfo.toRemotePeerInfo().peerId
subscribeResponse = await wakuFilterClient.subscribe( subscribeResponse = await wakuFilterClient.subscribe(
serverRemotePeerInfo, pubsubTopic, contentTopics serverRemotePeerInfo, pubsubTopic, contentTopics
@ -75,12 +69,12 @@ suite "Waku Filter":
asyncTest "After Unsubscription": asyncTest "After Unsubscription":
# Given # Given
let let
clientPeerId = clientSwitch.peerInfo.toRemotePeerInfo().peerId clientPeerId = clientSwitch.peerInfo.toRemotePeerInfo().peerId
subscribeResponse = await wakuFilterClient.subscribe( subscribeResponse = await wakuFilterClient.subscribe(
serverRemotePeerInfo, pubsubTopic, contentTopics serverRemotePeerInfo, pubsubTopic, contentTopics
) )
require: require:
subscribeResponse.isOk() subscribeResponse.isOk()
wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.subscriptions.hasKey(clientPeerId)

View File

@ -3,16 +3,13 @@
import import
std/[options,tables], std/[options,tables],
testutils/unittests, testutils/unittests,
chronos, chronos
chronicles,
libp2p/peerstore
import import
../../../waku/node/peer_manager, ../../waku/node/peer_manager,
../../../waku/waku_filter_v2, ../../waku/waku_filter_v2,
../../../waku/waku_filter_v2/client, ../../waku/waku_filter_v2/client,
../../../waku/waku_core, ../../waku/waku_core,
../testlib/common,
../testlib/wakucore, ../testlib/wakucore,
./client_utils.nim ./client_utils.nim
@ -22,21 +19,29 @@ suite "Waku Filter - end to end":
# Given # Given
var var
pushHandlerFuture = newFuture[(string, WakuMessage)]() pushHandlerFuture = newFuture[(string, WakuMessage)]()
messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) = messagePushHandler: FilterPushHandler = proc(pubsubTopic: PubsubTopic,
message: WakuMessage):
Future[void]
{.async, closure, gcsafe.} =
pushHandlerFuture.complete((pubsubTopic, message)) pushHandlerFuture.complete((pubsubTopic, message))
let let
serverSwitch = newStandardSwitch() serverSwitch = newStandardSwitch()
clientSwitch = newStandardSwitch() clientSwitch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(serverSwitch) wakuFilter = await newTestWakuFilter(serverSwitch)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler) wakuFilterClient = await newTestWakuFilterClient(clientSwitch)
clientPeerId = clientSwitch.peerInfo.peerId clientPeerId = clientSwitch.peerInfo.peerId
pubsubTopic = DefaultPubsubTopic pubsubTopic = DefaultPubsubTopic
contentTopics = @[DefaultContentTopic] contentTopics = @[DefaultContentTopic]
# When # When
await allFutures(serverSwitch.start(), clientSwitch.start()) await allFutures(serverSwitch.start(), clientSwitch.start())
let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics)
wakuFilterClient.registerPushHandler(messagePushHandler)
let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(),
pubsubTopic,
contentTopics)
# Then # Then
check: check:
@ -57,7 +62,9 @@ suite "Waku Filter - end to end":
pushedMsg == msg1 pushedMsg == msg1
# When # When
let response2 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics) let response2 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(),
pubsubTopic,
contentTopics)
# Then # Then
check: check:
@ -74,20 +81,24 @@ suite "Waku Filter - end to end":
not (await pushHandlerFuture.withTimeout(2.seconds)) # No message should be pushed not (await pushHandlerFuture.withTimeout(2.seconds)) # No message should be pushed
# Teardown # Teardown
await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop()) await allFutures(wakuFilter.stop(), wakuFilterClient.stop(),
serverSwitch.stop(), clientSwitch.stop())
asyncTest "subscribe, unsubscribe multiple content topics": asyncTest "subscribe, unsubscribe multiple content topics":
# Given # Given
var var
pushHandlerFuture = newFuture[(string, WakuMessage)]() pushHandlerFuture = newFuture[(string, WakuMessage)]()
messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) = messagePushHandler: FilterPushHandler = proc(pubsubTopic: PubsubTopic,
message: WakuMessage):
Future[void]
{.async, closure, gcsafe.} =
pushHandlerFuture.complete((pubsubTopic, message)) pushHandlerFuture.complete((pubsubTopic, message))
let let
serverSwitch = newStandardSwitch() serverSwitch = newStandardSwitch()
clientSwitch = newStandardSwitch() clientSwitch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(serverSwitch) wakuFilter = await newTestWakuFilter(serverSwitch)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler) wakuFilterClient = await newTestWakuFilterClient(clientSwitch)
clientPeerId = clientSwitch.peerInfo.peerId clientPeerId = clientSwitch.peerInfo.peerId
pubsubTopic = DefaultPubsubTopic pubsubTopic = DefaultPubsubTopic
contentTopic2 = ContentTopic("/waku/2/non-default-content/proto") contentTopic2 = ContentTopic("/waku/2/non-default-content/proto")
@ -95,7 +106,12 @@ suite "Waku Filter - end to end":
# When # When
await allFutures(serverSwitch.start(), clientSwitch.start()) await allFutures(serverSwitch.start(), clientSwitch.start())
let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics)
wakuFilterClient.registerPushHandler(messagePushHandler)
let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(),
pubsubTopic,
contentTopics)
# Then # Then
check: check:
@ -129,7 +145,9 @@ suite "Waku Filter - end to end":
pushedMsg2 == msg2 pushedMsg2 == msg2
# When # When
let response2 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, @[contentTopic2]) # Unsubscribe only one content topic let response2 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(),
pubsubTopic,
@[contentTopic2]) # Unsubscribe only one content topic
# Then # Then
check: check:
@ -159,20 +177,24 @@ suite "Waku Filter - end to end":
not (await pushHandlerFuture.withTimeout(2.seconds)) # No message should be pushed not (await pushHandlerFuture.withTimeout(2.seconds)) # No message should be pushed
# Teardown # Teardown
await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop()) await allFutures(wakuFilter.stop(), wakuFilterClient.stop(),
serverSwitch.stop(), clientSwitch.stop())
asyncTest "subscribe to multiple content topics and unsubscribe all": asyncTest "subscribe to multiple content topics and unsubscribe all":
# Given # Given
var var
pushHandlerFuture = newFuture[(string, WakuMessage)]() pushHandlerFuture = newFuture[(string, WakuMessage)]()
messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) = messagePushHandler: FilterPushHandler = proc(pubsubTopic: PubsubTopic,
message: WakuMessage):
Future[void]
{.async, closure, gcsafe.} =
pushHandlerFuture.complete((pubsubTopic, message)) pushHandlerFuture.complete((pubsubTopic, message))
let let
serverSwitch = newStandardSwitch() serverSwitch = newStandardSwitch()
clientSwitch = newStandardSwitch() clientSwitch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(serverSwitch) wakuFilter = await newTestWakuFilter(serverSwitch)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler) wakuFilterClient = await newTestWakuFilterClient(clientSwitch)
clientPeerId = clientSwitch.peerInfo.peerId clientPeerId = clientSwitch.peerInfo.peerId
pubsubTopic = DefaultPubsubTopic pubsubTopic = DefaultPubsubTopic
contentTopic2 = ContentTopic("/waku/2/non-default-content/proto") contentTopic2 = ContentTopic("/waku/2/non-default-content/proto")
@ -180,7 +202,12 @@ suite "Waku Filter - end to end":
# When # When
await allFutures(serverSwitch.start(), clientSwitch.start()) await allFutures(serverSwitch.start(), clientSwitch.start())
let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics)
wakuFilterClient.registerPushHandler(messagePushHandler)
let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(),
pubsubTopic,
contentTopics)
# Then # Then
check: check:
@ -234,20 +261,24 @@ suite "Waku Filter - end to end":
not (await pushHandlerFuture.withTimeout(2.seconds)) # Neither message should be pushed not (await pushHandlerFuture.withTimeout(2.seconds)) # Neither message should be pushed
# Teardown # Teardown
await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop()) await allFutures(wakuFilter.stop(), wakuFilterClient.stop(),
serverSwitch.stop(), clientSwitch.stop())
asyncTest "subscribe, unsubscribe multiple pubsub topics and content topics": asyncTest "subscribe, unsubscribe multiple pubsub topics and content topics":
# Given # Given
var var
pushHandlerFuture = newFuture[(string, WakuMessage)]() pushHandlerFuture = newFuture[(string, WakuMessage)]()
messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) = messagePushHandler: FilterPushHandler = proc(pubsubTopic: PubsubTopic,
message: WakuMessage):
Future[void]
{.async, closure, gcsafe.} =
pushHandlerFuture.complete((pubsubTopic, message)) pushHandlerFuture.complete((pubsubTopic, message))
let let
serverSwitch = newStandardSwitch() serverSwitch = newStandardSwitch()
clientSwitch = newStandardSwitch() clientSwitch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(serverSwitch) wakuFilter = await newTestWakuFilter(serverSwitch)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler) wakuFilterClient = await newTestWakuFilterClient(clientSwitch)
clientPeerId = clientSwitch.peerInfo.peerId clientPeerId = clientSwitch.peerInfo.peerId
pubsubTopic = DefaultPubsubTopic pubsubTopic = DefaultPubsubTopic
pubsubTopic2 = PubsubTopic("/waku/2/non-default-pubsub/proto") pubsubTopic2 = PubsubTopic("/waku/2/non-default-pubsub/proto")
@ -258,9 +289,17 @@ suite "Waku Filter - end to end":
# When # When
await allFutures(serverSwitch.start(), clientSwitch.start()) await allFutures(serverSwitch.start(), clientSwitch.start())
wakuFilterClient.registerPushHandler(messagePushHandler)
let let
response1 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics) response1 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(),
response2 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic2, contentTopics) pubsubTopic,
contentTopics)
response2 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(),
pubsubTopic2,
contentTopics)
# Then # Then
check: check:
@ -299,7 +338,9 @@ suite "Waku Filter - end to end":
## Step 3: We can selectively unsubscribe from pubsub topics and content topic(s) ## Step 3: We can selectively unsubscribe from pubsub topics and content topic(s)
# When # When
let response3 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic2, @[contentTopic2]) let response3 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(),
pubsubTopic2,
@[contentTopic2])
require response3.isOk() require response3.isOk()
let msg3 = fakeWakuMessage(contentTopic=contentTopic2) let msg3 = fakeWakuMessage(contentTopic=contentTopic2)
@ -325,4 +366,5 @@ suite "Waku Filter - end to end":
pushedMsg3 == msg3 pushedMsg3 == msg3
# Teardown # Teardown
await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop()) await allFutures(wakuFilter.stop(), wakuFilterClient.stop(),
serverSwitch.stop(), clientSwitch.stop())

View File

@ -216,7 +216,7 @@ procSuite "WakuNode - Store":
let mountArchiveRes = server.mountArchive(driver) let mountArchiveRes = server.mountArchive(driver)
assert mountArchiveRes.isOk(), mountArchiveRes.error assert mountArchiveRes.isOk(), mountArchiveRes.error
waitFor server.mountStore() waitFor server.mountStore()
waitFor server.mountFilterClient() waitFor server.mountFilterClient()
client.mountStoreClient() client.mountStoreClient()
@ -232,7 +232,7 @@ procSuite "WakuNode - Store":
proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} = proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} =
filterFut.complete((pubsubTopic, msg)) filterFut.complete((pubsubTopic, msg))
waitFor server.filterSubscribe(some(DefaultPubsubTopic), DefaultContentTopic, filterHandler, peer=filterSourcePeer) waitFor server.legacyFilterSubscribe(some(DefaultPubsubTopic), DefaultContentTopic, filterHandler, peer=filterSourcePeer)
waitFor sleepAsync(100.millis) waitFor sleepAsync(100.millis)

View File

@ -169,11 +169,11 @@ procSuite "Waku v2 JSON-RPC API - Admin":
filterPeer = PeerInfo.new(generateEcdsaKey(), @[locationAddr]) filterPeer = PeerInfo.new(generateEcdsaKey(), @[locationAddr])
storePeer = PeerInfo.new(generateEcdsaKey(), @[locationAddr]) storePeer = PeerInfo.new(generateEcdsaKey(), @[locationAddr])
node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuFilterCodec) node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuLegacyFilterCodec)
node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec) node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec)
# Mock that we connected in the past so Identify populated this # Mock that we connected in the past so Identify populated this
node.peerManager.peerStore[ProtoBook][filterPeer.peerId] = @[WakuFilterCodec] node.peerManager.peerStore[ProtoBook][filterPeer.peerId] = @[WakuLegacyFilterCodec]
node.peerManager.peerStore[ProtoBook][storePeer.peerId] = @[WakuStoreCodec] node.peerManager.peerStore[ProtoBook][storePeer.peerId] = @[WakuStoreCodec]
let response = await client.get_waku_v2_admin_v1_peers() let response = await client.get_waku_v2_admin_v1_peers()
@ -182,7 +182,7 @@ procSuite "Waku v2 JSON-RPC API - Admin":
check: check:
response.len == 2 response.len == 2
# Check filter peer # Check filter peer
(response.filterIt(it.protocol == WakuFilterCodec)[0]).multiaddr == constructMultiaddrStr(filterPeer) (response.filterIt(it.protocol == WakuLegacyFilterCodec)[0]).multiaddr == constructMultiaddrStr(filterPeer)
# Check store peer # Check store peer
(response.filterIt(it.protocol == WakuStoreCodec)[0]).multiaddr == constructMultiaddrStr(storePeer) (response.filterIt(it.protocol == WakuStoreCodec)[0]).multiaddr == constructMultiaddrStr(storePeer)

View File

@ -41,7 +41,7 @@ procSuite "Waku v2 JSON-RPC API - Filter":
await node1.mountFilter() await node1.mountFilter()
await node2.mountFilterClient() await node2.mountFilterClient()
node2.peerManager.addServicePeer(node1.peerInfo.toRemotePeerInfo(), WakuFilterCodec) node2.peerManager.addServicePeer(node1.peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec)
# RPC server setup # RPC server setup
let let

View File

@ -21,6 +21,11 @@ import
../../waku/node/rest/filter/handlers as filter_api, ../../waku/node/rest/filter/handlers as filter_api,
../../waku/node/rest/filter/client as filter_api_client, ../../waku/node/rest/filter/client as filter_api_client,
../../waku/waku_relay, ../../waku/waku_relay,
../../waku/waku_filter_v2/subscriptions,
../../waku/waku_filter_v2/common,
../../waku/node/rest/relay/topic_cache,
../../waku/node/rest/relay/handlers as relay_api,
../../waku/node/rest/relay/client as relay_api_client,
../testlib/wakucore, ../testlib/wakucore,
../testlib/wakunode ../testlib/wakunode
@ -36,51 +41,64 @@ proc testWakuNode(): WakuNode =
type RestFilterTest = object type RestFilterTest = object
node1: WakuNode serviceNode: WakuNode
node2: WakuNode subscriberNode: WakuNode
restServer: RestServerRef restServer: RestServerRef
restServerForService: RestServerRef
messageCache: filter_api.MessageCache messageCache: filter_api.MessageCache
client: RestClientRef client: RestClientRef
clientTwdServiceNode: RestClientRef
proc setupRestFilter(): Future[RestFilterTest] {.async.} = proc init(T: type RestFilterTest): Future[T] {.async.} =
result.node1 = testWakuNode() var testSetup = RestFilterTest()
result.node2 = testWakuNode() testSetup.serviceNode = testWakuNode()
testSetup.subscriberNode = testWakuNode()
await allFutures(result.node1.start(), result.node2.start()) await allFutures(testSetup.serviceNode.start(), testSetup.subscriberNode.start())
await result.node1.mountFilter() await testSetup.serviceNode.mountRelay()
await result.node2.mountFilterClient() await testSetup.serviceNode.mountFilter()
await testSetup.subscriberNode.mountFilterClient()
result.node2.peerManager.addServicePeer(result.node1.peerInfo.toRemotePeerInfo(), WakuFilterCodec) testSetup.subscriberNode.peerManager.addServicePeer(testSetup.serviceNode.peerInfo.toRemotePeerInfo(), WakuFilterSubscribeCodec)
let restPort = Port(58011) let restPort = Port(58011)
let restAddress = ValidIpAddress.init("0.0.0.0") let restAddress = ValidIpAddress.init("127.0.0.1")
result.restServer = RestServerRef.init(restAddress, restPort).tryGet() testSetup.restServer = RestServerRef.init(restAddress, restPort).tryGet()
result.messageCache = filter_api.MessageCache.init(capacity=filter_api.filterMessageCacheDefaultCapacity) let restPort2 = Port(58012)
testSetup.restServerForService = RestServerRef.init(restAddress, restPort2).tryGet()
installFilterPostSubscriptionsV1Handler(result.restServer.router, result.node2, result.messageCache) # through this one we will see if messages are pushed according to our content topic sub
installFilterDeleteSubscriptionsV1Handler(result.restServer.router, result.node2, result.messageCache) testSetup.messageCache = filter_api.MessageCache.init()
installFilterGetMessagesV1Handler(result.restServer.router, result.node2, result.messageCache) installFilterRestApiHandlers(testSetup.restServer.router, testSetup.subscriberNode, testSetup.messageCache)
result.restServer.start() let topicCache = TopicCache.init()
installRelayApiHandlers(testSetup.restServerForService.router, testSetup.serviceNode, topicCache)
result.client = newRestHttpClient(initTAddress(restAddress, restPort)) testSetup.restServer.start()
testSetup.restServerForService.start()
return result testSetup.client = newRestHttpClient(initTAddress(restAddress, restPort))
testSetup.clientTwdServiceNode = newRestHttpClient(initTAddress(restAddress, restPort2))
return testSetup
proc shutdown(self: RestFilterTest) {.async.} = proc shutdown(self: RestFilterTest) {.async.} =
await self.restServer.stop() await self.restServer.stop()
await self.restServer.closeWait() await self.restServer.closeWait()
await allFutures(self.node1.stop(), self.node2.stop()) await self.restServerForService.stop()
await self.restServerForService.closeWait()
await allFutures(self.serviceNode.stop(), self.subscriberNode.stop())
suite "Waku v2 Rest API - Filter": suite "Waku v2 Rest API - Filter V2":
asyncTest "Subscribe a node to an array of topics - POST /filter/v1/subscriptions": asyncTest "Subscribe a node to an array of topics - POST /filter/v2/subscriptions":
# Given # Given
let restFilterTest: RestFilterTest = await setupRestFilter() let restFilterTest = await RestFilterTest.init()
let subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId
# When # When
let contentFilters = @[DefaultContentTopic let contentFilters = @[DefaultContentTopic
@ -89,103 +107,178 @@ suite "Waku v2 Rest API - Filter":
,ContentTopic("4") ,ContentTopic("4")
] ]
let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters, let requestBody = FilterSubscribeRequest(requestId: "1234",
contentFilters: contentFilters,
pubsubTopic: some(DefaultPubsubTopic)) pubsubTopic: some(DefaultPubsubTopic))
let response = await restFilterTest.client.filterPostSubscriptionsV1(requestBody) let response = await restFilterTest.client.filterPostSubscriptions(requestBody)
# Then echo "response", $response
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"
check: let subscribedPeer1 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, DefaultContentTopic)
restFilterTest.messageCache.isSubscribed(DefaultContentTopic) let subscribedPeer2 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "2")
restFilterTest.messageCache.isSubscribed("2") let subscribedPeer3 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "3")
restFilterTest.messageCache.isSubscribed("3") let subscribedPeer4 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "4")
restFilterTest.messageCache.isSubscribed("4")
# When - error case
let badRequestBody = FilterSubscriptionsRequest(contentFilters: @[], pubsubTopic: none(string))
let badResponse = await restFilterTest.client.filterPostSubscriptionsV1(badRequestBody)
check:
badResponse.status == 400
$badResponse.contentType == $MIMETYPE_TEXT
badResponse.data == "Invalid content body, could not decode. Unable to deserialize data"
await restFilterTest.shutdown()
asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v1/subscriptions":
# Given
let
restFilterTest: RestFilterTest = await setupRestFilter()
# When
restFilterTest.messageCache.subscribe("1")
restFilterTest.messageCache.subscribe("2")
restFilterTest.messageCache.subscribe("3")
restFilterTest.messageCache.subscribe("4")
let contentFilters = @[ContentTopic("1")
,ContentTopic("2")
,ContentTopic("3")
# ,ContentTopic("4") # Keep this subscription for check
]
# When
let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters,
pubsubTopic: some(DefaultPubsubTopic))
let response = await restFilterTest.client.filterDeleteSubscriptionsV1(requestBody)
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"
check:
not restFilterTest.messageCache.isSubscribed("1")
not restFilterTest.messageCache.isSubscribed("2")
not restFilterTest.messageCache.isSubscribed("3")
restFilterTest.messageCache.isSubscribed("4")
await restFilterTest.shutdown()
asyncTest "Get the latest messages for topic - GET /filter/v1/messages/{contentTopic}":
# Given
let
restFilterTest = await setupRestFilter()
let pubSubTopic = "/waku/2/default-waku/proto"
let contentTopic = ContentTopic( "content-topic-x" )
let messages = @[
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
]
restFilterTest.messageCache.subscribe(contentTopic)
for msg in messages:
restFilterTest.messageCache.addMessage(contentTopic, msg)
# When
let response = await restFilterTest.client.filterGetMessagesV1(contentTopic)
# Then # Then
check: check:
response.status == 200 response.status == 200
$response.contentType == $MIMETYPE_JSON $response.contentType == $MIMETYPE_JSON
response.data.len == 3 response.data.requestId == "1234"
response.data.all do (msg: FilterWakuMessage) -> bool: subscribedPeer1.len() == 1
msg.payload == base64.encode("TEST-1") and subPeerId in subscribedPeer1
msg.contentTopic.get().string == "content-topic-x" and subPeerId in subscribedPeer2
msg.version.get() == 2 and subPeerId in subscribedPeer3
msg.timestamp.get() != Timestamp(0) subPeerId in subscribedPeer4
# When - error case
let badRequestBody = FilterSubscribeRequest(requestId: "4567", contentFilters: @[], pubsubTopic: none(string))
let badRequestResp = await restFilterTest.client.filterPostSubscriptions(badRequestBody)
check:
badRequestResp.status == 400
$badRequestResp.contentType == $MIMETYPE_JSON
badRequestResp.data.requestId == "unknown"
# badRequestResp.data.statusDesc == "*********"
badRequestResp.data.statusDesc.startsWith("BAD_REQUEST: Failed to decode request")
await restFilterTest.shutdown()
asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v2/subscriptions":
# Given
let
restFilterTest = await RestFilterTest.init()
subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId
# When
var requestBody = FilterSubscribeRequest(requestId: "1234",
contentFilters: @[ContentTopic("1")
,ContentTopic("2")
,ContentTopic("3")
,ContentTopic("4")
],
pubsubTopic: some(DefaultPubsubTopic))
discard await restFilterTest.client.filterPostSubscriptions(requestBody)
let contentFilters = @[ContentTopic("1")
,ContentTopic("2")
,ContentTopic("3")
# ,ContentTopic("4") # Keep this subscription for check
]
let requestBodyUnsub = FilterUnsubscribeRequest(requestId: "4321",
contentFilters: contentFilters,
pubsubTopic: some(DefaultPubsubTopic))
let response = await restFilterTest.client.filterDeleteSubscriptions(requestBodyUnsub)
let subscribedPeer1 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, DefaultContentTopic)
let subscribedPeer2 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "2")
let subscribedPeer3 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "3")
let subscribedPeer4 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "4")
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.requestId == "4321"
subscribedPeer1.len() == 0
subPeerId notin subscribedPeer1
subPeerId notin subscribedPeer2
subPeerId notin subscribedPeer3
subscribedPeer4.len() == 1
subPeerId in subscribedPeer4
# When - error case
let requestBodyUnsubAll = FilterUnsubscribeAllRequest(requestId: "2143")
let responseUnsubAll = await restFilterTest.client.filterDeleteAllSubscriptions(requestBodyUnsubAll)
let subscribedPeer = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "4")
check:
responseUnsubAll.status == 200
$responseUnsubAll.contentType == $MIMETYPE_JSON
responseUnsubAll.data.requestId == "2143"
subscribedPeer.len() == 0
await restFilterTest.shutdown()
asyncTest "ping subscribed node - GET /filter/v2/subscriptions/{requestId}":
# Given
let
restFilterTest = await RestFilterTest.init()
subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId
# When
var requestBody = FilterSubscribeRequest(requestId: "1234",
contentFilters: @[ContentTopic("1")],
pubsubTopic: some(DefaultPubsubTopic))
discard await restFilterTest.client.filterPostSubscriptions(requestBody)
let pingResponse = await restFilterTest.client.filterSubscriberPing("9999")
# Then
check:
pingResponse.status == 200
$pingResponse.contentType == $MIMETYPE_JSON
pingResponse.data.requestId == "9999"
pingResponse.data.statusDesc.len() == 0
# When - error case
let requestBodyUnsubAll = FilterUnsubscribeAllRequest(requestId: "9988")
discard await restFilterTest.client.filterDeleteAllSubscriptions(requestBodyUnsubAll)
let pingResponseFail = await restFilterTest.client.filterSubscriberPing("9977")
# Then
check:
pingResponseFail.status == 404 # NOT_FOUND
$pingResponseFail.contentType == $MIMETYPE_JSON
pingResponseFail.data.requestId == "9977"
pingResponseFail.data.statusDesc == "NOT_FOUND: peer has no subscriptions"
await restFilterTest.shutdown()
asyncTest "push filtered message":
# Given
let
restFilterTest = await RestFilterTest.init()
subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId
restFilterTest.messageCache.subscribe(DefaultPubsubTopic)
restFilterTest.serviceNode.subscribe(DefaultPubsubTopic)
# When
var requestBody = FilterSubscribeRequest(requestId: "1234",
contentFilters: @[ContentTopic("1")],
pubsubTopic: some(DefaultPubsubTopic))
discard await restFilterTest.client.filterPostSubscriptions(requestBody)
let pingResponse = await restFilterTest.client.filterSubscriberPing("9999")
# Then
check:
pingResponse.status == 200
$pingResponse.contentType == $MIMETYPE_JSON
pingResponse.data.requestId == "9999"
pingResponse.data.statusDesc.len() == 0
# When - message push
let testMessage = WakuMessage(
payload: "TEST-PAYLOAD-MUST-RECEIVE".toBytes(),
contentTopic: "1",
timestamp: int64(2022)
)
let postMsgResponse = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1(
DefaultPubsubTopic,
toRelayWakuMessage(testMessage)
)
# Then
let messages = restFilterTest.messageCache.getMessages("1").tryGet()
check:
postMsgResponse.status == 200
$postMsgResponse.contentType == $MIMETYPE_TEXT
postMsgResponse.data == "OK"
messages == @[testMessage]
await restFilterTest.shutdown() await restFilterTest.shutdown()

View File

@ -0,0 +1,191 @@
{.used.}
import
std/sequtils,
stew/byteutils,
stew/shims/net,
testutils/unittests,
presto, presto/client as presto_client,
libp2p/crypto/crypto
import
../../waku/node/message_cache,
../../waku/common/base64,
../../waku/waku_core,
../../waku/waku_node,
../../waku/node/peer_manager,
../../waku/waku_filter,
../../waku/node/rest/server,
../../waku/node/rest/client,
../../waku/node/rest/responses,
../../waku/node/rest/filter/types,
../../waku/node/rest/filter/legacy_handlers as filter_api,
../../waku/node/rest/filter/legacy_client as filter_api_client,
../../waku/waku_relay,
../testlib/wakucore,
../testlib/wakunode
proc testWakuNode(): WakuNode =
let
privkey = generateSecp256k1Key()
bindIp = ValidIpAddress.init("0.0.0.0")
extIp = ValidIpAddress.init("127.0.0.1")
port = Port(0)
return newTestWakuNode(privkey, bindIp, port, some(extIp), some(port))
type RestFilterTest = object
filterNode: WakuNode
clientNode: WakuNode
restServer: RestServerRef
messageCache: filter_api.MessageCache
client: RestClientRef
proc setupRestFilter(): Future[RestFilterTest] {.async.} =
result.filterNode = testWakuNode()
result.clientNode = testWakuNode()
await allFutures(result.filterNode.start(), result.clientNode.start())
await result.filterNode.mountFilter()
await result.clientNode.mountFilterClient()
result.clientNode.peerManager.addServicePeer(result.filterNode.peerInfo.toRemotePeerInfo()
,WakuLegacyFilterCodec)
let restPort = Port(58011)
let restAddress = ValidIpAddress.init("0.0.0.0")
result.restServer = RestServerRef.init(restAddress, restPort).tryGet()
result.messageCache = filter_api.MessageCache.init()
installLegacyFilterRestApiHandlers(result.restServer.router
,result.clientNode
,result.messageCache)
result.restServer.start()
result.client = newRestHttpClient(initTAddress(restAddress, restPort))
return result
proc shutdown(self: RestFilterTest) {.async.} =
await self.restServer.stop()
await self.restServer.closeWait()
await allFutures(self.filterNode.stop(), self.clientNode.stop())
suite "Waku v2 Rest API - Filter":
asyncTest "Subscribe a node to an array of topics - POST /filter/v1/subscriptions":
# Given
let restFilterTest: RestFilterTest = await setupRestFilter()
# When
let contentFilters = @[DefaultContentTopic
,ContentTopic("2")
,ContentTopic("3")
,ContentTopic("4")
]
let requestBody = FilterLegacySubscribeRequest(contentFilters: contentFilters,
pubsubTopic: some(DefaultPubsubTopic))
let response = await restFilterTest.client.filterPostSubscriptionsV1(requestBody)
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"
check:
restFilterTest.messageCache.isSubscribed(DefaultContentTopic)
restFilterTest.messageCache.isSubscribed("2")
restFilterTest.messageCache.isSubscribed("3")
restFilterTest.messageCache.isSubscribed("4")
# When - error case
let badRequestBody = FilterLegacySubscribeRequest(contentFilters: @[]
,pubsubTopic: none(string))
let badResponse = await restFilterTest.client.filterPostSubscriptionsV1(badRequestBody)
check:
badResponse.status == 400
$badResponse.contentType == $MIMETYPE_TEXT
badResponse.data == "Invalid content body, could not decode. Unable to deserialize data"
await restFilterTest.shutdown()
asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v1/subscriptions":
# Given
let
restFilterTest: RestFilterTest = await setupRestFilter()
# When
restFilterTest.messageCache.subscribe("1")
restFilterTest.messageCache.subscribe("2")
restFilterTest.messageCache.subscribe("3")
restFilterTest.messageCache.subscribe("4")
let contentFilters = @[ContentTopic("1")
,ContentTopic("2")
,ContentTopic("3")
# ,ContentTopic("4") # Keep this subscription for check
]
# When
let requestBody = FilterLegacySubscribeRequest(contentFilters: contentFilters,
pubsubTopic: some(DefaultPubsubTopic))
let response = await restFilterTest.client.filterDeleteSubscriptionsV1(requestBody)
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"
check:
not restFilterTest.messageCache.isSubscribed("1")
not restFilterTest.messageCache.isSubscribed("2")
not restFilterTest.messageCache.isSubscribed("3")
restFilterTest.messageCache.isSubscribed("4")
await restFilterTest.shutdown()
asyncTest "Get the latest messages for topic - GET /filter/v1/messages/{contentTopic}":
# Given
let
restFilterTest = await setupRestFilter()
let pubSubTopic = "/waku/2/default-waku/proto"
let contentTopic = ContentTopic( "content-topic-x" )
let messages = @[
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
]
restFilterTest.messageCache.subscribe(contentTopic)
for msg in messages:
restFilterTest.messageCache.addMessage(contentTopic, msg)
# When
let response = await restFilterTest.client.filterGetMessagesV1(contentTopic)
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.len == 3
response.data.all do (msg: FilterWakuMessage) -> bool:
msg.payload == base64.encode("TEST-1") and
msg.contentTopic.get().string == "content-topic-x" and
msg.version.get() == 2 and
msg.timestamp.get() != Timestamp(0)
await restFilterTest.shutdown()

View File

@ -50,18 +50,18 @@ To run a specific test.
# Get a shell with the right environment variables set # Get a shell with the right environment variables set
./env.sh bash ./env.sh bash
# Run a specific test # Run a specific test
nim c -r ./tests/test_waku_filter.nim nim c -r ./tests/test_waku_filter_legacy.nim
``` ```
You can also alter compile options. For example, if you want a less verbose output you can do the following. For more, refer to the [compiler flags](https://nim-lang.org/docs/nimc.html#compiler-usage) and [chronicles documentation](https://github.com/status-im/nim-chronicles#compile-time-configuration). You can also alter compile options. For example, if you want a less verbose output you can do the following. For more, refer to the [compiler flags](https://nim-lang.org/docs/nimc.html#compiler-usage) and [chronicles documentation](https://github.com/status-im/nim-chronicles#compile-time-configuration).
```bash ```bash
nim c -r -d:chronicles_log_level=WARN --verbosity=0 --hints=off ./tests/test_waku_filter.nim nim c -r -d:chronicles_log_level=WARN --verbosity=0 --hints=off ./tests/waku_filter_v2/test_waku_filter.nim
``` ```
You may also want to change the `outdir` to a folder ignored by git. You may also want to change the `outdir` to a folder ignored by git.
```bash ```bash
nim c -r -d:chronicles_log_level=WARN --verbosity=0 --hints=off --outdir=build ./tests/test_waku_filter.nim nim c -r -d:chronicles_log_level=WARN --verbosity=0 --hints=off --outdir=build ./tests/waku_filter_v2/test_waku_filter.nim
``` ```
### Waku Protocol Example ### Waku Protocol Example

View File

@ -71,9 +71,9 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
if not node.wakuFilterLegacy.isNil(): if not node.wakuFilterLegacy.isNil():
# Map WakuFilter peers to WakuPeers and add to return list # Map WakuFilter peers to WakuPeers and add to return list
let filterPeers = node.peerManager.peerStore.peers(WakuFilterCodec) let filterPeers = node.peerManager.peerStore.peers(WakuLegacyFilterCodec)
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it), .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it),
protocol: WakuFilterCodec, protocol: WakuLegacyFilterCodec,
connected: it.connectedness == Connectedness.Connected)) connected: it.connectedness == Connectedness.Connected))
peers.add(filterPeers) peers.add(filterPeers)

View File

@ -34,7 +34,7 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message
## Subscribes a node to a list of content filters ## Subscribes a node to a list of content filters
debug "post_waku_v2_filter_v1_subscription" debug "post_waku_v2_filter_v1_subscription"
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) let peerOpt = node.peerManager.selectPeer(WakuLegacyFilterCodec)
if peerOpt.isNone(): if peerOpt.isNone():
raise newException(ValueError, "no suitable remote filter peers") raise newException(ValueError, "no suitable remote filter peers")
@ -43,7 +43,7 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message
let handler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} = let handler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} =
cache.addMessage(msg.contentTopic, msg) cache.addMessage(msg.contentTopic, msg)
let subFut = node.filterSubscribe(pubsubTopic, contentTopics, handler, peerOpt.get()) let subFut = node.legacyFilterSubscribe(pubsubTopic, contentTopics, handler, peerOpt.get())
if not await subFut.withTimeout(futTimeout): if not await subFut.withTimeout(futTimeout):
raise newException(ValueError, "Failed to subscribe to contentFilters") raise newException(ValueError, "Failed to subscribe to contentFilters")
@ -59,7 +59,11 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message
let contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic) let contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic)
let unsubFut = node.unsubscribe(pubsubTopic, contentTopics) let peerOpt = node.peerManager.selectPeer(WakuLegacyFilterCodec)
if peerOpt.isNone():
raise newException(ValueError, "no suitable remote filter peers")
let unsubFut = node.legacyFilterUnsubscribe(pubsubTopic, contentTopics, peerOpt.get())
if not await unsubFut.withTimeout(futTimeout): if not await unsubFut.withTimeout(futTimeout):
raise newException(ValueError, "Failed to unsubscribe from contentFilters") raise newException(ValueError, "Failed to unsubscribe from contentFilters")

View File

@ -44,6 +44,8 @@ proc unsubscribe*[K](t: MessageCache[K], topic: K) =
return return
t.table.del(topic) t.table.del(topic)
proc unsubscribeAll*[K](t: MessageCache[K]) =
t.table.clear()
proc addMessage*[K](t: MessageCache, topic: K, msg: WakuMessage) = proc addMessage*[K](t: MessageCache, topic: K, msg: WakuMessage) =
if not t.isSubscribed(topic): if not t.isSubscribed(topic):

View File

@ -4,8 +4,10 @@ else:
{.push raises: [].} {.push raises: [].}
import import
json,
std/sets, std/sets,
stew/byteutils, stew/byteutils,
strformat,
chronicles, chronicles,
json_serialization, json_serialization,
json_serialization/std/options, json_serialization/std/options,
@ -19,9 +21,9 @@ import
export types export types
logScope: logScope:
topics = "waku node rest client" topics = "waku node rest client v2"
proc encodeBytes*(value: FilterSubscriptionsRequest, proc encodeBytes*(value: FilterSubscribeRequest,
contentType: string): RestResult[seq[byte]] = contentType: string): RestResult[seq[byte]] =
if MediaType.init(contentType) != MIMETYPE_JSON: if MediaType.init(contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType error "Unsupported contentType value", contentType = contentType
@ -30,30 +32,68 @@ proc encodeBytes*(value: FilterSubscriptionsRequest,
let encoded = ?encodeIntoJsonBytes(value) let encoded = ?encodeIntoJsonBytes(value)
return ok(encoded) return ok(encoded)
proc decodeBytes*(t: typedesc[string], value: openarray[byte], proc encodeBytes*(value: FilterSubscriberPing,
contentType: Opt[ContentTypeData]): RestResult[string] = contentType: string): RestResult[seq[byte]] =
if MediaType.init($contentType) != MIMETYPE_TEXT: if MediaType.init(contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType") return err("Unsupported contentType")
var res: string let encoded = ?encodeIntoJsonBytes(value)
if len(value) > 0: return ok(encoded)
res = newString(len(value))
copyMem(addr res[0], unsafeAddr value[0], len(value))
return ok(res)
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) proc encodeBytes*(value: FilterUnsubscribeRequest,
proc filterPostSubscriptionsV1*(body: FilterSubscriptionsRequest): contentType: string): RestResult[seq[byte]] =
RestResponse[string] if MediaType.init(contentType) != MIMETYPE_JSON:
{.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodPost.} error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) let encoded = ?encodeIntoJsonBytes(value)
proc filterDeleteSubscriptionsV1*(body: FilterSubscriptionsRequest): return ok(encoded)
RestResponse[string]
{.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodDelete.}
proc decodeBytes*(t: typedesc[FilterGetMessagesResponse], proc encodeBytes*(value: FilterUnsubscribeAllRequest,
data: openArray[byte], contentType: string): RestResult[seq[byte]] =
if MediaType.init(contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")
let encoded = ?encodeIntoJsonBytes(value)
return ok(encoded)
proc decodeBytes*(t: typedesc[FilterSubscriptionResponse],
value: openarray[byte],
contentType: Opt[ContentTypeData]):
RestResult[FilterSubscriptionResponse] =
if MediaType.init($contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")
let decoded = ?decodeFromJsonBytes(FilterSubscriptionResponse, value)
return ok(decoded)
proc filterSubscriberPing*(requestId: string):
RestResponse[FilterSubscriptionResponse]
{.rest, endpoint: "/filter/v2/subscriptions/{requestId}", meth: HttpMethod.MethodGet.}
proc filterPostSubscriptions*(body: FilterSubscribeRequest):
RestResponse[FilterSubscriptionResponse]
{.rest, endpoint: "/filter/v2/subscriptions", meth: HttpMethod.MethodPost.}
proc filterPutSubscriptions*(body: FilterSubscribeRequest):
RestResponse[FilterSubscriptionResponse]
{.rest, endpoint: "/filter/v2/subscriptions", meth: HttpMethod.MethodPut.}
proc filterDeleteSubscriptions*(body: FilterUnsubscribeRequest):
RestResponse[FilterSubscriptionResponse]
{.rest, endpoint: "/filter/v2/subscriptions", meth: HttpMethod.MethodDelete.}
proc filterDeleteAllSubscriptions*(body: FilterUnsubscribeAllRequest):
RestResponse[FilterSubscriptionResponse]
{.rest, endpoint: "/filter/v2/subscriptions/all", meth: HttpMethod.MethodDelete.}
proc decodeBytes*(t: typedesc[FilterGetMessagesResponse],
data: openArray[byte],
contentType: Opt[ContentTypeData]): RestResult[FilterGetMessagesResponse] = contentType: Opt[ContentTypeData]): RestResult[FilterGetMessagesResponse] =
if MediaType.init($contentType) != MIMETYPE_JSON: if MediaType.init($contentType) != MIMETYPE_JSON:
error "Unsupported response contentType value", contentType = contentType error "Unsupported response contentType value", contentType = contentType
@ -62,7 +102,6 @@ proc decodeBytes*(t: typedesc[FilterGetMessagesResponse],
let decoded = ?decodeFromJsonBytes(FilterGetMessagesResponse, data) let decoded = ?decodeFromJsonBytes(FilterGetMessagesResponse, data)
return ok(decoded) return ok(decoded)
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) proc filterGetMessagesV1*(contentTopic: string):
proc filterGetMessagesV1*(contentTopic: string): RestResponse[FilterGetMessagesResponse]
RestResponse[FilterGetMessagesResponse] {.rest, endpoint: "/filter/v2/messages/{contentTopic}", meth: HttpMethod.MethodGet.}
{.rest, endpoint: "/filter/v1/messages/{contentTopic}", meth: HttpMethod.MethodGet.}

View File

@ -4,6 +4,7 @@ else:
{.push raises: [].} {.push raises: [].}
import import
std/strformat,
std/sequtils, std/sequtils,
stew/byteutils, stew/byteutils,
chronicles, chronicles,
@ -14,24 +15,28 @@ import
import import
../../../waku_core, ../../../waku_core,
../../../waku_filter, ../../../waku_filter,
../../../waku_filter/client, ../../../waku_filter_v2,
../../../waku_filter_v2/client as filter_protocol_client,
../../../waku_filter_v2/common as filter_protocol_type,
../../message_cache, ../../message_cache,
../../peer_manager, ../../peer_manager,
../../waku_node, ../../waku_node,
../serdes, ../serdes,
../responses, ../responses,
./types ./types
export types export types
logScope: logScope:
topics = "waku node rest filter_api" topics = "waku node rest filter_api_v2"
const futTimeoutForSubscriptionProcessing* = 5.seconds const futTimeoutForSubscriptionProcessing* = 5.seconds
#### Request handlers #### Request handlers
const ROUTE_FILTER_SUBSCRIPTIONSV1* = "/filter/v1/subscriptions" const ROUTE_FILTER_SUBSCRIPTIONS* = "/filter/v2/subscriptions"
const ROUTE_FILTER_ALL_SUBSCRIPTIONS* = "/filter/v2/subscriptions/all"
const filterMessageCacheDefaultCapacity* = 30 const filterMessageCacheDefaultCapacity* = 30
@ -50,85 +55,258 @@ func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiR
let requestResult = decodeFromJsonBytes(T, reqBodyData) let requestResult = decodeFromJsonBytes(T, reqBodyData)
if requestResult.isErr(): if requestResult.isErr():
return err(RestApiResponse.badRequest("Invalid content body, could not decode. " & return err(RestApiResponse.badRequest("Invalid content body, could not decode. " &
$requestResult.error)) $requestResult.error))
return ok(requestResult.get()) return ok(requestResult.get())
proc installFilterPostSubscriptionsV1Handler*(router: var RestRouter, proc getErrorCause(err: filter_protocol_type.FilterSubscribeError): string =
node: WakuNode, ## Retrieve proper error cause of FilterSubscribeError - due stringify make some parts of text double
cache: MessageCache) =
let pushHandler: FilterPushHandler =
proc(pubsubTopic: PubsubTopic,
msg: WakuMessage) {.async, gcsafe, closure.} =
cache.addMessage(msg.contentTopic, msg)
router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: case err.kind:
## Subscribes a node to a list of contentTopics of a pubsubTopic of FilterSubscribeErrorKind.PEER_DIAL_FAILURE:
# debug "post_waku_v2_filter_v1_subscriptions" err.address
of FilterSubscribeErrorKind.BAD_RESPONSE, FilterSubscribeErrorKind.BAD_REQUEST,
FilterSubscribeErrorKind.NOT_FOUND, FilterSubscribeErrorKind.SERVICE_UNAVAILABLE:
err.cause
of FilterSubscribeErrorKind.UNKNOWN:
"UNKNOWN"
let decodedBody = decodeRequestBody[FilterSubscriptionsRequest](contentBody) proc convertResponse(T: type FilterSubscriptionResponse, requestId: string, protocolClientRes: filter_protocol_type.FilterSubscribeResult): T =
## Properly convert filter protocol's response to rest response
if decodedBody.isErr(): if protocolClientRes.isErr():
return decodedBody.error return FilterSubscriptionResponse(
requestId: requestId,
statusCode: uint32(protocolClientRes.error().kind),
statusDesc: getErrorCause(protocolClientRes.error())
)
else:
return FilterSubscriptionResponse(
requestId: requestId,
statusCode: 0,
statusDesc: ""
)
let req: FilterSubscriptionsRequest = decodedBody.value() proc convertResponse(T: type FilterSubscriptionResponse, requestId: string, protocolClientRes: filter_protocol_type.FilterSubscribeError): T =
## Properly convert filter protocol's response to rest response in case of error
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) return FilterSubscriptionResponse(
requestId: requestId,
statusCode: uint32(protocolClientRes.kind),
statusDesc: $protocolClientRes
)
if peerOpt.isNone(): proc convertErrorKindToHttpStatus(kind: filter_protocol_type.FilterSubscribeErrorKind): HttpCode =
return RestApiResponse.internalServerError("No suitable remote filter peers") ## Filter protocol's error code is not directly convertible to HttpCodes hence this converter
let subFut = node.filterSubscribe(req.pubsubTopic, case kind:
req.contentFilters, of filter_protocol_type.FilterSubscribeErrorKind.UNKNOWN:
pushHandler, return Http200
of filter_protocol_type.FilterSubscribeErrorKind.PEER_DIAL_FAILURE:
return Http504 #gateway timout
of filter_protocol_type.FilterSubscribeErrorKind.BAD_RESPONSE:
return Http500 # internal server error
of filter_protocol_type.FilterSubscribeErrorKind.BAD_REQUEST:
return Http400
of filter_protocol_type.FilterSubscribeErrorKind.NOT_FOUND:
return Http404
of filter_protocol_type.FilterSubscribeErrorKind.SERVICE_UNAVAILABLE:
return Http503
else:
return Http500
proc makeRestResponse(requestId: string, protocolClientRes: filter_protocol_type.FilterSubscribeResult): RestApiResponse =
let filterSubscriptionResponse = FilterSubscriptionResponse.convertResponse(requestId, protocolClientRes)
var httpStatus : HttpCode = Http200
if protocolClientRes.isErr():
httpStatus = convertErrorKindToHttpStatus(protocolClientRes.error().kind) # TODO: convert status codes!
let resp = RestApiResponse.jsonResponse(filterSubscriptionResponse, status=httpStatus)
if resp.isErr():
error "An error ocurred while building the json respose: ", error=resp.error
return RestApiResponse.internalServerError(fmt("An error ocurred while building the json respose: {resp.error}"))
return resp.get()
proc makeRestResponse(requestId: string, protocolClientRes: filter_protocol_type.FilterSubscribeError): RestApiResponse =
let filterSubscriptionResponse = FilterSubscriptionResponse.convertResponse(requestId, protocolClientRes)
let httpStatus = convertErrorKindToHttpStatus(protocolClientRes.kind) # TODO: convert status codes!
let resp = RestApiResponse.jsonResponse(filterSubscriptionResponse, status=httpStatus)
if resp.isErr():
error "An error ocurred while building the json respose: ", error=resp.error
return RestApiResponse.internalServerError(fmt("An error ocurred while building the json respose: {resp.error}"))
return resp.get()
proc filterPostPutSubscriptionRequestHandler(node: WakuNode,
contentBody: Option[ContentBody],
cache: MessageCache):
Future[RestApiResponse]
{.async.} =
## handles any filter subscription requests, adds or modifies.
let decodedBody = decodeRequestBody[FilterSubscribeRequest](contentBody)
if decodedBody.isErr():
return makeRestResponse("unknown", FilterSubscribeError.badRequest(fmt("Failed to decode request: {decodedBody.error}")))
let req: FilterSubscribeRequest = decodedBody.value()
let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)
if peerOpt.isNone():
return makeRestResponse(req.requestId, FilterSubscribeError.serviceUnavailable("No suitable peers"))
let subFut = node.filterSubscribe(req.pubsubTopic,
req.contentFilters,
peerOpt.get()) peerOpt.get())
if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing): if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to subscribe to contentFilters do to timeout!" error "Failed to subscribe to contentFilters do to timeout!"
return RestApiResponse.internalServerError("Failed to subscribe to contentFilters") return makeRestResponse(req.requestId, FilterSubscribeError.serviceUnavailable("Subscription request timed out"))
# Successfully subscribed to all content filters # Successfully subscribed to all content filters
for cTopic in req.contentFilters: for cTopic in req.contentFilters:
cache.subscribe(cTopic) cache.subscribe(cTopic)
return RestApiResponse.ok() return makeRestResponse(req.requestId, subFut.read())
proc installFilterDeleteSubscriptionsV1Handler*(router: var RestRouter, proc installFilterPostSubscriptionsHandler(router: var RestRouter,
node: WakuNode, node: WakuNode,
cache: MessageCache) = cache: MessageCache) =
router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a pubsubTopic
debug "post", ROUTE_FILTER_SUBSCRIPTIONS, contentBody
let response = await filterPostPutSubscriptionRequestHandler(node, contentBody, cache)
return response
proc installFilterPutSubscriptionsHandler(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
router.api(MethodPut, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Modifies a subscribtion of a node to a list of contentTopics of a pubsubTopic
debug "put", ROUTE_FILTER_SUBSCRIPTIONS, contentBody
let response = await filterPostPutSubscriptionRequestHandler(node, contentBody, cache)
return response
proc installFilterDeleteSubscriptionsHandler(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a PubSub topic ## Subscribes a node to a list of contentTopics of a PubSub topic
# debug "delete_waku_v2_filter_v1_subscriptions" debug "delete", ROUTE_FILTER_SUBSCRIPTIONS, contentBody
let decodedBody = decodeRequestBody[FilterSubscriptionsRequest](contentBody) let decodedBody = decodeRequestBody[FilterUnsubscribeRequest](contentBody)
if decodedBody.isErr(): if decodedBody.isErr():
return decodedBody.error return makeRestResponse("unknown",
FilterSubscribeError.badRequest(fmt("Failed to decode request: {decodedBody.error}")))
let req: FilterSubscriptionsRequest = decodedBody.value() let req: FilterUnsubscribeRequest = decodedBody.value()
let unsubFut = node.unsubscribe(req.pubsubTopic, req.contentFilters) let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)
if peerOpt.isNone():
return makeRestResponse(req.requestId,
FilterSubscribeError.serviceUnavailable("No suitable peers"))
let unsubFut = node.filterUnsubscribe(req.pubsubTopic, req.contentFilters, peerOpt.get())
if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing): if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to unsubscribe from contentFilters due to timeout!" error "Failed to unsubscribe from contentFilters due to timeout!"
return RestApiResponse.internalServerError("Failed to unsubscribe from contentFilters") return makeRestResponse(req.requestId,
FilterSubscribeError.serviceUnavailable(
"Failed to unsubscribe from contentFilters due to timeout!"))
# Successfully subscribed to all content filters
for cTopic in req.contentFilters: for cTopic in req.contentFilters:
cache.unsubscribe(cTopic) cache.unsubscribe(cTopic)
# Successfully unsubscribed from all requested contentTopics # Successfully unsubscribed from all requested contentTopics
return RestApiResponse.ok() return makeRestResponse(req.requestId, unsubFut.read())
const ROUTE_RELAY_MESSAGESV1* = "/filter/v1/messages/{contentTopic}" proc installFilterDeleteAllSubscriptionsHandler(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
router.api(MethodDelete, ROUTE_FILTER_ALL_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a PubSub topic
debug "delete", ROUTE_FILTER_ALL_SUBSCRIPTIONS, contentBody
proc installFilterGetMessagesV1Handler*(router: var RestRouter, let decodedBody = decodeRequestBody[FilterUnsubscribeAllRequest](contentBody)
node: WakuNode,
cache: MessageCache) = if decodedBody.isErr():
router.api(MethodGet, ROUTE_RELAY_MESSAGESV1) do (contentTopic: string) -> RestApiResponse: return makeRestResponse("unknown",
FilterSubscribeError.badRequest(fmt("Failed to decode request: {decodedBody.error}")))
let req: FilterUnsubscribeAllRequest = decodedBody.value()
let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)
if peerOpt.isNone():
return makeRestResponse(req.requestId,
FilterSubscribeError.serviceUnavailable("No suitable peers"))
let unsubFut = node.filterUnsubscribeAll(peerOpt.get())
if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to unsubscribe from contentFilters due to timeout!"
return makeRestResponse(req.requestId,
FilterSubscribeError.serviceUnavailable(
"Failed to unsubscribe from all contentFilters due to timeout!"))
cache.unsubscribeAll()
# Successfully unsubscribed from all requested contentTopics
return makeRestResponse(req.requestId, unsubFut.read())
const ROUTE_FILTER_SUBSCRIBER_PING* = "/filter/v2/subscriptions/{requestId}"
proc installFilterPingSubscriberHandler(router: var RestRouter,
node: WakuNode) =
router.api(MethodGet, ROUTE_FILTER_SUBSCRIBER_PING) do (requestId: string) -> RestApiResponse:
## Checks if a node has valid subscription or not.
debug "get", ROUTE_FILTER_SUBSCRIBER_PING, requestId
let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)
if peerOpt.isNone():
return makeRestResponse(requestId.get(),
FilterSubscribeError.serviceUnavailable("No suitable remote filter peers"))
let pingFutRes = node.wakuFilterClient.ping(peerOpt.get())
if not await pingFutRes.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to ping filter service peer due to timeout!"
return makeRestResponse(requestId.get(),
FilterSubscribeError.serviceUnavailable("Ping timed out"))
return makeRestResponse(requestId.get(), pingFutRes.read())
const ROUTE_FILTER_MESSAGES* = "/filter/v2/messages/{contentTopic}"
proc installFilterGetMessagesHandler(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
let pushHandler : FilterPushHandler = proc (pubsubTopic: PubsubTopic,
msg: WakuMessage)
{.async, gcsafe, closure.} =
cache.addMessage(msg.contentTopic, msg)
node.wakuFilterClient.registerPushHandler(pushHandler)
router.api(MethodGet, ROUTE_FILTER_MESSAGES) do (contentTopic: string) -> RestApiResponse:
## Returns all WakuMessages received on a specified content topic since the ## Returns all WakuMessages received on a specified content topic since the
## last time this method was called ## last time this method was called
## TODO: ability to specify a return message limit ## TODO: ability to specify a return message limit, maybe use cursor to control paging response.
# debug "get_waku_v2_filter_v1_messages", contentTopic=contentTopic debug "get", ROUTE_FILTER_MESSAGES, contentTopic=contentTopic
if contentTopic.isErr(): if contentTopic.isErr():
return RestApiResponse.badRequest("Missing contentTopic") return RestApiResponse.badRequest("Missing contentTopic")
@ -147,9 +325,12 @@ proc installFilterGetMessagesV1Handler*(router: var RestRouter,
return resp.get() return resp.get()
proc installFilterApiHandlers*(router: var RestRouter, proc installFilterRestApiHandlers*(router: var RestRouter,
node: WakuNode, node: WakuNode,
cache: MessageCache) = cache: MessageCache) =
installFilterPostSubscriptionsV1Handler(router, node, cache) installFilterPingSubscriberHandler(router, node)
installFilterDeleteSubscriptionsV1Handler(router, node, cache) installFilterPostSubscriptionsHandler(router, node, cache)
installFilterGetMessagesV1Handler(router, node, cache) installFilterPutSubscriptionsHandler(router, node, cache)
installFilterDeleteSubscriptionsHandler(router, node, cache)
installFilterDeleteAllSubscriptionsHandler(router, node, cache)
installFilterGetMessagesHandler(router, node, cache)

View File

@ -0,0 +1,68 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/sets,
stew/byteutils,
chronicles,
json_serialization,
json_serialization/std/options,
presto/[route, client, common]
import
../../../waku_core,
../serdes,
../responses,
./types
export types
logScope:
topics = "waku node rest client v1"
proc encodeBytes*(value: FilterLegacySubscribeRequest,
contentType: string): RestResult[seq[byte]] =
if MediaType.init(contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")
let encoded = ?encodeIntoJsonBytes(value)
return ok(encoded)
proc decodeBytes*(t: typedesc[string], value: openarray[byte],
contentType: Opt[ContentTypeData]): RestResult[string] =
if MediaType.init($contentType) != MIMETYPE_TEXT:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")
var res: string
if len(value) > 0:
res = newString(len(value))
copyMem(addr res[0], unsafeAddr value[0], len(value))
return ok(res)
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc filterPostSubscriptionsV1*(body: FilterLegacySubscribeRequest):
RestResponse[string]
{.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodPost.}
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc filterDeleteSubscriptionsV1*(body: FilterLegacySubscribeRequest):
RestResponse[string]
{.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodDelete.}
proc decodeBytes*(t: typedesc[FilterGetMessagesResponse],
data: openArray[byte],
contentType: Opt[ContentTypeData]): RestResult[FilterGetMessagesResponse] =
if MediaType.init($contentType) != MIMETYPE_JSON:
error "Unsupported response contentType value", contentType = contentType
return err("Unsupported response contentType")
let decoded = ?decodeFromJsonBytes(FilterGetMessagesResponse, data)
return ok(decoded)
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc filterGetMessagesV1*(contentTopic: string):
RestResponse[FilterGetMessagesResponse]
{.rest, endpoint: "/filter/v1/messages/{contentTopic}", meth: HttpMethod.MethodGet.}

View File

@ -0,0 +1,160 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/sequtils,
stew/byteutils,
chronicles,
json_serialization,
json_serialization/std/options,
presto/route,
presto/common
import
../../../waku_core,
../../../waku_filter,
../../../waku_filter/client,
../../message_cache,
../../peer_manager,
../../waku_node,
../serdes,
../responses,
./types
export types
logScope:
topics = "waku node rest filter_api v1"
const futTimeoutForSubscriptionProcessing* = 5.seconds
#### Request handlers
const ROUTE_FILTER_SUBSCRIPTIONSV1* = "/filter/v1/subscriptions"
const filterMessageCacheDefaultCapacity* = 30
type
MessageCache* = message_cache.MessageCache[ContentTopic]
func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiResponse] =
if contentBody.isNone():
return err(RestApiResponse.badRequest("Missing content body"))
let reqBodyContentType = MediaType.init($contentBody.get().contentType)
if reqBodyContentType != MIMETYPE_JSON:
return err(RestApiResponse.badRequest("Wrong Content-Type, expected application/json"))
let reqBodyData = contentBody.get().data
let requestResult = decodeFromJsonBytes(T, reqBodyData)
if requestResult.isErr():
return err(RestApiResponse.badRequest("Invalid content body, could not decode. " &
$requestResult.error))
return ok(requestResult.get())
proc installFilterV1PostSubscriptionsV1Handler*(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
let pushHandler: FilterPushHandler =
proc(pubsubTopic: PubsubTopic,
msg: WakuMessage) {.async, gcsafe, closure.} =
cache.addMessage(msg.contentTopic, msg)
router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a pubsubTopic
debug "post", ROUTE_FILTER_SUBSCRIPTIONSV1, contentBody
let decodedBody = decodeRequestBody[FilterLegacySubscribeRequest](contentBody)
if decodedBody.isErr():
return decodedBody.error
let req: FilterLegacySubscribeRequest = decodedBody.value()
let peerOpt = node.peerManager.selectPeer(WakuLegacyFilterCodec)
if peerOpt.isNone():
return RestApiResponse.internalServerError("No suitable remote filter peers")
let subFut = node.legacyFilterSubscribe(req.pubsubTopic,
req.contentFilters,
pushHandler,
peerOpt.get())
if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to subscribe to contentFilters do to timeout!"
return RestApiResponse.internalServerError("Failed to subscribe to contentFilters")
# Successfully subscribed to all content filters
for cTopic in req.contentFilters:
cache.subscribe(cTopic)
return RestApiResponse.ok()
proc installFilterV1DeleteSubscriptionsV1Handler*(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a PubSub topic
debug "delete", ROUTE_FILTER_SUBSCRIPTIONSV1, contentBody
let decodedBody = decodeRequestBody[FilterLegacySubscribeRequest](contentBody)
if decodedBody.isErr():
return decodedBody.error
let req: FilterLegacySubscribeRequest = decodedBody.value()
let peerOpt = node.peerManager.selectPeer(WakuLegacyFilterCodec)
if peerOpt.isNone():
return RestApiResponse.internalServerError("No suitable remote filter peers")
let unsubFut = node.legacyFilterUnsubscribe(req.pubsubTopic, req.contentFilters, peerOpt.get())
if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to unsubscribe from contentFilters due to timeout!"
return RestApiResponse.internalServerError("Failed to unsubscribe from contentFilters")
for cTopic in req.contentFilters:
cache.unsubscribe(cTopic)
# Successfully unsubscribed from all requested contentTopics
return RestApiResponse.ok()
const ROUTE_FILTER_MESSAGESV1* = "/filter/v1/messages/{contentTopic}"
proc installFilterV1GetMessagesV1Handler*(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
router.api(MethodGet, ROUTE_FILTER_MESSAGESV1) do (contentTopic: string) -> RestApiResponse:
## Returns all WakuMessages received on a specified content topic since the
## last time this method was called
## TODO: ability to specify a return message limit
debug "get", ROUTE_FILTER_MESSAGESV1, contentTopic=contentTopic
if contentTopic.isErr():
return RestApiResponse.badRequest("Missing contentTopic")
let contentTopic = contentTopic.get()
let msgRes = cache.getMessages(contentTopic, clear=true)
if msgRes.isErr():
return RestApiResponse.badRequest("Not subscribed to topic: " & contentTopic)
let data = FilterGetMessagesResponse(msgRes.get().map(toFilterWakuMessage))
let resp = RestApiResponse.jsonResponse(data, status=Http200)
if resp.isErr():
error "An error ocurred while building the json respose: ", error=resp.error
return RestApiResponse.internalServerError("An error ocurred while building the json respose")
return resp.get()
proc installLegacyFilterRestApiHandlers*(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
installFilterV1PostSubscriptionsV1Handler(router, node, cache)
installFilterV1DeleteSubscriptionsV1Handler(router, node, cache)
installFilterV1GetMessagesV1Handler(router, node, cache)

View File

@ -1,6 +1,6 @@
openapi: 3.0.3 openapi: 3.0.3
info: info:
title: Waku V2 node REST API title: Waku V2 node REST API
version: 1.0.0 version: 1.0.0
contact: contact:
name: VAC Team name: VAC Team
@ -10,6 +10,8 @@ tags:
description: Filter REST API for WakuV2 node description: Filter REST API for WakuV2 node
paths: paths:
# Legacy support for v1 waku filter
# TODO: legacy endpoint, remove in the future
/filter/v1/subscriptions: /filter/v1/subscriptions:
post: # post_waku_v2_filter_v1_subscription post: # post_waku_v2_filter_v1_subscription
summary: Subscribe a node to an array of topics summary: Subscribe a node to an array of topics
@ -21,7 +23,7 @@ paths:
content: content:
application/json: application/json:
schema: schema:
$ref: '#/components/schemas/FilterSubscriptionsRequest' $ref: '#/components/schemas/FilterLegacySubscribeRequest'
responses: responses:
'200': '200':
description: OK description: OK
@ -32,8 +34,16 @@ paths:
# TODO: Review the possible errors of this endpoint # TODO: Review the possible errors of this endpoint
'400': '400':
description: Bad request. description: Bad request.
content:
text/plain:
schema:
type: string
'5XX': '5XX':
description: Unexpected error. description: Unexpected error.
content:
text/plain:
schema:
type: string
delete: # delete_waku_v2_filter_v1_subscription delete: # delete_waku_v2_filter_v1_subscription
summary: Unsubscribe a node from an array of topics summary: Unsubscribe a node from an array of topics
@ -45,7 +55,7 @@ paths:
content: content:
application/json: application/json:
schema: schema:
$ref: '#/components/schemas/FilterSubscriptionsRequest' $ref: '#/components/schemas/FilterLegacySubscribeRequest'
responses: responses:
'200': '200':
description: OK description: OK
@ -56,12 +66,24 @@ paths:
# TODO: Review the possible errors of this endpoint # TODO: Review the possible errors of this endpoint
'400': '400':
description: Bad request. description: Bad request.
content:
text/plain:
schema:
type: string
'404': '404':
description: Not found. description: Not found.
content:
text/plain:
schema:
type: string
'5XX': '5XX':
description: Unexpected error. description: Unexpected error.
content:
text/plain:
schema:
type: string
# TODO: Review the path of this endpoint due maybe query for list of contentTopics matching # TODO: legacy endpoint, remove in the future
/filter/v1/messages/{contentTopic}: /filter/v1/messages/{contentTopic}:
get: # get_waku_v2_filter_v1_messages get: # get_waku_v2_filter_v1_messages
summary: Get the latest messages on the polled content topic summary: Get the latest messages on the polled content topic
@ -86,10 +108,270 @@ paths:
# TODO: Review the possible errors of this endpoint # TODO: Review the possible errors of this endpoint
'400': '400':
description: Bad request. description: Bad request.
content:
text/plain:
schema:
type: string
'404': '404':
description: Not found. description: Not found.
content:
text/plain:
schema:
type: string
'5XX': '5XX':
description: Unexpected error. description: Unexpected error.
content:
text/plain:
schema:
type: string
/filter/v2/subscriptions/{requestId}:
get: # get_waku_v2_filter_v2_subscription - ping
summary: Subscriber-ping - a peer can query if there is a registered subscription for it
description: |
Subscriber peer can query its subscription existence on service node.
Returns HTTP200 if exists and HTTP404 if not.
Client must not fill anything but requestId in the request body.
operationId: subscriberPing
tags:
- filter
parameters:
- in: path
name: requestId
required: true
schema:
type: string
description: Id of ping request
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'400':
description: Bad request.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'404':
description: Not found.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'5XX':
description: Unexpected error.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
/filter/v2/subscriptions:
post: # post_waku_v2_filter_v2_subscription
summary: Subscribe a peer to an array of content topics under a pubsubTopic
description: |
Subscribe a peer to an array of content topics under a pubsubTopic.
It is allowed to refresh or add new content topic to an existing subscription.
Fields pubsubTopic and contentFilters must be filled.
operationId: postSubscriptions
tags:
- filter
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscribeRequest'
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
# TODO: Review the possible errors of this endpoint
'400':
description: Bad request.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'404':
description: Not found.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'5XX':
description: Unexpected error.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
put: # put_waku_v2_filter_v2_subscription
summary: Modify existing subscription of a peer under a pubsubTopic
description: |
Modify existing subscription of a peer under a pubsubTopic.
It is allowed to refresh or add new content topic to an existing subscription.
Fields pubsubTopic and contentFilters must be filled.
operationId: putSubscriptions
tags:
- filter
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscribeRequest'
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
# TODO: Review the possible errors of this endpoint
'400':
description: Bad request.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'404':
description: Not found.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'5XX':
description: Unexpected error.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
delete: # delete_waku_v2_filter_v2_subscription
summary: Unsubscribe a peer from content topics
description: |
Unsubscribe a peer from content topics
Only that subscription will be removed which matches existing.
operationId: deleteSubscriptions
tags:
- filter
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/FilterUnsubscribeRequest'
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'400':
description: Bad request.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'404':
description: Not found.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'5XX':
description: Unexpected error.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
/filter/v2/subscriptions/all:
delete: # delete_waku_v2_filter_v2_subscription
summary: Unsubscribe a peer from all content topics
description: |
Unsubscribe a peer from all content topics
operationId: deleteAllSubscriptions
tags:
- filter
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/FilterUnsubscribeAllRequest'
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'400':
description: Bad request.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'404':
description: Not found.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'5XX':
description: Unexpected error.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
/filter/v2/messages/{contentTopic}:
get: # get_waku_v2_filter_v2_messages
summary: Get the latest messages on the polled content topic
description: Get a list of messages that were received on a subscribed content topic after the last time this method was called.
operationId: getMessagesByTopic
tags:
- filter
parameters:
- in: path
name: contentTopic # Note the name is the same as in the path
required: true
schema:
type: string
description: Content topic of message
responses:
'200':
description: The latest messages on the polled topic.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterGetMessagesResponse'
# TODO: Review the possible errors of this endpoint
'400':
description: Bad request.
content:
text/plain:
schema:
type: string
'404':
description: Not found.
content:
text/plain:
schema:
type: string
'5XX':
description: Unexpected error.
content:
text/plain:
schema:
type: string
components: components:
schemas: schemas:
@ -97,7 +379,7 @@ components:
type: string type: string
ContentTopic: ContentTopic:
type: string type: string
FilterWakuMessage: FilterWakuMessage:
type: object type: object
properties: properties:
@ -113,19 +395,68 @@ components:
required: required:
- payload - payload
FilterSubscriptionsRequest: FilterLegacySubscribeRequest:
type: object type: object
properties: properties:
contentFilters: contentFilters:
type: array type: array
items: items:
$ref: '#/components/schemas/ContentTopic' $ref: '#/components/schemas/ContentTopic'
pubsubTopic: pubsubTopic:
$ref: "#/components/schemas/PubSubTopic" $ref: "#/components/schemas/PubSubTopic"
required: required:
- contentFilters - contentFilters
FilterGetMessagesResponse: FilterGetMessagesResponse:
type: array type: array
items: items:
$ref: '#/components/schemas/FilterWakuMessage' $ref: '#/components/schemas/FilterWakuMessage'
FilterSubscribeRequest:
type: object
properties:
requestId:
type: string
contentFilters:
type: array
items:
$ref: '#/components/schemas/ContentTopic'
pubsubTopic:
$ref: "#/components/schemas/PubSubTopic"
required:
- requestId
- contentFilters
- pubsubTopic
FilterUnsubscribeRequest:
type: object
properties:
requestId:
type: string
contentFilters:
type: array
items:
$ref: '#/components/schemas/ContentTopic'
pubsubTopic:
$ref: "#/components/schemas/PubSubTopic"
required:
- requestId
- contentFilters
FilterUnsubscribeAllRequest:
type: object
properties:
requestId:
type: string
required:
- requestId
FilterSubscriptionResponse:
type: object
properties:
requestId:
type: string
statusDesc:
type: string
required:
- requestId

View File

@ -8,7 +8,8 @@ import
chronicles, chronicles,
json_serialization, json_serialization,
json_serialization/std/options, json_serialization/std/options,
presto/[route, client, common] presto/[route, client, common],
libp2p/peerid
import import
../../../common/base64, ../../../common/base64,
../../../waku_core, ../../../waku_core,
@ -24,10 +25,32 @@ type FilterWakuMessage* = object
type FilterGetMessagesResponse* = seq[FilterWakuMessage] type FilterGetMessagesResponse* = seq[FilterWakuMessage]
type FilterSubscriptionsRequest* = object type FilterLegacySubscribeRequest* = object
# Subscription request for legacy filter support
pubsubTopic*: Option[PubSubTopic] pubsubTopic*: Option[PubSubTopic]
contentFilters*: seq[ContentTopic] contentFilters*: seq[ContentTopic]
type FilterSubscriberPing* = object
requestId*: string
type FilterSubscribeRequest* = object
requestId*: string
pubsubTopic*: Option[PubSubTopic]
contentFilters*: seq[ContentTopic]
type FilterUnsubscribeRequest* = object
requestId*: string
pubsubTopic*: Option[PubSubTopic]
contentFilters*: seq[ContentTopic]
type FilterUnsubscribeAllRequest* = object
requestId*: string
type FilterSubscriptionResponse* = object
requestId*: string
statusCode*: uint32
statusDesc*: string
#### Type conversion #### Type conversion
proc toFilterWakuMessage*(msg: WakuMessage): FilterWakuMessage = proc toFilterWakuMessage*(msg: WakuMessage): FilterWakuMessage =
@ -65,7 +88,7 @@ proc writeValue*(writer: var JsonWriter[RestJson], value: FilterWakuMessage)
writer.writeField("timestamp", value.timestamp) writer.writeField("timestamp", value.timestamp)
writer.endRecord() writer.endRecord()
proc writeValue*(writer: var JsonWriter[RestJson], value: FilterSubscriptionsRequest) proc writeValue*(writer: var JsonWriter[RestJson], value: FilterLegacySubscribeRequest)
{.raises: [IOError].} = {.raises: [IOError].} =
writer.beginRecord() writer.beginRecord()
writer.writeField("pubsubTopic", value.pubsubTopic) writer.writeField("pubsubTopic", value.pubsubTopic)
@ -114,8 +137,8 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterWakuMessage)
timestamp: timestamp timestamp: timestamp
) )
proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptionsRequest) proc readValue*(reader: var JsonReader[RestJson], value: var FilterLegacySubscribeRequest)
{.raises: [SerializationError, IOError].} = {.raises: [SerializationError, IOError].} =
var var
pubsubTopic = none(PubsubTopic) pubsubTopic = none(PubsubTopic)
contentFilters = none(seq[ContentTopic]) contentFilters = none(seq[ContentTopic])
@ -126,7 +149,7 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptions
if keys.containsOrIncl(fieldName): if keys.containsOrIncl(fieldName):
let err = try: fmt"Multiple `{fieldName}` fields found" let err = try: fmt"Multiple `{fieldName}` fields found"
except CatchableError: "Multiple fields with the same name found" except CatchableError: "Multiple fields with the same name found"
reader.raiseUnexpectedField(err, "FilterSubscriptionsRequest") reader.raiseUnexpectedField(err, "FilterLegacySubscribeRequest")
case fieldName case fieldName
of "pubsubTopic": of "pubsubTopic":
@ -136,8 +159,70 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptions
else: else:
unrecognizedFieldWarning() unrecognizedFieldWarning()
if pubsubTopic.isNone(): if contentFilters.isNone():
reader.raiseUnexpectedValue("Field `pubsubTopic` is missing") reader.raiseUnexpectedValue("Field `contentFilters` is missing")
if contentFilters.get().len() == 0:
reader.raiseUnexpectedValue("Field `contentFilters` is empty")
value = FilterLegacySubscribeRequest(
pubsubTopic: if pubsubTopic.isNone() or pubsubTopic.get() == "": none(string) else: some(pubsubTopic.get()),
contentFilters: contentFilters.get()
)
proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriberPing)
{.raises: [SerializationError, IOError].} =
var
requestId = none(string)
var keys = initHashSet[string]()
for fieldName in readObjectFields(reader):
# Check for reapeated keys
if keys.containsOrIncl(fieldName):
let err = try: fmt"Multiple `{fieldName}` fields found"
except CatchableError: "Multiple fields with the same name found"
reader.raiseUnexpectedField(err, "FilterSubscriberPing")
case fieldName
of "requestId":
requestId = some(reader.readValue(string))
else:
unrecognizedFieldWarning()
if requestId.isNone():
reader.raiseUnexpectedValue("Field `requestId` is missing")
value = FilterSubscriberPing(
requestId: requestId.get()
)
proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscribeRequest)
{.raises: [SerializationError, IOError].} =
var
requestId = none(string)
pubsubTopic = none(PubsubTopic)
contentFilters = none(seq[ContentTopic])
var keys = initHashSet[string]()
for fieldName in readObjectFields(reader):
# Check for reapeated keys
if keys.containsOrIncl(fieldName):
let err = try: fmt"Multiple `{fieldName}` fields found"
except CatchableError: "Multiple fields with the same name found"
reader.raiseUnexpectedField(err, "FilterSubscribeRequest")
case fieldName
of "requestId":
requestId = some(reader.readValue(string))
of "pubsubTopic":
pubsubTopic = some(reader.readValue(PubsubTopic))
of "contentFilters":
contentFilters = some(reader.readValue(seq[ContentTopic]))
else:
unrecognizedFieldWarning()
if requestId.isNone():
reader.raiseUnexpectedValue("Field `requestId` is missing")
if contentFilters.isNone(): if contentFilters.isNone():
reader.raiseUnexpectedValue("Field `contentFilters` is missing") reader.raiseUnexpectedValue("Field `contentFilters` is missing")
@ -145,7 +230,108 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptions
if contentFilters.get().len() == 0: if contentFilters.get().len() == 0:
reader.raiseUnexpectedValue("Field `contentFilters` is empty") reader.raiseUnexpectedValue("Field `contentFilters` is empty")
value = FilterSubscriptionsRequest( value = FilterSubscribeRequest(
pubsubTopic: if pubsubTopic.get() == "": none(string) else: some(pubsubTopic.get()), requestId: requestId.get(),
pubsubTopic: if pubsubTopic.isNone() or pubsubTopic.get() == "": none(string) else: some(pubsubTopic.get()),
contentFilters: contentFilters.get() contentFilters: contentFilters.get()
) )
proc readValue*(reader: var JsonReader[RestJson], value: var FilterUnsubscribeRequest)
{.raises: [SerializationError, IOError].} =
var
requestId = none(string)
pubsubTopic = none(PubsubTopic)
contentFilters = none(seq[ContentTopic])
var keys = initHashSet[string]()
for fieldName in readObjectFields(reader):
# Check for reapeated keys
if keys.containsOrIncl(fieldName):
let err = try: fmt"Multiple `{fieldName}` fields found"
except CatchableError: "Multiple fields with the same name found"
reader.raiseUnexpectedField(err, "FilterUnsubscribeRequest")
case fieldName
of "requestId":
requestId = some(reader.readValue(string))
of "pubsubTopic":
pubsubTopic = some(reader.readValue(PubsubTopic))
of "contentFilters":
contentFilters = some(reader.readValue(seq[ContentTopic]))
else:
unrecognizedFieldWarning()
if requestId.isNone():
reader.raiseUnexpectedValue("Field `requestId` is missing")
if contentFilters.isNone():
reader.raiseUnexpectedValue("Field `contentFilters` is missing")
if contentFilters.get().len() == 0:
reader.raiseUnexpectedValue("Field `contentFilters` is empty")
value = FilterUnsubscribeRequest(
requestId: requestId.get(),
pubsubTopic: if pubsubTopic.isNone() or pubsubTopic.get() == "": none(string) else: some(pubsubTopic.get()),
contentFilters: contentFilters.get()
)
proc readValue*(reader: var JsonReader[RestJson], value: var FilterUnsubscribeAllRequest)
{.raises: [SerializationError, IOError].} =
var
requestId = none(string)
var keys = initHashSet[string]()
for fieldName in readObjectFields(reader):
# Check for reapeated keys
if keys.containsOrIncl(fieldName):
let err = try: fmt"Multiple `{fieldName}` fields found"
except CatchableError: "Multiple fields with the same name found"
reader.raiseUnexpectedField(err, "FilterUnsubscribeAllRequest")
case fieldName
of "requestId":
requestId = some(reader.readValue(string))
else:
unrecognizedFieldWarning()
if requestId.isNone():
reader.raiseUnexpectedValue("Field `requestId` is missing")
value = FilterUnsubscribeAllRequest(
requestId: requestId.get(),
)
proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptionResponse)
{.raises: [SerializationError, IOError].} =
var
requestId = none(string)
statusCode = none(uint32)
statusDesc = none(string)
var keys = initHashSet[string]()
for fieldName in readObjectFields(reader):
# Check for reapeated keys
if keys.containsOrIncl(fieldName):
let err = try: fmt"Multiple `{fieldName}` fields found"
except CatchableError: "Multiple fields with the same name found"
reader.raiseUnexpectedField(err, "FilterSubscriptionResponse")
case fieldName
of "requestId":
requestId = some(reader.readValue(string))
of "statusCode":
statusCode = some(reader.readValue(uint32))
of "statusDesc":
statusDesc = some(reader.readValue(string))
else:
unrecognizedFieldWarning()
if requestId.isNone():
reader.raiseUnexpectedValue("Field `requestId` is missing")
value = FilterSubscriptionResponse(
requestId: requestId.get(),
statusCode: statusCode.get(),
statusDesc: statusDesc.get("")
)

View File

@ -31,8 +31,9 @@ import
../waku_store, ../waku_store,
../waku_store/client as store_client, ../waku_store/client as store_client,
../waku_filter as legacy_filter, #TODO: support for legacy filter protocol will be removed ../waku_filter as legacy_filter, #TODO: support for legacy filter protocol will be removed
../waku_filter/client as filter_client, #TODO: support for legacy filter protocol will be removed ../waku_filter/client as legacy_filter_client, #TODO: support for legacy filter protocol will be removed
../waku_filter_v2, ../waku_filter_v2,
../waku_filter_v2/client as filter_client,
../waku_lightpush, ../waku_lightpush,
../waku_lightpush/client as lightpush_client, ../waku_lightpush/client as lightpush_client,
../waku_enr, ../waku_enr,
@ -41,7 +42,8 @@ import
../waku_rln_relay, ../waku_rln_relay,
./config, ./config,
./peer_manager, ./peer_manager,
./waku_switch ./waku_switch,
./rest/relay/topic_cache
declarePublicCounter waku_node_messages, "number of messages received", ["type"] declarePublicCounter waku_node_messages, "number of messages received", ["type"]
@ -88,8 +90,9 @@ type
wakuStore*: WakuStore wakuStore*: WakuStore
wakuStoreClient*: WakuStoreClient wakuStoreClient*: WakuStoreClient
wakuFilter*: waku_filter_v2.WakuFilter wakuFilter*: waku_filter_v2.WakuFilter
wakuFilterClient*: filter_client.WakuFilterClient
wakuFilterLegacy*: legacy_filter.WakuFilterLegacy #TODO: support for legacy filter protocol will be removed wakuFilterLegacy*: legacy_filter.WakuFilterLegacy #TODO: support for legacy filter protocol will be removed
wakuFilterClientLegacy*: WakuFilterClientLegacy #TODO: support for legacy filter protocol will be removed wakuFilterClientLegacy*: legacy_filter_client.WakuFilterClientLegacy #TODO: support for legacy filter protocol will be removed
wakuRlnRelay*: WakuRLNRelay wakuRlnRelay*: WakuRLNRelay
wakuLightPush*: WakuLightPush wakuLightPush*: WakuLightPush
wakuLightpushClient*: WakuLightPushClient wakuLightpushClient*: WakuLightPushClient
@ -335,10 +338,10 @@ proc mountRelay*(node: WakuNode,
for topic in topics: for topic in topics:
node.subscribe(topic) node.subscribe(topic)
## Waku filter ## Waku filter
proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.async, raises: [Defect, LPError]} = proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout)
{.async, raises: [Defect, LPError]} =
info "mounting filter protocol" info "mounting filter protocol"
node.wakuFilter = WakuFilter.new(node.peerManager) node.wakuFilter = WakuFilter.new(node.peerManager)
node.wakuFilterLegacy = WakuFilterLegacy.new(node.peerManager, node.rng, filterTimeout) #TODO: remove legacy node.wakuFilterLegacy = WakuFilterLegacy.new(node.peerManager, node.rng, filterTimeout) #TODO: remove legacy
@ -348,32 +351,47 @@ proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {
await node.wakuFilterLegacy.start() #TODO: remove legacy await node.wakuFilterLegacy.start() #TODO: remove legacy
node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterSubscribeCodec)) node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterSubscribeCodec))
node.switch.mount(node.wakuFilterLegacy, protocolMatcher(WakuFilterCodec)) #TODO: remove legacy node.switch.mount(node.wakuFilterLegacy, protocolMatcher(WakuLegacyFilterCodec)) #TODO: remove legacy
proc filterHandleMessage*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage) {.async.}= proc filterHandleMessage*(node: WakuNode,
if node.wakuFilter.isNil(): pubsubTopic: PubsubTopic,
message: WakuMessage)
{.async.}=
if node.wakuFilter.isNil() or node.wakuFilterLegacy.isNil():
error "cannot handle filter message", error="waku filter is nil" error "cannot handle filter message", error="waku filter is nil"
return return
await node.wakuFilter.handleMessage(pubsubTopic, message) await allFutures(node.wakuFilter.handleMessage(pubsubTopic, message),
await node.wakuFilterLegacy.handleMessage(pubsubTopic, message) #TODO: remove legacy node.wakuFilterLegacy.handleMessage(pubsubTopic, message) #TODO: remove legacy
)
proc mountFilterClient*(node: WakuNode) {.async, raises: [Defect, LPError].} = proc mountFilterClient*(node: WakuNode) {.async, raises: [Defect, LPError].} =
## Mounting both filter clients v1 - legacy and v2.
## Giving option for application level to chose btw own push message handling or
## rely on node provided cache. - This only applies for v2 filter client
info "mounting filter client" info "mounting filter client"
node.wakuFilterClientLegacy = WakuFilterClientLegacy.new(node.peerManager, node.rng) node.wakuFilterClientLegacy = WakuFilterClientLegacy.new(node.peerManager, node.rng)
node.wakuFilterClient = WakuFilterClient.new(node.peerManager, node.rng)
if node.started: if node.started:
# Node has started already. Let's start filter too. await allFutures(node.wakuFilterClientLegacy.start(), node.wakuFilterClient.start())
await node.wakuFilterClientLegacy.start()
node.switch.mount(node.wakuFilterClientLegacy, protocolMatcher(WakuFilterCodec)) node.switch.mount(node.wakuFilterClient, protocolMatcher(WakuFilterSubscribeCodec))
node.switch.mount(node.wakuFilterClientLegacy, protocolMatcher(WakuLegacyFilterCodec))
proc filterSubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic], proc legacyFilterSubscribe*(node: WakuNode,
handler: FilterPushHandler, peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} = pubsubTopic: Option[PubsubTopic],
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. contentTopics: ContentTopic|seq[ContentTopic],
handler: FilterPushHandler,
peer: RemotePeerInfo|string)
{.async, gcsafe, raises: [Defect, ValueError].} =
## Registers for messages that match a specific filter.
## Triggers the handler whenever a message is received.
if node.wakuFilterClientLegacy.isNil(): if node.wakuFilterClientLegacy.isNil():
error "cannot register filter subscription to topic", error="waku filter client is nil" error "cannot register filter subscription to topic", error="waku legacy filter client is not set up"
return return
let remotePeerRes = parsePeerInfo(peer) let remotePeerRes = parsePeerInfo(peer)
@ -385,21 +403,31 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentT
# Add handler wrapper to store the message when pushed, when relay is disabled and filter enabled # Add handler wrapper to store the message when pushed, when relay is disabled and filter enabled
# TODO: Move this logic to wakunode2 app # TODO: Move this logic to wakunode2 app
let handlerWrapper: FilterPushHandler = proc(pubsubTopic: string, message: WakuMessage) {.async, gcsafe, closure.} = # FIXME: This part needs refactoring. It seems possible that in special cases archiver will store same message multiple times.
if node.wakuRelay.isNil() and not node.wakuStore.isNil(): let handlerWrapper: FilterPushHandler =
await node.wakuArchive.handleMessage(pubSubTopic, message) if node.wakuRelay.isNil() and not node.wakuStore.isNil():
proc(pubsubTopic: string, message: WakuMessage) {.async, gcsafe, closure.} =
await handler(pubsubTopic, message) await allFutures(node.wakuArchive.handleMessage(pubSubTopic, message),
handler(pubsubTopic, message))
else:
handler
if pubsubTopic.isSome(): if pubsubTopic.isSome():
info "registering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId info "registering legacy filter subscription to content",
pubsubTopic=pubsubTopic.get(),
contentTopics=contentTopics,
peer=remotePeer.peerId
let res = await node.wakuFilterClientLegacy.subscribe(pubsubTopic.get(), contentTopics, handlerWrapper, peer=remotePeer) let res = await node.wakuFilterClientLegacy.subscribe(pubsubTopic.get(),
contentTopics,
handlerWrapper,
peer=remotePeer)
if res.isOk(): if res.isOk():
info "subscribed to topic", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics info "subscribed to topic", pubsubTopic=pubsubTopic.get(),
contentTopics=contentTopics
else: else:
error "failed filter subscription", error=res.error error "failed legacy filter subscription", error=res.error
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
else: else:
let topicMapRes = parseSharding(pubsubTopic, contentTopics) let topicMapRes = parseSharding(pubsubTopic, contentTopics)
@ -412,7 +440,11 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentT
var futures = collect(newSeq): var futures = collect(newSeq):
for pubsub, topics in topicMap.pairs: for pubsub, topics in topicMap.pairs:
info "registering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer=remotePeer.peerId info "registering legacy filter subscription to content",
pubsubTopic=pubsub,
contentTopics=topics,
peer=remotePeer.peerId
let content = topics.mapIt($it) let content = topics.mapIt($it)
node.wakuFilterClientLegacy.subscribe($pubsub, content, handlerWrapper, peer=remotePeer) node.wakuFilterClientLegacy.subscribe($pubsub, content, handlerWrapper, peer=remotePeer)
@ -422,15 +454,82 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentT
let res = fut.read() let res = fut.read()
if res.isErr(): if res.isErr():
error "failed filter subscription", error=res.error error "failed legacy filter subscription", error=res.error
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
for pubsub, topics in topicMap.pairs: for pubsub, topics in topicMap.pairs:
info "subscribed to topic", pubsubTopic=pubsub, contentTopics=topics info "subscribed to topic", pubsubTopic=pubsub, contentTopics=topics
proc filterUnsubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic], proc filterSubscribe*(node: WakuNode,
peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} = pubsubTopic: Option[PubsubTopic],
## Unsubscribe from a content filter. contentTopics: ContentTopic|seq[ContentTopic],
peer: RemotePeerInfo|string):
Future[FilterSubscribeResult]
{.async, gcsafe, raises: [Defect, ValueError].} =
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
if node.wakuFilterClient.isNil():
error "cannot register filter subscription to topic", error="waku filter client is not set up"
return err(FilterSubscribeError.serviceUnavailable())
let remotePeerRes = parsePeerInfo(peer)
if remotePeerRes.isErr():
error "Couldn't parse the peer info properly", error = remotePeerRes.error
return err(FilterSubscribeError.serviceUnavailable("No peers available"))
let remotePeer = remotePeerRes.value
if pubsubTopic.isSome():
info "registering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId
let subRes = await node.wakuFilterClient.subscribe(remotePeer, pubsubTopic.get(), contentTopics)
if subRes.isOk():
info "v2 subscribed to topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics
else:
error "failed filter v2 subscription", error=subRes.error
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
return subRes
else:
let topicMapRes = parseSharding(pubsubTopic, contentTopics)
let topicMap =
if topicMapRes.isErr():
error "can't get shard", error=topicMapRes.error
return err(FilterSubscribeError.badResponse("can't get shard"))
else: topicMapRes.get()
var futures = collect(newSeq):
for pubsub, topics in topicMap.pairs:
info "registering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer=remotePeer.peerId
let content = topics.mapIt($it)
node.wakuFilterClient.subscribe(remotePeer, $pubsub, content)
let finished = await allFinished(futures)
var subRes: FilterSubscribeResult = FilterSubscribeResult.ok()
for fut in finished:
let res = fut.read()
if res.isErr():
error "failed filter subscription", error=res.error
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
subRes = FilterSubscribeResult.err(res.error)
for pubsub, topics in topicMap.pairs:
info "subscribed to topic", pubsubTopic=pubsub, contentTopics=topics
# return the last error or ok
return subRes
proc legacyFilterUnsubscribe*(node: WakuNode,
pubsubTopic: Option[PubsubTopic],
contentTopics: ContentTopic|seq[ContentTopic],
peer: RemotePeerInfo|string)
{.async, gcsafe, raises: [Defect, ValueError].} =
## Unsubscribe from a content legacy filter.
if node.wakuFilterClientLegacy.isNil(): if node.wakuFilterClientLegacy.isNil():
error "cannot unregister filter subscription to content", error="waku filter client is nil" error "cannot unregister filter subscription to content", error="waku filter client is nil"
return return
@ -443,7 +542,7 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], conten
let remotePeer = remotePeerRes.value let remotePeer = remotePeerRes.value
if pubsubTopic.isSome(): if pubsubTopic.isSome():
info "deregistering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId info "deregistering legacy filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId
let res = await node.wakuFilterClientLegacy.unsubscribe(pubsubTopic.get(), contentTopics, peer=remotePeer) let res = await node.wakuFilterClientLegacy.unsubscribe(pubsubTopic.get(), contentTopics, peer=remotePeer)
@ -479,35 +578,103 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], conten
for pubsub, topics in topicMap.pairs: for pubsub, topics in topicMap.pairs:
info "unsubscribed from topic", pubsubTopic=pubsub, contentTopics=topics info "unsubscribed from topic", pubsubTopic=pubsub, contentTopics=topics
# TODO: Move to application module (e.g., wakunode2.nim) proc filterUnsubscribe*(node: WakuNode,
proc subscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic], handler: FilterPushHandler) {.async, gcsafe, pubsubTopic: Option[PubsubTopic],
deprecated: "Use the explicit destination peer procedure. Use 'node.filterSubscribe()' instead.".} = contentTopics: seq[ContentTopic],
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. peer: RemotePeerInfo|string):
if node.wakuFilterClientLegacy.isNil():
error "cannot register filter subscription to topic", error="waku filter client is nil"
return
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) Future[FilterSubscribeResult]
if peerOpt.isNone():
error "cannot register filter subscription to topic", error="no suitable remote peers"
return
await node.filterSubscribe(pubsubTopic, contentTopics, handler, peer=peerOpt.get()) {.async, gcsafe, raises: [Defect, ValueError].} =
# TODO: Move to application module (e.g., wakunode2.nim) ## Unsubscribe from a content filter V2".
proc unsubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic]) {.async, gcsafe,
deprecated: "Use the explicit destination peer procedure. Use 'node.filterUnsusbscribe()' instead.".} =
## Unsubscribe from a content filter.
if node.wakuFilterClientLegacy.isNil(): if node.wakuFilterClientLegacy.isNil():
error "cannot unregister filter subscription to content", error="waku filter client is nil" error "cannot unregister filter subscription to content", error="waku filter client is nil"
return return err(FilterSubscribeError.serviceUnavailable())
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) let remotePeerRes = parsePeerInfo(peer)
if peerOpt.isNone(): if remotePeerRes.isErr():
error "cannot register filter subscription to topic", error="no suitable remote peers" error "couldn't parse remotePeerInfo", error = remotePeerRes.error
return return err(FilterSubscribeError.serviceUnavailable("No peers available"))
await node.filterUnsubscribe(pubsubTopic, contentTopics, peer=peerOpt.get()) let remotePeer = remotePeerRes.value
if pubsubTopic.isSome():
info "deregistering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId
let unsubRes = await node.wakuFilterClient.unsubscribe(remotePeer, pubsubTopic.get(), contentTopics)
if unsubRes.isOk():
info "unsubscribed from topic", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics
else:
error "failed filter unsubscription", error=unsubRes.error
waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
return unsubRes
else: # pubsubTopic.isNone
let topicMapRes = parseSharding(pubsubTopic, contentTopics)
let topicMap =
if topicMapRes.isErr():
error "can't get shard", error = topicMapRes.error
return err(FilterSubscribeError.badResponse("can't get shard"))
else: topicMapRes.get()
var futures = collect(newSeq):
for pubsub, topics in topicMap.pairs:
info "deregistering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer=remotePeer.peerId
let content = topics.mapIt($it)
node.wakuFilterClient.unsubscribe(remotePeer, $pubsub, content)
let finished = await allFinished(futures)
var unsubRes: FilterSubscribeResult = FilterSubscribeResult.ok()
for fut in finished:
let res = fut.read()
if res.isErr():
error "failed filter unsubscription", error=res.error
waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
unsubRes = FilterSubscribeResult.err(res.error)
for pubsub, topics in topicMap.pairs:
info "unsubscribed from topic", pubsubTopic=pubsub, contentTopics=topics
# return the last error or ok
return unsubRes
proc filterUnsubscribeAll*(node: WakuNode,
peer: RemotePeerInfo|string):
Future[FilterSubscribeResult]
{.async, gcsafe, raises: [Defect, ValueError].} =
## Unsubscribe from a content filter V2".
if node.wakuFilterClientLegacy.isNil():
error "cannot unregister filter subscription to content", error="waku filter client is nil"
return err(FilterSubscribeError.serviceUnavailable())
let remotePeerRes = parsePeerInfo(peer)
if remotePeerRes.isErr():
error "couldn't parse remotePeerInfo", error = remotePeerRes.error
return err(FilterSubscribeError.serviceUnavailable("No peers available"))
let remotePeer = remotePeerRes.value
info "deregistering all filter subscription to content", peer=remotePeer.peerId
let unsubRes = await node.wakuFilterClient.unsubscribeAll(remotePeer)
if unsubRes.isOk():
info "unsubscribed from all content-topic", peerId=remotePeer.peerId
else:
error "failed filter unsubscription from all content-topic", error=unsubRes.error
waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
return unsubRes
# NOTICE: subscribe / unsubscribe methods are removed - they were already depricated
# yet incompatible to handle both type of filters - use specific filter registration instead
## Waku archive ## Waku archive
const WakuArchiveDefaultRetentionPolicyInterval* = 30.minutes const WakuArchiveDefaultRetentionPolicyInterval* = 30.minutes

View File

@ -17,8 +17,8 @@ import
libp2p/multicodec, libp2p/multicodec,
libp2p/peerid, libp2p/peerid,
libp2p/peerinfo, libp2p/peerinfo,
libp2p/routing_record libp2p/routing_record,
json_serialization
type type
Connectedness* = enum Connectedness* = enum
@ -62,6 +62,9 @@ type RemotePeerInfo* = ref object
func `$`*(remotePeerInfo: RemotePeerInfo): string = func `$`*(remotePeerInfo: RemotePeerInfo): string =
$remotePeerInfo.peerId $remotePeerInfo.peerId
proc writeValue*(w: var JsonWriter, value: RemotePeerInfo) {.inline, raises: [IOError].} =
w.writeValue $value
proc init*( proc init*(
T: typedesc[RemotePeerInfo], T: typedesc[RemotePeerInfo],
peerId: PeerID, peerId: PeerID,

View File

@ -20,7 +20,6 @@ import
./protocol, ./protocol,
./protocol_metrics ./protocol_metrics
logScope: logScope:
topics = "waku filter client" topics = "waku filter client"
@ -71,7 +70,7 @@ proc initProtocolHandler(wf: WakuFilterClientLegacy) =
wf.handleMessagePush(peerId, requestId, push) wf.handleMessagePush(peerId, requestId, push)
wf.handler = handle wf.handler = handle
wf.codec = WakuFilterCodec wf.codec = WakuLegacyFilterCodec
proc new*(T: type WakuFilterClientLegacy, proc new*(T: type WakuFilterClientLegacy,
peerManager: PeerManager, peerManager: PeerManager,
@ -87,7 +86,7 @@ proc new*(T: type WakuFilterClientLegacy,
proc sendFilterRpc(wf: WakuFilterClientLegacy, rpc: FilterRPC, peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}= proc sendFilterRpc(wf: WakuFilterClientLegacy, rpc: FilterRPC, peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}=
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) let connOpt = await wf.peerManager.dialPeer(peer, WakuLegacyFilterCodec)
if connOpt.isNone(): if connOpt.isNone():
return err(dialFailure) return err(dialFailure)
let connection = connOpt.get() let connection = connOpt.get()
@ -155,6 +154,8 @@ proc unsubscribe*(wf: WakuFilterClientLegacy,
if sendRes.isErr(): if sendRes.isErr():
return err(sendRes.error) return err(sendRes.error)
# FIXME: I see an issue here that such solution prevents filtering client to properly manage its
# subscriptions on different peers and get notified correctly!
for topic in topics: for topic in topics:
wf.subManager.removeSubscription(pubsubTopic, topic) wf.subManager.removeSubscription(pubsubTopic, topic)

View File

@ -20,7 +20,7 @@ logScope:
const const
WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1" WakuLegacyFilterCodec* = "/vac/waku/filter/2.0.0-beta1"
WakuFilterTimeout: Duration = 2.hours WakuFilterTimeout: Duration = 2.hours
@ -60,8 +60,6 @@ proc removeSubscription(subscriptions: var seq[Subscription], peer: PeerId, unsu
## Protocol ## Protocol
type type
MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.}
WakuFilterLegacy* = ref object of LPProtocol WakuFilterLegacy* = ref object of LPProtocol
rng*: ref rand.HmacDrbgContext rng*: ref rand.HmacDrbgContext
peerManager*: PeerManager peerManager*: PeerManager
@ -110,7 +108,7 @@ proc initProtocolHandler(wf: WakuFilterLegacy) =
wf.handleFilterRequest(conn.peerId, rpc) wf.handleFilterRequest(conn.peerId, rpc)
wf.handler = handler wf.handler = handler
wf.codec = WakuFilterCodec wf.codec = WakuLegacyFilterCodec
proc new*(T: type WakuFilterLegacy, proc new*(T: type WakuFilterLegacy,
peerManager: PeerManager, peerManager: PeerManager,
@ -131,7 +129,7 @@ proc init*(T: type WakuFilterLegacy,
proc sendFilterRpc(wf: WakuFilterLegacy, rpc: FilterRPC, peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}= proc sendFilterRpc(wf: WakuFilterLegacy, rpc: FilterRPC, peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}=
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) let connOpt = await wf.peerManager.dialPeer(peer, WakuLegacyFilterCodec)
if connOpt.isNone(): if connOpt.isNone():
return err(dialFailure) return err(dialFailure)
let connection = connOpt.get() let connection = connOpt.get()

View File

@ -23,19 +23,21 @@ logScope:
topics = "waku filter client" topics = "waku filter client"
type type
MessagePushHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.}
WakuFilterClient* = ref object of LPProtocol WakuFilterClient* = ref object of LPProtocol
rng: ref HmacDrbgContext rng: ref HmacDrbgContext
messagePushHandler: MessagePushHandler
peerManager: PeerManager peerManager: PeerManager
pushHandlers: seq[FilterPushHandler]
func generateRequestId(rng: ref HmacDrbgContext): string = func generateRequestId(rng: ref HmacDrbgContext): string =
var bytes: array[10, byte] var bytes: array[10, byte]
hmacDrbgGenerate(rng[], bytes) hmacDrbgGenerate(rng[], bytes)
return toHex(bytes) return toHex(bytes)
proc sendSubscribeRequest(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, filterSubscribeRequest: FilterSubscribeRequest): Future[FilterSubscribeResult] {.async.} = proc sendSubscribeRequest(wfc: WakuFilterClient, servicePeer: RemotePeerInfo,
trace "Sending filter subscribe request", servicePeer, filterSubscribeRequest filterSubscribeRequest: FilterSubscribeRequest):
Future[FilterSubscribeResult]
{.async.} =
trace "Sending filter subscribe request", peerId=servicePeer.peerId, filterSubscribeRequest
let connOpt = await wfc.peerManager.dialPeer(servicePeer, WakuFilterSubscribeCodec) let connOpt = await wfc.peerManager.dialPeer(servicePeer, WakuFilterSubscribeCodec)
if connOpt.isNone(): if connOpt.isNone():
@ -77,7 +79,13 @@ proc ping*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo): Future[FilterSub
return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
proc subscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]): Future[FilterSubscribeResult] {.async.} = proc subscribe*(wfc: WakuFilterClient,
servicePeer: RemotePeerInfo,
pubsubTopic: PubsubTopic,
contentTopics: seq[ContentTopic]):
Future[FilterSubscribeResult]
{.async.} =
let requestId = generateRequestId(wfc.rng) let requestId = generateRequestId(wfc.rng)
let filterSubscribeRequest = FilterSubscribeRequest.subscribe( let filterSubscribeRequest = FilterSubscribeRequest.subscribe(
requestId = requestId, requestId = requestId,
@ -87,7 +95,13 @@ proc subscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic:
return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
proc unsubscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]): Future[FilterSubscribeResult] {.async.} = proc unsubscribe*(wfc: WakuFilterClient,
servicePeer: RemotePeerInfo,
pubsubTopic: PubsubTopic,
contentTopics: seq[ContentTopic]):
Future[FilterSubscribeResult]
{.async.} =
let requestId = generateRequestId(wfc.rng) let requestId = generateRequestId(wfc.rng)
let filterSubscribeRequest = FilterSubscribeRequest.unsubscribe( let filterSubscribeRequest = FilterSubscribeRequest.unsubscribe(
requestId = requestId, requestId = requestId,
@ -97,7 +111,10 @@ proc unsubscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopi
return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
proc unsubscribeAll*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo): Future[FilterSubscribeResult] {.async.} = proc unsubscribeAll*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo):
Future[FilterSubscribeResult]
{.async.} =
let requestId = generateRequestId(wfc.rng) let requestId = generateRequestId(wfc.rng)
let filterSubscribeRequest = FilterSubscribeRequest.unsubscribeAll( let filterSubscribeRequest = FilterSubscribeRequest.unsubscribeAll(
requestId = requestId requestId = requestId
@ -105,6 +122,9 @@ proc unsubscribeAll*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo): Future
return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
proc registerPushHandler*(wfc: WakuFilterClient, handler: FilterPushHandler) =
wfc.pushHandlers.add(handler)
proc initProtocolHandler(wfc: WakuFilterClient) = proc initProtocolHandler(wfc: WakuFilterClient) =
proc handler(conn: Connection, proto: string) {.async.} = proc handler(conn: Connection, proto: string) {.async.} =
@ -119,7 +139,9 @@ proc initProtocolHandler(wfc: WakuFilterClient) =
let messagePush = decodeRes.value #TODO: toAPI() split here let messagePush = decodeRes.value #TODO: toAPI() split here
trace "Received message push", peerId=conn.peerId, messagePush trace "Received message push", peerId=conn.peerId, messagePush
wfc.messagePushHandler(messagePush.pubsubTopic, messagePush.wakuMessage) for handler in wfc.pushHandlers:
asyncSpawn handler(messagePush.pubsubTopic,
messagePush.wakuMessage)
# Protocol specifies no response for now # Protocol specifies no response for now
return return
@ -128,14 +150,14 @@ proc initProtocolHandler(wfc: WakuFilterClient) =
wfc.codec = WakuFilterPushCodec wfc.codec = WakuFilterPushCodec
proc new*(T: type WakuFilterClient, proc new*(T: type WakuFilterClient,
rng: ref HmacDrbgContext, peerManager: PeerManager,
messagePushHandler: MessagePushHandler, rng: ref HmacDrbgContext
peerManager: PeerManager): T = ): T =
let wfc = WakuFilterClient( let wfc = WakuFilterClient(
rng: rng, rng: rng,
messagePushHandler: messagePushHandler, peerManager: peerManager,
peerManager: peerManager pushHandlers: @[]
) )
wfc.initProtocolHandler() wfc.initProtocolHandler()
wfc wfc

View File

@ -4,6 +4,7 @@ else:
{.push raises: [].} {.push raises: [].}
import import
json_serialization,
std/options std/options
import import
../waku_core ../waku_core
@ -70,3 +71,29 @@ proc ok*(T: type FilterSubscribeResponse, requestId: string, desc = "OK"): T =
statusCode: 200, statusCode: 200,
statusDesc: some(desc) statusDesc: some(desc)
) )
proc `$`*(err: FilterSubscribeResponse): string =
"FilterSubscribeResponse of req:" & err.requestId & " [" & $err.statusCode & "] " & $err.statusDesc
proc `$`*(req: FilterSubscribeRequest): string =
"FilterSubscribeRequest of req:" & req.requestId & " [" & $req.filterSubscribeType & "] " & $req.pubsubTopic
proc `$`*(t: FilterSubscribeType): string =
result = case t:
of SUBSCRIBER_PING: "SUBSCRIBER_PING"
of SUBSCRIBE: "SUBSCRIBE"
of UNSUBSCRIBE: "UNSUBSCRIBE"
of UNSUBSCRIBE_ALL: "UNSUBSCRIBE_ALL"
proc writeValue*(writer: var JsonWriter, value: FilterSubscribeRequest) {.inline, raises: [IOError].} =
writer.beginRecord()
writer.writeField("requestId", value.requestId)
writer.writeField("type", value.filterSubscribeType)
if value.pubsubTopic.isSome:
writer.writeField("pubsubTopic", value.pubsubTopic)
if value.contentTopics.len>0:
writer.writeField("contentTopics", value.contentTopics)
writer.endRecord()