mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-26 06:41:20 +00:00
dac072f843
Filter v2 REST API support implemented. Filter REST API documentation updated with v1 and v2 interface support. Separated legacy filter REST interface. Fix code and tests of v2 Filter REST API. Filter v2 message push test added. Applied autoshard to Filter V2. Redesigned FilterPushHandling, code style, catch up apps and tests with filter v2 interface changes. Rename of FilterV1SubscriptionsRequest to FilterLegacySubscribeRequest, fix broken chat2 app, fix tests. Changed Filter v2 push handler subscription to simple register. Separate node's filterUnsubscribe and filterUnsubscribeAll.
83 lines
2.1 KiB
Nim
83 lines
2.1 KiB
Nim
# Push a module-wide `raises` list: empty on Nim 1.4 and newer, while older
# compilers are additionally allowed to raise `Defect`.
when (NimMajor, NimMinor) >= (1, 4):
  {.push raises: [].}
else:
  {.push raises: [Defect].}
|
|
|
|
import
|
|
std/[tables, sequtils],
|
|
stew/results,
|
|
chronicles,
|
|
chronos,
|
|
libp2p/protocols/pubsub
|
|
import
|
|
../waku_core
|
|
|
|
# Chronicles log scope: tags the log statements below with these topics.
logScope:
  topics = "waku node message_cache"
|
|
|
|
const
  DefaultMessageCacheCapacity* = 30'u
    ## Default maximum number of messages cached per topic.
    ## TODO: make this configurable.
|
|
|
|
|
|
type
  MessageCacheResult*[T] = Result[T, cstring]
    ## Result alias used by the message cache: holds a `T` on success or a
    ## `cstring` error description on failure.
|
|
|
|
type
  MessageCache*[K] = ref object
    ## Cache of `WakuMessage`s grouped under keys (topics) of type `K`.
    capacity: uint ## capacity limit applied to each key's message list
    table: Table[K, seq[WakuMessage]] ## cached messages per subscribed key
|
|
|
|
func init*[K](T: type MessageCache[K], capacity=DefaultMessageCacheCapacity): T =
  ## Creates a message cache with no subscribed topics.
  ##
  ## `capacity` is stored as the cache's capacity limit and defaults to
  ## `DefaultMessageCacheCapacity`.
  let emptyTable = initTable[K, seq[WakuMessage]]()
  MessageCache[K](capacity: capacity, table: emptyTable)
|
|
|
|
|
|
proc isSubscribed*[K](t: MessageCache[K], topic: K): bool =
  ## Returns `true` when the cache holds an entry for `topic`.
  topic in t.table
|
|
|
|
proc subscribe*[K](t: MessageCache[K], topic: K) =
  ## Registers `topic` with an empty message list.
  ##
  ## An already subscribed topic is left untouched, so its cached messages
  ## are preserved.
  if not t.isSubscribed(topic):
    t.table[topic] = newSeq[WakuMessage]()
|
|
|
|
proc unsubscribe*[K](t: MessageCache[K], topic: K) =
  ## Removes `topic` and its cached messages from the cache.
  ## Does nothing when the topic is not subscribed.
  if t.isSubscribed(topic):
    t.table.del(topic)
|
|
|
|
proc unsubscribeAll*[K](t: MessageCache[K]) =
  ## Drops every subscribed topic together with its cached messages.
  clear(t.table)
|
|
|
|
proc addMessage*[K](t: MessageCache[K], topic: K, msg: WakuMessage) =
  ## Appends `msg` to the messages cached for `topic`.
  ##
  ## Does nothing when `topic` is not subscribed. When the topic already
  ## holds `capacity` messages, the oldest one is evicted before `msg` is
  ## added.
  # `withValue` only runs its body when `topic` has an entry, and it exposes
  # that entry as a pointer, so the list is updated in place rather than
  # being copied out of the table and written back.
  t.table.withValue(topic, messages):
    # The `len > 0` guard keeps the eviction safe on an empty list
    # (e.g. when `capacity` is 0).
    if messages[].len > 0 and messages[].len >= t.capacity.int:
      trace "Topic cache capacity reached", topic=topic
      # Message cache on this topic is at its maximum. Delete the oldest.
      # TODO: this may become a bottleneck if called as the norm rather
      # than the exception when adding messages. Performance profile needed.
      messages[].delete(0)

    messages[].add(msg)
|
|
|
|
proc clearMessages*[K](t: MessageCache[K], topic: K) =
  ## Empties the message list cached for `topic` while keeping the
  ## subscription. Does nothing when the topic is not subscribed.
  if t.isSubscribed(topic):
    t.table[topic] = newSeq[WakuMessage]()
|
|
|
|
proc getMessages*[K](t: MessageCache[K], topic: K, clear=false): MessageCacheResult[seq[WakuMessage]] =
  ## Returns the messages currently cached for `topic`.
  ##
  ## Yields an error when `topic` is not subscribed. With `clear = true` the
  ## topic's cached messages are emptied after they have been read.
  if t.isSubscribed(topic):
    let cached = t.table.getOrDefault(topic, @[])
    if clear:
      t.clearMessages(topic)
    ok(cached)
  else:
    err("Not subscribed to topic")
|