Added content topic polling on FilterAPI (#314)

This commit is contained in:
Hanno Cornelius 2020-12-03 18:26:34 +02:00 committed by GitHub
parent 8a1ca1ff8f
commit 57f69201d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 102 additions and 6 deletions

View File

@ -1,5 +1,5 @@
import
std/[unittest, options, sets, tables, os, strutils],
std/[unittest, options, sets, tables, os, strutils, sequtils],
stew/shims/net as stewNet,
json_rpc/[rpcserver, rpcclient],
libp2p/standard_setup,
@ -13,6 +13,7 @@ import
../../waku/v2/node/wakunode2,
../../waku/v2/node/jsonrpc/[jsonrpc_types,store_api,relay_api,debug_api,filter_api],
../../waku/v2/protocol/message_notifier,
../../waku/v2/protocol/waku_filter,
../../waku/v2/protocol/waku_store/waku_store,
../test_helpers
@ -281,3 +282,64 @@ procSuite "Waku v2 JSON-RPC API":
server.stop()
server.close()
waitfor node.stop()
asyncTest "Filter API: get latest messages":
const cTopic = ContentTopic(1)
waitFor node.start()
# RPC server setup
let
rpcPort = Port(8545)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
installFilterApiHandlers(node, server)
server.start()
node.mountFilter()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort)
# First ensure subscription exists
let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(topics: @[cTopic])], topic = some(defaultTopic))
check:
sub
# Now prime the node with some messages before tests
var
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic(2)),
WakuMessage(payload: @[byte 1], contentTopic: cTopic),
WakuMessage(payload: @[byte 2], contentTopic: cTopic),
WakuMessage(payload: @[byte 3], contentTopic: cTopic),
WakuMessage(payload: @[byte 4], contentTopic: cTopic),
WakuMessage(payload: @[byte 5], contentTopic: cTopic),
WakuMessage(payload: @[byte 6], contentTopic: cTopic),
WakuMessage(payload: @[byte 7], contentTopic: cTopic),
WakuMessage(payload: @[byte 8], contentTopic: cTopic),
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic(2))]
let
filters = node.filters
requestId = toSeq(Table(filters).keys)[0]
for wakuMsg in msgList:
filters.notify(wakuMsg, requestId)
var response = await client.get_waku_v2_filter_v1_messages(cTopic)
check:
response.len() == 8
response.allIt(it.contentTopic == cTopic)
# No new messages
response = await client.get_waku_v2_filter_v1_messages(cTopic)
check:
response.len() == 0
server.stop()
server.close()
waitfor node.stop()

View File

@ -1,30 +1,58 @@
{.push raises: [Exception, Defect].}
import
std/[tables,sequtils],
json_rpc/rpcserver,
eth/[common, rlp, keys, p2p],
../../waku_types,
../wakunode2
const futTimeout = 5.seconds
proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
const futTimeout = 5.seconds
## Create a message cache indexed on content topic
## @TODO consider moving message cache elsewhere. Perhaps to node?
var
messageCache = initTable[ContentTopic, seq[WakuMessage]]()
proc filterHandler(msg: WakuMessage) {.gcsafe, closure.} =
debug "WakuMessage received", msg=msg
# Add message to current cache
# @TODO limit max content topics and messages
messageCache.mgetOrPut(msg.contentTopic, @[]).add(msg)
## Filter API version 1 definitions
rpcsrv.rpc("get_waku_v2_filter_v1_messages") do(contentTopic: ContentTopic) -> seq[WakuMessage]:
## Returns all WakuMessages received on a content topic since the
## last time this method was called
## @TODO ability to specify a return message limit
debug "get_waku_v2_filter_v1_messages", contentTopic=contentTopic
if messageCache.hasKey(contentTopic):
let msgs = messageCache[contentTopic]
# Clear cache before next call
messageCache[contentTopic] = @[]
return msgs
else:
# Not subscribed to this content topic
raise newException(ValueError, "Not subscribed to content topic: " & $contentTopic)
rpcsrv.rpc("post_waku_v2_filter_v1_subscription") do(contentFilters: seq[ContentFilter], topic: Option[string]) -> bool:
## Subscribes a node to a list of content filters
debug "post_waku_v2_filter_v1_subscription"
proc filterHandler(msg: WakuMessage) {.gcsafe, closure.} =
debug "WakuMessage received", msg=msg, topic=topic
# @TODO handle message
# Construct a filter request
# @TODO use default PubSub topic if undefined
let fReq = if topic.isSome: FilterRequest(topic: topic.get, contentFilters: contentFilters, subscribe: true) else: FilterRequest(contentFilters: contentFilters, subscribe: true)
if (await node.subscribe(fReq, filterHandler).withTimeout(futTimeout)):
# Successfully subscribed to all content filters
for cTopic in concat(contentFilters.mapIt(it.topics)):
# Create message cache for each subscribed content topic
messageCache[cTopic] = @[]
return true
else:
# Failed to subscribe to one or more content filters
@ -40,6 +68,11 @@ proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
if (await node.unsubscribe(fReq).withTimeout(futTimeout)):
# Successfully unsubscribed from all content filters
for cTopic in concat(contentFilters.mapIt(it.topics)):
# Remove message cache for each unsubscribed content topic
messageCache.del(cTopic)
return true
else:
# Failed to unsubscribe from one or more content filters

View File

@ -15,5 +15,6 @@ proc get_waku_v2_store_v1_messages(topics: seq[ContentTopic], pagingOptions: Opt
# Filter API
proc get_waku_v2_filter_v1_messages(contentTopic: ContentTopic): seq[WakuMessage]
proc post_waku_v2_filter_v1_subscription(contentFilters: seq[ContentFilter], topic: Option[string]): bool
proc delete_waku_v2_filter_v1_subscription(contentFilters: seq[ContentFilter], topic: Option[string]): bool