mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-16 18:06:45 +00:00
83 lines
2.1 KiB
Nim
83 lines
2.1 KiB
Nim
when (NimMajor, NimMinor) < (1, 4):
|
|
{.push raises: [Defect].}
|
|
else:
|
|
{.push raises: [].}
|
|
|
|
import
|
|
std/[tables, sequtils],
|
|
stew/results,
|
|
chronicles,
|
|
chronos,
|
|
libp2p/protocols/pubsub
|
|
import
|
|
../waku_core
|
|
|
|
logScope:
|
|
topics = "waku node message_cache"
|
|
|
|
const DefaultMessageCacheCapacity*: uint = 30 # Max number of messages cached per topic @TODO make this configurable
|
|
|
|
|
|
type MessageCacheResult*[T] = Result[T, cstring]
|
|
|
|
type MessageCache*[K] = ref object
|
|
capacity: uint
|
|
table: Table[K, seq[WakuMessage]]
|
|
|
|
func init*[K](T: type MessageCache[K], capacity=DefaultMessageCacheCapacity): T =
|
|
MessageCache[K](
|
|
capacity: capacity,
|
|
table: initTable[K, seq[WakuMessage]]()
|
|
)
|
|
|
|
|
|
proc isSubscribed*[K](t: MessageCache[K], topic: K): bool =
|
|
t.table.hasKey(topic)
|
|
|
|
proc subscribe*[K](t: MessageCache[K], topic: K) =
|
|
if t.isSubscribed(topic):
|
|
return
|
|
t.table[topic] = @[]
|
|
|
|
proc unsubscribe*[K](t: MessageCache[K], topic: K) =
|
|
if not t.isSubscribed(topic):
|
|
return
|
|
t.table.del(topic)
|
|
|
|
proc unsubscribeAll*[K](t: MessageCache[K]) =
|
|
t.table.clear()
|
|
|
|
proc addMessage*[K](t: MessageCache, topic: K, msg: WakuMessage) =
|
|
if not t.isSubscribed(topic):
|
|
return
|
|
|
|
# Make a copy of msgs for this topic to modify
|
|
var messages = t.table.getOrDefault(topic, @[])
|
|
|
|
if messages.len >= t.capacity.int:
|
|
trace "Topic cache capacity reached", topic=topic
|
|
# Message cache on this topic exceeds maximum. Delete oldest.
|
|
# TODO: this may become a bottle neck if called as the norm rather than
|
|
# exception when adding messages. Performance profile needed.
|
|
messages.delete(0,0)
|
|
|
|
messages.add(msg)
|
|
|
|
# Replace indexed entry with copy
|
|
t.table[topic] = messages
|
|
|
|
proc clearMessages*[K](t: MessageCache[K], topic: K) =
|
|
if not t.isSubscribed(topic):
|
|
return
|
|
t.table[topic] = @[]
|
|
|
|
proc getMessages*[K](t: MessageCache[K], topic: K, clear=false): MessageCacheResult[seq[WakuMessage]] =
|
|
if not t.isSubscribed(topic):
|
|
return err("Not subscribed to topic")
|
|
|
|
let messages = t.table.getOrDefault(topic, @[])
|
|
if clear:
|
|
t.clearMessages(topic)
|
|
|
|
ok(messages)
|