chore(rest): refactor message cache (#2221)

This commit is contained in:
Simon-Pierre Vivier 2023-11-28 07:21:41 -05:00 committed by GitHub
parent 9f4e6f453f
commit bebaa59c3b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 588 additions and 313 deletions

View File

@ -229,11 +229,11 @@ when isMainModule:
# Install enabled API handlers:
if conf.relay:
let cache = MessageCache[string].init(capacity=30)
let cache = MessageCache.init(capacity=30)
installRelayApiHandlers(node, rpcServer, cache)
if conf.filter:
let messageCache = filter_api.MessageCache.init(capacity=30)
let messageCache = MessageCache.init(capacity=30)
installFilterApiHandlers(node, rpcServer, messageCache)
if conf.store:

View File

@ -687,18 +687,17 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo
## Relay REST API
if conf.relay:
let cache = MessageCache[string].init(capacity=conf.restRelayCacheCapacity)
let cache = MessageCache.init(int(conf.restRelayCacheCapacity))
let handler = messageCacheHandler(cache)
let autoHandler = autoMessageCacheHandler(cache)
for pubsubTopic in conf.pubsubTopics:
cache.subscribe(pubsubTopic)
cache.pubsubSubscribe(pubsubTopic)
app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))
for contentTopic in conf.contentTopics:
cache.subscribe(contentTopic)
app.node.subscribe((kind: ContentSub, topic: contentTopic), some(autoHandler))
cache.contentSubscribe(contentTopic)
app.node.subscribe((kind: ContentSub, topic: contentTopic), some(handler))
installRelayApiHandlers(server.router, app.node, cache)
else:
@ -709,10 +708,10 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo
app.node.wakuFilterClient != nil and
app.node.wakuFilterClientLegacy != nil:
let legacyFilterCache = rest_legacy_filter_api.MessageCache.init()
let legacyFilterCache = MessageCache.init()
rest_legacy_filter_api.installLegacyFilterRestApiHandlers(server.router, app.node, legacyFilterCache)
let filterCache = rest_filter_api.MessageCache.init()
let filterCache = MessageCache.init()
let filterDiscoHandler =
if app.wakuDiscv5.isSome():
@ -765,23 +764,22 @@ proc startRpcServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNod
installDebugApiHandlers(app.node, server)
if conf.relay:
let cache = MessageCache[string].init(capacity=30)
let cache = MessageCache.init(capacity=50)
let handler = messageCacheHandler(cache)
let autoHandler = autoMessageCacheHandler(cache)
for pubsubTopic in conf.pubsubTopics:
cache.subscribe(pubsubTopic)
cache.pubsubSubscribe(pubsubTopic)
app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))
for contentTopic in conf.contentTopics:
cache.subscribe(contentTopic)
app.node.subscribe((kind: ContentSub, topic: contentTopic), some(autoHandler))
cache.contentSubscribe(contentTopic)
app.node.subscribe((kind: ContentSub, topic: contentTopic), some(handler))
installRelayApiHandlers(app.node, server, cache)
if conf.filternode != "":
let filterMessageCache = rpc_filter_api.MessageCache.init(capacity=30)
let filterMessageCache = MessageCache.init(capacity=50)
installFilterApiHandlers(app.node, server, filterMessageCache)
installStoreApiHandlers(app.node, server)

View File

@ -1,153 +1,217 @@
{.used.}
import
std/sets,
stew/[results, byteutils],
testutils/unittests,
chronicles
testutils/unittests
import
../../waku/waku_core,
../../waku/waku_api/message_cache,
./testlib/common,
./testlib/wakucore
type TestMessageCache = MessageCache[(PubsubTopic, ContentTopic)]
suite "MessageCache":
test "subscribe to topic":
setup:
## Given
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let cache = TestMessageCache.init()
let capacity = 3
let testPubsubTopic = DefaultPubsubTopic
let testContentTopic = DefaultContentTopic
let cache = MessageCache.init(capacity)
test "subscribe to topic":
## When
cache.subscribe(testTopic)
cache.pubsubSubscribe(testPubsubTopic)
cache.pubsubSubscribe(testPubsubTopic)
# idempotence of subscribe is also tested
cache.contentSubscribe(testContentTopic)
cache.contentSubscribe(testContentTopic)
## Then
check:
cache.isSubscribed(testTopic)
cache.isPubsubSubscribed(testPubsubTopic)
cache.isContentSubscribed(testContentTopic)
cache.pubsubTopicCount() == 1
cache.contentTopicCount() == 1
test "unsubscribe from topic":
## Given
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let cache = TestMessageCache.init()
# Init cache content
cache.subscribe(testTopic)
cache.pubsubSubscribe(testPubsubTopic)
cache.contentSubscribe(testContentTopic)
cache.pubsubSubscribe("AnotherPubsubTopic")
cache.contentSubscribe("AnotherContentTopic")
## When
cache.unsubscribe(testTopic)
cache.pubsubUnsubscribe(testPubsubTopic)
cache.contentUnsubscribe(testContentTopic)
## Then
check:
not cache.isSubscribed(testTopic)
not cache.isPubsubSubscribed(testPubsubTopic)
not cache.isContentSubscribed(testContentTopic)
cache.pubsubTopicCount() == 1
cache.contentTopicCount() == 1
test "get messages of a subscribed topic":
## Given
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TestMessageCache.init()
# Init cache content
cache.subscribe(testTopic)
cache.addMessage(testTopic, testMessage)
cache.pubsubSubscribe(testPubsubTopic)
cache.addMessage(testPubsubTopic, testMessage)
## When
let res = cache.getMessages(testTopic)
let res = cache.getMessages(testPubsubTopic)
## Then
check:
res.isOk()
res.get() == @[testMessage]
test "get messages with clean flag shoud clear the messages cache":
## Given
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TestMessageCache.init()
# Init cache content
cache.subscribe(testTopic)
cache.addMessage(testTopic, testMessage)
cache.pubsubSubscribe(testPubsubTopic)
cache.addMessage(testPubsubTopic, testMessage)
## When
var res = cache.getMessages(testTopic, clear=true)
var res = cache.getMessages(testPubsubTopic, clear=true)
require(res.isOk())
res = cache.getMessages(testTopic)
res = cache.getMessages(testPubsubTopic)
## Then
check:
res.isOk()
res.get().len == 0
test "get messages of a non-subscribed topic":
## Given
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let cache = TestMessageCache.init()
## When
let res = cache.getMessages(testTopic)
cache.pubsubSubscribe(PubsubTopic("dummyPubsub"))
let res = cache.getMessages(testPubsubTopic)
## Then
check:
res.isErr()
res.error() == "Not subscribed to topic"
res.error() == "not subscribed to this pubsub topic"
test "add messages to subscribed topic":
## Given
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TestMessageCache.init()
cache.subscribe(testTopic)
cache.pubsubSubscribe(testPubsubTopic)
## When
cache.addMessage(testTopic, testMessage)
cache.addMessage(testPubsubTopic, testMessage)
## Then
let messages = cache.getMessages(testTopic).tryGet()
let messages = cache.getMessages(testPubsubTopic).tryGet()
check:
messages == @[testMessage]
test "add messages to non-subscribed topic":
## Given
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TestMessageCache.init()
## When
cache.addMessage(testTopic, testMessage)
cache.addMessage(testPubsubTopic, testMessage)
## Then
let res = cache.getMessages(testTopic)
let res = cache.getMessages(testPubsubTopic)
check:
res.isErr()
res.error() == "Not subscribed to topic"
res.error() == "not subscribed to any pubsub topics"
test "add messages beyond the capacity":
## Given
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let testMessages = @[
fakeWakuMessage(toBytes("MSG-1")),
fakeWakuMessage(toBytes("MSG-2")),
fakeWakuMessage(toBytes("MSG-3"))
]
var testMessages = @[fakeWakuMessage(toBytes("MSG-1"))]
let cache = TestMessageCache.init(capacity = 2)
cache.subscribe(testTopic)
# Prevent duplicate messages timestamp
for i in 0..<5:
var msg = fakeWakuMessage(toBytes("MSG-1"))
while msg.timestamp <= testMessages[i].timestamp:
msg = fakeWakuMessage(toBytes("MSG-1"))
testMessages.add(msg)
cache.pubsubSubscribe(testPubsubTopic)
## When
for msg in testMessages:
cache.addMessage(testTopic, msg)
cache.addMessage(testPubsubTopic, msg)
## Then
let messages = cache.getMessages(testTopic).tryGet()
let messages = cache.getMessages(testPubsubTopic).tryGet()
let messageSet = toHashSet(messages)
let testSet = toHashSet(testMessages)
check:
messages == testMessages[1..2]
messageSet.len == capacity
messageSet < testSet
testMessages[0] notin messages
test "get messages on pubsub via content topics":
cache.pubsubSubscribe(testPubsubTopic)
let fakeMessage = fakeWakuMessage()
cache.addMessage(testPubsubTopic, fakeMessage)
let getRes = cache.getAutoMessages(DefaultContentTopic)
check:
getRes.isOk
getRes.get() == @[fakeMessage]
test "add same message twice":
cache.pubsubSubscribe(testPubsubTopic)
let fakeMessage = fakeWakuMessage()
cache.addMessage(testPubsubTopic, fakeMessage)
cache.addMessage(testPubsubTopic, fakeMessage)
check:
cache.messagesCount() == 1
test "unsubscribing remove messages":
let topic0 = "PubsubTopic0"
let topic1 = "PubsubTopic1"
let topic2 = "PubsubTopic2"
let fakeMessage0 = fakeWakuMessage(toBytes("MSG-0"))
let fakeMessage1 = fakeWakuMessage(toBytes("MSG-1"))
let fakeMessage2 = fakeWakuMessage(toBytes("MSG-2"))
cache.pubsubSubscribe(topic0)
cache.pubsubSubscribe(topic1)
cache.pubsubSubscribe(topic2)
cache.contentSubscribe("ContentTopic0")
cache.addMessage(topic0, fakeMessage0)
cache.addMessage(topic1, fakeMessage1)
cache.addMessage(topic2, fakeMessage2)
cache.pubsubUnsubscribe(topic0)
# at this point, fakeMessage0 is only ref by DefaultContentTopic
let res = cache.getAutoMessages(DefaultContentTopic)
check:
res.isOk()
res.get().len == 3
cache.isPubsubSubscribed(topic0) == false
cache.isPubsubSubscribed(topic1) == true
cache.isPubsubSubscribed(topic2) == true
cache.contentUnsubscribe(DefaultContentTopic)
# msg0 was delete because no refs
check:
cache.messagesCount() == 2

View File

@ -4,8 +4,6 @@ import
std/options,
stew/shims/net as stewNet,
testutils/unittests,
chronicles,
libp2p/crypto/crypto,
json_rpc/[rpcserver, rpcclient]
import
../../../waku/waku_core,
@ -20,11 +18,6 @@ import
../testlib/wakucore,
../testlib/wakunode
proc newTestMessageCache(): filter_api.MessageCache =
filter_api.MessageCache.init(capacity=30)
procSuite "Waku v2 JSON-RPC API - Filter":
let
bindIp = ValidIpAddress.init("0.0.0.0")
@ -49,7 +42,8 @@ procSuite "Waku v2 JSON-RPC API - Filter":
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
installFilterApiHandlers(node2, server, newTestMessageCache())
let cache = MessageCache.init(capacity=30)
installFilterApiHandlers(node2, server, cache)
server.start()
let client = newRpcHttpClient()

View File

@ -4,7 +4,6 @@ import
std/[options, sequtils, tempfiles],
stew/shims/net as stewNet,
testutils/unittests,
chronicles,
libp2p/crypto/crypto,
json_rpc/[rpcserver, rpcclient]
import
@ -15,10 +14,8 @@ import
../../../waku/waku_node,
../../../waku/waku_api/jsonrpc/relay/handlers as relay_api,
../../../waku/waku_api/jsonrpc/relay/client as relay_api_client,
../../../waku/waku_core,
../../../waku/waku_relay,
../../../waku/waku_rln_relay,
../testlib/common,
../testlib/wakucore,
../testlib/wakunode
@ -37,7 +34,7 @@ suite "Waku v2 JSON-RPC API - Relay":
ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort)
server = newRpcHttpServer([ta])
let cache = MessageCache[string].init(capacity=30)
let cache = MessageCache.init(capacity=30)
installRelayApiHandlers(node, server, cache)
server.start()
@ -111,7 +108,7 @@ suite "Waku v2 JSON-RPC API - Relay":
ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort)
server = newRpcHttpServer([ta])
let cache = MessageCache[string].init(capacity=30)
let cache = MessageCache.init(capacity=30)
installRelayApiHandlers(srcNode, server, cache)
server.start()
@ -181,7 +178,7 @@ suite "Waku v2 JSON-RPC API - Relay":
ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort)
server = newRpcHttpServer([ta])
let cache = MessageCache[string].init(capacity=30)
let cache = MessageCache.init(capacity=30)
installRelayApiHandlers(dstNode, server, cache)
server.start()
@ -244,7 +241,7 @@ suite "Waku v2 JSON-RPC API - Relay":
ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort)
server = newRpcHttpServer([ta])
let cache = MessageCache[string].init(capacity=30)
let cache = MessageCache.init(capacity=30)
installRelayApiHandlers(dstNode, server, cache)
server.start()

View File

@ -1,7 +1,6 @@
{.used.}
import
std/sequtils,
stew/byteutils,
stew/shims/net,
testutils/unittests,
@ -9,7 +8,6 @@ import
libp2p/crypto/crypto
import
../../waku/waku_api/message_cache,
../../waku/common/base64,
../../waku/waku_core,
../../waku/waku_node,
../../waku/node/peer_manager,
@ -44,7 +42,7 @@ type RestFilterTest = object
subscriberNode: WakuNode
restServer: RestServerRef
restServerForService: RestServerRef
messageCache: filter_api.MessageCache
messageCache: MessageCache
client: RestClientRef
clientTwdServiceNode: RestClientRef
@ -70,10 +68,10 @@ proc init(T: type RestFilterTest): Future[T] {.async.} =
testSetup.restServerForService = RestServerRef.init(restAddress, restPort2).tryGet()
# through this one we will see if messages are pushed according to our content topic sub
testSetup.messageCache = filter_api.MessageCache.init()
testSetup.messageCache = MessageCache.init()
installFilterRestApiHandlers(testSetup.restServer.router, testSetup.subscriberNode, testSetup.messageCache)
let topicCache = MessageCache[string].init()
let topicCache = MessageCache.init()
installRelayApiHandlers(testSetup.restServerForService.router, testSetup.serviceNode, topicCache)
testSetup.restServer.start()
@ -242,7 +240,7 @@ suite "Waku v2 Rest API - Filter V2":
restFilterTest = await RestFilterTest.init()
subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId
restFilterTest.messageCache.subscribe(DefaultPubsubTopic)
restFilterTest.messageCache.pubsubSubscribe(DefaultPubsubTopic)
restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
# When
@ -272,7 +270,7 @@ suite "Waku v2 Rest API - Filter V2":
toRelayWakuMessage(testMessage)
)
# Then
let messages = restFilterTest.messageCache.getMessages("1").tryGet()
let messages = restFilterTest.messageCache.getAutoMessages("1").tryGet()
check:
postMsgResponse.status == 200

View File

@ -39,7 +39,7 @@ type RestFilterTest = object
filterNode: WakuNode
clientNode: WakuNode
restServer: RestServerRef
messageCache: filter_api.MessageCache
messageCache: MessageCache
client: RestClientRef
@ -59,7 +59,7 @@ proc setupRestFilter(): Future[RestFilterTest] {.async.} =
let restAddress = ValidIpAddress.init("0.0.0.0")
result.restServer = RestServerRef.init(restAddress, restPort).tryGet()
result.messageCache = filter_api.MessageCache.init()
result.messageCache = MessageCache.init()
installLegacyFilterRestApiHandlers(result.restServer.router
,result.clientNode
,result.messageCache)
@ -100,10 +100,10 @@ suite "Waku v2 Rest API - Filter":
response.data == "OK"
check:
restFilterTest.messageCache.isSubscribed(DefaultContentTopic)
restFilterTest.messageCache.isSubscribed("2")
restFilterTest.messageCache.isSubscribed("3")
restFilterTest.messageCache.isSubscribed("4")
restFilterTest.messageCache.isContentSubscribed(DefaultContentTopic)
restFilterTest.messageCache.isContentSubscribed("2")
restFilterTest.messageCache.isContentSubscribed("3")
restFilterTest.messageCache.isContentSubscribed("4")
# When - error case
let badRequestBody = FilterLegacySubscribeRequest(contentFilters: @[]
@ -125,10 +125,10 @@ suite "Waku v2 Rest API - Filter":
restFilterTest: RestFilterTest = await setupRestFilter()
# When
restFilterTest.messageCache.subscribe("1")
restFilterTest.messageCache.subscribe("2")
restFilterTest.messageCache.subscribe("3")
restFilterTest.messageCache.subscribe("4")
restFilterTest.messageCache.contentSubscribe("1")
restFilterTest.messageCache.contentSubscribe("2")
restFilterTest.messageCache.contentSubscribe("3")
restFilterTest.messageCache.contentSubscribe("4")
let contentFilters = @[ContentTopic("1")
,ContentTopic("2")
@ -148,10 +148,10 @@ suite "Waku v2 Rest API - Filter":
response.data == "OK"
check:
not restFilterTest.messageCache.isSubscribed("1")
not restFilterTest.messageCache.isSubscribed("2")
not restFilterTest.messageCache.isSubscribed("3")
restFilterTest.messageCache.isSubscribed("4")
not restFilterTest.messageCache.isContentSubscribed("1")
not restFilterTest.messageCache.isContentSubscribed("2")
not restFilterTest.messageCache.isContentSubscribed("3")
restFilterTest.messageCache.isContentSubscribed("4")
await restFilterTest.shutdown()
@ -164,15 +164,22 @@ suite "Waku v2 Rest API - Filter":
let pubSubTopic = "/waku/2/default-waku/proto"
let contentTopic = ContentTopic( "content-topic-x" )
let messages = @[
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
var messages = @[
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1"))
]
restFilterTest.messageCache.subscribe(contentTopic)
# Prevent duplicate messages
for i in 0..<2:
var msg = fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1"))
while msg == messages[i]:
msg = fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1"))
messages.add(msg)
restFilterTest.messageCache.contentSubscribe(contentTopic)
for msg in messages:
restFilterTest.messageCache.addMessage(contentTopic, msg)
restFilterTest.messageCache.addMessage(pubSubTopic, msg)
# When
let response = await restFilterTest.client.filterGetMessagesV1(contentTopic)

View File

@ -44,7 +44,7 @@ suite "Waku v2 Rest API - Relay":
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
let cache = MessageCache[string].init()
let cache = MessageCache.init()
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()
@ -66,9 +66,9 @@ suite "Waku v2 Rest API - Relay":
response.data == "OK"
check:
cache.isSubscribed("pubsub-topic-1")
cache.isSubscribed("pubsub-topic-2")
cache.isSubscribed("pubsub-topic-3")
cache.isPubsubSubscribed("pubsub-topic-1")
cache.isPubsubSubscribed("pubsub-topic-2")
cache.isPubsubSubscribed("pubsub-topic-3")
check:
toSeq(node.wakuRelay.subscribedTopics).len == pubSubTopics.len
@ -92,11 +92,11 @@ suite "Waku v2 Rest API - Relay":
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
let cache = MessageCache[string].init()
cache.subscribe("pubsub-topic-1")
cache.subscribe("pubsub-topic-2")
cache.subscribe("pubsub-topic-3")
cache.subscribe("pubsub-topic-x")
let cache = MessageCache.init()
cache.pubsubSubscribe("pubsub-topic-1")
cache.pubsubSubscribe("pubsub-topic-2")
cache.pubsubSubscribe("pubsub-topic-3")
cache.pubsubSubscribe("pubsub-topic-x")
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()
@ -119,15 +119,15 @@ suite "Waku v2 Rest API - Relay":
response.data == "OK"
check:
not cache.isSubscribed("pubsub-topic-1")
not cache.isPubsubSubscribed("pubsub-topic-1")
not node.wakuRelay.isSubscribed("pubsub-topic-1")
not cache.isSubscribed("pubsub-topic-2")
not cache.isPubsubSubscribed("pubsub-topic-2")
not node.wakuRelay.isSubscribed("pubsub-topic-2")
not cache.isSubscribed("pubsub-topic-3")
not cache.isPubsubSubscribed("pubsub-topic-3")
not node.wakuRelay.isSubscribed("pubsub-topic-3")
cache.isSubscribed("pubsub-topic-x")
cache.isPubsubSubscribed("pubsub-topic-x")
node.wakuRelay.isSubscribed("pubsub-topic-x")
not cache.isSubscribed("pubsub-topic-y")
not cache.isPubsubSubscribed("pubsub-topic-y")
not node.wakuRelay.isSubscribed("pubsub-topic-y")
await restServer.stop()
@ -145,15 +145,23 @@ suite "Waku v2 Rest API - Relay":
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
let pubSubTopic = "/waku/2/default-waku/proto"
let messages = @[
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
var messages = @[
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1"))
]
let cache = MessageCache[string].init()
# Prevent duplicate messages
for i in 0..<2:
var msg = fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1"))
cache.subscribe(pubSubTopic)
while msg == messages[i]:
msg = fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1"))
messages.add(msg)
let cache = MessageCache.init()
cache.pubsubSubscribe(pubSubTopic)
for msg in messages:
cache.addMessage(pubSubTopic, msg)
@ -177,7 +185,7 @@ suite "Waku v2 Rest API - Relay":
check:
cache.isSubscribed(pubSubTopic)
cache.isPubsubSubscribed(pubSubTopic)
cache.getMessages(pubSubTopic).tryGet().len == 0
await restServer.stop()
@ -199,7 +207,7 @@ suite "Waku v2 Rest API - Relay":
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
let cache = MessageCache[string].init()
let cache = MessageCache.init()
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()
@ -239,7 +247,7 @@ suite "Waku v2 Rest API - Relay":
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
let cache = MessageCache[string].init()
let cache = MessageCache.init()
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()
@ -263,9 +271,9 @@ suite "Waku v2 Rest API - Relay":
response.data == "OK"
check:
cache.isSubscribed(contentTopics[0])
cache.isSubscribed(contentTopics[1])
cache.isSubscribed(contentTopics[2])
cache.isContentSubscribed(contentTopics[0])
cache.isContentSubscribed(contentTopics[1])
cache.isContentSubscribed(contentTopics[2])
check:
# Node should be subscribed to all shards
@ -292,11 +300,11 @@ suite "Waku v2 Rest API - Relay":
ContentTopic("/waku/2/default-contentX/proto")
]
let cache = MessageCache[string].init()
cache.subscribe(contentTopics[0])
cache.subscribe(contentTopics[1])
cache.subscribe(contentTopics[2])
cache.subscribe("/waku/2/default-contentY/proto")
let cache = MessageCache.init()
cache.contentSubscribe(contentTopics[0])
cache.contentSubscribe(contentTopics[1])
cache.contentSubscribe(contentTopics[2])
cache.contentSubscribe("/waku/2/default-contentY/proto")
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()
@ -312,10 +320,10 @@ suite "Waku v2 Rest API - Relay":
response.data == "OK"
check:
not cache.isSubscribed(contentTopics[1])
not cache.isSubscribed(contentTopics[2])
not cache.isSubscribed(contentTopics[3])
cache.isSubscribed("/waku/2/default-contentY/proto")
not cache.isContentSubscribed(contentTopics[1])
not cache.isContentSubscribed(contentTopics[2])
not cache.isContentSubscribed(contentTopics[3])
cache.isContentSubscribed("/waku/2/default-contentY/proto")
await restServer.stop()
await restServer.closeWait()
@ -332,17 +340,25 @@ suite "Waku v2 Rest API - Relay":
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
let contentTopic = DefaultContentTopic
let messages = @[
fakeWakuMessage(contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")),
var messages = @[
fakeWakuMessage(contentTopic = DefaultContentTopic, payload = toBytes("TEST-1"))
]
let cache = MessageCache[string].init()
# Prevent duplicate messages
for i in 0..<2:
var msg = fakeWakuMessage(contentTopic = DefaultContentTopic, payload = toBytes("TEST-1"))
cache.subscribe(contentTopic)
while msg == messages[i]:
msg = fakeWakuMessage(contentTopic = DefaultContentTopic, payload = toBytes("TEST-1"))
messages.add(msg)
let cache = MessageCache.init()
cache.contentSubscribe(contentTopic)
for msg in messages:
cache.addMessage(contentTopic, msg)
cache.addMessage(DefaultPubsubTopic, msg)
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()
@ -363,8 +379,8 @@ suite "Waku v2 Rest API - Relay":
msg.timestamp.get() != Timestamp(0)
check:
cache.isSubscribed(contentTopic)
cache.getMessages(contentTopic).tryGet().len == 0 # The cache is cleared when getMessage is called
cache.isContentSubscribed(contentTopic)
cache.getAutoMessages(contentTopic).tryGet().len == 0 # The cache is cleared when getMessage is called
await restServer.stop()
await restServer.closeWait()
@ -385,7 +401,7 @@ suite "Waku v2 Rest API - Relay":
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
let cache = MessageCache[string].init()
let cache = MessageCache.init()
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()

View File

@ -40,11 +40,6 @@ proc defaultDiscoveryHandler*(discv5: WakuDiscoveryV5, cap: Capabilities): Disco
### Message Cache
proc messageCacheHandler*(cache: MessageCache[string]): WakuRelayHandler =
proc messageCacheHandler*(cache: MessageCache): WakuRelayHandler =
return proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} =
cache.addMessage(PubSubTopic(pubsubTopic), msg)
proc autoMessageCacheHandler*(cache: MessageCache[string]): WakuRelayHandler =
return proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} =
if cache.isSubscribed(msg.contentTopic):
cache.addMessage(msg.contentTopic, msg)
cache.addMessage(pubsubTopic, msg)

View File

@ -24,11 +24,6 @@ logScope:
const futTimeout* = 5.seconds # Max time to wait for futures
type
MessageCache* = message_cache.MessageCache[ContentTopic]
proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageCache) =
server.rpc("post_waku_v2_filter_v1_subscription") do (contentFilters: seq[ContentFilter], pubsubTopic: Option[PubsubTopic]) -> bool:
@ -42,7 +37,7 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message
let contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic)
let handler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} =
cache.addMessage(msg.contentTopic, msg)
cache.addMessage(pubsubTopic, msg)
let subFut = node.legacyFilterSubscribe(pubsubTopic, contentTopics, handler, peerOpt.get())
if not await subFut.withTimeout(futTimeout):
@ -50,7 +45,7 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message
# Successfully subscribed to all content filters
for cTopic in contentTopics:
cache.subscribe(cTopic)
cache.contentSubscribe(cTopic)
return true
@ -69,7 +64,7 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message
raise newException(ValueError, "Failed to unsubscribe from contentFilters")
for cTopic in contentTopics:
cache.unsubscribe(cTopic)
cache.contentUnsubscribe(cTopic)
return true
@ -78,7 +73,7 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message
## last time this method was called
debug "get_waku_v2_filter_v1_messages", contentTopic=contentTopic
if not cache.isSubscribed(contentTopic):
if not cache.isContentSubscribed(contentTopic):
raise newException(ValueError, "Not subscribed to topic: " & contentTopic)
let msgRes = cache.getMessages(contentTopic, clear=true)

View File

@ -32,7 +32,7 @@ const futTimeout* = 5.seconds # Max time to wait for futures
## Waku Relay JSON-RPC API
proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageCache[string]) =
proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageCache) =
server.rpc("post_waku_v2_relay_v1_subscriptions") do (pubsubTopics: seq[PubsubTopic]) -> bool:
if pubsubTopics.len == 0:
raise newException(ValueError, "No pubsub topic provided")
@ -41,13 +41,13 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
debug "post_waku_v2_relay_v1_subscriptions"
# Subscribe to all requested topics
let newTopics = pubsubTopics.filterIt(not cache.isSubscribed(it))
let newTopics = pubsubTopics.filterIt(not cache.isPubsubSubscribed(it))
for pubsubTopic in newTopics:
if pubsubTopic == "":
raise newException(ValueError, "Empty pubsub topic")
cache.subscribe(pubsubTopic)
cache.pubsubSubscribe(pubsubTopic)
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(messageCacheHandler(cache)))
return true
@ -60,13 +60,13 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
debug "delete_waku_v2_relay_v1_subscriptions"
# Unsubscribe all handlers from requested topics
let subscribedTopics = pubsubTopics.filterIt(cache.isSubscribed(it))
let subscribedTopics = pubsubTopics.filterIt(cache.isPubsubSubscribed(it))
for pubsubTopic in subscribedTopics:
if pubsubTopic == "":
raise newException(ValueError, "Empty pubsub topic")
cache.unsubscribe(pubsubTopic)
cache.pubsubUnsubscribe(pubsubTopic)
node.unsubscribe((kind: PubsubUnsub, topic: pubsubTopic))
return true
@ -148,15 +148,15 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
## Subscribes a node to a list of Content topics
debug "post_waku_v2_relay_v1_auto_subscriptions"
let newTopics = contentTopics.filterIt(not cache.isSubscribed(it))
let newTopics = contentTopics.filterIt(not cache.isContentSubscribed(it))
# Subscribe to all requested topics
for contentTopic in newTopics:
if contentTopic == "":
raise newException(ValueError, "Empty content topic")
cache.subscribe(contentTopic)
node.subscribe((kind: ContentSub, topic: contentTopic), some(autoMessageCacheHandler(cache)))
cache.contentSubscribe(contentTopic)
node.subscribe((kind: ContentSub, topic: contentTopic), some(messageCacheHandler(cache)))
return true
@ -167,14 +167,14 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
## Unsubscribes a node from a list of Content topics
debug "delete_waku_v2_relay_v1_auto_subscriptions"
let subscribedTopics = contentTopics.filterIt(cache.isSubscribed(it))
let subscribedTopics = contentTopics.filterIt(cache.isContentSubscribed(it))
# Unsubscribe all handlers from requested topics
for contentTopic in subscribedTopics:
if contentTopic == "":
raise newException(ValueError, "Empty content topic")
cache.unsubscribe(contentTopic)
cache.contentUnsubscribe(contentTopic)
node.unsubscribe((kind: ContentUnsub, topic: contentTopic))
return true
@ -232,7 +232,7 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
## last time this method was called
debug "get_waku_v2_relay_v1_auto_messages", topic=contentTopic
let msgRes = cache.getMessages(contentTopic, clear=true)
let msgRes = cache.getAutoMessages(contentTopic, clear=true)
if msgRes.isErr():
raise newException(ValueError, "Not subscribed to content topic: " & contentTopic)

View File

@ -4,7 +4,7 @@ else:
{.push raises: [].}
import
std/[tables, sequtils],
std/[sequtils, sugar, algorithm, options],
stew/results,
chronicles,
chronos,
@ -15,68 +15,274 @@ import
logScope:
topics = "waku node message_cache"
const DefaultMessageCacheCapacity*: uint = 30 # Max number of messages cached per topic @TODO make this configurable
const DefaultMessageCacheCapacity: int = 50
type MessageCache* = ref object
pubsubTopics: seq[PubsubTopic]
contentTopics: seq[ContentTopic]
type MessageCacheResult*[T] = Result[T, cstring]
pubsubIndex: seq[tuple[pubsubIdx: int, msgIdx: int]]
contentIndex: seq[tuple[contentIdx: int, msgIdx: int]]
type MessageCache*[K] = ref object
capacity: uint
table: Table[K, seq[WakuMessage]]
messages: seq[WakuMessage]
func init*[K](T: type MessageCache[K], capacity=DefaultMessageCacheCapacity): T =
MessageCache[K](
capacity: capacity,
table: initTable[K, seq[WakuMessage]]()
capacity: int
func `$`*(self: MessageCache): string =
"Messages: " & $self.messages.len &
" \nPubsubTopics: " & $self.pubsubTopics &
" \nContentTopics: " & $self.contentTopics &
" \nPubsubIndex: " & $self.pubsubIndex &
" \nContentIndex: " & $self.contentIndex
func init*(T: type MessageCache, capacity=DefaultMessageCacheCapacity): T =
MessageCache(
capacity: capacity
)
proc messagesCount*(self: MessageCache): int =
self.messages.len
proc isSubscribed*[K](t: MessageCache[K], topic: K): bool =
t.table.hasKey(topic)
proc pubsubTopicCount*(self: MessageCache): int =
self.pubsubTopics.len
proc subscribe*[K](t: MessageCache[K], topic: K) =
if t.isSubscribed(topic):
return
t.table[topic] = @[]
proc contentTopicCount*(self: MessageCache): int =
self.contentTopics.len
proc unsubscribe*[K](t: MessageCache[K], topic: K) =
if not t.isSubscribed(topic):
return
t.table.del(topic)
proc pubsubSearch(self: MessageCache, pubsubTopic: PubsubTopic): Option[int] =
# Return some with the index if found none otherwise.
for i, topic in self.pubsubTopics:
if topic == pubsubTopic:
return some(i)
proc unsubscribeAll*[K](t: MessageCache[K]) =
t.table.clear()
return none(int)
proc addMessage*[K](t: MessageCache, topic: K, msg: WakuMessage) =
if not t.isSubscribed(topic):
proc contentSearch(self: MessageCache, contentTopic: ContentTopic): Option[int] =
# Return some with the index if found none otherwise.
for i, topic in self.contentTopics:
if topic == contentTopic:
return some(i)
return none(int)
proc isPubsubSubscribed*(self: MessageCache, pubsubTopic: PubsubTopic): bool =
self.pubsubSearch(pubsubTopic).isSome()
proc isContentSubscribed*(self: MessageCache, contentTopic: ContentTopic): bool =
self.contentSearch(contentTopic).isSome()
proc pubsubSubscribe*(self: MessageCache, pubsubTopic: PubsubTopic) =
if self.pubsubSearch(pubsubTopic).isNone():
self.pubsubTopics.add(pubsubTopic)
proc contentSubscribe*(self: MessageCache, contentTopic: ContentTopic) =
if self.contentSearch(contentTopic).isNone():
self.contentTopics.add(contentTopic)
proc removeMessage(self: MessageCache, idx: int) =
# get last index because del() is a swap
let lastIndex = self.messages.high
self.messages.del(idx)
# update indices
var j = self.pubsubIndex.high
while j > -1:
let (pId, mId) = self.pubsubIndex[j]
if mId == idx:
self.pubsubIndex.del(j)
elif mId == lastIndex:
self.pubsubIndex[j] = (pId, idx)
dec(j)
j = self.contentIndex.high
while j > -1:
let (cId, mId) = self.contentIndex[j]
if mId == idx:
self.contentIndex.del(j)
elif mId == lastIndex:
self.contentIndex[j] = (cId, idx)
dec(j)
proc pubsubUnsubscribe*(self: MessageCache, pubsubTopic: PubsubTopic) =
let pubsubIdxOp = self.pubsubSearch(pubsubTopic)
let pubsubIdx =
if pubsubIdxOp.isSome(): pubsubIdxOp.get()
else: return
let lastIndex = self.pubsubTopics.high
self.pubsubTopics.del(pubsubIdx)
var msgIndices = newSeq[int](0)
var j = self.pubsubIndex.high
while j > -1:
let (pId, mId) = self.pubsubIndex[j]
if pId == pubsubIdx:
# remove index for this topic
self.pubsubIndex.del(j)
msgIndices.add(mId)
elif pId == lastIndex:
# swap the index because pubsubTopics.del() is a swap
self.pubsubIndex[j] = (pubsubIdx, mId)
dec(j)
# check if messages on this pubsub topic are indexed by any content topic, if not remove them.
for mId in msgIndices:
if not self.contentIndex.anyIt(it.msgIdx == mId):
self.removeMessage(mId)
proc contentUnsubscribe*(self: MessageCache, contentTopic: ContentTopic) =
let contentIdxOP = self.contentSearch(contentTopic)
let contentIdx =
if contentIdxOP.isSome(): contentIdxOP.get()
else: return
let lastIndex = self.contentTopics.high
self.contentTopics.del(contentIdx)
var msgIndices = newSeq[int](0)
var j = self.contentIndex.high
while j > -1:
let (cId, mId) = self.contentIndex[j]
if cId == contentIdx:
# remove indices for this topic
self.contentIndex.del(j)
msgIndices.add(mId)
elif cId == lastIndex:
# swap the indices because contentTopics.del() is a swap
self.contentIndex[j] = (contentIdx, mId)
dec(j)
# check if messages on this content topic are indexed by any pubsub topic, if not remove them.
for mId in msgIndices:
if not self.pubsubIndex.anyIt(it.msgIdx == mId):
self.removeMessage(mId)
proc reset*(self: MessageCache) =
self.messages.setLen(0)
self.pubsubTopics.setLen(0)
self.contentTopics.setLen(0)
self.pubsubIndex.setLen(0)
self.contentIndex.setLen(0)
proc addMessage*(
self: MessageCache,
pubsubTopic: PubsubTopic,
msg: WakuMessage
) =
## Idempotent message addition.
var oldestTime = int64.high
var oldestMsg = int.high
for i, message in self.messages.reversed:
if message == msg:
return
if message.timestamp < oldestTime:
oldestTime = message.timestamp
oldestMsg = i
# reverse index
oldestMsg = self.messages.high - oldestMsg
var pubsubIdxOp = self.pubsubSearch(pubsubTopic)
var contentIdxOp = self.contentSearch(msg.contentTopic)
if pubsubIdxOp.isNone() and contentIdxOp.isNone():
return
# Make a copy of msgs for this topic to modify
var messages = t.table.getOrDefault(topic, @[])
let pubsubIdx =
if pubsubIdxOp.isNone():
self.pubsubTopics.add(pubsubTopic)
self.pubsubTopics.high
else:
pubsubIdxOp.get()
if messages.len >= t.capacity.int:
trace "Topic cache capacity reached", topic=topic
# Message cache on this topic exceeds maximum. Delete oldest.
# TODO: this may become a bottle neck if called as the norm rather than
# exception when adding messages. Performance profile needed.
messages.delete(0,0)
let contentIdx =
if contentIdxOp.isNone():
self.contentTopics.add(msg.contentTopic)
self.contentTopics.high
else:
contentIdxOp.get()
messages.add(msg)
# add the message, make space if needed
if self.messages.len >= self.capacity:
self.removeMessage(oldestMsg)
let msgIdx = self.messages.len
self.messages.add(msg)
# Replace indexed entry with copy
t.table[topic] = messages
self.pubsubIndex.add((pubsubIdx, msgIdx))
self.contentIndex.add((contentIdx, msgIdx))
proc clearMessages*[K](t: MessageCache[K], topic: K) =
if not t.isSubscribed(topic):
return
t.table[topic] = @[]
proc getMessages*(
self: MessageCache,
pubsubTopic: PubsubTopic,
clear=false
): Result[seq[WakuMessage], string] =
## Return all messages on this pubsub topic
proc getMessages*[K](t: MessageCache[K], topic: K, clear=false): MessageCacheResult[seq[WakuMessage]] =
if not t.isSubscribed(topic):
return err("Not subscribed to topic")
if self.pubsubTopics.len == 0:
return err("not subscribed to any pubsub topics")
let pubsubIdxOp = self.pubsubSearch(pubsubTopic)
let pubsubIdx =
if pubsubIdxOp.isNone:
return err("not subscribed to this pubsub topic")
else: pubsubIdxOp.get()
let msgIndices = collect:
for (pId, mId) in self.pubsubIndex:
if pId == pubsubIdx:
mId
let messages = msgIndices.mapIt(self.messages[it])
let messages = t.table.getOrDefault(topic, @[])
if clear:
t.clearMessages(topic)
for idx in msgIndices.reversed:
self.removeMessage(idx)
return ok(messages)
ok(messages)
proc getAutoMessages*(
self: MessageCache,
contentTopic: ContentTopic,
clear=false
): Result[seq[WakuMessage], string] =
## Return all messages on this content topic
if self.contentTopics.len == 0:
return err("not subscribed to any content topics")
let contentIdxOp = self.contentSearch(contentTopic)
let contentIdx =
if contentIdxOp.isNone():
return err("not subscribed to this content topic")
else: contentIdxOp.get()
let msgIndices = collect:
for (cId, mId) in self.contentIndex:
if cId == contentIdx:
mId
let messages = msgIndices.mapIt(self.messages[it])
if clear:
for idx in msgIndices.reversed:
self.removeMessage(idx)
return ok(messages)

View File

@ -40,10 +40,22 @@ const ROUTE_FILTER_SUBSCRIPTIONS* = "/filter/v2/subscriptions"
const ROUTE_FILTER_ALL_SUBSCRIPTIONS* = "/filter/v2/subscriptions/all"
const filterMessageCacheDefaultCapacity* = 30
func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiResponse] =
if contentBody.isNone():
return err(RestApiResponse.badRequest("Missing content body"))
type
MessageCache* = message_cache.MessageCache[ContentTopic]
let reqBodyContentType = MediaType.init($contentBody.get().contentType)
if reqBodyContentType != MIMETYPE_JSON:
return err(RestApiResponse.badRequest("Wrong Content-Type, expected application/json"))
let reqBodyData = contentBody.get().data
let requestResult = decodeFromJsonBytes(T, reqBodyData)
if requestResult.isErr():
return err(RestApiResponse.badRequest("Invalid content body, could not decode. " &
$requestResult.error))
return ok(requestResult.get())
proc getErrorCause(err: filter_protocol_type.FilterSubscribeError): string =
## Retrieve proper error cause of FilterSubscribeError - due stringify make some parts of text double
@ -169,7 +181,7 @@ proc filterPostPutSubscriptionRequestHandler(
# Successfully subscribed to all content filters
for cTopic in req.contentFilters:
cache.subscribe(cTopic)
cache.contentSubscribe(cTopic)
return makeRestResponse(req.requestId, subFut.read())
@ -235,7 +247,7 @@ proc installFilterDeleteSubscriptionsHandler(
# Successfully subscribed to all content filters
for cTopic in req.contentFilters:
cache.unsubscribe(cTopic)
cache.contentUnsubscribe(cTopic)
# Successfully unsubscribed from all requested contentTopics
return makeRestResponse(req.requestId, unsubFut.read())
@ -276,7 +288,7 @@ proc installFilterDeleteAllSubscriptionsHandler(
FilterSubscribeError.serviceUnavailable(
"Failed to unsubscribe from all contentFilters due to timeout!"))
cache.unsubscribeAll()
cache.reset()
# Successfully unsubscribed from all requested contentTopics
return makeRestResponse(req.requestId, unsubFut.read())
@ -321,7 +333,7 @@ proc installFilterGetMessagesHandler(router: var RestRouter,
let pushHandler : FilterPushHandler = proc (pubsubTopic: PubsubTopic,
msg: WakuMessage)
{.async, gcsafe, closure.} =
cache.addMessage(msg.contentTopic, msg)
cache.addMessage(pubsubTopic, msg)
node.wakuFilterClient.registerPushHandler(pushHandler)
@ -336,7 +348,7 @@ proc installFilterGetMessagesHandler(router: var RestRouter,
let contentTopic = contentTopic.get()
let msgRes = cache.getMessages(contentTopic, clear=true)
let msgRes = cache.getAutoMessages(contentTopic, clear=true)
if msgRes.isErr():
return RestApiResponse.badRequest("Not subscribed to topic: " & contentTopic)

View File

@ -34,10 +34,22 @@ const futTimeoutForSubscriptionProcessing* = 5.seconds
const ROUTE_FILTER_SUBSCRIPTIONSV1* = "/filter/v1/subscriptions"
const filterMessageCacheDefaultCapacity* = 30
func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiResponse] =
if contentBody.isNone():
return err(RestApiResponse.badRequest("Missing content body"))
type
MessageCache* = message_cache.MessageCache[ContentTopic]
let reqBodyContentType = MediaType.init($contentBody.get().contentType)
if reqBodyContentType != MIMETYPE_JSON:
return err(RestApiResponse.badRequest("Wrong Content-Type, expected application/json"))
let reqBodyData = contentBody.get().data
let requestResult = decodeFromJsonBytes(T, reqBodyData)
if requestResult.isErr():
return err(RestApiResponse.badRequest("Invalid content body, could not decode. " &
$requestResult.error))
return ok(requestResult.get())
proc installFilterV1PostSubscriptionsV1Handler*(router: var RestRouter,
node: WakuNode,
@ -45,7 +57,7 @@ proc installFilterV1PostSubscriptionsV1Handler*(router: var RestRouter,
let pushHandler: FilterPushHandler =
proc(pubsubTopic: PubsubTopic,
msg: WakuMessage) {.async, gcsafe, closure.} =
cache.addMessage(msg.contentTopic, msg)
cache.addMessage(pubsubTopic, msg)
router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a pubsubTopic
@ -74,7 +86,7 @@ proc installFilterV1PostSubscriptionsV1Handler*(router: var RestRouter,
# Successfully subscribed to all content filters
for cTopic in req.contentFilters:
cache.subscribe(cTopic)
cache.contentSubscribe(cTopic)
return RestApiResponse.ok()
@ -103,7 +115,7 @@ proc installFilterV1DeleteSubscriptionsV1Handler*(router: var RestRouter,
return RestApiResponse.internalServerError("Failed to unsubscribe from contentFilters")
for cTopic in req.contentFilters:
cache.unsubscribe(cTopic)
cache.contentUnsubscribe(cTopic)
# Successfully unsubscribed from all requested contentTopics
return RestApiResponse.ok()
@ -124,7 +136,7 @@ proc installFilterV1GetMessagesV1Handler*(router: var RestRouter,
let contentTopic = contentTopic.get()
let msgRes = cache.getMessages(contentTopic, clear=true)
let msgRes = cache.getAutoMessages(contentTopic, clear=true)
if msgRes.isErr():
return RestApiResponse.badRequest("Not subscribed to topic: " & contentTopic)

View File

@ -5,7 +5,7 @@ else:
import
std/sequtils,
stew/byteutils,
stew/[byteutils, results],
chronicles,
json_serialization,
json_serialization/std/options,
@ -26,19 +26,15 @@ import
from std/times import getTime
from std/times import toUnix
export types
logScope:
topics = "waku node rest relay_api"
##### Topic cache
const futTimeout* = 5.seconds # Max time to wait for futures
#### Request handlers
const ROUTE_RELAY_SUBSCRIPTIONSV1* = "/relay/v1/subscriptions"
@ -47,10 +43,11 @@ const ROUTE_RELAY_AUTO_SUBSCRIPTIONSV1* = "/relay/v1/auto/subscriptions"
const ROUTE_RELAY_AUTO_MESSAGESV1* = "/relay/v1/auto/messages/{contentTopic}"
const ROUTE_RELAY_AUTO_MESSAGESV1_NO_TOPIC* = "/relay/v1/auto/messages"
proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: MessageCache[string]) =
proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: MessageCache) =
router.api(MethodPost, ROUTE_RELAY_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
# ## Subscribes a node to a list of PubSub topics
# debug "post_waku_v2_relay_v1_subscriptions"
## Subscribes a node to a list of PubSub topics
debug "post_waku_v2_relay_v1_subscriptions"
# Check the request body
if contentBody.isNone():
@ -60,10 +57,10 @@ proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: Mes
return error
# Only subscribe to topics for which we have no subscribed topic handlers yet
let newTopics = req.filterIt(not cache.isSubscribed(it))
let newTopics = req.filterIt(not cache.isPubsubSubscribed(it))
for pubsubTopic in newTopics:
cache.subscribe(pubsubTopic)
cache.pubsubSubscribe(pubsubTopic)
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(messageCacheHandler(cache)))
return RestApiResponse.ok()
@ -81,8 +78,8 @@ proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: Mes
# Unsubscribe all handlers from requested topics
for pubsubTopic in req:
node.unsubscribe((kind: PubsubUnsub, topic: pubsubTopic))
cache.unsubscribe(pubsubTopic)
cache.pubsubUnsubscribe(pubsubTopic)
node.unsubscribe((kind: PubsubUnsub, topic: pubsubTopic))
# Successfully unsubscribed from all requested topics
return RestApiResponse.ok()
@ -160,66 +157,54 @@ proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: Mes
# Autosharding API
router.api(MethodPost, ROUTE_RELAY_AUTO_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
# ## Subscribes a node to a list of content topics
# debug "post_waku_v2_relay_v1_auto_subscriptions"
# Check the request body
if contentBody.isNone():
return RestApiResponse.badRequest()
## Subscribes a node to a list of content topics.
debug "post_waku_v2_relay_v1_auto_subscriptions"
let req: seq[ContentTopic] = decodeRequestBody[seq[ContentTopic]](contentBody).valueOr:
return error
# Only subscribe to topics for which we have no subscribed topic handlers yet
let newTopics = req.filterIt(not cache.isSubscribed(it))
let newTopics = req.filterIt(not cache.isContentSubscribed(it))
for contentTopic in newTopics:
cache.subscribe(contentTopic)
node.subscribe((kind: ContentSub, topic: contentTopic), some(autoMessageCacheHandler(cache)))
cache.contentSubscribe(contentTopic)
node.subscribe((kind: ContentSub, topic: contentTopic), some(messageCacheHandler(cache)))
return RestApiResponse.ok()
router.api(MethodDelete, ROUTE_RELAY_AUTO_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
# ## Subscribes a node to a list of content topics
# debug "delete_waku_v2_relay_v1_auto_subscriptions"
# Check the request body
if contentBody.isNone():
return RestApiResponse.badRequest()
## Unsubscribes a node from a list of content topics.
debug "delete_waku_v2_relay_v1_auto_subscriptions"
let req: seq[ContentTopic] = decodeRequestBody[seq[ContentTopic]](contentBody).valueOr:
return error
# Unsubscribe all handlers from requested topics
for contentTopic in req:
cache.unsubscribe(contentTopic)
cache.contentUnsubscribe(contentTopic)
node.unsubscribe((kind: ContentUnsub, topic: contentTopic))
# Successfully unsubscribed from all requested topics
return RestApiResponse.ok()
router.api(MethodGet, ROUTE_RELAY_AUTO_MESSAGESV1) do (contentTopic: string) -> RestApiResponse:
# ## Returns all WakuMessages received on a content topic since the
# ## last time this method was called
# ## TODO: ability to specify a return message limit
# debug "get_waku_v2_relay_v1_auto_messages", topic=topic
## Returns all WakuMessages received on a content topic since the
## last time this method was called.
debug "get_waku_v2_relay_v1_auto_messages", contentTopic=contentTopic
if contentTopic.isErr():
return RestApiResponse.badRequest()
let contentTopic = contentTopic.get()
let contentTopic = contentTopic.valueOr:
return RestApiResponse.badRequest($error)
let messages = cache.getMessages(contentTopic, clear=true)
if messages.isErr():
let messages = cache.getAutoMessages(contentTopic, clear=true).valueOr:
debug "Not subscribed to topic", topic=contentTopic
return RestApiResponse.notFound()
return RestApiResponse.notFound(contentTopic)
let data = RelayGetMessagesResponse(messages.get().map(toRelayWakuMessage))
let resp = RestApiResponse.jsonResponse(data, status=Http200)
if resp.isErr():
debug "An error ocurred while building the json respose", error=resp.error
return RestApiResponse.internalServerError()
let data = RelayGetMessagesResponse(messages.map(toRelayWakuMessage))
return resp.get()
return RestApiResponse.jsonResponse(data, status=Http200).valueOr:
debug "An error ocurred while building the json respose", error = error
return RestApiResponse.internalServerError($error)
router.api(MethodPost, ROUTE_RELAY_AUTO_MESSAGESV1_NO_TOPIC) do (contentBody: Option[ContentBody]) -> RestApiResponse:
# Check the request body
@ -237,25 +222,21 @@ proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: Mes
# if RLN is mounted, append the proof to the message
if not node.wakuRlnRelay.isNil():
# append the proof to the message
let success = node.wakuRlnRelay.appendRLNProof(message,
float64(getTime().toUnix()))
if not success:
return RestApiResponse.internalServerError("Failed to publish: error appending RLN proof to message")
if not node.wakuRlnRelay.appendRLNProof(message, float64(getTime().toUnix())):
return RestApiResponse.internalServerError(
"Failed to publish: error appending RLN proof to message")
# validate the message before sending it
let result = node.wakuRlnRelay.validateMessageAndUpdateLog(message)
if result == MessageValidationResult.Invalid:
return RestApiResponse.internalServerError("Failed to publish: invalid RLN proof")
elif result == MessageValidationResult.Spam:
return RestApiResponse.badRequest("Failed to publish: limit exceeded, try again later")
elif result == MessageValidationResult.Valid:
debug "RLN proof validated successfully", contentTopic=message.contentTopic
else:
return RestApiResponse.internalServerError("Failed to publish: unknown RLN proof validation result")
case node.wakuRlnRelay.validateMessage(message):
of MessageValidationResult.Invalid:
return RestApiResponse.internalServerError("Failed to publish: invalid RLN proof")
of MessageValidationResult.Spam:
return RestApiResponse.badRequest("Failed to publish: limit exceeded, try again later")
of MessageValidationResult.Valid:
debug "RLN proof validated successfully", contentTopic=message.contentTopic
# if we reach here its either a non-RLN message or a RLN message with a valid proof
debug "Publishing message", contentTopic=message.contentTopic, rln=not node.wakuRlnRelay.isNil()
if not (waitFor node.publish(none(PubSubTopic), message).withTimeout(futTimeout)):
error "Failed to publish message to topic", contentTopic=message.contentTopic
return RestApiResponse.internalServerError("Failed to publish: timedout")