mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-03 22:43:09 +00:00
feat(message-cache): make the topic cache generic
This commit is contained in:
parent
791ce6d222
commit
ecf4ba1167
@ -9,6 +9,7 @@ import
|
||||
./v2/test_waku_payload,
|
||||
./v2/test_waku_swap,
|
||||
./v2/test_utils_pagination,
|
||||
./v2/test_message_cache,
|
||||
./v2/test_message_store_queue,
|
||||
./v2/test_message_store_queue_pagination,
|
||||
./v2/test_message_store_sqlite_query,
|
||||
@ -18,7 +19,6 @@ import
|
||||
./v2/test_rest_debug_api_serdes,
|
||||
./v2/test_rest_debug_api,
|
||||
./v2/test_rest_relay_api_serdes,
|
||||
./v2/test_rest_relay_api_topic_cache,
|
||||
./v2/test_rest_relay_api,
|
||||
./v2/test_peer_manager,
|
||||
./v2/test_web3, # TODO remove it when rln-relay tests get finalized
|
||||
|
||||
@ -1,17 +1,12 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/tables,
|
||||
stew/byteutils,
|
||||
stew/shims/net,
|
||||
chronicles,
|
||||
stew/[results, byteutils],
|
||||
testutils/unittests,
|
||||
presto,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/protocols/pubsub/pubsub
|
||||
chronicles
|
||||
import
|
||||
../../waku/v2/protocol/waku_message,
|
||||
../../waku/v2/node/rest/relay/topic_cache
|
||||
../../waku/v2/node/message_cache
|
||||
|
||||
|
||||
proc fakeWakuMessage(payload = toBytes("TEST"), contentTopic = "test"): WakuMessage =
|
||||
@ -22,12 +17,16 @@ proc fakeWakuMessage(payload = toBytes("TEST"), contentTopic = "test"): WakuMess
|
||||
timestamp: 2022
|
||||
)
|
||||
|
||||
type PubsubTopicString = string
|
||||
|
||||
suite "TopicCache":
|
||||
type TestMessageCache = MessageCache[(PubsubTopicString, ContentTopic)]
|
||||
|
||||
|
||||
suite "MessageCache":
|
||||
test "subscribe to topic":
|
||||
## Given
|
||||
let testTopic = "test-pubsub-topic"
|
||||
let cache = TopicCache.init()
|
||||
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
|
||||
let cache = TestMessageCache.init()
|
||||
|
||||
## When
|
||||
cache.subscribe(testTopic)
|
||||
@ -39,8 +38,8 @@ suite "TopicCache":
|
||||
|
||||
test "unsubscribe from topic":
|
||||
## Given
|
||||
let testTopic = "test-pubsub-topic"
|
||||
let cache = TopicCache.init()
|
||||
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
|
||||
let cache = TestMessageCache.init()
|
||||
|
||||
# Init cache content
|
||||
cache.subscribe(testTopic)
|
||||
@ -55,9 +54,9 @@ suite "TopicCache":
|
||||
|
||||
test "get messages of a subscribed topic":
|
||||
## Given
|
||||
let testTopic = "test-pubsub-topic"
|
||||
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
|
||||
let testMessage = fakeWakuMessage()
|
||||
let cache = TopicCache.init()
|
||||
let cache = TestMessageCache.init()
|
||||
|
||||
# Init cache content
|
||||
cache.subscribe(testTopic)
|
||||
@ -74,9 +73,9 @@ suite "TopicCache":
|
||||
|
||||
test "get messages with clean flag shoud clear the messages cache":
|
||||
## Given
|
||||
let testTopic = "test-pubsub-topic"
|
||||
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
|
||||
let testMessage = fakeWakuMessage()
|
||||
let cache = TopicCache.init()
|
||||
let cache = TestMessageCache.init()
|
||||
|
||||
# Init cache content
|
||||
cache.subscribe(testTopic)
|
||||
@ -96,8 +95,8 @@ suite "TopicCache":
|
||||
|
||||
test "get messages of a non-subscribed topic":
|
||||
## Given
|
||||
let testTopic = "test-pubsub-topic"
|
||||
let cache = TopicCache.init()
|
||||
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
|
||||
let cache = TestMessageCache.init()
|
||||
|
||||
## When
|
||||
let res = cache.getMessages(testTopic)
|
||||
@ -110,9 +109,9 @@ suite "TopicCache":
|
||||
|
||||
test "add messages to subscribed topic":
|
||||
## Given
|
||||
let testTopic = "test-pubsub-topic"
|
||||
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
|
||||
let testMessage = fakeWakuMessage()
|
||||
let cache = TopicCache.init()
|
||||
let cache = TestMessageCache.init()
|
||||
|
||||
cache.subscribe(testTopic)
|
||||
|
||||
@ -127,9 +126,9 @@ suite "TopicCache":
|
||||
|
||||
test "add messages to non-subscribed topic":
|
||||
## Given
|
||||
let testTopic = "test-pubsub-topic"
|
||||
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
|
||||
let testMessage = fakeWakuMessage()
|
||||
let cache = TopicCache.init()
|
||||
let cache = TestMessageCache.init()
|
||||
|
||||
## When
|
||||
cache.addMessage(testTopic, testMessage)
|
||||
@ -143,14 +142,14 @@ suite "TopicCache":
|
||||
|
||||
test "add messages beyond the capacity":
|
||||
## Given
|
||||
let testTopic = "test-pubsub-topic"
|
||||
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
|
||||
let testMessages = @[
|
||||
fakeWakuMessage(toBytes("MSG-1")),
|
||||
fakeWakuMessage(toBytes("MSG-2")),
|
||||
fakeWakuMessage(toBytes("MSG-3"))
|
||||
]
|
||||
|
||||
let cache = TopicCache.init(conf=TopicCacheConfig(capacity: 2))
|
||||
let cache = TestMessageCache.init(capacity = 2)
|
||||
cache.subscribe(testTopic)
|
||||
|
||||
## When
|
||||
@ -290,7 +290,7 @@ type
|
||||
defaultValue: 8645
|
||||
name: "rest-port" }: uint16
|
||||
|
||||
restRelayCacheCapaciy* {.
|
||||
restRelayCacheCapacity* {.
|
||||
desc: "Capacity of the Relay REST API message cache.",
|
||||
defaultValue: 30
|
||||
name: "rest-relay-cache-capacity" }: uint32
|
||||
|
||||
76
waku/v2/node/message_cache.nim
Normal file
76
waku/v2/node/message_cache.nim
Normal file
@ -0,0 +1,76 @@
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
std/[tables, sequtils],
|
||||
stew/results,
|
||||
chronicles,
|
||||
chronos,
|
||||
libp2p/protocols/pubsub
|
||||
import
|
||||
../protocol/waku_message
|
||||
|
||||
logScope: topics = "message_cache"
|
||||
|
||||
const DefaultMessageCacheCapacity*: uint = 30 # Max number of messages cached per topic @TODO make this configurable
|
||||
|
||||
|
||||
type MessageCacheResult*[T] = Result[T, cstring]
|
||||
|
||||
type MessageCache*[K] = ref object
|
||||
capacity: uint
|
||||
table: Table[K, seq[WakuMessage]]
|
||||
|
||||
func init*[K](T: type MessageCache[K], capacity=DefaultMessageCacheCapacity): T =
|
||||
MessageCache[K](
|
||||
capacity: capacity,
|
||||
table: initTable[K, seq[WakuMessage]]()
|
||||
)
|
||||
|
||||
|
||||
proc isSubscribed*[K](t: MessageCache[K], topic: K): bool =
|
||||
t.table.hasKey(topic)
|
||||
|
||||
proc subscribe*[K](t: MessageCache[K], topic: K) =
|
||||
if t.isSubscribed(topic):
|
||||
return
|
||||
t.table[topic] = @[]
|
||||
|
||||
proc unsubscribe*[K](t: MessageCache[K], topic: K) =
|
||||
if not t.isSubscribed(topic):
|
||||
return
|
||||
t.table.del(topic)
|
||||
|
||||
|
||||
proc addMessage*[K](t: MessageCache, topic: K, msg: WakuMessage) =
|
||||
if not t.isSubscribed(topic):
|
||||
return
|
||||
|
||||
# Make a copy of msgs for this topic to modify
|
||||
var messages = t.table.getOrDefault(topic, @[])
|
||||
|
||||
if messages.len >= t.capacity.int:
|
||||
debug "Topic cache capacity reached", topic=topic
|
||||
# Message cache on this topic exceeds maximum. Delete oldest.
|
||||
# TODO: this may become a bottle neck if called as the norm rather than
|
||||
# exception when adding messages. Performance profile needed.
|
||||
messages.delete(0,0)
|
||||
|
||||
messages.add(msg)
|
||||
|
||||
# Replace indexed entry with copy
|
||||
t.table[topic] = messages
|
||||
|
||||
proc clearMessages*[K](t: MessageCache[K], topic: K) =
|
||||
if not t.isSubscribed(topic):
|
||||
return
|
||||
t.table[topic] = @[]
|
||||
|
||||
proc getMessages*[K](t: MessageCache[K], topic: K, clear=false): MessageCacheResult[seq[WakuMessage]] =
|
||||
if not t.isSubscribed(topic):
|
||||
return err("Not subscribed to topic")
|
||||
|
||||
let messages = t.table.getOrDefault(topic, @[])
|
||||
if clear:
|
||||
t.clearMessages(topic)
|
||||
|
||||
ok(messages)
|
||||
@ -1,98 +1,35 @@
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
std/[tables, sequtils],
|
||||
stew/results,
|
||||
chronicles,
|
||||
chronos,
|
||||
libp2p/protocols/pubsub
|
||||
import
|
||||
../../../protocol/waku_message
|
||||
../../../protocol/waku_message,
|
||||
../../message_cache
|
||||
|
||||
logScope: topics = "rest_api_relay_topiccache"
|
||||
|
||||
const DEFAULT_TOPICCACHE_CAPACITY* = 30 # Max number of messages cached per topic @TODO make this configurable
|
||||
export message_cache
|
||||
|
||||
|
||||
##### TopicCache
|
||||
|
||||
type PubSubTopicString = string
|
||||
|
||||
type TopicCacheResult*[T] = Result[T, cstring]
|
||||
type TopicCacheResult*[T] = MessageCacheResult[T]
|
||||
|
||||
type TopicCache* = MessageCache[PubSubTopicString]
|
||||
|
||||
|
||||
##### Message handler
|
||||
|
||||
type TopicCacheMessageHandler* = Topichandler
|
||||
|
||||
|
||||
type TopicCacheConfig* = object
|
||||
capacity*: int
|
||||
|
||||
proc default*(T: type TopicCacheConfig): T =
|
||||
TopicCacheConfig(
|
||||
capacity: DEFAULT_TOPICCACHE_CAPACITY
|
||||
)
|
||||
|
||||
|
||||
type TopicCache* = ref object
|
||||
conf: TopicCacheConfig
|
||||
table: Table[PubSubTopicString, seq[WakuMessage]]
|
||||
|
||||
func init*(T: type TopicCache, conf=TopicCacheConfig.default()): T =
|
||||
TopicCache(
|
||||
conf: conf,
|
||||
table: initTable[PubSubTopicString, seq[WakuMessage]]()
|
||||
)
|
||||
|
||||
|
||||
proc isSubscribed*(t: TopicCache, topic: PubSubTopicString): bool =
|
||||
t.table.hasKey(topic)
|
||||
|
||||
proc subscribe*(t: TopicCache, topic: PubSubTopicString) =
|
||||
if t.isSubscribed(topic):
|
||||
return
|
||||
t.table[topic] = @[]
|
||||
|
||||
proc unsubscribe*(t: TopicCache, topic: PubSubTopicString) =
|
||||
if not t.isSubscribed(topic):
|
||||
return
|
||||
t.table.del(topic)
|
||||
|
||||
|
||||
proc addMessage*(t: TopicCache, topic: PubSubTopicString, msg: WakuMessage) =
|
||||
if not t.isSubscribed(topic):
|
||||
return
|
||||
|
||||
# Make a copy of msgs for this topic to modify
|
||||
var messages = t.table.getOrDefault(topic, @[])
|
||||
|
||||
if messages.len >= t.conf.capacity:
|
||||
debug "Topic cache capacity reached", topic=topic
|
||||
# Message cache on this topic exceeds maximum. Delete oldest.
|
||||
# TODO: this may become a bottle neck if called as the norm rather than
|
||||
# exception when adding messages. Performance profile needed.
|
||||
messages.delete(0,0)
|
||||
|
||||
messages.add(msg)
|
||||
|
||||
# Replace indexed entry with copy
|
||||
t.table[topic] = messages
|
||||
|
||||
proc clearMessages*(t: TopicCache, topic: PubSubTopicString) =
|
||||
if not t.isSubscribed(topic):
|
||||
return
|
||||
t.table[topic] = @[]
|
||||
|
||||
proc getMessages*(t: TopicCache, topic: PubSubTopicString, clear=false): TopicCacheResult[seq[WakuMessage]] =
|
||||
if not t.isSubscribed(topic):
|
||||
return err("Not subscribed to topic")
|
||||
|
||||
let messages = t.table.getOrDefault(topic, @[])
|
||||
if clear:
|
||||
t.clearMessages(topic)
|
||||
|
||||
ok(messages)
|
||||
|
||||
|
||||
proc messageHandler*(cache: TopicCache): TopicCacheMessageHandler =
|
||||
|
||||
proc handler(topic: string, data: seq[byte]): Future[void] {.async, raises: [Defect].} =
|
||||
let handler = proc(topic: string, data: seq[byte]): Future[void] {.async, closure.} =
|
||||
trace "Topic handler triggered", topic=topic
|
||||
|
||||
# Add message to current cache
|
||||
|
||||
@ -9,8 +9,7 @@ import
|
||||
./wakunode2,
|
||||
./rest/server,
|
||||
./rest/debug/debug_api,
|
||||
./rest/relay/[relay_api,
|
||||
topic_cache]
|
||||
./rest/relay/[relay_api, topic_cache]
|
||||
|
||||
|
||||
logScope:
|
||||
@ -30,9 +29,7 @@ proc startRestServer*(node: WakuNode, address: ValidIpAddress, port: Port, conf:
|
||||
|
||||
## Relay REST API
|
||||
if conf.relay:
|
||||
# TODO: Simplify topic cache object initialization
|
||||
let relayCacheConfig = TopicCacheConfig(capacity: int(conf.restRelayCacheCapaciy))
|
||||
let relayCache = TopicCache.init(conf=relayCacheConfig)
|
||||
let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity)
|
||||
installRelayApiHandlers(server.router, node, relayCache)
|
||||
|
||||
server.start()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user