deploy: 798c20454fa7740be543576caea63b2a3e7b0792

This commit is contained in:
LNSD 2022-08-30 14:32:22 +00:00
parent 6cc2335a48
commit b1c57536ac
6 changed files with 116 additions and 107 deletions

View File

@ -9,6 +9,7 @@ import
./v2/test_waku_payload,
./v2/test_waku_swap,
./v2/test_utils_pagination,
./v2/test_message_cache,
./v2/test_message_store_queue,
./v2/test_message_store_queue_pagination,
./v2/test_message_store_sqlite_query,
@ -18,7 +19,6 @@ import
./v2/test_rest_debug_api_serdes,
./v2/test_rest_debug_api,
./v2/test_rest_relay_api_serdes,
./v2/test_rest_relay_api_topic_cache,
./v2/test_rest_relay_api,
./v2/test_peer_manager,
./v2/test_web3, # TODO remove it when rln-relay tests get finalized

View File

@ -1,17 +1,12 @@
{.used.}
import
std/tables,
stew/byteutils,
stew/shims/net,
chronicles,
stew/[results, byteutils],
testutils/unittests,
presto,
libp2p/crypto/crypto,
libp2p/protocols/pubsub/pubsub
chronicles
import
../../waku/v2/protocol/waku_message,
../../waku/v2/node/rest/relay/topic_cache
../../waku/v2/node/message_cache
proc fakeWakuMessage(payload = toBytes("TEST"), contentTopic = "test"): WakuMessage =
@ -22,12 +17,16 @@ proc fakeWakuMessage(payload = toBytes("TEST"), contentTopic = "test"): WakuMess
timestamp: 2022
)
type PubsubTopicString = string
suite "TopicCache":
type TestMessageCache = MessageCache[(PubsubTopicString, ContentTopic)]
suite "MessageCache":
test "subscribe to topic":
## Given
let testTopic = "test-pubsub-topic"
let cache = TopicCache.init()
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let cache = TestMessageCache.init()
## When
cache.subscribe(testTopic)
@ -39,8 +38,8 @@ suite "TopicCache":
test "unsubscribe from topic":
## Given
let testTopic = "test-pubsub-topic"
let cache = TopicCache.init()
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let cache = TestMessageCache.init()
# Init cache content
cache.subscribe(testTopic)
@ -55,9 +54,9 @@ suite "TopicCache":
test "get messages of a subscribed topic":
## Given
let testTopic = "test-pubsub-topic"
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TopicCache.init()
let cache = TestMessageCache.init()
# Init cache content
cache.subscribe(testTopic)
@ -74,9 +73,9 @@ suite "TopicCache":
test "get messages with clean flag shoud clear the messages cache":
## Given
let testTopic = "test-pubsub-topic"
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TopicCache.init()
let cache = TestMessageCache.init()
# Init cache content
cache.subscribe(testTopic)
@ -96,8 +95,8 @@ suite "TopicCache":
test "get messages of a non-subscribed topic":
## Given
let testTopic = "test-pubsub-topic"
let cache = TopicCache.init()
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let cache = TestMessageCache.init()
## When
let res = cache.getMessages(testTopic)
@ -110,9 +109,9 @@ suite "TopicCache":
test "add messages to subscribed topic":
## Given
let testTopic = "test-pubsub-topic"
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TopicCache.init()
let cache = TestMessageCache.init()
cache.subscribe(testTopic)
@ -127,9 +126,9 @@ suite "TopicCache":
test "add messages to non-subscribed topic":
## Given
let testTopic = "test-pubsub-topic"
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TopicCache.init()
let cache = TestMessageCache.init()
## When
cache.addMessage(testTopic, testMessage)
@ -143,14 +142,14 @@ suite "TopicCache":
test "add messages beyond the capacity":
## Given
let testTopic = "test-pubsub-topic"
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let testMessages = @[
fakeWakuMessage(toBytes("MSG-1")),
fakeWakuMessage(toBytes("MSG-2")),
fakeWakuMessage(toBytes("MSG-3"))
]
let cache = TopicCache.init(conf=TopicCacheConfig(capacity: 2))
let cache = TestMessageCache.init(capacity = 2)
cache.subscribe(testTopic)
## When

View File

@ -290,7 +290,7 @@ type
defaultValue: 8645
name: "rest-port" }: uint16
restRelayCacheCapaciy* {.
restRelayCacheCapacity* {.
desc: "Capacity of the Relay REST API message cache.",
defaultValue: 30
name: "rest-relay-cache-capacity" }: uint32

View File

@ -0,0 +1,76 @@
{.push raises: [Defect].}
import
std/[tables, sequtils],
stew/results,
chronicles,
chronos,
libp2p/protocols/pubsub
import
../protocol/waku_message
logScope: topics = "message_cache"
const DefaultMessageCacheCapacity*: uint = 30 # Max number of messages cached per topic @TODO make this configurable
type MessageCacheResult*[T] = Result[T, cstring]
type MessageCache*[K] = ref object
capacity: uint
table: Table[K, seq[WakuMessage]]
func init*[K](T: type MessageCache[K], capacity=DefaultMessageCacheCapacity): T =
MessageCache[K](
capacity: capacity,
table: initTable[K, seq[WakuMessage]]()
)
proc isSubscribed*[K](t: MessageCache[K], topic: K): bool =
t.table.hasKey(topic)
proc subscribe*[K](t: MessageCache[K], topic: K) =
if t.isSubscribed(topic):
return
t.table[topic] = @[]
proc unsubscribe*[K](t: MessageCache[K], topic: K) =
if not t.isSubscribed(topic):
return
t.table.del(topic)
proc addMessage*[K](t: MessageCache, topic: K, msg: WakuMessage) =
if not t.isSubscribed(topic):
return
# Make a copy of msgs for this topic to modify
var messages = t.table.getOrDefault(topic, @[])
if messages.len >= t.capacity.int:
debug "Topic cache capacity reached", topic=topic
# Message cache on this topic exceeds maximum. Delete oldest.
# TODO: this may become a bottle neck if called as the norm rather than
# exception when adding messages. Performance profile needed.
messages.delete(0,0)
messages.add(msg)
# Replace indexed entry with copy
t.table[topic] = messages
proc clearMessages*[K](t: MessageCache[K], topic: K) =
if not t.isSubscribed(topic):
return
t.table[topic] = @[]
proc getMessages*[K](t: MessageCache[K], topic: K, clear=false): MessageCacheResult[seq[WakuMessage]] =
if not t.isSubscribed(topic):
return err("Not subscribed to topic")
let messages = t.table.getOrDefault(topic, @[])
if clear:
t.clearMessages(topic)
ok(messages)

View File

@ -1,98 +1,35 @@
{.push raises: [Defect].}
import
std/[tables, sequtils],
stew/results,
chronicles,
chronos,
libp2p/protocols/pubsub
import
../../../protocol/waku_message
../../../protocol/waku_message,
../../message_cache
logScope: topics = "rest_api_relay_topiccache"
const DEFAULT_TOPICCACHE_CAPACITY* = 30 # Max number of messages cached per topic @TODO make this configurable
export message_cache
##### TopicCache
type PubSubTopicString = string
type TopicCacheResult*[T] = Result[T, cstring]
type TopicCacheResult*[T] = MessageCacheResult[T]
type TopicCache* = MessageCache[PubSubTopicString]
##### Message handler
type TopicCacheMessageHandler* = Topichandler
type TopicCacheConfig* = object
capacity*: int
proc default*(T: type TopicCacheConfig): T =
TopicCacheConfig(
capacity: DEFAULT_TOPICCACHE_CAPACITY
)
type TopicCache* = ref object
conf: TopicCacheConfig
table: Table[PubSubTopicString, seq[WakuMessage]]
func init*(T: type TopicCache, conf=TopicCacheConfig.default()): T =
TopicCache(
conf: conf,
table: initTable[PubSubTopicString, seq[WakuMessage]]()
)
proc isSubscribed*(t: TopicCache, topic: PubSubTopicString): bool =
t.table.hasKey(topic)
proc subscribe*(t: TopicCache, topic: PubSubTopicString) =
if t.isSubscribed(topic):
return
t.table[topic] = @[]
proc unsubscribe*(t: TopicCache, topic: PubSubTopicString) =
if not t.isSubscribed(topic):
return
t.table.del(topic)
proc addMessage*(t: TopicCache, topic: PubSubTopicString, msg: WakuMessage) =
if not t.isSubscribed(topic):
return
# Make a copy of msgs for this topic to modify
var messages = t.table.getOrDefault(topic, @[])
if messages.len >= t.conf.capacity:
debug "Topic cache capacity reached", topic=topic
# Message cache on this topic exceeds maximum. Delete oldest.
# TODO: this may become a bottle neck if called as the norm rather than
# exception when adding messages. Performance profile needed.
messages.delete(0,0)
messages.add(msg)
# Replace indexed entry with copy
t.table[topic] = messages
proc clearMessages*(t: TopicCache, topic: PubSubTopicString) =
if not t.isSubscribed(topic):
return
t.table[topic] = @[]
proc getMessages*(t: TopicCache, topic: PubSubTopicString, clear=false): TopicCacheResult[seq[WakuMessage]] =
if not t.isSubscribed(topic):
return err("Not subscribed to topic")
let messages = t.table.getOrDefault(topic, @[])
if clear:
t.clearMessages(topic)
ok(messages)
proc messageHandler*(cache: TopicCache): TopicCacheMessageHandler =
proc handler(topic: string, data: seq[byte]): Future[void] {.async, raises: [Defect].} =
let handler = proc(topic: string, data: seq[byte]): Future[void] {.async, closure.} =
trace "Topic handler triggered", topic=topic
# Add message to current cache

View File

@ -9,8 +9,7 @@ import
./wakunode2,
./rest/server,
./rest/debug/debug_api,
./rest/relay/[relay_api,
topic_cache]
./rest/relay/[relay_api, topic_cache]
logScope:
@ -30,9 +29,7 @@ proc startRestServer*(node: WakuNode, address: ValidIpAddress, port: Port, conf:
## Relay REST API
if conf.relay:
# TODO: Simplify topic cache object initialization
let relayCacheConfig = TopicCacheConfig(capacity: int(conf.restRelayCacheCapaciy))
let relayCache = TopicCache.init(conf=relayCacheConfig)
let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity)
installRelayApiHandlers(server.router, node, relayCache)
server.start()