feat: Autosharding API for RELAY subscriptions (#1983)

This commit is contained in:
Simon-Pierre Vivier 2023-09-26 07:33:52 -04:00 committed by GitHub
parent b67db79994
commit 9cfde68fdf
29 changed files with 905 additions and 327 deletions

View File

@ -212,7 +212,7 @@ proc publish(c: Chat, line: string) =
# Attempt lightpush # Attempt lightpush
asyncSpawn c.node.lightpushPublish(some(DefaultPubsubTopic), message) asyncSpawn c.node.lightpushPublish(some(DefaultPubsubTopic), message)
else: else:
asyncSpawn c.node.publish(DefaultPubsubTopic, message) asyncSpawn c.node.publish(some(DefaultPubsubTopic), message)
# TODO This should read or be subscribe handler subscribe # TODO This should read or be subscribe handler subscribe
proc readAndPrint(c: Chat) {.async.} = proc readAndPrint(c: Chat) {.async.} =
@ -490,8 +490,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
if msg.contentTopic == chat.contentTopic: if msg.contentTopic == chat.contentTopic:
chat.printReceivedMessage(msg) chat.printReceivedMessage(msg)
let topic = DefaultPubsubTopic node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(handler))
node.subscribe(topic, handler)
if conf.rlnRelay: if conf.rlnRelay:
info "WakuRLNRelay is enabled" info "WakuRLNRelay is enabled"

View File

@ -95,7 +95,7 @@ proc toChat2(cmb: Chat2MatterBridge, jsonNode: JsonNode) {.async.} =
chat2_mb_transfers.inc(labelValues = ["mb_to_chat2"]) chat2_mb_transfers.inc(labelValues = ["mb_to_chat2"])
await cmb.nodev2.publish(DefaultPubsubTopic, msg) await cmb.nodev2.publish(some(DefaultPubsubTopic), msg)
proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe, raises: [Exception].} = proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe, raises: [Exception].} =
if cmb.seen.containsOrAdd(msg.payload.hash()): if cmb.seen.containsOrAdd(msg.payload.hash()):
@ -204,7 +204,7 @@ proc start*(cmb: Chat2MatterBridge) {.async.} =
trace "Bridging message from Chat2 to Matterbridge", msg=msg trace "Bridging message from Chat2 to Matterbridge", msg=msg
cmb.toMatterbridge(msg) cmb.toMatterbridge(msg)
cmb.nodev2.subscribe(DefaultPubsubTopic, relayHandler) cmb.nodev2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
proc stop*(cmb: Chat2MatterBridge) {.async.} = proc stop*(cmb: Chat2MatterBridge) {.async.} =
info "Stopping Chat2MatterBridge" info "Stopping Chat2MatterBridge"
@ -229,8 +229,8 @@ when isMainModule:
# Install enabled API handlers: # Install enabled API handlers:
if conf.relay: if conf.relay:
let topicCache = relay_api.MessageCache.init(capacity=30) let cache = MessageCache[string].init(capacity=30)
installRelayApiHandlers(node, rpcServer, topicCache) installRelayApiHandlers(node, rpcServer, cache)
if conf.filter: if conf.filter:
let messageCache = filter_api.MessageCache.init(capacity=30) let messageCache = filter_api.MessageCache.init(capacity=30)

View File

@ -402,7 +402,7 @@ proc subscribeAndHandleMessages(node: WakuNode,
else: else:
msgPerContentTopic[msg.contentTopic] = 1 msgPerContentTopic[msg.contentTopic] = 1
node.subscribe(pubsubTopic, handler) node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))
when isMainModule: when isMainModule:
# known issue: confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError # known issue: confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError

View File

@ -28,6 +28,21 @@ import
../../waku/node/peer_manager, ../../waku/node/peer_manager,
../../waku/node/peer_manager/peer_store/waku_peer_storage, ../../waku/node/peer_manager/peer_store/waku_peer_storage,
../../waku/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, ../../waku/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
../../waku/waku_api/message_cache,
../../waku/waku_api/cache_handlers,
../../waku/waku_api/rest/server,
../../waku/waku_api/rest/debug/handlers as rest_debug_api,
../../waku/waku_api/rest/relay/handlers as rest_relay_api,
../../waku/waku_api/rest/filter/legacy_handlers as rest_legacy_filter_api,
../../waku/waku_api/rest/filter/handlers as rest_filter_api,
../../waku/waku_api/rest/lightpush/handlers as rest_lightpush_api,
../../waku/waku_api/rest/store/handlers as rest_store_api,
../../waku/waku_api/rest/health/handlers as rest_health_api,
../../waku/waku_api/jsonrpc/admin/handlers as rpc_admin_api,
../../waku/waku_api/jsonrpc/debug/handlers as rpc_debug_api,
../../waku/waku_api/jsonrpc/filter/handlers as rpc_filter_api,
../../waku/waku_api/jsonrpc/relay/handlers as rpc_relay_api,
../../waku/waku_api/jsonrpc/store/handlers as rpc_store_api,
../../waku/waku_archive, ../../waku/waku_archive,
../../waku/waku_dnsdisc, ../../waku/waku_dnsdisc,
../../waku/waku_enr, ../../waku/waku_enr,
@ -41,22 +56,6 @@ import
./wakunode2_validator_signed, ./wakunode2_validator_signed,
./internal_config, ./internal_config,
./external_config ./external_config
import
../../waku/waku_api/message_cache,
../../waku/waku_api/rest/server,
../../waku/waku_api/rest/debug/handlers as rest_debug_api,
../../waku/waku_api/rest/relay/handlers as rest_relay_api,
../../waku/waku_api/rest/relay/topic_cache,
../../waku/waku_api/rest/filter/legacy_handlers as rest_legacy_filter_api,
../../waku/waku_api/rest/filter/handlers as rest_filter_api,
../../waku/waku_api/rest/lightpush/handlers as rest_lightpush_api,
../../waku/waku_api/rest/store/handlers as rest_store_api,
../../waku/waku_api/rest/health/handlers as rest_health_api,
../../waku/waku_api/jsonrpc/admin/handlers as rpc_admin_api,
../../waku/waku_api/jsonrpc/debug/handlers as rpc_debug_api,
../../waku/waku_api/jsonrpc/filter/handlers as rpc_filter_api,
../../waku/waku_api/jsonrpc/relay/handlers as rpc_relay_api,
../../waku/waku_api/jsonrpc/store/handlers as rpc_store_api
logScope: logScope:
topics = "wakunode app" topics = "wakunode app"
@ -576,8 +575,20 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo
## Relay REST API ## Relay REST API
if conf.relay: if conf.relay:
let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity) let cache = MessageCache[string].init(capacity=conf.restRelayCacheCapacity)
installRelayApiHandlers(server.router, app.node, relayCache)
let handler = messageCacheHandler(cache)
let autoHandler = autoMessageCacheHandler(cache)
for pubsubTopic in conf.pubsubTopics:
cache.subscribe(pubsubTopic)
app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))
for contentTopic in conf.contentTopics:
cache.subscribe(contentTopic)
app.node.subscribe((kind: ContentSub, topic: contentTopic), some(autoHandler))
installRelayApiHandlers(server.router, app.node, cache)
## Filter REST API ## Filter REST API
if conf.filter: if conf.filter:
@ -610,8 +621,20 @@ proc startRpcServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNod
installDebugApiHandlers(app.node, server) installDebugApiHandlers(app.node, server)
if conf.relay: if conf.relay:
let relayMessageCache = rpc_relay_api.MessageCache.init(capacity=30) let cache = MessageCache[string].init(capacity=30)
installRelayApiHandlers(app.node, server, relayMessageCache)
let handler = messageCacheHandler(cache)
let autoHandler = autoMessageCacheHandler(cache)
for pubsubTopic in conf.pubsubTopics:
cache.subscribe(pubsubTopic)
app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))
for contentTopic in conf.contentTopics:
cache.subscribe(contentTopic)
app.node.subscribe((kind: ContentSub, topic: contentTopic), some(autoHandler))
installRelayApiHandlers(app.node, server, cache)
if conf.filternode != "": if conf.filternode != "":
let filterMessageCache = rpc_filter_api.MessageCache.init(capacity=30) let filterMessageCache = rpc_filter_api.MessageCache.init(capacity=30)

View File

@ -99,7 +99,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
contentTopic: contentTopic, # content topic to publish to contentTopic: contentTopic, # content topic to publish to
ephemeral: true, # tell store nodes to not store it ephemeral: true, # tell store nodes to not store it
timestamp: now()) # current timestamp timestamp: now()) # current timestamp
await node.publish(pubSubTopic, message) await node.publish(some(pubSubTopic), message)
notice "published message", text = text, timestamp = message.timestamp, psTopic = pubSubTopic, contentTopic = contentTopic notice "published message", text = text, timestamp = message.timestamp, psTopic = pubSubTopic, contentTopic = contentTopic
await sleepAsync(5000) await sleepAsync(5000)

View File

@ -94,7 +94,7 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
pubsubTopic=pubsubTopic, pubsubTopic=pubsubTopic,
contentTopic=msg.contentTopic, contentTopic=msg.contentTopic,
timestamp=msg.timestamp timestamp=msg.timestamp
node.subscribe(pubSubTopic, handler) node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))
when isMainModule: when isMainModule:
let rng = crypto.newRng() let rng = crypto.newRng()

View File

@ -54,9 +54,9 @@ procSuite "Peer Exchange":
peerExchangeHandler = handlePeerExchange peerExchangeHandler = handlePeerExchange
emptyHandler = ignorePeerExchange emptyHandler = ignorePeerExchange
await node1.mountRelay(topics = @[DefaultPubsubTopic], peerExchangeHandler = some(emptyHandler)) await node1.mountRelay(@[DefaultPubsubTopic], some(emptyHandler))
await node2.mountRelay(topics = @[DefaultPubsubTopic], peerExchangeHandler = some(emptyHandler)) await node2.mountRelay(@[DefaultPubsubTopic], some(emptyHandler))
await node3.mountRelay(topics = @[DefaultPubsubTopic], peerExchangeHandler = some(peerExchangeHandler)) await node3.mountRelay(@[DefaultPubsubTopic], some(peerExchangeHandler))
# Ensure that node1 prunes all peers after the first connection # Ensure that node1 prunes all peers after the first connection
node1.wakuRelay.parameters.dHigh = 1 node1.wakuRelay.parameters.dHigh = 1

View File

@ -415,9 +415,9 @@ procSuite "Waku Discovery v5":
asyncSpawn node.subscriptionsListener(queue) asyncSpawn node.subscriptionsListener(queue)
## Then ## Then
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard1)) queue.emit((kind: PubsubSub, topic: shard1))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard2)) queue.emit((kind: PubsubSub, topic: shard2))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard3)) queue.emit((kind: PubsubSub, topic: shard3))
await sleepAsync(1.seconds) await sleepAsync(1.seconds)
@ -426,9 +426,9 @@ procSuite "Waku Discovery v5":
node.protocol.localNode.record.containsShard(shard2) == true node.protocol.localNode.record.containsShard(shard2) == true
node.protocol.localNode.record.containsShard(shard3) == true node.protocol.localNode.record.containsShard(shard3) == true
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard1)) queue.emit((kind: PubsubSub, topic: shard1))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard2)) queue.emit((kind: PubsubSub, topic: shard2))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard3)) queue.emit((kind: PubsubSub, topic: shard3))
await sleepAsync(1.seconds) await sleepAsync(1.seconds)
@ -437,9 +437,9 @@ procSuite "Waku Discovery v5":
node.protocol.localNode.record.containsShard(shard2) == true node.protocol.localNode.record.containsShard(shard2) == true
node.protocol.localNode.record.containsShard(shard3) == true node.protocol.localNode.record.containsShard(shard3) == true
queue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: shard1)) queue.emit((kind: PubsubUnsub, topic: shard1))
queue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: shard2)) queue.emit((kind: PubsubUnsub, topic: shard2))
queue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: shard3)) queue.emit((kind: PubsubUnsub, topic: shard3))
await sleepAsync(1.seconds) await sleepAsync(1.seconds)

View File

@ -67,10 +67,10 @@ suite "WakuNode":
msg.payload == payload msg.payload == payload
completionFut.complete(true) completionFut.complete(true)
node2.subscribe(pubSubTopic, relayHandler) node2.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(2000.millis) await sleepAsync(2000.millis)
await node1.publish(pubSubTopic, message) await node1.publish(some(pubSubTopic), message)
await sleepAsync(2000.millis) await sleepAsync(2000.millis)
check: check:

View File

@ -49,7 +49,7 @@ suite "WakuNode - Lightpush":
topic == DefaultPubsubTopic topic == DefaultPubsubTopic
msg == message msg == message
completionFutRelay.complete(true) completionFutRelay.complete(true)
destNode.subscribe(DefaultPubsubTopic, relayHandler) destNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
# Wait for subscription to take effect # Wait for subscription to take effect
await sleepAsync(100.millis) await sleepAsync(100.millis)

View File

@ -46,8 +46,8 @@ suite "Waku Relay":
networkB = "test-network2" networkB = "test-network2"
## when ## when
nodeA.subscribe(networkA, noopRawHandler()) discard nodeA.subscribe(networkA, noopRawHandler())
nodeA.subscribe(networkB, noopRawHandler()) discard nodeA.subscribe(networkB, noopRawHandler())
## Then ## Then
check: check:
@ -73,9 +73,9 @@ suite "Waku Relay":
networkB = "test-network2" networkB = "test-network2"
networkC = "test-network3" networkC = "test-network3"
nodeA.subscribe(networkA, noopRawHandler()) discard nodeA.subscribe(networkA, noopRawHandler())
nodeA.subscribe(networkB, noopRawHandler()) discard nodeA.subscribe(networkB, noopRawHandler())
nodeA.subscribe(networkC, noopRawHandler()) discard nodeA.subscribe(networkC, noopRawHandler())
let topics = toSeq(nodeA.subscribedTopics) let topics = toSeq(nodeA.subscribedTopics)
require: require:
@ -85,7 +85,7 @@ suite "Waku Relay":
topics.contains(networkC) topics.contains(networkC)
## When ## When
nodeA.unsubscribe(networkA) nodeA.unsubscribeAll(networkA)
## Then ## Then
check: check:
@ -129,14 +129,14 @@ suite "Waku Relay":
proc srcSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = proc srcSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
srcSubsFut.complete((topic, message)) srcSubsFut.complete((topic, message))
srcNode.subscribe(networkTopic, srcSubsHandler) discard srcNode.subscribe(networkTopic, srcSubsHandler)
# Subscription # Subscription
let dstSubsFut = newFuture[(PubsubTopic, WakuMessage)]() let dstSubsFut = newFuture[(PubsubTopic, WakuMessage)]()
proc dstSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = proc dstSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
dstSubsFut.complete((topic, message)) dstSubsFut.complete((topic, message))
dstNode.subscribe(networkTopic, dstSubsHandler) discard dstNode.subscribe(networkTopic, dstSubsHandler)
await sleepAsync(500.millis) await sleepAsync(500.millis)
@ -196,7 +196,7 @@ suite "Waku Relay":
proc dstSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = proc dstSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
dstSubsFut.complete((topic, message)) dstSubsFut.complete((topic, message))
dstNode.subscribe(networkTopic, dstSubsHandler) discard dstNode.subscribe(networkTopic, dstSubsHandler)
await sleepAsync(500.millis) await sleepAsync(500.millis)

View File

@ -92,10 +92,10 @@ suite "WakuNode - Relay":
msg.payload == payload msg.payload == payload
completionFut.complete(true) completionFut.complete(true)
node3.subscribe(pubSubTopic, relayHandler) node3.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis) await sleepAsync(500.millis)
await node1.publish(pubSubTopic, message) await node1.publish(some(pubSubTopic), message)
## Then ## Then
check: check:
@ -173,14 +173,14 @@ suite "WakuNode - Relay":
# relay handler is called # relay handler is called
completionFut.complete(true) completionFut.complete(true)
node3.subscribe(pubSubTopic, relayHandler) node3.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis) await sleepAsync(500.millis)
await node1.publish(pubSubTopic, message1) await node1.publish(some(pubSubTopic), message1)
await sleepAsync(500.millis) await sleepAsync(500.millis)
# message2 never gets relayed because of the validator # message2 never gets relayed because of the validator
await node1.publish(pubSubTopic, message2) await node1.publish(some(pubSubTopic), message2)
await sleepAsync(500.millis) await sleepAsync(500.millis)
check: check:
@ -207,7 +207,7 @@ suite "WakuNode - Relay":
connOk == true connOk == true
# Node 1 subscribes to topic # Node 1 subscribes to topic
nodes[1].subscribe(DefaultPubsubTopic) nodes[1].subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
await sleepAsync(500.millis) await sleepAsync(500.millis)
# Node 0 publishes 5 messages not compliant with WakuMessage (aka random bytes) # Node 0 publishes 5 messages not compliant with WakuMessage (aka random bytes)
@ -254,10 +254,10 @@ suite "WakuNode - Relay":
msg.payload == payload msg.payload == payload
completionFut.complete(true) completionFut.complete(true)
node1.subscribe(pubSubTopic, relayHandler) node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis) await sleepAsync(500.millis)
await node2.publish(pubSubTopic, message) await node2.publish(some(pubSubTopic), message)
await sleepAsync(500.millis) await sleepAsync(500.millis)
@ -295,10 +295,10 @@ suite "WakuNode - Relay":
msg.payload == payload msg.payload == payload
completionFut.complete(true) completionFut.complete(true)
node1.subscribe(pubSubTopic, relayHandler) node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis) await sleepAsync(500.millis)
await node2.publish(pubSubTopic, message) await node2.publish(some(pubSubTopic), message)
await sleepAsync(500.millis) await sleepAsync(500.millis)
@ -340,10 +340,10 @@ suite "WakuNode - Relay":
msg.payload == payload msg.payload == payload
completionFut.complete(true) completionFut.complete(true)
node1.subscribe(pubSubTopic, relayHandler) node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis) await sleepAsync(500.millis)
await node2.publish(pubSubTopic, message) await node2.publish(some(pubSubTopic), message)
await sleepAsync(500.millis) await sleepAsync(500.millis)
check: check:
@ -380,10 +380,10 @@ suite "WakuNode - Relay":
msg.payload == payload msg.payload == payload
completionFut.complete(true) completionFut.complete(true)
node1.subscribe(pubSubTopic, relayHandler) node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis) await sleepAsync(500.millis)
await node2.publish(pubSubTopic, message) await node2.publish(some(pubSubTopic), message)
await sleepAsync(500.millis) await sleepAsync(500.millis)
check: check:
@ -420,10 +420,10 @@ suite "WakuNode - Relay":
msg.payload == payload msg.payload == payload
completionFut.complete(true) completionFut.complete(true)
node1.subscribe(pubSubTopic, relayHandler) node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis) await sleepAsync(500.millis)
await node2.publish(pubSubTopic, message) await node2.publish(some(pubSubTopic), message)
await sleepAsync(500.millis) await sleepAsync(500.millis)
@ -440,7 +440,7 @@ suite "WakuNode - Relay":
# subscribe all nodes to a topic # subscribe all nodes to a topic
let topic = "topic" let topic = "topic"
for node in nodes: node.wakuRelay.subscribe(topic, nil) for node in nodes: discard node.wakuRelay.subscribe(topic, nil)
await sleepAsync(500.millis) await sleepAsync(500.millis)
# connect nodes in full mesh # connect nodes in full mesh
@ -482,3 +482,48 @@ suite "WakuNode - Relay":
# Stop all nodes # Stop all nodes
await allFutures(nodes.mapIt(it.stop())) await allFutures(nodes.mapIt(it.stop()))
asyncTest "Unsubscribe keep the subscription if other content topics also use the shard":
## Setup
let
nodeKey = generateSecp256k1Key()
node = newTestWakuNode(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(0))
await node.start()
await node.mountRelay()
## Given
let
shard = "/waku/2/rs/1/1"
contentTopicA = DefaultContentTopic
contentTopicB = ContentTopic("/waku/2/default-content1/proto")
contentTopicC = ContentTopic("/waku/2/default-content2/proto")
handler: WakuRelayHandler =
proc(
pubsubTopic: PubsubTopic,
message: WakuMessage
): Future[void] {.gcsafe, raises: [Defect].} =
discard pubsubTopic
discard message
assert shard == getShard(contentTopicA).expect("Valid Topic"), "topic must use the same shard"
assert shard == getShard(contentTopicB).expect("Valid Topic"), "topic must use the same shard"
assert shard == getShard(contentTopicC).expect("Valid Topic"), "topic must use the same shard"
## When
node.subscribe((kind: ContentSub, topic: contentTopicA), some(handler))
node.subscribe((kind: ContentSub, topic: contentTopicB), some(handler))
node.subscribe((kind: ContentSub, topic: contentTopicC), some(handler))
## Then
node.unsubscribe((kind: ContentUnsub, topic: contentTopicB))
check node.wakuRelay.isSubscribed(shard)
node.unsubscribe((kind: ContentUnsub, topic: contentTopicA))
check node.wakuRelay.isSubscribed(shard)
node.unsubscribe((kind: ContentUnsub, topic: contentTopicC))
check not node.wakuRelay.isSubscribed(shard)
## Cleanup
await node.stop()

View File

@ -77,7 +77,7 @@ procSuite "WakuNode - RLN relay":
completionFut.complete(true) completionFut.complete(true)
# mount the relay handler # mount the relay handler
node3.subscribe(DefaultPubsubTopic, relayHandler) node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
await sleepAsync(2000.millis) await sleepAsync(2000.millis)
# prepare the message payload # prepare the message payload
@ -91,7 +91,7 @@ procSuite "WakuNode - RLN relay":
## node1 publishes a message with a rate limit proof, the message is then relayed to node2 which in turn ## node1 publishes a message with a rate limit proof, the message is then relayed to node2 which in turn
## verifies the rate limit proof of the message and relays the message to node3 ## verifies the rate limit proof of the message and relays the message to node3
## verification at node2 occurs inside a topic validator which is installed as part of the waku-rln-relay mount proc ## verification at node2 occurs inside a topic validator which is installed as part of the waku-rln-relay mount proc
await node1.publish(DefaultPubsubTopic, message) await node1.publish(some(DefaultPubsubTopic), message)
await sleepAsync(2000.millis) await sleepAsync(2000.millis)
@ -141,8 +141,8 @@ procSuite "WakuNode - RLN relay":
rxMessagesTopic2 = rxMessagesTopic2 + 1 rxMessagesTopic2 = rxMessagesTopic2 + 1
# mount the relay handlers # mount the relay handlers
nodes[2].subscribe(pubsubTopics[0], relayHandler) nodes[2].subscribe((kind: PubsubSub, topic: pubsubTopics[0]), some(relayHandler))
nodes[2].subscribe(pubsubTopics[1], relayHandler) nodes[2].subscribe((kind: PubsubSub, topic: pubsubTopics[1]), some(relayHandler))
await sleepAsync(1000.millis) await sleepAsync(1000.millis)
# generate some messages with rln proofs first. generating # generate some messages with rln proofs first. generating
@ -165,8 +165,8 @@ procSuite "WakuNode - RLN relay":
# publish 3 messages from node[0] (last 2 are spam, window is 10 secs) # publish 3 messages from node[0] (last 2 are spam, window is 10 secs)
# publish 3 messages from node[1] (last 2 are spam, window is 10 secs) # publish 3 messages from node[1] (last 2 are spam, window is 10 secs)
for msg in messages1: await nodes[0].publish(pubsubTopics[0], msg) for msg in messages1: await nodes[0].publish(some(pubsubTopics[0]), msg)
for msg in messages2: await nodes[1].publish(pubsubTopics[1], msg) for msg in messages2: await nodes[1].publish(some(pubsubTopics[1]), msg)
# wait for gossip to propagate # wait for gossip to propagate
await sleepAsync(5000.millis) await sleepAsync(5000.millis)
@ -237,7 +237,7 @@ procSuite "WakuNode - RLN relay":
completionFut.complete(true) completionFut.complete(true)
# mount the relay handler # mount the relay handler
node3.subscribe(DefaultPubsubTopic, relayHandler) node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
await sleepAsync(2000.millis) await sleepAsync(2000.millis)
# prepare the message payload # prepare the message payload
@ -266,7 +266,7 @@ procSuite "WakuNode - RLN relay":
## attempts to verify the rate limit proof and fails hence does not relay the message to node3, thus the relayHandler of node3 ## attempts to verify the rate limit proof and fails hence does not relay the message to node3, thus the relayHandler of node3
## never gets called ## never gets called
## verification at node2 occurs inside a topic validator which is installed as part of the waku-rln-relay mount proc ## verification at node2 occurs inside a topic validator which is installed as part of the waku-rln-relay mount proc
await node1.publish(DefaultPubsubTopic, message) await node1.publish(some(DefaultPubsubTopic), message)
await sleepAsync(2000.millis) await sleepAsync(2000.millis)
check: check:
@ -369,7 +369,7 @@ procSuite "WakuNode - RLN relay":
# mount the relay handler for node3 # mount the relay handler for node3
node3.subscribe(DefaultPubsubTopic, relayHandler) node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
await sleepAsync(2000.millis) await sleepAsync(2000.millis)
## node1 publishes and relays 4 messages to node2 ## node1 publishes and relays 4 messages to node2
@ -378,10 +378,10 @@ procSuite "WakuNode - RLN relay":
## node2 should detect either of wm1 or wm2 as spam and not relay it ## node2 should detect either of wm1 or wm2 as spam and not relay it
## node2 should relay wm3 to node3 ## node2 should relay wm3 to node3
## node2 should not relay wm4 because it has no valid rln proof ## node2 should not relay wm4 because it has no valid rln proof
await node1.publish(DefaultPubsubTopic, wm1) await node1.publish(some(DefaultPubsubTopic), wm1)
await node1.publish(DefaultPubsubTopic, wm2) await node1.publish(some(DefaultPubsubTopic), wm2)
await node1.publish(DefaultPubsubTopic, wm3) await node1.publish(some(DefaultPubsubTopic), wm3)
await node1.publish(DefaultPubsubTopic, wm4) await node1.publish(some(DefaultPubsubTopic), wm4)
await sleepAsync(2000.millis) await sleepAsync(2000.millis)
let let
@ -471,14 +471,14 @@ procSuite "WakuNode - RLN relay":
completionFut3.complete(true) completionFut3.complete(true)
# mount the relay handler for node2 # mount the relay handler for node2
node2.subscribe(DefaultPubsubTopic, relayHandler) node2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
await sleepAsync(2000.millis) await sleepAsync(2000.millis)
await node1.publish(DefaultPubsubTopic, wm1) await node1.publish(some(DefaultPubsubTopic), wm1)
await sleepAsync(10.seconds) await sleepAsync(10.seconds)
await node1.publish(DefaultPubsubTopic, wm2) await node1.publish(some(DefaultPubsubTopic), wm2)
await sleepAsync(10.seconds) await sleepAsync(10.seconds)
await node1.publish(DefaultPubsubTopic, wm3) await node1.publish(some(DefaultPubsubTopic), wm3)
let let
res1 = await completionFut1.withTimeout(10.seconds) res1 = await completionFut1.withTimeout(10.seconds)

View File

@ -61,7 +61,7 @@ suite "WakuNode2 - Validators":
msgReceived += 1 msgReceived += 1
# Subscribe all nodes to the same topic/handler # Subscribe all nodes to the same topic/handler
for node in nodes: node.wakuRelay.subscribe(spamProtectedTopic, handler) for node in nodes: discard node.wakuRelay.subscribe(spamProtectedTopic, handler)
await sleepAsync(500.millis) await sleepAsync(500.millis)
# Each node publishes 10 signed messages # Each node publishes 10 signed messages
@ -74,7 +74,7 @@ suite "WakuNode2 - Validators":
# Include signature # Include signature
msg.meta = secretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63] msg.meta = secretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63]
await nodes[i].publish(spamProtectedTopic, msg) await nodes[i].publish(some(spamProtectedTopic), msg)
# Wait for gossip # Wait for gossip
await sleepAsync(2.seconds) await sleepAsync(2.seconds)
@ -133,7 +133,7 @@ suite "WakuNode2 - Validators":
await sleepAsync(500.millis) await sleepAsync(500.millis)
# Subscribe all nodes to the same topic/handler # Subscribe all nodes to the same topic/handler
for node in nodes: node.wakuRelay.subscribe(spamProtectedTopic, handler) for node in nodes: discard node.wakuRelay.subscribe(spamProtectedTopic, handler)
await sleepAsync(500.millis) await sleepAsync(500.millis)
# Each node sends 5 messages, signed but with a non-whitelisted key (total = 25) # Each node sends 5 messages, signed but with a non-whitelisted key (total = 25)
@ -146,7 +146,7 @@ suite "WakuNode2 - Validators":
# Sign the message with a wrong key # Sign the message with a wrong key
msg.meta = wrongSecretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63] msg.meta = wrongSecretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63]
await nodes[i].publish(spamProtectedTopic, msg) await nodes[i].publish(some(spamProtectedTopic), msg)
# Each node sends 5 messages that are not signed (total = 25) # Each node sends 5 messages that are not signed (total = 25)
for i in 0..<5: for i in 0..<5:
@ -154,7 +154,7 @@ suite "WakuNode2 - Validators":
let unsignedMessage = WakuMessage( let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic, payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: now(), ephemeral: true) version: 2, timestamp: now(), ephemeral: true)
await nodes[i].publish(spamProtectedTopic, unsignedMessage) await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
# Each node sends 5 messages that dont contain timestamp (total = 25) # Each node sends 5 messages that dont contain timestamp (total = 25)
for i in 0..<5: for i in 0..<5:
@ -162,7 +162,7 @@ suite "WakuNode2 - Validators":
let unsignedMessage = WakuMessage( let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic, payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: 0, ephemeral: true) version: 2, timestamp: 0, ephemeral: true)
await nodes[i].publish(spamProtectedTopic, unsignedMessage) await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
# Each node sends 5 messages way BEFORE than the current timestmap (total = 25) # Each node sends 5 messages way BEFORE than the current timestmap (total = 25)
for i in 0..<5: for i in 0..<5:
@ -171,7 +171,7 @@ suite "WakuNode2 - Validators":
let unsignedMessage = WakuMessage( let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic, payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: beforeTimestamp, ephemeral: true) version: 2, timestamp: beforeTimestamp, ephemeral: true)
await nodes[i].publish(spamProtectedTopic, unsignedMessage) await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
# Each node sends 5 messages way LATER than the current timestmap (total = 25) # Each node sends 5 messages way LATER than the current timestmap (total = 25)
for i in 0..<5: for i in 0..<5:
@ -180,7 +180,7 @@ suite "WakuNode2 - Validators":
let unsignedMessage = WakuMessage( let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic, payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: afterTimestamp, ephemeral: true) version: 2, timestamp: afterTimestamp, ephemeral: true)
await nodes[i].publish(spamProtectedTopic, unsignedMessage) await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
# Wait for gossip # Wait for gossip
await sleepAsync(4.seconds) await sleepAsync(4.seconds)
@ -227,7 +227,7 @@ suite "WakuNode2 - Validators":
msgReceived += 1 msgReceived += 1
# Subscribe all nodes to the same topic/handler # Subscribe all nodes to the same topic/handler
for node in nodes: node.wakuRelay.subscribe(spamProtectedTopic, handler) for node in nodes: discard node.wakuRelay.subscribe(spamProtectedTopic, handler)
await sleepAsync(500.millis) await sleepAsync(500.millis)
# Add signed message validator to all nodes. They will only route signed messages # Add signed message validator to all nodes. They will only route signed messages
@ -255,7 +255,7 @@ suite "WakuNode2 - Validators":
let unsignedMessage = WakuMessage( let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic, payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: now(), ephemeral: true) version: 2, timestamp: now(), ephemeral: true)
await nodes[0].publish(spamProtectedTopic, unsignedMessage) await nodes[0].publish(some(spamProtectedTopic), unsignedMessage)
# nodes[0] spams 50 wrongly signed messages (nodes[0] just knows of nodes[1]) # nodes[0] spams 50 wrongly signed messages (nodes[0] just knows of nodes[1])
for j in 0..<50: for j in 0..<50:
@ -264,7 +264,7 @@ suite "WakuNode2 - Validators":
version: 2, timestamp: now(), ephemeral: true) version: 2, timestamp: now(), ephemeral: true)
# Sign the message with a wrong key # Sign the message with a wrong key
msg.meta = wrongSecretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63] msg.meta = wrongSecretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63]
await nodes[0].publish(spamProtectedTopic, msg) await nodes[0].publish(some(spamProtectedTopic), msg)
# Wait for gossip # Wait for gossip
await sleepAsync(2.seconds) await sleepAsync(2.seconds)

View File

@ -22,10 +22,6 @@ import
../testlib/wakucore, ../testlib/wakucore,
../testlib/wakunode ../testlib/wakunode
proc newTestMessageCache(): relay_api.MessageCache =
relay_api.MessageCache.init(capacity=30)
suite "Waku v2 JSON-RPC API - Relay": suite "Waku v2 JSON-RPC API - Relay":
asyncTest "subscribe and unsubscribe from topics": asyncTest "subscribe and unsubscribe from topics":
@ -33,7 +29,7 @@ suite "Waku v2 JSON-RPC API - Relay":
let node = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)) let node = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
await node.start() await node.start()
await node.mountRelay(topics = @[DefaultPubsubTopic]) await node.mountRelay(@[])
# JSON-RPC server # JSON-RPC server
let let
@ -41,7 +37,8 @@ suite "Waku v2 JSON-RPC API - Relay":
ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort) ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort)
server = newRpcHttpServer([ta]) server = newRpcHttpServer([ta])
installRelayApiHandlers(node, server, newTestMessageCache()) let cache = MessageCache[string].init(capacity=30)
installRelayApiHandlers(node, server, cache)
server.start() server.start()
# JSON-RPC client # JSON-RPC client
@ -67,16 +64,14 @@ suite "Waku v2 JSON-RPC API - Relay":
subResp == true subResp == true
check: check:
# Node is now subscribed to default + new topics # Node is now subscribed to default + new topics
subTopics.len == 1 + newTopics.len subTopics.len == newTopics.len
DefaultPubsubTopic in subTopics
newTopics.allIt(it in subTopics) newTopics.allIt(it in subTopics)
check: check:
unsubResp == true unsubResp == true
check: check:
# Node is now unsubscribed from new topics # Node is now unsubscribed from new topics
unsubTopics.len == 1 unsubTopics.len == 0
DefaultPubsubTopic in unsubTopics
newTopics.allIt(it notin unsubTopics) newTopics.allIt(it notin unsubTopics)
await server.stop() await server.stop()
@ -110,14 +105,14 @@ suite "Waku v2 JSON-RPC API - Relay":
await srcNode.connectToNodes(@[dstNode.peerInfo.toRemotePeerInfo()]) await srcNode.connectToNodes(@[dstNode.peerInfo.toRemotePeerInfo()])
# RPC server (source node) # RPC server (source node)
let let
rpcPort = Port(8548) rpcPort = Port(8548)
ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort) ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort)
server = newRpcHttpServer([ta]) server = newRpcHttpServer([ta])
installRelayApiHandlers(srcNode, server, newTestMessageCache()) let cache = MessageCache[string].init(capacity=30)
installRelayApiHandlers(srcNode, server, cache)
server.start() server.start()
# JSON-RPC client # JSON-RPC client
@ -131,7 +126,7 @@ suite "Waku v2 JSON-RPC API - Relay":
proc dstHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = proc dstHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
dstHandlerFut.complete((topic, msg)) dstHandlerFut.complete((topic, msg))
dstNode.subscribe(pubSubTopic, dstHandler) dstNode.subscribe((kind: PubsubSub, topic: pubsubTopic), some(dstHandler))
## When ## When
let rpcMessage = WakuMessageRPC( let rpcMessage = WakuMessageRPC(
@ -162,7 +157,7 @@ suite "Waku v2 JSON-RPC API - Relay":
await server.closeWait() await server.closeWait()
await allFutures(srcNode.stop(), dstNode.stop()) await allFutures(srcNode.stop(), dstNode.stop())
asyncTest "get latest messages received from topics cache": asyncTest "get latest messages received from pubsub topics cache":
## Setup ## Setup
let let
pubSubTopic = "test-jsonrpc-pubsub-topic" pubSubTopic = "test-jsonrpc-pubsub-topic"
@ -176,24 +171,26 @@ suite "Waku v2 JSON-RPC API - Relay":
await allFutures(srcNode.start(), dstNode.start()) await allFutures(srcNode.start(), dstNode.start())
await srcNode.mountRelay(@[pubSubTopic]) await srcNode.mountRelay(@[pubSubTopic])
await dstNode.mountRelay(@[pubSubTopic]) await dstNode.mountRelay(@[])
await srcNode.connectToNodes(@[dstNode.peerInfo.toRemotePeerInfo()]) await srcNode.connectToNodes(@[dstNode.peerInfo.toRemotePeerInfo()])
# RPC server (destination node) # RPC server (destination node)
let let
rpcPort = Port(8549) rpcPort = Port(8549)
ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort) ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort)
server = newRpcHttpServer([ta]) server = newRpcHttpServer([ta])
installRelayApiHandlers(dstNode, server, newTestMessageCache()) let cache = MessageCache[string].init(capacity=30)
installRelayApiHandlers(dstNode, server, cache)
server.start() server.start()
# JSON-RPC client # JSON-RPC client
let client = newRpcHttpClient() let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false) await client.connect("127.0.0.1", rpcPort, false)
discard await client.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic])
## Given ## Given
let messages = @[ let messages = @[
fakeWakuMessage(payload= @[byte 70], contentTopic=contentTopic), fakeWakuMessage(payload= @[byte 70], contentTopic=contentTopic),
@ -204,7 +201,7 @@ suite "Waku v2 JSON-RPC API - Relay":
## When ## When
for msg in messages: for msg in messages:
await srcNode.publish(pubSubTopic, msg) await srcNode.publish(some(pubSubTopic), msg)
await sleepAsync(200.millis) await sleepAsync(200.millis)
@ -222,3 +219,66 @@ suite "Waku v2 JSON-RPC API - Relay":
await server.stop() await server.stop()
await server.closeWait() await server.closeWait()
await allFutures(srcNode.stop(), dstNode.stop()) await allFutures(srcNode.stop(), dstNode.stop())
asyncTest "get latest messages received from content topics cache":
## Setup
let contentTopic = DefaultContentTopic
# Relay nodes setup
let
srcNode = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
dstNode = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
await allFutures(srcNode.start(), dstNode.start())
let shard = getShard(contentTopic).expect("Valid Shard")
await srcNode.mountRelay(@[shard])
await dstNode.mountRelay(@[])
await srcNode.connectToNodes(@[dstNode.peerInfo.toRemotePeerInfo()])
# RPC server (destination node)
let
rpcPort = Port(8550)
ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort)
server = newRpcHttpServer([ta])
let cache = MessageCache[string].init(capacity=30)
installRelayApiHandlers(dstNode, server, cache)
server.start()
# JSON-RPC client
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
discard await client.post_waku_v2_relay_v1_auto_subscriptions(@[contentTopic])
## Given
let messages = @[
fakeWakuMessage(payload= @[byte 70], contentTopic=contentTopic),
fakeWakuMessage(payload= @[byte 71], contentTopic=contentTopic),
fakeWakuMessage(payload= @[byte 72], contentTopic=contentTopic),
fakeWakuMessage(payload= @[byte 73], contentTopic=contentTopic)
]
## When
for msg in messages:
await srcNode.publish(none(PubsubTopic), msg)
await sleepAsync(200.millis)
let dstMessages = await client.get_waku_v2_relay_v1_auto_messages(contentTopic)
## Then
check:
dstMessages.len == 4
dstMessages[2].payload == base64.encode(messages[2].payload)
dstMessages[2].contentTopic.get() == messages[2].contentTopic
dstMessages[2].timestamp.get() == messages[2].timestamp
dstMessages[2].version.get() == messages[2].version
## Cleanup
await server.stop()
await server.closeWait()
await allFutures(srcNode.stop(), dstNode.stop())

View File

@ -23,7 +23,6 @@ import
../../waku/waku_relay, ../../waku/waku_relay,
../../waku/waku_filter_v2/subscriptions, ../../waku/waku_filter_v2/subscriptions,
../../waku/waku_filter_v2/common, ../../waku/waku_filter_v2/common,
../../waku/waku_api/rest/relay/topic_cache,
../../waku/waku_api/rest/relay/handlers as relay_api, ../../waku/waku_api/rest/relay/handlers as relay_api,
../../waku/waku_api/rest/relay/client as relay_api_client, ../../waku/waku_api/rest/relay/client as relay_api_client,
../testlib/wakucore, ../testlib/wakucore,
@ -74,7 +73,7 @@ proc init(T: type RestFilterTest): Future[T] {.async.} =
testSetup.messageCache = filter_api.MessageCache.init() testSetup.messageCache = filter_api.MessageCache.init()
installFilterRestApiHandlers(testSetup.restServer.router, testSetup.subscriberNode, testSetup.messageCache) installFilterRestApiHandlers(testSetup.restServer.router, testSetup.subscriberNode, testSetup.messageCache)
let topicCache = TopicCache.init() let topicCache = MessageCache[string].init()
installRelayApiHandlers(testSetup.restServerForService.router, testSetup.serviceNode, topicCache) installRelayApiHandlers(testSetup.restServerForService.router, testSetup.serviceNode, topicCache)
testSetup.restServer.start() testSetup.restServer.start()
@ -244,7 +243,7 @@ suite "Waku v2 Rest API - Filter V2":
subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId
restFilterTest.messageCache.subscribe(DefaultPubsubTopic) restFilterTest.messageCache.subscribe(DefaultPubsubTopic)
restFilterTest.serviceNode.subscribe(DefaultPubsubTopic) restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
# When # When
var requestBody = FilterSubscribeRequest(requestId: "1234", var requestBody = FilterSubscribeRequest(requestId: "1234",

View File

@ -94,8 +94,8 @@ suite "Waku v2 Rest API - lightpush":
# Given # Given
let restLightPushTest = await RestLightPushTest.init() let restLightPushTest = await RestLightPushTest.init()
restLightPushTest.consumerNode.subscribe(DefaultPubsubTopic) restLightPushTest.consumerNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
restLightPushTest.serviceNode.subscribe(DefaultPubsubTopic) restLightPushTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
require: require:
toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1 toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1
@ -120,7 +120,7 @@ suite "Waku v2 Rest API - lightpush":
# Given # Given
let restLightPushTest = await RestLightPushTest.init() let restLightPushTest = await RestLightPushTest.init()
restLightPushTest.serviceNode.subscribe(DefaultPubsubTopic) restLightPushTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
require: require:
toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1 toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1

View File

@ -11,13 +11,13 @@ import
../../waku/common/base64, ../../waku/common/base64,
../../waku/waku_core, ../../waku/waku_core,
../../waku/waku_node, ../../waku/waku_node,
../../waku/waku_api/message_cache,
../../waku/waku_api/rest/server, ../../waku/waku_api/rest/server,
../../waku/waku_api/rest/client, ../../waku/waku_api/rest/client,
../../waku/waku_api/rest/responses, ../../waku/waku_api/rest/responses,
../../waku/waku_api/rest/relay/types, ../../waku/waku_api/rest/relay/types,
../../waku/waku_api/rest/relay/handlers as relay_api, ../../waku/waku_api/rest/relay/handlers as relay_api,
../../waku/waku_api/rest/relay/client as relay_api_client, ../../waku/waku_api/rest/relay/client as relay_api_client,
../../waku/waku_api/rest/relay/topic_cache,
../../waku/waku_relay, ../../waku/waku_relay,
../../../waku/waku_rln_relay, ../../../waku/waku_rln_relay,
../testlib/wakucore, ../testlib/wakucore,
@ -34,7 +34,7 @@ proc testWakuNode(): WakuNode =
suite "Waku v2 Rest API - Relay": suite "Waku v2 Rest API - Relay":
asyncTest "Subscribe a node to an array of topics - POST /relay/v1/subscriptions": asyncTest "Subscribe a node to an array of pubsub topics - POST /relay/v1/subscriptions":
# Given # Given
let node = testWakuNode() let node = testWakuNode()
await node.start() await node.start()
@ -44,9 +44,9 @@ suite "Waku v2 Rest API - Relay":
let restAddress = ValidIpAddress.init("0.0.0.0") let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet() let restServer = RestServerRef.init(restAddress, restPort).tryGet()
let topicCache = TopicCache.init() let cache = MessageCache[string].init()
installRelayPostSubscriptionsV1Handler(restServer.router, node, topicCache) installRelayApiHandlers(restServer.router, node, cache)
restServer.start() restServer.start()
let pubSubTopics = @[ let pubSubTopics = @[
@ -67,35 +67,39 @@ suite "Waku v2 Rest API - Relay":
response.data == "OK" response.data == "OK"
check: check:
topicCache.isSubscribed("pubsub-topic-1") cache.isSubscribed("pubsub-topic-1")
topicCache.isSubscribed("pubsub-topic-2") cache.isSubscribed("pubsub-topic-2")
topicCache.isSubscribed("pubsub-topic-3") cache.isSubscribed("pubsub-topic-3")
check: check:
# Node should be subscribed to default + new topics
toSeq(node.wakuRelay.subscribedTopics).len == pubSubTopics.len toSeq(node.wakuRelay.subscribedTopics).len == pubSubTopics.len
await restServer.stop() await restServer.stop()
await restServer.closeWait() await restServer.closeWait()
await node.stop() await node.stop()
asyncTest "Unsubscribe a node from an array of topics - DELETE /relay/v1/subscriptions": asyncTest "Unsubscribe a node from an array of pubsub topics - DELETE /relay/v1/subscriptions":
# Given # Given
let node = testWakuNode() let node = testWakuNode()
await node.start() await node.start()
await node.mountRelay() await node.mountRelay(@[
"pubsub-topic-1",
"pubsub-topic-2",
"pubsub-topic-3",
"pubsub-topic-x",
])
let restPort = Port(58012) let restPort = Port(58012)
let restAddress = ValidIpAddress.init("0.0.0.0") let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet() let restServer = RestServerRef.init(restAddress, restPort).tryGet()
let topicCache = TopicCache.init() let cache = MessageCache[string].init()
topicCache.subscribe("pubsub-topic-1") cache.subscribe("pubsub-topic-1")
topicCache.subscribe("pubsub-topic-2") cache.subscribe("pubsub-topic-2")
topicCache.subscribe("pubsub-topic-3") cache.subscribe("pubsub-topic-3")
topicCache.subscribe("pubsub-topic-x") cache.subscribe("pubsub-topic-x")
installRelayDeleteSubscriptionsV1Handler(restServer.router, node, topicCache) installRelayApiHandlers(restServer.router, node, cache)
restServer.start() restServer.start()
let pubSubTopics = @[ let pubSubTopics = @[
@ -117,17 +121,22 @@ suite "Waku v2 Rest API - Relay":
response.data == "OK" response.data == "OK"
check: check:
not topicCache.isSubscribed("pubsub-topic-1") not cache.isSubscribed("pubsub-topic-1")
not topicCache.isSubscribed("pubsub-topic-2") not node.wakuRelay.isSubscribed("pubsub-topic-1")
not topicCache.isSubscribed("pubsub-topic-3") not cache.isSubscribed("pubsub-topic-2")
topicCache.isSubscribed("pubsub-topic-x") not node.wakuRelay.isSubscribed("pubsub-topic-2")
not cache.isSubscribed("pubsub-topic-3")
not node.wakuRelay.isSubscribed("pubsub-topic-3")
cache.isSubscribed("pubsub-topic-x")
node.wakuRelay.isSubscribed("pubsub-topic-x")
not cache.isSubscribed("pubsub-topic-y")
not node.wakuRelay.isSubscribed("pubsub-topic-y")
await restServer.stop() await restServer.stop()
await restServer.closeWait() await restServer.closeWait()
await node.stop() await node.stop()
asyncTest "Get the latest messages for a pubsub topic - GET /relay/v1/messages/{topic}":
asyncTest "Get the latest messages for topic - GET /relay/v1/messages/{topic}":
# Given # Given
let node = testWakuNode() let node = testWakuNode()
await node.start() await node.start()
@ -144,13 +153,13 @@ suite "Waku v2 Rest API - Relay":
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
] ]
let topicCache = TopicCache.init() let cache = MessageCache[string].init()
topicCache.subscribe(pubSubTopic) cache.subscribe(pubSubTopic)
for msg in messages: for msg in messages:
topicCache.addMessage(pubSubTopic, msg) cache.addMessage(pubSubTopic, msg)
installRelayGetMessagesV1Handler(restServer.router, node, topicCache) installRelayApiHandlers(restServer.router, node, cache)
restServer.start() restServer.start()
# When # When
@ -164,20 +173,20 @@ suite "Waku v2 Rest API - Relay":
response.data.len == 3 response.data.len == 3
response.data.all do (msg: RelayWakuMessage) -> bool: response.data.all do (msg: RelayWakuMessage) -> bool:
msg.payload == base64.encode("TEST-1") and msg.payload == base64.encode("TEST-1") and
msg.contentTopic.get().string == "content-topic-x" and msg.contentTopic.get() == "content-topic-x" and
msg.version.get() == 2 and msg.version.get() == 2 and
msg.timestamp.get() != Timestamp(0) msg.timestamp.get() != Timestamp(0)
check: check:
topicCache.isSubscribed(pubSubTopic) cache.isSubscribed(pubSubTopic)
topicCache.getMessages(pubSubTopic).tryGet().len == 0 cache.getMessages(pubSubTopic).tryGet().len == 0
await restServer.stop() await restServer.stop()
await restServer.closeWait() await restServer.closeWait()
await node.stop() await node.stop()
asyncTest "Post a message to topic - POST /relay/v1/messages/{topic}": asyncTest "Post a message to a pubsub topic - POST /relay/v1/messages/{topic}":
## "Relay API: publish and subscribe/unsubscribe": ## "Relay API: publish and subscribe/unsubscribe":
# Given # Given
let node = testWakuNode() let node = testWakuNode()
@ -192,26 +201,18 @@ suite "Waku v2 Rest API - Relay":
let restAddress = ValidIpAddress.init("0.0.0.0") let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet() let restServer = RestServerRef.init(restAddress, restPort).tryGet()
let topicCache = TopicCache.init() let cache = MessageCache[string].init()
installRelayApiHandlers(restServer.router, node, topicCache) installRelayApiHandlers(restServer.router, node, cache)
restServer.start() restServer.start()
let client = newRestHttpClient(initTAddress(restAddress, restPort)) let client = newRestHttpClient(initTAddress(restAddress, restPort))
node.subscribe(DefaultPubsubTopic) node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
require: require:
toSeq(node.wakuRelay.subscribedTopics).len == 1 toSeq(node.wakuRelay.subscribedTopics).len == 1
# When # When
let newTopics = @[
PubSubTopic("pubsub-topic-1"),
PubSubTopic("pubsub-topic-2"),
PubSubTopic("pubsub-topic-3")
]
discard await client.relayPostSubscriptionsV1(newTopics)
let response = await client.relayPostMessagesV1(DefaultPubsubTopic, RelayWakuMessage( let response = await client.relayPostMessagesV1(DefaultPubsubTopic, RelayWakuMessage(
payload: base64.encode("TEST-PAYLOAD"), payload: base64.encode("TEST-PAYLOAD"),
contentTopic: some(DefaultContentTopic), contentTopic: some(DefaultContentTopic),
@ -224,8 +225,193 @@ suite "Waku v2 Rest API - Relay":
$response.contentType == $MIMETYPE_TEXT $response.contentType == $MIMETYPE_TEXT
response.data == "OK" response.data == "OK"
# TODO: Check for the message to be published to the topic await restServer.stop()
await restServer.closeWait()
await node.stop()
# Autosharding API
asyncTest "Subscribe a node to an array of content topics - POST /relay/v1/auto/subscriptions":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
let restPort = Port(58011)
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
let cache = MessageCache[string].init()
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()
let contentTopics = @[
ContentTopic("/waku/2/default-content1/proto"),
ContentTopic("/waku/2/default-content2/proto"),
ContentTopic("/waku/2/default-content3/proto")
]
let shards = contentTopics.mapIt(getShard(it).expect("Valid Shard")).deduplicate()
# When
let client = newRestHttpClient(initTAddress(restAddress, restPort))
let requestBody = RelayPostSubscriptionsRequest(contentTopics)
let response = await client.relayPostAutoSubscriptionsV1(requestBody)
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"
check:
cache.isSubscribed(contentTopics[0])
cache.isSubscribed(contentTopics[1])
cache.isSubscribed(contentTopics[2])
check:
# Node should be subscribed to all shards
toSeq(node.wakuRelay.subscribedTopics).len == shards.len
await restServer.stop() await restServer.stop()
await restServer.closeWait() await restServer.closeWait()
await node.stop() await node.stop()
asyncTest "Unsubscribe a node from an array of content topics - DELETE /relay/v1/auto/subscriptions":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
let restPort = Port(58012)
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
let contentTopics = @[
ContentTopic("/waku/2/default-content1/proto"),
ContentTopic("/waku/2/default-content2/proto"),
ContentTopic("/waku/2/default-content3/proto"),
ContentTopic("/waku/2/default-contentX/proto")
]
let cache = MessageCache[string].init()
cache.subscribe(contentTopics[0])
cache.subscribe(contentTopics[1])
cache.subscribe(contentTopics[2])
cache.subscribe("/waku/2/default-contentY/proto")
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()
# When
let client = newRestHttpClient(initTAddress(restAddress, restPort))
let requestBody = RelayDeleteSubscriptionsRequest(contentTopics)
let response = await client.relayDeleteAutoSubscriptionsV1(requestBody)
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"
check:
not cache.isSubscribed(contentTopics[1])
not cache.isSubscribed(contentTopics[2])
not cache.isSubscribed(contentTopics[3])
cache.isSubscribed("/waku/2/default-contentY/proto")
await restServer.stop()
await restServer.closeWait()
await node.stop()
asyncTest "Get the latest messages for a content topic - GET /relay/v1/auto/messages/{topic}":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
let restPort = Port(58013)
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
let contentTopic = DefaultContentTopic
let messages = @[
fakeWakuMessage(contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")),
]
let cache = MessageCache[string].init()
cache.subscribe(contentTopic)
for msg in messages:
cache.addMessage(contentTopic, msg)
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()
# When
let client = newRestHttpClient(initTAddress(restAddress, restPort))
let response = await client.relayGetAutoMessagesV1(contentTopic)
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.len == 3
response.data.all do (msg: RelayWakuMessage) -> bool:
msg.payload == base64.encode("TEST-1") and
msg.contentTopic.get() == DefaultContentTopic and
msg.version.get() == 2 and
msg.timestamp.get() != Timestamp(0)
check:
cache.isSubscribed(contentTopic)
cache.getMessages(contentTopic).tryGet().len == 0 # The cache is cleared when getMessage is called
await restServer.stop()
await restServer.closeWait()
await node.stop()
asyncTest "Post a message to a content topic - POST /relay/v1/auto/messages/{topic}":
## "Relay API: publish and subscribe/unsubscribe":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
await node.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_1")))
# RPC server setup
let restPort = Port(58014)
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
let cache = MessageCache[string].init()
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()
let client = newRestHttpClient(initTAddress(restAddress, restPort))
node.subscribe((kind: ContentSub, topic: DefaultContentTopic))
require:
toSeq(node.wakuRelay.subscribedTopics).len == 1
# When
let response = await client.relayPostAutoMessagesV1(DefaultContentTopic, RelayWakuMessage(
payload: base64.encode("TEST-PAYLOAD"),
contentTopic: some(DefaultContentTopic),
timestamp: some(int64(2022))
))
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"
await restServer.stop()
await restServer.closeWait()
await node.stop()

View File

@ -25,6 +25,7 @@ import
libp2p/transports/wstransport libp2p/transports/wstransport
import import
../waku_core, ../waku_core,
../waku_core/topics/sharding,
../waku_relay, ../waku_relay,
../waku_archive, ../waku_archive,
../waku_store, ../waku_store,
@ -101,6 +102,7 @@ type
announcedAddresses* : seq[MultiAddress] announcedAddresses* : seq[MultiAddress]
started*: bool # Indicates that node has started listening started*: bool # Indicates that node has started listening
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent] topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
contentTopicHandlers: Table[ContentTopic, TopicHandler]
proc getAutonatService*(rng: ref HmacDrbgContext): AutonatService = proc getAutonatService*(rng: ref HmacDrbgContext): AutonatService =
## AutonatService request other peers to dial us back ## AutonatService request other peers to dial us back
@ -220,62 +222,104 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
await filterHandler(topic, msg) await filterHandler(topic, msg)
await archiveHandler(topic, msg) await archiveHandler(topic, msg)
node.wakuRelay.subscribe(topic, defaultHandler) discard node.wakuRelay.subscribe(topic, defaultHandler)
proc subscribe*(node: WakuNode, subscription: SubscriptionEvent, handler = none(WakuRelayHandler)) =
proc subscribe*(node: WakuNode, topic: PubsubTopic) = ## Subscribes to a PubSub or Content topic. Triggers handler when receiving messages on
## this topic. WakuRelayHandler is a method that takes a topic and a Waku message.
if node.wakuRelay.isNil(): if node.wakuRelay.isNil():
error "Invalid API call to `subscribe`. WakuRelay not mounted." error "Invalid API call to `subscribe`. WakuRelay not mounted."
return return
debug "subscribe", pubsubTopic= topic let (pubsubTopic, contentTopicOp) =
case subscription.kind:
of ContentSub:
let shard = getShard((subscription.topic)).valueOr:
error "Autosharding error", error=error
return
node.topicSubscriptionQueue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: topic)) (shard, some(subscription.topic))
node.registerRelayDefaultHandler(topic) of PubsubSub: (subscription.topic, none(ContentTopic))
else: return
proc subscribe*(node: WakuNode, topic: PubsubTopic, handler: WakuRelayHandler) =
## Subscribes to a PubSub topic. Triggers handler when receiving messages on if contentTopicOp.isSome() and node.contentTopicHandlers.hasKey(contentTopicOp.get()):
## this topic. TopicHandler is a method that takes a topic and some data. error "Invalid API call to `subscribe`. Was already subscribed"
if node.wakuRelay.isNil():
error "Invalid API call to `subscribe`. WakuRelay not mounted."
return return
debug "subscribe", pubsubTopic= topic debug "subscribe", pubsubTopic=pubsubTopic
node.topicSubscriptionQueue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: topic)) node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic))
node.registerRelayDefaultHandler(topic) node.registerRelayDefaultHandler(pubsubTopic)
node.wakuRelay.subscribe(topic, handler)
proc unsubscribe*(node: WakuNode, topic: PubsubTopic) = if handler.isSome():
## Unsubscribes from a specific PubSub topic. let wrappedHandler = node.wakuRelay.subscribe(pubsubTopic, handler.get())
if contentTopicOp.isSome():
node.contentTopicHandlers[contentTopicOp.get()] = wrappedHandler
proc unsubscribe*(node: WakuNode, subscription: SubscriptionEvent) =
## Unsubscribes from a specific PubSub or Content topic.
if node.wakuRelay.isNil(): if node.wakuRelay.isNil():
error "Invalid API call to `unsubscribe`. WakuRelay not mounted." error "Invalid API call to `unsubscribe`. WakuRelay not mounted."
return return
info "unsubscribe", pubsubTopic=topic let (pubsubTopic, contentTopicOp) =
case subscription.kind:
of ContentUnsub:
let shard = getShard((subscription.topic)).valueOr:
error "Autosharding error", error=error
return
node.topicSubscriptionQueue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: topic)) (shard, some(subscription.topic))
node.wakuRelay.unsubscribe(topic) of PubsubUnsub: (subscription.topic, none(ContentTopic))
else: return
if not node.wakuRelay.isSubscribed(pubsubTopic):
error "Invalid API call to `unsubscribe`. Was not subscribed"
return
proc publish*(node: WakuNode, topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = if contentTopicOp.isSome():
## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a # Remove this handler only
## `contentTopic` field for light node functionality. This field may be also var handler: TopicHandler
## be omitted. if node.contentTopicHandlers.pop(contentTopicOp.get(), handler):
debug "unsubscribe", contentTopic=contentTopicOp.get()
node.wakuRelay.unsubscribe(pubsubTopic, handler)
if contentTopicOp.isNone() or node.wakuRelay.topics.getOrDefault(pubsubTopic).len == 1:
# Remove all handlers
debug "unsubscribe", pubsubTopic=pubsubTopic
node.wakuRelay.unsubscribeAll(pubsubTopic)
node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic))
proc publish*(
node: WakuNode,
pubsubTopicOp: Option[PubsubTopic],
message: WakuMessage
) {.async, gcsafe.} =
## Publish a `WakuMessage`. Pubsub topic contains; none, a named or static shard.
## `WakuMessage` should contain a `contentTopic` field for light node functionality.
## It is also used to determine the shard.
if node.wakuRelay.isNil(): if node.wakuRelay.isNil():
error "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead." error "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead."
# TODO: Improve error handling # TODO: Improve error handling
return return
discard await node.wakuRelay.publish(topic, message) let pubsubTopic = pubsubTopicOp.valueOr:
getShard(message.contentTopic).valueOr:
error "Autosharding error", error=error
return
#TODO instead of discard return error when 0 peers received the message
discard await node.wakuRelay.publish(pubsubTopic, message)
trace "waku.relay published", trace "waku.relay published",
peerId=node.peerId, peerId=node.peerId,
pubsubTopic=topic, pubsubTopic=pubsubTopic,
hash=topic.digest(message).to0xHex(), hash=pubsubTopic.digest(message).to0xHex(),
publishTime=getNowInNanosecondTime() publishTime=getNowInNanosecondTime()
proc startRelay*(node: WakuNode) {.async.} = proc startRelay*(node: WakuNode) {.async.} =
## Setup and start relay protocol ## Setup and start relay protocol
@ -303,7 +347,7 @@ proc startRelay*(node: WakuNode) {.async.} =
info "relay started successfully" info "relay started successfully"
proc mountRelay*(node: WakuNode, proc mountRelay*(node: WakuNode,
topics: seq[string] = @[], pubsubTopics: seq[string] = @[],
peerExchangeHandler = none(RoutingRecordsHandler)) {.async, gcsafe.} = peerExchangeHandler = none(RoutingRecordsHandler)) {.async, gcsafe.} =
if not node.wakuRelay.isNil(): if not node.wakuRelay.isNil():
error "wakuRelay already mounted, skipping" error "wakuRelay already mounted, skipping"
@ -332,8 +376,8 @@ proc mountRelay*(node: WakuNode,
info "relay mounted successfully" info "relay mounted successfully"
# Subscribe to topics # Subscribe to topics
for topic in topics: for pubsubTopic in pubsubTopics:
node.subscribe(topic) node.subscribe((kind: PubsubSub, topic: pubsubTopic))
## Waku filter ## Waku filter

View File

@ -0,0 +1,23 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
chronos,
chronicles
import
../waku_relay,
../waku_core,
./message_cache
##### Message handler
proc messageCacheHandler*(cache: MessageCache[string]): WakuRelayHandler =
return proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} =
cache.addMessage(PubSubTopic(pubsubTopic), msg)
proc autoMessageCacheHandler*(cache: MessageCache[string]): WakuRelayHandler =
return proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} =
if cache.isSubscribed(msg.contentTopic):
cache.addMessage(msg.contentTopic, msg)

View File

@ -4,6 +4,11 @@ proc delete_waku_v2_relay_v1_subscriptions(topics: seq[PubsubTopic]): bool
proc post_waku_v2_relay_v1_message(topic: PubsubTopic, message: WakuMessageRPC): bool proc post_waku_v2_relay_v1_message(topic: PubsubTopic, message: WakuMessageRPC): bool
proc get_waku_v2_relay_v1_messages(topic: PubsubTopic): seq[WakuMessageRPC] proc get_waku_v2_relay_v1_messages(topic: PubsubTopic): seq[WakuMessageRPC]
proc post_waku_v2_relay_v1_auto_subscriptions(topics: seq[ContentTopic]): bool
proc delete_waku_v2_relay_v1_auto_subscriptions(topics: seq[ContentTopic]): bool
proc post_waku_v2_relay_v1_auto_message(message: WakuMessageRPC): bool
proc get_waku_v2_relay_v1_auto_messages(topic: ContentTopic): seq[WakuMessageRPC]
# Support for the Relay Private API has been deprecated. # Support for the Relay Private API has been deprecated.
# This API existed for compatibility with the Waku v1 spec and encryption scheme. # This API existed for compatibility with the Waku v1 spec and encryption scheme.

View File

@ -17,6 +17,7 @@ import
../../../waku_rln_relay/rln/wrappers, ../../../waku_rln_relay/rln/wrappers,
../../../waku_node, ../../../waku_node,
../../message_cache, ../../message_cache,
../../cache_handlers,
../message ../message
from std/times import getTime from std/times import getTime
@ -29,54 +30,51 @@ logScope:
const futTimeout* = 5.seconds # Max time to wait for futures const futTimeout* = 5.seconds # Max time to wait for futures
type
MessageCache* = message_cache.MessageCache[PubsubTopic]
## Waku Relay JSON-RPC API ## Waku Relay JSON-RPC API
proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageCache) = proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageCache[string]) =
if node.wakuRelay.isNil(): server.rpc("post_waku_v2_relay_v1_subscriptions") do (pubsubTopics: seq[PubsubTopic]) -> bool:
debug "waku relay protocol is nil. skipping json rpc api handlers installation" if pubsubTopics.len == 0:
return raise newException(ValueError, "No pubsub topic provided")
let topicHandler = proc(topic: PubsubTopic, message: WakuMessage) {.async.} =
cache.addMessage(topic, message)
# The node may already be subscribed to some topics when Relay API handlers
# are installed
for topic in node.wakuRelay.subscribedTopics:
node.subscribe(topic, topicHandler)
cache.subscribe(topic)
server.rpc("post_waku_v2_relay_v1_subscriptions") do (topics: seq[PubsubTopic]) -> bool:
## Subscribes a node to a list of PubSub topics ## Subscribes a node to a list of PubSub topics
debug "post_waku_v2_relay_v1_subscriptions" debug "post_waku_v2_relay_v1_subscriptions"
# Subscribe to all requested topics # Subscribe to all requested topics
let newTopics = topics.filterIt(not cache.isSubscribed(it)) let newTopics = pubsubTopics.filterIt(not cache.isSubscribed(it))
for topic in newTopics: for pubsubTopic in newTopics:
cache.subscribe(topic) if pubsubTopic == "":
node.subscribe(topic, topicHandler) raise newException(ValueError, "Empty pubsub topic")
cache.subscribe(pubsubTopic)
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(messageCacheHandler(cache)))
return true return true
server.rpc("delete_waku_v2_relay_v1_subscriptions") do (topics: seq[PubsubTopic]) -> bool: server.rpc("delete_waku_v2_relay_v1_subscriptions") do (pubsubTopics: seq[PubsubTopic]) -> bool:
if pubsubTopics.len == 0:
raise newException(ValueError, "No pubsub topic provided")
## Unsubscribes a node from a list of PubSub topics ## Unsubscribes a node from a list of PubSub topics
debug "delete_waku_v2_relay_v1_subscriptions" debug "delete_waku_v2_relay_v1_subscriptions"
# Unsubscribe all handlers from requested topics # Unsubscribe all handlers from requested topics
let subscribedTopics = topics.filterIt(cache.isSubscribed(it)) let subscribedTopics = pubsubTopics.filterIt(cache.isSubscribed(it))
for topic in subscribedTopics: for pubsubTopic in subscribedTopics:
node.unsubscribe(topic) if pubsubTopic == "":
cache.unsubscribe(topic) raise newException(ValueError, "Empty pubsub topic")
cache.unsubscribe(pubsubTopic)
node.unsubscribe((kind: PubsubUnsub, topic: pubsubTopic))
return true return true
server.rpc("post_waku_v2_relay_v1_message") do (pubsubTopic: PubsubTopic, msg: WakuMessageRPC) -> bool: server.rpc("post_waku_v2_relay_v1_message") do (pubsubTopic: PubsubTopic, msg: WakuMessageRPC) -> bool:
if pubsubTopic == "":
raise newException(ValueError, "Empty pubsub topic")
## Publishes a WakuMessage to a PubSub topic ## Publishes a WakuMessage to a PubSub topic
debug "post_waku_v2_relay_v1_message", pubsubTopic=pubsubTopic debug "post_waku_v2_relay_v1_message", pubsubTopic=pubsubTopic
@ -84,10 +82,12 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
if payloadRes.isErr(): if payloadRes.isErr():
raise newException(ValueError, "invalid payload format: " & payloadRes.error) raise newException(ValueError, "invalid payload format: " & payloadRes.error)
if msg.contentTopic.isNone():
raise newException(ValueError, "message has no content topic")
var message = WakuMessage( var message = WakuMessage(
payload: payloadRes.value, payload: payloadRes.value,
# TODO: Fail if the message doesn't have a content topic contentTopic: msg.contentTopic.get(),
contentTopic: msg.contentTopic.get(DefaultContentTopic),
version: msg.version.get(0'u32), version: msg.version.get(0'u32),
timestamp: msg.timestamp.get(Timestamp(0)), timestamp: msg.timestamp.get(Timestamp(0)),
ephemeral: msg.ephemeral.get(false) ephemeral: msg.ephemeral.get(false)
@ -96,7 +96,8 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
# ensure the node is subscribed to the pubsubTopic. otherwise it risks publishing # ensure the node is subscribed to the pubsubTopic. otherwise it risks publishing
# to a topic with no connected peers # to a topic with no connected peers
if pubsubTopic notin node.wakuRelay.subscribedTopics(): if pubsubTopic notin node.wakuRelay.subscribedTopics():
raise newException(ValueError, "Failed to publish: Node not subscribed to pubsubTopic: " & pubsubTopic) raise newException(
ValueError, "Failed to publish: Node not subscribed to pubsubTopic: " & pubsubTopic)
# if RLN is mounted, append the proof to the message # if RLN is mounted, append the proof to the message
if not node.wakuRlnRelay.isNil(): if not node.wakuRlnRelay.isNil():
@ -112,34 +113,127 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
elif result == MessageValidationResult.Spam: elif result == MessageValidationResult.Spam:
raise newException(ValueError, "Failed to publish: limit exceeded, try again later") raise newException(ValueError, "Failed to publish: limit exceeded, try again later")
elif result == MessageValidationResult.Valid: elif result == MessageValidationResult.Valid:
debug "RLN proof validated successfully", pubSubTopic=pubSubTopic debug "RLN proof validated successfully", pubSubTopic=pubsubTopic
else: else:
raise newException(ValueError, "Failed to publish: unknown RLN proof validation result") raise newException(ValueError, "Failed to publish: unknown RLN proof validation result")
# if we reach here its either a non-RLN message or a RLN message with a valid proof # if we reach here its either a non-RLN message or a RLN message with a valid proof
debug "Publishing message", pubSubTopic=pubSubTopic debug "Publishing message", pubSubTopic=pubsubTopic, rln=defined(rln)
let publishFut = node.publish(pubsubTopic, message) let publishFut = node.publish(some(pubsubTopic), message)
if not await publishFut.withTimeout(futTimeout): if not await publishFut.withTimeout(futTimeout):
raise newException(ValueError, "Failed to publish: timed out") raise newException(ValueError, "Failed to publish: timed out")
return true return true
server.rpc("get_waku_v2_relay_v1_messages") do (topic: PubsubTopic) -> seq[WakuMessageRPC]: server.rpc("get_waku_v2_relay_v1_messages") do (pubsubTopic: PubsubTopic) -> seq[WakuMessageRPC]:
if pubsubTopic == "":
raise newException(ValueError, "Empty pubsub topic")
## Returns all WakuMessages received on a PubSub topic since the ## Returns all WakuMessages received on a PubSub topic since the
## last time this method was called ## last time this method was called
debug "get_waku_v2_relay_v1_messages", topic=topic debug "get_waku_v2_relay_v1_messages", topic=pubsubTopic
if not cache.isSubscribed(topic): let msgRes = cache.getMessages(pubsubTopic, clear=true)
raise newException(ValueError, "Not subscribed to topic: " & topic)
let msgRes = cache.getMessages(topic, clear=true)
if msgRes.isErr(): if msgRes.isErr():
raise newException(ValueError, "Not subscribed to topic: " & topic) raise newException(ValueError, "Not subscribed to pubsub topic: " & pubsubTopic)
return msgRes.value.map(toWakuMessageRPC) return msgRes.value.map(toWakuMessageRPC)
# Autosharding API
## Waku Relay Private JSON-RPC API (Whisper/Waku v1 compatibility) server.rpc("post_waku_v2_relay_v1_auto_subscriptions") do (contentTopics: seq[ContentTopic]) -> bool:
## Support for the Relay Private API has been deprecated. if contentTopics.len == 0:
## This API existed for compatibility with the Waku v1/Whisper spec and encryption schemes. raise newException(ValueError, "No content topic provided")
## It is recommended to use the Relay API instead.
## Subscribes a node to a list of Content topics
debug "post_waku_v2_relay_v1_auto_subscriptions"
let newTopics = contentTopics.filterIt(not cache.isSubscribed(it))
# Subscribe to all requested topics
for contentTopic in newTopics:
if contentTopic == "":
raise newException(ValueError, "Empty content topic")
cache.subscribe(contentTopic)
node.subscribe((kind: ContentSub, topic: contentTopic), some(autoMessageCacheHandler(cache)))
return true
server.rpc("delete_waku_v2_relay_v1_auto_subscriptions") do (contentTopics: seq[ContentTopic]) -> bool:
if contentTopics.len == 0:
raise newException(ValueError, "No content topic provided")
## Unsubscribes a node from a list of Content topics
debug "delete_waku_v2_relay_v1_auto_subscriptions"
let subscribedTopics = contentTopics.filterIt(cache.isSubscribed(it))
# Unsubscribe all handlers from requested topics
for contentTopic in subscribedTopics:
if contentTopic == "":
raise newException(ValueError, "Empty content topic")
cache.unsubscribe(contentTopic)
node.unsubscribe((kind: ContentUnsub, topic: contentTopic))
return true
server.rpc("post_waku_v2_relay_v1_auto_message") do (msg: WakuMessageRPC) -> bool:
## Publishes a WakuMessage to a Content topic
debug "post_waku_v2_relay_v1_auto_message"
let payloadRes = base64.decode(msg.payload)
if payloadRes.isErr():
raise newException(ValueError, "invalid payload format: " & payloadRes.error)
if msg.contentTopic.isNone():
raise newException(ValueError, "message has no content topic")
var message = WakuMessage(
payload: payloadRes.value,
contentTopic: msg.contentTopic.get(),
version: msg.version.get(0'u32),
timestamp: msg.timestamp.get(Timestamp(0)),
ephemeral: msg.ephemeral.get(false)
)
# if RLN is mounted, append the proof to the message
if not node.wakuRlnRelay.isNil():
# append the proof to the message
let success = node.wakuRlnRelay.appendRLNProof(message,
float64(getTime().toUnix()))
if not success:
raise newException(ValueError, "Failed to publish: error appending RLN proof to message")
# validate the message before sending it
let result = node.wakuRlnRelay.validateMessage(message)
if result == MessageValidationResult.Invalid:
raise newException(ValueError, "Failed to publish: invalid RLN proof")
elif result == MessageValidationResult.Spam:
raise newException(ValueError, "Failed to publish: limit exceeded, try again later")
elif result == MessageValidationResult.Valid:
debug "RLN proof validated successfully", contentTopic=message.contentTopic
else:
raise newException(ValueError, "Failed to publish: unknown RLN proof validation result")
# if we reach here its either a non-RLN message or a RLN message with a valid proof
debug "Publishing message", contentTopic=message.contentTopic, rln=defined(rln)
let publishFut = node.publish(none(PubsubTopic), message)
if not await publishFut.withTimeout(futTimeout):
raise newException(ValueError, "Failed to publish: timed out")
return true
server.rpc("get_waku_v2_relay_v1_auto_messages") do (contentTopic: ContentTopic) -> seq[WakuMessageRPC]:
if contentTopic == "":
raise newException(ValueError, "Empty content topic")
## Returns all WakuMessages received on a Content topic since the
## last time this method was called
debug "get_waku_v2_relay_v1_auto_messages", topic=contentTopic
let msgRes = cache.getMessages(contentTopic, clear=true)
if msgRes.isErr():
raise newException(ValueError, "Not subscribed to content topic: " & contentTopic)
return msgRes.value.map(toWakuMessageRPC)

View File

@ -46,10 +46,11 @@ proc decodeBytes*(t: typedesc[string], value: openarray[byte],
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) # TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc relayPostSubscriptionsV1*(body: RelayPostSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/subscriptions", meth: HttpMethod.MethodPost.} proc relayPostSubscriptionsV1*(body: RelayPostSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/subscriptions", meth: HttpMethod.MethodPost.}
proc relayPostAutoSubscriptionsV1*(body: RelayPostSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/auto/subscriptions", meth: HttpMethod.MethodPost.}
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) # TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc relayDeleteSubscriptionsV1*(body: RelayDeleteSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/subscriptions", meth: HttpMethod.MethodDelete.} proc relayDeleteSubscriptionsV1*(body: RelayDeleteSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/subscriptions", meth: HttpMethod.MethodDelete.}
proc relayDeleteAutoSubscriptionsV1*(body: RelayDeleteSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/auto/subscriptions", meth: HttpMethod.MethodDelete.}
proc decodeBytes*(t: typedesc[RelayGetMessagesResponse], data: openArray[byte], contentType: Opt[ContentTypeData]): RestResult[RelayGetMessagesResponse] = proc decodeBytes*(t: typedesc[RelayGetMessagesResponse], data: openArray[byte], contentType: Opt[ContentTypeData]): RestResult[RelayGetMessagesResponse] =
if MediaType.init($contentType) != MIMETYPE_JSON: if MediaType.init($contentType) != MIMETYPE_JSON:
@ -70,6 +71,8 @@ proc encodeBytes*(value: RelayPostMessagesRequest,
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) # TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc relayGetMessagesV1*(topic: string): RestResponse[RelayGetMessagesResponse] {.rest, endpoint: "/relay/v1/messages/{topic}", meth: HttpMethod.MethodGet.} proc relayGetMessagesV1*(topic: string): RestResponse[RelayGetMessagesResponse] {.rest, endpoint: "/relay/v1/messages/{topic}", meth: HttpMethod.MethodGet.}
proc relayGetAutoMessagesV1*(topic: string): RestResponse[RelayGetMessagesResponse] {.rest, endpoint: "/relay/v1/auto/messages/{topic}", meth: HttpMethod.MethodGet.}
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) # TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc relayPostMessagesV1*(topic: string, body: RelayPostMessagesRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/messages/{topic}", meth: HttpMethod.MethodPost.} proc relayPostMessagesV1*(topic: string, body: RelayPostMessagesRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/messages/{topic}", meth: HttpMethod.MethodPost.}
proc relayPostAutoMessagesV1*(topic: string, body: RelayPostMessagesRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/auto/messages/{topic}", meth: HttpMethod.MethodPost.}

View File

@ -16,10 +16,12 @@ import
../../../waku_relay/protocol, ../../../waku_relay/protocol,
../../../waku_rln_relay, ../../../waku_rln_relay,
../../../waku_rln_relay/rln/wrappers, ../../../waku_rln_relay/rln/wrappers,
../../../node/waku_node,
../../message_cache,
../../cache_handlers,
../serdes, ../serdes,
../responses, ../responses,
./types, ./types
./topic_cache
from std/times import getTime from std/times import getTime
from std/times import toUnix from std/times import toUnix
@ -40,9 +42,11 @@ const futTimeout* = 5.seconds # Max time to wait for futures
#### Request handlers #### Request handlers
const ROUTE_RELAY_SUBSCRIPTIONSV1* = "/relay/v1/subscriptions" const ROUTE_RELAY_SUBSCRIPTIONSV1* = "/relay/v1/subscriptions"
const ROUTE_RELAY_MESSAGESV1* = "/relay/v1/messages/{topic}"
const ROUTE_RELAY_AUTO_SUBSCRIPTIONSV1* = "/relay/v1/auto/subscriptions"
const ROUTE_RELAY_AUTO_MESSAGESV1* = "/relay/v1/auto/messages/{topic}"
proc installRelayPostSubscriptionsV1Handler*(router: var RestRouter, node: WakuNode, cache: TopicCache) = proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: MessageCache[string]) =
router.api(MethodPost, ROUTE_RELAY_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: router.api(MethodPost, ROUTE_RELAY_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
# ## Subscribes a node to a list of PubSub topics # ## Subscribes a node to a list of PubSub topics
# debug "post_waku_v2_relay_v1_subscriptions" # debug "post_waku_v2_relay_v1_subscriptions"
@ -65,14 +69,12 @@ proc installRelayPostSubscriptionsV1Handler*(router: var RestRouter, node: WakuN
# Only subscribe to topics for which we have no subscribed topic handlers yet # Only subscribe to topics for which we have no subscribed topic handlers yet
let newTopics = req.filterIt(not cache.isSubscribed(it)) let newTopics = req.filterIt(not cache.isSubscribed(it))
for topic in newTopics: for pubsubTopic in newTopics:
cache.subscribe(topic) cache.subscribe(pubsubTopic)
node.subscribe(topic, cache.messageHandler()) node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(messageCacheHandler(cache)))
return RestApiResponse.ok() return RestApiResponse.ok()
proc installRelayDeleteSubscriptionsV1Handler*(router: var RestRouter, node: WakuNode, cache: TopicCache) =
router.api(MethodDelete, ROUTE_RELAY_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: router.api(MethodDelete, ROUTE_RELAY_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
# ## Subscribes a node to a list of PubSub topics # ## Subscribes a node to a list of PubSub topics
# debug "delete_waku_v2_relay_v1_subscriptions" # debug "delete_waku_v2_relay_v1_subscriptions"
@ -93,17 +95,13 @@ proc installRelayDeleteSubscriptionsV1Handler*(router: var RestRouter, node: Wak
let req: RelayDeleteSubscriptionsRequest = reqResult.get() let req: RelayDeleteSubscriptionsRequest = reqResult.get()
# Unsubscribe all handlers from requested topics # Unsubscribe all handlers from requested topics
for topic in req: for pubsubTopic in req:
node.unsubscribe(topic) node.unsubscribe((kind: PubsubUnsub, topic: pubsubTopic))
cache.unsubscribe(topic) cache.unsubscribe(pubsubTopic)
# Successfully unsubscribed from all requested topics # Successfully unsubscribed from all requested topics
return RestApiResponse.ok() return RestApiResponse.ok()
const ROUTE_RELAY_MESSAGESV1* = "/relay/v1/messages/{topic}"
proc installRelayGetMessagesV1Handler*(router: var RestRouter, node: WakuNode, cache: TopicCache) =
router.api(MethodGet, ROUTE_RELAY_MESSAGESV1) do (topic: string) -> RestApiResponse: router.api(MethodGet, ROUTE_RELAY_MESSAGESV1) do (topic: string) -> RestApiResponse:
# ## Returns all WakuMessages received on a PubSub topic since the # ## Returns all WakuMessages received on a PubSub topic since the
# ## last time this method was called # ## last time this method was called
@ -127,9 +125,7 @@ proc installRelayGetMessagesV1Handler*(router: var RestRouter, node: WakuNode, c
return resp.get() return resp.get()
proc installRelayPostMessagesV1Handler*(router: var RestRouter, node: WakuNode) =
router.api(MethodPost, ROUTE_RELAY_MESSAGESV1) do (topic: string, contentBody: Option[ContentBody]) -> RestApiResponse: router.api(MethodPost, ROUTE_RELAY_MESSAGESV1) do (topic: string, contentBody: Option[ContentBody]) -> RestApiResponse:
if topic.isErr(): if topic.isErr():
return RestApiResponse.badRequest() return RestApiResponse.badRequest()
let pubSubTopic = topic.get() let pubSubTopic = topic.get()
@ -178,16 +174,139 @@ proc installRelayPostMessagesV1Handler*(router: var RestRouter, node: WakuNode)
return RestApiResponse.internalServerError("Failed to publish: unknown RLN proof validation result") return RestApiResponse.internalServerError("Failed to publish: unknown RLN proof validation result")
# if we reach here its either a non-RLN message or a RLN message with a valid proof # if we reach here its either a non-RLN message or a RLN message with a valid proof
debug "Publishing message", pubSubTopic=pubSubTopic debug "Publishing message", pubSubTopic=pubSubTopic, rln=defined(rln)
if not (waitFor node.publish(pubSubTopic, resMessage.value).withTimeout(futTimeout)): if not (waitFor node.publish(some(pubSubTopic), resMessage.value).withTimeout(futTimeout)):
error "Failed to publish message to topic", pubSubTopic=pubSubTopic error "Failed to publish message to topic", pubSubTopic=pubSubTopic
return RestApiResponse.internalServerError("Failed to publish: timedout") return RestApiResponse.internalServerError("Failed to publish: timedout")
return RestApiResponse.ok() return RestApiResponse.ok()
# Autosharding API
proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: TopicCache) = router.api(MethodPost, ROUTE_RELAY_AUTO_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
installRelayPostSubscriptionsV1Handler(router, node, cache) # ## Subscribes a node to a list of content topics
installRelayDeleteSubscriptionsV1Handler(router, node, cache) # debug "post_waku_v2_relay_v1_auto_subscriptions"
installRelayGetMessagesV1Handler(router, node, cache)
installRelayPostMessagesV1Handler(router, node) # Check the request body
if contentBody.isNone():
return RestApiResponse.badRequest()
let reqBodyContentType = MediaType.init($contentBody.get().contentType)
if reqBodyContentType != MIMETYPE_JSON:
return RestApiResponse.badRequest()
let reqBodyData = contentBody.get().data
let reqResult = decodeFromJsonBytes(RelayPostSubscriptionsRequest, reqBodyData)
if reqResult.isErr():
return RestApiResponse.badRequest()
let req: RelayPostSubscriptionsRequest = reqResult.get()
# Only subscribe to topics for which we have no subscribed topic handlers yet
let newTopics = req.filterIt(not cache.isSubscribed(it))
for contentTopic in newTopics:
cache.subscribe(contentTopic)
node.subscribe((kind: ContentSub, topic: contentTopic), some(autoMessageCacheHandler(cache)))
return RestApiResponse.ok()
router.api(MethodDelete, ROUTE_RELAY_AUTO_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
# ## Subscribes a node to a list of content topics
# debug "delete_waku_v2_relay_v1_auto_subscriptions"
# Check the request body
if contentBody.isNone():
return RestApiResponse.badRequest()
let reqBodyContentType = MediaType.init($contentBody.get().contentType)
if reqBodyContentType != MIMETYPE_JSON:
return RestApiResponse.badRequest()
let reqBodyData = contentBody.get().data
let reqResult = decodeFromJsonBytes(RelayDeleteSubscriptionsRequest, reqBodyData)
if reqResult.isErr():
return RestApiResponse.badRequest()
let req: RelayDeleteSubscriptionsRequest = reqResult.get()
# Unsubscribe all handlers from requested topics
for contentTopic in req:
cache.unsubscribe(contentTopic)
node.unsubscribe((kind: ContentUnsub, topic: contentTopic))
# Successfully unsubscribed from all requested topics
return RestApiResponse.ok()
router.api(MethodGet, ROUTE_RELAY_AUTO_MESSAGESV1) do (topic: string) -> RestApiResponse:
# ## Returns all WakuMessages received on a content topic since the
# ## last time this method was called
# ## TODO: ability to specify a return message limit
# debug "get_waku_v2_relay_v1_auto_messages", topic=topic
if topic.isErr():
return RestApiResponse.badRequest()
let contentTopic = topic.get()
let messages = cache.getMessages(contentTopic, clear=true)
if messages.isErr():
debug "Not subscribed to topic", topic=contentTopic
return RestApiResponse.notFound()
let data = RelayGetMessagesResponse(messages.get().map(toRelayWakuMessage))
let resp = RestApiResponse.jsonResponse(data, status=Http200)
if resp.isErr():
debug "An error ocurred while building the json respose", error=resp.error
return RestApiResponse.internalServerError()
return resp.get()
router.api(MethodPost, ROUTE_RELAY_AUTO_MESSAGESV1) do (topic: string, contentBody: Option[ContentBody]) -> RestApiResponse:
# Check the request body
if contentBody.isNone():
return RestApiResponse.badRequest()
let reqBodyContentType = MediaType.init($contentBody.get().contentType)
if reqBodyContentType != MIMETYPE_JSON:
return RestApiResponse.badRequest()
let reqBodyData = contentBody.get().data
let reqResult = decodeFromJsonBytes(RelayPostMessagesRequest, reqBodyData)
if reqResult.isErr():
return RestApiResponse.badRequest()
if reqResult.value.contentTopic.isNone():
return RestApiResponse.badRequest()
let resMessage = reqResult.value.toWakuMessage(version = 0)
if resMessage.isErr():
return RestApiResponse.badRequest()
var message = resMessage.get()
# if RLN is mounted, append the proof to the message
if not node.wakuRlnRelay.isNil():
# append the proof to the message
let success = node.wakuRlnRelay.appendRLNProof(message,
float64(getTime().toUnix()))
if not success:
return RestApiResponse.internalServerError("Failed to publish: error appending RLN proof to message")
# validate the message before sending it
let result = node.wakuRlnRelay.validateMessage(message)
if result == MessageValidationResult.Invalid:
return RestApiResponse.internalServerError("Failed to publish: invalid RLN proof")
elif result == MessageValidationResult.Spam:
return RestApiResponse.badRequest("Failed to publish: limit exceeded, try again later")
elif result == MessageValidationResult.Valid:
debug "RLN proof validated successfully", contentTopic=message.contentTopic
else:
return RestApiResponse.internalServerError("Failed to publish: unknown RLN proof validation result")
# if we reach here its either a non-RLN message or a RLN message with a valid proof
debug "Publishing message", contentTopic=message.contentTopic, rln=defined(rln)
if not (waitFor node.publish(none(PubSubTopic), message).withTimeout(futTimeout)):
error "Failed to publish message to topic", contentTopic=message.contentTopic
return RestApiResponse.internalServerError("Failed to publish: timedout")
return RestApiResponse.ok()

View File

@ -1,33 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
chronos,
chronicles
import
../../../waku_relay,
../../../waku_core,
../../message_cache
export message_cache
##### TopicCache
type TopicCacheResult*[T] = MessageCacheResult[T]
type TopicCache* = MessageCache[PubSubTopic]
##### Message handler
type TopicCacheMessageHandler* = WakuRelayHandler
proc messageHandler*(cache: TopicCache): TopicCacheMessageHandler =
let handler = proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} =
cache.addMessage(PubSubTopic(pubsubTopic), msg)
handler

View File

@ -29,8 +29,8 @@ type
RelayPostMessagesRequest* = RelayWakuMessage RelayPostMessagesRequest* = RelayWakuMessage
type type
RelayPostSubscriptionsRequest* = seq[PubSubTopic] RelayPostSubscriptionsRequest* = seq[string]
RelayDeleteSubscriptionsRequest* = seq[PubSubTopic] RelayDeleteSubscriptionsRequest* = seq[string]
#### Type conversion #### Type conversion

View File

@ -10,9 +10,5 @@ export
type type
SubscriptionKind* = enum ContentSub, ContentUnsub, PubsubSub, PubsubUnsub SubscriptionKind* = enum ContentSub, ContentUnsub, PubsubSub, PubsubUnsub
SubscriptionEvent* = object SubscriptionEvent* = tuple[kind: SubscriptionKind, topic: string]
case kind*: SubscriptionKind
of PubsubSub: pubsubSub*: string
of ContentSub: contentSub*: string
of PubsubUnsub: pubsubUnsub*: string
of ContentUnsub: contentUnsub*: string

View File

@ -307,7 +307,10 @@ proc stop*(wd: WakuDiscoveryV5): Future[void] {.async.} =
debug "Successfully stopped discovery v5 service" debug "Successfully stopped discovery v5 service"
proc subscriptionsListener*(wd: WakuDiscoveryV5, topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent]) {.async.} = proc subscriptionsListener*(
wd: WakuDiscoveryV5,
topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent]
) {.async.} =
## Listen for pubsub topics subscriptions changes ## Listen for pubsub topics subscriptions changes
let key = topicSubscriptionQueue.register() let key = topicSubscriptionQueue.register()
@ -317,8 +320,8 @@ proc subscriptionsListener*(wd: WakuDiscoveryV5, topicSubscriptionQueue: AsyncEv
# Since we don't know the events we will receive we have to anticipate. # Since we don't know the events we will receive we have to anticipate.
let subs = events.filterIt(it.kind == SubscriptionKind.PubsubSub).mapIt(it.pubsubSub) let subs = events.filterIt(it.kind == PubsubSub).mapIt(it.topic)
let unsubs = events.filterIt(it.kind == SubscriptionKind.PubsubUnsub).mapIt(it.pubsubUnsub) let unsubs = events.filterIt(it.kind == PubsubUnsub).mapIt(it.topic)
if subs.len == 0 and unsubs.len == 0: if subs.len == 0 and unsubs.len == 0:
continue continue

View File

@ -214,20 +214,21 @@ proc generateOrderedValidator*(w: WakuRelay): auto {.gcsafe.} =
return ValidationResult.Accept return ValidationResult.Accept
return wrappedValidator return wrappedValidator
proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler) = proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler): TopicHandler =
debug "subscribe", pubsubTopic=pubsubTopic debug "subscribe", pubsubTopic=pubsubTopic
# we need to wrap the handler since gossipsub doesnt understand WakuMessage # we need to wrap the handler since gossipsub doesnt understand WakuMessage
let wrappedHandler = proc(pubsubTopic: string, data: seq[byte]): Future[void] {.gcsafe, raises: [].} = let wrappedHandler =
let decMsg = WakuMessage.decode(data) proc(pubsubTopic: string, data: seq[byte]): Future[void] {.gcsafe, raises: [].} =
if decMsg.isErr(): let decMsg = WakuMessage.decode(data)
# fine if triggerSelf enabled, since validators are bypassed if decMsg.isErr():
error "failed to decode WakuMessage, validator passed a wrong message", error = decMsg.error # fine if triggerSelf enabled, since validators are bypassed
let fut = newFuture[void]() error "failed to decode WakuMessage, validator passed a wrong message", error = decMsg.error
fut.complete() let fut = newFuture[void]()
return fut fut.complete()
else: return fut
return handler(pubsubTopic, decMsg.get()) else:
return handler(pubsubTopic, decMsg.get())
# add the ordered validator to the topic # add the ordered validator to the topic
if not w.validatorInserted.hasKey(pubSubTopic): if not w.validatorInserted.hasKey(pubSubTopic):
@ -240,12 +241,23 @@ proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandle
# subscribe to the topic with our wrapped handler # subscribe to the topic with our wrapped handler
procCall GossipSub(w).subscribe(pubsubTopic, wrappedHandler) procCall GossipSub(w).subscribe(pubsubTopic, wrappedHandler)
proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic) = return wrappedHandler
debug "unsubscribe", pubsubTopic=pubsubTopic
proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) =
## Unsubscribe all handlers on this pubsub topic
debug "unsubscribe all", pubsubTopic=pubsubTopic
procCall GossipSub(w).unsubscribeAll(pubsubTopic) procCall GossipSub(w).unsubscribeAll(pubsubTopic)
w.validatorInserted.del(pubsubTopic) w.validatorInserted.del(pubsubTopic)
proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: TopicHandler) =
## Unsubscribe this handler on this pubsub topic
debug "unsubscribe", pubsubTopic=pubsubTopic
procCall GossipSub(w).unsubscribe(pubsubTopic, handler)
proc publish*(w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage): Future[int] {.async.} = proc publish*(w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage): Future[int] {.async.} =
trace "publish", pubsubTopic=pubsubTopic trace "publish", pubsubTopic=pubsubTopic
let data = message.encode().buffer let data = message.encode().buffer