mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-10 18:03:11 +00:00
deploy: ffb5099f9e7df614e7496cef0923aba52de2ba48
This commit is contained in:
parent
91c676801c
commit
8c20cc04fc
@ -259,7 +259,7 @@ procSuite "Waku v2 JSON-RPC API":
|
||||
let client = newRpcHttpClient()
|
||||
await client.connect("127.0.0.1", rpcPort)
|
||||
|
||||
let response = await client.get_waku_v2_store_v1_messages(@[HistoryContentFilter(contentTopic: defaultContentTopic)], some(StorePagingOptions()))
|
||||
let response = await client.get_waku_v2_store_v1_messages(defaultTopic, @[HistoryContentFilter(contentTopic: defaultContentTopic)], some(StorePagingOptions()))
|
||||
check:
|
||||
response.messages.len() == 8
|
||||
response.pagingOptions.isNone
|
||||
|
||||
@ -16,7 +16,7 @@ proc delete_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool
|
||||
|
||||
# Store API
|
||||
|
||||
proc get_waku_v2_store_v1_messages(contentFilters: seq[HistoryContentFilter], pagingOptions: Option[StorePagingOptions]): StoreResponse
|
||||
proc get_waku_v2_store_v1_messages(pubsubTopic: string, contentFilters: seq[HistoryContentFilter], pagingOptions: Option[StorePagingOptions]): StoreResponse
|
||||
|
||||
# Filter API
|
||||
|
||||
|
||||
@ -17,7 +17,7 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
|
||||
|
||||
## Store API version 1 definitions
|
||||
|
||||
rpcsrv.rpc("get_waku_v2_store_v1_messages") do(contentFilters: seq[HistoryContentFilter], pagingOptions: Option[StorePagingOptions]) -> StoreResponse:
|
||||
rpcsrv.rpc("get_waku_v2_store_v1_messages") do(pubsubTopic: string, contentFilters: seq[HistoryContentFilter], pagingOptions: Option[StorePagingOptions]) -> StoreResponse:
|
||||
## Returns history for a list of content topics with optional paging
|
||||
debug "get_waku_v2_store_v1_messages"
|
||||
|
||||
@ -27,7 +27,8 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
|
||||
debug "get_waku_v2_store_v1_messages response"
|
||||
responseFut.complete(response.toStoreResponse())
|
||||
|
||||
let historyQuery = HistoryQuery(contentFilters: contentFilters,
|
||||
let historyQuery = HistoryQuery(pubsubTopic: pubsubTopic,
|
||||
contentFilters: contentFilters,
|
||||
pagingInfo: if pagingOptions.isSome: pagingOptions.get.toPagingInfo() else: PagingInfo())
|
||||
|
||||
await node.query(historyQuery, queryFuncHandler)
|
||||
|
||||
@ -24,13 +24,18 @@ if paramCount() < 1:
|
||||
|
||||
let rpcPort = Port(parseInt(paramStr(1)))
|
||||
|
||||
echo "Please enter your topic:"
|
||||
echo "Please enter your pubsub topic:"
|
||||
let raw_pubsub = readLine(stdin)
|
||||
let pubsubTopic = fmt"{raw_pubsub}"
|
||||
echo "PubSubTopic is:", pubsubTopic
|
||||
|
||||
echo "Please enter your content topic:"
|
||||
let raw_input = readLine(stdin)
|
||||
let input = fmt"{raw_input}"
|
||||
echo "Input is:", input
|
||||
echo "Content topic is:", input
|
||||
|
||||
var node = newRpcHttpClient()
|
||||
waitfor node.connect("localhost", rpcPort)
|
||||
|
||||
var res = waitfor node.get_waku_v2_store_v1_messages(@[HistoryContentFilter(contentTopic: ContentTopic(input))], none(StorePagingOptions))
|
||||
var res = waitfor node.get_waku_v2_store_v1_messages(pubsubTopic, @[HistoryContentFilter(contentTopic: ContentTopic(input))], none(StorePagingOptions))
|
||||
echo "Waku query response: ", res
|
||||
|
||||
@ -180,6 +180,11 @@ proc stop*(node: WakuNode) {.async.} =
|
||||
node.started = false
|
||||
|
||||
proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) =
|
||||
if node.wakuRelay.isNil:
|
||||
error "Invalid API call to `subscribe`. WakuRelay not mounted."
|
||||
# @TODO improved error handling
|
||||
return
|
||||
|
||||
info "subscribe", topic=topic
|
||||
|
||||
proc defaultHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||
@ -242,6 +247,11 @@ proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
|
||||
## Unsubscribes a handler from a PubSub topic.
|
||||
##
|
||||
## Status: Implemented.
|
||||
if node.wakuRelay.isNil:
|
||||
error "Invalid API call to `unsubscribe`. WakuRelay not mounted."
|
||||
# @TODO improved error handling
|
||||
return
|
||||
|
||||
info "unsubscribe", topic=topic
|
||||
|
||||
let wakuRelay = node.wakuRelay
|
||||
@ -251,6 +261,12 @@ proc unsubscribeAll*(node: WakuNode, topic: Topic) =
|
||||
## Unsubscribes all handlers registered on a specific PubSub topic.
|
||||
##
|
||||
## Status: Implemented.
|
||||
|
||||
if node.wakuRelay.isNil:
|
||||
error "Invalid API call to `unsubscribeAll`. WakuRelay not mounted."
|
||||
# @TODO improved error handling
|
||||
return
|
||||
|
||||
info "unsubscribeAll", topic=topic
|
||||
|
||||
let wakuRelay = node.wakuRelay
|
||||
@ -280,6 +296,11 @@ proc publish*(node: WakuNode, topic: Topic, message: WakuMessage, rlnRelayEnabl
|
||||
##
|
||||
## Status: Implemented.
|
||||
## When rlnRelayEnabled is true, a zkp will be generated and attached to the message (it is an experimental feature)
|
||||
|
||||
if node.wakuRelay.isNil:
|
||||
error "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead."
|
||||
# @TODO improved error handling
|
||||
return
|
||||
|
||||
let wakuRelay = node.wakuRelay
|
||||
debug "publish", topic=topic, contentTopic=message.contentTopic
|
||||
@ -346,6 +367,20 @@ proc mountFilter*(node: WakuNode) =
|
||||
node.filters.notify(message, requestId) # Trigger filter handlers on a light node
|
||||
waku_node_messages.inc(labelValues = ["filter"])
|
||||
|
||||
if node.wakuRelay.isNil:
|
||||
debug "light node: mounting relay without starting"
|
||||
## WakuFilter currently requires WakuRelay to be mounted in order to work.
|
||||
## This is to allow protocol stream negotation with full nodes to succeed.
|
||||
## Here we mount relay on the switch only, but do not subscribe to any pubsub
|
||||
## topics. We also never start the relay protocol.
|
||||
## @TODO: remove WakuRelay dependency
|
||||
node.switch.mount(WakuRelay.init(
|
||||
switch = node.switch,
|
||||
triggerSelf = true,
|
||||
sign = false,
|
||||
verifySignature = false
|
||||
))
|
||||
|
||||
node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler)
|
||||
node.switch.mount(node.wakuFilter)
|
||||
node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription())
|
||||
@ -654,13 +689,6 @@ when isMainModule:
|
||||
if conf.storenode != "":
|
||||
setStorePeer(node, conf.storenode)
|
||||
|
||||
# Filter setup
|
||||
if (conf.filternode != "") or (conf.filter):
|
||||
mountFilter(node)
|
||||
|
||||
if conf.filternode != "":
|
||||
setFilterPeer(node, conf.filternode)
|
||||
|
||||
# Relay setup
|
||||
if conf.relay: # True by default
|
||||
mountRelay(node, conf.topics.split(" "), rlnRelayEnabled = conf.rlnrelay)
|
||||
@ -671,6 +699,13 @@ when isMainModule:
|
||||
# NOTE Must be mounted after relay
|
||||
if conf.lightpush:
|
||||
mountLightPush(node)
|
||||
|
||||
# Filter setup. NOTE Must be mounted after relay
|
||||
if (conf.filternode != "") or (conf.filter):
|
||||
mountFilter(node)
|
||||
|
||||
if conf.filternode != "":
|
||||
setFilterPeer(node, conf.filternode)
|
||||
|
||||
if conf.rpc:
|
||||
startRpc(node, conf.rpcAddress, Port(conf.rpcPort + conf.portsShift), conf)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user