From fa4d873ee3c9257e9033217cf0e572835baf1a63 Mon Sep 17 00:00:00 2001 From: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com> Date: Mon, 7 Dec 2020 15:26:58 +0200 Subject: [PATCH] Limit cache size on Filter API and Relay API (#317) Minor improvements to Filter API and Relay API --- tests/v2/test_jsonrpc_waku.nim | 22 +++++++++++++++++++--- waku/v2/node/jsonrpc/filter_api.nim | 25 ++++++++++++++++++++----- waku/v2/node/jsonrpc/relay_api.nim | 27 +++++++++++++++++++++------ 3 files changed, 60 insertions(+), 14 deletions(-) diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index 76c2b5c51..c6b23a46c 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -334,12 +334,28 @@ procSuite "Waku v2 JSON-RPC API": response.allIt(it.contentTopic == cTopic) # No new messages - response = await client.get_waku_v2_filter_v1_messages(cTopic) check: response.len() == 0 - + + # Now ensure that no more than the preset max messages can be cached + + let maxSize = filter_api.maxCache + + for x in 1..(maxSize + 1): + # Try to cache 1 more than maximum allowed + filters.notify(WakuMessage(payload: @[byte x], contentTopic: cTopic), requestId) + + response = await client.get_waku_v2_filter_v1_messages(cTopic) + check: + # Max messages has not been exceeded + response.len == maxSize + response.allIt(it.contentTopic == cTopic) + # Check that oldest item has been removed + response[0].payload == @[byte 2] + response[maxSize - 1].payload == @[byte (maxSize + 1)] + server.stop() server.close() - waitfor node.stop() \ No newline at end of file + waitfor node.stop() diff --git a/waku/v2/node/jsonrpc/filter_api.nim b/waku/v2/node/jsonrpc/filter_api.nim index 90ba0db2b..65b36269d 100644 --- a/waku/v2/node/jsonrpc/filter_api.nim +++ b/waku/v2/node/jsonrpc/filter_api.nim @@ -7,19 +7,34 @@ import ../../waku_types, ../wakunode2 -const futTimeout = 5.seconds +const futTimeout* = 5.seconds # Max time to wait for futures +const maxCache* = 100 # Max number of messages cached per topic @TODO make this configurable + +type + MessageCache* = Table[ContentTopic, seq[WakuMessage]] proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = ## Create a message cache indexed on content topic ## @TODO consider moving message cache elsewhere. Perhaps to node? var - messageCache = initTable[ContentTopic, seq[WakuMessage]]() + messageCache: MessageCache proc filterHandler(msg: WakuMessage) {.gcsafe, closure.} = - debug "WakuMessage received", msg=msg # Add message to current cache - # @TODO limit max content topics and messages - messageCache.mgetOrPut(msg.contentTopic, @[]).add(msg) + trace "WakuMessage received", msg=msg + + # Make a copy of msgs for this topic to modify + var msgs = messageCache.getOrDefault(msg.contentTopic, @[]) + + if msgs.len >= maxCache: + # Message cache on this topic exceeds maximum. Delete oldest. + # @TODO this may become a bottle neck if called as the norm rather than exception when adding messages. Performance profile needed. + msgs.delete(0,0) + msgs.add(msg) + + # Replace indexed entry with copy + # @TODO max number of content topics could be limited in node + messageCache[msg.contentTopic] = msgs ## Filter API version 1 definitions diff --git a/waku/v2/node/jsonrpc/relay_api.nim b/waku/v2/node/jsonrpc/relay_api.nim index b346ca5c3..3c04ee178 100644 --- a/waku/v2/node/jsonrpc/relay_api.nim +++ b/waku/v2/node/jsonrpc/relay_api.nim @@ -9,21 +9,36 @@ import ../wakunode2, ./jsonrpc_types, ./jsonrpc_utils -const futTimeout = 5.seconds +const futTimeout* = 5.seconds # Max time to wait for futures +const maxCache* = 100 # Max number of messages cached per topic @TODO make this configurable + +type + TopicCache* = Table[string, seq[WakuMessage]] proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = ## Create a per-topic message cache var - topicCache = initTable[string, seq[WakuMessage]]() + topicCache: TopicCache proc topicHandler(topic: string, data: seq[byte]) {.async.} = - debug "Topic handler triggered" + trace "Topic handler triggered" let msg = WakuMessage.init(data) if msg.isOk(): - debug "WakuMessage received", msg=msg, topic=topic # Add message to current cache - # @TODO limit max topics and messages - topicCache.mgetOrPut(topic, @[]).add(msg[]) + trace "WakuMessage received", msg=msg, topic=topic + + # Make a copy of msgs for this topic to modify + var msgs = topicCache.getOrDefault(topic, @[]) + + if msgs.len >= maxCache: + # Message cache on this topic exceeds maximum. Delete oldest. + # @TODO this may become a bottle neck if called as the norm rather than exception when adding messages. Performance profile needed. + msgs.delete(0,0) + msgs.add(msg[]) + + # Replace indexed entry with copy + # @TODO max number of topics could be limited in node + topicCache[topic] = msgs else: debug "WakuMessage received but failed to decode", msg=msg, topic=topic # @TODO handle message decode failure