mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-07 16:33:08 +00:00
Allow relay=false for light filter nodes (#518)
This commit is contained in:
parent
5054ecef35
commit
ffb5099f9e
@ -180,6 +180,11 @@ proc stop*(node: WakuNode) {.async.} =
|
|||||||
node.started = false
|
node.started = false
|
||||||
|
|
||||||
proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) =
|
proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) =
|
||||||
|
if node.wakuRelay.isNil:
|
||||||
|
error "Invalid API call to `subscribe`. WakuRelay not mounted."
|
||||||
|
# @TODO improved error handling
|
||||||
|
return
|
||||||
|
|
||||||
info "subscribe", topic=topic
|
info "subscribe", topic=topic
|
||||||
|
|
||||||
proc defaultHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
proc defaultHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||||
@ -242,6 +247,11 @@ proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
|
|||||||
## Unsubscribes a handler from a PubSub topic.
|
## Unsubscribes a handler from a PubSub topic.
|
||||||
##
|
##
|
||||||
## Status: Implemented.
|
## Status: Implemented.
|
||||||
|
if node.wakuRelay.isNil:
|
||||||
|
error "Invalid API call to `unsubscribe`. WakuRelay not mounted."
|
||||||
|
# @TODO improved error handling
|
||||||
|
return
|
||||||
|
|
||||||
info "unsubscribe", topic=topic
|
info "unsubscribe", topic=topic
|
||||||
|
|
||||||
let wakuRelay = node.wakuRelay
|
let wakuRelay = node.wakuRelay
|
||||||
@ -251,6 +261,12 @@ proc unsubscribeAll*(node: WakuNode, topic: Topic) =
|
|||||||
## Unsubscribes all handlers registered on a specific PubSub topic.
|
## Unsubscribes all handlers registered on a specific PubSub topic.
|
||||||
##
|
##
|
||||||
## Status: Implemented.
|
## Status: Implemented.
|
||||||
|
|
||||||
|
if node.wakuRelay.isNil:
|
||||||
|
error "Invalid API call to `unsubscribeAll`. WakuRelay not mounted."
|
||||||
|
# @TODO improved error handling
|
||||||
|
return
|
||||||
|
|
||||||
info "unsubscribeAll", topic=topic
|
info "unsubscribeAll", topic=topic
|
||||||
|
|
||||||
let wakuRelay = node.wakuRelay
|
let wakuRelay = node.wakuRelay
|
||||||
@ -280,6 +296,11 @@ proc publish*(node: WakuNode, topic: Topic, message: WakuMessage, rlnRelayEnabl
|
|||||||
##
|
##
|
||||||
## Status: Implemented.
|
## Status: Implemented.
|
||||||
## When rlnRelayEnabled is true, a zkp will be generated and attached to the message (it is an experimental feature)
|
## When rlnRelayEnabled is true, a zkp will be generated and attached to the message (it is an experimental feature)
|
||||||
|
|
||||||
|
if node.wakuRelay.isNil:
|
||||||
|
error "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead."
|
||||||
|
# @TODO improved error handling
|
||||||
|
return
|
||||||
|
|
||||||
let wakuRelay = node.wakuRelay
|
let wakuRelay = node.wakuRelay
|
||||||
debug "publish", topic=topic, contentTopic=message.contentTopic
|
debug "publish", topic=topic, contentTopic=message.contentTopic
|
||||||
@ -346,6 +367,20 @@ proc mountFilter*(node: WakuNode) =
|
|||||||
node.filters.notify(message, requestId) # Trigger filter handlers on a light node
|
node.filters.notify(message, requestId) # Trigger filter handlers on a light node
|
||||||
waku_node_messages.inc(labelValues = ["filter"])
|
waku_node_messages.inc(labelValues = ["filter"])
|
||||||
|
|
||||||
|
if node.wakuRelay.isNil:
|
||||||
|
debug "light node: mounting relay without starting"
|
||||||
|
## WakuFilter currently requires WakuRelay to be mounted in order to work.
|
||||||
|
## This is to allow protocol stream negotation with full nodes to succeed.
|
||||||
|
## Here we mount relay on the switch only, but do not subscribe to any pubsub
|
||||||
|
## topics. We also never start the relay protocol.
|
||||||
|
## @TODO: remove WakuRelay dependency
|
||||||
|
node.switch.mount(WakuRelay.init(
|
||||||
|
switch = node.switch,
|
||||||
|
triggerSelf = true,
|
||||||
|
sign = false,
|
||||||
|
verifySignature = false
|
||||||
|
))
|
||||||
|
|
||||||
node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler)
|
node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler)
|
||||||
node.switch.mount(node.wakuFilter)
|
node.switch.mount(node.wakuFilter)
|
||||||
node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription())
|
node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription())
|
||||||
@ -654,13 +689,6 @@ when isMainModule:
|
|||||||
if conf.storenode != "":
|
if conf.storenode != "":
|
||||||
setStorePeer(node, conf.storenode)
|
setStorePeer(node, conf.storenode)
|
||||||
|
|
||||||
# Filter setup
|
|
||||||
if (conf.filternode != "") or (conf.filter):
|
|
||||||
mountFilter(node)
|
|
||||||
|
|
||||||
if conf.filternode != "":
|
|
||||||
setFilterPeer(node, conf.filternode)
|
|
||||||
|
|
||||||
# Relay setup
|
# Relay setup
|
||||||
if conf.relay: # True by default
|
if conf.relay: # True by default
|
||||||
mountRelay(node, conf.topics.split(" "), rlnRelayEnabled = conf.rlnrelay)
|
mountRelay(node, conf.topics.split(" "), rlnRelayEnabled = conf.rlnrelay)
|
||||||
@ -671,6 +699,13 @@ when isMainModule:
|
|||||||
# NOTE Must be mounted after relay
|
# NOTE Must be mounted after relay
|
||||||
if conf.lightpush:
|
if conf.lightpush:
|
||||||
mountLightPush(node)
|
mountLightPush(node)
|
||||||
|
|
||||||
|
# Filter setup. NOTE Must be mounted after relay
|
||||||
|
if (conf.filternode != "") or (conf.filter):
|
||||||
|
mountFilter(node)
|
||||||
|
|
||||||
|
if conf.filternode != "":
|
||||||
|
setFilterPeer(node, conf.filternode)
|
||||||
|
|
||||||
if conf.rpc:
|
if conf.rpc:
|
||||||
startRpc(node, conf.rpcAddress, Port(conf.rpcPort + conf.portsShift), conf)
|
startRpc(node, conf.rpcAddress, Port(conf.rpcPort + conf.portsShift), conf)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user